1 /*-------------------------------------------------------------------------
2 *
3 * lwlock.c
4 * Lightweight lock manager
5 *
6 * Lightweight locks are intended primarily to provide mutual exclusion of
7 * access to shared-memory data structures. Therefore, they offer both
8 * exclusive and shared lock modes (to support read/write and read-only
9  * access to a shared object).  There are few other frills.  User-level
10 * locking should be done with the full lock manager --- which depends on
11 * LWLocks to protect its shared state.
12 *
13 * In addition to exclusive and shared modes, lightweight locks can be used to
14  * wait until a variable changes value.  Acquiring the lock with LWLockAcquire
15  * does not reset the variable, i.e. it keeps the value it was set to when
16  * the lock was last released, and it can be updated
17 * without releasing the lock by calling LWLockUpdateVar. LWLockWaitForVar
18 * waits for the variable to be updated, or until the lock is free. When
19 * releasing the lock with LWLockReleaseClearVar() the value can be set to an
20  * appropriate value for a free lock.  The meaning of the variable is up to
21  * the caller; the lightweight lock code just assigns and compares it.
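 *
 * A minimal usage sketch (the variable name "insertpos" is hypothetical,
 * not code from this file): the exclusive holder publishes progress with
 *
 *		LWLockAcquire(lock, LW_EXCLUSIVE);
 *		LWLockUpdateVar(lock, &insertpos, newpos);
 *		LWLockReleaseClearVar(lock, &insertpos, 0);
 *
 * while other backends call
 *
 *		LWLockWaitForVar(lock, &insertpos, oldpos, &newpos)
 *
 * to sleep until the lock is released or insertpos no longer equals oldpos.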
22 *
23 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
24 * Portions Copyright (c) 1994, Regents of the University of California
25 *
26 * IDENTIFICATION
27 * src/backend/storage/lmgr/lwlock.c
28 *
29 * NOTES:
30 *
31  * This used to be a pretty straightforward reader-writer lock
32 * implementation, in which the internal state was protected by a
33 * spinlock. Unfortunately the overhead of taking the spinlock proved to be
34 * too high for workloads/locks that were taken in shared mode very
35 * frequently. Often we were spinning in the (obviously exclusive) spinlock,
36 * while trying to acquire a shared lock that was actually free.
37 *
38 * Thus a new implementation was devised that provides wait-free shared lock
39 * acquisition for locks that aren't exclusively locked.
40 *
41 * The basic idea is to have a single atomic variable 'lockcount' instead of
42 * the formerly separate shared and exclusive counters and to use atomic
43 * operations to acquire the lock. That's fairly easy to do for plain
44 * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
45 * in the OS.
46 *
47 * For lock acquisition we use an atomic compare-and-exchange on the lockcount
48 * variable. For exclusive lock we swap in a sentinel value
49 * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders.
50 *
51  * To release the lock we use an atomic decrement.  If the new value is zero
52  * (we get that atomically), we know we can/have to wake up waiters.
54 *
55 * Obviously it is important that the sentinel value for exclusive locks
56 * doesn't conflict with the maximum number of possible share lockers -
57 * luckily MAX_BACKENDS makes that easily possible.
58 *
59 *
60 * The attentive reader might have noticed that naively doing the above has a
61 * glaring race condition: We try to lock using the atomic operations and
62 * notice that we have to wait. Unfortunately by the time we have finished
63  * queuing, the former locker very well might have already finished its
64 * work. That's problematic because we're now stuck waiting inside the OS.
65  *
66  * To mitigate those races we use a multi-phase attempt at locking:
67 * Phase 1: Try to do it atomically, if we succeed, nice
68 * Phase 2: Add ourselves to the waitqueue of the lock
69 * Phase 3: Try to grab the lock again, if we succeed, remove ourselves from
70 * the queue
71 * Phase 4: Sleep till wake-up, goto Phase 1
72 *
73 * This protects us against the problem from above as nobody can release too
74  * quickly, before we're queued, since after Phase 2 we're already queued.
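 *
 * In pseudocode, an acquisition therefore looks roughly like this (a sketch
 * of the protocol only, not the exact code below):
 *
 *		for (;;)
 *		{
 *			if (LWLockAttemptLock(lock, mode) succeeded)
 *				break;
 *			LWLockQueueSelf(lock, mode);
 *			if (LWLockAttemptLock(lock, mode) succeeded)
 *			{
 *				LWLockDequeueSelf(lock);
 *				break;
 *			}
 *			sleep on our semaphore until woken by a releaser;
 *		}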
75 * -------------------------------------------------------------------------
76 */
77 #include "postgres.h"
78
79 #include "miscadmin.h"
80 #include "pgstat.h"
81 #include "pg_trace.h"
82 #include "postmaster/postmaster.h"
83 #include "replication/slot.h"
84 #include "storage/ipc.h"
85 #include "storage/predicate.h"
86 #include "storage/proc.h"
87 #include "storage/proclist.h"
88 #include "storage/spin.h"
89 #include "utils/memutils.h"
90
91 #ifdef LWLOCK_STATS
92 #include "utils/hsearch.h"
93 #endif
94
95
96 /* We use the ShmemLock spinlock to protect LWLockCounter */
97 extern slock_t *ShmemLock;
98
99 #define LW_FLAG_HAS_WAITERS ((uint32) 1 << 30)
100 #define LW_FLAG_RELEASE_OK ((uint32) 1 << 29)
101 #define LW_FLAG_LOCKED ((uint32) 1 << 28)
102
103 #define LW_VAL_EXCLUSIVE ((uint32) 1 << 24)
104 #define LW_VAL_SHARED 1
105
106 #define LW_LOCK_MASK ((uint32) ((1 << 25)-1))
107 /* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
108 #define LW_SHARED_MASK ((uint32) ((1 << 24)-1))
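
/*
 * Layout of the state word implied by the definitions above: bit 30 is the
 * has-waiters flag, bit 29 the release-ok flag, bit 28 the wait-list-locked
 * flag, bit 24 the exclusive-holder sentinel, and bits 0-23 count shared
 * holders (enough room for MAX_BACKENDS of them).
 */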
109
110 /*
111 * This is indexed by tranche ID and stores the names of all tranches known
112 * to the current backend.
113 */
114 static char **LWLockTrancheArray = NULL;
115 static int LWLockTranchesAllocated = 0;
116
117 #define T_NAME(lock) \
118 (LWLockTrancheArray[(lock)->tranche])
119
120 /*
121 * This points to the main array of LWLocks in shared memory. Backends inherit
122 * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
123 * where we have special measures to pass it down).
124 */
125 LWLockPadded *MainLWLockArray = NULL;
126
127 /*
128 * We use this structure to keep track of locked LWLocks for release
129 * during error recovery. Normally, only a few will be held at once, but
130 * occasionally the number can be much higher; for example, the pg_buffercache
131 * extension locks all buffer partitions simultaneously.
132 */
133 #define MAX_SIMUL_LWLOCKS 200
134
135 /* struct representing the LWLocks we're holding */
136 typedef struct LWLockHandle
137 {
138 LWLock *lock;
139 LWLockMode mode;
140 } LWLockHandle;
141
142 static int num_held_lwlocks = 0;
143 static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
144
145 /* struct representing the LWLock tranche request for named tranche */
146 typedef struct NamedLWLockTrancheRequest
147 {
148 char tranche_name[NAMEDATALEN];
149 int num_lwlocks;
150 } NamedLWLockTrancheRequest;
151
152 NamedLWLockTrancheRequest *NamedLWLockTrancheRequestArray = NULL;
153 static int NamedLWLockTrancheRequestsAllocated = 0;
154 int NamedLWLockTrancheRequests = 0;
155
156 NamedLWLockTranche *NamedLWLockTrancheArray = NULL;
157
158 static bool lock_named_request_allowed = true;
159
160 static void InitializeLWLocks(void);
161 static void RegisterLWLockTranches(void);
162
163 static inline void LWLockReportWaitStart(LWLock *lock);
164 static inline void LWLockReportWaitEnd(void);
165
166 #ifdef LWLOCK_STATS
167 typedef struct lwlock_stats_key
168 {
169 int tranche;
170 void *instance;
171 } lwlock_stats_key;
172
173 typedef struct lwlock_stats
174 {
175 lwlock_stats_key key;
176 int sh_acquire_count;
177 int ex_acquire_count;
178 int block_count;
179 int dequeue_self_count;
180 int spin_delay_count;
181 } lwlock_stats;
182
183 static HTAB *lwlock_stats_htab;
184 static lwlock_stats lwlock_stats_dummy;
185 #endif
186
187 #ifdef LOCK_DEBUG
188 bool Trace_lwlocks = false;
189
190 inline static void
191 PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
192 {
193 /* hide statement & context here, otherwise the log is just too verbose */
194 if (Trace_lwlocks)
195 {
196 uint32 state = pg_atomic_read_u32(&lock->state);
197
198 ereport(LOG,
199 (errhidestmt(true),
200 errhidecontext(true),
201 errmsg_internal("%d: %s(%s %p): excl %u shared %u haswaiters %u waiters %u rOK %d",
202 MyProcPid,
203 where, T_NAME(lock), lock,
204 (state & LW_VAL_EXCLUSIVE) != 0,
205 state & LW_SHARED_MASK,
206 (state & LW_FLAG_HAS_WAITERS) != 0,
207 pg_atomic_read_u32(&lock->nwaiters),
208 (state & LW_FLAG_RELEASE_OK) != 0)));
209 }
210 }
211
212 inline static void
213 LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
214 {
215 /* hide statement & context here, otherwise the log is just too verbose */
216 if (Trace_lwlocks)
217 {
218 ereport(LOG,
219 (errhidestmt(true),
220 errhidecontext(true),
221 errmsg_internal("%s(%s %p): %s", where,
222 T_NAME(lock), lock, msg)));
223 }
224 }
225
226 #else /* not LOCK_DEBUG */
227 #define PRINT_LWDEBUG(a,b,c) ((void)0)
228 #define LOG_LWDEBUG(a,b,c) ((void)0)
229 #endif /* LOCK_DEBUG */
230
231 #ifdef LWLOCK_STATS
232
233 static void init_lwlock_stats(void);
234 static void print_lwlock_stats(int code, Datum arg);
235 static lwlock_stats * get_lwlock_stats_entry(LWLock *lockid);
236
237 static void
238 init_lwlock_stats(void)
239 {
240 HASHCTL ctl;
241 static MemoryContext lwlock_stats_cxt = NULL;
242 static bool exit_registered = false;
243
244 if (lwlock_stats_cxt != NULL)
245 MemoryContextDelete(lwlock_stats_cxt);
246
247 /*
248 * The LWLock stats will be updated within a critical section, which
249 * requires allocating new hash entries. Allocations within a critical
250 * section are normally not allowed because running out of memory would
251 * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
252 * turned on in production, so that's an acceptable risk. The hash entries
253 * are small, so the risk of running out of memory is minimal in practice.
254 */
255 lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
256 "LWLock stats",
257 ALLOCSET_DEFAULT_SIZES);
258 MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);
259
260 MemSet(&ctl, 0, sizeof(ctl));
261 ctl.keysize = sizeof(lwlock_stats_key);
262 ctl.entrysize = sizeof(lwlock_stats);
263 ctl.hcxt = lwlock_stats_cxt;
264 lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
265 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
266 if (!exit_registered)
267 {
268 on_shmem_exit(print_lwlock_stats, 0);
269 exit_registered = true;
270 }
271 }
272
273 static void
274 print_lwlock_stats(int code, Datum arg)
275 {
276 HASH_SEQ_STATUS scan;
277 lwlock_stats *lwstats;
278
279 hash_seq_init(&scan, lwlock_stats_htab);
280
281 /* Grab an LWLock to keep different backends from mixing reports */
282 LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);
283
284 while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
285 {
286 fprintf(stderr,
287 "PID %d lwlock %s %p: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
288 MyProcPid, LWLockTrancheArray[lwstats->key.tranche],
289 lwstats->key.instance, lwstats->sh_acquire_count,
290 lwstats->ex_acquire_count, lwstats->block_count,
291 lwstats->spin_delay_count, lwstats->dequeue_self_count);
292 }
293
294 LWLockRelease(&MainLWLockArray[0].lock);
295 }
296
297 static lwlock_stats *
298 get_lwlock_stats_entry(LWLock *lock)
299 {
300 lwlock_stats_key key;
301 lwlock_stats *lwstats;
302 bool found;
303
304 /*
305 * During shared memory initialization, the hash table doesn't exist yet.
306 * Stats of that phase aren't very interesting, so just collect operations
307 * on all locks in a single dummy entry.
308 */
309 if (lwlock_stats_htab == NULL)
310 return &lwlock_stats_dummy;
311
312 /* Fetch or create the entry. */
313 MemSet(&key, 0, sizeof(key));
314 key.tranche = lock->tranche;
315 key.instance = lock;
316 lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
317 if (!found)
318 {
319 lwstats->sh_acquire_count = 0;
320 lwstats->ex_acquire_count = 0;
321 lwstats->block_count = 0;
322 lwstats->dequeue_self_count = 0;
323 lwstats->spin_delay_count = 0;
324 }
325 return lwstats;
326 }
327 #endif /* LWLOCK_STATS */
328
329
330 /*
331 * Compute number of LWLocks required by named tranches. These will be
332 * allocated in the main array.
333 */
334 static int
335 NumLWLocksByNamedTranches(void)
336 {
337 int numLocks = 0;
338 int i;
339
340 for (i = 0; i < NamedLWLockTrancheRequests; i++)
341 numLocks += NamedLWLockTrancheRequestArray[i].num_lwlocks;
342
343 return numLocks;
344 }
345
346 /*
347 * Compute shmem space needed for LWLocks and named tranches.
348 */
349 Size
350 LWLockShmemSize(void)
351 {
352 Size size;
353 int i;
354 int numLocks = NUM_FIXED_LWLOCKS;
355
356 numLocks += NumLWLocksByNamedTranches();
357
358 /* Space for the LWLock array. */
359 size = mul_size(numLocks, sizeof(LWLockPadded));
360
361 /* Space for dynamic allocation counter, plus room for alignment. */
362 size = add_size(size, sizeof(int) + LWLOCK_PADDED_SIZE);
363
364 /* space for named tranches. */
365 size = add_size(size, mul_size(NamedLWLockTrancheRequests, sizeof(NamedLWLockTranche)));
366
367 /* space for name of each tranche. */
368 for (i = 0; i < NamedLWLockTrancheRequests; i++)
369 size = add_size(size, strlen(NamedLWLockTrancheRequestArray[i].tranche_name) + 1);
370
371 /* Disallow named LWLocks' requests after startup */
372 lock_named_request_allowed = false;
373
374 return size;
375 }
376
377 /*
378 * Allocate shmem space for the main LWLock array and all tranches and
379 * initialize it. We also register all the LWLock tranches here.
380 */
381 void
382 CreateLWLocks(void)
383 {
384 StaticAssertExpr(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
385 "MAX_BACKENDS too big for lwlock.c");
386
387 StaticAssertExpr(sizeof(LWLock) <= LWLOCK_MINIMAL_SIZE &&
388 sizeof(LWLock) <= LWLOCK_PADDED_SIZE,
389 "Miscalculated LWLock padding");
390
391 if (!IsUnderPostmaster)
392 {
393 Size spaceLocks = LWLockShmemSize();
394 int *LWLockCounter;
395 char *ptr;
396
397 /* Allocate space */
398 ptr = (char *) ShmemAlloc(spaceLocks);
399
400 /* Leave room for dynamic allocation of tranches */
401 ptr += sizeof(int);
402
403 /* Ensure desired alignment of LWLock array */
404 ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
405
406 MainLWLockArray = (LWLockPadded *) ptr;
407
408 /*
409 * Initialize the dynamic-allocation counter for tranches, which is
410 * stored just before the first LWLock.
411 */
412 LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
413 *LWLockCounter = LWTRANCHE_FIRST_USER_DEFINED;
414
415 /* Initialize all LWLocks */
416 InitializeLWLocks();
417 }
418
419 /* Register all LWLock tranches */
420 RegisterLWLockTranches();
421 }
422
423 /*
424 * Initialize LWLocks that are fixed and those belonging to named tranches.
425 */
426 static void
427 InitializeLWLocks(void)
428 {
429 int numNamedLocks = NumLWLocksByNamedTranches();
430 int id;
431 int i;
432 int j;
433 LWLockPadded *lock;
434
435 /* Initialize all individual LWLocks in main array */
436 for (id = 0, lock = MainLWLockArray; id < NUM_INDIVIDUAL_LWLOCKS; id++, lock++)
437 LWLockInitialize(&lock->lock, id);
438
439 /* Initialize buffer mapping LWLocks in main array */
440 lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS;
441 for (id = 0; id < NUM_BUFFER_PARTITIONS; id++, lock++)
442 LWLockInitialize(&lock->lock, LWTRANCHE_BUFFER_MAPPING);
443
444 /* Initialize lmgrs' LWLocks in main array */
445 lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS + NUM_BUFFER_PARTITIONS;
446 for (id = 0; id < NUM_LOCK_PARTITIONS; id++, lock++)
447 LWLockInitialize(&lock->lock, LWTRANCHE_LOCK_MANAGER);
448
449 /* Initialize predicate lmgrs' LWLocks in main array */
450 lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS +
451 NUM_BUFFER_PARTITIONS + NUM_LOCK_PARTITIONS;
452 for (id = 0; id < NUM_PREDICATELOCK_PARTITIONS; id++, lock++)
453 LWLockInitialize(&lock->lock, LWTRANCHE_PREDICATE_LOCK_MANAGER);
454
455 /* Initialize named tranches. */
456 if (NamedLWLockTrancheRequests > 0)
457 {
458 char *trancheNames;
459
460 NamedLWLockTrancheArray = (NamedLWLockTranche *)
461 &MainLWLockArray[NUM_FIXED_LWLOCKS + numNamedLocks];
462
463 trancheNames = (char *) NamedLWLockTrancheArray +
464 (NamedLWLockTrancheRequests * sizeof(NamedLWLockTranche));
465 lock = &MainLWLockArray[NUM_FIXED_LWLOCKS];
466
467 for (i = 0; i < NamedLWLockTrancheRequests; i++)
468 {
469 NamedLWLockTrancheRequest *request;
470 NamedLWLockTranche *tranche;
471 char *name;
472
473 request = &NamedLWLockTrancheRequestArray[i];
474 tranche = &NamedLWLockTrancheArray[i];
475
476 name = trancheNames;
477 trancheNames += strlen(request->tranche_name) + 1;
478 strcpy(name, request->tranche_name);
479 tranche->trancheId = LWLockNewTrancheId();
480 tranche->trancheName = name;
481
482 for (j = 0; j < request->num_lwlocks; j++, lock++)
483 LWLockInitialize(&lock->lock, tranche->trancheId);
484 }
485 }
486 }
487
488 /*
489 * Register named tranches and tranches for fixed LWLocks.
490 */
491 static void
492 RegisterLWLockTranches(void)
493 {
494 int i;
495
496 if (LWLockTrancheArray == NULL)
497 {
498 LWLockTranchesAllocated = 128;
499 LWLockTrancheArray = (char **)
500 MemoryContextAllocZero(TopMemoryContext,
501 LWLockTranchesAllocated * sizeof(char *));
502 Assert(LWLockTranchesAllocated >= LWTRANCHE_FIRST_USER_DEFINED);
503 }
504
505 for (i = 0; i < NUM_INDIVIDUAL_LWLOCKS; ++i)
506 LWLockRegisterTranche(i, MainLWLockNames[i]);
507
508 LWLockRegisterTranche(LWTRANCHE_BUFFER_MAPPING, "buffer_mapping");
509 LWLockRegisterTranche(LWTRANCHE_LOCK_MANAGER, "lock_manager");
510 LWLockRegisterTranche(LWTRANCHE_PREDICATE_LOCK_MANAGER,
511 "predicate_lock_manager");
512 LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA,
513 "parallel_query_dsa");
514 LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
515
516 /* Register named tranches. */
517 for (i = 0; i < NamedLWLockTrancheRequests; i++)
518 LWLockRegisterTranche(NamedLWLockTrancheArray[i].trancheId,
519 NamedLWLockTrancheArray[i].trancheName);
520 }
521
522 /*
523 * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
524 */
525 void
526 InitLWLockAccess(void)
527 {
528 #ifdef LWLOCK_STATS
529 init_lwlock_stats();
530 #endif
531 }
532
533 /*
534  * GetNamedLWLockTranche - returns the base address of the LWLocks belonging
535  * to the specified tranche.
536 *
537  * The caller should use the requested number of LWLocks starting from the
538  * base lock address returned by this function.  This can be used only for
539  * tranches that were requested with the RequestNamedLWLockTranche() API.
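 *
 * Illustrative usage in an extension (hypothetical tranche name, not taken
 * from this file): in _PG_init() of a shared_preload_libraries module call
 *
 *		RequestNamedLWLockTranche("my_extension", 4);
 *
 * and, once shared memory has been created, fetch and use the locks with
 *
 *		LWLockPadded *locks = GetNamedLWLockTranche("my_extension");
 *
 *		LWLockAcquire(&locks[0].lock, LW_EXCLUSIVE);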
540 */
541 LWLockPadded *
542 GetNamedLWLockTranche(const char *tranche_name)
543 {
544 int lock_pos;
545 int i;
546
547 /*
548	 * Obtain the position in MainLWLockArray of the base address of the
549	 * LWLocks belonging to the requested tranche_name.  LWLocks for named
550	 * tranches are placed in MainLWLockArray after the fixed locks.
551 */
552 lock_pos = NUM_FIXED_LWLOCKS;
553 for (i = 0; i < NamedLWLockTrancheRequests; i++)
554 {
555 if (strcmp(NamedLWLockTrancheRequestArray[i].tranche_name,
556 tranche_name) == 0)
557 return &MainLWLockArray[lock_pos];
558
559 lock_pos += NamedLWLockTrancheRequestArray[i].num_lwlocks;
560 }
561
562 if (i >= NamedLWLockTrancheRequests)
563 elog(ERROR, "requested tranche is not registered");
564
565 /* just to keep compiler quiet */
566 return NULL;
567 }
568
569 /*
570 * Allocate a new tranche ID.
571 */
572 int
573 LWLockNewTrancheId(void)
574 {
575 int result;
576 int *LWLockCounter;
577
578 LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
579 SpinLockAcquire(ShmemLock);
580 result = (*LWLockCounter)++;
581 SpinLockRelease(ShmemLock);
582
583 return result;
584 }
585
586 /*
587 * Register a tranche ID in the lookup table for the current process. This
588 * routine will save a pointer to the tranche name passed as an argument,
589 * so the name should be allocated in a backend-lifetime context
590 * (TopMemoryContext, static variable, or similar).
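 *
 * A rough sketch for LWLocks placed in dynamic shared memory (all names
 * here are hypothetical): the creating backend does
 *
 *		tranche_id = LWLockNewTrancheId();
 *		LWLockInitialize(&shared_struct->lock, tranche_id);
 *
 * and every backend that attaches, including the creator, registers the
 * name locally with
 *
 *		LWLockRegisterTranche(tranche_id, "my_dsm_tranche");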
591 */
592 void
593 LWLockRegisterTranche(int tranche_id, char *tranche_name)
594 {
595 Assert(LWLockTrancheArray != NULL);
596
597 if (tranche_id >= LWLockTranchesAllocated)
598 {
599 int i = LWLockTranchesAllocated;
600 int j = LWLockTranchesAllocated;
601
602 while (i <= tranche_id)
603 i *= 2;
604
605 LWLockTrancheArray = (char **)
606 repalloc(LWLockTrancheArray, i * sizeof(char *));
607 LWLockTranchesAllocated = i;
608 while (j < LWLockTranchesAllocated)
609 LWLockTrancheArray[j++] = NULL;
610 }
611
612 LWLockTrancheArray[tranche_id] = tranche_name;
613 }
614
615 /*
616 * RequestNamedLWLockTranche
617 * Request that extra LWLocks be allocated during postmaster
618 * startup.
619 *
620 * This is only useful for extensions if called from the _PG_init hook
621 * of a library that is loaded into the postmaster via
622 * shared_preload_libraries. Once shared memory has been allocated, calls
623 * will be ignored. (We could raise an error, but it seems better to make
624 * it a no-op, so that libraries containing such calls can be reloaded if
625 * needed.)
626 */
627 void
628 RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
629 {
630 NamedLWLockTrancheRequest *request;
631
632 if (IsUnderPostmaster || !lock_named_request_allowed)
633 return; /* too late */
634
635 if (NamedLWLockTrancheRequestArray == NULL)
636 {
637 NamedLWLockTrancheRequestsAllocated = 16;
638 NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
639 MemoryContextAlloc(TopMemoryContext,
640 NamedLWLockTrancheRequestsAllocated
641 * sizeof(NamedLWLockTrancheRequest));
642 }
643
644 if (NamedLWLockTrancheRequests >= NamedLWLockTrancheRequestsAllocated)
645 {
646 int i = NamedLWLockTrancheRequestsAllocated;
647
648 while (i <= NamedLWLockTrancheRequests)
649 i *= 2;
650
651 NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
652 repalloc(NamedLWLockTrancheRequestArray,
653 i * sizeof(NamedLWLockTrancheRequest));
654 NamedLWLockTrancheRequestsAllocated = i;
655 }
656
657 request = &NamedLWLockTrancheRequestArray[NamedLWLockTrancheRequests];
658 Assert(strlen(tranche_name) + 1 < NAMEDATALEN);
659 StrNCpy(request->tranche_name, tranche_name, NAMEDATALEN);
660 request->num_lwlocks = num_lwlocks;
661 NamedLWLockTrancheRequests++;
662 }
663
664 /*
665 * LWLockInitialize - initialize a new lwlock; it's initially unlocked
666 */
667 void
668 LWLockInitialize(LWLock *lock, int tranche_id)
669 {
670 pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
671 #ifdef LOCK_DEBUG
672 pg_atomic_init_u32(&lock->nwaiters, 0);
673 #endif
674 lock->tranche = tranche_id;
675 proclist_init(&lock->waiters);
676 }
677
678 /*
679 * Report start of wait event for light-weight locks.
680 *
681  * This function is used by all the lightweight-lock calls that need to
682  * wait to acquire the lock.  It distinguishes the wait event based on
683  * tranche and lock id.
684 */
685 static inline void
686 LWLockReportWaitStart(LWLock *lock)
687 {
688 pgstat_report_wait_start(PG_WAIT_LWLOCK | lock->tranche);
689 }
690
691 /*
692 * Report end of wait event for light-weight locks.
693 */
694 static inline void
695 LWLockReportWaitEnd(void)
696 {
697 pgstat_report_wait_end();
698 }
699
700 /*
701 * Return an identifier for an LWLock based on the wait class and event.
702 */
703 const char *
704 GetLWLockIdentifier(uint32 classId, uint16 eventId)
705 {
706 Assert(classId == PG_WAIT_LWLOCK);
707
708 /*
709	 * It is quite possible that the user has registered the tranche in one of
710	 * the backends (e.g. by allocating lwlocks in dynamic shared memory) but
711	 * not in all of them, so we can't assume the tranche is registered here.
712 */
713 if (eventId >= LWLockTranchesAllocated ||
714 LWLockTrancheArray[eventId] == NULL)
715 return "extension";
716
717 return LWLockTrancheArray[eventId];
718 }
719
720 /*
721 * Internal function that tries to atomically acquire the lwlock in the passed
722 * in mode.
723 *
724 * This function will not block waiting for a lock to become free - that's the
725  * caller's job.
726 *
727 * Returns true if the lock isn't free and we need to wait.
728 */
729 static bool
730 LWLockAttemptLock(LWLock *lock, LWLockMode mode)
731 {
732 uint32 old_state;
733
734 AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);
735
736 /*
737 * Read once outside the loop, later iterations will get the newer value
738 * via compare & exchange.
739 */
740 old_state = pg_atomic_read_u32(&lock->state);
741
742 /* loop until we've determined whether we could acquire the lock or not */
743 while (true)
744 {
745 uint32 desired_state;
746 bool lock_free;
747
748 desired_state = old_state;
749
750 if (mode == LW_EXCLUSIVE)
751 {
752 lock_free = (old_state & LW_LOCK_MASK) == 0;
753 if (lock_free)
754 desired_state += LW_VAL_EXCLUSIVE;
755 }
756 else
757 {
758 lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
759 if (lock_free)
760 desired_state += LW_VAL_SHARED;
761 }
762
763 /*
764		 * Attempt to swap in the state we are expecting.  If we didn't see the
765		 * lock as free, that's just the old value.  If we saw it as free,
766 * we'll attempt to mark it acquired. The reason that we always swap
767 * in the value is that this doubles as a memory barrier. We could try
768 * to be smarter and only swap in values if we saw the lock as free,
769		 * but benchmarks haven't shown it to be beneficial so far.
770 *
771 * Retry if the value changed since we last looked at it.
772 */
773 if (pg_atomic_compare_exchange_u32(&lock->state,
774 &old_state, desired_state))
775 {
776 if (lock_free)
777 {
778 /* Great! Got the lock. */
779 #ifdef LOCK_DEBUG
780 if (mode == LW_EXCLUSIVE)
781 lock->owner = MyProc;
782 #endif
783 return false;
784 }
785 else
786 return true; /* somebody else has the lock */
787 }
788 }
789 pg_unreachable();
790 }
791
792 /*
793 * Lock the LWLock's wait list against concurrent activity.
794 *
795 * NB: even though the wait list is locked, non-conflicting lock operations
796 * may still happen concurrently.
797 *
798  * Time spent holding the mutex should be short!
799 */
800 static void
801 LWLockWaitListLock(LWLock *lock)
802 {
803 uint32 old_state;
804 #ifdef LWLOCK_STATS
805 lwlock_stats *lwstats;
806 uint32 delays = 0;
807
808 lwstats = get_lwlock_stats_entry(lock);
809 #endif
810
811 while (true)
812 {
813 /* always try once to acquire lock directly */
814 old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
815 if (!(old_state & LW_FLAG_LOCKED))
816 break; /* got lock */
817
818 /* and then spin without atomic operations until lock is released */
819 {
820 SpinDelayStatus delayStatus;
821
822 init_local_spin_delay(&delayStatus);
823
824 while (old_state & LW_FLAG_LOCKED)
825 {
826 perform_spin_delay(&delayStatus);
827 old_state = pg_atomic_read_u32(&lock->state);
828 }
829 #ifdef LWLOCK_STATS
830 delays += delayStatus.delays;
831 #endif
832 finish_spin_delay(&delayStatus);
833 }
834
835 /*
836		 * Retry.  The lock might well have been re-acquired by the time we
837		 * attempt to get it again.
838 */
839 }
840
841 #ifdef LWLOCK_STATS
842 lwstats->spin_delay_count += delays;
843 #endif
844 }
845
846 /*
847 * Unlock the LWLock's wait list.
848 *
849 * Note that it can be more efficient to manipulate flags and release the
850 * locks in a single atomic operation.
851 */
852 static void
853 LWLockWaitListUnlock(LWLock *lock)
854 {
855 uint32 old_state PG_USED_FOR_ASSERTS_ONLY;
856
857 old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);
858
859 Assert(old_state & LW_FLAG_LOCKED);
860 }
861
862 /*
863  * Wake up all the lockers that currently have a chance to acquire the lock.
864 */
865 static void
866 LWLockWakeup(LWLock *lock)
867 {
868 bool new_release_ok;
869 bool wokeup_somebody = false;
870 proclist_head wakeup;
871 proclist_mutable_iter iter;
872
873 proclist_init(&wakeup);
874
875 new_release_ok = true;
876
877 /* lock wait list while collecting backends to wake up */
878 LWLockWaitListLock(lock);
879
880 proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
881 {
882 PGPROC *waiter = GetPGProcByNumber(iter.cur);
883
884 if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
885 continue;
886
887 proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
888 proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
889
890 if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
891 {
892 /*
893			 * Prevent additional wakeups until the retryer gets to run.  Backends
894 * that are just waiting for the lock to become free don't retry
895 * automatically.
896 */
897 new_release_ok = false;
898
899 /*
900			 * Don't wake up (further) exclusive lockers.
901 */
902 wokeup_somebody = true;
903 }
904
905 /*
906		 * Once we've woken up an exclusive locker, there's no point in waking
907 * up anybody else.
908 */
909 if (waiter->lwWaitMode == LW_EXCLUSIVE)
910 break;
911 }
912
913 Assert(proclist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);
914
915 /* unset required flags, and release lock, in one fell swoop */
916 {
917 uint32 old_state;
918 uint32 desired_state;
919
920 old_state = pg_atomic_read_u32(&lock->state);
921 while (true)
922 {
923 desired_state = old_state;
924
925 /* compute desired flags */
926
927 if (new_release_ok)
928 desired_state |= LW_FLAG_RELEASE_OK;
929 else
930 desired_state &= ~LW_FLAG_RELEASE_OK;
931
932 if (proclist_is_empty(&wakeup))
933 desired_state &= ~LW_FLAG_HAS_WAITERS;
934
935 desired_state &= ~LW_FLAG_LOCKED; /* release lock */
936
937 if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
938 desired_state))
939 break;
940 }
941 }
942
943 /* Awaken any waiters I removed from the queue. */
944 proclist_foreach_modify(iter, &wakeup, lwWaitLink)
945 {
946 PGPROC *waiter = GetPGProcByNumber(iter.cur);
947
948 LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
949 proclist_delete(&wakeup, iter.cur, lwWaitLink);
950
951 /*
952 * Guarantee that lwWaiting being unset only becomes visible once the
953		 * unlink from the list has completed.  Otherwise the target backend
954		 * could be woken up for some other reason and enqueue itself for a new
955		 * lock - if that happens before the list unlink happens, the list would
956		 * end up being corrupted.
957 *
958 * The barrier pairs with the LWLockWaitListLock() when enqueuing for
959 * another lock.
960 */
961 pg_write_barrier();
962 waiter->lwWaiting = false;
963 PGSemaphoreUnlock(waiter->sem);
964 }
965 }
966
967 /*
968 * Add ourselves to the end of the queue.
969 *
970 * NB: Mode can be LW_WAIT_UNTIL_FREE here!
971 */
972 static void
973 LWLockQueueSelf(LWLock *lock, LWLockMode mode)
974 {
975 /*
976 * If we don't have a PGPROC structure, there's no way to wait. This
977 * should never occur, since MyProc should only be null during shared
978 * memory initialization.
979 */
980 if (MyProc == NULL)
981 elog(PANIC, "cannot wait without a PGPROC structure");
982
983 if (MyProc->lwWaiting)
984 elog(PANIC, "queueing for lock while waiting on another one");
985
986 LWLockWaitListLock(lock);
987
988 /* setting the flag is protected by the spinlock */
989 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);
990
991 MyProc->lwWaiting = true;
992 MyProc->lwWaitMode = mode;
993
994 /* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
995 if (mode == LW_WAIT_UNTIL_FREE)
996 proclist_push_head(&lock->waiters, MyProc->pgprocno, lwWaitLink);
997 else
998 proclist_push_tail(&lock->waiters, MyProc->pgprocno, lwWaitLink);
999
1000 /* Can release the mutex now */
1001 LWLockWaitListUnlock(lock);
1002
1003 #ifdef LOCK_DEBUG
1004 pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
1005 #endif
1006
1007 }
1008
1009 /*
1010 * Remove ourselves from the waitlist.
1011 *
1012 * This is used if we queued ourselves because we thought we needed to sleep
1013 * but, after further checking, we discovered that we don't actually need to
1014 * do so.
1015 */
1016 static void
1017 LWLockDequeueSelf(LWLock *lock)
1018 {
1019 bool found = false;
1020 proclist_mutable_iter iter;
1021
1022 #ifdef LWLOCK_STATS
1023 lwlock_stats *lwstats;
1024
1025 lwstats = get_lwlock_stats_entry(lock);
1026
1027 lwstats->dequeue_self_count++;
1028 #endif
1029
1030 LWLockWaitListLock(lock);
1031
1032 /*
1033	 * We can't just remove ourselves from the list; we need to iterate over
1034	 * all entries, as somebody else could already have dequeued us.
1035 */
1036 proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
1037 {
1038 if (iter.cur == MyProc->pgprocno)
1039 {
1040 found = true;
1041 proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
1042 break;
1043 }
1044 }
1045
1046 if (proclist_is_empty(&lock->waiters) &&
1047 (pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
1048 {
1049 pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
1050 }
1051
1052 /* XXX: combine with fetch_and above? */
1053 LWLockWaitListUnlock(lock);
1054
1055 /* clear waiting state again, nice for debugging */
1056 if (found)
1057 MyProc->lwWaiting = false;
1058 else
1059 {
1060 int extraWaits = 0;
1061
1062 /*
1063		 * Somebody else dequeued us and has woken us up, or will do so shortly.
1064		 * Deal with the superfluous absorption of a wakeup.
1065 */
1066
1067 /*
1068 * Reset releaseOk if somebody woke us before we removed ourselves -
1069 * they'll have set it to false.
1070 */
1071 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1072
1073 /*
1074 * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
1075 * get reset at some inconvenient point later. Most of the time this
1076 * will immediately return.
1077 */
1078 for (;;)
1079 {
1080 PGSemaphoreLock(MyProc->sem);
1081 if (!MyProc->lwWaiting)
1082 break;
1083 extraWaits++;
1084 }
1085
1086 /*
1087 * Fix the process wait semaphore's count for any absorbed wakeups.
1088 */
1089 while (extraWaits-- > 0)
1090 PGSemaphoreUnlock(MyProc->sem);
1091 }
1092
1093 #ifdef LOCK_DEBUG
1094 {
1095 /* not waiting anymore */
1096 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1097
1098 Assert(nwaiters < MAX_BACKENDS);
1099 }
1100 #endif
1101 }
1102
1103 /*
1104 * LWLockAcquire - acquire a lightweight lock in the specified mode
1105 *
1106 * If the lock is not available, sleep until it is. Returns true if the lock
1107 * was available immediately, false if we had to sleep.
1108 *
1109 * Side effect: cancel/die interrupts are held off until lock release.
1110 */
1111 bool
1112 LWLockAcquire(LWLock *lock, LWLockMode mode)
1113 {
1114 PGPROC *proc = MyProc;
1115 bool result = true;
1116 int extraWaits = 0;
1117 #ifdef LWLOCK_STATS
1118 lwlock_stats *lwstats;
1119
1120 lwstats = get_lwlock_stats_entry(lock);
1121 #endif
1122
1123 AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1124
1125 PRINT_LWDEBUG("LWLockAcquire", lock, mode);
1126
1127 #ifdef LWLOCK_STATS
1128 /* Count lock acquisition attempts */
1129 if (mode == LW_EXCLUSIVE)
1130 lwstats->ex_acquire_count++;
1131 else
1132 lwstats->sh_acquire_count++;
1133 #endif /* LWLOCK_STATS */
1134
1135 /*
1136 * We can't wait if we haven't got a PGPROC. This should only occur
1137 * during bootstrap or shared memory initialization. Put an Assert here
1138 * to catch unsafe coding practices.
1139 */
1140 Assert(!(proc == NULL && IsUnderPostmaster));
1141
1142 /* Ensure we will have room to remember the lock */
1143 if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1144 elog(ERROR, "too many LWLocks taken");
1145
1146 /*
1147 * Lock out cancel/die interrupts until we exit the code section protected
1148 * by the LWLock. This ensures that interrupts will not interfere with
1149 * manipulations of data structures in shared memory.
1150 */
1151 HOLD_INTERRUPTS();
1152
1153 /*
1154 * Loop here to try to acquire lock after each time we are signaled by
1155 * LWLockRelease.
1156 *
1157 * NOTE: it might seem better to have LWLockRelease actually grant us the
1158 * lock, rather than retrying and possibly having to go back to sleep. But
1159 * in practice that is no good because it means a process swap for every
1160 * lock acquisition when two or more processes are contending for the same
1161 * lock. Since LWLocks are normally used to protect not-very-long
1162 * sections of computation, a process needs to be able to acquire and
1163 * release the same lock many times during a single CPU time slice, even
1164 * in the presence of contention. The efficiency of being able to do that
1165 * outweighs the inefficiency of sometimes wasting a process dispatch
1166 * cycle because the lock is not free when a released waiter finally gets
1167 * to run. See pgsql-hackers archives for 29-Dec-01.
1168 */
1169 for (;;)
1170 {
1171 bool mustwait;
1172
1173 /*
1174 * Try to grab the lock the first time, we're not in the waitqueue
1175 * yet/anymore.
1176 */
1177 mustwait = LWLockAttemptLock(lock, mode);
1178
1179 if (!mustwait)
1180 {
1181 LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
1182 break; /* got the lock */
1183 }
1184
1185 /*
1186 * Ok, at this point we couldn't grab the lock on the first try. We
1187 * cannot simply queue ourselves to the end of the list and wait to be
1188		 * woken up because by now the lock could long since have been released.
1189		 * Instead, add ourselves to the queue and try to grab the lock again. If
1190		 * we succeed we need to revert the queuing and be happy, otherwise we
1191 * recheck the lock. If we still couldn't grab it, we know that the
1192 * other locker will see our queue entries when releasing since they
1193 * existed before we checked for the lock.
1194 */
1195
1196 /* add to the queue */
1197 LWLockQueueSelf(lock, mode);
1198
1199 /* we're now guaranteed to be woken up if necessary */
1200 mustwait = LWLockAttemptLock(lock, mode);
1201
1202 /* ok, grabbed the lock the second time round, need to undo queueing */
1203 if (!mustwait)
1204 {
1205 LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
1206
1207 LWLockDequeueSelf(lock);
1208 break;
1209 }
1210
1211 /*
1212 * Wait until awakened.
1213 *
1214 * It is possible that we get awakened for a reason other than being
1215 * signaled by LWLockRelease. If so, loop back and wait again. Once
1216 * we've gotten the LWLock, re-increment the sema by the number of
1217 * additional signals received.
1218 */
1219 LOG_LWDEBUG("LWLockAcquire", lock, "waiting");
1220
1221 #ifdef LWLOCK_STATS
1222 lwstats->block_count++;
1223 #endif
1224
1225 LWLockReportWaitStart(lock);
1226 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
1227
1228 for (;;)
1229 {
1230 PGSemaphoreLock(proc->sem);
1231 if (!proc->lwWaiting)
1232 break;
1233 extraWaits++;
1234 }
1235
1236 /* Retrying, allow LWLockRelease to release waiters again. */
1237 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1238
1239 #ifdef LOCK_DEBUG
1240 {
1241 /* not waiting anymore */
1242 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1243
1244 Assert(nwaiters < MAX_BACKENDS);
1245 }
1246 #endif
1247
1248 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
1249 LWLockReportWaitEnd();
1250
1251 LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
1252
1253 /* Now loop back and try to acquire lock again. */
1254 result = false;
1255 }
1256
1257 TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);
1258
1259 /* Add lock to list of locks held by this backend */
1260 held_lwlocks[num_held_lwlocks].lock = lock;
1261 held_lwlocks[num_held_lwlocks++].mode = mode;
1262
1263 /*
1264 * Fix the process wait semaphore's count for any absorbed wakeups.
1265 */
1266 while (extraWaits-- > 0)
1267 PGSemaphoreUnlock(proc->sem);
1268
1269 return result;
1270 }
1271
1272 /*
1273 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
1274 *
1275 * If the lock is not available, return FALSE with no side-effects.
1276 *
1277 * If successful, cancel/die interrupts are held off until lock release.
1278 */
1279 bool
1280 LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
1281 {
1282 bool mustwait;
1283
1284 AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1285
1286 PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);
1287
1288 /* Ensure we will have room to remember the lock */
1289 if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1290 elog(ERROR, "too many LWLocks taken");
1291
1292 /*
1293 * Lock out cancel/die interrupts until we exit the code section protected
1294 * by the LWLock. This ensures that interrupts will not interfere with
1295 * manipulations of data structures in shared memory.
1296 */
1297 HOLD_INTERRUPTS();
1298
1299 /* Check for the lock */
1300 mustwait = LWLockAttemptLock(lock, mode);
1301
1302 if (mustwait)
1303 {
1304 /* Failed to get lock, so release interrupt holdoff */
1305 RESUME_INTERRUPTS();
1306
1307 LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
1308 TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), mode);
1309 }
1310 else
1311 {
1312 /* Add lock to list of locks held by this backend */
1313 held_lwlocks[num_held_lwlocks].lock = lock;
1314 held_lwlocks[num_held_lwlocks++].mode = mode;
1315 TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), mode);
1316 }
1317 return !mustwait;
1318 }
1319
1320 /*
1321 * LWLockAcquireOrWait - Acquire lock, or wait until it's free
1322 *
1323 * The semantics of this function are a bit funky. If the lock is currently
1324 * free, it is acquired in the given mode, and the function returns true. If
1325 * the lock isn't immediately free, the function waits until it is released
1326 * and returns false, but does not acquire the lock.
1327 *
1328 * This is currently used for WALWriteLock: when a backend flushes the WAL,
1329 * holding WALWriteLock, it can flush the commit records of many other
1330 * backends as a side-effect. Those other backends need to wait until the
1331 * flush finishes, but don't need to acquire the lock anymore. They can just
1332 * wake up, observe that their records have already been flushed, and return.
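 *
 * Sketch of that call pattern (simplified, not the actual xlog.c code):
 *
 *		if (LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
 *		{
 *			... we got the lock: flush up to our request, then release ...
 *		}
 *		else
 *		{
 *			... somebody else flushed while we waited: recheck and return ...
 *		}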
1333 */
1334 bool
1335 LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
1336 {
1337 PGPROC *proc = MyProc;
1338 bool mustwait;
1339 int extraWaits = 0;
1340 #ifdef LWLOCK_STATS
1341 lwlock_stats *lwstats;
1342
1343 lwstats = get_lwlock_stats_entry(lock);
1344 #endif
1345
1346 Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1347
1348 PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);
1349
1350 /* Ensure we will have room to remember the lock */
1351 if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1352 elog(ERROR, "too many LWLocks taken");
1353
1354 /*
1355 * Lock out cancel/die interrupts until we exit the code section protected
1356 * by the LWLock. This ensures that interrupts will not interfere with
1357 * manipulations of data structures in shared memory.
1358 */
1359 HOLD_INTERRUPTS();
1360
1361 /*
1362 * NB: We're using nearly the same twice-in-a-row lock acquisition
1363 * protocol as LWLockAcquire(). Check its comments for details.
1364 */
1365 mustwait = LWLockAttemptLock(lock, mode);
1366
1367 if (mustwait)
1368 {
1369 LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1370
1371 mustwait = LWLockAttemptLock(lock, mode);
1372
1373 if (mustwait)
1374 {
1375 /*
1376 * Wait until awakened. Like in LWLockAcquire, be prepared for
1377 * bogus wakeups.
1378 */
1379 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");
1380
1381 #ifdef LWLOCK_STATS
1382 lwstats->block_count++;
1383 #endif
1384
1385 LWLockReportWaitStart(lock);
1386 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
1387
1388 for (;;)
1389 {
1390 PGSemaphoreLock(proc->sem);
1391 if (!proc->lwWaiting)
1392 break;
1393 extraWaits++;
1394 }
1395
1396 #ifdef LOCK_DEBUG
1397 {
1398 /* not waiting anymore */
1399 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1400
1401 Assert(nwaiters < MAX_BACKENDS);
1402 }
1403 #endif
1404 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
1405 LWLockReportWaitEnd();
1406
1407 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
1408 }
1409 else
1410 {
1411 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");
1412
1413 /*
1414 * Got lock in the second attempt, undo queueing. We need to treat
1415 * this as having successfully acquired the lock, otherwise we'd
1416 * not necessarily wake up people we've prevented from acquiring
1417 * the lock.
1418 */
1419 LWLockDequeueSelf(lock);
1420 }
1421 }
1422
1423 /*
1424 * Fix the process wait semaphore's count for any absorbed wakeups.
1425 */
1426 while (extraWaits-- > 0)
1427 PGSemaphoreUnlock(proc->sem);
1428
1429 if (mustwait)
1430 {
1431 /* Failed to get lock, so release interrupt holdoff */
1432 RESUME_INTERRUPTS();
1433 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
1434 TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), mode);
1435 }
1436 else
1437 {
1438 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
1439 /* Add lock to list of locks held by this backend */
1440 held_lwlocks[num_held_lwlocks].lock = lock;
1441 held_lwlocks[num_held_lwlocks++].mode = mode;
1442 TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), mode);
1443 }
1444
1445 return !mustwait;
1446 }
1447
1448 /*
1449 * Does the lwlock in its current state need to wait for the variable value to
1450 * change?
1451 *
1452 * If we don't need to wait, and it's because the value of the variable has
1453 * changed, store the current value in newval.
1454 *
1455 * *result is set to true if the lock was free, and false otherwise.
1456 */
1457 static bool
1458 LWLockConflictsWithVar(LWLock *lock,
1459 uint64 *valptr, uint64 oldval, uint64 *newval,
1460 bool *result)
1461 {
1462 bool mustwait;
1463 uint64 value;
1464
1465 /*
1466	 * Test first to see if the lock is free right now.
1467 *
1468 * XXX: the caller uses a spinlock before this, so we don't need a memory
1469 * barrier here as far as the current usage is concerned. But that might
1470 * not be safe in general.
1471 */
1472 mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
1473
1474 if (!mustwait)
1475 {
1476 *result = true;
1477 return false;
1478 }
1479
1480 *result = false;
1481
1482 /*
1483 * Read value using the lwlock's wait list lock, as we can't generally
1484 * rely on atomic 64 bit reads/stores. TODO: On platforms with a way to
1485 * do atomic 64 bit reads/writes the spinlock should be optimized away.
1486 */
1487 LWLockWaitListLock(lock);
1488 value = *valptr;
1489 LWLockWaitListUnlock(lock);
1490
1491 if (value != oldval)
1492 {
1493 mustwait = false;
1494 *newval = value;
1495 }
1496 else
1497 {
1498 mustwait = true;
1499 }
1500
1501 return mustwait;
1502 }
1503
1504 /*
1505 * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
1506 *
1507 * If the lock is held and *valptr equals oldval, waits until the lock is
1508 * either freed, or the lock holder updates *valptr by calling
1509 * LWLockUpdateVar. If the lock is free on exit (immediately or after
1510 * waiting), returns true. If the lock is still held, but *valptr no longer
1511 * matches oldval, returns false and sets *newval to the current value in
1512 * *valptr.
1513 *
1514 * Note: this function ignores shared lock holders; if the lock is held
1515 * in shared mode, returns 'true'.
1516 */
1517 bool
1518 LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
1519 {
1520 PGPROC *proc = MyProc;
1521 int extraWaits = 0;
1522 bool result = false;
1523 #ifdef LWLOCK_STATS
1524 lwlock_stats *lwstats;
1525
1526 lwstats = get_lwlock_stats_entry(lock);
1527 #endif
1528
1529 PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
1530
1531 /*
1532 * Lock out cancel/die interrupts while we sleep on the lock. There is no
1533 * cleanup mechanism to remove us from the wait queue if we got
1534 * interrupted.
1535 */
1536 HOLD_INTERRUPTS();
1537
1538 /*
1539 * Loop here to check the lock's status after each time we are signaled.
1540 */
1541 for (;;)
1542 {
1543 bool mustwait;
1544
1545 mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1546 &result);
1547
1548 if (!mustwait)
1549 break; /* the lock was free or value didn't match */
1550
1551 /*
1552		 * Add myself to the wait queue.  Note that this is racy; somebody else
1553		 * could wake us up before we're finished queuing.  NB: We're using nearly
1554 * the same twice-in-a-row lock acquisition protocol as
1555 * LWLockAcquire(). Check its comments for details. The only
1556 * difference is that we also have to check the variable's values when
1557 * checking the state of the lock.
1558 */
1559 LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1560
1561 /*
1562 * Set RELEASE_OK flag, to make sure we get woken up as soon as the
1563 * lock is released.
1564 */
1565 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1566
1567 /*
1568 * We're now guaranteed to be woken up if necessary. Recheck the lock
1569 * and variables state.
1570 */
1571 mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1572 &result);
1573
1574 /* Ok, no conflict after we queued ourselves. Undo queueing. */
1575 if (!mustwait)
1576 {
1577 LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
1578
1579 LWLockDequeueSelf(lock);
1580 break;
1581 }
1582
1583 /*
1584 * Wait until awakened.
1585 *
1586 * It is possible that we get awakened for a reason other than being
1587 * signaled by LWLockRelease. If so, loop back and wait again. Once
1588 * we've gotten the LWLock, re-increment the sema by the number of
1589 * additional signals received.
1590 */
1591 LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");
1592
1593 #ifdef LWLOCK_STATS
1594 lwstats->block_count++;
1595 #endif
1596
1597 LWLockReportWaitStart(lock);
1598 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), LW_EXCLUSIVE);
1599
1600 for (;;)
1601 {
1602 PGSemaphoreLock(proc->sem);
1603 if (!proc->lwWaiting)
1604 break;
1605 extraWaits++;
1606 }
1607
1608 #ifdef LOCK_DEBUG
1609 {
1610 /* not waiting anymore */
1611 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1612
1613 Assert(nwaiters < MAX_BACKENDS);
1614 }
1615 #endif
1616
1617 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), LW_EXCLUSIVE);
1618 LWLockReportWaitEnd();
1619
1620 LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");
1621
1622 /* Now loop back and check the status of the lock again. */
1623 }
1624
1625 TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), LW_EXCLUSIVE);
1626
1627 /*
1628 * Fix the process wait semaphore's count for any absorbed wakeups.
1629 */
1630 while (extraWaits-- > 0)
1631 PGSemaphoreUnlock(proc->sem);
1632
1633 /*
1634 * Now okay to allow cancel/die interrupts.
1635 */
1636 RESUME_INTERRUPTS();
1637
1638 return result;
1639 }
1640
1641
1642 /*
1643 * LWLockUpdateVar - Update a variable and wake up waiters atomically
1644 *
1645 * Sets *valptr to 'val', and wakes up all processes waiting for us with
1646 * LWLockWaitForVar(). Setting the value and waking up the processes happen
1647 * atomically so that any process calling LWLockWaitForVar() on the same lock
1648 * is guaranteed to see the new value, and act accordingly.
1649 *
1650 * The caller must be holding the lock in exclusive mode.
1651 */
1652 void
1653 LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
1654 {
1655 proclist_head wakeup;
1656 proclist_mutable_iter iter;
1657
1658 PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
1659
1660 proclist_init(&wakeup);
1661
1662 LWLockWaitListLock(lock);
1663
1664 Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
1665
1666 /* Update the lock's value */
1667 *valptr = val;
1668
1669 /*
1670 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
1671 * up. They are always in the front of the queue.
1672 */
1673 proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
1674 {
1675 PGPROC *waiter = GetPGProcByNumber(iter.cur);
1676
1677 if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
1678 break;
1679
1680 proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
1681 proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
1682 }
1683
1684 /* We are done updating shared state of the lock itself. */
1685 LWLockWaitListUnlock(lock);
1686
1687 /*
1688 * Awaken any waiters I removed from the queue.
1689 */
1690 proclist_foreach_modify(iter, &wakeup, lwWaitLink)
1691 {
1692 PGPROC *waiter = GetPGProcByNumber(iter.cur);
1693
1694 proclist_delete(&wakeup, iter.cur, lwWaitLink);
1695 /* check comment in LWLockWakeup() about this barrier */
1696 pg_write_barrier();
1697 waiter->lwWaiting = false;
1698 PGSemaphoreUnlock(waiter->sem);
1699 }
1700 }
1701
1702
1703 /*
1704 * LWLockRelease - release a previously acquired lock
1705 */
1706 void
1707 LWLockRelease(LWLock *lock)
1708 {
1709 LWLockMode mode;
1710 uint32 oldstate;
1711 bool check_waiters;
1712 int i;
1713
1714 /*
1715 * Remove lock from list of locks held. Usually, but not always, it will
1716 * be the latest-acquired lock; so search array backwards.
1717 */
1718 for (i = num_held_lwlocks; --i >= 0;)
1719 if (lock == held_lwlocks[i].lock)
1720 break;
1721
1722 if (i < 0)
1723 elog(ERROR, "lock %s is not held", T_NAME(lock));
1724
1725 mode = held_lwlocks[i].mode;
1726
1727 num_held_lwlocks--;
1728 for (; i < num_held_lwlocks; i++)
1729 held_lwlocks[i] = held_lwlocks[i + 1];
1730
1731 PRINT_LWDEBUG("LWLockRelease", lock, mode);
1732
1733 /*
1734	 * Release my hold on the lock; after that it can immediately be acquired
1735	 * by others, even if we still have to wake up other waiters.
1736 */
1737 if (mode == LW_EXCLUSIVE)
1738 oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
1739 else
1740 oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);
1741
1742 /* nobody else can have that kind of lock */
1743 Assert(!(oldstate & LW_VAL_EXCLUSIVE));
1744
1745
1746 /*
1747 * We're still waiting for backends to get scheduled, don't wake them up
1748 * again.
1749 */
1750 if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
1751 (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
1752 (oldstate & LW_LOCK_MASK) == 0)
1753 check_waiters = true;
1754 else
1755 check_waiters = false;
1756
1757 /*
1758 * As waking up waiters requires the spinlock to be acquired, only do so
1759 * if necessary.
1760 */
1761 if (check_waiters)
1762 {
1763 /* XXX: remove before commit? */
1764 LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
1765 LWLockWakeup(lock);
1766 }
1767
1768 TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));
1769
1770 /*
1771 * Now okay to allow cancel/die interrupts.
1772 */
1773 RESUME_INTERRUPTS();
1774 }
1775
1776 /*
1777 * LWLockReleaseClearVar - release a previously acquired lock, reset variable
1778 */
1779 void
1780 LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
1781 {
1782 LWLockWaitListLock(lock);
1783
1784 /*
1785	 * Set the variable's value before releasing the lock.  That prevents a
1786	 * race condition wherein a new locker acquires the lock, but hasn't yet
1787	 * set the variable's value.
1788 */
1789 *valptr = val;
1790 LWLockWaitListUnlock(lock);
1791
1792 LWLockRelease(lock);
1793 }
1794
1795
1796 /*
1797 * LWLockReleaseAll - release all currently-held locks
1798 *
1799 * Used to clean up after ereport(ERROR). An important difference between this
1800 * function and retail LWLockRelease calls is that InterruptHoldoffCount is
1801 * unchanged by this operation. This is necessary since InterruptHoldoffCount
1802 * has been set to an appropriate level earlier in error recovery. We could
1803 * decrement it below zero if we allow it to drop for each released lock!
1804 */
1805 void
1806 LWLockReleaseAll(void)
1807 {
1808 while (num_held_lwlocks > 0)
1809 {
1810 HOLD_INTERRUPTS(); /* match the upcoming RESUME_INTERRUPTS */
1811
1812 LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
1813 }
1814 }
1815
1816
1817 /*
1818 * LWLockHeldByMe - test whether my process holds a lock in any mode
1819 *
1820 * This is meant as debug support only.
1821 */
1822 bool
1823 LWLockHeldByMe(LWLock *l)
1824 {
1825 int i;
1826
1827 for (i = 0; i < num_held_lwlocks; i++)
1828 {
1829 if (held_lwlocks[i].lock == l)
1830 return true;
1831 }
1832 return false;
1833 }
1834
1835 /*
1836 * LWLockHeldByMeInMode - test whether my process holds a lock in given mode
1837 *
1838 * This is meant as debug support only.
1839 */
1840 bool
1841 LWLockHeldByMeInMode(LWLock *l, LWLockMode mode)
1842 {
1843 int i;
1844
1845 for (i = 0; i < num_held_lwlocks; i++)
1846 {
1847 if (held_lwlocks[i].lock == l && held_lwlocks[i].mode == mode)
1848 return true;
1849 }
1850 return false;
1851 }
1852