/*-------------------------------------------------------------------------
 *
 * lwlock.c
 *    Lightweight lock manager
 *
 * Lightweight locks are intended primarily to provide mutual exclusion of
 * access to shared-memory data structures.  Therefore, they offer both
 * exclusive and shared lock modes (to support read/write and read-only
 * access to a shared object).  There are few other frammishes.  User-level
 * locking should be done with the full lock manager --- which depends on
 * LWLocks to protect its shared state.
 *
 * In addition to exclusive and shared modes, lightweight locks can be used to
 * wait until a variable changes value.  Acquiring the lock with LWLockAcquire
 * does not reset the variable; it keeps whatever value it was last set to
 * before the lock was released, and it can be updated without releasing the
 * lock by calling LWLockUpdateVar.  LWLockWaitForVar waits for the variable
 * to be updated, or until the lock is free.  When releasing the lock with
 * LWLockReleaseClearVar() the value can be set to an appropriate value for a
 * free lock.  The meaning of the variable is up to the caller; the
 * lightweight lock code just assigns and compares it.
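 *
 * As a rough usage sketch (a hypothetical caller; the variable names are
 * illustrative, not from this file), the protected-variable machinery is
 * typically driven like this:
 *
 *      LWLockAcquire(lock, LW_EXCLUSIVE);
 *      LWLockUpdateVar(lock, &progress, new_position);
 *      LWLockReleaseClearVar(lock, &progress, 0);
 *
 * while another backend, instead of acquiring the lock, waits for either a
 * release or an update of the variable:
 *
 *      if (!LWLockWaitForVar(lock, &progress, last_seen, &newval))
 *          last_seen = newval;     (still locked, but the variable moved)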
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/storage/lmgr/lwlock.c
 *
 * NOTES:
 *
 * This used to be a pretty straightforward reader-writer lock
 * implementation, in which the internal state was protected by a
 * spinlock.  Unfortunately the overhead of taking the spinlock proved to be
 * too high for workloads/locks that were taken in shared mode very
 * frequently.  Often we were spinning in the (obviously exclusive) spinlock
 * while trying to acquire a shared lock that was actually free.
 *
 * Thus a new implementation was devised that provides wait-free shared lock
 * acquisition for locks that aren't exclusively locked.
 *
 * The basic idea is to have a single atomic variable 'lockcount' instead of
 * the formerly separate shared and exclusive counters, and to use atomic
 * operations to acquire the lock.  That's fairly easy to do for plain
 * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
 * in the OS.
 *
 * For lock acquisition we use an atomic compare-and-exchange on the lockcount
 * variable.  For exclusive lock we swap in a sentinel value
 * (LW_VAL_EXCLUSIVE); for shared locks we count the number of holders.
 *
 * To release the lock we use an atomic decrement.  If the resulting value is
 * zero (which we learn atomically), we know we can/have to release waiters.
 *
 * Obviously it is important that the sentinel value for exclusive locks
 * doesn't conflict with the maximum number of possible shared lockers -
 * luckily MAX_BACKENDS makes that easily possible.
 *
 *
 * The attentive reader might have noticed that naively doing the above has a
 * glaring race condition: we try to lock using the atomic operations and
 * notice that we have to wait.  Unfortunately, by the time we have finished
 * queuing, the former locker very well might have already finished its
 * work.  That's problematic because we're now stuck waiting inside the OS.
 *
 * To mitigate those races we use a multi-phase attempt at locking:
 *    Phase 1: Try to do it atomically; if we succeed, nice
 *    Phase 2: Add ourselves to the waitqueue of the lock
 *    Phase 3: Try to grab the lock again; if we succeed, remove ourselves
 *             from the queue
 *    Phase 4: Sleep till wake-up, goto Phase 1
 *
 * This protects us against the problem above: nobody can release too
 * quickly, before we're queued, since after Phase 2 we're already queued.
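 *
 * In code form, a minimal sketch of that protocol (simplified; LWLockAcquire
 * below additionally handles interrupts, stats and bookkeeping):
 *
 *      for (;;)
 *      {
 *          if (!LWLockAttemptLock(lock, mode))
 *              break;                       (Phase 1: got it atomically)
 *          LWLockQueueSelf(lock, mode);     (Phase 2: enqueue first)
 *          if (!LWLockAttemptLock(lock, mode))
 *          {
 *              LWLockDequeueSelf(lock);     (Phase 3: got it, undo queueing)
 *              break;
 *          }
 *          (Phase 4: sleep on the semaphore, then loop back)
 *      }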
 * -------------------------------------------------------------------------
 */
#include "postgres.h"

#include "miscadmin.h"
#include "pgstat.h"
#include "pg_trace.h"
#include "postmaster/postmaster.h"
#include "replication/slot.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/spin.h"
#include "utils/memutils.h"

#ifdef LWLOCK_STATS
#include "utils/hsearch.h"
#endif


/* We use the ShmemLock spinlock to protect LWLockCounter */
extern slock_t *ShmemLock;

#define LW_FLAG_HAS_WAITERS         ((uint32) 1 << 30)
#define LW_FLAG_RELEASE_OK          ((uint32) 1 << 29)
#define LW_FLAG_LOCKED              ((uint32) 1 << 28)

#define LW_VAL_EXCLUSIVE            ((uint32) 1 << 24)
#define LW_VAL_SHARED               1

#define LW_LOCK_MASK                ((uint32) ((1 << 25)-1))
/* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
#define LW_SHARED_MASK              ((uint32) ((1 << 24)-1))
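
/*
 * A reading aid for the state layout (not authoritative beyond the defines
 * above): bits 0-23 of 'state' hold the count of shared holders, bit 24 is
 * the exclusive marker, and bits 28-30 are the wait-list/wakeup flags.  For
 * instance, a lock held shared by three backends with waiters queued has
 * state == (LW_FLAG_HAS_WAITERS | 3).
 */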

/*
 * This is indexed by tranche ID and stores metadata for all tranches known
 * to the current backend.
 */
static LWLockTranche **LWLockTrancheArray = NULL;
static int  LWLockTranchesAllocated = 0;

#define T_NAME(lock) \
    (LWLockTrancheArray[(lock)->tranche]->name)
#define T_ID(lock) \
    ((int) ((((char *) lock) - \
        ((char *) LWLockTrancheArray[(lock)->tranche]->array_base)) / \
        LWLockTrancheArray[(lock)->tranche]->array_stride))

/*
 * This points to the main array of LWLocks in shared memory.  Backends inherit
 * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
 * where we have special measures to pass it down).
 */
LWLockPadded *MainLWLockArray = NULL;
static LWLockTranche MainLWLockTranche;
static LWLockTranche BufMappingLWLockTranche;
static LWLockTranche LockManagerLWLockTranche;
static LWLockTranche PredicateLockManagerLWLockTranche;

/*
 * We use this structure to keep track of locked LWLocks for release
 * during error recovery.  Normally, only a few will be held at once, but
 * occasionally the number can be much higher; for example, the pg_buffercache
 * extension locks all buffer partitions simultaneously.
 */
#define MAX_SIMUL_LWLOCKS   200

/* struct representing the LWLocks we're holding */
typedef struct LWLockHandle
{
    LWLock     *lock;
    LWLockMode  mode;
} LWLockHandle;

static int  num_held_lwlocks = 0;
static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];

/* struct representing the LWLock tranche request for named tranche */
typedef struct NamedLWLockTrancheRequest
{
    char        tranche_name[NAMEDATALEN];
    int         num_lwlocks;
} NamedLWLockTrancheRequest;

NamedLWLockTrancheRequest *NamedLWLockTrancheRequestArray = NULL;
static int  NamedLWLockTrancheRequestsAllocated = 0;
int         NamedLWLockTrancheRequests = 0;

NamedLWLockTranche *NamedLWLockTrancheArray = NULL;

static bool lock_named_request_allowed = true;

static void InitializeLWLocks(void);
static void RegisterLWLockTranches(void);

static inline void LWLockReportWaitStart(LWLock *lock);
static inline void LWLockReportWaitEnd(void);

#ifdef LWLOCK_STATS
typedef struct lwlock_stats_key
{
    int         tranche;
    int         instance;
} lwlock_stats_key;

typedef struct lwlock_stats
{
    lwlock_stats_key key;
    int         sh_acquire_count;
    int         ex_acquire_count;
    int         block_count;
    int         dequeue_self_count;
    int         spin_delay_count;
} lwlock_stats;

static HTAB *lwlock_stats_htab;
static lwlock_stats lwlock_stats_dummy;
#endif

#ifdef LOCK_DEBUG
bool        Trace_lwlocks = false;

inline static void
PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
{
    /* hide statement & context here, otherwise the log is just too verbose */
    if (Trace_lwlocks)
    {
        uint32      state = pg_atomic_read_u32(&lock->state);
        int         id = T_ID(lock);

        if (lock->tranche == 0 && id < NUM_INDIVIDUAL_LWLOCKS)
            ereport(LOG,
                    (errhidestmt(true),
                     errhidecontext(true),
                     errmsg_internal("%d: %s(%s): excl %u shared %u haswaiters %u waiters %u rOK %d",
                                     MyProcPid,
                                     where, MainLWLockNames[id],
                                     (state & LW_VAL_EXCLUSIVE) != 0,
                                     state & LW_SHARED_MASK,
                                     (state & LW_FLAG_HAS_WAITERS) != 0,
                                     pg_atomic_read_u32(&lock->nwaiters),
                                     (state & LW_FLAG_RELEASE_OK) != 0)));
        else
            ereport(LOG,
                    (errhidestmt(true),
                     errhidecontext(true),
                     errmsg_internal("%d: %s(%s %d): excl %u shared %u haswaiters %u waiters %u rOK %d",
                                     MyProcPid,
                                     where, T_NAME(lock), id,
                                     (state & LW_VAL_EXCLUSIVE) != 0,
                                     state & LW_SHARED_MASK,
                                     (state & LW_FLAG_HAS_WAITERS) != 0,
                                     pg_atomic_read_u32(&lock->nwaiters),
                                     (state & LW_FLAG_RELEASE_OK) != 0)));
    }
}

inline static void
LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
{
    /* hide statement & context here, otherwise the log is just too verbose */
    if (Trace_lwlocks)
    {
        int         id = T_ID(lock);

        if (lock->tranche == 0 && id < NUM_INDIVIDUAL_LWLOCKS)
            ereport(LOG,
                    (errhidestmt(true),
                     errhidecontext(true),
                     errmsg_internal("%s(%s): %s", where,
                                     MainLWLockNames[id], msg)));
        else
            ereport(LOG,
                    (errhidestmt(true),
                     errhidecontext(true),
                     errmsg_internal("%s(%s %d): %s", where,
                                     T_NAME(lock), id, msg)));
    }
}

#else                           /* not LOCK_DEBUG */
#define PRINT_LWDEBUG(a,b,c) ((void)0)
#define LOG_LWDEBUG(a,b,c) ((void)0)
#endif                          /* LOCK_DEBUG */

#ifdef LWLOCK_STATS

static void init_lwlock_stats(void);
static void print_lwlock_stats(int code, Datum arg);
static lwlock_stats *get_lwlock_stats_entry(LWLock *lockid);

static void
init_lwlock_stats(void)
{
    HASHCTL     ctl;
    static MemoryContext lwlock_stats_cxt = NULL;
    static bool exit_registered = false;

    if (lwlock_stats_cxt != NULL)
        MemoryContextDelete(lwlock_stats_cxt);

    /*
     * The LWLock stats will be updated within a critical section, which
     * requires allocating new hash entries.  Allocations within a critical
     * section are normally not allowed because running out of memory would
     * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
     * turned on in production, so that's an acceptable risk.  The hash entries
     * are small, so the risk of running out of memory is minimal in practice.
     */
    lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
                                             "LWLock stats",
                                             ALLOCSET_DEFAULT_SIZES);
    MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);

    MemSet(&ctl, 0, sizeof(ctl));
    ctl.keysize = sizeof(lwlock_stats_key);
    ctl.entrysize = sizeof(lwlock_stats);
    ctl.hcxt = lwlock_stats_cxt;
    lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
                                    HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
    if (!exit_registered)
    {
        on_shmem_exit(print_lwlock_stats, 0);
        exit_registered = true;
    }
}

static void
print_lwlock_stats(int code, Datum arg)
{
    HASH_SEQ_STATUS scan;
    lwlock_stats *lwstats;

    hash_seq_init(&scan, lwlock_stats_htab);

    /* Grab an LWLock to keep different backends from mixing reports */
    LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);

    while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
    {
        fprintf(stderr,
                "PID %d lwlock %s %d: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
                MyProcPid, LWLockTrancheArray[lwstats->key.tranche]->name,
                lwstats->key.instance, lwstats->sh_acquire_count,
                lwstats->ex_acquire_count, lwstats->block_count,
                lwstats->spin_delay_count, lwstats->dequeue_self_count);
    }

    LWLockRelease(&MainLWLockArray[0].lock);
}

static lwlock_stats *
get_lwlock_stats_entry(LWLock *lock)
{
    lwlock_stats_key key;
    lwlock_stats *lwstats;
    bool        found;

    /*
     * During shared memory initialization, the hash table doesn't exist yet.
     * Stats of that phase aren't very interesting, so just collect operations
     * on all locks in a single dummy entry.
     */
    if (lwlock_stats_htab == NULL)
        return &lwlock_stats_dummy;

    /* Fetch or create the entry. */
    key.tranche = lock->tranche;
    key.instance = T_ID(lock);
    lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
    if (!found)
    {
        lwstats->sh_acquire_count = 0;
        lwstats->ex_acquire_count = 0;
        lwstats->block_count = 0;
        lwstats->dequeue_self_count = 0;
        lwstats->spin_delay_count = 0;
    }
    return lwstats;
}
#endif                          /* LWLOCK_STATS */


/*
 * Compute number of LWLocks required by named tranches.  These will be
 * allocated in the main array.
 */
static int
NumLWLocksByNamedTranches(void)
{
    int         numLocks = 0;
    int         i;

    for (i = 0; i < NamedLWLockTrancheRequests; i++)
        numLocks += NamedLWLockTrancheRequestArray[i].num_lwlocks;

    return numLocks;
}

/*
 * Compute shmem space needed for LWLocks and named tranches.
 */
Size
LWLockShmemSize(void)
{
    Size        size;
    int         i;
    int         numLocks = NUM_FIXED_LWLOCKS;

    numLocks += NumLWLocksByNamedTranches();

    /* Space for the LWLock array. */
    size = mul_size(numLocks, sizeof(LWLockPadded));

    /* Space for dynamic allocation counter, plus room for alignment. */
    size = add_size(size, sizeof(int) + LWLOCK_PADDED_SIZE);

    /* Space for named tranches. */
    size = add_size(size, mul_size(NamedLWLockTrancheRequests, sizeof(NamedLWLockTranche)));

    /* Space for name of each tranche. */
    for (i = 0; i < NamedLWLockTrancheRequests; i++)
        size = add_size(size, strlen(NamedLWLockTrancheRequestArray[i].tranche_name) + 1);

    /* Disallow named LWLocks' requests after startup */
    lock_named_request_allowed = false;

    return size;
}

/*
 * Allocate shmem space for the main LWLock array and all tranches and
 * initialize it.  We also register all the LWLock tranches here.
 */
void
CreateLWLocks(void)
{
    StaticAssertExpr(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
                     "MAX_BACKENDS too big for lwlock.c");

    StaticAssertExpr(sizeof(LWLock) <= LWLOCK_MINIMAL_SIZE &&
                     sizeof(LWLock) <= LWLOCK_PADDED_SIZE,
                     "Miscalculated LWLock padding");

    if (!IsUnderPostmaster)
    {
        Size        spaceLocks = LWLockShmemSize();
        int        *LWLockCounter;
        char       *ptr;

        /* Allocate space */
        ptr = (char *) ShmemAlloc(spaceLocks);

        /* Leave room for dynamic allocation of tranches */
        ptr += sizeof(int);

        /* Ensure desired alignment of LWLock array */
        ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;

        MainLWLockArray = (LWLockPadded *) ptr;

        /*
         * Initialize the dynamic-allocation counter for tranches, which is
         * stored just before the first LWLock.
         */
        LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
        *LWLockCounter = LWTRANCHE_FIRST_USER_DEFINED;

        /* Initialize all LWLocks */
        InitializeLWLocks();
    }

    /* Register all LWLock tranches */
    RegisterLWLockTranches();
}

/*
 * Initialize LWLocks that are fixed and those belonging to named tranches.
 */
static void
InitializeLWLocks(void)
{
    int         numNamedLocks = NumLWLocksByNamedTranches();
    int         id;
    int         i;
    int         j;
    LWLockPadded *lock;

    /* Initialize all individual LWLocks in main array */
    for (id = 0, lock = MainLWLockArray; id < NUM_INDIVIDUAL_LWLOCKS; id++, lock++)
        LWLockInitialize(&lock->lock, LWTRANCHE_MAIN);

    /* Initialize buffer mapping LWLocks in main array */
    lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS;
    for (id = 0; id < NUM_BUFFER_PARTITIONS; id++, lock++)
        LWLockInitialize(&lock->lock, LWTRANCHE_BUFFER_MAPPING);

    /* Initialize lmgrs' LWLocks in main array */
    lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS + NUM_BUFFER_PARTITIONS;
    for (id = 0; id < NUM_LOCK_PARTITIONS; id++, lock++)
        LWLockInitialize(&lock->lock, LWTRANCHE_LOCK_MANAGER);

    /* Initialize predicate lmgrs' LWLocks in main array */
    lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS +
        NUM_BUFFER_PARTITIONS + NUM_LOCK_PARTITIONS;
    for (id = 0; id < NUM_PREDICATELOCK_PARTITIONS; id++, lock++)
        LWLockInitialize(&lock->lock, LWTRANCHE_PREDICATE_LOCK_MANAGER);

    /* Initialize named tranches. */
    if (NamedLWLockTrancheRequests > 0)
    {
        char       *trancheNames;

        NamedLWLockTrancheArray = (NamedLWLockTranche *)
            &MainLWLockArray[NUM_FIXED_LWLOCKS + numNamedLocks];

        trancheNames = (char *) NamedLWLockTrancheArray +
            (NamedLWLockTrancheRequests * sizeof(NamedLWLockTranche));
        lock = &MainLWLockArray[NUM_FIXED_LWLOCKS];

        for (i = 0; i < NamedLWLockTrancheRequests; i++)
        {
            NamedLWLockTrancheRequest *request;
            NamedLWLockTranche *tranche;
            char       *name;

            request = &NamedLWLockTrancheRequestArray[i];
            tranche = &NamedLWLockTrancheArray[i];

            name = trancheNames;
            trancheNames += strlen(request->tranche_name) + 1;
            strcpy(name, request->tranche_name);
            tranche->lwLockTranche.name = name;
            tranche->trancheId = LWLockNewTrancheId();
            tranche->lwLockTranche.array_base = lock;
            tranche->lwLockTranche.array_stride = sizeof(LWLockPadded);

            for (j = 0; j < request->num_lwlocks; j++, lock++)
                LWLockInitialize(&lock->lock, tranche->trancheId);
        }
    }
}

/*
 * Register named tranches and tranches for fixed LWLocks.
 */
static void
RegisterLWLockTranches(void)
{
    int         i;

    if (LWLockTrancheArray == NULL)
    {
        LWLockTranchesAllocated = 32;
        LWLockTrancheArray = (LWLockTranche **)
            MemoryContextAllocZero(TopMemoryContext,
                          LWLockTranchesAllocated * sizeof(LWLockTranche *));
        Assert(LWLockTranchesAllocated >= LWTRANCHE_FIRST_USER_DEFINED);
    }

    MainLWLockTranche.name = "main";
    MainLWLockTranche.array_base = MainLWLockArray;
    MainLWLockTranche.array_stride = sizeof(LWLockPadded);
    LWLockRegisterTranche(LWTRANCHE_MAIN, &MainLWLockTranche);

    BufMappingLWLockTranche.name = "buffer_mapping";
    BufMappingLWLockTranche.array_base = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS;
    BufMappingLWLockTranche.array_stride = sizeof(LWLockPadded);
    LWLockRegisterTranche(LWTRANCHE_BUFFER_MAPPING, &BufMappingLWLockTranche);

    LockManagerLWLockTranche.name = "lock_manager";
    LockManagerLWLockTranche.array_base = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS +
        NUM_BUFFER_PARTITIONS;
    LockManagerLWLockTranche.array_stride = sizeof(LWLockPadded);
    LWLockRegisterTranche(LWTRANCHE_LOCK_MANAGER, &LockManagerLWLockTranche);

    PredicateLockManagerLWLockTranche.name = "predicate_lock_manager";
    PredicateLockManagerLWLockTranche.array_base = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS +
        NUM_BUFFER_PARTITIONS + NUM_LOCK_PARTITIONS;
    PredicateLockManagerLWLockTranche.array_stride = sizeof(LWLockPadded);
    LWLockRegisterTranche(LWTRANCHE_PREDICATE_LOCK_MANAGER, &PredicateLockManagerLWLockTranche);

    /* Register named tranches. */
    for (i = 0; i < NamedLWLockTrancheRequests; i++)
        LWLockRegisterTranche(NamedLWLockTrancheArray[i].trancheId,
                              &NamedLWLockTrancheArray[i].lwLockTranche);
}

/*
 * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
 */
void
InitLWLockAccess(void)
{
#ifdef LWLOCK_STATS
    init_lwlock_stats();
#endif
}

/*
 * GetNamedLWLockTranche - returns the base address of the LWLocks belonging
 *      to the specified tranche.
 *
 * The caller is expected to fetch the requested number of LWLocks starting
 * from the base lock address returned by this function.  This can be used
 * for tranches that were requested via the RequestNamedLWLockTranche() API.
 */
LWLockPadded *
GetNamedLWLockTranche(const char *tranche_name)
{
    int         lock_pos;
    int         i;

    /*
     * Obtain the position of the base address of the LWLocks belonging to the
     * requested tranche_name in MainLWLockArray.  The LWLocks for named
     * tranches are placed in MainLWLockArray after the fixed locks.
     */
    lock_pos = NUM_FIXED_LWLOCKS;
    for (i = 0; i < NamedLWLockTrancheRequests; i++)
    {
        if (strcmp(NamedLWLockTrancheRequestArray[i].tranche_name,
                   tranche_name) == 0)
            return &MainLWLockArray[lock_pos];

        lock_pos += NamedLWLockTrancheRequestArray[i].num_lwlocks;
    }

    if (i >= NamedLWLockTrancheRequests)
        elog(ERROR, "requested tranche is not registered");

    /* just to keep compiler quiet */
    return NULL;
}

/*
 * Allocate a new tranche ID.
 */
int
LWLockNewTrancheId(void)
{
    int         result;
    int        *LWLockCounter;

    LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
    SpinLockAcquire(ShmemLock);
    result = (*LWLockCounter)++;
    SpinLockRelease(ShmemLock);

    return result;
}

/*
 * Register a tranche ID in the lookup table for the current process.  This
 * routine will save a pointer to the tranche object passed as an argument,
 * so that object should be allocated in a backend-lifetime context
 * (TopMemoryContext, static variable, or similar).
 */
void
LWLockRegisterTranche(int tranche_id, LWLockTranche *tranche)
{
    Assert(LWLockTrancheArray != NULL);

    if (tranche_id >= LWLockTranchesAllocated)
    {
        int         i = LWLockTranchesAllocated;
        int         j = LWLockTranchesAllocated;

        while (i <= tranche_id)
            i *= 2;

        LWLockTrancheArray = (LWLockTranche **)
            repalloc(LWLockTrancheArray,
                     i * sizeof(LWLockTranche *));
        LWLockTranchesAllocated = i;
        while (j < LWLockTranchesAllocated)
            LWLockTrancheArray[j++] = NULL;
    }

    LWLockTrancheArray[tranche_id] = tranche;
}
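
/*
 * As an illustration (a hypothetical extension, not code from this file), a
 * lock living in dynamic shared memory would typically be set up with the
 * two functions above plus LWLockInitialize, re-registering the tranche in
 * every backend that attaches:
 *
 *      static LWLockTranche my_tranche;
 *      int         my_tranche_id;
 *
 *      my_tranche_id = LWLockNewTrancheId();
 *      my_tranche.name = "my_extension";
 *      my_tranche.array_base = my_lock;
 *      my_tranche.array_stride = sizeof(LWLock);
 *      LWLockRegisterTranche(my_tranche_id, &my_tranche);
 *      LWLockInitialize(my_lock, my_tranche_id);
 */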

/*
 * RequestNamedLWLockTranche
 *      Request that extra LWLocks be allocated during postmaster
 *      startup.
 *
 * This is only useful for extensions if called from the _PG_init hook
 * of a library that is loaded into the postmaster via
 * shared_preload_libraries.  Once shared memory has been allocated, calls
 * will be ignored.  (We could raise an error, but it seems better to make
 * it a no-op, so that libraries containing such calls can be reloaded if
 * needed.)
 */
void
RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
{
    NamedLWLockTrancheRequest *request;

    if (IsUnderPostmaster || !lock_named_request_allowed)
        return;                 /* too late */

    if (NamedLWLockTrancheRequestArray == NULL)
    {
        NamedLWLockTrancheRequestsAllocated = 16;
        NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
            MemoryContextAlloc(TopMemoryContext,
                               NamedLWLockTrancheRequestsAllocated
                               * sizeof(NamedLWLockTrancheRequest));
    }

    if (NamedLWLockTrancheRequests >= NamedLWLockTrancheRequestsAllocated)
    {
        int         i = NamedLWLockTrancheRequestsAllocated;

        while (i <= NamedLWLockTrancheRequests)
            i *= 2;

        NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
            repalloc(NamedLWLockTrancheRequestArray,
                     i * sizeof(NamedLWLockTrancheRequest));
        NamedLWLockTrancheRequestsAllocated = i;
    }

    request = &NamedLWLockTrancheRequestArray[NamedLWLockTrancheRequests];
    Assert(strlen(tranche_name) + 1 < NAMEDATALEN);
    StrNCpy(request->tranche_name, tranche_name, NAMEDATALEN);
    request->num_lwlocks = num_lwlocks;
    NamedLWLockTrancheRequests++;
}
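
/*
 * For example (a hypothetical extension; the names are illustrative only),
 * a library loaded via shared_preload_libraries would pair this with
 * GetNamedLWLockTranche():
 *
 *      void
 *      _PG_init(void)
 *      {
 *          RequestNamedLWLockTranche("my_extension", 4);
 *      }
 *
 * and later, once shared memory exists:
 *
 *      LWLockPadded *locks = GetNamedLWLockTranche("my_extension");
 *
 *      LWLockAcquire(&locks[0].lock, LW_EXCLUSIVE);
 */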

/*
 * LWLockInitialize - initialize a new lwlock; it's initially unlocked
 */
void
LWLockInitialize(LWLock *lock, int tranche_id)
{
    pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
#ifdef LOCK_DEBUG
    pg_atomic_init_u32(&lock->nwaiters, 0);
#endif
    lock->tranche = tranche_id;
    dlist_init(&lock->waiters);
}

/*
 * Report start of wait event for light-weight locks.
 *
 * This function will be used by all the light-weight lock calls which
 * need to wait to acquire the lock.  It distinguishes the wait event
 * based on tranche and lock id.
 */
static inline void
LWLockReportWaitStart(LWLock *lock)
{
    int         lockId = T_ID(lock);

    if (lock->tranche == 0)
        pgstat_report_wait_start(WAIT_LWLOCK_NAMED, (uint16) lockId);
    else
        pgstat_report_wait_start(WAIT_LWLOCK_TRANCHE, lock->tranche);
}

/*
 * Report end of wait event for light-weight locks.
 */
static inline void
LWLockReportWaitEnd(void)
{
    pgstat_report_wait_end();
}

/*
 * Return an identifier for an LWLock based on the wait class and event.
 */
const char *
GetLWLockIdentifier(uint8 classId, uint16 eventId)
{
    if (classId == WAIT_LWLOCK_NAMED)
        return MainLWLockNames[eventId];

    Assert(classId == WAIT_LWLOCK_TRANCHE);

    /*
     * It is quite possible that a user has registered a tranche in some of
     * the backends (e.g. by allocating lwlocks in dynamic shared memory) but
     * not in all of them, so we can't assume the tranche is registered here.
     * Note the explicit NULL test: unregistered slots in the array are NULL
     * pointers, which must not be dereferenced.
     */
    if (eventId >= LWLockTranchesAllocated ||
        LWLockTrancheArray[eventId] == NULL ||
        LWLockTrancheArray[eventId]->name == NULL)
        return "extension";

    return LWLockTrancheArray[eventId]->name;
}

/*
 * Internal function that tries to atomically acquire the lwlock in the passed
 * in mode.
 *
 * This function will not block waiting for a lock to become free - that's the
 * caller's job.
 *
 * Returns true if the lock isn't free and we need to wait.
 */
static bool
LWLockAttemptLock(LWLock *lock, LWLockMode mode)
{
    uint32      old_state;

    AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);

    /*
     * Read once outside the loop; later iterations will get the newer value
     * via compare & exchange.
     */
    old_state = pg_atomic_read_u32(&lock->state);

    /* loop until we've determined whether we could acquire the lock or not */
    while (true)
    {
        uint32      desired_state;
        bool        lock_free;

        desired_state = old_state;

        if (mode == LW_EXCLUSIVE)
        {
            lock_free = (old_state & LW_LOCK_MASK) == 0;
            if (lock_free)
                desired_state += LW_VAL_EXCLUSIVE;
        }
        else
        {
            lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
            if (lock_free)
                desired_state += LW_VAL_SHARED;
        }

        /*
         * Attempt to swap in the state we are expecting.  If we didn't see
         * the lock as free, that's just the old value.  If we saw it as
         * free, we'll attempt to mark it acquired.  The reason that we
         * always swap in the value is that this doubles as a memory barrier.
         * We could try to be smarter and only swap in values if we saw the
         * lock as free, but benchmarks haven't shown it as beneficial so
         * far.
         *
         * Retry if the value changed since we last looked at it.
         */
        if (pg_atomic_compare_exchange_u32(&lock->state,
                                           &old_state, desired_state))
        {
            if (lock_free)
            {
                /* Great!  Got the lock. */
#ifdef LOCK_DEBUG
                if (mode == LW_EXCLUSIVE)
                    lock->owner = MyProc;
#endif
                return false;
            }
            else
                return true;    /* somebody else has the lock */
        }
    }
    pg_unreachable();
}

/*
 * Lock the LWLock's wait list against concurrent activity.
 *
 * NB: even though the wait list is locked, non-conflicting lock operations
 * may still happen concurrently.
 *
 * Time spent holding mutex should be short!
 */
static void
LWLockWaitListLock(LWLock *lock)
{
    uint32      old_state;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;
    uint32      delays = 0;

    lwstats = get_lwlock_stats_entry(lock);
#endif

    while (true)
    {
        /* always try once to acquire lock directly */
        old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
        if (!(old_state & LW_FLAG_LOCKED))
            break;              /* got lock */

        /* and then spin without atomic operations until lock is released */
        {
            SpinDelayStatus delayStatus;

            init_local_spin_delay(&delayStatus);

            while (old_state & LW_FLAG_LOCKED)
            {
                perform_spin_delay(&delayStatus);
                old_state = pg_atomic_read_u32(&lock->state);
            }
#ifdef LWLOCK_STATS
            delays += delayStatus.delays;
#endif
            finish_spin_delay(&delayStatus);
        }

        /*
         * Retry.  The lock might well have been re-acquired by the time
         * we're attempting to get it again.
         */
    }

#ifdef LWLOCK_STATS
    lwstats->spin_delay_count += delays;
#endif
}

/*
 * Unlock the LWLock's wait list.
 *
 * Note that it can be more efficient to manipulate flags and release the
 * locks in a single atomic operation.
 */
static void
LWLockWaitListUnlock(LWLock *lock)
{
    uint32      old_state PG_USED_FOR_ASSERTS_ONLY;

    old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);

    Assert(old_state & LW_FLAG_LOCKED);
}

/*
 * Wakeup all the lockers that currently have a chance to acquire the lock.
 */
static void
LWLockWakeup(LWLock *lock)
{
    bool        new_release_ok;
    bool        wokeup_somebody = false;
    dlist_head  wakeup;
    dlist_mutable_iter iter;

    dlist_init(&wakeup);

    new_release_ok = true;

    /* lock wait list while collecting backends to wake up */
    LWLockWaitListLock(lock);

    dlist_foreach_modify(iter, &lock->waiters)
    {
        PGPROC     *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);

        if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
            continue;

        dlist_delete(&waiter->lwWaitLink);
        dlist_push_tail(&wakeup, &waiter->lwWaitLink);

        if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
        {
            /*
             * Prevent additional wakeups until the retrying waiter gets to
             * run.  Backends that are just waiting for the lock to become
             * free don't retry automatically.
             */
            new_release_ok = false;

            /*
             * Don't wakeup (further) exclusive locks.
             */
            wokeup_somebody = true;
        }

        /*
         * Once we've woken up an exclusive lock, there's no point in waking
         * up anybody else.
         */
        if (waiter->lwWaitMode == LW_EXCLUSIVE)
            break;
    }

    Assert(dlist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);

    /* unset required flags, and release lock, in one fell swoop */
    {
        uint32      old_state;
        uint32      desired_state;

        old_state = pg_atomic_read_u32(&lock->state);
        while (true)
        {
            desired_state = old_state;

            /* compute desired flags */

            if (new_release_ok)
                desired_state |= LW_FLAG_RELEASE_OK;
            else
                desired_state &= ~LW_FLAG_RELEASE_OK;

            if (dlist_is_empty(&wakeup))
                desired_state &= ~LW_FLAG_HAS_WAITERS;

            desired_state &= ~LW_FLAG_LOCKED;   /* release lock */

            if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
                                               desired_state))
                break;
        }
    }

    /* Awaken any waiters I removed from the queue. */
    dlist_foreach_modify(iter, &wakeup)
    {
        PGPROC     *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);

        LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
        dlist_delete(&waiter->lwWaitLink);

        /*
         * Guarantee that lwWaiting being unset only becomes visible once the
         * unlink from the list has completed.  Otherwise the target backend
         * could be woken up for some other reason and enqueue for a new
         * lock - if that happens before the list unlink happens, the list
         * would end up being corrupted.
         *
         * The barrier pairs with the LWLockWaitListLock() when enqueuing
         * for another lock.
         */
        pg_write_barrier();
        waiter->lwWaiting = false;
        PGSemaphoreUnlock(&waiter->sem);
    }
}

/*
 * Add ourselves to the end of the queue.
 *
 * NB: Mode can be LW_WAIT_UNTIL_FREE here!
 */
static void
LWLockQueueSelf(LWLock *lock, LWLockMode mode)
{
    /*
     * If we don't have a PGPROC structure, there's no way to wait.  This
     * should never occur, since MyProc should only be null during shared
     * memory initialization.
     */
    if (MyProc == NULL)
        elog(PANIC, "cannot wait without a PGPROC structure");

    if (MyProc->lwWaiting)
        elog(PANIC, "queueing for lock while waiting on another one");

    LWLockWaitListLock(lock);

    /* setting the flag is protected by the spinlock */
    pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);

    MyProc->lwWaiting = true;
    MyProc->lwWaitMode = mode;

    /* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
    if (mode == LW_WAIT_UNTIL_FREE)
        dlist_push_head(&lock->waiters, &MyProc->lwWaitLink);
    else
        dlist_push_tail(&lock->waiters, &MyProc->lwWaitLink);

    /* Can release the mutex now */
    LWLockWaitListUnlock(lock);

#ifdef LOCK_DEBUG
    pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
#endif
}

/*
 * Remove ourselves from the waitlist.
 *
 * This is used if we queued ourselves because we thought we needed to sleep
 * but, after further checking, we discovered that we don't actually need to
 * do so.
 */
static void
LWLockDequeueSelf(LWLock *lock)
{
    bool        found = false;
    dlist_mutable_iter iter;

#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;

    lwstats = get_lwlock_stats_entry(lock);

    lwstats->dequeue_self_count++;
#endif

    LWLockWaitListLock(lock);

    /*
     * We can't just remove ourselves from the list; we need to iterate over
     * all entries, as somebody else could have dequeued us already.
     */
    dlist_foreach_modify(iter, &lock->waiters)
    {
        PGPROC     *proc = dlist_container(PGPROC, lwWaitLink, iter.cur);

        if (proc == MyProc)
        {
            found = true;
            dlist_delete(&proc->lwWaitLink);
            break;
        }
    }

    if (dlist_is_empty(&lock->waiters) &&
        (pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
    {
        pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
    }

    /* XXX: combine with fetch_and above? */
    LWLockWaitListUnlock(lock);

    /* clear waiting state again, nice for debugging */
    if (found)
        MyProc->lwWaiting = false;
    else
    {
        int         extraWaits = 0;

        /*
         * Somebody else dequeued us and has or will wake us up.  Deal with
         * the superfluous absorption of a wakeup.
         */

        /*
         * Reset RELEASE_OK if somebody woke us before we removed ourselves -
         * they'll have set it to false.
         */
        pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

        /*
         * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
         * get reset at some inconvenient point later.  Most of the time this
         * will immediately return.
         */
        for (;;)
        {
            PGSemaphoreLock(&MyProc->sem);
            if (!MyProc->lwWaiting)
                break;
            extraWaits++;
        }

        /*
         * Fix the process wait semaphore's count for any absorbed wakeups.
         */
        while (extraWaits-- > 0)
            PGSemaphoreUnlock(&MyProc->sem);
    }

#ifdef LOCK_DEBUG
    {
        /* not waiting anymore */
        uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

        Assert(nwaiters < MAX_BACKENDS);
    }
#endif
}

/*
 * LWLockAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, sleep until it is.  Returns true if the lock
 * was available immediately, false if we had to sleep.
 *
 * Side effect: cancel/die interrupts are held off until lock release.
 */
bool
LWLockAcquire(LWLock *lock, LWLockMode mode)
{
    PGPROC     *proc = MyProc;
    bool        result = true;
    int         extraWaits = 0;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;

    lwstats = get_lwlock_stats_entry(lock);
#endif

    AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);

    PRINT_LWDEBUG("LWLockAcquire", lock, mode);

#ifdef LWLOCK_STATS
    /* Count lock acquisition attempts */
    if (mode == LW_EXCLUSIVE)
        lwstats->ex_acquire_count++;
    else
        lwstats->sh_acquire_count++;
#endif                          /* LWLOCK_STATS */

    /*
     * We can't wait if we haven't got a PGPROC.  This should only occur
     * during bootstrap or shared memory initialization.  Put an Assert here
     * to catch unsafe coding practices.
     */
    Assert(!(proc == NULL && IsUnderPostmaster));

    /* Ensure we will have room to remember the lock */
    if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
        elog(ERROR, "too many LWLocks taken");

    /*
     * Lock out cancel/die interrupts until we exit the code section protected
     * by the LWLock.  This ensures that interrupts will not interfere with
     * manipulations of data structures in shared memory.
     */
    HOLD_INTERRUPTS();

    /*
     * Loop here to try to acquire lock after each time we are signaled by
     * LWLockRelease.
     *
     * NOTE: it might seem better to have LWLockRelease actually grant us the
     * lock, rather than retrying and possibly having to go back to sleep. But
     * in practice that is no good because it means a process swap for every
     * lock acquisition when two or more processes are contending for the same
     * lock.  Since LWLocks are normally used to protect not-very-long
     * sections of computation, a process needs to be able to acquire and
     * release the same lock many times during a single CPU time slice, even
     * in the presence of contention.  The efficiency of being able to do that
     * outweighs the inefficiency of sometimes wasting a process dispatch
     * cycle because the lock is not free when a released waiter finally gets
     * to run.  See pgsql-hackers archives for 29-Dec-01.
     */
    for (;;)
    {
        bool        mustwait;

        /*
         * Try to grab the lock the first time; we're not in the waitqueue
         * yet/anymore.
         */
        mustwait = LWLockAttemptLock(lock, mode);

        if (!mustwait)
        {
            LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
            break;              /* got the lock */
        }

        /*
         * Ok, at this point we couldn't grab the lock on the first try.  We
         * cannot simply queue ourselves to the end of the list and wait to be
         * woken up because by now the lock could long have been released.
         * Instead add us to the queue and try to grab the lock again.  If we
         * succeed we need to revert the queuing and be happy, otherwise we
         * recheck the lock.  If we still couldn't grab it, we know that the
         * other locker will see our queue entries when releasing, since they
         * existed before we checked for the lock.
         */

        /* add to the queue */
        LWLockQueueSelf(lock, mode);

        /* we're now guaranteed to be woken up if necessary */
        mustwait = LWLockAttemptLock(lock, mode);

        /* ok, grabbed the lock the second time round, need to undo queueing */
        if (!mustwait)
        {
            LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");

            LWLockDequeueSelf(lock);
            break;
        }

        /*
         * Wait until awakened.
         *
         * It is possible that we get awakened for a reason other than being
         * signaled by LWLockRelease.  If so, loop back and wait again.  Once
         * we've gotten the LWLock, re-increment the sema by the number of
         * additional signals received.
         */
        LOG_LWDEBUG("LWLockAcquire", lock, "waiting");

#ifdef LWLOCK_STATS
        lwstats->block_count++;
#endif

        LWLockReportWaitStart(lock);
        TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock), mode);

        for (;;)
        {
            PGSemaphoreLock(&proc->sem);
            if (!proc->lwWaiting)
                break;
            extraWaits++;
        }

        /* Retrying, allow LWLockRelease to release waiters again. */
        pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

#ifdef LOCK_DEBUG
        {
            /* not waiting anymore */
            uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

            Assert(nwaiters < MAX_BACKENDS);
        }
#endif

        TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode);
        LWLockReportWaitEnd();

        LOG_LWDEBUG("LWLockAcquire", lock, "awakened");

        /* Now loop back and try to acquire lock again. */
        result = false;
    }

    TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), mode);

    /* Add lock to list of locks held by this backend */
    held_lwlocks[num_held_lwlocks].lock = lock;
    held_lwlocks[num_held_lwlocks++].mode = mode;

    /*
     * Fix the process wait semaphore's count for any absorbed wakeups.
     */
    while (extraWaits-- > 0)
        PGSemaphoreUnlock(&proc->sem);

    return result;
}

/*
 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, return FALSE with no side-effects.
 *
 * If successful, cancel/die interrupts are held off until lock release.
 */
bool
LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
{
    bool        mustwait;

    AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);

    PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);

    /* Ensure we will have room to remember the lock */
    if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
        elog(ERROR, "too many LWLocks taken");

    /*
     * Lock out cancel/die interrupts until we exit the code section protected
     * by the LWLock.  This ensures that interrupts will not interfere with
     * manipulations of data structures in shared memory.
     */
    HOLD_INTERRUPTS();

    /* Check for the lock */
    mustwait = LWLockAttemptLock(lock, mode);

    if (mustwait)
    {
        /* Failed to get lock, so release interrupt holdoff */
        RESUME_INTERRUPTS();

        LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
        TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), T_ID(lock), mode);
    }
    else
    {
        /* Add lock to list of locks held by this backend */
        held_lwlocks[num_held_lwlocks].lock = lock;
        held_lwlocks[num_held_lwlocks++].mode = mode;
        TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), T_ID(lock), mode);
    }
    return !mustwait;
}
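
/*
 * A typical calling pattern (illustrative only, not from a specific caller)
 * falls back to other work when the lock is busy:
 *
 *      if (LWLockConditionalAcquire(lock, LW_EXCLUSIVE))
 *      {
 *          ... touch the protected structure ...
 *          LWLockRelease(lock);
 *      }
 *      else
 *          ... skip, retry later, or take a slow path ...
 */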

/*
 * LWLockAcquireOrWait - Acquire lock, or wait until it's free
 *
 * The semantics of this function are a bit funky.  If the lock is currently
 * free, it is acquired in the given mode, and the function returns true.  If
 * the lock isn't immediately free, the function waits until it is released
 * and returns false, but does not acquire the lock.
 *
 * This is currently used for WALWriteLock: when a backend flushes the WAL,
 * holding WALWriteLock, it can flush the commit records of many other
 * backends as a side-effect.  Those other backends need to wait until the
 * flush finishes, but don't need to acquire the lock anymore.  They can just
 * wake up, observe that their records have already been flushed, and return.
 */
bool
LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
{
    PGPROC     *proc = MyProc;
    bool        mustwait;
    int         extraWaits = 0;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;

    lwstats = get_lwlock_stats_entry(lock);
#endif

    Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);

    PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);

    /* Ensure we will have room to remember the lock */
    if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
        elog(ERROR, "too many LWLocks taken");

    /*
     * Lock out cancel/die interrupts until we exit the code section protected
     * by the LWLock.  This ensures that interrupts will not interfere with
     * manipulations of data structures in shared memory.
     */
    HOLD_INTERRUPTS();

    /*
     * NB: We're using nearly the same twice-in-a-row lock acquisition
     * protocol as LWLockAcquire().  Check its comments for details.
     */
    mustwait = LWLockAttemptLock(lock, mode);

    if (mustwait)
    {
        LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);

        mustwait = LWLockAttemptLock(lock, mode);

        if (mustwait)
        {
            /*
             * Wait until awakened.  Like in LWLockAcquire, be prepared for
             * bogus wakeups.
             */
            LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");

#ifdef LWLOCK_STATS
            lwstats->block_count++;
#endif

            LWLockReportWaitStart(lock);
            TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock), mode);

            for (;;)
            {
                PGSemaphoreLock(&proc->sem);
                if (!proc->lwWaiting)
                    break;
                extraWaits++;
            }

#ifdef LOCK_DEBUG
            {
                /* not waiting anymore */
                uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

                Assert(nwaiters < MAX_BACKENDS);
            }
#endif
            TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode);
            LWLockReportWaitEnd();

            LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
        }
        else
        {
            LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");

            /*
             * Got lock in the second attempt, undo queueing.  We need to treat
             * this as having successfully acquired the lock, otherwise we'd
             * not necessarily wake up people we've prevented from acquiring
             * the lock.
             */
            LWLockDequeueSelf(lock);
        }
    }

    /*
     * Fix the process wait semaphore's count for any absorbed wakeups.
     */
    while (extraWaits-- > 0)
        PGSemaphoreUnlock(&proc->sem);

    if (mustwait)
    {
        /* Failed to get lock, so release interrupt holdoff */
        RESUME_INTERRUPTS();
        LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
        TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), T_ID(lock),
                                                     mode);
    }
    else
    {
        LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
        /* Add lock to list of locks held by this backend */
        held_lwlocks[num_held_lwlocks].lock = lock;
        held_lwlocks[num_held_lwlocks++].mode = mode;
        TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), T_ID(lock), mode);
    }

    return !mustwait;
}
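
/*
 * Sketch of the WALWriteLock usage described above (simplified and
 * hypothetical; see the WAL flushing code in xlog.c for the real logic):
 *
 *      if (LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
 *      {
 *          ... we got the lock: flush WAL up to our request ...
 *          LWLockRelease(WALWriteLock);
 *      }
 *      else
 *      {
 *          ... someone else flushed meanwhile: recheck whether our
 *          records already made it to disk, and only loop if not ...
 *      }
 */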

/*
 * Does the lwlock in its current state need to wait for the variable value to
 * change?
 *
 * If we don't need to wait, and it's because the value of the variable has
 * changed, store the current value in newval.
 *
 * *result is set to true if the lock was free, and false otherwise.
 */
static bool
LWLockConflictsWithVar(LWLock *lock,
                       uint64 *valptr, uint64 oldval, uint64 *newval,
                       bool *result)
{
    bool        mustwait;
    uint64      value;

    /*
     * Test first to see if the lock is free right now.
     *
     * XXX: the caller uses a spinlock before this, so we don't need a memory
     * barrier here as far as the current usage is concerned.  But that might
     * not be safe in general.
     */
    mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;

    if (!mustwait)
    {
        *result = true;
        return false;
    }

    *result = false;

    /*
     * Read value using the lwlock's wait list lock, as we can't generally
     * rely on atomic 64 bit reads/stores.  TODO: On platforms with a way to
     * do atomic 64 bit reads/writes the spinlock should be optimized away.
     */
    LWLockWaitListLock(lock);
    value = *valptr;
    LWLockWaitListUnlock(lock);

    if (value != oldval)
    {
        mustwait = false;
        *newval = value;
    }
    else
    {
        mustwait = true;
    }

    return mustwait;
}

/*
 * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
 *
 * If the lock is held and *valptr equals oldval, waits until the lock is
 * either freed, or the lock holder updates *valptr by calling
 * LWLockUpdateVar.  If the lock is free on exit (immediately or after
 * waiting), returns true.  If the lock is still held, but *valptr no longer
 * matches oldval, returns false and sets *newval to the current value in
 * *valptr.
 *
 * Note: this function ignores shared lock holders; if the lock is held
 * in shared mode, returns 'true'.
 */
bool
LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
{
    PGPROC     *proc = MyProc;
    int         extraWaits = 0;
    bool        result = false;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;

    lwstats = get_lwlock_stats_entry(lock);
#endif

    PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);

    /*
     * Lock out cancel/die interrupts while we sleep on the lock.  There is no
     * cleanup mechanism to remove us from the wait queue if we got
     * interrupted.
     */
    HOLD_INTERRUPTS();

    /*
     * Loop here to check the lock's status after each time we are signaled.
     */
    for (;;)
    {
        bool        mustwait;

        mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
                                          &result);

        if (!mustwait)
            break;              /* the lock was free or value didn't match */

        /*
         * Add myself to wait queue.  Note that this is racy; somebody else
         * could wake us up before we're finished queuing.  NB: We're using
         * nearly the same twice-in-a-row lock acquisition protocol as
         * LWLockAcquire().  Check its comments for details.  The only
         * difference is that we also have to check the variable's value when
         * checking the state of the lock.
         */
        LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);

        /*
         * Set RELEASE_OK flag, to make sure we get woken up as soon as the
         * lock is released.
         */
        pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

        /*
         * We're now guaranteed to be woken up if necessary.  Recheck the lock
         * and the variable's state.
         */
        mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
                                          &result);

        /* Ok, no conflict after we queued ourselves.  Undo queueing. */
        if (!mustwait)
        {
            LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");

            LWLockDequeueSelf(lock);
            break;
        }

        /*
         * Wait until awakened.
         *
         * It is possible that we get awakened for a reason other than being
         * signaled by LWLockRelease.  If so, loop back and wait again.  Once
         * we've gotten the LWLock, re-increment the sema by the number of
         * additional signals received.
         */
        LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");

#ifdef LWLOCK_STATS
        lwstats->block_count++;
#endif

        LWLockReportWaitStart(lock);
        TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock),
                                           LW_EXCLUSIVE);

        for (;;)
        {
            PGSemaphoreLock(&proc->sem);
            if (!proc->lwWaiting)
                break;
            extraWaits++;
        }

#ifdef LOCK_DEBUG
        {
            /* not waiting anymore */
            uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

            Assert(nwaiters < MAX_BACKENDS);
        }
#endif

        TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock),
                                          LW_EXCLUSIVE);
        LWLockReportWaitEnd();

        LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");

        /* Now loop back and check the status of the lock again. */
    }

    TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), LW_EXCLUSIVE);

    /*
     * Fix the process wait semaphore's count for any absorbed wakeups.
     */
    while (extraWaits-- > 0)
        PGSemaphoreUnlock(&proc->sem);

    /*
     * Now okay to allow cancel/die interrupts.
     */
    RESUME_INTERRUPTS();

    return result;
}
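
/*
 * Illustrative caller loop (hypothetical variable names; in core, this
 * pairing of LWLockWaitForVar and LWLockUpdateVar is used by the WAL
 * insertion locks in xlog.c): wait until the holder either releases the
 * lock or publishes a value past the point we care about.
 *
 *      uint64      seen = 0;
 *
 *      while (!LWLockWaitForVar(lock, &insertingAt, seen, &seen))
 *      {
 *          if (seen >= upto)
 *              break;      (the holder has progressed far enough)
 *      }
 */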


/*
 * LWLockUpdateVar - Update a variable and wake up waiters atomically
 *
 * Sets *valptr to 'val', and wakes up all processes waiting for us with
 * LWLockWaitForVar().  Setting the value and waking up the processes happen
 * atomically so that any process calling LWLockWaitForVar() on the same lock
 * is guaranteed to see the new value, and act accordingly.
 *
 * The caller must be holding the lock in exclusive mode.
 */
void
LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
{
    dlist_head  wakeup;
    dlist_mutable_iter iter;

    PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);

    dlist_init(&wakeup);

    LWLockWaitListLock(lock);

    Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);

    /* Update the lock's value */
    *valptr = val;

    /*
     * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
     * up.  They are always in the front of the queue.
     */
    dlist_foreach_modify(iter, &lock->waiters)
    {
        PGPROC     *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);

        if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
            break;

        dlist_delete(&waiter->lwWaitLink);
        dlist_push_tail(&wakeup, &waiter->lwWaitLink);
    }

    /* We are done updating shared state of the lock itself. */
    LWLockWaitListUnlock(lock);

    /*
     * Awaken any waiters I removed from the queue.
     */
    dlist_foreach_modify(iter, &wakeup)
    {
        PGPROC     *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);

        dlist_delete(&waiter->lwWaitLink);
        /* check comment in LWLockWakeup() about this barrier */
        pg_write_barrier();
        waiter->lwWaiting = false;
        PGSemaphoreUnlock(&waiter->sem);
    }
}


/*
 * LWLockRelease - release a previously acquired lock
 */
void
LWLockRelease(LWLock *lock)
{
    LWLockMode  mode;
    uint32      oldstate;
    bool        check_waiters;
    int         i;

    /*
     * Remove lock from list of locks held.  Usually, but not always, it will
     * be the latest-acquired lock; so search array backwards.
     */
    for (i = num_held_lwlocks; --i >= 0;)
        if (lock == held_lwlocks[i].lock)
            break;

    if (i < 0)
        elog(ERROR, "lock %s %d is not held", T_NAME(lock), T_ID(lock));

    mode = held_lwlocks[i].mode;

    num_held_lwlocks--;
    for (; i < num_held_lwlocks; i++)
        held_lwlocks[i] = held_lwlocks[i + 1];

    PRINT_LWDEBUG("LWLockRelease", lock, mode);

    /*
     * Release my hold on the lock; after that it can immediately be acquired
     * by others, even if we still have to wake up other waiters.
     */
    if (mode == LW_EXCLUSIVE)
        oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
    else
        oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);

    /* nobody else can have that kind of lock */
    Assert(!(oldstate & LW_VAL_EXCLUSIVE));

    /*
     * If we're still waiting for backends to get scheduled, don't wake them
     * up again.
     */
    if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
        (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
        (oldstate & LW_LOCK_MASK) == 0)
        check_waiters = true;
    else
        check_waiters = false;

    /*
     * As waking up waiters requires the spinlock to be acquired, only do so
     * if necessary.
     */
    if (check_waiters)
    {
        /* XXX: remove before commit? */
        LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
        LWLockWakeup(lock);
    }

    TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock), T_ID(lock));

    /*
     * Now okay to allow cancel/die interrupts.
     */
    RESUME_INTERRUPTS();
}

/*
 * LWLockReleaseClearVar - release a previously acquired lock, reset variable
 */
void
LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
{
    LWLockWaitListLock(lock);

    /*
     * Set the variable's value before releasing the lock.  That prevents a
     * race condition wherein a new locker acquires the lock, but hasn't yet
     * set the variable's value.
     */
    *valptr = val;
    LWLockWaitListUnlock(lock);

    LWLockRelease(lock);
}


/*
 * LWLockReleaseAll - release all currently-held locks
 *
 * Used to clean up after ereport(ERROR).  An important difference between
 * this function and retail LWLockRelease calls is that InterruptHoldoffCount
 * is unchanged by this operation.  This is necessary since
 * InterruptHoldoffCount has been set to an appropriate level earlier in error
 * recovery.  We could decrement it below zero if we allow it to drop for each
 * released lock!
 */
void
LWLockReleaseAll(void)
{
    while (num_held_lwlocks > 0)
    {
        HOLD_INTERRUPTS();      /* match the upcoming RESUME_INTERRUPTS */

        LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
    }
}


/*
 * LWLockHeldByMe - test whether my process currently holds a lock
 *
 * This is meant as debug support only.  We currently do not distinguish
 * whether the lock is held shared or exclusive.
 */
bool
LWLockHeldByMe(LWLock *l)
{
    int         i;

    for (i = 0; i < num_held_lwlocks; i++)
    {
        if (held_lwlocks[i].lock == l)
            return true;
    }
    return false;
}