/*-------------------------------------------------------------------------
 *
 * lwlock.c
 *    Lightweight lock manager
 *
 * Lightweight locks are intended primarily to provide mutual exclusion of
 * access to shared-memory data structures. Therefore, they offer both
 * exclusive and shared lock modes (to support read/write and read-only
 * access to a shared object). There are few other frills. User-level
 * locking should be done with the full lock manager --- which depends on
 * LWLocks to protect its shared state.
 *
 * In addition to exclusive and shared modes, lightweight locks can be used to
 * wait until a variable changes value. Acquiring the lock with LWLockAcquire
 * does not reset the variable; it keeps the value it was set to when the lock
 * was last released, and it can be updated without releasing the lock by
 * calling LWLockUpdateVar. LWLockWaitForVar waits for the variable to be
 * updated, or until the lock is free. When releasing the lock with
 * LWLockReleaseClearVar() the value can be set to an appropriate value for a
 * free lock. The meaning of the variable is up to the caller; the
 * lightweight lock code just assigns and compares it.
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/storage/lmgr/lwlock.c
 *
 * NOTES:
 *
 * This used to be a pretty straightforward reader-writer lock
 * implementation, in which the internal state was protected by a
 * spinlock. Unfortunately the overhead of taking the spinlock proved to be
 * too high for workloads/locks that were taken in shared mode very
 * frequently. Often we were spinning in the (obviously exclusive) spinlock
 * while trying to acquire a shared lock that was actually free.
 *
 * Thus a new implementation was devised that provides wait-free shared lock
 * acquisition for locks that aren't exclusively locked.
 *
 * The basic idea is to have a single atomic variable 'lockcount' instead of
 * the formerly separate shared and exclusive counters, and to use atomic
 * operations to acquire the lock. That's fairly easy to do for plain
 * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
 * in the OS.
 *
 * For lock acquisition we use an atomic compare-and-exchange on the lockcount
 * variable. For an exclusive lock we swap in a sentinel value
 * (LW_VAL_EXCLUSIVE); for shared locks we count the number of holders.
 *
 * To release the lock we use an atomic decrement. If the resulting value is
 * zero (which the atomic operation reports), we know we can, and have to,
 * release waiters.
 *
 * Obviously it is important that the sentinel value for exclusive locks
 * doesn't conflict with the maximum number of possible share lockers -
 * luckily MAX_BACKENDS makes that easily possible.
 *
 *
 * The attentive reader might have noticed that naively doing the above has a
 * glaring race condition: we try to lock using the atomic operations and
 * notice that we have to wait, but by the time we have finished queuing, the
 * former locker very well might have already finished its work. That's
 * problematic because we're now stuck waiting inside the OS.
 *
 * To mitigate those races we use a multi-phase attempt at locking:
 *    Phase 1: Try to do it atomically; if we succeed, we're done
 *    Phase 2: Add ourselves to the waitqueue of the lock
 *    Phase 3: Try to grab the lock again; if we succeed, remove ourselves
 *             from the queue
 *    Phase 4: Sleep till wake-up, goto Phase 1
 *
 * This protects us against the race described above: nobody can release too
 * quickly, because after Phase 2 we're already queued and will be woken up.
 * -------------------------------------------------------------------------
 */
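
/*
 * A minimal usage sketch (hypothetical caller, not part of this file's API
 * surface): LWLocks protect short critical sections over shared memory,
 * with LW_SHARED used by readers and LW_EXCLUSIVE by writers:
 *
 *    LWLockAcquire(lock, LW_EXCLUSIVE);
 *    shared->counter++;
 *    LWLockRelease(lock);
 *
 * Cancel/die interrupts are held off between acquire and release, and
 * LWLockReleaseAll() drops any still-held locks during error recovery.
 */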
#include "postgres.h"

#include "miscadmin.h"
#include "pgstat.h"
#include "pg_trace.h"
#include "postmaster/postmaster.h"
#include "replication/slot.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/proclist.h"
#include "storage/spin.h"
#include "utils/memutils.h"

#ifdef LWLOCK_STATS
#include "utils/hsearch.h"
#endif


/* We use the ShmemLock spinlock to protect LWLockCounter */
extern slock_t *ShmemLock;

#define LW_FLAG_HAS_WAITERS         ((uint32) 1 << 30)
#define LW_FLAG_RELEASE_OK          ((uint32) 1 << 29)
#define LW_FLAG_LOCKED              ((uint32) 1 << 28)

#define LW_VAL_EXCLUSIVE            ((uint32) 1 << 24)
#define LW_VAL_SHARED               1

#define LW_LOCK_MASK                ((uint32) ((1 << 25)-1))
/* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
#define LW_SHARED_MASK              ((uint32) ((1 << 24)-1))
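
/*
 * Sketch of the resulting 32-bit state word, as implied by the definitions
 * above (bits 25-27 are currently unused):
 *
 *    bit 30:    LW_FLAG_HAS_WAITERS    wait queue is nonempty
 *    bit 29:    LW_FLAG_RELEASE_OK     releasing may wake waiters
 *    bit 28:    LW_FLAG_LOCKED         wait-list mutex bit
 *    bit 24:    LW_VAL_EXCLUSIVE       exclusive-holder sentinel
 *    bits 0-23:                        count of shared holders
 */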

/*
 * This is indexed by tranche ID and stores the names of all tranches known
 * to the current backend.
 */
static const char **LWLockTrancheArray = NULL;
static int LWLockTranchesAllocated = 0;

#define T_NAME(lock) \
    (LWLockTrancheArray[(lock)->tranche])

/*
 * This points to the main array of LWLocks in shared memory. Backends inherit
 * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
 * where we have special measures to pass it down).
 */
LWLockPadded *MainLWLockArray = NULL;

/*
 * We use this structure to keep track of locked LWLocks for release
 * during error recovery. Normally, only a few will be held at once, but
 * occasionally the number can be much higher; for example, the pg_buffercache
 * extension locks all buffer partitions simultaneously.
 */
#define MAX_SIMUL_LWLOCKS 200

/* struct representing the LWLocks we're holding */
typedef struct LWLockHandle
{
    LWLock     *lock;
    LWLockMode  mode;
} LWLockHandle;

static int num_held_lwlocks = 0;
static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];

/* struct representing the LWLock tranche request for named tranche */
typedef struct NamedLWLockTrancheRequest
{
    char        tranche_name[NAMEDATALEN];
    int         num_lwlocks;
} NamedLWLockTrancheRequest;

NamedLWLockTrancheRequest *NamedLWLockTrancheRequestArray = NULL;
static int NamedLWLockTrancheRequestsAllocated = 0;
int NamedLWLockTrancheRequests = 0;

NamedLWLockTranche *NamedLWLockTrancheArray = NULL;

static bool lock_named_request_allowed = true;

static void InitializeLWLocks(void);
static void RegisterLWLockTranches(void);

static inline void LWLockReportWaitStart(LWLock *lock);
static inline void LWLockReportWaitEnd(void);

#ifdef LWLOCK_STATS
typedef struct lwlock_stats_key
{
    int         tranche;
    void       *instance;
} lwlock_stats_key;

typedef struct lwlock_stats
{
    lwlock_stats_key key;
    int         sh_acquire_count;
    int         ex_acquire_count;
    int         block_count;
    int         dequeue_self_count;
    int         spin_delay_count;
} lwlock_stats;

static HTAB *lwlock_stats_htab;
static lwlock_stats lwlock_stats_dummy;
#endif

#ifdef LOCK_DEBUG
bool Trace_lwlocks = false;

inline static void
PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
{
    /* hide statement & context here, otherwise the log is just too verbose */
    if (Trace_lwlocks)
    {
        uint32      state = pg_atomic_read_u32(&lock->state);

        ereport(LOG,
                (errhidestmt(true),
                 errhidecontext(true),
                 errmsg_internal("%d: %s(%s %p): excl %u shared %u haswaiters %u waiters %u rOK %d",
                                 MyProcPid,
                                 where, T_NAME(lock), lock,
                                 (state & LW_VAL_EXCLUSIVE) != 0,
                                 state & LW_SHARED_MASK,
                                 (state & LW_FLAG_HAS_WAITERS) != 0,
                                 pg_atomic_read_u32(&lock->nwaiters),
                                 (state & LW_FLAG_RELEASE_OK) != 0)));
    }
}

inline static void
LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
{
    /* hide statement & context here, otherwise the log is just too verbose */
    if (Trace_lwlocks)
    {
        ereport(LOG,
                (errhidestmt(true),
                 errhidecontext(true),
                 errmsg_internal("%s(%s %p): %s", where,
                                 T_NAME(lock), lock, msg)));
    }
}

#else                           /* not LOCK_DEBUG */
#define PRINT_LWDEBUG(a,b,c) ((void)0)
#define LOG_LWDEBUG(a,b,c) ((void)0)
#endif                          /* LOCK_DEBUG */

#ifdef LWLOCK_STATS

static void init_lwlock_stats(void);
static void print_lwlock_stats(int code, Datum arg);
static lwlock_stats *get_lwlock_stats_entry(LWLock *lockid);

static void
init_lwlock_stats(void)
{
    HASHCTL     ctl;
    static MemoryContext lwlock_stats_cxt = NULL;
    static bool exit_registered = false;

    if (lwlock_stats_cxt != NULL)
        MemoryContextDelete(lwlock_stats_cxt);

    /*
     * The LWLock stats will be updated within a critical section, which
     * requires allocating new hash entries. Allocations within a critical
     * section are normally not allowed because running out of memory would
     * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
     * turned on in production, so that's an acceptable risk. The hash entries
     * are small, so the risk of running out of memory is minimal in practice.
     */
    lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
                                             "LWLock stats",
                                             ALLOCSET_DEFAULT_SIZES);
    MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);

    MemSet(&ctl, 0, sizeof(ctl));
    ctl.keysize = sizeof(lwlock_stats_key);
    ctl.entrysize = sizeof(lwlock_stats);
    ctl.hcxt = lwlock_stats_cxt;
    lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
                                    HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
    if (!exit_registered)
    {
        on_shmem_exit(print_lwlock_stats, 0);
        exit_registered = true;
    }
}

static void
print_lwlock_stats(int code, Datum arg)
{
    HASH_SEQ_STATUS scan;
    lwlock_stats *lwstats;

    hash_seq_init(&scan, lwlock_stats_htab);

    /* Grab an LWLock to keep different backends from mixing reports */
    LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);

    while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
    {
        fprintf(stderr,
                "PID %d lwlock %s %p: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
                MyProcPid, LWLockTrancheArray[lwstats->key.tranche],
                lwstats->key.instance, lwstats->sh_acquire_count,
                lwstats->ex_acquire_count, lwstats->block_count,
                lwstats->spin_delay_count, lwstats->dequeue_self_count);
    }

    LWLockRelease(&MainLWLockArray[0].lock);
}

static lwlock_stats *
get_lwlock_stats_entry(LWLock *lock)
{
    lwlock_stats_key key;
    lwlock_stats *lwstats;
    bool        found;

    /*
     * During shared memory initialization, the hash table doesn't exist yet.
     * Stats of that phase aren't very interesting, so just collect operations
     * on all locks in a single dummy entry.
     */
    if (lwlock_stats_htab == NULL)
        return &lwlock_stats_dummy;

    /* Fetch or create the entry. */
    MemSet(&key, 0, sizeof(key));
    key.tranche = lock->tranche;
    key.instance = lock;
    lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
    if (!found)
    {
        lwstats->sh_acquire_count = 0;
        lwstats->ex_acquire_count = 0;
        lwstats->block_count = 0;
        lwstats->dequeue_self_count = 0;
        lwstats->spin_delay_count = 0;
    }
    return lwstats;
}
#endif                          /* LWLOCK_STATS */


/*
 * Compute number of LWLocks required by named tranches. These will be
 * allocated in the main array.
 */
static int
NumLWLocksByNamedTranches(void)
{
    int         numLocks = 0;
    int         i;

    for (i = 0; i < NamedLWLockTrancheRequests; i++)
        numLocks += NamedLWLockTrancheRequestArray[i].num_lwlocks;

    return numLocks;
}

/*
 * Compute shmem space needed for LWLocks and named tranches.
 */
Size
LWLockShmemSize(void)
{
    Size        size;
    int         i;
    int         numLocks = NUM_FIXED_LWLOCKS;

    numLocks += NumLWLocksByNamedTranches();

    /* Space for the LWLock array. */
    size = mul_size(numLocks, sizeof(LWLockPadded));

    /* Space for dynamic allocation counter, plus room for alignment. */
    size = add_size(size, sizeof(int) + LWLOCK_PADDED_SIZE);

    /* Space for named tranches. */
    size = add_size(size, mul_size(NamedLWLockTrancheRequests, sizeof(NamedLWLockTranche)));

    /* Space for name of each tranche. */
    for (i = 0; i < NamedLWLockTrancheRequests; i++)
        size = add_size(size, strlen(NamedLWLockTrancheRequestArray[i].tranche_name) + 1);

    /* Disallow named LWLocks' requests after startup */
    lock_named_request_allowed = false;

    return size;
}
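
/*
 * The resulting shared-memory region, as laid out by CreateLWLocks() below
 * (a sketch of the sizing above, not an additional structure):
 *
 *    [int tranche counter][padding to LWLOCK_PADDED_SIZE]
 *    [MainLWLockArray: NUM_FIXED_LWLOCKS + named-tranche locks]
 *    [NamedLWLockTranche structs][NUL-terminated tranche names]
 */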

/*
 * Allocate shmem space for the main LWLock array and all tranches and
 * initialize it. We also register all the LWLock tranches here.
 */
void
CreateLWLocks(void)
{
    StaticAssertStmt(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
                     "MAX_BACKENDS too big for lwlock.c");

    StaticAssertStmt(sizeof(LWLock) <= LWLOCK_MINIMAL_SIZE &&
                     sizeof(LWLock) <= LWLOCK_PADDED_SIZE,
                     "Miscalculated LWLock padding");

    if (!IsUnderPostmaster)
    {
        Size        spaceLocks = LWLockShmemSize();
        int        *LWLockCounter;
        char       *ptr;

        /* Allocate space */
        ptr = (char *) ShmemAlloc(spaceLocks);

        /* Leave room for dynamic allocation of tranches */
        ptr += sizeof(int);

        /* Ensure desired alignment of LWLock array */
        ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;

        MainLWLockArray = (LWLockPadded *) ptr;

        /*
         * Initialize the dynamic-allocation counter for tranches, which is
         * stored just before the first LWLock.
         */
        LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
        *LWLockCounter = LWTRANCHE_FIRST_USER_DEFINED;

        /* Initialize all LWLocks */
        InitializeLWLocks();
    }

    /* Register all LWLock tranches */
    RegisterLWLockTranches();
}

/*
 * Initialize LWLocks that are fixed and those belonging to named tranches.
 */
static void
InitializeLWLocks(void)
{
    int         numNamedLocks = NumLWLocksByNamedTranches();
    int         id;
    int         i;
    int         j;
    LWLockPadded *lock;

    /* Initialize all individual LWLocks in main array */
    for (id = 0, lock = MainLWLockArray; id < NUM_INDIVIDUAL_LWLOCKS; id++, lock++)
        LWLockInitialize(&lock->lock, id);

    /* Initialize buffer mapping LWLocks in main array */
    lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS;
    for (id = 0; id < NUM_BUFFER_PARTITIONS; id++, lock++)
        LWLockInitialize(&lock->lock, LWTRANCHE_BUFFER_MAPPING);

    /* Initialize lmgrs' LWLocks in main array */
    lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS + NUM_BUFFER_PARTITIONS;
    for (id = 0; id < NUM_LOCK_PARTITIONS; id++, lock++)
        LWLockInitialize(&lock->lock, LWTRANCHE_LOCK_MANAGER);

    /* Initialize predicate lmgrs' LWLocks in main array */
    lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS +
        NUM_BUFFER_PARTITIONS + NUM_LOCK_PARTITIONS;
    for (id = 0; id < NUM_PREDICATELOCK_PARTITIONS; id++, lock++)
        LWLockInitialize(&lock->lock, LWTRANCHE_PREDICATE_LOCK_MANAGER);

    /* Initialize named tranches. */
    if (NamedLWLockTrancheRequests > 0)
    {
        char       *trancheNames;

        NamedLWLockTrancheArray = (NamedLWLockTranche *)
            &MainLWLockArray[NUM_FIXED_LWLOCKS + numNamedLocks];

        trancheNames = (char *) NamedLWLockTrancheArray +
            (NamedLWLockTrancheRequests * sizeof(NamedLWLockTranche));
        lock = &MainLWLockArray[NUM_FIXED_LWLOCKS];

        for (i = 0; i < NamedLWLockTrancheRequests; i++)
        {
            NamedLWLockTrancheRequest *request;
            NamedLWLockTranche *tranche;
            char       *name;

            request = &NamedLWLockTrancheRequestArray[i];
            tranche = &NamedLWLockTrancheArray[i];

            name = trancheNames;
            trancheNames += strlen(request->tranche_name) + 1;
            strcpy(name, request->tranche_name);
            tranche->trancheId = LWLockNewTrancheId();
            tranche->trancheName = name;

            for (j = 0; j < request->num_lwlocks; j++, lock++)
                LWLockInitialize(&lock->lock, tranche->trancheId);
        }
    }
}

/*
 * Register named tranches and tranches for fixed LWLocks.
 */
static void
RegisterLWLockTranches(void)
{
    int         i;

    if (LWLockTrancheArray == NULL)
    {
        LWLockTranchesAllocated = 128;
        LWLockTrancheArray = (const char **)
            MemoryContextAllocZero(TopMemoryContext,
                                   LWLockTranchesAllocated * sizeof(char *));
        Assert(LWLockTranchesAllocated >= LWTRANCHE_FIRST_USER_DEFINED);
    }

    for (i = 0; i < NUM_INDIVIDUAL_LWLOCKS; ++i)
        LWLockRegisterTranche(i, MainLWLockNames[i]);

    LWLockRegisterTranche(LWTRANCHE_BUFFER_MAPPING, "buffer_mapping");
    LWLockRegisterTranche(LWTRANCHE_LOCK_MANAGER, "lock_manager");
    LWLockRegisterTranche(LWTRANCHE_PREDICATE_LOCK_MANAGER,
                          "predicate_lock_manager");
    LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA,
                          "parallel_query_dsa");
    LWLockRegisterTranche(LWTRANCHE_SESSION_DSA,
                          "session_dsa");
    LWLockRegisterTranche(LWTRANCHE_SESSION_RECORD_TABLE,
                          "session_record_table");
    LWLockRegisterTranche(LWTRANCHE_SESSION_TYPMOD_TABLE,
                          "session_typmod_table");
    LWLockRegisterTranche(LWTRANCHE_SHARED_TUPLESTORE,
                          "shared_tuplestore");
    LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
    LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append");
    LWLockRegisterTranche(LWTRANCHE_PARALLEL_HASH_JOIN, "parallel_hash_join");

    /* Register named tranches. */
    for (i = 0; i < NamedLWLockTrancheRequests; i++)
        LWLockRegisterTranche(NamedLWLockTrancheArray[i].trancheId,
                              NamedLWLockTrancheArray[i].trancheName);
}

/*
 * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
 */
void
InitLWLockAccess(void)
{
#ifdef LWLOCK_STATS
    init_lwlock_stats();
#endif
}

/*
 * GetNamedLWLockTranche - returns the base address of LWLocks from the
 *      specified tranche.
 *
 * The caller needs to retrieve the requested number of LWLocks starting from
 * the base lock address returned by this function. This can be used for
 * tranches that were requested via the RequestNamedLWLockTranche() API.
 */
LWLockPadded *
GetNamedLWLockTranche(const char *tranche_name)
{
    int         lock_pos;
    int         i;

    /*
     * Obtain the position of the base address of the LWLocks belonging to the
     * requested tranche_name in MainLWLockArray. LWLocks for named tranches
     * are placed in MainLWLockArray after the fixed locks.
     */
    lock_pos = NUM_FIXED_LWLOCKS;
    for (i = 0; i < NamedLWLockTrancheRequests; i++)
    {
        if (strcmp(NamedLWLockTrancheRequestArray[i].tranche_name,
                   tranche_name) == 0)
            return &MainLWLockArray[lock_pos];

        lock_pos += NamedLWLockTrancheRequestArray[i].num_lwlocks;
    }

    if (i >= NamedLWLockTrancheRequests)
        elog(ERROR, "requested tranche is not registered");

    /* just to keep compiler quiet */
    return NULL;
}

/*
 * Allocate a new tranche ID.
 */
int
LWLockNewTrancheId(void)
{
    int         result;
    int        *LWLockCounter;

    LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
    SpinLockAcquire(ShmemLock);
    result = (*LWLockCounter)++;
    SpinLockRelease(ShmemLock);

    return result;
}

/*
 * Register a tranche ID in the lookup table for the current process. This
 * routine will save a pointer to the tranche name passed as an argument,
 * so the name should be allocated in a backend-lifetime context
 * (TopMemoryContext, static variable, or similar).
 */
void
LWLockRegisterTranche(int tranche_id, const char *tranche_name)
{
    Assert(LWLockTrancheArray != NULL);

    if (tranche_id >= LWLockTranchesAllocated)
    {
        int         i = LWLockTranchesAllocated;
        int         j = LWLockTranchesAllocated;

        while (i <= tranche_id)
            i *= 2;

        LWLockTrancheArray = (const char **)
            repalloc(LWLockTrancheArray, i * sizeof(char *));
        LWLockTranchesAllocated = i;
        while (j < LWLockTranchesAllocated)
            LWLockTrancheArray[j++] = NULL;
    }

    LWLockTrancheArray[tranche_id] = tranche_name;
}
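
/*
 * A minimal sketch of the dynamic-tranche pattern these two functions
 * support (hypothetical names; real callers typically place the LWLock in
 * dynamic shared memory rather than behind a static pointer):
 *
 *    int tranche_id = LWLockNewTrancheId();
 *
 *    LWLockRegisterTranche(tranche_id, "my_extension");
 *    LWLockInitialize(&shared->lock, tranche_id);
 *
 * Each backend that touches the lock must register the tranche name for
 * itself; see GetLWLockIdentifier() for how unregistered tranches are
 * reported.
 */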

/*
 * RequestNamedLWLockTranche
 *      Request that extra LWLocks be allocated during postmaster
 *      startup.
 *
 * This is only useful for extensions if called from the _PG_init hook
 * of a library that is loaded into the postmaster via
 * shared_preload_libraries. Once shared memory has been allocated, calls
 * will be ignored. (We could raise an error, but it seems better to make
 * it a no-op, so that libraries containing such calls can be reloaded if
 * needed.)
 */
void
RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
{
    NamedLWLockTrancheRequest *request;

    if (IsUnderPostmaster || !lock_named_request_allowed)
        return;                 /* too late */

    if (NamedLWLockTrancheRequestArray == NULL)
    {
        NamedLWLockTrancheRequestsAllocated = 16;
        NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
            MemoryContextAlloc(TopMemoryContext,
                               NamedLWLockTrancheRequestsAllocated
                               * sizeof(NamedLWLockTrancheRequest));
    }

    if (NamedLWLockTrancheRequests >= NamedLWLockTrancheRequestsAllocated)
    {
        int         i = NamedLWLockTrancheRequestsAllocated;

        while (i <= NamedLWLockTrancheRequests)
            i *= 2;

        NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
            repalloc(NamedLWLockTrancheRequestArray,
                     i * sizeof(NamedLWLockTrancheRequest));
        NamedLWLockTrancheRequestsAllocated = i;
    }

    request = &NamedLWLockTrancheRequestArray[NamedLWLockTrancheRequests];
    Assert(strlen(tranche_name) + 1 < NAMEDATALEN);
    StrNCpy(request->tranche_name, tranche_name, NAMEDATALEN);
    request->num_lwlocks = num_lwlocks;
    NamedLWLockTrancheRequests++;
}
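
/*
 * A minimal extension-side sketch of the named-tranche flow described above
 * (hypothetical extension name and lock count):
 *
 *    void
 *    _PG_init(void)
 *    {
 *        RequestNamedLWLockTranche("my_extension", 4);
 *    }
 *
 *    ... later, during shared-memory initialization ...
 *    LWLockPadded *locks = GetNamedLWLockTranche("my_extension");
 *
 *    LWLockAcquire(&locks[0].lock, LW_EXCLUSIVE);
 */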

/*
 * LWLockInitialize - initialize a new lwlock; it's initially unlocked
 */
void
LWLockInitialize(LWLock *lock, int tranche_id)
{
    pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
#ifdef LOCK_DEBUG
    pg_atomic_init_u32(&lock->nwaiters, 0);
#endif
    lock->tranche = tranche_id;
    proclist_init(&lock->waiters);
}

/*
 * Report start of wait event for light-weight locks.
 *
 * This function will be used by all the light-weight lock calls which
 * need to wait to acquire the lock. This function distinguishes wait
 * events based on tranche and lock id.
 */
static inline void
LWLockReportWaitStart(LWLock *lock)
{
    pgstat_report_wait_start(PG_WAIT_LWLOCK | lock->tranche);
}

/*
 * Report end of wait event for light-weight locks.
 */
static inline void
LWLockReportWaitEnd(void)
{
    pgstat_report_wait_end();
}

/*
 * Return an identifier for an LWLock based on the wait class and event.
 */
const char *
GetLWLockIdentifier(uint32 classId, uint16 eventId)
{
    Assert(classId == PG_WAIT_LWLOCK);

    /*
     * It is quite possible that a user has registered a tranche in one of
     * the backends (e.g. by allocating lwlocks in dynamic shared memory) but
     * not in all of them, so we can't assume the tranche is registered here.
     */
    if (eventId >= LWLockTranchesAllocated ||
        LWLockTrancheArray[eventId] == NULL)
        return "extension";

    return LWLockTrancheArray[eventId];
}

/*
 * Internal function that tries to atomically acquire the lwlock in the passed
 * in mode.
 *
 * This function will not block waiting for a lock to become free - that's the
 * caller's job.
 *
 * Returns true if the lock isn't free and we need to wait.
 */
static bool
LWLockAttemptLock(LWLock *lock, LWLockMode mode)
{
    uint32      old_state;

    AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);

    /*
     * Read once outside the loop, later iterations will get the newer value
     * via compare & exchange.
     */
    old_state = pg_atomic_read_u32(&lock->state);

    /* loop until we've determined whether we could acquire the lock or not */
    while (true)
    {
        uint32      desired_state;
        bool        lock_free;

        desired_state = old_state;

        if (mode == LW_EXCLUSIVE)
        {
            lock_free = (old_state & LW_LOCK_MASK) == 0;
            if (lock_free)
                desired_state += LW_VAL_EXCLUSIVE;
        }
        else
        {
            lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
            if (lock_free)
                desired_state += LW_VAL_SHARED;
        }

        /*
         * Attempt to swap in the state we are expecting. If we didn't see
         * the lock as free, that's just the old value. If we saw it as free,
         * we'll attempt to mark it acquired. The reason that we always swap
         * in the value is that this doubles as a memory barrier. We could try
         * to be smarter and only swap in values if we saw the lock as free,
         * but benchmarks haven't shown it to be beneficial so far.
         *
         * Retry if the value changed since we last looked at it.
         */
        if (pg_atomic_compare_exchange_u32(&lock->state,
                                           &old_state, desired_state))
        {
            if (lock_free)
            {
                /* Great! Got the lock. */
#ifdef LOCK_DEBUG
                if (mode == LW_EXCLUSIVE)
                    lock->owner = MyProc;
#endif
                return false;
            }
            else
                return true;    /* somebody else has the lock */
        }
    }
    pg_unreachable();
}
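
/*
 * A worked illustration of the transitions above (flag bits omitted; values
 * follow the LW_VAL_* definitions at the top of this file):
 *
 *    state 0       + LW_SHARED    -> 1       one shared holder
 *    state 1       + LW_SHARED    -> 2       two shared holders
 *    state 0       + LW_EXCLUSIVE -> 1<<24   exclusive holder
 *    state 1       + LW_EXCLUSIVE -> blocked (LW_LOCK_MASK != 0)
 *    state 1<<24   + LW_SHARED    -> blocked (LW_VAL_EXCLUSIVE set)
 */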

/*
 * Lock the LWLock's wait list against concurrent activity.
 *
 * NB: even though the wait list is locked, non-conflicting lock operations
 * may still happen concurrently.
 *
 * Time spent holding mutex should be short!
 */
static void
LWLockWaitListLock(LWLock *lock)
{
    uint32      old_state;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;
    uint32      delays = 0;

    lwstats = get_lwlock_stats_entry(lock);
#endif

    while (true)
    {
        /* always try once to acquire lock directly */
        old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
        if (!(old_state & LW_FLAG_LOCKED))
            break;              /* got lock */

        /* and then spin without atomic operations until lock is released */
        {
            SpinDelayStatus delayStatus;

            init_local_spin_delay(&delayStatus);

            while (old_state & LW_FLAG_LOCKED)
            {
                perform_spin_delay(&delayStatus);
                old_state = pg_atomic_read_u32(&lock->state);
            }
#ifdef LWLOCK_STATS
            delays += delayStatus.delays;
#endif
            finish_spin_delay(&delayStatus);
        }

        /*
         * Retry. The lock might obviously already be re-acquired by the time
         * we're attempting to get it again.
         */
    }

#ifdef LWLOCK_STATS
    lwstats->spin_delay_count += delays;
#endif
}

/*
 * Unlock the LWLock's wait list.
 *
 * Note that it can be more efficient to manipulate flags and release the
 * locks in a single atomic operation.
 */
static void
LWLockWaitListUnlock(LWLock *lock)
{
    uint32      old_state PG_USED_FOR_ASSERTS_ONLY;

    old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);

    Assert(old_state & LW_FLAG_LOCKED);
}

/*
 * Wakeup all the lockers that currently have a chance to acquire the lock.
 */
static void
LWLockWakeup(LWLock *lock)
{
    bool        new_release_ok;
    bool        wokeup_somebody = false;
    proclist_head wakeup;
    proclist_mutable_iter iter;

    proclist_init(&wakeup);

    new_release_ok = true;

    /* lock wait list while collecting backends to wake up */
    LWLockWaitListLock(lock);

    proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
    {
        PGPROC     *waiter = GetPGProcByNumber(iter.cur);

        if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
            continue;

        proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
        proclist_push_tail(&wakeup, iter.cur, lwWaitLink);

        if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
        {
            /*
             * Prevent additional wakeups until the retrying waiter gets to
             * run. Backends that are just waiting for the lock to become
             * free don't retry automatically.
             */
            new_release_ok = false;

            /*
             * Don't wake up (further) exclusive lockers.
             */
            wokeup_somebody = true;
        }

        /*
         * Once we've woken up an exclusive waiter, there's no point in
         * waking up anybody else.
         */
        if (waiter->lwWaitMode == LW_EXCLUSIVE)
            break;
    }

    Assert(proclist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);

    /* unset required flags, and release lock, in one fell swoop */
    {
        uint32      old_state;
        uint32      desired_state;

        old_state = pg_atomic_read_u32(&lock->state);
        while (true)
        {
            desired_state = old_state;

            /* compute desired flags */

            if (new_release_ok)
                desired_state |= LW_FLAG_RELEASE_OK;
            else
                desired_state &= ~LW_FLAG_RELEASE_OK;

            if (proclist_is_empty(&wakeup))
                desired_state &= ~LW_FLAG_HAS_WAITERS;

            desired_state &= ~LW_FLAG_LOCKED;   /* release lock */

            if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
                                               desired_state))
                break;
        }
    }

    /* Awaken any waiters I removed from the queue. */
    proclist_foreach_modify(iter, &wakeup, lwWaitLink)
    {
        PGPROC     *waiter = GetPGProcByNumber(iter.cur);

        LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
        proclist_delete(&wakeup, iter.cur, lwWaitLink);

        /*
         * Guarantee that lwWaiting being unset only becomes visible once the
         * unlink from the list has completed. Otherwise the target backend
         * could be woken up for some other reason and enqueue for a new lock;
         * if that happens before the list unlink happens, the list would end
         * up being corrupted.
         *
         * The barrier pairs with the LWLockWaitListLock() when enqueuing for
         * another lock.
         */
        pg_write_barrier();
        waiter->lwWaiting = false;
        PGSemaphoreUnlock(waiter->sem);
    }
}

/*
 * Add ourselves to the end of the queue.
 *
 * NB: Mode can be LW_WAIT_UNTIL_FREE here!
 */
static void
LWLockQueueSelf(LWLock *lock, LWLockMode mode)
{
    /*
     * If we don't have a PGPROC structure, there's no way to wait. This
     * should never occur, since MyProc should only be null during shared
     * memory initialization.
     */
    if (MyProc == NULL)
        elog(PANIC, "cannot wait without a PGPROC structure");

    if (MyProc->lwWaiting)
        elog(PANIC, "queueing for lock while waiting on another one");

    LWLockWaitListLock(lock);

    /* setting the flag is protected by the spinlock */
    pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);

    MyProc->lwWaiting = true;
    MyProc->lwWaitMode = mode;

    /* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
    if (mode == LW_WAIT_UNTIL_FREE)
        proclist_push_head(&lock->waiters, MyProc->pgprocno, lwWaitLink);
    else
        proclist_push_tail(&lock->waiters, MyProc->pgprocno, lwWaitLink);

    /* Can release the mutex now */
    LWLockWaitListUnlock(lock);

#ifdef LOCK_DEBUG
    pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
#endif
}

/*
 * Remove ourselves from the waitlist.
 *
 * This is used if we queued ourselves because we thought we needed to sleep
 * but, after further checking, we discovered that we don't actually need to
 * do so.
 */
static void
LWLockDequeueSelf(LWLock *lock)
{
    bool        found = false;
    proclist_mutable_iter iter;

#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;

    lwstats = get_lwlock_stats_entry(lock);

    lwstats->dequeue_self_count++;
#endif

    LWLockWaitListLock(lock);

    /*
     * We can't just remove ourselves from the list; we need to iterate over
     * all entries, as somebody else could have dequeued us already.
     */
    proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
    {
        if (iter.cur == MyProc->pgprocno)
        {
            found = true;
            proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
            break;
        }
    }

    if (proclist_is_empty(&lock->waiters) &&
        (pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
    {
        pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
    }

    /* XXX: combine with fetch_and above? */
    LWLockWaitListUnlock(lock);

    /* clear waiting state again, nice for debugging */
    if (found)
        MyProc->lwWaiting = false;
    else
    {
        int         extraWaits = 0;

        /*
         * Somebody else dequeued us and has or will wake us up. Deal with the
         * superfluous absorption of a wakeup.
         */

        /*
         * Reset RELEASE_OK if somebody woke us before we removed ourselves -
         * they'll have set it to false.
         */
        pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

        /*
         * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
         * get reset at some inconvenient point later. Most of the time this
         * will immediately return.
         */
        for (;;)
        {
            PGSemaphoreLock(MyProc->sem);
            if (!MyProc->lwWaiting)
                break;
            extraWaits++;
        }

        /*
         * Fix the process wait semaphore's count for any absorbed wakeups.
         */
        while (extraWaits-- > 0)
            PGSemaphoreUnlock(MyProc->sem);
    }

#ifdef LOCK_DEBUG
    {
        /* not waiting anymore */
        uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

        Assert(nwaiters < MAX_BACKENDS);
    }
#endif
}

/*
 * LWLockAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, sleep until it is. Returns true if the lock
 * was available immediately, false if we had to sleep.
 *
 * Side effect: cancel/die interrupts are held off until lock release.
 */
bool
LWLockAcquire(LWLock *lock, LWLockMode mode)
{
    PGPROC     *proc = MyProc;
    bool        result = true;
    int         extraWaits = 0;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;

    lwstats = get_lwlock_stats_entry(lock);
#endif

    AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);

    PRINT_LWDEBUG("LWLockAcquire", lock, mode);

#ifdef LWLOCK_STATS
    /* Count lock acquisition attempts */
    if (mode == LW_EXCLUSIVE)
        lwstats->ex_acquire_count++;
    else
        lwstats->sh_acquire_count++;
#endif                          /* LWLOCK_STATS */

    /*
     * We can't wait if we haven't got a PGPROC. This should only occur
     * during bootstrap or shared memory initialization. Put an Assert here
     * to catch unsafe coding practices.
     */
    Assert(!(proc == NULL && IsUnderPostmaster));

    /* Ensure we will have room to remember the lock */
    if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
        elog(ERROR, "too many LWLocks taken");

    /*
     * Lock out cancel/die interrupts until we exit the code section protected
     * by the LWLock. This ensures that interrupts will not interfere with
     * manipulations of data structures in shared memory.
     */
    HOLD_INTERRUPTS();

    /*
     * Loop here to try to acquire lock after each time we are signaled by
     * LWLockRelease.
     *
     * NOTE: it might seem better to have LWLockRelease actually grant us the
     * lock, rather than retrying and possibly having to go back to sleep. But
     * in practice that is no good because it means a process swap for every
     * lock acquisition when two or more processes are contending for the same
     * lock. Since LWLocks are normally used to protect not-very-long
     * sections of computation, a process needs to be able to acquire and
     * release the same lock many times during a single CPU time slice, even
     * in the presence of contention. The efficiency of being able to do that
     * outweighs the inefficiency of sometimes wasting a process dispatch
     * cycle because the lock is not free when a released waiter finally gets
     * to run. See pgsql-hackers archives for 29-Dec-01.
     */
    for (;;)
    {
        bool        mustwait;

        /*
         * Try to grab the lock the first time, we're not in the waitqueue
         * yet/anymore.
         */
        mustwait = LWLockAttemptLock(lock, mode);

        if (!mustwait)
        {
            LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
            break;              /* got the lock */
        }

        /*
         * Ok, at this point we couldn't grab the lock on the first try. We
         * cannot simply queue ourselves to the end of the list and wait to be
         * woken up, because by now the lock could long since have been
         * released. Instead add us to the queue and try to grab the lock
         * again. If we succeed we need to revert the queuing and be happy;
         * otherwise we recheck the lock. If we still couldn't grab it, we
         * know that the other locker will see our queue entries when
         * releasing, since they existed before we checked for the lock.
         */

        /* add to the queue */
        LWLockQueueSelf(lock, mode);

        /* we're now guaranteed to be woken up if necessary */
        mustwait = LWLockAttemptLock(lock, mode);

        /* ok, grabbed the lock the second time round, need to undo queueing */
        if (!mustwait)
        {
            LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");

            LWLockDequeueSelf(lock);
            break;
        }

        /*
         * Wait until awakened.
         *
         * It is possible that we get awakened for a reason other than being
         * signaled by LWLockRelease. If so, loop back and wait again. Once
         * we've gotten the LWLock, re-increment the sema by the number of
         * additional signals received.
         */
        LOG_LWDEBUG("LWLockAcquire", lock, "waiting");

#ifdef LWLOCK_STATS
        lwstats->block_count++;
#endif

        LWLockReportWaitStart(lock);
        TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);

        for (;;)
        {
            PGSemaphoreLock(proc->sem);
            if (!proc->lwWaiting)
                break;
            extraWaits++;
        }

        /* Retrying, allow LWLockRelease to release waiters again. */
        pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

#ifdef LOCK_DEBUG
        {
            /* not waiting anymore */
            uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

            Assert(nwaiters < MAX_BACKENDS);
        }
#endif

        TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
        LWLockReportWaitEnd();

        LOG_LWDEBUG("LWLockAcquire", lock, "awakened");

        /* Now loop back and try to acquire lock again. */
        result = false;
    }

    TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);

    /* Add lock to list of locks held by this backend */
    held_lwlocks[num_held_lwlocks].lock = lock;
    held_lwlocks[num_held_lwlocks++].mode = mode;

    /*
     * Fix the process wait semaphore's count for any absorbed wakeups.
     */
    while (extraWaits-- > 0)
        PGSemaphoreUnlock(proc->sem);

    return result;
}

/*
 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, return false with no side-effects.
 *
 * If successful, cancel/die interrupts are held off until lock release.
 */
bool
LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
{
    bool        mustwait;

    AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);

    PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);

    /* Ensure we will have room to remember the lock */
    if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
        elog(ERROR, "too many LWLocks taken");

    /*
     * Lock out cancel/die interrupts until we exit the code section protected
     * by the LWLock. This ensures that interrupts will not interfere with
     * manipulations of data structures in shared memory.
     */
    HOLD_INTERRUPTS();

    /* Check for the lock */
    mustwait = LWLockAttemptLock(lock, mode);

    if (mustwait)
    {
        /* Failed to get lock, so release interrupt holdoff */
        RESUME_INTERRUPTS();

        LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
        TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), mode);
    }
    else
    {
        /* Add lock to list of locks held by this backend */
        held_lwlocks[num_held_lwlocks].lock = lock;
        held_lwlocks[num_held_lwlocks++].mode = mode;
        TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), mode);
    }
    return !mustwait;
}
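
/*
 * A minimal caller-side sketch (hypothetical): conditional acquisition lets
 * a backend do useful work instead of sleeping on contention:
 *
 *    if (LWLockConditionalAcquire(lock, LW_EXCLUSIVE))
 *    {
 *        ... fast path, lock was free ...
 *        LWLockRelease(lock);
 *    }
 *    else
 *        ... fall back, e.g. skip the optimization or retry later ...
 */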

/*
 * LWLockAcquireOrWait - Acquire lock, or wait until it's free
 *
 * The semantics of this function are a bit funky. If the lock is currently
 * free, it is acquired in the given mode, and the function returns true. If
 * the lock isn't immediately free, the function waits until it is released
 * and returns false, but does not acquire the lock.
 *
 * This is currently used for WALWriteLock: when a backend flushes the WAL,
 * holding WALWriteLock, it can flush the commit records of many other
 * backends as a side-effect. Those other backends need to wait until the
 * flush finishes, but don't need to acquire the lock anymore. They can just
 * wake up, observe that their records have already been flushed, and return.
 */
bool
LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
{
    PGPROC     *proc = MyProc;
    bool        mustwait;
    int         extraWaits = 0;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;

    lwstats = get_lwlock_stats_entry(lock);
#endif

    Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);

    PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);

    /* Ensure we will have room to remember the lock */
    if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
        elog(ERROR, "too many LWLocks taken");

    /*
     * Lock out cancel/die interrupts until we exit the code section protected
     * by the LWLock. This ensures that interrupts will not interfere with
     * manipulations of data structures in shared memory.
     */
    HOLD_INTERRUPTS();

    /*
     * NB: We're using nearly the same twice-in-a-row lock acquisition
     * protocol as LWLockAcquire(). Check its comments for details.
     */
    mustwait = LWLockAttemptLock(lock, mode);

    if (mustwait)
    {
        LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);

        mustwait = LWLockAttemptLock(lock, mode);

        if (mustwait)
        {
            /*
             * Wait until awakened. Like in LWLockAcquire, be prepared for
             * bogus wakeups.
             */
            LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");

#ifdef LWLOCK_STATS
            lwstats->block_count++;
#endif

            LWLockReportWaitStart(lock);
            TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);

            for (;;)
            {
                PGSemaphoreLock(proc->sem);
                if (!proc->lwWaiting)
                    break;
                extraWaits++;
            }

#ifdef LOCK_DEBUG
            {
                /* not waiting anymore */
                uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

                Assert(nwaiters < MAX_BACKENDS);
            }
#endif
            TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
            LWLockReportWaitEnd();

            LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
        }
        else
        {
            LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");

            /*
             * Got lock in the second attempt, undo queueing. We need to treat
             * this as having successfully acquired the lock, otherwise we'd
             * not necessarily wake up people we've prevented from acquiring
             * the lock.
             */
            LWLockDequeueSelf(lock);
        }
    }

    /*
     * Fix the process wait semaphore's count for any absorbed wakeups.
     */
    while (extraWaits-- > 0)
        PGSemaphoreUnlock(proc->sem);

    if (mustwait)
    {
        /* Failed to get lock, so release interrupt holdoff */
        RESUME_INTERRUPTS();
        LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
        TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), mode);
    }
    else
    {
        LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
        /* Add lock to list of locks held by this backend */
        held_lwlocks[num_held_lwlocks].lock = lock;
        held_lwlocks[num_held_lwlocks++].mode = mode;
        TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), mode);
    }

    return !mustwait;
}
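
/*
 * A sketch of the WALWriteLock-style pattern described above (hypothetical
 * helper names; the real logic lives in the WAL-flush code):
 *
 *    if (LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
 *    {
 *        flush_wal_up_to(target_lsn);
 *        LWLockRelease(WALWriteLock);
 *    }
 *    else if (already_flushed_past(target_lsn))
 *        return;               (someone else's flush covered our records)
 *    else
 *        ... retry ...
 */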

/*
 * Does the lwlock in its current state need to wait for the variable value to
 * change?
 *
 * If we don't need to wait, and it's because the value of the variable has
 * changed, store the current value in newval.
 *
 * *result is set to true if the lock was free, and false otherwise.
 */
static bool
LWLockConflictsWithVar(LWLock *lock,
                       uint64 *valptr, uint64 oldval, uint64 *newval,
                       bool *result)
{
    bool        mustwait;
    uint64      value;

    /*
     * Test first to see if the slot is free right now.
     *
     * XXX: the caller uses a spinlock before this, so we don't need a memory
     * barrier here as far as the current usage is concerned. But that might
     * not be safe in general.
     */
    mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;

    if (!mustwait)
    {
        *result = true;
        return false;
    }

    *result = false;

    /*
     * Read value using the lwlock's wait list lock, as we can't generally
     * rely on atomic 64 bit reads/stores. TODO: On platforms with a way to
     * do atomic 64 bit reads/writes the spinlock should be optimized away.
     */
    LWLockWaitListLock(lock);
    value = *valptr;
    LWLockWaitListUnlock(lock);

    if (value != oldval)
    {
        mustwait = false;
        *newval = value;
    }
    else
    {
        mustwait = true;
    }

    return mustwait;
}

/*
 * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
 *
 * If the lock is held and *valptr equals oldval, waits until the lock is
 * either freed, or the lock holder updates *valptr by calling
 * LWLockUpdateVar. If the lock is free on exit (immediately or after
 * waiting), returns true. If the lock is still held, but *valptr no longer
 * matches oldval, returns false and sets *newval to the current value in
 * *valptr.
 *
 * Note: this function ignores shared lock holders; if the lock is held
 * in shared mode, returns 'true'.
 */
bool
LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
{
    PGPROC     *proc = MyProc;
    int         extraWaits = 0;
    bool        result = false;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;

    lwstats = get_lwlock_stats_entry(lock);
#endif

    PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);

    /*
     * Lock out cancel/die interrupts while we sleep on the lock. There is no
     * cleanup mechanism to remove us from the wait queue if we got
     * interrupted.
     */
    HOLD_INTERRUPTS();

    /*
     * Loop here to check the lock's status after each time we are signaled.
     */
    for (;;)
    {
        bool        mustwait;

        mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
                                          &result);

        if (!mustwait)
            break;              /* the lock was free or value didn't match */

        /*
         * Add myself to wait queue. Note that this is racy; somebody else
         * could wake up before we're finished queuing. NB: We're using nearly
         * the same twice-in-a-row lock acquisition protocol as
         * LWLockAcquire(). Check its comments for details. The only
         * difference is that we also have to check the variable's values when
         * checking the state of the lock.
         */
        LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);

        /*
         * Set RELEASE_OK flag, to make sure we get woken up as soon as the
         * lock is released.
         */
        pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

        /*
         * We're now guaranteed to be woken up if necessary. Recheck the lock
         * and variable's state.
         */
        mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
                                          &result);

        /* Ok, no conflict after we queued ourselves. Undo queueing. */
        if (!mustwait)
        {
            LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");

            LWLockDequeueSelf(lock);
            break;
        }

        /*
         * Wait until awakened.
         *
         * It is possible that we get awakened for a reason other than being
         * signaled by LWLockRelease. If so, loop back and wait again. Once
         * we've gotten the LWLock, re-increment the sema by the number of
         * additional signals received.
         */
        LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");

#ifdef LWLOCK_STATS
        lwstats->block_count++;
#endif

        LWLockReportWaitStart(lock);
        TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), LW_EXCLUSIVE);

        for (;;)
        {
            PGSemaphoreLock(proc->sem);
            if (!proc->lwWaiting)
                break;
            extraWaits++;
        }

#ifdef LOCK_DEBUG
        {
            /* not waiting anymore */
            uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

            Assert(nwaiters < MAX_BACKENDS);
        }
#endif

        TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), LW_EXCLUSIVE);
        LWLockReportWaitEnd();

        LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");

        /* Now loop back and check the status of the lock again. */
    }

    TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), LW_EXCLUSIVE);

    /*
     * Fix the process wait semaphore's count for any absorbed wakeups.
     */
    while (extraWaits-- > 0)
        PGSemaphoreUnlock(proc->sem);

    /*
     * Now okay to allow cancel/die interrupts.
     */
    RESUME_INTERRUPTS();

    return result;
}


/*
 * LWLockUpdateVar - Update a variable and wake up waiters atomically
 *
 * Sets *valptr to 'val', and wakes up all processes waiting for us with
 * LWLockWaitForVar(). Setting the value and waking up the processes happen
 * atomically so that any process calling LWLockWaitForVar() on the same lock
 * is guaranteed to see the new value, and act accordingly.
 *
 * The caller must be holding the lock in exclusive mode.
 */
void
LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
{
    proclist_head wakeup;
    proclist_mutable_iter iter;

    PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);

    proclist_init(&wakeup);

    LWLockWaitListLock(lock);

    Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);

    /* Update the lock's value */
    *valptr = val;

    /*
     * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
     * up. They are always in the front of the queue.
     */
    proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
    {
        PGPROC     *waiter = GetPGProcByNumber(iter.cur);

        if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
            break;

        proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
        proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
    }

    /* We are done updating shared state of the lock itself. */
    LWLockWaitListUnlock(lock);

    /*
     * Awaken any waiters I removed from the queue.
     */
    proclist_foreach_modify(iter, &wakeup, lwWaitLink)
    {
        PGPROC     *waiter = GetPGProcByNumber(iter.cur);

        proclist_delete(&wakeup, iter.cur, lwWaitLink);
        /* check comment in LWLockWakeup() about this barrier */
        pg_write_barrier();
        waiter->lwWaiting = false;
        PGSemaphoreUnlock(waiter->sem);
    }
}
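
/*
 * A minimal sketch of the variable-wait protocol implemented by
 * LWLockWaitForVar()/LWLockUpdateVar() (hypothetical progress variable):
 *
 *    holder:
 *        LWLockAcquire(lock, LW_EXCLUSIVE);
 *        LWLockUpdateVar(lock, &progress, new_value);
 *        ...
 *        LWLockReleaseClearVar(lock, &progress, 0);
 *
 *    waiter:
 *        while (!LWLockWaitForVar(lock, &progress, seen, &seen))
 *            ... 'seen' now holds the updated value; act on it ...
 *
 * The waiter returns as soon as either the holder releases the lock (true)
 * or advances the variable past 'seen' (false).
 */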


/*
 * LWLockRelease - release a previously acquired lock
 */
void
LWLockRelease(LWLock *lock)
{
    LWLockMode  mode;
    uint32      oldstate;
    bool        check_waiters;
    int         i;

    /*
     * Remove lock from list of locks held. Usually, but not always, it will
     * be the latest-acquired lock; so search array backwards.
     */
    for (i = num_held_lwlocks; --i >= 0;)
        if (lock == held_lwlocks[i].lock)
            break;

    if (i < 0)
        elog(ERROR, "lock %s is not held", T_NAME(lock));

    mode = held_lwlocks[i].mode;

    num_held_lwlocks--;
    for (; i < num_held_lwlocks; i++)
        held_lwlocks[i] = held_lwlocks[i + 1];

    PRINT_LWDEBUG("LWLockRelease", lock, mode);

    /*
     * Release my hold on lock; after that it can immediately be acquired by
     * others, even if we still have to wake up other waiters.
     */
    if (mode == LW_EXCLUSIVE)
        oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
    else
        oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);

    /* nobody else can have that kind of lock */
    Assert(!(oldstate & LW_VAL_EXCLUSIVE));

    /*
     * If we're still waiting for backends to get scheduled, don't wake them
     * up again.
     */
    if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
        (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
        (oldstate & LW_LOCK_MASK) == 0)
        check_waiters = true;
    else
        check_waiters = false;

    /*
     * As waking up waiters requires the spinlock to be acquired, only do so
     * if necessary.
     */
    if (check_waiters)
    {
        /* XXX: remove before commit? */
        LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
        LWLockWakeup(lock);
    }

    TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));

    /*
     * Now okay to allow cancel/die interrupts.
     */
    RESUME_INTERRUPTS();
}

/*
 * LWLockReleaseClearVar - release a previously acquired lock, reset variable
 */
void
LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
{
    LWLockWaitListLock(lock);

    /*
     * Set the variable's value before releasing the lock; that prevents a
     * race condition wherein a new locker acquires the lock but hasn't yet
     * set the variable's value.
     */
    *valptr = val;
    LWLockWaitListUnlock(lock);

    LWLockRelease(lock);
}


/*
 * LWLockReleaseAll - release all currently-held locks
 *
 * Used to clean up after ereport(ERROR). An important difference between
 * this function and retail LWLockRelease calls is that InterruptHoldoffCount
 * is unchanged by this operation. This is necessary since
 * InterruptHoldoffCount has been set to an appropriate level earlier in error
 * recovery. We could decrement it below zero if we allowed it to drop for
 * each released lock!
 */
void
LWLockReleaseAll(void)
{
    while (num_held_lwlocks > 0)
    {
        HOLD_INTERRUPTS();      /* match the upcoming RESUME_INTERRUPTS */

        LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
    }
}


/*
 * LWLockHeldByMe - test whether my process holds a lock in any mode
 *
 * This is meant as debug support only.
 */
bool
LWLockHeldByMe(LWLock *l)
{
    int         i;

    for (i = 0; i < num_held_lwlocks; i++)
    {
        if (held_lwlocks[i].lock == l)
            return true;
    }
    return false;
}

/*
 * LWLockHeldByMeInMode - test whether my process holds a lock in given mode
 *
 * This is meant as debug support only.
 */
bool
LWLockHeldByMeInMode(LWLock *l, LWLockMode mode)
{
    int         i;

    for (i = 0; i < num_held_lwlocks; i++)
    {
        if (held_lwlocks[i].lock == l && held_lwlocks[i].mode == mode)
            return true;
    }
    return false;
}
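
/*
 * A typical debug-support sketch (hypothetical caller): routines that
 * require a lock to already be held assert it with these helpers:
 *
 *    Assert(LWLockHeldByMeInMode(lock, LW_EXCLUSIVE));
 */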