1 /*-------------------------------------------------------------------------
2  *
3  * lwlock.c
4  *	  Lightweight lock manager
5  *
6  * Lightweight locks are intended primarily to provide mutual exclusion of
7  * access to shared-memory data structures.  Therefore, they offer both
8  * exclusive and shared lock modes (to support read/write and read-only
9  * access to a shared object).  There are few other frammishes.  User-level
10  * locking should be done with the full lock manager --- which depends on
11  * LWLocks to protect its shared state.
12  *
13  * In addition to exclusive and shared modes, lightweight locks can be used to
14  * wait until a variable changes value.  The variable is not touched when the
15  * lock is acquired with LWLockAcquire, i.e. it keeps whatever value it was
16  * set to when the lock was last released, and it can be updated without
17  * releasing the lock by calling LWLockUpdateVar.  LWLockWaitForVar
18  * waits for the variable to be updated, or until the lock is free.  When
19  * releasing the lock with LWLockReleaseClearVar() the value can be set to an
20  * appropriate value for a free lock.  The meaning of the variable is up to
21  * the caller, the lightweight lock code just assigns and compares it.
22  *
23  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
24  * Portions Copyright (c) 1994, Regents of the University of California
25  *
26  * IDENTIFICATION
27  *	  src/backend/storage/lmgr/lwlock.c
28  *
29  * NOTES:
30  *
31  * This used to be a pretty straightforward reader-writer lock
32  * implementation, in which the internal state was protected by a
33  * spinlock. Unfortunately the overhead of taking the spinlock proved to be
34  * too high for workloads/locks that were taken in shared mode very
35  * frequently. Often we were spinning in the (obviously exclusive) spinlock,
36  * while trying to acquire a shared lock that was actually free.
37  *
38  * Thus a new implementation was devised that provides wait-free shared lock
39  * acquisition for locks that aren't exclusively locked.
40  *
41  * The basic idea is to have a single atomic variable 'lockcount' instead of
42  * the formerly separate shared and exclusive counters and to use atomic
43  * operations to acquire the lock. That's fairly easy to do for plain
44  * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
45  * in the OS.
46  *
47  * For lock acquisition we use an atomic compare-and-exchange on the lockcount
48  * variable. For exclusive lock we swap in a sentinel value
49  * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders.
50  *
51  * To release the lock we use an atomic decrement. If the new value is zero
52  * (we get that atomically), we know we can, and may have to, release
53  * waiters.
54  *
55  * Obviously it is important that the sentinel value for exclusive locks
56  * doesn't conflict with the maximum number of possible share lockers -
57  * luckily MAX_BACKENDS makes that easily possible.
58  *
59  *
60  * The attentive reader might have noticed that naively doing the above has a
61  * glaring race condition: We try to lock using the atomic operations and
62  * notice that we have to wait. Unfortunately by the time we have finished
63  * queuing, the former locker very well might have already finished its
64  * work. That's problematic because we're now stuck waiting inside the OS.
65  *
66  * To mitigate those races we use a two-phase attempt at locking:
67  *	 Phase 1: Try to do it atomically, if we succeed, nice
68  *	 Phase 2: Add ourselves to the waitqueue of the lock
69  *	 Phase 3: Try to grab the lock again, if we succeed, remove ourselves from
70  *			  the queue
71  *	 Phase 4: Sleep till wake-up, goto Phase 1
72  *
73  * This protects us against the problem from above, as nobody can release too
74  * quickly before we're queued, since after Phase 2 we're already queued.
75  * -------------------------------------------------------------------------
76  */
77 #include "postgres.h"
78 
79 #include "miscadmin.h"
80 #include "pgstat.h"
81 #include "pg_trace.h"
82 #include "postmaster/postmaster.h"
83 #include "replication/slot.h"
84 #include "storage/ipc.h"
85 #include "storage/predicate.h"
86 #include "storage/proc.h"
87 #include "storage/spin.h"
88 #include "utils/memutils.h"
89 
90 #ifdef LWLOCK_STATS
91 #include "utils/hsearch.h"
92 #endif
93 
94 
95 /* We use the ShmemLock spinlock to protect LWLockCounter */
96 extern slock_t *ShmemLock;
97 
98 #define LW_FLAG_HAS_WAITERS			((uint32) 1 << 30)
99 #define LW_FLAG_RELEASE_OK			((uint32) 1 << 29)
100 #define LW_FLAG_LOCKED				((uint32) 1 << 28)
101 
102 #define LW_VAL_EXCLUSIVE			((uint32) 1 << 24)
103 #define LW_VAL_SHARED				1
104 
105 #define LW_LOCK_MASK				((uint32) ((1 << 25)-1))
106 /* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
107 #define LW_SHARED_MASK				((uint32) ((1 << 24)-1))
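
/*
 * For orientation, the layout of the 32 bit 'state' word implied by the
 * definitions above is:
 *	 bits  0-23: count of shared lockers (masked by LW_SHARED_MASK)
 *	 bit     24: lock is held exclusively (LW_VAL_EXCLUSIVE)
 *	 bit     28: wait list is locked (LW_FLAG_LOCKED)
 *	 bit     29: ok to release waiters (LW_FLAG_RELEASE_OK)
 *	 bit     30: waiters are queued (LW_FLAG_HAS_WAITERS)
 * LW_LOCK_MASK covers the shared count plus the exclusive bit, i.e. the part
 * of the word that must be zero for an exclusive acquisition to succeed.
 */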
108 
109 /*
110  * This is indexed by tranche ID and stores metadata for all tranches known
111  * to the current backend.
112  */
113 static LWLockTranche **LWLockTrancheArray = NULL;
114 static int	LWLockTranchesAllocated = 0;
115 
116 #define T_NAME(lock) \
117 	(LWLockTrancheArray[(lock)->tranche]->name)
118 #define T_ID(lock) \
119 	((int) ((((char *) lock) - \
120 		((char *) LWLockTrancheArray[(lock)->tranche]->array_base)) / \
121 		LWLockTrancheArray[(lock)->tranche]->array_stride))
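
/*
 * T_NAME() yields the name of the tranche a lock belongs to, while T_ID()
 * computes the lock's index within that tranche from its offset relative to
 * the tranche's array_base, measured in units of array_stride.
 */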
122 
123 /*
124  * This points to the main array of LWLocks in shared memory.  Backends inherit
125  * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
126  * where we have special measures to pass it down).
127  */
128 LWLockPadded *MainLWLockArray = NULL;
129 static LWLockTranche MainLWLockTranche;
130 static LWLockTranche BufMappingLWLockTranche;
131 static LWLockTranche LockManagerLWLockTranche;
132 static LWLockTranche PredicateLockManagerLWLockTranche;
133 
134 /*
135  * We use this structure to keep track of locked LWLocks for release
136  * during error recovery.  Normally, only a few will be held at once, but
137  * occasionally the number can be much higher; for example, the pg_buffercache
138  * extension locks all buffer partitions simultaneously.
139  */
140 #define MAX_SIMUL_LWLOCKS	200
141 
142 /* struct representing the LWLocks we're holding */
143 typedef struct LWLockHandle
144 {
145 	LWLock	   *lock;
146 	LWLockMode	mode;
147 } LWLockHandle;
148 
149 static int	num_held_lwlocks = 0;
150 static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
151 
152 /* struct representing the LWLock tranche request for named tranche */
153 typedef struct NamedLWLockTrancheRequest
154 {
155 	char		tranche_name[NAMEDATALEN];
156 	int			num_lwlocks;
157 } NamedLWLockTrancheRequest;
158 
159 NamedLWLockTrancheRequest *NamedLWLockTrancheRequestArray = NULL;
160 static int	NamedLWLockTrancheRequestsAllocated = 0;
161 int			NamedLWLockTrancheRequests = 0;
162 
163 NamedLWLockTranche *NamedLWLockTrancheArray = NULL;
164 
165 static bool lock_named_request_allowed = true;
166 
167 static void InitializeLWLocks(void);
168 static void RegisterLWLockTranches(void);
169 
170 static inline void LWLockReportWaitStart(LWLock *lock);
171 static inline void LWLockReportWaitEnd(void);
172 
173 #ifdef LWLOCK_STATS
174 typedef struct lwlock_stats_key
175 {
176 	int			tranche;
177 	int			instance;
178 }	lwlock_stats_key;
179 
180 typedef struct lwlock_stats
181 {
182 	lwlock_stats_key key;
183 	int			sh_acquire_count;
184 	int			ex_acquire_count;
185 	int			block_count;
186 	int			dequeue_self_count;
187 	int			spin_delay_count;
188 }	lwlock_stats;
189 
190 static HTAB *lwlock_stats_htab;
191 static lwlock_stats lwlock_stats_dummy;
192 #endif
193 
194 #ifdef LOCK_DEBUG
195 bool		Trace_lwlocks = false;
196 
197 inline static void
198 PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
199 {
200 	/* hide statement & context here, otherwise the log is just too verbose */
201 	if (Trace_lwlocks)
202 	{
203 		uint32		state = pg_atomic_read_u32(&lock->state);
204 		int			id = T_ID(lock);
205 
206 		if (lock->tranche == 0 && id < NUM_INDIVIDUAL_LWLOCKS)
207 			ereport(LOG,
208 					(errhidestmt(true),
209 					 errhidecontext(true),
210 					 errmsg_internal("%d: %s(%s): excl %u shared %u haswaiters %u waiters %u rOK %d",
211 									 MyProcPid,
212 									 where, MainLWLockNames[id],
213 									 (state & LW_VAL_EXCLUSIVE) != 0,
214 									 state & LW_SHARED_MASK,
215 									 (state & LW_FLAG_HAS_WAITERS) != 0,
216 									 pg_atomic_read_u32(&lock->nwaiters),
217 									 (state & LW_FLAG_RELEASE_OK) != 0)));
218 		else
219 			ereport(LOG,
220 					(errhidestmt(true),
221 					 errhidecontext(true),
222 					 errmsg_internal("%d: %s(%s %d): excl %u shared %u haswaiters %u waiters %u rOK %d",
223 									 MyProcPid,
224 									 where, T_NAME(lock), id,
225 									 (state & LW_VAL_EXCLUSIVE) != 0,
226 									 state & LW_SHARED_MASK,
227 									 (state & LW_FLAG_HAS_WAITERS) != 0,
228 									 pg_atomic_read_u32(&lock->nwaiters),
229 									 (state & LW_FLAG_RELEASE_OK) != 0)));
230 	}
231 }
232 
233 inline static void
234 LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
235 {
236 	/* hide statement & context here, otherwise the log is just too verbose */
237 	if (Trace_lwlocks)
238 	{
239 		int			id = T_ID(lock);
240 
241 		if (lock->tranche == 0 && id < NUM_INDIVIDUAL_LWLOCKS)
242 			ereport(LOG,
243 					(errhidestmt(true),
244 					 errhidecontext(true),
245 					 errmsg_internal("%s(%s): %s", where,
246 									 MainLWLockNames[id], msg)));
247 		else
248 			ereport(LOG,
249 					(errhidestmt(true),
250 					 errhidecontext(true),
251 					 errmsg_internal("%s(%s %d): %s", where,
252 									 T_NAME(lock), id, msg)));
253 	}
254 }
255 
256 #else							/* not LOCK_DEBUG */
257 #define PRINT_LWDEBUG(a,b,c) ((void)0)
258 #define LOG_LWDEBUG(a,b,c) ((void)0)
259 #endif   /* LOCK_DEBUG */
260 
261 #ifdef LWLOCK_STATS
262 
263 static void init_lwlock_stats(void);
264 static void print_lwlock_stats(int code, Datum arg);
265 static lwlock_stats *get_lwlock_stats_entry(LWLock *lockid);
266 
267 static void
268 init_lwlock_stats(void)
269 {
270 	HASHCTL		ctl;
271 	static MemoryContext lwlock_stats_cxt = NULL;
272 	static bool exit_registered = false;
273 
274 	if (lwlock_stats_cxt != NULL)
275 		MemoryContextDelete(lwlock_stats_cxt);
276 
277 	/*
278 	 * The LWLock stats will be updated within a critical section, which
279 	 * requires allocating new hash entries. Allocations within a critical
280 	 * section are normally not allowed because running out of memory would
281 	 * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
282 	 * turned on in production, so that's an acceptable risk. The hash entries
283 	 * are small, so the risk of running out of memory is minimal in practice.
284 	 */
285 	lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
286 											 "LWLock stats",
287 											 ALLOCSET_DEFAULT_SIZES);
288 	MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);
289 
290 	MemSet(&ctl, 0, sizeof(ctl));
291 	ctl.keysize = sizeof(lwlock_stats_key);
292 	ctl.entrysize = sizeof(lwlock_stats);
293 	ctl.hcxt = lwlock_stats_cxt;
294 	lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
295 									HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
296 	if (!exit_registered)
297 	{
298 		on_shmem_exit(print_lwlock_stats, 0);
299 		exit_registered = true;
300 	}
301 }
302 
303 static void
304 print_lwlock_stats(int code, Datum arg)
305 {
306 	HASH_SEQ_STATUS scan;
307 	lwlock_stats *lwstats;
308 
309 	hash_seq_init(&scan, lwlock_stats_htab);
310 
311 	/* Grab an LWLock to keep different backends from mixing reports */
312 	LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);
313 
314 	while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
315 	{
316 		fprintf(stderr,
317 				"PID %d lwlock %s %d: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
318 				MyProcPid, LWLockTrancheArray[lwstats->key.tranche]->name,
319 				lwstats->key.instance, lwstats->sh_acquire_count,
320 				lwstats->ex_acquire_count, lwstats->block_count,
321 				lwstats->spin_delay_count, lwstats->dequeue_self_count);
322 	}
323 
324 	LWLockRelease(&MainLWLockArray[0].lock);
325 }
326 
327 static lwlock_stats *
328 get_lwlock_stats_entry(LWLock *lock)
329 {
330 	lwlock_stats_key key;
331 	lwlock_stats *lwstats;
332 	bool		found;
333 
334 	/*
335 	 * During shared memory initialization, the hash table doesn't exist yet.
336 	 * Stats of that phase aren't very interesting, so just collect operations
337 	 * on all locks in a single dummy entry.
338 	 */
339 	if (lwlock_stats_htab == NULL)
340 		return &lwlock_stats_dummy;
341 
342 	/* Fetch or create the entry. */
343 	key.tranche = lock->tranche;
344 	key.instance = T_ID(lock);
345 	lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
346 	if (!found)
347 	{
348 		lwstats->sh_acquire_count = 0;
349 		lwstats->ex_acquire_count = 0;
350 		lwstats->block_count = 0;
351 		lwstats->dequeue_self_count = 0;
352 		lwstats->spin_delay_count = 0;
353 	}
354 	return lwstats;
355 }
356 #endif   /* LWLOCK_STATS */
357 
358 
359 /*
360  * Compute number of LWLocks required by named tranches.  These will be
361  * allocated in the main array.
362  */
363 static int
364 NumLWLocksByNamedTranches(void)
365 {
366 	int			numLocks = 0;
367 	int			i;
368 
369 	for (i = 0; i < NamedLWLockTrancheRequests; i++)
370 		numLocks += NamedLWLockTrancheRequestArray[i].num_lwlocks;
371 
372 	return numLocks;
373 }
374 
375 /*
376  * Compute shmem space needed for LWLocks and named tranches.
377  */
378 Size
379 LWLockShmemSize(void)
380 {
381 	Size		size;
382 	int			i;
383 	int			numLocks = NUM_FIXED_LWLOCKS;
384 
385 	numLocks += NumLWLocksByNamedTranches();
386 
387 	/* Space for the LWLock array. */
388 	size = mul_size(numLocks, sizeof(LWLockPadded));
389 
390 	/* Space for dynamic allocation counter, plus room for alignment. */
391 	size = add_size(size, sizeof(int) + LWLOCK_PADDED_SIZE);
392 
393 	/* space for named tranches. */
394 	size = add_size(size, mul_size(NamedLWLockTrancheRequests, sizeof(NamedLWLockTranche)));
395 
396 	/* space for name of each tranche. */
397 	for (i = 0; i < NamedLWLockTrancheRequests; i++)
398 		size = add_size(size, strlen(NamedLWLockTrancheRequestArray[i].tranche_name) + 1);
399 
400 	/* Disallow further requests for named LWLocks after startup */
401 	lock_named_request_allowed = false;
402 
403 	return size;
404 }
405 
406 /*
407  * Allocate shmem space for the main LWLock array and all tranches and
408  * initialize it.  We also register all the LWLock tranches here.
409  */
410 void
411 CreateLWLocks(void)
412 {
413 	StaticAssertExpr(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
414 					 "MAX_BACKENDS too big for lwlock.c");
415 
416 	StaticAssertExpr(sizeof(LWLock) <= LWLOCK_MINIMAL_SIZE &&
417 					 sizeof(LWLock) <= LWLOCK_PADDED_SIZE,
418 					 "Miscalculated LWLock padding");
419 
420 	if (!IsUnderPostmaster)
421 	{
422 		Size		spaceLocks = LWLockShmemSize();
423 		int		   *LWLockCounter;
424 		char	   *ptr;
425 
426 		/* Allocate space */
427 		ptr = (char *) ShmemAlloc(spaceLocks);
428 
429 		/* Leave room for dynamic allocation of tranches */
430 		ptr += sizeof(int);
431 
432 		/* Ensure desired alignment of LWLock array */
433 		ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
434 
435 		MainLWLockArray = (LWLockPadded *) ptr;
436 
437 		/*
438 		 * Initialize the dynamic-allocation counter for tranches, which is
439 		 * stored just before the first LWLock.
440 		 */
441 		LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
442 		*LWLockCounter = LWTRANCHE_FIRST_USER_DEFINED;
443 
444 		/* Initialize all LWLocks */
445 		InitializeLWLocks();
446 	}
447 
448 	/* Register all LWLock tranches */
449 	RegisterLWLockTranches();
450 }
451 
452 /*
453  * Initialize LWLocks that are fixed and those belonging to named tranches.
454  */
455 static void
456 InitializeLWLocks(void)
457 {
458 	int			numNamedLocks = NumLWLocksByNamedTranches();
459 	int			id;
460 	int			i;
461 	int			j;
462 	LWLockPadded *lock;
463 
464 	/* Initialize all individual LWLocks in main array */
465 	for (id = 0, lock = MainLWLockArray; id < NUM_INDIVIDUAL_LWLOCKS; id++, lock++)
466 		LWLockInitialize(&lock->lock, LWTRANCHE_MAIN);
467 
468 	/* Initialize buffer mapping LWLocks in main array */
469 	lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS;
470 	for (id = 0; id < NUM_BUFFER_PARTITIONS; id++, lock++)
471 		LWLockInitialize(&lock->lock, LWTRANCHE_BUFFER_MAPPING);
472 
473 	/* Initialize lmgrs' LWLocks in main array */
474 	lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS + NUM_BUFFER_PARTITIONS;
475 	for (id = 0; id < NUM_LOCK_PARTITIONS; id++, lock++)
476 		LWLockInitialize(&lock->lock, LWTRANCHE_LOCK_MANAGER);
477 
478 	/* Initialize predicate lmgrs' LWLocks in main array */
479 	lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS +
480 		NUM_BUFFER_PARTITIONS + NUM_LOCK_PARTITIONS;
481 	for (id = 0; id < NUM_PREDICATELOCK_PARTITIONS; id++, lock++)
482 		LWLockInitialize(&lock->lock, LWTRANCHE_PREDICATE_LOCK_MANAGER);
483 
484 	/* Initialize named tranches. */
485 	if (NamedLWLockTrancheRequests > 0)
486 	{
487 		char	   *trancheNames;
488 
489 		NamedLWLockTrancheArray = (NamedLWLockTranche *)
490 			&MainLWLockArray[NUM_FIXED_LWLOCKS + numNamedLocks];
491 
492 		trancheNames = (char *) NamedLWLockTrancheArray +
493 			(NamedLWLockTrancheRequests * sizeof(NamedLWLockTranche));
494 		lock = &MainLWLockArray[NUM_FIXED_LWLOCKS];
495 
496 		for (i = 0; i < NamedLWLockTrancheRequests; i++)
497 		{
498 			NamedLWLockTrancheRequest *request;
499 			NamedLWLockTranche *tranche;
500 			char	   *name;
501 
502 			request = &NamedLWLockTrancheRequestArray[i];
503 			tranche = &NamedLWLockTrancheArray[i];
504 
505 			name = trancheNames;
506 			trancheNames += strlen(request->tranche_name) + 1;
507 			strcpy(name, request->tranche_name);
508 			tranche->lwLockTranche.name = name;
509 			tranche->trancheId = LWLockNewTrancheId();
510 			tranche->lwLockTranche.array_base = lock;
511 			tranche->lwLockTranche.array_stride = sizeof(LWLockPadded);
512 
513 			for (j = 0; j < request->num_lwlocks; j++, lock++)
514 				LWLockInitialize(&lock->lock, tranche->trancheId);
515 		}
516 	}
517 }
518 
519 /*
520  * Register named tranches and tranches for fixed LWLocks.
521  */
522 static void
523 RegisterLWLockTranches(void)
524 {
525 	int			i;
526 
527 	if (LWLockTrancheArray == NULL)
528 	{
529 		LWLockTranchesAllocated = 32;
530 		LWLockTrancheArray = (LWLockTranche **)
531 			MemoryContextAllocZero(TopMemoryContext,
532 						  LWLockTranchesAllocated * sizeof(LWLockTranche *));
533 		Assert(LWLockTranchesAllocated >= LWTRANCHE_FIRST_USER_DEFINED);
534 	}
535 
536 	MainLWLockTranche.name = "main";
537 	MainLWLockTranche.array_base = MainLWLockArray;
538 	MainLWLockTranche.array_stride = sizeof(LWLockPadded);
539 	LWLockRegisterTranche(LWTRANCHE_MAIN, &MainLWLockTranche);
540 
541 	BufMappingLWLockTranche.name = "buffer_mapping";
542 	BufMappingLWLockTranche.array_base = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS;
543 	BufMappingLWLockTranche.array_stride = sizeof(LWLockPadded);
544 	LWLockRegisterTranche(LWTRANCHE_BUFFER_MAPPING, &BufMappingLWLockTranche);
545 
546 	LockManagerLWLockTranche.name = "lock_manager";
547 	LockManagerLWLockTranche.array_base = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS +
548 		NUM_BUFFER_PARTITIONS;
549 	LockManagerLWLockTranche.array_stride = sizeof(LWLockPadded);
550 	LWLockRegisterTranche(LWTRANCHE_LOCK_MANAGER, &LockManagerLWLockTranche);
551 
552 	PredicateLockManagerLWLockTranche.name = "predicate_lock_manager";
553 	PredicateLockManagerLWLockTranche.array_base = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS +
554 		NUM_BUFFER_PARTITIONS + NUM_LOCK_PARTITIONS;
555 	PredicateLockManagerLWLockTranche.array_stride = sizeof(LWLockPadded);
556 	LWLockRegisterTranche(LWTRANCHE_PREDICATE_LOCK_MANAGER, &PredicateLockManagerLWLockTranche);
557 
558 	/* Register named tranches. */
559 	for (i = 0; i < NamedLWLockTrancheRequests; i++)
560 		LWLockRegisterTranche(NamedLWLockTrancheArray[i].trancheId,
561 							  &NamedLWLockTrancheArray[i].lwLockTranche);
562 }
563 
564 /*
565  * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
566  */
567 void
568 InitLWLockAccess(void)
569 {
570 #ifdef LWLOCK_STATS
571 	init_lwlock_stats();
572 #endif
573 }
574 
575 /*
576  * GetNamedLWLockTranche - returns the base address of the LWLocks belonging
577  *		to the specified tranche.
578  *
579  * Caller needs to retrieve the requested number of LWLocks starting from
580  * the base lock address returned by this API.  This can be used for
581  * tranches that were requested via the RequestNamedLWLockTranche() API.
582  */
583 LWLockPadded *
584 GetNamedLWLockTranche(const char *tranche_name)
585 {
586 	int			lock_pos;
587 	int			i;
588 
589 	/*
590 	 * Obtain the position of the base address of the LWLocks belonging to the
591 	 * requested tranche_name in MainLWLockArray.  LWLocks for named tranches
592 	 * are placed in MainLWLockArray after the fixed locks.
593 	 */
594 	lock_pos = NUM_FIXED_LWLOCKS;
595 	for (i = 0; i < NamedLWLockTrancheRequests; i++)
596 	{
597 		if (strcmp(NamedLWLockTrancheRequestArray[i].tranche_name,
598 				   tranche_name) == 0)
599 			return &MainLWLockArray[lock_pos];
600 
601 		lock_pos += NamedLWLockTrancheRequestArray[i].num_lwlocks;
602 	}
603 
604 	if (i >= NamedLWLockTrancheRequests)
605 		elog(ERROR, "requested tranche is not registered");
606 
607 	/* just to keep compiler quiet */
608 	return NULL;
609 }
610 
611 /*
612  * Allocate a new tranche ID.
613  */
614 int
615 LWLockNewTrancheId(void)
616 {
617 	int			result;
618 	int		   *LWLockCounter;
619 
620 	LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
621 	SpinLockAcquire(ShmemLock);
622 	result = (*LWLockCounter)++;
623 	SpinLockRelease(ShmemLock);
624 
625 	return result;
626 }
627 
628 /*
629  * Register a tranche ID in the lookup table for the current process.  This
630  * routine will save a pointer to the tranche object passed as an argument,
631  * so that object should be allocated in a backend-lifetime context
632  * (TopMemoryContext, static variable, or similar).
633  */
634 void
635 LWLockRegisterTranche(int tranche_id, LWLockTranche *tranche)
636 {
637 	Assert(LWLockTrancheArray != NULL);
638 
639 	if (tranche_id >= LWLockTranchesAllocated)
640 	{
641 		int			i = LWLockTranchesAllocated;
642 		int			j = LWLockTranchesAllocated;
643 
644 		while (i <= tranche_id)
645 			i *= 2;
646 
647 		LWLockTrancheArray = (LWLockTranche **)
648 			repalloc(LWLockTrancheArray,
649 					 i * sizeof(LWLockTranche *));
650 		LWLockTranchesAllocated = i;
651 		while (j < LWLockTranchesAllocated)
652 			LWLockTrancheArray[j++] = NULL;
653 	}
654 
655 	LWLockTrancheArray[tranche_id] = tranche;
656 }
657 
658 /*
659  * RequestNamedLWLockTranche
660  *		Request that extra LWLocks be allocated during postmaster
661  *		startup.
662  *
663  * This is only useful for extensions if called from the _PG_init hook
664  * of a library that is loaded into the postmaster via
665  * shared_preload_libraries.  Once shared memory has been allocated, calls
666  * will be ignored.  (We could raise an error, but it seems better to make
667  * it a no-op, so that libraries containing such calls can be reloaded if
668  * needed.)
669  */
670 void
671 RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
672 {
673 	NamedLWLockTrancheRequest *request;
674 
675 	if (IsUnderPostmaster || !lock_named_request_allowed)
676 		return;					/* too late */
677 
678 	if (NamedLWLockTrancheRequestArray == NULL)
679 	{
680 		NamedLWLockTrancheRequestsAllocated = 16;
681 		NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
682 			MemoryContextAlloc(TopMemoryContext,
683 							   NamedLWLockTrancheRequestsAllocated
684 							   * sizeof(NamedLWLockTrancheRequest));
685 	}
686 
687 	if (NamedLWLockTrancheRequests >= NamedLWLockTrancheRequestsAllocated)
688 	{
689 		int			i = NamedLWLockTrancheRequestsAllocated;
690 
691 		while (i <= NamedLWLockTrancheRequests)
692 			i *= 2;
693 
694 		NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
695 			repalloc(NamedLWLockTrancheRequestArray,
696 					 i * sizeof(NamedLWLockTrancheRequest));
697 		NamedLWLockTrancheRequestsAllocated = i;
698 	}
699 
700 	request = &NamedLWLockTrancheRequestArray[NamedLWLockTrancheRequests];
701 	Assert(strlen(tranche_name) + 1 < NAMEDATALEN);
702 	StrNCpy(request->tranche_name, tranche_name, NAMEDATALEN);
703 	request->num_lwlocks = num_lwlocks;
704 	NamedLWLockTrancheRequests++;
705 }
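
/*
 * Illustrative sketch (not part of this file's logic) of how an extension
 * loaded via shared_preload_libraries might use the named-tranche API; the
 * tranche name and the my_ext_lock variable are hypothetical:
 *
 *		void
 *		_PG_init(void)
 *		{
 *			RequestNamedLWLockTranche("my_extension", 1);
 *		}
 *
 *		... and later, e.g. from a shmem startup hook, once the locks exist:
 *
 *		LWLock	   *my_ext_lock =
 *			&GetNamedLWLockTranche("my_extension")[0].lock;
 */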
706 
707 /*
708  * LWLockInitialize - initialize a new lwlock; it's initially unlocked
709  */
710 void
711 LWLockInitialize(LWLock *lock, int tranche_id)
712 {
713 	pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
714 #ifdef LOCK_DEBUG
715 	pg_atomic_init_u32(&lock->nwaiters, 0);
716 #endif
717 	lock->tranche = tranche_id;
718 	dlist_init(&lock->waiters);
719 }
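
/*
 * Illustrative sketch (names are hypothetical) of the other way to obtain
 * custom LWLocks: allocate them yourself (e.g. in dynamic shared memory),
 * pick a tranche ID, describe the array, and initialize each lock:
 *
 *		static LWLockTranche my_tranche;
 *		int			tranche_id = LWLockNewTrancheId();
 *
 *		my_tranche.name = "my_ext_dsm";
 *		my_tranche.array_base = my_lock_array;
 *		my_tranche.array_stride = sizeof(LWLockPadded);
 *		LWLockRegisterTranche(tranche_id, &my_tranche);
 *		LWLockInitialize(&my_lock_array[0].lock, tranche_id);
 *
 * Other backends that use these locks should also call
 * LWLockRegisterTranche, since LWLockTrancheArray is backend-local.
 */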
720 
721 /*
722  * Report start of wait event for light-weight locks.
723  *
724  * This function is used by all the light-weight lock calls that need to
725  * wait to acquire the lock.  It distinguishes the wait event based on the
726  * tranche and lock id.
727  */
728 static inline void
729 LWLockReportWaitStart(LWLock *lock)
730 {
731 	int			lockId = T_ID(lock);
732 
733 	if (lock->tranche == 0)
734 		pgstat_report_wait_start(WAIT_LWLOCK_NAMED, (uint16) lockId);
735 	else
736 		pgstat_report_wait_start(WAIT_LWLOCK_TRANCHE, lock->tranche);
737 }
738 
739 /*
740  * Report end of wait event for light-weight locks.
741  */
742 static inline void
743 LWLockReportWaitEnd(void)
744 {
745 	pgstat_report_wait_end();
746 }
747 
748 /*
749  * Return an identifier for an LWLock based on the wait class and event.
750  */
751 const char *
752 GetLWLockIdentifier(uint8 classId, uint16 eventId)
753 {
754 	if (classId == WAIT_LWLOCK_NAMED)
755 		return MainLWLockNames[eventId];
756 
757 	Assert(classId == WAIT_LWLOCK_TRANCHE);
758 
759 	/*
760 	 * It is quite possible that a user has registered a tranche in one of the
761 	 * backends (e.g. by allocating lwlocks in dynamic shared memory) but not
762 	 * in all of them, so we can't assume the tranche is registered here.
763 	 */
764 	if (eventId >= LWLockTranchesAllocated ||
765 		LWLockTrancheArray[eventId]->name == NULL)
766 		return "extension";
767 
768 	return LWLockTrancheArray[eventId]->name;
769 }
770 
771 /*
772  * Internal function that tries to atomically acquire the lwlock in the passed
773  * in mode.
774  *
775  * This function will not block waiting for a lock to become free - that's the
776  * callers job.
777  *
778  * Returns true if the lock isn't free and we need to wait.
779  */
780 static bool
781 LWLockAttemptLock(LWLock *lock, LWLockMode mode)
782 {
783 	uint32		old_state;
784 
785 	AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);
786 
787 	/*
788 	 * Read once outside the loop, later iterations will get the newer value
789 	 * via compare & exchange.
790 	 */
791 	old_state = pg_atomic_read_u32(&lock->state);
792 
793 	/* loop until we've determined whether we could acquire the lock or not */
794 	while (true)
795 	{
796 		uint32		desired_state;
797 		bool		lock_free;
798 
799 		desired_state = old_state;
800 
801 		if (mode == LW_EXCLUSIVE)
802 		{
803 			lock_free = (old_state & LW_LOCK_MASK) == 0;
804 			if (lock_free)
805 				desired_state += LW_VAL_EXCLUSIVE;
806 		}
807 		else
808 		{
809 			lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
810 			if (lock_free)
811 				desired_state += LW_VAL_SHARED;
812 		}
813 
814 		/*
815 		 * Attempt to swap in the state we are expecting. If we didn't see the
816 		 * lock as free, that's just the old value. If we saw it as free,
817 		 * we'll attempt to mark it acquired. The reason that we always swap
818 		 * in the value is that this doubles as a memory barrier. We could try
819 		 * to be smarter and only swap in values if we saw the lock as free,
820 		 * but benchmarks haven't shown that to be beneficial so far.
821 		 *
822 		 * Retry if the value changed since we last looked at it.
823 		 */
824 		if (pg_atomic_compare_exchange_u32(&lock->state,
825 										   &old_state, desired_state))
826 		{
827 			if (lock_free)
828 			{
829 				/* Great! Got the lock. */
830 #ifdef LOCK_DEBUG
831 				if (mode == LW_EXCLUSIVE)
832 					lock->owner = MyProc;
833 #endif
834 				return false;
835 			}
836 			else
837 				return true;	/* somebody else has the lock */
838 		}
839 	}
840 	pg_unreachable();
841 }
842 
843 /*
844  * Lock the LWLock's wait list against concurrent activity.
845  *
846  * NB: even though the wait list is locked, non-conflicting lock operations
847  * may still happen concurrently.
848  *
849  * Time spent holding mutex should be short!
850  */
851 static void
852 LWLockWaitListLock(LWLock *lock)
853 {
854 	uint32		old_state;
855 #ifdef LWLOCK_STATS
856 	lwlock_stats *lwstats;
857 	uint32		delays = 0;
858 
859 	lwstats = get_lwlock_stats_entry(lock);
860 #endif
861 
862 	while (true)
863 	{
864 		/* always try once to acquire lock directly */
865 		old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
866 		if (!(old_state & LW_FLAG_LOCKED))
867 			break;				/* got lock */
868 
869 		/* and then spin without atomic operations until lock is released */
870 		{
871 			SpinDelayStatus delayStatus;
872 
873 			init_local_spin_delay(&delayStatus);
874 
875 			while (old_state & LW_FLAG_LOCKED)
876 			{
877 				perform_spin_delay(&delayStatus);
878 				old_state = pg_atomic_read_u32(&lock->state);
879 			}
880 #ifdef LWLOCK_STATS
881 			delays += delayStatus.delays;
882 #endif
883 			finish_spin_delay(&delayStatus);
884 		}
885 
886 		/*
887 		 * Retry. The lock might well have been re-acquired by the time we
888 		 * attempt to get it again.
889 		 */
890 	}
891 
892 #ifdef LWLOCK_STATS
893 	lwstats->spin_delay_count += delays;
894 #endif
895 }
896 
897 /*
898  * Unlock the LWLock's wait list.
899  *
900  * Note that it can be more efficient to manipulate flags and release the
901  * locks in a single atomic operation.
902  */
903 static void
904 LWLockWaitListUnlock(LWLock *lock)
905 {
906 	uint32 old_state PG_USED_FOR_ASSERTS_ONLY;
907 
908 	old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);
909 
910 	Assert(old_state & LW_FLAG_LOCKED);
911 }
912 
913 /*
914  * Wakeup all the lockers that currently have a chance to acquire the lock.
915  */
916 static void
917 LWLockWakeup(LWLock *lock)
918 {
919 	bool		new_release_ok;
920 	bool		wokeup_somebody = false;
921 	dlist_head	wakeup;
922 	dlist_mutable_iter iter;
923 
924 	dlist_init(&wakeup);
925 
926 	new_release_ok = true;
927 
928 	/* lock wait list while collecting backends to wake up */
929 	LWLockWaitListLock(lock);
930 
931 	dlist_foreach_modify(iter, &lock->waiters)
932 	{
933 		PGPROC	   *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
934 
935 		if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
936 			continue;
937 
938 		dlist_delete(&waiter->lwWaitLink);
939 		dlist_push_tail(&wakeup, &waiter->lwWaitLink);
940 
941 		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
942 		{
943 			/*
944 			 * Prevent additional wakeups until retryer gets to run. Backends
945 			 * that are just waiting for the lock to become free don't retry
946 			 * automatically.
947 			 */
948 			new_release_ok = false;
949 
950 			/*
951 			 * Don't wakeup (further) exclusive locks.
952 			 */
953 			wokeup_somebody = true;
954 		}
955 
956 		/*
957 		 * Once we've woken up an exclusive lock, there's no point in waking
958 		 * up anybody else.
959 		 */
960 		if (waiter->lwWaitMode == LW_EXCLUSIVE)
961 			break;
962 	}
963 
964 	Assert(dlist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);
965 
966 	/* unset required flags, and release lock, in one fell swoop */
967 	{
968 		uint32		old_state;
969 		uint32		desired_state;
970 
971 		old_state = pg_atomic_read_u32(&lock->state);
972 		while (true)
973 		{
974 			desired_state = old_state;
975 
976 			/* compute desired flags */
977 
978 			if (new_release_ok)
979 				desired_state |= LW_FLAG_RELEASE_OK;
980 			else
981 				desired_state &= ~LW_FLAG_RELEASE_OK;
982 
983 			if (dlist_is_empty(&wakeup))
984 				desired_state &= ~LW_FLAG_HAS_WAITERS;
985 
986 			desired_state &= ~LW_FLAG_LOCKED;	/* release lock */
987 
988 			if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
989 											   desired_state))
990 				break;
991 		}
992 	}
993 
994 	/* Awaken any waiters I removed from the queue. */
995 	dlist_foreach_modify(iter, &wakeup)
996 	{
997 		PGPROC	   *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
998 
999 		LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
1000 		dlist_delete(&waiter->lwWaitLink);
1001 
1002 		/*
1003 		 * Guarantee that lwWaiting being unset only becomes visible once the
1004 		 * unlink from the list has completed. Otherwise the target backend
1005 		 * could be woken up for some other reason and enqueue for a new lock -
1006 		 * if that happens before the list unlink happens, the list would end up
1007 		 * being corrupted.
1008 		 *
1009 		 * The barrier pairs with the LWLockWaitListLock() when enqueuing for
1010 		 * another lock.
1011 		 */
1012 		pg_write_barrier();
1013 		waiter->lwWaiting = false;
1014 		PGSemaphoreUnlock(&waiter->sem);
1015 	}
1016 }
1017 
1018 /*
1019  * Add ourselves to the end of the queue.
1020  *
1021  * NB: Mode can be LW_WAIT_UNTIL_FREE here!
1022  */
1023 static void
1024 LWLockQueueSelf(LWLock *lock, LWLockMode mode)
1025 {
1026 	/*
1027 	 * If we don't have a PGPROC structure, there's no way to wait. This
1028 	 * should never occur, since MyProc should only be null during shared
1029 	 * memory initialization.
1030 	 */
1031 	if (MyProc == NULL)
1032 		elog(PANIC, "cannot wait without a PGPROC structure");
1033 
1034 	if (MyProc->lwWaiting)
1035 		elog(PANIC, "queueing for lock while waiting on another one");
1036 
1037 	LWLockWaitListLock(lock);
1038 
1039 	/* setting the flag is protected by the spinlock */
1040 	pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);
1041 
1042 	MyProc->lwWaiting = true;
1043 	MyProc->lwWaitMode = mode;
1044 
1045 	/* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
1046 	if (mode == LW_WAIT_UNTIL_FREE)
1047 		dlist_push_head(&lock->waiters, &MyProc->lwWaitLink);
1048 	else
1049 		dlist_push_tail(&lock->waiters, &MyProc->lwWaitLink);
1050 
1051 	/* Can release the mutex now */
1052 	LWLockWaitListUnlock(lock);
1053 
1054 #ifdef LOCK_DEBUG
1055 	pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
1056 #endif
1057 
1058 }
1059 
1060 /*
1061  * Remove ourselves from the waitlist.
1062  *
1063  * This is used if we queued ourselves because we thought we needed to sleep
1064  * but, after further checking, we discovered that we don't actually need to
1065  * do so.
1066  */
1067 static void
1068 LWLockDequeueSelf(LWLock *lock)
1069 {
1070 	bool		found = false;
1071 	dlist_mutable_iter iter;
1072 
1073 #ifdef LWLOCK_STATS
1074 	lwlock_stats *lwstats;
1075 
1076 	lwstats = get_lwlock_stats_entry(lock);
1077 
1078 	lwstats->dequeue_self_count++;
1079 #endif
1080 
1081 	LWLockWaitListLock(lock);
1082 
1083 	/*
1084 	 * We can't just remove ourselves from the list; we need to iterate over
1085 	 * all entries, as somebody else could have dequeued us already.
1086 	 */
1087 	dlist_foreach_modify(iter, &lock->waiters)
1088 	{
1089 		PGPROC	   *proc = dlist_container(PGPROC, lwWaitLink, iter.cur);
1090 
1091 		if (proc == MyProc)
1092 		{
1093 			found = true;
1094 			dlist_delete(&proc->lwWaitLink);
1095 			break;
1096 		}
1097 	}
1098 
1099 	if (dlist_is_empty(&lock->waiters) &&
1100 		(pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
1101 	{
1102 		pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
1103 	}
1104 
1105 	/* XXX: combine with fetch_and above? */
1106 	LWLockWaitListUnlock(lock);
1107 
1108 	/* clear waiting state again, nice for debugging */
1109 	if (found)
1110 		MyProc->lwWaiting = false;
1111 	else
1112 	{
1113 		int			extraWaits = 0;
1114 
1115 		/*
1116 		 * Somebody else dequeued us and has or will wake us up. Deal with the
1117 		 * superfluous absorption of a wakeup.
1118 		 */
1119 
1120 		/*
1121 		 * Reset releaseOk if somebody woke us before we removed ourselves -
1122 		 * they'll have set it to false.
1123 		 */
1124 		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1125 
1126 		/*
1127 		 * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
1128 		 * get reset at some inconvenient point later. Most of the time this
1129 		 * will immediately return.
1130 		 */
1131 		for (;;)
1132 		{
1133 			PGSemaphoreLock(&MyProc->sem);
1134 			if (!MyProc->lwWaiting)
1135 				break;
1136 			extraWaits++;
1137 		}
1138 
1139 		/*
1140 		 * Fix the process wait semaphore's count for any absorbed wakeups.
1141 		 */
1142 		while (extraWaits-- > 0)
1143 			PGSemaphoreUnlock(&MyProc->sem);
1144 	}
1145 
1146 #ifdef LOCK_DEBUG
1147 	{
1148 		/* not waiting anymore */
1149 		uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1150 
1151 		Assert(nwaiters < MAX_BACKENDS);
1152 	}
1153 #endif
1154 }
1155 
1156 /*
1157  * LWLockAcquire - acquire a lightweight lock in the specified mode
1158  *
1159  * If the lock is not available, sleep until it is.  Returns true if the lock
1160  * was available immediately, false if we had to sleep.
1161  *
1162  * Side effect: cancel/die interrupts are held off until lock release.
1163  */
1164 bool
1165 LWLockAcquire(LWLock *lock, LWLockMode mode)
1166 {
1167 	PGPROC	   *proc = MyProc;
1168 	bool		result = true;
1169 	int			extraWaits = 0;
1170 #ifdef LWLOCK_STATS
1171 	lwlock_stats *lwstats;
1172 
1173 	lwstats = get_lwlock_stats_entry(lock);
1174 #endif
1175 
1176 	AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1177 
1178 	PRINT_LWDEBUG("LWLockAcquire", lock, mode);
1179 
1180 #ifdef LWLOCK_STATS
1181 	/* Count lock acquisition attempts */
1182 	if (mode == LW_EXCLUSIVE)
1183 		lwstats->ex_acquire_count++;
1184 	else
1185 		lwstats->sh_acquire_count++;
1186 #endif   /* LWLOCK_STATS */
1187 
1188 	/*
1189 	 * We can't wait if we haven't got a PGPROC.  This should only occur
1190 	 * during bootstrap or shared memory initialization.  Put an Assert here
1191 	 * to catch unsafe coding practices.
1192 	 */
1193 	Assert(!(proc == NULL && IsUnderPostmaster));
1194 
1195 	/* Ensure we will have room to remember the lock */
1196 	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1197 		elog(ERROR, "too many LWLocks taken");
1198 
1199 	/*
1200 	 * Lock out cancel/die interrupts until we exit the code section protected
1201 	 * by the LWLock.  This ensures that interrupts will not interfere with
1202 	 * manipulations of data structures in shared memory.
1203 	 */
1204 	HOLD_INTERRUPTS();
1205 
1206 	/*
1207 	 * Loop here to try to acquire lock after each time we are signaled by
1208 	 * LWLockRelease.
1209 	 *
1210 	 * NOTE: it might seem better to have LWLockRelease actually grant us the
1211 	 * lock, rather than retrying and possibly having to go back to sleep. But
1212 	 * in practice that is no good because it means a process swap for every
1213 	 * lock acquisition when two or more processes are contending for the same
1214 	 * lock.  Since LWLocks are normally used to protect not-very-long
1215 	 * sections of computation, a process needs to be able to acquire and
1216 	 * release the same lock many times during a single CPU time slice, even
1217 	 * in the presence of contention.  The efficiency of being able to do that
1218 	 * outweighs the inefficiency of sometimes wasting a process dispatch
1219 	 * cycle because the lock is not free when a released waiter finally gets
1220 	 * to run.  See pgsql-hackers archives for 29-Dec-01.
1221 	 */
1222 	for (;;)
1223 	{
1224 		bool		mustwait;
1225 
1226 		/*
1227 		 * Try to grab the lock the first time; we're not in the waitqueue
1228 		 * yet/anymore.
1229 		 */
1230 		mustwait = LWLockAttemptLock(lock, mode);
1231 
1232 		if (!mustwait)
1233 		{
1234 			LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
1235 			break;				/* got the lock */
1236 		}
1237 
1238 		/*
1239 		 * Ok, at this point we couldn't grab the lock on the first try. We
1240 		 * cannot simply queue ourselves to the end of the list and wait to be
1241 		 * woken up because by now the lock could long since have been released.
1242 		 * Instead add us to the queue and try to grab the lock again. If we
1243 		 * succeed we need to revert the queuing and be happy, otherwise we
1244 		 * recheck the lock. If we still couldn't grab it, we know that the
1245 		 * other locker will see our queue entries when releasing, since they
1246 		 * existed before we checked for the lock.
1247 		 */
1248 
1249 		/* add to the queue */
1250 		LWLockQueueSelf(lock, mode);
1251 
1252 		/* we're now guaranteed to be woken up if necessary */
1253 		mustwait = LWLockAttemptLock(lock, mode);
1254 
1255 		/* ok, grabbed the lock the second time round, need to undo queueing */
1256 		if (!mustwait)
1257 		{
1258 			LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
1259 
1260 			LWLockDequeueSelf(lock);
1261 			break;
1262 		}
1263 
1264 		/*
1265 		 * Wait until awakened.
1266 		 *
1267 		 * It is possible that we get awakened for a reason other than being
1268 		 * signaled by LWLockRelease.  If so, loop back and wait again.  Once
1269 		 * we've gotten the LWLock, re-increment the sema by the number of
1270 		 * additional signals received.
1271 		 */
1272 		LOG_LWDEBUG("LWLockAcquire", lock, "waiting");
1273 
1274 #ifdef LWLOCK_STATS
1275 		lwstats->block_count++;
1276 #endif
1277 
1278 		LWLockReportWaitStart(lock);
1279 		TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock), mode);
1280 
1281 		for (;;)
1282 		{
1283 			PGSemaphoreLock(&proc->sem);
1284 			if (!proc->lwWaiting)
1285 				break;
1286 			extraWaits++;
1287 		}
1288 
1289 		/* Retrying, allow LWLockRelease to release waiters again. */
1290 		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1291 
1292 #ifdef LOCK_DEBUG
1293 		{
1294 			/* not waiting anymore */
1295 			uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1296 
1297 			Assert(nwaiters < MAX_BACKENDS);
1298 		}
1299 #endif
1300 
1301 		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode);
1302 		LWLockReportWaitEnd();
1303 
1304 		LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
1305 
1306 		/* Now loop back and try to acquire lock again. */
1307 		result = false;
1308 	}
1309 
1310 	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), mode);
1311 
1312 	/* Add lock to list of locks held by this backend */
1313 	held_lwlocks[num_held_lwlocks].lock = lock;
1314 	held_lwlocks[num_held_lwlocks++].mode = mode;
1315 
1316 	/*
1317 	 * Fix the process wait semaphore's count for any absorbed wakeups.
1318 	 */
1319 	while (extraWaits-- > 0)
1320 		PGSemaphoreUnlock(&proc->sem);
1321 
1322 	return result;
1323 }
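
/*
 * For illustration, a typical caller brackets access to a shared structure
 * like this (my_shared and its lock member are hypothetical, not part of
 * this file):
 *
 *		LWLockAcquire(my_shared->lock, LW_EXCLUSIVE);
 *		... read or modify the shared structure ...
 *		LWLockRelease(my_shared->lock);
 *
 * Cancel/die interrupts stay held off from LWLockAcquire until the matching
 * LWLockRelease.
 */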
1324 
1325 /*
1326  * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
1327  *
1328  * If the lock is not available, return FALSE with no side-effects.
1329  *
1330  * If successful, cancel/die interrupts are held off until lock release.
1331  */
1332 bool
1333 LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
1334 {
1335 	bool		mustwait;
1336 
1337 	AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1338 
1339 	PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);
1340 
1341 	/* Ensure we will have room to remember the lock */
1342 	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1343 		elog(ERROR, "too many LWLocks taken");
1344 
1345 	/*
1346 	 * Lock out cancel/die interrupts until we exit the code section protected
1347 	 * by the LWLock.  This ensures that interrupts will not interfere with
1348 	 * manipulations of data structures in shared memory.
1349 	 */
1350 	HOLD_INTERRUPTS();
1351 
1352 	/* Check for the lock */
1353 	mustwait = LWLockAttemptLock(lock, mode);
1354 
1355 	if (mustwait)
1356 	{
1357 		/* Failed to get lock, so release interrupt holdoff */
1358 		RESUME_INTERRUPTS();
1359 
1360 		LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
1361 		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), T_ID(lock), mode);
1362 	}
1363 	else
1364 	{
1365 		/* Add lock to list of locks held by this backend */
1366 		held_lwlocks[num_held_lwlocks].lock = lock;
1367 		held_lwlocks[num_held_lwlocks++].mode = mode;
1368 		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), T_ID(lock), mode);
1369 	}
1370 	return !mustwait;
1371 }
1372 
1373 /*
1374  * LWLockAcquireOrWait - Acquire lock, or wait until it's free
1375  *
1376  * The semantics of this function are a bit funky.  If the lock is currently
1377  * free, it is acquired in the given mode, and the function returns true.  If
1378  * the lock isn't immediately free, the function waits until it is released
1379  * and returns false, but does not acquire the lock.
1380  *
1381  * This is currently used for WALWriteLock: when a backend flushes the WAL,
1382  * holding WALWriteLock, it can flush the commit records of many other
1383  * backends as a side-effect.  Those other backends need to wait until the
1384  * flush finishes, but don't need to acquire the lock anymore.  They can just
1385  * wake up, observe that their records have already been flushed, and return.
1386  */
1387 bool
1388 LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
1389 {
1390 	PGPROC	   *proc = MyProc;
1391 	bool		mustwait;
1392 	int			extraWaits = 0;
1393 #ifdef LWLOCK_STATS
1394 	lwlock_stats *lwstats;
1395 
1396 	lwstats = get_lwlock_stats_entry(lock);
1397 #endif
1398 
1399 	Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1400 
1401 	PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);
1402 
1403 	/* Ensure we will have room to remember the lock */
1404 	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1405 		elog(ERROR, "too many LWLocks taken");
1406 
1407 	/*
1408 	 * Lock out cancel/die interrupts until we exit the code section protected
1409 	 * by the LWLock.  This ensures that interrupts will not interfere with
1410 	 * manipulations of data structures in shared memory.
1411 	 */
1412 	HOLD_INTERRUPTS();
1413 
1414 	/*
1415 	 * NB: We're using nearly the same twice-in-a-row lock acquisition
1416 	 * protocol as LWLockAcquire(). Check its comments for details.
1417 	 */
1418 	mustwait = LWLockAttemptLock(lock, mode);
1419 
1420 	if (mustwait)
1421 	{
1422 		LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1423 
1424 		mustwait = LWLockAttemptLock(lock, mode);
1425 
1426 		if (mustwait)
1427 		{
1428 			/*
1429 			 * Wait until awakened.  Like in LWLockAcquire, be prepared for
1430 			 * bogus wakeups.
1431 			 */
1432 			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");
1433 
1434 #ifdef LWLOCK_STATS
1435 			lwstats->block_count++;
1436 #endif
1437 
1438 			LWLockReportWaitStart(lock);
1439 			TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock), mode);
1440 
1441 			for (;;)
1442 			{
1443 				PGSemaphoreLock(&proc->sem);
1444 				if (!proc->lwWaiting)
1445 					break;
1446 				extraWaits++;
1447 			}
1448 
1449 #ifdef LOCK_DEBUG
1450 			{
1451 				/* not waiting anymore */
1452 				uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1453 
1454 				Assert(nwaiters < MAX_BACKENDS);
1455 			}
1456 #endif
1457 			TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode);
1458 			LWLockReportWaitEnd();
1459 
1460 			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
1461 		}
1462 		else
1463 		{
1464 			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");
1465 
1466 			/*
1467 			 * Got lock in the second attempt, undo queueing. We need to treat
1468 			 * this as having successfully acquired the lock, otherwise we'd
1469 			 * not necessarily wake up people we've prevented from acquiring
1470 			 * the lock.
1471 			 */
1472 			LWLockDequeueSelf(lock);
1473 		}
1474 	}
1475 
1476 	/*
1477 	 * Fix the process wait semaphore's count for any absorbed wakeups.
1478 	 */
1479 	while (extraWaits-- > 0)
1480 		PGSemaphoreUnlock(&proc->sem);
1481 
1482 	if (mustwait)
1483 	{
1484 		/* Failed to get lock, so release interrupt holdoff */
1485 		RESUME_INTERRUPTS();
1486 		LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
1487 		TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), T_ID(lock),
1488 													 mode);
1489 	}
1490 	else
1491 	{
1492 		LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
1493 		/* Add lock to list of locks held by this backend */
1494 		held_lwlocks[num_held_lwlocks].lock = lock;
1495 		held_lwlocks[num_held_lwlocks++].mode = mode;
1496 		TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), T_ID(lock), mode);
1497 	}
1498 
1499 	return !mustwait;
1500 }
1501 
1502 /*
1503  * Does the lwlock in its current state need to wait for the variable value to
1504  * change?
1505  *
1506  * If we don't need to wait, and it's because the value of the variable has
1507  * changed, store the current value in newval.
1508  *
1509  * *result is set to true if the lock was free, and false otherwise.
1510  */
1511 static bool
1512 LWLockConflictsWithVar(LWLock *lock,
1513 					   uint64 *valptr, uint64 oldval, uint64 *newval,
1514 					   bool *result)
1515 {
1516 	bool		mustwait;
1517 	uint64		value;
1518 
1519 	/*
1520 	 * Test first to see if the slot is free right now.
1521 	 *
1522 	 * XXX: the caller uses a spinlock before this, so we don't need a memory
1523 	 * barrier here as far as the current usage is concerned.  But that might
1524 	 * not be safe in general.
1525 	 */
1526 	mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
1527 
1528 	if (!mustwait)
1529 	{
1530 		*result = true;
1531 		return false;
1532 	}
1533 
1534 	*result = false;
1535 
1536 	/*
1537 	 * Read value using the lwlock's wait list lock, as we can't generally
1538 	 * rely on atomic 64 bit reads/stores.  TODO: On platforms with a way to
1539 	 * do atomic 64 bit reads/writes the spinlock should be optimized away.
1540 	 */
1541 	LWLockWaitListLock(lock);
1542 	value = *valptr;
1543 	LWLockWaitListUnlock(lock);
1544 
1545 	if (value != oldval)
1546 	{
1547 		mustwait = false;
1548 		*newval = value;
1549 	}
1550 	else
1551 	{
1552 		mustwait = true;
1553 	}
1554 
1555 	return mustwait;
1556 }
1557 
1558 /*
1559  * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
1560  *
1561  * If the lock is held and *valptr equals oldval, waits until the lock is
1562  * either freed, or the lock holder updates *valptr by calling
1563  * LWLockUpdateVar.  If the lock is free on exit (immediately or after
1564  * waiting), returns true.  If the lock is still held, but *valptr no longer
1565  * matches oldval, returns false and sets *newval to the current value in
1566  * *valptr.
1567  *
1568  * Note: this function ignores shared lock holders; if the lock is held
1569  * in shared mode, returns 'true'.
1570  */
1571 bool
1572 LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
1573 {
1574 	PGPROC	   *proc = MyProc;
1575 	int			extraWaits = 0;
1576 	bool		result = false;
1577 #ifdef LWLOCK_STATS
1578 	lwlock_stats *lwstats;
1579 
1580 	lwstats = get_lwlock_stats_entry(lock);
1581 #endif
1582 
1583 	PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
1584 
1585 	/*
1586 	 * Lock out cancel/die interrupts while we sleep on the lock.  There is no
1587 	 * cleanup mechanism to remove us from the wait queue if we got
1588 	 * interrupted.
1589 	 */
1590 	HOLD_INTERRUPTS();
1591 
1592 	/*
1593 	 * Loop here to check the lock's status after each time we are signaled.
1594 	 */
1595 	for (;;)
1596 	{
1597 		bool		mustwait;
1598 
1599 		mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1600 										  &result);
1601 
1602 		if (!mustwait)
1603 			break;				/* the lock was free or value didn't match */
1604 
1605 		/*
1606 		 * Add myself to the wait queue. Note that this is racy: somebody else
1607 		 * could wake up before we're finished queuing. NB: We're using nearly
1608 		 * the same twice-in-a-row lock acquisition protocol as
1609 		 * LWLockAcquire(). Check its comments for details. The only
1610 		 * difference is that we also have to check the variable's values when
1611 		 * difference is that we also have to check the variable's value when
1612 		 */
1613 		LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1614 
1615 		/*
1616 		 * Set RELEASE_OK flag, to make sure we get woken up as soon as the
1617 		 * lock is released.
1618 		 */
1619 		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1620 
1621 		/*
1622 		 * We're now guaranteed to be woken up if necessary. Recheck the lock
1623 		 * and variables state.
1624 		 */
1625 		mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1626 										  &result);
1627 
1628 		/* Ok, no conflict after we queued ourselves. Undo queueing. */
1629 		if (!mustwait)
1630 		{
1631 			LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
1632 
1633 			LWLockDequeueSelf(lock);
1634 			break;
1635 		}
1636 
1637 		/*
1638 		 * Wait until awakened.
1639 		 *
1640 		 * It is possible that we get awakened for a reason other than being
1641 		 * signaled by LWLockRelease.  If so, loop back and wait again.  Once
1642 		 * we've gotten the LWLock, re-increment the sema by the number of
1643 		 * additional signals received.
1644 		 */
1645 		LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");
1646 
1647 #ifdef LWLOCK_STATS
1648 		lwstats->block_count++;
1649 #endif
1650 
1651 		LWLockReportWaitStart(lock);
1652 		TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock),
1653 										   LW_EXCLUSIVE);
1654 
1655 		for (;;)
1656 		{
1657 			PGSemaphoreLock(&proc->sem);
1658 			if (!proc->lwWaiting)
1659 				break;
1660 			extraWaits++;
1661 		}
1662 
1663 #ifdef LOCK_DEBUG
1664 		{
1665 			/* not waiting anymore */
1666 			uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1667 
1668 			Assert(nwaiters < MAX_BACKENDS);
1669 		}
1670 #endif
1671 
1672 		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock),
1673 										  LW_EXCLUSIVE);
1674 		LWLockReportWaitEnd();
1675 
1676 		LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");
1677 
1678 		/* Now loop back and check the status of the lock again. */
1679 	}
1680 
1681 	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), LW_EXCLUSIVE);
1682 
1683 	/*
1684 	 * Fix the process wait semaphore's count for any absorbed wakeups.
1685 	 */
1686 	while (extraWaits-- > 0)
1687 		PGSemaphoreUnlock(&proc->sem);
1688 
1689 	/*
1690 	 * Now okay to allow cancel/die interrupts.
1691 	 */
1692 	RESUME_INTERRUPTS();
1693 
1694 	return result;
1695 }
1696 
1697 
1698 /*
1699  * LWLockUpdateVar - Update a variable and wake up waiters atomically
1700  *
1701  * Sets *valptr to 'val', and wakes up all processes waiting for us with
1702  * LWLockWaitForVar().  Setting the value and waking up the processes happen
1703  * atomically so that any process calling LWLockWaitForVar() on the same lock
1704  * is guaranteed to see the new value, and act accordingly.
1705  *
1706  * The caller must be holding the lock in exclusive mode.
1707  */
1708 void
1709 LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
1710 {
1711 	dlist_head	wakeup;
1712 	dlist_mutable_iter iter;
1713 
1714 	PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
1715 
1716 	dlist_init(&wakeup);
1717 
1718 	LWLockWaitListLock(lock);
1719 
1720 	Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
1721 
1722 	/* Update the lock's value */
1723 	*valptr = val;
1724 
1725 	/*
1726 	 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
1727 	 * up. They are always in the front of the queue.
1728 	 */
1729 	dlist_foreach_modify(iter, &lock->waiters)
1730 	{
1731 		PGPROC	   *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
1732 
1733 		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
1734 			break;
1735 
1736 		dlist_delete(&waiter->lwWaitLink);
1737 		dlist_push_tail(&wakeup, &waiter->lwWaitLink);
1738 	}
1739 
1740 	/* We are done updating shared state of the lock itself. */
1741 	LWLockWaitListUnlock(lock);
1742 
1743 	/*
1744 	 * Awaken any waiters I removed from the queue.
1745 	 */
1746 	dlist_foreach_modify(iter, &wakeup)
1747 	{
1748 		PGPROC	   *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
1749 
1750 		dlist_delete(&waiter->lwWaitLink);
1751 		/* check comment in LWLockWakeup() about this barrier */
1752 		pg_write_barrier();
1753 		waiter->lwWaiting = false;
1754 		PGSemaphoreUnlock(&waiter->sem);
1755 	}
1756 }
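
/*
 * Sketch of the variable-wait protocol described at the top of this file;
 * 'var', 'oldval' and 'newval' are hypothetical, not part of this file:
 *
 *		Lock holder:
 *			LWLockAcquire(lock, LW_EXCLUSIVE);
 *			LWLockUpdateVar(lock, &var, 42);	(wakes LW_WAIT_UNTIL_FREE waiters)
 *			...
 *			LWLockReleaseClearVar(lock, &var, 0);
 *
 *		Waiter:
 *			if (!LWLockWaitForVar(lock, &var, oldval, &newval))
 *				... lock still held, but var changed; newval has the new value ...
 *			else
 *				... the lock was observed free ...
 *
 * See LWLockWaitForVar above for the exact return-value semantics.
 */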
1757 
1758 
1759 /*
1760  * LWLockRelease - release a previously acquired lock
1761  */
1762 void
1763 LWLockRelease(LWLock *lock)
1764 {
1765 	LWLockMode	mode;
1766 	uint32		oldstate;
1767 	bool		check_waiters;
1768 	int			i;
1769 
1770 	/*
1771 	 * Remove lock from list of locks held.  Usually, but not always, it will
1772 	 * be the latest-acquired lock; so search array backwards.
1773 	 */
1774 	for (i = num_held_lwlocks; --i >= 0;)
1775 		if (lock == held_lwlocks[i].lock)
1776 			break;
1777 
1778 	if (i < 0)
1779 		elog(ERROR, "lock %s %d is not held", T_NAME(lock), T_ID(lock));
1780 
1781 	mode = held_lwlocks[i].mode;
1782 
1783 	num_held_lwlocks--;
1784 	for (; i < num_held_lwlocks; i++)
1785 		held_lwlocks[i] = held_lwlocks[i + 1];
1786 
1787 	PRINT_LWDEBUG("LWLockRelease", lock, mode);
1788 
1789 	/*
1790 	 * Release my hold on lock, after that it can immediately be acquired by
1791 	 * others, even if we still have to wakeup other waiters.
1792 	 */
1793 	if (mode == LW_EXCLUSIVE)
1794 		oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
1795 	else
1796 		oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);
1797 
1798 	/* nobody else can have that kind of lock */
1799 	Assert(!(oldstate & LW_VAL_EXCLUSIVE));
1800 
1801 
1802 	/*
1803 	 * We're still waiting for backends to get scheduled, don't wake them up
1804 	 * again.
1805 	 */
1806 	if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
1807 		(LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
1808 		(oldstate & LW_LOCK_MASK) == 0)
1809 		check_waiters = true;
1810 	else
1811 		check_waiters = false;
1812 
1813 	/*
1814 	 * As waking up waiters requires the spinlock to be acquired, only do so
1815 	 * if necessary.
1816 	 */
1817 	if (check_waiters)
1818 	{
1819 		/* XXX: remove before commit? */
1820 		LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
1821 		LWLockWakeup(lock);
1822 	}
1823 
1824 	TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock), T_ID(lock));
1825 
1826 	/*
1827 	 * Now okay to allow cancel/die interrupts.
1828 	 */
1829 	RESUME_INTERRUPTS();
1830 }
1831 
1832 /*
1833  * LWLockReleaseClearVar - release a previously acquired lock, reset variable
1834  */
1835 void
1836 LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
1837 {
1838 	LWLockWaitListLock(lock);
1839 
1840 	/*
1841 	 * Set the variable's value before releasing the lock, which prevents a
1842 	 * race condition wherein a new locker acquires the lock, but hasn't yet
1843 	 * set the variable's value.
1844 	 */
1845 	*valptr = val;
1846 	LWLockWaitListUnlock(lock);
1847 
1848 	LWLockRelease(lock);
1849 }
1850 
1851 
1852 /*
1853  * LWLockReleaseAll - release all currently-held locks
1854  *
1855  * Used to clean up after ereport(ERROR). An important difference between this
1856  * function and retail LWLockRelease calls is that InterruptHoldoffCount is
1857  * unchanged by this operation.  This is necessary since InterruptHoldoffCount
1858  * has been set to an appropriate level earlier in error recovery. We could
1859  * decrement it below zero if we allow it to drop for each released lock!
1860  */
1861 void
1862 LWLockReleaseAll(void)
1863 {
1864 	while (num_held_lwlocks > 0)
1865 	{
1866 		HOLD_INTERRUPTS();		/* match the upcoming RESUME_INTERRUPTS */
1867 
1868 		LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
1869 	}
1870 }
1871 
1872 
1873 /*
1874  * LWLockHeldByMe - test whether my process currently holds a lock
1875  *
1876  * This is meant as debug support only.  We currently do not distinguish
1877  * whether the lock is held shared or exclusive.
1878  */
1879 bool
1880 LWLockHeldByMe(LWLock *l)
1881 {
1882 	int			i;
1883 
1884 	for (i = 0; i < num_held_lwlocks; i++)
1885 	{
1886 		if (held_lwlocks[i].lock == l)
1887 			return true;
1888 	}
1889 	return false;
1890 }
1891