1 /*-------------------------------------------------------------------------
2  *
3  * lock.c
4  *	  POSTGRES primary lock mechanism
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/storage/lmgr/lock.c
12  *
13  * NOTES
14  *	  A lock table is a shared memory hash table.  When
15  *	  a process tries to acquire a lock of a type that conflicts
16  *	  with existing locks, it is put to sleep using the routines
17  *	  in storage/lmgr/proc.c.
18  *
19  *	  For the most part, this code should be invoked via lmgr.c
20  *	  or another lock-management module, not directly.
21  *
22  *	Interface:
23  *
24  *	InitLocks(), GetLocksMethodTable(), GetLockTagsMethodTable(),
25  *	LockAcquire(), LockRelease(), LockReleaseAll(),
26  *	LockCheckConflicts(), GrantLock()
27  *
28  *-------------------------------------------------------------------------
29  */
30 #include "postgres.h"
31 
32 #include <signal.h>
33 #include <unistd.h>
34 
35 #include "access/transam.h"
36 #include "access/twophase.h"
37 #include "access/twophase_rmgr.h"
38 #include "access/xact.h"
39 #include "access/xlog.h"
40 #include "miscadmin.h"
41 #include "pg_trace.h"
42 #include "pgstat.h"
43 #include "storage/proc.h"
44 #include "storage/procarray.h"
45 #include "storage/sinvaladt.h"
46 #include "storage/spin.h"
47 #include "storage/standby.h"
48 #include "utils/memutils.h"
49 #include "utils/ps_status.h"
50 #include "utils/resowner_private.h"
51 
52 
53 /* This configuration variable is used to set the lock table size */
54 int			max_locks_per_xact; /* set by guc.c */
55 
56 #define NLOCKENTS() \
57 	mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
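/*
 * Illustrative sizing sketch (editorial note, not part of the original code):
 * with the hypothetical settings max_locks_per_xact = 64, MaxBackends = 100
 * and max_prepared_xacts = 0, NLOCKENTS() works out to 64 * (100 + 0) = 6400
 * shared lock-table entries.  The limit is shared across all backends rather
 * than enforced per transaction, so a single backend may hold more than
 * max_locks_per_xact entries as long as the total fits.
 */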
58 
59 
60 /*
61  * Data structures defining the semantics of the standard lock methods.
62  *
63  * The conflict table defines the semantics of the various lock modes.
64  */
65 static const LOCKMASK LockConflicts[] = {
66 	0,
67 
68 	/* AccessShareLock */
69 	LOCKBIT_ON(AccessExclusiveLock),
70 
71 	/* RowShareLock */
72 	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
73 
74 	/* RowExclusiveLock */
75 	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
76 	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
77 
78 	/* ShareUpdateExclusiveLock */
79 	LOCKBIT_ON(ShareUpdateExclusiveLock) |
80 	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
81 	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
82 
83 	/* ShareLock */
84 	LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
85 	LOCKBIT_ON(ShareRowExclusiveLock) |
86 	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
87 
88 	/* ShareRowExclusiveLock */
89 	LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
90 	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
91 	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
92 
93 	/* ExclusiveLock */
94 	LOCKBIT_ON(RowShareLock) |
95 	LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
96 	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
97 	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
98 
99 	/* AccessExclusiveLock */
100 	LOCKBIT_ON(AccessShareLock) | LOCKBIT_ON(RowShareLock) |
101 	LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
102 	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
103 	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock)
104 
105 };
106 
107 /* Names of lock modes, for debug printouts */
108 static const char *const lock_mode_names[] =
109 {
110 	"INVALID",
111 	"AccessShareLock",
112 	"RowShareLock",
113 	"RowExclusiveLock",
114 	"ShareUpdateExclusiveLock",
115 	"ShareLock",
116 	"ShareRowExclusiveLock",
117 	"ExclusiveLock",
118 	"AccessExclusiveLock"
119 };
120 
121 #ifndef LOCK_DEBUG
122 static bool Dummy_trace = false;
123 #endif
124 
125 static const LockMethodData default_lockmethod = {
126 	AccessExclusiveLock,		/* highest valid lock mode number */
127 	LockConflicts,
128 	lock_mode_names,
129 #ifdef LOCK_DEBUG
130 	&Trace_locks
131 #else
132 	&Dummy_trace
133 #endif
134 };
135 
136 static const LockMethodData user_lockmethod = {
137 	AccessExclusiveLock,		/* highest valid lock mode number */
138 	LockConflicts,
139 	lock_mode_names,
140 #ifdef LOCK_DEBUG
141 	&Trace_userlocks
142 #else
143 	&Dummy_trace
144 #endif
145 };
146 
147 /*
148  * map from lock method id to the lock table data structures
149  */
150 static const LockMethod LockMethods[] = {
151 	NULL,
152 	&default_lockmethod,
153 	&user_lockmethod
154 };
155 
156 
157 /* Record that's written to 2PC state file when a lock is persisted */
158 typedef struct TwoPhaseLockRecord
159 {
160 	LOCKTAG		locktag;
161 	LOCKMODE	lockmode;
162 } TwoPhaseLockRecord;
163 
164 
165 /*
166  * Count of the number of fast path lock slots we believe to be used.  This
167  * might be higher than the real number if another backend has transferred
168  * our locks to the primary lock table, but it can never be lower than the
169  * real value, since only we can acquire locks on our own behalf.
170  */
171 static int	FastPathLocalUseCount = 0;
172 
173 /* Macros for manipulating proc->fpLockBits */
174 #define FAST_PATH_BITS_PER_SLOT			3
175 #define FAST_PATH_LOCKNUMBER_OFFSET		1
176 #define FAST_PATH_MASK					((1 << FAST_PATH_BITS_PER_SLOT) - 1)
177 #define FAST_PATH_GET_BITS(proc, n) \
178 	(((proc)->fpLockBits >> (FAST_PATH_BITS_PER_SLOT * n)) & FAST_PATH_MASK)
179 #define FAST_PATH_BIT_POSITION(n, l) \
180 	(AssertMacro((l) >= FAST_PATH_LOCKNUMBER_OFFSET), \
181 	 AssertMacro((l) < FAST_PATH_BITS_PER_SLOT+FAST_PATH_LOCKNUMBER_OFFSET), \
182 	 AssertMacro((n) < FP_LOCK_SLOTS_PER_BACKEND), \
183 	 ((l) - FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT * (n)))
184 #define FAST_PATH_SET_LOCKMODE(proc, n, l) \
185 	 (proc)->fpLockBits |= UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)
186 #define FAST_PATH_CLEAR_LOCKMODE(proc, n, l) \
187 	 (proc)->fpLockBits &= ~(UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l))
188 #define FAST_PATH_CHECK_LOCKMODE(proc, n, l) \
189 	 ((proc)->fpLockBits & (UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)))
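/*
 * Worked example (editorial illustration, not part of the original code):
 * each fast-path slot occupies FAST_PATH_BITS_PER_SLOT adjacent bits of
 * fpLockBits, one per fast-path-eligible lock mode.  For slot n and lock
 * mode l the bit index is (l - FAST_PATH_LOCKNUMBER_OFFSET) +
 * FAST_PATH_BITS_PER_SLOT * n, so recording RowExclusiveLock (mode 3) in
 * slot 2 sets bit (3 - 1) + 3 * 2 = 8, and FAST_PATH_GET_BITS(proc, 2)
 * extracts bits 6..8 of fpLockBits.
 */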
190 
191 /*
192  * The fast-path lock mechanism is concerned only with relation locks on
193  * unshared relations by backends bound to a database.  The fast-path
194  * mechanism exists mostly to accelerate acquisition and release of locks
195  * that rarely conflict.  Because ShareUpdateExclusiveLock is
196  * self-conflicting, it can't use the fast-path mechanism; but it also does
197  * not conflict with any of the locks that do, so we can ignore it completely.
198  */
199 #define EligibleForRelationFastPath(locktag, mode) \
200 	((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
201 	(locktag)->locktag_type == LOCKTAG_RELATION && \
202 	(locktag)->locktag_field1 == MyDatabaseId && \
203 	MyDatabaseId != InvalidOid && \
204 	(mode) < ShareUpdateExclusiveLock)
205 #define ConflictsWithRelationFastPath(locktag, mode) \
206 	((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
207 	(locktag)->locktag_type == LOCKTAG_RELATION && \
208 	(locktag)->locktag_field1 != InvalidOid && \
209 	(mode) > ShareUpdateExclusiveLock)
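/*
 * Editorial note: concretely, the "weak" modes AccessShareLock, RowShareLock
 * and RowExclusiveLock satisfy EligibleForRelationFastPath, the "strong"
 * modes ShareLock through AccessExclusiveLock satisfy
 * ConflictsWithRelationFastPath, and ShareUpdateExclusiveLock falls into
 * neither category, as explained above.
 */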
210 
211 static bool FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode);
212 static bool FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode);
213 static bool FastPathTransferRelationLocks(LockMethod lockMethodTable,
214 										  const LOCKTAG *locktag, uint32 hashcode);
215 static PROCLOCK *FastPathGetRelationLockEntry(LOCALLOCK *locallock);
216 
217 /*
218  * To make the fast-path lock mechanism work, we must have some way of
219  * preventing the use of the fast-path when a conflicting lock might be
220  *	present.  We partition the locktag space into
221  *	FAST_PATH_STRONG_LOCK_HASH_PARTITIONS partitions, and maintain an integer count of the number of "strong" lockers
222  * in each partition.  When any "strong" lockers are present (which is
223  * hopefully not very often), the fast-path mechanism can't be used, and we
224  * must fall back to the slower method of pushing matching locks directly
225  * into the main lock tables.
226  *
227  * The deadlock detector does not know anything about the fast path mechanism,
228  * so any locks that might be involved in a deadlock must be transferred from
229  * the fast-path queues to the main lock table.
230  */
231 
232 #define FAST_PATH_STRONG_LOCK_HASH_BITS			10
233 #define FAST_PATH_STRONG_LOCK_HASH_PARTITIONS \
234 	(1 << FAST_PATH_STRONG_LOCK_HASH_BITS)
235 #define FastPathStrongLockHashPartition(hashcode) \
236 	((hashcode) % FAST_PATH_STRONG_LOCK_HASH_PARTITIONS)
237 
238 typedef struct
239 {
240 	slock_t		mutex;
241 	uint32		count[FAST_PATH_STRONG_LOCK_HASH_PARTITIONS];
242 } FastPathStrongRelationLockData;
243 
244 static volatile FastPathStrongRelationLockData *FastPathStrongRelationLocks;
245 
246 
247 /*
248  * Pointers to hash tables containing lock state
249  *
250  * The LockMethodLockHash and LockMethodProcLockHash hash tables are in
251  * shared memory; LockMethodLocalHash is local to each backend.
252  */
253 static HTAB *LockMethodLockHash;
254 static HTAB *LockMethodProcLockHash;
255 static HTAB *LockMethodLocalHash;
256 
257 
258 /* private state for error cleanup */
259 static LOCALLOCK *StrongLockInProgress;
260 static LOCALLOCK *awaitedLock;
261 static ResourceOwner awaitedOwner;
262 
263 
264 #ifdef LOCK_DEBUG
265 
266 /*------
267  * The following configuration options are available for lock debugging:
268  *
269  *	   TRACE_LOCKS		-- give a bunch of output about what's going on in this file
270  *	   TRACE_USERLOCKS	-- same but for user locks
271  *	   TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
272  *						   (use to avoid output on system tables)
273  *	   TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
274  *	   DEBUG_DEADLOCKS	-- currently dumps locks at untimely occasions ;)
275  *
276  * Furthermore, in storage/lmgr/lwlock.c:
277  *	   TRACE_LWLOCKS	-- trace lightweight locks (pretty useless)
278  *
279  * Define LOCK_DEBUG at compile time to get all these enabled.
280  * --------
281  */
282 
283 int			Trace_lock_oidmin = FirstNormalObjectId;
284 bool		Trace_locks = false;
285 bool		Trace_userlocks = false;
286 int			Trace_lock_table = 0;
287 bool		Debug_deadlocks = false;
288 
289 
290 inline static bool
291 LOCK_DEBUG_ENABLED(const LOCKTAG *tag)
292 {
293 	return
294 		(*(LockMethods[tag->locktag_lockmethodid]->trace_flag) &&
295 		 ((Oid) tag->locktag_field2 >= (Oid) Trace_lock_oidmin))
296 		|| (Trace_lock_table &&
297 			(tag->locktag_field2 == Trace_lock_table));
298 }
299 
300 
301 inline static void
302 LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
303 {
304 	if (LOCK_DEBUG_ENABLED(&lock->tag))
305 		elog(LOG,
306 			 "%s: lock(%p) id(%u,%u,%u,%u,%u,%u) grantMask(%x) "
307 			 "req(%d,%d,%d,%d,%d,%d,%d)=%d "
308 			 "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
309 			 where, lock,
310 			 lock->tag.locktag_field1, lock->tag.locktag_field2,
311 			 lock->tag.locktag_field3, lock->tag.locktag_field4,
312 			 lock->tag.locktag_type, lock->tag.locktag_lockmethodid,
313 			 lock->grantMask,
314 			 lock->requested[1], lock->requested[2], lock->requested[3],
315 			 lock->requested[4], lock->requested[5], lock->requested[6],
316 			 lock->requested[7], lock->nRequested,
317 			 lock->granted[1], lock->granted[2], lock->granted[3],
318 			 lock->granted[4], lock->granted[5], lock->granted[6],
319 			 lock->granted[7], lock->nGranted,
320 			 lock->waitProcs.size,
321 			 LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]);
322 }
323 
324 
325 inline static void
326 PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
327 {
328 	if (LOCK_DEBUG_ENABLED(&proclockP->tag.myLock->tag))
329 		elog(LOG,
330 			 "%s: proclock(%p) lock(%p) method(%u) proc(%p) hold(%x)",
331 			 where, proclockP, proclockP->tag.myLock,
332 			 PROCLOCK_LOCKMETHOD(*(proclockP)),
333 			 proclockP->tag.myProc, (int) proclockP->holdMask);
334 }
335 #else							/* not LOCK_DEBUG */
336 
337 #define LOCK_PRINT(where, lock, type)  ((void) 0)
338 #define PROCLOCK_PRINT(where, proclockP)  ((void) 0)
339 #endif							/* not LOCK_DEBUG */
340 
341 
342 static uint32 proclock_hash(const void *key, Size keysize);
343 static void RemoveLocalLock(LOCALLOCK *locallock);
344 static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
345 								  const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode);
346 static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
347 static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
348 static void FinishStrongLockAcquire(void);
349 static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
350 static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock);
351 static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent);
352 static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
353 						PROCLOCK *proclock, LockMethod lockMethodTable);
354 static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
355 						LockMethod lockMethodTable, uint32 hashcode,
356 						bool wakeupNeeded);
357 static void LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
358 								 LOCKTAG *locktag, LOCKMODE lockmode,
359 								 bool decrement_strong_lock_count);
360 static void GetSingleProcBlockerStatusData(PGPROC *blocked_proc,
361 										   BlockedProcsData *data);
362 
363 
364 /*
365  * InitLocks -- Initialize the lock manager's data structures.
366  *
367  * This is called from CreateSharedMemoryAndSemaphores(), which see for
368  * more comments.  In the normal postmaster case, the shared hash tables
369  * are created here, as well as a locallock hash table that will remain
370  * unused and empty in the postmaster itself.  Backends inherit the pointers
371  * to the shared tables via fork(), and also inherit an image of the locallock
372  * hash table, which they proceed to use.  In the EXEC_BACKEND case, each
373  * backend re-executes this code to obtain pointers to the already existing
374  * shared hash tables and to create its locallock hash table.
375  */
376 void
377 InitLocks(void)
378 {
379 	HASHCTL		info;
380 	long		init_table_size,
381 				max_table_size;
382 	bool		found;
383 
384 	/*
385 	 * Compute init/max size to request for lock hashtables.  Note these
386 	 * calculations must agree with LockShmemSize!
387 	 */
388 	max_table_size = NLOCKENTS();
389 	init_table_size = max_table_size / 2;
390 
391 	/*
392 	 * Allocate hash table for LOCK structs.  This stores per-locked-object
393 	 * information.
394 	 */
395 	MemSet(&info, 0, sizeof(info));
396 	info.keysize = sizeof(LOCKTAG);
397 	info.entrysize = sizeof(LOCK);
398 	info.num_partitions = NUM_LOCK_PARTITIONS;
399 
400 	LockMethodLockHash = ShmemInitHash("LOCK hash",
401 									   init_table_size,
402 									   max_table_size,
403 									   &info,
404 									   HASH_ELEM | HASH_BLOBS | HASH_PARTITION);
405 
406 	/* Assume an average of 2 holders per lock */
407 	max_table_size *= 2;
408 	init_table_size *= 2;
409 
410 	/*
411 	 * Allocate hash table for PROCLOCK structs.  This stores
412 	 * per-lock-per-holder information.
413 	 */
414 	info.keysize = sizeof(PROCLOCKTAG);
415 	info.entrysize = sizeof(PROCLOCK);
416 	info.hash = proclock_hash;
417 	info.num_partitions = NUM_LOCK_PARTITIONS;
418 
419 	LockMethodProcLockHash = ShmemInitHash("PROCLOCK hash",
420 										   init_table_size,
421 										   max_table_size,
422 										   &info,
423 										   HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
424 
425 	/*
426 	 * Allocate fast-path structures.
427 	 */
428 	FastPathStrongRelationLocks =
429 		ShmemInitStruct("Fast Path Strong Relation Lock Data",
430 						sizeof(FastPathStrongRelationLockData), &found);
431 	if (!found)
432 		SpinLockInit(&FastPathStrongRelationLocks->mutex);
433 
434 	/*
435 	 * Allocate non-shared hash table for LOCALLOCK structs.  This stores lock
436 	 * counts and resource owner information.
437 	 *
438 	 * The non-shared table could already exist in this process (this occurs
439 	 * when the postmaster is recreating shared memory after a backend crash).
440 	 * If so, delete and recreate it.  (We could simply leave it, since it
441 	 * ought to be empty in the postmaster, but for safety let's zap it.)
442 	 */
443 	if (LockMethodLocalHash)
444 		hash_destroy(LockMethodLocalHash);
445 
446 	info.keysize = sizeof(LOCALLOCKTAG);
447 	info.entrysize = sizeof(LOCALLOCK);
448 
449 	LockMethodLocalHash = hash_create("LOCALLOCK hash",
450 									  16,
451 									  &info,
452 									  HASH_ELEM | HASH_BLOBS);
453 }
454 
455 
456 /*
457  * Fetch the lock method table associated with a given lock
458  */
459 LockMethod
460 GetLocksMethodTable(const LOCK *lock)
461 {
462 	LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);
463 
464 	Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
465 	return LockMethods[lockmethodid];
466 }
467 
468 /*
469  * Fetch the lock method table associated with a given locktag
470  */
471 LockMethod
472 GetLockTagsMethodTable(const LOCKTAG *locktag)
473 {
474 	LOCKMETHODID lockmethodid = (LOCKMETHODID) locktag->locktag_lockmethodid;
475 
476 	Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
477 	return LockMethods[lockmethodid];
478 }
479 
480 
481 /*
482  * Compute the hash code associated with a LOCKTAG.
483  *
484  * To avoid unnecessary recomputations of the hash code, we try to do this
485  * just once per function, and then pass it around as needed.  Aside from
486  * passing the hashcode to hash_search_with_hash_value(), we can extract
487  * the lock partition number from the hashcode.
488  */
489 uint32
490 LockTagHashCode(const LOCKTAG *locktag)
491 {
492 	return get_hash_value(LockMethodLockHash, (const void *) locktag);
493 }
494 
495 /*
496  * Compute the hash code associated with a PROCLOCKTAG.
497  *
498  * Because we want to use just one set of partition locks for both the
499  * LOCK and PROCLOCK hash tables, we have to make sure that PROCLOCKs
500  * fall into the same partition number as their associated LOCKs.
501  * dynahash.c expects the partition number to be the low-order bits of
502  * the hash code, and therefore a PROCLOCKTAG's hash code must have the
503  * same low-order bits as the associated LOCKTAG's hash code.  We achieve
504  * this with this specialized hash function.
505  */
506 static uint32
507 proclock_hash(const void *key, Size keysize)
508 {
509 	const PROCLOCKTAG *proclocktag = (const PROCLOCKTAG *) key;
510 	uint32		lockhash;
511 	Datum		procptr;
512 
513 	Assert(keysize == sizeof(PROCLOCKTAG));
514 
515 	/* Look into the associated LOCK object, and compute its hash code */
516 	lockhash = LockTagHashCode(&proclocktag->myLock->tag);
517 
518 	/*
519 	 * To make the hash code also depend on the PGPROC, we xor the proc
520 	 * struct's address into the hash code, left-shifted so that the
521 	 * partition-number bits don't change.  Since this is only a hash, we
522 	 * don't care if we lose high-order bits of the address; use an
523 	 * intermediate variable to suppress cast-pointer-to-int warnings.
524 	 */
525 	procptr = PointerGetDatum(proclocktag->myProc);
526 	lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;
527 
528 	return lockhash;
529 }
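/*
 * Editorial illustration: because the PGPROC address is xor'd in only after
 * being left-shifted by LOG2_NUM_LOCK_PARTITIONS, the low-order
 * partition-selection bits of the result are exactly those of the underlying
 * LOCK's hash code.  A LOCK hashing to partition 5 therefore yields PROCLOCK
 * hash codes that also select partition 5, which is what keeps both objects
 * under the same partition lock.
 */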
530 
531 /*
532  * Compute the hash code associated with a PROCLOCKTAG, given the hashcode
533  * for its underlying LOCK.
534  *
535  * We use this just to avoid redundant calls of LockTagHashCode().
536  */
537 static inline uint32
538 ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode)
539 {
540 	uint32		lockhash = hashcode;
541 	Datum		procptr;
542 
543 	/*
544 	 * This must match proclock_hash()!
545 	 */
546 	procptr = PointerGetDatum(proclocktag->myProc);
547 	lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;
548 
549 	return lockhash;
550 }
551 
552 /*
553  * Given two lock modes, return whether they would conflict.
554  */
555 bool
556 DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2)
557 {
558 	LockMethod	lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];
559 
560 	if (lockMethodTable->conflictTab[mode1] & LOCKBIT_ON(mode2))
561 		return true;
562 
563 	return false;
564 }
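/*
 * Example (editorial illustration only): per the LockConflicts table above,
 * DoLockModesConflict(RowExclusiveLock, ShareLock) returns true, while
 * DoLockModesConflict(RowExclusiveLock, RowExclusiveLock) returns false,
 * since RowExclusiveLock is not self-conflicting.
 */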
565 
566 /*
567  * LockHeldByMe -- test whether lock 'locktag' is held with mode 'lockmode'
568  *		by the current transaction
569  */
570 bool
571 LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode)
572 {
573 	LOCALLOCKTAG localtag;
574 	LOCALLOCK  *locallock;
575 
576 	/*
577 	 * See if there is a LOCALLOCK entry for this lock and lockmode
578 	 */
579 	MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
580 	localtag.lock = *locktag;
581 	localtag.mode = lockmode;
582 
583 	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
584 										  (void *) &localtag,
585 										  HASH_FIND, NULL);
586 
587 	return (locallock && locallock->nLocks > 0);
588 }
589 
590 /*
591  * LockHasWaiters -- look up 'locktag' and check if releasing this
592  *		lock would wake up other processes waiting for it.
593  */
594 bool
595 LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
596 {
597 	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
598 	LockMethod	lockMethodTable;
599 	LOCALLOCKTAG localtag;
600 	LOCALLOCK  *locallock;
601 	LOCK	   *lock;
602 	PROCLOCK   *proclock;
603 	LWLock	   *partitionLock;
604 	bool		hasWaiters = false;
605 
606 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
607 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
608 	lockMethodTable = LockMethods[lockmethodid];
609 	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
610 		elog(ERROR, "unrecognized lock mode: %d", lockmode);
611 
612 #ifdef LOCK_DEBUG
613 	if (LOCK_DEBUG_ENABLED(locktag))
614 		elog(LOG, "LockHasWaiters: lock [%u,%u] %s",
615 			 locktag->locktag_field1, locktag->locktag_field2,
616 			 lockMethodTable->lockModeNames[lockmode]);
617 #endif
618 
619 	/*
620 	 * Find the LOCALLOCK entry for this lock and lockmode
621 	 */
622 	MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
623 	localtag.lock = *locktag;
624 	localtag.mode = lockmode;
625 
626 	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
627 										  (void *) &localtag,
628 										  HASH_FIND, NULL);
629 
630 	/*
631 	 * let the caller print its own error message, too. Do not ereport(ERROR).
632 	 */
633 	if (!locallock || locallock->nLocks <= 0)
634 	{
635 		elog(WARNING, "you don't own a lock of type %s",
636 			 lockMethodTable->lockModeNames[lockmode]);
637 		return false;
638 	}
639 
640 	/*
641 	 * Check the shared lock table.
642 	 */
643 	partitionLock = LockHashPartitionLock(locallock->hashcode);
644 
645 	LWLockAcquire(partitionLock, LW_SHARED);
646 
647 	/*
648 	 * We don't need to re-find the lock or proclock, since we kept their
649 	 * addresses in the locallock table, and they couldn't have been removed
650 	 * while we were holding a lock on them.
651 	 */
652 	lock = locallock->lock;
653 	LOCK_PRINT("LockHasWaiters: found", lock, lockmode);
654 	proclock = locallock->proclock;
655 	PROCLOCK_PRINT("LockHasWaiters: found", proclock);
656 
657 	/*
658 	 * Double-check that we are actually holding a lock of the type we want to
659 	 * release.
660 	 */
661 	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
662 	{
663 		PROCLOCK_PRINT("LockHasWaiters: WRONGTYPE", proclock);
664 		LWLockRelease(partitionLock);
665 		elog(WARNING, "you don't own a lock of type %s",
666 			 lockMethodTable->lockModeNames[lockmode]);
667 		RemoveLocalLock(locallock);
668 		return false;
669 	}
670 
671 	/*
672 	 * Do the checking.
673 	 */
674 	if ((lockMethodTable->conflictTab[lockmode] & lock->waitMask) != 0)
675 		hasWaiters = true;
676 
677 	LWLockRelease(partitionLock);
678 
679 	return hasWaiters;
680 }
681 
682 /*
683  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
684  *		set lock if/when no conflicts.
685  *
686  * Inputs:
687  *	locktag: unique identifier for the lockable object
688  *	lockmode: lock mode to acquire
689  *	sessionLock: if true, acquire lock for session not current transaction
690  *	dontWait: if true, don't wait to acquire lock
691  *
692  * Returns one of:
693  *		LOCKACQUIRE_NOT_AVAIL		lock not available, and dontWait=true
694  *		LOCKACQUIRE_OK				lock successfully acquired
695  *		LOCKACQUIRE_ALREADY_HELD	incremented count for lock already held
696  *		LOCKACQUIRE_ALREADY_CLEAR	incremented count for lock already clear
697  *
698  * In the normal case where dontWait=false and the caller doesn't need to
699  * distinguish a freshly acquired lock from one already taken earlier in
700  * this same transaction, there is no need to examine the return value.
701  *
702  * Side Effects: The lock is acquired and recorded in lock tables.
703  *
704  * NOTE: if we wait for the lock, there is no way to abort the wait
705  * short of aborting the transaction.
706  */
707 LockAcquireResult
708 LockAcquire(const LOCKTAG *locktag,
709 			LOCKMODE lockmode,
710 			bool sessionLock,
711 			bool dontWait)
712 {
713 	return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
714 							   true, NULL);
715 }
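/*
 * Minimal usage sketch (editorial; roughly what lmgr.c's LockRelationOid
 * does -- see that file for the real callers):
 *
 *		LOCKTAG		tag;
 *
 *		SET_LOCKTAG_RELATION(tag, MyDatabaseId, relid);
 *		(void) LockAcquire(&tag, AccessShareLock, false, false);
 *
 * i.e. build a LOCKTAG describing the object, then request the desired mode,
 * choosing transaction-lifetime ownership (sessionLock = false) and blocking
 * behavior (dontWait = false).
 */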
716 
717 /*
718  * LockAcquireExtended - allows us to specify additional options
719  *
720  * reportMemoryError specifies whether a lock request that fills the lock
721  * table should generate an ERROR or not.  Passing "false" allows the caller
722  * to attempt to recover from lock-table-full situations, perhaps by forcibly
723  * cancelling other lock holders and then retrying.  Note, however, that the
724  * return code for that is LOCKACQUIRE_NOT_AVAIL, so that it's unsafe to use
725  * in combination with dontWait = true, as the cause of failure couldn't be
726  * distinguished.
727  *
728  * If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK
729  * table entry if a lock is successfully acquired, or NULL if not.
730  */
731 LockAcquireResult
732 LockAcquireExtended(const LOCKTAG *locktag,
733 					LOCKMODE lockmode,
734 					bool sessionLock,
735 					bool dontWait,
736 					bool reportMemoryError,
737 					LOCALLOCK **locallockp)
738 {
739 	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
740 	LockMethod	lockMethodTable;
741 	LOCALLOCKTAG localtag;
742 	LOCALLOCK  *locallock;
743 	LOCK	   *lock;
744 	PROCLOCK   *proclock;
745 	bool		found;
746 	ResourceOwner owner;
747 	uint32		hashcode;
748 	LWLock	   *partitionLock;
749 	int			status;
750 	bool		log_lock = false;
751 
752 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
753 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
754 	lockMethodTable = LockMethods[lockmethodid];
755 	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
756 		elog(ERROR, "unrecognized lock mode: %d", lockmode);
757 
758 	if (RecoveryInProgress() && !InRecovery &&
759 		(locktag->locktag_type == LOCKTAG_OBJECT ||
760 		 locktag->locktag_type == LOCKTAG_RELATION) &&
761 		lockmode > RowExclusiveLock)
762 		ereport(ERROR,
763 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
764 				 errmsg("cannot acquire lock mode %s on database objects while recovery is in progress",
765 						lockMethodTable->lockModeNames[lockmode]),
766 				 errhint("Only RowExclusiveLock or less can be acquired on database objects during recovery.")));
767 
768 #ifdef LOCK_DEBUG
769 	if (LOCK_DEBUG_ENABLED(locktag))
770 		elog(LOG, "LockAcquire: lock [%u,%u] %s",
771 			 locktag->locktag_field1, locktag->locktag_field2,
772 			 lockMethodTable->lockModeNames[lockmode]);
773 #endif
774 
775 	/* Identify owner for lock */
776 	if (sessionLock)
777 		owner = NULL;
778 	else
779 		owner = CurrentResourceOwner;
780 
781 	/*
782 	 * Find or create a LOCALLOCK entry for this lock and lockmode
783 	 */
784 	MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
785 	localtag.lock = *locktag;
786 	localtag.mode = lockmode;
787 
788 	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
789 										  (void *) &localtag,
790 										  HASH_ENTER, &found);
791 
792 	/*
793 	 * if it's a new locallock object, initialize it
794 	 */
795 	if (!found)
796 	{
797 		locallock->lock = NULL;
798 		locallock->proclock = NULL;
799 		locallock->hashcode = LockTagHashCode(&(localtag.lock));
800 		locallock->nLocks = 0;
801 		locallock->holdsStrongLockCount = false;
802 		locallock->lockCleared = false;
803 		locallock->numLockOwners = 0;
804 		locallock->maxLockOwners = 8;
805 		locallock->lockOwners = NULL;	/* in case next line fails */
806 		locallock->lockOwners = (LOCALLOCKOWNER *)
807 			MemoryContextAlloc(TopMemoryContext,
808 							   locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
809 	}
810 	else
811 	{
812 		/* Make sure there will be room to remember the lock */
813 		if (locallock->numLockOwners >= locallock->maxLockOwners)
814 		{
815 			int			newsize = locallock->maxLockOwners * 2;
816 
817 			locallock->lockOwners = (LOCALLOCKOWNER *)
818 				repalloc(locallock->lockOwners,
819 						 newsize * sizeof(LOCALLOCKOWNER));
820 			locallock->maxLockOwners = newsize;
821 		}
822 	}
823 	hashcode = locallock->hashcode;
824 
825 	if (locallockp)
826 		*locallockp = locallock;
827 
828 	/*
829 	 * If we already hold the lock, we can just increase the count locally.
830 	 *
831 	 * If lockCleared is already set, caller need not worry about absorbing
832 	 * sinval messages related to the lock's object.
833 	 */
834 	if (locallock->nLocks > 0)
835 	{
836 		GrantLockLocal(locallock, owner);
837 		if (locallock->lockCleared)
838 			return LOCKACQUIRE_ALREADY_CLEAR;
839 		else
840 			return LOCKACQUIRE_ALREADY_HELD;
841 	}
842 
843 	/*
844 	 * Prepare to emit a WAL record if acquisition of this lock needs to be
845 	 * replayed in a standby server.
846 	 *
847 	 * Here we prepare to log; after lock is acquired we'll issue log record.
848 	 * This arrangement simplifies error recovery in case the preparation step
849 	 * fails.
850 	 *
851 	 * Only AccessExclusiveLocks can conflict with lock types that read-only
852 	 * transactions can acquire in a standby server. Make sure this definition
853 	 * matches the one in GetRunningTransactionLocks().
854 	 */
855 	if (lockmode >= AccessExclusiveLock &&
856 		locktag->locktag_type == LOCKTAG_RELATION &&
857 		!RecoveryInProgress() &&
858 		XLogStandbyInfoActive())
859 	{
860 		LogAccessExclusiveLockPrepare();
861 		log_lock = true;
862 	}
863 
864 	/*
865 	 * Attempt to take lock via fast path, if eligible.  But if we remember
866 	 * having filled up the fast path array, we don't attempt to make any
867 	 * further use of it until we release some locks.  It's possible that some
868 	 * other backend has transferred some of those locks to the shared hash
869 	 * table, leaving space free, but it's not worth acquiring the LWLock just
870 	 * to check.  It's also possible that we're acquiring a second or third
871 	 * lock type on a relation we have already locked using the fast-path, but
872 	 * for now we don't worry about that case either.
873 	 */
874 	if (EligibleForRelationFastPath(locktag, lockmode) &&
875 		FastPathLocalUseCount < FP_LOCK_SLOTS_PER_BACKEND)
876 	{
877 		uint32		fasthashcode = FastPathStrongLockHashPartition(hashcode);
878 		bool		acquired;
879 
880 		/*
881 		 * LWLockAcquire acts as a memory sequencing point, so it's safe to
882 		 * assume that any strong locker whose increment to
883 		 * FastPathStrongRelationLocks->count becomes visible after we test
884 		 * it has yet to begin to transfer fast-path locks.
885 		 */
886 		LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
887 		if (FastPathStrongRelationLocks->count[fasthashcode] != 0)
888 			acquired = false;
889 		else
890 			acquired = FastPathGrantRelationLock(locktag->locktag_field2,
891 												 lockmode);
892 		LWLockRelease(&MyProc->backendLock);
893 		if (acquired)
894 		{
895 			/*
896 			 * The locallock might contain stale pointers to some old shared
897 			 * objects; we MUST reset these to null before considering the
898 			 * lock to be acquired via fast-path.
899 			 */
900 			locallock->lock = NULL;
901 			locallock->proclock = NULL;
902 			GrantLockLocal(locallock, owner);
903 			return LOCKACQUIRE_OK;
904 		}
905 	}
906 
907 	/*
908 	 * If this lock could potentially have been taken via the fast-path by
909 	 * some other backend, we must (temporarily) disable further use of the
910 	 * fast-path for this lock tag, and migrate any locks already taken via
911 	 * this method to the main lock table.
912 	 */
913 	if (ConflictsWithRelationFastPath(locktag, lockmode))
914 	{
915 		uint32		fasthashcode = FastPathStrongLockHashPartition(hashcode);
916 
917 		BeginStrongLockAcquire(locallock, fasthashcode);
918 		if (!FastPathTransferRelationLocks(lockMethodTable, locktag,
919 										   hashcode))
920 		{
921 			AbortStrongLockAcquire();
922 			if (locallock->nLocks == 0)
923 				RemoveLocalLock(locallock);
924 			if (locallockp)
925 				*locallockp = NULL;
926 			if (reportMemoryError)
927 				ereport(ERROR,
928 						(errcode(ERRCODE_OUT_OF_MEMORY),
929 						 errmsg("out of shared memory"),
930 						 errhint("You might need to increase max_locks_per_transaction.")));
931 			else
932 				return LOCKACQUIRE_NOT_AVAIL;
933 		}
934 	}
935 
936 	/*
937 	 * We didn't find the lock in our LOCALLOCK table, and we didn't manage to
938 	 * take it via the fast-path, either, so we've got to mess with the shared
939 	 * lock table.
940 	 */
941 	partitionLock = LockHashPartitionLock(hashcode);
942 
943 	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
944 
945 	/*
946 	 * Find or create lock and proclock entries with this tag
947 	 *
948 	 * Note: if the locallock object already existed, it might have a pointer
949 	 * to the lock already ... but we should not assume that that pointer is
950 	 * valid, since a lock object with zero hold and request counts can go
951 	 * away anytime.  So we have to use SetupLockInTable() to recompute the
952 	 * lock and proclock pointers, even if they're already set.
953 	 */
954 	proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
955 								hashcode, lockmode);
956 	if (!proclock)
957 	{
958 		AbortStrongLockAcquire();
959 		LWLockRelease(partitionLock);
960 		if (locallock->nLocks == 0)
961 			RemoveLocalLock(locallock);
962 		if (locallockp)
963 			*locallockp = NULL;
964 		if (reportMemoryError)
965 			ereport(ERROR,
966 					(errcode(ERRCODE_OUT_OF_MEMORY),
967 					 errmsg("out of shared memory"),
968 					 errhint("You might need to increase max_locks_per_transaction.")));
969 		else
970 			return LOCKACQUIRE_NOT_AVAIL;
971 	}
972 	locallock->proclock = proclock;
973 	lock = proclock->tag.myLock;
974 	locallock->lock = lock;
975 
976 	/*
977 	 * If lock requested conflicts with locks requested by waiters, must join
978 	 * wait queue.  Otherwise, check for conflict with already-held locks.
979 	 * (That's last because it is the most complex check.)
980 	 */
981 	if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
982 		status = STATUS_FOUND;
983 	else
984 		status = LockCheckConflicts(lockMethodTable, lockmode,
985 									lock, proclock);
986 
987 	if (status == STATUS_OK)
988 	{
989 		/* No conflict with held or previously requested locks */
990 		GrantLock(lock, proclock, lockmode);
991 		GrantLockLocal(locallock, owner);
992 	}
993 	else
994 	{
995 		Assert(status == STATUS_FOUND);
996 
997 		/*
998 		 * We can't acquire the lock immediately.  If caller specified no
999 		 * blocking, remove useless table entries and return NOT_AVAIL without
1000 		 * waiting.
1001 		 */
1002 		if (dontWait)
1003 		{
1004 			AbortStrongLockAcquire();
1005 			if (proclock->holdMask == 0)
1006 			{
1007 				uint32		proclock_hashcode;
1008 
1009 				proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
1010 				SHMQueueDelete(&proclock->lockLink);
1011 				SHMQueueDelete(&proclock->procLink);
1012 				if (!hash_search_with_hash_value(LockMethodProcLockHash,
1013 												 (void *) &(proclock->tag),
1014 												 proclock_hashcode,
1015 												 HASH_REMOVE,
1016 												 NULL))
1017 					elog(PANIC, "proclock table corrupted");
1018 			}
1019 			else
1020 				PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
1021 			lock->nRequested--;
1022 			lock->requested[lockmode]--;
1023 			LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
1024 			Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
1025 			Assert(lock->nGranted <= lock->nRequested);
1026 			LWLockRelease(partitionLock);
1027 			if (locallock->nLocks == 0)
1028 				RemoveLocalLock(locallock);
1029 			if (locallockp)
1030 				*locallockp = NULL;
1031 			return LOCKACQUIRE_NOT_AVAIL;
1032 		}
1033 
1034 		/*
1035 		 * Set bitmask of locks this process already holds on this object.
1036 		 */
1037 		MyProc->heldLocks = proclock->holdMask;
1038 
1039 		/*
1040 		 * Sleep till someone wakes me up.
1041 		 */
1042 
1043 		TRACE_POSTGRESQL_LOCK_WAIT_START(locktag->locktag_field1,
1044 										 locktag->locktag_field2,
1045 										 locktag->locktag_field3,
1046 										 locktag->locktag_field4,
1047 										 locktag->locktag_type,
1048 										 lockmode);
1049 
1050 		WaitOnLock(locallock, owner);
1051 
1052 		TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1,
1053 										locktag->locktag_field2,
1054 										locktag->locktag_field3,
1055 										locktag->locktag_field4,
1056 										locktag->locktag_type,
1057 										lockmode);
1058 
1059 		/*
1060 		 * NOTE: do not do any material change of state between here and
1061 		 * return.  All required changes in locktable state must have been
1062 		 * done when the lock was granted to us --- see notes in WaitOnLock.
1063 		 */
1064 
1065 		/*
1066 		 * Check the proclock entry status, in case something in the ipc
1067 		 * communication doesn't work correctly.
1068 		 */
1069 		if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
1070 		{
1071 			AbortStrongLockAcquire();
1072 			PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
1073 			LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
1074 			/* Should we retry ? */
1075 			LWLockRelease(partitionLock);
1076 			elog(ERROR, "LockAcquire failed");
1077 		}
1078 		PROCLOCK_PRINT("LockAcquire: granted", proclock);
1079 		LOCK_PRINT("LockAcquire: granted", lock, lockmode);
1080 	}
1081 
1082 	/*
1083 	 * Lock state is fully up-to-date now; if we error out after this, no
1084 	 * special error cleanup is required.
1085 	 */
1086 	FinishStrongLockAcquire();
1087 
1088 	LWLockRelease(partitionLock);
1089 
1090 	/*
1091 	 * Emit a WAL record if acquisition of this lock needs to be replayed in a
1092 	 * standby server.
1093 	 */
1094 	if (log_lock)
1095 	{
1096 		/*
1097 		 * Decode the locktag back to the original values, to avoid sending
1098 		 * lots of empty bytes with every message.  See lock.h to check how a
1099 		 * locktag is defined for LOCKTAG_RELATION
1100 		 */
1101 		LogAccessExclusiveLock(locktag->locktag_field1,
1102 							   locktag->locktag_field2);
1103 	}
1104 
1105 	return LOCKACQUIRE_OK;
1106 }
1107 
1108 /*
1109  * Find or create LOCK and PROCLOCK objects as needed for a new lock
1110  * request.
1111  *
1112  * Returns the PROCLOCK object, or NULL if we failed to create the objects
1113  * for lack of shared memory.
1114  *
1115  * The appropriate partition lock must be held at entry, and will be
1116  * held at exit.
1117  */
1118 static PROCLOCK *
1119 SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
1120 				 const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode)
1121 {
1122 	LOCK	   *lock;
1123 	PROCLOCK   *proclock;
1124 	PROCLOCKTAG proclocktag;
1125 	uint32		proclock_hashcode;
1126 	bool		found;
1127 
1128 	/*
1129 	 * Find or create a lock with this tag.
1130 	 */
1131 	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
1132 												(const void *) locktag,
1133 												hashcode,
1134 												HASH_ENTER_NULL,
1135 												&found);
1136 	if (!lock)
1137 		return NULL;
1138 
1139 	/*
1140 	 * if it's a new lock object, initialize it
1141 	 */
1142 	if (!found)
1143 	{
1144 		lock->grantMask = 0;
1145 		lock->waitMask = 0;
1146 		SHMQueueInit(&(lock->procLocks));
1147 		ProcQueueInit(&(lock->waitProcs));
1148 		lock->nRequested = 0;
1149 		lock->nGranted = 0;
1150 		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
1151 		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
1152 		LOCK_PRINT("LockAcquire: new", lock, lockmode);
1153 	}
1154 	else
1155 	{
1156 		LOCK_PRINT("LockAcquire: found", lock, lockmode);
1157 		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
1158 		Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
1159 		Assert(lock->nGranted <= lock->nRequested);
1160 	}
1161 
1162 	/*
1163 	 * Create the hash key for the proclock table.
1164 	 */
1165 	proclocktag.myLock = lock;
1166 	proclocktag.myProc = proc;
1167 
1168 	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
1169 
1170 	/*
1171 	 * Find or create a proclock entry with this tag
1172 	 */
1173 	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
1174 														(void *) &proclocktag,
1175 														proclock_hashcode,
1176 														HASH_ENTER_NULL,
1177 														&found);
1178 	if (!proclock)
1179 	{
1180 		/* Oops, not enough shmem for the proclock */
1181 		if (lock->nRequested == 0)
1182 		{
1183 			/*
1184 			 * There are no other requestors of this lock, so garbage-collect
1185 			 * the lock object.  We *must* do this to avoid a permanent leak
1186 			 * of shared memory, because there won't be anything to cause
1187 			 * anyone to release the lock object later.
1188 			 */
1189 			Assert(SHMQueueEmpty(&(lock->procLocks)));
1190 			if (!hash_search_with_hash_value(LockMethodLockHash,
1191 											 (void *) &(lock->tag),
1192 											 hashcode,
1193 											 HASH_REMOVE,
1194 											 NULL))
1195 				elog(PANIC, "lock table corrupted");
1196 		}
1197 		return NULL;
1198 	}
1199 
1200 	/*
1201 	 * If new, initialize the new entry
1202 	 */
1203 	if (!found)
1204 	{
1205 		uint32		partition = LockHashPartition(hashcode);
1206 
1207 		/*
1208 		 * It might seem unsafe to access proclock->groupLeader without a
1209 		 * lock, but it's not really.  Either we are initializing a proclock
1210 		 * on our own behalf, in which case our group leader isn't changing
1211 		 * because the group leader for a process can only ever be changed by
1212 		 * the process itself; or else we are transferring a fast-path lock to
1213 		 * the main lock table, in which case that process can't change its
1214 		 * lock group leader without first releasing all of its locks (and in
1215 		 * particular the one we are currently transferring).
1216 		 */
1217 		proclock->groupLeader = proc->lockGroupLeader != NULL ?
1218 			proc->lockGroupLeader : proc;
1219 		proclock->holdMask = 0;
1220 		proclock->releaseMask = 0;
1221 		/* Add proclock to appropriate lists */
1222 		SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
1223 		SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
1224 							 &proclock->procLink);
1225 		PROCLOCK_PRINT("LockAcquire: new", proclock);
1226 	}
1227 	else
1228 	{
1229 		PROCLOCK_PRINT("LockAcquire: found", proclock);
1230 		Assert((proclock->holdMask & ~lock->grantMask) == 0);
1231 
1232 #ifdef CHECK_DEADLOCK_RISK
1233 
1234 		/*
1235 		 * Issue warning if we already hold a lower-level lock on this object
1236 		 * and do not hold a lock of the requested level or higher. This
1237 		 * indicates a deadlock-prone coding practice (eg, we'd have a
1238 		 * deadlock if another backend were following the same code path at
1239 		 * about the same time).
1240 		 *
1241 		 * This is not enabled by default, because it may generate log entries
1242 		 * about user-level coding practices that are in fact safe in context.
1243 		 * It can be enabled to help find system-level problems.
1244 		 *
1245 		 * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
1246 		 * better to use a table.  For now, though, this works.
1247 		 */
1248 		{
1249 			int			i;
1250 
1251 			for (i = lockMethodTable->numLockModes; i > 0; i--)
1252 			{
1253 				if (proclock->holdMask & LOCKBIT_ON(i))
1254 				{
1255 					if (i >= (int) lockmode)
1256 						break;	/* safe: we have a lock >= req level */
1257 					elog(LOG, "deadlock risk: raising lock level"
1258 						 " from %s to %s on object %u/%u/%u",
1259 						 lockMethodTable->lockModeNames[i],
1260 						 lockMethodTable->lockModeNames[lockmode],
1261 						 lock->tag.locktag_field1, lock->tag.locktag_field2,
1262 						 lock->tag.locktag_field3);
1263 					break;
1264 				}
1265 			}
1266 		}
1267 #endif							/* CHECK_DEADLOCK_RISK */
1268 	}
1269 
1270 	/*
1271 	 * lock->nRequested and lock->requested[] count the total number of
1272 	 * requests, whether granted or waiting, so increment those immediately.
1273 	 * The other counts don't increment till we get the lock.
1274 	 */
1275 	lock->nRequested++;
1276 	lock->requested[lockmode]++;
1277 	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
1278 
1279 	/*
1280 	 * We shouldn't already hold the desired lock; else locallock table is
1281 	 * broken.
1282 	 */
1283 	if (proclock->holdMask & LOCKBIT_ON(lockmode))
1284 		elog(ERROR, "lock %s on object %u/%u/%u is already held",
1285 			 lockMethodTable->lockModeNames[lockmode],
1286 			 lock->tag.locktag_field1, lock->tag.locktag_field2,
1287 			 lock->tag.locktag_field3);
1288 
1289 	return proclock;
1290 }
1291 
1292 /*
1293  * Subroutine to free a locallock entry
1294  */
1295 static void
1296 RemoveLocalLock(LOCALLOCK *locallock)
1297 {
1298 	int			i;
1299 
1300 	for (i = locallock->numLockOwners - 1; i >= 0; i--)
1301 	{
1302 		if (locallock->lockOwners[i].owner != NULL)
1303 			ResourceOwnerForgetLock(locallock->lockOwners[i].owner, locallock);
1304 	}
1305 	locallock->numLockOwners = 0;
1306 	if (locallock->lockOwners != NULL)
1307 		pfree(locallock->lockOwners);
1308 	locallock->lockOwners = NULL;
1309 
1310 	if (locallock->holdsStrongLockCount)
1311 	{
1312 		uint32		fasthashcode;
1313 
1314 		fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);
1315 
1316 		SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
1317 		Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
1318 		FastPathStrongRelationLocks->count[fasthashcode]--;
1319 		locallock->holdsStrongLockCount = false;
1320 		SpinLockRelease(&FastPathStrongRelationLocks->mutex);
1321 	}
1322 
1323 	if (!hash_search(LockMethodLocalHash,
1324 					 (void *) &(locallock->tag),
1325 					 HASH_REMOVE, NULL))
1326 		elog(WARNING, "locallock table corrupted");
1327 }
1328 
1329 /*
1330  * LockCheckConflicts -- test whether requested lock conflicts
1331  *		with those already granted
1332  *
1333  * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
1334  *
1335  * NOTES:
1336  *		Here's what makes this complicated: one process's locks don't
1337  * conflict with one another, no matter what purpose they are held for
1338  * (eg, session and transaction locks do not conflict).  Nor do the locks
1339  * of one process in a lock group conflict with those of another process in
1340  * the same group.  So, we must subtract off these locks when determining
1341  * whether the requested new lock conflicts with those already held.
1342  */
1343 int
1344 LockCheckConflicts(LockMethod lockMethodTable,
1345 				   LOCKMODE lockmode,
1346 				   LOCK *lock,
1347 				   PROCLOCK *proclock)
1348 {
1349 	int			numLockModes = lockMethodTable->numLockModes;
1350 	LOCKMASK	myLocks;
1351 	int			conflictMask = lockMethodTable->conflictTab[lockmode];
1352 	int			conflictsRemaining[MAX_LOCKMODES];
1353 	int			totalConflictsRemaining = 0;
1354 	int			i;
1355 	SHM_QUEUE  *procLocks;
1356 	PROCLOCK   *otherproclock;
1357 
1358 	/*
1359 	 * first check for global conflicts: If no locks conflict with my request,
1360 	 * then I get the lock.
1361 	 *
1362 	 * Checking for conflict: lock->grantMask represents the types of
1363 	 * currently held locks.  conflictTable[lockmode] has a bit set for each
1364 	 * type of lock that conflicts with request.   Bitwise compare tells if
1365 	 * there is a conflict.
1366 	 */
1367 	if (!(conflictMask & lock->grantMask))
1368 	{
1369 		PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
1370 		return STATUS_OK;
1371 	}
1372 
1373 	/*
1374 	 * Rats.  Something conflicts.  But it could still be my own lock, or a
1375 	 * lock held by another member of my locking group.  First, figure out how
1376 	 * many conflicts remain after subtracting out any locks I hold myself.
1377 	 */
1378 	myLocks = proclock->holdMask;
1379 	for (i = 1; i <= numLockModes; i++)
1380 	{
1381 		if ((conflictMask & LOCKBIT_ON(i)) == 0)
1382 		{
1383 			conflictsRemaining[i] = 0;
1384 			continue;
1385 		}
1386 		conflictsRemaining[i] = lock->granted[i];
1387 		if (myLocks & LOCKBIT_ON(i))
1388 			--conflictsRemaining[i];
1389 		totalConflictsRemaining += conflictsRemaining[i];
1390 	}
1391 
1392 	/* If no conflicts remain, we get the lock. */
1393 	if (totalConflictsRemaining == 0)
1394 	{
1395 		PROCLOCK_PRINT("LockCheckConflicts: resolved (simple)", proclock);
1396 		return STATUS_OK;
1397 	}
1398 
1399 	/* If no group locking, it's definitely a conflict. */
1400 	if (proclock->groupLeader == MyProc && MyProc->lockGroupLeader == NULL)
1401 	{
1402 		Assert(proclock->tag.myProc == MyProc);
1403 		PROCLOCK_PRINT("LockCheckConflicts: conflicting (simple)",
1404 					   proclock);
1405 		return STATUS_FOUND;
1406 	}
1407 
1408 	/*
1409 	 * Locks held in conflicting modes by members of our own lock group are
1410 	 * not real conflicts; we can subtract those out and see if we still have
1411 	 * a conflict.  This is O(N) in the number of processes holding or
1412 	 * awaiting locks on this object.  We could improve that by making the
1413 	 * shared memory state more complex (and larger) but it doesn't seem worth
1414 	 * it.
1415 	 */
1416 	procLocks = &(lock->procLocks);
1417 	otherproclock = (PROCLOCK *)
1418 		SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink));
1419 	while (otherproclock != NULL)
1420 	{
1421 		if (proclock != otherproclock &&
1422 			proclock->groupLeader == otherproclock->groupLeader &&
1423 			(otherproclock->holdMask & conflictMask) != 0)
1424 		{
1425 			int			intersectMask = otherproclock->holdMask & conflictMask;
1426 
1427 			for (i = 1; i <= numLockModes; i++)
1428 			{
1429 				if ((intersectMask & LOCKBIT_ON(i)) != 0)
1430 				{
1431 					if (conflictsRemaining[i] <= 0)
1432 						elog(PANIC, "proclocks held do not match lock");
1433 					conflictsRemaining[i]--;
1434 					totalConflictsRemaining--;
1435 				}
1436 			}
1437 
1438 			if (totalConflictsRemaining == 0)
1439 			{
1440 				PROCLOCK_PRINT("LockCheckConflicts: resolved (group)",
1441 							   proclock);
1442 				return STATUS_OK;
1443 			}
1444 		}
1445 		otherproclock = (PROCLOCK *)
1446 			SHMQueueNext(procLocks, &otherproclock->lockLink,
1447 						 offsetof(PROCLOCK, lockLink));
1448 	}
1449 
1450 	/* Nope, it's a real conflict. */
1451 	PROCLOCK_PRINT("LockCheckConflicts: conflicting (group)", proclock);
1452 	return STATUS_FOUND;
1453 }
1454 
1455 /*
1456  * GrantLock -- update the lock and proclock data structures to show
1457  *		the lock request has been granted.
1458  *
1459  * NOTE: if proc was blocked, it also needs to be removed from the wait list
1460  * and have its waitLock/waitProcLock fields cleared.  That's not done here.
1461  *
1462  * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
1463  * table entry; but since we may be awaking some other process, we can't do
1464  * that here; it's done by GrantLockLocal, instead.
1465  */
1466 void
1467 GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
1468 {
1469 	lock->nGranted++;
1470 	lock->granted[lockmode]++;
1471 	lock->grantMask |= LOCKBIT_ON(lockmode);
1472 	if (lock->granted[lockmode] == lock->requested[lockmode])
1473 		lock->waitMask &= LOCKBIT_OFF(lockmode);
1474 	proclock->holdMask |= LOCKBIT_ON(lockmode);
1475 	LOCK_PRINT("GrantLock", lock, lockmode);
1476 	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
1477 	Assert(lock->nGranted <= lock->nRequested);
1478 }
1479 
1480 /*
1481  * UnGrantLock -- opposite of GrantLock.
1482  *
1483  * Updates the lock and proclock data structures to show that the lock
1484  * is no longer held nor requested by the current holder.
1485  *
1486  * Returns true if there were any waiters waiting on the lock that
1487  * should now be woken up with ProcLockWakeup.
1488  */
1489 static bool
1490 UnGrantLock(LOCK *lock, LOCKMODE lockmode,
1491 			PROCLOCK *proclock, LockMethod lockMethodTable)
1492 {
1493 	bool		wakeupNeeded = false;
1494 
1495 	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
1496 	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
1497 	Assert(lock->nGranted <= lock->nRequested);
1498 
1499 	/*
1500 	 * fix the general lock stats
1501 	 */
1502 	lock->nRequested--;
1503 	lock->requested[lockmode]--;
1504 	lock->nGranted--;
1505 	lock->granted[lockmode]--;
1506 
1507 	if (lock->granted[lockmode] == 0)
1508 	{
1509 		/* change the conflict mask.  No more of this lock type. */
1510 		lock->grantMask &= LOCKBIT_OFF(lockmode);
1511 	}
1512 
1513 	LOCK_PRINT("UnGrantLock: updated", lock, lockmode);
1514 
1515 	/*
1516 	 * We need only run ProcLockWakeup if the released lock conflicts with at
1517 	 * least one of the lock types requested by waiter(s).  Otherwise whatever
1518 	 * conflict made them wait must still exist.  NOTE: before MVCC, we could
1519 	 * skip wakeup if lock->granted[lockmode] was still positive. But that's
1520 	 * not true anymore, because the remaining granted locks might belong to
1521 	 * some waiter, who could now be awakened because he doesn't conflict with
1522 	 * his own locks.
1523 	 */
1524 	if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
1525 		wakeupNeeded = true;
1526 
1527 	/*
1528 	 * Now fix the per-proclock state.
1529 	 */
1530 	proclock->holdMask &= LOCKBIT_OFF(lockmode);
1531 	PROCLOCK_PRINT("UnGrantLock: updated", proclock);
1532 
1533 	return wakeupNeeded;
1534 }
1535 
1536 /*
1537  * CleanUpLock -- clean up after releasing a lock.  We garbage-collect the
1538  * proclock and lock objects if possible, and call ProcLockWakeup if there
1539  * are remaining requests and the caller says it's OK.  (Normally, this
1540  * should be called after UnGrantLock, and wakeupNeeded is the result from
1541  * UnGrantLock.)
1542  *
1543  * The appropriate partition lock must be held at entry, and will be
1544  * held at exit.
1545  */
1546 static void
1547 CleanUpLock(LOCK *lock, PROCLOCK *proclock,
1548 			LockMethod lockMethodTable, uint32 hashcode,
1549 			bool wakeupNeeded)
1550 {
1551 	/*
1552 	 * If this was my last hold on this lock, delete my entry in the proclock
1553 	 * table.
1554 	 */
1555 	if (proclock->holdMask == 0)
1556 	{
1557 		uint32		proclock_hashcode;
1558 
1559 		PROCLOCK_PRINT("CleanUpLock: deleting", proclock);
1560 		SHMQueueDelete(&proclock->lockLink);
1561 		SHMQueueDelete(&proclock->procLink);
1562 		proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
1563 		if (!hash_search_with_hash_value(LockMethodProcLockHash,
1564 										 (void *) &(proclock->tag),
1565 										 proclock_hashcode,
1566 										 HASH_REMOVE,
1567 										 NULL))
1568 			elog(PANIC, "proclock table corrupted");
1569 	}
1570 
1571 	if (lock->nRequested == 0)
1572 	{
1573 		/*
1574 		 * The caller just released the last lock, so garbage-collect the lock
1575 		 * object.
1576 		 */
1577 		LOCK_PRINT("CleanUpLock: deleting", lock, 0);
1578 		Assert(SHMQueueEmpty(&(lock->procLocks)));
1579 		if (!hash_search_with_hash_value(LockMethodLockHash,
1580 										 (void *) &(lock->tag),
1581 										 hashcode,
1582 										 HASH_REMOVE,
1583 										 NULL))
1584 			elog(PANIC, "lock table corrupted");
1585 	}
1586 	else if (wakeupNeeded)
1587 	{
1588 		/* There are waiters on this lock, so wake them up. */
1589 		ProcLockWakeup(lockMethodTable, lock);
1590 	}
1591 }
1592 
1593 /*
1594  * GrantLockLocal -- update the locallock data structures to show
1595  *		the lock request has been granted.
1596  *
1597  * We expect that LockAcquire made sure there is room to add a new
1598  * ResourceOwner entry.
1599  */
1600 static void
1601 GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
1602 {
1603 	LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1604 	int			i;
1605 
1606 	Assert(locallock->numLockOwners < locallock->maxLockOwners);
1607 	/* Count the total */
1608 	locallock->nLocks++;
1609 	/* Count the per-owner lock */
1610 	for (i = 0; i < locallock->numLockOwners; i++)
1611 	{
1612 		if (lockOwners[i].owner == owner)
1613 		{
1614 			lockOwners[i].nLocks++;
1615 			return;
1616 		}
1617 	}
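	/* No entry for this owner yet; use the spare slot the caller guaranteed. */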
1618 	lockOwners[i].owner = owner;
1619 	lockOwners[i].nLocks = 1;
1620 	locallock->numLockOwners++;
1621 	if (owner != NULL)
1622 		ResourceOwnerRememberLock(owner, locallock);
1623 }
1624 
1625 /*
1626  * BeginStrongLockAcquire - inhibit use of fastpath for a given LOCALLOCK,
1627  * and arrange for error cleanup if it fails
1628  */
1629 static void
1630 BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode)
1631 {
1632 	Assert(StrongLockInProgress == NULL);
1633 	Assert(locallock->holdsStrongLockCount == false);
1634 
1635 	/*
1636 	 * Adding to a memory location is not atomic, so we take a spinlock to
1637 	 * ensure we don't collide with someone else trying to bump the count at
1638 	 * the same time.
1639 	 *
1640 	 * XXX: It might be worth considering using an atomic fetch-and-add
1641 	 * instruction here, on architectures where that is supported.
1642 	 */
1643 
1644 	SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
1645 	FastPathStrongRelationLocks->count[fasthashcode]++;
1646 	locallock->holdsStrongLockCount = true;
1647 	StrongLockInProgress = locallock;
1648 	SpinLockRelease(&FastPathStrongRelationLocks->mutex);
1649 }
1650 
1651 /*
1652  * FinishStrongLockAcquire - cancel pending cleanup for a strong lock
1653  * acquisition once it's no longer needed
1654  */
1655 static void
1656 FinishStrongLockAcquire(void)
1657 {
1658 	StrongLockInProgress = NULL;
1659 }
1660 
1661 /*
1662  * AbortStrongLockAcquire - undo strong lock state changes performed by
1663  * BeginStrongLockAcquire.
1664  */
1665 void
1666 AbortStrongLockAcquire(void)
1667 {
1668 	uint32		fasthashcode;
1669 	LOCALLOCK  *locallock = StrongLockInProgress;
1670 
1671 	if (locallock == NULL)
1672 		return;
1673 
1674 	fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);
1675 	Assert(locallock->holdsStrongLockCount == true);
1676 	SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
1677 	Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
1678 	FastPathStrongRelationLocks->count[fasthashcode]--;
1679 	locallock->holdsStrongLockCount = false;
1680 	StrongLockInProgress = NULL;
1681 	SpinLockRelease(&FastPathStrongRelationLocks->mutex);
1682 }
1683 
1684 /*
1685  * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
1686  *		WaitOnLock on.
1687  *
1688  * proc.c needs this for the case where we are booted off the lock by
1689  * timeout, but discover that someone granted us the lock anyway.
1690  *
1691  * We could just export GrantLockLocal, but that would require including
1692  * resowner.h in lock.h, which creates circularity.
1693  */
1694 void
1695 GrantAwaitedLock(void)
1696 {
1697 	GrantLockLocal(awaitedLock, awaitedOwner);
1698 }
1699 
1700 /*
1701  * MarkLockClear -- mark an acquired lock as "clear"
1702  *
1703  * This means that we know we have absorbed all sinval messages that other
1704  * sessions generated before we acquired this lock, and so we can confidently
1705  * assume we know about any catalog changes protected by this lock.
1706  */
1707 void
1708 MarkLockClear(LOCALLOCK *locallock)
1709 {
1710 	Assert(locallock->nLocks > 0);
1711 	locallock->lockCleared = true;
1712 }
1713 
1714 /*
1715  * WaitOnLock -- wait to acquire a lock
1716  *
1717  * Caller must have set MyProc->heldLocks to reflect locks already held
1718  * on the lockable object by this process.
1719  *
1720  * The appropriate partition lock must be held at entry.
1721  */
1722 static void
1723 WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
1724 {
1725 	LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
1726 	LockMethod	lockMethodTable = LockMethods[lockmethodid];
1727 	char	   *volatile new_status = NULL;
1728 
1729 	LOCK_PRINT("WaitOnLock: sleeping on lock",
1730 			   locallock->lock, locallock->tag.mode);
1731 
1732 	/* Report change to waiting status */
1733 	if (update_process_title)
1734 	{
1735 		const char *old_status;
1736 		int			len;
1737 
1738 		old_status = get_ps_display(&len);
1739 		new_status = (char *) palloc(len + 8 + 1);
1740 		memcpy(new_status, old_status, len);
1741 		strcpy(new_status + len, " waiting");
1742 		set_ps_display(new_status, false);
1743 		new_status[len] = '\0'; /* truncate off " waiting" */
1744 	}
1745 
1746 	awaitedLock = locallock;
1747 	awaitedOwner = owner;
1748 
1749 	/*
1750 	 * NOTE: Think not to put any shared-state cleanup after the call to
1751 	 * ProcSleep, in either the normal or failure path.  The lock state must
1752 	 * be fully set by the lock grantor, or by CheckDeadLock if we give up
1753 	 * waiting for the lock.  This is necessary because of the possibility
1754 	 * that a cancel/die interrupt will interrupt ProcSleep after someone else
1755 	 * grants us the lock, but before we've noticed it. Hence, after granting,
1756 	 * the locktable state must fully reflect the fact that we own the lock;
1757 	 * we can't do additional work on return.
1758 	 *
1759 	 * We can and do use a PG_TRY block to try to clean up after failure, but
1760 	 * this still has a major limitation: elog(FATAL) can occur while waiting
1761 	 * (eg, a "die" interrupt), and then control won't come back here. So all
1762 	 * cleanup of essential state should happen in LockErrorCleanup, not here.
1763 	 * We can use PG_TRY to clear the "waiting" status flags, since doing that
1764 	 * is unimportant if the process exits.
1765 	 */
1766 	PG_TRY();
1767 	{
1768 		if (ProcSleep(locallock, lockMethodTable) != STATUS_OK)
1769 		{
1770 			/*
1771 			 * We failed as a result of a deadlock, see CheckDeadLock(). Quit
1772 			 * now.
1773 			 */
1774 			awaitedLock = NULL;
1775 			LOCK_PRINT("WaitOnLock: aborting on lock",
1776 					   locallock->lock, locallock->tag.mode);
1777 			LWLockRelease(LockHashPartitionLock(locallock->hashcode));
1778 
1779 			/*
1780 			 * Now that we aren't holding the partition lock, we can give an
1781 			 * error report including details about the detected deadlock.
1782 			 */
1783 			DeadLockReport();
1784 			/* not reached */
1785 		}
1786 	}
1787 	PG_CATCH();
1788 	{
1789 		/* In this path, awaitedLock remains set until LockErrorCleanup */
1790 
1791 		/* Report change to non-waiting status */
1792 		if (update_process_title)
1793 		{
1794 			set_ps_display(new_status, false);
1795 			pfree(new_status);
1796 		}
1797 
1798 		/* and propagate the error */
1799 		PG_RE_THROW();
1800 	}
1801 	PG_END_TRY();
1802 
1803 	awaitedLock = NULL;
1804 
1805 	/* Report change to non-waiting status */
1806 	if (update_process_title)
1807 	{
1808 		set_ps_display(new_status, false);
1809 		pfree(new_status);
1810 	}
1811 
1812 	LOCK_PRINT("WaitOnLock: wakeup on lock",
1813 			   locallock->lock, locallock->tag.mode);
1814 }
1815 
1816 /*
1817  * Remove a proc from the wait-queue it is on (caller must know it is on one).
1818  * This is only used when the proc has failed to get the lock, so we set its
1819  * waitStatus to STATUS_ERROR.
1820  *
1821  * Appropriate partition lock must be held by caller.  Also, caller is
1822  * responsible for signaling the proc if needed.
1823  *
1824  * NB: this does not clean up any locallock object that may exist for the lock.
1825  */
1826 void
1827 RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
1828 {
1829 	LOCK	   *waitLock = proc->waitLock;
1830 	PROCLOCK   *proclock = proc->waitProcLock;
1831 	LOCKMODE	lockmode = proc->waitLockMode;
1832 	LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);
1833 
1834 	/* Make sure proc is waiting */
1835 	Assert(proc->waitStatus == STATUS_WAITING);
1836 	Assert(proc->links.next != NULL);
1837 	Assert(waitLock);
1838 	Assert(waitLock->waitProcs.size > 0);
1839 	Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
1840 
1841 	/* Remove proc from lock's wait queue */
1842 	SHMQueueDelete(&(proc->links));
1843 	waitLock->waitProcs.size--;
1844 
1845 	/* Undo increments of request counts by waiting process */
1846 	Assert(waitLock->nRequested > 0);
1847 	Assert(waitLock->nRequested > proc->waitLock->nGranted);
1848 	waitLock->nRequested--;
1849 	Assert(waitLock->requested[lockmode] > 0);
1850 	waitLock->requested[lockmode]--;
1851 	/* don't forget to clear waitMask bit if appropriate */
1852 	if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
1853 		waitLock->waitMask &= LOCKBIT_OFF(lockmode);
1854 
1855 	/* Clean up the proc's own state, and pass it the ok/fail signal */
1856 	proc->waitLock = NULL;
1857 	proc->waitProcLock = NULL;
1858 	proc->waitStatus = STATUS_ERROR;
1859 
1860 	/*
1861 	 * Delete the proclock immediately if it represents no already-held locks.
1862 	 * (This must happen now because if the owner of the lock decides to
1863 	 * release it, and the requested/granted counts then go to zero,
1864 	 * LockRelease expects there to be no remaining proclocks.) Then see if
1865 	 * any other waiters for the lock can be woken up now.
1866 	 */
1867 	CleanUpLock(waitLock, proclock,
1868 				LockMethods[lockmethodid], hashcode,
1869 				true);
1870 }
1871 
1872 /*
1873  * LockRelease -- look up 'locktag' and release one 'lockmode' lock on it.
1874  *		Release a session lock if 'sessionLock' is true, else release a
1875  *		regular transaction lock.
1876  *
1877  * Side Effects: find any waiting processes that are now wakable,
1878  *		grant them their requested locks and awaken them.
1879  *		(We have to grant the lock here to avoid a race between
1880  *		the waking process and any new process to
1881  *		come along and request the lock.)
1882  */
1883 bool
1884 LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
1885 {
1886 	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
1887 	LockMethod	lockMethodTable;
1888 	LOCALLOCKTAG localtag;
1889 	LOCALLOCK  *locallock;
1890 	LOCK	   *lock;
1891 	PROCLOCK   *proclock;
1892 	LWLock	   *partitionLock;
1893 	bool		wakeupNeeded;
1894 
1895 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
1896 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
1897 	lockMethodTable = LockMethods[lockmethodid];
1898 	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
1899 		elog(ERROR, "unrecognized lock mode: %d", lockmode);
1900 
1901 #ifdef LOCK_DEBUG
1902 	if (LOCK_DEBUG_ENABLED(locktag))
1903 		elog(LOG, "LockRelease: lock [%u,%u] %s",
1904 			 locktag->locktag_field1, locktag->locktag_field2,
1905 			 lockMethodTable->lockModeNames[lockmode]);
1906 #endif
1907 
1908 	/*
1909 	 * Find the LOCALLOCK entry for this lock and lockmode
1910 	 */
1911 	MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
1912 	localtag.lock = *locktag;
1913 	localtag.mode = lockmode;
1914 
1915 	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
1916 										  (void *) &localtag,
1917 										  HASH_FIND, NULL);
1918 
1919 	/*
1920 	 * let the caller print its own error message, too. Do not ereport(ERROR).
1921 	 */
1922 	if (!locallock || locallock->nLocks <= 0)
1923 	{
1924 		elog(WARNING, "you don't own a lock of type %s",
1925 			 lockMethodTable->lockModeNames[lockmode]);
1926 		return false;
1927 	}
1928 
1929 	/*
1930 	 * Decrease the count for the resource owner.
1931 	 */
1932 	{
1933 		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1934 		ResourceOwner owner;
1935 		int			i;
1936 
1937 		/* Identify owner for lock */
1938 		if (sessionLock)
1939 			owner = NULL;
1940 		else
1941 			owner = CurrentResourceOwner;
1942 
1943 		for (i = locallock->numLockOwners - 1; i >= 0; i--)
1944 		{
1945 			if (lockOwners[i].owner == owner)
1946 			{
1947 				Assert(lockOwners[i].nLocks > 0);
1948 				if (--lockOwners[i].nLocks == 0)
1949 				{
1950 					if (owner != NULL)
1951 						ResourceOwnerForgetLock(owner, locallock);
1952 					/* compact out unused slot */
1953 					locallock->numLockOwners--;
1954 					if (i < locallock->numLockOwners)
1955 						lockOwners[i] = lockOwners[locallock->numLockOwners];
1956 				}
1957 				break;
1958 			}
1959 		}
1960 		if (i < 0)
1961 		{
1962 			/* don't release a lock belonging to another owner */
1963 			elog(WARNING, "you don't own a lock of type %s",
1964 				 lockMethodTable->lockModeNames[lockmode]);
1965 			return false;
1966 		}
1967 	}
1968 
1969 	/*
1970 	 * Decrease the total local count.  If we're still holding the lock, we're
1971 	 * done.
1972 	 */
1973 	locallock->nLocks--;
1974 
1975 	if (locallock->nLocks > 0)
1976 		return true;
1977 
1978 	/*
1979 	 * At this point we can no longer suppose we are clear of invalidation
1980 	 * messages related to this lock.  Although we'll delete the LOCALLOCK
1981 	 * object before any intentional return from this routine, it seems worth
1982 	 * the trouble to explicitly reset lockCleared right now, just in case
1983 	 * some error prevents us from deleting the LOCALLOCK.
1984 	 */
1985 	locallock->lockCleared = false;
1986 
1987 	/* Attempt fast release of any lock eligible for the fast path. */
1988 	if (EligibleForRelationFastPath(locktag, lockmode) &&
1989 		FastPathLocalUseCount > 0)
1990 	{
1991 		bool		released;
1992 
1993 		/*
1994 		 * We might not find the lock here, even if we originally entered it
1995 		 * here.  Another backend may have moved it to the main table.
1996 		 */
1997 		LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
1998 		released = FastPathUnGrantRelationLock(locktag->locktag_field2,
1999 											   lockmode);
2000 		LWLockRelease(&MyProc->backendLock);
2001 		if (released)
2002 		{
2003 			RemoveLocalLock(locallock);
2004 			return true;
2005 		}
2006 	}
2007 
2008 	/*
2009 	 * Otherwise we've got to mess with the shared lock table.
2010 	 */
2011 	partitionLock = LockHashPartitionLock(locallock->hashcode);
2012 
2013 	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2014 
2015 	/*
2016 	 * Normally, we don't need to re-find the lock or proclock, since we kept
2017 	 * their addresses in the locallock table, and they couldn't have been
2018 	 * removed while we were holding a lock on them.  But it's possible that
2019 	 * the lock was taken fast-path and has since been moved to the main hash
2020 	 * table by another backend, in which case we will need to look up the
2021 	 * objects here.  We assume the lock field is NULL if so.
2022 	 */
2023 	lock = locallock->lock;
2024 	if (!lock)
2025 	{
2026 		PROCLOCKTAG proclocktag;
2027 
2028 		Assert(EligibleForRelationFastPath(locktag, lockmode));
2029 		lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
2030 													(const void *) locktag,
2031 													locallock->hashcode,
2032 													HASH_FIND,
2033 													NULL);
2034 		if (!lock)
2035 			elog(ERROR, "failed to re-find shared lock object");
2036 		locallock->lock = lock;
2037 
2038 		proclocktag.myLock = lock;
2039 		proclocktag.myProc = MyProc;
2040 		locallock->proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
2041 													   (void *) &proclocktag,
2042 													   HASH_FIND,
2043 													   NULL);
2044 		if (!locallock->proclock)
2045 			elog(ERROR, "failed to re-find shared proclock object");
2046 	}
2047 	LOCK_PRINT("LockRelease: found", lock, lockmode);
2048 	proclock = locallock->proclock;
2049 	PROCLOCK_PRINT("LockRelease: found", proclock);
2050 
2051 	/*
2052 	 * Double-check that we are actually holding a lock of the type we want to
2053 	 * release.
2054 	 */
2055 	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
2056 	{
2057 		PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
2058 		LWLockRelease(partitionLock);
2059 		elog(WARNING, "you don't own a lock of type %s",
2060 			 lockMethodTable->lockModeNames[lockmode]);
2061 		RemoveLocalLock(locallock);
2062 		return false;
2063 	}
2064 
2065 	/*
2066 	 * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
2067 	 */
2068 	wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
2069 
2070 	CleanUpLock(lock, proclock,
2071 				lockMethodTable, locallock->hashcode,
2072 				wakeupNeeded);
2073 
2074 	LWLockRelease(partitionLock);
2075 
2076 	RemoveLocalLock(locallock);
2077 	return true;
2078 }
2079 
2080 /*
2081  * LockReleaseAll -- Release all locks of the specified lock method that
2082  *		are held by the current process.
2083  *
2084  * Well, not necessarily *all* locks.  The available behaviors are:
2085  *		allLocks == true: release all locks including session locks.
2086  *		allLocks == false: release all non-session locks.
2087  */
2088 void
2089 LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
2090 {
2091 	HASH_SEQ_STATUS status;
2092 	LockMethod	lockMethodTable;
2093 	int			i,
2094 				numLockModes;
2095 	LOCALLOCK  *locallock;
2096 	LOCK	   *lock;
2097 	PROCLOCK   *proclock;
2098 	int			partition;
2099 	bool		have_fast_path_lwlock = false;
2100 
2101 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
2102 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
2103 	lockMethodTable = LockMethods[lockmethodid];
2104 
2105 #ifdef LOCK_DEBUG
2106 	if (*(lockMethodTable->trace_flag))
2107 		elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
2108 #endif
2109 
2110 	/*
2111 	 * Get rid of our fast-path VXID lock, if appropriate.  Note that this is
2112 	 * the only way that the lock we hold on our own VXID can ever get
2113 	 * released: it is always and only released when a toplevel transaction
2114 	 * ends.
2115 	 */
2116 	if (lockmethodid == DEFAULT_LOCKMETHOD)
2117 		VirtualXactLockTableCleanup();
2118 
2119 	numLockModes = lockMethodTable->numLockModes;
2120 
2121 	/*
2122 	 * First we run through the locallock table and get rid of unwanted
2123 	 * entries, then we scan the process's proclocks and get rid of those. We
2124 	 * do this separately because we may have multiple locallock entries
2125 	 * pointing to the same proclock, and we daren't end up with any dangling
2126 	 * pointers.  Fast-path locks are cleaned up during the locallock table
2127 	 * scan, though.
2128 	 */
2129 	hash_seq_init(&status, LockMethodLocalHash);
2130 
2131 	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2132 	{
2133 		/*
2134 		 * If the LOCALLOCK entry is unused, we must've run out of shared
2135 		 * memory while trying to set up this lock.  Just forget the local
2136 		 * entry.
2137 		 */
2138 		if (locallock->nLocks == 0)
2139 		{
2140 			RemoveLocalLock(locallock);
2141 			continue;
2142 		}
2143 
2144 		/* Ignore items that are not of the lockmethod to be removed */
2145 		if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
2146 			continue;
2147 
2148 		/*
2149 		 * If we are asked to release all locks, we can just zap the entry.
2150 		 * Otherwise, must scan to see if there are session locks. We assume
2151 		 * there is at most one lockOwners entry for session locks.
2152 		 */
2153 		if (!allLocks)
2154 		{
2155 			LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
2156 
2157 			/* If session lock is above array position 0, move it down to 0 */
2158 			for (i = 0; i < locallock->numLockOwners; i++)
2159 			{
2160 				if (lockOwners[i].owner == NULL)
2161 					lockOwners[0] = lockOwners[i];
2162 				else
2163 					ResourceOwnerForgetLock(lockOwners[i].owner, locallock);
2164 			}
2165 
2166 			if (locallock->numLockOwners > 0 &&
2167 				lockOwners[0].owner == NULL &&
2168 				lockOwners[0].nLocks > 0)
2169 			{
2170 				/* Fix the locallock to show just the session locks */
2171 				locallock->nLocks = lockOwners[0].nLocks;
2172 				locallock->numLockOwners = 1;
2173 				/* We aren't deleting this locallock, so done */
2174 				continue;
2175 			}
2176 			else
2177 				locallock->numLockOwners = 0;
2178 		}
2179 
2180 		/*
2181 		 * If the lock or proclock pointers are NULL, this lock was taken via
2182 		 * the relation fast-path (and is not known to have been transferred).
2183 		 */
2184 		if (locallock->proclock == NULL || locallock->lock == NULL)
2185 		{
2186 			LOCKMODE	lockmode = locallock->tag.mode;
2187 			Oid			relid;
2188 
2189 			/* Verify that a fast-path lock is what we've got. */
2190 			if (!EligibleForRelationFastPath(&locallock->tag.lock, lockmode))
2191 				elog(PANIC, "locallock table corrupted");
2192 
2193 			/*
2194 			 * If we don't currently hold the LWLock that protects our
2195 			 * fast-path data structures, we must acquire it before attempting
2196 			 * to release the lock via the fast-path.  We will continue to
2197 			 * hold the LWLock until we're done scanning the locallock table,
2198 			 * unless we hit a transferred fast-path lock.  (XXX is this
2199 			 * really such a good idea?  There could be a lot of entries ...)
2200 			 */
2201 			if (!have_fast_path_lwlock)
2202 			{
2203 				LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
2204 				have_fast_path_lwlock = true;
2205 			}
2206 
2207 			/* Attempt fast-path release. */
2208 			relid = locallock->tag.lock.locktag_field2;
2209 			if (FastPathUnGrantRelationLock(relid, lockmode))
2210 			{
2211 				RemoveLocalLock(locallock);
2212 				continue;
2213 			}
2214 
2215 			/*
2216 			 * Our lock, originally taken via the fast path, has been
2217 			 * transferred to the main lock table.  That's going to require
2218 			 * some extra work, so release our fast-path lock before starting.
2219 			 */
2220 			LWLockRelease(&MyProc->backendLock);
2221 			have_fast_path_lwlock = false;
2222 
2223 			/*
2224 			 * Now dump the lock.  We haven't got a pointer to the LOCK or
2225 			 * PROCLOCK in this case, so we have to handle this a bit
2226 			 * differently than a normal lock release.  Unfortunately, this
2227 			 * requires an extra LWLock acquire-and-release cycle on the
2228 			 * partitionLock, but hopefully it shouldn't happen often.
2229 			 */
2230 			LockRefindAndRelease(lockMethodTable, MyProc,
2231 								 &locallock->tag.lock, lockmode, false);
2232 			RemoveLocalLock(locallock);
2233 			continue;
2234 		}
2235 
2236 		/* Mark the proclock to show we need to release this lockmode */
2237 		if (locallock->nLocks > 0)
2238 			locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
2239 
2240 		/* And remove the locallock hashtable entry */
2241 		RemoveLocalLock(locallock);
2242 	}
2243 
2244 	/* Done with the fast-path data structures */
2245 	if (have_fast_path_lwlock)
2246 		LWLockRelease(&MyProc->backendLock);
2247 
2248 	/*
2249 	 * Now, scan each lock partition separately.
2250 	 */
2251 	for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
2252 	{
2253 		LWLock	   *partitionLock;
2254 		SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
2255 		PROCLOCK   *nextplock;
2256 
2257 		partitionLock = LockHashPartitionLockByIndex(partition);
2258 
2259 		/*
2260 		 * If the proclock list for this partition is empty, we can skip
2261 		 * acquiring the partition lock.  This optimization is trickier than
2262 		 * it looks, because another backend could be in process of adding
2263 		 * something to our proclock list due to promoting one of our
2264 		 * fast-path locks.  However, any such lock must be one that we
2265 		 * decided not to delete above, so it's okay to skip it again now;
2266 		 * we'd just decide not to delete it again.  We must, however, be
2267 		 * careful to re-fetch the list header once we've acquired the
2268 		 * partition lock, to be sure we have a valid, up-to-date pointer.
2269 		 * (There is probably no significant risk if pointer fetch/store is
2270 		 * atomic, but we don't wish to assume that.)
2271 		 *
2272 		 * XXX This argument assumes that the locallock table correctly
2273 		 * represents all of our fast-path locks.  While allLocks mode
2274 		 * guarantees to clean up all of our normal locks regardless of the
2275 		 * locallock situation, we lose that guarantee for fast-path locks.
2276 		 * This is not ideal.
2277 		 */
2278 		if (SHMQueueNext(procLocks, procLocks,
2279 						 offsetof(PROCLOCK, procLink)) == NULL)
2280 			continue;			/* needn't examine this partition */
2281 
2282 		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2283 
2284 		for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
2285 												  offsetof(PROCLOCK, procLink));
2286 			 proclock;
2287 			 proclock = nextplock)
2288 		{
2289 			bool		wakeupNeeded = false;
2290 
2291 			/* Get link first, since we may unlink/delete this proclock */
2292 			nextplock = (PROCLOCK *)
2293 				SHMQueueNext(procLocks, &proclock->procLink,
2294 							 offsetof(PROCLOCK, procLink));
2295 
2296 			Assert(proclock->tag.myProc == MyProc);
2297 
2298 			lock = proclock->tag.myLock;
2299 
2300 			/* Ignore items that are not of the lockmethod to be removed */
2301 			if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
2302 				continue;
2303 
2304 			/*
2305 			 * In allLocks mode, force release of all locks even if locallock
2306 			 * table had problems
2307 			 */
2308 			if (allLocks)
2309 				proclock->releaseMask = proclock->holdMask;
2310 			else
2311 				Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
2312 
2313 			/*
2314 			 * Ignore items that have nothing to be released, unless they have
2315 			 * holdMask == 0 and are therefore recyclable
2316 			 */
2317 			if (proclock->releaseMask == 0 && proclock->holdMask != 0)
2318 				continue;
2319 
2320 			PROCLOCK_PRINT("LockReleaseAll", proclock);
2321 			LOCK_PRINT("LockReleaseAll", lock, 0);
2322 			Assert(lock->nRequested >= 0);
2323 			Assert(lock->nGranted >= 0);
2324 			Assert(lock->nGranted <= lock->nRequested);
2325 			Assert((proclock->holdMask & ~lock->grantMask) == 0);
2326 
2327 			/*
2328 			 * Release the previously-marked lock modes
2329 			 */
2330 			for (i = 1; i <= numLockModes; i++)
2331 			{
2332 				if (proclock->releaseMask & LOCKBIT_ON(i))
2333 					wakeupNeeded |= UnGrantLock(lock, i, proclock,
2334 												lockMethodTable);
2335 			}
2336 			Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
2337 			Assert(lock->nGranted <= lock->nRequested);
2338 			LOCK_PRINT("LockReleaseAll: updated", lock, 0);
2339 
2340 			proclock->releaseMask = 0;
2341 
2342 			/* CleanUpLock will wake up waiters if needed. */
2343 			CleanUpLock(lock, proclock,
2344 						lockMethodTable,
2345 						LockTagHashCode(&lock->tag),
2346 						wakeupNeeded);
2347 		}						/* loop over PROCLOCKs within this partition */
2348 
2349 		LWLockRelease(partitionLock);
2350 	}							/* loop over partitions */
2351 
2352 #ifdef LOCK_DEBUG
2353 	if (*(lockMethodTable->trace_flag))
2354 		elog(LOG, "LockReleaseAll done");
2355 #endif
2356 }
2357 
2358 /*
2359  * LockReleaseSession -- Release all session locks of the specified lock method
2360  *		that are held by the current process.
2361  */
2362 void
2363 LockReleaseSession(LOCKMETHODID lockmethodid)
2364 {
2365 	HASH_SEQ_STATUS status;
2366 	LOCALLOCK  *locallock;
2367 
2368 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
2369 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
2370 
2371 	hash_seq_init(&status, LockMethodLocalHash);
2372 
2373 	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2374 	{
2375 		/* Ignore items that are not of the specified lock method */
2376 		if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
2377 			continue;
2378 
2379 		ReleaseLockIfHeld(locallock, true);
2380 	}
2381 }
2382 
2383 /*
2384  * LockReleaseCurrentOwner
2385  *		Release all locks belonging to CurrentResourceOwner
2386  *
2387  * If the caller knows what those locks are, it can pass them as an array.
2388  * That speeds up the call significantly, when a lot of locks are held.
2389  * Otherwise, pass NULL for locallocks, and we'll traverse through our hash
2390  * table to find them.
2391  */
2392 void
2393 LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks)
2394 {
2395 	if (locallocks == NULL)
2396 	{
2397 		HASH_SEQ_STATUS status;
2398 		LOCALLOCK  *locallock;
2399 
2400 		hash_seq_init(&status, LockMethodLocalHash);
2401 
2402 		while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2403 			ReleaseLockIfHeld(locallock, false);
2404 	}
2405 	else
2406 	{
2407 		int			i;
2408 
2409 		for (i = nlocks - 1; i >= 0; i--)
2410 			ReleaseLockIfHeld(locallocks[i], false);
2411 	}
2412 }
2413 
2414 /*
2415  * ReleaseLockIfHeld
2416  *		Release any session-level locks on this lockable object if sessionLock
2417  *		is true; else, release any locks held by CurrentResourceOwner.
2418  *
2419  * It is tempting to pass this a ResourceOwner pointer (or NULL for session
2420  * locks), but without refactoring LockRelease() we cannot support releasing
2421  * locks belonging to resource owners other than CurrentResourceOwner.
2422  * If we were to refactor, it'd be a good idea to fix it so we don't have to
2423  * do a hashtable lookup of the locallock, too.  However, currently this
2424  * function isn't used heavily enough to justify refactoring for its
2425  * convenience.
2426  */
2427 static void
2428 ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
2429 {
2430 	ResourceOwner owner;
2431 	LOCALLOCKOWNER *lockOwners;
2432 	int			i;
2433 
2434 	/* Identify owner for lock (must match LockRelease!) */
2435 	if (sessionLock)
2436 		owner = NULL;
2437 	else
2438 		owner = CurrentResourceOwner;
2439 
2440 	/* Scan to see if there are any locks belonging to the target owner */
2441 	lockOwners = locallock->lockOwners;
2442 	for (i = locallock->numLockOwners - 1; i >= 0; i--)
2443 	{
2444 		if (lockOwners[i].owner == owner)
2445 		{
2446 			Assert(lockOwners[i].nLocks > 0);
2447 			if (lockOwners[i].nLocks < locallock->nLocks)
2448 			{
2449 				/*
2450 				 * We will still hold this lock after forgetting this
2451 				 * ResourceOwner.
2452 				 */
2453 				locallock->nLocks -= lockOwners[i].nLocks;
2454 				/* compact out unused slot */
2455 				locallock->numLockOwners--;
2456 				if (owner != NULL)
2457 					ResourceOwnerForgetLock(owner, locallock);
2458 				if (i < locallock->numLockOwners)
2459 					lockOwners[i] = lockOwners[locallock->numLockOwners];
2460 			}
2461 			else
2462 			{
2463 				Assert(lockOwners[i].nLocks == locallock->nLocks);
2464 				/* We want to call LockRelease just once */
2465 				lockOwners[i].nLocks = 1;
2466 				locallock->nLocks = 1;
2467 				if (!LockRelease(&locallock->tag.lock,
2468 								 locallock->tag.mode,
2469 								 sessionLock))
2470 					elog(WARNING, "ReleaseLockIfHeld: failed??");
2471 			}
2472 			break;
2473 		}
2474 	}
2475 }
2476 
2477 /*
2478  * LockReassignCurrentOwner
2479  *		Reassign all locks belonging to CurrentResourceOwner to belong
2480  *		to its parent resource owner.
2481  *
2482  * If the caller knows what those locks are, it can pass them as an array.
2483  * That speeds up the call significantly, when a lot of locks are held
2484  * (e.g pg_dump with a large schema).  Otherwise, pass NULL for locallocks,
2485  * and we'll traverse through our hash table to find them.
2486  */
2487 void
2488 LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks)
2489 {
2490 	ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
2491 
2492 	Assert(parent != NULL);
2493 
2494 	if (locallocks == NULL)
2495 	{
2496 		HASH_SEQ_STATUS status;
2497 		LOCALLOCK  *locallock;
2498 
2499 		hash_seq_init(&status, LockMethodLocalHash);
2500 
2501 		while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2502 			LockReassignOwner(locallock, parent);
2503 	}
2504 	else
2505 	{
2506 		int			i;
2507 
2508 		for (i = nlocks - 1; i >= 0; i--)
2509 			LockReassignOwner(locallocks[i], parent);
2510 	}
2511 }
2512 
2513 /*
2514  * Subroutine of LockReassignCurrentOwner. Reassigns a given lock belonging to
2515  * CurrentResourceOwner to its parent.
2516  */
2517 static void
2518 LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent)
2519 {
2520 	LOCALLOCKOWNER *lockOwners;
2521 	int			i;
2522 	int			ic = -1;
2523 	int			ip = -1;
2524 
2525 	/*
2526 	 * Scan to see if there are any locks belonging to current owner or its
2527 	 * parent
2528 	 */
2529 	lockOwners = locallock->lockOwners;
2530 	for (i = locallock->numLockOwners - 1; i >= 0; i--)
2531 	{
2532 		if (lockOwners[i].owner == CurrentResourceOwner)
2533 			ic = i;
2534 		else if (lockOwners[i].owner == parent)
2535 			ip = i;
2536 	}
2537 
2538 	if (ic < 0)
2539 		return;					/* no current locks */
2540 
2541 	if (ip < 0)
2542 	{
2543 		/* Parent has no slot, so just give it the child's slot */
2544 		lockOwners[ic].owner = parent;
2545 		ResourceOwnerRememberLock(parent, locallock);
2546 	}
2547 	else
2548 	{
2549 		/* Merge child's count with parent's */
2550 		lockOwners[ip].nLocks += lockOwners[ic].nLocks;
2551 		/* compact out unused slot */
2552 		locallock->numLockOwners--;
2553 		if (ic < locallock->numLockOwners)
2554 			lockOwners[ic] = lockOwners[locallock->numLockOwners];
2555 	}
2556 	ResourceOwnerForgetLock(CurrentResourceOwner, locallock);
2557 }
2558 
2559 /*
2560  * FastPathGrantRelationLock
2561  *		Grant lock using per-backend fast-path array, if there is space.
2562  */
2563 static bool
2564 FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode)
2565 {
2566 	uint32		f;
2567 	uint32		unused_slot = FP_LOCK_SLOTS_PER_BACKEND;
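	/* FP_LOCK_SLOTS_PER_BACKEND doubles as a sentinel meaning "no free slot". */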
2568 
2569 	/* Scan for existing entry for this relid, remembering empty slot. */
2570 	for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2571 	{
2572 		if (FAST_PATH_GET_BITS(MyProc, f) == 0)
2573 			unused_slot = f;
2574 		else if (MyProc->fpRelId[f] == relid)
2575 		{
2576 			Assert(!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode));
2577 			FAST_PATH_SET_LOCKMODE(MyProc, f, lockmode);
2578 			return true;
2579 		}
2580 	}
2581 
2582 	/* If no existing entry, use any empty slot. */
2583 	if (unused_slot < FP_LOCK_SLOTS_PER_BACKEND)
2584 	{
2585 		MyProc->fpRelId[unused_slot] = relid;
2586 		FAST_PATH_SET_LOCKMODE(MyProc, unused_slot, lockmode);
2587 		++FastPathLocalUseCount;
2588 		return true;
2589 	}
2590 
2591 	/* No existing entry, and no empty slot. */
2592 	return false;
2593 }
2594 
2595 /*
2596  * FastPathUnGrantRelationLock
2597  *		Release fast-path lock, if present.  Update backend-private local
2598  *		use count, while we're at it.
2599  */
2600 static bool
2601 FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode)
2602 {
2603 	uint32		f;
2604 	bool		result = false;
2605 
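	/* Recompute the local use count from scratch as we scan all the slots. */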
2606 	FastPathLocalUseCount = 0;
2607 	for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2608 	{
2609 		if (MyProc->fpRelId[f] == relid
2610 			&& FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode))
2611 		{
2612 			Assert(!result);
2613 			FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode);
2614 			result = true;
2615 			/* we continue iterating so as to update FastPathLocalUseCount */
2616 		}
2617 		if (FAST_PATH_GET_BITS(MyProc, f) != 0)
2618 			++FastPathLocalUseCount;
2619 	}
2620 	return result;
2621 }
2622 
2623 /*
2624  * FastPathTransferRelationLocks
2625  *		Transfer locks matching the given lock tag from per-backend fast-path
2626  *		arrays to the shared hash table.
2627  *
2628  * Returns true if successful, false if ran out of shared memory.
2629  */
2630 static bool
2631 FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag,
2632 							  uint32 hashcode)
2633 {
2634 	LWLock	   *partitionLock = LockHashPartitionLock(hashcode);
2635 	Oid			relid = locktag->locktag_field2;
2636 	uint32		i;
2637 
2638 	/*
2639 	 * Every PGPROC that can potentially hold a fast-path lock is present in
2640 	 * ProcGlobal->allProcs.  Prepared transactions are not, but any
2641 	 * outstanding fast-path locks held by prepared transactions are
2642 	 * transferred to the main lock table.
2643 	 */
2644 	for (i = 0; i < ProcGlobal->allProcCount; i++)
2645 	{
2646 		PGPROC	   *proc = &ProcGlobal->allProcs[i];
2647 		uint32		f;
2648 
2649 		LWLockAcquire(&proc->backendLock, LW_EXCLUSIVE);
2650 
2651 		/*
2652 		 * If the target backend isn't referencing the same database as the
2653 		 * lock, then we needn't examine the individual relation IDs at all;
2654 		 * none of them can be relevant.
2655 		 *
2656 		 * proc->databaseId is set at backend startup time and never changes
2657 		 * thereafter, so it might be safe to perform this test before
2658 		 * acquiring &proc->backendLock.  In particular, it's certainly safe
2659 		 * to assume that if the target backend holds any fast-path locks, it
2660 		 * must have performed a memory-fencing operation (in particular, an
2661 		 * LWLock acquisition) since setting proc->databaseId.  However, it's
2662 		 * less clear that our backend is certain to have performed a memory
2663 		 * fencing operation since the other backend set proc->databaseId.  So
2664 		 * for now, we test it after acquiring the LWLock just to be safe.
2665 		 */
2666 		if (proc->databaseId != locktag->locktag_field1)
2667 		{
2668 			LWLockRelease(&proc->backendLock);
2669 			continue;
2670 		}
2671 
2672 		for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2673 		{
2674 			uint32		lockmode;
2675 
2676 			/* Look for an allocated slot matching the given relid. */
2677 			if (relid != proc->fpRelId[f] || FAST_PATH_GET_BITS(proc, f) == 0)
2678 				continue;
2679 
2680 			/* Find or create lock object. */
2681 			LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2682 			for (lockmode = FAST_PATH_LOCKNUMBER_OFFSET;
2683 				 lockmode < FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT;
2684 				 ++lockmode)
2685 			{
2686 				PROCLOCK   *proclock;
2687 
2688 				if (!FAST_PATH_CHECK_LOCKMODE(proc, f, lockmode))
2689 					continue;
2690 				proclock = SetupLockInTable(lockMethodTable, proc, locktag,
2691 											hashcode, lockmode);
2692 				if (!proclock)
2693 				{
2694 					LWLockRelease(partitionLock);
2695 					LWLockRelease(&proc->backendLock);
2696 					return false;
2697 				}
2698 				GrantLock(proclock->tag.myLock, proclock, lockmode);
2699 				FAST_PATH_CLEAR_LOCKMODE(proc, f, lockmode);
2700 			}
2701 			LWLockRelease(partitionLock);
2702 
2703 			/* No need to examine remaining slots. */
2704 			break;
2705 		}
2706 		LWLockRelease(&proc->backendLock);
2707 	}
2708 	return true;
2709 }
2710 
2711 /*
2712  * FastPathGetLockEntry
2713  *		Return the PROCLOCK for a lock originally taken via the fast-path,
2714  *		transferring it to the primary lock table if necessary.
2715  *
2716  * Note: caller takes care of updating the locallock object.
2717  */
2718 static PROCLOCK *
2719 FastPathGetRelationLockEntry(LOCALLOCK *locallock)
2720 {
2721 	LockMethod	lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];
2722 	LOCKTAG    *locktag = &locallock->tag.lock;
2723 	PROCLOCK   *proclock = NULL;
2724 	LWLock	   *partitionLock = LockHashPartitionLock(locallock->hashcode);
2725 	Oid			relid = locktag->locktag_field2;
2726 	uint32		f;
2727 
2728 	LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
2729 
2730 	for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2731 	{
2732 		uint32		lockmode;
2733 
2734 		/* Look for an allocated slot matching the given relid. */
2735 		if (relid != MyProc->fpRelId[f] || FAST_PATH_GET_BITS(MyProc, f) == 0)
2736 			continue;
2737 
2738 		/* If we don't have a lock of the given mode, forget it! */
2739 		lockmode = locallock->tag.mode;
2740 		if (!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode))
2741 			break;
2742 
2743 		/* Find or create lock object. */
2744 		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2745 
2746 		proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
2747 									locallock->hashcode, lockmode);
2748 		if (!proclock)
2749 		{
2750 			LWLockRelease(partitionLock);
2751 			LWLockRelease(&MyProc->backendLock);
2752 			ereport(ERROR,
2753 					(errcode(ERRCODE_OUT_OF_MEMORY),
2754 					 errmsg("out of shared memory"),
2755 					 errhint("You might need to increase max_locks_per_transaction.")));
2756 		}
2757 		GrantLock(proclock->tag.myLock, proclock, lockmode);
2758 		FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode);
2759 
2760 		LWLockRelease(partitionLock);
2761 
2762 		/* No need to examine remaining slots. */
2763 		break;
2764 	}
2765 
2766 	LWLockRelease(&MyProc->backendLock);
2767 
2768 	/* Lock may have already been transferred by some other backend. */
2769 	if (proclock == NULL)
2770 	{
2771 		LOCK	   *lock;
2772 		PROCLOCKTAG proclocktag;
2773 		uint32		proclock_hashcode;
2774 
2775 		LWLockAcquire(partitionLock, LW_SHARED);
2776 
2777 		lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
2778 													(void *) locktag,
2779 													locallock->hashcode,
2780 													HASH_FIND,
2781 													NULL);
2782 		if (!lock)
2783 			elog(ERROR, "failed to re-find shared lock object");
2784 
2785 		proclocktag.myLock = lock;
2786 		proclocktag.myProc = MyProc;
2787 
2788 		proclock_hashcode = ProcLockHashCode(&proclocktag, locallock->hashcode);
2789 		proclock = (PROCLOCK *)
2790 			hash_search_with_hash_value(LockMethodProcLockHash,
2791 										(void *) &proclocktag,
2792 										proclock_hashcode,
2793 										HASH_FIND,
2794 										NULL);
2795 		if (!proclock)
2796 			elog(ERROR, "failed to re-find shared proclock object");
2797 		LWLockRelease(partitionLock);
2798 	}
2799 
2800 	return proclock;
2801 }
2802 
2803 /*
2804  * GetLockConflicts
2805  *		Get an array of VirtualTransactionIds of xacts currently holding locks
2806  *		that would conflict with the specified lock/lockmode.
2807  *		xacts merely awaiting such a lock are NOT reported.
2808  *
2809  * The result array is palloc'd and is terminated with an invalid VXID.
2810  * *countp, if not null, is updated to the number of items set.
2811  *
2812  * Of course, the result could be out of date by the time it's returned, so
2813  * use of this function has to be thought about carefully.  Similarly, a
2814  * PGPROC with no "lxid" will be considered non-conflicting regardless of any
2815  * lock it holds.  Existing callers don't care about a locker after that
2816  * locker's pg_xact updates complete.  CommitTransaction() clears "lxid" after
2817  * pg_xact updates and before releasing locks.
2818  *
2819  * Note we never include the current xact's vxid in the result array,
2820  * since an xact never blocks itself.
2821  */
2822 VirtualTransactionId *
2823 GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp)
2824 {
2825 	static VirtualTransactionId *vxids;
2826 	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
2827 	LockMethod	lockMethodTable;
2828 	LOCK	   *lock;
2829 	LOCKMASK	conflictMask;
2830 	SHM_QUEUE  *procLocks;
2831 	PROCLOCK   *proclock;
2832 	uint32		hashcode;
2833 	LWLock	   *partitionLock;
2834 	int			count = 0;
2835 	int			fast_count = 0;
2836 
2837 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
2838 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
2839 	lockMethodTable = LockMethods[lockmethodid];
2840 	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
2841 		elog(ERROR, "unrecognized lock mode: %d", lockmode);
2842 
2843 	/*
2844 	 * Allocate memory to store results, and fill with InvalidVXID.  We only
2845 	 * need enough space for MaxBackends + max_prepared_xacts + a terminator.
2846 	 * When InHotStandby, allocate it just once in TopMemoryContext and reuse.
2847 	 */
2848 	if (InHotStandby)
2849 	{
2850 		if (vxids == NULL)
2851 			vxids = (VirtualTransactionId *)
2852 				MemoryContextAlloc(TopMemoryContext,
2853 								   sizeof(VirtualTransactionId) *
2854 								   (MaxBackends + max_prepared_xacts + 1));
2855 	}
2856 	else
2857 		vxids = (VirtualTransactionId *)
2858 			palloc0(sizeof(VirtualTransactionId) *
2859 					(MaxBackends + max_prepared_xacts + 1));
2860 
2861 	/* Compute hash code and partition lock, and look up conflicting modes. */
2862 	hashcode = LockTagHashCode(locktag);
2863 	partitionLock = LockHashPartitionLock(hashcode);
2864 	conflictMask = lockMethodTable->conflictTab[lockmode];
2865 
2866 	/*
2867 	 * Fast path locks might not have been entered in the primary lock table.
2868 	 * If the lock we're dealing with could conflict with such a lock, we must
2869 	 * examine each backend's fast-path array for conflicts.
2870 	 */
2871 	if (ConflictsWithRelationFastPath(locktag, lockmode))
2872 	{
2873 		int			i;
2874 		Oid			relid = locktag->locktag_field2;
2875 		VirtualTransactionId vxid;
2876 
2877 		/*
2878 		 * Iterate over relevant PGPROCs.  Anything held by a prepared
2879 		 * transaction will have been transferred to the primary lock table,
2880 		 * so we need not worry about those.  This is all a bit fuzzy, because
2881 		 * new locks could be taken after we've visited a particular
2882 		 * partition, but the callers had better be prepared to deal with that
2883 		 * anyway, since the locks could equally well be taken between the
2884 		 * time we return the value and the time the caller does something
2885 		 * with it.
2886 		 */
2887 		for (i = 0; i < ProcGlobal->allProcCount; i++)
2888 		{
2889 			PGPROC	   *proc = &ProcGlobal->allProcs[i];
2890 			uint32		f;
2891 
2892 			/* A backend never blocks itself */
2893 			if (proc == MyProc)
2894 				continue;
2895 
2896 			LWLockAcquire(&proc->backendLock, LW_SHARED);
2897 
2898 			/*
2899 			 * If the target backend isn't referencing the same database as
2900 			 * the lock, then we needn't examine the individual relation IDs
2901 			 * at all; none of them can be relevant.
2902 			 *
2903 			 * See FastPathTransferRelationLocks() for discussion of why we do this
2904 			 * test after acquiring the lock.
2905 			 */
2906 			if (proc->databaseId != locktag->locktag_field1)
2907 			{
2908 				LWLockRelease(&proc->backendLock);
2909 				continue;
2910 			}
2911 
2912 			for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2913 			{
2914 				uint32		lockmask;
2915 
2916 				/* Look for an allocated slot matching the given relid. */
2917 				if (relid != proc->fpRelId[f])
2918 					continue;
2919 				lockmask = FAST_PATH_GET_BITS(proc, f);
2920 				if (!lockmask)
2921 					continue;
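				/* Shift the per-slot bits to line up with LOCKMASK bit numbers. */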
2922 				lockmask <<= FAST_PATH_LOCKNUMBER_OFFSET;
2923 
2924 				/*
2925 				 * There can only be one entry per relation, so if we found it
2926 				 * and it doesn't conflict, we can skip the rest of the slots.
2927 				 */
2928 				if ((lockmask & conflictMask) == 0)
2929 					break;
2930 
2931 				/* Conflict! */
2932 				GET_VXID_FROM_PGPROC(vxid, *proc);
2933 
2934 				if (VirtualTransactionIdIsValid(vxid))
2935 					vxids[count++] = vxid;
2936 				/* else, xact already committed or aborted */
2937 
2938 				/* No need to examine remaining slots. */
2939 				break;
2940 			}
2941 
2942 			LWLockRelease(&proc->backendLock);
2943 		}
2944 	}
2945 
2946 	/* Remember how many fast-path conflicts we found. */
2947 	fast_count = count;
2948 
2949 	/*
2950 	 * Look up the lock object matching the tag.
2951 	 */
2952 	LWLockAcquire(partitionLock, LW_SHARED);
2953 
2954 	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
2955 												(const void *) locktag,
2956 												hashcode,
2957 												HASH_FIND,
2958 												NULL);
2959 	if (!lock)
2960 	{
2961 		/*
2962 		 * If the lock object doesn't exist, there is nothing holding a lock
2963 		 * on this lockable object.
2964 		 */
2965 		LWLockRelease(partitionLock);
2966 		vxids[count].backendId = InvalidBackendId;
2967 		vxids[count].localTransactionId = InvalidLocalTransactionId;
2968 		if (countp)
2969 			*countp = count;
2970 		return vxids;
2971 	}
2972 
2973 	/*
2974 	 * Examine each existing holder (or awaiter) of the lock.
2975 	 */
2976 
2977 	procLocks = &(lock->procLocks);
2978 
2979 	proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
2980 										 offsetof(PROCLOCK, lockLink));
2981 
2982 	while (proclock)
2983 	{
2984 		if (conflictMask & proclock->holdMask)
2985 		{
2986 			PGPROC	   *proc = proclock->tag.myProc;
2987 
2988 			/* A backend never blocks itself */
2989 			if (proc != MyProc)
2990 			{
2991 				VirtualTransactionId vxid;
2992 
2993 				GET_VXID_FROM_PGPROC(vxid, *proc);
2994 
2995 				if (VirtualTransactionIdIsValid(vxid))
2996 				{
2997 					int			i;
2998 
2999 					/* Avoid duplicate entries. */
3000 					for (i = 0; i < fast_count; ++i)
3001 						if (VirtualTransactionIdEquals(vxids[i], vxid))
3002 							break;
3003 					if (i >= fast_count)
3004 						vxids[count++] = vxid;
3005 				}
3006 				/* else, xact already committed or aborted */
3007 			}
3008 		}
3009 
3010 		proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
3011 											 offsetof(PROCLOCK, lockLink));
3012 	}
3013 
3014 	LWLockRelease(partitionLock);
3015 
3016 	if (count > MaxBackends + max_prepared_xacts)	/* should never happen */
3017 		elog(PANIC, "too many conflicting locks found");
3018 
3019 	vxids[count].backendId = InvalidBackendId;
3020 	vxids[count].localTransactionId = InvalidLocalTransactionId;
3021 	if (countp)
3022 		*countp = count;
3023 	return vxids;
3024 }
3025 
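/*
 * Illustrative sketch only (not a real caller in this file): a typical use of
 * GetLockConflicts() is to collect the virtual xids currently blocking a
 * strong lock request and then wait for or signal each of them, e.g.:
 *
 *		VirtualTransactionId *blockers;
 *
 *		blockers = GetLockConflicts(&locktag, AccessExclusiveLock, NULL);
 *		while (VirtualTransactionIdIsValid(*blockers))
 *			... wait for or cancel *blockers, then blockers++ ...
 *
 * The result array is terminated by an invalid vxid, as noted above.
 */
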
3026 /*
3027  * Find a lock in the shared lock table and release it.  It is the caller's
3028  * responsibility to verify that this is a sane thing to do.  (For example, it
3029  * would be bad to release a lock here if there might still be a LOCALLOCK
3030  * object with pointers to it.)
3031  *
3032  * We currently use this in two situations: first, to release locks held by
3033  * prepared transactions on commit (see lock_twophase_postcommit); and second,
3034  * to release locks taken via the fast-path, transferred to the main hash
3035  * table, and then released (see LockReleaseAll).
3036  */
3037 static void
3038 LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
3039 					 LOCKTAG *locktag, LOCKMODE lockmode,
3040 					 bool decrement_strong_lock_count)
3041 {
3042 	LOCK	   *lock;
3043 	PROCLOCK   *proclock;
3044 	PROCLOCKTAG proclocktag;
3045 	uint32		hashcode;
3046 	uint32		proclock_hashcode;
3047 	LWLock	   *partitionLock;
3048 	bool		wakeupNeeded;
3049 
3050 	hashcode = LockTagHashCode(locktag);
3051 	partitionLock = LockHashPartitionLock(hashcode);
3052 
3053 	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3054 
3055 	/*
3056 	 * Re-find the lock object (it had better be there).
3057 	 */
3058 	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
3059 												(void *) locktag,
3060 												hashcode,
3061 												HASH_FIND,
3062 												NULL);
3063 	if (!lock)
3064 		elog(PANIC, "failed to re-find shared lock object");
3065 
3066 	/*
3067 	 * Re-find the proclock object (ditto).
3068 	 */
3069 	proclocktag.myLock = lock;
3070 	proclocktag.myProc = proc;
3071 
3072 	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
3073 
3074 	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
3075 														(void *) &proclocktag,
3076 														proclock_hashcode,
3077 														HASH_FIND,
3078 														NULL);
3079 	if (!proclock)
3080 		elog(PANIC, "failed to re-find shared proclock object");
3081 
3082 	/*
3083 	 * Double-check that we are actually holding a lock of the type we want to
3084 	 * release.
3085 	 */
3086 	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
3087 	{
3088 		PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
3089 		LWLockRelease(partitionLock);
3090 		elog(WARNING, "you don't own a lock of type %s",
3091 			 lockMethodTable->lockModeNames[lockmode]);
3092 		return;
3093 	}
3094 
3095 	/*
3096 	 * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
3097 	 */
3098 	wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
3099 
3100 	CleanUpLock(lock, proclock,
3101 				lockMethodTable, hashcode,
3102 				wakeupNeeded);
3103 
3104 	LWLockRelease(partitionLock);
3105 
3106 	/*
3107 	 * Decrement strong lock count.  This logic is needed only for 2PC.
3108 	 */
3109 	if (decrement_strong_lock_count
3110 		&& ConflictsWithRelationFastPath(locktag, lockmode))
3111 	{
3112 		uint32		fasthashcode = FastPathStrongLockHashPartition(hashcode);
3113 
3114 		SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
3115 		Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
3116 		FastPathStrongRelationLocks->count[fasthashcode]--;
3117 		SpinLockRelease(&FastPathStrongRelationLocks->mutex);
3118 	}
3119 }
3120 
3121 /*
3122  * CheckForSessionAndXactLocks
3123  *		Check to see if transaction holds both session-level and xact-level
3124  *		locks on the same object; if so, throw an error.
3125  *
3126  * If we have both session- and transaction-level locks on the same object,
3127  * PREPARE TRANSACTION must fail.  This should never happen with regular
3128  * locks, since we only take those at session level in some special operations
3129  * like VACUUM.  It's possible to hit this with advisory locks, though.
3130  *
3131  * It would be nice if we could keep the session hold and give away the
3132  * transactional hold to the prepared xact.  However, that would require two
3133  * PROCLOCK objects, and we cannot be sure that another PROCLOCK will be
3134  * available when it comes time for PostPrepare_Locks to do the deed.
3135  * So for now, we error out while we can still do so safely.
3136  *
3137  * Since the LOCALLOCK table stores a separate entry for each lockmode,
3138  * we can't implement this check by examining LOCALLOCK entries in isolation.
3139  * We must build a transient hashtable that is indexed by locktag only.
3140  */
3141 static void
3142 CheckForSessionAndXactLocks(void)
3143 {
3144 	typedef struct
3145 	{
3146 		LOCKTAG		lock;		/* identifies the lockable object */
3147 		bool		sessLock;	/* is any lockmode held at session level? */
3148 		bool		xactLock;	/* is any lockmode held at xact level? */
3149 	} PerLockTagEntry;
3150 
3151 	HASHCTL		hash_ctl;
3152 	HTAB	   *lockhtab;
3153 	HASH_SEQ_STATUS status;
3154 	LOCALLOCK  *locallock;
3155 
3156 	/* Create a local hash table keyed by LOCKTAG only */
3157 	hash_ctl.keysize = sizeof(LOCKTAG);
3158 	hash_ctl.entrysize = sizeof(PerLockTagEntry);
3159 	hash_ctl.hcxt = CurrentMemoryContext;
3160 
3161 	lockhtab = hash_create("CheckForSessionAndXactLocks table",
3162 						   256, /* arbitrary initial size */
3163 						   &hash_ctl,
3164 						   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
3165 
3166 	/* Scan local lock table to find entries for each LOCKTAG */
3167 	hash_seq_init(&status, LockMethodLocalHash);
3168 
3169 	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
3170 	{
3171 		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
3172 		PerLockTagEntry *hentry;
3173 		bool		found;
3174 		int			i;
3175 
3176 		/*
3177 		 * Ignore VXID locks.  We don't want those to be held by prepared
3178 		 * transactions, since they aren't meaningful after a restart.
3179 		 */
3180 		if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3181 			continue;
3182 
3183 		/* Ignore it if we don't actually hold the lock */
3184 		if (locallock->nLocks <= 0)
3185 			continue;
3186 
3187 		/* Otherwise, find or make an entry in lockhtab */
3188 		hentry = (PerLockTagEntry *) hash_search(lockhtab,
3189 												 (void *) &locallock->tag.lock,
3190 												 HASH_ENTER, &found);
3191 		if (!found)				/* initialize, if newly created */
3192 			hentry->sessLock = hentry->xactLock = false;
3193 
3194 		/* Scan to see if we hold lock at session or xact level or both */
3195 		for (i = locallock->numLockOwners - 1; i >= 0; i--)
3196 		{
3197 			if (lockOwners[i].owner == NULL)
3198 				hentry->sessLock = true;
3199 			else
3200 				hentry->xactLock = true;
3201 		}
3202 
3203 		/*
3204 		 * We can throw error immediately when we see both types of locks; no
3205 		 * need to wait around to see if there are more violations.
3206 		 */
3207 		if (hentry->sessLock && hentry->xactLock)
3208 			ereport(ERROR,
3209 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3210 					 errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
3211 	}
3212 
3213 	/* Success, so clean up */
3214 	hash_destroy(lockhtab);
3215 }
3216 
3217 /*
3218  * AtPrepare_Locks
3219  *		Do the preparatory work for a PREPARE: make 2PC state file records
3220  *		for all locks currently held.
3221  *
3222  * Session-level locks are ignored, as are VXID locks.
3223  *
3224  * For the most part, we don't need to touch shared memory for this ---
3225  * all the necessary state information is in the locallock table.
3226  * Fast-path locks are an exception, however: we move any such locks to
3227  * the main table before allowing PREPARE TRANSACTION to succeed.
3228  */
3229 void
3230 AtPrepare_Locks(void)
3231 {
3232 	HASH_SEQ_STATUS status;
3233 	LOCALLOCK  *locallock;
3234 
3235 	/* First, verify there aren't locks of both xact and session level */
3236 	CheckForSessionAndXactLocks();
3237 
3238 	/* Now do the per-locallock cleanup work */
3239 	hash_seq_init(&status, LockMethodLocalHash);
3240 
3241 	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
3242 	{
3243 		TwoPhaseLockRecord record;
3244 		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
3245 		bool		haveSessionLock;
3246 		bool		haveXactLock;
3247 		int			i;
3248 
3249 		/*
3250 		 * Ignore VXID locks.  We don't want those to be held by prepared
3251 		 * transactions, since they aren't meaningful after a restart.
3252 		 */
3253 		if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3254 			continue;
3255 
3256 		/* Ignore it if we don't actually hold the lock */
3257 		if (locallock->nLocks <= 0)
3258 			continue;
3259 
3260 		/* Scan to see whether we hold it at session or transaction level */
3261 		haveSessionLock = haveXactLock = false;
3262 		for (i = locallock->numLockOwners - 1; i >= 0; i--)
3263 		{
3264 			if (lockOwners[i].owner == NULL)
3265 				haveSessionLock = true;
3266 			else
3267 				haveXactLock = true;
3268 		}
3269 
3270 		/* Ignore it if we have only session lock */
3271 		if (!haveXactLock)
3272 			continue;
3273 
3274 		/* This can't happen, because we already checked it */
3275 		if (haveSessionLock)
3276 			ereport(ERROR,
3277 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3278 					 errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
3279 
3280 		/*
3281 		 * If the local lock was taken via the fast-path, we need to move it
3282 		 * to the primary lock table, or just get a pointer to the existing
3283 		 * primary lock table entry if by chance it's already been
3284 		 * transferred.
3285 		 */
3286 		if (locallock->proclock == NULL)
3287 		{
3288 			locallock->proclock = FastPathGetRelationLockEntry(locallock);
3289 			locallock->lock = locallock->proclock->tag.myLock;
3290 		}
3291 
3292 		/*
3293 		 * Arrange to not release any strong lock count held by this lock
3294 		 * entry.  We must retain the count until the prepared transaction is
3295 		 * committed or rolled back.
3296 		 */
3297 		locallock->holdsStrongLockCount = false;
3298 
3299 		/*
3300 		 * Create a 2PC record.
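		 * The record is just the lock's LOCKTAG plus the held lockmode; at
		 * restart, lock_twophase_recover() reads each such record and
		 * re-acquires the lock on behalf of the dummy PGPROC.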
3301 		 */
3302 		memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG));
3303 		record.lockmode = locallock->tag.mode;
3304 
3305 		RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0,
3306 							   &record, sizeof(TwoPhaseLockRecord));
3307 	}
3308 }
3309 
3310 /*
3311  * PostPrepare_Locks
3312  *		Clean up after successful PREPARE
3313  *
3314  * Here, we want to transfer ownership of our locks to a dummy PGPROC
3315  * that's now associated with the prepared transaction, and we want to
3316  * clean out the corresponding entries in the LOCALLOCK table.
3317  *
3318  * Note: by removing the LOCALLOCK entries, we are leaving dangling
3319  * pointers in the transaction's resource owner.  This is OK at the
3320  * moment since resowner.c doesn't try to free locks retail at a toplevel
3321  * transaction commit or abort.  We could alternatively zero out nLocks
3322  * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
3323  * but that probably costs more cycles.
3324  */
3325 void
3326 PostPrepare_Locks(TransactionId xid)
3327 {
3328 	PGPROC	   *newproc = TwoPhaseGetDummyProc(xid, false);
3329 	HASH_SEQ_STATUS status;
3330 	LOCALLOCK  *locallock;
3331 	LOCK	   *lock;
3332 	PROCLOCK   *proclock;
3333 	PROCLOCKTAG proclocktag;
3334 	int			partition;
3335 
3336 	/* Can't prepare a lock group follower. */
3337 	Assert(MyProc->lockGroupLeader == NULL ||
3338 		   MyProc->lockGroupLeader == MyProc);
3339 
3340 	/* This is a critical section: any error means big trouble */
3341 	START_CRIT_SECTION();
3342 
3343 	/*
3344 	 * First we run through the locallock table and get rid of unwanted
3345 	 * entries, then we scan the process's proclocks and transfer them to the
3346 	 * target proc.
3347 	 *
3348 	 * We do this separately because we may have multiple locallock entries
3349 	 * pointing to the same proclock, and we daren't end up with any dangling
3350 	 * pointers.
3351 	 */
3352 	hash_seq_init(&status, LockMethodLocalHash);
3353 
3354 	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
3355 	{
3356 		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
3357 		bool		haveSessionLock;
3358 		bool		haveXactLock;
3359 		int			i;
3360 
3361 		if (locallock->proclock == NULL || locallock->lock == NULL)
3362 		{
3363 			/*
3364 			 * We must've run out of shared memory while trying to set up this
3365 			 * lock.  Just forget the local entry.
3366 			 */
3367 			Assert(locallock->nLocks == 0);
3368 			RemoveLocalLock(locallock);
3369 			continue;
3370 		}
3371 
3372 		/* Ignore VXID locks */
3373 		if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3374 			continue;
3375 
3376 		/* Scan to see whether we hold it at session or transaction level */
3377 		haveSessionLock = haveXactLock = false;
3378 		for (i = locallock->numLockOwners - 1; i >= 0; i--)
3379 		{
3380 			if (lockOwners[i].owner == NULL)
3381 				haveSessionLock = true;
3382 			else
3383 				haveXactLock = true;
3384 		}
3385 
3386 		/* Ignore it if we have only session lock */
3387 		if (!haveXactLock)
3388 			continue;
3389 
3390 		/* This can't happen, because we already checked it */
3391 		if (haveSessionLock)
3392 			ereport(PANIC,
3393 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3394 					 errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
3395 
3396 		/* Mark the proclock to show we need to release this lockmode */
3397 		if (locallock->nLocks > 0)
3398 			locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
3399 
3400 		/* And remove the locallock hashtable entry */
3401 		RemoveLocalLock(locallock);
3402 	}
3403 
3404 	/*
3405 	 * Now, scan each lock partition separately.
3406 	 */
3407 	for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
3408 	{
3409 		LWLock	   *partitionLock;
3410 		SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
3411 		PROCLOCK   *nextplock;
3412 
3413 		partitionLock = LockHashPartitionLockByIndex(partition);
3414 
3415 		/*
3416 		 * If the proclock list for this partition is empty, we can skip
3417 		 * acquiring the partition lock.  This optimization is safer than the
3418 		 * situation in LockReleaseAll, because we got rid of any fast-path
3419 		 * locks during AtPrepare_Locks, so there cannot be any case where
3420 		 * another backend is adding something to our lists now.  For safety,
3421 		 * though, we code this the same way as in LockReleaseAll.
3422 		 */
3423 		if (SHMQueueNext(procLocks, procLocks,
3424 						 offsetof(PROCLOCK, procLink)) == NULL)
3425 			continue;			/* needn't examine this partition */
3426 
3427 		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3428 
3429 		for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
3430 												  offsetof(PROCLOCK, procLink));
3431 			 proclock;
3432 			 proclock = nextplock)
3433 		{
3434 			/* Get link first, since we may unlink/relink this proclock */
3435 			nextplock = (PROCLOCK *)
3436 				SHMQueueNext(procLocks, &proclock->procLink,
3437 							 offsetof(PROCLOCK, procLink));
3438 
3439 			Assert(proclock->tag.myProc == MyProc);
3440 
3441 			lock = proclock->tag.myLock;
3442 
3443 			/* Ignore VXID locks */
3444 			if (lock->tag.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3445 				continue;
3446 
3447 			PROCLOCK_PRINT("PostPrepare_Locks", proclock);
3448 			LOCK_PRINT("PostPrepare_Locks", lock, 0);
3449 			Assert(lock->nRequested >= 0);
3450 			Assert(lock->nGranted >= 0);
3451 			Assert(lock->nGranted <= lock->nRequested);
3452 			Assert((proclock->holdMask & ~lock->grantMask) == 0);
3453 
3454 			/* Ignore it if nothing to release (must be a session lock) */
3455 			if (proclock->releaseMask == 0)
3456 				continue;
3457 
3458 			/* Else we should be releasing all locks */
3459 			if (proclock->releaseMask != proclock->holdMask)
3460 				elog(PANIC, "we seem to have dropped a bit somewhere");
3461 
3462 			/*
3463 			 * We cannot simply modify proclock->tag.myProc to reassign
3464 			 * ownership of the lock, because that's part of the hash key and
3465 			 * the proclock would then be in the wrong hash chain.  Instead
3466 			 * use hash_update_hash_key.  (We used to create a new hash entry,
3467 			 * but that risks out-of-memory failure if other processes are
3468 			 * busy making proclocks too.)	We must unlink the proclock from
3469 			 * our procLink chain and put it into the new proc's chain, too.
3470 			 *
3471 			 * Note: the updated proclock hash key will still belong to the
3472 			 * same hash partition, cf proclock_hash().  So the partition lock
3473 			 * we already hold is sufficient for this.
3474 			 */
3475 			SHMQueueDelete(&proclock->procLink);
3476 
3477 			/*
3478 			 * Create the new hash key for the proclock.
3479 			 */
3480 			proclocktag.myLock = lock;
3481 			proclocktag.myProc = newproc;
3482 
3483 			/*
3484 			 * Update groupLeader pointer to point to the new proc.  (We'd
3485 			 * better not be a member of somebody else's lock group!)
3486 			 */
3487 			Assert(proclock->groupLeader == proclock->tag.myProc);
3488 			proclock->groupLeader = newproc;
3489 
3490 			/*
3491 			 * Update the proclock.  We should not find any existing entry for
3492 			 * the same hash key, since there can be only one entry for any
3493 			 * given lock with my own proc.
3494 			 */
3495 			if (!hash_update_hash_key(LockMethodProcLockHash,
3496 									  (void *) proclock,
3497 									  (void *) &proclocktag))
3498 				elog(PANIC, "duplicate entry found while reassigning a prepared transaction's locks");
3499 
3500 			/* Re-link into the new proc's proclock list */
3501 			SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
3502 								 &proclock->procLink);
3503 
3504 			PROCLOCK_PRINT("PostPrepare_Locks: updated", proclock);
3505 		}						/* loop over PROCLOCKs within this partition */
3506 
3507 		LWLockRelease(partitionLock);
3508 	}							/* loop over partitions */
3509 
3510 	END_CRIT_SECTION();
3511 }
3512 
3513 
3514 /*
3515  * Estimate shared-memory space used for lock tables
3516  */
3517 Size
3518 LockShmemSize(void)
3519 {
3520 	Size		size = 0;
3521 	long		max_table_size;
3522 
3523 	/* lock hash table */
3524 	max_table_size = NLOCKENTS();
3525 	size = add_size(size, hash_estimate_size(max_table_size, sizeof(LOCK)));
3526 
3527 	/* proclock hash table */
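	/* assume an average of two holders per lock object */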
3528 	max_table_size *= 2;
3529 	size = add_size(size, hash_estimate_size(max_table_size, sizeof(PROCLOCK)));
3530 
3531 	/*
3532 	 * Since NLOCKENTS is only an estimate, add 10% safety margin.
3533 	 */
3534 	size = add_size(size, size / 10);
3535 
3536 	return size;
3537 }
3538 
3539 /*
3540  * GetLockStatusData - Return a summary of the lock manager's internal
3541  * status, for use in a user-level reporting function.
3542  *
3543  * The return data consists of an array of LockInstanceData objects,
3544  * which are a lightly abstracted version of the PROCLOCK data structures,
3545  * i.e. there is one entry for each unique lock and interested PGPROC.
3546  * It is the caller's responsibility to match up related items (such as
3547  * references to the same lockable object or PGPROC) if wanted.
3548  *
3549  * The design goal is to hold the LWLocks for as short a time as possible;
3550  * thus, this function simply makes a copy of the necessary data and releases
3551  * the locks, allowing the caller to contemplate and format the data for as
3552  * long as it pleases.
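 *
 * A typical consumer (for instance, the function backing the pg_locks view)
 * simply walks the returned array; a minimal sketch, using only fields
 * filled in below:
 *
 *		LockData   *d = GetLockStatusData();
 *		int			j;
 *
 *		for (j = 0; j < d->nelements; j++)
 *		{
 *			LockInstanceData *inst = &d->locks[j];
 *
 *			... report inst->locktag, inst->holdMask, inst->pid ...
 *		}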
3553  */
3554 LockData *
3555 GetLockStatusData(void)
3556 {
3557 	LockData   *data;
3558 	PROCLOCK   *proclock;
3559 	HASH_SEQ_STATUS seqstat;
3560 	int			els;
3561 	int			el;
3562 	int			i;
3563 
3564 	data = (LockData *) palloc(sizeof(LockData));
3565 
3566 	/* Guess how much space we'll need. */
3567 	els = MaxBackends;
3568 	el = 0;
3569 	data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * els);
3570 
3571 	/*
3572 	 * First, we iterate through the per-backend fast-path arrays, locking
3573 	 * them one at a time.  This might produce an inconsistent picture of the
3574 	 * system state, but taking all of those LWLocks at the same time seems
3575 	 * impractical (in particular, note MAX_SIMUL_LWLOCKS).  It shouldn't
3576 	 * matter too much, because none of these locks can be involved in lock
3577 	 * conflicts anyway - anything that might conflict must be in the main lock
3578 	 * table.  (For the same reason, we don't sweat about making leaderPid
3579 	 * completely valid.  We cannot safely dereference another backend's
3580 	 * lockGroupLeader field without holding all lock partition locks, and
3581 	 * it's not worth that.)
3582 	 */
3583 	for (i = 0; i < ProcGlobal->allProcCount; ++i)
3584 	{
3585 		PGPROC	   *proc = &ProcGlobal->allProcs[i];
3586 		uint32		f;
3587 
3588 		LWLockAcquire(&proc->backendLock, LW_SHARED);
3589 
3590 		for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; ++f)
3591 		{
3592 			LockInstanceData *instance;
3593 			uint32		lockbits = FAST_PATH_GET_BITS(proc, f);
3594 
3595 			/* Skip unallocated slots. */
3596 			if (!lockbits)
3597 				continue;
3598 
3599 			if (el >= els)
3600 			{
3601 				els += MaxBackends;
3602 				data->locks = (LockInstanceData *)
3603 					repalloc(data->locks, sizeof(LockInstanceData) * els);
3604 			}
3605 
3606 			instance = &data->locks[el];
3607 			SET_LOCKTAG_RELATION(instance->locktag, proc->databaseId,
3608 								 proc->fpRelId[f]);
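			/* map the slot's fast-path bits back onto normal LOCKMASK bits */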
3609 			instance->holdMask = lockbits << FAST_PATH_LOCKNUMBER_OFFSET;
3610 			instance->waitLockMode = NoLock;
3611 			instance->backend = proc->backendId;
3612 			instance->lxid = proc->lxid;
3613 			instance->pid = proc->pid;
3614 			instance->leaderPid = proc->pid;
3615 			instance->fastpath = true;
3616 
3617 			el++;
3618 		}
3619 
3620 		if (proc->fpVXIDLock)
3621 		{
3622 			VirtualTransactionId vxid;
3623 			LockInstanceData *instance;
3624 
3625 			if (el >= els)
3626 			{
3627 				els += MaxBackends;
3628 				data->locks = (LockInstanceData *)
3629 					repalloc(data->locks, sizeof(LockInstanceData) * els);
3630 			}
3631 
3632 			vxid.backendId = proc->backendId;
3633 			vxid.localTransactionId = proc->fpLocalTransactionId;
3634 
3635 			instance = &data->locks[el];
3636 			SET_LOCKTAG_VIRTUALTRANSACTION(instance->locktag, vxid);
3637 			instance->holdMask = LOCKBIT_ON(ExclusiveLock);
3638 			instance->waitLockMode = NoLock;
3639 			instance->backend = proc->backendId;
3640 			instance->lxid = proc->lxid;
3641 			instance->pid = proc->pid;
3642 			instance->leaderPid = proc->pid;
3643 			instance->fastpath = true;
3644 
3645 			el++;
3646 		}
3647 
3648 		LWLockRelease(&proc->backendLock);
3649 	}
3650 
3651 	/*
3652 	 * Next, acquire lock on the entire shared lock data structure.  We do
3653 	 * this so that, at least for locks in the primary lock table, the state
3654 	 * will be self-consistent.
3655 	 *
3656 	 * Since this is a read-only operation, we take shared instead of
3657 	 * exclusive lock.  There's not a whole lot of point to this, because all
3658 	 * the normal operations require exclusive lock, but it doesn't hurt
3659 	 * anything either. It will at least allow two backends to do
3660 	 * GetLockStatusData in parallel.
3661 	 *
3662 	 * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
3663 	 */
3664 	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
3665 		LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
3666 
3667 	/* Now we can safely count the number of proclocks */
3668 	data->nelements = el + hash_get_num_entries(LockMethodProcLockHash);
3669 	if (data->nelements > els)
3670 	{
3671 		els = data->nelements;
3672 		data->locks = (LockInstanceData *)
3673 			repalloc(data->locks, sizeof(LockInstanceData) * els);
3674 	}
3675 
3676 	/* Now scan the tables to copy the data */
3677 	hash_seq_init(&seqstat, LockMethodProcLockHash);
3678 
3679 	while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
3680 	{
3681 		PGPROC	   *proc = proclock->tag.myProc;
3682 		LOCK	   *lock = proclock->tag.myLock;
3683 		LockInstanceData *instance = &data->locks[el];
3684 
3685 		memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG));
3686 		instance->holdMask = proclock->holdMask;
3687 		if (proc->waitLock == proclock->tag.myLock)
3688 			instance->waitLockMode = proc->waitLockMode;
3689 		else
3690 			instance->waitLockMode = NoLock;
3691 		instance->backend = proc->backendId;
3692 		instance->lxid = proc->lxid;
3693 		instance->pid = proc->pid;
3694 		instance->leaderPid = proclock->groupLeader->pid;
3695 		instance->fastpath = false;
3696 
3697 		el++;
3698 	}
3699 
3700 	/*
3701 	 * And release locks.  We do this in reverse order for two reasons: (1)
3702 	 * Anyone else who needs more than one of the locks will be trying to lock
3703 	 * them in increasing order; we don't want to release the other process
3704 	 * until it can get all the locks it needs. (2) This avoids O(N^2)
3705 	 * behavior inside LWLockRelease.
3706 	 */
3707 	for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
3708 		LWLockRelease(LockHashPartitionLockByIndex(i));
3709 
3710 	Assert(el == data->nelements);
3711 
3712 	return data;
3713 }
3714 
3715 /*
3716  * GetBlockerStatusData - Return a summary of the lock manager's state
3717  * concerning locks that are blocking the specified PID or any member of
3718  * the PID's lock group, for use in a user-level reporting function.
3719  *
3720  * For each PID within the lock group that is awaiting some heavyweight lock,
3721  * the return data includes an array of LockInstanceData objects, which are
3722  * the same data structure used by GetLockStatusData; but unlike that function,
3723  * this one reports only the PROCLOCKs associated with the lock that that PID
3724  * is blocked on.  (Hence, all the locktags should be the same for any one
3725  * blocked PID.)  In addition, we return an array of the PIDs of those backends
3726  * that are ahead of the blocked PID in the lock's wait queue.  These can be
3727  * compared with the PIDs in the LockInstanceData objects to determine which
3728  * waiters are ahead of or behind the blocked PID in the queue.
3729  *
3730  * If blocked_pid isn't a valid backend PID or nothing in its lock group is
3731  * waiting on any heavyweight lock, return empty arrays.
3732  *
3733  * The design goal is to hold the LWLocks for as short a time as possible;
3734  * thus, this function simply makes a copy of the necessary data and releases
3735  * the locks, allowing the caller to contemplate and format the data for as
3736  * long as it pleases.
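 *
 * Each BlockedProcData selects a slice of the locks[] and waiter_pids[]
 * arrays; a caller can reassemble the per-process data roughly like this
 * (sketch only):
 *
 *		BlockedProcsData *d = GetBlockerStatusData(pid);
 *		int			p;
 *
 *		for (p = 0; p < d->nprocs; p++)
 *		{
 *			BlockedProcData *bp = &d->procs[p];
 *			LockInstanceData *lk = &d->locks[bp->first_lock];
 *			int		   *waiters = &d->waiter_pids[bp->first_waiter];
 *
 *			... bp->num_locks entries in lk[], bp->num_waiters in waiters[] ...
 *		}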
3737  */
3738 BlockedProcsData *
3739 GetBlockerStatusData(int blocked_pid)
3740 {
3741 	BlockedProcsData *data;
3742 	PGPROC	   *proc;
3743 	int			i;
3744 
3745 	data = (BlockedProcsData *) palloc(sizeof(BlockedProcsData));
3746 
3747 	/*
3748 	 * Guess how much space we'll need, and preallocate.  Most of the time
3749 	 * this will avoid needing to do repalloc while holding the LWLocks.  (We
3750 	 * assume, but check with an Assert, that MaxBackends is enough entries
3751 	 * for the procs[] array; the other two could need enlargement, though.)
3752 	 */
3753 	data->nprocs = data->nlocks = data->npids = 0;
3754 	data->maxprocs = data->maxlocks = data->maxpids = MaxBackends;
3755 	data->procs = (BlockedProcData *) palloc(sizeof(BlockedProcData) * data->maxprocs);
3756 	data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * data->maxlocks);
3757 	data->waiter_pids = (int *) palloc(sizeof(int) * data->maxpids);
3758 
3759 	/*
3760 	 * In order to search the ProcArray for blocked_pid and assume that that
3761 	 * entry won't immediately disappear under us, we must hold ProcArrayLock.
3762 	 * In addition, to examine the lock grouping fields of any other backend,
3763 	 * we must hold all the hash partition locks.  (Only one of those locks is
3764 	 * actually relevant for any one lock group, but we can't know which one
3765 	 * ahead of time.)	It's fairly annoying to hold all those locks
3766 	 * throughout this, but it's no worse than GetLockStatusData(), and it
3767 	 * does have the advantage that we're guaranteed to return a
3768 	 * self-consistent instantaneous state.
3769 	 */
3770 	LWLockAcquire(ProcArrayLock, LW_SHARED);
3771 
3772 	proc = BackendPidGetProcWithLock(blocked_pid);
3773 
3774 	/* Nothing to do if it's gone */
3775 	if (proc != NULL)
3776 	{
3777 		/*
3778 		 * Acquire lock on the entire shared lock data structure.  See notes
3779 		 * in GetLockStatusData().
3780 		 */
3781 		for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
3782 			LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
3783 
3784 		if (proc->lockGroupLeader == NULL)
3785 		{
3786 			/* Easy case, proc is not a lock group member */
3787 			GetSingleProcBlockerStatusData(proc, data);
3788 		}
3789 		else
3790 		{
3791 			/* Examine all procs in proc's lock group */
3792 			dlist_iter	iter;
3793 
3794 			dlist_foreach(iter, &proc->lockGroupLeader->lockGroupMembers)
3795 			{
3796 				PGPROC	   *memberProc;
3797 
3798 				memberProc = dlist_container(PGPROC, lockGroupLink, iter.cur);
3799 				GetSingleProcBlockerStatusData(memberProc, data);
3800 			}
3801 		}
3802 
3803 		/*
3804 		 * And release locks.  See notes in GetLockStatusData().
3805 		 */
3806 		for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
3807 			LWLockRelease(LockHashPartitionLockByIndex(i));
3808 
3809 		Assert(data->nprocs <= data->maxprocs);
3810 	}
3811 
3812 	LWLockRelease(ProcArrayLock);
3813 
3814 	return data;
3815 }
3816 
3817 /* Accumulate data about one possibly-blocked proc for GetBlockerStatusData */
3818 static void
3819 GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data)
3820 {
3821 	LOCK	   *theLock = blocked_proc->waitLock;
3822 	BlockedProcData *bproc;
3823 	SHM_QUEUE  *procLocks;
3824 	PROCLOCK   *proclock;
3825 	PROC_QUEUE *waitQueue;
3826 	PGPROC	   *proc;
3827 	int			queue_size;
3828 	int			i;
3829 
3830 	/* Nothing to do if this proc is not blocked */
3831 	if (theLock == NULL)
3832 		return;
3833 
3834 	/* Set up a procs[] element */
3835 	bproc = &data->procs[data->nprocs++];
3836 	bproc->pid = blocked_proc->pid;
3837 	bproc->first_lock = data->nlocks;
3838 	bproc->first_waiter = data->npids;
3839 
3840 	/*
3841 	 * We may ignore the proc's fast-path arrays, since nothing in those could
3842 	 * be related to a contended lock.
3843 	 */
3844 
3845 	/* Collect all PROCLOCKs associated with theLock */
3846 	procLocks = &(theLock->procLocks);
3847 	proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
3848 										 offsetof(PROCLOCK, lockLink));
3849 	while (proclock)
3850 	{
3851 		PGPROC	   *proc = proclock->tag.myProc;
3852 		LOCK	   *lock = proclock->tag.myLock;
3853 		LockInstanceData *instance;
3854 
3855 		if (data->nlocks >= data->maxlocks)
3856 		{
3857 			data->maxlocks += MaxBackends;
3858 			data->locks = (LockInstanceData *)
3859 				repalloc(data->locks, sizeof(LockInstanceData) * data->maxlocks);
3860 		}
3861 
3862 		instance = &data->locks[data->nlocks];
3863 		memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG));
3864 		instance->holdMask = proclock->holdMask;
3865 		if (proc->waitLock == lock)
3866 			instance->waitLockMode = proc->waitLockMode;
3867 		else
3868 			instance->waitLockMode = NoLock;
3869 		instance->backend = proc->backendId;
3870 		instance->lxid = proc->lxid;
3871 		instance->pid = proc->pid;
3872 		instance->leaderPid = proclock->groupLeader->pid;
3873 		instance->fastpath = false;
3874 		data->nlocks++;
3875 
3876 		proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
3877 											 offsetof(PROCLOCK, lockLink));
3878 	}
3879 
3880 	/* Enlarge waiter_pids[] if it's too small to hold all wait queue PIDs */
3881 	waitQueue = &(theLock->waitProcs);
3882 	queue_size = waitQueue->size;
3883 
3884 	if (queue_size > data->maxpids - data->npids)
3885 	{
3886 		data->maxpids = Max(data->maxpids + MaxBackends,
3887 							data->npids + queue_size);
3888 		data->waiter_pids = (int *) repalloc(data->waiter_pids,
3889 											 sizeof(int) * data->maxpids);
3890 	}
3891 
3892 	/* Collect PIDs from the lock's wait queue, stopping at blocked_proc */
3893 	proc = (PGPROC *) waitQueue->links.next;
3894 	for (i = 0; i < queue_size; i++)
3895 	{
3896 		if (proc == blocked_proc)
3897 			break;
3898 		data->waiter_pids[data->npids++] = proc->pid;
3899 		proc = (PGPROC *) proc->links.next;
3900 	}
3901 
3902 	bproc->num_locks = data->nlocks - bproc->first_lock;
3903 	bproc->num_waiters = data->npids - bproc->first_waiter;
3904 }
3905 
3906 /*
3907  * Returns a list of currently held AccessExclusiveLocks, for use by
3908  * LogStandbySnapshot().  The result is a palloc'd array,
3909  * with the number of elements returned into *nlocks.
3910  *
3911  * XXX This currently takes a lock on all partitions of the lock table,
3912  * but it's possible to do better.  By reference counting locks and storing
3913  * the value in the ProcArray entry for each backend we could tell if any
3914  * locks need recording without having to acquire the partition locks and
3915  * scan the lock table.  Whether that's worth the additional overhead
3916  * is pretty dubious though.
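 *
 * Each xl_standby_lock in the result carries {xid, dbOid, relOid}, filled
 * from the lock tag below; callers just consume locks[0 .. *nlocks-1].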
3917  */
3918 xl_standby_lock *
3919 GetRunningTransactionLocks(int *nlocks)
3920 {
3921 	xl_standby_lock *accessExclusiveLocks;
3922 	PROCLOCK   *proclock;
3923 	HASH_SEQ_STATUS seqstat;
3924 	int			i;
3925 	int			index;
3926 	int			els;
3927 
3928 	/*
3929 	 * Acquire lock on the entire shared lock data structure.
3930 	 *
3931 	 * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
3932 	 */
3933 	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
3934 		LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
3935 
3936 	/* Now we can safely count the number of proclocks */
3937 	els = hash_get_num_entries(LockMethodProcLockHash);
3938 
3939 	/*
3940 	 * Allocating enough space for all locks in the lock table is overkill,
3941 	 * but it's more convenient and faster than having to enlarge the array.
3942 	 */
3943 	accessExclusiveLocks = palloc(els * sizeof(xl_standby_lock));
3944 
3945 	/* Now scan the tables to copy the data */
3946 	hash_seq_init(&seqstat, LockMethodProcLockHash);
3947 
3948 	/*
3949 	 * If lock is a currently granted AccessExclusiveLock then it will have
3950 	 * just one proclock holder, so locks are never accessed twice in this
3951 	 * particular case. Don't copy this code for use elsewhere because in the
3952 	 * general case this will give you duplicate locks when looking at
3953 	 * non-exclusive lock types.
3954 	 */
3955 	index = 0;
3956 	while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
3957 	{
3958 		/* make sure this definition matches the one used in LockAcquire */
3959 		if ((proclock->holdMask & LOCKBIT_ON(AccessExclusiveLock)) &&
3960 			proclock->tag.myLock->tag.locktag_type == LOCKTAG_RELATION)
3961 		{
3962 			PGPROC	   *proc = proclock->tag.myProc;
3963 			PGXACT	   *pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
3964 			LOCK	   *lock = proclock->tag.myLock;
3965 			TransactionId xid = pgxact->xid;
3966 
3967 			/*
3968 			 * Don't record locks for transactions if we know they have
3969 			 * already issued their WAL record for commit but not yet released
3970 			 * lock. It is still possible that we see locks held by already
3971 			 * complete transactions, if they haven't yet zeroed their xids.
3972 			 */
3973 			if (!TransactionIdIsValid(xid))
3974 				continue;
3975 
3976 			accessExclusiveLocks[index].xid = xid;
3977 			accessExclusiveLocks[index].dbOid = lock->tag.locktag_field1;
3978 			accessExclusiveLocks[index].relOid = lock->tag.locktag_field2;
3979 
3980 			index++;
3981 		}
3982 	}
3983 
3984 	Assert(index <= els);
3985 
3986 	/*
3987 	 * And release locks.  We do this in reverse order for two reasons: (1)
3988 	 * Anyone else who needs more than one of the locks will be trying to lock
3989 	 * them in increasing order; we don't want to release the other process
3990 	 * until it can get all the locks it needs. (2) This avoids O(N^2)
3991 	 * behavior inside LWLockRelease.
3992 	 */
3993 	for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
3994 		LWLockRelease(LockHashPartitionLockByIndex(i));
3995 
3996 	*nlocks = index;
3997 	return accessExclusiveLocks;
3998 }
3999 
4000 /* Provide the textual name of any lock mode */
4001 const char *
4002 GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
4003 {
4004 	Assert(lockmethodid > 0 && lockmethodid < lengthof(LockMethods));
4005 	Assert(mode > 0 && mode <= LockMethods[lockmethodid]->numLockModes);
4006 	return LockMethods[lockmethodid]->lockModeNames[mode];
4007 }
4008 
4009 #ifdef LOCK_DEBUG
4010 /*
4011  * Dump all locks in the given proc's myProcLocks lists.
4012  *
4013  * Caller is responsible for having acquired appropriate LWLocks.
4014  */
4015 void
4016 DumpLocks(PGPROC *proc)
4017 {
4018 	SHM_QUEUE  *procLocks;
4019 	PROCLOCK   *proclock;
4020 	LOCK	   *lock;
4021 	int			i;
4022 
4023 	if (proc == NULL)
4024 		return;
4025 
4026 	if (proc->waitLock)
4027 		LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
4028 
4029 	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
4030 	{
4031 		procLocks = &(proc->myProcLocks[i]);
4032 
4033 		proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
4034 											 offsetof(PROCLOCK, procLink));
4035 
4036 		while (proclock)
4037 		{
4038 			Assert(proclock->tag.myProc == proc);
4039 
4040 			lock = proclock->tag.myLock;
4041 
4042 			PROCLOCK_PRINT("DumpLocks", proclock);
4043 			LOCK_PRINT("DumpLocks", lock, 0);
4044 
4045 			proclock = (PROCLOCK *)
4046 				SHMQueueNext(procLocks, &proclock->procLink,
4047 							 offsetof(PROCLOCK, procLink));
4048 		}
4049 	}
4050 }
4051 
4052 /*
4053  * Dump all lmgr locks.
4054  *
4055  * Caller is responsible for having acquired appropriate LWLocks.
4056  */
4057 void
4058 DumpAllLocks(void)
4059 {
4060 	PGPROC	   *proc;
4061 	PROCLOCK   *proclock;
4062 	LOCK	   *lock;
4063 	HASH_SEQ_STATUS status;
4064 
4065 	proc = MyProc;
4066 
4067 	if (proc && proc->waitLock)
4068 		LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
4069 
4070 	hash_seq_init(&status, LockMethodProcLockHash);
4071 
4072 	while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
4073 	{
4074 		PROCLOCK_PRINT("DumpAllLocks", proclock);
4075 
4076 		lock = proclock->tag.myLock;
4077 		if (lock)
4078 			LOCK_PRINT("DumpAllLocks", lock, 0);
4079 		else
4080 			elog(LOG, "DumpAllLocks: proclock->tag.myLock = NULL");
4081 	}
4082 }
4083 #endif							/* LOCK_DEBUG */
4084 
4085 /*
4086  * LOCK 2PC resource manager's routines
4087  */
4088 
4089 /*
4090  * Re-acquire a lock belonging to a transaction that was prepared.
4091  *
4092  * Because this function is run at db startup, re-acquiring the locks should
4093  * never conflict with running transactions because there are none.  We
4094  * assume that the lock state represented by the stored 2PC files is legal.
4095  *
4096  * When switching from Hot Standby mode to normal operation, the locks will
4097  * be already held by the startup process. The locks are acquired for the new
4098  * procs without checking for conflicts, so we don't get a conflict between the
4099  * startup process and the dummy procs, even though we will momentarily have
4100  * a situation where two procs are holding the same AccessExclusiveLock,
4101  * which isn't normally possible because of the lock conflict. If we're in standby
4102  * mode, but a recovery snapshot hasn't been established yet, it's possible
4103  * that some but not all of the locks are already held by the startup process.
4104  *
4105  * This approach is simple, but also a bit dangerous, because if there isn't
4106  * enough shared memory to acquire the locks, an error will be thrown, which
4107  * is promoted to FATAL and recovery will abort, bringing down the postmaster.
4108  * A safer approach would be to transfer the locks like we do in
4109  * AtPrepare_Locks, but then again, in hot standby mode it's possible for
4110  * read-only backends to use up all the shared lock memory anyway, so that
4111  * replaying the WAL record that needs to acquire a lock will throw an error
4112  * and PANIC anyway.
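 *
 * The incoming record is the TwoPhaseLockRecord written by AtPrepare_Locks:
 * a LOCKTAG plus the lockmode to re-acquire on behalf of the dummy PGPROC.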
4113  */
4114 void
4115 lock_twophase_recover(TransactionId xid, uint16 info,
4116 					  void *recdata, uint32 len)
4117 {
4118 	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
4119 	PGPROC	   *proc = TwoPhaseGetDummyProc(xid, false);
4120 	LOCKTAG    *locktag;
4121 	LOCKMODE	lockmode;
4122 	LOCKMETHODID lockmethodid;
4123 	LOCK	   *lock;
4124 	PROCLOCK   *proclock;
4125 	PROCLOCKTAG proclocktag;
4126 	bool		found;
4127 	uint32		hashcode;
4128 	uint32		proclock_hashcode;
4129 	int			partition;
4130 	LWLock	   *partitionLock;
4131 	LockMethod	lockMethodTable;
4132 
4133 	Assert(len == sizeof(TwoPhaseLockRecord));
4134 	locktag = &rec->locktag;
4135 	lockmode = rec->lockmode;
4136 	lockmethodid = locktag->locktag_lockmethodid;
4137 
4138 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4139 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4140 	lockMethodTable = LockMethods[lockmethodid];
4141 
4142 	hashcode = LockTagHashCode(locktag);
4143 	partition = LockHashPartition(hashcode);
4144 	partitionLock = LockHashPartitionLock(hashcode);
4145 
4146 	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
4147 
4148 	/*
4149 	 * Find or create a lock with this tag.
4150 	 */
4151 	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
4152 												(void *) locktag,
4153 												hashcode,
4154 												HASH_ENTER_NULL,
4155 												&found);
4156 	if (!lock)
4157 	{
4158 		LWLockRelease(partitionLock);
4159 		ereport(ERROR,
4160 				(errcode(ERRCODE_OUT_OF_MEMORY),
4161 				 errmsg("out of shared memory"),
4162 				 errhint("You might need to increase max_locks_per_transaction.")));
4163 	}
4164 
4165 	/*
4166 	 * if it's a new lock object, initialize it
4167 	 */
4168 	if (!found)
4169 	{
4170 		lock->grantMask = 0;
4171 		lock->waitMask = 0;
4172 		SHMQueueInit(&(lock->procLocks));
4173 		ProcQueueInit(&(lock->waitProcs));
4174 		lock->nRequested = 0;
4175 		lock->nGranted = 0;
4176 		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
4177 		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
4178 		LOCK_PRINT("lock_twophase_recover: new", lock, lockmode);
4179 	}
4180 	else
4181 	{
4182 		LOCK_PRINT("lock_twophase_recover: found", lock, lockmode);
4183 		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
4184 		Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
4185 		Assert(lock->nGranted <= lock->nRequested);
4186 	}
4187 
4188 	/*
4189 	 * Create the hash key for the proclock table.
4190 	 */
4191 	proclocktag.myLock = lock;
4192 	proclocktag.myProc = proc;
4193 
4194 	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
4195 
4196 	/*
4197 	 * Find or create a proclock entry with this tag
4198 	 */
4199 	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
4200 														(void *) &proclocktag,
4201 														proclock_hashcode,
4202 														HASH_ENTER_NULL,
4203 														&found);
4204 	if (!proclock)
4205 	{
4206 		/* Oops, not enough shmem for the proclock */
4207 		if (lock->nRequested == 0)
4208 		{
4209 			/*
4210 			 * There are no other requestors of this lock, so garbage-collect
4211 			 * the lock object.  We *must* do this to avoid a permanent leak
4212 			 * of shared memory, because there won't be anything to cause
4213 			 * anyone to release the lock object later.
4214 			 */
4215 			Assert(SHMQueueEmpty(&(lock->procLocks)));
4216 			if (!hash_search_with_hash_value(LockMethodLockHash,
4217 											 (void *) &(lock->tag),
4218 											 hashcode,
4219 											 HASH_REMOVE,
4220 											 NULL))
4221 				elog(PANIC, "lock table corrupted");
4222 		}
4223 		LWLockRelease(partitionLock);
4224 		ereport(ERROR,
4225 				(errcode(ERRCODE_OUT_OF_MEMORY),
4226 				 errmsg("out of shared memory"),
4227 				 errhint("You might need to increase max_locks_per_transaction.")));
4228 	}
4229 
4230 	/*
4231 	 * If new, initialize the new entry
4232 	 */
4233 	if (!found)
4234 	{
4235 		Assert(proc->lockGroupLeader == NULL);
4236 		proclock->groupLeader = proc;
4237 		proclock->holdMask = 0;
4238 		proclock->releaseMask = 0;
4239 		/* Add proclock to appropriate lists */
4240 		SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
4241 		SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
4242 							 &proclock->procLink);
4243 		PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
4244 	}
4245 	else
4246 	{
4247 		PROCLOCK_PRINT("lock_twophase_recover: found", proclock);
4248 		Assert((proclock->holdMask & ~lock->grantMask) == 0);
4249 	}
4250 
4251 	/*
4252 	 * lock->nRequested and lock->requested[] count the total number of
4253 	 * requests, whether granted or waiting, so increment those immediately.
4254 	 */
4255 	lock->nRequested++;
4256 	lock->requested[lockmode]++;
4257 	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
4258 
4259 	/*
4260 	 * We shouldn't already hold the desired lock.
4261 	 */
4262 	if (proclock->holdMask & LOCKBIT_ON(lockmode))
4263 		elog(ERROR, "lock %s on object %u/%u/%u is already held",
4264 			 lockMethodTable->lockModeNames[lockmode],
4265 			 lock->tag.locktag_field1, lock->tag.locktag_field2,
4266 			 lock->tag.locktag_field3);
4267 
4268 	/*
4269 	 * We ignore any possible conflicts and just grant ourselves the lock. Not
4270 	 * only because we don't bother, but also to avoid deadlocks when
4271 	 * switching from standby to normal mode. See function comment.
4272 	 */
4273 	GrantLock(lock, proclock, lockmode);
4274 
4275 	/*
4276 	 * Bump strong lock count, to make sure any fast-path lock requests won't
4277 	 * be granted without consulting the primary lock table.
4278 	 */
4279 	if (ConflictsWithRelationFastPath(&lock->tag, lockmode))
4280 	{
4281 		uint32		fasthashcode = FastPathStrongLockHashPartition(hashcode);
4282 
4283 		SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
4284 		FastPathStrongRelationLocks->count[fasthashcode]++;
4285 		SpinLockRelease(&FastPathStrongRelationLocks->mutex);
4286 	}
4287 
4288 	LWLockRelease(partitionLock);
4289 }
4290 
4291 /*
4292  * Re-acquire a lock belonging to a transaction that was prepared, when
4293  * starting up into hot standby mode.
4294  */
4295 void
4296 lock_twophase_standby_recover(TransactionId xid, uint16 info,
4297 							  void *recdata, uint32 len)
4298 {
4299 	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
4300 	LOCKTAG    *locktag;
4301 	LOCKMODE	lockmode;
4302 	LOCKMETHODID lockmethodid;
4303 
4304 	Assert(len == sizeof(TwoPhaseLockRecord));
4305 	locktag = &rec->locktag;
4306 	lockmode = rec->lockmode;
4307 	lockmethodid = locktag->locktag_lockmethodid;
4308 
4309 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4310 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4311 
4312 	if (lockmode == AccessExclusiveLock &&
4313 		locktag->locktag_type == LOCKTAG_RELATION)
4314 	{
4315 		StandbyAcquireAccessExclusiveLock(xid,
4316 										  locktag->locktag_field1 /* dboid */ ,
4317 										  locktag->locktag_field2 /* reloid */ );
4318 	}
4319 }
4320 
4321 
4322 /*
4323  * 2PC processing routine for COMMIT PREPARED case.
4324  *
4325  * Find and release the lock indicated by the 2PC record.
4326  */
4327 void
4328 lock_twophase_postcommit(TransactionId xid, uint16 info,
4329 						 void *recdata, uint32 len)
4330 {
4331 	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
4332 	PGPROC	   *proc = TwoPhaseGetDummyProc(xid, true);
4333 	LOCKTAG    *locktag;
4334 	LOCKMETHODID lockmethodid;
4335 	LockMethod	lockMethodTable;
4336 
4337 	Assert(len == sizeof(TwoPhaseLockRecord));
4338 	locktag = &rec->locktag;
4339 	lockmethodid = locktag->locktag_lockmethodid;
4340 
4341 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4342 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4343 	lockMethodTable = LockMethods[lockmethodid];
4344 
4345 	LockRefindAndRelease(lockMethodTable, proc, locktag, rec->lockmode, true);
4346 }
4347 
4348 /*
4349  * 2PC processing routine for ROLLBACK PREPARED case.
4350  *
4351  * This is actually just the same as the COMMIT case.
4352  */
4353 void
4354 lock_twophase_postabort(TransactionId xid, uint16 info,
4355 						void *recdata, uint32 len)
4356 {
4357 	lock_twophase_postcommit(xid, info, recdata, len);
4358 }
4359 
4360 /*
4361  *		VirtualXactLockTableInsert
4362  *
4363  *		Take vxid lock via the fast-path.  There can't be any pre-existing
4364  *		lockers, as we haven't advertised this vxid via the ProcArray yet.
4365  *
4366  *		Since MyProc->fpLocalTransactionId will normally contain the same data
4367  *		as MyProc->lxid, you might wonder if we really need both.  The
4368  *		difference is that MyProc->lxid is set and cleared unlocked, and
4369  *		examined by procarray.c, while fpLocalTransactionId is protected by
4370  *		backendLock and is used only by the locking subsystem.  Doing it this
4371  *		way makes it easier to verify that there are no funny race conditions.
4372  *
4373  *		We don't bother recording this lock in the local lock table, since it's
4374  *		only ever released at the end of a transaction.  Instead,
4375  *		LockReleaseAll() calls VirtualXactLockTableCleanup().
4376  */
4377 void
4378 VirtualXactLockTableInsert(VirtualTransactionId vxid)
4379 {
4380 	Assert(VirtualTransactionIdIsValid(vxid));
4381 
4382 	LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
4383 
4384 	Assert(MyProc->backendId == vxid.backendId);
4385 	Assert(MyProc->fpLocalTransactionId == InvalidLocalTransactionId);
4386 	Assert(MyProc->fpVXIDLock == false);
4387 
4388 	MyProc->fpVXIDLock = true;
4389 	MyProc->fpLocalTransactionId = vxid.localTransactionId;
4390 
4391 	LWLockRelease(&MyProc->backendLock);
4392 }
4393 
4394 /*
4395  *		VirtualXactLockTableCleanup
4396  *
4397  *		Check whether a VXID lock has been materialized; if so, release it,
4398  *		unblocking waiters.
4399  */
4400 void
4401 VirtualXactLockTableCleanup(void)
4402 {
4403 	bool		fastpath;
4404 	LocalTransactionId lxid;
4405 
4406 	Assert(MyProc->backendId != InvalidBackendId);
4407 
4408 	/*
4409 	 * Clean up shared memory state.
4410 	 */
4411 	LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
4412 
4413 	fastpath = MyProc->fpVXIDLock;
4414 	lxid = MyProc->fpLocalTransactionId;
4415 	MyProc->fpVXIDLock = false;
4416 	MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
4417 
4418 	LWLockRelease(&MyProc->backendLock);
4419 
4420 	/*
4421 	 * If fpVXIDLock has been cleared without touching fpLocalTransactionId,
4422 	 * that means someone transferred the lock to the main lock table.
4423 	 */
4424 	if (!fastpath && LocalTransactionIdIsValid(lxid))
4425 	{
4426 		VirtualTransactionId vxid;
4427 		LOCKTAG		locktag;
4428 
4429 		vxid.backendId = MyBackendId;
4430 		vxid.localTransactionId = lxid;
4431 		SET_LOCKTAG_VIRTUALTRANSACTION(locktag, vxid);
4432 
4433 		LockRefindAndRelease(LockMethods[DEFAULT_LOCKMETHOD], MyProc,
4434 							 &locktag, ExclusiveLock, false);
4435 	}
4436 }
4437 
4438 /*
4439  *		XactLockForVirtualXact
4440  *
4441  * If TransactionIdIsValid(xid), this is essentially XactLockTableWait(xid,
4442  * NULL, NULL, XLTW_None) or ConditionalXactLockTableWait(xid).  Unlike those
4443  * functions, it assumes "xid" is never a subtransaction and that "xid" is
4444  * prepared, committed, or aborted.
4445  *
4446  * If !TransactionIdIsValid(xid), this locks every prepared XID having been
4447  * known as "vxid" before its PREPARE TRANSACTION.
4448  */
4449 static bool
4450 XactLockForVirtualXact(VirtualTransactionId vxid,
4451 					   TransactionId xid, bool wait)
4452 {
4453 	bool		more = false;
4454 
4455 	/* There is no point in waiting for 2PCs if you have no 2PCs. */
4456 	if (max_prepared_xacts == 0)
4457 		return true;
4458 
4459 	do
4460 	{
4461 		LockAcquireResult lar;
4462 		LOCKTAG		tag;
4463 
4464 		/* Clear state from previous iterations. */
4465 		if (more)
4466 		{
4467 			xid = InvalidTransactionId;
4468 			more = false;
4469 		}
4470 
4471 		/* If we have no xid, try to find one. */
4472 		if (!TransactionIdIsValid(xid))
4473 			xid = TwoPhaseGetXidByVirtualXID(vxid, &more);
4474 		if (!TransactionIdIsValid(xid))
4475 		{
4476 			Assert(!more);
4477 			return true;
4478 		}
4479 
4480 		/* Check or wait for XID completion. */
4481 		SET_LOCKTAG_TRANSACTION(tag, xid);
4482 		lar = LockAcquire(&tag, ShareLock, false, !wait);
4483 		if (lar == LOCKACQUIRE_NOT_AVAIL)
4484 			return false;
4485 		LockRelease(&tag, ShareLock, false);
4486 	} while (more);
4487 
4488 	return true;
4489 }
4490 
4491 /*
4492  *		VirtualXactLock
4493  *
4494  * If wait = true, wait as long as the given VXID or any XID acquired by the
4495  * same transaction is still running.  Then, return true.
4496  *
4497  * If wait = false, just check whether that VXID or one of those XIDs is still
4498  * running, and return true or false.
4499  */
4500 bool
4501 VirtualXactLock(VirtualTransactionId vxid, bool wait)
4502 {
4503 	LOCKTAG		tag;
4504 	PGPROC	   *proc;
4505 	TransactionId xid = InvalidTransactionId;
4506 
4507 	Assert(VirtualTransactionIdIsValid(vxid));
4508 
4509 	if (VirtualTransactionIdIsRecoveredPreparedXact(vxid))
4510 		/* no vxid lock; localTransactionId is a normal, locked XID */
4511 		return XactLockForVirtualXact(vxid, vxid.localTransactionId, wait);
4512 
4513 	SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
4514 
4515 	/*
4516 	 * If a lock table entry must be made, this is the PGPROC on whose behalf
4517 	 * it must be done.  Note that the transaction might end or the PGPROC
4518 	 * might be reassigned to a new backend before we get around to examining
4519 	 * it, but it doesn't matter.  If we find upon examination that the
4520 	 * relevant lxid is no longer running here, that's enough to prove that
4521 	 * it's no longer running anywhere.
4522 	 */
4523 	proc = BackendIdGetProc(vxid.backendId);
4524 	if (proc == NULL)
4525 		return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
4526 
4527 	/*
4528 	 * We must acquire this lock before checking the backendId and lxid
4529 	 * against the ones we're waiting for.  The target backend will only set
4530 	 * or clear lxid while holding this lock.
4531 	 */
4532 	LWLockAcquire(&proc->backendLock, LW_EXCLUSIVE);
4533 
4534 	if (proc->backendId != vxid.backendId
4535 		|| proc->fpLocalTransactionId != vxid.localTransactionId)
4536 	{
4537 		/* VXID ended */
4538 		LWLockRelease(&proc->backendLock);
4539 		return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
4540 	}
4541 
4542 	/*
4543 	 * If we aren't asked to wait, there's no need to set up a lock table
4544 	 * entry.  The transaction is still in progress, so just return false.
4545 	 */
4546 	if (!wait)
4547 	{
4548 		LWLockRelease(&proc->backendLock);
4549 		return false;
4550 	}
4551 
4552 	/*
4553 	 * OK, we're going to need to sleep on the VXID.  But first, we must set
4554 	 * up the primary lock table entry, if needed (ie, convert the proc's
4555 	 * fast-path lock on its VXID to a regular lock).
4556 	 */
4557 	if (proc->fpVXIDLock)
4558 	{
4559 		PROCLOCK   *proclock;
4560 		uint32		hashcode;
4561 		LWLock	   *partitionLock;
4562 
4563 		hashcode = LockTagHashCode(&tag);
4564 
4565 		partitionLock = LockHashPartitionLock(hashcode);
4566 		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
4567 
4568 		proclock = SetupLockInTable(LockMethods[DEFAULT_LOCKMETHOD], proc,
4569 									&tag, hashcode, ExclusiveLock);
4570 		if (!proclock)
4571 		{
4572 			LWLockRelease(partitionLock);
4573 			LWLockRelease(&proc->backendLock);
4574 			ereport(ERROR,
4575 					(errcode(ERRCODE_OUT_OF_MEMORY),
4576 					 errmsg("out of shared memory"),
4577 					 errhint("You might need to increase max_locks_per_transaction.")));
4578 		}
4579 		GrantLock(proclock->tag.myLock, proclock, ExclusiveLock);
4580 
4581 		LWLockRelease(partitionLock);
4582 
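		/* henceforth the VXID lock lives only in the main lock table */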
4583 		proc->fpVXIDLock = false;
4584 	}
4585 
4586 	/*
4587 	 * If the proc has an XID now, we'll avoid a TwoPhaseGetXidByVirtualXID()
4588 	 * search.  The proc might have assigned this XID but not yet locked it,
4589 	 * in which case the proc will lock this XID before releasing the VXID.
4590 	 * The backendLock critical section excludes VirtualXactLockTableCleanup(),
4591 	 * so we won't save an XID of a different VXID.  It doesn't matter whether
4592 	 * we save this before or after setting up the primary lock table entry.
4593 	 */
4594 	xid = ProcGlobal->allPgXact[proc->pgprocno].xid;
4595 
4596 	/* Done with proc->fpLockBits */
4597 	LWLockRelease(&proc->backendLock);
4598 
4599 	/* Time to wait. */
4600 	(void) LockAcquire(&tag, ShareLock, false, false);
4601 
4602 	LockRelease(&tag, ShareLock, false);
4603 	return XactLockForVirtualXact(vxid, xid, wait);
4604 }
4605 
4606 /*
4607  * LockWaiterCount
4608  *
4609  * Find the number of lock requesters on this locktag
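 *
 * Note that this returns lock->nRequested, which counts granted holders as
 * well as still-waiting requesters.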
4610  */
4611 int
4612 LockWaiterCount(const LOCKTAG *locktag)
4613 {
4614 	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
4615 	LOCK	   *lock;
4616 	bool		found;
4617 	uint32		hashcode;
4618 	LWLock	   *partitionLock;
4619 	int			waiters = 0;
4620 
4621 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4622 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4623 
4624 	hashcode = LockTagHashCode(locktag);
4625 	partitionLock = LockHashPartitionLock(hashcode);
4626 	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
4627 
4628 	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
4629 												(const void *) locktag,
4630 												hashcode,
4631 												HASH_FIND,
4632 												&found);
4633 	if (found)
4634 	{
4635 		Assert(lock != NULL);
4636 		waiters = lock->nRequested;
4637 	}
4638 	LWLockRelease(partitionLock);
4639 
4640 	return waiters;
4641 }
4642