1 /*-------------------------------------------------------------------------
2  *
3  * syncrep.c
4  *
5  * Synchronous replication is new as of PostgreSQL 9.1.
6  *
 * If requested, transaction commits wait until their commit LSNs are
 * acknowledged by the synchronous standbys.
9  *
10  * This module contains the code for waiting and release of backends.
11  * All code in this module executes on the primary. The core streaming
12  * replication transport remains within WALreceiver/WALsender modules.
13  *
14  * The essence of this design is that it isolates all logic about
15  * waiting/releasing onto the primary. The primary defines which standbys
16  * it wishes to wait for. The standbys are completely unaware of the
17  * durability requirements of transactions on the primary, reducing the
18  * complexity of the code and streamlining both standby operations and
19  * network bandwidth because there is no requirement to ship
20  * per-transaction state information.
21  *
22  * Replication is either synchronous or not synchronous (async). If it is
23  * async, we just fastpath out of here. If it is sync, then we wait for
24  * the write, flush or apply location on the standby before releasing
25  * the waiting backend. Further complexity in that interaction is
26  * expected in later releases.
27  *
28  * The best performing way to manage the waiting backends is to have a
29  * single ordered queue of waiting backends, so that we can avoid
 * searching through all waiters each time we receive a reply.
31  *
 * In 9.5 or before only a single standby could be considered as
 * synchronous. In 9.6 we support priority-based selection of multiple
 * synchronous standbys. In 10.0 quorum-based selection of multiple
 * synchronous standbys is also supported. The number of synchronous
 * standbys that transactions must wait for replies from is specified in
 * synchronous_standby_names. This parameter also specifies a list of
 * standby names and the method (FIRST or ANY) used to choose synchronous
 * standbys from the listed ones.
39  *
40  * The method FIRST specifies a priority-based synchronous replication
41  * and makes transaction commits wait until their WAL records are
42  * replicated to the requested number of synchronous standbys chosen based
43  * on their priorities. The standbys whose names appear earlier in the list
44  * are given higher priority and will be considered as synchronous.
45  * Other standby servers appearing later in this list represent potential
46  * synchronous standbys. If any of the current synchronous standbys
47  * disconnects for whatever reason, it will be replaced immediately with
48  * the next-highest-priority standby.
49  *
50  * The method ANY specifies a quorum-based synchronous replication
51  * and makes transaction commits wait until their WAL records are
52  * replicated to at least the requested number of synchronous standbys
53  * in the list. All the standbys appearing in the list are considered as
54  * candidates for quorum synchronous standbys.
55  *
56  * If neither FIRST nor ANY is specified, FIRST is used as the method.
57  * This is for backward compatibility with 9.6 or before where only a
58  * priority-based sync replication was supported.
59  *
60  * Before the standbys chosen from synchronous_standby_names can
61  * become the synchronous standbys they must have caught up with
62  * the primary; that may take some time. Once caught up,
63  * the standbys which are considered as synchronous at that moment
64  * will release waiters from the queue.
65  *
66  * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
67  *
68  * IDENTIFICATION
69  *	  src/backend/replication/syncrep.c
70  *
71  *-------------------------------------------------------------------------
72  */
73 #include "postgres.h"
74 
75 #include <unistd.h>
76 
77 #include "access/xact.h"
78 #include "miscadmin.h"
79 #include "pgstat.h"
80 #include "replication/syncrep.h"
81 #include "replication/walsender.h"
82 #include "replication/walsender_private.h"
83 #include "storage/pmsignal.h"
84 #include "storage/proc.h"
85 #include "tcop/tcopprot.h"
86 #include "utils/builtins.h"
87 #include "utils/ps_status.h"
88 
/* User-settable parameters for sync rep */
/* Raw value of synchronous_standby_names as seen by this process. */
char	   *SyncRepStandbyNames;

/*
 * True when synchronous_standby_names is set to a non-empty string in this
 * process.  This only looks at the local GUC value; SyncRepWaitForLSN also
 * checks the shared WalSndCtl->sync_standbys_defined flag under SyncRepLock.
 */
#define SyncStandbysDefined() \
	(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')

/*
 * Walsender-local flag: when true, SyncRepReleaseWaiters logs a message the
 * next time this walsender finds that it is managing a sync standby.
 */
static bool announce_next_takeover = true;

/*
 * Parsed synchronous_standby_names (method, num_sync, member list).  NULL
 * means synchronous replication is not configured.
 */
SyncRepConfigData *SyncRepConfig = NULL;

/*
 * Wait mode used for commits; it indexes WalSndCtl->lsn[] and
 * WalSndCtl->SyncRepQueue[].  Starts out as SYNC_REP_NO_WAIT.
 */
static int	SyncRepWaitMode = SYNC_REP_NO_WAIT;

/* Wait-queue maintenance */
static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
static int	SyncRepWakeQueue(bool all, int mode);

/* Computation of the synced WAL positions across sync standbys */
static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
					 XLogRecPtr *flushPtr,
					 XLogRecPtr *applyPtr,
					 bool *am_sync);
static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
						   XLogRecPtr *flushPtr,
						   XLogRecPtr *applyPtr,
						   SyncRepStandbyData *sync_standbys,
						   int num_standbys);
static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
							  XLogRecPtr *flushPtr,
							  XLogRecPtr *applyPtr,
							  SyncRepStandbyData *sync_standbys,
							  int num_standbys,
							  uint8 nth);

/* Choosing the sync standbys */
static int	SyncRepGetStandbyPriority(void);
static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
static int	standby_priority_comparator(const void *a, const void *b);
static int	cmp_lsn(const void *a, const void *b);

#ifdef USE_ASSERT_CHECKING
static bool SyncRepQueueIsOrderedByLSN(int mode);
#endif
128 
129 /*
130  * ===========================================================
131  * Synchronous Replication functions for normal user backends
132  * ===========================================================
133  */
134 
/*
 * Wait for synchronous replication, if requested by user.
 *
 * Initially backends start in state SYNC_REP_NOT_WAITING and then
 * change that state to SYNC_REP_WAITING before adding ourselves
 * to the wait queue. During SyncRepWakeQueue() a WALSender changes
 * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
 * This backend then resets its state to SYNC_REP_NOT_WAITING.
 *
 * 'lsn' represents the LSN to wait for.  'commit' indicates whether this LSN
 * represents a commit record.  If it doesn't, then we wait only for the WAL
 * to be flushed if synchronous_commit is set to the higher level of
 * remote_apply, because only commit records provide apply feedback.
 *
 * The wait can also end without confirmation.  On a pending die request, a
 * query cancel, or postmaster death, the backend removes itself from the
 * queue via SyncRepCancelWait() and returns normally.  No ERROR is raised,
 * because the transaction is already committed locally.  In the die and
 * postmaster-death cases, further client output is suppressed by setting
 * whereToSendOutput to DestNone.
 */
void
SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
{
	char	   *new_status = NULL;	/* palloc'd ps title, if we changed it */
	const char *old_status;
	int			mode;			/* index into WalSndCtl->lsn[]/SyncRepQueue[] */

	/* Cap the level for anything other than commit to remote flush only. */
	if (commit)
		mode = SyncRepWaitMode;
	else
		mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);

	/*
	 * Fast exit if user has not requested sync replication, or there are no
	 * sync replication standby names defined. Note that those standbys don't
	 * need to be connected.
	 */
	if (!SyncRepRequested() || !SyncStandbysDefined())
		return;

	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
	Assert(WalSndCtl != NULL);

	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
	Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);

	/*
	 * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
	 * set.  See SyncRepUpdateSyncStandbysDefined.
	 *
	 * Also check that the standby hasn't already replied. Unlikely race
	 * condition but we'll be fetching that cache line anyway so it's likely
	 * to be a low cost check.
	 */
	if (!WalSndCtl->sync_standbys_defined ||
		lsn <= WalSndCtl->lsn[mode])
	{
		LWLockRelease(SyncRepLock);
		return;
	}

	/*
	 * Set our waitLSN so WALSender will know when to wake us, and add
	 * ourselves to the queue.
	 */
	MyProc->waitLSN = lsn;
	MyProc->syncRepState = SYNC_REP_WAITING;
	SyncRepQueueInsert(mode);
	Assert(SyncRepQueueIsOrderedByLSN(mode));
	LWLockRelease(SyncRepLock);

	/* Alter ps display to show waiting for sync rep. */
	if (update_process_title)
	{
		int			len;

		/*
		 * The 32 spare bytes hold " waiting for " (13 chars) plus "%X/%X"
		 * (at most 8 + 1 + 8 chars); the extra 1 is for the terminating NUL.
		 */
		old_status = get_ps_display(&len);
		new_status = (char *) palloc(len + 32 + 1);
		memcpy(new_status, old_status, len);
		sprintf(new_status + len, " waiting for %X/%X",
				(uint32) (lsn >> 32), (uint32) lsn);
		set_ps_display(new_status, false);
		new_status[len] = '\0'; /* truncate off " waiting ..." */
	}

	/*
	 * Wait for specified LSN to be confirmed.
	 *
	 * Each proc has its own wait latch, so we perform a normal latch
	 * check/wait loop here.
	 */
	for (;;)
	{
		/* Must reset the latch before testing state. */
		ResetLatch(MyLatch);

		/*
		 * Acquiring the lock is not needed, the latch ensures proper
		 * barriers. If it looks like we're done, we must really be done,
		 * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
		 * it will never update it again, so we can't be seeing a stale value
		 * in that case.
		 */
		if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
			break;

		/*
		 * If a wait for synchronous replication is pending, we can neither
		 * acknowledge the commit nor raise ERROR or FATAL.  The latter would
		 * lead the client to believe that the transaction aborted, which is
		 * not true: it's already committed locally. The former is no good
		 * either: the client has requested synchronous replication, and is
		 * entitled to assume that an acknowledged commit is also replicated,
		 * which might not be true. So in this case we issue a WARNING (which
		 * some clients may be able to interpret) and shut off further output.
		 * We do NOT reset ProcDiePending, so that the process will die after
		 * the commit is cleaned up.
		 */
		if (ProcDiePending)
		{
			ereport(WARNING,
					(errcode(ERRCODE_ADMIN_SHUTDOWN),
					 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
			whereToSendOutput = DestNone;
			SyncRepCancelWait();
			break;
		}

		/*
		 * It's unclear what to do if a query cancel interrupt arrives.  We
		 * can't actually abort at this point, but ignoring the interrupt
		 * altogether is not helpful, so we just terminate the wait with a
		 * suitable warning.
		 */
		if (QueryCancelPending)
		{
			/* Consume the cancel here; it has been reported as a WARNING. */
			QueryCancelPending = false;
			ereport(WARNING,
					(errmsg("canceling wait for synchronous replication due to user request"),
					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
			SyncRepCancelWait();
			break;
		}

		/*
		 * If the postmaster dies, we'll probably never get an
		 * acknowledgement, because all the wal sender processes will exit. So
		 * just bail out.  Setting ProcDiePending makes the process exit once
		 * the commit has been cleaned up, as in the die case above.
		 */
		if (!PostmasterIsAlive())
		{
			ProcDiePending = true;
			whereToSendOutput = DestNone;
			SyncRepCancelWait();
			break;
		}

		/*
		 * Wait on latch.  Any condition that should wake us up will set the
		 * latch, so no need for timeout.
		 */
		WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
				  WAIT_EVENT_SYNC_REP);
	}

	/*
	 * WalSender has checked our LSN and has removed us from queue. Clean up
	 * state and leave.  It's OK to reset these shared memory fields without
	 * holding SyncRepLock, because any walsenders will ignore us anyway when
	 * we're not on the queue.  We need a read barrier to make sure we see the
	 * changes to the queue link (this might be unnecessary without
	 * assertions, but better safe than sorry).
	 */
	pg_read_barrier();
	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
	MyProc->waitLSN = 0;

	if (new_status)
	{
		/* Reset ps display */
		set_ps_display(new_status, false);
		pfree(new_status);
	}
}
316 
317 /*
318  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
319  *
320  * Usually we will go at tail of queue, though it's possible that we arrive
321  * here out of order, so start at tail and work back to insertion point.
322  */
323 static void
SyncRepQueueInsert(int mode)324 SyncRepQueueInsert(int mode)
325 {
326 	PGPROC	   *proc;
327 
328 	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
329 	proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
330 								   &(WalSndCtl->SyncRepQueue[mode]),
331 								   offsetof(PGPROC, syncRepLinks));
332 
333 	while (proc)
334 	{
335 		/*
336 		 * Stop at the queue element that we should after to ensure the queue
337 		 * is ordered by LSN.
338 		 */
339 		if (proc->waitLSN < MyProc->waitLSN)
340 			break;
341 
342 		proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
343 									   &(proc->syncRepLinks),
344 									   offsetof(PGPROC, syncRepLinks));
345 	}
346 
347 	if (proc)
348 		SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
349 	else
350 		SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
351 }
352 
353 /*
354  * Acquire SyncRepLock and cancel any wait currently in progress.
355  */
356 static void
SyncRepCancelWait(void)357 SyncRepCancelWait(void)
358 {
359 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
360 	if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
361 		SHMQueueDelete(&(MyProc->syncRepLinks));
362 	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
363 	LWLockRelease(SyncRepLock);
364 }
365 
366 void
SyncRepCleanupAtProcExit(void)367 SyncRepCleanupAtProcExit(void)
368 {
369 	/*
370 	 * First check if we are removed from the queue without the lock to not
371 	 * slow down backend exit.
372 	 */
373 	if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
374 	{
375 		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
376 
377 		/* maybe we have just been removed, so recheck */
378 		if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
379 			SHMQueueDelete(&(MyProc->syncRepLinks));
380 
381 		LWLockRelease(SyncRepLock);
382 	}
383 }
384 
385 /*
386  * ===========================================================
387  * Synchronous Replication functions for wal sender processes
388  * ===========================================================
389  */
390 
391 /*
392  * Take any action required to initialise sync rep state from config
393  * data. Called at WALSender startup and after each SIGHUP.
394  */
395 void
SyncRepInitConfig(void)396 SyncRepInitConfig(void)
397 {
398 	int			priority;
399 
400 	/*
401 	 * Determine if we are a potential sync standby and remember the result
402 	 * for handling replies from standby.
403 	 */
404 	priority = SyncRepGetStandbyPriority();
405 	if (MyWalSnd->sync_standby_priority != priority)
406 	{
407 		SpinLockAcquire(&MyWalSnd->mutex);
408 		MyWalSnd->sync_standby_priority = priority;
409 		SpinLockRelease(&MyWalSnd->mutex);
410 
411 		ereport(DEBUG1,
412 				(errmsg("standby \"%s\" now has synchronous standby priority %u",
413 						application_name, priority)));
414 	}
415 }
416 
/*
 * Update the LSNs on each queue based upon our latest state, and wake the
 * backends whose waits are now satisfied.
 *
 * Waiters are released only when this walsender is currently one of the
 * synchronous standbys (as reported by SyncRepGetSyncRecPtr) and enough
 * sync standbys are available.  The release positions are the synced
 * positions across all sync standbys: the oldest ones in priority mode and
 * the Nth latest ones in quorum mode (see SyncRepGetSyncRecPtr).
 *
 * As a side effect this maintains announce_next_takeover, which controls
 * whether a LOG message is emitted the next time this walsender becomes a
 * sync standby.
 */
void
SyncRepReleaseWaiters(void)
{
	volatile WalSndCtlData *walsndctl = WalSndCtl;
	XLogRecPtr	writePtr;
	XLogRecPtr	flushPtr;
	XLogRecPtr	applyPtr;
	bool		got_recptr;
	bool		am_sync;
	int			numwrite = 0;	/* procs woken per mode, for the DEBUG3 log */
	int			numflush = 0;
	int			numapply = 0;

	/*
	 * If this WALSender is serving a standby that is not on the list of
	 * potential sync standbys then we have nothing to do. If we are still
	 * starting up, still running base backup or the current flush position is
	 * still invalid, then leave quickly also.  Streaming or stopping WAL
	 * senders are allowed to release waiters.
	 */
	if (MyWalSnd->sync_standby_priority == 0 ||
		(MyWalSnd->state != WALSNDSTATE_STREAMING &&
		 MyWalSnd->state != WALSNDSTATE_STOPPING) ||
		XLogRecPtrIsInvalid(MyWalSnd->flush))
	{
		/* Log again if we later qualify as a sync standby. */
		announce_next_takeover = true;
		return;
	}

	/*
	 * We're a potential sync standby. Release waiters if there are enough
	 * sync standbys and we are considered as sync.
	 */
	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

	/*
	 * Check whether we are a sync standby or not, and calculate the synced
	 * positions among all sync standbys.  (Note: although this step does not
	 * of itself require holding SyncRepLock, it seems like a good idea to do
	 * it after acquiring the lock.  This ensures that the WAL pointers we use
	 * to release waiters are newer than any previous execution of this
	 * routine used.)
	 */
	got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);

	/*
	 * If we are managing a sync standby, though we weren't prior to this,
	 * then announce we are now a sync standby.
	 */
	if (announce_next_takeover && am_sync)
	{
		announce_next_takeover = false;

		if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
			ereport(LOG,
					(errmsg("standby \"%s\" is now a synchronous standby with priority %u",
							application_name, MyWalSnd->sync_standby_priority)));
		else
			ereport(LOG,
					(errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
							application_name)));
	}

	/*
	 * If the number of sync standbys is less than requested or we aren't
	 * managing a sync standby then just leave.
	 */
	if (!got_recptr || !am_sync)
	{
		LWLockRelease(SyncRepLock);
		/* Re-arm the announcement only if we are not a sync standby now. */
		announce_next_takeover = !am_sync;
		return;
	}

	/*
	 * Set the lsn first so that when we wake backends they will release up to
	 * this location.  Each shared position only ever moves forward, and a
	 * queue is scanned only when its position actually advanced.
	 */
	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
	{
		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
	}
	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
	{
		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
	}
	if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
	{
		walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
		numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
	}

	LWLockRelease(SyncRepLock);

	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
		 numwrite, (uint32) (writePtr >> 32), (uint32) writePtr,
		 numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr,
		 numapply, (uint32) (applyPtr >> 32), (uint32) applyPtr);
}
525 
526 /*
527  * Calculate the synced Write, Flush and Apply positions among sync standbys.
528  *
529  * Return false if the number of sync standbys is less than
530  * synchronous_standby_names specifies. Otherwise return true and
531  * store the positions into *writePtr, *flushPtr and *applyPtr.
532  *
533  * On return, *am_sync is set to true if this walsender is connecting to
534  * sync standby. Otherwise it's set to false.
535  */
536 static bool
SyncRepGetSyncRecPtr(XLogRecPtr * writePtr,XLogRecPtr * flushPtr,XLogRecPtr * applyPtr,bool * am_sync)537 SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
538 					 XLogRecPtr *applyPtr, bool *am_sync)
539 {
540 	SyncRepStandbyData *sync_standbys;
541 	int			num_standbys;
542 	int			i;
543 
544 	/* Initialize default results */
545 	*writePtr = InvalidXLogRecPtr;
546 	*flushPtr = InvalidXLogRecPtr;
547 	*applyPtr = InvalidXLogRecPtr;
548 	*am_sync = false;
549 
550 	/* Quick out if not even configured to be synchronous */
551 	if (SyncRepConfig == NULL)
552 		return false;
553 
554 	/* Get standbys that are considered as synchronous at this moment */
555 	num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
556 
557 	/* Am I among the candidate sync standbys? */
558 	for (i = 0; i < num_standbys; i++)
559 	{
560 		if (sync_standbys[i].is_me)
561 		{
562 			*am_sync = true;
563 			break;
564 		}
565 	}
566 
567 	/*
568 	 * Nothing more to do if we are not managing a sync standby or there are
569 	 * not enough synchronous standbys.
570 	 */
571 	if (!(*am_sync) ||
572 		num_standbys < SyncRepConfig->num_sync)
573 	{
574 		pfree(sync_standbys);
575 		return false;
576 	}
577 
578 	/*
579 	 * In a priority-based sync replication, the synced positions are the
580 	 * oldest ones among sync standbys. In a quorum-based, they are the Nth
581 	 * latest ones.
582 	 *
583 	 * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
584 	 * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
585 	 * because it's a bit more efficient.
586 	 *
587 	 * XXX If the numbers of current and requested sync standbys are the same,
588 	 * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
589 	 * positions even in a quorum-based sync replication.
590 	 */
591 	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
592 	{
593 		SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
594 								   sync_standbys, num_standbys);
595 	}
596 	else
597 	{
598 		SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
599 									  sync_standbys, num_standbys,
600 									  SyncRepConfig->num_sync);
601 	}
602 
603 	pfree(sync_standbys);
604 	return true;
605 }
606 
607 /*
608  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
609  */
610 static void
SyncRepGetOldestSyncRecPtr(XLogRecPtr * writePtr,XLogRecPtr * flushPtr,XLogRecPtr * applyPtr,SyncRepStandbyData * sync_standbys,int num_standbys)611 SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
612 						   XLogRecPtr *flushPtr,
613 						   XLogRecPtr *applyPtr,
614 						   SyncRepStandbyData *sync_standbys,
615 						   int num_standbys)
616 {
617 	int			i;
618 
619 	/*
620 	 * Scan through all sync standbys and calculate the oldest Write, Flush
621 	 * and Apply positions.  We assume *writePtr et al were initialized to
622 	 * InvalidXLogRecPtr.
623 	 */
624 	for (i = 0; i < num_standbys; i++)
625 	{
626 		XLogRecPtr	write = sync_standbys[i].write;
627 		XLogRecPtr	flush = sync_standbys[i].flush;
628 		XLogRecPtr	apply = sync_standbys[i].apply;
629 
630 		if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
631 			*writePtr = write;
632 		if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
633 			*flushPtr = flush;
634 		if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
635 			*applyPtr = apply;
636 	}
637 }
638 
639 /*
640  * Calculate the Nth latest Write, Flush and Apply positions among sync
641  * standbys.
642  */
643 static void
SyncRepGetNthLatestSyncRecPtr(XLogRecPtr * writePtr,XLogRecPtr * flushPtr,XLogRecPtr * applyPtr,SyncRepStandbyData * sync_standbys,int num_standbys,uint8 nth)644 SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
645 							  XLogRecPtr *flushPtr,
646 							  XLogRecPtr *applyPtr,
647 							  SyncRepStandbyData *sync_standbys,
648 							  int num_standbys,
649 							  uint8 nth)
650 {
651 	XLogRecPtr *write_array;
652 	XLogRecPtr *flush_array;
653 	XLogRecPtr *apply_array;
654 	int			i;
655 
656 	/* Should have enough candidates, or somebody messed up */
657 	Assert(nth > 0 && nth <= num_standbys);
658 
659 	write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
660 	flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
661 	apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
662 
663 	for (i = 0; i < num_standbys; i++)
664 	{
665 		write_array[i] = sync_standbys[i].write;
666 		flush_array[i] = sync_standbys[i].flush;
667 		apply_array[i] = sync_standbys[i].apply;
668 	}
669 
670 	/* Sort each array in descending order */
671 	qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
672 	qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
673 	qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
674 
675 	/* Get Nth latest Write, Flush, Apply positions */
676 	*writePtr = write_array[nth - 1];
677 	*flushPtr = flush_array[nth - 1];
678 	*applyPtr = apply_array[nth - 1];
679 
680 	pfree(write_array);
681 	pfree(flush_array);
682 	pfree(apply_array);
683 }
684 
685 /*
686  * Compare lsn in order to sort array in descending order.
687  */
688 static int
cmp_lsn(const void * a,const void * b)689 cmp_lsn(const void *a, const void *b)
690 {
691 	XLogRecPtr	lsn1 = *((const XLogRecPtr *) a);
692 	XLogRecPtr	lsn2 = *((const XLogRecPtr *) b);
693 
694 	if (lsn1 > lsn2)
695 		return -1;
696 	else if (lsn1 == lsn2)
697 		return 0;
698 	else
699 		return 1;
700 }
701 
702 /*
703  * Return data about walsenders that are candidates to be sync standbys.
704  *
705  * *standbys is set to a palloc'd array of structs of per-walsender data,
706  * and the number of valid entries (candidate sync senders) is returned.
707  * (This might be more or fewer than num_sync; caller must check.)
708  */
709 int
SyncRepGetCandidateStandbys(SyncRepStandbyData ** standbys)710 SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
711 {
712 	int			i;
713 	int			n;
714 
715 	/* Create result array */
716 	*standbys = (SyncRepStandbyData *)
717 		palloc(max_wal_senders * sizeof(SyncRepStandbyData));
718 
719 	/* Quick exit if sync replication is not requested */
720 	if (SyncRepConfig == NULL)
721 		return 0;
722 
723 	/* Collect raw data from shared memory */
724 	n = 0;
725 	for (i = 0; i < max_wal_senders; i++)
726 	{
727 		volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
728 									 * rearrangement */
729 		SyncRepStandbyData *stby;
730 		WalSndState state;		/* not included in SyncRepStandbyData */
731 
732 		walsnd = &WalSndCtl->walsnds[i];
733 		stby = *standbys + n;
734 
735 		SpinLockAcquire(&walsnd->mutex);
736 		stby->pid = walsnd->pid;
737 		state = walsnd->state;
738 		stby->write = walsnd->write;
739 		stby->flush = walsnd->flush;
740 		stby->apply = walsnd->apply;
741 		stby->sync_standby_priority = walsnd->sync_standby_priority;
742 		SpinLockRelease(&walsnd->mutex);
743 
744 		/* Must be active */
745 		if (stby->pid == 0)
746 			continue;
747 
748 		/* Must be streaming or stopping */
749 		if (state != WALSNDSTATE_STREAMING &&
750 			state != WALSNDSTATE_STOPPING)
751 			continue;
752 
753 		/* Must be synchronous */
754 		if (stby->sync_standby_priority == 0)
755 			continue;
756 
757 		/* Must have a valid flush position */
758 		if (XLogRecPtrIsInvalid(stby->flush))
759 			continue;
760 
761 		/* OK, it's a candidate */
762 		stby->walsnd_index = i;
763 		stby->is_me = (walsnd == MyWalSnd);
764 		n++;
765 	}
766 
767 	/*
768 	 * In quorum mode, we return all the candidates.  In priority mode, if we
769 	 * have too many candidates then return only the num_sync ones of highest
770 	 * priority.
771 	 */
772 	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
773 		n > SyncRepConfig->num_sync)
774 	{
775 		/* Sort by priority ... */
776 		qsort(*standbys, n, sizeof(SyncRepStandbyData),
777 			  standby_priority_comparator);
778 		/* ... then report just the first num_sync ones */
779 		n = SyncRepConfig->num_sync;
780 	}
781 
782 	return n;
783 }
784 
785 /*
786  * qsort comparator to sort SyncRepStandbyData entries by priority
787  */
788 static int
standby_priority_comparator(const void * a,const void * b)789 standby_priority_comparator(const void *a, const void *b)
790 {
791 	const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
792 	const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
793 
794 	/* First, sort by increasing priority value */
795 	if (sa->sync_standby_priority != sb->sync_standby_priority)
796 		return sa->sync_standby_priority - sb->sync_standby_priority;
797 
798 	/*
799 	 * We might have equal priority values; arbitrarily break ties by position
800 	 * in the WALSnd array.  (This is utterly bogus, since that is arrival
801 	 * order dependent, but there are regression tests that rely on it.)
802 	 */
803 	return sa->walsnd_index - sb->walsnd_index;
804 }
805 
806 
807 /*
808  * Return the list of sync standbys, or NIL if no sync standby is connected.
809  *
810  * The caller must hold SyncRepLock.
811  *
812  * On return, *am_sync is set to true if this walsender is connecting to
813  * sync standby. Otherwise it's set to false.
814  *
815  * XXX This function is BROKEN and should not be used in new code.  It has
816  * an inherent race condition, since the returned list of integer indexes
817  * might no longer correspond to reality.
818  */
819 List *
SyncRepGetSyncStandbys(bool * am_sync)820 SyncRepGetSyncStandbys(bool *am_sync)
821 {
822 	/* Set default result */
823 	if (am_sync != NULL)
824 		*am_sync = false;
825 
826 	/* Quick exit if sync replication is not requested */
827 	if (SyncRepConfig == NULL)
828 		return NIL;
829 
830 	return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
831 		SyncRepGetSyncStandbysPriority(am_sync) :
832 		SyncRepGetSyncStandbysQuorum(am_sync);
833 }
834 
835 /*
836  * Return the list of all the candidates for quorum sync standbys,
837  * or NIL if no such standby is connected.
838  *
839  * The caller must hold SyncRepLock. This function must be called only in
840  * a quorum-based sync replication.
841  *
842  * On return, *am_sync is set to true if this walsender is connecting to
843  * sync standby. Otherwise it's set to false.
844  */
845 static List *
SyncRepGetSyncStandbysQuorum(bool * am_sync)846 SyncRepGetSyncStandbysQuorum(bool *am_sync)
847 {
848 	List	   *result = NIL;
849 	int			i;
850 	volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
851 								 * rearrangement */
852 
853 	Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
854 
855 	for (i = 0; i < max_wal_senders; i++)
856 	{
857 		XLogRecPtr	flush;
858 		WalSndState state;
859 		int			pid;
860 
861 		walsnd = &WalSndCtl->walsnds[i];
862 
863 		SpinLockAcquire(&walsnd->mutex);
864 		pid = walsnd->pid;
865 		flush = walsnd->flush;
866 		state = walsnd->state;
867 		SpinLockRelease(&walsnd->mutex);
868 
869 		/* Must be active */
870 		if (pid == 0)
871 			continue;
872 
873 		/* Must be streaming or stopping */
874 		if (state != WALSNDSTATE_STREAMING &&
875 			state != WALSNDSTATE_STOPPING)
876 			continue;
877 
878 		/* Must be synchronous */
879 		if (walsnd->sync_standby_priority == 0)
880 			continue;
881 
882 		/* Must have a valid flush position */
883 		if (XLogRecPtrIsInvalid(flush))
884 			continue;
885 
886 		/*
887 		 * Consider this standby as a candidate for quorum sync standbys and
888 		 * append it to the result.
889 		 */
890 		result = lappend_int(result, i);
891 		if (am_sync != NULL && walsnd == MyWalSnd)
892 			*am_sync = true;
893 	}
894 
895 	return result;
896 }
897 
898 /*
899  * Return the list of sync standbys chosen based on their priorities,
900  * or NIL if no sync standby is connected.
901  *
902  * If there are multiple standbys with the same priority,
903  * the first one found is selected preferentially.
904  *
905  * The caller must hold SyncRepLock. This function must be called only in
906  * a priority-based sync replication.
907  *
908  * On return, *am_sync is set to true if this walsender is connecting to
909  * sync standby. Otherwise it's set to false.
910  */
911 static List *
SyncRepGetSyncStandbysPriority(bool * am_sync)912 SyncRepGetSyncStandbysPriority(bool *am_sync)
913 {
914 	List	   *result = NIL;
915 	List	   *pending = NIL;
916 	int			lowest_priority;
917 	int			next_highest_priority;
918 	int			this_priority;
919 	int			priority;
920 	int			i;
921 	bool		am_in_pending = false;
922 	volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
923 								 * rearrangement */
924 
925 	Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
926 
927 	lowest_priority = SyncRepConfig->nmembers;
928 	next_highest_priority = lowest_priority + 1;
929 
930 	/*
931 	 * Find the sync standbys which have the highest priority (i.e, 1). Also
932 	 * store all the other potential sync standbys into the pending list, in
933 	 * order to scan it later and find other sync standbys from it quickly.
934 	 */
935 	for (i = 0; i < max_wal_senders; i++)
936 	{
937 		XLogRecPtr	flush;
938 		WalSndState state;
939 		int			pid;
940 
941 		walsnd = &WalSndCtl->walsnds[i];
942 
943 		SpinLockAcquire(&walsnd->mutex);
944 		pid = walsnd->pid;
945 		flush = walsnd->flush;
946 		state = walsnd->state;
947 		SpinLockRelease(&walsnd->mutex);
948 
949 		/* Must be active */
950 		if (pid == 0)
951 			continue;
952 
953 		/* Must be streaming or stopping */
954 		if (state != WALSNDSTATE_STREAMING &&
955 			state != WALSNDSTATE_STOPPING)
956 			continue;
957 
958 		/* Must be synchronous */
959 		this_priority = walsnd->sync_standby_priority;
960 		if (this_priority == 0)
961 			continue;
962 
963 		/* Must have a valid flush position */
964 		if (XLogRecPtrIsInvalid(flush))
965 			continue;
966 
967 		/*
968 		 * If the priority is equal to 1, consider this standby as sync and
969 		 * append it to the result. Otherwise append this standby to the
970 		 * pending list to check if it's actually sync or not later.
971 		 */
972 		if (this_priority == 1)
973 		{
974 			result = lappend_int(result, i);
975 			if (am_sync != NULL && walsnd == MyWalSnd)
976 				*am_sync = true;
977 			if (list_length(result) == SyncRepConfig->num_sync)
978 			{
979 				list_free(pending);
980 				return result;	/* Exit if got enough sync standbys */
981 			}
982 		}
983 		else
984 		{
985 			pending = lappend_int(pending, i);
986 			if (am_sync != NULL && walsnd == MyWalSnd)
987 				am_in_pending = true;
988 
989 			/*
990 			 * Track the highest priority among the standbys in the pending
991 			 * list, in order to use it as the starting priority for later
992 			 * scan of the list. This is useful to find quickly the sync
993 			 * standbys from the pending list later because we can skip
994 			 * unnecessary scans for the unused priorities.
995 			 */
996 			if (this_priority < next_highest_priority)
997 				next_highest_priority = this_priority;
998 		}
999 	}
1000 
1001 	/*
1002 	 * Consider all pending standbys as sync if the number of them plus
1003 	 * already-found sync ones is lower than the configuration requests.
1004 	 */
1005 	if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
1006 	{
1007 		bool		needfree = (result != NIL && pending != NIL);
1008 
1009 		/*
1010 		 * Set *am_sync to true if this walsender is in the pending list
1011 		 * because all pending standbys are considered as sync.
1012 		 */
1013 		if (am_sync != NULL && !(*am_sync))
1014 			*am_sync = am_in_pending;
1015 
1016 		result = list_concat(result, pending);
1017 		if (needfree)
1018 			pfree(pending);
1019 		return result;
1020 	}
1021 
1022 	/*
1023 	 * Find the sync standbys from the pending list.
1024 	 */
1025 	priority = next_highest_priority;
1026 	while (priority <= lowest_priority)
1027 	{
1028 		ListCell   *cell;
1029 		ListCell   *prev = NULL;
1030 		ListCell   *next;
1031 
1032 		next_highest_priority = lowest_priority + 1;
1033 
1034 		for (cell = list_head(pending); cell != NULL; cell = next)
1035 		{
1036 			i = lfirst_int(cell);
1037 			walsnd = &WalSndCtl->walsnds[i];
1038 
1039 			next = lnext(cell);
1040 
1041 			this_priority = walsnd->sync_standby_priority;
1042 			if (this_priority == priority)
1043 			{
1044 				result = lappend_int(result, i);
1045 				if (am_sync != NULL && walsnd == MyWalSnd)
1046 					*am_sync = true;
1047 
1048 				/*
1049 				 * We should always exit here after the scan of pending list
1050 				 * starts because we know that the list has enough elements to
1051 				 * reach SyncRepConfig->num_sync.
1052 				 */
1053 				if (list_length(result) == SyncRepConfig->num_sync)
1054 				{
1055 					list_free(pending);
1056 					return result;	/* Exit if got enough sync standbys */
1057 				}
1058 
1059 				/*
1060 				 * Remove the entry for this sync standby from the list to
1061 				 * prevent us from looking at the same entry again.
1062 				 */
1063 				pending = list_delete_cell(pending, cell, prev);
1064 
1065 				continue;
1066 			}
1067 
1068 			if (this_priority < next_highest_priority)
1069 				next_highest_priority = this_priority;
1070 
1071 			prev = cell;
1072 		}
1073 
1074 		priority = next_highest_priority;
1075 	}
1076 
1077 	/*
1078 	 * We might get here if the set of sync_standby_priority values in shared
1079 	 * memory is inconsistent, as can happen transiently after a change in the
1080 	 * synchronous_standby_names setting.  In that case, just return the
1081 	 * incomplete list we have so far.  That will cause the caller to decide
1082 	 * there aren't enough synchronous candidates, which should be a safe
1083 	 * choice until the priority values become consistent again.
1084 	 */
1085 	list_free(pending);
1086 	return result;
1087 }
1088 
1089 /*
1090  * Check if we are in the list of sync standbys, and if so, determine
1091  * priority sequence. Return priority if set, or zero to indicate that
1092  * we are not a potential sync standby.
1093  *
1094  * Compare the parameter SyncRepStandbyNames against the application_name
1095  * for this WALSender, or allow any name if we find a wildcard "*".
1096  */
1097 static int
SyncRepGetStandbyPriority(void)1098 SyncRepGetStandbyPriority(void)
1099 {
1100 	const char *standby_name;
1101 	int			priority;
1102 	bool		found = false;
1103 
1104 	/*
1105 	 * Since synchronous cascade replication is not allowed, we always set the
1106 	 * priority of cascading walsender to zero.
1107 	 */
1108 	if (am_cascading_walsender)
1109 		return 0;
1110 
1111 	if (!SyncStandbysDefined() || SyncRepConfig == NULL)
1112 		return 0;
1113 
1114 	standby_name = SyncRepConfig->member_names;
1115 	for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
1116 	{
1117 		if (pg_strcasecmp(standby_name, application_name) == 0 ||
1118 			strcmp(standby_name, "*") == 0)
1119 		{
1120 			found = true;
1121 			break;
1122 		}
1123 		standby_name += strlen(standby_name) + 1;
1124 	}
1125 
1126 	if (!found)
1127 		return 0;
1128 
1129 	/*
1130 	 * In quorum-based sync replication, all the standbys in the list have the
1131 	 * same priority, one.
1132 	 */
1133 	return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
1134 }
1135 
1136 /*
1137  * Walk the specified queue from head.  Set the state of any backends that
1138  * need to be woken, remove them from the queue, and then wake them.
1139  * Pass all = true to wake whole queue; otherwise, just wake up to
1140  * the walsender's LSN.
1141  *
1142  * Must hold SyncRepLock.
1143  */
1144 static int
SyncRepWakeQueue(bool all,int mode)1145 SyncRepWakeQueue(bool all, int mode)
1146 {
1147 	volatile WalSndCtlData *walsndctl = WalSndCtl;
1148 	PGPROC	   *proc = NULL;
1149 	PGPROC	   *thisproc = NULL;
1150 	int			numprocs = 0;
1151 
1152 	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
1153 	Assert(SyncRepQueueIsOrderedByLSN(mode));
1154 
1155 	proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
1156 								   &(WalSndCtl->SyncRepQueue[mode]),
1157 								   offsetof(PGPROC, syncRepLinks));
1158 
1159 	while (proc)
1160 	{
1161 		/*
1162 		 * Assume the queue is ordered by LSN
1163 		 */
1164 		if (!all && walsndctl->lsn[mode] < proc->waitLSN)
1165 			return numprocs;
1166 
1167 		/*
1168 		 * Move to next proc, so we can delete thisproc from the queue.
1169 		 * thisproc is valid, proc may be NULL after this.
1170 		 */
1171 		thisproc = proc;
1172 		proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
1173 									   &(proc->syncRepLinks),
1174 									   offsetof(PGPROC, syncRepLinks));
1175 
1176 		/*
1177 		 * Remove thisproc from queue.
1178 		 */
1179 		SHMQueueDelete(&(thisproc->syncRepLinks));
1180 
1181 		/*
1182 		 * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
1183 		 * make sure that it sees the queue link being removed before the
1184 		 * syncRepState change.
1185 		 */
1186 		pg_write_barrier();
1187 
1188 		/*
1189 		 * Set state to complete; see SyncRepWaitForLSN() for discussion of
1190 		 * the various states.
1191 		 */
1192 		thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
1193 
1194 		/*
1195 		 * Wake only when we have set state and removed from queue.
1196 		 */
1197 		SetLatch(&(thisproc->procLatch));
1198 
1199 		numprocs++;
1200 	}
1201 
1202 	return numprocs;
1203 }
1204 
1205 /*
1206  * The checkpointer calls this as needed to update the shared
1207  * sync_standbys_defined flag, so that backends don't remain permanently wedged
1208  * if synchronous_standby_names is unset.  It's safe to check the current value
1209  * without the lock, because it's only ever updated by one process.  But we
1210  * must take the lock to change it.
1211  */
1212 void
SyncRepUpdateSyncStandbysDefined(void)1213 SyncRepUpdateSyncStandbysDefined(void)
1214 {
1215 	bool		sync_standbys_defined = SyncStandbysDefined();
1216 
1217 	if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
1218 	{
1219 		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
1220 
1221 		/*
1222 		 * If synchronous_standby_names has been reset to empty, it's futile
1223 		 * for backends to continue to waiting.  Since the user no longer
1224 		 * wants synchronous replication, we'd better wake them up.
1225 		 */
1226 		if (!sync_standbys_defined)
1227 		{
1228 			int			i;
1229 
1230 			for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
1231 				SyncRepWakeQueue(true, i);
1232 		}
1233 
1234 		/*
1235 		 * Only allow people to join the queue when there are synchronous
1236 		 * standbys defined.  Without this interlock, there's a race
1237 		 * condition: we might wake up all the current waiters; then, some
1238 		 * backend that hasn't yet reloaded its config might go to sleep on
1239 		 * the queue (and never wake up).  This prevents that.
1240 		 */
1241 		WalSndCtl->sync_standbys_defined = sync_standbys_defined;
1242 
1243 		LWLockRelease(SyncRepLock);
1244 	}
1245 }
1246 
#ifdef USE_ASSERT_CHECKING
/*
 * Assertion helper: check that the wait queue for the given mode is sorted
 * by strictly increasing waitLSN.  Returns false if any waiter's waitLSN is
 * not greater than the previous one's (so duplicate LSNs fail too), or if
 * the first waiter's waitLSN is zero.  Returns true otherwise, including for
 * an empty queue.
 */
static bool
SyncRepQueueIsOrderedByLSN(int mode)
{
	XLogRecPtr	prevLSN = 0;
	PGPROC	   *cur;

	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);

	for (cur = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
									   &(WalSndCtl->SyncRepQueue[mode]),
									   offsetof(PGPROC, syncRepLinks));
		 cur != NULL;
		 cur = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
									   &(cur->syncRepLinks),
									   offsetof(PGPROC, syncRepLinks)))
	{
		/* Strictly ascending: no two waiters may share an LSN */
		if (cur->waitLSN <= prevLSN)
			return false;

		prevLSN = cur->waitLSN;
	}

	return true;
}
#endif
1281 
1282 /*
1283  * ===========================================================
1284  * Synchronous Replication functions executed by any process
1285  * ===========================================================
1286  */
1287 
1288 bool
check_synchronous_standby_names(char ** newval,void ** extra,GucSource source)1289 check_synchronous_standby_names(char **newval, void **extra, GucSource source)
1290 {
1291 	if (*newval != NULL && (*newval)[0] != '\0')
1292 	{
1293 		int			parse_rc;
1294 		SyncRepConfigData *pconf;
1295 
1296 		/* Reset communication variables to ensure a fresh start */
1297 		syncrep_parse_result = NULL;
1298 		syncrep_parse_error_msg = NULL;
1299 
1300 		/* Parse the synchronous_standby_names string */
1301 		syncrep_scanner_init(*newval);
1302 		parse_rc = syncrep_yyparse();
1303 		syncrep_scanner_finish();
1304 
1305 		if (parse_rc != 0 || syncrep_parse_result == NULL)
1306 		{
1307 			GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
1308 			if (syncrep_parse_error_msg)
1309 				GUC_check_errdetail("%s", syncrep_parse_error_msg);
1310 			else
1311 				GUC_check_errdetail("synchronous_standby_names parser failed");
1312 			return false;
1313 		}
1314 
1315 		if (syncrep_parse_result->num_sync <= 0)
1316 		{
1317 			GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
1318 							 syncrep_parse_result->num_sync);
1319 			return false;
1320 		}
1321 
1322 		/* GUC extra value must be malloc'd, not palloc'd */
1323 		pconf = (SyncRepConfigData *)
1324 			malloc(syncrep_parse_result->config_size);
1325 		if (pconf == NULL)
1326 			return false;
1327 		memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
1328 
1329 		*extra = (void *) pconf;
1330 
1331 		/*
1332 		 * We need not explicitly clean up syncrep_parse_result.  It, and any
1333 		 * other cruft generated during parsing, will be freed when the
1334 		 * current memory context is deleted.  (This code is generally run in
1335 		 * a short-lived context used for config file processing, so that will
1336 		 * not be very long.)
1337 		 */
1338 	}
1339 	else
1340 		*extra = NULL;
1341 
1342 	return true;
1343 }
1344 
1345 void
assign_synchronous_standby_names(const char * newval,void * extra)1346 assign_synchronous_standby_names(const char *newval, void *extra)
1347 {
1348 	SyncRepConfig = (SyncRepConfigData *) extra;
1349 }
1350 
1351 void
assign_synchronous_commit(int newval,void * extra)1352 assign_synchronous_commit(int newval, void *extra)
1353 {
1354 	switch (newval)
1355 	{
1356 		case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
1357 			SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
1358 			break;
1359 		case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
1360 			SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
1361 			break;
1362 		case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
1363 			SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
1364 			break;
1365 		default:
1366 			SyncRepWaitMode = SYNC_REP_NO_WAIT;
1367 			break;
1368 	}
1369 }
1370