1 /*-------------------------------------------------------------------------
2 *
3 * syncrep.c
4 *
5 * Synchronous replication is new as of PostgreSQL 9.1.
6 *
7 * If requested, transaction commits wait until their commit LSNs are
8 * acknowledged by the synchronous standbys.
9 *
10 * This module contains the code for waiting and release of backends.
11 * All code in this module executes on the primary. The core streaming
12 * replication transport remains within WALreceiver/WALsender modules.
13 *
14 * The essence of this design is that it isolates all logic about
15 * waiting/releasing onto the primary. The primary defines which standbys
16 * it wishes to wait for. The standbys are completely unaware of the
17 * durability requirements of transactions on the primary, reducing the
18 * complexity of the code and streamlining both standby operations and
19 * network bandwidth because there is no requirement to ship
20 * per-transaction state information.
21 *
22 * Replication is either synchronous or not synchronous (async). If it is
23 * async, we just fastpath out of here. If it is sync, then we wait for
24 * the write, flush or apply location on the standby before releasing
25 * the waiting backend. Further complexity in that interaction is
26 * expected in later releases.
27 *
28 * The best performing way to manage the waiting backends is to have a
29 * single ordered queue of waiting backends, so that we can avoid
30 * searching through all the waiters each time we receive a reply.
31 *
32 * In 9.5 or before only a single standby could be considered as
33 * synchronous. In 9.6 we support a priority-based multiple synchronous
34 * standbys. In 10.0 a quorum-based multiple synchronous standbys is also
35 * supported. The number of synchronous standbys that transactions
36 * must wait for replies from is specified in synchronous_standby_names.
37 * This parameter also specifies a list of standby names and the method
38 * (FIRST and ANY) to choose synchronous standbys from the listed ones.
39 *
40 * The method FIRST specifies a priority-based synchronous replication
41 * and makes transaction commits wait until their WAL records are
42 * replicated to the requested number of synchronous standbys chosen based
43 * on their priorities. The standbys whose names appear earlier in the list
44 * are given higher priority and will be considered as synchronous.
45 * Other standby servers appearing later in this list represent potential
46 * synchronous standbys. If any of the current synchronous standbys
47 * disconnects for whatever reason, it will be replaced immediately with
48 * the next-highest-priority standby.
49 *
50 * The method ANY specifies a quorum-based synchronous replication
51 * and makes transaction commits wait until their WAL records are
52 * replicated to at least the requested number of synchronous standbys
53 * in the list. All the standbys appearing in the list are considered as
54 * candidates for quorum synchronous standbys.
55 *
56 * If neither FIRST nor ANY is specified, FIRST is used as the method.
57 * This is for backward compatibility with 9.6 or before where only a
58 * priority-based sync replication was supported.
59 *
60 * Before the standbys chosen from synchronous_standby_names can
61 * become the synchronous standbys they must have caught up with
62 * the primary; that may take some time. Once caught up,
63 * the standbys which are considered as synchronous at that moment
64 * will release waiters from the queue.
65 *
66 * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group
67 *
68 * IDENTIFICATION
69 * src/backend/replication/syncrep.c
70 *
71 *-------------------------------------------------------------------------
72 */
73 #include "postgres.h"
74
75 #include <unistd.h>
76
77 #include "access/xact.h"
78 #include "miscadmin.h"
79 #include "pgstat.h"
80 #include "replication/syncrep.h"
81 #include "replication/walsender.h"
82 #include "replication/walsender_private.h"
83 #include "storage/pmsignal.h"
84 #include "storage/proc.h"
85 #include "tcop/tcopprot.h"
86 #include "utils/builtins.h"
87 #include "utils/ps_status.h"
88
89 /* User-settable parameters for sync rep */
90 char *SyncRepStandbyNames;
91
92 #define SyncStandbysDefined() \
93 (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
94
95 static bool announce_next_takeover = true;
96
97 SyncRepConfigData *SyncRepConfig = NULL;
98 static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
99
100 static void SyncRepQueueInsert(int mode);
101 static void SyncRepCancelWait(void);
102 static int SyncRepWakeQueue(bool all, int mode);
103
104 static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
105 XLogRecPtr *flushPtr,
106 XLogRecPtr *applyPtr,
107 bool *am_sync);
108 static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
109 XLogRecPtr *flushPtr,
110 XLogRecPtr *applyPtr,
111 SyncRepStandbyData *sync_standbys,
112 int num_standbys);
113 static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
114 XLogRecPtr *flushPtr,
115 XLogRecPtr *applyPtr,
116 SyncRepStandbyData *sync_standbys,
117 int num_standbys,
118 uint8 nth);
119 static int SyncRepGetStandbyPriority(void);
120 static int standby_priority_comparator(const void *a, const void *b);
121 static int cmp_lsn(const void *a, const void *b);
122
123 #ifdef USE_ASSERT_CHECKING
124 static bool SyncRepQueueIsOrderedByLSN(int mode);
125 #endif
126
127 /*
128 * ===========================================================
129 * Synchronous Replication functions for normal user backends
130 * ===========================================================
131 */
132
133 /*
134 * Wait for synchronous replication, if requested by user.
135 *
136 * Initially backends start in state SYNC_REP_NOT_WAITING and then
137 * change that state to SYNC_REP_WAITING before adding ourselves
138 * to the wait queue. During SyncRepWakeQueue() a WALSender changes
139 * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
140 * This backend then resets its state to SYNC_REP_NOT_WAITING.
141 *
142 * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
143 * represents a commit record. If it doesn't, then we wait only for the WAL
144 * to be flushed if synchronous_commit is set to the higher level of
145 * remote_apply, because only commit records provide apply feedback.
146 */
147 void
SyncRepWaitForLSN(XLogRecPtr lsn,bool commit)148 SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
149 {
150 char *new_status = NULL;
151 const char *old_status;
152 int mode;
153
154 /*
155 * This should be called while holding interrupts during a transaction
156 * commit to prevent the follow-up shared memory queue cleanups to be
157 * influenced by external interruptions.
158 */
159 Assert(InterruptHoldoffCount > 0);
160
161 /* Cap the level for anything other than commit to remote flush only. */
162 if (commit)
163 mode = SyncRepWaitMode;
164 else
165 mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
166
167 /*
168 * Fast exit if user has not requested sync replication.
169 */
170 if (!SyncRepRequested())
171 return;
172
173 Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
174 Assert(WalSndCtl != NULL);
175
176 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
177 Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
178
179 /*
180 * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
181 * set. See SyncRepUpdateSyncStandbysDefined.
182 *
183 * Also check that the standby hasn't already replied. Unlikely race
184 * condition but we'll be fetching that cache line anyway so it's likely
185 * to be a low cost check.
186 */
187 if (!WalSndCtl->sync_standbys_defined ||
188 lsn <= WalSndCtl->lsn[mode])
189 {
190 LWLockRelease(SyncRepLock);
191 return;
192 }
193
194 /*
195 * Set our waitLSN so WALSender will know when to wake us, and add
196 * ourselves to the queue.
197 */
198 MyProc->waitLSN = lsn;
199 MyProc->syncRepState = SYNC_REP_WAITING;
200 SyncRepQueueInsert(mode);
201 Assert(SyncRepQueueIsOrderedByLSN(mode));
202 LWLockRelease(SyncRepLock);
203
204 /* Alter ps display to show waiting for sync rep. */
205 if (update_process_title)
206 {
207 int len;
208
209 old_status = get_ps_display(&len);
210 new_status = (char *) palloc(len + 32 + 1);
211 memcpy(new_status, old_status, len);
212 sprintf(new_status + len, " waiting for %X/%X",
213 (uint32) (lsn >> 32), (uint32) lsn);
214 set_ps_display(new_status);
215 new_status[len] = '\0'; /* truncate off " waiting ..." */
216 }
217
218 /*
219 * Wait for specified LSN to be confirmed.
220 *
221 * Each proc has its own wait latch, so we perform a normal latch
222 * check/wait loop here.
223 */
224 for (;;)
225 {
226 int rc;
227
228 /* Must reset the latch before testing state. */
229 ResetLatch(MyLatch);
230
231 /*
232 * Acquiring the lock is not needed, the latch ensures proper
233 * barriers. If it looks like we're done, we must really be done,
234 * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
235 * it will never update it again, so we can't be seeing a stale value
236 * in that case.
237 */
238 if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
239 break;
240
241 /*
242 * If a wait for synchronous replication is pending, we can neither
243 * acknowledge the commit nor raise ERROR or FATAL. The latter would
244 * lead the client to believe that the transaction aborted, which is
245 * not true: it's already committed locally. The former is no good
246 * either: the client has requested synchronous replication, and is
247 * entitled to assume that an acknowledged commit is also replicated,
248 * which might not be true. So in this case we issue a WARNING (which
249 * some clients may be able to interpret) and shut off further output.
250 * We do NOT reset ProcDiePending, so that the process will die after
251 * the commit is cleaned up.
252 */
253 if (ProcDiePending)
254 {
255 ereport(WARNING,
256 (errcode(ERRCODE_ADMIN_SHUTDOWN),
257 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
258 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
259 whereToSendOutput = DestNone;
260 SyncRepCancelWait();
261 break;
262 }
263
264 /*
265 * It's unclear what to do if a query cancel interrupt arrives. We
266 * can't actually abort at this point, but ignoring the interrupt
267 * altogether is not helpful, so we just terminate the wait with a
268 * suitable warning.
269 */
270 if (QueryCancelPending)
271 {
272 QueryCancelPending = false;
273 ereport(WARNING,
274 (errmsg("canceling wait for synchronous replication due to user request"),
275 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
276 SyncRepCancelWait();
277 break;
278 }
279
280 /*
281 * Wait on latch. Any condition that should wake us up will set the
282 * latch, so no need for timeout.
283 */
284 rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
285 WAIT_EVENT_SYNC_REP);
286
287 /*
288 * If the postmaster dies, we'll probably never get an acknowledgment,
289 * because all the wal sender processes will exit. So just bail out.
290 */
291 if (rc & WL_POSTMASTER_DEATH)
292 {
293 ProcDiePending = true;
294 whereToSendOutput = DestNone;
295 SyncRepCancelWait();
296 break;
297 }
298 }
299
300 /*
301 * WalSender has checked our LSN and has removed us from queue. Clean up
302 * state and leave. It's OK to reset these shared memory fields without
303 * holding SyncRepLock, because any walsenders will ignore us anyway when
304 * we're not on the queue. We need a read barrier to make sure we see the
305 * changes to the queue link (this might be unnecessary without
306 * assertions, but better safe than sorry).
307 */
308 pg_read_barrier();
309 Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
310 MyProc->syncRepState = SYNC_REP_NOT_WAITING;
311 MyProc->waitLSN = 0;
312
313 if (new_status)
314 {
315 /* Reset ps display */
316 set_ps_display(new_status);
317 pfree(new_status);
318 }
319 }
320
321 /*
322 * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
323 *
324 * Usually we will go at tail of queue, though it's possible that we arrive
325 * here out of order, so start at tail and work back to insertion point.
326 */
327 static void
SyncRepQueueInsert(int mode)328 SyncRepQueueInsert(int mode)
329 {
330 PGPROC *proc;
331
332 Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
333 proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
334 &(WalSndCtl->SyncRepQueue[mode]),
335 offsetof(PGPROC, syncRepLinks));
336
337 while (proc)
338 {
339 /*
340 * Stop at the queue element that we should after to ensure the queue
341 * is ordered by LSN.
342 */
343 if (proc->waitLSN < MyProc->waitLSN)
344 break;
345
346 proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
347 &(proc->syncRepLinks),
348 offsetof(PGPROC, syncRepLinks));
349 }
350
351 if (proc)
352 SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
353 else
354 SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
355 }
356
357 /*
358 * Acquire SyncRepLock and cancel any wait currently in progress.
359 */
360 static void
SyncRepCancelWait(void)361 SyncRepCancelWait(void)
362 {
363 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
364 if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
365 SHMQueueDelete(&(MyProc->syncRepLinks));
366 MyProc->syncRepState = SYNC_REP_NOT_WAITING;
367 LWLockRelease(SyncRepLock);
368 }
369
370 void
SyncRepCleanupAtProcExit(void)371 SyncRepCleanupAtProcExit(void)
372 {
373 /*
374 * First check if we are removed from the queue without the lock to not
375 * slow down backend exit.
376 */
377 if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
378 {
379 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
380
381 /* maybe we have just been removed, so recheck */
382 if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
383 SHMQueueDelete(&(MyProc->syncRepLinks));
384
385 LWLockRelease(SyncRepLock);
386 }
387 }
388
389 /*
390 * ===========================================================
391 * Synchronous Replication functions for wal sender processes
392 * ===========================================================
393 */
394
395 /*
396 * Take any action required to initialise sync rep state from config
397 * data. Called at WALSender startup and after each SIGHUP.
398 */
399 void
SyncRepInitConfig(void)400 SyncRepInitConfig(void)
401 {
402 int priority;
403
404 /*
405 * Determine if we are a potential sync standby and remember the result
406 * for handling replies from standby.
407 */
408 priority = SyncRepGetStandbyPriority();
409 if (MyWalSnd->sync_standby_priority != priority)
410 {
411 SpinLockAcquire(&MyWalSnd->mutex);
412 MyWalSnd->sync_standby_priority = priority;
413 SpinLockRelease(&MyWalSnd->mutex);
414
415 ereport(DEBUG1,
416 (errmsg("standby \"%s\" now has synchronous standby priority %u",
417 application_name, priority)));
418 }
419 }
420
421 /*
422 * Update the LSNs on each queue based upon our latest state. This
423 * implements a simple policy of first-valid-sync-standby-releases-waiter.
424 *
425 * Other policies are possible, which would change what we do here and
426 * perhaps also which information we store as well.
427 */
428 void
SyncRepReleaseWaiters(void)429 SyncRepReleaseWaiters(void)
430 {
431 volatile WalSndCtlData *walsndctl = WalSndCtl;
432 XLogRecPtr writePtr;
433 XLogRecPtr flushPtr;
434 XLogRecPtr applyPtr;
435 bool got_recptr;
436 bool am_sync;
437 int numwrite = 0;
438 int numflush = 0;
439 int numapply = 0;
440
441 /*
442 * If this WALSender is serving a standby that is not on the list of
443 * potential sync standbys then we have nothing to do. If we are still
444 * starting up, still running base backup or the current flush position is
445 * still invalid, then leave quickly also. Streaming or stopping WAL
446 * senders are allowed to release waiters.
447 */
448 if (MyWalSnd->sync_standby_priority == 0 ||
449 (MyWalSnd->state != WALSNDSTATE_STREAMING &&
450 MyWalSnd->state != WALSNDSTATE_STOPPING) ||
451 XLogRecPtrIsInvalid(MyWalSnd->flush))
452 {
453 announce_next_takeover = true;
454 return;
455 }
456
457 /*
458 * We're a potential sync standby. Release waiters if there are enough
459 * sync standbys and we are considered as sync.
460 */
461 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
462
463 /*
464 * Check whether we are a sync standby or not, and calculate the synced
465 * positions among all sync standbys. (Note: although this step does not
466 * of itself require holding SyncRepLock, it seems like a good idea to do
467 * it after acquiring the lock. This ensures that the WAL pointers we use
468 * to release waiters are newer than any previous execution of this
469 * routine used.)
470 */
471 got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
472
473 /*
474 * If we are managing a sync standby, though we weren't prior to this,
475 * then announce we are now a sync standby.
476 */
477 if (announce_next_takeover && am_sync)
478 {
479 announce_next_takeover = false;
480
481 if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
482 ereport(LOG,
483 (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
484 application_name, MyWalSnd->sync_standby_priority)));
485 else
486 ereport(LOG,
487 (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
488 application_name)));
489 }
490
491 /*
492 * If the number of sync standbys is less than requested or we aren't
493 * managing a sync standby then just leave.
494 */
495 if (!got_recptr || !am_sync)
496 {
497 LWLockRelease(SyncRepLock);
498 announce_next_takeover = !am_sync;
499 return;
500 }
501
502 /*
503 * Set the lsn first so that when we wake backends they will release up to
504 * this location.
505 */
506 if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
507 {
508 walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
509 numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
510 }
511 if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
512 {
513 walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
514 numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
515 }
516 if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
517 {
518 walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
519 numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
520 }
521
522 LWLockRelease(SyncRepLock);
523
524 elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
525 numwrite, (uint32) (writePtr >> 32), (uint32) writePtr,
526 numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr,
527 numapply, (uint32) (applyPtr >> 32), (uint32) applyPtr);
528 }
529
530 /*
531 * Calculate the synced Write, Flush and Apply positions among sync standbys.
532 *
533 * Return false if the number of sync standbys is less than
534 * synchronous_standby_names specifies. Otherwise return true and
535 * store the positions into *writePtr, *flushPtr and *applyPtr.
536 *
537 * On return, *am_sync is set to true if this walsender is connecting to
538 * sync standby. Otherwise it's set to false.
539 */
540 static bool
SyncRepGetSyncRecPtr(XLogRecPtr * writePtr,XLogRecPtr * flushPtr,XLogRecPtr * applyPtr,bool * am_sync)541 SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
542 XLogRecPtr *applyPtr, bool *am_sync)
543 {
544 SyncRepStandbyData *sync_standbys;
545 int num_standbys;
546 int i;
547
548 /* Initialize default results */
549 *writePtr = InvalidXLogRecPtr;
550 *flushPtr = InvalidXLogRecPtr;
551 *applyPtr = InvalidXLogRecPtr;
552 *am_sync = false;
553
554 /* Quick out if not even configured to be synchronous */
555 if (SyncRepConfig == NULL)
556 return false;
557
558 /* Get standbys that are considered as synchronous at this moment */
559 num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
560
561 /* Am I among the candidate sync standbys? */
562 for (i = 0; i < num_standbys; i++)
563 {
564 if (sync_standbys[i].is_me)
565 {
566 *am_sync = true;
567 break;
568 }
569 }
570
571 /*
572 * Nothing more to do if we are not managing a sync standby or there are
573 * not enough synchronous standbys.
574 */
575 if (!(*am_sync) ||
576 num_standbys < SyncRepConfig->num_sync)
577 {
578 pfree(sync_standbys);
579 return false;
580 }
581
582 /*
583 * In a priority-based sync replication, the synced positions are the
584 * oldest ones among sync standbys. In a quorum-based, they are the Nth
585 * latest ones.
586 *
587 * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
588 * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
589 * because it's a bit more efficient.
590 *
591 * XXX If the numbers of current and requested sync standbys are the same,
592 * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
593 * positions even in a quorum-based sync replication.
594 */
595 if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
596 {
597 SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
598 sync_standbys, num_standbys);
599 }
600 else
601 {
602 SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
603 sync_standbys, num_standbys,
604 SyncRepConfig->num_sync);
605 }
606
607 pfree(sync_standbys);
608 return true;
609 }
610
611 /*
612 * Calculate the oldest Write, Flush and Apply positions among sync standbys.
613 */
614 static void
SyncRepGetOldestSyncRecPtr(XLogRecPtr * writePtr,XLogRecPtr * flushPtr,XLogRecPtr * applyPtr,SyncRepStandbyData * sync_standbys,int num_standbys)615 SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
616 XLogRecPtr *flushPtr,
617 XLogRecPtr *applyPtr,
618 SyncRepStandbyData *sync_standbys,
619 int num_standbys)
620 {
621 int i;
622
623 /*
624 * Scan through all sync standbys and calculate the oldest Write, Flush
625 * and Apply positions. We assume *writePtr et al were initialized to
626 * InvalidXLogRecPtr.
627 */
628 for (i = 0; i < num_standbys; i++)
629 {
630 XLogRecPtr write = sync_standbys[i].write;
631 XLogRecPtr flush = sync_standbys[i].flush;
632 XLogRecPtr apply = sync_standbys[i].apply;
633
634 if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
635 *writePtr = write;
636 if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
637 *flushPtr = flush;
638 if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
639 *applyPtr = apply;
640 }
641 }
642
643 /*
644 * Calculate the Nth latest Write, Flush and Apply positions among sync
645 * standbys.
646 */
647 static void
SyncRepGetNthLatestSyncRecPtr(XLogRecPtr * writePtr,XLogRecPtr * flushPtr,XLogRecPtr * applyPtr,SyncRepStandbyData * sync_standbys,int num_standbys,uint8 nth)648 SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
649 XLogRecPtr *flushPtr,
650 XLogRecPtr *applyPtr,
651 SyncRepStandbyData *sync_standbys,
652 int num_standbys,
653 uint8 nth)
654 {
655 XLogRecPtr *write_array;
656 XLogRecPtr *flush_array;
657 XLogRecPtr *apply_array;
658 int i;
659
660 /* Should have enough candidates, or somebody messed up */
661 Assert(nth > 0 && nth <= num_standbys);
662
663 write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
664 flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
665 apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
666
667 for (i = 0; i < num_standbys; i++)
668 {
669 write_array[i] = sync_standbys[i].write;
670 flush_array[i] = sync_standbys[i].flush;
671 apply_array[i] = sync_standbys[i].apply;
672 }
673
674 /* Sort each array in descending order */
675 qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
676 qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
677 qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
678
679 /* Get Nth latest Write, Flush, Apply positions */
680 *writePtr = write_array[nth - 1];
681 *flushPtr = flush_array[nth - 1];
682 *applyPtr = apply_array[nth - 1];
683
684 pfree(write_array);
685 pfree(flush_array);
686 pfree(apply_array);
687 }
688
689 /*
690 * Compare lsn in order to sort array in descending order.
691 */
692 static int
cmp_lsn(const void * a,const void * b)693 cmp_lsn(const void *a, const void *b)
694 {
695 XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
696 XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
697
698 if (lsn1 > lsn2)
699 return -1;
700 else if (lsn1 == lsn2)
701 return 0;
702 else
703 return 1;
704 }
705
706 /*
707 * Return data about walsenders that are candidates to be sync standbys.
708 *
709 * *standbys is set to a palloc'd array of structs of per-walsender data,
710 * and the number of valid entries (candidate sync senders) is returned.
711 * (This might be more or fewer than num_sync; caller must check.)
712 */
713 int
SyncRepGetCandidateStandbys(SyncRepStandbyData ** standbys)714 SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
715 {
716 int i;
717 int n;
718
719 /* Create result array */
720 *standbys = (SyncRepStandbyData *)
721 palloc(max_wal_senders * sizeof(SyncRepStandbyData));
722
723 /* Quick exit if sync replication is not requested */
724 if (SyncRepConfig == NULL)
725 return 0;
726
727 /* Collect raw data from shared memory */
728 n = 0;
729 for (i = 0; i < max_wal_senders; i++)
730 {
731 volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
732 * rearrangement */
733 SyncRepStandbyData *stby;
734 WalSndState state; /* not included in SyncRepStandbyData */
735
736 walsnd = &WalSndCtl->walsnds[i];
737 stby = *standbys + n;
738
739 SpinLockAcquire(&walsnd->mutex);
740 stby->pid = walsnd->pid;
741 state = walsnd->state;
742 stby->write = walsnd->write;
743 stby->flush = walsnd->flush;
744 stby->apply = walsnd->apply;
745 stby->sync_standby_priority = walsnd->sync_standby_priority;
746 SpinLockRelease(&walsnd->mutex);
747
748 /* Must be active */
749 if (stby->pid == 0)
750 continue;
751
752 /* Must be streaming or stopping */
753 if (state != WALSNDSTATE_STREAMING &&
754 state != WALSNDSTATE_STOPPING)
755 continue;
756
757 /* Must be synchronous */
758 if (stby->sync_standby_priority == 0)
759 continue;
760
761 /* Must have a valid flush position */
762 if (XLogRecPtrIsInvalid(stby->flush))
763 continue;
764
765 /* OK, it's a candidate */
766 stby->walsnd_index = i;
767 stby->is_me = (walsnd == MyWalSnd);
768 n++;
769 }
770
771 /*
772 * In quorum mode, we return all the candidates. In priority mode, if we
773 * have too many candidates then return only the num_sync ones of highest
774 * priority.
775 */
776 if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
777 n > SyncRepConfig->num_sync)
778 {
779 /* Sort by priority ... */
780 qsort(*standbys, n, sizeof(SyncRepStandbyData),
781 standby_priority_comparator);
782 /* ... then report just the first num_sync ones */
783 n = SyncRepConfig->num_sync;
784 }
785
786 return n;
787 }
788
789 /*
790 * qsort comparator to sort SyncRepStandbyData entries by priority
791 */
792 static int
standby_priority_comparator(const void * a,const void * b)793 standby_priority_comparator(const void *a, const void *b)
794 {
795 const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
796 const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
797
798 /* First, sort by increasing priority value */
799 if (sa->sync_standby_priority != sb->sync_standby_priority)
800 return sa->sync_standby_priority - sb->sync_standby_priority;
801
802 /*
803 * We might have equal priority values; arbitrarily break ties by position
804 * in the WALSnd array. (This is utterly bogus, since that is arrival
805 * order dependent, but there are regression tests that rely on it.)
806 */
807 return sa->walsnd_index - sb->walsnd_index;
808 }
809
810
811 /*
812 * Check if we are in the list of sync standbys, and if so, determine
813 * priority sequence. Return priority if set, or zero to indicate that
814 * we are not a potential sync standby.
815 *
816 * Compare the parameter SyncRepStandbyNames against the application_name
817 * for this WALSender, or allow any name if we find a wildcard "*".
818 */
819 static int
SyncRepGetStandbyPriority(void)820 SyncRepGetStandbyPriority(void)
821 {
822 const char *standby_name;
823 int priority;
824 bool found = false;
825
826 /*
827 * Since synchronous cascade replication is not allowed, we always set the
828 * priority of cascading walsender to zero.
829 */
830 if (am_cascading_walsender)
831 return 0;
832
833 if (!SyncStandbysDefined() || SyncRepConfig == NULL)
834 return 0;
835
836 standby_name = SyncRepConfig->member_names;
837 for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
838 {
839 if (pg_strcasecmp(standby_name, application_name) == 0 ||
840 strcmp(standby_name, "*") == 0)
841 {
842 found = true;
843 break;
844 }
845 standby_name += strlen(standby_name) + 1;
846 }
847
848 if (!found)
849 return 0;
850
851 /*
852 * In quorum-based sync replication, all the standbys in the list have the
853 * same priority, one.
854 */
855 return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
856 }
857
858 /*
859 * Walk the specified queue from head. Set the state of any backends that
860 * need to be woken, remove them from the queue, and then wake them.
861 * Pass all = true to wake whole queue; otherwise, just wake up to
862 * the walsender's LSN.
863 *
864 * The caller must hold SyncRepLock in exclusive mode.
865 */
866 static int
SyncRepWakeQueue(bool all,int mode)867 SyncRepWakeQueue(bool all, int mode)
868 {
869 volatile WalSndCtlData *walsndctl = WalSndCtl;
870 PGPROC *proc = NULL;
871 PGPROC *thisproc = NULL;
872 int numprocs = 0;
873
874 Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
875 Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
876 Assert(SyncRepQueueIsOrderedByLSN(mode));
877
878 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
879 &(WalSndCtl->SyncRepQueue[mode]),
880 offsetof(PGPROC, syncRepLinks));
881
882 while (proc)
883 {
884 /*
885 * Assume the queue is ordered by LSN
886 */
887 if (!all && walsndctl->lsn[mode] < proc->waitLSN)
888 return numprocs;
889
890 /*
891 * Move to next proc, so we can delete thisproc from the queue.
892 * thisproc is valid, proc may be NULL after this.
893 */
894 thisproc = proc;
895 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
896 &(proc->syncRepLinks),
897 offsetof(PGPROC, syncRepLinks));
898
899 /*
900 * Remove thisproc from queue.
901 */
902 SHMQueueDelete(&(thisproc->syncRepLinks));
903
904 /*
905 * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
906 * make sure that it sees the queue link being removed before the
907 * syncRepState change.
908 */
909 pg_write_barrier();
910
911 /*
912 * Set state to complete; see SyncRepWaitForLSN() for discussion of
913 * the various states.
914 */
915 thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
916
917 /*
918 * Wake only when we have set state and removed from queue.
919 */
920 SetLatch(&(thisproc->procLatch));
921
922 numprocs++;
923 }
924
925 return numprocs;
926 }
927
928 /*
929 * The checkpointer calls this as needed to update the shared
930 * sync_standbys_defined flag, so that backends don't remain permanently wedged
931 * if synchronous_standby_names is unset. It's safe to check the current value
932 * without the lock, because it's only ever updated by one process. But we
933 * must take the lock to change it.
934 */
935 void
SyncRepUpdateSyncStandbysDefined(void)936 SyncRepUpdateSyncStandbysDefined(void)
937 {
938 bool sync_standbys_defined = SyncStandbysDefined();
939
940 if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
941 {
942 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
943
944 /*
945 * If synchronous_standby_names has been reset to empty, it's futile
946 * for backends to continue waiting. Since the user no longer wants
947 * synchronous replication, we'd better wake them up.
948 */
949 if (!sync_standbys_defined)
950 {
951 int i;
952
953 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
954 SyncRepWakeQueue(true, i);
955 }
956
957 /*
958 * Only allow people to join the queue when there are synchronous
959 * standbys defined. Without this interlock, there's a race
960 * condition: we might wake up all the current waiters; then, some
961 * backend that hasn't yet reloaded its config might go to sleep on
962 * the queue (and never wake up). This prevents that.
963 */
964 WalSndCtl->sync_standbys_defined = sync_standbys_defined;
965
966 LWLockRelease(SyncRepLock);
967 }
968 }
969
#ifdef USE_ASSERT_CHECKING
/*
 * Assertion-build helper: report whether the wait queue for the given mode
 * is sorted by strictly increasing waitLSN.
 *
 * Returns false as soon as an entry is found whose waitLSN is less than or
 * equal to that of its predecessor (so duplicate LSNs also count as a
 * violation); returns true for an empty or correctly ordered queue.
 */
static bool
SyncRepQueueIsOrderedByLSN(int mode)
{
	PGPROC	   *cur = NULL;
	XLogRecPtr	prevLSN;

	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);

	prevLSN = 0;

	/* Walk the queue from its head; SHMQueueNext yields NULL at the end. */
	for (cur = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
									   &(WalSndCtl->SyncRepQueue[mode]),
									   offsetof(PGPROC, syncRepLinks));
		 cur != NULL;
		 cur = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
									   &(cur->syncRepLinks),
									   offsetof(PGPROC, syncRepLinks)))
	{
		/*
		 * Each entry must wait for an LSN strictly beyond the previous one;
		 * equal LSNs on two procs are not allowed either.
		 */
		if (cur->waitLSN <= prevLSN)
			return false;

		prevLSN = cur->waitLSN;
	}

	return true;
}
#endif
1004
1005 /*
1006 * ===========================================================
1007 * Synchronous Replication functions executed by any process
1008 * ===========================================================
1009 */
1010
1011 bool
check_synchronous_standby_names(char ** newval,void ** extra,GucSource source)1012 check_synchronous_standby_names(char **newval, void **extra, GucSource source)
1013 {
1014 if (*newval != NULL && (*newval)[0] != '\0')
1015 {
1016 int parse_rc;
1017 SyncRepConfigData *pconf;
1018
1019 /* Reset communication variables to ensure a fresh start */
1020 syncrep_parse_result = NULL;
1021 syncrep_parse_error_msg = NULL;
1022
1023 /* Parse the synchronous_standby_names string */
1024 syncrep_scanner_init(*newval);
1025 parse_rc = syncrep_yyparse();
1026 syncrep_scanner_finish();
1027
1028 if (parse_rc != 0 || syncrep_parse_result == NULL)
1029 {
1030 GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
1031 if (syncrep_parse_error_msg)
1032 GUC_check_errdetail("%s", syncrep_parse_error_msg);
1033 else
1034 GUC_check_errdetail("synchronous_standby_names parser failed");
1035 return false;
1036 }
1037
1038 if (syncrep_parse_result->num_sync <= 0)
1039 {
1040 GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
1041 syncrep_parse_result->num_sync);
1042 return false;
1043 }
1044
1045 /* GUC extra value must be malloc'd, not palloc'd */
1046 pconf = (SyncRepConfigData *)
1047 malloc(syncrep_parse_result->config_size);
1048 if (pconf == NULL)
1049 return false;
1050 memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
1051
1052 *extra = (void *) pconf;
1053
1054 /*
1055 * We need not explicitly clean up syncrep_parse_result. It, and any
1056 * other cruft generated during parsing, will be freed when the
1057 * current memory context is deleted. (This code is generally run in
1058 * a short-lived context used for config file processing, so that will
1059 * not be very long.)
1060 */
1061 }
1062 else
1063 *extra = NULL;
1064
1065 return true;
1066 }
1067
1068 void
assign_synchronous_standby_names(const char * newval,void * extra)1069 assign_synchronous_standby_names(const char *newval, void *extra)
1070 {
1071 SyncRepConfig = (SyncRepConfigData *) extra;
1072 }
1073
1074 void
assign_synchronous_commit(int newval,void * extra)1075 assign_synchronous_commit(int newval, void *extra)
1076 {
1077 switch (newval)
1078 {
1079 case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
1080 SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
1081 break;
1082 case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
1083 SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
1084 break;
1085 case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
1086 SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
1087 break;
1088 default:
1089 SyncRepWaitMode = SYNC_REP_NO_WAIT;
1090 break;
1091 }
1092 }
1093