/*-------------------------------------------------------------------------
 *
 * lwlock.c
 *	  Lightweight lock manager
 *
 * Lightweight locks are intended primarily to provide mutual exclusion of
 * access to shared-memory data structures.  Therefore, they offer both
 * exclusive and shared lock modes (to support read/write and read-only
 * access to a shared object).  There are few other frammishes.  User-level
 * locking should be done with the full lock manager --- which depends on
 * LWLocks to protect its shared state.
 *
 * In addition to exclusive and shared modes, lightweight locks can be used to
 * wait until a variable changes value.  The variable is initially not set
 * when the lock is acquired with LWLockAcquire, i.e. it remains set to the
 * value it was set to when the lock was last released, and can be updated
 * without releasing the lock by calling LWLockUpdateVar.  LWLockWaitForVar
 * waits for the variable to be updated, or until the lock is free.  When
 * releasing the lock with LWLockReleaseClearVar() the value can be set to an
 * appropriate value for a free lock.  The meaning of the variable is up to
 * the caller, the lightweight lock code just assigns and compares it.
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/storage/lmgr/lwlock.c
 *
 * NOTES:
 *
 * This used to be a pretty straightforward reader-writer lock
 * implementation, in which the internal state was protected by a
 * spinlock. Unfortunately the overhead of taking the spinlock proved to be
 * too high for workloads/locks that were taken in shared mode very
 * frequently. Often we were spinning in the (obviously exclusive) spinlock,
 * while trying to acquire a shared lock that was actually free.
 *
 * Thus a new implementation was devised that provides wait-free shared lock
 * acquisition for locks that aren't exclusively locked.
 *
 * The basic idea is to have a single atomic variable 'lockcount' instead of
 * the formerly separate shared and exclusive counters and to use atomic
 * operations to acquire the lock. That's fairly easy to do for plain
 * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
 * in the OS.
 *
 * For lock acquisition we use an atomic compare-and-exchange on the lockcount
 * variable. For exclusive lock we swap in a sentinel value
 * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders.
 *
 * To release the lock we use an atomic decrement. If the new value is zero
 * (we get that atomically), we know we can/have to release waiters.
 *
 * Obviously it is important that the sentinel value for exclusive locks
 * doesn't conflict with the maximum number of possible share lockers -
 * luckily MAX_BACKENDS makes that easily possible.
 *
 *
 * The attentive reader might have noticed that naively doing the above has a
 * glaring race condition: We try to lock using the atomic operations and
 * notice that we have to wait. Unfortunately by the time we have finished
 * queuing, the former locker very well might have already finished its
 * work. That's problematic because we're now stuck waiting inside the OS.
 *
 * To mitigate those races we use a two phased attempt at locking:
 *	 Phase 1: Try to do it atomically, if we succeed, nice
 *	 Phase 2: Add ourselves to the waitqueue of the lock
 *	 Phase 3: Try to grab the lock again, if we succeed, remove ourselves from
 *			  the queue
 *	 Phase 4: Sleep till wake-up, goto Phase 1
 *
 * This protects us against the problem from above, as nobody can release too
 * quickly before we're queued, since after Phase 2 we're already queued.
 * -------------------------------------------------------------------------
 */
#include "postgres.h"

#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
#include "postmaster/postmaster.h"
#include "replication/slot.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/proclist.h"
#include "storage/spin.h"
#include "utils/memutils.h"

#ifdef LWLOCK_STATS
#include "utils/hsearch.h"
#endif


/* We use the ShmemLock spinlock to protect LWLockCounter */
extern slock_t *ShmemLock;

#define LW_FLAG_HAS_WAITERS			((uint32) 1 << 30)
#define LW_FLAG_RELEASE_OK			((uint32) 1 << 29)
#define LW_FLAG_LOCKED				((uint32) 1 << 28)

#define LW_VAL_EXCLUSIVE			((uint32) 1 << 24)
#define LW_VAL_SHARED				1

#define LW_LOCK_MASK				((uint32) ((1 << 25)-1))
/* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
#define LW_SHARED_MASK				((uint32) ((1 << 24)-1))
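
/*
 * Illustration (derived from the definitions above; not used by the code):
 * the state word is laid out as
 *
 *		bit  30			LW_FLAG_HAS_WAITERS
 *		bit  29			LW_FLAG_RELEASE_OK
 *		bit  28			LW_FLAG_LOCKED (protects the wait list)
 *		bit  24			LW_VAL_EXCLUSIVE
 *		bits 0..23		count of shared holders
 *
 * So, for example, a lock with three shared holders and queued waiters has
 * state == (LW_FLAG_HAS_WAITERS | 3), and "is any lock held?" is the single
 * test (state & LW_LOCK_MASK) != 0.
 */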

/*
 * There are three sorts of LWLock "tranches":
 *
 * 1. The individually-named locks defined in lwlocknames.h each have their
 * own tranche.  The names of these tranches appear in IndividualLWLockNames[]
 * in lwlocknames.c.
 *
 * 2. There are some predefined tranches for built-in groups of locks.
 * These are listed in enum BuiltinTrancheIds in lwlock.h, and their names
 * appear in BuiltinTrancheNames[] below.
 *
 * 3. Extensions can create new tranches, via either RequestNamedLWLockTranche
 * or LWLockRegisterTranche.  The names of these that are known in the current
 * process appear in LWLockTrancheNames[].
 *
 * All these names are user-visible as wait event names, so choose with care
 * ... and do not forget to update the documentation's list of wait events.
 */
extern const char *const IndividualLWLockNames[];	/* in lwlocknames.c */

static const char *const BuiltinTrancheNames[] = {
	/* LWTRANCHE_XACT_BUFFER: */
	"XactBuffer",
	/* LWTRANCHE_COMMITTS_BUFFER: */
	"CommitTSBuffer",
	/* LWTRANCHE_SUBTRANS_BUFFER: */
	"SubtransBuffer",
	/* LWTRANCHE_MULTIXACTOFFSET_BUFFER: */
	"MultiXactOffsetBuffer",
	/* LWTRANCHE_MULTIXACTMEMBER_BUFFER: */
	"MultiXactMemberBuffer",
	/* LWTRANCHE_NOTIFY_BUFFER: */
	"NotifyBuffer",
	/* LWTRANCHE_SERIAL_BUFFER: */
	"SerialBuffer",
	/* LWTRANCHE_WAL_INSERT: */
	"WALInsert",
	/* LWTRANCHE_BUFFER_CONTENT: */
	"BufferContent",
	/* LWTRANCHE_REPLICATION_ORIGIN_STATE: */
	"ReplicationOriginState",
	/* LWTRANCHE_REPLICATION_SLOT_IO: */
	"ReplicationSlotIO",
	/* LWTRANCHE_LOCK_FASTPATH: */
	"LockFastPath",
	/* LWTRANCHE_BUFFER_MAPPING: */
	"BufferMapping",
	/* LWTRANCHE_LOCK_MANAGER: */
	"LockManager",
	/* LWTRANCHE_PREDICATE_LOCK_MANAGER: */
	"PredicateLockManager",
	/* LWTRANCHE_PARALLEL_HASH_JOIN: */
	"ParallelHashJoin",
	/* LWTRANCHE_PARALLEL_QUERY_DSA: */
	"ParallelQueryDSA",
	/* LWTRANCHE_PER_SESSION_DSA: */
	"PerSessionDSA",
	/* LWTRANCHE_PER_SESSION_RECORD_TYPE: */
	"PerSessionRecordType",
	/* LWTRANCHE_PER_SESSION_RECORD_TYPMOD: */
	"PerSessionRecordTypmod",
	/* LWTRANCHE_SHARED_TUPLESTORE: */
	"SharedTupleStore",
	/* LWTRANCHE_SHARED_TIDBITMAP: */
	"SharedTidBitmap",
	/* LWTRANCHE_PARALLEL_APPEND: */
	"ParallelAppend",
	/* LWTRANCHE_PER_XACT_PREDICATE_LIST: */
	"PerXactPredicateList"
};

StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
				 LWTRANCHE_FIRST_USER_DEFINED - NUM_INDIVIDUAL_LWLOCKS,
				 "missing entries in BuiltinTrancheNames[]");

/*
 * This is indexed by tranche ID minus LWTRANCHE_FIRST_USER_DEFINED, and
 * stores the names of all dynamically-created tranches known to the current
 * process.  Any unused entries in the array will contain NULL.
 */
static const char **LWLockTrancheNames = NULL;
static int	LWLockTrancheNamesAllocated = 0;

/*
 * This points to the main array of LWLocks in shared memory.  Backends inherit
 * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
 * where we have special measures to pass it down).
 */
LWLockPadded *MainLWLockArray = NULL;

/*
 * We use this structure to keep track of locked LWLocks for release
 * during error recovery.  Normally, only a few will be held at once, but
 * occasionally the number can be much higher; for example, the pg_buffercache
 * extension locks all buffer partitions simultaneously.
 */
#define MAX_SIMUL_LWLOCKS	200

/* struct representing the LWLocks we're holding */
typedef struct LWLockHandle
{
	LWLock	   *lock;
	LWLockMode	mode;
} LWLockHandle;

static int	num_held_lwlocks = 0;
static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];

/* struct representing the LWLock tranche request for named tranche */
typedef struct NamedLWLockTrancheRequest
{
	char		tranche_name[NAMEDATALEN];
	int			num_lwlocks;
} NamedLWLockTrancheRequest;

static NamedLWLockTrancheRequest *NamedLWLockTrancheRequestArray = NULL;
static int	NamedLWLockTrancheRequestsAllocated = 0;

/*
 * NamedLWLockTrancheRequests is both the valid length of the request array,
 * and the length of the shared-memory NamedLWLockTrancheArray later on.
 * This variable and NamedLWLockTrancheArray are non-static so that
 * postmaster.c can copy them to child processes in EXEC_BACKEND builds.
 */
int			NamedLWLockTrancheRequests = 0;

/* points to data in shared memory: */
NamedLWLockTranche *NamedLWLockTrancheArray = NULL;

static bool lock_named_request_allowed = true;

static void InitializeLWLocks(void);
static inline void LWLockReportWaitStart(LWLock *lock);
static inline void LWLockReportWaitEnd(void);
static const char *GetLWTrancheName(uint16 trancheId);

#define T_NAME(lock) \
	GetLWTrancheName((lock)->tranche)

#ifdef LWLOCK_STATS
typedef struct lwlock_stats_key
{
	int			tranche;
	void	   *instance;
} lwlock_stats_key;

typedef struct lwlock_stats
{
	lwlock_stats_key key;
	int			sh_acquire_count;
	int			ex_acquire_count;
	int			block_count;
	int			dequeue_self_count;
	int			spin_delay_count;
} lwlock_stats;

static HTAB *lwlock_stats_htab;
static lwlock_stats lwlock_stats_dummy;
#endif

#ifdef LOCK_DEBUG
bool		Trace_lwlocks = false;

inline static void
PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
{
	/* hide statement & context here, otherwise the log is just too verbose */
	if (Trace_lwlocks)
	{
		uint32		state = pg_atomic_read_u32(&lock->state);

		ereport(LOG,
				(errhidestmt(true),
				 errhidecontext(true),
				 errmsg_internal("%d: %s(%s %p): excl %u shared %u haswaiters %u waiters %u rOK %d",
								 MyProcPid,
								 where, T_NAME(lock), lock,
								 (state & LW_VAL_EXCLUSIVE) != 0,
								 state & LW_SHARED_MASK,
								 (state & LW_FLAG_HAS_WAITERS) != 0,
								 pg_atomic_read_u32(&lock->nwaiters),
								 (state & LW_FLAG_RELEASE_OK) != 0)));
	}
}

inline static void
LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
{
	/* hide statement & context here, otherwise the log is just too verbose */
	if (Trace_lwlocks)
	{
		ereport(LOG,
				(errhidestmt(true),
				 errhidecontext(true),
				 errmsg_internal("%s(%s %p): %s", where,
								 T_NAME(lock), lock, msg)));
	}
}

#else							/* not LOCK_DEBUG */
#define PRINT_LWDEBUG(a,b,c) ((void)0)
#define LOG_LWDEBUG(a,b,c) ((void)0)
#endif							/* LOCK_DEBUG */

#ifdef LWLOCK_STATS

static void init_lwlock_stats(void);
static void print_lwlock_stats(int code, Datum arg);
static lwlock_stats *get_lwlock_stats_entry(LWLock *lock);

static void
init_lwlock_stats(void)
{
	HASHCTL		ctl;
	static MemoryContext lwlock_stats_cxt = NULL;
	static bool exit_registered = false;

	if (lwlock_stats_cxt != NULL)
		MemoryContextDelete(lwlock_stats_cxt);

	/*
	 * The LWLock stats will be updated within a critical section, which
	 * requires allocating new hash entries. Allocations within a critical
	 * section are normally not allowed because running out of memory would
	 * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
	 * turned on in production, so that's an acceptable risk. The hash entries
	 * are small, so the risk of running out of memory is minimal in practice.
	 */
	lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
											 "LWLock stats",
											 ALLOCSET_DEFAULT_SIZES);
	MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);

	ctl.keysize = sizeof(lwlock_stats_key);
	ctl.entrysize = sizeof(lwlock_stats);
	ctl.hcxt = lwlock_stats_cxt;
	lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
									HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
	if (!exit_registered)
	{
		on_shmem_exit(print_lwlock_stats, 0);
		exit_registered = true;
	}
}

static void
print_lwlock_stats(int code, Datum arg)
{
	HASH_SEQ_STATUS scan;
	lwlock_stats *lwstats;

	hash_seq_init(&scan, lwlock_stats_htab);

	/* Grab an LWLock to keep different backends from mixing reports */
	LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);

	while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
	{
		fprintf(stderr,
				"PID %d lwlock %s %p: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
				MyProcPid, GetLWTrancheName(lwstats->key.tranche),
				lwstats->key.instance, lwstats->sh_acquire_count,
				lwstats->ex_acquire_count, lwstats->block_count,
				lwstats->spin_delay_count, lwstats->dequeue_self_count);
	}

	LWLockRelease(&MainLWLockArray[0].lock);
}

static lwlock_stats *
get_lwlock_stats_entry(LWLock *lock)
{
	lwlock_stats_key key;
	lwlock_stats *lwstats;
	bool		found;

	/*
	 * During shared memory initialization, the hash table doesn't exist yet.
	 * Stats of that phase aren't very interesting, so just collect operations
	 * on all locks in a single dummy entry.
	 */
	if (lwlock_stats_htab == NULL)
		return &lwlock_stats_dummy;

	/* Fetch or create the entry. */
	MemSet(&key, 0, sizeof(key));
	key.tranche = lock->tranche;
	key.instance = lock;
	lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
	if (!found)
	{
		lwstats->sh_acquire_count = 0;
		lwstats->ex_acquire_count = 0;
		lwstats->block_count = 0;
		lwstats->dequeue_self_count = 0;
		lwstats->spin_delay_count = 0;
	}
	return lwstats;
}
#endif							/* LWLOCK_STATS */


/*
 * Compute number of LWLocks required by named tranches.  These will be
 * allocated in the main array.
 */
static int
NumLWLocksForNamedTranches(void)
{
	int			numLocks = 0;
	int			i;

	for (i = 0; i < NamedLWLockTrancheRequests; i++)
		numLocks += NamedLWLockTrancheRequestArray[i].num_lwlocks;

	return numLocks;
}

/*
 * Compute shmem space needed for LWLocks and named tranches.
 */
Size
LWLockShmemSize(void)
{
	Size		size;
	int			i;
	int			numLocks = NUM_FIXED_LWLOCKS;

	/* Calculate total number of locks needed in the main array. */
	numLocks += NumLWLocksForNamedTranches();

	/* Space for the LWLock array. */
	size = mul_size(numLocks, sizeof(LWLockPadded));

	/* Space for dynamic allocation counter, plus room for alignment. */
	size = add_size(size, sizeof(int) + LWLOCK_PADDED_SIZE);

	/* space for named tranches. */
	size = add_size(size, mul_size(NamedLWLockTrancheRequests, sizeof(NamedLWLockTranche)));

	/* space for name of each tranche. */
	for (i = 0; i < NamedLWLockTrancheRequests; i++)
		size = add_size(size, strlen(NamedLWLockTrancheRequestArray[i].tranche_name) + 1);

	/* Disallow adding any more named tranches. */
	lock_named_request_allowed = false;

	return size;
}

/*
 * Allocate shmem space for the main LWLock array and all tranches and
 * initialize it.  We also register extension LWLock tranches here.
 */
void
CreateLWLocks(void)
{
	StaticAssertStmt(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
					 "MAX_BACKENDS too big for lwlock.c");

	StaticAssertStmt(sizeof(LWLock) <= LWLOCK_PADDED_SIZE,
					 "Miscalculated LWLock padding");

	if (!IsUnderPostmaster)
	{
		Size		spaceLocks = LWLockShmemSize();
		int		   *LWLockCounter;
		char	   *ptr;

		/* Allocate space */
		ptr = (char *) ShmemAlloc(spaceLocks);

		/* Leave room for dynamic allocation of tranches */
		ptr += sizeof(int);

		/* Ensure desired alignment of LWLock array */
		ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;

		MainLWLockArray = (LWLockPadded *) ptr;

		/*
		 * Initialize the dynamic-allocation counter for tranches, which is
		 * stored just before the first LWLock.
		 */
		LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
		*LWLockCounter = LWTRANCHE_FIRST_USER_DEFINED;

		/* Initialize all LWLocks */
		InitializeLWLocks();
	}

	/* Register named extension LWLock tranches in the current process. */
	for (int i = 0; i < NamedLWLockTrancheRequests; i++)
		LWLockRegisterTranche(NamedLWLockTrancheArray[i].trancheId,
							  NamedLWLockTrancheArray[i].trancheName);
}
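
/*
 * The resulting shared-memory layout, for illustration:
 *
 *		[LWLockCounter (int)] [alignment padding]
 *		[MainLWLockArray: NUM_FIXED_LWLOCKS locks, then named-tranche locks]
 *		[NamedLWLockTrancheArray] [NUL-terminated tranche name strings]
 *
 * (InitializeLWLocks() below carves up the named-tranche portion.)
 */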

/*
 * Initialize LWLocks that are fixed and those belonging to named tranches.
 */
static void
InitializeLWLocks(void)
{
	int			numNamedLocks = NumLWLocksForNamedTranches();
	int			id;
	int			i;
	int			j;
	LWLockPadded *lock;

	/* Initialize all individual LWLocks in main array */
	for (id = 0, lock = MainLWLockArray; id < NUM_INDIVIDUAL_LWLOCKS; id++, lock++)
		LWLockInitialize(&lock->lock, id);

	/* Initialize buffer mapping LWLocks in main array */
	lock = MainLWLockArray + BUFFER_MAPPING_LWLOCK_OFFSET;
	for (id = 0; id < NUM_BUFFER_PARTITIONS; id++, lock++)
		LWLockInitialize(&lock->lock, LWTRANCHE_BUFFER_MAPPING);

	/* Initialize lmgrs' LWLocks in main array */
	lock = MainLWLockArray + LOCK_MANAGER_LWLOCK_OFFSET;
	for (id = 0; id < NUM_LOCK_PARTITIONS; id++, lock++)
		LWLockInitialize(&lock->lock, LWTRANCHE_LOCK_MANAGER);

	/* Initialize predicate lmgrs' LWLocks in main array */
	lock = MainLWLockArray + PREDICATELOCK_MANAGER_LWLOCK_OFFSET;
	for (id = 0; id < NUM_PREDICATELOCK_PARTITIONS; id++, lock++)
		LWLockInitialize(&lock->lock, LWTRANCHE_PREDICATE_LOCK_MANAGER);

	/*
	 * Copy the info about any named tranches into shared memory (so that
	 * other processes can see it), and initialize the requested LWLocks.
	 */
	if (NamedLWLockTrancheRequests > 0)
	{
		char	   *trancheNames;

		NamedLWLockTrancheArray = (NamedLWLockTranche *)
			&MainLWLockArray[NUM_FIXED_LWLOCKS + numNamedLocks];

		trancheNames = (char *) NamedLWLockTrancheArray +
			(NamedLWLockTrancheRequests * sizeof(NamedLWLockTranche));
		lock = &MainLWLockArray[NUM_FIXED_LWLOCKS];

		for (i = 0; i < NamedLWLockTrancheRequests; i++)
		{
			NamedLWLockTrancheRequest *request;
			NamedLWLockTranche *tranche;
			char	   *name;

			request = &NamedLWLockTrancheRequestArray[i];
			tranche = &NamedLWLockTrancheArray[i];

			name = trancheNames;
			trancheNames += strlen(request->tranche_name) + 1;
			strcpy(name, request->tranche_name);
			tranche->trancheId = LWLockNewTrancheId();
			tranche->trancheName = name;

			for (j = 0; j < request->num_lwlocks; j++, lock++)
				LWLockInitialize(&lock->lock, tranche->trancheId);
		}
	}
}

/*
 * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
 */
void
InitLWLockAccess(void)
{
#ifdef LWLOCK_STATS
	init_lwlock_stats();
#endif
}

/*
 * GetNamedLWLockTranche - returns the base address of LWLock from the
 * specified tranche.
 *
 * Caller needs to retrieve the requested number of LWLocks starting from
 * the base lock address returned by this API.  This can be used for
 * tranches that are requested by using RequestNamedLWLockTranche() API.
 */
LWLockPadded *
GetNamedLWLockTranche(const char *tranche_name)
{
	int			lock_pos;
	int			i;

	/*
	 * Obtain the position of base address of LWLock belonging to requested
	 * tranche_name in MainLWLockArray.  LWLocks for named tranches are placed
	 * in MainLWLockArray after fixed locks.
	 */
	lock_pos = NUM_FIXED_LWLOCKS;
	for (i = 0; i < NamedLWLockTrancheRequests; i++)
	{
		if (strcmp(NamedLWLockTrancheRequestArray[i].tranche_name,
				   tranche_name) == 0)
			return &MainLWLockArray[lock_pos];

		lock_pos += NamedLWLockTrancheRequestArray[i].num_lwlocks;
	}

	elog(ERROR, "requested tranche is not registered");

	/* just to keep compiler quiet */
	return NULL;
}

/*
 * Allocate a new tranche ID.
 */
int
LWLockNewTrancheId(void)
{
	int			result;
	int		   *LWLockCounter;

	LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
	SpinLockAcquire(ShmemLock);
	result = (*LWLockCounter)++;
	SpinLockRelease(ShmemLock);

	return result;
}

/*
 * Register a dynamic tranche name in the lookup table of the current process.
 *
 * This routine will save a pointer to the tranche name passed as an argument,
 * so the name should be allocated in a backend-lifetime context
 * (shared memory, TopMemoryContext, static constant, or similar).
 *
 * The tranche name will be user-visible as a wait event name, so try to
 * use a name that fits the style for those.
 */
void
LWLockRegisterTranche(int tranche_id, const char *tranche_name)
{
	/* This should only be called for user-defined tranches. */
	if (tranche_id < LWTRANCHE_FIRST_USER_DEFINED)
		return;

	/* Convert to array index. */
	tranche_id -= LWTRANCHE_FIRST_USER_DEFINED;

	/* If necessary, create or enlarge array. */
	if (tranche_id >= LWLockTrancheNamesAllocated)
	{
		int			newalloc;

		newalloc = Max(LWLockTrancheNamesAllocated, 8);
		while (newalloc <= tranche_id)
			newalloc *= 2;

		if (LWLockTrancheNames == NULL)
			LWLockTrancheNames = (const char **)
				MemoryContextAllocZero(TopMemoryContext,
									   newalloc * sizeof(char *));
		else
		{
			LWLockTrancheNames = (const char **)
				repalloc(LWLockTrancheNames, newalloc * sizeof(char *));
			memset(LWLockTrancheNames + LWLockTrancheNamesAllocated,
				   0,
				   (newalloc - LWLockTrancheNamesAllocated) * sizeof(char *));
		}
		LWLockTrancheNamesAllocated = newalloc;
	}

	LWLockTrancheNames[tranche_id] = tranche_name;
}

/*
 * RequestNamedLWLockTranche
 *		Request that extra LWLocks be allocated during postmaster
 *		startup.
 *
 * This is only useful for extensions if called from the _PG_init hook
 * of a library that is loaded into the postmaster via
 * shared_preload_libraries.  Once shared memory has been allocated, calls
 * will be ignored.  (We could raise an error, but it seems better to make
 * it a no-op, so that libraries containing such calls can be reloaded if
 * needed.)
 *
 * The tranche name will be user-visible as a wait event name, so try to
 * use a name that fits the style for those.
 */
void
RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
{
	NamedLWLockTrancheRequest *request;

	if (IsUnderPostmaster || !lock_named_request_allowed)
		return;					/* too late */

	if (NamedLWLockTrancheRequestArray == NULL)
	{
		NamedLWLockTrancheRequestsAllocated = 16;
		NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
			MemoryContextAlloc(TopMemoryContext,
							   NamedLWLockTrancheRequestsAllocated
							   * sizeof(NamedLWLockTrancheRequest));
	}

	if (NamedLWLockTrancheRequests >= NamedLWLockTrancheRequestsAllocated)
	{
		int			i = NamedLWLockTrancheRequestsAllocated;

		while (i <= NamedLWLockTrancheRequests)
			i *= 2;

		NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
			repalloc(NamedLWLockTrancheRequestArray,
					 i * sizeof(NamedLWLockTrancheRequest));
		NamedLWLockTrancheRequestsAllocated = i;
	}

	request = &NamedLWLockTrancheRequestArray[NamedLWLockTrancheRequests];
	Assert(strlen(tranche_name) + 1 <= NAMEDATALEN);
	strlcpy(request->tranche_name, tranche_name, NAMEDATALEN);
	request->num_lwlocks = num_lwlocks;
	NamedLWLockTrancheRequests++;
}
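
/*
 * A usage sketch (hypothetical extension code, not part of this file): a
 * library loaded via shared_preload_libraries would request its locks in
 * _PG_init(), then look them up once shared memory exists:
 *
 *		void
 *		_PG_init(void)
 *		{
 *			RequestNamedLWLockTranche("my_ext", 2);
 *		}
 *
 *		...
 *		LWLockPadded *locks = GetNamedLWLockTranche("my_ext");
 *
 *		LWLockAcquire(&locks[0].lock, LW_EXCLUSIVE);
 *		... touch the extension's shared state ...
 *		LWLockRelease(&locks[0].lock);
 */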

/*
 * LWLockInitialize - initialize a new lwlock; it's initially unlocked
 */
void
LWLockInitialize(LWLock *lock, int tranche_id)
{
	pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
#ifdef LOCK_DEBUG
	pg_atomic_init_u32(&lock->nwaiters, 0);
#endif
	lock->tranche = tranche_id;
	proclist_init(&lock->waiters);
}

/*
 * Report start of wait event for light-weight locks.
 *
 * This function will be used by all the light-weight lock calls which
 * need to wait to acquire the lock.  This function distinguishes wait
 * event based on tranche and lock id.
 */
static inline void
LWLockReportWaitStart(LWLock *lock)
{
	pgstat_report_wait_start(PG_WAIT_LWLOCK | lock->tranche);
}

/*
 * Report end of wait event for light-weight locks.
 */
static inline void
LWLockReportWaitEnd(void)
{
	pgstat_report_wait_end();
}

/*
 * Return the name of an LWLock tranche.
 */
static const char *
GetLWTrancheName(uint16 trancheId)
{
	/* Individual LWLock? */
	if (trancheId < NUM_INDIVIDUAL_LWLOCKS)
		return IndividualLWLockNames[trancheId];

	/* Built-in tranche? */
	if (trancheId < LWTRANCHE_FIRST_USER_DEFINED)
		return BuiltinTrancheNames[trancheId - NUM_INDIVIDUAL_LWLOCKS];

	/*
	 * It's an extension tranche, so look in LWLockTrancheNames[].  However,
	 * it's possible that the tranche has never been registered in the current
	 * process, in which case give up and return "extension".
	 */
	trancheId -= LWTRANCHE_FIRST_USER_DEFINED;

	if (trancheId >= LWLockTrancheNamesAllocated ||
		LWLockTrancheNames[trancheId] == NULL)
		return "extension";

	return LWLockTrancheNames[trancheId];
}

/*
 * Return an identifier for an LWLock based on the wait class and event.
 */
const char *
GetLWLockIdentifier(uint32 classId, uint16 eventId)
{
	Assert(classId == PG_WAIT_LWLOCK);
	/* The event IDs are just tranche numbers. */
	return GetLWTrancheName(eventId);
}

/*
 * Internal function that tries to atomically acquire the lwlock in the passed
 * in mode.
 *
 * This function will not block waiting for a lock to become free - that's the
 * caller's job.
 *
 * Returns true if the lock isn't free and we need to wait.
 */
static bool
LWLockAttemptLock(LWLock *lock, LWLockMode mode)
{
	uint32		old_state;

	AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);

	/*
	 * Read once outside the loop, later iterations will get the newer value
	 * via compare & exchange.
	 */
	old_state = pg_atomic_read_u32(&lock->state);

	/* loop until we've determined whether we could acquire the lock or not */
	while (true)
	{
		uint32		desired_state;
		bool		lock_free;

		desired_state = old_state;

		if (mode == LW_EXCLUSIVE)
		{
			lock_free = (old_state & LW_LOCK_MASK) == 0;
			if (lock_free)
				desired_state += LW_VAL_EXCLUSIVE;
		}
		else
		{
			lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
			if (lock_free)
				desired_state += LW_VAL_SHARED;
		}

		/*
		 * Attempt to swap in the state we are expecting.  If we didn't see
		 * the lock as free, that's just the old value.  If we saw it as free,
		 * we'll attempt to mark it acquired.  The reason that we always swap
		 * in the value is that this doubles as a memory barrier.  We could
		 * try to be smarter and only swap in values if we saw the lock as
		 * free, but benchmarks haven't shown it as beneficial so far.
		 *
		 * Retry if the value changed since we last looked at it.
		 */
		if (pg_atomic_compare_exchange_u32(&lock->state,
										   &old_state, desired_state))
		{
			if (lock_free)
			{
				/* Great! Got the lock. */
#ifdef LOCK_DEBUG
				if (mode == LW_EXCLUSIVE)
					lock->owner = MyProc;
#endif
				return false;
			}
			else
				return true;	/* somebody else has the lock */
		}
	}
	pg_unreachable();
}
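
/*
 * A worked example of the CAS loop above, with illustrative values: with two
 * shared holders, the state word contains 2.  A further LW_SHARED attempt
 * sees (2 & LW_VAL_EXCLUSIVE) == 0 and CASes 2 -> 3, acquiring the lock.  An
 * LW_EXCLUSIVE attempt sees (2 & LW_LOCK_MASK) != 0, swaps in the unchanged
 * value (so the CAS still acts as a memory barrier), and returns true,
 * i.e. "must wait".
 */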

/*
 * Lock the LWLock's wait list against concurrent activity.
 *
 * NB: even though the wait list is locked, non-conflicting lock operations
 * may still happen concurrently.
 *
 * Time spent holding mutex should be short!
 */
static void
LWLockWaitListLock(LWLock *lock)
{
	uint32		old_state;
#ifdef LWLOCK_STATS
	lwlock_stats *lwstats;
	uint32		delays = 0;

	lwstats = get_lwlock_stats_entry(lock);
#endif

	while (true)
	{
		/* always try once to acquire lock directly */
		old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
		if (!(old_state & LW_FLAG_LOCKED))
			break;				/* got lock */

		/* and then spin without atomic operations until lock is released */
		{
			SpinDelayStatus delayStatus;

			init_local_spin_delay(&delayStatus);

			while (old_state & LW_FLAG_LOCKED)
			{
				perform_spin_delay(&delayStatus);
				old_state = pg_atomic_read_u32(&lock->state);
			}
#ifdef LWLOCK_STATS
			delays += delayStatus.delays;
#endif
			finish_spin_delay(&delayStatus);
		}

		/*
		 * Retry. The lock might obviously already be re-acquired by the time
		 * we're attempting to get it again.
		 */
	}

#ifdef LWLOCK_STATS
	lwstats->spin_delay_count += delays;
#endif
}

/*
 * Unlock the LWLock's wait list.
 *
 * Note that it can be more efficient to manipulate flags and release the
 * locks in a single atomic operation.
 */
static void
LWLockWaitListUnlock(LWLock *lock)
{
	uint32		old_state PG_USED_FOR_ASSERTS_ONLY;

	old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);

	Assert(old_state & LW_FLAG_LOCKED);
}

/*
 * Wakeup all the lockers that currently have a chance to acquire the lock.
 */
static void
LWLockWakeup(LWLock *lock)
{
	bool		new_release_ok;
	bool		wokeup_somebody = false;
	proclist_head wakeup;
	proclist_mutable_iter iter;

	proclist_init(&wakeup);

	new_release_ok = true;

	/* lock wait list while collecting backends to wake up */
	LWLockWaitListLock(lock);

	proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
	{
		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);

		if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
			continue;

		proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
		proclist_push_tail(&wakeup, iter.cur, lwWaitLink);

		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
		{
			/*
			 * Prevent additional wakeups until retryer gets to run. Backends
			 * that are just waiting for the lock to become free don't retry
			 * automatically.
			 */
			new_release_ok = false;

			/*
			 * Don't wakeup (further) exclusive locks.
			 */
			wokeup_somebody = true;
		}

		/*
		 * Once we've woken up an exclusive lock, there's no point in waking
		 * up anybody else.
		 */
		if (waiter->lwWaitMode == LW_EXCLUSIVE)
			break;
	}

	Assert(proclist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);

	/* unset required flags, and release lock, in one fell swoop */
	{
		uint32		old_state;
		uint32		desired_state;

		old_state = pg_atomic_read_u32(&lock->state);
		while (true)
		{
			desired_state = old_state;

			/* compute desired flags */

			if (new_release_ok)
				desired_state |= LW_FLAG_RELEASE_OK;
			else
				desired_state &= ~LW_FLAG_RELEASE_OK;

			if (proclist_is_empty(&wakeup))
				desired_state &= ~LW_FLAG_HAS_WAITERS;

			desired_state &= ~LW_FLAG_LOCKED;	/* release lock */

			if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
											   desired_state))
				break;
		}
	}

	/* Awaken any waiters I removed from the queue. */
	proclist_foreach_modify(iter, &wakeup, lwWaitLink)
	{
		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);

		LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
		proclist_delete(&wakeup, iter.cur, lwWaitLink);

		/*
		 * Guarantee that lwWaiting being unset only becomes visible once the
		 * unlink from the list has completed.  Otherwise the target backend
		 * could be woken up for some other reason and enqueue for a new lock -
		 * if that happens before the list unlink happens, the list would end
		 * up being corrupted.
		 *
		 * The barrier pairs with the LWLockWaitListLock() when enqueuing for
		 * another lock.
		 */
		pg_write_barrier();
		waiter->lwWaiting = false;
		PGSemaphoreUnlock(waiter->sem);
	}
}

/*
 * Add ourselves to the end of the queue.
 *
 * NB: Mode can be LW_WAIT_UNTIL_FREE here!
 */
static void
LWLockQueueSelf(LWLock *lock, LWLockMode mode)
{
	/*
	 * If we don't have a PGPROC structure, there's no way to wait. This
	 * should never occur, since MyProc should only be null during shared
	 * memory initialization.
	 */
	if (MyProc == NULL)
		elog(PANIC, "cannot wait without a PGPROC structure");

	if (MyProc->lwWaiting)
		elog(PANIC, "queueing for lock while waiting on another one");

	LWLockWaitListLock(lock);

	/* setting the flag is protected by the spinlock */
	pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);

	MyProc->lwWaiting = true;
	MyProc->lwWaitMode = mode;

	/* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
	if (mode == LW_WAIT_UNTIL_FREE)
		proclist_push_head(&lock->waiters, MyProc->pgprocno, lwWaitLink);
	else
		proclist_push_tail(&lock->waiters, MyProc->pgprocno, lwWaitLink);

	/* Can release the mutex now */
	LWLockWaitListUnlock(lock);

#ifdef LOCK_DEBUG
	pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
#endif
}

/*
 * Remove ourselves from the waitlist.
 *
 * This is used if we queued ourselves because we thought we needed to sleep
 * but, after further checking, we discovered that we don't actually need to
 * do so.
 */
static void
LWLockDequeueSelf(LWLock *lock)
{
	bool		found = false;
	proclist_mutable_iter iter;

#ifdef LWLOCK_STATS
	lwlock_stats *lwstats;

	lwstats = get_lwlock_stats_entry(lock);

	lwstats->dequeue_self_count++;
#endif

	LWLockWaitListLock(lock);

	/*
	 * We can't just remove ourselves from the list; we need to iterate over
	 * all entries, as somebody else could have dequeued us already.
	 */
	proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
	{
		if (iter.cur == MyProc->pgprocno)
		{
			found = true;
			proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
			break;
		}
	}

	if (proclist_is_empty(&lock->waiters) &&
		(pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
	{
		pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
	}

	/* XXX: combine with fetch_and above? */
	LWLockWaitListUnlock(lock);

	/* clear waiting state again, nice for debugging */
	if (found)
		MyProc->lwWaiting = false;
	else
	{
		int			extraWaits = 0;

		/*
		 * Somebody else dequeued us and has or will wake us up. Deal with the
		 * superfluous absorption of a wakeup.
		 */

		/*
		 * Reset RELEASE_OK flag if somebody woke us before we removed
		 * ourselves - they'll have set it to false.
		 */
		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

		/*
		 * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
		 * get reset at some inconvenient point later. Most of the time this
		 * will immediately return.
		 */
		for (;;)
		{
			PGSemaphoreLock(MyProc->sem);
			if (!MyProc->lwWaiting)
				break;
			extraWaits++;
		}

		/*
		 * Fix the process wait semaphore's count for any absorbed wakeups.
		 */
		while (extraWaits-- > 0)
			PGSemaphoreUnlock(MyProc->sem);
	}

#ifdef LOCK_DEBUG
	{
		/* not waiting anymore */
		uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

		Assert(nwaiters < MAX_BACKENDS);
	}
#endif
}

/*
 * LWLockAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, sleep until it is.  Returns true if the lock
 * was available immediately, false if we had to sleep.
 *
 * Side effect: cancel/die interrupts are held off until lock release.
 */
bool
LWLockAcquire(LWLock *lock, LWLockMode mode)
{
	PGPROC	   *proc = MyProc;
	bool		result = true;
	int			extraWaits = 0;
#ifdef LWLOCK_STATS
	lwlock_stats *lwstats;

	lwstats = get_lwlock_stats_entry(lock);
#endif

	AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);

	PRINT_LWDEBUG("LWLockAcquire", lock, mode);

#ifdef LWLOCK_STATS
	/* Count lock acquisition attempts */
	if (mode == LW_EXCLUSIVE)
		lwstats->ex_acquire_count++;
	else
		lwstats->sh_acquire_count++;
#endif							/* LWLOCK_STATS */

	/*
	 * We can't wait if we haven't got a PGPROC.  This should only occur
	 * during bootstrap or shared memory initialization.  Put an Assert here
	 * to catch unsafe coding practices.
	 */
	Assert(!(proc == NULL && IsUnderPostmaster));

	/* Ensure we will have room to remember the lock */
	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
		elog(ERROR, "too many LWLocks taken");

	/*
	 * Lock out cancel/die interrupts until we exit the code section protected
	 * by the LWLock.  This ensures that interrupts will not interfere with
	 * manipulations of data structures in shared memory.
	 */
	HOLD_INTERRUPTS();

	/*
	 * Loop here to try to acquire lock after each time we are signaled by
	 * LWLockRelease.
	 *
	 * NOTE: it might seem better to have LWLockRelease actually grant us the
	 * lock, rather than retrying and possibly having to go back to sleep. But
	 * in practice that is no good because it means a process swap for every
	 * lock acquisition when two or more processes are contending for the same
	 * lock.  Since LWLocks are normally used to protect not-very-long
	 * sections of computation, a process needs to be able to acquire and
	 * release the same lock many times during a single CPU time slice, even
	 * in the presence of contention.  The efficiency of being able to do that
	 * outweighs the inefficiency of sometimes wasting a process dispatch
	 * cycle because the lock is not free when a released waiter finally gets
	 * to run.  See pgsql-hackers archives for 29-Dec-01.
	 */
	for (;;)
	{
		bool		mustwait;

		/*
		 * Try to grab the lock the first time, we're not in the waitqueue
		 * yet/anymore.
		 */
		mustwait = LWLockAttemptLock(lock, mode);

		if (!mustwait)
		{
			LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
			break;				/* got the lock */
		}

		/*
		 * Ok, at this point we couldn't grab the lock on the first try. We
		 * cannot simply queue ourselves to the end of the list and wait to be
		 * woken up because by now the lock could long have been released.
		 * Instead add us to the queue and try to grab the lock again. If we
		 * succeed we need to revert the queuing and be happy, otherwise we
		 * recheck the lock. If we still couldn't grab it, we know that the
		 * other locker will see our queue entries when releasing since they
		 * existed before we checked for the lock.
		 */

		/* add to the queue */
		LWLockQueueSelf(lock, mode);

		/* we're now guaranteed to be woken up if necessary */
		mustwait = LWLockAttemptLock(lock, mode);

		/* ok, grabbed the lock the second time round, need to undo queueing */
		if (!mustwait)
		{
			LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");

			LWLockDequeueSelf(lock);
			break;
		}

		/*
		 * Wait until awakened.
		 *
		 * It is possible that we get awakened for a reason other than being
		 * signaled by LWLockRelease.  If so, loop back and wait again.  Once
		 * we've gotten the LWLock, re-increment the sema by the number of
		 * additional signals received.
		 */
		LOG_LWDEBUG("LWLockAcquire", lock, "waiting");

#ifdef LWLOCK_STATS
		lwstats->block_count++;
#endif

		LWLockReportWaitStart(lock);
		if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
			TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);

		for (;;)
		{
			PGSemaphoreLock(proc->sem);
			if (!proc->lwWaiting)
				break;
			extraWaits++;
		}

		/* Retrying, allow LWLockRelease to release waiters again. */
		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

#ifdef LOCK_DEBUG
		{
			/* not waiting anymore */
			uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

			Assert(nwaiters < MAX_BACKENDS);
		}
#endif

		if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
			TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
		LWLockReportWaitEnd();

		LOG_LWDEBUG("LWLockAcquire", lock, "awakened");

		/* Now loop back and try to acquire lock again. */
		result = false;
	}

	if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_ENABLED())
		TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);

	/* Add lock to list of locks held by this backend */
	held_lwlocks[num_held_lwlocks].lock = lock;
	held_lwlocks[num_held_lwlocks++].mode = mode;

	/*
	 * Fix the process wait semaphore's count for any absorbed wakeups.
	 */
	while (extraWaits-- > 0)
		PGSemaphoreUnlock(proc->sem);

	return result;
}

/*
 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, return false with no side-effects.
 *
 * If successful, cancel/die interrupts are held off until lock release.
 */
bool
LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
{
	bool		mustwait;

	AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);

	PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);

	/* Ensure we will have room to remember the lock */
	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
		elog(ERROR, "too many LWLocks taken");

	/*
	 * Lock out cancel/die interrupts until we exit the code section protected
	 * by the LWLock.  This ensures that interrupts will not interfere with
	 * manipulations of data structures in shared memory.
	 */
	HOLD_INTERRUPTS();

	/* Check for the lock */
	mustwait = LWLockAttemptLock(lock, mode);

	if (mustwait)
	{
		/* Failed to get lock, so release interrupt holdoff */
		RESUME_INTERRUPTS();

		LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
		if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL_ENABLED())
			TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), mode);
	}
	else
	{
		/* Add lock to list of locks held by this backend */
		held_lwlocks[num_held_lwlocks].lock = lock;
		held_lwlocks[num_held_lwlocks++].mode = mode;
		if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_ENABLED())
			TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), mode);
	}
	return !mustwait;
}
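
/*
 * A typical caller pattern for the above (sketch; 'lock' stands for whatever
 * LWLock guards the caller's structure):
 *
 *		if (LWLockConditionalAcquire(lock, LW_EXCLUSIVE))
 *		{
 *			... update the shared structure ...
 *			LWLockRelease(lock);
 *		}
 *		else
 *			... skip the optional work, or try again later ...
 */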

/*
 * LWLockAcquireOrWait - Acquire lock, or wait until it's free
 *
 * The semantics of this function are a bit funky.  If the lock is currently
 * free, it is acquired in the given mode, and the function returns true.  If
 * the lock isn't immediately free, the function waits until it is released
 * and returns false, but does not acquire the lock.
 *
 * This is currently used for WALWriteLock: when a backend flushes the WAL,
 * holding WALWriteLock, it can flush the commit records of many other
 * backends as a side-effect.  Those other backends need to wait until the
 * flush finishes, but don't need to acquire the lock anymore.  They can just
 * wake up, observe that their records have already been flushed, and return.
 */
bool
LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
{
	PGPROC	   *proc = MyProc;
	bool		mustwait;
	int			extraWaits = 0;
#ifdef LWLOCK_STATS
	lwlock_stats *lwstats;

	lwstats = get_lwlock_stats_entry(lock);
#endif

	Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);

	PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);

	/* Ensure we will have room to remember the lock */
	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
		elog(ERROR, "too many LWLocks taken");

	/*
	 * Lock out cancel/die interrupts until we exit the code section protected
	 * by the LWLock.  This ensures that interrupts will not interfere with
	 * manipulations of data structures in shared memory.
	 */
	HOLD_INTERRUPTS();

	/*
	 * NB: We're using nearly the same twice-in-a-row lock acquisition
	 * protocol as LWLockAcquire().  Check its comments for details.
	 */
	mustwait = LWLockAttemptLock(lock, mode);

	if (mustwait)
	{
		LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);

		mustwait = LWLockAttemptLock(lock, mode);

		if (mustwait)
		{
			/*
			 * Wait until awakened.  Like in LWLockAcquire, be prepared for
			 * bogus wakeups.
			 */
			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");

#ifdef LWLOCK_STATS
			lwstats->block_count++;
#endif

			LWLockReportWaitStart(lock);
			if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
				TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);

			for (;;)
			{
				PGSemaphoreLock(proc->sem);
				if (!proc->lwWaiting)
					break;
				extraWaits++;
			}

#ifdef LOCK_DEBUG
			{
				/* not waiting anymore */
				uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

				Assert(nwaiters < MAX_BACKENDS);
			}
#endif
			if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
				TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
			LWLockReportWaitEnd();

			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
		}
		else
		{
			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");

			/*
			 * Got lock in the second attempt, undo queueing.  We need to
			 * treat this as having successfully acquired the lock, otherwise
			 * we'd not necessarily wake up people we've prevented from
			 * acquiring the lock.
			 */
			LWLockDequeueSelf(lock);
		}
	}

	/*
	 * Fix the process wait semaphore's count for any absorbed wakeups.
	 */
	while (extraWaits-- > 0)
		PGSemaphoreUnlock(proc->sem);

	if (mustwait)
	{
		/* Failed to get lock, so release interrupt holdoff */
		RESUME_INTERRUPTS();
		LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
		if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL_ENABLED())
			TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), mode);
	}
	else
	{
		LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
		/* Add lock to list of locks held by this backend */
		held_lwlocks[num_held_lwlocks].lock = lock;
		held_lwlocks[num_held_lwlocks++].mode = mode;
		if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_ENABLED())
			TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), mode);
	}

	return !mustwait;
}

/*
 * Does the lwlock in its current state need to wait for the variable value to
 * change?
 *
 * If we don't need to wait, and it's because the value of the variable has
 * changed, store the current value in newval.
 *
 * *result is set to true if the lock was free, and false otherwise.
 */
static bool
LWLockConflictsWithVar(LWLock *lock,
					   uint64 *valptr, uint64 oldval, uint64 *newval,
					   bool *result)
{
	bool		mustwait;
	uint64		value;

	/*
	 * Test first to see if the slot is free right now.
	 *
	 * XXX: the caller uses a spinlock before this, so we don't need a memory
	 * barrier here as far as the current usage is concerned.  But that might
	 * not be safe in general.
	 */
	mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;

	if (!mustwait)
	{
		*result = true;
		return false;
	}

	*result = false;

	/*
	 * Read value using the lwlock's wait list lock, as we can't generally
	 * rely on atomic 64 bit reads/stores.  TODO: On platforms with a way to
	 * do atomic 64 bit reads/writes the spinlock should be optimized away.
	 */
	LWLockWaitListLock(lock);
	value = *valptr;
	LWLockWaitListUnlock(lock);

	if (value != oldval)
	{
		mustwait = false;
		*newval = value;
	}
	else
	{
		mustwait = true;
	}

	return mustwait;
}

/*
 * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
 *
 * If the lock is held and *valptr equals oldval, waits until the lock is
 * either freed, or the lock holder updates *valptr by calling
 * LWLockUpdateVar.  If the lock is free on exit (immediately or after
 * waiting), returns true.  If the lock is still held, but *valptr no longer
 * matches oldval, returns false and sets *newval to the current value in
 * *valptr.
 *
 * Note: this function ignores shared lock holders; if the lock is held
 * in shared mode, returns 'true'.
 */
bool
LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
{
	PGPROC	   *proc = MyProc;
	int			extraWaits = 0;
	bool		result = false;
#ifdef LWLOCK_STATS
	lwlock_stats *lwstats;

	lwstats = get_lwlock_stats_entry(lock);
#endif

	PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);

	/*
	 * Lock out cancel/die interrupts while we sleep on the lock.  There is no
	 * cleanup mechanism to remove us from the wait queue if we got
	 * interrupted.
	 */
	HOLD_INTERRUPTS();

	/*
	 * Loop here to check the lock's status after each time we are signaled.
	 */
	for (;;)
	{
		bool		mustwait;

		mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
										  &result);

		if (!mustwait)
			break;				/* the lock was free or value didn't match */

		/*
		 * Add myself to wait queue.  Note that this is racy, somebody else
		 * could wakeup before we're finished queuing.  NB: We're using nearly
		 * the same twice-in-a-row lock acquisition protocol as
		 * LWLockAcquire().  Check its comments for details.  The only
		 * difference is that we also have to check the variable's values when
		 * checking the state of the lock.
		 */
		LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);

		/*
		 * Set RELEASE_OK flag, to make sure we get woken up as soon as the
		 * lock is released.
		 */
		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

		/*
		 * We're now guaranteed to be woken up if necessary.  Recheck the lock
		 * and the variable's state.
		 */
		mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
										  &result);

		/* Ok, no conflict after we queued ourselves.  Undo queueing. */
		if (!mustwait)
		{
			LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");

			LWLockDequeueSelf(lock);
			break;
		}

		/*
		 * Wait until awakened.
		 *
		 * It is possible that we get awakened for a reason other than being
		 * signaled by LWLockRelease.  If so, loop back and wait again.  Once
		 * we've gotten the LWLock, re-increment the sema by the number of
		 * additional signals received.
		 */
		LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");

#ifdef LWLOCK_STATS
		lwstats->block_count++;
#endif

		LWLockReportWaitStart(lock);
		if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
			TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), LW_EXCLUSIVE);

		for (;;)
		{
			PGSemaphoreLock(proc->sem);
			if (!proc->lwWaiting)
				break;
			extraWaits++;
		}

#ifdef LOCK_DEBUG
		{
			/* not waiting anymore */
			uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

			Assert(nwaiters < MAX_BACKENDS);
		}
#endif

		if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
			TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), LW_EXCLUSIVE);
		LWLockReportWaitEnd();

		LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");

		/* Now loop back and check the status of the lock again. */
	}

	/*
	 * Fix the process wait semaphore's count for any absorbed wakeups.
	 */
	while (extraWaits-- > 0)
		PGSemaphoreUnlock(proc->sem);

	/*
	 * Now okay to allow cancel/die interrupts.
	 */
	RESUME_INTERRUPTS();

	return result;
}


/*
 * LWLockUpdateVar - Update a variable and wake up waiters atomically
 *
 * Sets *valptr to 'val', and wakes up all processes waiting for us with
 * LWLockWaitForVar().  Setting the value and waking up the processes happen
 * atomically so that any process calling LWLockWaitForVar() on the same lock
 * is guaranteed to see the new value, and act accordingly.
 *
 * The caller must be holding the lock in exclusive mode.
 */
void
LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
{
	proclist_head wakeup;
	proclist_mutable_iter iter;

	PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);

	proclist_init(&wakeup);

	LWLockWaitListLock(lock);

	Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);

	/* Update the lock's value */
	*valptr = val;

	/*
	 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
	 * up.  They are always in the front of the queue.
	 */
	proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
	{
		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);

		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
			break;

		proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
		proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
	}

	/* We are done updating shared state of the lock itself. */
	LWLockWaitListUnlock(lock);

	/*
	 * Awaken any waiters I removed from the queue.
	 */
	proclist_foreach_modify(iter, &wakeup, lwWaitLink)
	{
		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);

		proclist_delete(&wakeup, iter.cur, lwWaitLink);
		/* check comment in LWLockWakeup() about this barrier */
		pg_write_barrier();
		waiter->lwWaiting = false;
		PGSemaphoreUnlock(waiter->sem);
	}
}
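
/*
 * A sketch of how the variable-wait API fits together (names are
 * illustrative; WAL insertion is the in-tree user of this machinery):
 *
 *		LWLockAcquire(lock, LW_EXCLUSIVE);			-- holder
 *		LWLockWaitForVar(lock, &var, old, &new);	-- waiter blocks
 *		LWLockUpdateVar(lock, &var, v1);			-- waiter returns false,
 *													   *new == v1
 *		LWLockReleaseClearVar(lock, &var, 0);		-- a fresh LWLockWaitForVar
 *													   now returns true
 */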

/*
 * LWLockRelease - release a previously acquired lock
 */
void
LWLockRelease(LWLock *lock)
{
	LWLockMode	mode;
	uint32		oldstate;
	bool		check_waiters;
	int			i;

	/*
	 * Remove lock from list of locks held.  Usually, but not always, it will
	 * be the latest-acquired lock; so search array backwards.
	 */
	for (i = num_held_lwlocks; --i >= 0;)
		if (lock == held_lwlocks[i].lock)
			break;

	if (i < 0)
		elog(ERROR, "lock %s is not held", T_NAME(lock));

	mode = held_lwlocks[i].mode;

	num_held_lwlocks--;
	for (; i < num_held_lwlocks; i++)
		held_lwlocks[i] = held_lwlocks[i + 1];

	PRINT_LWDEBUG("LWLockRelease", lock, mode);

	/*
	 * Release my hold on lock, after that it can immediately be acquired by
	 * others, even if we still have to wakeup other waiters.
	 */
	if (mode == LW_EXCLUSIVE)
		oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
	else
		oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);

	/* nobody else can have that kind of lock */
	Assert(!(oldstate & LW_VAL_EXCLUSIVE));

	if (TRACE_POSTGRESQL_LWLOCK_RELEASE_ENABLED())
		TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));

	/*
	 * We're still waiting for backends to get scheduled, don't wake them up
	 * again.
	 */
	if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
		(LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
		(oldstate & LW_LOCK_MASK) == 0)
		check_waiters = true;
	else
		check_waiters = false;

	/*
	 * As waking up waiters requires the spinlock to be acquired, only do so
	 * if necessary.
	 */
	if (check_waiters)
	{
		/* XXX: remove before commit? */
		LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
		LWLockWakeup(lock);
	}

	/*
	 * Now okay to allow cancel/die interrupts.
	 */
	RESUME_INTERRUPTS();
}

/*
 * LWLockReleaseClearVar - release a previously acquired lock, reset variable
 */
void
LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
{
	LWLockWaitListLock(lock);

	/*
	 * Set the variable's value before releasing the lock.  That prevents a
	 * race condition wherein a new locker acquires the lock, but hasn't yet
	 * set the variable's value.
	 */
	*valptr = val;
	LWLockWaitListUnlock(lock);

	LWLockRelease(lock);
}


/*
 * LWLockReleaseAll - release all currently-held locks
 *
 * Used to clean up after ereport(ERROR).  An important difference between this
 * function and retail LWLockRelease calls is that InterruptHoldoffCount is
 * unchanged by this operation.  This is necessary since InterruptHoldoffCount
 * has been set to an appropriate level earlier in error recovery.  We could
 * decrement it below zero if we allow it to drop for each released lock!
 */
void
LWLockReleaseAll(void)
{
	while (num_held_lwlocks > 0)
	{
		HOLD_INTERRUPTS();		/* match the upcoming RESUME_INTERRUPTS */

		LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
	}
}


/*
 * LWLockHeldByMe - test whether my process holds a lock in any mode
 *
 * This is meant as debug support only.
 */
bool
LWLockHeldByMe(LWLock *l)
{
	int			i;

	for (i = 0; i < num_held_lwlocks; i++)
	{
		if (held_lwlocks[i].lock == l)
			return true;
	}
	return false;
}

/*
 * LWLockHeldByMeInMode - test whether my process holds a lock in given mode
 *
 * This is meant as debug support only.
 */
bool
LWLockHeldByMeInMode(LWLock *l, LWLockMode mode)
{
	int			i;

	for (i = 0; i < num_held_lwlocks; i++)
	{
		if (held_lwlocks[i].lock == l && held_lwlocks[i].mode == mode)
			return true;
	}
	return false;
}
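
/*
 * These are mostly useful in assertions that document the locking protocol,
 * e.g. (sketch) a routine requiring exclusive hold on some lock might start
 * with:
 *
 *		Assert(LWLockHeldByMeInMode(lock, LW_EXCLUSIVE));
 */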