/*-------------------------------------------------------------------------
 *
 * lwlock.c
 *	  Lightweight lock manager
 *
 * Lightweight locks are intended primarily to provide mutual exclusion of
 * access to shared-memory data structures.  Therefore, they offer both
 * exclusive and shared lock modes (to support read/write and read-only
 * access to a shared object).  There are few other frills.  User-level
 * locking should be done with the full lock manager --- which depends on
 * LWLocks to protect its shared state.
 *
 * In addition to exclusive and shared modes, lightweight locks can be used to
 * wait until a variable changes value.  The variable is not reset when the
 * lock is acquired with LWLockAcquire, i.e. it keeps the value it had when
 * the lock was last released, and it can be updated without releasing the
 * lock by calling LWLockUpdateVar.  LWLockWaitForVar waits for the variable
 * to be updated, or until the lock is free.  When releasing the lock with
 * LWLockReleaseClearVar() the value can be set to an appropriate value for a
 * free lock.  The meaning of the variable is up to the caller; the
 * lightweight lock code just assigns and compares it.
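 *
 * As an illustrative sketch only (the lock, the variable and the values here
 * are hypothetical; in PostgreSQL this mechanism is used for the WAL
 * insertion locks), the protocol looks roughly like this:
 *
 *	-- updater side, holding the lock exclusively:
 *	LWLockAcquire(lock, LW_EXCLUSIVE);
 *	LWLockUpdateVar(lock, &progress, newval);	-- wakes LWLockWaitForVar waiters
 *	LWLockReleaseClearVar(lock, &progress, 0);	-- release, resetting the variable
 *
 *	-- waiter side:
 *	if (!LWLockWaitForVar(lock, &progress, oldval, &newval))
 *		-- lock still held, but the variable no longer equals oldval;
 *		-- newval now contains its current value
 *	else
 *		-- the lock was, or has become, free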
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/storage/lmgr/lwlock.c
 *
 * NOTES:
 *
 * This used to be a pretty straightforward reader-writer lock
 * implementation, in which the internal state was protected by a
 * spinlock.  Unfortunately the overhead of taking the spinlock proved to be
 * too high for workloads/locks that were taken in shared mode very
 * frequently.  Often we were spinning in the (inherently exclusive) spinlock
 * while trying to acquire a shared lock that was actually free.
 *
 * Thus a new implementation was devised that provides wait-free shared lock
 * acquisition for locks that aren't exclusively locked.
 *
 * The basic idea is to have a single atomic variable 'lockcount' instead of
 * the formerly separate shared and exclusive counters, and to use atomic
 * operations to acquire the lock.  That's fairly easy to do for plain
 * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
 * in the OS.
 *
 * For lock acquisition we use an atomic compare-and-exchange on the lockcount
 * variable.  For an exclusive lock we swap in a sentinel value
 * (LW_VAL_EXCLUSIVE); for shared locks we count the number of holders.
 *
 * To release the lock we use an atomic decrement.  If the new value is zero
 * (which we learn atomically), we know we can/have to wake up any waiters.
 *
 * Obviously it is important that the sentinel value for exclusive locks
 * doesn't conflict with the maximum number of possible shared lockers -
 * luckily MAX_BACKENDS makes that easily possible.
 *
 *
 * The attentive reader might have noticed that naively doing the above has a
 * glaring race condition: we try to lock using the atomic operations and
 * notice that we have to wait.  Unfortunately, by the time we have finished
 * queuing, the former locker may very well have already finished its work.
 * That's problematic because we're now stuck waiting inside the OS.
 *
 * To mitigate those races we use a multi-phase attempt at locking:
 *	 Phase 1: Try to do it atomically; if we succeed, nice
 *	 Phase 2: Add ourselves to the wait queue of the lock
 *	 Phase 3: Try to grab the lock again; if we succeed, remove ourselves from
 *			  the queue
 *	 Phase 4: Sleep till wake-up, goto Phase 1
 *
 * This protects us against the problem above: nobody can release the lock
 * too quickly before we're queued, because after Phase 2 we're already on
 * the queue, so the releasing locker is guaranteed to see us and wake us up.
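 *
 * As a sketch only (the real loop is in LWLockAcquire below and differs in
 * detail, e.g. in how spurious wakeups are absorbed):
 *
 *	for (;;)
 *	{
 *		if (!LWLockAttemptLock(lock, mode))
 *			break;				-- Phase 1: got the lock
 *		LWLockQueueSelf(lock, mode);		-- Phase 2
 *		if (!LWLockAttemptLock(lock, mode))
 *		{
 *			LWLockDequeueSelf(lock);	-- Phase 3: got it after all
 *			break;
 *		}
 *		PGSemaphoreLock(MyProc->sem);		-- Phase 4: sleep until woken
 *	}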
 * -------------------------------------------------------------------------
 */
77 #include "postgres.h"
78 
79 #include "miscadmin.h"
80 #include "pg_trace.h"
81 #include "pgstat.h"
82 #include "postmaster/postmaster.h"
83 #include "replication/slot.h"
84 #include "storage/ipc.h"
85 #include "storage/predicate.h"
86 #include "storage/proc.h"
87 #include "storage/proclist.h"
88 #include "storage/spin.h"
89 #include "utils/memutils.h"
90 
91 #ifdef LWLOCK_STATS
92 #include "utils/hsearch.h"
93 #endif
94 
95 
96 /* We use the ShmemLock spinlock to protect LWLockCounter */
97 extern slock_t *ShmemLock;
98 
99 #define LW_FLAG_HAS_WAITERS			((uint32) 1 << 30)
100 #define LW_FLAG_RELEASE_OK			((uint32) 1 << 29)
101 #define LW_FLAG_LOCKED				((uint32) 1 << 28)
102 
103 #define LW_VAL_EXCLUSIVE			((uint32) 1 << 24)
104 #define LW_VAL_SHARED				1
105 
106 #define LW_LOCK_MASK				((uint32) ((1 << 25)-1))
107 /* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
108 #define LW_SHARED_MASK				((uint32) ((1 << 24)-1))
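
/*
 * Putting the above together, the 32-bit state word decomposes as follows
 * (a descriptive sketch of the definitions above, not an extra invariant):
 *
 *	bit 30		LW_FLAG_HAS_WAITERS		somebody is on the wait queue
 *	bit 29		LW_FLAG_RELEASE_OK		a release may wake up waiters
 *	bit 28		LW_FLAG_LOCKED			protects the wait queue itself
 *	bit 24		LW_VAL_EXCLUSIVE		set while held exclusively
 *	bits 0-23	number of shared holders, each adding LW_VAL_SHARED (1)
 *	bits 25-27, 31	currently unused
 */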
109 
110 /*
111  * There are three sorts of LWLock "tranches":
112  *
113  * 1. The individually-named locks defined in lwlocknames.h each have their
114  * own tranche.  The names of these tranches appear in IndividualLWLockNames[]
115  * in lwlocknames.c.
116  *
117  * 2. There are some predefined tranches for built-in groups of locks.
118  * These are listed in enum BuiltinTrancheIds in lwlock.h, and their names
119  * appear in BuiltinTrancheNames[] below.
120  *
121  * 3. Extensions can create new tranches, via either RequestNamedLWLockTranche
122  * or LWLockRegisterTranche.  The names of these that are known in the current
123  * process appear in LWLockTrancheNames[].
124  *
125  * All these names are user-visible as wait event names, so choose with care
126  * ... and do not forget to update the documentation's list of wait events.
127  */
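
/*
 * Purely for illustration, a hypothetical extension (the tranche name
 * "my_ext" and the variable my_ext_locks are made up) might use the named
 * tranche machinery like this:
 *
 *	-- in _PG_init(), running in the postmaster via shared_preload_libraries:
 *	RequestNamedLWLockTranche("my_ext", 2);
 *
 *	-- later, in any backend, once shared memory is set up:
 *	static LWLockPadded *my_ext_locks;
 *	my_ext_locks = GetNamedLWLockTranche("my_ext");
 *	LWLockAcquire(&my_ext_locks[0].lock, LW_EXCLUSIVE);
 *	... touch the extension's shared state ...
 *	LWLockRelease(&my_ext_locks[0].lock);
 *
 * Extensions that cannot be loaded via shared_preload_libraries can instead
 * allocate a tranche ID at runtime with LWLockNewTrancheId(), name it with
 * LWLockRegisterTranche(), and set up their own LWLocks with
 * LWLockInitialize().
 */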
128 extern const char *const IndividualLWLockNames[];	/* in lwlocknames.c */
129 
130 static const char *const BuiltinTrancheNames[] = {
131 	/* LWTRANCHE_XACT_BUFFER: */
132 	"XactBuffer",
133 	/* LWTRANCHE_COMMITTS_BUFFER: */
134 	"CommitTSBuffer",
135 	/* LWTRANCHE_SUBTRANS_BUFFER: */
136 	"SubtransBuffer",
137 	/* LWTRANCHE_MULTIXACTOFFSET_BUFFER: */
138 	"MultiXactOffsetBuffer",
139 	/* LWTRANCHE_MULTIXACTMEMBER_BUFFER: */
140 	"MultiXactMemberBuffer",
141 	/* LWTRANCHE_NOTIFY_BUFFER: */
142 	"NotifyBuffer",
143 	/* LWTRANCHE_SERIAL_BUFFER: */
144 	"SerialBuffer",
145 	/* LWTRANCHE_WAL_INSERT: */
146 	"WALInsert",
147 	/* LWTRANCHE_BUFFER_CONTENT: */
148 	"BufferContent",
149 	/* LWTRANCHE_REPLICATION_ORIGIN_STATE: */
150 	"ReplicationOriginState",
151 	/* LWTRANCHE_REPLICATION_SLOT_IO: */
152 	"ReplicationSlotIO",
153 	/* LWTRANCHE_LOCK_FASTPATH: */
154 	"LockFastPath",
155 	/* LWTRANCHE_BUFFER_MAPPING: */
156 	"BufferMapping",
157 	/* LWTRANCHE_LOCK_MANAGER: */
158 	"LockManager",
159 	/* LWTRANCHE_PREDICATE_LOCK_MANAGER: */
160 	"PredicateLockManager",
161 	/* LWTRANCHE_PARALLEL_HASH_JOIN: */
162 	"ParallelHashJoin",
163 	/* LWTRANCHE_PARALLEL_QUERY_DSA: */
164 	"ParallelQueryDSA",
165 	/* LWTRANCHE_PER_SESSION_DSA: */
166 	"PerSessionDSA",
167 	/* LWTRANCHE_PER_SESSION_RECORD_TYPE: */
168 	"PerSessionRecordType",
169 	/* LWTRANCHE_PER_SESSION_RECORD_TYPMOD: */
170 	"PerSessionRecordTypmod",
171 	/* LWTRANCHE_SHARED_TUPLESTORE: */
172 	"SharedTupleStore",
173 	/* LWTRANCHE_SHARED_TIDBITMAP: */
174 	"SharedTidBitmap",
175 	/* LWTRANCHE_PARALLEL_APPEND: */
176 	"ParallelAppend",
177 	/* LWTRANCHE_PER_XACT_PREDICATE_LIST: */
178 	"PerXactPredicateList"
179 };
180 
181 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
182 				 LWTRANCHE_FIRST_USER_DEFINED - NUM_INDIVIDUAL_LWLOCKS,
183 				 "missing entries in BuiltinTrancheNames[]");
184 
185 /*
186  * This is indexed by tranche ID minus LWTRANCHE_FIRST_USER_DEFINED, and
187  * stores the names of all dynamically-created tranches known to the current
188  * process.  Any unused entries in the array will contain NULL.
189  */
190 static const char **LWLockTrancheNames = NULL;
191 static int	LWLockTrancheNamesAllocated = 0;
192 
193 /*
194  * This points to the main array of LWLocks in shared memory.  Backends inherit
195  * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
196  * where we have special measures to pass it down).
197  */
198 LWLockPadded *MainLWLockArray = NULL;
199 
200 /*
201  * We use this structure to keep track of locked LWLocks for release
202  * during error recovery.  Normally, only a few will be held at once, but
203  * occasionally the number can be much higher; for example, the pg_buffercache
204  * extension locks all buffer partitions simultaneously.
205  */
206 #define MAX_SIMUL_LWLOCKS	200
207 
208 /* struct representing the LWLocks we're holding */
209 typedef struct LWLockHandle
210 {
211 	LWLock	   *lock;
212 	LWLockMode	mode;
213 } LWLockHandle;
214 
215 static int	num_held_lwlocks = 0;
216 static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
217 
218 /* struct representing the LWLock tranche request for named tranche */
219 typedef struct NamedLWLockTrancheRequest
220 {
221 	char		tranche_name[NAMEDATALEN];
222 	int			num_lwlocks;
223 } NamedLWLockTrancheRequest;
224 
225 static NamedLWLockTrancheRequest *NamedLWLockTrancheRequestArray = NULL;
226 static int	NamedLWLockTrancheRequestsAllocated = 0;
227 
228 /*
229  * NamedLWLockTrancheRequests is both the valid length of the request array,
230  * and the length of the shared-memory NamedLWLockTrancheArray later on.
231  * This variable and NamedLWLockTrancheArray are non-static so that
232  * postmaster.c can copy them to child processes in EXEC_BACKEND builds.
233  */
234 int			NamedLWLockTrancheRequests = 0;
235 
236 /* points to data in shared memory: */
237 NamedLWLockTranche *NamedLWLockTrancheArray = NULL;
238 
239 static bool lock_named_request_allowed = true;
240 
241 static void InitializeLWLocks(void);
242 static inline void LWLockReportWaitStart(LWLock *lock);
243 static inline void LWLockReportWaitEnd(void);
244 static const char *GetLWTrancheName(uint16 trancheId);
245 
246 #define T_NAME(lock) \
247 	GetLWTrancheName((lock)->tranche)
248 
249 #ifdef LWLOCK_STATS
250 typedef struct lwlock_stats_key
251 {
252 	int			tranche;
253 	void	   *instance;
254 }			lwlock_stats_key;
255 
256 typedef struct lwlock_stats
257 {
258 	lwlock_stats_key key;
259 	int			sh_acquire_count;
260 	int			ex_acquire_count;
261 	int			block_count;
262 	int			dequeue_self_count;
263 	int			spin_delay_count;
264 }			lwlock_stats;
265 
266 static HTAB *lwlock_stats_htab;
267 static lwlock_stats lwlock_stats_dummy;
268 #endif
269 
270 #ifdef LOCK_DEBUG
271 bool		Trace_lwlocks = false;
272 
273 inline static void
PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
275 {
276 	/* hide statement & context here, otherwise the log is just too verbose */
277 	if (Trace_lwlocks)
278 	{
279 		uint32		state = pg_atomic_read_u32(&lock->state);
280 
281 		ereport(LOG,
282 				(errhidestmt(true),
283 				 errhidecontext(true),
284 				 errmsg_internal("%d: %s(%s %p): excl %u shared %u haswaiters %u waiters %u rOK %d",
285 								 MyProcPid,
286 								 where, T_NAME(lock), lock,
287 								 (state & LW_VAL_EXCLUSIVE) != 0,
288 								 state & LW_SHARED_MASK,
289 								 (state & LW_FLAG_HAS_WAITERS) != 0,
290 								 pg_atomic_read_u32(&lock->nwaiters),
291 								 (state & LW_FLAG_RELEASE_OK) != 0)));
292 	}
293 }
294 
295 inline static void
LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
297 {
298 	/* hide statement & context here, otherwise the log is just too verbose */
299 	if (Trace_lwlocks)
300 	{
301 		ereport(LOG,
302 				(errhidestmt(true),
303 				 errhidecontext(true),
304 				 errmsg_internal("%s(%s %p): %s", where,
305 								 T_NAME(lock), lock, msg)));
306 	}
307 }
308 
309 #else							/* not LOCK_DEBUG */
310 #define PRINT_LWDEBUG(a,b,c) ((void)0)
311 #define LOG_LWDEBUG(a,b,c) ((void)0)
312 #endif							/* LOCK_DEBUG */
313 
314 #ifdef LWLOCK_STATS
315 
316 static void init_lwlock_stats(void);
317 static void print_lwlock_stats(int code, Datum arg);
318 static lwlock_stats * get_lwlock_stats_entry(LWLock *lock);
319 
320 static void
init_lwlock_stats(void)
322 {
323 	HASHCTL		ctl;
324 	static MemoryContext lwlock_stats_cxt = NULL;
325 	static bool exit_registered = false;
326 
327 	if (lwlock_stats_cxt != NULL)
328 		MemoryContextDelete(lwlock_stats_cxt);
329 
330 	/*
331 	 * The LWLock stats will be updated within a critical section, which
332 	 * requires allocating new hash entries. Allocations within a critical
333 	 * section are normally not allowed because running out of memory would
334 	 * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
335 	 * turned on in production, so that's an acceptable risk. The hash entries
336 	 * are small, so the risk of running out of memory is minimal in practice.
337 	 */
338 	lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
339 											 "LWLock stats",
340 											 ALLOCSET_DEFAULT_SIZES);
341 	MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);
342 
343 	ctl.keysize = sizeof(lwlock_stats_key);
344 	ctl.entrysize = sizeof(lwlock_stats);
345 	ctl.hcxt = lwlock_stats_cxt;
346 	lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
347 									HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
348 	if (!exit_registered)
349 	{
350 		on_shmem_exit(print_lwlock_stats, 0);
351 		exit_registered = true;
352 	}
353 }
354 
355 static void
print_lwlock_stats(int code, Datum arg)
357 {
358 	HASH_SEQ_STATUS scan;
359 	lwlock_stats *lwstats;
360 
361 	hash_seq_init(&scan, lwlock_stats_htab);
362 
363 	/* Grab an LWLock to keep different backends from mixing reports */
364 	LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);
365 
366 	while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
367 	{
368 		fprintf(stderr,
369 				"PID %d lwlock %s %p: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
370 				MyProcPid, GetLWTrancheName(lwstats->key.tranche),
371 				lwstats->key.instance, lwstats->sh_acquire_count,
372 				lwstats->ex_acquire_count, lwstats->block_count,
373 				lwstats->spin_delay_count, lwstats->dequeue_self_count);
374 	}
375 
376 	LWLockRelease(&MainLWLockArray[0].lock);
377 }
378 
379 static lwlock_stats *
get_lwlock_stats_entry(LWLock *lock)
381 {
382 	lwlock_stats_key key;
383 	lwlock_stats *lwstats;
384 	bool		found;
385 
386 	/*
387 	 * During shared memory initialization, the hash table doesn't exist yet.
388 	 * Stats of that phase aren't very interesting, so just collect operations
389 	 * on all locks in a single dummy entry.
390 	 */
391 	if (lwlock_stats_htab == NULL)
392 		return &lwlock_stats_dummy;
393 
394 	/* Fetch or create the entry. */
395 	MemSet(&key, 0, sizeof(key));
396 	key.tranche = lock->tranche;
397 	key.instance = lock;
398 	lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
399 	if (!found)
400 	{
401 		lwstats->sh_acquire_count = 0;
402 		lwstats->ex_acquire_count = 0;
403 		lwstats->block_count = 0;
404 		lwstats->dequeue_self_count = 0;
405 		lwstats->spin_delay_count = 0;
406 	}
407 	return lwstats;
408 }
409 #endif							/* LWLOCK_STATS */
410 
411 
412 /*
413  * Compute number of LWLocks required by named tranches.  These will be
414  * allocated in the main array.
415  */
416 static int
NumLWLocksForNamedTranches(void)
418 {
419 	int			numLocks = 0;
420 	int			i;
421 
422 	for (i = 0; i < NamedLWLockTrancheRequests; i++)
423 		numLocks += NamedLWLockTrancheRequestArray[i].num_lwlocks;
424 
425 	return numLocks;
426 }
427 
428 /*
429  * Compute shmem space needed for LWLocks and named tranches.
430  */
431 Size
LWLockShmemSize(void)
433 {
434 	Size		size;
435 	int			i;
436 	int			numLocks = NUM_FIXED_LWLOCKS;
437 
438 	/* Calculate total number of locks needed in the main array. */
439 	numLocks += NumLWLocksForNamedTranches();
440 
441 	/* Space for the LWLock array. */
442 	size = mul_size(numLocks, sizeof(LWLockPadded));
443 
444 	/* Space for dynamic allocation counter, plus room for alignment. */
445 	size = add_size(size, sizeof(int) + LWLOCK_PADDED_SIZE);
446 
447 	/* space for named tranches. */
448 	size = add_size(size, mul_size(NamedLWLockTrancheRequests, sizeof(NamedLWLockTranche)));
449 
450 	/* space for name of each tranche. */
451 	for (i = 0; i < NamedLWLockTrancheRequests; i++)
452 		size = add_size(size, strlen(NamedLWLockTrancheRequestArray[i].tranche_name) + 1);
453 
454 	/* Disallow adding any more named tranches. */
455 	lock_named_request_allowed = false;
456 
457 	return size;
458 }
459 
460 /*
461  * Allocate shmem space for the main LWLock array and all tranches and
462  * initialize it.  We also register extension LWLock tranches here.
463  */
464 void
CreateLWLocks(void)
466 {
467 	StaticAssertStmt(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
468 					 "MAX_BACKENDS too big for lwlock.c");
469 
470 	StaticAssertStmt(sizeof(LWLock) <= LWLOCK_PADDED_SIZE,
471 					 "Miscalculated LWLock padding");
472 
473 	if (!IsUnderPostmaster)
474 	{
475 		Size		spaceLocks = LWLockShmemSize();
476 		int		   *LWLockCounter;
477 		char	   *ptr;
478 
479 		/* Allocate space */
480 		ptr = (char *) ShmemAlloc(spaceLocks);
481 
482 		/* Leave room for dynamic allocation of tranches */
483 		ptr += sizeof(int);
484 
485 		/* Ensure desired alignment of LWLock array */
486 		ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
487 
488 		MainLWLockArray = (LWLockPadded *) ptr;
489 
490 		/*
491 		 * Initialize the dynamic-allocation counter for tranches, which is
492 		 * stored just before the first LWLock.
493 		 */
494 		LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
495 		*LWLockCounter = LWTRANCHE_FIRST_USER_DEFINED;
496 
497 		/* Initialize all LWLocks */
498 		InitializeLWLocks();
499 	}
500 
501 	/* Register named extension LWLock tranches in the current process. */
502 	for (int i = 0; i < NamedLWLockTrancheRequests; i++)
503 		LWLockRegisterTranche(NamedLWLockTrancheArray[i].trancheId,
504 							  NamedLWLockTrancheArray[i].trancheName);
505 }
506 
507 /*
508  * Initialize LWLocks that are fixed and those belonging to named tranches.
509  */
510 static void
InitializeLWLocks(void)
512 {
513 	int			numNamedLocks = NumLWLocksForNamedTranches();
514 	int			id;
515 	int			i;
516 	int			j;
517 	LWLockPadded *lock;
518 
519 	/* Initialize all individual LWLocks in main array */
520 	for (id = 0, lock = MainLWLockArray; id < NUM_INDIVIDUAL_LWLOCKS; id++, lock++)
521 		LWLockInitialize(&lock->lock, id);
522 
523 	/* Initialize buffer mapping LWLocks in main array */
524 	lock = MainLWLockArray + BUFFER_MAPPING_LWLOCK_OFFSET;
525 	for (id = 0; id < NUM_BUFFER_PARTITIONS; id++, lock++)
526 		LWLockInitialize(&lock->lock, LWTRANCHE_BUFFER_MAPPING);
527 
528 	/* Initialize lmgrs' LWLocks in main array */
529 	lock = MainLWLockArray + LOCK_MANAGER_LWLOCK_OFFSET;
530 	for (id = 0; id < NUM_LOCK_PARTITIONS; id++, lock++)
531 		LWLockInitialize(&lock->lock, LWTRANCHE_LOCK_MANAGER);
532 
533 	/* Initialize predicate lmgrs' LWLocks in main array */
534 	lock = MainLWLockArray + PREDICATELOCK_MANAGER_LWLOCK_OFFSET;
535 	for (id = 0; id < NUM_PREDICATELOCK_PARTITIONS; id++, lock++)
536 		LWLockInitialize(&lock->lock, LWTRANCHE_PREDICATE_LOCK_MANAGER);
537 
538 	/*
539 	 * Copy the info about any named tranches into shared memory (so that
540 	 * other processes can see it), and initialize the requested LWLocks.
541 	 */
542 	if (NamedLWLockTrancheRequests > 0)
543 	{
544 		char	   *trancheNames;
545 
546 		NamedLWLockTrancheArray = (NamedLWLockTranche *)
547 			&MainLWLockArray[NUM_FIXED_LWLOCKS + numNamedLocks];
548 
549 		trancheNames = (char *) NamedLWLockTrancheArray +
550 			(NamedLWLockTrancheRequests * sizeof(NamedLWLockTranche));
551 		lock = &MainLWLockArray[NUM_FIXED_LWLOCKS];
552 
553 		for (i = 0; i < NamedLWLockTrancheRequests; i++)
554 		{
555 			NamedLWLockTrancheRequest *request;
556 			NamedLWLockTranche *tranche;
557 			char	   *name;
558 
559 			request = &NamedLWLockTrancheRequestArray[i];
560 			tranche = &NamedLWLockTrancheArray[i];
561 
562 			name = trancheNames;
563 			trancheNames += strlen(request->tranche_name) + 1;
564 			strcpy(name, request->tranche_name);
565 			tranche->trancheId = LWLockNewTrancheId();
566 			tranche->trancheName = name;
567 
568 			for (j = 0; j < request->num_lwlocks; j++, lock++)
569 				LWLockInitialize(&lock->lock, tranche->trancheId);
570 		}
571 	}
572 }
573 
574 /*
575  * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
576  */
577 void
InitLWLockAccess(void)
579 {
580 #ifdef LWLOCK_STATS
581 	init_lwlock_stats();
582 #endif
583 }
584 
585 /*
586  * GetNamedLWLockTranche - returns the base address of LWLock from the
587  *		specified tranche.
588  *
589  * Caller needs to retrieve the requested number of LWLocks starting from
590  * the base lock address returned by this API.  This can be used for
591  * tranches that are requested by using RequestNamedLWLockTranche() API.
592  */
593 LWLockPadded *
GetNamedLWLockTranche(const char *tranche_name)
595 {
596 	int			lock_pos;
597 	int			i;
598 
599 	/*
600 	 * Obtain the position of base address of LWLock belonging to requested
601 	 * tranche_name in MainLWLockArray.  LWLocks for named tranches are placed
602 	 * in MainLWLockArray after fixed locks.
603 	 */
604 	lock_pos = NUM_FIXED_LWLOCKS;
605 	for (i = 0; i < NamedLWLockTrancheRequests; i++)
606 	{
607 		if (strcmp(NamedLWLockTrancheRequestArray[i].tranche_name,
608 				   tranche_name) == 0)
609 			return &MainLWLockArray[lock_pos];
610 
611 		lock_pos += NamedLWLockTrancheRequestArray[i].num_lwlocks;
612 	}
613 
614 	elog(ERROR, "requested tranche is not registered");
615 
616 	/* just to keep compiler quiet */
617 	return NULL;
618 }
619 
620 /*
621  * Allocate a new tranche ID.
622  */
623 int
LWLockNewTrancheId(void)
625 {
626 	int			result;
627 	int		   *LWLockCounter;
628 
629 	LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
630 	SpinLockAcquire(ShmemLock);
631 	result = (*LWLockCounter)++;
632 	SpinLockRelease(ShmemLock);
633 
634 	return result;
635 }
636 
637 /*
638  * Register a dynamic tranche name in the lookup table of the current process.
639  *
640  * This routine will save a pointer to the tranche name passed as an argument,
641  * so the name should be allocated in a backend-lifetime context
642  * (shared memory, TopMemoryContext, static constant, or similar).
643  *
644  * The tranche name will be user-visible as a wait event name, so try to
645  * use a name that fits the style for those.
646  */
647 void
LWLockRegisterTranche(int tranche_id, const char *tranche_name)
649 {
650 	/* This should only be called for user-defined tranches. */
651 	if (tranche_id < LWTRANCHE_FIRST_USER_DEFINED)
652 		return;
653 
654 	/* Convert to array index. */
655 	tranche_id -= LWTRANCHE_FIRST_USER_DEFINED;
656 
657 	/* If necessary, create or enlarge array. */
658 	if (tranche_id >= LWLockTrancheNamesAllocated)
659 	{
660 		int			newalloc;
661 
662 		newalloc = Max(LWLockTrancheNamesAllocated, 8);
663 		while (newalloc <= tranche_id)
664 			newalloc *= 2;
665 
666 		if (LWLockTrancheNames == NULL)
667 			LWLockTrancheNames = (const char **)
668 				MemoryContextAllocZero(TopMemoryContext,
669 									   newalloc * sizeof(char *));
670 		else
671 		{
672 			LWLockTrancheNames = (const char **)
673 				repalloc(LWLockTrancheNames, newalloc * sizeof(char *));
674 			memset(LWLockTrancheNames + LWLockTrancheNamesAllocated,
675 				   0,
676 				   (newalloc - LWLockTrancheNamesAllocated) * sizeof(char *));
677 		}
678 		LWLockTrancheNamesAllocated = newalloc;
679 	}
680 
681 	LWLockTrancheNames[tranche_id] = tranche_name;
682 }
683 
684 /*
685  * RequestNamedLWLockTranche
686  *		Request that extra LWLocks be allocated during postmaster
687  *		startup.
688  *
689  * This is only useful for extensions if called from the _PG_init hook
690  * of a library that is loaded into the postmaster via
691  * shared_preload_libraries.  Once shared memory has been allocated, calls
692  * will be ignored.  (We could raise an error, but it seems better to make
693  * it a no-op, so that libraries containing such calls can be reloaded if
694  * needed.)
695  *
696  * The tranche name will be user-visible as a wait event name, so try to
697  * use a name that fits the style for those.
698  */
699 void
RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
701 {
702 	NamedLWLockTrancheRequest *request;
703 
704 	if (IsUnderPostmaster || !lock_named_request_allowed)
705 		return;					/* too late */
706 
707 	if (NamedLWLockTrancheRequestArray == NULL)
708 	{
709 		NamedLWLockTrancheRequestsAllocated = 16;
710 		NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
711 			MemoryContextAlloc(TopMemoryContext,
712 							   NamedLWLockTrancheRequestsAllocated
713 							   * sizeof(NamedLWLockTrancheRequest));
714 	}
715 
716 	if (NamedLWLockTrancheRequests >= NamedLWLockTrancheRequestsAllocated)
717 	{
718 		int			i = NamedLWLockTrancheRequestsAllocated;
719 
720 		while (i <= NamedLWLockTrancheRequests)
721 			i *= 2;
722 
723 		NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
724 			repalloc(NamedLWLockTrancheRequestArray,
725 					 i * sizeof(NamedLWLockTrancheRequest));
726 		NamedLWLockTrancheRequestsAllocated = i;
727 	}
728 
729 	request = &NamedLWLockTrancheRequestArray[NamedLWLockTrancheRequests];
730 	Assert(strlen(tranche_name) + 1 <= NAMEDATALEN);
731 	strlcpy(request->tranche_name, tranche_name, NAMEDATALEN);
732 	request->num_lwlocks = num_lwlocks;
733 	NamedLWLockTrancheRequests++;
734 }
735 
736 /*
737  * LWLockInitialize - initialize a new lwlock; it's initially unlocked
738  */
739 void
LWLockInitialize(LWLock *lock, int tranche_id)
741 {
742 	pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
743 #ifdef LOCK_DEBUG
744 	pg_atomic_init_u32(&lock->nwaiters, 0);
745 #endif
746 	lock->tranche = tranche_id;
747 	proclist_init(&lock->waiters);
748 }
749 
750 /*
751  * Report start of wait event for light-weight locks.
752  *
 * This function will be used by all the lightweight lock calls which
 * need to wait to acquire the lock.  This function distinguishes the wait
 * event based on tranche and lock id.
756  */
757 static inline void
LWLockReportWaitStart(LWLock *lock)
759 {
760 	pgstat_report_wait_start(PG_WAIT_LWLOCK | lock->tranche);
761 }
762 
763 /*
764  * Report end of wait event for light-weight locks.
765  */
766 static inline void
LWLockReportWaitEnd(void)
768 {
769 	pgstat_report_wait_end();
770 }
771 
772 /*
773  * Return the name of an LWLock tranche.
774  */
775 static const char *
GetLWTrancheName(uint16 trancheId)
777 {
778 	/* Individual LWLock? */
779 	if (trancheId < NUM_INDIVIDUAL_LWLOCKS)
780 		return IndividualLWLockNames[trancheId];
781 
782 	/* Built-in tranche? */
783 	if (trancheId < LWTRANCHE_FIRST_USER_DEFINED)
784 		return BuiltinTrancheNames[trancheId - NUM_INDIVIDUAL_LWLOCKS];
785 
786 	/*
787 	 * It's an extension tranche, so look in LWLockTrancheNames[].  However,
788 	 * it's possible that the tranche has never been registered in the current
789 	 * process, in which case give up and return "extension".
790 	 */
791 	trancheId -= LWTRANCHE_FIRST_USER_DEFINED;
792 
793 	if (trancheId >= LWLockTrancheNamesAllocated ||
794 		LWLockTrancheNames[trancheId] == NULL)
795 		return "extension";
796 
797 	return LWLockTrancheNames[trancheId];
798 }
799 
800 /*
801  * Return an identifier for an LWLock based on the wait class and event.
802  */
803 const char *
GetLWLockIdentifier(uint32 classId, uint16 eventId)
805 {
806 	Assert(classId == PG_WAIT_LWLOCK);
807 	/* The event IDs are just tranche numbers. */
808 	return GetLWTrancheName(eventId);
809 }
810 
811 /*
812  * Internal function that tries to atomically acquire the lwlock in the passed
813  * in mode.
814  *
 * This function will not block waiting for a lock to become free - that's
 * the caller's job.
817  *
818  * Returns true if the lock isn't free and we need to wait.
819  */
820 static bool
LWLockAttemptLock(LWLock *lock, LWLockMode mode)
822 {
823 	uint32		old_state;
824 
825 	AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);
826 
827 	/*
828 	 * Read once outside the loop, later iterations will get the newer value
829 	 * via compare & exchange.
830 	 */
831 	old_state = pg_atomic_read_u32(&lock->state);
832 
833 	/* loop until we've determined whether we could acquire the lock or not */
834 	while (true)
835 	{
836 		uint32		desired_state;
837 		bool		lock_free;
838 
839 		desired_state = old_state;
840 
841 		if (mode == LW_EXCLUSIVE)
842 		{
843 			lock_free = (old_state & LW_LOCK_MASK) == 0;
844 			if (lock_free)
845 				desired_state += LW_VAL_EXCLUSIVE;
846 		}
847 		else
848 		{
849 			lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
850 			if (lock_free)
851 				desired_state += LW_VAL_SHARED;
852 		}
853 
854 		/*
		 * Attempt to swap in the state we are expecting.  If we didn't see
		 * the lock as free, that's just the old value.  If we saw it as free,
		 * we'll attempt to mark it acquired.  The reason that we always swap
		 * in the value is that this doubles as a memory barrier.  We could try
		 * to be smarter and only swap in values if we saw the lock as free,
		 * but benchmarks haven't shown it to be beneficial so far.
861 		 *
862 		 * Retry if the value changed since we last looked at it.
863 		 */
864 		if (pg_atomic_compare_exchange_u32(&lock->state,
865 										   &old_state, desired_state))
866 		{
867 			if (lock_free)
868 			{
869 				/* Great! Got the lock. */
870 #ifdef LOCK_DEBUG
871 				if (mode == LW_EXCLUSIVE)
872 					lock->owner = MyProc;
873 #endif
874 				return false;
875 			}
876 			else
877 				return true;	/* somebody else has the lock */
878 		}
879 	}
880 	pg_unreachable();
881 }
882 
883 /*
884  * Lock the LWLock's wait list against concurrent activity.
885  *
886  * NB: even though the wait list is locked, non-conflicting lock operations
887  * may still happen concurrently.
888  *
889  * Time spent holding mutex should be short!
890  */
891 static void
LWLockWaitListLock(LWLock *lock)
893 {
894 	uint32		old_state;
895 #ifdef LWLOCK_STATS
896 	lwlock_stats *lwstats;
897 	uint32		delays = 0;
898 
899 	lwstats = get_lwlock_stats_entry(lock);
900 #endif
901 
902 	while (true)
903 	{
904 		/* always try once to acquire lock directly */
905 		old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
906 		if (!(old_state & LW_FLAG_LOCKED))
907 			break;				/* got lock */
908 
909 		/* and then spin without atomic operations until lock is released */
910 		{
911 			SpinDelayStatus delayStatus;
912 
913 			init_local_spin_delay(&delayStatus);
914 
915 			while (old_state & LW_FLAG_LOCKED)
916 			{
917 				perform_spin_delay(&delayStatus);
918 				old_state = pg_atomic_read_u32(&lock->state);
919 			}
920 #ifdef LWLOCK_STATS
921 			delays += delayStatus.delays;
922 #endif
923 			finish_spin_delay(&delayStatus);
924 		}
925 
926 		/*
927 		 * Retry. The lock might obviously already be re-acquired by the time
928 		 * we're attempting to get it again.
929 		 */
930 	}
931 
932 #ifdef LWLOCK_STATS
933 	lwstats->spin_delay_count += delays;
934 #endif
935 }
936 
937 /*
938  * Unlock the LWLock's wait list.
939  *
940  * Note that it can be more efficient to manipulate flags and release the
941  * locks in a single atomic operation.
942  */
943 static void
LWLockWaitListUnlock(LWLock *lock)
945 {
946 	uint32		old_state PG_USED_FOR_ASSERTS_ONLY;
947 
948 	old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);
949 
950 	Assert(old_state & LW_FLAG_LOCKED);
951 }
952 
953 /*
954  * Wakeup all the lockers that currently have a chance to acquire the lock.
955  */
956 static void
LWLockWakeup(LWLock *lock)
958 {
959 	bool		new_release_ok;
960 	bool		wokeup_somebody = false;
961 	proclist_head wakeup;
962 	proclist_mutable_iter iter;
963 
964 	proclist_init(&wakeup);
965 
966 	new_release_ok = true;
967 
968 	/* lock wait list while collecting backends to wake up */
969 	LWLockWaitListLock(lock);
970 
971 	proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
972 	{
973 		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);
974 
975 		if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
976 			continue;
977 
978 		proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
979 		proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
980 
981 		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
982 		{
983 			/*
984 			 * Prevent additional wakeups until retryer gets to run. Backends
985 			 * that are just waiting for the lock to become free don't retry
986 			 * automatically.
987 			 */
988 			new_release_ok = false;
989 
990 			/*
991 			 * Don't wakeup (further) exclusive locks.
992 			 */
993 			wokeup_somebody = true;
994 		}
995 
996 		/*
997 		 * Once we've woken up an exclusive lock, there's no point in waking
998 		 * up anybody else.
999 		 */
1000 		if (waiter->lwWaitMode == LW_EXCLUSIVE)
1001 			break;
1002 	}
1003 
1004 	Assert(proclist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);
1005 
1006 	/* unset required flags, and release lock, in one fell swoop */
1007 	{
1008 		uint32		old_state;
1009 		uint32		desired_state;
1010 
1011 		old_state = pg_atomic_read_u32(&lock->state);
1012 		while (true)
1013 		{
1014 			desired_state = old_state;
1015 
1016 			/* compute desired flags */
1017 
1018 			if (new_release_ok)
1019 				desired_state |= LW_FLAG_RELEASE_OK;
1020 			else
1021 				desired_state &= ~LW_FLAG_RELEASE_OK;
1022 
1023 			if (proclist_is_empty(&wakeup))
1024 				desired_state &= ~LW_FLAG_HAS_WAITERS;
1025 
1026 			desired_state &= ~LW_FLAG_LOCKED;	/* release lock */
1027 
1028 			if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
1029 											   desired_state))
1030 				break;
1031 		}
1032 	}
1033 
1034 	/* Awaken any waiters I removed from the queue. */
1035 	proclist_foreach_modify(iter, &wakeup, lwWaitLink)
1036 	{
1037 		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);
1038 
1039 		LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
1040 		proclist_delete(&wakeup, iter.cur, lwWaitLink);
1041 
1042 		/*
1043 		 * Guarantee that lwWaiting being unset only becomes visible once the
		 * unlink from the list has completed.  Otherwise the target backend
1045 		 * could be woken up for other reason and enqueue for a new lock - if
1046 		 * that happens before the list unlink happens, the list would end up
1047 		 * being corrupted.
1048 		 *
1049 		 * The barrier pairs with the LWLockWaitListLock() when enqueuing for
1050 		 * another lock.
1051 		 */
1052 		pg_write_barrier();
1053 		waiter->lwWaiting = false;
1054 		PGSemaphoreUnlock(waiter->sem);
1055 	}
1056 }
1057 
1058 /*
1059  * Add ourselves to the end of the queue.
1060  *
1061  * NB: Mode can be LW_WAIT_UNTIL_FREE here!
1062  */
1063 static void
LWLockQueueSelf(LWLock *lock, LWLockMode mode)
1065 {
1066 	/*
1067 	 * If we don't have a PGPROC structure, there's no way to wait. This
1068 	 * should never occur, since MyProc should only be null during shared
1069 	 * memory initialization.
1070 	 */
1071 	if (MyProc == NULL)
1072 		elog(PANIC, "cannot wait without a PGPROC structure");
1073 
1074 	if (MyProc->lwWaiting)
1075 		elog(PANIC, "queueing for lock while waiting on another one");
1076 
1077 	LWLockWaitListLock(lock);
1078 
1079 	/* setting the flag is protected by the spinlock */
1080 	pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);
1081 
1082 	MyProc->lwWaiting = true;
1083 	MyProc->lwWaitMode = mode;
1084 
1085 	/* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
1086 	if (mode == LW_WAIT_UNTIL_FREE)
1087 		proclist_push_head(&lock->waiters, MyProc->pgprocno, lwWaitLink);
1088 	else
1089 		proclist_push_tail(&lock->waiters, MyProc->pgprocno, lwWaitLink);
1090 
1091 	/* Can release the mutex now */
1092 	LWLockWaitListUnlock(lock);
1093 
1094 #ifdef LOCK_DEBUG
1095 	pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
1096 #endif
1097 
1098 }
1099 
1100 /*
1101  * Remove ourselves from the waitlist.
1102  *
1103  * This is used if we queued ourselves because we thought we needed to sleep
1104  * but, after further checking, we discovered that we don't actually need to
1105  * do so.
1106  */
1107 static void
LWLockDequeueSelf(LWLock *lock)
1109 {
1110 	bool		found = false;
1111 	proclist_mutable_iter iter;
1112 
1113 #ifdef LWLOCK_STATS
1114 	lwlock_stats *lwstats;
1115 
1116 	lwstats = get_lwlock_stats_entry(lock);
1117 
1118 	lwstats->dequeue_self_count++;
1119 #endif
1120 
1121 	LWLockWaitListLock(lock);
1122 
1123 	/*
	 * We can't just blindly remove ourselves from the list: we need to
	 * iterate over all entries, as somebody else could have dequeued us
	 * already.
1126 	 */
1127 	proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
1128 	{
1129 		if (iter.cur == MyProc->pgprocno)
1130 		{
1131 			found = true;
1132 			proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
1133 			break;
1134 		}
1135 	}
1136 
1137 	if (proclist_is_empty(&lock->waiters) &&
1138 		(pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
1139 	{
1140 		pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
1141 	}
1142 
1143 	/* XXX: combine with fetch_and above? */
1144 	LWLockWaitListUnlock(lock);
1145 
1146 	/* clear waiting state again, nice for debugging */
1147 	if (found)
1148 		MyProc->lwWaiting = false;
1149 	else
1150 	{
1151 		int			extraWaits = 0;
1152 
1153 		/*
1154 		 * Somebody else dequeued us and has or will wake us up. Deal with the
1155 		 * superfluous absorption of a wakeup.
1156 		 */
1157 
1158 		/*
1159 		 * Reset RELEASE_OK flag if somebody woke us before we removed
1160 		 * ourselves - they'll have set it to false.
1161 		 */
1162 		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1163 
1164 		/*
1165 		 * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
1166 		 * get reset at some inconvenient point later. Most of the time this
1167 		 * will immediately return.
1168 		 */
1169 		for (;;)
1170 		{
1171 			PGSemaphoreLock(MyProc->sem);
1172 			if (!MyProc->lwWaiting)
1173 				break;
1174 			extraWaits++;
1175 		}
1176 
1177 		/*
1178 		 * Fix the process wait semaphore's count for any absorbed wakeups.
1179 		 */
1180 		while (extraWaits-- > 0)
1181 			PGSemaphoreUnlock(MyProc->sem);
1182 	}
1183 
1184 #ifdef LOCK_DEBUG
1185 	{
1186 		/* not waiting anymore */
1187 		uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1188 
1189 		Assert(nwaiters < MAX_BACKENDS);
1190 	}
1191 #endif
1192 }
1193 
1194 /*
1195  * LWLockAcquire - acquire a lightweight lock in the specified mode
1196  *
1197  * If the lock is not available, sleep until it is.  Returns true if the lock
1198  * was available immediately, false if we had to sleep.
1199  *
1200  * Side effect: cancel/die interrupts are held off until lock release.
1201  */
1202 bool
LWLockAcquire(LWLock *lock, LWLockMode mode)
1204 {
1205 	PGPROC	   *proc = MyProc;
1206 	bool		result = true;
1207 	int			extraWaits = 0;
1208 #ifdef LWLOCK_STATS
1209 	lwlock_stats *lwstats;
1210 
1211 	lwstats = get_lwlock_stats_entry(lock);
1212 #endif
1213 
1214 	AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1215 
1216 	PRINT_LWDEBUG("LWLockAcquire", lock, mode);
1217 
1218 #ifdef LWLOCK_STATS
1219 	/* Count lock acquisition attempts */
1220 	if (mode == LW_EXCLUSIVE)
1221 		lwstats->ex_acquire_count++;
1222 	else
1223 		lwstats->sh_acquire_count++;
1224 #endif							/* LWLOCK_STATS */
1225 
1226 	/*
1227 	 * We can't wait if we haven't got a PGPROC.  This should only occur
1228 	 * during bootstrap or shared memory initialization.  Put an Assert here
1229 	 * to catch unsafe coding practices.
1230 	 */
1231 	Assert(!(proc == NULL && IsUnderPostmaster));
1232 
1233 	/* Ensure we will have room to remember the lock */
1234 	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1235 		elog(ERROR, "too many LWLocks taken");
1236 
1237 	/*
1238 	 * Lock out cancel/die interrupts until we exit the code section protected
1239 	 * by the LWLock.  This ensures that interrupts will not interfere with
1240 	 * manipulations of data structures in shared memory.
1241 	 */
1242 	HOLD_INTERRUPTS();
1243 
1244 	/*
1245 	 * Loop here to try to acquire lock after each time we are signaled by
1246 	 * LWLockRelease.
1247 	 *
1248 	 * NOTE: it might seem better to have LWLockRelease actually grant us the
1249 	 * lock, rather than retrying and possibly having to go back to sleep. But
1250 	 * in practice that is no good because it means a process swap for every
1251 	 * lock acquisition when two or more processes are contending for the same
1252 	 * lock.  Since LWLocks are normally used to protect not-very-long
1253 	 * sections of computation, a process needs to be able to acquire and
1254 	 * release the same lock many times during a single CPU time slice, even
1255 	 * in the presence of contention.  The efficiency of being able to do that
1256 	 * outweighs the inefficiency of sometimes wasting a process dispatch
1257 	 * cycle because the lock is not free when a released waiter finally gets
1258 	 * to run.  See pgsql-hackers archives for 29-Dec-01.
1259 	 */
1260 	for (;;)
1261 	{
1262 		bool		mustwait;
1263 
1264 		/*
1265 		 * Try to grab the lock the first time, we're not in the waitqueue
1266 		 * yet/anymore.
1267 		 */
1268 		mustwait = LWLockAttemptLock(lock, mode);
1269 
1270 		if (!mustwait)
1271 		{
1272 			LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
1273 			break;				/* got the lock */
1274 		}
1275 
1276 		/*
1277 		 * Ok, at this point we couldn't grab the lock on the first try. We
1278 		 * cannot simply queue ourselves to the end of the list and wait to be
1279 		 * woken up because by now the lock could long have been released.
1280 		 * Instead add us to the queue and try to grab the lock again. If we
1281 		 * succeed we need to revert the queuing and be happy, otherwise we
1282 		 * recheck the lock. If we still couldn't grab it, we know that the
1283 		 * other locker will see our queue entries when releasing since they
1284 		 * existed before we checked for the lock.
1285 		 */
1286 
1287 		/* add to the queue */
1288 		LWLockQueueSelf(lock, mode);
1289 
1290 		/* we're now guaranteed to be woken up if necessary */
1291 		mustwait = LWLockAttemptLock(lock, mode);
1292 
1293 		/* ok, grabbed the lock the second time round, need to undo queueing */
1294 		if (!mustwait)
1295 		{
1296 			LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
1297 
1298 			LWLockDequeueSelf(lock);
1299 			break;
1300 		}
1301 
1302 		/*
1303 		 * Wait until awakened.
1304 		 *
1305 		 * It is possible that we get awakened for a reason other than being
1306 		 * signaled by LWLockRelease.  If so, loop back and wait again.  Once
1307 		 * we've gotten the LWLock, re-increment the sema by the number of
1308 		 * additional signals received.
1309 		 */
1310 		LOG_LWDEBUG("LWLockAcquire", lock, "waiting");
1311 
1312 #ifdef LWLOCK_STATS
1313 		lwstats->block_count++;
1314 #endif
1315 
1316 		LWLockReportWaitStart(lock);
1317 		if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
1318 			TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
1319 
1320 		for (;;)
1321 		{
1322 			PGSemaphoreLock(proc->sem);
1323 			if (!proc->lwWaiting)
1324 				break;
1325 			extraWaits++;
1326 		}
1327 
1328 		/* Retrying, allow LWLockRelease to release waiters again. */
1329 		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1330 
1331 #ifdef LOCK_DEBUG
1332 		{
1333 			/* not waiting anymore */
1334 			uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1335 
1336 			Assert(nwaiters < MAX_BACKENDS);
1337 		}
1338 #endif
1339 
1340 		if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
1341 			TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
1342 		LWLockReportWaitEnd();
1343 
1344 		LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
1345 
1346 		/* Now loop back and try to acquire lock again. */
1347 		result = false;
1348 	}
1349 
1350 	if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_ENABLED())
1351 		TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);
1352 
1353 	/* Add lock to list of locks held by this backend */
1354 	held_lwlocks[num_held_lwlocks].lock = lock;
1355 	held_lwlocks[num_held_lwlocks++].mode = mode;
1356 
1357 	/*
1358 	 * Fix the process wait semaphore's count for any absorbed wakeups.
1359 	 */
1360 	while (extraWaits-- > 0)
1361 		PGSemaphoreUnlock(proc->sem);
1362 
1363 	return result;
1364 }
1365 
1366 /*
1367  * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
1368  *
1369  * If the lock is not available, return false with no side-effects.
1370  *
1371  * If successful, cancel/die interrupts are held off until lock release.
1372  */
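/*
 * Illustrative caller pattern only; the lock and do_optional_cleanup() are
 * hypothetical:
 *
 *	if (LWLockConditionalAcquire(lock, LW_EXCLUSIVE))
 *	{
 *		do_optional_cleanup();
 *		LWLockRelease(lock);
 *	}
 *	else
 *		-- skip the optional work rather than block on the lock
 */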
1373 bool
LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
1375 {
1376 	bool		mustwait;
1377 
1378 	AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1379 
1380 	PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);
1381 
1382 	/* Ensure we will have room to remember the lock */
1383 	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1384 		elog(ERROR, "too many LWLocks taken");
1385 
1386 	/*
1387 	 * Lock out cancel/die interrupts until we exit the code section protected
1388 	 * by the LWLock.  This ensures that interrupts will not interfere with
1389 	 * manipulations of data structures in shared memory.
1390 	 */
1391 	HOLD_INTERRUPTS();
1392 
1393 	/* Check for the lock */
1394 	mustwait = LWLockAttemptLock(lock, mode);
1395 
1396 	if (mustwait)
1397 	{
1398 		/* Failed to get lock, so release interrupt holdoff */
1399 		RESUME_INTERRUPTS();
1400 
1401 		LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
1402 		if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL_ENABLED())
1403 			TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), mode);
1404 	}
1405 	else
1406 	{
1407 		/* Add lock to list of locks held by this backend */
1408 		held_lwlocks[num_held_lwlocks].lock = lock;
1409 		held_lwlocks[num_held_lwlocks++].mode = mode;
1410 		if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_ENABLED())
1411 			TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), mode);
1412 	}
1413 	return !mustwait;
1414 }
1415 
1416 /*
1417  * LWLockAcquireOrWait - Acquire lock, or wait until it's free
1418  *
1419  * The semantics of this function are a bit funky.  If the lock is currently
1420  * free, it is acquired in the given mode, and the function returns true.  If
1421  * the lock isn't immediately free, the function waits until it is released
1422  * and returns false, but does not acquire the lock.
1423  *
1424  * This is currently used for WALWriteLock: when a backend flushes the WAL,
1425  * holding WALWriteLock, it can flush the commit records of many other
1426  * backends as a side-effect.  Those other backends need to wait until the
1427  * flush finishes, but don't need to acquire the lock anymore.  They can just
1428  * wake up, observe that their records have already been flushed, and return.
1429  */
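/*
 * Illustrative caller pattern only, modeled on the WAL-flush usage described
 * above; flush_already_covers() and perform_flush() are made-up names:
 *
 *	for (;;)
 *	{
 *		if (flush_already_covers(target))
 *			break;				-- somebody flushed far enough
 *		if (LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
 *		{
 *			perform_flush(target);
 *			LWLockRelease(WALWriteLock);
 *			break;
 *		}
 *		-- else: the previous holder released the lock; loop and recheck
 *	}
 */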
1430 bool
LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
1432 {
1433 	PGPROC	   *proc = MyProc;
1434 	bool		mustwait;
1435 	int			extraWaits = 0;
1436 #ifdef LWLOCK_STATS
1437 	lwlock_stats *lwstats;
1438 
1439 	lwstats = get_lwlock_stats_entry(lock);
1440 #endif
1441 
1442 	Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1443 
1444 	PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);
1445 
1446 	/* Ensure we will have room to remember the lock */
1447 	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1448 		elog(ERROR, "too many LWLocks taken");
1449 
1450 	/*
1451 	 * Lock out cancel/die interrupts until we exit the code section protected
1452 	 * by the LWLock.  This ensures that interrupts will not interfere with
1453 	 * manipulations of data structures in shared memory.
1454 	 */
1455 	HOLD_INTERRUPTS();
1456 
1457 	/*
1458 	 * NB: We're using nearly the same twice-in-a-row lock acquisition
1459 	 * protocol as LWLockAcquire(). Check its comments for details.
1460 	 */
1461 	mustwait = LWLockAttemptLock(lock, mode);
1462 
1463 	if (mustwait)
1464 	{
1465 		LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1466 
1467 		mustwait = LWLockAttemptLock(lock, mode);
1468 
1469 		if (mustwait)
1470 		{
1471 			/*
1472 			 * Wait until awakened.  Like in LWLockAcquire, be prepared for
1473 			 * bogus wakeups.
1474 			 */
1475 			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");
1476 
1477 #ifdef LWLOCK_STATS
1478 			lwstats->block_count++;
1479 #endif
1480 
1481 			LWLockReportWaitStart(lock);
1482 			if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
1483 				TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
1484 
1485 			for (;;)
1486 			{
1487 				PGSemaphoreLock(proc->sem);
1488 				if (!proc->lwWaiting)
1489 					break;
1490 				extraWaits++;
1491 			}
1492 
1493 #ifdef LOCK_DEBUG
1494 			{
1495 				/* not waiting anymore */
1496 				uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1497 
1498 				Assert(nwaiters < MAX_BACKENDS);
1499 			}
1500 #endif
1501 			if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
1502 				TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
1503 			LWLockReportWaitEnd();
1504 
1505 			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
1506 		}
1507 		else
1508 		{
1509 			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");
1510 
1511 			/*
1512 			 * Got lock in the second attempt, undo queueing. We need to treat
1513 			 * this as having successfully acquired the lock, otherwise we'd
1514 			 * not necessarily wake up people we've prevented from acquiring
1515 			 * the lock.
1516 			 */
1517 			LWLockDequeueSelf(lock);
1518 		}
1519 	}
1520 
1521 	/*
1522 	 * Fix the process wait semaphore's count for any absorbed wakeups.
1523 	 */
1524 	while (extraWaits-- > 0)
1525 		PGSemaphoreUnlock(proc->sem);
1526 
1527 	if (mustwait)
1528 	{
1529 		/* Failed to get lock, so release interrupt holdoff */
1530 		RESUME_INTERRUPTS();
1531 		LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
1532 		if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL_ENABLED())
1533 			TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), mode);
1534 	}
1535 	else
1536 	{
1537 		LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
1538 		/* Add lock to list of locks held by this backend */
1539 		held_lwlocks[num_held_lwlocks].lock = lock;
1540 		held_lwlocks[num_held_lwlocks++].mode = mode;
1541 		if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_ENABLED())
1542 			TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), mode);
1543 	}
1544 
1545 	return !mustwait;
1546 }
1547 
1548 /*
1549  * Does the lwlock in its current state need to wait for the variable value to
1550  * change?
1551  *
1552  * If we don't need to wait, and it's because the value of the variable has
1553  * changed, store the current value in newval.
1554  *
1555  * *result is set to true if the lock was free, and false otherwise.
1556  */
1557 static bool
LWLockConflictsWithVar(LWLock *lock,
1559 					   uint64 *valptr, uint64 oldval, uint64 *newval,
1560 					   bool *result)
1561 {
1562 	bool		mustwait;
1563 	uint64		value;
1564 
1565 	/*
	 * Test first to see if the slot is free right now.
1567 	 *
1568 	 * XXX: the caller uses a spinlock before this, so we don't need a memory
1569 	 * barrier here as far as the current usage is concerned.  But that might
1570 	 * not be safe in general.
1571 	 */
1572 	mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
1573 
1574 	if (!mustwait)
1575 	{
1576 		*result = true;
1577 		return false;
1578 	}
1579 
1580 	*result = false;
1581 
1582 	/*
1583 	 * Read value using the lwlock's wait list lock, as we can't generally
1584 	 * rely on atomic 64 bit reads/stores.  TODO: On platforms with a way to
1585 	 * do atomic 64 bit reads/writes the spinlock should be optimized away.
1586 	 */
1587 	LWLockWaitListLock(lock);
1588 	value = *valptr;
1589 	LWLockWaitListUnlock(lock);
1590 
1591 	if (value != oldval)
1592 	{
1593 		mustwait = false;
1594 		*newval = value;
1595 	}
1596 	else
1597 	{
1598 		mustwait = true;
1599 	}
1600 
1601 	return mustwait;
1602 }
1603 
1604 /*
1605  * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
1606  *
1607  * If the lock is held and *valptr equals oldval, waits until the lock is
1608  * either freed, or the lock holder updates *valptr by calling
1609  * LWLockUpdateVar.  If the lock is free on exit (immediately or after
1610  * waiting), returns true.  If the lock is still held, but *valptr no longer
1611  * matches oldval, returns false and sets *newval to the current value in
1612  * *valptr.
1613  *
1614  * Note: this function ignores shared lock holders; if the lock is held
1615  * in shared mode, returns 'true'.
1616  */
1617 bool
LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
1619 {
1620 	PGPROC	   *proc = MyProc;
1621 	int			extraWaits = 0;
1622 	bool		result = false;
1623 #ifdef LWLOCK_STATS
1624 	lwlock_stats *lwstats;
1625 
1626 	lwstats = get_lwlock_stats_entry(lock);
1627 #endif
1628 
1629 	PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
1630 
1631 	/*
1632 	 * Lock out cancel/die interrupts while we sleep on the lock.  There is no
1633 	 * cleanup mechanism to remove us from the wait queue if we got
1634 	 * interrupted.
1635 	 */
1636 	HOLD_INTERRUPTS();
1637 
1638 	/*
1639 	 * Loop here to check the lock's status after each time we are signaled.
1640 	 */
1641 	for (;;)
1642 	{
1643 		bool		mustwait;
1644 
1645 		mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1646 										  &result);
1647 
1648 		if (!mustwait)
1649 			break;				/* the lock was free or value didn't match */
1650 
1651 		/*
		 * Add myself to the wait queue.  Note that this is racy: somebody else
		 * could wake up before we're finished queuing.  NB: We're using nearly
		 * the same twice-in-a-row lock acquisition protocol as
		 * LWLockAcquire().  Check its comments for details.  The only
		 * difference is that we also have to check the variable's value when
		 * checking the state of the lock.
1658 		 */
1659 		LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1660 
1661 		/*
1662 		 * Set RELEASE_OK flag, to make sure we get woken up as soon as the
1663 		 * lock is released.
1664 		 */
1665 		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1666 
1667 		/*
1668 		 * We're now guaranteed to be woken up if necessary. Recheck the lock
1669 		 * and variables state.
1670 		 */
1671 		mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1672 										  &result);
1673 
1674 		/* Ok, no conflict after we queued ourselves. Undo queueing. */
1675 		if (!mustwait)
1676 		{
1677 			LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
1678 
1679 			LWLockDequeueSelf(lock);
1680 			break;
1681 		}
1682 
1683 		/*
1684 		 * Wait until awakened.
1685 		 *
1686 		 * It is possible that we get awakened for a reason other than being
1687 		 * signaled by LWLockRelease.  If so, loop back and wait again.  Once
1688 		 * we've gotten the LWLock, re-increment the sema by the number of
1689 		 * additional signals received.
1690 		 */
1691 		LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");
1692 
1693 #ifdef LWLOCK_STATS
1694 		lwstats->block_count++;
1695 #endif
1696 
1697 		LWLockReportWaitStart(lock);
1698 		if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
1699 			TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), LW_EXCLUSIVE);
1700 
1701 		for (;;)
1702 		{
1703 			PGSemaphoreLock(proc->sem);
1704 			if (!proc->lwWaiting)
1705 				break;
1706 			extraWaits++;
1707 		}
1708 
1709 #ifdef LOCK_DEBUG
1710 		{
1711 			/* not waiting anymore */
1712 			uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1713 
1714 			Assert(nwaiters < MAX_BACKENDS);
1715 		}
1716 #endif
1717 
1718 		if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
1719 			TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), LW_EXCLUSIVE);
1720 		LWLockReportWaitEnd();
1721 
1722 		LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");
1723 
1724 		/* Now loop back and check the status of the lock again. */
1725 	}
1726 
1727 	/*
1728 	 * Fix the process wait semaphore's count for any absorbed wakeups.
1729 	 */
1730 	while (extraWaits-- > 0)
1731 		PGSemaphoreUnlock(proc->sem);
1732 
1733 	/*
1734 	 * Now okay to allow cancel/die interrupts.
1735 	 */
1736 	RESUME_INTERRUPTS();
1737 
1738 	return result;
1739 }
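
#ifdef LWLOCK_USAGE_EXAMPLES	/* hypothetical guard, never defined; example only */
/*
 * Hedged usage sketch for LWLockWaitForVar(), not taken from a real caller:
 * wait until the current lock holder either releases 'lock' or advances a
 * caller-defined progress counter (*progress) to at least 'target'.  The
 * function name and parameters are invented for illustration.
 */
static void
example_wait_for_progress(LWLock *lock, uint64 *progress, uint64 target)
{
	uint64		seen = 0;

	for (;;)
	{
		uint64		newval;

		if (LWLockWaitForVar(lock, progress, seen, &newval))
			break;				/* lock is free, the holder is done */

		/* lock still held, but the counter moved; re-check our condition */
		if (newval >= target)
			break;

		seen = newval;			/* wait for the counter to move again */
	}
}
#endif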
1740 
1741 
1742 /*
1743  * LWLockUpdateVar - Update a variable and wake up waiters atomically
1744  *
1745  * Sets *valptr to 'val', and wakes up all processes waiting for us with
1746  * LWLockWaitForVar().  Setting the value and waking up the processes happen
1747  * atomically so that any process calling LWLockWaitForVar() on the same lock
1748  * is guaranteed to see the new value, and act accordingly.
1749  *
1750  * The caller must be holding the lock in exclusive mode.
1751  */
1752 void
1753 LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
1754 {
1755 	proclist_head wakeup;
1756 	proclist_mutable_iter iter;
1757 
1758 	PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
1759 
1760 	proclist_init(&wakeup);
1761 
1762 	LWLockWaitListLock(lock);
1763 
1764 	Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
1765 
1766 	/* Update the lock's value */
1767 	*valptr = val;
1768 
1769 	/*
1770 	 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
1771 	 * up. They are always in the front of the queue.
1772 	 */
1773 	proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
1774 	{
1775 		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);
1776 
1777 		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
1778 			break;
1779 
1780 		proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
1781 		proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
1782 	}
1783 
1784 	/* We are done updating shared state of the lock itself. */
1785 	LWLockWaitListUnlock(lock);
1786 
1787 	/*
1788 	 * Awaken any waiters I removed from the queue.
1789 	 */
1790 	proclist_foreach_modify(iter, &wakeup, lwWaitLink)
1791 	{
1792 		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);
1793 
1794 		proclist_delete(&wakeup, iter.cur, lwWaitLink);
1795 		/* check comment in LWLockWakeup() about this barrier */
1796 		pg_write_barrier();
1797 		waiter->lwWaiting = false;
1798 		PGSemaphoreUnlock(waiter->sem);
1799 	}
1800 }
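
#ifdef LWLOCK_USAGE_EXAMPLES	/* hypothetical guard, never defined; example only */
/*
 * Hedged sketch of the updater side, not taken from a real caller: the
 * exclusive holder publishes progress through LWLockUpdateVar() so that
 * LWLockWaitForVar() callers wake up as soon as the counter moves, and
 * resets the variable when releasing via LWLockReleaseClearVar().  Names
 * and the notion of "one unit of work" are invented for illustration.
 */
static void
example_publish_progress(LWLock *lock, uint64 *progress, uint64 nsteps)
{
	uint64		i;

	LWLockAcquire(lock, LW_EXCLUSIVE);

	for (i = 1; i <= nsteps; i++)
	{
		/* ... perform one unit of work here ... */

		/* wake up anyone waiting for the counter to advance */
		LWLockUpdateVar(lock, progress, i);
	}

	/* reset the variable to its "lock is free" value and release */
	LWLockReleaseClearVar(lock, progress, 0);
}
#endif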
1801 
1802 
1803 /*
1804  * LWLockRelease - release a previously acquired lock
1805  */
1806 void
1807 LWLockRelease(LWLock *lock)
1808 {
1809 	LWLockMode	mode;
1810 	uint32		oldstate;
1811 	bool		check_waiters;
1812 	int			i;
1813 
1814 	/*
1815 	 * Remove lock from list of locks held.  Usually, but not always, it will
1816 	 * be the latest-acquired lock; so search array backwards.
1817 	 */
1818 	for (i = num_held_lwlocks; --i >= 0;)
1819 		if (lock == held_lwlocks[i].lock)
1820 			break;
1821 
1822 	if (i < 0)
1823 		elog(ERROR, "lock %s is not held", T_NAME(lock));
1824 
1825 	mode = held_lwlocks[i].mode;
1826 
1827 	num_held_lwlocks--;
1828 	for (; i < num_held_lwlocks; i++)
1829 		held_lwlocks[i] = held_lwlocks[i + 1];
1830 
1831 	PRINT_LWDEBUG("LWLockRelease", lock, mode);
1832 
1833 	/*
1834 	 * Release my hold on the lock; after that it can immediately be acquired
1835 	 * by others, even if we still have to wake up other waiters.
1836 	 */
1837 	if (mode == LW_EXCLUSIVE)
1838 		oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
1839 	else
1840 		oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);
1841 
1842 	/* nobody else can have that kind of lock */
1843 	Assert(!(oldstate & LW_VAL_EXCLUSIVE));
1844 
1845 	if (TRACE_POSTGRESQL_LWLOCK_RELEASE_ENABLED())
1846 		TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));
1847 
1848 	/*
1849 	 * Wake up waiters only if there are any, a previous wakeup isn't still
1850 	 * pending (LW_FLAG_RELEASE_OK is set), and the lock is now entirely free.
1851 	 */
1852 	if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
1853 		(LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
1854 		(oldstate & LW_LOCK_MASK) == 0)
1855 		check_waiters = true;
1856 	else
1857 		check_waiters = false;
1858 
1859 	/*
1860 	 * As waking up waiters requires the spinlock to be acquired, only do so
1861 	 * if necessary.
1862 	 */
1863 	if (check_waiters)
1864 	{
1865 		/* XXX: remove before commit? */
1866 		LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
1867 		LWLockWakeup(lock);
1868 	}
1869 
1870 	/*
1871 	 * Now okay to allow cancel/die interrupts.
1872 	 */
1873 	RESUME_INTERRUPTS();
1874 }
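
#ifdef LWLOCK_USAGE_EXAMPLES	/* hypothetical guard, never defined; example only */
/*
 * Hedged usage sketch, not taken from a real caller: locks are normally
 * released in LIFO order, which is why LWLockRelease() searches
 * held_lwlocks[] from the end.  Releasing in another order is legal, just
 * marginally slower to look up.  The lock names are invented.
 */
static void
example_nested_acquire_release(LWLock *outer, LWLock *inner)
{
	LWLockAcquire(outer, LW_SHARED);
	LWLockAcquire(inner, LW_EXCLUSIVE);

	/* ... work with the structures protected by both locks ... */

	LWLockRelease(inner);		/* latest acquired, found immediately */
	LWLockRelease(outer);
}
#endif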
1875 
1876 /*
1877  * LWLockReleaseClearVar - release a previously acquired lock, reset variable
1878  */
1879 void
1880 LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
1881 {
1882 	LWLockWaitListLock(lock);
1883 
1884 	/*
1885 	 * Set the variable's value before releasing the lock.  That prevents a
1886 	 * race condition wherein a new locker acquires the lock, but hasn't yet
1887 	 * set the variable's value.
1888 	 */
1889 	*valptr = val;
1890 	LWLockWaitListUnlock(lock);
1891 
1892 	LWLockRelease(lock);
1893 }
1894 
1895 
1896 /*
1897  * LWLockReleaseAll - release all currently-held locks
1898  *
1899  * Used to clean up after ereport(ERROR). An important difference between this
1900  * function and retail LWLockRelease calls is that InterruptHoldoffCount is
1901  * unchanged by this operation.  This is necessary since InterruptHoldoffCount
1902  * has been set to an appropriate level earlier in error recovery. We could
1903  * decrement it below zero if we allow it to drop for each released lock!
1904  */
1905 void
1906 LWLockReleaseAll(void)
1907 {
1908 	while (num_held_lwlocks > 0)
1909 	{
1910 		HOLD_INTERRUPTS();		/* match the upcoming RESUME_INTERRUPTS */
1911 
1912 		LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
1913 	}
1914 }
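
#ifdef LWLOCK_USAGE_EXAMPLES	/* hypothetical guard, never defined; example only */
/*
 * Hedged illustration of the interrupt bookkeeping described above: a
 * normal LWLockAcquire() does HOLD_INTERRUPTS() and the matching
 * LWLockRelease() does RESUME_INTERRUPTS(), so the pair leaves
 * InterruptHoldoffCount unchanged.  LWLockReleaseAll() has to supply the
 * extra HOLD_INTERRUPTS() per lock itself, since error recovery has already
 * set the counter to the level it wants to keep.
 */
static void
example_interrupt_balance(LWLock *lock)
{
	LWLockAcquire(lock, LW_SHARED);		/* InterruptHoldoffCount incremented */
	LWLockRelease(lock);				/* InterruptHoldoffCount decremented */
}
#endif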
1915 
1916 
1917 /*
1918  * LWLockHeldByMe - test whether my process holds a lock in any mode
1919  *
1920  * This is meant as debug support only.
1921  */
1922 bool
1923 LWLockHeldByMe(LWLock *l)
1924 {
1925 	int			i;
1926 
1927 	for (i = 0; i < num_held_lwlocks; i++)
1928 	{
1929 		if (held_lwlocks[i].lock == l)
1930 			return true;
1931 	}
1932 	return false;
1933 }
1934 
1935 /*
1936  * LWLockHeldByMeInMode - test whether my process holds a lock in given mode
1937  *
1938  * This is meant as debug support only.
1939  */
1940 bool
1941 LWLockHeldByMeInMode(LWLock *l, LWLockMode mode)
1942 {
1943 	int			i;
1944 
1945 	for (i = 0; i < num_held_lwlocks; i++)
1946 	{
1947 		if (held_lwlocks[i].lock == l && held_lwlocks[i].mode == mode)
1948 			return true;
1949 	}
1950 	return false;
1951 }
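
#ifdef LWLOCK_USAGE_EXAMPLES	/* hypothetical guard, never defined; example only */
/*
 * Hedged usage sketch: the two functions above are typically used in
 * assertions that document a function's locking requirements.  The function
 * below is invented for illustration.
 */
static void
example_requires_exclusive(LWLock *lock)
{
	Assert(LWLockHeldByMe(lock));
	Assert(LWLockHeldByMeInMode(lock, LW_EXCLUSIVE));

	/* ... safe to modify the data structure protected by 'lock' ... */
}
#endif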
1952