1 /*-------------------------------------------------------------------------
2  *
3  * syncrep.c
4  *
5  * Synchronous replication is new as of PostgreSQL 9.1.
6  *
7  * If requested, transaction commits wait until their commit LSNs are
8  * acknowledged by the synchronous standbys.
9  *
10  * This module contains the code for waiting and release of backends.
11  * All code in this module executes on the primary. The core streaming
12  * replication transport remains within WALreceiver/WALsender modules.
13  *
14  * The essence of this design is that it isolates all logic about
15  * waiting/releasing onto the primary. The primary defines which standbys
16  * it wishes to wait for. The standbys are completely unaware of the
17  * durability requirements of transactions on the primary, reducing the
18  * complexity of the code and streamlining both standby operations and
19  * network bandwidth because there is no requirement to ship
20  * per-transaction state information.
21  *
22  * Replication is either synchronous or not synchronous (async). If it is
23  * async, we just fastpath out of here. If it is sync, then we wait for
24  * the write, flush or apply location on the standby before releasing
25  * the waiting backend. Further complexity in that interaction is
26  * expected in later releases.
27  *
28  * The best performing way to manage the waiting backends is to have a
29  * single ordered queue of waiting backends, so that we can avoid
30  * searching through all waiters each time we receive a reply.
31  *
32  * In 9.5 or before only a single standby could be considered as
33  * synchronous. In 9.6 we added support for multiple priority-based
34  * synchronous standbys. In 10.0, quorum-based multiple synchronous standbys
35  * are also supported. The number of synchronous standbys that transactions
36  * must wait for replies from is specified in synchronous_standby_names.
37  * This parameter also specifies a list of standby names and the method
38  * (FIRST and ANY) to choose synchronous standbys from the listed ones.
39  *
40  * The method FIRST specifies a priority-based synchronous replication
41  * and makes transaction commits wait until their WAL records are
42  * replicated to the requested number of synchronous standbys chosen based
43  * on their priorities. The standbys whose names appear earlier in the list
44  * are given higher priority and will be considered as synchronous.
45  * Other standby servers appearing later in this list represent potential
46  * synchronous standbys. If any of the current synchronous standbys
47  * disconnects for whatever reason, it will be replaced immediately with
48  * the next-highest-priority standby.
49  *
50  * The method ANY specifies a quorum-based synchronous replication
51  * and makes transaction commits wait until their WAL records are
52  * replicated to at least the requested number of synchronous standbys
53  * in the list. All the standbys appearing in the list are considered as
54  * candidates for quorum synchronous standbys.
55  *
56  * If neither FIRST nor ANY is specified, FIRST is used as the method.
57  * This is for backward compatibility with 9.6 or before where only a
58  * priority-based sync replication was supported.
59  *
60  * Before the standbys chosen from synchronous_standby_names can
61  * become the synchronous standbys they must have caught up with
62  * the primary; that may take some time. Once caught up,
63  * the standbys which are considered as synchronous at that moment
64  * will release waiters from the queue.
65  *
66  * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
67  *
68  * IDENTIFICATION
69  *	  src/backend/replication/syncrep.c
70  *
71  *-------------------------------------------------------------------------
72  */
73 #include "postgres.h"
74 
75 #include <unistd.h>
76 
77 #include "access/xact.h"
78 #include "miscadmin.h"
79 #include "pgstat.h"
80 #include "replication/syncrep.h"
81 #include "replication/walsender.h"
82 #include "replication/walsender_private.h"
83 #include "storage/pmsignal.h"
84 #include "storage/proc.h"
85 #include "tcop/tcopprot.h"
86 #include "utils/builtins.h"
87 #include "utils/ps_status.h"
88 
89 /* User-settable parameters for sync rep */
90 char	   *SyncRepStandbyNames;
91 
92 #define SyncStandbysDefined() \
93 	(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
94 
95 static bool announce_next_takeover = true;
96 
97 SyncRepConfigData *SyncRepConfig = NULL;
98 static int	SyncRepWaitMode = SYNC_REP_NO_WAIT;
99 
100 static void SyncRepQueueInsert(int mode);
101 static void SyncRepCancelWait(void);
102 static int	SyncRepWakeQueue(bool all, int mode);
103 
104 static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
105 								 XLogRecPtr *flushPtr,
106 								 XLogRecPtr *applyPtr,
107 								 bool *am_sync);
108 static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
109 									   XLogRecPtr *flushPtr,
110 									   XLogRecPtr *applyPtr,
111 									   SyncRepStandbyData *sync_standbys,
112 									   int num_standbys);
113 static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
114 										  XLogRecPtr *flushPtr,
115 										  XLogRecPtr *applyPtr,
116 										  SyncRepStandbyData *sync_standbys,
117 										  int num_standbys,
118 										  uint8 nth);
119 static int	SyncRepGetStandbyPriority(void);
120 static int	standby_priority_comparator(const void *a, const void *b);
121 static int	cmp_lsn(const void *a, const void *b);
122 
123 #ifdef USE_ASSERT_CHECKING
124 static bool SyncRepQueueIsOrderedByLSN(int mode);
125 #endif
126 
127 /*
128  * ===========================================================
129  * Synchronous Replication functions for normal user backends
130  * ===========================================================
131  */
132 
133 /*
134  * Wait for synchronous replication, if requested by user.
135  *
136  * Initially backends start in state SYNC_REP_NOT_WAITING and then
137  * change that state to SYNC_REP_WAITING before adding ourselves
138  * to the wait queue. During SyncRepWakeQueue() a WALSender changes
139  * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
140  * This backend then resets its state to SYNC_REP_NOT_WAITING.
141  *
142  * 'lsn' represents the LSN to wait for.  'commit' indicates whether this LSN
143  * represents a commit record.  If it doesn't, then we wait only for the WAL
144  * to be flushed if synchronous_commit is set to the higher level of
145  * remote_apply, because only commit records provide apply feedback.
146  */
147 void
SyncRepWaitForLSN(XLogRecPtr lsn,bool commit)148 SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
149 {
150 	char	   *new_status = NULL;
151 	const char *old_status;
152 	int			mode;
153 
154 	/*
155 	 * This should be called while holding interrupts during a transaction
156 	 * commit to prevent the follow-up shared memory queue cleanups to be
157 	 * influenced by external interruptions.
158 	 */
159 	Assert(InterruptHoldoffCount > 0);
160 
161 	/*
162 	 * Fast exit if user has not requested sync replication, or there are no
163 	 * sync replication standby names defined.
164 	 *
165 	 * Since this routine gets called every commit time, it's important to
166 	 * exit quickly if sync replication is not requested. So we check
167 	 * WalSndCtl->sync_standbys_defined flag without the lock and exit
168 	 * immediately if it's false. If it's true, we need to check it again
169 	 * later while holding the lock, to check the flag and operate the sync
170 	 * rep queue atomically. This is necessary to avoid the race condition
171 	 * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
172 	 * it's false, the lock is not necessary because we don't touch the queue.
173 	 */
174 	if (!SyncRepRequested() ||
175 		!((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
176 		return;
177 
178 	/* Cap the level for anything other than commit to remote flush only. */
179 	if (commit)
180 		mode = SyncRepWaitMode;
181 	else
182 		mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
183 
184 	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
185 	Assert(WalSndCtl != NULL);
186 
187 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
188 	Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
189 
190 	/*
191 	 * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
192 	 * set.  See SyncRepUpdateSyncStandbysDefined.
193 	 *
194 	 * Also check that the standby hasn't already replied. Unlikely race
195 	 * condition but we'll be fetching that cache line anyway so it's likely
196 	 * to be a low cost check.
197 	 */
198 	if (!WalSndCtl->sync_standbys_defined ||
199 		lsn <= WalSndCtl->lsn[mode])
200 	{
201 		LWLockRelease(SyncRepLock);
202 		return;
203 	}
204 
205 	/*
206 	 * Set our waitLSN so WALSender will know when to wake us, and add
207 	 * ourselves to the queue.
208 	 */
209 	MyProc->waitLSN = lsn;
210 	MyProc->syncRepState = SYNC_REP_WAITING;
211 	SyncRepQueueInsert(mode);
212 	Assert(SyncRepQueueIsOrderedByLSN(mode));
213 	LWLockRelease(SyncRepLock);
214 
215 	/* Alter ps display to show waiting for sync rep. */
216 	if (update_process_title)
217 	{
218 		int			len;
219 
220 		old_status = get_ps_display(&len);
221 		new_status = (char *) palloc(len + 32 + 1);
222 		memcpy(new_status, old_status, len);
223 		sprintf(new_status + len, " waiting for %X/%X",
224 				LSN_FORMAT_ARGS(lsn));
225 		set_ps_display(new_status);
226 		new_status[len] = '\0'; /* truncate off " waiting ..." */
227 	}
228 
229 	/*
230 	 * Wait for specified LSN to be confirmed.
231 	 *
232 	 * Each proc has its own wait latch, so we perform a normal latch
233 	 * check/wait loop here.
234 	 */
235 	for (;;)
236 	{
237 		int			rc;
238 
239 		/* Must reset the latch before testing state. */
240 		ResetLatch(MyLatch);
241 
242 		/*
243 		 * Acquiring the lock is not needed, the latch ensures proper
244 		 * barriers. If it looks like we're done, we must really be done,
245 		 * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
246 		 * it will never update it again, so we can't be seeing a stale value
247 		 * in that case.
248 		 */
249 		if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
250 			break;
251 
252 		/*
253 		 * If a wait for synchronous replication is pending, we can neither
254 		 * acknowledge the commit nor raise ERROR or FATAL.  The latter would
255 		 * lead the client to believe that the transaction aborted, which is
256 		 * not true: it's already committed locally. The former is no good
257 		 * either: the client has requested synchronous replication, and is
258 		 * entitled to assume that an acknowledged commit is also replicated,
259 		 * which might not be true. So in this case we issue a WARNING (which
260 		 * some clients may be able to interpret) and shut off further output.
261 		 * We do NOT reset ProcDiePending, so that the process will die after
262 		 * the commit is cleaned up.
263 		 */
264 		if (ProcDiePending)
265 		{
266 			ereport(WARNING,
267 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
268 					 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
269 					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
270 			whereToSendOutput = DestNone;
271 			SyncRepCancelWait();
272 			break;
273 		}
274 
275 		/*
276 		 * It's unclear what to do if a query cancel interrupt arrives.  We
277 		 * can't actually abort at this point, but ignoring the interrupt
278 		 * altogether is not helpful, so we just terminate the wait with a
279 		 * suitable warning.
280 		 */
281 		if (QueryCancelPending)
282 		{
283 			QueryCancelPending = false;
284 			ereport(WARNING,
285 					(errmsg("canceling wait for synchronous replication due to user request"),
286 					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
287 			SyncRepCancelWait();
288 			break;
289 		}
290 
291 		/*
292 		 * Wait on latch.  Any condition that should wake us up will set the
293 		 * latch, so no need for timeout.
294 		 */
295 		rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
296 					   WAIT_EVENT_SYNC_REP);
297 
298 		/*
299 		 * If the postmaster dies, we'll probably never get an acknowledgment,
300 		 * because all the wal sender processes will exit. So just bail out.
301 		 */
302 		if (rc & WL_POSTMASTER_DEATH)
303 		{
304 			ProcDiePending = true;
305 			whereToSendOutput = DestNone;
306 			SyncRepCancelWait();
307 			break;
308 		}
309 	}
310 
311 	/*
312 	 * WalSender has checked our LSN and has removed us from queue. Clean up
313 	 * state and leave.  It's OK to reset these shared memory fields without
314 	 * holding SyncRepLock, because any walsenders will ignore us anyway when
315 	 * we're not on the queue.  We need a read barrier to make sure we see the
316 	 * changes to the queue link (this might be unnecessary without
317 	 * assertions, but better safe than sorry).
318 	 */
319 	pg_read_barrier();
320 	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
321 	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
322 	MyProc->waitLSN = 0;
323 
324 	if (new_status)
325 	{
326 		/* Reset ps display */
327 		set_ps_display(new_status);
328 		pfree(new_status);
329 	}
330 }
331 
332 /*
333  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
334  *
335  * Usually we will go at tail of queue, though it's possible that we arrive
336  * here out of order, so start at tail and work back to insertion point.
337  */
338 static void
SyncRepQueueInsert(int mode)339 SyncRepQueueInsert(int mode)
340 {
341 	PGPROC	   *proc;
342 
343 	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
344 	proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
345 								   &(WalSndCtl->SyncRepQueue[mode]),
346 								   offsetof(PGPROC, syncRepLinks));
347 
348 	while (proc)
349 	{
350 		/*
351 		 * Stop at the queue element that we should after to ensure the queue
352 		 * is ordered by LSN.
353 		 */
354 		if (proc->waitLSN < MyProc->waitLSN)
355 			break;
356 
357 		proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
358 									   &(proc->syncRepLinks),
359 									   offsetof(PGPROC, syncRepLinks));
360 	}
361 
362 	if (proc)
363 		SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
364 	else
365 		SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
366 }
367 
368 /*
369  * Acquire SyncRepLock and cancel any wait currently in progress.
370  */
371 static void
SyncRepCancelWait(void)372 SyncRepCancelWait(void)
373 {
374 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
375 	if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
376 		SHMQueueDelete(&(MyProc->syncRepLinks));
377 	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
378 	LWLockRelease(SyncRepLock);
379 }
380 
381 void
SyncRepCleanupAtProcExit(void)382 SyncRepCleanupAtProcExit(void)
383 {
384 	/*
385 	 * First check if we are removed from the queue without the lock to not
386 	 * slow down backend exit.
387 	 */
388 	if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
389 	{
390 		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
391 
392 		/* maybe we have just been removed, so recheck */
393 		if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
394 			SHMQueueDelete(&(MyProc->syncRepLinks));
395 
396 		LWLockRelease(SyncRepLock);
397 	}
398 }
399 
400 /*
401  * ===========================================================
402  * Synchronous Replication functions for wal sender processes
403  * ===========================================================
404  */
405 
406 /*
407  * Take any action required to initialise sync rep state from config
408  * data. Called at WALSender startup and after each SIGHUP.
409  */
410 void
SyncRepInitConfig(void)411 SyncRepInitConfig(void)
412 {
413 	int			priority;
414 
415 	/*
416 	 * Determine if we are a potential sync standby and remember the result
417 	 * for handling replies from standby.
418 	 */
419 	priority = SyncRepGetStandbyPriority();
420 	if (MyWalSnd->sync_standby_priority != priority)
421 	{
422 		SpinLockAcquire(&MyWalSnd->mutex);
423 		MyWalSnd->sync_standby_priority = priority;
424 		SpinLockRelease(&MyWalSnd->mutex);
425 
426 		ereport(DEBUG1,
427 				(errmsg_internal("standby \"%s\" now has synchronous standby priority %u",
428 								 application_name, priority)));
429 	}
430 }
431 
432 /*
433  * Update the LSNs on each queue based upon our latest state. This
434  * implements a simple policy of first-valid-sync-standby-releases-waiter.
435  *
436  * Other policies are possible, which would change what we do here and
437  * perhaps also which information we store as well.
438  */
439 void
SyncRepReleaseWaiters(void)440 SyncRepReleaseWaiters(void)
441 {
442 	volatile WalSndCtlData *walsndctl = WalSndCtl;
443 	XLogRecPtr	writePtr;
444 	XLogRecPtr	flushPtr;
445 	XLogRecPtr	applyPtr;
446 	bool		got_recptr;
447 	bool		am_sync;
448 	int			numwrite = 0;
449 	int			numflush = 0;
450 	int			numapply = 0;
451 
452 	/*
453 	 * If this WALSender is serving a standby that is not on the list of
454 	 * potential sync standbys then we have nothing to do. If we are still
455 	 * starting up, still running base backup or the current flush position is
456 	 * still invalid, then leave quickly also.  Streaming or stopping WAL
457 	 * senders are allowed to release waiters.
458 	 */
459 	if (MyWalSnd->sync_standby_priority == 0 ||
460 		(MyWalSnd->state != WALSNDSTATE_STREAMING &&
461 		 MyWalSnd->state != WALSNDSTATE_STOPPING) ||
462 		XLogRecPtrIsInvalid(MyWalSnd->flush))
463 	{
464 		announce_next_takeover = true;
465 		return;
466 	}
467 
468 	/*
469 	 * We're a potential sync standby. Release waiters if there are enough
470 	 * sync standbys and we are considered as sync.
471 	 */
472 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
473 
474 	/*
475 	 * Check whether we are a sync standby or not, and calculate the synced
476 	 * positions among all sync standbys.  (Note: although this step does not
477 	 * of itself require holding SyncRepLock, it seems like a good idea to do
478 	 * it after acquiring the lock.  This ensures that the WAL pointers we use
479 	 * to release waiters are newer than any previous execution of this
480 	 * routine used.)
481 	 */
482 	got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
483 
484 	/*
485 	 * If we are managing a sync standby, though we weren't prior to this,
486 	 * then announce we are now a sync standby.
487 	 */
488 	if (announce_next_takeover && am_sync)
489 	{
490 		announce_next_takeover = false;
491 
492 		if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
493 			ereport(LOG,
494 					(errmsg("standby \"%s\" is now a synchronous standby with priority %u",
495 							application_name, MyWalSnd->sync_standby_priority)));
496 		else
497 			ereport(LOG,
498 					(errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
499 							application_name)));
500 	}
501 
502 	/*
503 	 * If the number of sync standbys is less than requested or we aren't
504 	 * managing a sync standby then just leave.
505 	 */
506 	if (!got_recptr || !am_sync)
507 	{
508 		LWLockRelease(SyncRepLock);
509 		announce_next_takeover = !am_sync;
510 		return;
511 	}
512 
513 	/*
514 	 * Set the lsn first so that when we wake backends they will release up to
515 	 * this location.
516 	 */
517 	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
518 	{
519 		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
520 		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
521 	}
522 	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
523 	{
524 		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
525 		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
526 	}
527 	if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
528 	{
529 		walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
530 		numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
531 	}
532 
533 	LWLockRelease(SyncRepLock);
534 
535 	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
536 		 numwrite, LSN_FORMAT_ARGS(writePtr),
537 		 numflush, LSN_FORMAT_ARGS(flushPtr),
538 		 numapply, LSN_FORMAT_ARGS(applyPtr));
539 }
540 
541 /*
542  * Calculate the synced Write, Flush and Apply positions among sync standbys.
543  *
544  * Return false if the number of sync standbys is less than
545  * synchronous_standby_names specifies. Otherwise return true and
546  * store the positions into *writePtr, *flushPtr and *applyPtr.
547  *
548  * On return, *am_sync is set to true if this walsender is connecting to
549  * sync standby. Otherwise it's set to false.
550  */
551 static bool
SyncRepGetSyncRecPtr(XLogRecPtr * writePtr,XLogRecPtr * flushPtr,XLogRecPtr * applyPtr,bool * am_sync)552 SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
553 					 XLogRecPtr *applyPtr, bool *am_sync)
554 {
555 	SyncRepStandbyData *sync_standbys;
556 	int			num_standbys;
557 	int			i;
558 
559 	/* Initialize default results */
560 	*writePtr = InvalidXLogRecPtr;
561 	*flushPtr = InvalidXLogRecPtr;
562 	*applyPtr = InvalidXLogRecPtr;
563 	*am_sync = false;
564 
565 	/* Quick out if not even configured to be synchronous */
566 	if (SyncRepConfig == NULL)
567 		return false;
568 
569 	/* Get standbys that are considered as synchronous at this moment */
570 	num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
571 
572 	/* Am I among the candidate sync standbys? */
573 	for (i = 0; i < num_standbys; i++)
574 	{
575 		if (sync_standbys[i].is_me)
576 		{
577 			*am_sync = true;
578 			break;
579 		}
580 	}
581 
582 	/*
583 	 * Nothing more to do if we are not managing a sync standby or there are
584 	 * not enough synchronous standbys.
585 	 */
586 	if (!(*am_sync) ||
587 		num_standbys < SyncRepConfig->num_sync)
588 	{
589 		pfree(sync_standbys);
590 		return false;
591 	}
592 
593 	/*
594 	 * In a priority-based sync replication, the synced positions are the
595 	 * oldest ones among sync standbys. In a quorum-based, they are the Nth
596 	 * latest ones.
597 	 *
598 	 * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
599 	 * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
600 	 * because it's a bit more efficient.
601 	 *
602 	 * XXX If the numbers of current and requested sync standbys are the same,
603 	 * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
604 	 * positions even in a quorum-based sync replication.
605 	 */
606 	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
607 	{
608 		SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
609 								   sync_standbys, num_standbys);
610 	}
611 	else
612 	{
613 		SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
614 									  sync_standbys, num_standbys,
615 									  SyncRepConfig->num_sync);
616 	}
617 
618 	pfree(sync_standbys);
619 	return true;
620 }
621 
622 /*
623  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
624  */
625 static void
SyncRepGetOldestSyncRecPtr(XLogRecPtr * writePtr,XLogRecPtr * flushPtr,XLogRecPtr * applyPtr,SyncRepStandbyData * sync_standbys,int num_standbys)626 SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
627 						   XLogRecPtr *flushPtr,
628 						   XLogRecPtr *applyPtr,
629 						   SyncRepStandbyData *sync_standbys,
630 						   int num_standbys)
631 {
632 	int			i;
633 
634 	/*
635 	 * Scan through all sync standbys and calculate the oldest Write, Flush
636 	 * and Apply positions.  We assume *writePtr et al were initialized to
637 	 * InvalidXLogRecPtr.
638 	 */
639 	for (i = 0; i < num_standbys; i++)
640 	{
641 		XLogRecPtr	write = sync_standbys[i].write;
642 		XLogRecPtr	flush = sync_standbys[i].flush;
643 		XLogRecPtr	apply = sync_standbys[i].apply;
644 
645 		if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
646 			*writePtr = write;
647 		if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
648 			*flushPtr = flush;
649 		if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
650 			*applyPtr = apply;
651 	}
652 }
653 
654 /*
655  * Calculate the Nth latest Write, Flush and Apply positions among sync
656  * standbys.
657  */
658 static void
SyncRepGetNthLatestSyncRecPtr(XLogRecPtr * writePtr,XLogRecPtr * flushPtr,XLogRecPtr * applyPtr,SyncRepStandbyData * sync_standbys,int num_standbys,uint8 nth)659 SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
660 							  XLogRecPtr *flushPtr,
661 							  XLogRecPtr *applyPtr,
662 							  SyncRepStandbyData *sync_standbys,
663 							  int num_standbys,
664 							  uint8 nth)
665 {
666 	XLogRecPtr *write_array;
667 	XLogRecPtr *flush_array;
668 	XLogRecPtr *apply_array;
669 	int			i;
670 
671 	/* Should have enough candidates, or somebody messed up */
672 	Assert(nth > 0 && nth <= num_standbys);
673 
674 	write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
675 	flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
676 	apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
677 
678 	for (i = 0; i < num_standbys; i++)
679 	{
680 		write_array[i] = sync_standbys[i].write;
681 		flush_array[i] = sync_standbys[i].flush;
682 		apply_array[i] = sync_standbys[i].apply;
683 	}
684 
685 	/* Sort each array in descending order */
686 	qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
687 	qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
688 	qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
689 
690 	/* Get Nth latest Write, Flush, Apply positions */
691 	*writePtr = write_array[nth - 1];
692 	*flushPtr = flush_array[nth - 1];
693 	*applyPtr = apply_array[nth - 1];
694 
695 	pfree(write_array);
696 	pfree(flush_array);
697 	pfree(apply_array);
698 }
699 
700 /*
701  * Compare lsn in order to sort array in descending order.
702  */
703 static int
cmp_lsn(const void * a,const void * b)704 cmp_lsn(const void *a, const void *b)
705 {
706 	XLogRecPtr	lsn1 = *((const XLogRecPtr *) a);
707 	XLogRecPtr	lsn2 = *((const XLogRecPtr *) b);
708 
709 	if (lsn1 > lsn2)
710 		return -1;
711 	else if (lsn1 == lsn2)
712 		return 0;
713 	else
714 		return 1;
715 }
716 
717 /*
718  * Return data about walsenders that are candidates to be sync standbys.
719  *
720  * *standbys is set to a palloc'd array of structs of per-walsender data,
721  * and the number of valid entries (candidate sync senders) is returned.
722  * (This might be more or fewer than num_sync; caller must check.)
723  */
724 int
SyncRepGetCandidateStandbys(SyncRepStandbyData ** standbys)725 SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
726 {
727 	int			i;
728 	int			n;
729 
730 	/* Create result array */
731 	*standbys = (SyncRepStandbyData *)
732 		palloc(max_wal_senders * sizeof(SyncRepStandbyData));
733 
734 	/* Quick exit if sync replication is not requested */
735 	if (SyncRepConfig == NULL)
736 		return 0;
737 
738 	/* Collect raw data from shared memory */
739 	n = 0;
740 	for (i = 0; i < max_wal_senders; i++)
741 	{
742 		volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
743 									 * rearrangement */
744 		SyncRepStandbyData *stby;
745 		WalSndState state;		/* not included in SyncRepStandbyData */
746 
747 		walsnd = &WalSndCtl->walsnds[i];
748 		stby = *standbys + n;
749 
750 		SpinLockAcquire(&walsnd->mutex);
751 		stby->pid = walsnd->pid;
752 		state = walsnd->state;
753 		stby->write = walsnd->write;
754 		stby->flush = walsnd->flush;
755 		stby->apply = walsnd->apply;
756 		stby->sync_standby_priority = walsnd->sync_standby_priority;
757 		SpinLockRelease(&walsnd->mutex);
758 
759 		/* Must be active */
760 		if (stby->pid == 0)
761 			continue;
762 
763 		/* Must be streaming or stopping */
764 		if (state != WALSNDSTATE_STREAMING &&
765 			state != WALSNDSTATE_STOPPING)
766 			continue;
767 
768 		/* Must be synchronous */
769 		if (stby->sync_standby_priority == 0)
770 			continue;
771 
772 		/* Must have a valid flush position */
773 		if (XLogRecPtrIsInvalid(stby->flush))
774 			continue;
775 
776 		/* OK, it's a candidate */
777 		stby->walsnd_index = i;
778 		stby->is_me = (walsnd == MyWalSnd);
779 		n++;
780 	}
781 
782 	/*
783 	 * In quorum mode, we return all the candidates.  In priority mode, if we
784 	 * have too many candidates then return only the num_sync ones of highest
785 	 * priority.
786 	 */
787 	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
788 		n > SyncRepConfig->num_sync)
789 	{
790 		/* Sort by priority ... */
791 		qsort(*standbys, n, sizeof(SyncRepStandbyData),
792 			  standby_priority_comparator);
793 		/* ... then report just the first num_sync ones */
794 		n = SyncRepConfig->num_sync;
795 	}
796 
797 	return n;
798 }
799 
800 /*
801  * qsort comparator to sort SyncRepStandbyData entries by priority
802  */
803 static int
standby_priority_comparator(const void * a,const void * b)804 standby_priority_comparator(const void *a, const void *b)
805 {
806 	const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
807 	const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
808 
809 	/* First, sort by increasing priority value */
810 	if (sa->sync_standby_priority != sb->sync_standby_priority)
811 		return sa->sync_standby_priority - sb->sync_standby_priority;
812 
813 	/*
814 	 * We might have equal priority values; arbitrarily break ties by position
815 	 * in the WALSnd array.  (This is utterly bogus, since that is arrival
816 	 * order dependent, but there are regression tests that rely on it.)
817 	 */
818 	return sa->walsnd_index - sb->walsnd_index;
819 }
820 
821 
822 /*
823  * Check if we are in the list of sync standbys, and if so, determine
824  * priority sequence. Return priority if set, or zero to indicate that
825  * we are not a potential sync standby.
826  *
827  * Compare the parameter SyncRepStandbyNames against the application_name
828  * for this WALSender, or allow any name if we find a wildcard "*".
829  */
830 static int
SyncRepGetStandbyPriority(void)831 SyncRepGetStandbyPriority(void)
832 {
833 	const char *standby_name;
834 	int			priority;
835 	bool		found = false;
836 
837 	/*
838 	 * Since synchronous cascade replication is not allowed, we always set the
839 	 * priority of cascading walsender to zero.
840 	 */
841 	if (am_cascading_walsender)
842 		return 0;
843 
844 	if (!SyncStandbysDefined() || SyncRepConfig == NULL)
845 		return 0;
846 
847 	standby_name = SyncRepConfig->member_names;
848 	for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
849 	{
850 		if (pg_strcasecmp(standby_name, application_name) == 0 ||
851 			strcmp(standby_name, "*") == 0)
852 		{
853 			found = true;
854 			break;
855 		}
856 		standby_name += strlen(standby_name) + 1;
857 	}
858 
859 	if (!found)
860 		return 0;
861 
862 	/*
863 	 * In quorum-based sync replication, all the standbys in the list have the
864 	 * same priority, one.
865 	 */
866 	return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
867 }
868 
869 /*
870  * Walk the specified queue from head.  Set the state of any backends that
871  * need to be woken, remove them from the queue, and then wake them.
872  * Pass all = true to wake whole queue; otherwise, just wake up to
873  * the walsender's LSN.
874  *
875  * The caller must hold SyncRepLock in exclusive mode.
876  */
877 static int
SyncRepWakeQueue(bool all,int mode)878 SyncRepWakeQueue(bool all, int mode)
879 {
880 	volatile WalSndCtlData *walsndctl = WalSndCtl;
881 	PGPROC	   *proc = NULL;
882 	PGPROC	   *thisproc = NULL;
883 	int			numprocs = 0;
884 
885 	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
886 	Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
887 	Assert(SyncRepQueueIsOrderedByLSN(mode));
888 
889 	proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
890 								   &(WalSndCtl->SyncRepQueue[mode]),
891 								   offsetof(PGPROC, syncRepLinks));
892 
893 	while (proc)
894 	{
895 		/*
896 		 * Assume the queue is ordered by LSN
897 		 */
898 		if (!all && walsndctl->lsn[mode] < proc->waitLSN)
899 			return numprocs;
900 
901 		/*
902 		 * Move to next proc, so we can delete thisproc from the queue.
903 		 * thisproc is valid, proc may be NULL after this.
904 		 */
905 		thisproc = proc;
906 		proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
907 									   &(proc->syncRepLinks),
908 									   offsetof(PGPROC, syncRepLinks));
909 
910 		/*
911 		 * Remove thisproc from queue.
912 		 */
913 		SHMQueueDelete(&(thisproc->syncRepLinks));
914 
915 		/*
916 		 * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
917 		 * make sure that it sees the queue link being removed before the
918 		 * syncRepState change.
919 		 */
920 		pg_write_barrier();
921 
922 		/*
923 		 * Set state to complete; see SyncRepWaitForLSN() for discussion of
924 		 * the various states.
925 		 */
926 		thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
927 
928 		/*
929 		 * Wake only when we have set state and removed from queue.
930 		 */
931 		SetLatch(&(thisproc->procLatch));
932 
933 		numprocs++;
934 	}
935 
936 	return numprocs;
937 }
938 
939 /*
940  * The checkpointer calls this as needed to update the shared
941  * sync_standbys_defined flag, so that backends don't remain permanently wedged
942  * if synchronous_standby_names is unset.  It's safe to check the current value
943  * without the lock, because it's only ever updated by one process.  But we
944  * must take the lock to change it.
945  */
946 void
SyncRepUpdateSyncStandbysDefined(void)947 SyncRepUpdateSyncStandbysDefined(void)
948 {
949 	bool		sync_standbys_defined = SyncStandbysDefined();
950 
951 	if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
952 	{
953 		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
954 
955 		/*
956 		 * If synchronous_standby_names has been reset to empty, it's futile
957 		 * for backends to continue waiting.  Since the user no longer wants
958 		 * synchronous replication, we'd better wake them up.
959 		 */
960 		if (!sync_standbys_defined)
961 		{
962 			int			i;
963 
964 			for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
965 				SyncRepWakeQueue(true, i);
966 		}
967 
968 		/*
969 		 * Only allow people to join the queue when there are synchronous
970 		 * standbys defined.  Without this interlock, there's a race
971 		 * condition: we might wake up all the current waiters; then, some
972 		 * backend that hasn't yet reloaded its config might go to sleep on
973 		 * the queue (and never wake up).  This prevents that.
974 		 */
975 		WalSndCtl->sync_standbys_defined = sync_standbys_defined;
976 
977 		LWLockRelease(SyncRepLock);
978 	}
979 }
980 
981 #ifdef USE_ASSERT_CHECKING
982 static bool
SyncRepQueueIsOrderedByLSN(int mode)983 SyncRepQueueIsOrderedByLSN(int mode)
984 {
985 	PGPROC	   *proc = NULL;
986 	XLogRecPtr	lastLSN;
987 
988 	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
989 
990 	lastLSN = 0;
991 
992 	proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
993 								   &(WalSndCtl->SyncRepQueue[mode]),
994 								   offsetof(PGPROC, syncRepLinks));
995 
996 	while (proc)
997 	{
998 		/*
999 		 * Check the queue is ordered by LSN and that multiple procs don't
1000 		 * have matching LSNs
1001 		 */
1002 		if (proc->waitLSN <= lastLSN)
1003 			return false;
1004 
1005 		lastLSN = proc->waitLSN;
1006 
1007 		proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
1008 									   &(proc->syncRepLinks),
1009 									   offsetof(PGPROC, syncRepLinks));
1010 	}
1011 
1012 	return true;
1013 }
1014 #endif
1015 
1016 /*
1017  * ===========================================================
1018  * Synchronous Replication functions executed by any process
1019  * ===========================================================
1020  */
1021 
1022 bool
check_synchronous_standby_names(char ** newval,void ** extra,GucSource source)1023 check_synchronous_standby_names(char **newval, void **extra, GucSource source)
1024 {
1025 	if (*newval != NULL && (*newval)[0] != '\0')
1026 	{
1027 		int			parse_rc;
1028 		SyncRepConfigData *pconf;
1029 
1030 		/* Reset communication variables to ensure a fresh start */
1031 		syncrep_parse_result = NULL;
1032 		syncrep_parse_error_msg = NULL;
1033 
1034 		/* Parse the synchronous_standby_names string */
1035 		syncrep_scanner_init(*newval);
1036 		parse_rc = syncrep_yyparse();
1037 		syncrep_scanner_finish();
1038 
1039 		if (parse_rc != 0 || syncrep_parse_result == NULL)
1040 		{
1041 			GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
1042 			if (syncrep_parse_error_msg)
1043 				GUC_check_errdetail("%s", syncrep_parse_error_msg);
1044 			else
1045 				GUC_check_errdetail("synchronous_standby_names parser failed");
1046 			return false;
1047 		}
1048 
1049 		if (syncrep_parse_result->num_sync <= 0)
1050 		{
1051 			GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
1052 							 syncrep_parse_result->num_sync);
1053 			return false;
1054 		}
1055 
1056 		/* GUC extra value must be malloc'd, not palloc'd */
1057 		pconf = (SyncRepConfigData *)
1058 			malloc(syncrep_parse_result->config_size);
1059 		if (pconf == NULL)
1060 			return false;
1061 		memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
1062 
1063 		*extra = (void *) pconf;
1064 
1065 		/*
1066 		 * We need not explicitly clean up syncrep_parse_result.  It, and any
1067 		 * other cruft generated during parsing, will be freed when the
1068 		 * current memory context is deleted.  (This code is generally run in
1069 		 * a short-lived context used for config file processing, so that will
1070 		 * not be very long.)
1071 		 */
1072 	}
1073 	else
1074 		*extra = NULL;
1075 
1076 	return true;
1077 }
1078 
1079 void
assign_synchronous_standby_names(const char * newval,void * extra)1080 assign_synchronous_standby_names(const char *newval, void *extra)
1081 {
1082 	SyncRepConfig = (SyncRepConfigData *) extra;
1083 }
1084 
1085 void
assign_synchronous_commit(int newval,void * extra)1086 assign_synchronous_commit(int newval, void *extra)
1087 {
1088 	switch (newval)
1089 	{
1090 		case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
1091 			SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
1092 			break;
1093 		case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
1094 			SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
1095 			break;
1096 		case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
1097 			SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
1098 			break;
1099 		default:
1100 			SyncRepWaitMode = SYNC_REP_NO_WAIT;
1101 			break;
1102 	}
1103 }
1104