1 /*-------------------------------------------------------------------------
2  *
3  * lwlock.c
4  *	  Lightweight lock manager
5  *
6  * Lightweight locks are intended primarily to provide mutual exclusion of
7  * access to shared-memory data structures.  Therefore, they offer both
8  * exclusive and shared lock modes (to support read/write and read-only
9  * access to a shared object).  There are few other frammishes.  User-level
10  * locking should be done with the full lock manager --- which depends on
11  * LWLocks to protect its shared state.
12  *
13  * In addition to exclusive and shared modes, lightweight locks can be used to
14  * wait until a variable changes value.  Acquiring the lock with
15  * LWLockAcquire does not reset the variable, i.e. it remains set to the
16  * value it was set to when the lock was released last, and can be updated
17  * without releasing the lock by calling LWLockUpdateVar.  LWLockWaitForVar
18  * waits for the variable to be updated, or until the lock is free.  When
19  * releasing the lock with LWLockReleaseClearVar() the value can be set to an
20  * appropriate value for a free lock.  The meaning of the variable is up to
21  * the caller, the lightweight lock code just assigns and compares it.
22  *
23  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
24  * Portions Copyright (c) 1994, Regents of the University of California
25  *
26  * IDENTIFICATION
27  *	  src/backend/storage/lmgr/lwlock.c
28  *
29  * NOTES:
30  *
31  * This used to be a pretty straightforward reader-writer lock
32  * implementation, in which the internal state was protected by a
33  * spinlock. Unfortunately the overhead of taking the spinlock proved to be
34  * too high for workloads/locks that were taken in shared mode very
35  * frequently. Often we were spinning in the (obviously exclusive) spinlock,
36  * while trying to acquire a shared lock that was actually free.
37  *
38  * Thus a new implementation was devised that provides wait-free shared lock
39  * acquisition for locks that aren't exclusively locked.
40  *
41  * The basic idea is to have a single atomic variable 'lockcount' instead of
42  * the formerly separate shared and exclusive counters and to use atomic
43  * operations to acquire the lock. That's fairly easy to do for plain
44  * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
45  * in the OS.
46  *
47  * For lock acquisition we use an atomic compare-and-exchange on the lockcount
48  * variable. For exclusive lock we swap in a sentinel value
49  * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders.
50  *
51  * To release the lock we use an atomic decrement. If the new value is
52  * zero (we get that atomically), we know we can/have to release
53  * waiters.
54  *
55  * Obviously it is important that the sentinel value for exclusive locks
56  * doesn't conflict with the maximum number of possible share lockers -
57  * luckily MAX_BACKENDS makes that easily possible.
58  *
59  *
60  * The attentive reader might have noticed that naively doing the above has a
61  * glaring race condition: We try to lock using the atomic operations and
62  * notice that we have to wait. Unfortunately by the time we have finished
63  * queuing, the former locker very well might have already finished its
64  * work. That's problematic because we're now stuck waiting inside the OS.
65  *
66  * To mitigate those races we use a two-phase attempt at locking:
67  *	 Phase 1: Try to do it atomically, if we succeed, nice
68  *	 Phase 2: Add ourselves to the waitqueue of the lock
69  *	 Phase 3: Try to grab the lock again, if we succeed, remove ourselves from
70  *			  the queue
71  *	 Phase 4: Sleep till wake-up, goto Phase 1
72  *
73  * This protects us against the problem from above, as nobody can release
74  * too quickly before we're queued: after Phase 2 we're already on the queue.
75  * -------------------------------------------------------------------------
76  */
77 #include "postgres.h"
78 
79 #include "miscadmin.h"
80 #include "pg_trace.h"
81 #include "pgstat.h"
82 #include "postmaster/postmaster.h"
83 #include "replication/slot.h"
84 #include "storage/ipc.h"
85 #include "storage/predicate.h"
86 #include "storage/proc.h"
87 #include "storage/proclist.h"
88 #include "storage/spin.h"
89 #include "utils/memutils.h"
90 
91 #ifdef LWLOCK_STATS
92 #include "utils/hsearch.h"
93 #endif
94 
95 
96 /* We use the ShmemLock spinlock to protect LWLockCounter */
97 extern slock_t *ShmemLock;
98 
99 #define LW_FLAG_HAS_WAITERS			((uint32) 1 << 30)
100 #define LW_FLAG_RELEASE_OK			((uint32) 1 << 29)
101 #define LW_FLAG_LOCKED				((uint32) 1 << 28)
102 
103 #define LW_VAL_EXCLUSIVE			((uint32) 1 << 24)
104 #define LW_VAL_SHARED				1
105 
106 #define LW_LOCK_MASK				((uint32) ((1 << 25)-1))
107 /* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
108 #define LW_SHARED_MASK				((uint32) ((1 << 24)-1))
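
/*
 * Editor's note: a sketch of how the state word decomposes under the
 * definitions above, for orientation only (this code is not used by the
 * file).  LW_FLAG_HAS_WAITERS, LW_FLAG_RELEASE_OK and LW_FLAG_LOCKED occupy
 * bits 30, 29 and 28, LW_VAL_EXCLUSIVE is bit 24, and the count of shared
 * holders lives in the low 24 bits covered by LW_SHARED_MASK:
 *
 *		uint32		state = pg_atomic_read_u32(&lock->state);
 *		bool		is_exclusive = (state & LW_VAL_EXCLUSIVE) != 0;
 *		uint32		shared_holders = state & LW_SHARED_MASK;
 *		bool		has_waiters = (state & LW_FLAG_HAS_WAITERS) != 0;
 */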
109 
110 /*
111  * There are three sorts of LWLock "tranches":
112  *
113  * 1. The individually-named locks defined in lwlocknames.h each have their
114  * own tranche.  The names of these tranches appear in IndividualLWLockNames[]
115  * in lwlocknames.c.
116  *
117  * 2. There are some predefined tranches for built-in groups of locks.
118  * These are listed in enum BuiltinTrancheIds in lwlock.h, and their names
119  * appear in BuiltinTrancheNames[] below.
120  *
121  * 3. Extensions can create new tranches, via either RequestNamedLWLockTranche
122  * or LWLockRegisterTranche.  The names of these that are known in the current
123  * process appear in LWLockTrancheNames[].
124  *
125  * All these names are user-visible as wait event names, so choose with care
126  * ... and do not forget to update the documentation's list of wait events.
127  */
128 extern const char *const IndividualLWLockNames[];	/* in lwlocknames.c */
129 
130 static const char *const BuiltinTrancheNames[] = {
131 	/* LWTRANCHE_XACT_BUFFER: */
132 	"XactBuffer",
133 	/* LWTRANCHE_COMMITTS_BUFFER: */
134 	"CommitTSBuffer",
135 	/* LWTRANCHE_SUBTRANS_BUFFER: */
136 	"SubtransBuffer",
137 	/* LWTRANCHE_MULTIXACTOFFSET_BUFFER: */
138 	"MultiXactOffsetBuffer",
139 	/* LWTRANCHE_MULTIXACTMEMBER_BUFFER: */
140 	"MultiXactMemberBuffer",
141 	/* LWTRANCHE_NOTIFY_BUFFER: */
142 	"NotifyBuffer",
143 	/* LWTRANCHE_SERIAL_BUFFER: */
144 	"SerialBuffer",
145 	/* LWTRANCHE_WAL_INSERT: */
146 	"WALInsert",
147 	/* LWTRANCHE_BUFFER_CONTENT: */
148 	"BufferContent",
149 	/* LWTRANCHE_BUFFER_IO: */
150 	"BufferIO",
151 	/* LWTRANCHE_REPLICATION_ORIGIN_STATE: */
152 	"ReplicationOriginState",
153 	/* LWTRANCHE_REPLICATION_SLOT_IO: */
154 	"ReplicationSlotIO",
155 	/* LWTRANCHE_LOCK_FASTPATH: */
156 	"LockFastPath",
157 	/* LWTRANCHE_BUFFER_MAPPING: */
158 	"BufferMapping",
159 	/* LWTRANCHE_LOCK_MANAGER: */
160 	"LockManager",
161 	/* LWTRANCHE_PREDICATE_LOCK_MANAGER: */
162 	"PredicateLockManager",
163 	/* LWTRANCHE_PARALLEL_HASH_JOIN: */
164 	"ParallelHashJoin",
165 	/* LWTRANCHE_PARALLEL_QUERY_DSA: */
166 	"ParallelQueryDSA",
167 	/* LWTRANCHE_PER_SESSION_DSA: */
168 	"PerSessionDSA",
169 	/* LWTRANCHE_PER_SESSION_RECORD_TYPE: */
170 	"PerSessionRecordType",
171 	/* LWTRANCHE_PER_SESSION_RECORD_TYPMOD: */
172 	"PerSessionRecordTypmod",
173 	/* LWTRANCHE_SHARED_TUPLESTORE: */
174 	"SharedTupleStore",
175 	/* LWTRANCHE_SHARED_TIDBITMAP: */
176 	"SharedTidBitmap",
177 	/* LWTRANCHE_PARALLEL_APPEND: */
178 	"ParallelAppend",
179 	/* LWTRANCHE_PER_XACT_PREDICATE_LIST: */
180 	"PerXactPredicateList"
181 };
182 
183 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
184 				 LWTRANCHE_FIRST_USER_DEFINED - NUM_INDIVIDUAL_LWLOCKS,
185 				 "missing entries in BuiltinTrancheNames[]");
186 
187 /*
188  * This is indexed by tranche ID minus LWTRANCHE_FIRST_USER_DEFINED, and
189  * stores the names of all dynamically-created tranches known to the current
190  * process.  Any unused entries in the array will contain NULL.
191  */
192 static const char **LWLockTrancheNames = NULL;
193 static int	LWLockTrancheNamesAllocated = 0;
194 
195 /*
196  * This points to the main array of LWLocks in shared memory.  Backends inherit
197  * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
198  * where we have special measures to pass it down).
199  */
200 LWLockPadded *MainLWLockArray = NULL;
201 
202 /*
203  * We use this structure to keep track of locked LWLocks for release
204  * during error recovery.  Normally, only a few will be held at once, but
205  * occasionally the number can be much higher; for example, the pg_buffercache
206  * extension locks all buffer partitions simultaneously.
207  */
208 #define MAX_SIMUL_LWLOCKS	200
209 
210 /* struct representing the LWLocks we're holding */
211 typedef struct LWLockHandle
212 {
213 	LWLock	   *lock;
214 	LWLockMode	mode;
215 } LWLockHandle;
216 
217 static int	num_held_lwlocks = 0;
218 static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
219 
220 /* struct representing the LWLock tranche request for named tranche */
221 typedef struct NamedLWLockTrancheRequest
222 {
223 	char		tranche_name[NAMEDATALEN];
224 	int			num_lwlocks;
225 } NamedLWLockTrancheRequest;
226 
227 static NamedLWLockTrancheRequest *NamedLWLockTrancheRequestArray = NULL;
228 static int	NamedLWLockTrancheRequestsAllocated = 0;
229 
230 /*
231  * NamedLWLockTrancheRequests is both the valid length of the request array,
232  * and the length of the shared-memory NamedLWLockTrancheArray later on.
233  * This variable and NamedLWLockTrancheArray are non-static so that
234  * postmaster.c can copy them to child processes in EXEC_BACKEND builds.
235  */
236 int			NamedLWLockTrancheRequests = 0;
237 
238 /* points to data in shared memory: */
239 NamedLWLockTranche *NamedLWLockTrancheArray = NULL;
240 
241 static bool lock_named_request_allowed = true;
242 
243 static void InitializeLWLocks(void);
244 static inline void LWLockReportWaitStart(LWLock *lock);
245 static inline void LWLockReportWaitEnd(void);
246 static const char *GetLWTrancheName(uint16 trancheId);
247 
248 #define T_NAME(lock) \
249 	GetLWTrancheName((lock)->tranche)
250 
251 #ifdef LWLOCK_STATS
252 typedef struct lwlock_stats_key
253 {
254 	int			tranche;
255 	void	   *instance;
256 }			lwlock_stats_key;
257 
258 typedef struct lwlock_stats
259 {
260 	lwlock_stats_key key;
261 	int			sh_acquire_count;
262 	int			ex_acquire_count;
263 	int			block_count;
264 	int			dequeue_self_count;
265 	int			spin_delay_count;
266 }			lwlock_stats;
267 
268 static HTAB *lwlock_stats_htab;
269 static lwlock_stats lwlock_stats_dummy;
270 #endif
271 
272 #ifdef LOCK_DEBUG
273 bool		Trace_lwlocks = false;
274 
275 inline static void
276 PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
277 {
278 	/* hide statement & context here, otherwise the log is just too verbose */
279 	if (Trace_lwlocks)
280 	{
281 		uint32		state = pg_atomic_read_u32(&lock->state);
282 
283 		ereport(LOG,
284 				(errhidestmt(true),
285 				 errhidecontext(true),
286 				 errmsg_internal("%d: %s(%s %p): excl %u shared %u haswaiters %u waiters %u rOK %d",
287 								 MyProcPid,
288 								 where, T_NAME(lock), lock,
289 								 (state & LW_VAL_EXCLUSIVE) != 0,
290 								 state & LW_SHARED_MASK,
291 								 (state & LW_FLAG_HAS_WAITERS) != 0,
292 								 pg_atomic_read_u32(&lock->nwaiters),
293 								 (state & LW_FLAG_RELEASE_OK) != 0)));
294 	}
295 }
296 
297 inline static void
298 LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
299 {
300 	/* hide statement & context here, otherwise the log is just too verbose */
301 	if (Trace_lwlocks)
302 	{
303 		ereport(LOG,
304 				(errhidestmt(true),
305 				 errhidecontext(true),
306 				 errmsg_internal("%s(%s %p): %s", where,
307 								 T_NAME(lock), lock, msg)));
308 	}
309 }
310 
311 #else							/* not LOCK_DEBUG */
312 #define PRINT_LWDEBUG(a,b,c) ((void)0)
313 #define LOG_LWDEBUG(a,b,c) ((void)0)
314 #endif							/* LOCK_DEBUG */
315 
316 #ifdef LWLOCK_STATS
317 
318 static void init_lwlock_stats(void);
319 static void print_lwlock_stats(int code, Datum arg);
320 static lwlock_stats * get_lwlock_stats_entry(LWLock *lock);
321 
322 static void
323 init_lwlock_stats(void)
324 {
325 	HASHCTL		ctl;
326 	static MemoryContext lwlock_stats_cxt = NULL;
327 	static bool exit_registered = false;
328 
329 	if (lwlock_stats_cxt != NULL)
330 		MemoryContextDelete(lwlock_stats_cxt);
331 
332 	/*
333 	 * The LWLock stats will be updated within a critical section, which
334 	 * requires allocating new hash entries. Allocations within a critical
335 	 * section are normally not allowed because running out of memory would
336 	 * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
337 	 * turned on in production, so that's an acceptable risk. The hash entries
338 	 * are small, so the risk of running out of memory is minimal in practice.
339 	 */
340 	lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
341 											 "LWLock stats",
342 											 ALLOCSET_DEFAULT_SIZES);
343 	MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);
344 
345 	MemSet(&ctl, 0, sizeof(ctl));
346 	ctl.keysize = sizeof(lwlock_stats_key);
347 	ctl.entrysize = sizeof(lwlock_stats);
348 	ctl.hcxt = lwlock_stats_cxt;
349 	lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
350 									HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
351 	if (!exit_registered)
352 	{
353 		on_shmem_exit(print_lwlock_stats, 0);
354 		exit_registered = true;
355 	}
356 }
357 
358 static void
359 print_lwlock_stats(int code, Datum arg)
360 {
361 	HASH_SEQ_STATUS scan;
362 	lwlock_stats *lwstats;
363 
364 	hash_seq_init(&scan, lwlock_stats_htab);
365 
366 	/* Grab an LWLock to keep different backends from mixing reports */
367 	LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);
368 
369 	while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
370 	{
371 		fprintf(stderr,
372 				"PID %d lwlock %s %p: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
373 				MyProcPid, GetLWTrancheName(lwstats->key.tranche),
374 				lwstats->key.instance, lwstats->sh_acquire_count,
375 				lwstats->ex_acquire_count, lwstats->block_count,
376 				lwstats->spin_delay_count, lwstats->dequeue_self_count);
377 	}
378 
379 	LWLockRelease(&MainLWLockArray[0].lock);
380 }
381 
382 static lwlock_stats *
383 get_lwlock_stats_entry(LWLock *lock)
384 {
385 	lwlock_stats_key key;
386 	lwlock_stats *lwstats;
387 	bool		found;
388 
389 	/*
390 	 * During shared memory initialization, the hash table doesn't exist yet.
391 	 * Stats of that phase aren't very interesting, so just collect operations
392 	 * on all locks in a single dummy entry.
393 	 */
394 	if (lwlock_stats_htab == NULL)
395 		return &lwlock_stats_dummy;
396 
397 	/* Fetch or create the entry. */
398 	MemSet(&key, 0, sizeof(key));
399 	key.tranche = lock->tranche;
400 	key.instance = lock;
401 	lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
402 	if (!found)
403 	{
404 		lwstats->sh_acquire_count = 0;
405 		lwstats->ex_acquire_count = 0;
406 		lwstats->block_count = 0;
407 		lwstats->dequeue_self_count = 0;
408 		lwstats->spin_delay_count = 0;
409 	}
410 	return lwstats;
411 }
412 #endif							/* LWLOCK_STATS */
413 
414 
415 /*
416  * Compute number of LWLocks required by named tranches.  These will be
417  * allocated in the main array.
418  */
419 static int
420 NumLWLocksForNamedTranches(void)
421 {
422 	int			numLocks = 0;
423 	int			i;
424 
425 	for (i = 0; i < NamedLWLockTrancheRequests; i++)
426 		numLocks += NamedLWLockTrancheRequestArray[i].num_lwlocks;
427 
428 	return numLocks;
429 }
430 
431 /*
432  * Compute shmem space needed for LWLocks and named tranches.
433  */
434 Size
435 LWLockShmemSize(void)
436 {
437 	Size		size;
438 	int			i;
439 	int			numLocks = NUM_FIXED_LWLOCKS;
440 
441 	/* Calculate total number of locks needed in the main array. */
442 	numLocks += NumLWLocksForNamedTranches();
443 
444 	/* Space for the LWLock array. */
445 	size = mul_size(numLocks, sizeof(LWLockPadded));
446 
447 	/* Space for dynamic allocation counter, plus room for alignment. */
448 	size = add_size(size, sizeof(int) + LWLOCK_PADDED_SIZE);
449 
450 	/* space for named tranches. */
451 	size = add_size(size, mul_size(NamedLWLockTrancheRequests, sizeof(NamedLWLockTranche)));
452 
453 	/* space for name of each tranche. */
454 	for (i = 0; i < NamedLWLockTrancheRequests; i++)
455 		size = add_size(size, strlen(NamedLWLockTrancheRequestArray[i].tranche_name) + 1);
456 
457 	/* Disallow adding any more named tranches. */
458 	lock_named_request_allowed = false;
459 
460 	return size;
461 }
462 
463 /*
464  * Allocate shmem space for the main LWLock array and all tranches and
465  * initialize it.  We also register extension LWLock tranches here.
466  */
467 void
468 CreateLWLocks(void)
469 {
470 	StaticAssertStmt(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
471 					 "MAX_BACKENDS too big for lwlock.c");
472 
473 	StaticAssertStmt(sizeof(LWLock) <= LWLOCK_MINIMAL_SIZE &&
474 					 sizeof(LWLock) <= LWLOCK_PADDED_SIZE,
475 					 "Miscalculated LWLock padding");
476 
477 	if (!IsUnderPostmaster)
478 	{
479 		Size		spaceLocks = LWLockShmemSize();
480 		int		   *LWLockCounter;
481 		char	   *ptr;
482 
483 		/* Allocate space */
484 		ptr = (char *) ShmemAlloc(spaceLocks);
485 
486 		/* Leave room for dynamic allocation of tranches */
487 		ptr += sizeof(int);
488 
489 		/* Ensure desired alignment of LWLock array */
490 		ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
491 
492 		MainLWLockArray = (LWLockPadded *) ptr;
493 
494 		/*
495 		 * Initialize the dynamic-allocation counter for tranches, which is
496 		 * stored just before the first LWLock.
497 		 */
498 		LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
499 		*LWLockCounter = LWTRANCHE_FIRST_USER_DEFINED;
500 
501 		/* Initialize all LWLocks */
502 		InitializeLWLocks();
503 	}
504 
505 	/* Register named extension LWLock tranches in the current process. */
506 	for (int i = 0; i < NamedLWLockTrancheRequests; i++)
507 		LWLockRegisterTranche(NamedLWLockTrancheArray[i].trancheId,
508 							  NamedLWLockTrancheArray[i].trancheName);
509 }
510 
511 /*
512  * Initialize LWLocks that are fixed and those belonging to named tranches.
513  */
514 static void
515 InitializeLWLocks(void)
516 {
517 	int			numNamedLocks = NumLWLocksForNamedTranches();
518 	int			id;
519 	int			i;
520 	int			j;
521 	LWLockPadded *lock;
522 
523 	/* Initialize all individual LWLocks in main array */
524 	for (id = 0, lock = MainLWLockArray; id < NUM_INDIVIDUAL_LWLOCKS; id++, lock++)
525 		LWLockInitialize(&lock->lock, id);
526 
527 	/* Initialize buffer mapping LWLocks in main array */
528 	lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS;
529 	for (id = 0; id < NUM_BUFFER_PARTITIONS; id++, lock++)
530 		LWLockInitialize(&lock->lock, LWTRANCHE_BUFFER_MAPPING);
531 
532 	/* Initialize lmgrs' LWLocks in main array */
533 	lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS + NUM_BUFFER_PARTITIONS;
534 	for (id = 0; id < NUM_LOCK_PARTITIONS; id++, lock++)
535 		LWLockInitialize(&lock->lock, LWTRANCHE_LOCK_MANAGER);
536 
537 	/* Initialize predicate lmgrs' LWLocks in main array */
538 	lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS +
539 		NUM_BUFFER_PARTITIONS + NUM_LOCK_PARTITIONS;
540 	for (id = 0; id < NUM_PREDICATELOCK_PARTITIONS; id++, lock++)
541 		LWLockInitialize(&lock->lock, LWTRANCHE_PREDICATE_LOCK_MANAGER);
542 
543 	/*
544 	 * Copy the info about any named tranches into shared memory (so that
545 	 * other processes can see it), and initialize the requested LWLocks.
546 	 */
547 	if (NamedLWLockTrancheRequests > 0)
548 	{
549 		char	   *trancheNames;
550 
551 		NamedLWLockTrancheArray = (NamedLWLockTranche *)
552 			&MainLWLockArray[NUM_FIXED_LWLOCKS + numNamedLocks];
553 
554 		trancheNames = (char *) NamedLWLockTrancheArray +
555 			(NamedLWLockTrancheRequests * sizeof(NamedLWLockTranche));
556 		lock = &MainLWLockArray[NUM_FIXED_LWLOCKS];
557 
558 		for (i = 0; i < NamedLWLockTrancheRequests; i++)
559 		{
560 			NamedLWLockTrancheRequest *request;
561 			NamedLWLockTranche *tranche;
562 			char	   *name;
563 
564 			request = &NamedLWLockTrancheRequestArray[i];
565 			tranche = &NamedLWLockTrancheArray[i];
566 
567 			name = trancheNames;
568 			trancheNames += strlen(request->tranche_name) + 1;
569 			strcpy(name, request->tranche_name);
570 			tranche->trancheId = LWLockNewTrancheId();
571 			tranche->trancheName = name;
572 
573 			for (j = 0; j < request->num_lwlocks; j++, lock++)
574 				LWLockInitialize(&lock->lock, tranche->trancheId);
575 		}
576 	}
577 }
578 
579 /*
580  * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
581  */
582 void
583 InitLWLockAccess(void)
584 {
585 #ifdef LWLOCK_STATS
586 	init_lwlock_stats();
587 #endif
588 }
589 
590 /*
591  * GetNamedLWLockTranche - returns the base address of the LWLocks belonging
592  *		to the specified tranche.
593  *
594  * Caller needs to retrieve the requested number of LWLocks starting from
595  * the base lock address returned by this API.  This can be used for
596  * tranches that are requested by using RequestNamedLWLockTranche() API.
597  */
598 LWLockPadded *
599 GetNamedLWLockTranche(const char *tranche_name)
600 {
601 	int			lock_pos;
602 	int			i;
603 
604 	/*
605 	 * Obtain the position of the base address of the LWLocks belonging to the
606 	 * requested tranche_name in MainLWLockArray.  LWLocks for named tranches
607 	 * are placed in MainLWLockArray after the fixed locks.
608 	 */
609 	lock_pos = NUM_FIXED_LWLOCKS;
610 	for (i = 0; i < NamedLWLockTrancheRequests; i++)
611 	{
612 		if (strcmp(NamedLWLockTrancheRequestArray[i].tranche_name,
613 				   tranche_name) == 0)
614 			return &MainLWLockArray[lock_pos];
615 
616 		lock_pos += NamedLWLockTrancheRequestArray[i].num_lwlocks;
617 	}
618 
619 	elog(ERROR, "requested tranche is not registered");
620 
621 	/* just to keep compiler quiet */
622 	return NULL;
623 }
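
/*
 * Editor's sketch (hypothetical extension code, not part of this file): a
 * backend that requested a named tranche "my_extension" with two locks in
 * _PG_init() could later locate and use them like this; the tranche name and
 * lock index are illustrative assumptions.
 *
 *		LWLockPadded *locks = GetNamedLWLockTranche("my_extension");
 *
 *		LWLockAcquire(&locks[0].lock, LW_EXCLUSIVE);
 *		... manipulate the extension's shared state ...
 *		LWLockRelease(&locks[0].lock);
 */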
624 
625 /*
626  * Allocate a new tranche ID.
627  */
628 int
629 LWLockNewTrancheId(void)
630 {
631 	int			result;
632 	int		   *LWLockCounter;
633 
634 	LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
635 	SpinLockAcquire(ShmemLock);
636 	result = (*LWLockCounter)++;
637 	SpinLockRelease(ShmemLock);
638 
639 	return result;
640 }
641 
642 /*
643  * Register a dynamic tranche name in the lookup table of the current process.
644  *
645  * This routine will save a pointer to the tranche name passed as an argument,
646  * so the name should be allocated in a backend-lifetime context
647  * (shared memory, TopMemoryContext, static constant, or similar).
648  *
649  * The tranche name will be user-visible as a wait event name, so try to
650  * use a name that fits the style for those.
651  */
652 void
653 LWLockRegisterTranche(int tranche_id, const char *tranche_name)
654 {
655 	/* This should only be called for user-defined tranches. */
656 	if (tranche_id < LWTRANCHE_FIRST_USER_DEFINED)
657 		return;
658 
659 	/* Convert to array index. */
660 	tranche_id -= LWTRANCHE_FIRST_USER_DEFINED;
661 
662 	/* If necessary, create or enlarge array. */
663 	if (tranche_id >= LWLockTrancheNamesAllocated)
664 	{
665 		int			newalloc;
666 
667 		newalloc = Max(LWLockTrancheNamesAllocated, 8);
668 		while (newalloc <= tranche_id)
669 			newalloc *= 2;
670 
671 		if (LWLockTrancheNames == NULL)
672 			LWLockTrancheNames = (const char **)
673 				MemoryContextAllocZero(TopMemoryContext,
674 									   newalloc * sizeof(char *));
675 		else
676 		{
677 			LWLockTrancheNames = (const char **)
678 				repalloc(LWLockTrancheNames, newalloc * sizeof(char *));
679 			memset(LWLockTrancheNames + LWLockTrancheNamesAllocated,
680 				   0,
681 				   (newalloc - LWLockTrancheNamesAllocated) * sizeof(char *));
682 		}
683 		LWLockTrancheNamesAllocated = newalloc;
684 	}
685 
686 	LWLockTrancheNames[tranche_id] = tranche_name;
687 }
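
/*
 * Editor's sketch (hypothetical, not part of this file): an extension that
 * allocates its own LWLocks in shared memory might pair LWLockNewTrancheId()
 * with LWLockRegisterTranche() during its shmem-startup hook roughly like
 * this; "my_shared_state" and the tranche name are illustrative assumptions.
 *
 *		static int	my_tranche_id;
 *
 *		my_tranche_id = LWLockNewTrancheId();
 *		LWLockRegisterTranche(my_tranche_id, "my_extension_state");
 *		LWLockInitialize(&my_shared_state->lock, my_tranche_id);
 *
 * Any other process that wants the tranche name reported (rather than the
 * generic "extension") must also call LWLockRegisterTranche() itself; see
 * GetLWTrancheName().
 */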
688 
689 /*
690  * RequestNamedLWLockTranche
691  *		Request that extra LWLocks be allocated during postmaster
692  *		startup.
693  *
694  * This is only useful for extensions if called from the _PG_init hook
695  * of a library that is loaded into the postmaster via
696  * shared_preload_libraries.  Once shared memory has been allocated, calls
697  * will be ignored.  (We could raise an error, but it seems better to make
698  * it a no-op, so that libraries containing such calls can be reloaded if
699  * needed.)
700  *
701  * The tranche name will be user-visible as a wait event name, so try to
702  * use a name that fits the style for those.
703  */
704 void
705 RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
706 {
707 	NamedLWLockTrancheRequest *request;
708 
709 	if (IsUnderPostmaster || !lock_named_request_allowed)
710 		return;					/* too late */
711 
712 	if (NamedLWLockTrancheRequestArray == NULL)
713 	{
714 		NamedLWLockTrancheRequestsAllocated = 16;
715 		NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
716 			MemoryContextAlloc(TopMemoryContext,
717 							   NamedLWLockTrancheRequestsAllocated
718 							   * sizeof(NamedLWLockTrancheRequest));
719 	}
720 
721 	if (NamedLWLockTrancheRequests >= NamedLWLockTrancheRequestsAllocated)
722 	{
723 		int			i = NamedLWLockTrancheRequestsAllocated;
724 
725 		while (i <= NamedLWLockTrancheRequests)
726 			i *= 2;
727 
728 		NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
729 			repalloc(NamedLWLockTrancheRequestArray,
730 					 i * sizeof(NamedLWLockTrancheRequest));
731 		NamedLWLockTrancheRequestsAllocated = i;
732 	}
733 
734 	request = &NamedLWLockTrancheRequestArray[NamedLWLockTrancheRequests];
735 	Assert(strlen(tranche_name) + 1 <= NAMEDATALEN);
736 	strlcpy(request->tranche_name, tranche_name, NAMEDATALEN);
737 	request->num_lwlocks = num_lwlocks;
738 	NamedLWLockTrancheRequests++;
739 }
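
/*
 * Editor's sketch (hypothetical, not part of this file): the requesting side
 * of the named-tranche mechanism, as it might appear in the _PG_init() of a
 * library listed in shared_preload_libraries; the name and count are
 * illustrative assumptions.
 *
 *		void
 *		_PG_init(void)
 *		{
 *			RequestNamedLWLockTranche("my_extension", 2);
 *		}
 *
 * The matching LWLocks are later fetched with
 * GetNamedLWLockTranche("my_extension").
 */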
740 
741 /*
742  * LWLockInitialize - initialize a new lwlock; it's initially unlocked
743  */
744 void
745 LWLockInitialize(LWLock *lock, int tranche_id)
746 {
747 	pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
748 #ifdef LOCK_DEBUG
749 	pg_atomic_init_u32(&lock->nwaiters, 0);
750 #endif
751 	lock->tranche = tranche_id;
752 	proclist_init(&lock->waiters);
753 }
754 
755 /*
756  * Report start of wait event for light-weight locks.
757  *
758  * This function is used by all the lightweight lock calls that need to
759  * wait to acquire the lock.  It distinguishes the wait event based on the
760  * tranche and lock id.
761  */
762 static inline void
763 LWLockReportWaitStart(LWLock *lock)
764 {
765 	pgstat_report_wait_start(PG_WAIT_LWLOCK | lock->tranche);
766 }
767 
768 /*
769  * Report end of wait event for light-weight locks.
770  */
771 static inline void
772 LWLockReportWaitEnd(void)
773 {
774 	pgstat_report_wait_end();
775 }
776 
777 /*
778  * Return the name of an LWLock tranche.
779  */
780 static const char *
781 GetLWTrancheName(uint16 trancheId)
782 {
783 	/* Individual LWLock? */
784 	if (trancheId < NUM_INDIVIDUAL_LWLOCKS)
785 		return IndividualLWLockNames[trancheId];
786 
787 	/* Built-in tranche? */
788 	if (trancheId < LWTRANCHE_FIRST_USER_DEFINED)
789 		return BuiltinTrancheNames[trancheId - NUM_INDIVIDUAL_LWLOCKS];
790 
791 	/*
792 	 * It's an extension tranche, so look in LWLockTrancheNames[].  However,
793 	 * it's possible that the tranche has never been registered in the current
794 	 * process, in which case give up and return "extension".
795 	 */
796 	trancheId -= LWTRANCHE_FIRST_USER_DEFINED;
797 
798 	if (trancheId >= LWLockTrancheNamesAllocated ||
799 		LWLockTrancheNames[trancheId] == NULL)
800 		return "extension";
801 
802 	return LWLockTrancheNames[trancheId];
803 }
804 
805 /*
806  * Return an identifier for an LWLock based on the wait class and event.
807  */
808 const char *
809 GetLWLockIdentifier(uint32 classId, uint16 eventId)
810 {
811 	Assert(classId == PG_WAIT_LWLOCK);
812 	/* The event IDs are just tranche numbers. */
813 	return GetLWTrancheName(eventId);
814 }
815 
816 /*
817  * Internal function that tries to atomically acquire the lwlock in the passed
818  * in mode.
819  *
820  * This function will not block waiting for a lock to become free - that's
821  * the caller's job.
822  *
823  * Returns true if the lock isn't free and we need to wait.
824  */
825 static bool
826 LWLockAttemptLock(LWLock *lock, LWLockMode mode)
827 {
828 	uint32		old_state;
829 
830 	AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);
831 
832 	/*
833 	 * Read once outside the loop, later iterations will get the newer value
834 	 * via compare & exchange.
835 	 */
836 	old_state = pg_atomic_read_u32(&lock->state);
837 
838 	/* loop until we've determined whether we could acquire the lock or not */
839 	while (true)
840 	{
841 		uint32		desired_state;
842 		bool		lock_free;
843 
844 		desired_state = old_state;
845 
846 		if (mode == LW_EXCLUSIVE)
847 		{
848 			lock_free = (old_state & LW_LOCK_MASK) == 0;
849 			if (lock_free)
850 				desired_state += LW_VAL_EXCLUSIVE;
851 		}
852 		else
853 		{
854 			lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
855 			if (lock_free)
856 				desired_state += LW_VAL_SHARED;
857 		}
858 
859 		/*
860 		 * Attempt to swap in the state we are expecting. If we didn't see the
861 		 * lock as free, that's just the old value. If we saw it as free,
862 		 * we'll attempt to mark it acquired. The reason that we always swap
863 		 * in the value is that this doubles as a memory barrier. We could try
864 		 * to be smarter and only swap in values if we saw the lock as free,
865 		 * but benchmarks haven't shown that to be beneficial so far.
866 		 *
867 		 * Retry if the value changed since we last looked at it.
868 		 */
869 		if (pg_atomic_compare_exchange_u32(&lock->state,
870 										   &old_state, desired_state))
871 		{
872 			if (lock_free)
873 			{
874 				/* Great! Got the lock. */
875 #ifdef LOCK_DEBUG
876 				if (mode == LW_EXCLUSIVE)
877 					lock->owner = MyProc;
878 #endif
879 				return false;
880 			}
881 			else
882 				return true;	/* somebody else has the lock */
883 		}
884 	}
885 	pg_unreachable();
886 }
887 
888 /*
889  * Lock the LWLock's wait list against concurrent activity.
890  *
891  * NB: even though the wait list is locked, non-conflicting lock operations
892  * may still happen concurrently.
893  *
894  * Time spent holding mutex should be short!
895  */
896 static void
897 LWLockWaitListLock(LWLock *lock)
898 {
899 	uint32		old_state;
900 #ifdef LWLOCK_STATS
901 	lwlock_stats *lwstats;
902 	uint32		delays = 0;
903 
904 	lwstats = get_lwlock_stats_entry(lock);
905 #endif
906 
907 	while (true)
908 	{
909 		/* always try once to acquire lock directly */
910 		old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
911 		if (!(old_state & LW_FLAG_LOCKED))
912 			break;				/* got lock */
913 
914 		/* and then spin without atomic operations until lock is released */
915 		{
916 			SpinDelayStatus delayStatus;
917 
918 			init_local_spin_delay(&delayStatus);
919 
920 			while (old_state & LW_FLAG_LOCKED)
921 			{
922 				perform_spin_delay(&delayStatus);
923 				old_state = pg_atomic_read_u32(&lock->state);
924 			}
925 #ifdef LWLOCK_STATS
926 			delays += delayStatus.delays;
927 #endif
928 			finish_spin_delay(&delayStatus);
929 		}
930 
931 		/*
932 		 * Retry. The lock might of course already have been re-acquired by
933 		 * the time we attempt to get it again.
934 		 */
935 	}
936 
937 #ifdef LWLOCK_STATS
938 	lwstats->spin_delay_count += delays;
939 #endif
940 }
941 
942 /*
943  * Unlock the LWLock's wait list.
944  *
945  * Note that it can be more efficient to manipulate flags and release the
946  * locks in a single atomic operation.
947  */
948 static void
949 LWLockWaitListUnlock(LWLock *lock)
950 {
951 	uint32		old_state PG_USED_FOR_ASSERTS_ONLY;
952 
953 	old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);
954 
955 	Assert(old_state & LW_FLAG_LOCKED);
956 }
957 
958 /*
959  * Wakeup all the lockers that currently have a chance to acquire the lock.
960  */
961 static void
962 LWLockWakeup(LWLock *lock)
963 {
964 	bool		new_release_ok;
965 	bool		wokeup_somebody = false;
966 	proclist_head wakeup;
967 	proclist_mutable_iter iter;
968 
969 	proclist_init(&wakeup);
970 
971 	new_release_ok = true;
972 
973 	/* lock wait list while collecting backends to wake up */
974 	LWLockWaitListLock(lock);
975 
976 	proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
977 	{
978 		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);
979 
980 		if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
981 			continue;
982 
983 		proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
984 		proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
985 
986 		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
987 		{
988 			/*
989 			 * Prevent additional wakeups until retryer gets to run. Backends
990 			 * that are just waiting for the lock to become free don't retry
991 			 * automatically.
992 			 */
993 			new_release_ok = false;
994 
995 			/*
996 			 * Don't wakeup (further) exclusive locks.
997 			 */
998 			wokeup_somebody = true;
999 		}
1000 
1001 		/*
1002 		 * Once we've woken up an exclusive lock, there's no point in waking
1003 		 * up anybody else.
1004 		 */
1005 		if (waiter->lwWaitMode == LW_EXCLUSIVE)
1006 			break;
1007 	}
1008 
1009 	Assert(proclist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);
1010 
1011 	/* unset required flags, and release lock, in one fell swoop */
1012 	{
1013 		uint32		old_state;
1014 		uint32		desired_state;
1015 
1016 		old_state = pg_atomic_read_u32(&lock->state);
1017 		while (true)
1018 		{
1019 			desired_state = old_state;
1020 
1021 			/* compute desired flags */
1022 
1023 			if (new_release_ok)
1024 				desired_state |= LW_FLAG_RELEASE_OK;
1025 			else
1026 				desired_state &= ~LW_FLAG_RELEASE_OK;
1027 
1028 			if (proclist_is_empty(&wakeup))
1029 				desired_state &= ~LW_FLAG_HAS_WAITERS;
1030 
1031 			desired_state &= ~LW_FLAG_LOCKED;	/* release lock */
1032 
1033 			if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
1034 											   desired_state))
1035 				break;
1036 		}
1037 	}
1038 
1039 	/* Awaken any waiters I removed from the queue. */
1040 	proclist_foreach_modify(iter, &wakeup, lwWaitLink)
1041 	{
1042 		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);
1043 
1044 		LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
1045 		proclist_delete(&wakeup, iter.cur, lwWaitLink);
1046 
1047 		/*
1048 		 * Guarantee that lwWaiting being unset only becomes visible once the
1049 		 * unlink from the list has completed. Otherwise the target backend
1050 		 * could be woken up for some other reason and enqueue itself for a new
1051 		 * lock - if that happens before the list unlink happens, the list
1052 		 * would end up being corrupted.
1053 		 *
1054 		 * The barrier pairs with the LWLockWaitListLock() when enqueuing for
1055 		 * another lock.
1056 		 */
1057 		pg_write_barrier();
1058 		waiter->lwWaiting = false;
1059 		PGSemaphoreUnlock(waiter->sem);
1060 	}
1061 }
1062 
1063 /*
1064  * Add ourselves to the end of the queue.
1065  *
1066  * NB: Mode can be LW_WAIT_UNTIL_FREE here!
1067  */
1068 static void
1069 LWLockQueueSelf(LWLock *lock, LWLockMode mode)
1070 {
1071 	/*
1072 	 * If we don't have a PGPROC structure, there's no way to wait. This
1073 	 * should never occur, since MyProc should only be null during shared
1074 	 * memory initialization.
1075 	 */
1076 	if (MyProc == NULL)
1077 		elog(PANIC, "cannot wait without a PGPROC structure");
1078 
1079 	if (MyProc->lwWaiting)
1080 		elog(PANIC, "queueing for lock while waiting on another one");
1081 
1082 	LWLockWaitListLock(lock);
1083 
1084 	/* setting the flag is protected by the spinlock */
1085 	pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);
1086 
1087 	MyProc->lwWaiting = true;
1088 	MyProc->lwWaitMode = mode;
1089 
1090 	/* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
1091 	if (mode == LW_WAIT_UNTIL_FREE)
1092 		proclist_push_head(&lock->waiters, MyProc->pgprocno, lwWaitLink);
1093 	else
1094 		proclist_push_tail(&lock->waiters, MyProc->pgprocno, lwWaitLink);
1095 
1096 	/* Can release the mutex now */
1097 	LWLockWaitListUnlock(lock);
1098 
1099 #ifdef LOCK_DEBUG
1100 	pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
1101 #endif
1102 
1103 }
1104 
1105 /*
1106  * Remove ourselves from the waitlist.
1107  *
1108  * This is used if we queued ourselves because we thought we needed to sleep
1109  * but, after further checking, we discovered that we don't actually need to
1110  * do so.
1111  */
1112 static void
1113 LWLockDequeueSelf(LWLock *lock)
1114 {
1115 	bool		found = false;
1116 	proclist_mutable_iter iter;
1117 
1118 #ifdef LWLOCK_STATS
1119 	lwlock_stats *lwstats;
1120 
1121 	lwstats = get_lwlock_stats_entry(lock);
1122 
1123 	lwstats->dequeue_self_count++;
1124 #endif
1125 
1126 	LWLockWaitListLock(lock);
1127 
1128 	/*
1129 	 * We can't just remove ourselves from the list; we need to iterate over
1130 	 * all entries, as somebody else could have dequeued us already.
1131 	 */
1132 	proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
1133 	{
1134 		if (iter.cur == MyProc->pgprocno)
1135 		{
1136 			found = true;
1137 			proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
1138 			break;
1139 		}
1140 	}
1141 
1142 	if (proclist_is_empty(&lock->waiters) &&
1143 		(pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
1144 	{
1145 		pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
1146 	}
1147 
1148 	/* XXX: combine with fetch_and above? */
1149 	LWLockWaitListUnlock(lock);
1150 
1151 	/* clear waiting state again, nice for debugging */
1152 	if (found)
1153 		MyProc->lwWaiting = false;
1154 	else
1155 	{
1156 		int			extraWaits = 0;
1157 
1158 		/*
1159 		 * Somebody else dequeued us and has or will wake us up. Deal with the
1160 		 * superfluous absorption of a wakeup.
1161 		 */
1162 
1163 		/*
1164 		 * Reset RELEASE_OK flag if somebody woke us before we removed
1165 		 * ourselves - they'll have set it to false.
1166 		 */
1167 		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1168 
1169 		/*
1170 		 * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
1171 		 * get reset at some inconvenient point later. Most of the time this
1172 		 * will immediately return.
1173 		 */
1174 		for (;;)
1175 		{
1176 			PGSemaphoreLock(MyProc->sem);
1177 			if (!MyProc->lwWaiting)
1178 				break;
1179 			extraWaits++;
1180 		}
1181 
1182 		/*
1183 		 * Fix the process wait semaphore's count for any absorbed wakeups.
1184 		 */
1185 		while (extraWaits-- > 0)
1186 			PGSemaphoreUnlock(MyProc->sem);
1187 	}
1188 
1189 #ifdef LOCK_DEBUG
1190 	{
1191 		/* not waiting anymore */
1192 		uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1193 
1194 		Assert(nwaiters < MAX_BACKENDS);
1195 	}
1196 #endif
1197 }
1198 
1199 /*
1200  * LWLockAcquire - acquire a lightweight lock in the specified mode
1201  *
1202  * If the lock is not available, sleep until it is.  Returns true if the lock
1203  * was available immediately, false if we had to sleep.
1204  *
1205  * Side effect: cancel/die interrupts are held off until lock release.
1206  */
1207 bool
1208 LWLockAcquire(LWLock *lock, LWLockMode mode)
1209 {
1210 	PGPROC	   *proc = MyProc;
1211 	bool		result = true;
1212 	int			extraWaits = 0;
1213 #ifdef LWLOCK_STATS
1214 	lwlock_stats *lwstats;
1215 
1216 	lwstats = get_lwlock_stats_entry(lock);
1217 #endif
1218 
1219 	AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1220 
1221 	PRINT_LWDEBUG("LWLockAcquire", lock, mode);
1222 
1223 #ifdef LWLOCK_STATS
1224 	/* Count lock acquisition attempts */
1225 	if (mode == LW_EXCLUSIVE)
1226 		lwstats->ex_acquire_count++;
1227 	else
1228 		lwstats->sh_acquire_count++;
1229 #endif							/* LWLOCK_STATS */
1230 
1231 	/*
1232 	 * We can't wait if we haven't got a PGPROC.  This should only occur
1233 	 * during bootstrap or shared memory initialization.  Put an Assert here
1234 	 * to catch unsafe coding practices.
1235 	 */
1236 	Assert(!(proc == NULL && IsUnderPostmaster));
1237 
1238 	/* Ensure we will have room to remember the lock */
1239 	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1240 		elog(ERROR, "too many LWLocks taken");
1241 
1242 	/*
1243 	 * Lock out cancel/die interrupts until we exit the code section protected
1244 	 * by the LWLock.  This ensures that interrupts will not interfere with
1245 	 * manipulations of data structures in shared memory.
1246 	 */
1247 	HOLD_INTERRUPTS();
1248 
1249 	/*
1250 	 * Loop here to try to acquire lock after each time we are signaled by
1251 	 * LWLockRelease.
1252 	 *
1253 	 * NOTE: it might seem better to have LWLockRelease actually grant us the
1254 	 * lock, rather than retrying and possibly having to go back to sleep. But
1255 	 * in practice that is no good because it means a process swap for every
1256 	 * lock acquisition when two or more processes are contending for the same
1257 	 * lock.  Since LWLocks are normally used to protect not-very-long
1258 	 * sections of computation, a process needs to be able to acquire and
1259 	 * release the same lock many times during a single CPU time slice, even
1260 	 * in the presence of contention.  The efficiency of being able to do that
1261 	 * outweighs the inefficiency of sometimes wasting a process dispatch
1262 	 * cycle because the lock is not free when a released waiter finally gets
1263 	 * to run.  See pgsql-hackers archives for 29-Dec-01.
1264 	 */
1265 	for (;;)
1266 	{
1267 		bool		mustwait;
1268 
1269 		/*
1270 		 * Try to grab the lock the first time; we're not in the waitqueue
1271 		 * yet/anymore.
1272 		 */
1273 		mustwait = LWLockAttemptLock(lock, mode);
1274 
1275 		if (!mustwait)
1276 		{
1277 			LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
1278 			break;				/* got the lock */
1279 		}
1280 
1281 		/*
1282 		 * Ok, at this point we couldn't grab the lock on the first try. We
1283 		 * cannot simply queue ourselves to the end of the list and wait to be
1284 		 * woken up because by now the lock could long have been released.
1285 		 * Instead add us to the queue and try to grab the lock again. If we
1286 		 * succeed we need to revert the queuing and be happy, otherwise we
1287 		 * recheck the lock. If we still couldn't grab it, we know that the
1288 		 * other locker will see our queue entries when releasing since they
1289 		 * existed before we checked for the lock.
1290 		 */
1291 
1292 		/* add to the queue */
1293 		LWLockQueueSelf(lock, mode);
1294 
1295 		/* we're now guaranteed to be woken up if necessary */
1296 		mustwait = LWLockAttemptLock(lock, mode);
1297 
1298 		/* ok, grabbed the lock the second time round, need to undo queueing */
1299 		if (!mustwait)
1300 		{
1301 			LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
1302 
1303 			LWLockDequeueSelf(lock);
1304 			break;
1305 		}
1306 
1307 		/*
1308 		 * Wait until awakened.
1309 		 *
1310 		 * It is possible that we get awakened for a reason other than being
1311 		 * signaled by LWLockRelease.  If so, loop back and wait again.  Once
1312 		 * we've gotten the LWLock, re-increment the sema by the number of
1313 		 * additional signals received.
1314 		 */
1315 		LOG_LWDEBUG("LWLockAcquire", lock, "waiting");
1316 
1317 #ifdef LWLOCK_STATS
1318 		lwstats->block_count++;
1319 #endif
1320 
1321 		LWLockReportWaitStart(lock);
1322 		if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
1323 			TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
1324 
1325 		for (;;)
1326 		{
1327 			PGSemaphoreLock(proc->sem);
1328 			if (!proc->lwWaiting)
1329 				break;
1330 			extraWaits++;
1331 		}
1332 
1333 		/* Retrying, allow LWLockRelease to release waiters again. */
1334 		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1335 
1336 #ifdef LOCK_DEBUG
1337 		{
1338 			/* not waiting anymore */
1339 			uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1340 
1341 			Assert(nwaiters < MAX_BACKENDS);
1342 		}
1343 #endif
1344 
1345 		if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
1346 			TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
1347 		LWLockReportWaitEnd();
1348 
1349 		LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
1350 
1351 		/* Now loop back and try to acquire lock again. */
1352 		result = false;
1353 	}
1354 
1355 	if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_ENABLED())
1356 		TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);
1357 
1358 	/* Add lock to list of locks held by this backend */
1359 	held_lwlocks[num_held_lwlocks].lock = lock;
1360 	held_lwlocks[num_held_lwlocks++].mode = mode;
1361 
1362 	/*
1363 	 * Fix the process wait semaphore's count for any absorbed wakeups.
1364 	 */
1365 	while (extraWaits-- > 0)
1366 		PGSemaphoreUnlock(proc->sem);
1367 
1368 	return result;
1369 }
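
/*
 * Editor's sketch (illustrative only): the typical caller pattern for
 * LWLockAcquire() - shared mode for read-only access, exclusive mode for
 * modifications.  "MySharedLock" and "MyShared" are hypothetical names.
 *
 *		LWLockAcquire(MySharedLock, LW_SHARED);
 *		value = MyShared->counter;
 *		LWLockRelease(MySharedLock);
 *
 *		LWLockAcquire(MySharedLock, LW_EXCLUSIVE);
 *		MyShared->counter++;
 *		LWLockRelease(MySharedLock);
 */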
1370 
1371 /*
1372  * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
1373  *
1374  * If the lock is not available, return false with no side-effects.
1375  *
1376  * If successful, cancel/die interrupts are held off until lock release.
1377  */
1378 bool
1379 LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
1380 {
1381 	bool		mustwait;
1382 
1383 	AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1384 
1385 	PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);
1386 
1387 	/* Ensure we will have room to remember the lock */
1388 	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1389 		elog(ERROR, "too many LWLocks taken");
1390 
1391 	/*
1392 	 * Lock out cancel/die interrupts until we exit the code section protected
1393 	 * by the LWLock.  This ensures that interrupts will not interfere with
1394 	 * manipulations of data structures in shared memory.
1395 	 */
1396 	HOLD_INTERRUPTS();
1397 
1398 	/* Check for the lock */
1399 	mustwait = LWLockAttemptLock(lock, mode);
1400 
1401 	if (mustwait)
1402 	{
1403 		/* Failed to get lock, so release interrupt holdoff */
1404 		RESUME_INTERRUPTS();
1405 
1406 		LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
1407 		if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL_ENABLED())
1408 			TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), mode);
1409 	}
1410 	else
1411 	{
1412 		/* Add lock to list of locks held by this backend */
1413 		held_lwlocks[num_held_lwlocks].lock = lock;
1414 		held_lwlocks[num_held_lwlocks++].mode = mode;
1415 		if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_ENABLED())
1416 			TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), mode);
1417 	}
1418 	return !mustwait;
1419 }
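
/*
 * Editor's sketch (illustrative only): a common pattern for
 * LWLockConditionalAcquire() is an opportunistic fast path that falls back
 * to other work when the lock is busy; the helper names are hypothetical.
 *
 *		if (LWLockConditionalAcquire(MySharedLock, LW_EXCLUSIVE))
 *		{
 *			do_fast_path_work();
 *			LWLockRelease(MySharedLock);
 *		}
 *		else
 *			do_fallback_work();
 */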
1420 
1421 /*
1422  * LWLockAcquireOrWait - Acquire lock, or wait until it's free
1423  *
1424  * The semantics of this function are a bit funky.  If the lock is currently
1425  * free, it is acquired in the given mode, and the function returns true.  If
1426  * the lock isn't immediately free, the function waits until it is released
1427  * and returns false, but does not acquire the lock.
1428  *
1429  * This is currently used for WALWriteLock: when a backend flushes the WAL,
1430  * holding WALWriteLock, it can flush the commit records of many other
1431  * backends as a side-effect.  Those other backends need to wait until the
1432  * flush finishes, but don't need to acquire the lock anymore.  They can just
1433  * wake up, observe that their records have already been flushed, and return.
1434  */
1435 bool
1436 LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
1437 {
1438 	PGPROC	   *proc = MyProc;
1439 	bool		mustwait;
1440 	int			extraWaits = 0;
1441 #ifdef LWLOCK_STATS
1442 	lwlock_stats *lwstats;
1443 
1444 	lwstats = get_lwlock_stats_entry(lock);
1445 #endif
1446 
1447 	Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1448 
1449 	PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);
1450 
1451 	/* Ensure we will have room to remember the lock */
1452 	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1453 		elog(ERROR, "too many LWLocks taken");
1454 
1455 	/*
1456 	 * Lock out cancel/die interrupts until we exit the code section protected
1457 	 * by the LWLock.  This ensures that interrupts will not interfere with
1458 	 * manipulations of data structures in shared memory.
1459 	 */
1460 	HOLD_INTERRUPTS();
1461 
1462 	/*
1463 	 * NB: We're using nearly the same twice-in-a-row lock acquisition
1464 	 * protocol as LWLockAcquire(). Check its comments for details.
1465 	 */
1466 	mustwait = LWLockAttemptLock(lock, mode);
1467 
1468 	if (mustwait)
1469 	{
1470 		LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1471 
1472 		mustwait = LWLockAttemptLock(lock, mode);
1473 
1474 		if (mustwait)
1475 		{
1476 			/*
1477 			 * Wait until awakened.  Like in LWLockAcquire, be prepared for
1478 			 * bogus wakeups.
1479 			 */
1480 			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");
1481 
1482 #ifdef LWLOCK_STATS
1483 			lwstats->block_count++;
1484 #endif
1485 
1486 			LWLockReportWaitStart(lock);
1487 			if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
1488 				TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
1489 
1490 			for (;;)
1491 			{
1492 				PGSemaphoreLock(proc->sem);
1493 				if (!proc->lwWaiting)
1494 					break;
1495 				extraWaits++;
1496 			}
1497 
1498 #ifdef LOCK_DEBUG
1499 			{
1500 				/* not waiting anymore */
1501 				uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1502 
1503 				Assert(nwaiters < MAX_BACKENDS);
1504 			}
1505 #endif
1506 			if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
1507 				TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
1508 			LWLockReportWaitEnd();
1509 
1510 			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
1511 		}
1512 		else
1513 		{
1514 			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");
1515 
1516 			/*
1517 			 * Got lock in the second attempt, undo queueing. We need to treat
1518 			 * this as having successfully acquired the lock, otherwise we'd
1519 			 * not necessarily wake up people we've prevented from acquiring
1520 			 * the lock.
1521 			 */
1522 			LWLockDequeueSelf(lock);
1523 		}
1524 	}
1525 
1526 	/*
1527 	 * Fix the process wait semaphore's count for any absorbed wakeups.
1528 	 */
1529 	while (extraWaits-- > 0)
1530 		PGSemaphoreUnlock(proc->sem);
1531 
1532 	if (mustwait)
1533 	{
1534 		/* Failed to get lock, so release interrupt holdoff */
1535 		RESUME_INTERRUPTS();
1536 		LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
1537 		if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL_ENABLED())
1538 			TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), mode);
1539 	}
1540 	else
1541 	{
1542 		LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
1543 		/* Add lock to list of locks held by this backend */
1544 		held_lwlocks[num_held_lwlocks].lock = lock;
1545 		held_lwlocks[num_held_lwlocks++].mode = mode;
1546 		if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_ENABLED())
1547 			TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), mode);
1548 	}
1549 
1550 	return !mustwait;
1551 }
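
/*
 * Editor's sketch (simplified, loosely following the WALWriteLock usage
 * described in the comment above; not the actual xlog.c code): callers either
 * get the lock and perform the flush themselves, or merely wait and then
 * re-check whether another backend already did the work for them.
 *
 *		if (LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
 *		{
 *			... flush WAL up to the requested location ...
 *			LWLockRelease(WALWriteLock);
 *		}
 *		else
 *		{
 *			... lock was released by another flusher; re-read the shared
 *			... flush pointer and return if it already covers our record
 *		}
 */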
1552 
1553 /*
1554  * Does the lwlock in its current state need to wait for the variable value to
1555  * change?
1556  *
1557  * If we don't need to wait, and it's because the value of the variable has
1558  * changed, store the current value in newval.
1559  *
1560  * *result is set to true if the lock was free, and false otherwise.
1561  */
1562 static bool
1563 LWLockConflictsWithVar(LWLock *lock,
1564 					   uint64 *valptr, uint64 oldval, uint64 *newval,
1565 					   bool *result)
1566 {
1567 	bool		mustwait;
1568 	uint64		value;
1569 
1570 	/*
1571 	 * Test first to see if the lock is free right now.
1572 	 *
1573 	 * XXX: the caller uses a spinlock before this, so we don't need a memory
1574 	 * barrier here as far as the current usage is concerned.  But that might
1575 	 * not be safe in general.
1576 	 */
1577 	mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
1578 
1579 	if (!mustwait)
1580 	{
1581 		*result = true;
1582 		return false;
1583 	}
1584 
1585 	*result = false;
1586 
1587 	/*
1588 	 * Read value using the lwlock's wait list lock, as we can't generally
1589 	 * rely on atomic 64 bit reads/stores.  TODO: On platforms with a way to
1590 	 * do atomic 64 bit reads/writes the spinlock should be optimized away.
1591 	 */
1592 	LWLockWaitListLock(lock);
1593 	value = *valptr;
1594 	LWLockWaitListUnlock(lock);
1595 
1596 	if (value != oldval)
1597 	{
1598 		mustwait = false;
1599 		*newval = value;
1600 	}
1601 	else
1602 	{
1603 		mustwait = true;
1604 	}
1605 
1606 	return mustwait;
1607 }
1608 
1609 /*
1610  * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
1611  *
1612  * If the lock is held and *valptr equals oldval, waits until the lock is
1613  * either freed, or the lock holder updates *valptr by calling
1614  * LWLockUpdateVar.  If the lock is free on exit (immediately or after
1615  * waiting), returns true.  If the lock is still held, but *valptr no longer
1616  * matches oldval, returns false and sets *newval to the current value in
1617  * *valptr.
1618  *
1619  * Note: this function ignores shared lock holders; if the lock is held
1620  * in shared mode, returns 'true'.
1621  */
1622 bool
1623 LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
1624 {
1625 	PGPROC	   *proc = MyProc;
1626 	int			extraWaits = 0;
1627 	bool		result = false;
1628 #ifdef LWLOCK_STATS
1629 	lwlock_stats *lwstats;
1630 
1631 	lwstats = get_lwlock_stats_entry(lock);
1632 #endif
1633 
1634 	PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
1635 
1636 	/*
1637 	 * Lock out cancel/die interrupts while we sleep on the lock.  There is no
1638 	 * cleanup mechanism to remove us from the wait queue if we got
1639 	 * interrupted.
1640 	 */
1641 	HOLD_INTERRUPTS();
1642 
1643 	/*
1644 	 * Loop here to check the lock's status after each time we are signaled.
1645 	 */
1646 	for (;;)
1647 	{
1648 		bool		mustwait;
1649 
1650 		mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1651 										  &result);
1652 
1653 		if (!mustwait)
1654 			break;				/* the lock was free or value didn't match */
1655 
1656 		/*
1657 		 * Add myself to the wait queue. Note that this is racy; somebody else
1658 		 * could wake up before we're finished queuing. NB: We're using nearly
1659 		 * the same twice-in-a-row lock acquisition protocol as
1660 		 * LWLockAcquire(). Check its comments for details. The only
1661 		 * difference is that we also have to check the variable's values when
1662 		 * checking the state of the lock.
1663 		 */
1664 		LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1665 
1666 		/*
1667 		 * Set RELEASE_OK flag, to make sure we get woken up as soon as the
1668 		 * lock is released.
1669 		 */
1670 		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1671 
1672 		/*
1673 		 * We're now guaranteed to be woken up if necessary. Recheck the lock
1674 		 * and variables state.
1675 		 */
1676 		mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1677 										  &result);
1678 
1679 		/* Ok, no conflict after we queued ourselves. Undo queueing. */
1680 		if (!mustwait)
1681 		{
1682 			LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
1683 
1684 			LWLockDequeueSelf(lock);
1685 			break;
1686 		}
1687 
1688 		/*
1689 		 * Wait until awakened.
1690 		 *
1691 		 * It is possible that we get awakened for a reason other than being
1692 		 * signaled by LWLockRelease.  If so, loop back and wait again.  Once
1693 		 * we've gotten the LWLock, re-increment the sema by the number of
1694 		 * additional signals received.
1695 		 */
1696 		LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");
1697 
1698 #ifdef LWLOCK_STATS
1699 		lwstats->block_count++;
1700 #endif
1701 
1702 		LWLockReportWaitStart(lock);
1703 		if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
1704 			TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), LW_EXCLUSIVE);
1705 
1706 		for (;;)
1707 		{
1708 			PGSemaphoreLock(proc->sem);
1709 			if (!proc->lwWaiting)
1710 				break;
1711 			extraWaits++;
1712 		}
1713 
1714 #ifdef LOCK_DEBUG
1715 		{
1716 			/* not waiting anymore */
1717 			uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1718 
1719 			Assert(nwaiters < MAX_BACKENDS);
1720 		}
1721 #endif
1722 
1723 		if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
1724 			TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), LW_EXCLUSIVE);
1725 		LWLockReportWaitEnd();
1726 
1727 		LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");
1728 
1729 		/* Now loop back and check the status of the lock again. */
1730 	}
1731 
1732 	if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_ENABLED())
1733 		TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), LW_EXCLUSIVE);
1734 
1735 	/*
1736 	 * Fix the process wait semaphore's count for any absorbed wakeups.
1737 	 */
1738 	while (extraWaits-- > 0)
1739 		PGSemaphoreUnlock(proc->sem);
1740 
1741 	/*
1742 	 * Now okay to allow cancel/die interrupts.
1743 	 */
1744 	RESUME_INTERRUPTS();
1745 
1746 	return result;
1747 }
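/*
 * Illustrative sketch, not taken from any real caller: a waiter typically
 * loops around LWLockWaitForVar() until either the lock is free or the
 * protected variable has advanced far enough.  Here 'shared', 'progress'
 * and 'target' are hypothetical names for a shared struct containing the
 * LWLock, its associated uint64, and the value the caller needs to see:
 *
 *		uint64		seen = 0;		-- last value we observed
 *		uint64		cur;
 *
 *		for (;;)
 *		{
 *			if (LWLockWaitForVar(&shared->lock, &shared->progress, seen, &cur))
 *				break;				-- lock is free, no holder left to wait for
 *			if (cur >= target)
 *				break;				-- holder already progressed far enough
 *			seen = cur;				-- otherwise wait for the next update
 *		}
 *
 * The holder-side counterpart of this pattern is sketched after
 * LWLockUpdateVar() below.
 */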
1748 
1749 
1750 /*
1751  * LWLockUpdateVar - Update a variable and wake up waiters atomically
1752  *
1753  * Sets *valptr to 'val', and wakes up all processes waiting for us with
1754  * LWLockWaitForVar().  Setting the value and waking up the processes happen
1755  * atomically so that any process calling LWLockWaitForVar() on the same lock
1756  * is guaranteed to see the new value, and act accordingly.
1757  *
1758  * The caller must be holding the lock in exclusive mode.
1759  */
1760 void
1761 LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
1762 {
1763 	proclist_head wakeup;
1764 	proclist_mutable_iter iter;
1765 
1766 	PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
1767 
1768 	proclist_init(&wakeup);
1769 
1770 	LWLockWaitListLock(lock);
1771 
1772 	Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
1773 
1774 	/* Update the lock's value */
1775 	*valptr = val;
1776 
1777 	/*
1778 	 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
1779 	 * up. They are always in the front of the queue.
1780 	 */
1781 	proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
1782 	{
1783 		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);
1784 
1785 		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
1786 			break;
1787 
1788 		proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
1789 		proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
1790 	}
1791 
1792 	/* We are done updating shared state of the lock itself. */
1793 	LWLockWaitListUnlock(lock);
1794 
1795 	/*
1796 	 * Awaken any waiters I removed from the queue.
1797 	 */
1798 	proclist_foreach_modify(iter, &wakeup, lwWaitLink)
1799 	{
1800 		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);
1801 
1802 		proclist_delete(&wakeup, iter.cur, lwWaitLink);
1803 		/* check comment in LWLockWakeup() about this barrier */
1804 		pg_write_barrier();
1805 		waiter->lwWaiting = false;
1806 		PGSemaphoreUnlock(waiter->sem);
1807 	}
1808 }
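/*
 * Illustrative sketch, the holder-side counterpart of the waiter example
 * after LWLockWaitForVar() above; 'shared', more_work_to_do() and
 * do_one_step() are hypothetical.  The exclusive holder publishes its
 * progress through LWLockUpdateVar() while it works, and resets the
 * variable when it is done:
 *
 *		LWLockAcquire(&shared->lock, LW_EXCLUSIVE);
 *		while (more_work_to_do(shared))
 *		{
 *			do_one_step(shared);
 *			LWLockUpdateVar(&shared->lock, &shared->progress, shared->done_upto);
 *		}
 *		LWLockReleaseClearVar(&shared->lock, &shared->progress, 0);
 *
 * LWLockReleaseClearVar() is defined further down in this file; the 0 passed
 * here is just a placeholder value chosen for this example.
 */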
1809 
1810 
1811 /*
1812  * LWLockRelease - release a previously acquired lock
1813  */
1814 void
1815 LWLockRelease(LWLock *lock)
1816 {
1817 	LWLockMode	mode;
1818 	uint32		oldstate;
1819 	bool		check_waiters;
1820 	int			i;
1821 
1822 	/*
1823 	 * Remove lock from list of locks held.  Usually, but not always, it will
1824 	 * be the latest-acquired lock; so search array backwards.
1825 	 */
1826 	for (i = num_held_lwlocks; --i >= 0;)
1827 		if (lock == held_lwlocks[i].lock)
1828 			break;
1829 
1830 	if (i < 0)
1831 		elog(ERROR, "lock %s is not held", T_NAME(lock));
1832 
1833 	mode = held_lwlocks[i].mode;
1834 
1835 	num_held_lwlocks--;
1836 	for (; i < num_held_lwlocks; i++)
1837 		held_lwlocks[i] = held_lwlocks[i + 1];
1838 
1839 	PRINT_LWDEBUG("LWLockRelease", lock, mode);
1840 
1841 	/*
1842 	 * Release my hold on the lock; after that it can immediately be acquired
1843 	 * by others, even if we still have to wake up other waiters.
1844 	 */
1845 	if (mode == LW_EXCLUSIVE)
1846 		oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
1847 	else
1848 		oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);
1849 
1850 	/* nobody else can have that kind of lock */
1851 	Assert(!(oldstate & LW_VAL_EXCLUSIVE));
1852 
1853 
1854 	/*
1855 	 * If RELEASE_OK is not set, we're still waiting for already-woken
1856 	 * backends to get scheduled, so don't wake anyone else up yet.
1857 	 */
1858 	if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
1859 		(LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
1860 		(oldstate & LW_LOCK_MASK) == 0)
1861 		check_waiters = true;
1862 	else
1863 		check_waiters = false;
1864 
1865 	/*
1866 	 * As waking up waiters requires the spinlock to be acquired, only do so
1867 	 * if necessary.
1868 	 */
1869 	if (check_waiters)
1870 	{
1871 		/* XXX: remove before commit? */
1872 		LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
1873 		LWLockWakeup(lock);
1874 	}
1875 
1876 	if (TRACE_POSTGRESQL_LWLOCK_RELEASE_ENABLED())
1877 		TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));
1878 
1879 	/*
1880 	 * Now okay to allow cancel/die interrupts.
1881 	 */
1882 	RESUME_INTERRUPTS();
1883 }
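/*
 * Illustrative sketch, not from any particular caller: a plain critical
 * section simply brackets access to the protected structure, with SomeLock
 * standing in for any LWLock pointer:
 *
 *		LWLockAcquire(SomeLock, LW_SHARED);
 *		... read the shared structure ...
 *		LWLockRelease(SomeLock);
 *
 * The backwards search through held_lwlocks above is what allows locks to be
 * released in non-LIFO order when a caller needs that.
 */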
1884 
1885 /*
1886  * LWLockReleaseClearVar - release a previously acquired lock, reset variable
1887  */
1888 void
1889 LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
1890 {
1891 	LWLockWaitListLock(lock);
1892 
1893 	/*
1894 	 * Set the variable's value before releasing the lock; that prevents a
1895 	 * race condition wherein a new locker acquires the lock but hasn't yet
1896 	 * set the variable's value.
1897 	 */
1898 	*valptr = val;
1899 	LWLockWaitListUnlock(lock);
1900 
1901 	LWLockRelease(lock);
1902 }
1903 
1904 
1905 /*
1906  * LWLockReleaseAll - release all currently-held locks
1907  *
1908  * Used to clean up after ereport(ERROR). An important difference between this
1909  * function and retail LWLockRelease calls is that InterruptHoldoffCount is
1910  * unchanged by this operation.  This is necessary since InterruptHoldoffCount
1911  * has been set to an appropriate level earlier in error recovery. We could
1912  * decrement it below zero if we allowed it to drop for each released lock!
1913  */
1914 void
1915 LWLockReleaseAll(void)
1916 {
1917 	while (num_held_lwlocks > 0)
1918 	{
1919 		HOLD_INTERRUPTS();		/* match the upcoming RESUME_INTERRUPTS */
1920 
1921 		LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
1922 	}
1923 }
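/*
 * Illustrative sketch, not copied from a real caller: error-recovery code
 * such as transaction abort or an auxiliary process's sigsetjmp() handler
 * calls this once instead of releasing each lock retail, roughly:
 *
 *		if (sigsetjmp(local_sigjmp_buf, 1) != 0)
 *		{
 *			HOLD_INTERRUPTS();
 *			... other cleanup ...
 *			LWLockReleaseAll();
 *			... other cleanup ...
 *			RESUME_INTERRUPTS();
 *		}
 *
 * The HOLD/RESUME pair belongs to the caller; as the comment above notes,
 * this function leaves InterruptHoldoffCount unchanged overall.
 */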
1924 
1925 
1926 /*
1927  * LWLockHeldByMe - test whether my process holds a lock in any mode
1928  *
1929  * This is meant as debug support only.
1930  */
1931 bool
1932 LWLockHeldByMe(LWLock *l)
1933 {
1934 	int			i;
1935 
1936 	for (i = 0; i < num_held_lwlocks; i++)
1937 	{
1938 		if (held_lwlocks[i].lock == l)
1939 			return true;
1940 	}
1941 	return false;
1942 }
1943 
1944 /*
1945  * LWLockHeldByMeInMode - test whether my process holds a lock in given mode
1946  *
1947  * This is meant as debug support only.
1948  */
1949 bool
1950 LWLockHeldByMeInMode(LWLock *l, LWLockMode mode)
1951 {
1952 	int			i;
1953 
1954 	for (i = 0; i < num_held_lwlocks; i++)
1955 	{
1956 		if (held_lwlocks[i].lock == l && held_lwlocks[i].mode == mode)
1957 			return true;
1958 	}
1959 	return false;
1960 }
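/*
 * Illustrative sketch: both functions above are intended for assertions in
 * code that relies on a lock already being held, e.g.
 *
 *		Assert(LWLockHeldByMeInMode(SomeLock, LW_EXCLUSIVE));
 *
 * rather than for making run-time locking decisions.
 */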
1961