/*-------------------------------------------------------------------------
 *
 * lwlock.c
 *	  Lightweight lock manager
 *
 * Lightweight locks are intended primarily to provide mutual exclusion of
 * access to shared-memory data structures.  Therefore, they offer both
 * exclusive and shared lock modes (to support read/write and read-only
 * access to a shared object).  There are few other frammishes.  User-level
 * locking should be done with the full lock manager --- which depends on
 * LWLocks to protect its shared state.
 *
 * In addition to exclusive and shared modes, lightweight locks can be used to
 * wait until a variable changes value.  The variable is initially not set
 * when the lock is acquired with LWLockAcquire, i.e. it remains set to the
 * value it was set to when the lock was released last, and can be updated
 * without releasing the lock by calling LWLockUpdateVar.  LWLockWaitForVar
 * waits for the variable to be updated, or until the lock is free.  When
 * releasing the lock with LWLockReleaseClearVar() the value can be set to an
 * appropriate value for a free lock.  The meaning of the variable is up to
 * the caller, the lightweight lock code just assigns and compares it.
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/storage/lmgr/lwlock.c
 *
 * NOTES:
 *
 * This used to be a pretty straightforward reader-writer lock
 * implementation, in which the internal state was protected by a
 * spinlock.  Unfortunately the overhead of taking the spinlock proved to be
 * too high for workloads/locks that were taken in shared mode very
 * frequently.  Often we were spinning in the (obviously exclusive) spinlock,
 * while trying to acquire a shared lock that was actually free.
 *
 * Thus a new implementation was devised that provides wait-free shared lock
 * acquisition for locks that aren't exclusively locked.
 *
 * The basic idea is to have a single atomic variable 'lockcount' instead of
 * the formerly separate shared and exclusive counters and to use atomic
 * operations to acquire the lock.  That's fairly easy to do for plain
 * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
 * in the OS.
 *
 * For lock acquisition we use an atomic compare-and-exchange on the lockcount
 * variable.  For exclusive lock we swap in a sentinel value
 * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders.
 *
 * To release the lock we use an atomic decrement to release the lock.  If the
 * new value is zero (we get that atomically), we know we can/have to release
 * waiters.
 *
 * Obviously it is important that the sentinel value for exclusive locks
 * doesn't conflict with the maximum number of possible share lockers -
 * luckily MAX_BACKENDS makes that easily possible.
 *
 *
 * The attentive reader might have noticed that naively doing the above has a
 * glaring race condition: We try to lock using the atomic operations and
 * notice that we have to wait.  Unfortunately by the time we have finished
 * queuing, the former locker very well might have already finished its
 * work.  That's problematic because we're now stuck waiting inside the OS.
 *
 * To mitigate those races we use a two phased attempt at locking:
 *	 Phase 1: Try to do it atomically, if we succeed, nice
 *	 Phase 2: Add ourselves to the waitqueue of the lock
 *	 Phase 3: Try to grab the lock again, if we succeed, remove ourselves from
 *			  the queue
 *	 Phase 4: Sleep till wake-up, goto Phase 1
 *
 * This protects us against the problem from above as nobody can release too
 * quickly, before we're queued, since after Phase 2 we're already queued.
 * -------------------------------------------------------------------------
 */
#include "postgres.h"

#include "miscadmin.h"
#include "pgstat.h"
#include "pg_trace.h"
#include "postmaster/postmaster.h"
#include "replication/slot.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/proclist.h"
#include "storage/spin.h"
#include "utils/memutils.h"

#ifdef LWLOCK_STATS
#include "utils/hsearch.h"
#endif


/* We use the ShmemLock spinlock to protect LWLockCounter */
extern slock_t *ShmemLock;

#define LW_FLAG_HAS_WAITERS			((uint32) 1 << 30)
#define LW_FLAG_RELEASE_OK			((uint32) 1 << 29)
#define LW_FLAG_LOCKED				((uint32) 1 << 28)

#define LW_VAL_EXCLUSIVE			((uint32) 1 << 24)
#define LW_VAL_SHARED				1

#define LW_LOCK_MASK				((uint32) ((1 << 25)-1))
/* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
#define LW_SHARED_MASK				((uint32) ((1 << 24)-1))
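
/*
 * Illustrative sketch (not used by the code): how the pieces above pack into
 * the single atomic state word.  Bits 28-30 carry the LW_FLAG_* bits, bit 24
 * is the LW_VAL_EXCLUSIVE sentinel, and the low bits count shared holders.
 * For example, three shared holders yield (state & LW_SHARED_MASK) == 3 and
 * (state & LW_VAL_EXCLUSIVE) == 0, while one exclusive holder yields
 * (state & LW_LOCK_MASK) == LW_VAL_EXCLUSIVE; the lock portion and the flag
 * bits can therefore be tested independently of each other.
 */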

/*
 * This is indexed by tranche ID and stores the names of all tranches known
 * to the current backend.
 */
static const char **LWLockTrancheArray = NULL;
static int	LWLockTranchesAllocated = 0;

#define T_NAME(lock) \
	(LWLockTrancheArray[(lock)->tranche])

/*
 * This points to the main array of LWLocks in shared memory.  Backends inherit
 * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
 * where we have special measures to pass it down).
 */
LWLockPadded *MainLWLockArray = NULL;

/*
 * We use this structure to keep track of locked LWLocks for release
 * during error recovery.  Normally, only a few will be held at once, but
 * occasionally the number can be much higher; for example, the pg_buffercache
 * extension locks all buffer partitions simultaneously.
 */
#define MAX_SIMUL_LWLOCKS	200

/* struct representing the LWLocks we're holding */
typedef struct LWLockHandle
{
	LWLock	   *lock;
	LWLockMode	mode;
} LWLockHandle;

static int	num_held_lwlocks = 0;
static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];

/* struct representing the LWLock tranche request for named tranche */
typedef struct NamedLWLockTrancheRequest
{
	char		tranche_name[NAMEDATALEN];
	int			num_lwlocks;
} NamedLWLockTrancheRequest;

NamedLWLockTrancheRequest *NamedLWLockTrancheRequestArray = NULL;
static int	NamedLWLockTrancheRequestsAllocated = 0;
int			NamedLWLockTrancheRequests = 0;

NamedLWLockTranche *NamedLWLockTrancheArray = NULL;

static bool lock_named_request_allowed = true;

static void InitializeLWLocks(void);
static void RegisterLWLockTranches(void);

static inline void LWLockReportWaitStart(LWLock *lock);
static inline void LWLockReportWaitEnd(void);

#ifdef LWLOCK_STATS
typedef struct lwlock_stats_key
{
	int			tranche;
	void	   *instance;
} lwlock_stats_key;

typedef struct lwlock_stats
{
	lwlock_stats_key key;
	int			sh_acquire_count;
	int			ex_acquire_count;
	int			block_count;
	int			dequeue_self_count;
	int			spin_delay_count;
} lwlock_stats;

static HTAB *lwlock_stats_htab;
static lwlock_stats lwlock_stats_dummy;
#endif

#ifdef LOCK_DEBUG
bool		Trace_lwlocks = false;

inline static void
PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
{
	/* hide statement & context here, otherwise the log is just too verbose */
	if (Trace_lwlocks)
	{
		uint32		state = pg_atomic_read_u32(&lock->state);

		ereport(LOG,
				(errhidestmt(true),
				 errhidecontext(true),
				 errmsg_internal("%d: %s(%s %p): excl %u shared %u haswaiters %u waiters %u rOK %d",
								 MyProcPid,
								 where, T_NAME(lock), lock,
								 (state & LW_VAL_EXCLUSIVE) != 0,
								 state & LW_SHARED_MASK,
								 (state & LW_FLAG_HAS_WAITERS) != 0,
								 pg_atomic_read_u32(&lock->nwaiters),
								 (state & LW_FLAG_RELEASE_OK) != 0)));
	}
}

inline static void
LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
{
	/* hide statement & context here, otherwise the log is just too verbose */
	if (Trace_lwlocks)
	{
		ereport(LOG,
				(errhidestmt(true),
				 errhidecontext(true),
				 errmsg_internal("%s(%s %p): %s", where,
								 T_NAME(lock), lock, msg)));
	}
}

#else							/* not LOCK_DEBUG */
#define PRINT_LWDEBUG(a,b,c) ((void)0)
#define LOG_LWDEBUG(a,b,c) ((void)0)
#endif							/* LOCK_DEBUG */

#ifdef LWLOCK_STATS

static void init_lwlock_stats(void);
static void print_lwlock_stats(int code, Datum arg);
static lwlock_stats *get_lwlock_stats_entry(LWLock *lockid);

static void
init_lwlock_stats(void)
{
	HASHCTL		ctl;
	static MemoryContext lwlock_stats_cxt = NULL;
	static bool exit_registered = false;

	if (lwlock_stats_cxt != NULL)
		MemoryContextDelete(lwlock_stats_cxt);

	/*
	 * The LWLock stats will be updated within a critical section, which
	 * requires allocating new hash entries.  Allocations within a critical
	 * section are normally not allowed because running out of memory would
	 * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
	 * turned on in production, so that's an acceptable risk.  The hash entries
	 * are small, so the risk of running out of memory is minimal in practice.
	 */
	lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
											 "LWLock stats",
											 ALLOCSET_DEFAULT_SIZES);
	MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);

	MemSet(&ctl, 0, sizeof(ctl));
	ctl.keysize = sizeof(lwlock_stats_key);
	ctl.entrysize = sizeof(lwlock_stats);
	ctl.hcxt = lwlock_stats_cxt;
	lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
									HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
	if (!exit_registered)
	{
		on_shmem_exit(print_lwlock_stats, 0);
		exit_registered = true;
	}
}

static void
print_lwlock_stats(int code, Datum arg)
{
	HASH_SEQ_STATUS scan;
	lwlock_stats *lwstats;

	hash_seq_init(&scan, lwlock_stats_htab);

	/* Grab an LWLock to keep different backends from mixing reports */
	LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);

	while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
	{
		fprintf(stderr,
				"PID %d lwlock %s %p: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
				MyProcPid, LWLockTrancheArray[lwstats->key.tranche],
				lwstats->key.instance, lwstats->sh_acquire_count,
				lwstats->ex_acquire_count, lwstats->block_count,
				lwstats->spin_delay_count, lwstats->dequeue_self_count);
	}

	LWLockRelease(&MainLWLockArray[0].lock);
}

static lwlock_stats *
get_lwlock_stats_entry(LWLock *lock)
{
	lwlock_stats_key key;
	lwlock_stats *lwstats;
	bool		found;

	/*
	 * During shared memory initialization, the hash table doesn't exist yet.
	 * Stats of that phase aren't very interesting, so just collect operations
	 * on all locks in a single dummy entry.
	 */
	if (lwlock_stats_htab == NULL)
		return &lwlock_stats_dummy;

	/* Fetch or create the entry. */
	MemSet(&key, 0, sizeof(key));
	key.tranche = lock->tranche;
	key.instance = lock;
	lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
	if (!found)
	{
		lwstats->sh_acquire_count = 0;
		lwstats->ex_acquire_count = 0;
		lwstats->block_count = 0;
		lwstats->dequeue_self_count = 0;
		lwstats->spin_delay_count = 0;
	}
	return lwstats;
}
#endif							/* LWLOCK_STATS */


/*
 * Compute number of LWLocks required by named tranches.  These will be
 * allocated in the main array.
 */
static int
NumLWLocksByNamedTranches(void)
{
	int			numLocks = 0;
	int			i;

	for (i = 0; i < NamedLWLockTrancheRequests; i++)
		numLocks += NamedLWLockTrancheRequestArray[i].num_lwlocks;

	return numLocks;
}

/*
 * Compute shmem space needed for LWLocks and named tranches.
 */
Size
LWLockShmemSize(void)
{
	Size		size;
	int			i;
	int			numLocks = NUM_FIXED_LWLOCKS;

	numLocks += NumLWLocksByNamedTranches();

	/* Space for the LWLock array. */
	size = mul_size(numLocks, sizeof(LWLockPadded));

	/* Space for dynamic allocation counter, plus room for alignment. */
	size = add_size(size, sizeof(int) + LWLOCK_PADDED_SIZE);

	/* space for named tranches. */
	size = add_size(size, mul_size(NamedLWLockTrancheRequests, sizeof(NamedLWLockTranche)));

	/* space for name of each tranche. */
	for (i = 0; i < NamedLWLockTrancheRequests; i++)
		size = add_size(size, strlen(NamedLWLockTrancheRequestArray[i].tranche_name) + 1);

	/* Disallow named LWLocks' requests after startup */
	lock_named_request_allowed = false;

	return size;
}

/*
 * Allocate shmem space for the main LWLock array and all tranches and
 * initialize it.  We also register all the LWLock tranches here.
 */
void
CreateLWLocks(void)
{
	StaticAssertStmt(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
					 "MAX_BACKENDS too big for lwlock.c");

	StaticAssertStmt(sizeof(LWLock) <= LWLOCK_MINIMAL_SIZE &&
					 sizeof(LWLock) <= LWLOCK_PADDED_SIZE,
					 "Miscalculated LWLock padding");

	if (!IsUnderPostmaster)
	{
		Size		spaceLocks = LWLockShmemSize();
		int		   *LWLockCounter;
		char	   *ptr;

		/* Allocate space */
		ptr = (char *) ShmemAlloc(spaceLocks);

		/* Leave room for dynamic allocation of tranches */
		ptr += sizeof(int);

		/* Ensure desired alignment of LWLock array */
		ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;

		MainLWLockArray = (LWLockPadded *) ptr;

		/*
		 * Initialize the dynamic-allocation counter for tranches, which is
		 * stored just before the first LWLock.
		 */
		LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
		*LWLockCounter = LWTRANCHE_FIRST_USER_DEFINED;

		/* Initialize all LWLocks */
		InitializeLWLocks();
	}

	/* Register all LWLock tranches */
	RegisterLWLockTranches();
}

/*
 * Initialize LWLocks that are fixed and those belonging to named tranches.
 */
static void
InitializeLWLocks(void)
{
	int			numNamedLocks = NumLWLocksByNamedTranches();
	int			id;
	int			i;
	int			j;
	LWLockPadded *lock;

	/* Initialize all individual LWLocks in main array */
	for (id = 0, lock = MainLWLockArray; id < NUM_INDIVIDUAL_LWLOCKS; id++, lock++)
		LWLockInitialize(&lock->lock, id);

	/* Initialize buffer mapping LWLocks in main array */
	lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS;
	for (id = 0; id < NUM_BUFFER_PARTITIONS; id++, lock++)
		LWLockInitialize(&lock->lock, LWTRANCHE_BUFFER_MAPPING);

	/* Initialize lmgrs' LWLocks in main array */
	lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS + NUM_BUFFER_PARTITIONS;
	for (id = 0; id < NUM_LOCK_PARTITIONS; id++, lock++)
		LWLockInitialize(&lock->lock, LWTRANCHE_LOCK_MANAGER);

	/* Initialize predicate lmgrs' LWLocks in main array */
	lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS +
		NUM_BUFFER_PARTITIONS + NUM_LOCK_PARTITIONS;
	for (id = 0; id < NUM_PREDICATELOCK_PARTITIONS; id++, lock++)
		LWLockInitialize(&lock->lock, LWTRANCHE_PREDICATE_LOCK_MANAGER);

	/* Initialize named tranches. */
	if (NamedLWLockTrancheRequests > 0)
	{
		char	   *trancheNames;

		NamedLWLockTrancheArray = (NamedLWLockTranche *)
			&MainLWLockArray[NUM_FIXED_LWLOCKS + numNamedLocks];

		trancheNames = (char *) NamedLWLockTrancheArray +
			(NamedLWLockTrancheRequests * sizeof(NamedLWLockTranche));
		lock = &MainLWLockArray[NUM_FIXED_LWLOCKS];

		for (i = 0; i < NamedLWLockTrancheRequests; i++)
		{
			NamedLWLockTrancheRequest *request;
			NamedLWLockTranche *tranche;
			char	   *name;

			request = &NamedLWLockTrancheRequestArray[i];
			tranche = &NamedLWLockTrancheArray[i];

			name = trancheNames;
			trancheNames += strlen(request->tranche_name) + 1;
			strcpy(name, request->tranche_name);
			tranche->trancheId = LWLockNewTrancheId();
			tranche->trancheName = name;

			for (j = 0; j < request->num_lwlocks; j++, lock++)
				LWLockInitialize(&lock->lock, tranche->trancheId);
		}
	}
}
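
/*
 * For orientation (a sketch, not relied on by the code): after the loops
 * above, the main array holds the NUM_INDIVIDUAL_LWLOCKS individual locks,
 * followed by NUM_BUFFER_PARTITIONS buffer-mapping locks, NUM_LOCK_PARTITIONS
 * lock-manager locks and NUM_PREDICATELOCK_PARTITIONS predicate-lock-manager
 * locks (which together make up the NUM_FIXED_LWLOCKS entries), with any
 * named-tranche locks appended after that.  GetNamedLWLockTranche() depends
 * on exactly this ordering.
 */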

/*
 * Register named tranches and tranches for fixed LWLocks.
 */
static void
RegisterLWLockTranches(void)
{
	int			i;

	if (LWLockTrancheArray == NULL)
	{
		LWLockTranchesAllocated = 128;
		LWLockTrancheArray = (const char **)
			MemoryContextAllocZero(TopMemoryContext,
								   LWLockTranchesAllocated * sizeof(char *));
		Assert(LWLockTranchesAllocated >= LWTRANCHE_FIRST_USER_DEFINED);
	}

	for (i = 0; i < NUM_INDIVIDUAL_LWLOCKS; ++i)
		LWLockRegisterTranche(i, MainLWLockNames[i]);

	LWLockRegisterTranche(LWTRANCHE_BUFFER_MAPPING, "buffer_mapping");
	LWLockRegisterTranche(LWTRANCHE_LOCK_MANAGER, "lock_manager");
	LWLockRegisterTranche(LWTRANCHE_PREDICATE_LOCK_MANAGER,
						  "predicate_lock_manager");
	LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA,
						  "parallel_query_dsa");
	LWLockRegisterTranche(LWTRANCHE_SESSION_DSA,
						  "session_dsa");
	LWLockRegisterTranche(LWTRANCHE_SESSION_RECORD_TABLE,
						  "session_record_table");
	LWLockRegisterTranche(LWTRANCHE_SESSION_TYPMOD_TABLE,
						  "session_typmod_table");
	LWLockRegisterTranche(LWTRANCHE_SHARED_TUPLESTORE,
						  "shared_tuplestore");
	LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
	LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append");
	LWLockRegisterTranche(LWTRANCHE_PARALLEL_HASH_JOIN, "parallel_hash_join");

	/* Register named tranches. */
	for (i = 0; i < NamedLWLockTrancheRequests; i++)
		LWLockRegisterTranche(NamedLWLockTrancheArray[i].trancheId,
							  NamedLWLockTrancheArray[i].trancheName);
}

/*
 * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
 */
void
InitLWLockAccess(void)
{
#ifdef LWLOCK_STATS
	init_lwlock_stats();
#endif
}

/*
 * GetNamedLWLockTranche - returns the base address of LWLock from the
 *		specified tranche.
 *
 * Caller needs to retrieve the requested number of LWLocks starting from
 * the base lock address returned by this API.  This can be used for
 * tranches that are requested by using RequestNamedLWLockTranche() API.
 */
LWLockPadded *
GetNamedLWLockTranche(const char *tranche_name)
{
	int			lock_pos;
	int			i;

	/*
	 * Obtain the position of base address of LWLock belonging to requested
	 * tranche_name in MainLWLockArray.  LWLocks for named tranches are placed
	 * in MainLWLockArray after fixed locks.
	 */
	lock_pos = NUM_FIXED_LWLOCKS;
	for (i = 0; i < NamedLWLockTrancheRequests; i++)
	{
		if (strcmp(NamedLWLockTrancheRequestArray[i].tranche_name,
				   tranche_name) == 0)
			return &MainLWLockArray[lock_pos];

		lock_pos += NamedLWLockTrancheRequestArray[i].num_lwlocks;
	}

	if (i >= NamedLWLockTrancheRequests)
		elog(ERROR, "requested tranche is not registered");

	/* just to keep compiler quiet */
	return NULL;
}

/*
 * Allocate a new tranche ID.
 */
int
LWLockNewTrancheId(void)
{
	int			result;
	int		   *LWLockCounter;

	LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
	SpinLockAcquire(ShmemLock);
	result = (*LWLockCounter)++;
	SpinLockRelease(ShmemLock);

	return result;
}

/*
 * Register a tranche ID in the lookup table for the current process.  This
 * routine will save a pointer to the tranche name passed as an argument,
 * so the name should be allocated in a backend-lifetime context
 * (TopMemoryContext, static variable, or similar).
 */
void
LWLockRegisterTranche(int tranche_id, const char *tranche_name)
{
	Assert(LWLockTrancheArray != NULL);

	if (tranche_id >= LWLockTranchesAllocated)
	{
		int			i = LWLockTranchesAllocated;
		int			j = LWLockTranchesAllocated;

		while (i <= tranche_id)
			i *= 2;

		LWLockTrancheArray = (const char **)
			repalloc(LWLockTrancheArray, i * sizeof(char *));
		LWLockTranchesAllocated = i;
		while (j < LWLockTranchesAllocated)
			LWLockTrancheArray[j++] = NULL;
	}

	LWLockTrancheArray[tranche_id] = tranche_name;
}

/*
 * RequestNamedLWLockTranche
 *		Request that extra LWLocks be allocated during postmaster
 *		startup.
 *
 * This is only useful for extensions if called from the _PG_init hook
 * of a library that is loaded into the postmaster via
 * shared_preload_libraries.  Once shared memory has been allocated, calls
 * will be ignored.  (We could raise an error, but it seems better to make
 * it a no-op, so that libraries containing such calls can be reloaded if
 * needed.)
 */
void
RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
{
	NamedLWLockTrancheRequest *request;

	if (IsUnderPostmaster || !lock_named_request_allowed)
		return;					/* too late */

	if (NamedLWLockTrancheRequestArray == NULL)
	{
		NamedLWLockTrancheRequestsAllocated = 16;
		NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
			MemoryContextAlloc(TopMemoryContext,
							   NamedLWLockTrancheRequestsAllocated
							   * sizeof(NamedLWLockTrancheRequest));
	}

	if (NamedLWLockTrancheRequests >= NamedLWLockTrancheRequestsAllocated)
	{
		int			i = NamedLWLockTrancheRequestsAllocated;

		while (i <= NamedLWLockTrancheRequests)
			i *= 2;

		NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
			repalloc(NamedLWLockTrancheRequestArray,
					 i * sizeof(NamedLWLockTrancheRequest));
		NamedLWLockTrancheRequestsAllocated = i;
	}

	request = &NamedLWLockTrancheRequestArray[NamedLWLockTrancheRequests];
	Assert(strlen(tranche_name) + 1 < NAMEDATALEN);
	StrNCpy(request->tranche_name, tranche_name, NAMEDATALEN);
	request->num_lwlocks = num_lwlocks;
	NamedLWLockTrancheRequests++;
}
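
/*
 * A minimal usage sketch (illustrative only, not part of this file): an
 * extension loaded via shared_preload_libraries would typically request its
 * locks from _PG_init() and pick them up again from its shared-memory
 * startup code.  The tranche name "my_ext" and the lock count are made up
 * for the example.
 *
 *		void
 *		_PG_init(void)
 *		{
 *			RequestNamedLWLockTranche("my_ext", 1);
 *		}
 *
 *		// later, e.g. in a shmem_startup_hook:
 *		LWLockPadded *locks = GetNamedLWLockTranche("my_ext");
 *
 *		LWLockAcquire(&locks[0].lock, LW_EXCLUSIVE);
 *		// ... touch the extension's shared state ...
 *		LWLockRelease(&locks[0].lock);
 */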

/*
 * LWLockInitialize - initialize a new lwlock; it's initially unlocked
 */
void
LWLockInitialize(LWLock *lock, int tranche_id)
{
	pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
#ifdef LOCK_DEBUG
	pg_atomic_init_u32(&lock->nwaiters, 0);
#endif
	lock->tranche = tranche_id;
	proclist_init(&lock->waiters);
}

/*
 * Report start of wait event for light-weight locks.
 *
 * This function will be used by all the light-weight lock calls which
 * needs to wait to acquire the lock.  This function distinguishes wait
 * event based on tranche and lock id.
 */
static inline void
LWLockReportWaitStart(LWLock *lock)
{
	pgstat_report_wait_start(PG_WAIT_LWLOCK | lock->tranche);
}

/*
 * Report end of wait event for light-weight locks.
 */
static inline void
LWLockReportWaitEnd(void)
{
	pgstat_report_wait_end();
}

/*
 * Return an identifier for an LWLock based on the wait class and event.
 */
const char *
GetLWLockIdentifier(uint32 classId, uint16 eventId)
{
	Assert(classId == PG_WAIT_LWLOCK);

	/*
	 * It is quite possible that user has registered tranche in one of the
	 * backends (e.g. by allocating lwlocks in dynamic shared memory) but not
	 * all of them, so we can't assume the tranche is registered here.
	 */
	if (eventId >= LWLockTranchesAllocated ||
		LWLockTrancheArray[eventId] == NULL)
		return "extension";

	return LWLockTrancheArray[eventId];
}

/*
 * Internal function that tries to atomically acquire the lwlock in the passed
 * in mode.
 *
 * This function will not block waiting for a lock to become free - that's the
 * callers job.
 *
 * Returns true if the lock isn't free and we need to wait.
 */
static bool
LWLockAttemptLock(LWLock *lock, LWLockMode mode)
{
	uint32		old_state;

	AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);

	/*
	 * Read once outside the loop, later iterations will get the newer value
	 * via compare & exchange.
	 */
	old_state = pg_atomic_read_u32(&lock->state);

	/* loop until we've determined whether we could acquire the lock or not */
	while (true)
	{
		uint32		desired_state;
		bool		lock_free;

		desired_state = old_state;

		if (mode == LW_EXCLUSIVE)
		{
			lock_free = (old_state & LW_LOCK_MASK) == 0;
			if (lock_free)
				desired_state += LW_VAL_EXCLUSIVE;
		}
		else
		{
			lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
			if (lock_free)
				desired_state += LW_VAL_SHARED;
		}

		/*
		 * Attempt to swap in the state we are expecting.  If we didn't see
		 * the lock as free, that's just the old value.  If we saw it as
		 * free, we'll attempt to mark it acquired.  The reason that we
		 * always swap in the value is that this doubles as a memory barrier.
		 * We could try to be smarter and only swap in values if we saw the
		 * lock as free, but benchmarks haven't shown it to be beneficial so
		 * far.
		 *
		 * Retry if the value changed since we last looked at it.
		 */
		if (pg_atomic_compare_exchange_u32(&lock->state,
										   &old_state, desired_state))
		{
			if (lock_free)
			{
				/* Great!  Got the lock. */
#ifdef LOCK_DEBUG
				if (mode == LW_EXCLUSIVE)
					lock->owner = MyProc;
#endif
				return false;
			}
			else
				return true;	/* somebody else has the lock */
		}
	}
	pg_unreachable();
}

/*
 * Lock the LWLock's wait list against concurrent activity.
 *
 * NB: even though the wait list is locked, non-conflicting lock operations
 * may still happen concurrently.
 *
 * Time spent holding mutex should be short!
 */
static void
LWLockWaitListLock(LWLock *lock)
{
	uint32		old_state;
#ifdef LWLOCK_STATS
	lwlock_stats *lwstats;
	uint32		delays = 0;

	lwstats = get_lwlock_stats_entry(lock);
#endif

	while (true)
	{
		/* always try once to acquire lock directly */
		old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
		if (!(old_state & LW_FLAG_LOCKED))
			break;				/* got lock */

		/* and then spin without atomic operations until lock is released */
		{
			SpinDelayStatus delayStatus;

			init_local_spin_delay(&delayStatus);

			while (old_state & LW_FLAG_LOCKED)
			{
				perform_spin_delay(&delayStatus);
				old_state = pg_atomic_read_u32(&lock->state);
			}
#ifdef LWLOCK_STATS
			delays += delayStatus.delays;
#endif
			finish_spin_delay(&delayStatus);
		}

		/*
		 * Retry. The lock might obviously already be re-acquired by the time
		 * we're attempting to get it again.
		 */
	}

#ifdef LWLOCK_STATS
	lwstats->spin_delay_count += delays;
#endif
}

/*
 * Unlock the LWLock's wait list.
 *
 * Note that it can be more efficient to manipulate flags and release the
 * locks in a single atomic operation.
 */
static void
LWLockWaitListUnlock(LWLock *lock)
{
	uint32		old_state PG_USED_FOR_ASSERTS_ONLY;

	old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);

	Assert(old_state & LW_FLAG_LOCKED);
}

/*
 * Wakeup all the lockers that currently have a chance to acquire the lock.
 */
static void
LWLockWakeup(LWLock *lock)
{
	bool		new_release_ok;
	bool		wokeup_somebody = false;
	proclist_head wakeup;
	proclist_mutable_iter iter;

	proclist_init(&wakeup);

	new_release_ok = true;

	/* lock wait list while collecting backends to wake up */
	LWLockWaitListLock(lock);

	proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
	{
		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);

		if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
			continue;

		proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
		proclist_push_tail(&wakeup, iter.cur, lwWaitLink);

		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
		{
			/*
			 * Prevent additional wakeups until retryer gets to run. Backends
			 * that are just waiting for the lock to become free don't retry
			 * automatically.
			 */
			new_release_ok = false;

			/*
			 * Don't wakeup (further) exclusive locks.
			 */
			wokeup_somebody = true;
		}

		/*
		 * Once we've woken up an exclusive lock, there's no point in waking
		 * up anybody else.
		 */
		if (waiter->lwWaitMode == LW_EXCLUSIVE)
			break;
	}

	Assert(proclist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);

	/* unset required flags, and release lock, in one fell swoop */
	{
		uint32		old_state;
		uint32		desired_state;

		old_state = pg_atomic_read_u32(&lock->state);
		while (true)
		{
			desired_state = old_state;

			/* compute desired flags */

			if (new_release_ok)
				desired_state |= LW_FLAG_RELEASE_OK;
			else
				desired_state &= ~LW_FLAG_RELEASE_OK;

			if (proclist_is_empty(&wakeup))
				desired_state &= ~LW_FLAG_HAS_WAITERS;

			desired_state &= ~LW_FLAG_LOCKED;	/* release lock */

			if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
											   desired_state))
				break;
		}
	}

	/* Awaken any waiters I removed from the queue. */
	proclist_foreach_modify(iter, &wakeup, lwWaitLink)
	{
		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);

		LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
		proclist_delete(&wakeup, iter.cur, lwWaitLink);

		/*
		 * Guarantee that lwWaiting being unset only becomes visible once the
		 * unlink from the link has completed. Otherwise the target backend
		 * could be woken up for other reason and enqueue for a new lock - if
		 * that happens before the list unlink happens, the list would end up
		 * being corrupted.
		 *
		 * The barrier pairs with the LWLockWaitListLock() when enqueuing for
		 * another lock.
		 */
		pg_write_barrier();
		waiter->lwWaiting = false;
		PGSemaphoreUnlock(waiter->sem);
	}
}

/*
 * Add ourselves to the end of the queue.
 *
 * NB: Mode can be LW_WAIT_UNTIL_FREE here!
 */
static void
LWLockQueueSelf(LWLock *lock, LWLockMode mode)
{
	/*
	 * If we don't have a PGPROC structure, there's no way to wait. This
	 * should never occur, since MyProc should only be null during shared
	 * memory initialization.
	 */
	if (MyProc == NULL)
		elog(PANIC, "cannot wait without a PGPROC structure");

	if (MyProc->lwWaiting)
		elog(PANIC, "queueing for lock while waiting on another one");

	LWLockWaitListLock(lock);

	/* setting the flag is protected by the spinlock */
	pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);

	MyProc->lwWaiting = true;
	MyProc->lwWaitMode = mode;

	/* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
	if (mode == LW_WAIT_UNTIL_FREE)
		proclist_push_head(&lock->waiters, MyProc->pgprocno, lwWaitLink);
	else
		proclist_push_tail(&lock->waiters, MyProc->pgprocno, lwWaitLink);

	/* Can release the mutex now */
	LWLockWaitListUnlock(lock);

#ifdef LOCK_DEBUG
	pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
#endif

}

/*
 * Remove ourselves from the waitlist.
 *
 * This is used if we queued ourselves because we thought we needed to sleep
 * but, after further checking, we discovered that we don't actually need to
 * do so.
 */
static void
LWLockDequeueSelf(LWLock *lock)
{
	bool		found = false;
	proclist_mutable_iter iter;

#ifdef LWLOCK_STATS
	lwlock_stats *lwstats;

	lwstats = get_lwlock_stats_entry(lock);

	lwstats->dequeue_self_count++;
#endif

	LWLockWaitListLock(lock);

	/*
	 * Can't just remove ourselves from the list, but we need to iterate over
	 * all entries as somebody else could have dequeued us.
	 */
	proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
	{
		if (iter.cur == MyProc->pgprocno)
		{
			found = true;
			proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
			break;
		}
	}

	if (proclist_is_empty(&lock->waiters) &&
		(pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
	{
		pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
	}

	/* XXX: combine with fetch_and above? */
	LWLockWaitListUnlock(lock);

	/* clear waiting state again, nice for debugging */
	if (found)
		MyProc->lwWaiting = false;
	else
	{
		int			extraWaits = 0;

		/*
		 * Somebody else dequeued us and has or will wake us up. Deal with the
		 * superfluous absorption of a wakeup.
		 */

		/*
		 * Reset releaseOk if somebody woke us before we removed ourselves -
		 * they'll have set it to false.
		 */
		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

		/*
		 * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
		 * get reset at some inconvenient point later. Most of the time this
		 * will immediately return.
		 */
		for (;;)
		{
			PGSemaphoreLock(MyProc->sem);
			if (!MyProc->lwWaiting)
				break;
			extraWaits++;
		}

		/*
		 * Fix the process wait semaphore's count for any absorbed wakeups.
		 */
		while (extraWaits-- > 0)
			PGSemaphoreUnlock(MyProc->sem);
	}

#ifdef LOCK_DEBUG
	{
		/* not waiting anymore */
		uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

		Assert(nwaiters < MAX_BACKENDS);
	}
#endif
}

/*
 * LWLockAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, sleep until it is.  Returns true if the lock
 * was available immediately, false if we had to sleep.
 *
 * Side effect: cancel/die interrupts are held off until lock release.
 */
bool
LWLockAcquire(LWLock *lock, LWLockMode mode)
{
	PGPROC	   *proc = MyProc;
	bool		result = true;
	int			extraWaits = 0;
#ifdef LWLOCK_STATS
	lwlock_stats *lwstats;

	lwstats = get_lwlock_stats_entry(lock);
#endif

	AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);

	PRINT_LWDEBUG("LWLockAcquire", lock, mode);

#ifdef LWLOCK_STATS
	/* Count lock acquisition attempts */
	if (mode == LW_EXCLUSIVE)
		lwstats->ex_acquire_count++;
	else
		lwstats->sh_acquire_count++;
#endif							/* LWLOCK_STATS */

	/*
	 * We can't wait if we haven't got a PGPROC.  This should only occur
	 * during bootstrap or shared memory initialization.  Put an Assert here
	 * to catch unsafe coding practices.
	 */
	Assert(!(proc == NULL && IsUnderPostmaster));

	/* Ensure we will have room to remember the lock */
	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
		elog(ERROR, "too many LWLocks taken");

	/*
	 * Lock out cancel/die interrupts until we exit the code section protected
	 * by the LWLock.  This ensures that interrupts will not interfere with
	 * manipulations of data structures in shared memory.
	 */
	HOLD_INTERRUPTS();

	/*
	 * Loop here to try to acquire lock after each time we are signaled by
	 * LWLockRelease.
	 *
	 * NOTE: it might seem better to have LWLockRelease actually grant us the
	 * lock, rather than retrying and possibly having to go back to sleep. But
	 * in practice that is no good because it means a process swap for every
	 * lock acquisition when two or more processes are contending for the same
	 * lock.  Since LWLocks are normally used to protect not-very-long
	 * sections of computation, a process needs to be able to acquire and
	 * release the same lock many times during a single CPU time slice, even
	 * in the presence of contention.  The efficiency of being able to do that
	 * outweighs the inefficiency of sometimes wasting a process dispatch
	 * cycle because the lock is not free when a released waiter finally gets
	 * to run.  See pgsql-hackers archives for 29-Dec-01.
	 */
	for (;;)
	{
		bool		mustwait;

		/*
		 * Try to grab the lock the first time, we're not in the waitqueue
		 * yet/anymore.
		 */
		mustwait = LWLockAttemptLock(lock, mode);

		if (!mustwait)
		{
			LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
			break;				/* got the lock */
		}

		/*
		 * Ok, at this point we couldn't grab the lock on the first try. We
		 * cannot simply queue ourselves to the end of the list and wait to be
		 * woken up because by now the lock could long have been released.
		 * Instead add us to the queue and try to grab the lock again. If we
		 * succeed we need to revert the queuing and be happy, otherwise we
		 * recheck the lock. If we still couldn't grab it, we know that the
		 * other locker will see our queue entries when releasing since they
		 * existed before we checked for the lock.
		 */

		/* add to the queue */
		LWLockQueueSelf(lock, mode);

		/* we're now guaranteed to be woken up if necessary */
		mustwait = LWLockAttemptLock(lock, mode);

		/* ok, grabbed the lock the second time round, need to undo queueing */
		if (!mustwait)
		{
			LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");

			LWLockDequeueSelf(lock);
			break;
		}

		/*
		 * Wait until awakened.
		 *
		 * It is possible that we get awakened for a reason other than being
		 * signaled by LWLockRelease.  If so, loop back and wait again.  Once
		 * we've gotten the LWLock, re-increment the sema by the number of
		 * additional signals received.
		 */
		LOG_LWDEBUG("LWLockAcquire", lock, "waiting");

#ifdef LWLOCK_STATS
		lwstats->block_count++;
#endif

		LWLockReportWaitStart(lock);
		TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);

		for (;;)
		{
			PGSemaphoreLock(proc->sem);
			if (!proc->lwWaiting)
				break;
			extraWaits++;
		}

		/* Retrying, allow LWLockRelease to release waiters again. */
		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

#ifdef LOCK_DEBUG
		{
			/* not waiting anymore */
			uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

			Assert(nwaiters < MAX_BACKENDS);
		}
#endif

		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
		LWLockReportWaitEnd();

		LOG_LWDEBUG("LWLockAcquire", lock, "awakened");

		/* Now loop back and try to acquire lock again. */
		result = false;
	}

	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);

	/* Add lock to list of locks held by this backend */
	held_lwlocks[num_held_lwlocks].lock = lock;
	held_lwlocks[num_held_lwlocks++].mode = mode;

	/*
	 * Fix the process wait semaphore's count for any absorbed wakeups.
	 */
	while (extraWaits-- > 0)
		PGSemaphoreUnlock(proc->sem);

	return result;
}

/*
 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, return false with no side-effects.
 *
 * If successful, cancel/die interrupts are held off until lock release.
 */
bool
LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
{
	bool		mustwait;

	AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);

	PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);

	/* Ensure we will have room to remember the lock */
	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
		elog(ERROR, "too many LWLocks taken");

	/*
	 * Lock out cancel/die interrupts until we exit the code section protected
	 * by the LWLock.  This ensures that interrupts will not interfere with
	 * manipulations of data structures in shared memory.
	 */
	HOLD_INTERRUPTS();

	/* Check for the lock */
	mustwait = LWLockAttemptLock(lock, mode);

	if (mustwait)
	{
		/* Failed to get lock, so release interrupt holdoff */
		RESUME_INTERRUPTS();

		LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), mode);
	}
	else
	{
		/* Add lock to list of locks held by this backend */
		held_lwlocks[num_held_lwlocks].lock = lock;
		held_lwlocks[num_held_lwlocks++].mode = mode;
		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), mode);
	}
	return !mustwait;
}

/*
 * LWLockAcquireOrWait - Acquire lock, or wait until it's free
 *
 * The semantics of this function are a bit funky.  If the lock is currently
 * free, it is acquired in the given mode, and the function returns true.  If
 * the lock isn't immediately free, the function waits until it is released
 * and returns false, but does not acquire the lock.
 *
 * This is currently used for WALWriteLock: when a backend flushes the WAL,
 * holding WALWriteLock, it can flush the commit records of many other
 * backends as a side-effect.  Those other backends need to wait until the
 * flush finishes, but don't need to acquire the lock anymore.  They can just
 * wake up, observe that their records have already been flushed, and return.
 */
bool
LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
{
	PGPROC	   *proc = MyProc;
	bool		mustwait;
	int			extraWaits = 0;
#ifdef LWLOCK_STATS
	lwlock_stats *lwstats;

	lwstats = get_lwlock_stats_entry(lock);
#endif

	Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);

	PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);

	/* Ensure we will have room to remember the lock */
	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
		elog(ERROR, "too many LWLocks taken");

	/*
	 * Lock out cancel/die interrupts until we exit the code section protected
	 * by the LWLock.  This ensures that interrupts will not interfere with
	 * manipulations of data structures in shared memory.
	 */
	HOLD_INTERRUPTS();

	/*
	 * NB: We're using nearly the same twice-in-a-row lock acquisition
	 * protocol as LWLockAcquire(). Check its comments for details.
	 */
	mustwait = LWLockAttemptLock(lock, mode);

	if (mustwait)
	{
		LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);

		mustwait = LWLockAttemptLock(lock, mode);

		if (mustwait)
		{
			/*
			 * Wait until awakened.  Like in LWLockAcquire, be prepared for
			 * bogus wakeups.
			 */
			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");

#ifdef LWLOCK_STATS
			lwstats->block_count++;
#endif

			LWLockReportWaitStart(lock);
			TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);

			for (;;)
			{
				PGSemaphoreLock(proc->sem);
				if (!proc->lwWaiting)
					break;
				extraWaits++;
			}

#ifdef LOCK_DEBUG
			{
				/* not waiting anymore */
				uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

				Assert(nwaiters < MAX_BACKENDS);
			}
#endif
			TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
			LWLockReportWaitEnd();

			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
		}
		else
		{
			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");

			/*
			 * Got lock in the second attempt, undo queueing. We need to treat
			 * this as having successfully acquired the lock, otherwise we'd
			 * not necessarily wake up people we've prevented from acquiring
			 * the lock.
			 */
			LWLockDequeueSelf(lock);
		}
	}

	/*
	 * Fix the process wait semaphore's count for any absorbed wakeups.
	 */
	while (extraWaits-- > 0)
		PGSemaphoreUnlock(proc->sem);

	if (mustwait)
	{
		/* Failed to get lock, so release interrupt holdoff */
		RESUME_INTERRUPTS();
		LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
		TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), mode);
	}
	else
	{
		LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
		/* Add lock to list of locks held by this backend */
		held_lwlocks[num_held_lwlocks].lock = lock;
		held_lwlocks[num_held_lwlocks++].mode = mode;
		TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), mode);
	}

	return !mustwait;
}

/*
 * Does the lwlock in its current state need to wait for the variable value to
 * change?
 *
 * If we don't need to wait, and it's because the value of the variable has
 * changed, store the current value in newval.
 *
 * *result is set to true if the lock was free, and false otherwise.
 */
static bool
LWLockConflictsWithVar(LWLock *lock,
					   uint64 *valptr, uint64 oldval, uint64 *newval,
					   bool *result)
{
	bool		mustwait;
	uint64		value;

	/*
	 * Test first to see if the slot is free right now.
	 *
	 * XXX: the caller uses a spinlock before this, so we don't need a memory
	 * barrier here as far as the current usage is concerned.  But that might
	 * not be safe in general.
	 */
	mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;

	if (!mustwait)
	{
		*result = true;
		return false;
	}

	*result = false;

	/*
	 * Read value using the lwlock's wait list lock, as we can't generally
	 * rely on atomic 64 bit reads/stores.  TODO: On platforms with a way to
	 * do atomic 64 bit reads/writes the spinlock should be optimized away.
	 */
	LWLockWaitListLock(lock);
	value = *valptr;
	LWLockWaitListUnlock(lock);

	if (value != oldval)
	{
		mustwait = false;
		*newval = value;
	}
	else
	{
		mustwait = true;
	}

	return mustwait;
}

/*
 * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
 *
 * If the lock is held and *valptr equals oldval, waits until the lock is
 * either freed, or the lock holder updates *valptr by calling
 * LWLockUpdateVar.  If the lock is free on exit (immediately or after
 * waiting), returns true.  If the lock is still held, but *valptr no longer
 * matches oldval, returns false and sets *newval to the current value in
 * *valptr.
 *
 * Note: this function ignores shared lock holders; if the lock is held
 * in shared mode, returns 'true'.
 */
bool
LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
{
	PGPROC	   *proc = MyProc;
	int			extraWaits = 0;
	bool		result = false;
#ifdef LWLOCK_STATS
	lwlock_stats *lwstats;

	lwstats = get_lwlock_stats_entry(lock);
#endif

	PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);

	/*
	 * Lock out cancel/die interrupts while we sleep on the lock.  There is no
	 * cleanup mechanism to remove us from the wait queue if we got
	 * interrupted.
	 */
	HOLD_INTERRUPTS();

	/*
	 * Loop here to check the lock's status after each time we are signaled.
	 */
	for (;;)
	{
		bool		mustwait;

		mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
										  &result);

		if (!mustwait)
			break;				/* the lock was free or value didn't match */

		/*
		 * Add myself to wait queue. Note that this is racy, somebody else
		 * could wakeup before we're finished queuing. NB: We're using nearly
		 * the same twice-in-a-row lock acquisition protocol as
		 * LWLockAcquire(). Check its comments for details. The only
		 * difference is that we also have to check the variable's values when
		 * checking the state of the lock.
		 */
		LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);

		/*
		 * Set RELEASE_OK flag, to make sure we get woken up as soon as the
		 * lock is released.
		 */
		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

		/*
		 * We're now guaranteed to be woken up if necessary. Recheck the lock
		 * and variables state.
		 */
		mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
										  &result);

		/* Ok, no conflict after we queued ourselves. Undo queueing. */
		if (!mustwait)
		{
			LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");

			LWLockDequeueSelf(lock);
			break;
		}

		/*
		 * Wait until awakened.
		 *
		 * It is possible that we get awakened for a reason other than being
		 * signaled by LWLockRelease.  If so, loop back and wait again.  Once
		 * we've gotten the LWLock, re-increment the sema by the number of
		 * additional signals received.
		 */
		LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");

#ifdef LWLOCK_STATS
		lwstats->block_count++;
#endif

		LWLockReportWaitStart(lock);
		TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), LW_EXCLUSIVE);

		for (;;)
		{
			PGSemaphoreLock(proc->sem);
			if (!proc->lwWaiting)
				break;
			extraWaits++;
		}

#ifdef LOCK_DEBUG
		{
			/* not waiting anymore */
			uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

			Assert(nwaiters < MAX_BACKENDS);
		}
#endif

		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), LW_EXCLUSIVE);
		LWLockReportWaitEnd();

		LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");

		/* Now loop back and check the status of the lock again. */
	}

	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), LW_EXCLUSIVE);

	/*
	 * Fix the process wait semaphore's count for any absorbed wakeups.
	 */
	while (extraWaits-- > 0)
		PGSemaphoreUnlock(proc->sem);

	/*
	 * Now okay to allow cancel/die interrupts.
	 */
	RESUME_INTERRUPTS();

	return result;
}


/*
 * LWLockUpdateVar - Update a variable and wake up waiters atomically
 *
 * Sets *valptr to 'val', and wakes up all processes waiting for us with
 * LWLockWaitForVar().  Setting the value and waking up the processes happen
 * atomically so that any process calling LWLockWaitForVar() on the same lock
 * is guaranteed to see the new value, and act accordingly.
 *
 * The caller must be holding the lock in exclusive mode.
 */
void
LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
{
	proclist_head wakeup;
	proclist_mutable_iter iter;

	PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);

	proclist_init(&wakeup);

	LWLockWaitListLock(lock);

	Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);

	/* Update the lock's value */
	*valptr = val;

	/*
	 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
	 * up. They are always in the front of the queue.
	 */
	proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
	{
		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);

		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
			break;

		proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
		proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
	}

	/* We are done updating shared state of the lock itself. */
	LWLockWaitListUnlock(lock);

	/*
	 * Awaken any waiters I removed from the queue.
	 */
	proclist_foreach_modify(iter, &wakeup, lwWaitLink)
	{
		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);

		proclist_delete(&wakeup, iter.cur, lwWaitLink);
		/* check comment in LWLockWakeup() about this barrier */
		pg_write_barrier();
		waiter->lwWaiting = false;
		PGSemaphoreUnlock(waiter->sem);
	}
}
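
/*
 * A rough usage sketch for the variable-wait machinery above (illustrative
 * only; 'lock' and 'sharedval' stand for some lock and its associated shared
 * uint64, not names from this file):
 *
 *		// holder side: publish progress without releasing the lock
 *		LWLockAcquire(lock, LW_EXCLUSIVE);
 *		LWLockUpdateVar(lock, &sharedval, newprogress);
 *		...
 *		LWLockReleaseClearVar(lock, &sharedval, 0);
 *
 *		// waiter side: sleep until the lock is free or the value moves on
 *		while (!LWLockWaitForVar(lock, &sharedval, lastseen, &lastseen))
 *			;	// still held, but lastseen has been advanced
 */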


/*
 * LWLockRelease - release a previously acquired lock
 */
void
LWLockRelease(LWLock *lock)
{
	LWLockMode	mode;
	uint32		oldstate;
	bool		check_waiters;
	int			i;

	/*
	 * Remove lock from list of locks held.  Usually, but not always, it will
	 * be the latest-acquired lock; so search array backwards.
	 */
	for (i = num_held_lwlocks; --i >= 0;)
		if (lock == held_lwlocks[i].lock)
			break;

	if (i < 0)
		elog(ERROR, "lock %s is not held", T_NAME(lock));

	mode = held_lwlocks[i].mode;

	num_held_lwlocks--;
	for (; i < num_held_lwlocks; i++)
		held_lwlocks[i] = held_lwlocks[i + 1];

	PRINT_LWDEBUG("LWLockRelease", lock, mode);

	/*
	 * Release my hold on lock, after that it can immediately be acquired by
	 * others, even if we still have to wakeup other waiters.
	 */
	if (mode == LW_EXCLUSIVE)
		oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
	else
		oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);

	/* nobody else can have that kind of lock */
	Assert(!(oldstate & LW_VAL_EXCLUSIVE));


	/*
	 * We're still waiting for backends to get scheduled, don't wake them up
	 * again.
	 */
	if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
		(LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
		(oldstate & LW_LOCK_MASK) == 0)
		check_waiters = true;
	else
		check_waiters = false;

	/*
	 * As waking up waiters requires the spinlock to be acquired, only do so
	 * if necessary.
	 */
	if (check_waiters)
	{
		/* XXX: remove before commit? */
		LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
		LWLockWakeup(lock);
	}

	TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));

	/*
	 * Now okay to allow cancel/die interrupts.
	 */
	RESUME_INTERRUPTS();
}

/*
 * LWLockReleaseClearVar - release a previously acquired lock, reset variable
 */
void
LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
{
	LWLockWaitListLock(lock);

	/*
	 * Set the variable's value before releasing the lock, that prevents a
	 * race condition wherein a new locker acquires the lock, but hasn't yet
	 * set the variable's value.
	 */
	*valptr = val;
	LWLockWaitListUnlock(lock);

	LWLockRelease(lock);
}


/*
 * LWLockReleaseAll - release all currently-held locks
 *
 * Used to clean up after ereport(ERROR). An important difference between this
 * function and retail LWLockRelease calls is that InterruptHoldoffCount is
 * unchanged by this operation. This is necessary since InterruptHoldoffCount
 * has been set to an appropriate level earlier in error recovery. We could
 * decrement it below zero if we allow it to drop for each released lock!
 */
void
LWLockReleaseAll(void)
{
	while (num_held_lwlocks > 0)
	{
		HOLD_INTERRUPTS();		/* match the upcoming RESUME_INTERRUPTS */

		LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
	}
}


/*
 * LWLockHeldByMe - test whether my process holds a lock in any mode
 *
 * This is meant as debug support only.
 */
bool
LWLockHeldByMe(LWLock *l)
{
	int			i;

	for (i = 0; i < num_held_lwlocks; i++)
	{
		if (held_lwlocks[i].lock == l)
			return true;
	}
	return false;
}

/*
 * LWLockHeldByMeInMode - test whether my process holds a lock in given mode
 *
 * This is meant as debug support only.
 */
bool
LWLockHeldByMeInMode(LWLock *l, LWLockMode mode)
{
	int			i;

	for (i = 0; i < num_held_lwlocks; i++)
	{
		if (held_lwlocks[i].lock == l && held_lwlocks[i].mode == mode)
			return true;
	}
	return false;
}