/*-------------------------------------------------------------------------
 *
 * lwlock.c
 *    Lightweight lock manager
 *
 * Lightweight locks are intended primarily to provide mutual exclusion of
 * access to shared-memory data structures.  Therefore, they offer both
 * exclusive and shared lock modes (to support read/write and read-only
 * access to a shared object).  There are few other frammishes.  User-level
 * locking should be done with the full lock manager --- which depends on
 * LWLocks to protect its shared state.
 *
 * In addition to exclusive and shared modes, lightweight locks can be used to
 * wait until a variable changes value.  The variable is initially not set
 * when the lock is acquired with LWLockAcquire, i.e. it remains set to the
 * value it was set to when the lock was released last, and can be updated
 * without releasing the lock by calling LWLockUpdateVar.  LWLockWaitForVar
 * waits for the variable to be updated, or until the lock is free.  When
 * releasing the lock with LWLockReleaseClearVar() the value can be set to an
 * appropriate value for a free lock.  The meaning of the variable is up to
 * the caller, the lightweight lock code just assigns and compares it.
 *
 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/storage/lmgr/lwlock.c
 *
 * NOTES:
 *
 * This used to be a pretty straightforward reader-writer lock
 * implementation, in which the internal state was protected by a
 * spinlock.  Unfortunately the overhead of taking the spinlock proved to be
 * too high for workloads/locks that were taken in shared mode very
 * frequently.  Often we were spinning in the (obviously exclusive) spinlock,
 * while trying to acquire a shared lock that was actually free.
 *
 * Thus a new implementation was devised that provides wait-free shared lock
 * acquisition for locks that aren't exclusively locked.
 *
 * The basic idea is to have a single atomic variable 'lockcount' instead of
 * the formerly separate shared and exclusive counters and to use atomic
 * operations to acquire the lock.  That's fairly easy to do for plain
 * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
 * in the OS.
 *
 * For lock acquisition we use an atomic compare-and-exchange on the lockcount
 * variable.  For exclusive lock we swap in a sentinel value
 * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders.
 *
 * To release the lock we use an atomic decrement to release the lock.  If the
 * new value is zero (we get that atomically), we know we can/have to release
 * waiters.
 *
 * Obviously it is important that the sentinel value for exclusive locks
 * doesn't conflict with the maximum number of possible share lockers -
 * luckily MAX_BACKENDS makes that easily possible.
 *
 *
 * The attentive reader might have noticed that naively doing the above has a
 * glaring race condition: We try to lock using the atomic operations and
 * notice that we have to wait.  Unfortunately by the time we have finished
 * queuing, the former locker very well might have already finished its
 * work.  That's problematic because we're now stuck waiting inside the OS.
 *
 * To mitigate those races we use a two-phased attempt at locking:
 *   Phase 1: Try to do it atomically, if we succeed, nice
 *   Phase 2: Add ourselves to the waitqueue of the lock
 *   Phase 3: Try to grab the lock again, if we succeed, remove ourselves from
 *            the queue
 *   Phase 4: Sleep till wake-up, goto Phase 1
 *
 * This protects us against the problem from above as nobody can release too
 * quickly, before we're queued, since after Phase 2 we're already queued.
 * -------------------------------------------------------------------------
 */
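
/*
 * Illustrative only: a minimal sketch of the caller-side usage pattern this
 * module supports, i.e. guarding a shared-memory structure with an LWLock.
 * The struct and field names here are hypothetical, not part of this file.
 *
 *    typedef struct MySharedState
 *    {
 *        LWLock    lock;
 *        int       counter;
 *    } MySharedState;
 *
 *    Writer (exclusive access):
 *        LWLockAcquire(&state->lock, LW_EXCLUSIVE);
 *        state->counter++;
 *        LWLockRelease(&state->lock);
 *
 *    Reader (shared access):
 *        LWLockAcquire(&state->lock, LW_SHARED);
 *        val = state->counter;
 *        LWLockRelease(&state->lock);
 */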

#include "postgres.h"

#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
#include "postmaster/postmaster.h"
#include "replication/slot.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/proclist.h"
#include "storage/spin.h"
#include "utils/memutils.h"

#ifdef LWLOCK_STATS
#include "utils/hsearch.h"
#endif


/* We use the ShmemLock spinlock to protect LWLockCounter */
extern slock_t *ShmemLock;

#define LW_FLAG_HAS_WAITERS    ((uint32) 1 << 30)
#define LW_FLAG_RELEASE_OK     ((uint32) 1 << 29)
#define LW_FLAG_LOCKED         ((uint32) 1 << 28)

#define LW_VAL_EXCLUSIVE       ((uint32) 1 << 24)
#define LW_VAL_SHARED          1

#define LW_LOCK_MASK           ((uint32) ((1 << 25)-1))
/* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
#define LW_SHARED_MASK         ((uint32) ((1 << 24)-1))
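
/*
 * Illustrative only: how a few example values of the state word decompose
 * under the masks above (no flag bits set unless shown).
 *
 *    state == 0                             lock is free
 *    state == 3                             three shared holders
 *    state == LW_VAL_EXCLUSIVE              one exclusive holder
 *    state == (LW_FLAG_HAS_WAITERS | 1)     one shared holder, queued waiters
 *
 * (state & LW_LOCK_MASK) == 0 therefore means "no holders at all", which is
 * the condition an exclusive acquirer tests for.
 */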

/*
 * There are three sorts of LWLock "tranches":
 *
 * 1. The individually-named locks defined in lwlocknames.h each have their
 * own tranche.  The names of these tranches appear in IndividualLWLockNames[]
 * in lwlocknames.c.
 *
 * 2. There are some predefined tranches for built-in groups of locks.
 * These are listed in enum BuiltinTrancheIds in lwlock.h, and their names
 * appear in BuiltinTrancheNames[] below.
 *
 * 3. Extensions can create new tranches, via either RequestNamedLWLockTranche
 * or LWLockRegisterTranche.  The names of these that are known in the current
 * process appear in LWLockTrancheNames[].
 *
 * All these names are user-visible as wait event names, so choose with care
 * ... and do not forget to update the documentation's list of wait events.
 */
extern const char *const IndividualLWLockNames[];   /* in lwlocknames.c */

static const char *const BuiltinTrancheNames[] = {
    /* LWTRANCHE_XACT_BUFFER: */
    "XactBuffer",
    /* LWTRANCHE_COMMITTS_BUFFER: */
    "CommitTSBuffer",
    /* LWTRANCHE_SUBTRANS_BUFFER: */
    "SubtransBuffer",
    /* LWTRANCHE_MULTIXACTOFFSET_BUFFER: */
    "MultiXactOffsetBuffer",
    /* LWTRANCHE_MULTIXACTMEMBER_BUFFER: */
    "MultiXactMemberBuffer",
    /* LWTRANCHE_NOTIFY_BUFFER: */
    "NotifyBuffer",
    /* LWTRANCHE_SERIAL_BUFFER: */
    "SerialBuffer",
    /* LWTRANCHE_WAL_INSERT: */
    "WALInsert",
    /* LWTRANCHE_BUFFER_CONTENT: */
    "BufferContent",
    /* LWTRANCHE_BUFFER_IO: */
    "BufferIO",
    /* LWTRANCHE_REPLICATION_ORIGIN_STATE: */
    "ReplicationOriginState",
    /* LWTRANCHE_REPLICATION_SLOT_IO: */
    "ReplicationSlotIO",
    /* LWTRANCHE_LOCK_FASTPATH: */
    "LockFastPath",
    /* LWTRANCHE_BUFFER_MAPPING: */
    "BufferMapping",
    /* LWTRANCHE_LOCK_MANAGER: */
    "LockManager",
    /* LWTRANCHE_PREDICATE_LOCK_MANAGER: */
    "PredicateLockManager",
    /* LWTRANCHE_PARALLEL_HASH_JOIN: */
    "ParallelHashJoin",
    /* LWTRANCHE_PARALLEL_QUERY_DSA: */
    "ParallelQueryDSA",
    /* LWTRANCHE_PER_SESSION_DSA: */
    "PerSessionDSA",
    /* LWTRANCHE_PER_SESSION_RECORD_TYPE: */
    "PerSessionRecordType",
    /* LWTRANCHE_PER_SESSION_RECORD_TYPMOD: */
    "PerSessionRecordTypmod",
    /* LWTRANCHE_SHARED_TUPLESTORE: */
    "SharedTupleStore",
    /* LWTRANCHE_SHARED_TIDBITMAP: */
    "SharedTidBitmap",
    /* LWTRANCHE_PARALLEL_APPEND: */
    "ParallelAppend",
    /* LWTRANCHE_PER_XACT_PREDICATE_LIST: */
    "PerXactPredicateList"
};

StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
                 LWTRANCHE_FIRST_USER_DEFINED - NUM_INDIVIDUAL_LWLOCKS,
                 "missing entries in BuiltinTrancheNames[]");

/*
 * This is indexed by tranche ID minus LWTRANCHE_FIRST_USER_DEFINED, and
 * stores the names of all dynamically-created tranches known to the current
 * process.  Any unused entries in the array will contain NULL.
 */
static const char **LWLockTrancheNames = NULL;
static int LWLockTrancheNamesAllocated = 0;

/*
 * This points to the main array of LWLocks in shared memory.  Backends inherit
 * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
 * where we have special measures to pass it down).
 */
LWLockPadded *MainLWLockArray = NULL;

/*
 * We use this structure to keep track of locked LWLocks for release
 * during error recovery.  Normally, only a few will be held at once, but
 * occasionally the number can be much higher; for example, the pg_buffercache
 * extension locks all buffer partitions simultaneously.
 */
#define MAX_SIMUL_LWLOCKS  200

/* struct representing the LWLocks we're holding */
typedef struct LWLockHandle
{
    LWLock     *lock;
    LWLockMode  mode;
} LWLockHandle;

static int num_held_lwlocks = 0;
static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];

/* struct representing the LWLock tranche request for named tranche */
typedef struct NamedLWLockTrancheRequest
{
    char        tranche_name[NAMEDATALEN];
    int         num_lwlocks;
} NamedLWLockTrancheRequest;

static NamedLWLockTrancheRequest *NamedLWLockTrancheRequestArray = NULL;
static int NamedLWLockTrancheRequestsAllocated = 0;

/*
 * NamedLWLockTrancheRequests is both the valid length of the request array,
 * and the length of the shared-memory NamedLWLockTrancheArray later on.
 * This variable and NamedLWLockTrancheArray are non-static so that
 * postmaster.c can copy them to child processes in EXEC_BACKEND builds.
 */
int NamedLWLockTrancheRequests = 0;

/* points to data in shared memory: */
NamedLWLockTranche *NamedLWLockTrancheArray = NULL;

static bool lock_named_request_allowed = true;

static void InitializeLWLocks(void);
static inline void LWLockReportWaitStart(LWLock *lock);
static inline void LWLockReportWaitEnd(void);
static const char *GetLWTrancheName(uint16 trancheId);

#define T_NAME(lock) \
    GetLWTrancheName((lock)->tranche)

#ifdef LWLOCK_STATS
typedef struct lwlock_stats_key
{
    int         tranche;
    void       *instance;
} lwlock_stats_key;

typedef struct lwlock_stats
{
    lwlock_stats_key key;
    int         sh_acquire_count;
    int         ex_acquire_count;
    int         block_count;
    int         dequeue_self_count;
    int         spin_delay_count;
} lwlock_stats;

static HTAB *lwlock_stats_htab;
static lwlock_stats lwlock_stats_dummy;
#endif

#ifdef LOCK_DEBUG
bool Trace_lwlocks = false;

inline static void
PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
{
    /* hide statement & context here, otherwise the log is just too verbose */
    if (Trace_lwlocks)
    {
        uint32 state = pg_atomic_read_u32(&lock->state);

        ereport(LOG,
                (errhidestmt(true),
                 errhidecontext(true),
                 errmsg_internal("%d: %s(%s %p): excl %u shared %u haswaiters %u waiters %u rOK %d",
                                 MyProcPid,
                                 where, T_NAME(lock), lock,
                                 (state & LW_VAL_EXCLUSIVE) != 0,
                                 state & LW_SHARED_MASK,
                                 (state & LW_FLAG_HAS_WAITERS) != 0,
                                 pg_atomic_read_u32(&lock->nwaiters),
                                 (state & LW_FLAG_RELEASE_OK) != 0)));
    }
}

inline static void
LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
{
    /* hide statement & context here, otherwise the log is just too verbose */
    if (Trace_lwlocks)
    {
        ereport(LOG,
                (errhidestmt(true),
                 errhidecontext(true),
                 errmsg_internal("%s(%s %p): %s", where,
                                 T_NAME(lock), lock, msg)));
    }
}

#else                           /* not LOCK_DEBUG */
#define PRINT_LWDEBUG(a,b,c) ((void)0)
#define LOG_LWDEBUG(a,b,c) ((void)0)
#endif                          /* LOCK_DEBUG */
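
/*
 * Illustrative only: the tracing above is compile-time gated.  To see this
 * output you would typically build with LOCK_DEBUG defined (e.g. via
 * CPPFLAGS=-DLOCK_DEBUG) and then enable the developer setting that backs
 * Trace_lwlocks, along the lines of:
 *
 *    SET trace_lwlocks = on;
 *
 * Likewise, the per-lock statistics code below is only compiled in when
 * LWLOCK_STATS is defined.
 */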

#ifdef LWLOCK_STATS

static void init_lwlock_stats(void);
static void print_lwlock_stats(int code, Datum arg);
static lwlock_stats *get_lwlock_stats_entry(LWLock *lock);

static void
init_lwlock_stats(void)
{
    HASHCTL     ctl;
    static MemoryContext lwlock_stats_cxt = NULL;
    static bool exit_registered = false;

    if (lwlock_stats_cxt != NULL)
        MemoryContextDelete(lwlock_stats_cxt);

    /*
     * The LWLock stats will be updated within a critical section, which
     * requires allocating new hash entries.  Allocations within a critical
     * section are normally not allowed because running out of memory would
     * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
     * turned on in production, so that's an acceptable risk.  The hash entries
     * are small, so the risk of running out of memory is minimal in practice.
     */
    lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
                                             "LWLock stats",
                                             ALLOCSET_DEFAULT_SIZES);
    MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);

    MemSet(&ctl, 0, sizeof(ctl));
    ctl.keysize = sizeof(lwlock_stats_key);
    ctl.entrysize = sizeof(lwlock_stats);
    ctl.hcxt = lwlock_stats_cxt;
    lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
                                    HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
    if (!exit_registered)
    {
        on_shmem_exit(print_lwlock_stats, 0);
        exit_registered = true;
    }
}

static void
print_lwlock_stats(int code, Datum arg)
{
    HASH_SEQ_STATUS scan;
    lwlock_stats *lwstats;

    hash_seq_init(&scan, lwlock_stats_htab);

    /* Grab an LWLock to keep different backends from mixing reports */
    LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);

    while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
    {
        fprintf(stderr,
                "PID %d lwlock %s %p: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
                MyProcPid, GetLWTrancheName(lwstats->key.tranche),
                lwstats->key.instance, lwstats->sh_acquire_count,
                lwstats->ex_acquire_count, lwstats->block_count,
                lwstats->spin_delay_count, lwstats->dequeue_self_count);
    }

    LWLockRelease(&MainLWLockArray[0].lock);
}

static lwlock_stats *
get_lwlock_stats_entry(LWLock *lock)
{
    lwlock_stats_key key;
    lwlock_stats *lwstats;
    bool        found;

    /*
     * During shared memory initialization, the hash table doesn't exist yet.
     * Stats of that phase aren't very interesting, so just collect operations
     * on all locks in a single dummy entry.
     */
    if (lwlock_stats_htab == NULL)
        return &lwlock_stats_dummy;

    /* Fetch or create the entry. */
    MemSet(&key, 0, sizeof(key));
    key.tranche = lock->tranche;
    key.instance = lock;
    lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
    if (!found)
    {
        lwstats->sh_acquire_count = 0;
        lwstats->ex_acquire_count = 0;
        lwstats->block_count = 0;
        lwstats->dequeue_self_count = 0;
        lwstats->spin_delay_count = 0;
    }
    return lwstats;
}
#endif                          /* LWLOCK_STATS */


/*
 * Compute number of LWLocks required by named tranches.  These will be
 * allocated in the main array.
 */
static int
NumLWLocksForNamedTranches(void)
{
    int         numLocks = 0;
    int         i;

    for (i = 0; i < NamedLWLockTrancheRequests; i++)
        numLocks += NamedLWLockTrancheRequestArray[i].num_lwlocks;

    return numLocks;
}

/*
 * Compute shmem space needed for LWLocks and named tranches.
 */
Size
LWLockShmemSize(void)
{
    Size        size;
    int         i;
    int         numLocks = NUM_FIXED_LWLOCKS;

    /* Calculate total number of locks needed in the main array. */
    numLocks += NumLWLocksForNamedTranches();

    /* Space for the LWLock array. */
    size = mul_size(numLocks, sizeof(LWLockPadded));

    /* Space for dynamic allocation counter, plus room for alignment. */
    size = add_size(size, sizeof(int) + LWLOCK_PADDED_SIZE);

    /* space for named tranches. */
    size = add_size(size, mul_size(NamedLWLockTrancheRequests, sizeof(NamedLWLockTranche)));

    /* space for name of each tranche. */
    for (i = 0; i < NamedLWLockTrancheRequests; i++)
        size = add_size(size, strlen(NamedLWLockTrancheRequestArray[i].tranche_name) + 1);

    /* Disallow adding any more named tranches. */
    lock_named_request_allowed = false;

    return size;
}

/*
 * Allocate shmem space for the main LWLock array and all tranches and
 * initialize it.  We also register extension LWLock tranches here.
 */
void
CreateLWLocks(void)
{
    StaticAssertStmt(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
                     "MAX_BACKENDS too big for lwlock.c");

    StaticAssertStmt(sizeof(LWLock) <= LWLOCK_MINIMAL_SIZE &&
                     sizeof(LWLock) <= LWLOCK_PADDED_SIZE,
                     "Miscalculated LWLock padding");

    if (!IsUnderPostmaster)
    {
        Size        spaceLocks = LWLockShmemSize();
        int        *LWLockCounter;
        char       *ptr;

        /* Allocate space */
        ptr = (char *) ShmemAlloc(spaceLocks);

        /* Leave room for dynamic allocation of tranches */
        ptr += sizeof(int);

        /* Ensure desired alignment of LWLock array */
        ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;

        MainLWLockArray = (LWLockPadded *) ptr;

        /*
         * Initialize the dynamic-allocation counter for tranches, which is
         * stored just before the first LWLock.
         */
        LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
        *LWLockCounter = LWTRANCHE_FIRST_USER_DEFINED;

        /* Initialize all LWLocks */
        InitializeLWLocks();
    }

    /* Register named extension LWLock tranches in the current process. */
    for (int i = 0; i < NamedLWLockTrancheRequests; i++)
        LWLockRegisterTranche(NamedLWLockTrancheArray[i].trancheId,
                              NamedLWLockTrancheArray[i].trancheName);
}
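
/*
 * Illustrative only: the shared-memory layout implied by LWLockShmemSize()
 * above and by CreateLWLocks()/InitializeLWLocks(); the exact sizes and
 * counts are whatever the current build computes.
 *
 *    [int LWLockCounter][padding to LWLOCK_PADDED_SIZE]
 *    [NUM_FIXED_LWLOCKS padded LWLocks]           <- MainLWLockArray
 *    [LWLocks for named tranches, if any]
 *    [NamedLWLockTrancheArray entries]
 *    [NUL-terminated tranche name strings]
 */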

/*
 * Initialize LWLocks that are fixed and those belonging to named tranches.
 */
static void
InitializeLWLocks(void)
{
    int         numNamedLocks = NumLWLocksForNamedTranches();
    int         id;
    int         i;
    int         j;
    LWLockPadded *lock;

    /* Initialize all individual LWLocks in main array */
    for (id = 0, lock = MainLWLockArray; id < NUM_INDIVIDUAL_LWLOCKS; id++, lock++)
        LWLockInitialize(&lock->lock, id);

    /* Initialize buffer mapping LWLocks in main array */
    lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS;
    for (id = 0; id < NUM_BUFFER_PARTITIONS; id++, lock++)
        LWLockInitialize(&lock->lock, LWTRANCHE_BUFFER_MAPPING);

    /* Initialize lmgrs' LWLocks in main array */
    lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS + NUM_BUFFER_PARTITIONS;
    for (id = 0; id < NUM_LOCK_PARTITIONS; id++, lock++)
        LWLockInitialize(&lock->lock, LWTRANCHE_LOCK_MANAGER);

    /* Initialize predicate lmgrs' LWLocks in main array */
    lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS +
        NUM_BUFFER_PARTITIONS + NUM_LOCK_PARTITIONS;
    for (id = 0; id < NUM_PREDICATELOCK_PARTITIONS; id++, lock++)
        LWLockInitialize(&lock->lock, LWTRANCHE_PREDICATE_LOCK_MANAGER);

    /*
     * Copy the info about any named tranches into shared memory (so that
     * other processes can see it), and initialize the requested LWLocks.
     */
    if (NamedLWLockTrancheRequests > 0)
    {
        char       *trancheNames;

        NamedLWLockTrancheArray = (NamedLWLockTranche *)
            &MainLWLockArray[NUM_FIXED_LWLOCKS + numNamedLocks];

        trancheNames = (char *) NamedLWLockTrancheArray +
            (NamedLWLockTrancheRequests * sizeof(NamedLWLockTranche));
        lock = &MainLWLockArray[NUM_FIXED_LWLOCKS];

        for (i = 0; i < NamedLWLockTrancheRequests; i++)
        {
            NamedLWLockTrancheRequest *request;
            NamedLWLockTranche *tranche;
            char       *name;

            request = &NamedLWLockTrancheRequestArray[i];
            tranche = &NamedLWLockTrancheArray[i];

            name = trancheNames;
            trancheNames += strlen(request->tranche_name) + 1;
            strcpy(name, request->tranche_name);
            tranche->trancheId = LWLockNewTrancheId();
            tranche->trancheName = name;

            for (j = 0; j < request->num_lwlocks; j++, lock++)
                LWLockInitialize(&lock->lock, tranche->trancheId);
        }
    }
}

/*
 * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
 */
void
InitLWLockAccess(void)
{
#ifdef LWLOCK_STATS
    init_lwlock_stats();
#endif
}

/*
 * GetNamedLWLockTranche - returns the base address of LWLock from the
 * specified tranche.
 *
 * Caller needs to retrieve the requested number of LWLocks starting from
 * the base lock address returned by this API.  This can be used for
 * tranches that are requested by using RequestNamedLWLockTranche() API.
 */
LWLockPadded *
GetNamedLWLockTranche(const char *tranche_name)
{
    int         lock_pos;
    int         i;

    /*
     * Obtain the position of base address of LWLock belonging to requested
     * tranche_name in MainLWLockArray.  LWLocks for named tranches are placed
     * in MainLWLockArray after fixed locks.
     */
    lock_pos = NUM_FIXED_LWLOCKS;
    for (i = 0; i < NamedLWLockTrancheRequests; i++)
    {
        if (strcmp(NamedLWLockTrancheRequestArray[i].tranche_name,
                   tranche_name) == 0)
            return &MainLWLockArray[lock_pos];

        lock_pos += NamedLWLockTrancheRequestArray[i].num_lwlocks;
    }

    elog(ERROR, "requested tranche is not registered");

    /* just to keep compiler quiet */
    return NULL;
}
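
/*
 * Illustrative only: a minimal sketch of how an extension loaded via
 * shared_preload_libraries typically uses the named-tranche APIs.  The
 * extension name and lock count are hypothetical.
 *
 *    In _PG_init():
 *        RequestNamedLWLockTranche("my_extension", 4);
 *
 *    Later, once shared memory exists (e.g. from its shmem startup hook):
 *        LWLockPadded *locks = GetNamedLWLockTranche("my_extension");
 *
 *        LWLockAcquire(&locks[0].lock, LW_EXCLUSIVE);
 *        ... touch the extension's shared state ...
 *        LWLockRelease(&locks[0].lock);
 */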

/*
 * Allocate a new tranche ID.
 */
int
LWLockNewTrancheId(void)
{
    int         result;
    int        *LWLockCounter;

    LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
    SpinLockAcquire(ShmemLock);
    result = (*LWLockCounter)++;
    SpinLockRelease(ShmemLock);

    return result;
}

/*
 * Register a dynamic tranche name in the lookup table of the current process.
 *
 * This routine will save a pointer to the tranche name passed as an argument,
 * so the name should be allocated in a backend-lifetime context
 * (shared memory, TopMemoryContext, static constant, or similar).
 *
 * The tranche name will be user-visible as a wait event name, so try to
 * use a name that fits the style for those.
 */
void
LWLockRegisterTranche(int tranche_id, const char *tranche_name)
{
    /* This should only be called for user-defined tranches. */
    if (tranche_id < LWTRANCHE_FIRST_USER_DEFINED)
        return;

    /* Convert to array index. */
    tranche_id -= LWTRANCHE_FIRST_USER_DEFINED;

    /* If necessary, create or enlarge array. */
    if (tranche_id >= LWLockTrancheNamesAllocated)
    {
        int         newalloc;

        newalloc = Max(LWLockTrancheNamesAllocated, 8);
        while (newalloc <= tranche_id)
            newalloc *= 2;

        if (LWLockTrancheNames == NULL)
            LWLockTrancheNames = (const char **)
                MemoryContextAllocZero(TopMemoryContext,
                                       newalloc * sizeof(char *));
        else
        {
            LWLockTrancheNames = (const char **)
                repalloc(LWLockTrancheNames, newalloc * sizeof(char *));
            memset(LWLockTrancheNames + LWLockTrancheNamesAllocated,
                   0,
                   (newalloc - LWLockTrancheNamesAllocated) * sizeof(char *));
        }
        LWLockTrancheNamesAllocated = newalloc;
    }

    LWLockTrancheNames[tranche_id] = tranche_name;
}

/*
 * RequestNamedLWLockTranche
 *      Request that extra LWLocks be allocated during postmaster
 *      startup.
 *
 * This is only useful for extensions if called from the _PG_init hook
 * of a library that is loaded into the postmaster via
 * shared_preload_libraries.  Once shared memory has been allocated, calls
 * will be ignored.  (We could raise an error, but it seems better to make
 * it a no-op, so that libraries containing such calls can be reloaded if
 * needed.)
 *
 * The tranche name will be user-visible as a wait event name, so try to
 * use a name that fits the style for those.
 */
void
RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
{
    NamedLWLockTrancheRequest *request;

    if (IsUnderPostmaster || !lock_named_request_allowed)
        return;                 /* too late */

    if (NamedLWLockTrancheRequestArray == NULL)
    {
        NamedLWLockTrancheRequestsAllocated = 16;
        NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
            MemoryContextAlloc(TopMemoryContext,
                               NamedLWLockTrancheRequestsAllocated
                               * sizeof(NamedLWLockTrancheRequest));
    }

    if (NamedLWLockTrancheRequests >= NamedLWLockTrancheRequestsAllocated)
    {
        int         i = NamedLWLockTrancheRequestsAllocated;

        while (i <= NamedLWLockTrancheRequests)
            i *= 2;

        NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
            repalloc(NamedLWLockTrancheRequestArray,
                     i * sizeof(NamedLWLockTrancheRequest));
        NamedLWLockTrancheRequestsAllocated = i;
    }

    request = &NamedLWLockTrancheRequestArray[NamedLWLockTrancheRequests];
    Assert(strlen(tranche_name) + 1 <= NAMEDATALEN);
    strlcpy(request->tranche_name, tranche_name, NAMEDATALEN);
    request->num_lwlocks = num_lwlocks;
    NamedLWLockTrancheRequests++;
}
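
/*
 * Illustrative only: the alternative, fully dynamic path for code that places
 * LWLocks in memory it manages itself (for instance a DSM segment).  The
 * struct and tranche name are hypothetical.
 *
 *    int tranche_id = LWLockNewTrancheId();
 *
 *    LWLockRegisterTranche(tranche_id, "my_dynamic_tranche");
 *    LWLockInitialize(&my_shared_struct->lock, tranche_id);
 *
 * Note that LWLockRegisterTranche() only updates the current process' lookup
 * table, so any process that should report a meaningful wait event name for
 * such a lock needs to register the tranche name itself.
 */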

/*
 * LWLockInitialize - initialize a new lwlock; it's initially unlocked
 */
void
LWLockInitialize(LWLock *lock, int tranche_id)
{
    pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
#ifdef LOCK_DEBUG
    pg_atomic_init_u32(&lock->nwaiters, 0);
#endif
    lock->tranche = tranche_id;
    proclist_init(&lock->waiters);
}

/*
 * Report start of wait event for light-weight locks.
 *
 * This function will be used by all the light-weight lock calls which
 * need to wait to acquire the lock.  This function distinguishes wait
 * event based on tranche and lock id.
 */
static inline void
LWLockReportWaitStart(LWLock *lock)
{
    pgstat_report_wait_start(PG_WAIT_LWLOCK | lock->tranche);
}

/*
 * Report end of wait event for light-weight locks.
 */
static inline void
LWLockReportWaitEnd(void)
{
    pgstat_report_wait_end();
}

/*
 * Return the name of an LWLock tranche.
 */
static const char *
GetLWTrancheName(uint16 trancheId)
{
    /* Individual LWLock? */
    if (trancheId < NUM_INDIVIDUAL_LWLOCKS)
        return IndividualLWLockNames[trancheId];

    /* Built-in tranche? */
    if (trancheId < LWTRANCHE_FIRST_USER_DEFINED)
        return BuiltinTrancheNames[trancheId - NUM_INDIVIDUAL_LWLOCKS];

    /*
     * It's an extension tranche, so look in LWLockTrancheNames[].  However,
     * it's possible that the tranche has never been registered in the current
     * process, in which case give up and return "extension".
     */
    trancheId -= LWTRANCHE_FIRST_USER_DEFINED;

    if (trancheId >= LWLockTrancheNamesAllocated ||
        LWLockTrancheNames[trancheId] == NULL)
        return "extension";

    return LWLockTrancheNames[trancheId];
}

/*
 * Return an identifier for an LWLock based on the wait class and event.
 */
const char *
GetLWLockIdentifier(uint32 classId, uint16 eventId)
{
    Assert(classId == PG_WAIT_LWLOCK);
    /* The event IDs are just tranche numbers. */
    return GetLWTrancheName(eventId);
}

/*
 * Internal function that tries to atomically acquire the lwlock in the passed
 * in mode.
 *
 * This function will not block waiting for a lock to become free - that's the
 * caller's job.
 *
 * Returns true if the lock isn't free and we need to wait.
 */
static bool
LWLockAttemptLock(LWLock *lock, LWLockMode mode)
{
    uint32      old_state;

    AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);

    /*
     * Read once outside the loop, later iterations will get the newer value
     * via compare & exchange.
     */
    old_state = pg_atomic_read_u32(&lock->state);

    /* loop until we've determined whether we could acquire the lock or not */
    while (true)
    {
        uint32      desired_state;
        bool        lock_free;

        desired_state = old_state;

        if (mode == LW_EXCLUSIVE)
        {
            lock_free = (old_state & LW_LOCK_MASK) == 0;
            if (lock_free)
                desired_state += LW_VAL_EXCLUSIVE;
        }
        else
        {
            lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
            if (lock_free)
                desired_state += LW_VAL_SHARED;
        }

        /*
         * Attempt to swap in the state we are expecting.  If we didn't see
         * the lock as free, that's just the old value.  If we saw it as free,
         * we'll attempt to mark it acquired.  The reason that we always swap
         * in the value is that this doubles as a memory barrier.  We could try
         * to be smarter and only swap in values if we saw the lock as free,
         * but benchmarks haven't shown it to be beneficial so far.
         *
         * Retry if the value changed since we last looked at it.
         */
        if (pg_atomic_compare_exchange_u32(&lock->state,
                                           &old_state, desired_state))
        {
            if (lock_free)
            {
                /* Great!  Got the lock. */
#ifdef LOCK_DEBUG
                if (mode == LW_EXCLUSIVE)
                    lock->owner = MyProc;
#endif
                return false;
            }
            else
                return true;    /* somebody else has the lock */
        }
    }
    pg_unreachable();
}

/*
 * Lock the LWLock's wait list against concurrent activity.
 *
 * NB: even though the wait list is locked, non-conflicting lock operations
 * may still happen concurrently.
 *
 * Time spent holding mutex should be short!
 */
static void
LWLockWaitListLock(LWLock *lock)
{
    uint32      old_state;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;
    uint32      delays = 0;

    lwstats = get_lwlock_stats_entry(lock);
#endif

    while (true)
    {
        /* always try once to acquire lock directly */
        old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
        if (!(old_state & LW_FLAG_LOCKED))
            break;              /* got lock */

        /* and then spin without atomic operations until lock is released */
        {
            SpinDelayStatus delayStatus;

            init_local_spin_delay(&delayStatus);

            while (old_state & LW_FLAG_LOCKED)
            {
                perform_spin_delay(&delayStatus);
                old_state = pg_atomic_read_u32(&lock->state);
            }
#ifdef LWLOCK_STATS
            delays += delayStatus.delays;
#endif
            finish_spin_delay(&delayStatus);
        }

        /*
         * Retry.  The lock might obviously already be re-acquired by the time
         * we're attempting to get it again.
         */
    }

#ifdef LWLOCK_STATS
    lwstats->spin_delay_count += delays;
#endif
}

/*
 * Unlock the LWLock's wait list.
 *
 * Note that it can be more efficient to manipulate flags and release the
 * locks in a single atomic operation.
 */
static void
LWLockWaitListUnlock(LWLock *lock)
{
    uint32      old_state PG_USED_FOR_ASSERTS_ONLY;

    old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);

    Assert(old_state & LW_FLAG_LOCKED);
}

/*
 * Wakeup all the lockers that currently have a chance to acquire the lock.
 */
static void
LWLockWakeup(LWLock *lock)
{
    bool        new_release_ok;
    bool        wokeup_somebody = false;
    proclist_head wakeup;
    proclist_mutable_iter iter;

    proclist_init(&wakeup);

    new_release_ok = true;

    /* lock wait list while collecting backends to wake up */
    LWLockWaitListLock(lock);

    proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
    {
        PGPROC     *waiter = GetPGProcByNumber(iter.cur);

        if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
            continue;

        proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
        proclist_push_tail(&wakeup, iter.cur, lwWaitLink);

        if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
        {
            /*
             * Prevent additional wakeups until retryer gets to run.  Backends
             * that are just waiting for the lock to become free don't retry
             * automatically.
             */
            new_release_ok = false;

            /*
             * Don't wakeup (further) exclusive locks.
             */
            wokeup_somebody = true;
        }

        /*
         * Once we've woken up an exclusive lock, there's no point in waking
         * up anybody else.
         */
        if (waiter->lwWaitMode == LW_EXCLUSIVE)
            break;
    }

    Assert(proclist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);

    /* unset required flags, and release lock, in one fell swoop */
    {
        uint32      old_state;
        uint32      desired_state;

        old_state = pg_atomic_read_u32(&lock->state);
        while (true)
        {
            desired_state = old_state;

            /* compute desired flags */

            if (new_release_ok)
                desired_state |= LW_FLAG_RELEASE_OK;
            else
                desired_state &= ~LW_FLAG_RELEASE_OK;

            if (proclist_is_empty(&wakeup))
                desired_state &= ~LW_FLAG_HAS_WAITERS;

            desired_state &= ~LW_FLAG_LOCKED;   /* release lock */

            if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
                                               desired_state))
                break;
        }
    }

    /* Awaken any waiters I removed from the queue. */
    proclist_foreach_modify(iter, &wakeup, lwWaitLink)
    {
        PGPROC     *waiter = GetPGProcByNumber(iter.cur);

        LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
        proclist_delete(&wakeup, iter.cur, lwWaitLink);

        /*
         * Guarantee that lwWaiting being unset only becomes visible once the
         * unlink from the list has completed.  Otherwise the target backend
         * could be woken up for some other reason and enqueue for a new lock -
         * if that happens before the list unlink happens, the list would end
         * up being corrupted.
         *
         * The barrier pairs with the LWLockWaitListLock() when enqueuing for
         * another lock.
         */
        pg_write_barrier();
        waiter->lwWaiting = false;
        PGSemaphoreUnlock(waiter->sem);
    }
}

/*
 * Add ourselves to the end of the queue.
 *
 * NB: Mode can be LW_WAIT_UNTIL_FREE here!
 */
static void
LWLockQueueSelf(LWLock *lock, LWLockMode mode)
{
    /*
     * If we don't have a PGPROC structure, there's no way to wait.  This
     * should never occur, since MyProc should only be null during shared
     * memory initialization.
     */
    if (MyProc == NULL)
        elog(PANIC, "cannot wait without a PGPROC structure");

    if (MyProc->lwWaiting)
        elog(PANIC, "queueing for lock while waiting on another one");

    LWLockWaitListLock(lock);

    /* setting the flag is protected by the spinlock */
    pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);

    MyProc->lwWaiting = true;
    MyProc->lwWaitMode = mode;

    /* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
    if (mode == LW_WAIT_UNTIL_FREE)
        proclist_push_head(&lock->waiters, MyProc->pgprocno, lwWaitLink);
    else
        proclist_push_tail(&lock->waiters, MyProc->pgprocno, lwWaitLink);

    /* Can release the mutex now */
    LWLockWaitListUnlock(lock);

#ifdef LOCK_DEBUG
    pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
#endif
}

/*
 * Remove ourselves from the waitlist.
 *
 * This is used if we queued ourselves because we thought we needed to sleep
 * but, after further checking, we discovered that we don't actually need to
 * do so.
 */
static void
LWLockDequeueSelf(LWLock *lock)
{
    bool        found = false;
    proclist_mutable_iter iter;

#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;

    lwstats = get_lwlock_stats_entry(lock);

    lwstats->dequeue_self_count++;
#endif

    LWLockWaitListLock(lock);

    /*
     * We can't just remove ourselves from the list; we need to iterate over
     * all entries, as somebody else could have dequeued us already.
     */
    proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
    {
        if (iter.cur == MyProc->pgprocno)
        {
            found = true;
            proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
            break;
        }
    }

    if (proclist_is_empty(&lock->waiters) &&
        (pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
    {
        pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
    }

    /* XXX: combine with fetch_and above? */
    LWLockWaitListUnlock(lock);

    /* clear waiting state again, nice for debugging */
    if (found)
        MyProc->lwWaiting = false;
    else
    {
        int         extraWaits = 0;

        /*
         * Somebody else dequeued us and has or will wake us up.  Deal with
         * the superfluous absorption of a wakeup.
         */

        /*
         * Reset RELEASE_OK flag if somebody woke us before we removed
         * ourselves - they'll have set it to false.
         */
        pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

        /*
         * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
         * get reset at some inconvenient point later.  Most of the time this
         * will immediately return.
         */
        for (;;)
        {
            PGSemaphoreLock(MyProc->sem);
            if (!MyProc->lwWaiting)
                break;
            extraWaits++;
        }

        /*
         * Fix the process wait semaphore's count for any absorbed wakeups.
         */
        while (extraWaits-- > 0)
            PGSemaphoreUnlock(MyProc->sem);
    }

#ifdef LOCK_DEBUG
    {
        /* not waiting anymore */
        uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

        Assert(nwaiters < MAX_BACKENDS);
    }
#endif
}

/*
 * LWLockAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, sleep until it is.  Returns true if the lock
 * was available immediately, false if we had to sleep.
 *
 * Side effect: cancel/die interrupts are held off until lock release.
 */
bool
LWLockAcquire(LWLock *lock, LWLockMode mode)
{
    PGPROC     *proc = MyProc;
    bool        result = true;
    int         extraWaits = 0;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;

    lwstats = get_lwlock_stats_entry(lock);
#endif

    AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);

    PRINT_LWDEBUG("LWLockAcquire", lock, mode);

#ifdef LWLOCK_STATS
    /* Count lock acquisition attempts */
    if (mode == LW_EXCLUSIVE)
        lwstats->ex_acquire_count++;
    else
        lwstats->sh_acquire_count++;
#endif                          /* LWLOCK_STATS */

    /*
     * We can't wait if we haven't got a PGPROC.  This should only occur
     * during bootstrap or shared memory initialization.  Put an Assert here
     * to catch unsafe coding practices.
     */
    Assert(!(proc == NULL && IsUnderPostmaster));

    /* Ensure we will have room to remember the lock */
    if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
        elog(ERROR, "too many LWLocks taken");

    /*
     * Lock out cancel/die interrupts until we exit the code section protected
     * by the LWLock.  This ensures that interrupts will not interfere with
     * manipulations of data structures in shared memory.
     */
    HOLD_INTERRUPTS();

    /*
     * Loop here to try to acquire lock after each time we are signaled by
     * LWLockRelease.
     *
     * NOTE: it might seem better to have LWLockRelease actually grant us the
     * lock, rather than retrying and possibly having to go back to sleep. But
     * in practice that is no good because it means a process swap for every
     * lock acquisition when two or more processes are contending for the same
     * lock.  Since LWLocks are normally used to protect not-very-long
     * sections of computation, a process needs to be able to acquire and
     * release the same lock many times during a single CPU time slice, even
     * in the presence of contention.  The efficiency of being able to do that
     * outweighs the inefficiency of sometimes wasting a process dispatch
     * cycle because the lock is not free when a released waiter finally gets
     * to run.  See pgsql-hackers archives for 29-Dec-01.
     */
    for (;;)
    {
        bool        mustwait;

        /*
         * Try to grab the lock the first time, we're not in the waitqueue
         * yet/anymore.
         */
        mustwait = LWLockAttemptLock(lock, mode);

        if (!mustwait)
        {
            LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
            break;              /* got the lock */
        }

        /*
         * Ok, at this point we couldn't grab the lock on the first try.  We
         * cannot simply queue ourselves to the end of the list and wait to be
         * woken up because by now the lock could long have been released.
         * Instead add us to the queue and try to grab the lock again.  If we
         * succeed we need to revert the queuing and be happy, otherwise we
         * recheck the lock.  If we still couldn't grab it, we know that the
         * other locker will see our queue entries when releasing since they
         * existed before we checked for the lock.
         */

        /* add to the queue */
        LWLockQueueSelf(lock, mode);

        /* we're now guaranteed to be woken up if necessary */
        mustwait = LWLockAttemptLock(lock, mode);

        /* ok, grabbed the lock the second time round, need to undo queueing */
        if (!mustwait)
        {
            LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");

            LWLockDequeueSelf(lock);
            break;
        }

        /*
         * Wait until awakened.
         *
         * It is possible that we get awakened for a reason other than being
         * signaled by LWLockRelease.  If so, loop back and wait again.  Once
         * we've gotten the LWLock, re-increment the sema by the number of
         * additional signals received.
         */
        LOG_LWDEBUG("LWLockAcquire", lock, "waiting");

#ifdef LWLOCK_STATS
        lwstats->block_count++;
#endif

        LWLockReportWaitStart(lock);
        if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
            TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);

        for (;;)
        {
            PGSemaphoreLock(proc->sem);
            if (!proc->lwWaiting)
                break;
            extraWaits++;
        }

        /* Retrying, allow LWLockRelease to release waiters again. */
        pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

#ifdef LOCK_DEBUG
        {
            /* not waiting anymore */
            uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

            Assert(nwaiters < MAX_BACKENDS);
        }
#endif

        if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
            TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
        LWLockReportWaitEnd();

        LOG_LWDEBUG("LWLockAcquire", lock, "awakened");

        /* Now loop back and try to acquire lock again. */
        result = false;
    }

    if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_ENABLED())
        TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);

    /* Add lock to list of locks held by this backend */
    held_lwlocks[num_held_lwlocks].lock = lock;
    held_lwlocks[num_held_lwlocks++].mode = mode;

    /*
     * Fix the process wait semaphore's count for any absorbed wakeups.
     */
    while (extraWaits-- > 0)
        PGSemaphoreUnlock(proc->sem);

    return result;
}

/*
 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, return false with no side-effects.
 *
 * If successful, cancel/die interrupts are held off until lock release.
 */
bool
LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
{
    bool        mustwait;

    AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);

    PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);

    /* Ensure we will have room to remember the lock */
    if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
        elog(ERROR, "too many LWLocks taken");

    /*
     * Lock out cancel/die interrupts until we exit the code section protected
     * by the LWLock.  This ensures that interrupts will not interfere with
     * manipulations of data structures in shared memory.
     */
    HOLD_INTERRUPTS();

    /* Check for the lock */
    mustwait = LWLockAttemptLock(lock, mode);

    if (mustwait)
    {
        /* Failed to get lock, so release interrupt holdoff */
        RESUME_INTERRUPTS();

        LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
        if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL_ENABLED())
            TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), mode);
    }
    else
    {
        /* Add lock to list of locks held by this backend */
        held_lwlocks[num_held_lwlocks].lock = lock;
        held_lwlocks[num_held_lwlocks++].mode = mode;
        if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_ENABLED())
            TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), mode);
    }
    return !mustwait;
}
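
/*
 * Illustrative only: the minimal try-lock pattern LWLockConditionalAcquire()
 * enables; do_work() and do_something_else() are hypothetical placeholders
 * for the caller's protected work and its non-blocking fallback.
 *
 *    if (LWLockConditionalAcquire(lock, LW_EXCLUSIVE))
 *    {
 *        do_work();
 *        LWLockRelease(lock);
 *    }
 *    else
 *        do_something_else();
 */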

/*
 * LWLockAcquireOrWait - Acquire lock, or wait until it's free
 *
 * The semantics of this function are a bit funky.  If the lock is currently
 * free, it is acquired in the given mode, and the function returns true.  If
 * the lock isn't immediately free, the function waits until it is released
 * and returns false, but does not acquire the lock.
 *
 * This is currently used for WALWriteLock: when a backend flushes the WAL,
 * holding WALWriteLock, it can flush the commit records of many other
 * backends as a side-effect.  Those other backends need to wait until the
 * flush finishes, but don't need to acquire the lock anymore.  They can just
 * wake up, observe that their records have already been flushed, and return.
 */
bool
LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
{
    PGPROC     *proc = MyProc;
    bool        mustwait;
    int         extraWaits = 0;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;

    lwstats = get_lwlock_stats_entry(lock);
#endif

    Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);

    PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);

    /* Ensure we will have room to remember the lock */
    if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
        elog(ERROR, "too many LWLocks taken");

    /*
     * Lock out cancel/die interrupts until we exit the code section protected
     * by the LWLock.  This ensures that interrupts will not interfere with
     * manipulations of data structures in shared memory.
     */
    HOLD_INTERRUPTS();

    /*
     * NB: We're using nearly the same twice-in-a-row lock acquisition
     * protocol as LWLockAcquire().  Check its comments for details.
     */
    mustwait = LWLockAttemptLock(lock, mode);

    if (mustwait)
    {
        LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);

        mustwait = LWLockAttemptLock(lock, mode);

        if (mustwait)
        {
            /*
             * Wait until awakened.  Like in LWLockAcquire, be prepared for
             * bogus wakeups.
             */
            LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");

#ifdef LWLOCK_STATS
            lwstats->block_count++;
#endif

            LWLockReportWaitStart(lock);
            if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
                TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);

            for (;;)
            {
                PGSemaphoreLock(proc->sem);
                if (!proc->lwWaiting)
                    break;
                extraWaits++;
            }

#ifdef LOCK_DEBUG
            {
                /* not waiting anymore */
                uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

                Assert(nwaiters < MAX_BACKENDS);
            }
#endif
            if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
                TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
            LWLockReportWaitEnd();

            LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
        }
        else
        {
            LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");

            /*
             * Got lock in the second attempt, undo queueing.  We need to
             * treat this as having successfully acquired the lock, otherwise
             * we'd not necessarily wake up people we've prevented from
             * acquiring the lock.
             */
            LWLockDequeueSelf(lock);
        }
    }

    /*
     * Fix the process wait semaphore's count for any absorbed wakeups.
     */
    while (extraWaits-- > 0)
        PGSemaphoreUnlock(proc->sem);

    if (mustwait)
    {
        /* Failed to get lock, so release interrupt holdoff */
        RESUME_INTERRUPTS();
        LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
        if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL_ENABLED())
            TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), mode);
    }
    else
    {
        LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
        /* Add lock to list of locks held by this backend */
        held_lwlocks[num_held_lwlocks].lock = lock;
        held_lwlocks[num_held_lwlocks++].mode = mode;
        if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_ENABLED())
            TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), mode);
    }

    return !mustwait;
}
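
/*
 * Illustrative only: the caller-side shape LWLockAcquireOrWait() is designed
 * for, loosely modeled on the WAL-flush case described above.
 * is_my_work_done() and do_the_shared_work() are hypothetical placeholders
 * for the caller's recheck and for the work that benefits everyone.
 *
 *    while (!is_my_work_done())
 *    {
 *        if (LWLockAcquireOrWait(lock, LW_EXCLUSIVE))
 *        {
 *            do_the_shared_work();
 *            LWLockRelease(lock);
 *            break;
 *        }
 *        (otherwise the previous holder just finished; loop and recheck)
 *    }
 */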

/*
 * Does the lwlock in its current state need to wait for the variable value to
 * change?
 *
 * If we don't need to wait, and it's because the value of the variable has
 * changed, store the current value in newval.
 *
 * *result is set to true if the lock was free, and false otherwise.
 */
static bool
LWLockConflictsWithVar(LWLock *lock,
                       uint64 *valptr, uint64 oldval, uint64 *newval,
                       bool *result)
{
    bool        mustwait;
    uint64      value;

    /*
     * Test first to see if the lock is free right now.
     *
     * XXX: the caller uses a spinlock before this, so we don't need a memory
     * barrier here as far as the current usage is concerned.  But that might
     * not be safe in general.
     */
    mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;

    if (!mustwait)
    {
        *result = true;
        return false;
    }

    *result = false;

    /*
     * Read value using the lwlock's wait list lock, as we can't generally
     * rely on atomic 64 bit reads/stores.  TODO: On platforms with a way to
     * do atomic 64 bit reads/writes the spinlock should be optimized away.
     */
    LWLockWaitListLock(lock);
    value = *valptr;
    LWLockWaitListUnlock(lock);

    if (value != oldval)
    {
        mustwait = false;
        *newval = value;
    }
    else
    {
        mustwait = true;
    }

    return mustwait;
}

/*
 * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
 *
 * If the lock is held and *valptr equals oldval, waits until the lock is
 * either freed, or the lock holder updates *valptr by calling
 * LWLockUpdateVar.  If the lock is free on exit (immediately or after
 * waiting), returns true.  If the lock is still held, but *valptr no longer
 * matches oldval, returns false and sets *newval to the current value in
 * *valptr.
 *
 * Note: this function ignores shared lock holders; if the lock is held
 * in shared mode, returns 'true'.
 */
bool
LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
{
    PGPROC     *proc = MyProc;
    int         extraWaits = 0;
    bool        result = false;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;

    lwstats = get_lwlock_stats_entry(lock);
#endif

    PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);

    /*
     * Lock out cancel/die interrupts while we sleep on the lock.  There is no
     * cleanup mechanism to remove us from the wait queue if we got
     * interrupted.
     */
    HOLD_INTERRUPTS();

    /*
     * Loop here to check the lock's status after each time we are signaled.
     */
    for (;;)
    {
        bool        mustwait;

        mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
                                          &result);

        if (!mustwait)
            break;              /* the lock was free or value didn't match */

        /*
         * Add myself to wait queue.  Note that this is racy, somebody else
         * could wakeup before we're finished queuing.  NB: We're using nearly
         * the same twice-in-a-row lock acquisition protocol as
         * LWLockAcquire().  Check its comments for details.  The only
         * difference is that we also have to check the variable's values when
         * checking the state of the lock.
         */
        LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);

        /*
         * Set RELEASE_OK flag, to make sure we get woken up as soon as the
         * lock is released.
         */
        pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

        /*
         * We're now guaranteed to be woken up if necessary.  Recheck the lock
         * and variables state.
         */
        mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
                                          &result);

        /* Ok, no conflict after we queued ourselves.  Undo queueing. */
        if (!mustwait)
        {
            LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");

            LWLockDequeueSelf(lock);
            break;
        }

        /*
         * Wait until awakened.
         *
         * It is possible that we get awakened for a reason other than being
         * signaled by LWLockRelease.  If so, loop back and wait again.  Once
         * we've gotten the LWLock, re-increment the sema by the number of
         * additional signals received.
         */
        LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");

#ifdef LWLOCK_STATS
        lwstats->block_count++;
#endif

        LWLockReportWaitStart(lock);
        if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
            TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), LW_EXCLUSIVE);

        for (;;)
        {
            PGSemaphoreLock(proc->sem);
            if (!proc->lwWaiting)
                break;
            extraWaits++;
        }

#ifdef LOCK_DEBUG
        {
            /* not waiting anymore */
            uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

            Assert(nwaiters < MAX_BACKENDS);
        }
#endif

        if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
            TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), LW_EXCLUSIVE);
        LWLockReportWaitEnd();

        LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");

        /* Now loop back and check the status of the lock again. */
    }

    if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_ENABLED())
        TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), LW_EXCLUSIVE);

    /*
     * Fix the process wait semaphore's count for any absorbed wakeups.
     */
    while (extraWaits-- > 0)
        PGSemaphoreUnlock(proc->sem);

    /*
     * Now okay to allow cancel/die interrupts.
     */
    RESUME_INTERRUPTS();

    return result;
}
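
/*
 * Illustrative only: the intended interplay between LWLockWaitForVar() above
 * and LWLockUpdateVar()/LWLockReleaseClearVar() below.  The variable name
 * 'progress' and the surrounding work are hypothetical; the real user of
 * this protocol is the WAL insertion machinery.
 *
 *    Holder (exclusive):
 *        LWLockAcquire(lock, LW_EXCLUSIVE);
 *        ... do some work, then publish how far we've gotten ...
 *        LWLockUpdateVar(lock, &progress, new_position);
 *        ... more work ...
 *        LWLockReleaseClearVar(lock, &progress, 0);
 *
 *    Waiter:
 *        if (!LWLockWaitForVar(lock, &progress, seen_position, &newval))
 *            ... lock still held, but progress advanced to newval ...
 *        else
 *            ... lock is free; the holder is done ...
 */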

/*
 * LWLockUpdateVar - Update a variable and wake up waiters atomically
 *
 * Sets *valptr to 'val', and wakes up all processes waiting for us with
 * LWLockWaitForVar().  Setting the value and waking up the processes happen
 * atomically so that any process calling LWLockWaitForVar() on the same lock
 * is guaranteed to see the new value, and act accordingly.
 *
 * The caller must be holding the lock in exclusive mode.
 */
void
LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
{
    proclist_head wakeup;
    proclist_mutable_iter iter;

    PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);

    proclist_init(&wakeup);

    LWLockWaitListLock(lock);

    Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);

    /* Update the lock's value */
    *valptr = val;

    /*
     * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
     * up.  They are always in the front of the queue.
     */
    proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
    {
        PGPROC     *waiter = GetPGProcByNumber(iter.cur);

        if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
            break;

        proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
        proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
    }

    /* We are done updating shared state of the lock itself. */
    LWLockWaitListUnlock(lock);

    /*
     * Awaken any waiters I removed from the queue.
     */
    proclist_foreach_modify(iter, &wakeup, lwWaitLink)
    {
        PGPROC     *waiter = GetPGProcByNumber(iter.cur);

        proclist_delete(&wakeup, iter.cur, lwWaitLink);
        /* check comment in LWLockWakeup() about this barrier */
        pg_write_barrier();
        waiter->lwWaiting = false;
        PGSemaphoreUnlock(waiter->sem);
    }
}


/*
 * LWLockRelease - release a previously acquired lock
 */
void
LWLockRelease(LWLock *lock)
{
    LWLockMode  mode;
    uint32      oldstate;
    bool        check_waiters;
    int         i;

    /*
     * Remove lock from list of locks held.  Usually, but not always, it will
     * be the latest-acquired lock; so search array backwards.
     */
    for (i = num_held_lwlocks; --i >= 0;)
        if (lock == held_lwlocks[i].lock)
            break;

    if (i < 0)
        elog(ERROR, "lock %s is not held", T_NAME(lock));

    mode = held_lwlocks[i].mode;

    num_held_lwlocks--;
    for (; i < num_held_lwlocks; i++)
        held_lwlocks[i] = held_lwlocks[i + 1];

    PRINT_LWDEBUG("LWLockRelease", lock, mode);

    /*
     * Release my hold on lock, after that it can immediately be acquired by
     * others, even if we still have to wakeup other waiters.
     */
    if (mode == LW_EXCLUSIVE)
        oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
    else
        oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);

    /* nobody else can have that kind of lock */
    Assert(!(oldstate & LW_VAL_EXCLUSIVE));


    /*
     * We're still waiting for backends to get scheduled, don't wake them up
     * again.
     */
    if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
        (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
        (oldstate & LW_LOCK_MASK) == 0)
        check_waiters = true;
    else
        check_waiters = false;

    /*
     * As waking up waiters requires the spinlock to be acquired, only do so
     * if necessary.
     */
    if (check_waiters)
    {
        /* XXX: remove before commit? */
        LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
        LWLockWakeup(lock);
    }

    if (TRACE_POSTGRESQL_LWLOCK_RELEASE_ENABLED())
        TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));

    /*
     * Now okay to allow cancel/die interrupts.
     */
    RESUME_INTERRUPTS();
}

/*
 * LWLockReleaseClearVar - release a previously acquired lock, reset variable
 */
void
LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
{
    LWLockWaitListLock(lock);

    /*
     * Set the variable's value before releasing the lock; that prevents a
     * race condition wherein a new locker acquires the lock, but hasn't yet
     * set the variable's value.
     */
    *valptr = val;
    LWLockWaitListUnlock(lock);

    LWLockRelease(lock);
}


/*
 * LWLockReleaseAll - release all currently-held locks
 *
 * Used to clean up after ereport(ERROR).  An important difference between this
 * function and retail LWLockRelease calls is that InterruptHoldoffCount is
 * unchanged by this operation.  This is necessary since InterruptHoldoffCount
 * has been set to an appropriate level earlier in error recovery.  We could
 * decrement it below zero if we allow it to drop for each released lock!
 */
void
LWLockReleaseAll(void)
{
    while (num_held_lwlocks > 0)
    {
        HOLD_INTERRUPTS();      /* match the upcoming RESUME_INTERRUPTS */

        LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
    }
}


/*
 * LWLockHeldByMe - test whether my process holds a lock in any mode
 *
 * This is meant as debug support only.
 */
bool
LWLockHeldByMe(LWLock *l)
{
    int         i;

    for (i = 0; i < num_held_lwlocks; i++)
    {
        if (held_lwlocks[i].lock == l)
            return true;
    }
    return false;
}

/*
 * LWLockHeldByMeInMode - test whether my process holds a lock in given mode
 *
 * This is meant as debug support only.
 */
bool
LWLockHeldByMeInMode(LWLock *l, LWLockMode mode)
{
    int         i;

    for (i = 0; i < num_held_lwlocks; i++)
    {
        if (held_lwlocks[i].lock == l && held_lwlocks[i].mode == mode)
            return true;
    }
    return false;
}
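
/*
 * Illustrative only: these predicates are typically consumed in assertions
 * placed in code that requires a particular lock to already be held, e.g.:
 *
 *    Assert(LWLockHeldByMeInMode(lock, LW_EXCLUSIVE));
 */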