1 /*-------------------------------------------------------------------------
2  *
3  * syncrep.c
4  *
5  * Synchronous replication is new as of PostgreSQL 9.1.
6  *
 * If requested, transaction commits wait until their commit LSNs are
 * acknowledged by the synchronous standbys.
9  *
10  * This module contains the code for waiting and release of backends.
11  * All code in this module executes on the primary. The core streaming
12  * replication transport remains within WALreceiver/WALsender modules.
13  *
14  * The essence of this design is that it isolates all logic about
15  * waiting/releasing onto the primary. The primary defines which standbys
16  * it wishes to wait for. The standbys are completely unaware of the
17  * durability requirements of transactions on the primary, reducing the
18  * complexity of the code and streamlining both standby operations and
19  * network bandwidth because there is no requirement to ship
20  * per-transaction state information.
21  *
22  * Replication is either synchronous or not synchronous (async). If it is
23  * async, we just fastpath out of here. If it is sync, then we wait for
24  * the write, flush or apply location on the standby before releasing
25  * the waiting backend. Further complexity in that interaction is
26  * expected in later releases.
27  *
 * The best performing way to manage the waiting backends is to have a
 * single ordered queue of waiting backends, so that we can avoid
 * searching through all waiters each time we receive a reply.
31  *
32  * In 9.5 or before only a single standby could be considered as
33  * synchronous. In 9.6 we support multiple synchronous standbys.
34  * The number of synchronous standbys that transactions must wait for
35  * replies from is specified in synchronous_standby_names.
36  * This parameter also specifies a list of standby names,
37  * which determines the priority of each standby for being chosen as
38  * a synchronous standby. The standbys whose names appear earlier
39  * in the list are given higher priority and will be considered as
40  * synchronous. Other standby servers appearing later in this list
41  * represent potential synchronous standbys. If any of the current
42  * synchronous standbys disconnects for whatever reason, it will be
43  * replaced immediately with the next-highest-priority standby.
44  *
45  * Before the standbys chosen from synchronous_standby_names can
46  * become the synchronous standbys they must have caught up with
47  * the primary; that may take some time. Once caught up,
48  * the current higher priority standbys which are considered as
49  * synchronous at that moment will release waiters from the queue.
50  *
51  * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
52  *
53  * IDENTIFICATION
54  *	  src/backend/replication/syncrep.c
55  *
56  *-------------------------------------------------------------------------
57  */
58 #include "postgres.h"
59 
60 #include <unistd.h>
61 
62 #include "access/xact.h"
63 #include "miscadmin.h"
64 #include "replication/syncrep.h"
65 #include "replication/walsender.h"
66 #include "replication/walsender_private.h"
67 #include "storage/pmsignal.h"
68 #include "storage/proc.h"
69 #include "tcop/tcopprot.h"
70 #include "utils/builtins.h"
71 #include "utils/ps_status.h"
72 
/* User-settable parameters for sync rep */
char	   *SyncRepStandbyNames;

/* True when a non-empty string of standby names has been configured. */
#define SyncStandbysDefined() \
	(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')

/*
 * Process-local flag consulted by SyncRepReleaseWaiters(): while true, the
 * next time this process finds itself managing a sync standby it emits a LOG
 * message and clears the flag.
 */
static bool announce_next_takeover = true;

/*
 * Parsed configuration installed by assign_synchronous_standby_names();
 * NULL when the setting is empty.
 */
static SyncRepConfigData *SyncRepConfig = NULL;

/* Wait level chosen by assign_synchronous_commit(). */
static int	SyncRepWaitMode = SYNC_REP_NO_WAIT;

/* Wait-queue maintenance helpers */
static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
static int	SyncRepWakeQueue(bool all, int mode);

/* Helpers used by the walsender-side functions */
static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
						   XLogRecPtr *flushPtr,
						   XLogRecPtr *applyPtr,
						   bool *am_sync);
static int	SyncRepGetStandbyPriority(void);

/* Only compiled (and only referenced, inside Assert) in assert-enabled builds */
#ifdef USE_ASSERT_CHECKING
static bool SyncRepQueueIsOrderedByLSN(int mode);
#endif
97 
98 /*
99  * ===========================================================
100  * Synchronous Replication functions for normal user backends
101  * ===========================================================
102  */
103 
/*
 * Wait for synchronous replication, if requested by user.
 *
 * Initially backends start in state SYNC_REP_NOT_WAITING and then
 * change that state to SYNC_REP_WAITING before adding ourselves
 * to the wait queue. During SyncRepWakeQueue() a WALSender changes
 * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
 * This backend then resets its state to SYNC_REP_NOT_WAITING.
 *
 * 'lsn' represents the LSN to wait for.  'commit' indicates whether this LSN
 * represents a commit record.  If it doesn't, then we wait only for the WAL
 * to be flushed if synchronous_commit is set to the higher level of
 * remote_apply, because only commit records provide apply feedback.
 *
 * The function returns without waiting when sync rep is not requested, no
 * standby names are defined, or the requested LSN has already been reached.
 * Otherwise it returns once a walsender marks the wait complete, or once the
 * wait is abandoned because of a pending die request, a pending query
 * cancel, or postmaster death; the first two of those emit a WARNING.
 */
void
SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
{
	char	   *new_status = NULL;
	const char *old_status;
	int			mode;

	/* Cap the level for anything other than commit to remote flush only. */
	if (commit)
		mode = SyncRepWaitMode;
	else
		mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);

	/*
	 * Fast exit if user has not requested sync replication, or there are no
	 * sync replication standby names defined. Note that those standbys don't
	 * need to be connected.
	 */
	if (!SyncRepRequested() || !SyncStandbysDefined())
		return;

	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
	Assert(WalSndCtl != NULL);

	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
	Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);

	/*
	 * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
	 * set.  See SyncRepUpdateSyncStandbysDefined.
	 *
	 * Also check that the standby hasn't already replied. Unlikely race
	 * condition but we'll be fetching that cache line anyway so it's likely
	 * to be a low cost check.
	 */
	if (!WalSndCtl->sync_standbys_defined ||
		lsn <= WalSndCtl->lsn[mode])
	{
		LWLockRelease(SyncRepLock);
		return;
	}

	/*
	 * Set our waitLSN so WALSender will know when to wake us, and add
	 * ourselves to the queue.  Both happen while SyncRepLock is still held.
	 */
	MyProc->waitLSN = lsn;
	MyProc->syncRepState = SYNC_REP_WAITING;
	SyncRepQueueInsert(mode);
	Assert(SyncRepQueueIsOrderedByLSN(mode));
	LWLockRelease(SyncRepLock);

	/* Alter ps display to show waiting for sync rep. */
	if (update_process_title)
	{
		int			len;

		/*
		 * The 32 extra bytes are room for the appended suffix, which is at
		 * most 30 characters (" waiting for " plus two 8-digit hex values
		 * and a slash); the final +1 is for the terminating NUL.
		 */
		old_status = get_ps_display(&len);
		new_status = (char *) palloc(len + 32 + 1);
		memcpy(new_status, old_status, len);
		sprintf(new_status + len, " waiting for %X/%X",
				(uint32) (lsn >> 32), (uint32) lsn);
		set_ps_display(new_status, false);
		new_status[len] = '\0'; /* truncate off " waiting ..." */
	}

	/*
	 * Wait for specified LSN to be confirmed.
	 *
	 * Each proc has its own wait latch, so we perform a normal latch
	 * check/wait loop here.
	 */
	for (;;)
	{
		/* Must reset the latch before testing state. */
		ResetLatch(MyLatch);

		/*
		 * Acquiring the lock is not needed, the latch ensures proper
		 * barriers. If it looks like we're done, we must really be done,
		 * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
		 * it will never update it again, so we can't be seeing a stale value
		 * in that case.
		 */
		if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
			break;

		/*
		 * If a wait for synchronous replication is pending, we can neither
		 * acknowledge the commit nor raise ERROR or FATAL.  The latter would
		 * lead the client to believe that the transaction aborted, which is
		 * not true: it's already committed locally. The former is no good
		 * either: the client has requested synchronous replication, and is
		 * entitled to assume that an acknowledged commit is also replicated,
		 * which might not be true. So in this case we issue a WARNING (which
		 * some clients may be able to interpret) and shut off further output.
		 * We do NOT reset ProcDiePending, so that the process will die after
		 * the commit is cleaned up.
		 */
		if (ProcDiePending)
		{
			ereport(WARNING,
					(errcode(ERRCODE_ADMIN_SHUTDOWN),
					 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
			whereToSendOutput = DestNone;
			SyncRepCancelWait();
			break;
		}

		/*
		 * It's unclear what to do if a query cancel interrupt arrives.  We
		 * can't actually abort at this point, but ignoring the interrupt
		 * altogether is not helpful, so we just terminate the wait with a
		 * suitable warning.  Unlike the die case, the pending flag is
		 * cleared here.
		 */
		if (QueryCancelPending)
		{
			QueryCancelPending = false;
			ereport(WARNING,
					(errmsg("canceling wait for synchronous replication due to user request"),
					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
			SyncRepCancelWait();
			break;
		}

		/*
		 * If the postmaster dies, we'll probably never get an
		 * acknowledgement, because all the wal sender processes will exit. So
		 * just bail out, flagging ourselves to die and suppressing further
		 * client output.
		 */
		if (!PostmasterIsAlive())
		{
			ProcDiePending = true;
			whereToSendOutput = DestNone;
			SyncRepCancelWait();
			break;
		}

		/*
		 * Wait on latch.  Any condition that should wake us up will set the
		 * latch, so no need for timeout.
		 */
		WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1);
	}

	/*
	 * Either a walsender has checked our LSN and removed us from the queue,
	 * or SyncRepCancelWait() has unlinked us.  Clean up state and leave.
	 * It's OK to reset these shared memory fields without holding
	 * SyncRepLock, because any walsenders will ignore us anyway when we're
	 * not on the queue.  We need a read barrier to make sure we see the
	 * changes to the queue link (this might be unnecessary without
	 * assertions, but better safe than sorry).
	 */
	pg_read_barrier();
	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
	MyProc->waitLSN = 0;

	if (new_status)
	{
		/* Reset ps display (new_status was truncated back to the old text) */
		set_ps_display(new_status, false);
		pfree(new_status);
	}
}
284 
285 /*
286  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
287  *
288  * Usually we will go at tail of queue, though it's possible that we arrive
289  * here out of order, so start at tail and work back to insertion point.
290  */
291 static void
SyncRepQueueInsert(int mode)292 SyncRepQueueInsert(int mode)
293 {
294 	PGPROC	   *proc;
295 
296 	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
297 	proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
298 								   &(WalSndCtl->SyncRepQueue[mode]),
299 								   offsetof(PGPROC, syncRepLinks));
300 
301 	while (proc)
302 	{
303 		/*
304 		 * Stop at the queue element that we should after to ensure the queue
305 		 * is ordered by LSN.
306 		 */
307 		if (proc->waitLSN < MyProc->waitLSN)
308 			break;
309 
310 		proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
311 									   &(proc->syncRepLinks),
312 									   offsetof(PGPROC, syncRepLinks));
313 	}
314 
315 	if (proc)
316 		SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
317 	else
318 		SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
319 }
320 
321 /*
322  * Acquire SyncRepLock and cancel any wait currently in progress.
323  */
324 static void
SyncRepCancelWait(void)325 SyncRepCancelWait(void)
326 {
327 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
328 	if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
329 		SHMQueueDelete(&(MyProc->syncRepLinks));
330 	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
331 	LWLockRelease(SyncRepLock);
332 }
333 
334 void
SyncRepCleanupAtProcExit(void)335 SyncRepCleanupAtProcExit(void)
336 {
337 	/*
338 	 * First check if we are removed from the queue without the lock to not
339 	 * slow down backend exit.
340 	 */
341 	if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
342 	{
343 		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
344 
345 		/* maybe we have just been removed, so recheck */
346 		if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
347 			SHMQueueDelete(&(MyProc->syncRepLinks));
348 
349 		LWLockRelease(SyncRepLock);
350 	}
351 }
352 
353 /*
354  * ===========================================================
355  * Synchronous Replication functions for wal sender processes
356  * ===========================================================
357  */
358 
359 /*
360  * Take any action required to initialise sync rep state from config
361  * data. Called at WALSender startup and after each SIGHUP.
362  */
363 void
SyncRepInitConfig(void)364 SyncRepInitConfig(void)
365 {
366 	int			priority;
367 
368 	/*
369 	 * Determine if we are a potential sync standby and remember the result
370 	 * for handling replies from standby.
371 	 */
372 	priority = SyncRepGetStandbyPriority();
373 	if (MyWalSnd->sync_standby_priority != priority)
374 	{
375 		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
376 		MyWalSnd->sync_standby_priority = priority;
377 		LWLockRelease(SyncRepLock);
378 		ereport(DEBUG1,
379 			(errmsg("standby \"%s\" now has synchronous standby priority %u",
380 					application_name, priority)));
381 	}
382 }
383 
384 /*
385  * Update the LSNs on each queue based upon our latest state. This
386  * implements a simple policy of first-valid-sync-standby-releases-waiter.
387  *
388  * Other policies are possible, which would change what we do here and
389  * perhaps also which information we store as well.
390  */
391 void
SyncRepReleaseWaiters(void)392 SyncRepReleaseWaiters(void)
393 {
394 	volatile WalSndCtlData *walsndctl = WalSndCtl;
395 	XLogRecPtr	writePtr;
396 	XLogRecPtr	flushPtr;
397 	XLogRecPtr	applyPtr;
398 	bool		got_oldest;
399 	bool		am_sync;
400 	int			numwrite = 0;
401 	int			numflush = 0;
402 	int			numapply = 0;
403 
404 	/*
405 	 * If this WALSender is serving a standby that is not on the list of
406 	 * potential sync standbys then we have nothing to do. If we are still
407 	 * starting up, still running base backup or the current flush position is
408 	 * still invalid, then leave quickly also.  Streaming or stopping WAL
409 	 * senders are allowed to release waiters.
410 	 */
411 	if (MyWalSnd->sync_standby_priority == 0 ||
412 		(MyWalSnd->state != WALSNDSTATE_STREAMING &&
413 		 MyWalSnd->state != WALSNDSTATE_STOPPING) ||
414 		XLogRecPtrIsInvalid(MyWalSnd->flush))
415 	{
416 		announce_next_takeover = true;
417 		return;
418 	}
419 
420 	/*
421 	 * We're a potential sync standby. Release waiters if there are enough
422 	 * sync standbys and we are considered as sync.
423 	 */
424 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
425 
426 	/*
427 	 * Check whether we are a sync standby or not, and calculate the oldest
428 	 * positions among all sync standbys.
429 	 */
430 	got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr,
431 											&applyPtr, &am_sync);
432 
433 	/*
434 	 * If we are managing a sync standby, though we weren't prior to this,
435 	 * then announce we are now a sync standby.
436 	 */
437 	if (announce_next_takeover && am_sync)
438 	{
439 		announce_next_takeover = false;
440 		ereport(LOG,
441 				(errmsg("standby \"%s\" is now a synchronous standby with priority %u",
442 						application_name, MyWalSnd->sync_standby_priority)));
443 	}
444 
445 	/*
446 	 * If the number of sync standbys is less than requested or we aren't
447 	 * managing a sync standby then just leave.
448 	 */
449 	if (!got_oldest || !am_sync)
450 	{
451 		LWLockRelease(SyncRepLock);
452 		announce_next_takeover = !am_sync;
453 		return;
454 	}
455 
456 	/*
457 	 * Set the lsn first so that when we wake backends they will release up to
458 	 * this location.
459 	 */
460 	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
461 	{
462 		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
463 		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
464 	}
465 	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
466 	{
467 		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
468 		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
469 	}
470 	if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
471 	{
472 		walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
473 		numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
474 	}
475 
476 	LWLockRelease(SyncRepLock);
477 
478 	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
479 		 numwrite, (uint32) (writePtr >> 32), (uint32) writePtr,
480 		 numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr,
481 		 numapply, (uint32) (applyPtr >> 32), (uint32) applyPtr);
482 }
483 
484 /*
485  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
486  *
487  * Return false if the number of sync standbys is less than
488  * synchronous_standby_names specifies. Otherwise return true and
489  * store the oldest positions into *writePtr, *flushPtr and *applyPtr.
490  *
491  * On return, *am_sync is set to true if this walsender is connecting to
492  * sync standby. Otherwise it's set to false.
493  */
494 static bool
SyncRepGetOldestSyncRecPtr(XLogRecPtr * writePtr,XLogRecPtr * flushPtr,XLogRecPtr * applyPtr,bool * am_sync)495 SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
496 						   XLogRecPtr *applyPtr, bool *am_sync)
497 {
498 	List	   *sync_standbys;
499 	ListCell   *cell;
500 
501 	*writePtr = InvalidXLogRecPtr;
502 	*flushPtr = InvalidXLogRecPtr;
503 	*applyPtr = InvalidXLogRecPtr;
504 	*am_sync = false;
505 
506 	/* Get standbys that are considered as synchronous at this moment */
507 	sync_standbys = SyncRepGetSyncStandbys(am_sync);
508 
509 	/*
510 	 * Quick exit if we are not managing a sync standby or there are not
511 	 * enough synchronous standbys.
512 	 */
513 	if (!(*am_sync) ||
514 		SyncRepConfig == NULL ||
515 		list_length(sync_standbys) < SyncRepConfig->num_sync)
516 	{
517 		list_free(sync_standbys);
518 		return false;
519 	}
520 
521 	/*
522 	 * Scan through all sync standbys and calculate the oldest Write, Flush
523 	 * and Apply positions.
524 	 */
525 	foreach(cell, sync_standbys)
526 	{
527 		WalSnd	   *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
528 		XLogRecPtr	write;
529 		XLogRecPtr	flush;
530 		XLogRecPtr	apply;
531 
532 		SpinLockAcquire(&walsnd->mutex);
533 		write = walsnd->write;
534 		flush = walsnd->flush;
535 		apply = walsnd->apply;
536 		SpinLockRelease(&walsnd->mutex);
537 
538 		if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
539 			*writePtr = write;
540 		if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
541 			*flushPtr = flush;
542 		if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
543 			*applyPtr = apply;
544 	}
545 
546 	list_free(sync_standbys);
547 	return true;
548 }
549 
/*
 * Return the list of sync standbys, or NIL if no sync standby is connected.
 *
 * Each list element is an integer index into WalSndCtl->walsnds[].  At most
 * SyncRepConfig->num_sync entries are returned, chosen in ascending order of
 * sync_standby_priority (1 is the highest priority).
 *
 * If there are multiple standbys with the same priority,
 * the first one found is selected preferentially.
 * The caller must hold SyncRepLock.
 *
 * On return, *am_sync is set to true if this walsender is connecting to
 * sync standby. Otherwise it's set to false.  am_sync may be NULL if the
 * caller is not interested in that information.
 */
List *
SyncRepGetSyncStandbys(bool *am_sync)
{
	List	   *result = NIL;
	List	   *pending = NIL;
	int			lowest_priority;
	int			next_highest_priority;
	int			this_priority;
	int			priority;
	int			i;
	bool		am_in_pending = false;
	volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
								 * rearrangement */

	/* Set default result */
	if (am_sync != NULL)
		*am_sync = false;

	/* Quick exit if sync replication is not requested */
	if (SyncRepConfig == NULL)
		return NIL;

	/*
	 * Priorities run from 1 to nmembers; "lowest_priority + 1" therefore
	 * serves as a sentinel meaning "no pending priority seen yet".
	 */
	lowest_priority = SyncRepConfig->nmembers;
	next_highest_priority = lowest_priority + 1;

	/*
	 * Find the sync standbys which have the highest priority (i.e, 1). Also
	 * store all the other potential sync standbys into the pending list, in
	 * order to scan it later and find other sync standbys from it quickly.
	 */
	for (i = 0; i < max_wal_senders; i++)
	{
		walsnd = &WalSndCtl->walsnds[i];

		/* Must be active */
		if (walsnd->pid == 0)
			continue;

		/* Must be streaming or stopping */
		if (walsnd->state != WALSNDSTATE_STREAMING &&
			walsnd->state != WALSNDSTATE_STOPPING)
			continue;

		/* Must be synchronous */
		this_priority = walsnd->sync_standby_priority;
		if (this_priority == 0)
			continue;

		/* Must have a valid flush position */
		if (XLogRecPtrIsInvalid(walsnd->flush))
			continue;

		/*
		 * If the priority is equal to 1, consider this standby as sync and
		 * append it to the result. Otherwise append this standby to the
		 * pending list to check if it's actually sync or not later.
		 */
		if (this_priority == 1)
		{
			result = lappend_int(result, i);
			if (am_sync != NULL && walsnd == MyWalSnd)
				*am_sync = true;
			if (list_length(result) == SyncRepConfig->num_sync)
			{
				list_free(pending);
				return result;	/* Exit if got enough sync standbys */
			}
		}
		else
		{
			pending = lappend_int(pending, i);
			if (am_sync != NULL && walsnd == MyWalSnd)
				am_in_pending = true;

			/*
			 * Track the highest priority among the standbys in the pending
			 * list, in order to use it as the starting priority for later
			 * scan of the list. This is useful to find quickly the sync
			 * standbys from the pending list later because we can skip
			 * unnecessary scans for the unused priorities.
			 */
			if (this_priority < next_highest_priority)
				next_highest_priority = this_priority;
		}
	}

	/*
	 * Consider all pending standbys as sync if the number of them plus
	 * already-found sync ones does not exceed what the configuration
	 * requests.
	 */
	if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
	{
		/*
		 * The separate header of "pending" is released by hand below, but
		 * only when both lists were non-empty before concatenation.
		 */
		bool		needfree = (result != NIL && pending != NIL);

		/*
		 * Set *am_sync to true if this walsender is in the pending list
		 * because all pending standbys are considered as sync.
		 */
		if (am_sync != NULL && !(*am_sync))
			*am_sync = am_in_pending;

		result = list_concat(result, pending);
		if (needfree)
			pfree(pending);
		return result;
	}

	/*
	 * Find the sync standbys from the pending list.  Each pass of the outer
	 * loop moves every pending entry of the current priority into the
	 * result, and meanwhile works out the next priority actually present
	 * among the remaining entries.
	 */
	priority = next_highest_priority;
	while (priority <= lowest_priority)
	{
		ListCell   *cell;
		ListCell   *prev = NULL;
		ListCell   *next;

		/* Reset to the "none seen" sentinel for this pass. */
		next_highest_priority = lowest_priority + 1;

		for (cell = list_head(pending); cell != NULL; cell = next)
		{
			i = lfirst_int(cell);
			walsnd = &WalSndCtl->walsnds[i];

			/* Fetch the successor now; "cell" may be deleted below. */
			next = lnext(cell);

			this_priority = walsnd->sync_standby_priority;
			if (this_priority == priority)
			{
				result = lappend_int(result, i);
				if (am_sync != NULL && walsnd == MyWalSnd)
					*am_sync = true;

				/*
				 * We should always exit here after the scan of pending list
				 * starts because we know that the list has enough elements to
				 * reach SyncRepConfig->num_sync.
				 */
				if (list_length(result) == SyncRepConfig->num_sync)
				{
					list_free(pending);
					return result;		/* Exit if got enough sync standbys */
				}

				/*
				 * Remove the entry for this sync standby from the list to
				 * prevent us from looking at the same entry again.  "prev"
				 * is deliberately left unchanged in this branch.
				 */
				pending = list_delete_cell(pending, cell, prev);

				continue;
			}

			if (this_priority < next_highest_priority)
				next_highest_priority = this_priority;

			prev = cell;
		}

		priority = next_highest_priority;
	}

	/*
	 * We might get here if the set of sync_standby_priority values in shared
	 * memory is inconsistent, as can happen transiently after a change in the
	 * synchronous_standby_names setting.  In that case, just return the
	 * incomplete list we have so far.  That will cause the caller to decide
	 * there aren't enough synchronous candidates, which should be a safe
	 * choice until the priority values become consistent again.
	 */
	list_free(pending);
	return result;
}
733 
734 /*
735  * Check if we are in the list of sync standbys, and if so, determine
736  * priority sequence. Return priority if set, or zero to indicate that
737  * we are not a potential sync standby.
738  *
739  * Compare the parameter SyncRepStandbyNames against the application_name
740  * for this WALSender, or allow any name if we find a wildcard "*".
741  */
742 static int
SyncRepGetStandbyPriority(void)743 SyncRepGetStandbyPriority(void)
744 {
745 	const char *standby_name;
746 	int			priority;
747 	bool		found = false;
748 
749 	/*
750 	 * Since synchronous cascade replication is not allowed, we always set the
751 	 * priority of cascading walsender to zero.
752 	 */
753 	if (am_cascading_walsender)
754 		return 0;
755 
756 	if (!SyncStandbysDefined() || SyncRepConfig == NULL)
757 		return 0;
758 
759 	standby_name = SyncRepConfig->member_names;
760 	for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
761 	{
762 		if (pg_strcasecmp(standby_name, application_name) == 0 ||
763 			strcmp(standby_name, "*") == 0)
764 		{
765 			found = true;
766 			break;
767 		}
768 		standby_name += strlen(standby_name) + 1;
769 	}
770 
771 	return (found ? priority : 0);
772 }
773 
774 /*
775  * Walk the specified queue from head.  Set the state of any backends that
776  * need to be woken, remove them from the queue, and then wake them.
777  * Pass all = true to wake whole queue; otherwise, just wake up to
778  * the walsender's LSN.
779  *
780  * Must hold SyncRepLock.
781  */
782 static int
SyncRepWakeQueue(bool all,int mode)783 SyncRepWakeQueue(bool all, int mode)
784 {
785 	volatile WalSndCtlData *walsndctl = WalSndCtl;
786 	PGPROC	   *proc = NULL;
787 	PGPROC	   *thisproc = NULL;
788 	int			numprocs = 0;
789 
790 	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
791 	Assert(SyncRepQueueIsOrderedByLSN(mode));
792 
793 	proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
794 								   &(WalSndCtl->SyncRepQueue[mode]),
795 								   offsetof(PGPROC, syncRepLinks));
796 
797 	while (proc)
798 	{
799 		/*
800 		 * Assume the queue is ordered by LSN
801 		 */
802 		if (!all && walsndctl->lsn[mode] < proc->waitLSN)
803 			return numprocs;
804 
805 		/*
806 		 * Move to next proc, so we can delete thisproc from the queue.
807 		 * thisproc is valid, proc may be NULL after this.
808 		 */
809 		thisproc = proc;
810 		proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
811 									   &(proc->syncRepLinks),
812 									   offsetof(PGPROC, syncRepLinks));
813 
814 		/*
815 		 * Remove thisproc from queue.
816 		 */
817 		SHMQueueDelete(&(thisproc->syncRepLinks));
818 
819 		/*
820 		 * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
821 		 * make sure that it sees the queue link being removed before the
822 		 * syncRepState change.
823 		 */
824 		pg_write_barrier();
825 
826 		/*
827 		 * Set state to complete; see SyncRepWaitForLSN() for discussion of
828 		 * the various states.
829 		 */
830 		thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
831 
832 		/*
833 		 * Wake only when we have set state and removed from queue.
834 		 */
835 		SetLatch(&(thisproc->procLatch));
836 
837 		numprocs++;
838 	}
839 
840 	return numprocs;
841 }
842 
843 /*
844  * The checkpointer calls this as needed to update the shared
845  * sync_standbys_defined flag, so that backends don't remain permanently wedged
846  * if synchronous_standby_names is unset.  It's safe to check the current value
847  * without the lock, because it's only ever updated by one process.  But we
848  * must take the lock to change it.
849  */
850 void
SyncRepUpdateSyncStandbysDefined(void)851 SyncRepUpdateSyncStandbysDefined(void)
852 {
853 	bool		sync_standbys_defined = SyncStandbysDefined();
854 
855 	if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
856 	{
857 		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
858 
859 		/*
860 		 * If synchronous_standby_names has been reset to empty, it's futile
861 		 * for backends to continue to waiting.  Since the user no longer
862 		 * wants synchronous replication, we'd better wake them up.
863 		 */
864 		if (!sync_standbys_defined)
865 		{
866 			int			i;
867 
868 			for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
869 				SyncRepWakeQueue(true, i);
870 		}
871 
872 		/*
873 		 * Only allow people to join the queue when there are synchronous
874 		 * standbys defined.  Without this interlock, there's a race
875 		 * condition: we might wake up all the current waiters; then, some
876 		 * backend that hasn't yet reloaded its config might go to sleep on
877 		 * the queue (and never wake up).  This prevents that.
878 		 */
879 		WalSndCtl->sync_standbys_defined = sync_standbys_defined;
880 
881 		LWLockRelease(SyncRepLock);
882 	}
883 }
884 
#ifdef USE_ASSERT_CHECKING
/*
 * Assertion helper: walk the queue for 'mode' from its head and report
 * whether the waitLSN values are strictly increasing.  Returns false as soon
 * as an entry's waitLSN is less than or equal to its predecessor's (so equal
 * LSNs on two procs also count as a violation), true otherwise.
 */
static bool
SyncRepQueueIsOrderedByLSN(int mode)
{
	PGPROC	   *cur;
	XLogRecPtr	prevLSN = 0;

	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);

	for (cur = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
									   &(WalSndCtl->SyncRepQueue[mode]),
									   offsetof(PGPROC, syncRepLinks));
		 cur != NULL;
		 cur = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
									   &(cur->syncRepLinks),
									   offsetof(PGPROC, syncRepLinks)))
	{
		/* Each entry must lie strictly beyond the one before it. */
		if (cur->waitLSN <= prevLSN)
			return false;

		prevLSN = cur->waitLSN;
	}

	return true;
}
#endif
919 
920 /*
921  * ===========================================================
922  * Synchronous Replication functions executed by any process
923  * ===========================================================
924  */
925 
926 bool
check_synchronous_standby_names(char ** newval,void ** extra,GucSource source)927 check_synchronous_standby_names(char **newval, void **extra, GucSource source)
928 {
929 	if (*newval != NULL && (*newval)[0] != '\0')
930 	{
931 		int			parse_rc;
932 		SyncRepConfigData *pconf;
933 
934 		/* Reset communication variables to ensure a fresh start */
935 		syncrep_parse_result = NULL;
936 		syncrep_parse_error_msg = NULL;
937 
938 		/* Parse the synchronous_standby_names string */
939 		syncrep_scanner_init(*newval);
940 		parse_rc = syncrep_yyparse();
941 		syncrep_scanner_finish();
942 
943 		if (parse_rc != 0 || syncrep_parse_result == NULL)
944 		{
945 			GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
946 			if (syncrep_parse_error_msg)
947 				GUC_check_errdetail("%s", syncrep_parse_error_msg);
948 			else
949 				GUC_check_errdetail("synchronous_standby_names parser failed");
950 			return false;
951 		}
952 
953 		if (syncrep_parse_result->num_sync <= 0)
954 		{
955 			GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
956 							 syncrep_parse_result->num_sync);
957 			return false;
958 		}
959 
960 		/* GUC extra value must be malloc'd, not palloc'd */
961 		pconf = (SyncRepConfigData *)
962 			malloc(syncrep_parse_result->config_size);
963 		if (pconf == NULL)
964 			return false;
965 		memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
966 
967 		*extra = (void *) pconf;
968 
969 		/*
970 		 * We need not explicitly clean up syncrep_parse_result.  It, and any
971 		 * other cruft generated during parsing, will be freed when the
972 		 * current memory context is deleted.  (This code is generally run in
973 		 * a short-lived context used for config file processing, so that will
974 		 * not be very long.)
975 		 */
976 	}
977 	else
978 		*extra = NULL;
979 
980 	return true;
981 }
982 
983 void
assign_synchronous_standby_names(const char * newval,void * extra)984 assign_synchronous_standby_names(const char *newval, void *extra)
985 {
986 	SyncRepConfig = (SyncRepConfigData *) extra;
987 }
988 
989 void
assign_synchronous_commit(int newval,void * extra)990 assign_synchronous_commit(int newval, void *extra)
991 {
992 	switch (newval)
993 	{
994 		case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
995 			SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
996 			break;
997 		case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
998 			SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
999 			break;
1000 		case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
1001 			SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
1002 			break;
1003 		default:
1004 			SyncRepWaitMode = SYNC_REP_NO_WAIT;
1005 			break;
1006 	}
1007 }
1008