/*-------------------------------------------------------------------------
 *
 * sinvaladt.c
 *	  POSTGRES shared cache invalidation data manager.
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/ipc/sinvaladt.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <signal.h>
#include <unistd.h>

#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
#include "access/transam.h"


/*
 * Conceptually, the shared cache invalidation messages are stored in an
 * infinite array, where maxMsgNum is the next array subscript to store a
 * submitted message in, minMsgNum is the smallest array subscript containing
 * a message not yet read by all backends, and we always have maxMsgNum >=
 * minMsgNum.  (They are equal when there are no messages pending.)  For each
 * active backend, there is a nextMsgNum pointer indicating the next message it
 * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
 * backend.
 *
 * (In the current implementation, minMsgNum is a lower bound for the
 * per-process nextMsgNum values, but it isn't rigorously kept equal to the
 * smallest nextMsgNum --- it may lag behind.  We only update it when
 * SICleanupQueue is called, and we try not to do that often.)
 *
 * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
 * entries.  We translate MsgNum values into circular-buffer indexes by
 * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
 * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
 * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
 * in the buffer.  If the buffer does overflow, we recover by setting the
 * "reset" flag for each backend that has fallen too far behind.  A backend
 * that is in "reset" state is ignored while determining minMsgNum.  When
 * it does finally attempt to receive inval messages, it must discard all
 * its invalidatable state, since it won't know what it missed.
 *
 * To reduce the probability of needing resets, we send a "catchup" interrupt
 * to any backend that seems to be falling unreasonably far behind.  The
 * normal behavior is that at most one such interrupt is in flight at a time;
 * when a backend completes processing a catchup interrupt, it executes
 * SICleanupQueue, which will signal the next-furthest-behind backend if
 * needed.  This avoids undue contention from multiple backends all trying
 * to catch up at once.  However, the furthest-back backend might be stuck
 * in a state where it can't catch up.  Eventually it will get reset, so it
 * won't cause any more problems for anyone but itself.  But we don't want
 * to find that a bunch of other backends are now too close to the reset
 * threshold to be saved.  So SICleanupQueue is designed to occasionally
 * send extra catchup interrupts as the queue gets fuller, to backends that
 * are far behind and haven't gotten one yet.  As long as there aren't a lot
 * of "stuck" backends, we won't need a lot of extra interrupts, since ones
 * that aren't stuck will propagate their interrupts to the next guy.
 *
 * We would have problems if the MsgNum values overflow an integer, so
 * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
 * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
 * large so that we don't need to do this often.  It must be a multiple of
 * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
 * to be moved when we do it.
 *
 * Access to the shared sinval array is protected by two locks, SInvalReadLock
 * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
 * authorizes them to modify their own ProcState but not to modify or even
 * look at anyone else's.  When we need to perform array-wide updates,
 * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
 * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
 * mode) to serialize adding messages to the queue.  Note that a writer
 * can operate in parallel with one or more readers, because the writer
 * has no need to touch anyone's ProcState, except in the infrequent cases
 * when SICleanupQueue is needed.  The only point of overlap is that
 * the writer wants to change maxMsgNum while readers need to read it.
 * We deal with that by having a spinlock that readers must take for just
 * long enough to read maxMsgNum, while writers take it for just long enough
 * to write maxMsgNum.  (The exact rule is that you need the spinlock to
 * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
 * spinlock to write maxMsgNum unless you are holding both locks.)
 *
 * Note: since maxMsgNum is an int and hence presumably atomically readable/
 * writable, the spinlock might seem unnecessary.  The reason it is needed
 * is to provide a memory barrier: we need to be sure that messages written
 * to the array are actually there before maxMsgNum is increased, and that
 * readers will see that data after fetching maxMsgNum.  Multiprocessors
 * that have weak memory-ordering guarantees can fail without the memory
 * barrier instructions that are included in the spinlock sequences.
 */


/*
 * Configurable parameters.
109 * 110 * MAXNUMMESSAGES: max number of shared-inval messages we can buffer. 111 * Must be a power of 2 for speed. 112 * 113 * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow. 114 * Must be a multiple of MAXNUMMESSAGES. Should be large. 115 * 116 * CLEANUP_MIN: the minimum number of messages that must be in the buffer 117 * before we bother to call SICleanupQueue. 118 * 119 * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once 120 * we exceed CLEANUP_MIN. Should be a power of 2 for speed. 121 * 122 * SIG_THRESHOLD: the minimum number of messages a backend must have fallen 123 * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT. 124 * 125 * WRITE_QUANTUM: the max number of messages to push into the buffer per 126 * iteration of SIInsertDataEntries. Noncritical but should be less than 127 * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once 128 * per iteration. 129 */ 130 131 #define MAXNUMMESSAGES 4096 132 #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144) 133 #define CLEANUP_MIN (MAXNUMMESSAGES / 2) 134 #define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16) 135 #define SIG_THRESHOLD (MAXNUMMESSAGES / 2) 136 #define WRITE_QUANTUM 64 137 138 /* Per-backend state in shared invalidation structure */ 139 typedef struct ProcState 140 { 141 /* procPid is zero in an inactive ProcState array entry. */ 142 pid_t procPid; /* PID of backend, for signaling */ 143 PGPROC *proc; /* PGPROC of backend */ 144 /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */ 145 int nextMsgNum; /* next message number to read */ 146 bool resetState; /* backend needs to reset its state */ 147 bool signaled; /* backend has been sent catchup signal */ 148 bool hasMessages; /* backend has unread messages */ 149 150 /* 151 * Backend only sends invalidations, never receives them. 
This only makes 152 * sense for Startup process during recovery because it doesn't maintain a 153 * relcache, yet it fires inval messages to allow query backends to see 154 * schema changes. 155 */ 156 bool sendOnly; /* backend only sends, never receives */ 157 158 /* 159 * Next LocalTransactionId to use for each idle backend slot. We keep 160 * this here because it is indexed by BackendId and it is convenient to 161 * copy the value to and from local memory when MyBackendId is set. It's 162 * meaningless in an active ProcState entry. 163 */ 164 LocalTransactionId nextLXID; 165 } ProcState; 166 167 /* Shared cache invalidation memory segment */ 168 typedef struct SISeg 169 { 170 /* 171 * General state information 172 */ 173 int minMsgNum; /* oldest message still needed */ 174 int maxMsgNum; /* next message number to be assigned */ 175 int nextThreshold; /* # of messages to call SICleanupQueue */ 176 int lastBackend; /* index of last active procState entry, +1 */ 177 int maxBackends; /* size of procState array */ 178 179 slock_t msgnumLock; /* spinlock protecting maxMsgNum */ 180 181 /* 182 * Circular buffer holding shared-inval messages 183 */ 184 SharedInvalidationMessage buffer[MAXNUMMESSAGES]; 185 186 /* 187 * Per-backend invalidation state info (has MaxBackends entries). 
188 */ 189 ProcState procState[FLEXIBLE_ARRAY_MEMBER]; 190 } SISeg; 191 192 static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */ 193 194 195 static LocalTransactionId nextLocalTransactionId; 196 197 static void CleanupInvalidationState(int status, Datum arg); 198 199 200 /* 201 * SInvalShmemSize --- return shared-memory space needed 202 */ 203 Size 204 SInvalShmemSize(void) 205 { 206 Size size; 207 208 size = offsetof(SISeg, procState); 209 size = add_size(size, mul_size(sizeof(ProcState), MaxBackends)); 210 211 return size; 212 } 213 214 /* 215 * CreateSharedInvalidationState 216 * Create and initialize the SI message buffer 217 */ 218 void 219 CreateSharedInvalidationState(void) 220 { 221 int i; 222 bool found; 223 224 /* Allocate space in shared memory */ 225 shmInvalBuffer = (SISeg *) 226 ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found); 227 if (found) 228 return; 229 230 /* Clear message counters, save size of procState array, init spinlock */ 231 shmInvalBuffer->minMsgNum = 0; 232 shmInvalBuffer->maxMsgNum = 0; 233 shmInvalBuffer->nextThreshold = CLEANUP_MIN; 234 shmInvalBuffer->lastBackend = 0; 235 shmInvalBuffer->maxBackends = MaxBackends; 236 SpinLockInit(&shmInvalBuffer->msgnumLock); 237 238 /* The buffer[] array is initially all unused, so we need not fill it */ 239 240 /* Mark all backends inactive, and initialize nextLXID */ 241 for (i = 0; i < shmInvalBuffer->maxBackends; i++) 242 { 243 shmInvalBuffer->procState[i].procPid = 0; /* inactive */ 244 shmInvalBuffer->procState[i].proc = NULL; 245 shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */ 246 shmInvalBuffer->procState[i].resetState = false; 247 shmInvalBuffer->procState[i].signaled = false; 248 shmInvalBuffer->procState[i].hasMessages = false; 249 shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId; 250 } 251 } 252 253 /* 254 * SharedInvalBackendInit 255 * Initialize a new backend to operate on the sinval buffer 256 */ 257 void 258 
SharedInvalBackendInit(bool sendOnly) 259 { 260 int index; 261 ProcState *stateP = NULL; 262 SISeg *segP = shmInvalBuffer; 263 264 /* 265 * This can run in parallel with read operations, but not with write 266 * operations, since SIInsertDataEntries relies on lastBackend to set 267 * hasMessages appropriately. 268 */ 269 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); 270 271 /* Look for a free entry in the procState array */ 272 for (index = 0; index < segP->lastBackend; index++) 273 { 274 if (segP->procState[index].procPid == 0) /* inactive slot? */ 275 { 276 stateP = &segP->procState[index]; 277 break; 278 } 279 } 280 281 if (stateP == NULL) 282 { 283 if (segP->lastBackend < segP->maxBackends) 284 { 285 stateP = &segP->procState[segP->lastBackend]; 286 Assert(stateP->procPid == 0); 287 segP->lastBackend++; 288 } 289 else 290 { 291 /* 292 * out of procState slots: MaxBackends exceeded -- report normally 293 */ 294 MyBackendId = InvalidBackendId; 295 LWLockRelease(SInvalWriteLock); 296 ereport(FATAL, 297 (errcode(ERRCODE_TOO_MANY_CONNECTIONS), 298 errmsg("sorry, too many clients already"))); 299 } 300 } 301 302 MyBackendId = (stateP - &segP->procState[0]) + 1; 303 304 /* Advertise assigned backend ID in MyProc */ 305 MyProc->backendId = MyBackendId; 306 307 /* Fetch next local transaction ID into local memory */ 308 nextLocalTransactionId = stateP->nextLXID; 309 310 /* mark myself active, with all extant messages already read */ 311 stateP->procPid = MyProcPid; 312 stateP->proc = MyProc; 313 stateP->nextMsgNum = segP->maxMsgNum; 314 stateP->resetState = false; 315 stateP->signaled = false; 316 stateP->hasMessages = false; 317 stateP->sendOnly = sendOnly; 318 319 LWLockRelease(SInvalWriteLock); 320 321 /* register exit routine to mark my entry inactive at exit */ 322 on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP)); 323 324 elog(DEBUG4, "my backend ID is %d", MyBackendId); 325 } 326 327 /* 328 * CleanupInvalidationState 329 * Mark the current backend as 
no longer active. 330 * 331 * This function is called via on_shmem_exit() during backend shutdown. 332 * 333 * arg is really of type "SISeg*". 334 */ 335 static void 336 CleanupInvalidationState(int status, Datum arg) 337 { 338 SISeg *segP = (SISeg *) DatumGetPointer(arg); 339 ProcState *stateP; 340 int i; 341 342 Assert(PointerIsValid(segP)); 343 344 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); 345 346 stateP = &segP->procState[MyBackendId - 1]; 347 348 /* Update next local transaction ID for next holder of this backendID */ 349 stateP->nextLXID = nextLocalTransactionId; 350 351 /* Mark myself inactive */ 352 stateP->procPid = 0; 353 stateP->proc = NULL; 354 stateP->nextMsgNum = 0; 355 stateP->resetState = false; 356 stateP->signaled = false; 357 358 /* Recompute index of last active backend */ 359 for (i = segP->lastBackend; i > 0; i--) 360 { 361 if (segP->procState[i - 1].procPid != 0) 362 break; 363 } 364 segP->lastBackend = i; 365 366 LWLockRelease(SInvalWriteLock); 367 } 368 369 /* 370 * BackendIdGetProc 371 * Get the PGPROC structure for a backend, given the backend ID. 372 * The result may be out of date arbitrarily quickly, so the caller 373 * must be careful about how this information is used. NULL is 374 * returned if the backend is not active. 375 */ 376 PGPROC * 377 BackendIdGetProc(int backendID) 378 { 379 PGPROC *result = NULL; 380 SISeg *segP = shmInvalBuffer; 381 382 /* Need to lock out additions/removals of backends */ 383 LWLockAcquire(SInvalWriteLock, LW_SHARED); 384 385 if (backendID > 0 && backendID <= segP->lastBackend) 386 { 387 ProcState *stateP = &segP->procState[backendID - 1]; 388 389 result = stateP->proc; 390 } 391 392 LWLockRelease(SInvalWriteLock); 393 394 return result; 395 } 396 397 /* 398 * BackendIdGetTransactionIds 399 * Get the xid and xmin of the backend. The result may be out of date 400 * arbitrarily quickly, so the caller must be careful about how this 401 * information is used. 
402 */ 403 void 404 BackendIdGetTransactionIds(int backendID, TransactionId *xid, TransactionId *xmin) 405 { 406 SISeg *segP = shmInvalBuffer; 407 408 *xid = InvalidTransactionId; 409 *xmin = InvalidTransactionId; 410 411 /* Need to lock out additions/removals of backends */ 412 LWLockAcquire(SInvalWriteLock, LW_SHARED); 413 414 if (backendID > 0 && backendID <= segP->lastBackend) 415 { 416 ProcState *stateP = &segP->procState[backendID - 1]; 417 PGPROC *proc = stateP->proc; 418 419 if (proc != NULL) 420 { 421 PGXACT *xact = &ProcGlobal->allPgXact[proc->pgprocno]; 422 423 *xid = xact->xid; 424 *xmin = xact->xmin; 425 } 426 } 427 428 LWLockRelease(SInvalWriteLock); 429 } 430 431 /* 432 * SIInsertDataEntries 433 * Add new invalidation message(s) to the buffer. 434 */ 435 void 436 SIInsertDataEntries(const SharedInvalidationMessage *data, int n) 437 { 438 SISeg *segP = shmInvalBuffer; 439 440 /* 441 * N can be arbitrarily large. We divide the work into groups of no more 442 * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for 443 * an unreasonably long time. (This is not so much because we care about 444 * letting in other writers, as that some just-caught-up backend might be 445 * trying to do SICleanupQueue to pass on its signal, and we don't want it 446 * to have to wait a long time.) Also, we need to consider calling 447 * SICleanupQueue every so often. 448 */ 449 while (n > 0) 450 { 451 int nthistime = Min(n, WRITE_QUANTUM); 452 int numMsgs; 453 int max; 454 int i; 455 456 n -= nthistime; 457 458 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); 459 460 /* 461 * If the buffer is full, we *must* acquire some space. Clean the 462 * queue and reset anyone who is preventing space from being freed. 463 * Otherwise, clean the queue only when it's exceeded the next 464 * fullness threshold. We have to loop and recheck the buffer state 465 * after any call of SICleanupQueue. 
466 */ 467 for (;;) 468 { 469 numMsgs = segP->maxMsgNum - segP->minMsgNum; 470 if (numMsgs + nthistime > MAXNUMMESSAGES || 471 numMsgs >= segP->nextThreshold) 472 SICleanupQueue(true, nthistime); 473 else 474 break; 475 } 476 477 /* 478 * Insert new message(s) into proper slot of circular buffer 479 */ 480 max = segP->maxMsgNum; 481 while (nthistime-- > 0) 482 { 483 segP->buffer[max % MAXNUMMESSAGES] = *data++; 484 max++; 485 } 486 487 /* Update current value of maxMsgNum using spinlock */ 488 SpinLockAcquire(&segP->msgnumLock); 489 segP->maxMsgNum = max; 490 SpinLockRelease(&segP->msgnumLock); 491 492 /* 493 * Now that the maxMsgNum change is globally visible, we give everyone 494 * a swift kick to make sure they read the newly added messages. 495 * Releasing SInvalWriteLock will enforce a full memory barrier, so 496 * these (unlocked) changes will be committed to memory before we exit 497 * the function. 498 */ 499 for (i = 0; i < segP->lastBackend; i++) 500 { 501 ProcState *stateP = &segP->procState[i]; 502 503 stateP->hasMessages = true; 504 } 505 506 LWLockRelease(SInvalWriteLock); 507 } 508 } 509 510 /* 511 * SIGetDataEntries 512 * get next SI message(s) for current backend, if there are any 513 * 514 * Possible return values: 515 * 0: no SI message available 516 * n>0: next n SI messages have been extracted into data[] 517 * -1: SI reset message extracted 518 * 519 * If the return value is less than the array size "datasize", the caller 520 * can assume that there are no more SI messages after the one(s) returned. 521 * Otherwise, another call is needed to collect more messages. 522 * 523 * NB: this can run in parallel with other instances of SIGetDataEntries 524 * executing on behalf of other backends, since each instance will modify only 525 * fields of its own backend's ProcState, and no instance will look at fields 526 * of other backends' ProcStates. We express this by grabbing SInvalReadLock 527 * in shared mode. 
Note that this is not exactly the normal (read-only) 528 * interpretation of a shared lock! Look closely at the interactions before 529 * allowing SInvalReadLock to be grabbed in shared mode for any other reason! 530 * 531 * NB: this can also run in parallel with SIInsertDataEntries. It is not 532 * guaranteed that we will return any messages added after the routine is 533 * entered. 534 * 535 * Note: we assume that "datasize" is not so large that it might be important 536 * to break our hold on SInvalReadLock into segments. 537 */ 538 int 539 SIGetDataEntries(SharedInvalidationMessage *data, int datasize) 540 { 541 SISeg *segP; 542 ProcState *stateP; 543 int max; 544 int n; 545 546 segP = shmInvalBuffer; 547 stateP = &segP->procState[MyBackendId - 1]; 548 549 /* 550 * Before starting to take locks, do a quick, unlocked test to see whether 551 * there can possibly be anything to read. On a multiprocessor system, 552 * it's possible that this load could migrate backwards and occur before 553 * we actually enter this function, so we might miss a sinval message that 554 * was just added by some other processor. But they can't migrate 555 * backwards over a preceding lock acquisition, so it should be OK. If we 556 * haven't acquired a lock preventing against further relevant 557 * invalidations, any such occurrence is not much different than if the 558 * invalidation had arrived slightly later in the first place. 559 */ 560 if (!stateP->hasMessages) 561 return 0; 562 563 LWLockAcquire(SInvalReadLock, LW_SHARED); 564 565 /* 566 * We must reset hasMessages before determining how many messages we're 567 * going to read. That way, if new messages arrive after we have 568 * determined how many we're reading, the flag will get reset and we'll 569 * notice those messages part-way through. 570 * 571 * Note that, if we don't end up reading all of the messages, we had 572 * better be certain to reset this flag before exiting! 
573 */ 574 stateP->hasMessages = false; 575 576 /* Fetch current value of maxMsgNum using spinlock */ 577 SpinLockAcquire(&segP->msgnumLock); 578 max = segP->maxMsgNum; 579 SpinLockRelease(&segP->msgnumLock); 580 581 if (stateP->resetState) 582 { 583 /* 584 * Force reset. We can say we have dealt with any messages added 585 * since the reset, as well; and that means we should clear the 586 * signaled flag, too. 587 */ 588 stateP->nextMsgNum = max; 589 stateP->resetState = false; 590 stateP->signaled = false; 591 LWLockRelease(SInvalReadLock); 592 return -1; 593 } 594 595 /* 596 * Retrieve messages and advance backend's counter, until data array is 597 * full or there are no more messages. 598 * 599 * There may be other backends that haven't read the message(s), so we 600 * cannot delete them here. SICleanupQueue() will eventually remove them 601 * from the queue. 602 */ 603 n = 0; 604 while (n < datasize && stateP->nextMsgNum < max) 605 { 606 data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES]; 607 stateP->nextMsgNum++; 608 } 609 610 /* 611 * If we have caught up completely, reset our "signaled" flag so that 612 * we'll get another signal if we fall behind again. 613 * 614 * If we haven't caught up completely, reset the hasMessages flag so that 615 * we see the remaining messages next time. 616 */ 617 if (stateP->nextMsgNum >= max) 618 stateP->signaled = false; 619 else 620 stateP->hasMessages = true; 621 622 LWLockRelease(SInvalReadLock); 623 return n; 624 } 625 626 /* 627 * SICleanupQueue 628 * Remove messages that have been consumed by all active backends 629 * 630 * callerHasWriteLock is true if caller is holding SInvalWriteLock. 631 * minFree is the minimum number of message slots to make free. 632 * 633 * Possible side effects of this routine include marking one or more 634 * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT 635 * to some backend that seems to be getting too far behind. 
We signal at 636 * most one backend at a time, for reasons explained at the top of the file. 637 * 638 * Caution: because we transiently release write lock when we have to signal 639 * some other backend, it is NOT guaranteed that there are still minFree 640 * free message slots at exit. Caller must recheck and perhaps retry. 641 */ 642 void 643 SICleanupQueue(bool callerHasWriteLock, int minFree) 644 { 645 SISeg *segP = shmInvalBuffer; 646 int min, 647 minsig, 648 lowbound, 649 numMsgs, 650 i; 651 ProcState *needSig = NULL; 652 653 /* Lock out all writers and readers */ 654 if (!callerHasWriteLock) 655 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); 656 LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE); 657 658 /* 659 * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the 660 * furthest-back backend that needs signaling (if any), and reset any 661 * backends that are too far back. Note that because we ignore sendOnly 662 * backends here it is possible for them to keep sending messages without 663 * a problem even when they are the only active backend. 664 */ 665 min = segP->maxMsgNum; 666 minsig = min - SIG_THRESHOLD; 667 lowbound = min - MAXNUMMESSAGES + minFree; 668 669 for (i = 0; i < segP->lastBackend; i++) 670 { 671 ProcState *stateP = &segP->procState[i]; 672 int n = stateP->nextMsgNum; 673 674 /* Ignore if inactive or already in reset state */ 675 if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly) 676 continue; 677 678 /* 679 * If we must free some space and this backend is preventing it, force 680 * him into reset state and then ignore until he catches up. 681 */ 682 if (n < lowbound) 683 { 684 stateP->resetState = true; 685 /* no point in signaling him ... 
*/ 686 continue; 687 } 688 689 /* Track the global minimum nextMsgNum */ 690 if (n < min) 691 min = n; 692 693 /* Also see who's furthest back of the unsignaled backends */ 694 if (n < minsig && !stateP->signaled) 695 { 696 minsig = n; 697 needSig = stateP; 698 } 699 } 700 segP->minMsgNum = min; 701 702 /* 703 * When minMsgNum gets really large, decrement all message counters so as 704 * to forestall overflow of the counters. This happens seldom enough that 705 * folding it into the previous loop would be a loser. 706 */ 707 if (min >= MSGNUMWRAPAROUND) 708 { 709 segP->minMsgNum -= MSGNUMWRAPAROUND; 710 segP->maxMsgNum -= MSGNUMWRAPAROUND; 711 for (i = 0; i < segP->lastBackend; i++) 712 { 713 /* we don't bother skipping inactive entries here */ 714 segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND; 715 } 716 } 717 718 /* 719 * Determine how many messages are still in the queue, and set the 720 * threshold at which we should repeat SICleanupQueue(). 721 */ 722 numMsgs = segP->maxMsgNum - segP->minMsgNum; 723 if (numMsgs < CLEANUP_MIN) 724 segP->nextThreshold = CLEANUP_MIN; 725 else 726 segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM; 727 728 /* 729 * Lastly, signal anyone who needs a catchup interrupt. Since 730 * SendProcSignal() might not be fast, we don't want to hold locks while 731 * executing it. 
732 */ 733 if (needSig) 734 { 735 pid_t his_pid = needSig->procPid; 736 BackendId his_backendId = (needSig - &segP->procState[0]) + 1; 737 738 needSig->signaled = true; 739 LWLockRelease(SInvalReadLock); 740 LWLockRelease(SInvalWriteLock); 741 elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid); 742 SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId); 743 if (callerHasWriteLock) 744 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); 745 } 746 else 747 { 748 LWLockRelease(SInvalReadLock); 749 if (!callerHasWriteLock) 750 LWLockRelease(SInvalWriteLock); 751 } 752 } 753 754 755 /* 756 * GetNextLocalTransactionId --- allocate a new LocalTransactionId 757 * 758 * We split VirtualTransactionIds into two parts so that it is possible 759 * to allocate a new one without any contention for shared memory, except 760 * for a bit of additional overhead during backend startup/shutdown. 761 * The high-order part of a VirtualTransactionId is a BackendId, and the 762 * low-order part is a LocalTransactionId, which we assign from a local 763 * counter. To avoid the risk of a VirtualTransactionId being reused 764 * within a short interval, successive procs occupying the same backend ID 765 * slot should use a consecutive sequence of local IDs, which is implemented 766 * by copying nextLocalTransactionId as seen above. 767 */ 768 LocalTransactionId 769 GetNextLocalTransactionId(void) 770 { 771 LocalTransactionId result; 772 773 /* loop to avoid returning InvalidLocalTransactionId at wraparound */ 774 do 775 { 776 result = nextLocalTransactionId++; 777 } while (!LocalTransactionIdIsValid(result)); 778 779 return result; 780 } 781