/*-------------------------------------------------------------------------
 *
 * sinvaladt.c
 *	  POSTGRES shared cache invalidation data manager.
 *
 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/ipc/sinvaladt.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <signal.h>
#include <unistd.h>

#include "access/transam.h"
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"

/*
 * Conceptually, the shared cache invalidation messages are stored in an
 * infinite array, where maxMsgNum is the next array subscript to store a
 * submitted message in, minMsgNum is the smallest array subscript containing
 * a message not yet read by all backends, and we always have maxMsgNum >=
 * minMsgNum.  (They are equal when there are no messages pending.)  For each
 * active backend, there is a nextMsgNum pointer indicating the next message it
 * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
 * backend.
 *
 * (In the current implementation, minMsgNum is a lower bound for the
 * per-process nextMsgNum values, but it isn't rigorously kept equal to the
 * smallest nextMsgNum --- it may lag behind.  We only update it when
 * SICleanupQueue is called, and we try not to do that often.)
 *
 * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
 * entries.  We translate MsgNum values into circular-buffer indexes by
 * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
 * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
 * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
 * in the buffer.  If the buffer does overflow, we recover by setting the
 * "reset" flag for each backend that has fallen too far behind.  A backend
 * that is in "reset" state is ignored while determining minMsgNum.  When
 * it does finally attempt to receive inval messages, it must discard all
 * its invalidatable state, since it won't know what it missed.
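 *
 * (As an illustrative example, not a value taken from the code: with
 * MAXNUMMESSAGES = 4096, message number 5000 lives in buffer[5000 % 4096] =
 * buffer[904], and the buffer is full once maxMsgNum - minMsgNum reaches
 * 4096.)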
 *
 * To reduce the probability of needing resets, we send a "catchup" interrupt
 * to any backend that seems to be falling unreasonably far behind.  The
 * normal behavior is that at most one such interrupt is in flight at a time;
 * when a backend completes processing a catchup interrupt, it executes
 * SICleanupQueue, which will signal the next-furthest-behind backend if
 * needed.  This avoids undue contention from multiple backends all trying
 * to catch up at once.  However, the furthest-back backend might be stuck
 * in a state where it can't catch up.  Eventually it will get reset, so it
 * won't cause any more problems for anyone but itself.  But we don't want
 * to find that a bunch of other backends are now too close to the reset
 * threshold to be saved.  So SICleanupQueue is designed to occasionally
 * send extra catchup interrupts as the queue gets fuller, to backends that
 * are far behind and haven't gotten one yet.  As long as there aren't a lot
 * of "stuck" backends, we won't need a lot of extra interrupts, since ones
 * that aren't stuck will propagate their interrupts to the next guy.
 *
 * We would have problems if the MsgNum values overflow an integer, so
 * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
 * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
 * large so that we don't need to do this often.  It must be a multiple of
 * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
 * to be moved when we do it.
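 *
 * (To spell out the arithmetic: MSGNUMWRAPAROUND % MAXNUMMESSAGES == 0, so
 * (n - MSGNUMWRAPAROUND) % MAXNUMMESSAGES == n % MAXNUMMESSAGES, and every
 * pending message keeps its buffer slot when the counters are shifted down.)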
 *
 * Access to the shared sinval array is protected by two locks, SInvalReadLock
 * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
 * authorizes them to modify their own ProcState but not to modify or even
 * look at anyone else's.  When we need to perform array-wide updates,
 * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
 * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
 * mode) to serialize adding messages to the queue.  Note that a writer
 * can operate in parallel with one or more readers, because the writer
 * has no need to touch anyone's ProcState, except in the infrequent cases
 * when SICleanupQueue is needed.  The only point of overlap is that
 * the writer wants to change maxMsgNum while readers need to read it.
 * We deal with that by having a spinlock that readers must take for just
 * long enough to read maxMsgNum, while writers take it for just long enough
 * to write maxMsgNum.  (The exact rule is that you need the spinlock to
 * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
 * spinlock to write maxMsgNum unless you are holding both locks.)
 *
 * Note: since maxMsgNum is an int and hence presumably atomically readable/
 * writable, the spinlock might seem unnecessary.  The reason it is needed
 * is to provide a memory barrier: we need to be sure that messages written
 * to the array are actually there before maxMsgNum is increased, and that
 * readers will see that data after fetching maxMsgNum.  Multiprocessors
 * that have weak memory-ordering guarantees can fail without the memory
 * barrier instructions that are included in the spinlock sequences.
 */


/*
 * Configurable parameters.
 *
 * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
 * Must be a power of 2 for speed.
 *
 * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
 * Must be a multiple of MAXNUMMESSAGES.  Should be large.
 *
 * CLEANUP_MIN: the minimum number of messages that must be in the buffer
 * before we bother to call SICleanupQueue.
 *
 * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
 * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
 *
 * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
 * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
 *
 * WRITE_QUANTUM: the max number of messages to push into the buffer per
 * iteration of SIInsertDataEntries.  Noncritical but should be less than
 * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
 * per iteration.
 */

#define MAXNUMMESSAGES 4096
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
#define WRITE_QUANTUM 64
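
/*
 * For reference (simple arithmetic on the settings above, not separate
 * tunables): MAXNUMMESSAGES = 4096, MSGNUMWRAPAROUND = 2^30, CLEANUP_MIN =
 * 2048, CLEANUP_QUANTUM = 256, SIG_THRESHOLD = 2048, WRITE_QUANTUM = 64.
 */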

/* Per-backend state in shared invalidation structure */
typedef struct ProcState
{
    /* procPid is zero in an inactive ProcState array entry. */
    pid_t       procPid;        /* PID of backend, for signaling */
    PGPROC     *proc;           /* PGPROC of backend */
    /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
    int         nextMsgNum;     /* next message number to read */
    bool        resetState;     /* backend needs to reset its state */
    bool        signaled;       /* backend has been sent catchup signal */
    bool        hasMessages;    /* backend has unread messages */

    /*
     * Backend only sends invalidations, never receives them.  This only
     * makes sense for the startup process during recovery, because it
     * doesn't maintain a relcache, yet it fires inval messages to allow
     * query backends to see schema changes.
     */
    bool        sendOnly;       /* backend only sends, never receives */

    /*
     * Next LocalTransactionId to use for each idle backend slot.  We keep
     * this here because it is indexed by BackendId and it is convenient to
     * copy the value to and from local memory when MyBackendId is set.  It's
     * meaningless in an active ProcState entry.
     */
    LocalTransactionId nextLXID;
} ProcState;

/* Shared cache invalidation memory segment */
typedef struct SISeg
{
    /*
     * General state information
     */
    int         minMsgNum;      /* oldest message still needed */
    int         maxMsgNum;      /* next message number to be assigned */
    int         nextThreshold;  /* # of messages to call SICleanupQueue */
    int         lastBackend;    /* index of last active procState entry, +1 */
    int         maxBackends;    /* size of procState array */

    slock_t     msgnumLock;     /* spinlock protecting maxMsgNum */

    /*
     * Circular buffer holding shared-inval messages
     */
    SharedInvalidationMessage buffer[MAXNUMMESSAGES];

    /*
     * Per-backend invalidation state info (has MaxBackends entries).
     */
    ProcState   procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg;

static SISeg *shmInvalBuffer;   /* pointer to the shared inval buffer */


static LocalTransactionId nextLocalTransactionId;

static void CleanupInvalidationState(int status, Datum arg);


/*
 * SInvalShmemSize --- return shared-memory space needed
 */
Size
SInvalShmemSize(void)
{
    Size        size;

    size = offsetof(SISeg, procState);
    size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));

    return size;
}

/*
 * CreateSharedInvalidationState
 *      Create and initialize the SI message buffer
 */
void
CreateSharedInvalidationState(void)
{
    int         i;
    bool        found;

    /* Allocate space in shared memory */
    shmInvalBuffer = (SISeg *)
        ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
    if (found)
        return;

    /* Clear message counters, save size of procState array, init spinlock */
    shmInvalBuffer->minMsgNum = 0;
    shmInvalBuffer->maxMsgNum = 0;
    shmInvalBuffer->nextThreshold = CLEANUP_MIN;
    shmInvalBuffer->lastBackend = 0;
    shmInvalBuffer->maxBackends = MaxBackends;
    SpinLockInit(&shmInvalBuffer->msgnumLock);

    /* The buffer[] array is initially all unused, so we need not fill it */

    /* Mark all backends inactive, and initialize nextLXID */
    for (i = 0; i < shmInvalBuffer->maxBackends; i++)
    {
        shmInvalBuffer->procState[i].procPid = 0;   /* inactive */
        shmInvalBuffer->procState[i].proc = NULL;
        shmInvalBuffer->procState[i].nextMsgNum = 0;    /* meaningless */
        shmInvalBuffer->procState[i].resetState = false;
        shmInvalBuffer->procState[i].signaled = false;
        shmInvalBuffer->procState[i].hasMessages = false;
        shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
    }
}

/*
 * SharedInvalBackendInit
 *      Initialize a new backend to operate on the sinval buffer
 */
void
SharedInvalBackendInit(bool sendOnly)
{
    int         index;
    ProcState  *stateP = NULL;
    SISeg      *segP = shmInvalBuffer;

    /*
     * This can run in parallel with read operations, but not with write
     * operations, since SIInsertDataEntries relies on lastBackend to set
     * hasMessages appropriately.
     */
    LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

    /* Look for a free entry in the procState array */
    for (index = 0; index < segP->lastBackend; index++)
    {
        if (segP->procState[index].procPid == 0)    /* inactive slot? */
        {
            stateP = &segP->procState[index];
            break;
        }
    }

    if (stateP == NULL)
    {
        if (segP->lastBackend < segP->maxBackends)
        {
            stateP = &segP->procState[segP->lastBackend];
            Assert(stateP->procPid == 0);
            segP->lastBackend++;
        }
        else
        {
            /*
             * out of procState slots: MaxBackends exceeded -- report normally
             */
            MyBackendId = InvalidBackendId;
            LWLockRelease(SInvalWriteLock);
            ereport(FATAL,
                    (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
                     errmsg("sorry, too many clients already")));
        }
    }

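    /* Backend IDs are 1-based: slot procState[0] corresponds to backend ID 1 */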
    MyBackendId = (stateP - &segP->procState[0]) + 1;

    /* Advertise assigned backend ID in MyProc */
    MyProc->backendId = MyBackendId;

    /* Fetch next local transaction ID into local memory */
    nextLocalTransactionId = stateP->nextLXID;

    /* mark myself active, with all extant messages already read */
    stateP->procPid = MyProcPid;
    stateP->proc = MyProc;
    stateP->nextMsgNum = segP->maxMsgNum;
    stateP->resetState = false;
    stateP->signaled = false;
    stateP->hasMessages = false;
    stateP->sendOnly = sendOnly;

    LWLockRelease(SInvalWriteLock);

    /* register exit routine to mark my entry inactive at exit */
    on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));

    elog(DEBUG4, "my backend ID is %d", MyBackendId);
}

/*
 * CleanupInvalidationState
 *      Mark the current backend as no longer active.
 *
 * This function is called via on_shmem_exit() during backend shutdown.
 *
 * arg is really of type "SISeg*".
 */
static void
CleanupInvalidationState(int status, Datum arg)
{
    SISeg      *segP = (SISeg *) DatumGetPointer(arg);
    ProcState  *stateP;
    int         i;

    Assert(PointerIsValid(segP));

    LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

    stateP = &segP->procState[MyBackendId - 1];

    /* Update next local transaction ID for next holder of this backendID */
    stateP->nextLXID = nextLocalTransactionId;

    /* Mark myself inactive */
    stateP->procPid = 0;
    stateP->proc = NULL;
    stateP->nextMsgNum = 0;
    stateP->resetState = false;
    stateP->signaled = false;

    /* Recompute index of last active backend */
    for (i = segP->lastBackend; i > 0; i--)
    {
        if (segP->procState[i - 1].procPid != 0)
            break;
    }
    segP->lastBackend = i;

    LWLockRelease(SInvalWriteLock);
}

/*
 * BackendIdGetProc
 *      Get the PGPROC structure for a backend, given the backend ID.
 *      The result may be out of date arbitrarily quickly, so the caller
 *      must be careful about how this information is used.  NULL is
 *      returned if the backend is not active.
 */
PGPROC *
BackendIdGetProc(int backendID)
{
    PGPROC     *result = NULL;
    SISeg      *segP = shmInvalBuffer;

    /* Need to lock out additions/removals of backends */
    LWLockAcquire(SInvalWriteLock, LW_SHARED);

    if (backendID > 0 && backendID <= segP->lastBackend)
    {
        ProcState  *stateP = &segP->procState[backendID - 1];

        result = stateP->proc;
    }

    LWLockRelease(SInvalWriteLock);

    return result;
}

/*
 * BackendIdGetTransactionIds
 *      Get the xid and xmin of the backend.  The result may be out of date
 *      arbitrarily quickly, so the caller must be careful about how this
 *      information is used.
 */
void
BackendIdGetTransactionIds(int backendID, TransactionId *xid, TransactionId *xmin)
{
    SISeg      *segP = shmInvalBuffer;

    *xid = InvalidTransactionId;
    *xmin = InvalidTransactionId;

    /* Need to lock out additions/removals of backends */
    LWLockAcquire(SInvalWriteLock, LW_SHARED);

    if (backendID > 0 && backendID <= segP->lastBackend)
    {
        ProcState  *stateP = &segP->procState[backendID - 1];
        PGPROC     *proc = stateP->proc;

        if (proc != NULL)
        {
            PGXACT     *xact = &ProcGlobal->allPgXact[proc->pgprocno];

            *xid = xact->xid;
            *xmin = xact->xmin;
        }
    }

    LWLockRelease(SInvalWriteLock);
}

/*
 * SIInsertDataEntries
 *      Add new invalidation message(s) to the buffer.
 */
void
SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
{
    SISeg      *segP = shmInvalBuffer;

    /*
     * N can be arbitrarily large.  We divide the work into groups of no more
     * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
     * an unreasonably long time.  (This is not so much because we care about
     * letting in other writers, as that some just-caught-up backend might be
     * trying to do SICleanupQueue to pass on its signal, and we don't want it
     * to have to wait a long time.)  Also, we need to consider calling
     * SICleanupQueue every so often.
     */
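    /* e.g., a batch of n = 150 is inserted as groups of 64, 64, and 22 */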
    while (n > 0)
    {
        int         nthistime = Min(n, WRITE_QUANTUM);
        int         numMsgs;
        int         max;
        int         i;

        n -= nthistime;

        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

        /*
         * If the buffer is full, we *must* acquire some space.  Clean the
         * queue and reset anyone who is preventing space from being freed.
         * Otherwise, clean the queue only when it's exceeded the next
         * fullness threshold.  We have to loop and recheck the buffer state
         * after any call of SICleanupQueue.
         */
        for (;;)
        {
            numMsgs = segP->maxMsgNum - segP->minMsgNum;
            if (numMsgs + nthistime > MAXNUMMESSAGES ||
                numMsgs >= segP->nextThreshold)
                SICleanupQueue(true, nthistime);
            else
                break;
        }

        /*
         * Insert new message(s) into proper slot of circular buffer
         */
        max = segP->maxMsgNum;
        while (nthistime-- > 0)
        {
            segP->buffer[max % MAXNUMMESSAGES] = *data++;
            max++;
        }

        /* Update current value of maxMsgNum using spinlock */
        SpinLockAcquire(&segP->msgnumLock);
        segP->maxMsgNum = max;
        SpinLockRelease(&segP->msgnumLock);

        /*
         * Now that the maxMsgNum change is globally visible, we give everyone
         * a swift kick to make sure they read the newly added messages.
         * Releasing SInvalWriteLock will enforce a full memory barrier, so
         * these (unlocked) changes will be committed to memory before we exit
         * the function.
         */
        for (i = 0; i < segP->lastBackend; i++)
        {
            ProcState  *stateP = &segP->procState[i];

            stateP->hasMessages = true;
        }

        LWLockRelease(SInvalWriteLock);
    }
}

/*
 * SIGetDataEntries
 *      get next SI message(s) for current backend, if there are any
 *
 * Possible return values:
 *  0:   no SI message available
 *  n>0: next n SI messages have been extracted into data[]
 *  -1:  SI reset message extracted
 *
 * If the return value is less than the array size "datasize", the caller
 * can assume that there are no more SI messages after the one(s) returned.
 * Otherwise, another call is needed to collect more messages.
 *
 * NB: this can run in parallel with other instances of SIGetDataEntries
 * executing on behalf of other backends, since each instance will modify only
 * fields of its own backend's ProcState, and no instance will look at fields
 * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
 * in shared mode.  Note that this is not exactly the normal (read-only)
 * interpretation of a shared lock!  Look closely at the interactions before
 * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
 *
 * NB: this can also run in parallel with SIInsertDataEntries.  It is not
 * guaranteed that we will return any messages added after the routine is
 * entered.
 *
 * Note: we assume that "datasize" is not so large that it might be important
 * to break our hold on SInvalReadLock into segments.
 */
int
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
    SISeg      *segP;
    ProcState  *stateP;
    int         max;
    int         n;

    segP = shmInvalBuffer;
    stateP = &segP->procState[MyBackendId - 1];

    /*
     * Before starting to take locks, do a quick, unlocked test to see whether
     * there can possibly be anything to read.  On a multiprocessor system,
     * it's possible that this load could migrate backwards and occur before
     * we actually enter this function, so we might miss a sinval message that
     * was just added by some other processor.  But they can't migrate
     * backwards over a preceding lock acquisition, so it should be OK.  If we
     * haven't acquired a lock preventing further relevant invalidations, any
     * such occurrence is not much different than if the invalidation had
     * arrived slightly later in the first place.
     */
    if (!stateP->hasMessages)
        return 0;

    LWLockAcquire(SInvalReadLock, LW_SHARED);

    /*
     * We must reset hasMessages before determining how many messages we're
     * going to read.  That way, if new messages arrive after we have
     * determined how many we're reading, the flag will get reset and we'll
     * notice those messages part-way through.
     *
     * Note that, if we don't end up reading all of the messages, we had
     * better be certain to reset this flag before exiting!
     */
    stateP->hasMessages = false;

    /* Fetch current value of maxMsgNum using spinlock */
    SpinLockAcquire(&segP->msgnumLock);
    max = segP->maxMsgNum;
    SpinLockRelease(&segP->msgnumLock);

    if (stateP->resetState)
    {
        /*
         * Force reset.  We can say we have dealt with any messages added
         * since the reset, as well; and that means we should clear the
         * signaled flag, too.
         */
        stateP->nextMsgNum = max;
        stateP->resetState = false;
        stateP->signaled = false;
        LWLockRelease(SInvalReadLock);
        return -1;
    }

    /*
     * Retrieve messages and advance backend's counter, until data array is
     * full or there are no more messages.
     *
     * There may be other backends that haven't read the message(s), so we
     * cannot delete them here.  SICleanupQueue() will eventually remove them
     * from the queue.
     */
    n = 0;
    while (n < datasize && stateP->nextMsgNum < max)
    {
        data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
        stateP->nextMsgNum++;
    }

    /*
     * If we have caught up completely, reset our "signaled" flag so that
     * we'll get another signal if we fall behind again.
     *
     * If we haven't caught up completely, reset the hasMessages flag so that
     * we see the remaining messages next time.
     */
    if (stateP->nextMsgNum >= max)
        stateP->signaled = false;
    else
        stateP->hasMessages = true;

    LWLockRelease(SInvalReadLock);
    return n;
}
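
/*
 * A sketch of how a consumer might drive SIGetDataEntries, per the contract
 * above (the real caller lives elsewhere in the backend; the handler names
 * here are hypothetical, for illustration only):
 *
 *      SharedInvalidationMessage msgs[32];
 *      int         n;
 *
 *      do
 *      {
 *          n = SIGetDataEntries(msgs, lengthof(msgs));
 *          if (n < 0)
 *          {
 *              HandleSinvalReset();    // hypothetical: discard all caches
 *              break;
 *          }
 *          for (int i = 0; i < n; i++)
 *              HandleSinvalMessage(&msgs[i]);  // hypothetical handler
 *      } while (n == lengthof(msgs));  // a short read means "caught up"
 */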

/*
 * SICleanupQueue
 *      Remove messages that have been consumed by all active backends
 *
 * callerHasWriteLock is true if caller is holding SInvalWriteLock.
 * minFree is the minimum number of message slots to make free.
 *
 * Possible side effects of this routine include marking one or more
 * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
 * to some backend that seems to be getting too far behind.  We signal at
 * most one backend at a time, for reasons explained at the top of the file.
 *
 * Caution: because we transiently release write lock when we have to signal
 * some other backend, it is NOT guaranteed that there are still minFree
 * free message slots at exit.  Caller must recheck and perhaps retry.
 */
void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
    SISeg      *segP = shmInvalBuffer;
    int         min,
                minsig,
                lowbound,
                numMsgs,
                i;
    ProcState  *needSig = NULL;

    /* Lock out all writers and readers */
    if (!callerHasWriteLock)
        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

    /*
     * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
     * furthest-back backend that needs signaling (if any), and reset any
     * backends that are too far back.  Note that because we ignore sendOnly
     * backends here it is possible for them to keep sending messages without
     * a problem even when they are the only active backend.
     */
    min = segP->maxMsgNum;
    minsig = min - SIG_THRESHOLD;
    lowbound = min - MAXNUMMESSAGES + minFree;
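
    /*
     * Illustrative numbers (not from the code): with maxMsgNum = 10000 and
     * minFree = 64, any backend whose nextMsgNum is below 10000 - 4096 + 64 =
     * 5968 gets reset, and the furthest-back unsignaled backend below
     * 10000 - 2048 = 7952 gets a catchup interrupt.
     */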

    for (i = 0; i < segP->lastBackend; i++)
    {
        ProcState  *stateP = &segP->procState[i];
        int         n = stateP->nextMsgNum;

        /* Ignore if inactive or already in reset state */
        if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
            continue;

        /*
         * If we must free some space and this backend is preventing it, force
         * him into reset state and then ignore until he catches up.
         */
        if (n < lowbound)
        {
            stateP->resetState = true;
            /* no point in signaling him ... */
            continue;
        }

        /* Track the global minimum nextMsgNum */
        if (n < min)
            min = n;

        /* Also see who's furthest back of the unsignaled backends */
        if (n < minsig && !stateP->signaled)
        {
            minsig = n;
            needSig = stateP;
        }
    }
    segP->minMsgNum = min;

    /*
     * When minMsgNum gets really large, decrement all message counters so as
     * to forestall overflow of the counters.  This happens seldom enough that
     * folding it into the previous loop would be a loser.
     */
    if (min >= MSGNUMWRAPAROUND)
    {
        segP->minMsgNum -= MSGNUMWRAPAROUND;
        segP->maxMsgNum -= MSGNUMWRAPAROUND;
        for (i = 0; i < segP->lastBackend; i++)
        {
            /* we don't bother skipping inactive entries here */
            segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
        }
    }

    /*
     * Determine how many messages are still in the queue, and set the
     * threshold at which we should repeat SICleanupQueue().
     */
    numMsgs = segP->maxMsgNum - segP->minMsgNum;
    if (numMsgs < CLEANUP_MIN)
        segP->nextThreshold = CLEANUP_MIN;
    else
        segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
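    /* e.g., numMsgs = 3000 rounds up to nextThreshold = (11 + 1) * 256 = 3072 */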

    /*
     * Lastly, signal anyone who needs a catchup interrupt.  Since
     * SendProcSignal() might not be fast, we don't want to hold locks while
     * executing it.
     */
    if (needSig)
    {
        pid_t       his_pid = needSig->procPid;
        BackendId   his_backendId = (needSig - &segP->procState[0]) + 1;

        needSig->signaled = true;
        LWLockRelease(SInvalReadLock);
        LWLockRelease(SInvalWriteLock);
        elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
        SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
        if (callerHasWriteLock)
            LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    }
    else
    {
        LWLockRelease(SInvalReadLock);
        if (!callerHasWriteLock)
            LWLockRelease(SInvalWriteLock);
    }
}


/*
 * GetNextLocalTransactionId --- allocate a new LocalTransactionId
 *
 * We split VirtualTransactionIds into two parts so that it is possible
 * to allocate a new one without any contention for shared memory, except
 * for a bit of additional overhead during backend startup/shutdown.
 * The high-order part of a VirtualTransactionId is a BackendId, and the
 * low-order part is a LocalTransactionId, which we assign from a local
 * counter.  To avoid the risk of a VirtualTransactionId being reused
 * within a short interval, successive procs occupying the same backend ID
 * slot should use a consecutive sequence of local IDs, which is implemented
 * by copying nextLocalTransactionId as seen above.
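 *
 * (For example, the 42nd local transaction of the proc holding backend ID 7
 * has virtual transaction ID 7/42, as displayed in pg_locks.)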
 */
LocalTransactionId
GetNextLocalTransactionId(void)
{
    LocalTransactionId result;

    /* loop to avoid returning InvalidLocalTransactionId at wraparound */
    do
    {
        result = nextLocalTransactionId++;
    } while (!LocalTransactionIdIsValid(result));

    return result;
}