1 /*-------------------------------------------------------------------------
2 *
3 * lwlock.c
4 * Lightweight lock manager
5 *
6 * Lightweight locks are intended primarily to provide mutual exclusion of
7 * access to shared-memory data structures. Therefore, they offer both
8 * exclusive and shared lock modes (to support read/write and read-only
9 * access to a shared object). There are few other frills. User-level
10 * locking should be done with the full lock manager --- which depends on
11 * LWLocks to protect its shared state.
12 *
13 * In addition to exclusive and shared modes, lightweight locks can be used to
14 * wait until a variable changes value. The variable is not reset when the
15 * lock is acquired with LWLockAcquire; i.e., it keeps the value it was set
16 * to when the lock was last released, and can be updated without releasing
17 * the lock by calling LWLockUpdateVar. LWLockWaitForVar
18 * waits for the variable to be updated, or until the lock is free. When
19 * releasing the lock with LWLockReleaseClearVar() the value can be set to an
20 * appropriate value for a free lock. The meaning of the variable is up to
21 * the caller, the lightweight lock code just assigns and compares it.
22 *
23 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
24 * Portions Copyright (c) 1994, Regents of the University of California
25 *
26 * IDENTIFICATION
27 * src/backend/storage/lmgr/lwlock.c
28 *
29 * NOTES:
30 *
31 * This used to be a pretty straightforward reader-writer lock
32 * implementation, in which the internal state was protected by a
33 * spinlock. Unfortunately the overhead of taking the spinlock proved to be
34 * too high for workloads/locks that were taken in shared mode very
35 * frequently. Often we were spinning in the (obviously exclusive) spinlock,
36 * while trying to acquire a shared lock that was actually free.
37 *
38 * Thus a new implementation was devised that provides wait-free shared lock
39 * acquisition for locks that aren't exclusively locked.
40 *
41 * The basic idea is to have a single atomic variable 'lockcount' instead of
42 * the formerly separate shared and exclusive counters and to use atomic
43 * operations to acquire the lock. That's fairly easy to do for plain
44 * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
45 * in the OS.
46 *
47 * For lock acquisition we use an atomic compare-and-exchange on the lockcount
48 * variable. For exclusive lock we swap in a sentinel value
49 * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders.
50 *
51 * To release the lock we use an atomic decrement. If the new value is zero
52 * (we get that value atomically), we know we can/have to wake up any
53 * waiters.
54 *
55 * Obviously it is important that the sentinel value for exclusive locks
56 * doesn't conflict with the maximum number of possible share lockers -
57 * luckily MAX_BACKENDS makes that easily possible.
58 *
59 *
60 * The attentive reader might have noticed that naively doing the above has a
61 * glaring race condition: We try to lock using the atomic operations and
62 * notice that we have to wait. Unfortunately by the time we have finished
63 * queuing, the former locker very well might have already finished its
64 * work. That's problematic because we're now stuck waiting inside the OS.
65 *
66 * To mitigate those races we use a two-phased attempt at locking:
67 * Phase 1: Try to do it atomically, if we succeed, nice
68 * Phase 2: Add ourselves to the waitqueue of the lock
69 * Phase 3: Try to grab the lock again, if we succeed, remove ourselves from
70 * the queue
71 * Phase 4: Sleep till wake-up, goto Phase 1
72 *
73 * This protects us against the problem above: nobody can release too
74 * quickly before we're queued, since after Phase 2 we're already queued.
75 * -------------------------------------------------------------------------
76 */
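/*
 * For illustration only (an editor's sketch, not part of the build): the
 * wait-free shared-acquisition fast path described above reduces to a
 * single compare-and-exchange loop over the state word, roughly:
 *
 *     uint32 old_state = pg_atomic_read_u32(&lock->state);
 *
 *     while (true)
 *     {
 *         uint32 desired = old_state;
 *         bool   free = (old_state & LW_VAL_EXCLUSIVE) == 0;
 *
 *         if (free)
 *             desired += LW_VAL_SHARED;
 *         if (pg_atomic_compare_exchange_u32(&lock->state,
 *                                            &old_state, desired))
 *             return free;        // acquired iff the lock was free
 *     }
 *
 * The authoritative version, handling both modes, is LWLockAttemptLock()
 * below.
 */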
77 #include "postgres.h"
78
79 #include "miscadmin.h"
80 #include "pg_trace.h"
81 #include "pgstat.h"
82 #include "postmaster/postmaster.h"
83 #include "replication/slot.h"
84 #include "storage/ipc.h"
85 #include "storage/predicate.h"
86 #include "storage/proc.h"
87 #include "storage/proclist.h"
88 #include "storage/spin.h"
89 #include "utils/memutils.h"
90
91 #ifdef LWLOCK_STATS
92 #include "utils/hsearch.h"
93 #endif
94
95
96 /* We use the ShmemLock spinlock to protect LWLockCounter */
97 extern slock_t *ShmemLock;
98
99 #define LW_FLAG_HAS_WAITERS ((uint32) 1 << 30)
100 #define LW_FLAG_RELEASE_OK ((uint32) 1 << 29)
101 #define LW_FLAG_LOCKED ((uint32) 1 << 28)
102
103 #define LW_VAL_EXCLUSIVE ((uint32) 1 << 24)
104 #define LW_VAL_SHARED 1
105
106 #define LW_LOCK_MASK ((uint32) ((1 << 25)-1))
107 /* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
108 #define LW_SHARED_MASK ((uint32) ((1 << 24)-1))
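/*
 * Summary of the resulting state-word layout (derived from the constants
 * above; editor's note for orientation only):
 *
 *   bit  31       unused
 *   bit  30       LW_FLAG_HAS_WAITERS
 *   bit  29       LW_FLAG_RELEASE_OK
 *   bit  28       LW_FLAG_LOCKED (protects the wait list)
 *   bits 25-27    unused
 *   bit  24       LW_VAL_EXCLUSIVE
 *   bits 0-23     count of shared holders (LW_SHARED_MASK)
 */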
109
110 /*
111 * There are three sorts of LWLock "tranches":
112 *
113 * 1. The individually-named locks defined in lwlocknames.h each have their
114 * own tranche. The names of these tranches appear in IndividualLWLockNames[]
115 * in lwlocknames.c.
116 *
117 * 2. There are some predefined tranches for built-in groups of locks.
118 * These are listed in enum BuiltinTrancheIds in lwlock.h, and their names
119 * appear in BuiltinTrancheNames[] below.
120 *
121 * 3. Extensions can create new tranches, via either RequestNamedLWLockTranche
122 * or LWLockRegisterTranche. The names of these that are known in the current
123 * process appear in LWLockTrancheNames[].
124 *
125 * All these names are user-visible as wait event names, so choose with care
126 * ... and do not forget to update the documentation's list of wait events.
127 */
128 extern const char *const IndividualLWLockNames[]; /* in lwlocknames.c */
129
130 static const char *const BuiltinTrancheNames[] = {
131 /* LWTRANCHE_XACT_BUFFER: */
132 "XactBuffer",
133 /* LWTRANCHE_COMMITTS_BUFFER: */
134 "CommitTSBuffer",
135 /* LWTRANCHE_SUBTRANS_BUFFER: */
136 "SubtransBuffer",
137 /* LWTRANCHE_MULTIXACTOFFSET_BUFFER: */
138 "MultiXactOffsetBuffer",
139 /* LWTRANCHE_MULTIXACTMEMBER_BUFFER: */
140 "MultiXactMemberBuffer",
141 /* LWTRANCHE_NOTIFY_BUFFER: */
142 "NotifyBuffer",
143 /* LWTRANCHE_SERIAL_BUFFER: */
144 "SerialBuffer",
145 /* LWTRANCHE_WAL_INSERT: */
146 "WALInsert",
147 /* LWTRANCHE_BUFFER_CONTENT: */
148 "BufferContent",
149 /* LWTRANCHE_BUFFER_IO: */
150 "BufferIO",
151 /* LWTRANCHE_REPLICATION_ORIGIN_STATE: */
152 "ReplicationOriginState",
153 /* LWTRANCHE_REPLICATION_SLOT_IO: */
154 "ReplicationSlotIO",
155 /* LWTRANCHE_LOCK_FASTPATH: */
156 "LockFastPath",
157 /* LWTRANCHE_BUFFER_MAPPING: */
158 "BufferMapping",
159 /* LWTRANCHE_LOCK_MANAGER: */
160 "LockManager",
161 /* LWTRANCHE_PREDICATE_LOCK_MANAGER: */
162 "PredicateLockManager",
163 /* LWTRANCHE_PARALLEL_HASH_JOIN: */
164 "ParallelHashJoin",
165 /* LWTRANCHE_PARALLEL_QUERY_DSA: */
166 "ParallelQueryDSA",
167 /* LWTRANCHE_PER_SESSION_DSA: */
168 "PerSessionDSA",
169 /* LWTRANCHE_PER_SESSION_RECORD_TYPE: */
170 "PerSessionRecordType",
171 /* LWTRANCHE_PER_SESSION_RECORD_TYPMOD: */
172 "PerSessionRecordTypmod",
173 /* LWTRANCHE_SHARED_TUPLESTORE: */
174 "SharedTupleStore",
175 /* LWTRANCHE_SHARED_TIDBITMAP: */
176 "SharedTidBitmap",
177 /* LWTRANCHE_PARALLEL_APPEND: */
178 "ParallelAppend",
179 /* LWTRANCHE_PER_XACT_PREDICATE_LIST: */
180 "PerXactPredicateList"
181 };
182
183 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
184 LWTRANCHE_FIRST_USER_DEFINED - NUM_INDIVIDUAL_LWLOCKS,
185 "missing entries in BuiltinTrancheNames[]");
186
187 /*
188 * This is indexed by tranche ID minus LWTRANCHE_FIRST_USER_DEFINED, and
189 * stores the names of all dynamically-created tranches known to the current
190 * process. Any unused entries in the array will contain NULL.
191 */
192 static const char **LWLockTrancheNames = NULL;
193 static int LWLockTrancheNamesAllocated = 0;
194
195 /*
196 * This points to the main array of LWLocks in shared memory. Backends inherit
197 * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
198 * where we have special measures to pass it down).
199 */
200 LWLockPadded *MainLWLockArray = NULL;
201
202 /*
203 * We use this structure to keep track of locked LWLocks for release
204 * during error recovery. Normally, only a few will be held at once, but
205 * occasionally the number can be much higher; for example, the pg_buffercache
206 * extension locks all buffer partitions simultaneously.
207 */
208 #define MAX_SIMUL_LWLOCKS 200
209
210 /* struct representing the LWLocks we're holding */
211 typedef struct LWLockHandle
212 {
213 LWLock *lock;
214 LWLockMode mode;
215 } LWLockHandle;
216
217 static int num_held_lwlocks = 0;
218 static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
219
220 /* struct representing the LWLock tranche request for named tranche */
221 typedef struct NamedLWLockTrancheRequest
222 {
223 char tranche_name[NAMEDATALEN];
224 int num_lwlocks;
225 } NamedLWLockTrancheRequest;
226
227 static NamedLWLockTrancheRequest *NamedLWLockTrancheRequestArray = NULL;
228 static int NamedLWLockTrancheRequestsAllocated = 0;
229
230 /*
231 * NamedLWLockTrancheRequests is both the valid length of the request array,
232 * and the length of the shared-memory NamedLWLockTrancheArray later on.
233 * This variable and NamedLWLockTrancheArray are non-static so that
234 * postmaster.c can copy them to child processes in EXEC_BACKEND builds.
235 */
236 int NamedLWLockTrancheRequests = 0;
237
238 /* points to data in shared memory: */
239 NamedLWLockTranche *NamedLWLockTrancheArray = NULL;
240
241 static bool lock_named_request_allowed = true;
242
243 static void InitializeLWLocks(void);
244 static inline void LWLockReportWaitStart(LWLock *lock);
245 static inline void LWLockReportWaitEnd(void);
246 static const char *GetLWTrancheName(uint16 trancheId);
247
248 #define T_NAME(lock) \
249 GetLWTrancheName((lock)->tranche)
250
251 #ifdef LWLOCK_STATS
252 typedef struct lwlock_stats_key
253 {
254 int tranche;
255 void *instance;
256 } lwlock_stats_key;
257
258 typedef struct lwlock_stats
259 {
260 lwlock_stats_key key;
261 int sh_acquire_count;
262 int ex_acquire_count;
263 int block_count;
264 int dequeue_self_count;
265 int spin_delay_count;
266 } lwlock_stats;
267
268 static HTAB *lwlock_stats_htab;
269 static lwlock_stats lwlock_stats_dummy;
270 #endif
271
272 #ifdef LOCK_DEBUG
273 bool Trace_lwlocks = false;
274
275 inline static void
276 PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
277 {
278 /* hide statement & context here, otherwise the log is just too verbose */
279 if (Trace_lwlocks)
280 {
281 uint32 state = pg_atomic_read_u32(&lock->state);
282
283 ereport(LOG,
284 (errhidestmt(true),
285 errhidecontext(true),
286 errmsg_internal("%d: %s(%s %p): excl %u shared %u haswaiters %u waiters %u rOK %d",
287 MyProcPid,
288 where, T_NAME(lock), lock,
289 (state & LW_VAL_EXCLUSIVE) != 0,
290 state & LW_SHARED_MASK,
291 (state & LW_FLAG_HAS_WAITERS) != 0,
292 pg_atomic_read_u32(&lock->nwaiters),
293 (state & LW_FLAG_RELEASE_OK) != 0)));
294 }
295 }
296
297 inline static void
298 LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
299 {
300 /* hide statement & context here, otherwise the log is just too verbose */
301 if (Trace_lwlocks)
302 {
303 ereport(LOG,
304 (errhidestmt(true),
305 errhidecontext(true),
306 errmsg_internal("%s(%s %p): %s", where,
307 T_NAME(lock), lock, msg)));
308 }
309 }
310
311 #else /* not LOCK_DEBUG */
312 #define PRINT_LWDEBUG(a,b,c) ((void)0)
313 #define LOG_LWDEBUG(a,b,c) ((void)0)
314 #endif /* LOCK_DEBUG */
315
316 #ifdef LWLOCK_STATS
317
318 static void init_lwlock_stats(void);
319 static void print_lwlock_stats(int code, Datum arg);
320 static lwlock_stats * get_lwlock_stats_entry(LWLock *lock);
321
322 static void
323 init_lwlock_stats(void)
324 {
325 HASHCTL ctl;
326 static MemoryContext lwlock_stats_cxt = NULL;
327 static bool exit_registered = false;
328
329 if (lwlock_stats_cxt != NULL)
330 MemoryContextDelete(lwlock_stats_cxt);
331
332 /*
333 * The LWLock stats will be updated within a critical section, which
334 * requires allocating new hash entries. Allocations within a critical
335 * section are normally not allowed because running out of memory would
336 * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
337 * turned on in production, so that's an acceptable risk. The hash entries
338 * are small, so the risk of running out of memory is minimal in practice.
339 */
340 lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
341 "LWLock stats",
342 ALLOCSET_DEFAULT_SIZES);
343 MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);
344
345 MemSet(&ctl, 0, sizeof(ctl));
346 ctl.keysize = sizeof(lwlock_stats_key);
347 ctl.entrysize = sizeof(lwlock_stats);
348 ctl.hcxt = lwlock_stats_cxt;
349 lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
350 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
351 if (!exit_registered)
352 {
353 on_shmem_exit(print_lwlock_stats, 0);
354 exit_registered = true;
355 }
356 }
357
358 static void
359 print_lwlock_stats(int code, Datum arg)
360 {
361 HASH_SEQ_STATUS scan;
362 lwlock_stats *lwstats;
363
364 hash_seq_init(&scan, lwlock_stats_htab);
365
366 /* Grab an LWLock to keep different backends from mixing reports */
367 LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);
368
369 while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
370 {
371 fprintf(stderr,
372 "PID %d lwlock %s %p: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
373 MyProcPid, GetLWTrancheName(lwstats->key.tranche),
374 lwstats->key.instance, lwstats->sh_acquire_count,
375 lwstats->ex_acquire_count, lwstats->block_count,
376 lwstats->spin_delay_count, lwstats->dequeue_self_count);
377 }
378
379 LWLockRelease(&MainLWLockArray[0].lock);
380 }
381
382 static lwlock_stats *
383 get_lwlock_stats_entry(LWLock *lock)
384 {
385 lwlock_stats_key key;
386 lwlock_stats *lwstats;
387 bool found;
388
389 /*
390 * During shared memory initialization, the hash table doesn't exist yet.
391 * Stats of that phase aren't very interesting, so just collect operations
392 * on all locks in a single dummy entry.
393 */
394 if (lwlock_stats_htab == NULL)
395 return &lwlock_stats_dummy;
396
397 /* Fetch or create the entry. */
398 MemSet(&key, 0, sizeof(key));
399 key.tranche = lock->tranche;
400 key.instance = lock;
401 lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
402 if (!found)
403 {
404 lwstats->sh_acquire_count = 0;
405 lwstats->ex_acquire_count = 0;
406 lwstats->block_count = 0;
407 lwstats->dequeue_self_count = 0;
408 lwstats->spin_delay_count = 0;
409 }
410 return lwstats;
411 }
412 #endif /* LWLOCK_STATS */
413
414
415 /*
416 * Compute number of LWLocks required by named tranches. These will be
417 * allocated in the main array.
418 */
419 static int
420 NumLWLocksForNamedTranches(void)
421 {
422 int numLocks = 0;
423 int i;
424
425 for (i = 0; i < NamedLWLockTrancheRequests; i++)
426 numLocks += NamedLWLockTrancheRequestArray[i].num_lwlocks;
427
428 return numLocks;
429 }
430
431 /*
432 * Compute shmem space needed for LWLocks and named tranches.
433 */
434 Size
435 LWLockShmemSize(void)
436 {
437 Size size;
438 int i;
439 int numLocks = NUM_FIXED_LWLOCKS;
440
441 /* Calculate total number of locks needed in the main array. */
442 numLocks += NumLWLocksForNamedTranches();
443
444 /* Space for the LWLock array. */
445 size = mul_size(numLocks, sizeof(LWLockPadded));
446
447 /* Space for dynamic allocation counter, plus room for alignment. */
448 size = add_size(size, sizeof(int) + LWLOCK_PADDED_SIZE);
449
450 /* Space for named tranches. */
451 size = add_size(size, mul_size(NamedLWLockTrancheRequests, sizeof(NamedLWLockTranche)));
452
453 /* Space for the name of each tranche. */
454 for (i = 0; i < NamedLWLockTrancheRequests; i++)
455 size = add_size(size, strlen(NamedLWLockTrancheRequestArray[i].tranche_name) + 1);
456
457 /* Disallow adding any more named tranches. */
458 lock_named_request_allowed = false;
459
460 return size;
461 }
462
463 /*
464 * Allocate shmem space for the main LWLock array and all tranches and
465 * initialize it. We also register extension LWLock tranches here.
466 */
467 void
468 CreateLWLocks(void)
469 {
470 StaticAssertStmt(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
471 "MAX_BACKENDS too big for lwlock.c");
472
473 StaticAssertStmt(sizeof(LWLock) <= LWLOCK_MINIMAL_SIZE &&
474 sizeof(LWLock) <= LWLOCK_PADDED_SIZE,
475 "Miscalculated LWLock padding");
476
477 if (!IsUnderPostmaster)
478 {
479 Size spaceLocks = LWLockShmemSize();
480 int *LWLockCounter;
481 char *ptr;
482
483 /* Allocate space */
484 ptr = (char *) ShmemAlloc(spaceLocks);
485
486 /* Leave room for dynamic allocation of tranches */
487 ptr += sizeof(int);
488
489 /* Ensure desired alignment of LWLock array */
490 ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
491
492 MainLWLockArray = (LWLockPadded *) ptr;
493
494 /*
495 * Initialize the dynamic-allocation counter for tranches, which is
496 * stored just before the first LWLock.
497 */
498 LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
499 *LWLockCounter = LWTRANCHE_FIRST_USER_DEFINED;
500
501 /* Initialize all LWLocks */
502 InitializeLWLocks();
503 }
504
505 /* Register named extension LWLock tranches in the current process. */
506 for (int i = 0; i < NamedLWLockTrancheRequests; i++)
507 LWLockRegisterTranche(NamedLWLockTrancheArray[i].trancheId,
508 NamedLWLockTrancheArray[i].trancheName);
509 }
510
511 /*
512 * Initialize LWLocks that are fixed and those belonging to named tranches.
513 */
514 static void
515 InitializeLWLocks(void)
516 {
517 int numNamedLocks = NumLWLocksForNamedTranches();
518 int id;
519 int i;
520 int j;
521 LWLockPadded *lock;
522
523 /* Initialize all individual LWLocks in main array */
524 for (id = 0, lock = MainLWLockArray; id < NUM_INDIVIDUAL_LWLOCKS; id++, lock++)
525 LWLockInitialize(&lock->lock, id);
526
527 /* Initialize buffer mapping LWLocks in main array */
528 lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS;
529 for (id = 0; id < NUM_BUFFER_PARTITIONS; id++, lock++)
530 LWLockInitialize(&lock->lock, LWTRANCHE_BUFFER_MAPPING);
531
532 /* Initialize lmgrs' LWLocks in main array */
533 lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS + NUM_BUFFER_PARTITIONS;
534 for (id = 0; id < NUM_LOCK_PARTITIONS; id++, lock++)
535 LWLockInitialize(&lock->lock, LWTRANCHE_LOCK_MANAGER);
536
537 /* Initialize predicate lmgrs' LWLocks in main array */
538 lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS +
539 NUM_BUFFER_PARTITIONS + NUM_LOCK_PARTITIONS;
540 for (id = 0; id < NUM_PREDICATELOCK_PARTITIONS; id++, lock++)
541 LWLockInitialize(&lock->lock, LWTRANCHE_PREDICATE_LOCK_MANAGER);
542
543 /*
544 * Copy the info about any named tranches into shared memory (so that
545 * other processes can see it), and initialize the requested LWLocks.
546 */
547 if (NamedLWLockTrancheRequests > 0)
548 {
549 char *trancheNames;
550
551 NamedLWLockTrancheArray = (NamedLWLockTranche *)
552 &MainLWLockArray[NUM_FIXED_LWLOCKS + numNamedLocks];
553
554 trancheNames = (char *) NamedLWLockTrancheArray +
555 (NamedLWLockTrancheRequests * sizeof(NamedLWLockTranche));
556 lock = &MainLWLockArray[NUM_FIXED_LWLOCKS];
557
558 for (i = 0; i < NamedLWLockTrancheRequests; i++)
559 {
560 NamedLWLockTrancheRequest *request;
561 NamedLWLockTranche *tranche;
562 char *name;
563
564 request = &NamedLWLockTrancheRequestArray[i];
565 tranche = &NamedLWLockTrancheArray[i];
566
567 name = trancheNames;
568 trancheNames += strlen(request->tranche_name) + 1;
569 strcpy(name, request->tranche_name);
570 tranche->trancheId = LWLockNewTrancheId();
571 tranche->trancheName = name;
572
573 for (j = 0; j < request->num_lwlocks; j++, lock++)
574 LWLockInitialize(&lock->lock, tranche->trancheId);
575 }
576 }
577 }
578
579 /*
580 * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
581 */
582 void
583 InitLWLockAccess(void)
584 {
585 #ifdef LWLOCK_STATS
586 init_lwlock_stats();
587 #endif
588 }
589
590 /*
591 * GetNamedLWLockTranche - returns the base address of the LWLocks belonging
592 * to the specified tranche.
593 *
594 * The caller is expected to use the requested number of LWLocks starting
595 * from the base lock address returned by this function. It can only be used
596 * for tranches that were requested via RequestNamedLWLockTranche().
597 */
598 LWLockPadded *
599 GetNamedLWLockTranche(const char *tranche_name)
600 {
601 int lock_pos;
602 int i;
603
604 /*
605 * Obtain the position of the base address of the LWLocks belonging to the
606 * requested tranche_name in MainLWLockArray. LWLocks for named tranches
607 * are placed in MainLWLockArray after the fixed locks.
608 */
609 lock_pos = NUM_FIXED_LWLOCKS;
610 for (i = 0; i < NamedLWLockTrancheRequests; i++)
611 {
612 if (strcmp(NamedLWLockTrancheRequestArray[i].tranche_name,
613 tranche_name) == 0)
614 return &MainLWLockArray[lock_pos];
615
616 lock_pos += NamedLWLockTrancheRequestArray[i].num_lwlocks;
617 }
618
619 elog(ERROR, "requested tranche is not registered");
620
621 /* just to keep compiler quiet */
622 return NULL;
623 }
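/*
 * Example of the intended calling pattern (an editor's sketch; the tranche
 * name "my_extension" and the lock count are hypothetical):
 *
 *     // In _PG_init() of a shared_preload_libraries library:
 *     RequestNamedLWLockTranche("my_extension", 2);
 *
 *     // Later, once shared memory exists (e.g. in a shmem startup hook):
 *     LWLockPadded *locks = GetNamedLWLockTranche("my_extension");
 *
 *     LWLockAcquire(&locks[0].lock, LW_EXCLUSIVE);
 *     // ... manipulate the shared state protected by locks[0] ...
 *     LWLockRelease(&locks[0].lock);
 */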
624
625 /*
626 * Allocate a new tranche ID.
627 */
628 int
629 LWLockNewTrancheId(void)
630 {
631 int result;
632 int *LWLockCounter;
633
634 LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
635 SpinLockAcquire(ShmemLock);
636 result = (*LWLockCounter)++;
637 SpinLockRelease(ShmemLock);
638
639 return result;
640 }
641
642 /*
643 * Register a dynamic tranche name in the lookup table of the current process.
644 *
645 * This routine will save a pointer to the tranche name passed as an argument,
646 * so the name should be allocated in a backend-lifetime context
647 * (shared memory, TopMemoryContext, static constant, or similar).
648 *
649 * The tranche name will be user-visible as a wait event name, so try to
650 * use a name that fits the style for those.
651 */
652 void
653 LWLockRegisterTranche(int tranche_id, const char *tranche_name)
654 {
655 /* This should only be called for user-defined tranches. */
656 if (tranche_id < LWTRANCHE_FIRST_USER_DEFINED)
657 return;
658
659 /* Convert to array index. */
660 tranche_id -= LWTRANCHE_FIRST_USER_DEFINED;
661
662 /* If necessary, create or enlarge array. */
663 if (tranche_id >= LWLockTrancheNamesAllocated)
664 {
665 int newalloc;
666
667 newalloc = Max(LWLockTrancheNamesAllocated, 8);
668 while (newalloc <= tranche_id)
669 newalloc *= 2;
670
671 if (LWLockTrancheNames == NULL)
672 LWLockTrancheNames = (const char **)
673 MemoryContextAllocZero(TopMemoryContext,
674 newalloc * sizeof(char *));
675 else
676 {
677 LWLockTrancheNames = (const char **)
678 repalloc(LWLockTrancheNames, newalloc * sizeof(char *));
679 memset(LWLockTrancheNames + LWLockTrancheNamesAllocated,
680 0,
681 (newalloc - LWLockTrancheNamesAllocated) * sizeof(char *));
682 }
683 LWLockTrancheNamesAllocated = newalloc;
684 }
685
686 LWLockTrancheNames[tranche_id] = tranche_name;
687 }
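/*
 * Example of dynamic tranche registration (an editor's sketch; "shared" and
 * the tranche name are hypothetical): an extension placing an LWLock in a
 * DSM segment can do
 *
 *     int tranche_id = LWLockNewTrancheId();
 *
 *     LWLockRegisterTranche(tranche_id, "my_dsm_tranche");
 *     LWLockInitialize(&shared->lock, tranche_id);
 *
 * Note that each process using the lock must itself call
 * LWLockRegisterTranche() with the same ID; otherwise GetLWTrancheName()
 * reports the lock's wait events under the generic name "extension".
 */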
688
689 /*
690 * RequestNamedLWLockTranche
691 * Request that extra LWLocks be allocated during postmaster
692 * startup.
693 *
694 * This is only useful for extensions if called from the _PG_init hook
695 * of a library that is loaded into the postmaster via
696 * shared_preload_libraries. Once shared memory has been allocated, calls
697 * will be ignored. (We could raise an error, but it seems better to make
698 * it a no-op, so that libraries containing such calls can be reloaded if
699 * needed.)
700 *
701 * The tranche name will be user-visible as a wait event name, so try to
702 * use a name that fits the style for those.
703 */
704 void
705 RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
706 {
707 NamedLWLockTrancheRequest *request;
708
709 if (IsUnderPostmaster || !lock_named_request_allowed)
710 return; /* too late */
711
712 if (NamedLWLockTrancheRequestArray == NULL)
713 {
714 NamedLWLockTrancheRequestsAllocated = 16;
715 NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
716 MemoryContextAlloc(TopMemoryContext,
717 NamedLWLockTrancheRequestsAllocated
718 * sizeof(NamedLWLockTrancheRequest));
719 }
720
721 if (NamedLWLockTrancheRequests >= NamedLWLockTrancheRequestsAllocated)
722 {
723 int i = NamedLWLockTrancheRequestsAllocated;
724
725 while (i <= NamedLWLockTrancheRequests)
726 i *= 2;
727
728 NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
729 repalloc(NamedLWLockTrancheRequestArray,
730 i * sizeof(NamedLWLockTrancheRequest));
731 NamedLWLockTrancheRequestsAllocated = i;
732 }
733
734 request = &NamedLWLockTrancheRequestArray[NamedLWLockTrancheRequests];
735 Assert(strlen(tranche_name) + 1 <= NAMEDATALEN);
736 strlcpy(request->tranche_name, tranche_name, NAMEDATALEN);
737 request->num_lwlocks = num_lwlocks;
738 NamedLWLockTrancheRequests++;
739 }
740
741 /*
742 * LWLockInitialize - initialize a new lwlock; it's initially unlocked
743 */
744 void
745 LWLockInitialize(LWLock *lock, int tranche_id)
746 {
747 pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
748 #ifdef LOCK_DEBUG
749 pg_atomic_init_u32(&lock->nwaiters, 0);
750 #endif
751 lock->tranche = tranche_id;
752 proclist_init(&lock->waiters);
753 }
754
755 /*
756 * Report start of wait event for light-weight locks.
757 *
758 * This function is used by all the lightweight lock calls that need to
759 * wait to acquire the lock. It distinguishes the wait event based on the
760 * tranche and lock id.
761 */
762 static inline void
763 LWLockReportWaitStart(LWLock *lock)
764 {
765 pgstat_report_wait_start(PG_WAIT_LWLOCK | lock->tranche);
766 }
767
768 /*
769 * Report end of wait event for light-weight locks.
770 */
771 static inline void
772 LWLockReportWaitEnd(void)
773 {
774 pgstat_report_wait_end();
775 }
776
777 /*
778 * Return the name of an LWLock tranche.
779 */
780 static const char *
781 GetLWTrancheName(uint16 trancheId)
782 {
783 /* Individual LWLock? */
784 if (trancheId < NUM_INDIVIDUAL_LWLOCKS)
785 return IndividualLWLockNames[trancheId];
786
787 /* Built-in tranche? */
788 if (trancheId < LWTRANCHE_FIRST_USER_DEFINED)
789 return BuiltinTrancheNames[trancheId - NUM_INDIVIDUAL_LWLOCKS];
790
791 /*
792 * It's an extension tranche, so look in LWLockTrancheNames[]. However,
793 * it's possible that the tranche has never been registered in the current
794 * process, in which case give up and return "extension".
795 */
796 trancheId -= LWTRANCHE_FIRST_USER_DEFINED;
797
798 if (trancheId >= LWLockTrancheNamesAllocated ||
799 LWLockTrancheNames[trancheId] == NULL)
800 return "extension";
801
802 return LWLockTrancheNames[trancheId];
803 }
804
805 /*
806 * Return an identifier for an LWLock based on the wait class and event.
807 */
808 const char *
809 GetLWLockIdentifier(uint32 classId, uint16 eventId)
810 {
811 Assert(classId == PG_WAIT_LWLOCK);
812 /* The event IDs are just tranche numbers. */
813 return GetLWTrancheName(eventId);
814 }
815
816 /*
817 * Internal function that tries to atomically acquire the lwlock in the passed
818 * in mode.
819 *
820 * This function will not block waiting for a lock to become free - that's
821 * the caller's job.
822 *
823 * Returns true if the lock isn't free and we need to wait.
824 */
825 static bool
826 LWLockAttemptLock(LWLock *lock, LWLockMode mode)
827 {
828 uint32 old_state;
829
830 AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);
831
832 /*
833 * Read once outside the loop; later iterations will get the newer value
834 * via compare & exchange.
835 */
836 old_state = pg_atomic_read_u32(&lock->state);
837
838 /* loop until we've determined whether we could acquire the lock or not */
839 while (true)
840 {
841 uint32 desired_state;
842 bool lock_free;
843
844 desired_state = old_state;
845
846 if (mode == LW_EXCLUSIVE)
847 {
848 lock_free = (old_state & LW_LOCK_MASK) == 0;
849 if (lock_free)
850 desired_state += LW_VAL_EXCLUSIVE;
851 }
852 else
853 {
854 lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
855 if (lock_free)
856 desired_state += LW_VAL_SHARED;
857 }
858
859 /*
860 * Attempt to swap in the state we are expecting. If we didn't see the
861 * lock as free, that's just the old value. If we saw it as free,
862 * we'll attempt to mark it acquired. The reason that we always swap
863 * in the value is that this doubles as a memory barrier. We could try
864 * to be smarter and only swap in values if we saw the lock as free,
865 * but benchmarks haven't shown that to be beneficial so far.
866 *
867 * Retry if the value changed since we last looked at it.
868 */
869 if (pg_atomic_compare_exchange_u32(&lock->state,
870 &old_state, desired_state))
871 {
872 if (lock_free)
873 {
874 /* Great! Got the lock. */
875 #ifdef LOCK_DEBUG
876 if (mode == LW_EXCLUSIVE)
877 lock->owner = MyProc;
878 #endif
879 return false;
880 }
881 else
882 return true; /* somebody else has the lock */
883 }
884 }
885 pg_unreachable();
886 }
887
888 /*
889 * Lock the LWLock's wait list against concurrent activity.
890 *
891 * NB: even though the wait list is locked, non-conflicting lock operations
892 * may still happen concurrently.
893 *
894 * Time spent holding mutex should be short!
895 */
896 static void
897 LWLockWaitListLock(LWLock *lock)
898 {
899 uint32 old_state;
900 #ifdef LWLOCK_STATS
901 lwlock_stats *lwstats;
902 uint32 delays = 0;
903
904 lwstats = get_lwlock_stats_entry(lock);
905 #endif
906
907 while (true)
908 {
909 /* always try once to acquire lock directly */
910 old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
911 if (!(old_state & LW_FLAG_LOCKED))
912 break; /* got lock */
913
914 /* and then spin without atomic operations until lock is released */
915 {
916 SpinDelayStatus delayStatus;
917
918 init_local_spin_delay(&delayStatus);
919
920 while (old_state & LW_FLAG_LOCKED)
921 {
922 perform_spin_delay(&delayStatus);
923 old_state = pg_atomic_read_u32(&lock->state);
924 }
925 #ifdef LWLOCK_STATS
926 delays += delayStatus.delays;
927 #endif
928 finish_spin_delay(&delayStatus);
929 }
930
931 /*
932 * Retry. The lock might, of course, already have been re-acquired by
933 * the time we attempt to get it again.
934 */
935 }
936
937 #ifdef LWLOCK_STATS
938 lwstats->spin_delay_count += delays;
939 #endif
940 }
941
942 /*
943 * Unlock the LWLock's wait list.
944 *
945 * Note that it can be more efficient to manipulate flags and release the
946 * lock in a single atomic operation.
947 */
948 static void
949 LWLockWaitListUnlock(LWLock *lock)
950 {
951 uint32 old_state PG_USED_FOR_ASSERTS_ONLY;
952
953 old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);
954
955 Assert(old_state & LW_FLAG_LOCKED);
956 }
957
958 /*
959 * Wake up all the lockers that currently have a chance to acquire the lock.
960 */
961 static void
962 LWLockWakeup(LWLock *lock)
963 {
964 bool new_release_ok;
965 bool wokeup_somebody = false;
966 proclist_head wakeup;
967 proclist_mutable_iter iter;
968
969 proclist_init(&wakeup);
970
971 new_release_ok = true;
972
973 /* lock wait list while collecting backends to wake up */
974 LWLockWaitListLock(lock);
975
976 proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
977 {
978 PGPROC *waiter = GetPGProcByNumber(iter.cur);
979
980 if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
981 continue;
982
983 proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
984 proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
985
986 if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
987 {
988 /*
989 * Prevent additional wakeups until retryer gets to run. Backends
990 * that are just waiting for the lock to become free don't retry
991 * automatically.
992 */
993 new_release_ok = false;
994
995 /*
996 * Don't wake up (further) exclusive lockers.
997 */
998 wokeup_somebody = true;
999 }
1000
1001 /*
1002 * Once we've woken up an exclusive locker, there's no point in waking
1003 * up anybody else.
1004 */
1005 if (waiter->lwWaitMode == LW_EXCLUSIVE)
1006 break;
1007 }
1008
1009 Assert(proclist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);
1010
1011 /* unset required flags, and release lock, in one fell swoop */
1012 {
1013 uint32 old_state;
1014 uint32 desired_state;
1015
1016 old_state = pg_atomic_read_u32(&lock->state);
1017 while (true)
1018 {
1019 desired_state = old_state;
1020
1021 /* compute desired flags */
1022
1023 if (new_release_ok)
1024 desired_state |= LW_FLAG_RELEASE_OK;
1025 else
1026 desired_state &= ~LW_FLAG_RELEASE_OK;
1027
1028 if (proclist_is_empty(&wakeup))
1029 desired_state &= ~LW_FLAG_HAS_WAITERS;
1030
1031 desired_state &= ~LW_FLAG_LOCKED; /* release lock */
1032
1033 if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
1034 desired_state))
1035 break;
1036 }
1037 }
1038
1039 /* Awaken any waiters I removed from the queue. */
1040 proclist_foreach_modify(iter, &wakeup, lwWaitLink)
1041 {
1042 PGPROC *waiter = GetPGProcByNumber(iter.cur);
1043
1044 LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
1045 proclist_delete(&wakeup, iter.cur, lwWaitLink);
1046
1047 /*
1048 * Guarantee that lwWaiting being unset only becomes visible once the
1049 * unlink from the list has completed. Otherwise the target backend
1050 * could be woken up for some other reason and enqueue for a new lock -
1051 * if that happens before the list unlink happens, the list would end
1052 * up being corrupted.
1053 *
1054 * The barrier pairs with the LWLockWaitListLock() when enqueuing for
1055 * another lock.
1056 */
1057 pg_write_barrier();
1058 waiter->lwWaiting = false;
1059 PGSemaphoreUnlock(waiter->sem);
1060 }
1061 }
1062
1063 /*
1064 * Add ourselves to the end of the queue.
1065 *
1066 * NB: Mode can be LW_WAIT_UNTIL_FREE here!
1067 */
1068 static void
1069 LWLockQueueSelf(LWLock *lock, LWLockMode mode)
1070 {
1071 /*
1072 * If we don't have a PGPROC structure, there's no way to wait. This
1073 * should never occur, since MyProc should only be null during shared
1074 * memory initialization.
1075 */
1076 if (MyProc == NULL)
1077 elog(PANIC, "cannot wait without a PGPROC structure");
1078
1079 if (MyProc->lwWaiting)
1080 elog(PANIC, "queueing for lock while waiting on another one");
1081
1082 LWLockWaitListLock(lock);
1083
1084 /* setting the flag is protected by the spinlock */
1085 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);
1086
1087 MyProc->lwWaiting = true;
1088 MyProc->lwWaitMode = mode;
1089
1090 /* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
1091 if (mode == LW_WAIT_UNTIL_FREE)
1092 proclist_push_head(&lock->waiters, MyProc->pgprocno, lwWaitLink);
1093 else
1094 proclist_push_tail(&lock->waiters, MyProc->pgprocno, lwWaitLink);
1095
1096 /* Can release the mutex now */
1097 LWLockWaitListUnlock(lock);
1098
1099 #ifdef LOCK_DEBUG
1100 pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
1101 #endif
1102
1103 }
1104
1105 /*
1106 * Remove ourselves from the waitlist.
1107 *
1108 * This is used if we queued ourselves because we thought we needed to sleep
1109 * but, after further checking, we discovered that we don't actually need to
1110 * do so.
1111 */
1112 static void
1113 LWLockDequeueSelf(LWLock *lock)
1114 {
1115 bool found = false;
1116 proclist_mutable_iter iter;
1117
1118 #ifdef LWLOCK_STATS
1119 lwlock_stats *lwstats;
1120
1121 lwstats = get_lwlock_stats_entry(lock);
1122
1123 lwstats->dequeue_self_count++;
1124 #endif
1125
1126 LWLockWaitListLock(lock);
1127
1128 /*
1129 * We can't just remove ourselves from the list; we need to iterate over
1130 * all entries, as somebody else could have dequeued us already.
1131 */
1132 proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
1133 {
1134 if (iter.cur == MyProc->pgprocno)
1135 {
1136 found = true;
1137 proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
1138 break;
1139 }
1140 }
1141
1142 if (proclist_is_empty(&lock->waiters) &&
1143 (pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
1144 {
1145 pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
1146 }
1147
1148 /* XXX: combine with fetch_and above? */
1149 LWLockWaitListUnlock(lock);
1150
1151 /* clear waiting state again, nice for debugging */
1152 if (found)
1153 MyProc->lwWaiting = false;
1154 else
1155 {
1156 int extraWaits = 0;
1157
1158 /*
1159 * Somebody else dequeued us and has or will wake us up. Deal with the
1160 * superfluous absorption of a wakeup.
1161 */
1162
1163 /*
1164 * Reset RELEASE_OK flag if somebody woke us before we removed
1165 * ourselves - they'll have set it to false.
1166 */
1167 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1168
1169 /*
1170 * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
1171 * get reset at some inconvenient point later. Most of the time this
1172 * will immediately return.
1173 */
1174 for (;;)
1175 {
1176 PGSemaphoreLock(MyProc->sem);
1177 if (!MyProc->lwWaiting)
1178 break;
1179 extraWaits++;
1180 }
1181
1182 /*
1183 * Fix the process wait semaphore's count for any absorbed wakeups.
1184 */
1185 while (extraWaits-- > 0)
1186 PGSemaphoreUnlock(MyProc->sem);
1187 }
1188
1189 #ifdef LOCK_DEBUG
1190 {
1191 /* not waiting anymore */
1192 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1193
1194 Assert(nwaiters < MAX_BACKENDS);
1195 }
1196 #endif
1197 }
1198
1199 /*
1200 * LWLockAcquire - acquire a lightweight lock in the specified mode
1201 *
1202 * If the lock is not available, sleep until it is. Returns true if the lock
1203 * was available immediately, false if we had to sleep.
1204 *
1205 * Side effect: cancel/die interrupts are held off until lock release.
1206 */
1207 bool
1208 LWLockAcquire(LWLock *lock, LWLockMode mode)
1209 {
1210 PGPROC *proc = MyProc;
1211 bool result = true;
1212 int extraWaits = 0;
1213 #ifdef LWLOCK_STATS
1214 lwlock_stats *lwstats;
1215
1216 lwstats = get_lwlock_stats_entry(lock);
1217 #endif
1218
1219 AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1220
1221 PRINT_LWDEBUG("LWLockAcquire", lock, mode);
1222
1223 #ifdef LWLOCK_STATS
1224 /* Count lock acquisition attempts */
1225 if (mode == LW_EXCLUSIVE)
1226 lwstats->ex_acquire_count++;
1227 else
1228 lwstats->sh_acquire_count++;
1229 #endif /* LWLOCK_STATS */
1230
1231 /*
1232 * We can't wait if we haven't got a PGPROC. This should only occur
1233 * during bootstrap or shared memory initialization. Put an Assert here
1234 * to catch unsafe coding practices.
1235 */
1236 Assert(!(proc == NULL && IsUnderPostmaster));
1237
1238 /* Ensure we will have room to remember the lock */
1239 if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1240 elog(ERROR, "too many LWLocks taken");
1241
1242 /*
1243 * Lock out cancel/die interrupts until we exit the code section protected
1244 * by the LWLock. This ensures that interrupts will not interfere with
1245 * manipulations of data structures in shared memory.
1246 */
1247 HOLD_INTERRUPTS();
1248
1249 /*
1250 * Loop here to try to acquire lock after each time we are signaled by
1251 * LWLockRelease.
1252 *
1253 * NOTE: it might seem better to have LWLockRelease actually grant us the
1254 * lock, rather than retrying and possibly having to go back to sleep. But
1255 * in practice that is no good because it means a process swap for every
1256 * lock acquisition when two or more processes are contending for the same
1257 * lock. Since LWLocks are normally used to protect not-very-long
1258 * sections of computation, a process needs to be able to acquire and
1259 * release the same lock many times during a single CPU time slice, even
1260 * in the presence of contention. The efficiency of being able to do that
1261 * outweighs the inefficiency of sometimes wasting a process dispatch
1262 * cycle because the lock is not free when a released waiter finally gets
1263 * to run. See pgsql-hackers archives for 29-Dec-01.
1264 */
1265 for (;;)
1266 {
1267 bool mustwait;
1268
1269 /*
1270 * Try to grab the lock the first time, we're not in the waitqueue
1271 * yet/anymore.
1272 */
1273 mustwait = LWLockAttemptLock(lock, mode);
1274
1275 if (!mustwait)
1276 {
1277 LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
1278 break; /* got the lock */
1279 }
1280
1281 /*
1282 * Ok, at this point we couldn't grab the lock on the first try. We
1283 * cannot simply queue ourselves to the end of the list and wait to be
1284 * woken up because by now the lock could long since have been released.
1285 * Instead add us to the queue and try to grab the lock again. If we
1286 * succeed we need to revert the queuing and be happy, otherwise we
1287 * recheck the lock. If we still couldn't grab it, we know that the
1288 * other locker will see our queue entries when releasing since they
1289 * existed before we checked for the lock.
1290 */
1291
1292 /* add to the queue */
1293 LWLockQueueSelf(lock, mode);
1294
1295 /* we're now guaranteed to be woken up if necessary */
1296 mustwait = LWLockAttemptLock(lock, mode);
1297
1298 /* ok, grabbed the lock the second time round, need to undo queueing */
1299 if (!mustwait)
1300 {
1301 LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
1302
1303 LWLockDequeueSelf(lock);
1304 break;
1305 }
1306
1307 /*
1308 * Wait until awakened.
1309 *
1310 * It is possible that we get awakened for a reason other than being
1311 * signaled by LWLockRelease. If so, loop back and wait again. Once
1312 * we've gotten the LWLock, re-increment the sema by the number of
1313 * additional signals received.
1314 */
1315 LOG_LWDEBUG("LWLockAcquire", lock, "waiting");
1316
1317 #ifdef LWLOCK_STATS
1318 lwstats->block_count++;
1319 #endif
1320
1321 LWLockReportWaitStart(lock);
1322 if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
1323 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
1324
1325 for (;;)
1326 {
1327 PGSemaphoreLock(proc->sem);
1328 if (!proc->lwWaiting)
1329 break;
1330 extraWaits++;
1331 }
1332
1333 /* Retrying, allow LWLockRelease to release waiters again. */
1334 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1335
1336 #ifdef LOCK_DEBUG
1337 {
1338 /* not waiting anymore */
1339 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1340
1341 Assert(nwaiters < MAX_BACKENDS);
1342 }
1343 #endif
1344
1345 if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
1346 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
1347 LWLockReportWaitEnd();
1348
1349 LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
1350
1351 /* Now loop back and try to acquire lock again. */
1352 result = false;
1353 }
1354
1355 if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_ENABLED())
1356 TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);
1357
1358 /* Add lock to list of locks held by this backend */
1359 held_lwlocks[num_held_lwlocks].lock = lock;
1360 held_lwlocks[num_held_lwlocks++].mode = mode;
1361
1362 /*
1363 * Fix the process wait semaphore's count for any absorbed wakeups.
1364 */
1365 while (extraWaits-- > 0)
1366 PGSemaphoreUnlock(proc->sem);
1367
1368 return result;
1369 }
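/*
 * Canonical usage of the above (an editor's sketch; MyShared is a
 * hypothetical structure in shared memory):
 *
 *     LWLockAcquire(MyShared->lock, LW_SHARED);
 *     v = MyShared->counter;               // read-only access
 *     LWLockRelease(MyShared->lock);
 *
 *     LWLockAcquire(MyShared->lock, LW_EXCLUSIVE);
 *     MyShared->counter++;                 // read-write access
 *     LWLockRelease(MyShared->lock);
 *
 * Interrupts stay held off between acquire and release, so the protected
 * section should be kept short.
 */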
1370
1371 /*
1372 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
1373 *
1374 * If the lock is not available, return false with no side-effects.
1375 *
1376 * If successful, cancel/die interrupts are held off until lock release.
1377 */
1378 bool
1379 LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
1380 {
1381 bool mustwait;
1382
1383 AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1384
1385 PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);
1386
1387 /* Ensure we will have room to remember the lock */
1388 if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1389 elog(ERROR, "too many LWLocks taken");
1390
1391 /*
1392 * Lock out cancel/die interrupts until we exit the code section protected
1393 * by the LWLock. This ensures that interrupts will not interfere with
1394 * manipulations of data structures in shared memory.
1395 */
1396 HOLD_INTERRUPTS();
1397
1398 /* Check for the lock */
1399 mustwait = LWLockAttemptLock(lock, mode);
1400
1401 if (mustwait)
1402 {
1403 /* Failed to get lock, so release interrupt holdoff */
1404 RESUME_INTERRUPTS();
1405
1406 LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
1407 if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL_ENABLED())
1408 TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), mode);
1409 }
1410 else
1411 {
1412 /* Add lock to list of locks held by this backend */
1413 held_lwlocks[num_held_lwlocks].lock = lock;
1414 held_lwlocks[num_held_lwlocks++].mode = mode;
1415 if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_ENABLED())
1416 TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), mode);
1417 }
1418 return !mustwait;
1419 }
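/*
 * Typical try-lock pattern (an editor's sketch): do optional work only if
 * the lock is immediately available, without ever sleeping:
 *
 *     if (LWLockConditionalAcquire(lock, LW_EXCLUSIVE))
 *     {
 *         // ... do the optional work ...
 *         LWLockRelease(lock);
 *     }
 *     else
 *     {
 *         // Lock was busy; we hold nothing and interrupts were not
 *         // held off, so simply skip or fall back.
 *     }
 */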
1420
1421 /*
1422 * LWLockAcquireOrWait - Acquire lock, or wait until it's free
1423 *
1424 * The semantics of this function are a bit funky. If the lock is currently
1425 * free, it is acquired in the given mode, and the function returns true. If
1426 * the lock isn't immediately free, the function waits until it is released
1427 * and returns false, but does not acquire the lock.
1428 *
1429 * This is currently used for WALWriteLock: when a backend flushes the WAL,
1430 * holding WALWriteLock, it can flush the commit records of many other
1431 * backends as a side-effect. Those other backends need to wait until the
1432 * flush finishes, but don't need to acquire the lock anymore. They can just
1433 * wake up, observe that their records have already been flushed, and return.
1434 */
1435 bool
1436 LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
1437 {
1438 PGPROC *proc = MyProc;
1439 bool mustwait;
1440 int extraWaits = 0;
1441 #ifdef LWLOCK_STATS
1442 lwlock_stats *lwstats;
1443
1444 lwstats = get_lwlock_stats_entry(lock);
1445 #endif
1446
1447 Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1448
1449 PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);
1450
1451 /* Ensure we will have room to remember the lock */
1452 if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1453 elog(ERROR, "too many LWLocks taken");
1454
1455 /*
1456 * Lock out cancel/die interrupts until we exit the code section protected
1457 * by the LWLock. This ensures that interrupts will not interfere with
1458 * manipulations of data structures in shared memory.
1459 */
1460 HOLD_INTERRUPTS();
1461
1462 /*
1463 * NB: We're using nearly the same twice-in-a-row lock acquisition
1464 * protocol as LWLockAcquire(). Check its comments for details.
1465 */
1466 mustwait = LWLockAttemptLock(lock, mode);
1467
1468 if (mustwait)
1469 {
1470 LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1471
1472 mustwait = LWLockAttemptLock(lock, mode);
1473
1474 if (mustwait)
1475 {
1476 /*
1477 * Wait until awakened. Like in LWLockAcquire, be prepared for
1478 * bogus wakeups.
1479 */
1480 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");
1481
1482 #ifdef LWLOCK_STATS
1483 lwstats->block_count++;
1484 #endif
1485
1486 LWLockReportWaitStart(lock);
1487 if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
1488 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
1489
1490 for (;;)
1491 {
1492 PGSemaphoreLock(proc->sem);
1493 if (!proc->lwWaiting)
1494 break;
1495 extraWaits++;
1496 }
1497
1498 #ifdef LOCK_DEBUG
1499 {
1500 /* not waiting anymore */
1501 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1502
1503 Assert(nwaiters < MAX_BACKENDS);
1504 }
1505 #endif
1506 if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
1507 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
1508 LWLockReportWaitEnd();
1509
1510 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
1511 }
1512 else
1513 {
1514 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");
1515
1516 /*
1517 * Got lock in the second attempt, undo queueing. We need to treat
1518 * this as having successfully acquired the lock, otherwise we'd
1519 * not necessarily wake up people we've prevented from acquiring
1520 * the lock.
1521 */
1522 LWLockDequeueSelf(lock);
1523 }
1524 }
1525
1526 /*
1527 * Fix the process wait semaphore's count for any absorbed wakeups.
1528 */
1529 while (extraWaits-- > 0)
1530 PGSemaphoreUnlock(proc->sem);
1531
1532 if (mustwait)
1533 {
1534 /* Failed to get lock, so release interrupt holdoff */
1535 RESUME_INTERRUPTS();
1536 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
1537 if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL_ENABLED())
1538 TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), mode);
1539 }
1540 else
1541 {
1542 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
1543 /* Add lock to list of locks held by this backend */
1544 held_lwlocks[num_held_lwlocks].lock = lock;
1545 held_lwlocks[num_held_lwlocks++].mode = mode;
1546 if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_ENABLED())
1547 TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), mode);
1548 }
1549
1550 return !mustwait;
1551 }
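/*
 * Sketch of the group-flush pattern this function enables (editor's
 * illustration; FlushUpTo() and AlreadyFlushed() are hypothetical helpers
 * standing in for the WAL-specific logic described above):
 *
 *     if (LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
 *     {
 *         // Got the lock: flush our record and everybody else's too.
 *         FlushUpTo(latest_request);
 *         LWLockRelease(WALWriteLock);
 *     }
 *     else if (AlreadyFlushed(my_record))
 *     {
 *         // Somebody else's flush covered us; nothing left to do.
 *     }
 *     else
 *     {
 *         // The holder's flush didn't reach our record; retry.
 *     }
 */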
1552
1553 /*
1554 * Does the lwlock in its current state need to wait for the variable value to
1555 * change?
1556 *
1557 * If we don't need to wait, and it's because the value of the variable has
1558 * changed, store the current value in newval.
1559 *
1560 * *result is set to true if the lock was free, and false otherwise.
1561 */
1562 static bool
1563 LWLockConflictsWithVar(LWLock *lock,
1564 uint64 *valptr, uint64 oldval, uint64 *newval,
1565 bool *result)
1566 {
1567 bool mustwait;
1568 uint64 value;
1569
1570 /*
1571 * Test first to see if the lock is free right now.
1572 *
1573 * XXX: the caller uses a spinlock before this, so we don't need a memory
1574 * barrier here as far as the current usage is concerned. But that might
1575 * not be safe in general.
1576 */
1577 mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
1578
1579 if (!mustwait)
1580 {
1581 *result = true;
1582 return false;
1583 }
1584
1585 *result = false;
1586
1587 /*
1588 * Read value using the lwlock's wait list lock, as we can't generally
1589 * rely on atomic 64 bit reads/stores. TODO: On platforms with a way to
1590 * do atomic 64 bit reads/writes the spinlock should be optimized away.
1591 */
1592 LWLockWaitListLock(lock);
1593 value = *valptr;
1594 LWLockWaitListUnlock(lock);
1595
1596 if (value != oldval)
1597 {
1598 mustwait = false;
1599 *newval = value;
1600 }
1601 else
1602 {
1603 mustwait = true;
1604 }
1605
1606 return mustwait;
1607 }
1608
1609 /*
1610 * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
1611 *
1612 * If the lock is held and *valptr equals oldval, waits until the lock is
1613 * either freed, or the lock holder updates *valptr by calling
1614 * LWLockUpdateVar. If the lock is free on exit (immediately or after
1615 * waiting), returns true. If the lock is still held, but *valptr no longer
1616 * matches oldval, returns false and sets *newval to the current value in
1617 * *valptr.
1618 *
1619 * Note: this function ignores shared lock holders; if the lock is held
1620 * in shared mode, returns 'true'.
1621 */
1622 bool
1623 LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
1624 {
1625 PGPROC *proc = MyProc;
1626 int extraWaits = 0;
1627 bool result = false;
1628 #ifdef LWLOCK_STATS
1629 lwlock_stats *lwstats;
1630
1631 lwstats = get_lwlock_stats_entry(lock);
1632 #endif
1633
1634 PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
1635
1636 /*
1637 * Lock out cancel/die interrupts while we sleep on the lock. There is no
1638 * cleanup mechanism to remove us from the wait queue if we got
1639 * interrupted.
1640 */
1641 HOLD_INTERRUPTS();
1642
1643 /*
1644 * Loop here to check the lock's status after each time we are signaled.
1645 */
1646 for (;;)
1647 {
1648 bool mustwait;
1649
1650 mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1651 &result);
1652
1653 if (!mustwait)
1654 break; /* the lock was free or value didn't match */
1655
1656 /*
1657 * Add myself to the wait queue. Note that this is racy: somebody else
1658 * could wake us up before we're finished queuing. NB: We're using nearly
1659 * the same twice-in-a-row lock acquisition protocol as
1660 * LWLockAcquire(). Check its comments for details. The only
1661 * difference is that we also have to check the variable's values when
1662 * checking the state of the lock.
1663 */
1664 LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1665
1666 /*
1667 * Set RELEASE_OK flag, to make sure we get woken up as soon as the
1668 * lock is released.
1669 */
1670 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1671
1672 /*
1673 * We're now guaranteed to be woken up if necessary. Recheck the lock
1674 * and variables state.
1675 */
1676 mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1677 &result);
1678
1679 /* Ok, no conflict after we queued ourselves. Undo queueing. */
1680 if (!mustwait)
1681 {
1682 LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
1683
1684 LWLockDequeueSelf(lock);
1685 break;
1686 }
1687
1688 /*
1689 * Wait until awakened.
1690 *
1691 * It is possible that we get awakened for a reason other than being
1692 * signaled by LWLockRelease. If so, loop back and wait again. Once
1693 * we've gotten the LWLock, re-increment the sema by the number of
1694 * additional signals received.
1695 */
1696 LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");
1697
1698 #ifdef LWLOCK_STATS
1699 lwstats->block_count++;
1700 #endif
1701
1702 LWLockReportWaitStart(lock);
1703 if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
1704 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), LW_EXCLUSIVE);
1705
1706 for (;;)
1707 {
1708 PGSemaphoreLock(proc->sem);
1709 if (!proc->lwWaiting)
1710 break;
1711 extraWaits++;
1712 }
1713
1714 #ifdef LOCK_DEBUG
1715 {
1716 /* not waiting anymore */
1717 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1718
1719 Assert(nwaiters < MAX_BACKENDS);
1720 }
1721 #endif
1722
1723 if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
1724 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), LW_EXCLUSIVE);
1725 LWLockReportWaitEnd();
1726
1727 LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");
1728
1729 /* Now loop back and check the status of the lock again. */
1730 }
1731
1732 if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_ENABLED())
1733 TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), LW_EXCLUSIVE);
1734
1735 /*
1736 * Fix the process wait semaphore's count for any absorbed wakeups.
1737 */
1738 while (extraWaits-- > 0)
1739 PGSemaphoreUnlock(proc->sem);
1740
1741 /*
1742 * Now okay to allow cancel/die interrupts.
1743 */
1744 RESUME_INTERRUPTS();
1745
1746 return result;
1747 }
1748
1749
1750 /*
1751 * LWLockUpdateVar - Update a variable and wake up waiters atomically
1752 *
1753 * Sets *valptr to 'val', and wakes up all processes waiting for us with
1754 * LWLockWaitForVar(). Setting the value and waking up the processes happen
1755 * atomically so that any process calling LWLockWaitForVar() on the same lock
1756 * is guaranteed to see the new value, and act accordingly.
1757 *
1758 * The caller must be holding the lock in exclusive mode.
1759 */
1760 void
1761 LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
1762 {
1763 proclist_head wakeup;
1764 proclist_mutable_iter iter;
1765
1766 PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
1767
1768 proclist_init(&wakeup);
1769
1770 LWLockWaitListLock(lock);
1771
1772 Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
1773
1774 /* Update the lock's value */
1775 *valptr = val;
1776
1777 /*
1778 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
1779 * up. They are always in the front of the queue.
1780 */
1781 proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
1782 {
1783 PGPROC *waiter = GetPGProcByNumber(iter.cur);
1784
1785 if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
1786 break;
1787
1788 proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
1789 proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
1790 }
1791
1792 /* We are done updating shared state of the lock itself. */
1793 LWLockWaitListUnlock(lock);
1794
1795 /*
1796 * Awaken any waiters I removed from the queue.
1797 */
1798 proclist_foreach_modify(iter, &wakeup, lwWaitLink)
1799 {
1800 PGPROC *waiter = GetPGProcByNumber(iter.cur);
1801
1802 proclist_delete(&wakeup, iter.cur, lwWaitLink);
1803 /* check comment in LWLockWakeup() about this barrier */
1804 pg_write_barrier();
1805 waiter->lwWaiting = false;
1806 PGSemaphoreUnlock(waiter->sem);
1807 }
1808 }
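/*
 * Sketch of the variable-wait protocol built from the functions above
 * (editor's illustration; 'insertpos' is a hypothetical uint64 protected
 * by 'lock'):
 *
 *     // Holder side, with 'lock' held in exclusive mode:
 *     LWLockUpdateVar(lock, &insertpos, new_position);
 *     ...
 *     LWLockReleaseClearVar(lock, &insertpos, 0);
 *
 *     // Waiter side:
 *     uint64 seen = last_seen_position;
 *     if (LWLockWaitForVar(lock, &insertpos, seen, &seen))
 *     {
 *         // Lock was (or became) free.
 *     }
 *     else
 *     {
 *         // Lock still held, but insertpos changed; 'seen' now holds
 *         // the new value.
 *     }
 */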
1809
1810
1811 /*
1812 * LWLockRelease - release a previously acquired lock
1813 */
1814 void
1815 LWLockRelease(LWLock *lock)
1816 {
1817 LWLockMode mode;
1818 uint32 oldstate;
1819 bool check_waiters;
1820 int i;
1821
1822 /*
1823 * Remove lock from list of locks held. Usually, but not always, it will
1824 * be the latest-acquired lock; so search array backwards.
1825 */
1826 for (i = num_held_lwlocks; --i >= 0;)
1827 if (lock == held_lwlocks[i].lock)
1828 break;
1829
1830 if (i < 0)
1831 elog(ERROR, "lock %s is not held", T_NAME(lock));
1832
1833 mode = held_lwlocks[i].mode;
1834
1835 num_held_lwlocks--;
1836 for (; i < num_held_lwlocks; i++)
1837 held_lwlocks[i] = held_lwlocks[i + 1];
1838
1839 PRINT_LWDEBUG("LWLockRelease", lock, mode);
1840
1841 /*
1842 * Release my hold on the lock; after that it can immediately be acquired
1843 * by others, even if we still have to wake up other waiters.
1844 */
1845 if (mode == LW_EXCLUSIVE)
1846 oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
1847 else
1848 oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);
1849
1850 /* nobody else can have that kind of lock */
1851 Assert(!(oldstate & LW_VAL_EXCLUSIVE));
1852
1853
1854 /*
1855 * We're still waiting for backends to get scheduled, don't wake them up
1856 * again.
1857 */
1858 if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
1859 (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
1860 (oldstate & LW_LOCK_MASK) == 0)
1861 check_waiters = true;
1862 else
1863 check_waiters = false;
1864
1865 /*
1866 * As waking up waiters requires the spinlock to be acquired, only do so
1867 * if necessary.
1868 */
1869 if (check_waiters)
1870 {
1871 /* XXX: remove before commit? */
1872 LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
1873 LWLockWakeup(lock);
1874 }
1875
1876 if (TRACE_POSTGRESQL_LWLOCK_RELEASE_ENABLED())
1877 TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));
1878
1879 /*
1880 * Now okay to allow cancel/die interrupts.
1881 */
1882 RESUME_INTERRUPTS();
1883 }
1884
1885 /*
1886 * LWLockReleaseClearVar - release a previously acquired lock, reset variable
1887 */
1888 void
1889 LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
1890 {
1891 LWLockWaitListLock(lock);
1892
1893 /*
1894 * Set the variable's value before releasing the lock. That prevents a
1895 * race condition wherein a new locker acquires the lock, but hasn't yet
1896 * set the variable's value.
1897 */
1898 *valptr = val;
1899 LWLockWaitListUnlock(lock);
1900
1901 LWLockRelease(lock);
1902 }
1903
1904
1905 /*
1906 * LWLockReleaseAll - release all currently-held locks
1907 *
1908 * Used to clean up after ereport(ERROR). An important difference between this
1909 * function and retail LWLockRelease calls is that InterruptHoldoffCount is
1910 * unchanged by this operation. This is necessary since InterruptHoldoffCount
1911 * has been set to an appropriate level earlier in error recovery. We could
1912 * decrement it below zero if we allow it to drop for each released lock!
1913 */
1914 void
1915 LWLockReleaseAll(void)
1916 {
1917 while (num_held_lwlocks > 0)
1918 {
1919 HOLD_INTERRUPTS(); /* match the upcoming RESUME_INTERRUPTS */
1920
1921 LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
1922 }
1923 }
1924
1925
1926 /*
1927 * LWLockHeldByMe - test whether my process holds a lock in any mode
1928 *
1929 * This is meant as debug support only.
1930 */
1931 bool
1932 LWLockHeldByMe(LWLock *l)
1933 {
1934 int i;
1935
1936 for (i = 0; i < num_held_lwlocks; i++)
1937 {
1938 if (held_lwlocks[i].lock == l)
1939 return true;
1940 }
1941 return false;
1942 }
1943
1944 /*
1945 * LWLockHeldByMeInMode - test whether my process holds a lock in given mode
1946 *
1947 * This is meant as debug support only.
1948 */
1949 bool
1950 LWLockHeldByMeInMode(LWLock *l, LWLockMode mode)
1951 {
1952 int i;
1953
1954 for (i = 0; i < num_held_lwlocks; i++)
1955 {
1956 if (held_lwlocks[i].lock == l && held_lwlocks[i].mode == mode)
1957 return true;
1958 }
1959 return false;
1960 }
1961