1 /*-------------------------------------------------------------------------
2  *
3  * sinvaladt.c
4  *	  POSTGRES shared cache invalidation data manager.
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/storage/ipc/sinvaladt.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include <signal.h>
18 #include <unistd.h>
19 
20 #include "miscadmin.h"
21 #include "storage/backendid.h"
22 #include "storage/ipc.h"
23 #include "storage/proc.h"
24 #include "storage/procsignal.h"
25 #include "storage/shmem.h"
26 #include "storage/sinvaladt.h"
27 #include "storage/spin.h"
28 #include "access/transam.h"
29 
30 
31 /*
32  * Conceptually, the shared cache invalidation messages are stored in an
33  * infinite array, where maxMsgNum is the next array subscript to store a
34  * submitted message in, minMsgNum is the smallest array subscript containing
35  * a message not yet read by all backends, and we always have maxMsgNum >=
36  * minMsgNum.  (They are equal when there are no messages pending.)  For each
37  * active backend, there is a nextMsgNum pointer indicating the next message it
38  * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
39  * backend.
40  *
41  * (In the current implementation, minMsgNum is a lower bound for the
42  * per-process nextMsgNum values, but it isn't rigorously kept equal to the
43  * smallest nextMsgNum --- it may lag behind.  We only update it when
44  * SICleanupQueue is called, and we try not to do that often.)
45  *
46  * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
47  * entries.  We translate MsgNum values into circular-buffer indexes by
48  * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
49  * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
50  * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
51  * in the buffer.  If the buffer does overflow, we recover by setting the
52  * "reset" flag for each backend that has fallen too far behind.  A backend
53  * that is in "reset" state is ignored while determining minMsgNum.  When
54  * it does finally attempt to receive inval messages, it must discard all
55  * its invalidatable state, since it won't know what it missed.
56  *
57  * To reduce the probability of needing resets, we send a "catchup" interrupt
58  * to any backend that seems to be falling unreasonably far behind.  The
59  * normal behavior is that at most one such interrupt is in flight at a time;
60  * when a backend completes processing a catchup interrupt, it executes
61  * SICleanupQueue, which will signal the next-furthest-behind backend if
62  * needed.  This avoids undue contention from multiple backends all trying
63  * to catch up at once.  However, the furthest-back backend might be stuck
64  * in a state where it can't catch up.  Eventually it will get reset, so it
65  * won't cause any more problems for anyone but itself.  But we don't want
66  * to find that a bunch of other backends are now too close to the reset
67  * threshold to be saved.  So SICleanupQueue is designed to occasionally
68  * send extra catchup interrupts as the queue gets fuller, to backends that
69  * are far behind and haven't gotten one yet.  As long as there aren't a lot
70  * of "stuck" backends, we won't need a lot of extra interrupts, since ones
71  * that aren't stuck will propagate their interrupts to the next guy.
72  *
73  * We would have problems if the MsgNum values overflow an integer, so
74  * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
75  * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
76  * large so that we don't need to do this often.  It must be a multiple of
77  * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
78  * to be moved when we do it.
79  *
80  * Access to the shared sinval array is protected by two locks, SInvalReadLock
81  * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
82  * authorizes them to modify their own ProcState but not to modify or even
83  * look at anyone else's.  When we need to perform array-wide updates,
84  * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
85  * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
86  * mode) to serialize adding messages to the queue.  Note that a writer
87  * can operate in parallel with one or more readers, because the writer
88  * has no need to touch anyone's ProcState, except in the infrequent cases
89  * when SICleanupQueue is needed.  The only point of overlap is that
90  * the writer wants to change maxMsgNum while readers need to read it.
91  * We deal with that by having a spinlock that readers must take for just
92  * long enough to read maxMsgNum, while writers take it for just long enough
93  * to write maxMsgNum.  (The exact rule is that you need the spinlock to
94  * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
95  * spinlock to write maxMsgNum unless you are holding both locks.)
96  *
97  * Note: since maxMsgNum is an int and hence presumably atomically readable/
98  * writable, the spinlock might seem unnecessary.  The reason it is needed
99  * is to provide a memory barrier: we need to be sure that messages written
100  * to the array are actually there before maxMsgNum is increased, and that
101  * readers will see that data after fetching maxMsgNum.  Multiprocessors
102  * that have weak memory-ordering guarantees can fail without the memory
103  * barrier instructions that are included in the spinlock sequences.
104  */
105 
106 
107 /*
108  * Configurable parameters.
109  *
110  * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
111  * Must be a power of 2 for speed.
112  *
113  * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
114  * Must be a multiple of MAXNUMMESSAGES.  Should be large.
115  *
116  * CLEANUP_MIN: the minimum number of messages that must be in the buffer
117  * before we bother to call SICleanupQueue.
118  *
119  * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
120  * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
121  *
122  * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
123  * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
124  *
125  * WRITE_QUANTUM: the max number of messages to push into the buffer per
126  * iteration of SIInsertDataEntries.  Noncritical but should be less than
127  * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
128  * per iteration.
129  */
130 
/* Capacity of the circular message buffer: 4096 = 2^12 entries. */
#define MAXNUMMESSAGES 4096
/* 4096 * 262144 = 2^30; a multiple of MAXNUMMESSAGES and below INT_MAX. */
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
/* Half the buffer (2048 messages) must be in use before cleanup is tried. */
#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
/* Cleanup threshold is re-armed in steps of 256 messages. */
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
/* A backend 2048 or more messages behind is a candidate for a catchup signal. */
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
/* Messages inserted per lock hold; 64 is less than CLEANUP_QUANTUM (256). */
#define WRITE_QUANTUM 64
137 
/*
 * Per-backend state in shared invalidation structure.
 *
 * There is one entry per backend slot in SISeg.procState[]; the entry at
 * array index i belongs to the backend whose MyBackendId is i + 1.
 */
typedef struct ProcState
{
	/* procPid is zero in an inactive ProcState array entry. */
	pid_t		procPid;		/* PID of backend, for signaling */
	PGPROC	   *proc;			/* PGPROC of backend; NULL when inactive */
	/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
	int			nextMsgNum;		/* next message number to read */
	/* set by SICleanupQueue, cleared by the owner in SIGetDataEntries */
	bool		resetState;		/* backend needs to reset its state */
	/* set by SICleanupQueue, cleared by the owner once it has caught up */
	bool		signaled;		/* backend has been sent catchup signal */
	/* set by SIInsertDataEntries, cleared by the owner in SIGetDataEntries */
	bool		hasMessages;	/* backend has unread messages */

	/*
	 * Backend only sends invalidations, never receives them. This only makes
	 * sense for Startup process during recovery because it doesn't maintain a
	 * relcache, yet it fires inval messages to allow query backends to see
	 * schema changes.  SICleanupQueue skips such entries entirely.
	 */
	bool		sendOnly;		/* backend only sends, never receives */

	/*
	 * Next LocalTransactionId to use for each idle backend slot.  We keep
	 * this here because it is indexed by BackendId and it is convenient to
	 * copy the value to and from local memory when MyBackendId is set. It's
	 * meaningless in an active ProcState entry.
	 */
	LocalTransactionId nextLXID;
} ProcState;
166 
/*
 * Shared cache invalidation memory segment.
 *
 * A single instance lives in shared memory (see shmInvalBuffer).  It holds
 * the message counters, the circular message buffer, and a trailing array
 * of per-backend ProcState entries.
 */
typedef struct SISeg
{
	/*
	 * General state information
	 */
	int			minMsgNum;		/* oldest message still needed */
	int			maxMsgNum;		/* next message number to be assigned */
	int			nextThreshold;	/* # of messages to call SICleanupQueue */
	int			lastBackend;	/* index of last active procState entry, +1 */
	int			maxBackends;	/* size of procState array */

	/*
	 * Taken briefly around reads and writes of maxMsgNum by code that does
	 * not hold the LWLocks that would otherwise make the access safe.
	 */
	slock_t		msgnumLock;		/* spinlock protecting maxMsgNum */

	/*
	 * Circular buffer holding shared-inval messages.  A message number is
	 * mapped to a slot with "msgnum % MAXNUMMESSAGES".
	 */
	SharedInvalidationMessage buffer[MAXNUMMESSAGES];

	/*
	 * Per-backend invalidation state info (has MaxBackends entries).  Only
	 * entries below lastBackend can be active.
	 */
	ProcState	procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg;
191 
/* Pointer to the shared inval buffer; set by CreateSharedInvalidationState */
static SISeg *shmInvalBuffer;	/* pointer to the shared inval buffer */


/*
 * Backend-local LocalTransactionId counter.  It is loaded from our
 * ProcState slot's nextLXID in SharedInvalBackendInit, advanced by
 * GetNextLocalTransactionId, and written back to the slot in
 * CleanupInvalidationState.
 */
static LocalTransactionId nextLocalTransactionId;

/* on_shmem_exit callback registered by SharedInvalBackendInit */
static void CleanupInvalidationState(int status, Datum arg);
198 
199 
200 /*
201  * SInvalShmemSize --- return shared-memory space needed
202  */
203 Size
204 SInvalShmemSize(void)
205 {
206 	Size		size;
207 
208 	size = offsetof(SISeg, procState);
209 	size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
210 
211 	return size;
212 }
213 
214 /*
215  * CreateSharedInvalidationState
216  *		Create and initialize the SI message buffer
217  */
218 void
219 CreateSharedInvalidationState(void)
220 {
221 	int			i;
222 	bool		found;
223 
224 	/* Allocate space in shared memory */
225 	shmInvalBuffer = (SISeg *)
226 		ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
227 	if (found)
228 		return;
229 
230 	/* Clear message counters, save size of procState array, init spinlock */
231 	shmInvalBuffer->minMsgNum = 0;
232 	shmInvalBuffer->maxMsgNum = 0;
233 	shmInvalBuffer->nextThreshold = CLEANUP_MIN;
234 	shmInvalBuffer->lastBackend = 0;
235 	shmInvalBuffer->maxBackends = MaxBackends;
236 	SpinLockInit(&shmInvalBuffer->msgnumLock);
237 
238 	/* The buffer[] array is initially all unused, so we need not fill it */
239 
240 	/* Mark all backends inactive, and initialize nextLXID */
241 	for (i = 0; i < shmInvalBuffer->maxBackends; i++)
242 	{
243 		shmInvalBuffer->procState[i].procPid = 0;	/* inactive */
244 		shmInvalBuffer->procState[i].proc = NULL;
245 		shmInvalBuffer->procState[i].nextMsgNum = 0;	/* meaningless */
246 		shmInvalBuffer->procState[i].resetState = false;
247 		shmInvalBuffer->procState[i].signaled = false;
248 		shmInvalBuffer->procState[i].hasMessages = false;
249 		shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
250 	}
251 }
252 
/*
 * SharedInvalBackendInit
 *		Initialize a new backend to operate on the sinval buffer
 *
 * Claims a free ProcState slot, derives MyBackendId from the slot's array
 * position (index + 1), publishes it in MyProc, picks up the slot's saved
 * nextLXID, and registers CleanupInvalidationState to release the slot at
 * exit.
 *
 * sendOnly: stored in the slot; SICleanupQueue ignores slots with it set.
 *
 * Reports FATAL "too many clients" if no slot is free.
 */
void
SharedInvalBackendInit(bool sendOnly)
{
	int			index;
	ProcState  *stateP = NULL;
	SISeg	   *segP = shmInvalBuffer;

	/*
	 * This can run in parallel with read operations, but not with write
	 * operations, since SIInsertDataEntries relies on lastBackend to set
	 * hasMessages appropriately.
	 */
	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

	/* Look for a free entry among the slots that have been used before */
	for (index = 0; index < segP->lastBackend; index++)
	{
		if (segP->procState[index].procPid == 0)	/* inactive slot? */
		{
			stateP = &segP->procState[index];
			break;
		}
	}

	/* No hole found: extend the in-use range by one slot, if room remains */
	if (stateP == NULL)
	{
		if (segP->lastBackend < segP->maxBackends)
		{
			stateP = &segP->procState[segP->lastBackend];
			Assert(stateP->procPid == 0);
			segP->lastBackend++;
		}
		else
		{
			/*
			 * out of procState slots: MaxBackends exceeded -- report normally
			 *
			 * The lock is dropped before reporting.  ereport(FATAL) is
			 * expected not to return; the code below would otherwise
			 * dereference a NULL stateP.
			 */
			MyBackendId = InvalidBackendId;
			LWLockRelease(SInvalWriteLock);
			ereport(FATAL,
					(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
					 errmsg("sorry, too many clients already")));
		}
	}

	/* Backend IDs are 1-based: slot index plus one */
	MyBackendId = (stateP - &segP->procState[0]) + 1;

	/* Advertise assigned backend ID in MyProc */
	MyProc->backendId = MyBackendId;

	/* Fetch next local transaction ID into local memory */
	nextLocalTransactionId = stateP->nextLXID;

	/*
	 * mark myself active, with all extant messages already read
	 *
	 * maxMsgNum is read without the spinlock here; we hold SInvalWriteLock.
	 */
	stateP->procPid = MyProcPid;
	stateP->proc = MyProc;
	stateP->nextMsgNum = segP->maxMsgNum;
	stateP->resetState = false;
	stateP->signaled = false;
	stateP->hasMessages = false;
	stateP->sendOnly = sendOnly;

	LWLockRelease(SInvalWriteLock);

	/* register exit routine to mark my entry inactive at exit */
	on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));

	elog(DEBUG4, "my backend ID is %d", MyBackendId);
}
326 
327 /*
328  * CleanupInvalidationState
329  *		Mark the current backend as no longer active.
330  *
331  * This function is called via on_shmem_exit() during backend shutdown.
332  *
333  * arg is really of type "SISeg*".
334  */
335 static void
336 CleanupInvalidationState(int status, Datum arg)
337 {
338 	SISeg	   *segP = (SISeg *) DatumGetPointer(arg);
339 	ProcState  *stateP;
340 	int			i;
341 
342 	Assert(PointerIsValid(segP));
343 
344 	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
345 
346 	stateP = &segP->procState[MyBackendId - 1];
347 
348 	/* Update next local transaction ID for next holder of this backendID */
349 	stateP->nextLXID = nextLocalTransactionId;
350 
351 	/* Mark myself inactive */
352 	stateP->procPid = 0;
353 	stateP->proc = NULL;
354 	stateP->nextMsgNum = 0;
355 	stateP->resetState = false;
356 	stateP->signaled = false;
357 
358 	/* Recompute index of last active backend */
359 	for (i = segP->lastBackend; i > 0; i--)
360 	{
361 		if (segP->procState[i - 1].procPid != 0)
362 			break;
363 	}
364 	segP->lastBackend = i;
365 
366 	LWLockRelease(SInvalWriteLock);
367 }
368 
369 /*
370  * BackendIdGetProc
371  *		Get the PGPROC structure for a backend, given the backend ID.
372  *		The result may be out of date arbitrarily quickly, so the caller
373  *		must be careful about how this information is used.  NULL is
374  *		returned if the backend is not active.
375  */
376 PGPROC *
377 BackendIdGetProc(int backendID)
378 {
379 	PGPROC	   *result = NULL;
380 	SISeg	   *segP = shmInvalBuffer;
381 
382 	/* Need to lock out additions/removals of backends */
383 	LWLockAcquire(SInvalWriteLock, LW_SHARED);
384 
385 	if (backendID > 0 && backendID <= segP->lastBackend)
386 	{
387 		ProcState  *stateP = &segP->procState[backendID - 1];
388 
389 		result = stateP->proc;
390 	}
391 
392 	LWLockRelease(SInvalWriteLock);
393 
394 	return result;
395 }
396 
397 /*
398  * BackendIdGetTransactionIds
399  *		Get the xid and xmin of the backend. The result may be out of date
400  *		arbitrarily quickly, so the caller must be careful about how this
401  *		information is used.
402  */
403 void
404 BackendIdGetTransactionIds(int backendID, TransactionId *xid, TransactionId *xmin)
405 {
406 	SISeg	   *segP = shmInvalBuffer;
407 
408 	*xid = InvalidTransactionId;
409 	*xmin = InvalidTransactionId;
410 
411 	/* Need to lock out additions/removals of backends */
412 	LWLockAcquire(SInvalWriteLock, LW_SHARED);
413 
414 	if (backendID > 0 && backendID <= segP->lastBackend)
415 	{
416 		ProcState  *stateP = &segP->procState[backendID - 1];
417 		PGPROC	   *proc = stateP->proc;
418 
419 		if (proc != NULL)
420 		{
421 			PGXACT	   *xact = &ProcGlobal->allPgXact[proc->pgprocno];
422 
423 			*xid = xact->xid;
424 			*xmin = xact->xmin;
425 		}
426 	}
427 
428 	LWLockRelease(SInvalWriteLock);
429 }
430 
/*
 * SIInsertDataEntries
 *		Add new invalidation message(s) to the buffer.
 *
 * data: array of messages to enqueue; n: number of entries in it.
 *
 * Messages are inserted in batches of at most WRITE_QUANTUM, taking and
 * releasing SInvalWriteLock once per batch.  Before each batch the queue is
 * cleaned (possibly repeatedly) until the batch fits and the fill level is
 * below nextThreshold.  After each batch every slot below lastBackend has
 * its hasMessages flag set.
 */
void
SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
{
	SISeg	   *segP = shmInvalBuffer;

	/*
	 * N can be arbitrarily large.  We divide the work into groups of no more
	 * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
	 * an unreasonably long time.  (This is not so much because we care about
	 * letting in other writers, as that some just-caught-up backend might be
	 * trying to do SICleanupQueue to pass on its signal, and we don't want it
	 * to have to wait a long time.)  Also, we need to consider calling
	 * SICleanupQueue every so often.
	 */
	while (n > 0)
	{
		int			nthistime = Min(n, WRITE_QUANTUM);
		int			numMsgs;
		int			max;
		int			i;

		n -= nthistime;

		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

		/*
		 * If the buffer is full, we *must* acquire some space.  Clean the
		 * queue and reset anyone who is preventing space from being freed.
		 * Otherwise, clean the queue only when it's exceeded the next
		 * fullness threshold.  We have to loop and recheck the buffer state
		 * after any call of SICleanupQueue, because it may drop and retake
		 * SInvalWriteLock while signaling another backend.
		 */
		for (;;)
		{
			numMsgs = segP->maxMsgNum - segP->minMsgNum;
			if (numMsgs + nthistime > MAXNUMMESSAGES ||
				numMsgs >= segP->nextThreshold)
				SICleanupQueue(true, nthistime);
			else
				break;
		}

		/*
		 * Insert new message(s) into proper slot of circular buffer.  The
		 * shared maxMsgNum is not advanced yet, so readers cannot see these
		 * entries until the spinlock-protected store below.
		 */
		max = segP->maxMsgNum;
		while (nthistime-- > 0)
		{
			segP->buffer[max % MAXNUMMESSAGES] = *data++;
			max++;
		}

		/* Update current value of maxMsgNum using spinlock */
		SpinLockAcquire(&segP->msgnumLock);
		segP->maxMsgNum = max;
		SpinLockRelease(&segP->msgnumLock);

		/*
		 * Now that the maxMsgNum change is globally visible, we give everyone
		 * a swift kick to make sure they read the newly added messages.
		 * Releasing SInvalWriteLock will enforce a full memory barrier, so
		 * these (unlocked) changes will be committed to memory before we exit
		 * the function.
		 */
		for (i = 0; i < segP->lastBackend; i++)
		{
			ProcState  *stateP = &segP->procState[i];

			stateP->hasMessages = true;
		}

		LWLockRelease(SInvalWriteLock);
	}
}
509 
/*
 * SIGetDataEntries
 *		get next SI message(s) for current backend, if there are any
 *
 * data: caller-supplied output array; datasize: its capacity in messages.
 *
 * Possible return values:
 *	0:	 no SI message available
 *	n>0: next n SI messages have been extracted into data[]
 * -1:	 SI reset message extracted
 *
 * If the return value is less than the array size "datasize", the caller
 * can assume that there are no more SI messages after the one(s) returned.
 * Otherwise, another call is needed to collect more messages.
 *
 * NB: this can run in parallel with other instances of SIGetDataEntries
 * executing on behalf of other backends, since each instance will modify only
 * fields of its own backend's ProcState, and no instance will look at fields
 * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
 * in shared mode.  Note that this is not exactly the normal (read-only)
 * interpretation of a shared lock! Look closely at the interactions before
 * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
 *
 * NB: this can also run in parallel with SIInsertDataEntries.  It is not
 * guaranteed that we will return any messages added after the routine is
 * entered.
 *
 * Note: we assume that "datasize" is not so large that it might be important
 * to break our hold on SInvalReadLock into segments.
 */
int
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
	SISeg	   *segP;
	ProcState  *stateP;
	int			max;
	int			n;

	segP = shmInvalBuffer;
	stateP = &segP->procState[MyBackendId - 1];

	/*
	 * Before starting to take locks, do a quick, unlocked test to see whether
	 * there can possibly be anything to read.  On a multiprocessor system,
	 * it's possible that this load could migrate backwards and occur before
	 * we actually enter this function, so we might miss a sinval message that
	 * was just added by some other processor.  But they can't migrate
	 * backwards over a preceding lock acquisition, so it should be OK.  If we
	 * haven't acquired a lock that prevents further relevant invalidations,
	 * any such occurrence is not much different than if the invalidation had
	 * arrived slightly later in the first place.
	 */
	if (!stateP->hasMessages)
		return 0;

	LWLockAcquire(SInvalReadLock, LW_SHARED);

	/*
	 * We must clear hasMessages before determining how many messages we're
	 * going to read.  That way, if new messages arrive after we have
	 * determined how many we're reading, the writer will set the flag again
	 * and we'll notice those messages on a later call.
	 *
	 * Note that, if we don't end up reading all of the messages, we had
	 * better be certain to set this flag again before exiting!
	 */
	stateP->hasMessages = false;

	/* Fetch current value of maxMsgNum using spinlock */
	SpinLockAcquire(&segP->msgnumLock);
	max = segP->maxMsgNum;
	SpinLockRelease(&segP->msgnumLock);

	if (stateP->resetState)
	{
		/*
		 * Force reset.  We can say we have dealt with any messages added
		 * since the reset, as well; and that means we should clear the
		 * signaled flag, too.  No messages are copied into data[] here.
		 */
		stateP->nextMsgNum = max;
		stateP->resetState = false;
		stateP->signaled = false;
		LWLockRelease(SInvalReadLock);
		return -1;
	}

	/*
	 * Retrieve messages and advance backend's counter, until data array is
	 * full or there are no more messages.
	 *
	 * There may be other backends that haven't read the message(s), so we
	 * cannot delete them here.  SICleanupQueue() will eventually remove them
	 * from the queue.
	 */
	n = 0;
	while (n < datasize && stateP->nextMsgNum < max)
	{
		data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
		stateP->nextMsgNum++;
	}

	/*
	 * If we have caught up completely, clear our "signaled" flag so that
	 * we'll get another signal if we fall behind again.
	 *
	 * If we haven't caught up completely, set the hasMessages flag again so
	 * that we see the remaining messages next time.
	 */
	if (stateP->nextMsgNum >= max)
		stateP->signaled = false;
	else
		stateP->hasMessages = true;

	LWLockRelease(SInvalReadLock);
	return n;
}
625 
/*
 * SICleanupQueue
 *		Remove messages that have been consumed by all active backends
 *
 * callerHasWriteLock is true if caller is holding SInvalWriteLock.
 * minFree is the minimum number of message slots to make free.
 *
 * Possible side effects of this routine include marking one or more
 * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
 * to some backend that seems to be getting too far behind.  We signal at
 * most one backend at a time, for reasons explained at the top of the file.
 *
 * On return the caller's hold on SInvalWriteLock is the same as on entry:
 * held if callerHasWriteLock, not held otherwise.  SInvalReadLock is never
 * held on return.
 *
 * Caution: because we transiently release write lock when we have to signal
 * some other backend, it is NOT guaranteed that there are still minFree
 * free message slots at exit.  Caller must recheck and perhaps retry.
 */
void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
	SISeg	   *segP = shmInvalBuffer;
	int			min,
				minsig,
				lowbound,
				numMsgs,
				i;
	ProcState  *needSig = NULL;

	/* Lock out all writers and readers */
	if (!callerHasWriteLock)
		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

	/*
	 * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
	 * furthest-back backend that needs signaling (if any), and reset any
	 * backends that are too far back.  Note that because we ignore sendOnly
	 * backends here it is possible for them to keep sending messages without
	 * a problem even when they are the only active backend.
	 *
	 * min starts at maxMsgNum, so with no eligible backends the queue ends
	 * up empty.  minsig is the signaling cutoff and lowbound the reset
	 * cutoff; a backend whose nextMsgNum is below lowbound leaves fewer than
	 * minFree slots free.
	 */
	min = segP->maxMsgNum;
	minsig = min - SIG_THRESHOLD;
	lowbound = min - MAXNUMMESSAGES + minFree;

	for (i = 0; i < segP->lastBackend; i++)
	{
		ProcState  *stateP = &segP->procState[i];
		int			n = stateP->nextMsgNum;

		/* Ignore if inactive, already in reset state, or send-only */
		if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
			continue;

		/*
		 * If we must free some space and this backend is preventing it, force
		 * him into reset state and then ignore until he catches up.
		 */
		if (n < lowbound)
		{
			stateP->resetState = true;
			/* no point in signaling him ... */
			continue;
		}

		/* Track the global minimum nextMsgNum */
		if (n < min)
			min = n;

		/*
		 * Also see who's furthest back of the unsignaled backends.  minsig
		 * shrinks each time we pick a candidate, so needSig ends up at the
		 * unsignaled backend with the smallest nextMsgNum below the
		 * threshold.
		 */
		if (n < minsig && !stateP->signaled)
		{
			minsig = n;
			needSig = stateP;
		}
	}
	segP->minMsgNum = min;

	/*
	 * When minMsgNum gets really large, decrement all message counters so as
	 * to forestall overflow of the counters.  This happens seldom enough that
	 * folding it into the previous loop would be a loser.
	 */
	if (min >= MSGNUMWRAPAROUND)
	{
		segP->minMsgNum -= MSGNUMWRAPAROUND;
		segP->maxMsgNum -= MSGNUMWRAPAROUND;
		for (i = 0; i < segP->lastBackend; i++)
		{
			/* we don't bother skipping inactive entries here */
			segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
		}
	}

	/*
	 * Determine how many messages are still in the queue, and set the
	 * threshold at which we should repeat SICleanupQueue().  Above
	 * CLEANUP_MIN the threshold is the next multiple of CLEANUP_QUANTUM
	 * strictly greater than the current fill level.
	 */
	numMsgs = segP->maxMsgNum - segP->minMsgNum;
	if (numMsgs < CLEANUP_MIN)
		segP->nextThreshold = CLEANUP_MIN;
	else
		segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;

	/*
	 * Lastly, signal anyone who needs a catchup interrupt.  Since
	 * SendProcSignal() might not be fast, we don't want to hold locks while
	 * executing it.
	 */
	if (needSig)
	{
		/* Copy what we need out of shared memory before dropping the locks */
		pid_t		his_pid = needSig->procPid;
		BackendId	his_backendId = (needSig - &segP->procState[0]) + 1;

		needSig->signaled = true;
		LWLockRelease(SInvalReadLock);
		LWLockRelease(SInvalWriteLock);
		elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
		SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
		/* Give the write lock back to a caller that came in holding it */
		if (callerHasWriteLock)
			LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	}
	else
	{
		LWLockRelease(SInvalReadLock);
		if (!callerHasWriteLock)
			LWLockRelease(SInvalWriteLock);
	}
}
753 
754 
755 /*
756  * GetNextLocalTransactionId --- allocate a new LocalTransactionId
757  *
758  * We split VirtualTransactionIds into two parts so that it is possible
759  * to allocate a new one without any contention for shared memory, except
760  * for a bit of additional overhead during backend startup/shutdown.
761  * The high-order part of a VirtualTransactionId is a BackendId, and the
762  * low-order part is a LocalTransactionId, which we assign from a local
763  * counter.  To avoid the risk of a VirtualTransactionId being reused
764  * within a short interval, successive procs occupying the same backend ID
765  * slot should use a consecutive sequence of local IDs, which is implemented
766  * by copying nextLocalTransactionId as seen above.
767  */
768 LocalTransactionId
769 GetNextLocalTransactionId(void)
770 {
771 	LocalTransactionId result;
772 
773 	/* loop to avoid returning InvalidLocalTransactionId at wraparound */
774 	do
775 	{
776 		result = nextLocalTransactionId++;
777 	} while (!LocalTransactionIdIsValid(result));
778 
779 	return result;
780 }
781