1 /*-------------------------------------------------------------------------
2  *
3  * lock.c
4  *	  POSTGRES primary lock mechanism
5  *
6  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/storage/lmgr/lock.c
12  *
13  * NOTES
14  *	  A lock table is a shared memory hash table.  When
15  *	  a process tries to acquire a lock of a type that conflicts
16  *	  with existing locks, it is put to sleep using the routines
17  *	  in storage/lmgr/proc.c.
18  *
19  *	  For the most part, this code should be invoked via lmgr.c
20  *	  or another lock-management module, not directly.
21  *
22  *	Interface:
23  *
24  *	InitLocks(), GetLocksMethodTable(), GetLockTagsMethodTable(),
25  *	LockAcquire(), LockRelease(), LockReleaseAll(),
26  *	LockCheckConflicts(), GrantLock()
27  *
28  *-------------------------------------------------------------------------
29  */
30 #include "postgres.h"
31 
32 #include <signal.h>
33 #include <unistd.h>
34 
35 #include "access/transam.h"
36 #include "access/twophase.h"
37 #include "access/twophase_rmgr.h"
38 #include "access/xact.h"
39 #include "access/xlog.h"
40 #include "miscadmin.h"
41 #include "pg_trace.h"
42 #include "pgstat.h"
43 #include "storage/proc.h"
44 #include "storage/procarray.h"
45 #include "storage/sinvaladt.h"
46 #include "storage/spin.h"
47 #include "storage/standby.h"
48 #include "utils/memutils.h"
49 #include "utils/ps_status.h"
50 #include "utils/resowner_private.h"
51 
52 
53 /* This configuration variable is used to set the lock table size */
54 int			max_locks_per_xact; /* set by guc.c */
55 
56 #define NLOCKENTS() \
57 	mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
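
/*
 * For illustration only (the numbers are hypothetical): with
 * max_locks_per_xact = 64, MaxBackends = 100 and max_prepared_xacts = 10,
 * NLOCKENTS() provides for 64 * (100 + 10) = 7040 shared lock table entries.
 */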
58 
59 
60 /*
61  * Data structures defining the semantics of the standard lock methods.
62  *
63  * The conflict table defines the semantics of the various lock modes.
64  */
65 static const LOCKMASK LockConflicts[] = {
66 	0,
67 
68 	/* AccessShareLock */
69 	LOCKBIT_ON(AccessExclusiveLock),
70 
71 	/* RowShareLock */
72 	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
73 
74 	/* RowExclusiveLock */
75 	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
76 	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
77 
78 	/* ShareUpdateExclusiveLock */
79 	LOCKBIT_ON(ShareUpdateExclusiveLock) |
80 	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
81 	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
82 
83 	/* ShareLock */
84 	LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
85 	LOCKBIT_ON(ShareRowExclusiveLock) |
86 	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
87 
88 	/* ShareRowExclusiveLock */
89 	LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
90 	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
91 	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
92 
93 	/* ExclusiveLock */
94 	LOCKBIT_ON(RowShareLock) |
95 	LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
96 	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
97 	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
98 
99 	/* AccessExclusiveLock */
100 	LOCKBIT_ON(AccessShareLock) | LOCKBIT_ON(RowShareLock) |
101 	LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
102 	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
103 	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock)
104 
105 };
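
/*
 * Example reading of the conflict table: RowExclusiveLock conflicts with
 * ShareLock, ShareRowExclusiveLock, ExclusiveLock and AccessExclusiveLock,
 * but not with itself; so any number of backends can hold RowExclusiveLock
 * on the same relation concurrently, as plain INSERT/UPDATE/DELETE do.
 */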
106 
107 /* Names of lock modes, for debug printouts */
108 static const char *const lock_mode_names[] =
109 {
110 	"INVALID",
111 	"AccessShareLock",
112 	"RowShareLock",
113 	"RowExclusiveLock",
114 	"ShareUpdateExclusiveLock",
115 	"ShareLock",
116 	"ShareRowExclusiveLock",
117 	"ExclusiveLock",
118 	"AccessExclusiveLock"
119 };
120 
121 #ifndef LOCK_DEBUG
122 static bool Dummy_trace = false;
123 #endif
124 
125 static const LockMethodData default_lockmethod = {
126 	AccessExclusiveLock,		/* highest valid lock mode number */
127 	LockConflicts,
128 	lock_mode_names,
129 #ifdef LOCK_DEBUG
130 	&Trace_locks
131 #else
132 	&Dummy_trace
133 #endif
134 };
135 
136 static const LockMethodData user_lockmethod = {
137 	AccessExclusiveLock,		/* highest valid lock mode number */
138 	LockConflicts,
139 	lock_mode_names,
140 #ifdef LOCK_DEBUG
141 	&Trace_userlocks
142 #else
143 	&Dummy_trace
144 #endif
145 };
146 
147 /*
148  * map from lock method id to the lock table data structures
149  */
150 static const LockMethod LockMethods[] = {
151 	NULL,
152 	&default_lockmethod,
153 	&user_lockmethod
154 };
155 
156 
157 /* Record that's written to 2PC state file when a lock is persisted */
158 typedef struct TwoPhaseLockRecord
159 {
160 	LOCKTAG		locktag;
161 	LOCKMODE	lockmode;
162 } TwoPhaseLockRecord;
163 
164 
165 /*
166  * Count of the number of fast path lock slots we believe to be used.  This
167  * might be higher than the real number if another backend has transferred
168  * our locks to the primary lock table, but it can never be lower than the
169  * real value, since only we can acquire locks on our own behalf.
170  */
171 static int	FastPathLocalUseCount = 0;
172 
173 /* Macros for manipulating proc->fpLockBits */
174 #define FAST_PATH_BITS_PER_SLOT			3
175 #define FAST_PATH_LOCKNUMBER_OFFSET		1
176 #define FAST_PATH_MASK					((1 << FAST_PATH_BITS_PER_SLOT) - 1)
177 #define FAST_PATH_GET_BITS(proc, n) \
178 	(((proc)->fpLockBits >> (FAST_PATH_BITS_PER_SLOT * n)) & FAST_PATH_MASK)
179 #define FAST_PATH_BIT_POSITION(n, l) \
180 	(AssertMacro((l) >= FAST_PATH_LOCKNUMBER_OFFSET), \
181 	 AssertMacro((l) < FAST_PATH_BITS_PER_SLOT+FAST_PATH_LOCKNUMBER_OFFSET), \
182 	 AssertMacro((n) < FP_LOCK_SLOTS_PER_BACKEND), \
183 	 ((l) - FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT * (n)))
184 #define FAST_PATH_SET_LOCKMODE(proc, n, l) \
185 	 (proc)->fpLockBits |= UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)
186 #define FAST_PATH_CLEAR_LOCKMODE(proc, n, l) \
187 	 (proc)->fpLockBits &= ~(UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l))
188 #define FAST_PATH_CHECK_LOCKMODE(proc, n, l) \
189 	 ((proc)->fpLockBits & (UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)))
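
/*
 * Worked example of the bit layout (slot number chosen arbitrarily): slot
 * n = 2 holding RowExclusiveLock (lockmode 3) occupies bit position
 * (3 - FAST_PATH_LOCKNUMBER_OFFSET) + FAST_PATH_BITS_PER_SLOT * 2 = 8, so
 * FAST_PATH_SET_LOCKMODE(proc, 2, RowExclusiveLock) sets bit 8 of
 * proc->fpLockBits and FAST_PATH_GET_BITS(proc, 2) then yields 0x4.
 */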
190 
191 /*
192  * The fast-path lock mechanism is concerned only with relation locks on
193  * unshared relations by backends bound to a database.  The fast-path
194  * mechanism exists mostly to accelerate acquisition and release of locks
195  * that rarely conflict.  Because ShareUpdateExclusiveLock is
196  * self-conflicting, it can't use the fast-path mechanism; but it also does
197  * not conflict with any of the locks that do, so we can ignore it completely.
198  */
199 #define EligibleForRelationFastPath(locktag, mode) \
200 	((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
201 	(locktag)->locktag_type == LOCKTAG_RELATION && \
202 	(locktag)->locktag_field1 == MyDatabaseId && \
203 	MyDatabaseId != InvalidOid && \
204 	(mode) < ShareUpdateExclusiveLock)
205 #define ConflictsWithRelationFastPath(locktag, mode) \
206 	((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
207 	(locktag)->locktag_type == LOCKTAG_RELATION && \
208 	(locktag)->locktag_field1 != InvalidOid && \
209 	(mode) > ShareUpdateExclusiveLock)
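
/*
 * For example, an AccessShareLock on an ordinary table of the current
 * database passes EligibleForRelationFastPath(), while an
 * AccessExclusiveLock on any non-shared relation passes
 * ConflictsWithRelationFastPath() and therefore forces the slow path (and a
 * transfer of any matching fast-path locks) for that lock tag.
 */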
210 
211 static bool FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode);
212 static bool FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode);
213 static bool FastPathTransferRelationLocks(LockMethod lockMethodTable,
214 							  const LOCKTAG *locktag, uint32 hashcode);
215 static PROCLOCK *FastPathGetRelationLockEntry(LOCALLOCK *locallock);
216 
217 /*
218  * To make the fast-path lock mechanism work, we must have some way of
219  * preventing the use of the fast-path when a conflicting lock might be
220  * present.  We partition the locktag space into FAST_PATH_STRONG_LOCK_HASH_PARTITIONS
221  * partitions, and maintain an integer count of the number of "strong" lockers
222  * in each partition.  When any "strong" lockers are present (which is
223  * hopefully not very often), the fast-path mechanism can't be used, and we
224  * must fall back to the slower method of pushing matching locks directly
225  * into the main lock tables.
226  *
227  * The deadlock detector does not know anything about the fast path mechanism,
228  * so any locks that might be involved in a deadlock must be transferred from
229  * the fast-path queues to the main lock table.
230  */
231 
232 #define FAST_PATH_STRONG_LOCK_HASH_BITS			10
233 #define FAST_PATH_STRONG_LOCK_HASH_PARTITIONS \
234 	(1 << FAST_PATH_STRONG_LOCK_HASH_BITS)
235 #define FastPathStrongLockHashPartition(hashcode) \
236 	((hashcode) % FAST_PATH_STRONG_LOCK_HASH_PARTITIONS)
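
/*
 * E.g. with FAST_PATH_STRONG_LOCK_HASH_BITS = 10 there are 1024 counters,
 * and a lock tag whose hashcode happens to be 0x2A67F1 is tracked in
 * partition 0x2A67F1 % 1024 = 0x3F1.
 */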
237 
238 typedef struct
239 {
240 	slock_t		mutex;
241 	uint32		count[FAST_PATH_STRONG_LOCK_HASH_PARTITIONS];
242 } FastPathStrongRelationLockData;
243 
244 static volatile FastPathStrongRelationLockData *FastPathStrongRelationLocks;
245 
246 
247 /*
248  * Pointers to hash tables containing lock state
249  *
250  * The LockMethodLockHash and LockMethodProcLockHash hash tables are in
251  * shared memory; LockMethodLocalHash is local to each backend.
252  */
253 static HTAB *LockMethodLockHash;
254 static HTAB *LockMethodProcLockHash;
255 static HTAB *LockMethodLocalHash;
256 
257 
258 /* private state for error cleanup */
259 static LOCALLOCK *StrongLockInProgress;
260 static LOCALLOCK *awaitedLock;
261 static ResourceOwner awaitedOwner;
262 
263 
264 #ifdef LOCK_DEBUG
265 
266 /*------
267  * The following configuration options are available for lock debugging:
268  *
269  *	   TRACE_LOCKS		-- give a bunch of output about what's going on in this file
270  *	   TRACE_USERLOCKS	-- same but for user locks
271  *	   TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
272  *						   (use to avoid output on system tables)
273  *	   TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
274  *	   DEBUG_DEADLOCKS	-- currently dumps locks at untimely occasions ;)
275  *
276  * Furthermore, in storage/lmgr/lwlock.c:
277  *	   TRACE_LWLOCKS	-- trace lightweight locks (pretty useless)
278  *
279  * Define LOCK_DEBUG at compile time to get all these enabled.
280  * --------
281  */
282 
283 int			Trace_lock_oidmin = FirstNormalObjectId;
284 bool		Trace_locks = false;
285 bool		Trace_userlocks = false;
286 int			Trace_lock_table = 0;
287 bool		Debug_deadlocks = false;
288 
289 
290 inline static bool
291 LOCK_DEBUG_ENABLED(const LOCKTAG *tag)
292 {
293 	return
294 		(*(LockMethods[tag->locktag_lockmethodid]->trace_flag) &&
295 		 ((Oid) tag->locktag_field2 >= (Oid) Trace_lock_oidmin))
296 		|| (Trace_lock_table &&
297 			(tag->locktag_field2 == Trace_lock_table));
298 }
299 
300 
301 inline static void
302 LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
303 {
304 	if (LOCK_DEBUG_ENABLED(&lock->tag))
305 		elog(LOG,
306 			 "%s: lock(%p) id(%u,%u,%u,%u,%u,%u) grantMask(%x) "
307 			 "req(%d,%d,%d,%d,%d,%d,%d)=%d "
308 			 "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
309 			 where, lock,
310 			 lock->tag.locktag_field1, lock->tag.locktag_field2,
311 			 lock->tag.locktag_field3, lock->tag.locktag_field4,
312 			 lock->tag.locktag_type, lock->tag.locktag_lockmethodid,
313 			 lock->grantMask,
314 			 lock->requested[1], lock->requested[2], lock->requested[3],
315 			 lock->requested[4], lock->requested[5], lock->requested[6],
316 			 lock->requested[7], lock->nRequested,
317 			 lock->granted[1], lock->granted[2], lock->granted[3],
318 			 lock->granted[4], lock->granted[5], lock->granted[6],
319 			 lock->granted[7], lock->nGranted,
320 			 lock->waitProcs.size,
321 			 LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]);
322 }
323 
324 
325 inline static void
326 PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
327 {
328 	if (LOCK_DEBUG_ENABLED(&proclockP->tag.myLock->tag))
329 		elog(LOG,
330 			 "%s: proclock(%p) lock(%p) method(%u) proc(%p) hold(%x)",
331 			 where, proclockP, proclockP->tag.myLock,
332 			 PROCLOCK_LOCKMETHOD(*(proclockP)),
333 			 proclockP->tag.myProc, (int) proclockP->holdMask);
334 }
335 #else							/* not LOCK_DEBUG */
336 
337 #define LOCK_PRINT(where, lock, type)  ((void) 0)
338 #define PROCLOCK_PRINT(where, proclockP)  ((void) 0)
339 #endif   /* not LOCK_DEBUG */
340 
341 
342 static uint32 proclock_hash(const void *key, Size keysize);
343 static void RemoveLocalLock(LOCALLOCK *locallock);
344 static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
345 				 const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode);
346 static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
347 static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
348 static void FinishStrongLockAcquire(void);
349 static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
350 static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock);
351 static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent);
352 static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
353 			PROCLOCK *proclock, LockMethod lockMethodTable);
354 static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
355 			LockMethod lockMethodTable, uint32 hashcode,
356 			bool wakeupNeeded);
357 static void LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
358 					 LOCKTAG *locktag, LOCKMODE lockmode,
359 					 bool decrement_strong_lock_count);
360 static void GetSingleProcBlockerStatusData(PGPROC *blocked_proc,
361 							   BlockedProcsData *data);
362 
363 
364 /*
365  * InitLocks -- Initialize the lock manager's data structures.
366  *
367  * This is called from CreateSharedMemoryAndSemaphores(), which see for
368  * more comments.  In the normal postmaster case, the shared hash tables
369  * are created here, as well as a locallock hash table that will remain
370  * unused and empty in the postmaster itself.  Backends inherit the pointers
371  * to the shared tables via fork(), and also inherit an image of the locallock
372  * hash table, which they proceed to use.  In the EXEC_BACKEND case, each
373  * backend re-executes this code to obtain pointers to the already existing
374  * shared hash tables and to create its locallock hash table.
375  */
376 void
377 InitLocks(void)
378 {
379 	HASHCTL		info;
380 	long		init_table_size,
381 				max_table_size;
382 	bool		found;
383 
384 	/*
385 	 * Compute init/max size to request for lock hashtables.  Note these
386 	 * calculations must agree with LockShmemSize!
387 	 */
388 	max_table_size = NLOCKENTS();
389 	init_table_size = max_table_size / 2;
390 
391 	/*
392 	 * Allocate hash table for LOCK structs.  This stores per-locked-object
393 	 * information.
394 	 */
395 	MemSet(&info, 0, sizeof(info));
396 	info.keysize = sizeof(LOCKTAG);
397 	info.entrysize = sizeof(LOCK);
398 	info.num_partitions = NUM_LOCK_PARTITIONS;
399 
400 	LockMethodLockHash = ShmemInitHash("LOCK hash",
401 									   init_table_size,
402 									   max_table_size,
403 									   &info,
404 									HASH_ELEM | HASH_BLOBS | HASH_PARTITION);
405 
406 	/* Assume an average of 2 holders per lock */
407 	max_table_size *= 2;
408 	init_table_size *= 2;
409 
410 	/*
411 	 * Allocate hash table for PROCLOCK structs.  This stores
412 	 * per-lock-per-holder information.
413 	 */
414 	info.keysize = sizeof(PROCLOCKTAG);
415 	info.entrysize = sizeof(PROCLOCK);
416 	info.hash = proclock_hash;
417 	info.num_partitions = NUM_LOCK_PARTITIONS;
418 
419 	LockMethodProcLockHash = ShmemInitHash("PROCLOCK hash",
420 										   init_table_size,
421 										   max_table_size,
422 										   &info,
423 								 HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
424 
425 	/*
426 	 * Allocate fast-path structures.
427 	 */
428 	FastPathStrongRelationLocks =
429 		ShmemInitStruct("Fast Path Strong Relation Lock Data",
430 						sizeof(FastPathStrongRelationLockData), &found);
431 	if (!found)
432 		SpinLockInit(&FastPathStrongRelationLocks->mutex);
433 
434 	/*
435 	 * Allocate non-shared hash table for LOCALLOCK structs.  This stores lock
436 	 * counts and resource owner information.
437 	 *
438 	 * The non-shared table could already exist in this process (this occurs
439 	 * when the postmaster is recreating shared memory after a backend crash).
440 	 * If so, delete and recreate it.  (We could simply leave it, since it
441 	 * ought to be empty in the postmaster, but for safety let's zap it.)
442 	 */
443 	if (LockMethodLocalHash)
444 		hash_destroy(LockMethodLocalHash);
445 
446 	info.keysize = sizeof(LOCALLOCKTAG);
447 	info.entrysize = sizeof(LOCALLOCK);
448 
449 	LockMethodLocalHash = hash_create("LOCALLOCK hash",
450 									  16,
451 									  &info,
452 									  HASH_ELEM | HASH_BLOBS);
453 }
454 
455 
456 /*
457  * Fetch the lock method table associated with a given lock
458  */
459 LockMethod
460 GetLocksMethodTable(const LOCK *lock)
461 {
462 	LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);
463 
464 	Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
465 	return LockMethods[lockmethodid];
466 }
467 
468 /*
469  * Fetch the lock method table associated with a given locktag
470  */
471 LockMethod
472 GetLockTagsMethodTable(const LOCKTAG *locktag)
473 {
474 	LOCKMETHODID lockmethodid = (LOCKMETHODID) locktag->locktag_lockmethodid;
475 
476 	Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
477 	return LockMethods[lockmethodid];
478 }
479 
480 
481 /*
482  * Compute the hash code associated with a LOCKTAG.
483  *
484  * To avoid unnecessary recomputations of the hash code, we try to do this
485  * just once per function, and then pass it around as needed.  Aside from
486  * passing the hashcode to hash_search_with_hash_value(), we can extract
487  * the lock partition number from the hashcode.
488  */
489 uint32
490 LockTagHashCode(const LOCKTAG *locktag)
491 {
492 	return get_hash_value(LockMethodLockHash, (const void *) locktag);
493 }
494 
495 /*
496  * Compute the hash code associated with a PROCLOCKTAG.
497  *
498  * Because we want to use just one set of partition locks for both the
499  * LOCK and PROCLOCK hash tables, we have to make sure that PROCLOCKs
500  * fall into the same partition number as their associated LOCKs.
501  * dynahash.c expects the partition number to be the low-order bits of
502  * the hash code, and therefore a PROCLOCKTAG's hash code must have the
503  * same low-order bits as the associated LOCKTAG's hash code.  We achieve
504  * this with this specialized hash function.
505  */
506 static uint32
507 proclock_hash(const void *key, Size keysize)
508 {
509 	const PROCLOCKTAG *proclocktag = (const PROCLOCKTAG *) key;
510 	uint32		lockhash;
511 	Datum		procptr;
512 
513 	Assert(keysize == sizeof(PROCLOCKTAG));
514 
515 	/* Look into the associated LOCK object, and compute its hash code */
516 	lockhash = LockTagHashCode(&proclocktag->myLock->tag);
517 
518 	/*
519 	 * To make the hash code also depend on the PGPROC, we xor the proc
520 	 * struct's address into the hash code, left-shifted so that the
521 	 * partition-number bits don't change.  Since this is only a hash, we
522 	 * don't care if we lose high-order bits of the address; use an
523 	 * intermediate variable to suppress cast-pointer-to-int warnings.
524 	 */
525 	procptr = PointerGetDatum(proclocktag->myProc);
526 	lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;
527 
528 	return lockhash;
529 }
530 
531 /*
532  * Compute the hash code associated with a PROCLOCKTAG, given the hashcode
533  * for its underlying LOCK.
534  *
535  * We use this just to avoid redundant calls of LockTagHashCode().
536  */
537 static inline uint32
538 ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode)
539 {
540 	uint32		lockhash = hashcode;
541 	Datum		procptr;
542 
543 	/*
544 	 * This must match proclock_hash()!
545 	 */
546 	procptr = PointerGetDatum(proclocktag->myProc);
547 	lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;
548 
549 	return lockhash;
550 }
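
/*
 * In other words, for any proclocktag "tag" the two functions must agree:
 *
 *		ProcLockHashCode(&tag, LockTagHashCode(&tag.myLock->tag))
 *			== proclock_hash(&tag, sizeof(PROCLOCKTAG))
 *
 * and, since the xor'd value is shifted left by LOG2_NUM_LOCK_PARTITIONS,
 * both results keep the same low-order partition bits as the underlying
 * LOCKTAG's hash code.
 */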
551 
552 /*
553  * Given two lock modes, return whether they would conflict.
554  */
555 bool
556 DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2)
557 {
558 	LockMethod	lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];
559 
560 	if (lockMethodTable->conflictTab[mode1] & LOCKBIT_ON(mode2))
561 		return true;
562 
563 	return false;
564 }
565 
566 /*
567  * LockHasWaiters -- look up 'locktag' and check if releasing this
568  *		lock would wake up other processes waiting for it.
569  */
570 bool
571 LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
572 {
573 	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
574 	LockMethod	lockMethodTable;
575 	LOCALLOCKTAG localtag;
576 	LOCALLOCK  *locallock;
577 	LOCK	   *lock;
578 	PROCLOCK   *proclock;
579 	LWLock	   *partitionLock;
580 	bool		hasWaiters = false;
581 
582 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
583 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
584 	lockMethodTable = LockMethods[lockmethodid];
585 	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
586 		elog(ERROR, "unrecognized lock mode: %d", lockmode);
587 
588 #ifdef LOCK_DEBUG
589 	if (LOCK_DEBUG_ENABLED(locktag))
590 		elog(LOG, "LockHasWaiters: lock [%u,%u] %s",
591 			 locktag->locktag_field1, locktag->locktag_field2,
592 			 lockMethodTable->lockModeNames[lockmode]);
593 #endif
594 
595 	/*
596 	 * Find the LOCALLOCK entry for this lock and lockmode
597 	 */
598 	MemSet(&localtag, 0, sizeof(localtag));		/* must clear padding */
599 	localtag.lock = *locktag;
600 	localtag.mode = lockmode;
601 
602 	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
603 										  (void *) &localtag,
604 										  HASH_FIND, NULL);
605 
606 	/*
607 	 * let the caller print its own error message, too. Do not ereport(ERROR).
608 	 */
609 	if (!locallock || locallock->nLocks <= 0)
610 	{
611 		elog(WARNING, "you don't own a lock of type %s",
612 			 lockMethodTable->lockModeNames[lockmode]);
613 		return false;
614 	}
615 
616 	/*
617 	 * Check the shared lock table.
618 	 */
619 	partitionLock = LockHashPartitionLock(locallock->hashcode);
620 
621 	LWLockAcquire(partitionLock, LW_SHARED);
622 
623 	/*
624 	 * We don't need to re-find the lock or proclock, since we kept their
625 	 * addresses in the locallock table, and they couldn't have been removed
626 	 * while we were holding a lock on them.
627 	 */
628 	lock = locallock->lock;
629 	LOCK_PRINT("LockHasWaiters: found", lock, lockmode);
630 	proclock = locallock->proclock;
631 	PROCLOCK_PRINT("LockHasWaiters: found", proclock);
632 
633 	/*
634 	 * Double-check that we are actually holding a lock of the type we want to
635 	 * release.
636 	 */
637 	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
638 	{
639 		PROCLOCK_PRINT("LockHasWaiters: WRONGTYPE", proclock);
640 		LWLockRelease(partitionLock);
641 		elog(WARNING, "you don't own a lock of type %s",
642 			 lockMethodTable->lockModeNames[lockmode]);
643 		RemoveLocalLock(locallock);
644 		return false;
645 	}
646 
647 	/*
648 	 * Do the checking.
649 	 */
650 	if ((lockMethodTable->conflictTab[lockmode] & lock->waitMask) != 0)
651 		hasWaiters = true;
652 
653 	LWLockRelease(partitionLock);
654 
655 	return hasWaiters;
656 }
657 
658 /*
659  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
660  *		set lock if/when no conflicts.
661  *
662  * Inputs:
663  *	locktag: unique identifier for the lockable object
664  *	lockmode: lock mode to acquire
665  *	sessionLock: if true, acquire lock for session not current transaction
666  *	dontWait: if true, don't wait to acquire lock
667  *
668  * Returns one of:
669  *		LOCKACQUIRE_NOT_AVAIL		lock not available, and dontWait=true
670  *		LOCKACQUIRE_OK				lock successfully acquired
671  *		LOCKACQUIRE_ALREADY_HELD	incremented count for lock already held
672  *		LOCKACQUIRE_ALREADY_CLEAR	incremented count for lock already clear
673  *
674  * In the normal case where dontWait=false and the caller doesn't need to
675  * distinguish a freshly acquired lock from one already taken earlier in
676  * this same transaction, there is no need to examine the return value.
677  *
678  * Side Effects: The lock is acquired and recorded in lock tables.
679  *
680  * NOTE: if we wait for the lock, there is no way to abort the wait
681  * short of aborting the transaction.
682  */
683 LockAcquireResult
684 LockAcquire(const LOCKTAG *locktag,
685 			LOCKMODE lockmode,
686 			bool sessionLock,
687 			bool dontWait)
688 {
689 	return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
690 							   true, NULL);
691 }
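
/*
 * A minimal usage sketch (callers normally go through lmgr.c, e.g.
 * LockRelationOid, rather than building a LOCKTAG by hand; "relid" here is
 * a hypothetical relation OID):
 *
 *		LOCKTAG		tag;
 *
 *		SET_LOCKTAG_RELATION(tag, MyDatabaseId, relid);
 *		(void) LockAcquire(&tag, AccessShareLock, false, false);
 *		...
 *		LockRelease(&tag, AccessShareLock, false);
 */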
692 
693 /*
694  * LockAcquireExtended - allows us to specify additional options
695  *
696  * reportMemoryError specifies whether a lock request that fills the lock
697  * table should generate an ERROR or not.  Passing "false" allows the caller
698  * to attempt to recover from lock-table-full situations, perhaps by forcibly
699  * cancelling other lock holders and then retrying.  Note, however, that the
700  * return code for that is LOCKACQUIRE_NOT_AVAIL, so that it's unsafe to use
701  * in combination with dontWait = true, as the cause of failure couldn't be
702  * distinguished.
703  *
704  * If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK
705  * table entry if a lock is successfully acquired, or NULL if not.
706  */
707 LockAcquireResult
708 LockAcquireExtended(const LOCKTAG *locktag,
709 					LOCKMODE lockmode,
710 					bool sessionLock,
711 					bool dontWait,
712 					bool reportMemoryError,
713 					LOCALLOCK **locallockp)
714 {
715 	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
716 	LockMethod	lockMethodTable;
717 	LOCALLOCKTAG localtag;
718 	LOCALLOCK  *locallock;
719 	LOCK	   *lock;
720 	PROCLOCK   *proclock;
721 	bool		found;
722 	ResourceOwner owner;
723 	uint32		hashcode;
724 	LWLock	   *partitionLock;
725 	int			status;
726 	bool		log_lock = false;
727 
728 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
729 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
730 	lockMethodTable = LockMethods[lockmethodid];
731 	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
732 		elog(ERROR, "unrecognized lock mode: %d", lockmode);
733 
734 	if (RecoveryInProgress() && !InRecovery &&
735 		(locktag->locktag_type == LOCKTAG_OBJECT ||
736 		 locktag->locktag_type == LOCKTAG_RELATION) &&
737 		lockmode > RowExclusiveLock)
738 		ereport(ERROR,
739 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
740 				 errmsg("cannot acquire lock mode %s on database objects while recovery is in progress",
741 						lockMethodTable->lockModeNames[lockmode]),
742 				 errhint("Only RowExclusiveLock or less can be acquired on database objects during recovery.")));
743 
744 #ifdef LOCK_DEBUG
745 	if (LOCK_DEBUG_ENABLED(locktag))
746 		elog(LOG, "LockAcquire: lock [%u,%u] %s",
747 			 locktag->locktag_field1, locktag->locktag_field2,
748 			 lockMethodTable->lockModeNames[lockmode]);
749 #endif
750 
751 	/* Identify owner for lock */
752 	if (sessionLock)
753 		owner = NULL;
754 	else
755 		owner = CurrentResourceOwner;
756 
757 	/*
758 	 * Find or create a LOCALLOCK entry for this lock and lockmode
759 	 */
760 	MemSet(&localtag, 0, sizeof(localtag));		/* must clear padding */
761 	localtag.lock = *locktag;
762 	localtag.mode = lockmode;
763 
764 	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
765 										  (void *) &localtag,
766 										  HASH_ENTER, &found);
767 
768 	/*
769 	 * if it's a new locallock object, initialize it
770 	 */
771 	if (!found)
772 	{
773 		locallock->lock = NULL;
774 		locallock->proclock = NULL;
775 		locallock->hashcode = LockTagHashCode(&(localtag.lock));
776 		locallock->nLocks = 0;
777 		locallock->numLockOwners = 0;
778 		locallock->maxLockOwners = 8;
779 		locallock->holdsStrongLockCount = FALSE;
780 		locallock->lockCleared = false;
781 		locallock->lockOwners = NULL;	/* in case next line fails */
782 		locallock->lockOwners = (LOCALLOCKOWNER *)
783 			MemoryContextAlloc(TopMemoryContext,
784 						  locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
785 	}
786 	else
787 	{
788 		/* Make sure there will be room to remember the lock */
789 		if (locallock->numLockOwners >= locallock->maxLockOwners)
790 		{
791 			int			newsize = locallock->maxLockOwners * 2;
792 
793 			locallock->lockOwners = (LOCALLOCKOWNER *)
794 				repalloc(locallock->lockOwners,
795 						 newsize * sizeof(LOCALLOCKOWNER));
796 			locallock->maxLockOwners = newsize;
797 		}
798 	}
799 	hashcode = locallock->hashcode;
800 
801 	if (locallockp)
802 		*locallockp = locallock;
803 
804 	/*
805 	 * If we already hold the lock, we can just increase the count locally.
806 	 *
807 	 * If lockCleared is already set, caller need not worry about absorbing
808 	 * sinval messages related to the lock's object.
809 	 */
810 	if (locallock->nLocks > 0)
811 	{
812 		GrantLockLocal(locallock, owner);
813 		if (locallock->lockCleared)
814 			return LOCKACQUIRE_ALREADY_CLEAR;
815 		else
816 			return LOCKACQUIRE_ALREADY_HELD;
817 	}
818 
819 	/*
820 	 * Prepare to emit a WAL record if acquisition of this lock needs to be
821 	 * replayed in a standby server.
822 	 *
823 	 * Here we prepare to log; after lock is acquired we'll issue log record.
824 	 * This arrangement simplifies error recovery in case the preparation step
825 	 * fails.
826 	 *
827 	 * Only AccessExclusiveLocks can conflict with lock types that read-only
828 	 * transactions can acquire in a standby server. Make sure this definition
829 	 * matches the one in GetRunningTransactionLocks().
830 	 */
831 	if (lockmode >= AccessExclusiveLock &&
832 		locktag->locktag_type == LOCKTAG_RELATION &&
833 		!RecoveryInProgress() &&
834 		XLogStandbyInfoActive())
835 	{
836 		LogAccessExclusiveLockPrepare();
837 		log_lock = true;
838 	}
839 
840 	/*
841 	 * Attempt to take lock via fast path, if eligible.  But if we remember
842 	 * having filled up the fast path array, we don't attempt to make any
843 	 * further use of it until we release some locks.  It's possible that some
844 	 * other backend has transferred some of those locks to the shared hash
845 	 * table, leaving space free, but it's not worth acquiring the LWLock just
846 	 * to check.  It's also possible that we're acquiring a second or third
847 	 * lock type on a relation we have already locked using the fast-path, but
848 	 * for now we don't worry about that case either.
849 	 */
850 	if (EligibleForRelationFastPath(locktag, lockmode) &&
851 		FastPathLocalUseCount < FP_LOCK_SLOTS_PER_BACKEND)
852 	{
853 		uint32		fasthashcode = FastPathStrongLockHashPartition(hashcode);
854 		bool		acquired;
855 
856 		/*
857 		 * LWLockAcquire acts as a memory sequencing point, so it's safe to
858 		 * assume that any strong locker whose increment to
859 		 * FastPathStrongRelationLocks->counts becomes visible after we test
860 		 * it has yet to begin to transfer fast-path locks.
861 		 */
862 		LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
863 		if (FastPathStrongRelationLocks->count[fasthashcode] != 0)
864 			acquired = false;
865 		else
866 			acquired = FastPathGrantRelationLock(locktag->locktag_field2,
867 												 lockmode);
868 		LWLockRelease(&MyProc->backendLock);
869 		if (acquired)
870 		{
871 			/*
872 			 * The locallock might contain stale pointers to some old shared
873 			 * objects; we MUST reset these to null before considering the
874 			 * lock to be acquired via fast-path.
875 			 */
876 			locallock->lock = NULL;
877 			locallock->proclock = NULL;
878 			GrantLockLocal(locallock, owner);
879 			return LOCKACQUIRE_OK;
880 		}
881 	}
882 
883 	/*
884 	 * If this lock could potentially have been taken via the fast-path by
885 	 * some other backend, we must (temporarily) disable further use of the
886 	 * fast-path for this lock tag, and migrate any locks already taken via
887 	 * this method to the main lock table.
888 	 */
889 	if (ConflictsWithRelationFastPath(locktag, lockmode))
890 	{
891 		uint32		fasthashcode = FastPathStrongLockHashPartition(hashcode);
892 
893 		BeginStrongLockAcquire(locallock, fasthashcode);
894 		if (!FastPathTransferRelationLocks(lockMethodTable, locktag,
895 										   hashcode))
896 		{
897 			AbortStrongLockAcquire();
898 			if (locallock->nLocks == 0)
899 				RemoveLocalLock(locallock);
900 			if (locallockp)
901 				*locallockp = NULL;
902 			if (reportMemoryError)
903 				ereport(ERROR,
904 						(errcode(ERRCODE_OUT_OF_MEMORY),
905 						 errmsg("out of shared memory"),
906 						 errhint("You might need to increase max_locks_per_transaction.")));
907 			else
908 				return LOCKACQUIRE_NOT_AVAIL;
909 		}
910 	}
911 
912 	/*
913 	 * We didn't find the lock in our LOCALLOCK table, and we didn't manage to
914 	 * take it via the fast-path, either, so we've got to mess with the shared
915 	 * lock table.
916 	 */
917 	partitionLock = LockHashPartitionLock(hashcode);
918 
919 	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
920 
921 	/*
922 	 * Find or create lock and proclock entries with this tag
923 	 *
924 	 * Note: if the locallock object already existed, it might have a pointer
925 	 * to the lock already ... but we should not assume that that pointer is
926 	 * valid, since a lock object with zero hold and request counts can go
927 	 * away anytime.  So we have to use SetupLockInTable() to recompute the
928 	 * lock and proclock pointers, even if they're already set.
929 	 */
930 	proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
931 								hashcode, lockmode);
932 	if (!proclock)
933 	{
934 		AbortStrongLockAcquire();
935 		LWLockRelease(partitionLock);
936 		if (locallock->nLocks == 0)
937 			RemoveLocalLock(locallock);
938 		if (locallockp)
939 			*locallockp = NULL;
940 		if (reportMemoryError)
941 			ereport(ERROR,
942 					(errcode(ERRCODE_OUT_OF_MEMORY),
943 					 errmsg("out of shared memory"),
944 					 errhint("You might need to increase max_locks_per_transaction.")));
945 		else
946 			return LOCKACQUIRE_NOT_AVAIL;
947 	}
948 	locallock->proclock = proclock;
949 	lock = proclock->tag.myLock;
950 	locallock->lock = lock;
951 
952 	/*
953 	 * If lock requested conflicts with locks requested by waiters, must join
954 	 * wait queue.  Otherwise, check for conflict with already-held locks.
955 	 * (That check comes last because it is the most complex.)
956 	 */
957 	if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
958 		status = STATUS_FOUND;
959 	else
960 		status = LockCheckConflicts(lockMethodTable, lockmode,
961 									lock, proclock);
962 
963 	if (status == STATUS_OK)
964 	{
965 		/* No conflict with held or previously requested locks */
966 		GrantLock(lock, proclock, lockmode);
967 		GrantLockLocal(locallock, owner);
968 	}
969 	else
970 	{
971 		Assert(status == STATUS_FOUND);
972 
973 		/*
974 		 * We can't acquire the lock immediately.  If caller specified no
975 		 * blocking, remove useless table entries and return NOT_AVAIL without
976 		 * waiting.
977 		 */
978 		if (dontWait)
979 		{
980 			AbortStrongLockAcquire();
981 			if (proclock->holdMask == 0)
982 			{
983 				uint32		proclock_hashcode;
984 
985 				proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
986 				SHMQueueDelete(&proclock->lockLink);
987 				SHMQueueDelete(&proclock->procLink);
988 				if (!hash_search_with_hash_value(LockMethodProcLockHash,
989 												 (void *) &(proclock->tag),
990 												 proclock_hashcode,
991 												 HASH_REMOVE,
992 												 NULL))
993 					elog(PANIC, "proclock table corrupted");
994 			}
995 			else
996 				PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
997 			lock->nRequested--;
998 			lock->requested[lockmode]--;
999 			LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
1000 			Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
1001 			Assert(lock->nGranted <= lock->nRequested);
1002 			LWLockRelease(partitionLock);
1003 			if (locallock->nLocks == 0)
1004 				RemoveLocalLock(locallock);
1005 			if (locallockp)
1006 				*locallockp = NULL;
1007 			return LOCKACQUIRE_NOT_AVAIL;
1008 		}
1009 
1010 		/*
1011 		 * Set bitmask of locks this process already holds on this object.
1012 		 */
1013 		MyProc->heldLocks = proclock->holdMask;
1014 
1015 		/*
1016 		 * Sleep till someone wakes me up.
1017 		 */
1018 
1019 		TRACE_POSTGRESQL_LOCK_WAIT_START(locktag->locktag_field1,
1020 										 locktag->locktag_field2,
1021 										 locktag->locktag_field3,
1022 										 locktag->locktag_field4,
1023 										 locktag->locktag_type,
1024 										 lockmode);
1025 
1026 		WaitOnLock(locallock, owner);
1027 
1028 		TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1,
1029 										locktag->locktag_field2,
1030 										locktag->locktag_field3,
1031 										locktag->locktag_field4,
1032 										locktag->locktag_type,
1033 										lockmode);
1034 
1035 		/*
1036 		 * NOTE: do not do any material change of state between here and
1037 		 * return.  All required changes in locktable state must have been
1038 		 * done when the lock was granted to us --- see notes in WaitOnLock.
1039 		 */
1040 
1041 		/*
1042 		 * Check the proclock entry status, in case something in the ipc
1043 		 * communication doesn't work correctly.
1044 		 */
1045 		if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
1046 		{
1047 			AbortStrongLockAcquire();
1048 			PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
1049 			LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
1050 			/* Should we retry ? */
1051 			LWLockRelease(partitionLock);
1052 			elog(ERROR, "LockAcquire failed");
1053 		}
1054 		PROCLOCK_PRINT("LockAcquire: granted", proclock);
1055 		LOCK_PRINT("LockAcquire: granted", lock, lockmode);
1056 	}
1057 
1058 	/*
1059 	 * Lock state is fully up-to-date now; if we error out after this, no
1060 	 * special error cleanup is required.
1061 	 */
1062 	FinishStrongLockAcquire();
1063 
1064 	LWLockRelease(partitionLock);
1065 
1066 	/*
1067 	 * Emit a WAL record if acquisition of this lock needs to be replayed in a
1068 	 * standby server.
1069 	 */
1070 	if (log_lock)
1071 	{
1072 		/*
1073 		 * Decode the locktag back to the original values, to avoid sending
1074 		 * lots of empty bytes with every message.  See lock.h to check how a
1075 		 * locktag is defined for LOCKTAG_RELATION
1076 		 */
1077 		LogAccessExclusiveLock(locktag->locktag_field1,
1078 							   locktag->locktag_field2);
1079 	}
1080 
1081 	return LOCKACQUIRE_OK;
1082 }
1083 
1084 /*
1085  * Find or create LOCK and PROCLOCK objects as needed for a new lock
1086  * request.
1087  *
1088  * Returns the PROCLOCK object, or NULL if we failed to create the objects
1089  * for lack of shared memory.
1090  *
1091  * The appropriate partition lock must be held at entry, and will be
1092  * held at exit.
1093  */
1094 static PROCLOCK *
1095 SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
1096 				 const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode)
1097 {
1098 	LOCK	   *lock;
1099 	PROCLOCK   *proclock;
1100 	PROCLOCKTAG proclocktag;
1101 	uint32		proclock_hashcode;
1102 	bool		found;
1103 
1104 	/*
1105 	 * Find or create a lock with this tag.
1106 	 */
1107 	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
1108 												(const void *) locktag,
1109 												hashcode,
1110 												HASH_ENTER_NULL,
1111 												&found);
1112 	if (!lock)
1113 		return NULL;
1114 
1115 	/*
1116 	 * if it's a new lock object, initialize it
1117 	 */
1118 	if (!found)
1119 	{
1120 		lock->grantMask = 0;
1121 		lock->waitMask = 0;
1122 		SHMQueueInit(&(lock->procLocks));
1123 		ProcQueueInit(&(lock->waitProcs));
1124 		lock->nRequested = 0;
1125 		lock->nGranted = 0;
1126 		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
1127 		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
1128 		LOCK_PRINT("LockAcquire: new", lock, lockmode);
1129 	}
1130 	else
1131 	{
1132 		LOCK_PRINT("LockAcquire: found", lock, lockmode);
1133 		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
1134 		Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
1135 		Assert(lock->nGranted <= lock->nRequested);
1136 	}
1137 
1138 	/*
1139 	 * Create the hash key for the proclock table.
1140 	 */
1141 	proclocktag.myLock = lock;
1142 	proclocktag.myProc = proc;
1143 
1144 	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
1145 
1146 	/*
1147 	 * Find or create a proclock entry with this tag
1148 	 */
1149 	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
1150 														(void *) &proclocktag,
1151 														proclock_hashcode,
1152 														HASH_ENTER_NULL,
1153 														&found);
1154 	if (!proclock)
1155 	{
1156 		/* Ooops, not enough shmem for the proclock */
1157 		if (lock->nRequested == 0)
1158 		{
1159 			/*
1160 			 * There are no other requestors of this lock, so garbage-collect
1161 			 * the lock object.  We *must* do this to avoid a permanent leak
1162 			 * of shared memory, because there won't be anything to cause
1163 			 * anyone to release the lock object later.
1164 			 */
1165 			Assert(SHMQueueEmpty(&(lock->procLocks)));
1166 			if (!hash_search_with_hash_value(LockMethodLockHash,
1167 											 (void *) &(lock->tag),
1168 											 hashcode,
1169 											 HASH_REMOVE,
1170 											 NULL))
1171 				elog(PANIC, "lock table corrupted");
1172 		}
1173 		return NULL;
1174 	}
1175 
1176 	/*
1177 	 * If new, initialize the new entry
1178 	 */
1179 	if (!found)
1180 	{
1181 		uint32		partition = LockHashPartition(hashcode);
1182 
1183 		/*
1184 		 * It might seem unsafe to access proclock->groupLeader without a
1185 		 * lock, but it's not really.  Either we are initializing a proclock
1186 		 * on our own behalf, in which case our group leader isn't changing
1187 		 * because the group leader for a process can only ever be changed by
1188 		 * the process itself; or else we are transferring a fast-path lock to
1189 		 * the main lock table, in which case that process can't change its
1190 		 * lock group leader without first releasing all of its locks (and in
1191 		 * particular the one we are currently transferring).
1192 		 */
1193 		proclock->groupLeader = proc->lockGroupLeader != NULL ?
1194 			proc->lockGroupLeader : proc;
1195 		proclock->holdMask = 0;
1196 		proclock->releaseMask = 0;
1197 		/* Add proclock to appropriate lists */
1198 		SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
1199 		SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
1200 							 &proclock->procLink);
1201 		PROCLOCK_PRINT("LockAcquire: new", proclock);
1202 	}
1203 	else
1204 	{
1205 		PROCLOCK_PRINT("LockAcquire: found", proclock);
1206 		Assert((proclock->holdMask & ~lock->grantMask) == 0);
1207 
1208 #ifdef CHECK_DEADLOCK_RISK
1209 
1210 		/*
1211 		 * Issue warning if we already hold a lower-level lock on this object
1212 		 * and do not hold a lock of the requested level or higher. This
1213 		 * indicates a deadlock-prone coding practice (eg, we'd have a
1214 		 * deadlock if another backend were following the same code path at
1215 		 * about the same time).
1216 		 *
1217 		 * This is not enabled by default, because it may generate log entries
1218 		 * about user-level coding practices that are in fact safe in context.
1219 		 * It can be enabled to help find system-level problems.
1220 		 *
1221 		 * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
1222 		 * better to use a table.  For now, though, this works.
1223 		 */
1224 		{
1225 			int			i;
1226 
1227 			for (i = lockMethodTable->numLockModes; i > 0; i--)
1228 			{
1229 				if (proclock->holdMask & LOCKBIT_ON(i))
1230 				{
1231 					if (i >= (int) lockmode)
1232 						break;	/* safe: we have a lock >= req level */
1233 					elog(LOG, "deadlock risk: raising lock level"
1234 						 " from %s to %s on object %u/%u/%u",
1235 						 lockMethodTable->lockModeNames[i],
1236 						 lockMethodTable->lockModeNames[lockmode],
1237 						 lock->tag.locktag_field1, lock->tag.locktag_field2,
1238 						 lock->tag.locktag_field3);
1239 					break;
1240 				}
1241 			}
1242 		}
1243 #endif   /* CHECK_DEADLOCK_RISK */
1244 	}
1245 
1246 	/*
1247 	 * lock->nRequested and lock->requested[] count the total number of
1248 	 * requests, whether granted or waiting, so increment those immediately.
1249 	 * The other counts don't increment till we get the lock.
1250 	 */
1251 	lock->nRequested++;
1252 	lock->requested[lockmode]++;
1253 	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
1254 
1255 	/*
1256 	 * We shouldn't already hold the desired lock; else locallock table is
1257 	 * broken.
1258 	 */
1259 	if (proclock->holdMask & LOCKBIT_ON(lockmode))
1260 		elog(ERROR, "lock %s on object %u/%u/%u is already held",
1261 			 lockMethodTable->lockModeNames[lockmode],
1262 			 lock->tag.locktag_field1, lock->tag.locktag_field2,
1263 			 lock->tag.locktag_field3);
1264 
1265 	return proclock;
1266 }
1267 
1268 /*
1269  * Subroutine to free a locallock entry
1270  */
1271 static void
1272 RemoveLocalLock(LOCALLOCK *locallock)
1273 {
1274 	int			i;
1275 
1276 	for (i = locallock->numLockOwners - 1; i >= 0; i--)
1277 	{
1278 		if (locallock->lockOwners[i].owner != NULL)
1279 			ResourceOwnerForgetLock(locallock->lockOwners[i].owner, locallock);
1280 	}
1281 	locallock->numLockOwners = 0;
1282 	if (locallock->lockOwners != NULL)
1283 		pfree(locallock->lockOwners);
1284 	locallock->lockOwners = NULL;
1285 
1286 	if (locallock->holdsStrongLockCount)
1287 	{
1288 		uint32		fasthashcode;
1289 
1290 		fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);
1291 
1292 		SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
1293 		Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
1294 		FastPathStrongRelationLocks->count[fasthashcode]--;
1295 		locallock->holdsStrongLockCount = FALSE;
1296 		SpinLockRelease(&FastPathStrongRelationLocks->mutex);
1297 	}
1298 
1299 	if (!hash_search(LockMethodLocalHash,
1300 					 (void *) &(locallock->tag),
1301 					 HASH_REMOVE, NULL))
1302 		elog(WARNING, "locallock table corrupted");
1303 }
1304 
1305 /*
1306  * LockCheckConflicts -- test whether requested lock conflicts
1307  *		with those already granted
1308  *
1309  * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
1310  *
1311  * NOTES:
1312  *		Here's what makes this complicated: one process's locks don't
1313  * conflict with one another, no matter what purpose they are held for
1314  * (eg, session and transaction locks do not conflict).  Nor do the locks
1315  * of one process in a lock group conflict with those of another process in
1316  * the same group.  So, we must subtract off these locks when determining
1317  * whether the requested new lock conflicts with those already held.
1318  */
1319 int
1320 LockCheckConflicts(LockMethod lockMethodTable,
1321 				   LOCKMODE lockmode,
1322 				   LOCK *lock,
1323 				   PROCLOCK *proclock)
1324 {
1325 	int			numLockModes = lockMethodTable->numLockModes;
1326 	LOCKMASK	myLocks;
1327 	int			conflictMask = lockMethodTable->conflictTab[lockmode];
1328 	int			conflictsRemaining[MAX_LOCKMODES];
1329 	int			totalConflictsRemaining = 0;
1330 	int			i;
1331 	SHM_QUEUE  *procLocks;
1332 	PROCLOCK   *otherproclock;
1333 
1334 	/*
1335 	 * first check for global conflicts: If no locks conflict with my request,
1336 	 * then I get the lock.
1337 	 *
1338 	 * Checking for conflict: lock->grantMask represents the types of
1339 	 * currently held locks.  conflictTable[lockmode] has a bit set for each
1340 	 * type of lock that conflicts with the request.  A bitwise comparison tells if
1341 	 * there is a conflict.
1342 	 */
1343 	if (!(conflictMask & lock->grantMask))
1344 	{
1345 		PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
1346 		return STATUS_OK;
1347 	}
1348 
1349 	/*
1350 	 * Rats.  Something conflicts.  But it could still be my own lock, or a
1351 	 * lock held by another member of my locking group.  First, figure out how
1352 	 * many conflicts remain after subtracting out any locks I hold myself.
1353 	 */
1354 	myLocks = proclock->holdMask;
1355 	for (i = 1; i <= numLockModes; i++)
1356 	{
1357 		if ((conflictMask & LOCKBIT_ON(i)) == 0)
1358 		{
1359 			conflictsRemaining[i] = 0;
1360 			continue;
1361 		}
1362 		conflictsRemaining[i] = lock->granted[i];
1363 		if (myLocks & LOCKBIT_ON(i))
1364 			--conflictsRemaining[i];
1365 		totalConflictsRemaining += conflictsRemaining[i];
1366 	}
1367 
1368 	/* If no conflicts remain, we get the lock. */
1369 	if (totalConflictsRemaining == 0)
1370 	{
1371 		PROCLOCK_PRINT("LockCheckConflicts: resolved (simple)", proclock);
1372 		return STATUS_OK;
1373 	}
1374 
1375 	/* If no group locking, it's definitely a conflict. */
1376 	if (proclock->groupLeader == MyProc && MyProc->lockGroupLeader == NULL)
1377 	{
1378 		Assert(proclock->tag.myProc == MyProc);
1379 		PROCLOCK_PRINT("LockCheckConflicts: conflicting (simple)",
1380 					   proclock);
1381 		return STATUS_FOUND;
1382 	}
1383 
1384 	/*
1385 	 * Locks held in conflicting modes by members of our own lock group are
1386 	 * not real conflicts; we can subtract those out and see if we still have
1387 	 * a conflict.  This is O(N) in the number of processes holding or
1388 	 * awaiting locks on this object.  We could improve that by making the
1389 	 * shared memory state more complex (and larger) but it doesn't seem worth
1390 	 * it.
1391 	 */
1392 	procLocks = &(lock->procLocks);
1393 	otherproclock = (PROCLOCK *)
1394 		SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink));
1395 	while (otherproclock != NULL)
1396 	{
1397 		if (proclock != otherproclock &&
1398 			proclock->groupLeader == otherproclock->groupLeader &&
1399 			(otherproclock->holdMask & conflictMask) != 0)
1400 		{
1401 			int			intersectMask = otherproclock->holdMask & conflictMask;
1402 
1403 			for (i = 1; i <= numLockModes; i++)
1404 			{
1405 				if ((intersectMask & LOCKBIT_ON(i)) != 0)
1406 				{
1407 					if (conflictsRemaining[i] <= 0)
1408 						elog(PANIC, "proclocks held do not match lock");
1409 					conflictsRemaining[i]--;
1410 					totalConflictsRemaining--;
1411 				}
1412 			}
1413 
1414 			if (totalConflictsRemaining == 0)
1415 			{
1416 				PROCLOCK_PRINT("LockCheckConflicts: resolved (group)",
1417 							   proclock);
1418 				return STATUS_OK;
1419 			}
1420 		}
1421 		otherproclock = (PROCLOCK *)
1422 			SHMQueueNext(procLocks, &otherproclock->lockLink,
1423 						 offsetof(PROCLOCK, lockLink));
1424 	}
1425 
1426 	/* Nope, it's a real conflict. */
1427 	PROCLOCK_PRINT("LockCheckConflicts: conflicting (group)", proclock);
1428 	return STATUS_FOUND;
1429 }
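
/*
 * Worked example (hypothetical counts): suppose we request ShareLock while
 * lock->granted[] records two RowExclusiveLock holders, one of them being
 * our own proclock.  conflictsRemaining[RowExclusiveLock] starts at 2 and
 * drops to 1 after subtracting our own hold; it reaches 0 (STATUS_OK) only
 * if the remaining holder belongs to our lock group, otherwise we return
 * STATUS_FOUND and must wait.
 */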
1430 
1431 /*
1432  * GrantLock -- update the lock and proclock data structures to show
1433  *		the lock request has been granted.
1434  *
1435  * NOTE: if proc was blocked, it also needs to be removed from the wait list
1436  * and have its waitLock/waitProcLock fields cleared.  That's not done here.
1437  *
1438  * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
1439  * table entry; but since we may be awaking some other process, we can't do
1440  * that here; it's done by GrantLockLocal, instead.
1441  */
1442 void
1443 GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
1444 {
1445 	lock->nGranted++;
1446 	lock->granted[lockmode]++;
1447 	lock->grantMask |= LOCKBIT_ON(lockmode);
1448 	if (lock->granted[lockmode] == lock->requested[lockmode])
1449 		lock->waitMask &= LOCKBIT_OFF(lockmode);
1450 	proclock->holdMask |= LOCKBIT_ON(lockmode);
1451 	LOCK_PRINT("GrantLock", lock, lockmode);
1452 	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
1453 	Assert(lock->nGranted <= lock->nRequested);
1454 }
1455 
1456 /*
1457  * UnGrantLock -- opposite of GrantLock.
1458  *
1459  * Updates the lock and proclock data structures to show that the lock
1460  * is no longer held nor requested by the current holder.
1461  *
1462  * Returns true if there were any waiters waiting on the lock that
1463  * should now be woken up with ProcLockWakeup.
1464  */
1465 static bool
1466 UnGrantLock(LOCK *lock, LOCKMODE lockmode,
1467 			PROCLOCK *proclock, LockMethod lockMethodTable)
1468 {
1469 	bool		wakeupNeeded = false;
1470 
1471 	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
1472 	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
1473 	Assert(lock->nGranted <= lock->nRequested);
1474 
1475 	/*
1476 	 * fix the general lock stats
1477 	 */
1478 	lock->nRequested--;
1479 	lock->requested[lockmode]--;
1480 	lock->nGranted--;
1481 	lock->granted[lockmode]--;
1482 
1483 	if (lock->granted[lockmode] == 0)
1484 	{
1485 		/* change the conflict mask.  No more of this lock type. */
1486 		lock->grantMask &= LOCKBIT_OFF(lockmode);
1487 	}
1488 
1489 	LOCK_PRINT("UnGrantLock: updated", lock, lockmode);
1490 
1491 	/*
1492 	 * We need only run ProcLockWakeup if the released lock conflicts with at
1493 	 * least one of the lock types requested by waiter(s).  Otherwise whatever
1494 	 * conflict made them wait must still exist.  NOTE: before MVCC, we could
1495 	 * skip wakeup if lock->granted[lockmode] was still positive. But that's
1496 	 * not true anymore, because the remaining granted locks might belong to
1497 	 * some waiter, who could now be awakened because he doesn't conflict with
1498 	 * his own locks.
1499 	 */
1500 	if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
1501 		wakeupNeeded = true;
1502 
1503 	/*
1504 	 * Now fix the per-proclock state.
1505 	 */
1506 	proclock->holdMask &= LOCKBIT_OFF(lockmode);
1507 	PROCLOCK_PRINT("UnGrantLock: updated", proclock);
1508 
1509 	return wakeupNeeded;
1510 }
1511 
1512 /*
1513  * CleanUpLock -- clean up after releasing a lock.  We garbage-collect the
1514  * proclock and lock objects if possible, and call ProcLockWakeup if there
1515  * are remaining requests and the caller says it's OK.  (Normally, this
1516  * should be called after UnGrantLock, and wakeupNeeded is the result from
1517  * UnGrantLock.)
1518  *
1519  * The appropriate partition lock must be held at entry, and will be
1520  * held at exit.
1521  */
1522 static void
1523 CleanUpLock(LOCK *lock, PROCLOCK *proclock,
1524 			LockMethod lockMethodTable, uint32 hashcode,
1525 			bool wakeupNeeded)
1526 {
1527 	/*
1528 	 * If this was my last hold on this lock, delete my entry in the proclock
1529 	 * table.
1530 	 */
1531 	if (proclock->holdMask == 0)
1532 	{
1533 		uint32		proclock_hashcode;
1534 
1535 		PROCLOCK_PRINT("CleanUpLock: deleting", proclock);
1536 		SHMQueueDelete(&proclock->lockLink);
1537 		SHMQueueDelete(&proclock->procLink);
1538 		proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
1539 		if (!hash_search_with_hash_value(LockMethodProcLockHash,
1540 										 (void *) &(proclock->tag),
1541 										 proclock_hashcode,
1542 										 HASH_REMOVE,
1543 										 NULL))
1544 			elog(PANIC, "proclock table corrupted");
1545 	}
1546 
1547 	if (lock->nRequested == 0)
1548 	{
1549 		/*
1550 		 * The caller just released the last lock, so garbage-collect the lock
1551 		 * object.
1552 		 */
1553 		LOCK_PRINT("CleanUpLock: deleting", lock, 0);
1554 		Assert(SHMQueueEmpty(&(lock->procLocks)));
1555 		if (!hash_search_with_hash_value(LockMethodLockHash,
1556 										 (void *) &(lock->tag),
1557 										 hashcode,
1558 										 HASH_REMOVE,
1559 										 NULL))
1560 			elog(PANIC, "lock table corrupted");
1561 	}
1562 	else if (wakeupNeeded)
1563 	{
1564 		/* There are waiters on this lock, so wake them up. */
1565 		ProcLockWakeup(lockMethodTable, lock);
1566 	}
1567 }
1568 
1569 /*
1570  * GrantLockLocal -- update the locallock data structures to show
1571  *		the lock request has been granted.
1572  *
1573  * We expect that LockAcquire made sure there is room to add a new
1574  * ResourceOwner entry.
1575  */
1576 static void
1577 GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
1578 {
1579 	LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1580 	int			i;
1581 
1582 	Assert(locallock->numLockOwners < locallock->maxLockOwners);
1583 	/* Count the total */
1584 	locallock->nLocks++;
1585 	/* Count the per-owner lock */
1586 	for (i = 0; i < locallock->numLockOwners; i++)
1587 	{
1588 		if (lockOwners[i].owner == owner)
1589 		{
1590 			lockOwners[i].nLocks++;
1591 			return;
1592 		}
1593 	}
1594 	lockOwners[i].owner = owner;
1595 	lockOwners[i].nLocks = 1;
1596 	locallock->numLockOwners++;
1597 	if (owner != NULL)
1598 		ResourceOwnerRememberLock(owner, locallock);
1599 }
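
/*
 * For illustration (not part of any real call path): if the same LOCALLOCK
 * is granted once under resource owner A and once under owner B, we end up
 * with nLocks == 2 and two lockOwners[] entries, each with nLocks == 1.  A
 * further grant under owner A simply bumps that entry's count rather than
 * consuming another slot.
 */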
1600 
1601 /*
1602  * BeginStrongLockAcquire - inhibit use of fastpath for a given LOCALLOCK,
1603  * and arrange for error cleanup if it fails
1604  */
1605 static void
1606 BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode)
1607 {
1608 	Assert(StrongLockInProgress == NULL);
1609 	Assert(locallock->holdsStrongLockCount == FALSE);
1610 
1611 	/*
1612 	 * Adding to a memory location is not atomic, so we take a spinlock to
1613 	 * ensure we don't collide with someone else trying to bump the count at
1614 	 * the same time.
1615 	 *
1616 	 * XXX: It might be worth considering using an atomic fetch-and-add
1617 	 * instruction here, on architectures where that is supported.
1618 	 */
1619 
1620 	SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
1621 	FastPathStrongRelationLocks->count[fasthashcode]++;
1622 	locallock->holdsStrongLockCount = TRUE;
1623 	StrongLockInProgress = locallock;
1624 	SpinLockRelease(&FastPathStrongRelationLocks->mutex);
1625 }
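
/*
 * A minimal sketch of the fetch-and-add alternative mentioned in the XXX
 * note above, assuming count[] were redeclared as pg_atomic_uint32 (it is
 * not today, so this is hypothetical):
 *
 *		pg_atomic_fetch_add_u32(&FastPathStrongRelationLocks->count[fasthashcode], 1);
 *
 * That would avoid the spinlock, at the cost of also converting every other
 * reader and writer of count[] to the atomics API.
 */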
1626 
1627 /*
1628  * FinishStrongLockAcquire - cancel pending cleanup for a strong lock
1629  * acquisition once it's no longer needed
1630  */
1631 static void
1632 FinishStrongLockAcquire(void)
1633 {
1634 	StrongLockInProgress = NULL;
1635 }
1636 
1637 /*
1638  * AbortStrongLockAcquire - undo strong lock state changes performed by
1639  * BeginStrongLockAcquire.
1640  */
1641 void
1642 AbortStrongLockAcquire(void)
1643 {
1644 	uint32		fasthashcode;
1645 	LOCALLOCK  *locallock = StrongLockInProgress;
1646 
1647 	if (locallock == NULL)
1648 		return;
1649 
1650 	fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);
1651 	Assert(locallock->holdsStrongLockCount == TRUE);
1652 	SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
1653 	Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
1654 	FastPathStrongRelationLocks->count[fasthashcode]--;
1655 	locallock->holdsStrongLockCount = FALSE;
1656 	StrongLockInProgress = NULL;
1657 	SpinLockRelease(&FastPathStrongRelationLocks->mutex);
1658 }
1659 
1660 /*
1661  * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
1662  *		WaitOnLock on.
1663  *
1664  * proc.c needs this for the case where we are booted off the lock by
1665  * timeout, but discover that someone granted us the lock anyway.
1666  *
1667  * We could just export GrantLockLocal, but that would require including
1668  * resowner.h in lock.h, which creates circularity.
1669  */
1670 void
1671 GrantAwaitedLock(void)
1672 {
1673 	GrantLockLocal(awaitedLock, awaitedOwner);
1674 }
1675 
1676 /*
1677  * MarkLockClear -- mark an acquired lock as "clear"
1678  *
1679  * This means that we know we have absorbed all sinval messages that other
1680  * sessions generated before we acquired this lock, and so we can confidently
1681  * assume we know about any catalog changes protected by this lock.
1682  */
1683 void
1684 MarkLockClear(LOCALLOCK *locallock)
1685 {
1686 	Assert(locallock->nLocks > 0);
1687 	locallock->lockCleared = true;
1688 }
1689 
1690 /*
1691  * WaitOnLock -- wait to acquire a lock
1692  *
1693  * Caller must have set MyProc->heldLocks to reflect locks already held
1694  * on the lockable object by this process.
1695  *
1696  * The appropriate partition lock must be held at entry.
1697  */
1698 static void
1699 WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
1700 {
1701 	LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
1702 	LockMethod	lockMethodTable = LockMethods[lockmethodid];
1703 	char	   *volatile new_status = NULL;
1704 
1705 	LOCK_PRINT("WaitOnLock: sleeping on lock",
1706 			   locallock->lock, locallock->tag.mode);
1707 
1708 	/* Report change to waiting status */
1709 	if (update_process_title)
1710 	{
1711 		const char *old_status;
1712 		int			len;
1713 
1714 		old_status = get_ps_display(&len);
1715 		new_status = (char *) palloc(len + 8 + 1);
1716 		memcpy(new_status, old_status, len);
1717 		strcpy(new_status + len, " waiting");
1718 		set_ps_display(new_status, false);
1719 		new_status[len] = '\0'; /* truncate off " waiting" */
1720 	}
1721 	pgstat_report_wait_start(WAIT_LOCK, locallock->tag.lock.locktag_type);
1722 
1723 	awaitedLock = locallock;
1724 	awaitedOwner = owner;
1725 
1726 	/*
1727 	 * NOTE: Think not to put any shared-state cleanup after the call to
1728 	 * ProcSleep, in either the normal or failure path.  The lock state must
1729 	 * be fully set by the lock grantor, or by CheckDeadLock if we give up
1730 	 * waiting for the lock.  This is necessary because of the possibility
1731 	 * that a cancel/die interrupt will interrupt ProcSleep after someone else
1732 	 * grants us the lock, but before we've noticed it. Hence, after granting,
1733 	 * the locktable state must fully reflect the fact that we own the lock;
1734 	 * we can't do additional work on return.
1735 	 *
1736 	 * We can and do use a PG_TRY block to try to clean up after failure, but
1737 	 * this still has a major limitation: elog(FATAL) can occur while waiting
1738 	 * (eg, a "die" interrupt), and then control won't come back here. So all
1739 	 * cleanup of essential state should happen in LockErrorCleanup, not here.
1740 	 * We can use PG_TRY to clear the "waiting" status flags, since doing that
1741 	 * is unimportant if the process exits.
1742 	 */
1743 	PG_TRY();
1744 	{
1745 		if (ProcSleep(locallock, lockMethodTable) != STATUS_OK)
1746 		{
1747 			/*
1748 			 * We failed as a result of a deadlock, see CheckDeadLock(). Quit
1749 			 * now.
1750 			 */
1751 			awaitedLock = NULL;
1752 			LOCK_PRINT("WaitOnLock: aborting on lock",
1753 					   locallock->lock, locallock->tag.mode);
1754 			LWLockRelease(LockHashPartitionLock(locallock->hashcode));
1755 
1756 			/*
1757 			 * Now that we aren't holding the partition lock, we can give an
1758 			 * error report including details about the detected deadlock.
1759 			 */
1760 			DeadLockReport();
1761 			/* not reached */
1762 		}
1763 	}
1764 	PG_CATCH();
1765 	{
1766 		/* In this path, awaitedLock remains set until LockErrorCleanup */
1767 
1768 		/* Report change to non-waiting status */
1769 		pgstat_report_wait_end();
1770 		if (update_process_title)
1771 		{
1772 			set_ps_display(new_status, false);
1773 			pfree(new_status);
1774 		}
1775 
1776 		/* and propagate the error */
1777 		PG_RE_THROW();
1778 	}
1779 	PG_END_TRY();
1780 
1781 	awaitedLock = NULL;
1782 
1783 	/* Report change to non-waiting status */
1784 	pgstat_report_wait_end();
1785 	if (update_process_title)
1786 	{
1787 		set_ps_display(new_status, false);
1788 		pfree(new_status);
1789 	}
1790 
1791 	LOCK_PRINT("WaitOnLock: wakeup on lock",
1792 			   locallock->lock, locallock->tag.mode);
1793 }
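
/*
 * Example of the process-title handling above (purely illustrative): if the
 * title was "postgres: alice mydb [local] SELECT", new_status briefly shows
 * "postgres: alice mydb [local] SELECT waiting"; writing '\0' at
 * new_status[len] trims the suffix again so the very same buffer can restore
 * the original title once the wait ends.
 */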
1794 
1795 /*
1796  * Remove a proc from the wait-queue it is on (caller must know it is on one).
1797  * This is only used when the proc has failed to get the lock, so we set its
1798  * waitStatus to STATUS_ERROR.
1799  *
1800  * Appropriate partition lock must be held by caller.  Also, caller is
1801  * responsible for signaling the proc if needed.
1802  *
1803  * NB: this does not clean up any locallock object that may exist for the lock.
1804  */
1805 void
1806 RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
1807 {
1808 	LOCK	   *waitLock = proc->waitLock;
1809 	PROCLOCK   *proclock = proc->waitProcLock;
1810 	LOCKMODE	lockmode = proc->waitLockMode;
1811 	LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);
1812 
1813 	/* Make sure proc is waiting */
1814 	Assert(proc->waitStatus == STATUS_WAITING);
1815 	Assert(proc->links.next != NULL);
1816 	Assert(waitLock);
1817 	Assert(waitLock->waitProcs.size > 0);
1818 	Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
1819 
1820 	/* Remove proc from lock's wait queue */
1821 	SHMQueueDelete(&(proc->links));
1822 	waitLock->waitProcs.size--;
1823 
1824 	/* Undo increments of request counts by waiting process */
1825 	Assert(waitLock->nRequested > 0);
1826 	Assert(waitLock->nRequested > proc->waitLock->nGranted);
1827 	waitLock->nRequested--;
1828 	Assert(waitLock->requested[lockmode] > 0);
1829 	waitLock->requested[lockmode]--;
1830 	/* don't forget to clear waitMask bit if appropriate */
1831 	if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
1832 		waitLock->waitMask &= LOCKBIT_OFF(lockmode);
1833 
1834 	/* Clean up the proc's own state, and pass it the ok/fail signal */
1835 	proc->waitLock = NULL;
1836 	proc->waitProcLock = NULL;
1837 	proc->waitStatus = STATUS_ERROR;
1838 
1839 	/*
1840 	 * Delete the proclock immediately if it represents no already-held locks.
1841 	 * (This must happen now because if the owner of the lock decides to
1842 	 * release it, and the requested/granted counts then go to zero,
1843 	 * LockRelease expects there to be no remaining proclocks.) Then see if
1844 	 * any other waiters for the lock can be woken up now.
1845 	 */
1846 	CleanUpLock(waitLock, proclock,
1847 				LockMethods[lockmethodid], hashcode,
1848 				true);
1849 }
1850 
1851 /*
1852  * LockRelease -- look up 'locktag' and release one 'lockmode' lock on it.
1853  *		Release a session lock if 'sessionLock' is true, else release a
1854  *		regular transaction lock.
1855  *
1856  * Side Effects: find any waiting processes that are now wakable,
1857  *		grant them their requested locks and awaken them.
1858  *		(We have to grant the lock here to avoid a race between
1859  *		the waking process and any new process to
1860  *		come along and request the lock.)
1861  */
1862 bool
1863 LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
1864 {
1865 	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
1866 	LockMethod	lockMethodTable;
1867 	LOCALLOCKTAG localtag;
1868 	LOCALLOCK  *locallock;
1869 	LOCK	   *lock;
1870 	PROCLOCK   *proclock;
1871 	LWLock	   *partitionLock;
1872 	bool		wakeupNeeded;
1873 
1874 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
1875 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
1876 	lockMethodTable = LockMethods[lockmethodid];
1877 	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
1878 		elog(ERROR, "unrecognized lock mode: %d", lockmode);
1879 
1880 #ifdef LOCK_DEBUG
1881 	if (LOCK_DEBUG_ENABLED(locktag))
1882 		elog(LOG, "LockRelease: lock [%u,%u] %s",
1883 			 locktag->locktag_field1, locktag->locktag_field2,
1884 			 lockMethodTable->lockModeNames[lockmode]);
1885 #endif
1886 
1887 	/*
1888 	 * Find the LOCALLOCK entry for this lock and lockmode
1889 	 */
1890 	MemSet(&localtag, 0, sizeof(localtag));		/* must clear padding */
1891 	localtag.lock = *locktag;
1892 	localtag.mode = lockmode;
1893 
1894 	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
1895 										  (void *) &localtag,
1896 										  HASH_FIND, NULL);
1897 
1898 	/*
1899 	 * let the caller print its own error message, too. Do not ereport(ERROR).
1900 	 */
1901 	if (!locallock || locallock->nLocks <= 0)
1902 	{
1903 		elog(WARNING, "you don't own a lock of type %s",
1904 			 lockMethodTable->lockModeNames[lockmode]);
1905 		return FALSE;
1906 	}
1907 
1908 	/*
1909 	 * Decrease the count for the resource owner.
1910 	 */
1911 	{
1912 		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1913 		ResourceOwner owner;
1914 		int			i;
1915 
1916 		/* Identify owner for lock */
1917 		if (sessionLock)
1918 			owner = NULL;
1919 		else
1920 			owner = CurrentResourceOwner;
1921 
1922 		for (i = locallock->numLockOwners - 1; i >= 0; i--)
1923 		{
1924 			if (lockOwners[i].owner == owner)
1925 			{
1926 				Assert(lockOwners[i].nLocks > 0);
1927 				if (--lockOwners[i].nLocks == 0)
1928 				{
1929 					if (owner != NULL)
1930 						ResourceOwnerForgetLock(owner, locallock);
1931 					/* compact out unused slot */
1932 					locallock->numLockOwners--;
1933 					if (i < locallock->numLockOwners)
1934 						lockOwners[i] = lockOwners[locallock->numLockOwners];
1935 				}
1936 				break;
1937 			}
1938 		}
1939 		if (i < 0)
1940 		{
1941 			/* don't release a lock belonging to another owner */
1942 			elog(WARNING, "you don't own a lock of type %s",
1943 				 lockMethodTable->lockModeNames[lockmode]);
1944 			return FALSE;
1945 		}
1946 	}
1947 
1948 	/*
1949 	 * Decrease the total local count.  If we're still holding the lock, we're
1950 	 * done.
1951 	 */
1952 	locallock->nLocks--;
1953 
1954 	if (locallock->nLocks > 0)
1955 		return TRUE;
1956 
1957 	/*
1958 	 * At this point we can no longer suppose we are clear of invalidation
1959 	 * messages related to this lock.  Although we'll delete the LOCALLOCK
1960 	 * object before any intentional return from this routine, it seems worth
1961 	 * the trouble to explicitly reset lockCleared right now, just in case
1962 	 * some error prevents us from deleting the LOCALLOCK.
1963 	 */
1964 	locallock->lockCleared = false;
1965 
1966 	/* Attempt fast release of any lock eligible for the fast path. */
1967 	if (EligibleForRelationFastPath(locktag, lockmode) &&
1968 		FastPathLocalUseCount > 0)
1969 	{
1970 		bool		released;
1971 
1972 		/*
1973 		 * We might not find the lock here, even if we originally entered it
1974 		 * here.  Another backend may have moved it to the main table.
1975 		 */
1976 		LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
1977 		released = FastPathUnGrantRelationLock(locktag->locktag_field2,
1978 											   lockmode);
1979 		LWLockRelease(&MyProc->backendLock);
1980 		if (released)
1981 		{
1982 			RemoveLocalLock(locallock);
1983 			return TRUE;
1984 		}
1985 	}
1986 
1987 	/*
1988 	 * Otherwise we've got to mess with the shared lock table.
1989 	 */
1990 	partitionLock = LockHashPartitionLock(locallock->hashcode);
1991 
1992 	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
1993 
1994 	/*
1995 	 * Normally, we don't need to re-find the lock or proclock, since we kept
1996 	 * their addresses in the locallock table, and they couldn't have been
1997 	 * removed while we were holding a lock on them.  But it's possible that
1998 	 * the lock was taken fast-path and has since been moved to the main hash
1999 	 * table by another backend, in which case we will need to look up the
2000 	 * objects here.  We assume the lock field is NULL if so.
2001 	 */
2002 	lock = locallock->lock;
2003 	if (!lock)
2004 	{
2005 		PROCLOCKTAG proclocktag;
2006 
2007 		Assert(EligibleForRelationFastPath(locktag, lockmode));
2008 		lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
2009 													(const void *) locktag,
2010 													locallock->hashcode,
2011 													HASH_FIND,
2012 													NULL);
2013 		if (!lock)
2014 			elog(ERROR, "failed to re-find shared lock object");
2015 		locallock->lock = lock;
2016 
2017 		proclocktag.myLock = lock;
2018 		proclocktag.myProc = MyProc;
2019 		locallock->proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
2020 													   (void *) &proclocktag,
2021 													   HASH_FIND,
2022 													   NULL);
2023 		if (!locallock->proclock)
2024 			elog(ERROR, "failed to re-find shared proclock object");
2025 	}
2026 	LOCK_PRINT("LockRelease: found", lock, lockmode);
2027 	proclock = locallock->proclock;
2028 	PROCLOCK_PRINT("LockRelease: found", proclock);
2029 
2030 	/*
2031 	 * Double-check that we are actually holding a lock of the type we want to
2032 	 * release.
2033 	 */
2034 	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
2035 	{
2036 		PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
2037 		LWLockRelease(partitionLock);
2038 		elog(WARNING, "you don't own a lock of type %s",
2039 			 lockMethodTable->lockModeNames[lockmode]);
2040 		RemoveLocalLock(locallock);
2041 		return FALSE;
2042 	}
2043 
2044 	/*
2045 	 * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
2046 	 */
2047 	wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
2048 
2049 	CleanUpLock(lock, proclock,
2050 				lockMethodTable, locallock->hashcode,
2051 				wakeupNeeded);
2052 
2053 	LWLockRelease(partitionLock);
2054 
2055 	RemoveLocalLock(locallock);
2056 	return TRUE;
2057 }
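
/*
 * A minimal usage sketch, kept out of the build with NOT_USED: ordinarily
 * callers reach LockRelease through lmgr.c helpers such as
 * UnlockRelationOid().  The relation OID parameter here is hypothetical.
 */
#ifdef NOT_USED
static void
example_unlock_relation(Oid relid)
{
	LOCKTAG		tag;

	/* release an AccessShareLock previously taken on (MyDatabaseId, relid) */
	SET_LOCKTAG_RELATION(tag, MyDatabaseId, relid);
	if (!LockRelease(&tag, AccessShareLock, false))
		elog(WARNING, "lock on relation %u was not held", relid);
}
#endif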
2058 
2059 /*
2060  * LockReleaseAll -- Release all locks of the specified lock method that
2061  *		are held by the current process.
2062  *
2063  * Well, not necessarily *all* locks.  The available behaviors are:
2064  *		allLocks == true: release all locks including session locks.
2065  *		allLocks == false: release all non-session locks.
2066  */
2067 void
2068 LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
2069 {
2070 	HASH_SEQ_STATUS status;
2071 	LockMethod	lockMethodTable;
2072 	int			i,
2073 				numLockModes;
2074 	LOCALLOCK  *locallock;
2075 	LOCK	   *lock;
2076 	PROCLOCK   *proclock;
2077 	int			partition;
2078 	bool		have_fast_path_lwlock = false;
2079 
2080 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
2081 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
2082 	lockMethodTable = LockMethods[lockmethodid];
2083 
2084 #ifdef LOCK_DEBUG
2085 	if (*(lockMethodTable->trace_flag))
2086 		elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
2087 #endif
2088 
2089 	/*
2090 	 * Get rid of our fast-path VXID lock, if appropriate.  Note that this is
2091 	 * the only way that the lock we hold on our own VXID can ever get
2092 	 * released: it is always and only released when a toplevel transaction
2093 	 * ends.
2094 	 */
2095 	if (lockmethodid == DEFAULT_LOCKMETHOD)
2096 		VirtualXactLockTableCleanup();
2097 
2098 	numLockModes = lockMethodTable->numLockModes;
2099 
2100 	/*
2101 	 * First we run through the locallock table and get rid of unwanted
2102 	 * entries, then we scan the process's proclocks and get rid of those. We
2103 	 * do this separately because we may have multiple locallock entries
2104 	 * pointing to the same proclock, and we daren't end up with any dangling
2105 	 * pointers.  Fast-path locks are cleaned up during the locallock table
2106 	 * scan, though.
2107 	 */
2108 	hash_seq_init(&status, LockMethodLocalHash);
2109 
2110 	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2111 	{
2112 		/*
2113 		 * If the LOCALLOCK entry is unused, we must've run out of shared
2114 		 * memory while trying to set up this lock.  Just forget the local
2115 		 * entry.
2116 		 */
2117 		if (locallock->nLocks == 0)
2118 		{
2119 			RemoveLocalLock(locallock);
2120 			continue;
2121 		}
2122 
2123 		/* Ignore items that are not of the lockmethod to be removed */
2124 		if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
2125 			continue;
2126 
2127 		/*
2128 		 * If we are asked to release all locks, we can just zap the entry.
2129 		 * Otherwise, must scan to see if there are session locks. We assume
2130 		 * there is at most one lockOwners entry for session locks.
2131 		 */
2132 		if (!allLocks)
2133 		{
2134 			LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
2135 
2136 			/* If session lock is above array position 0, move it down to 0 */
2137 			for (i = 0; i < locallock->numLockOwners; i++)
2138 			{
2139 				if (lockOwners[i].owner == NULL)
2140 					lockOwners[0] = lockOwners[i];
2141 				else
2142 					ResourceOwnerForgetLock(lockOwners[i].owner, locallock);
2143 			}
2144 
2145 			if (locallock->numLockOwners > 0 &&
2146 				lockOwners[0].owner == NULL &&
2147 				lockOwners[0].nLocks > 0)
2148 			{
2149 				/* Fix the locallock to show just the session locks */
2150 				locallock->nLocks = lockOwners[0].nLocks;
2151 				locallock->numLockOwners = 1;
2152 				/* We aren't deleting this locallock, so done */
2153 				continue;
2154 			}
2155 			else
2156 				locallock->numLockOwners = 0;
2157 		}
2158 
2159 		/*
2160 		 * If the lock or proclock pointers are NULL, this lock was taken via
2161 		 * the relation fast-path (and is not known to have been transferred).
2162 		 */
2163 		if (locallock->proclock == NULL || locallock->lock == NULL)
2164 		{
2165 			LOCKMODE	lockmode = locallock->tag.mode;
2166 			Oid			relid;
2167 
2168 			/* Verify that a fast-path lock is what we've got. */
2169 			if (!EligibleForRelationFastPath(&locallock->tag.lock, lockmode))
2170 				elog(PANIC, "locallock table corrupted");
2171 
2172 			/*
2173 			 * If we don't currently hold the LWLock that protects our
2174 			 * fast-path data structures, we must acquire it before attempting
2175 			 * to release the lock via the fast-path.  We will continue to
2176 			 * hold the LWLock until we're done scanning the locallock table,
2177 			 * unless we hit a transferred fast-path lock.  (XXX is this
2178 			 * really such a good idea?  There could be a lot of entries ...)
2179 			 */
2180 			if (!have_fast_path_lwlock)
2181 			{
2182 				LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
2183 				have_fast_path_lwlock = true;
2184 			}
2185 
2186 			/* Attempt fast-path release. */
2187 			relid = locallock->tag.lock.locktag_field2;
2188 			if (FastPathUnGrantRelationLock(relid, lockmode))
2189 			{
2190 				RemoveLocalLock(locallock);
2191 				continue;
2192 			}
2193 
2194 			/*
2195 			 * Our lock, originally taken via the fast path, has been
2196 			 * transferred to the main lock table.  That's going to require
2197 			 * some extra work, so release our fast-path lock before starting.
2198 			 */
2199 			LWLockRelease(&MyProc->backendLock);
2200 			have_fast_path_lwlock = false;
2201 
2202 			/*
2203 			 * Now dump the lock.  We haven't got a pointer to the LOCK or
2204 			 * PROCLOCK in this case, so we have to handle this a bit
2205 			 * differently than a normal lock release.  Unfortunately, this
2206 			 * requires an extra LWLock acquire-and-release cycle on the
2207 			 * partitionLock, but hopefully it shouldn't happen often.
2208 			 */
2209 			LockRefindAndRelease(lockMethodTable, MyProc,
2210 								 &locallock->tag.lock, lockmode, false);
2211 			RemoveLocalLock(locallock);
2212 			continue;
2213 		}
2214 
2215 		/* Mark the proclock to show we need to release this lockmode */
2216 		if (locallock->nLocks > 0)
2217 			locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
2218 
2219 		/* And remove the locallock hashtable entry */
2220 		RemoveLocalLock(locallock);
2221 	}
2222 
2223 	/* Done with the fast-path data structures */
2224 	if (have_fast_path_lwlock)
2225 		LWLockRelease(&MyProc->backendLock);
2226 
2227 	/*
2228 	 * Now, scan each lock partition separately.
2229 	 */
2230 	for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
2231 	{
2232 		LWLock	   *partitionLock;
2233 		SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
2234 		PROCLOCK   *nextplock;
2235 
2236 		partitionLock = LockHashPartitionLockByIndex(partition);
2237 
2238 		/*
2239 		 * If the proclock list for this partition is empty, we can skip
2240 		 * acquiring the partition lock.  This optimization is trickier than
2241 		 * it looks, because another backend could be in process of adding
2242 		 * something to our proclock list due to promoting one of our
2243 		 * fast-path locks.  However, any such lock must be one that we
2244 		 * decided not to delete above, so it's okay to skip it again now;
2245 		 * we'd just decide not to delete it again.  We must, however, be
2246 		 * careful to re-fetch the list header once we've acquired the
2247 		 * partition lock, to be sure we have a valid, up-to-date pointer.
2248 		 * (There is probably no significant risk if pointer fetch/store is
2249 		 * atomic, but we don't wish to assume that.)
2250 		 *
2251 		 * XXX This argument assumes that the locallock table correctly
2252 		 * represents all of our fast-path locks.  While allLocks mode
2253 		 * guarantees to clean up all of our normal locks regardless of the
2254 		 * locallock situation, we lose that guarantee for fast-path locks.
2255 		 * This is not ideal.
2256 		 */
2257 		if (SHMQueueNext(procLocks, procLocks,
2258 						 offsetof(PROCLOCK, procLink)) == NULL)
2259 			continue;			/* needn't examine this partition */
2260 
2261 		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2262 
2263 		for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
2264 											   offsetof(PROCLOCK, procLink));
2265 			 proclock;
2266 			 proclock = nextplock)
2267 		{
2268 			bool		wakeupNeeded = false;
2269 
2270 			/* Get link first, since we may unlink/delete this proclock */
2271 			nextplock = (PROCLOCK *)
2272 				SHMQueueNext(procLocks, &proclock->procLink,
2273 							 offsetof(PROCLOCK, procLink));
2274 
2275 			Assert(proclock->tag.myProc == MyProc);
2276 
2277 			lock = proclock->tag.myLock;
2278 
2279 			/* Ignore items that are not of the lockmethod to be removed */
2280 			if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
2281 				continue;
2282 
2283 			/*
2284 			 * In allLocks mode, force release of all locks even if locallock
2285 			 * table had problems
2286 			 */
2287 			if (allLocks)
2288 				proclock->releaseMask = proclock->holdMask;
2289 			else
2290 				Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
2291 
2292 			/*
2293 			 * Ignore items that have nothing to be released, unless they have
2294 			 * holdMask == 0 and are therefore recyclable
2295 			 */
2296 			if (proclock->releaseMask == 0 && proclock->holdMask != 0)
2297 				continue;
2298 
2299 			PROCLOCK_PRINT("LockReleaseAll", proclock);
2300 			LOCK_PRINT("LockReleaseAll", lock, 0);
2301 			Assert(lock->nRequested >= 0);
2302 			Assert(lock->nGranted >= 0);
2303 			Assert(lock->nGranted <= lock->nRequested);
2304 			Assert((proclock->holdMask & ~lock->grantMask) == 0);
2305 
2306 			/*
2307 			 * Release the previously-marked lock modes
2308 			 */
2309 			for (i = 1; i <= numLockModes; i++)
2310 			{
2311 				if (proclock->releaseMask & LOCKBIT_ON(i))
2312 					wakeupNeeded |= UnGrantLock(lock, i, proclock,
2313 												lockMethodTable);
2314 			}
2315 			Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
2316 			Assert(lock->nGranted <= lock->nRequested);
2317 			LOCK_PRINT("LockReleaseAll: updated", lock, 0);
2318 
2319 			proclock->releaseMask = 0;
2320 
2321 			/* CleanUpLock will wake up waiters if needed. */
2322 			CleanUpLock(lock, proclock,
2323 						lockMethodTable,
2324 						LockTagHashCode(&lock->tag),
2325 						wakeupNeeded);
2326 		}						/* loop over PROCLOCKs within this partition */
2327 
2328 		LWLockRelease(partitionLock);
2329 	}							/* loop over partitions */
2330 
2331 #ifdef LOCK_DEBUG
2332 	if (*(lockMethodTable->trace_flag))
2333 		elog(LOG, "LockReleaseAll done");
2334 #endif
2335 }
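
/*
 * Call sketch: at transaction end, proc.c releases the backend's regular
 * locks while preserving session-level ones with something like
 *
 *		LockReleaseAll(DEFAULT_LOCKMETHOD, false);
 *
 * whereas passing allLocks = true also discards session locks (used, for
 * example, when cleaning up after an aborted transaction).
 */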
2336 
2337 /*
2338  * LockReleaseSession -- Release all session locks of the specified lock method
2339  *		that are held by the current process.
2340  */
2341 void
2342 LockReleaseSession(LOCKMETHODID lockmethodid)
2343 {
2344 	HASH_SEQ_STATUS status;
2345 	LOCALLOCK  *locallock;
2346 
2347 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
2348 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
2349 
2350 	hash_seq_init(&status, LockMethodLocalHash);
2351 
2352 	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2353 	{
2354 		/* Ignore items that are not of the specified lock method */
2355 		if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
2356 			continue;
2357 
2358 		ReleaseLockIfHeld(locallock, true);
2359 	}
2360 }
2361 
2362 /*
2363  * LockReleaseCurrentOwner
2364  *		Release all locks belonging to CurrentResourceOwner
2365  *
2366  * If the caller knows what those locks are, it can pass them as an array.
2367  * That speeds up the call significantly, when a lot of locks are held.
2368  * Otherwise, pass NULL for locallocks, and we'll traverse through our hash
2369  * table to find them.
2370  */
2371 void
2372 LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks)
2373 {
2374 	if (locallocks == NULL)
2375 	{
2376 		HASH_SEQ_STATUS status;
2377 		LOCALLOCK  *locallock;
2378 
2379 		hash_seq_init(&status, LockMethodLocalHash);
2380 
2381 		while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2382 			ReleaseLockIfHeld(locallock, false);
2383 	}
2384 	else
2385 	{
2386 		int			i;
2387 
2388 		for (i = nlocks - 1; i >= 0; i--)
2389 			ReleaseLockIfHeld(locallocks[i], false);
2390 	}
2391 }
2392 
2393 /*
2394  * ReleaseLockIfHeld
2395  *		Release any session-level locks on this lockable object if sessionLock
2396  *		is true; else, release any locks held by CurrentResourceOwner.
2397  *
2398  * It is tempting to pass this a ResourceOwner pointer (or NULL for session
2399  * locks), but without refactoring LockRelease() we cannot support releasing
2400  * locks belonging to resource owners other than CurrentResourceOwner.
2401  * If we were to refactor, it'd be a good idea to fix it so we don't have to
2402  * do a hashtable lookup of the locallock, too.  However, currently this
2403  * function isn't used heavily enough to justify refactoring for its
2404  * convenience.
2405  */
2406 static void
2407 ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
2408 {
2409 	ResourceOwner owner;
2410 	LOCALLOCKOWNER *lockOwners;
2411 	int			i;
2412 
2413 	/* Identify owner for lock (must match LockRelease!) */
2414 	if (sessionLock)
2415 		owner = NULL;
2416 	else
2417 		owner = CurrentResourceOwner;
2418 
2419 	/* Scan to see if there are any locks belonging to the target owner */
2420 	lockOwners = locallock->lockOwners;
2421 	for (i = locallock->numLockOwners - 1; i >= 0; i--)
2422 	{
2423 		if (lockOwners[i].owner == owner)
2424 		{
2425 			Assert(lockOwners[i].nLocks > 0);
2426 			if (lockOwners[i].nLocks < locallock->nLocks)
2427 			{
2428 				/*
2429 				 * We will still hold this lock after forgetting this
2430 				 * ResourceOwner.
2431 				 */
2432 				locallock->nLocks -= lockOwners[i].nLocks;
2433 				/* compact out unused slot */
2434 				locallock->numLockOwners--;
2435 				if (owner != NULL)
2436 					ResourceOwnerForgetLock(owner, locallock);
2437 				if (i < locallock->numLockOwners)
2438 					lockOwners[i] = lockOwners[locallock->numLockOwners];
2439 			}
2440 			else
2441 			{
2442 				Assert(lockOwners[i].nLocks == locallock->nLocks);
2443 				/* We want to call LockRelease just once */
2444 				lockOwners[i].nLocks = 1;
2445 				locallock->nLocks = 1;
2446 				if (!LockRelease(&locallock->tag.lock,
2447 								 locallock->tag.mode,
2448 								 sessionLock))
2449 					elog(WARNING, "ReleaseLockIfHeld: failed??");
2450 			}
2451 			break;
2452 		}
2453 	}
2454 }
2455 
2456 /*
2457  * LockReassignCurrentOwner
2458  *		Reassign all locks belonging to CurrentResourceOwner to belong
2459  *		to its parent resource owner.
2460  *
2461  * If the caller knows what those locks are, it can pass them as an array.
2462  * That speeds up the call significantly, when a lot of locks are held
2463  * (e.g pg_dump with a large schema).  Otherwise, pass NULL for locallocks,
2464  * and we'll traverse through our hash table to find them.
2465  */
2466 void
2467 LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks)
2468 {
2469 	ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
2470 
2471 	Assert(parent != NULL);
2472 
2473 	if (locallocks == NULL)
2474 	{
2475 		HASH_SEQ_STATUS status;
2476 		LOCALLOCK  *locallock;
2477 
2478 		hash_seq_init(&status, LockMethodLocalHash);
2479 
2480 		while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2481 			LockReassignOwner(locallock, parent);
2482 	}
2483 	else
2484 	{
2485 		int			i;
2486 
2487 		for (i = nlocks - 1; i >= 0; i--)
2488 			LockReassignOwner(locallocks[i], parent);
2489 	}
2490 }
2491 
2492 /*
2493  * Subroutine of LockReassignCurrentOwner. Reassigns a given lock belonging to
2494  * CurrentResourceOwner to its parent.
2495  */
2496 static void
2497 LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent)
2498 {
2499 	LOCALLOCKOWNER *lockOwners;
2500 	int			i;
2501 	int			ic = -1;
2502 	int			ip = -1;
2503 
2504 	/*
2505 	 * Scan to see if there are any locks belonging to current owner or its
2506 	 * parent
2507 	 */
2508 	lockOwners = locallock->lockOwners;
2509 	for (i = locallock->numLockOwners - 1; i >= 0; i--)
2510 	{
2511 		if (lockOwners[i].owner == CurrentResourceOwner)
2512 			ic = i;
2513 		else if (lockOwners[i].owner == parent)
2514 			ip = i;
2515 	}
2516 
2517 	if (ic < 0)
2518 		return;					/* no current locks */
2519 
2520 	if (ip < 0)
2521 	{
2522 		/* Parent has no slot, so just give it the child's slot */
2523 		lockOwners[ic].owner = parent;
2524 		ResourceOwnerRememberLock(parent, locallock);
2525 	}
2526 	else
2527 	{
2528 		/* Merge child's count with parent's */
2529 		lockOwners[ip].nLocks += lockOwners[ic].nLocks;
2530 		/* compact out unused slot */
2531 		locallock->numLockOwners--;
2532 		if (ic < locallock->numLockOwners)
2533 			lockOwners[ic] = lockOwners[locallock->numLockOwners];
2534 	}
2535 	ResourceOwnerForgetLock(CurrentResourceOwner, locallock);
2536 }
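
/*
 * Concrete illustration: when a subtransaction commits, locks acquired under
 * its resource owner are not released but handed upward, so a slot of
 * { owner = child, nLocks = 3 } either becomes { owner = parent, nLocks = 3 }
 * (parent had no slot yet) or is folded into the parent's existing slot by
 * adding the counts.
 */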
2537 
2538 /*
2539  * FastPathGrantRelationLock
2540  *		Grant lock using per-backend fast-path array, if there is space.
2541  */
2542 static bool
2543 FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode)
2544 {
2545 	uint32		f;
2546 	uint32		unused_slot = FP_LOCK_SLOTS_PER_BACKEND;
2547 
2548 	/* Scan for existing entry for this relid, remembering empty slot. */
2549 	for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2550 	{
2551 		if (FAST_PATH_GET_BITS(MyProc, f) == 0)
2552 			unused_slot = f;
2553 		else if (MyProc->fpRelId[f] == relid)
2554 		{
2555 			Assert(!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode));
2556 			FAST_PATH_SET_LOCKMODE(MyProc, f, lockmode);
2557 			return true;
2558 		}
2559 	}
2560 
2561 	/* If no existing entry, use any empty slot. */
2562 	if (unused_slot < FP_LOCK_SLOTS_PER_BACKEND)
2563 	{
2564 		MyProc->fpRelId[unused_slot] = relid;
2565 		FAST_PATH_SET_LOCKMODE(MyProc, unused_slot, lockmode);
2566 		++FastPathLocalUseCount;
2567 		return true;
2568 	}
2569 
2570 	/* No existing entry, and no empty slot. */
2571 	return false;
2572 }
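
/*
 * Illustrative sketch of the per-backend fast-path array: each of the
 * FP_LOCK_SLOTS_PER_BACKEND slots pairs MyProc->fpRelId[f] with a few bits
 * in MyProc->fpLockBits, one bit per eligible (weak) lock mode.  Taking
 * RowExclusiveLock on a hypothetical relation 16384 in an empty slot f
 * amounts to
 *
 *		MyProc->fpRelId[f] = 16384;
 *		FAST_PATH_SET_LOCKMODE(MyProc, f, RowExclusiveLock);
 *
 * and a later AccessShareLock on the same relation merely sets another bit
 * in that same slot.
 */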
2573 
2574 /*
2575  * FastPathUnGrantRelationLock
2576  *		Release fast-path lock, if present.  Update backend-private local
2577  *		use count, while we're at it.
2578  */
2579 static bool
2580 FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode)
2581 {
2582 	uint32		f;
2583 	bool		result = false;
2584 
2585 	FastPathLocalUseCount = 0;
2586 	for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2587 	{
2588 		if (MyProc->fpRelId[f] == relid
2589 			&& FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode))
2590 		{
2591 			Assert(!result);
2592 			FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode);
2593 			result = true;
2594 			/* we continue iterating so as to update FastPathLocalUseCount */
2595 		}
2596 		if (FAST_PATH_GET_BITS(MyProc, f) != 0)
2597 			++FastPathLocalUseCount;
2598 	}
2599 	return result;
2600 }
2601 
2602 /*
2603  * FastPathTransferRelationLocks
2604  *		Transfer locks matching the given lock tag from per-backend fast-path
2605  *		arrays to the shared hash table.
2606  *
2607  * Returns true if successful, false if ran out of shared memory.
2608  */
2609 static bool
2610 FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag,
2611 							  uint32 hashcode)
2612 {
2613 	LWLock	   *partitionLock = LockHashPartitionLock(hashcode);
2614 	Oid			relid = locktag->locktag_field2;
2615 	uint32		i;
2616 
2617 	/*
2618 	 * Every PGPROC that can potentially hold a fast-path lock is present in
2619 	 * ProcGlobal->allProcs.  Prepared transactions are not, but any
2620 	 * outstanding fast-path locks held by prepared transactions are
2621 	 * transferred to the main lock table.
2622 	 */
2623 	for (i = 0; i < ProcGlobal->allProcCount; i++)
2624 	{
2625 		PGPROC	   *proc = &ProcGlobal->allProcs[i];
2626 		uint32		f;
2627 
2628 		LWLockAcquire(&proc->backendLock, LW_EXCLUSIVE);
2629 
2630 		/*
2631 		 * If the target backend isn't referencing the same database as the
2632 		 * lock, then we needn't examine the individual relation IDs at all;
2633 		 * none of them can be relevant.
2634 		 *
2635 		 * proc->databaseId is set at backend startup time and never changes
2636 		 * thereafter, so it might be safe to perform this test before
2637 		 * acquiring &proc->backendLock.  In particular, it's certainly safe
2638 		 * to assume that if the target backend holds any fast-path locks, it
2639 		 * must have performed a memory-fencing operation (in particular, an
2640 		 * LWLock acquisition) since setting proc->databaseId.  However, it's
2641 		 * less clear that our backend is certain to have performed a memory
2642 		 * fencing operation since the other backend set proc->databaseId.  So
2643 		 * for now, we test it after acquiring the LWLock just to be safe.
2644 		 */
2645 		if (proc->databaseId != locktag->locktag_field1)
2646 		{
2647 			LWLockRelease(&proc->backendLock);
2648 			continue;
2649 		}
2650 
2651 		for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2652 		{
2653 			uint32		lockmode;
2654 
2655 			/* Look for an allocated slot matching the given relid. */
2656 			if (relid != proc->fpRelId[f] || FAST_PATH_GET_BITS(proc, f) == 0)
2657 				continue;
2658 
2659 			/* Find or create lock object. */
2660 			LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2661 			for (lockmode = FAST_PATH_LOCKNUMBER_OFFSET;
2662 			lockmode < FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT;
2663 				 ++lockmode)
2664 			{
2665 				PROCLOCK   *proclock;
2666 
2667 				if (!FAST_PATH_CHECK_LOCKMODE(proc, f, lockmode))
2668 					continue;
2669 				proclock = SetupLockInTable(lockMethodTable, proc, locktag,
2670 											hashcode, lockmode);
2671 				if (!proclock)
2672 				{
2673 					LWLockRelease(partitionLock);
2674 					LWLockRelease(&proc->backendLock);
2675 					return false;
2676 				}
2677 				GrantLock(proclock->tag.myLock, proclock, lockmode);
2678 				FAST_PATH_CLEAR_LOCKMODE(proc, f, lockmode);
2679 			}
2680 			LWLockRelease(partitionLock);
2681 
2682 			/* No need to examine remaining slots. */
2683 			break;
2684 		}
2685 		LWLockRelease(&proc->backendLock);
2686 	}
2687 	return true;
2688 }
2689 
2690 /*
2691  * FastPathGetRelationLockEntry
2692  *		Return the PROCLOCK for a lock originally taken via the fast-path,
2693  *		transferring it to the primary lock table if necessary.
2694  *
2695  * Note: caller takes care of updating the locallock object.
2696  */
2697 static PROCLOCK *
2698 FastPathGetRelationLockEntry(LOCALLOCK *locallock)
2699 {
2700 	LockMethod	lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];
2701 	LOCKTAG    *locktag = &locallock->tag.lock;
2702 	PROCLOCK   *proclock = NULL;
2703 	LWLock	   *partitionLock = LockHashPartitionLock(locallock->hashcode);
2704 	Oid			relid = locktag->locktag_field2;
2705 	uint32		f;
2706 
2707 	LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
2708 
2709 	for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2710 	{
2711 		uint32		lockmode;
2712 
2713 		/* Look for an allocated slot matching the given relid. */
2714 		if (relid != MyProc->fpRelId[f] || FAST_PATH_GET_BITS(MyProc, f) == 0)
2715 			continue;
2716 
2717 		/* If we don't have a lock of the given mode, forget it! */
2718 		lockmode = locallock->tag.mode;
2719 		if (!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode))
2720 			break;
2721 
2722 		/* Find or create lock object. */
2723 		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2724 
2725 		proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
2726 									locallock->hashcode, lockmode);
2727 		if (!proclock)
2728 		{
2729 			LWLockRelease(partitionLock);
2730 			LWLockRelease(&MyProc->backendLock);
2731 			ereport(ERROR,
2732 					(errcode(ERRCODE_OUT_OF_MEMORY),
2733 					 errmsg("out of shared memory"),
2734 					 errhint("You might need to increase max_locks_per_transaction.")));
2735 		}
2736 		GrantLock(proclock->tag.myLock, proclock, lockmode);
2737 		FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode);
2738 
2739 		LWLockRelease(partitionLock);
2740 
2741 		/* No need to examine remaining slots. */
2742 		break;
2743 	}
2744 
2745 	LWLockRelease(&MyProc->backendLock);
2746 
2747 	/* Lock may have already been transferred by some other backend. */
2748 	if (proclock == NULL)
2749 	{
2750 		LOCK	   *lock;
2751 		PROCLOCKTAG proclocktag;
2752 		uint32		proclock_hashcode;
2753 
2754 		LWLockAcquire(partitionLock, LW_SHARED);
2755 
2756 		lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
2757 													(void *) locktag,
2758 													locallock->hashcode,
2759 													HASH_FIND,
2760 													NULL);
2761 		if (!lock)
2762 			elog(ERROR, "failed to re-find shared lock object");
2763 
2764 		proclocktag.myLock = lock;
2765 		proclocktag.myProc = MyProc;
2766 
2767 		proclock_hashcode = ProcLockHashCode(&proclocktag, locallock->hashcode);
2768 		proclock = (PROCLOCK *)
2769 			hash_search_with_hash_value(LockMethodProcLockHash,
2770 										(void *) &proclocktag,
2771 										proclock_hashcode,
2772 										HASH_FIND,
2773 										NULL);
2774 		if (!proclock)
2775 			elog(ERROR, "failed to re-find shared proclock object");
2776 		LWLockRelease(partitionLock);
2777 	}
2778 
2779 	return proclock;
2780 }
2781 
2782 /*
2783  * GetLockConflicts
2784  *		Get an array of VirtualTransactionIds of xacts currently holding locks
2785  *		that would conflict with the specified lock/lockmode.
2786  *		xacts merely awaiting such a lock are NOT reported.
2787  *
2788  * The result array is palloc'd and is terminated with an invalid VXID.
2789  *
2790  * Of course, the result could be out of date by the time it's returned, so
2791  * use of this function has to be thought about carefully.  Similarly, a
2792  * PGPROC with no "lxid" will be considered non-conflicting regardless of any
2793  * lock it holds.  Existing callers don't care about a locker after that
2794  * locker's pg_xact updates complete.  CommitTransaction() clears "lxid" after
2795  * pg_xact updates and before releasing locks.
2796  *
2797  * Note we never include the current xact's vxid in the result array,
2798  * since an xact never blocks itself.
2799  */
2800 VirtualTransactionId *
2801 GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
2802 {
2803 	static VirtualTransactionId *vxids;
2804 	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
2805 	LockMethod	lockMethodTable;
2806 	LOCK	   *lock;
2807 	LOCKMASK	conflictMask;
2808 	SHM_QUEUE  *procLocks;
2809 	PROCLOCK   *proclock;
2810 	uint32		hashcode;
2811 	LWLock	   *partitionLock;
2812 	int			count = 0;
2813 	int			fast_count = 0;
2814 
2815 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
2816 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
2817 	lockMethodTable = LockMethods[lockmethodid];
2818 	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
2819 		elog(ERROR, "unrecognized lock mode: %d", lockmode);
2820 
2821 	/*
2822 	 * Allocate memory to store results, and fill with InvalidVXID.  We only
2823 	 * need enough space for MaxBackends + max_prepared_xacts + a terminator.
2824 	 * In hot standby, we allocate the array only once, in TopMemoryContext.
2825 	 */
2826 	if (InHotStandby)
2827 	{
2828 		if (vxids == NULL)
2829 			vxids = (VirtualTransactionId *)
2830 				MemoryContextAlloc(TopMemoryContext,
2831 								   sizeof(VirtualTransactionId) *
2832 								   (MaxBackends + max_prepared_xacts + 1));
2833 	}
2834 	else
2835 		vxids = (VirtualTransactionId *)
2836 			palloc0(sizeof(VirtualTransactionId) *
2837 					(MaxBackends + max_prepared_xacts + 1));
2838 
2839 	/* Compute hash code and partition lock, and look up conflicting modes. */
2840 	hashcode = LockTagHashCode(locktag);
2841 	partitionLock = LockHashPartitionLock(hashcode);
2842 	conflictMask = lockMethodTable->conflictTab[lockmode];
2843 
2844 	/*
2845 	 * Fast path locks might not have been entered in the primary lock table.
2846 	 * If the lock we're dealing with could conflict with such a lock, we must
2847 	 * examine each backend's fast-path array for conflicts.
2848 	 */
2849 	if (ConflictsWithRelationFastPath(locktag, lockmode))
2850 	{
2851 		int			i;
2852 		Oid			relid = locktag->locktag_field2;
2853 		VirtualTransactionId vxid;
2854 
2855 		/*
2856 		 * Iterate over relevant PGPROCs.  Anything held by a prepared
2857 		 * transaction will have been transferred to the primary lock table,
2858 		 * so we need not worry about those.  This is all a bit fuzzy, because
2859 		 * new locks could be taken after we've visited a particular
2860 		 * partition, but the callers had better be prepared to deal with that
2861 		 * anyway, since the locks could equally well be taken between the
2862 		 * time we return the value and the time the caller does something
2863 		 * with it.
2864 		 */
2865 		for (i = 0; i < ProcGlobal->allProcCount; i++)
2866 		{
2867 			PGPROC	   *proc = &ProcGlobal->allProcs[i];
2868 			uint32		f;
2869 
2870 			/* A backend never blocks itself */
2871 			if (proc == MyProc)
2872 				continue;
2873 
2874 			LWLockAcquire(&proc->backendLock, LW_SHARED);
2875 
2876 			/*
2877 			 * If the target backend isn't referencing the same database as
2878 			 * the lock, then we needn't examine the individual relation IDs
2879 			 * at all; none of them can be relevant.
2880 			 *
2881 			 * See FastPathTransferRelationLocks() for discussion of why we do this
2882 			 * test after acquiring the lock.
2883 			 */
2884 			if (proc->databaseId != locktag->locktag_field1)
2885 			{
2886 				LWLockRelease(&proc->backendLock);
2887 				continue;
2888 			}
2889 
2890 			for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2891 			{
2892 				uint32		lockmask;
2893 
2894 				/* Look for an allocated slot matching the given relid. */
2895 				if (relid != proc->fpRelId[f])
2896 					continue;
2897 				lockmask = FAST_PATH_GET_BITS(proc, f);
2898 				if (!lockmask)
2899 					continue;
2900 				lockmask <<= FAST_PATH_LOCKNUMBER_OFFSET;
2901 
2902 				/*
2903 				 * There can only be one entry per relation, so if we found it
2904 				 * and it doesn't conflict, we can skip the rest of the slots.
2905 				 */
2906 				if ((lockmask & conflictMask) == 0)
2907 					break;
2908 
2909 				/* Conflict! */
2910 				GET_VXID_FROM_PGPROC(vxid, *proc);
2911 
2912 				if (VirtualTransactionIdIsValid(vxid))
2913 					vxids[count++] = vxid;
2914 				/* else, xact already committed or aborted */
2915 
2916 				/* No need to examine remaining slots. */
2917 				break;
2918 			}
2919 
2920 			LWLockRelease(&proc->backendLock);
2921 		}
2922 	}
2923 
2924 	/* Remember how many fast-path conflicts we found. */
2925 	fast_count = count;
2926 
2927 	/*
2928 	 * Look up the lock object matching the tag.
2929 	 */
2930 	LWLockAcquire(partitionLock, LW_SHARED);
2931 
2932 	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
2933 												(const void *) locktag,
2934 												hashcode,
2935 												HASH_FIND,
2936 												NULL);
2937 	if (!lock)
2938 	{
2939 		/*
2940 		 * If the lock object doesn't exist, there is nothing holding a lock
2941 		 * on this lockable object.
2942 		 */
2943 		LWLockRelease(partitionLock);
2944 		vxids[count].backendId = InvalidBackendId;
2945 		vxids[count].localTransactionId = InvalidLocalTransactionId;
2946 		return vxids;
2947 	}
2948 
2949 	/*
2950 	 * Examine each existing holder (or awaiter) of the lock.
2951 	 */
2952 
2953 	procLocks = &(lock->procLocks);
2954 
2955 	proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
2956 										 offsetof(PROCLOCK, lockLink));
2957 
2958 	while (proclock)
2959 	{
2960 		if (conflictMask & proclock->holdMask)
2961 		{
2962 			PGPROC	   *proc = proclock->tag.myProc;
2963 
2964 			/* A backend never blocks itself */
2965 			if (proc != MyProc)
2966 			{
2967 				VirtualTransactionId vxid;
2968 
2969 				GET_VXID_FROM_PGPROC(vxid, *proc);
2970 
2971 				if (VirtualTransactionIdIsValid(vxid))
2972 				{
2973 					int			i;
2974 
2975 					/* Avoid duplicate entries. */
2976 					for (i = 0; i < fast_count; ++i)
2977 						if (VirtualTransactionIdEquals(vxids[i], vxid))
2978 							break;
2979 					if (i >= fast_count)
2980 						vxids[count++] = vxid;
2981 				}
2982 				/* else, xact already committed or aborted */
2983 			}
2984 		}
2985 
2986 		proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
2987 											 offsetof(PROCLOCK, lockLink));
2988 	}
2989 
2990 	LWLockRelease(partitionLock);
2991 
2992 	if (count > MaxBackends + max_prepared_xacts)	/* should never happen */
2993 		elog(PANIC, "too many conflicting locks found");
2994 
2995 	vxids[count].backendId = InvalidBackendId;
2996 	vxids[count].localTransactionId = InvalidLocalTransactionId;
2997 	return vxids;
2998 }
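
/*
 * Usage sketch, kept out of the build with NOT_USED.  Real callers include
 * hot-standby conflict resolution and concurrent index operations; the
 * database and relation OIDs here are hypothetical.
 */
#ifdef NOT_USED
static void
example_report_conflicts(Oid dbid, Oid relid)
{
	LOCKTAG		tag;
	VirtualTransactionId *vxids;

	SET_LOCKTAG_RELATION(tag, dbid, relid);
	vxids = GetLockConflicts(&tag, AccessExclusiveLock);

	/* the result array is terminated by an invalid VXID */
	while (VirtualTransactionIdIsValid(*vxids))
	{
		elog(LOG, "conflicting virtual xid %d/%u",
			 vxids->backendId, vxids->localTransactionId);
		vxids++;
	}
}
#endif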
2999 
3000 /*
3001  * Find a lock in the shared lock table and release it.  It is the caller's
3002  * responsibility to verify that this is a sane thing to do.  (For example, it
3003  * would be bad to release a lock here if there might still be a LOCALLOCK
3004  * object with pointers to it.)
3005  *
3006  * We currently use this in two situations: first, to release locks held by
3007  * prepared transactions on commit (see lock_twophase_postcommit); and second,
3008  * to release locks taken via the fast-path, transferred to the main hash
3009  * table, and then released (see LockReleaseAll).
3010  */
3011 static void
3012 LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
3013 					 LOCKTAG *locktag, LOCKMODE lockmode,
3014 					 bool decrement_strong_lock_count)
3015 {
3016 	LOCK	   *lock;
3017 	PROCLOCK   *proclock;
3018 	PROCLOCKTAG proclocktag;
3019 	uint32		hashcode;
3020 	uint32		proclock_hashcode;
3021 	LWLock	   *partitionLock;
3022 	bool		wakeupNeeded;
3023 
3024 	hashcode = LockTagHashCode(locktag);
3025 	partitionLock = LockHashPartitionLock(hashcode);
3026 
3027 	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3028 
3029 	/*
3030 	 * Re-find the lock object (it had better be there).
3031 	 */
3032 	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
3033 												(void *) locktag,
3034 												hashcode,
3035 												HASH_FIND,
3036 												NULL);
3037 	if (!lock)
3038 		elog(PANIC, "failed to re-find shared lock object");
3039 
3040 	/*
3041 	 * Re-find the proclock object (ditto).
3042 	 */
3043 	proclocktag.myLock = lock;
3044 	proclocktag.myProc = proc;
3045 
3046 	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
3047 
3048 	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
3049 														(void *) &proclocktag,
3050 														proclock_hashcode,
3051 														HASH_FIND,
3052 														NULL);
3053 	if (!proclock)
3054 		elog(PANIC, "failed to re-find shared proclock object");
3055 
3056 	/*
3057 	 * Double-check that we are actually holding a lock of the type we want to
3058 	 * release.
3059 	 */
3060 	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
3061 	{
3062 		PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
3063 		LWLockRelease(partitionLock);
3064 		elog(WARNING, "you don't own a lock of type %s",
3065 			 lockMethodTable->lockModeNames[lockmode]);
3066 		return;
3067 	}
3068 
3069 	/*
3070 	 * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
3071 	 */
3072 	wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
3073 
3074 	CleanUpLock(lock, proclock,
3075 				lockMethodTable, hashcode,
3076 				wakeupNeeded);
3077 
3078 	LWLockRelease(partitionLock);
3079 
3080 	/*
3081 	 * Decrement strong lock count.  This logic is needed only for 2PC.
3082 	 */
3083 	if (decrement_strong_lock_count
3084 		&& ConflictsWithRelationFastPath(locktag, lockmode))
3085 	{
3086 		uint32		fasthashcode = FastPathStrongLockHashPartition(hashcode);
3087 
3088 		SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
3089 		Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
3090 		FastPathStrongRelationLocks->count[fasthashcode]--;
3091 		SpinLockRelease(&FastPathStrongRelationLocks->mutex);
3092 	}
3093 }
3094 
3095 /*
3096  * CheckForSessionAndXactLocks
3097  *		Check to see if transaction holds both session-level and xact-level
3098  *		locks on the same object; if so, throw an error.
3099  *
3100  * If we have both session- and transaction-level locks on the same object,
3101  * PREPARE TRANSACTION must fail.  This should never happen with regular
3102  * locks, since we only take those at session level in some special operations
3103  * like VACUUM.  It's possible to hit this with advisory locks, though.
3104  *
3105  * It would be nice if we could keep the session hold and give away the
3106  * transactional hold to the prepared xact.  However, that would require two
3107  * PROCLOCK objects, and we cannot be sure that another PROCLOCK will be
3108  * available when it comes time for PostPrepare_Locks to do the deed.
3109  * So for now, we error out while we can still do so safely.
3110  *
3111  * Since the LOCALLOCK table stores a separate entry for each lockmode,
3112  * we can't implement this check by examining LOCALLOCK entries in isolation.
3113  * We must build a transient hashtable that is indexed by locktag only.
3114  */
3115 static void
3116 CheckForSessionAndXactLocks(void)
3117 {
3118 	typedef struct
3119 	{
3120 		LOCKTAG		lock;		/* identifies the lockable object */
3121 		bool		sessLock;	/* is any lockmode held at session level? */
3122 		bool		xactLock;	/* is any lockmode held at xact level? */
3123 	} PerLockTagEntry;
3124 
3125 	HASHCTL		hash_ctl;
3126 	HTAB	   *lockhtab;
3127 	HASH_SEQ_STATUS status;
3128 	LOCALLOCK  *locallock;
3129 
3130 	/* Create a local hash table keyed by LOCKTAG only */
3131 	hash_ctl.keysize = sizeof(LOCKTAG);
3132 	hash_ctl.entrysize = sizeof(PerLockTagEntry);
3133 	hash_ctl.hcxt = CurrentMemoryContext;
3134 
3135 	lockhtab = hash_create("CheckForSessionAndXactLocks table",
3136 						   256, /* arbitrary initial size */
3137 						   &hash_ctl,
3138 						   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
3139 
3140 	/* Scan local lock table to find entries for each LOCKTAG */
3141 	hash_seq_init(&status, LockMethodLocalHash);
3142 
3143 	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
3144 	{
3145 		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
3146 		PerLockTagEntry *hentry;
3147 		bool		found;
3148 		int			i;
3149 
3150 		/*
3151 		 * Ignore VXID locks.  We don't want those to be held by prepared
3152 		 * transactions, since they aren't meaningful after a restart.
3153 		 */
3154 		if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3155 			continue;
3156 
3157 		/* Ignore it if we don't actually hold the lock */
3158 		if (locallock->nLocks <= 0)
3159 			continue;
3160 
3161 		/* Otherwise, find or make an entry in lockhtab */
3162 		hentry = (PerLockTagEntry *) hash_search(lockhtab,
3163 												 (void *) &locallock->tag.lock,
3164 												 HASH_ENTER, &found);
3165 		if (!found)				/* initialize, if newly created */
3166 			hentry->sessLock = hentry->xactLock = false;
3167 
3168 		/* Scan to see if we hold lock at session or xact level or both */
3169 		for (i = locallock->numLockOwners - 1; i >= 0; i--)
3170 		{
3171 			if (lockOwners[i].owner == NULL)
3172 				hentry->sessLock = true;
3173 			else
3174 				hentry->xactLock = true;
3175 		}
3176 
3177 		/*
3178 		 * We can throw error immediately when we see both types of locks; no
3179 		 * need to wait around to see if there are more violations.
3180 		 */
3181 		if (hentry->sessLock && hentry->xactLock)
3182 			ereport(ERROR,
3183 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3184 					 errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
3185 	}
3186 
3187 	/* Success, so clean up */
3188 	hash_destroy(lockhtab);
3189 }
3190 
3191 /*
3192  * AtPrepare_Locks
3193  *		Do the preparatory work for a PREPARE: make 2PC state file records
3194  *		for all locks currently held.
3195  *
3196  * Session-level locks are ignored, as are VXID locks.
3197  *
3198  * For the most part, we don't need to touch shared memory for this ---
3199  * all the necessary state information is in the locallock table.
3200  * Fast-path locks are an exception, however: we move any such locks to
3201  * the main table before allowing PREPARE TRANSACTION to succeed.
3202  */
3203 void
3204 AtPrepare_Locks(void)
3205 {
3206 	HASH_SEQ_STATUS status;
3207 	LOCALLOCK  *locallock;
3208 
3209 	/* First, verify there aren't locks of both xact and session level */
3210 	CheckForSessionAndXactLocks();
3211 
3212 	/* Now do the per-locallock cleanup work */
3213 	hash_seq_init(&status, LockMethodLocalHash);
3214 
3215 	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
3216 	{
3217 		TwoPhaseLockRecord record;
3218 		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
3219 		bool		haveSessionLock;
3220 		bool		haveXactLock;
3221 		int			i;
3222 
3223 		/*
3224 		 * Ignore VXID locks.  We don't want those to be held by prepared
3225 		 * transactions, since they aren't meaningful after a restart.
3226 		 */
3227 		if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3228 			continue;
3229 
3230 		/* Ignore it if we don't actually hold the lock */
3231 		if (locallock->nLocks <= 0)
3232 			continue;
3233 
3234 		/* Scan to see whether we hold it at session or transaction level */
3235 		haveSessionLock = haveXactLock = false;
3236 		for (i = locallock->numLockOwners - 1; i >= 0; i--)
3237 		{
3238 			if (lockOwners[i].owner == NULL)
3239 				haveSessionLock = true;
3240 			else
3241 				haveXactLock = true;
3242 		}
3243 
3244 		/* Ignore it if we have only session lock */
3245 		if (!haveXactLock)
3246 			continue;
3247 
3248 		/* This can't happen, because we already checked it */
3249 		if (haveSessionLock)
3250 			ereport(ERROR,
3251 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3252 					 errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
3253 
3254 		/*
3255 		 * If the local lock was taken via the fast-path, we need to move it
3256 		 * to the primary lock table, or just get a pointer to the existing
3257 		 * primary lock table entry if by chance it's already been
3258 		 * transferred.
3259 		 */
3260 		if (locallock->proclock == NULL)
3261 		{
3262 			locallock->proclock = FastPathGetRelationLockEntry(locallock);
3263 			locallock->lock = locallock->proclock->tag.myLock;
3264 		}
3265 
3266 		/*
3267 		 * Arrange to not release any strong lock count held by this lock
3268 		 * entry.  We must retain the count until the prepared transaction is
3269 		 * committed or rolled back.
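		 * (At that point lock_twophase_postcommit/postabort release the lock
		 * via LockRefindAndRelease with decrement_strong_lock_count = true,
		 * which drops the count we keep alive here.)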
3270 		 */
3271 		locallock->holdsStrongLockCount = FALSE;
3272 
3273 		/*
3274 		 * Create a 2PC record.
3275 		 */
3276 		memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG));
3277 		record.lockmode = locallock->tag.mode;
3278 
3279 		RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0,
3280 							   &record, sizeof(TwoPhaseLockRecord));
3281 	}
3282 }
3283 
3284 /*
3285  * PostPrepare_Locks
3286  *		Clean up after successful PREPARE
3287  *
3288  * Here, we want to transfer ownership of our locks to a dummy PGPROC
3289  * that's now associated with the prepared transaction, and we want to
3290  * clean out the corresponding entries in the LOCALLOCK table.
3291  *
3292  * Note: by removing the LOCALLOCK entries, we are leaving dangling
3293  * pointers in the transaction's resource owner.  This is OK at the
3294  * moment since resowner.c doesn't try to free locks retail at a toplevel
3295  * transaction commit or abort.  We could alternatively zero out nLocks
3296  * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
3297  * but that probably costs more cycles.
3298  */
3299 void
3300 PostPrepare_Locks(TransactionId xid)
3301 {
3302 	PGPROC	   *newproc = TwoPhaseGetDummyProc(xid);
3303 	HASH_SEQ_STATUS status;
3304 	LOCALLOCK  *locallock;
3305 	LOCK	   *lock;
3306 	PROCLOCK   *proclock;
3307 	PROCLOCKTAG proclocktag;
3308 	int			partition;
3309 
3310 	/* Can't prepare a lock group follower. */
3311 	Assert(MyProc->lockGroupLeader == NULL ||
3312 		   MyProc->lockGroupLeader == MyProc);
3313 
3314 	/* This is a critical section: any error means big trouble */
3315 	START_CRIT_SECTION();
3316 
3317 	/*
3318 	 * First we run through the locallock table and get rid of unwanted
3319 	 * entries, then we scan the process's proclocks and transfer them to the
3320 	 * target proc.
3321 	 *
3322 	 * We do this separately because we may have multiple locallock entries
3323 	 * pointing to the same proclock, and we daren't end up with any dangling
3324 	 * pointers.
3325 	 */
3326 	hash_seq_init(&status, LockMethodLocalHash);
3327 
3328 	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
3329 	{
3330 		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
3331 		bool		haveSessionLock;
3332 		bool		haveXactLock;
3333 		int			i;
3334 
3335 		if (locallock->proclock == NULL || locallock->lock == NULL)
3336 		{
3337 			/*
3338 			 * We must've run out of shared memory while trying to set up this
3339 			 * lock.  Just forget the local entry.
3340 			 */
3341 			Assert(locallock->nLocks == 0);
3342 			RemoveLocalLock(locallock);
3343 			continue;
3344 		}
3345 
3346 		/* Ignore VXID locks */
3347 		if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3348 			continue;
3349 
3350 		/* Scan to see whether we hold it at session or transaction level */
3351 		haveSessionLock = haveXactLock = false;
3352 		for (i = locallock->numLockOwners - 1; i >= 0; i--)
3353 		{
3354 			if (lockOwners[i].owner == NULL)
3355 				haveSessionLock = true;
3356 			else
3357 				haveXactLock = true;
3358 		}
3359 
3360 		/* Ignore it if we have only session lock */
3361 		if (!haveXactLock)
3362 			continue;
3363 
3364 		/* This can't happen, because we already checked it */
3365 		if (haveSessionLock)
3366 			ereport(PANIC,
3367 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3368 					 errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
3369 
3370 		/* Mark the proclock to show we need to release this lockmode */
3371 		if (locallock->nLocks > 0)
3372 			locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
3373 
3374 		/* And remove the locallock hashtable entry */
3375 		RemoveLocalLock(locallock);
3376 	}
3377 
3378 	/*
3379 	 * Now, scan each lock partition separately.
3380 	 */
3381 	for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
3382 	{
3383 		LWLock	   *partitionLock;
3384 		SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
3385 		PROCLOCK   *nextplock;
3386 
3387 		partitionLock = LockHashPartitionLockByIndex(partition);
3388 
3389 		/*
3390 		 * If the proclock list for this partition is empty, we can skip
3391 		 * acquiring the partition lock.  This optimization is safer than the
3392 		 * situation in LockReleaseAll, because we got rid of any fast-path
3393 		 * locks during AtPrepare_Locks, so there cannot be any case where
3394 		 * another backend is adding something to our lists now.  For safety,
3395 		 * though, we code this the same way as in LockReleaseAll.
3396 		 */
3397 		if (SHMQueueNext(procLocks, procLocks,
3398 						 offsetof(PROCLOCK, procLink)) == NULL)
3399 			continue;			/* needn't examine this partition */
3400 
3401 		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3402 
3403 		for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
3404 											   offsetof(PROCLOCK, procLink));
3405 			 proclock;
3406 			 proclock = nextplock)
3407 		{
3408 			/* Get link first, since we may unlink/relink this proclock */
3409 			nextplock = (PROCLOCK *)
3410 				SHMQueueNext(procLocks, &proclock->procLink,
3411 							 offsetof(PROCLOCK, procLink));
3412 
3413 			Assert(proclock->tag.myProc == MyProc);
3414 
3415 			lock = proclock->tag.myLock;
3416 
3417 			/* Ignore VXID locks */
3418 			if (lock->tag.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3419 				continue;
3420 
3421 			PROCLOCK_PRINT("PostPrepare_Locks", proclock);
3422 			LOCK_PRINT("PostPrepare_Locks", lock, 0);
3423 			Assert(lock->nRequested >= 0);
3424 			Assert(lock->nGranted >= 0);
3425 			Assert(lock->nGranted <= lock->nRequested);
3426 			Assert((proclock->holdMask & ~lock->grantMask) == 0);
3427 
3428 			/* Ignore it if nothing to release (must be a session lock) */
3429 			if (proclock->releaseMask == 0)
3430 				continue;
3431 
3432 			/* Else we should be releasing all locks */
3433 			if (proclock->releaseMask != proclock->holdMask)
3434 				elog(PANIC, "we seem to have dropped a bit somewhere");
3435 
3436 			/*
3437 			 * We cannot simply modify proclock->tag.myProc to reassign
3438 			 * ownership of the lock, because that's part of the hash key and
3439 			 * the proclock would then be in the wrong hash chain.  Instead
3440 			 * use hash_update_hash_key.  (We used to create a new hash entry,
3441 			 * but that risks out-of-memory failure if other processes are
3442 			 * busy making proclocks too.)	We must unlink the proclock from
3443 			 * our procLink chain and put it into the new proc's chain, too.
3444 			 *
3445 			 * Note: the updated proclock hash key will still belong to the
3446 			 * same hash partition, cf proclock_hash().  So the partition lock
3447 			 * we already hold is sufficient for this.
3448 			 */
3449 			SHMQueueDelete(&proclock->procLink);
3450 
3451 			/*
3452 			 * Create the new hash key for the proclock.
3453 			 */
3454 			proclocktag.myLock = lock;
3455 			proclocktag.myProc = newproc;
3456 
3457 			/*
3458 			 * Update groupLeader pointer to point to the new proc.  (We'd
3459 			 * better not be a member of somebody else's lock group!)
3460 			 */
3461 			Assert(proclock->groupLeader == proclock->tag.myProc);
3462 			proclock->groupLeader = newproc;
3463 
3464 			/*
3465 			 * Update the proclock.  We should not find any existing entry for
3466 			 * the same hash key, since there can be only one entry for any
3467 			 * given lock with my own proc.
3468 			 */
3469 			if (!hash_update_hash_key(LockMethodProcLockHash,
3470 									  (void *) proclock,
3471 									  (void *) &proclocktag))
3472 				elog(PANIC, "duplicate entry found while reassigning a prepared transaction's locks");
3473 
3474 			/* Re-link into the new proc's proclock list */
3475 			SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
3476 								 &proclock->procLink);
3477 
3478 			PROCLOCK_PRINT("PostPrepare_Locks: updated", proclock);
3479 		}						/* loop over PROCLOCKs within this partition */
3480 
3481 		LWLockRelease(partitionLock);
3482 	}							/* loop over partitions */
3483 
3484 	END_CRIT_SECTION();
3485 }
3486 
3487 
3488 /*
3489  * Estimate shared-memory space used for lock tables
3490  */
3491 Size
3492 LockShmemSize(void)
3493 {
3494 	Size		size = 0;
3495 	long		max_table_size;
3496 
3497 	/* lock hash table */
3498 	max_table_size = NLOCKENTS();
3499 	size = add_size(size, hash_estimate_size(max_table_size, sizeof(LOCK)));
3500 
3501 	/* proclock hash table */
3502 	max_table_size *= 2;
3503 	size = add_size(size, hash_estimate_size(max_table_size, sizeof(PROCLOCK)));
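	/*
	 * Illustrative arithmetic (assumed example values, not computed here):
	 * with max_locks_per_xact = 64, MaxBackends = 100, and
	 * max_prepared_xacts = 0, NLOCKENTS() is 6400, so we budget space for
	 * 6400 LOCK entries and 12800 PROCLOCK entries before the 10% safety
	 * margin added below.
	 */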
3504 
3505 	/*
3506 	 * Since NLOCKENTS is only an estimate, add 10% safety margin.
3507 	 */
3508 	size = add_size(size, size / 10);
3509 
3510 	return size;
3511 }
3512 
3513 /*
3514  * GetLockStatusData - Return a summary of the lock manager's internal
3515  * status, for use in a user-level reporting function.
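 * (The pg_locks system view is one such consumer.)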
3516  *
3517  * The return data consists of an array of LockInstanceData objects,
3518  * which are a lightly abstracted version of the PROCLOCK data structures,
3519  * i.e. there is one entry for each unique lock and interested PGPROC.
3520  * It is the caller's responsibility to match up related items (such as
3521  * references to the same lockable object or PGPROC) if wanted.
3522  *
3523  * The design goal is to hold the LWLocks for as short a time as possible;
3524  * thus, this function simply makes a copy of the necessary data and releases
3525  * the locks, allowing the caller to contemplate and format the data for as
3526  * long as it pleases.
3527  */
3528 LockData *
3529 GetLockStatusData(void)
3530 {
3531 	LockData   *data;
3532 	PROCLOCK   *proclock;
3533 	HASH_SEQ_STATUS seqstat;
3534 	int			els;
3535 	int			el;
3536 	int			i;
3537 
3538 	data = (LockData *) palloc(sizeof(LockData));
3539 
3540 	/* Guess how much space we'll need. */
3541 	els = MaxBackends;
3542 	el = 0;
3543 	data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * els);
3544 
3545 	/*
3546 	 * First, we iterate through the per-backend fast-path arrays, locking
3547 	 * them one at a time.  This might produce an inconsistent picture of the
3548 	 * system state, but taking all of those LWLocks at the same time seems
3549 	 * impractical (in particular, note MAX_SIMUL_LWLOCKS).  It shouldn't
3550 	 * matter too much, because none of these locks can be involved in lock
3551 	 * conflicts anyway - anything that might conflict must be present in the
3552 	 * main lock table.  (For the same reason, we don't sweat about making
3553 	 * leaderPid completely valid.  We cannot safely dereference another backend's
3554 	 * lockGroupLeader field without holding all lock partition locks, and
3555 	 * it's not worth that.)
3556 	 */
3557 	for (i = 0; i < ProcGlobal->allProcCount; ++i)
3558 	{
3559 		PGPROC	   *proc = &ProcGlobal->allProcs[i];
3560 		uint32		f;
3561 
3562 		LWLockAcquire(&proc->backendLock, LW_SHARED);
3563 
3564 		for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; ++f)
3565 		{
3566 			LockInstanceData *instance;
3567 			uint32		lockbits = FAST_PATH_GET_BITS(proc, f);
3568 
3569 			/* Skip unallocated slots. */
3570 			if (!lockbits)
3571 				continue;
3572 
3573 			if (el >= els)
3574 			{
3575 				els += MaxBackends;
3576 				data->locks = (LockInstanceData *)
3577 					repalloc(data->locks, sizeof(LockInstanceData) * els);
3578 			}
3579 
3580 			instance = &data->locks[el];
3581 			SET_LOCKTAG_RELATION(instance->locktag, proc->databaseId,
3582 								 proc->fpRelId[f]);
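			/* Rebuild a regular LOCKMASK from the packed fast-path bits */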
3583 			instance->holdMask = lockbits << FAST_PATH_LOCKNUMBER_OFFSET;
3584 			instance->waitLockMode = NoLock;
3585 			instance->backend = proc->backendId;
3586 			instance->lxid = proc->lxid;
3587 			instance->pid = proc->pid;
3588 			instance->leaderPid = proc->pid;
3589 			instance->fastpath = true;
3590 
3591 			el++;
3592 		}
3593 
3594 		if (proc->fpVXIDLock)
3595 		{
3596 			VirtualTransactionId vxid;
3597 			LockInstanceData *instance;
3598 
3599 			if (el >= els)
3600 			{
3601 				els += MaxBackends;
3602 				data->locks = (LockInstanceData *)
3603 					repalloc(data->locks, sizeof(LockInstanceData) * els);
3604 			}
3605 
3606 			vxid.backendId = proc->backendId;
3607 			vxid.localTransactionId = proc->fpLocalTransactionId;
3608 
3609 			instance = &data->locks[el];
3610 			SET_LOCKTAG_VIRTUALTRANSACTION(instance->locktag, vxid);
3611 			instance->holdMask = LOCKBIT_ON(ExclusiveLock);
3612 			instance->waitLockMode = NoLock;
3613 			instance->backend = proc->backendId;
3614 			instance->lxid = proc->lxid;
3615 			instance->pid = proc->pid;
3616 			instance->leaderPid = proc->pid;
3617 			instance->fastpath = true;
3618 
3619 			el++;
3620 		}
3621 
3622 		LWLockRelease(&proc->backendLock);
3623 	}
3624 
3625 	/*
3626 	 * Next, acquire lock on the entire shared lock data structure.  We do
3627 	 * this so that, at least for locks in the primary lock table, the state
3628 	 * will be self-consistent.
3629 	 *
3630 	 * Since this is a read-only operation, we take shared instead of
3631 	 * exclusive lock.  There's not a whole lot of point to this, because all
3632 	 * the normal operations require exclusive lock, but it doesn't hurt
3633 	 * anything either. It will at least allow two backends to do
3634 	 * GetLockStatusData in parallel.
3635 	 *
3636 	 * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
3637 	 */
3638 	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
3639 		LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
3640 
3641 	/* Now we can safely count the number of proclocks */
3642 	data->nelements = el + hash_get_num_entries(LockMethodProcLockHash);
3643 	if (data->nelements > els)
3644 	{
3645 		els = data->nelements;
3646 		data->locks = (LockInstanceData *)
3647 			repalloc(data->locks, sizeof(LockInstanceData) * els);
3648 	}
3649 
3650 	/* Now scan the tables to copy the data */
3651 	hash_seq_init(&seqstat, LockMethodProcLockHash);
3652 
3653 	while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
3654 	{
3655 		PGPROC	   *proc = proclock->tag.myProc;
3656 		LOCK	   *lock = proclock->tag.myLock;
3657 		LockInstanceData *instance = &data->locks[el];
3658 
3659 		memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG));
3660 		instance->holdMask = proclock->holdMask;
3661 		if (proc->waitLock == proclock->tag.myLock)
3662 			instance->waitLockMode = proc->waitLockMode;
3663 		else
3664 			instance->waitLockMode = NoLock;
3665 		instance->backend = proc->backendId;
3666 		instance->lxid = proc->lxid;
3667 		instance->pid = proc->pid;
3668 		instance->leaderPid = proclock->groupLeader->pid;
3669 		instance->fastpath = false;
3670 
3671 		el++;
3672 	}
3673 
3674 	/*
3675 	 * And release locks.  We do this in reverse order for two reasons: (1)
3676 	 * Anyone else who needs more than one of the locks will be trying to lock
3677 	 * them in increasing order; we don't want to release the other process
3678 	 * until it can get all the locks it needs. (2) This avoids O(N^2)
3679 	 * behavior inside LWLockRelease.
3680 	 */
3681 	for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
3682 		LWLockRelease(LockHashPartitionLockByIndex(i));
3683 
3684 	Assert(el == data->nelements);
3685 
3686 	return data;
3687 }
3688 
3689 /*
3690  * GetBlockerStatusData - Return a summary of the lock manager's state
3691  * concerning locks that are blocking the specified PID or any member of
3692  * the PID's lock group, for use in a user-level reporting function.
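 * (The pg_blocking_pids() function is one such consumer.)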
3693  *
3694  * For each PID within the lock group that is awaiting some heavyweight lock,
3695  * the return data includes an array of LockInstanceData objects, which are
3696  * the same data structure used by GetLockStatusData; but unlike that function,
3697  * this one reports only the PROCLOCKs associated with the lock that that PID
3698  * is blocked on.  (Hence, all the locktags should be the same for any one
3699  * blocked PID.)  In addition, we return an array of the PIDs of those backends
3700  * that are ahead of the blocked PID in the lock's wait queue.  These can be
3701  * compared with the PIDs in the LockInstanceData objects to determine which
3702  * waiters are ahead of or behind the blocked PID in the queue.
3703  *
3704  * If blocked_pid isn't a valid backend PID or nothing in its lock group is
3705  * waiting on any heavyweight lock, return empty arrays.
3706  *
3707  * The design goal is to hold the LWLocks for as short a time as possible;
3708  * thus, this function simply makes a copy of the necessary data and releases
3709  * the locks, allowing the caller to contemplate and format the data for as
3710  * long as it pleases.
3711  */
3712 BlockedProcsData *
3713 GetBlockerStatusData(int blocked_pid)
3714 {
3715 	BlockedProcsData *data;
3716 	PGPROC	   *proc;
3717 	int			i;
3718 
3719 	data = (BlockedProcsData *) palloc(sizeof(BlockedProcsData));
3720 
3721 	/*
3722 	 * Guess how much space we'll need, and preallocate.  Most of the time
3723 	 * this will avoid needing to do repalloc while holding the LWLocks.  (We
3724 	 * assume, but check with an Assert, that MaxBackends is enough entries
3725 	 * for the procs[] array; the other two could need enlargement, though.)
3726 	 */
3727 	data->nprocs = data->nlocks = data->npids = 0;
3728 	data->maxprocs = data->maxlocks = data->maxpids = MaxBackends;
3729 	data->procs = (BlockedProcData *) palloc(sizeof(BlockedProcData) * data->maxprocs);
3730 	data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * data->maxlocks);
3731 	data->waiter_pids = (int *) palloc(sizeof(int) * data->maxpids);
3732 
3733 	/*
3734 	 * In order to search the ProcArray for blocked_pid and assume that that
3735 	 * entry won't immediately disappear under us, we must hold ProcArrayLock.
3736 	 * In addition, to examine the lock grouping fields of any other backend,
3737 	 * we must hold all the hash partition locks.  (Only one of those locks is
3738 	 * actually relevant for any one lock group, but we can't know which one
3739 	 * ahead of time.)	It's fairly annoying to hold all those locks
3740 	 * throughout this, but it's no worse than GetLockStatusData(), and it
3741 	 * does have the advantage that we're guaranteed to return a
3742 	 * self-consistent instantaneous state.
3743 	 */
3744 	LWLockAcquire(ProcArrayLock, LW_SHARED);
3745 
3746 	proc = BackendPidGetProcWithLock(blocked_pid);
3747 
3748 	/* Nothing to do if it's gone */
3749 	if (proc != NULL)
3750 	{
3751 		/*
3752 		 * Acquire lock on the entire shared lock data structure.  See notes
3753 		 * in GetLockStatusData().
3754 		 */
3755 		for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
3756 			LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
3757 
3758 		if (proc->lockGroupLeader == NULL)
3759 		{
3760 			/* Easy case, proc is not a lock group member */
3761 			GetSingleProcBlockerStatusData(proc, data);
3762 		}
3763 		else
3764 		{
3765 			/* Examine all procs in proc's lock group */
3766 			dlist_iter	iter;
3767 
3768 			dlist_foreach(iter, &proc->lockGroupLeader->lockGroupMembers)
3769 			{
3770 				PGPROC	   *memberProc;
3771 
3772 				memberProc = dlist_container(PGPROC, lockGroupLink, iter.cur);
3773 				GetSingleProcBlockerStatusData(memberProc, data);
3774 			}
3775 		}
3776 
3777 		/*
3778 		 * And release locks.  See notes in GetLockStatusData().
3779 		 */
3780 		for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
3781 			LWLockRelease(LockHashPartitionLockByIndex(i));
3782 
3783 		Assert(data->nprocs <= data->maxprocs);
3784 	}
3785 
3786 	LWLockRelease(ProcArrayLock);
3787 
3788 	return data;
3789 }
3790 
3791 /* Accumulate data about one possibly-blocked proc for GetBlockerStatusData */
3792 static void
3793 GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data)
3794 {
3795 	LOCK	   *theLock = blocked_proc->waitLock;
3796 	BlockedProcData *bproc;
3797 	SHM_QUEUE  *procLocks;
3798 	PROCLOCK   *proclock;
3799 	PROC_QUEUE *waitQueue;
3800 	PGPROC	   *proc;
3801 	int			queue_size;
3802 	int			i;
3803 
3804 	/* Nothing to do if this proc is not blocked */
3805 	if (theLock == NULL)
3806 		return;
3807 
3808 	/* Set up a procs[] element */
3809 	bproc = &data->procs[data->nprocs++];
3810 	bproc->pid = blocked_proc->pid;
3811 	bproc->first_lock = data->nlocks;
3812 	bproc->first_waiter = data->npids;
3813 
3814 	/*
3815 	 * We may ignore the proc's fast-path arrays, since nothing in those could
3816 	 * be related to a contended lock.
3817 	 */
3818 
3819 	/* Collect all PROCLOCKs associated with theLock */
3820 	procLocks = &(theLock->procLocks);
3821 	proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
3822 										 offsetof(PROCLOCK, lockLink));
3823 	while (proclock)
3824 	{
3825 		PGPROC	   *proc = proclock->tag.myProc;
3826 		LOCK	   *lock = proclock->tag.myLock;
3827 		LockInstanceData *instance;
3828 
3829 		if (data->nlocks >= data->maxlocks)
3830 		{
3831 			data->maxlocks += MaxBackends;
3832 			data->locks = (LockInstanceData *)
3833 				repalloc(data->locks, sizeof(LockInstanceData) * data->maxlocks);
3834 		}
3835 
3836 		instance = &data->locks[data->nlocks];
3837 		memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG));
3838 		instance->holdMask = proclock->holdMask;
3839 		if (proc->waitLock == lock)
3840 			instance->waitLockMode = proc->waitLockMode;
3841 		else
3842 			instance->waitLockMode = NoLock;
3843 		instance->backend = proc->backendId;
3844 		instance->lxid = proc->lxid;
3845 		instance->pid = proc->pid;
3846 		instance->leaderPid = proclock->groupLeader->pid;
3847 		instance->fastpath = false;
3848 		data->nlocks++;
3849 
3850 		proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
3851 											 offsetof(PROCLOCK, lockLink));
3852 	}
3853 
3854 	/* Enlarge waiter_pids[] if it's too small to hold all wait queue PIDs */
3855 	waitQueue = &(theLock->waitProcs);
3856 	queue_size = waitQueue->size;
3857 
3858 	if (queue_size > data->maxpids - data->npids)
3859 	{
3860 		data->maxpids = Max(data->maxpids + MaxBackends,
3861 							data->npids + queue_size);
3862 		data->waiter_pids = (int *) repalloc(data->waiter_pids,
3863 											 sizeof(int) * data->maxpids);
3864 	}
3865 
3866 	/* Collect PIDs from the lock's wait queue, stopping at blocked_proc */
3867 	proc = (PGPROC *) waitQueue->links.next;
3868 	for (i = 0; i < queue_size; i++)
3869 	{
3870 		if (proc == blocked_proc)
3871 			break;
3872 		data->waiter_pids[data->npids++] = proc->pid;
3873 		proc = (PGPROC *) proc->links.next;
3874 	}
3875 
3876 	bproc->num_locks = data->nlocks - bproc->first_lock;
3877 	bproc->num_waiters = data->npids - bproc->first_waiter;
3878 }
3879 
3880 /*
3881  * Returns a list of currently held AccessExclusiveLocks, for use by
3882  * LogStandbySnapshot().  The result is a palloc'd array,
3883  * with the number of elements returned into *nlocks.
3884  *
3885  * XXX This currently takes a lock on all partitions of the lock table,
3886  * but it's possible to do better.  By reference counting locks and storing
3887  * the value in the ProcArray entry for each backend we could tell if any
3888  * locks need recording without having to acquire the partition locks and
3889  * scan the lock table.  Whether that's worth the additional overhead
3890  * is pretty dubious though.
3891  */
3892 xl_standby_lock *
3893 GetRunningTransactionLocks(int *nlocks)
3894 {
3895 	xl_standby_lock *accessExclusiveLocks;
3896 	PROCLOCK   *proclock;
3897 	HASH_SEQ_STATUS seqstat;
3898 	int			i;
3899 	int			index;
3900 	int			els;
3901 
3902 	/*
3903 	 * Acquire lock on the entire shared lock data structure.
3904 	 *
3905 	 * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
3906 	 */
3907 	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
3908 		LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
3909 
3910 	/* Now we can safely count the number of proclocks */
3911 	els = hash_get_num_entries(LockMethodProcLockHash);
3912 
3913 	/*
3914 	 * Allocating enough space for all locks in the lock table is overkill,
3915 	 * but it's more convenient and faster than having to enlarge the array.
3916 	 */
3917 	accessExclusiveLocks = palloc(els * sizeof(xl_standby_lock));
3918 
3919 	/* Now scan the tables to copy the data */
3920 	hash_seq_init(&seqstat, LockMethodProcLockHash);
3921 
3922 	/*
3923 	 * If lock is a currently granted AccessExclusiveLock then it will have
3924 	 * just one proclock holder, so locks are never accessed twice in this
3925 	 * particular case. Don't copy this code for use elsewhere because in the
3926 	 * general case this will give you duplicate locks when looking at
3927 	 * non-exclusive lock types.
3928 	 */
3929 	index = 0;
3930 	while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
3931 	{
3932 		/* make sure this definition matches the one used in LockAcquire */
3933 		if ((proclock->holdMask & LOCKBIT_ON(AccessExclusiveLock)) &&
3934 			proclock->tag.myLock->tag.locktag_type == LOCKTAG_RELATION)
3935 		{
3936 			PGPROC	   *proc = proclock->tag.myProc;
3937 			PGXACT	   *pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
3938 			LOCK	   *lock = proclock->tag.myLock;
3939 			TransactionId xid = pgxact->xid;
3940 
3941 			/*
3942 			 * Don't record locks for transactions if we know they have
3943 			 * already issued their WAL record for commit but not yet released
3944 			 * lock. It is still possible that we see locks held by already
3945 			 * complete transactions, if they haven't yet zeroed their xids.
3946 			 */
3947 			if (!TransactionIdIsValid(xid))
3948 				continue;
3949 
3950 			accessExclusiveLocks[index].xid = xid;
3951 			accessExclusiveLocks[index].dbOid = lock->tag.locktag_field1;
3952 			accessExclusiveLocks[index].relOid = lock->tag.locktag_field2;
3953 
3954 			index++;
3955 		}
3956 	}
3957 
3958 	Assert(index <= els);
3959 
3960 	/*
3961 	 * And release locks.  We do this in reverse order for two reasons: (1)
3962 	 * Anyone else who needs more than one of the locks will be trying to lock
3963 	 * them in increasing order; we don't want to release the other process
3964 	 * until it can get all the locks it needs. (2) This avoids O(N^2)
3965 	 * behavior inside LWLockRelease.
3966 	 */
3967 	for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
3968 		LWLockRelease(LockHashPartitionLockByIndex(i));
3969 
3970 	*nlocks = index;
3971 	return accessExclusiveLocks;
3972 }
3973 
3974 /* Provide the textual name of any lock mode */
3975 const char *
3976 GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
3977 {
3978 	Assert(lockmethodid > 0 && lockmethodid < lengthof(LockMethods));
3979 	Assert(mode > 0 && mode <= LockMethods[lockmethodid]->numLockModes);
3980 	return LockMethods[lockmethodid]->lockModeNames[mode];
3981 }
3982 
3983 #ifdef LOCK_DEBUG
3984 /*
3985  * Dump all locks in the given proc's myProcLocks lists.
3986  *
3987  * Caller is responsible for having acquired appropriate LWLocks.
3988  */
3989 void
3990 DumpLocks(PGPROC *proc)
3991 {
3992 	SHM_QUEUE  *procLocks;
3993 	PROCLOCK   *proclock;
3994 	LOCK	   *lock;
3995 	int			i;
3996 
3997 	if (proc == NULL)
3998 		return;
3999 
4000 	if (proc->waitLock)
4001 		LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
4002 
4003 	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
4004 	{
4005 		procLocks = &(proc->myProcLocks[i]);
4006 
4007 		proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
4008 											 offsetof(PROCLOCK, procLink));
4009 
4010 		while (proclock)
4011 		{
4012 			Assert(proclock->tag.myProc == proc);
4013 
4014 			lock = proclock->tag.myLock;
4015 
4016 			PROCLOCK_PRINT("DumpLocks", proclock);
4017 			LOCK_PRINT("DumpLocks", lock, 0);
4018 
4019 			proclock = (PROCLOCK *)
4020 				SHMQueueNext(procLocks, &proclock->procLink,
4021 							 offsetof(PROCLOCK, procLink));
4022 		}
4023 	}
4024 }
4025 
4026 /*
4027  * Dump all lmgr locks.
4028  *
4029  * Caller is responsible for having acquired appropriate LWLocks.
4030  */
4031 void
4032 DumpAllLocks(void)
4033 {
4034 	PGPROC	   *proc;
4035 	PROCLOCK   *proclock;
4036 	LOCK	   *lock;
4037 	HASH_SEQ_STATUS status;
4038 
4039 	proc = MyProc;
4040 
4041 	if (proc && proc->waitLock)
4042 		LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
4043 
4044 	hash_seq_init(&status, LockMethodProcLockHash);
4045 
4046 	while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
4047 	{
4048 		PROCLOCK_PRINT("DumpAllLocks", proclock);
4049 
4050 		lock = proclock->tag.myLock;
4051 		if (lock)
4052 			LOCK_PRINT("DumpAllLocks", lock, 0);
4053 		else
4054 			elog(LOG, "DumpAllLocks: proclock->tag.myLock = NULL");
4055 	}
4056 }
4057 #endif   /* LOCK_DEBUG */
4058 
4059 /*
4060  * LOCK 2PC resource manager's routines
4061  */
4062 
4063 /*
4064  * Re-acquire a lock belonging to a transaction that was prepared.
4065  *
4066  * Because this function is run at db startup, re-acquiring the locks should
4067  * never conflict with running transactions because there are none.  We
4068  * assume that the lock state represented by the stored 2PC files is legal.
4069  *
4070  * When switching from Hot Standby mode to normal operation, the locks will
4071  * be already held by the startup process. The locks are acquired for the new
4072  * procs without checking for conflicts, so we don't get a conflict between the
4073  * startup process and the dummy procs, even though we will momentarily have
4074  * a situation where two procs are holding the same AccessExclusiveLock,
4075  * which isn't normally possible because of the conflict. If we're in standby
4076  * mode, but a recovery snapshot hasn't been established yet, it's possible
4077  * that some but not all of the locks are already held by the startup process.
4078  *
4079  * This approach is simple, but also a bit dangerous, because if there isn't
4080  * enough shared memory to acquire the locks, an error will be thrown, which
4081  * is promoted to FATAL and recovery will abort, bringing down postmaster.
4082  * A safer approach would be to transfer the locks like we do in
4083  * AtPrepare_Locks, but then again, in hot standby mode it's possible for
4084  * read-only backends to use up all the shared lock memory anyway, so that
4085  * replaying the WAL record that needs to acquire a lock will throw an error
4086  * and PANIC anyway.
4087  */
4088 void
4089 lock_twophase_recover(TransactionId xid, uint16 info,
4090 					  void *recdata, uint32 len)
4091 {
4092 	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
4093 	PGPROC	   *proc = TwoPhaseGetDummyProc(xid);
4094 	LOCKTAG    *locktag;
4095 	LOCKMODE	lockmode;
4096 	LOCKMETHODID lockmethodid;
4097 	LOCK	   *lock;
4098 	PROCLOCK   *proclock;
4099 	PROCLOCKTAG proclocktag;
4100 	bool		found;
4101 	uint32		hashcode;
4102 	uint32		proclock_hashcode;
4103 	int			partition;
4104 	LWLock	   *partitionLock;
4105 	LockMethod	lockMethodTable;
4106 
4107 	Assert(len == sizeof(TwoPhaseLockRecord));
4108 	locktag = &rec->locktag;
4109 	lockmode = rec->lockmode;
4110 	lockmethodid = locktag->locktag_lockmethodid;
4111 
4112 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4113 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4114 	lockMethodTable = LockMethods[lockmethodid];
4115 
4116 	hashcode = LockTagHashCode(locktag);
4117 	partition = LockHashPartition(hashcode);
4118 	partitionLock = LockHashPartitionLock(hashcode);
4119 
4120 	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
4121 
4122 	/*
4123 	 * Find or create a lock with this tag.
4124 	 */
4125 	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
4126 												(void *) locktag,
4127 												hashcode,
4128 												HASH_ENTER_NULL,
4129 												&found);
4130 	if (!lock)
4131 	{
4132 		LWLockRelease(partitionLock);
4133 		ereport(ERROR,
4134 				(errcode(ERRCODE_OUT_OF_MEMORY),
4135 				 errmsg("out of shared memory"),
4136 		  errhint("You might need to increase max_locks_per_transaction.")));
4137 	}
4138 
4139 	/*
4140 	 * if it's a new lock object, initialize it
4141 	 */
4142 	if (!found)
4143 	{
4144 		lock->grantMask = 0;
4145 		lock->waitMask = 0;
4146 		SHMQueueInit(&(lock->procLocks));
4147 		ProcQueueInit(&(lock->waitProcs));
4148 		lock->nRequested = 0;
4149 		lock->nGranted = 0;
4150 		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
4151 		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
4152 		LOCK_PRINT("lock_twophase_recover: new", lock, lockmode);
4153 	}
4154 	else
4155 	{
4156 		LOCK_PRINT("lock_twophase_recover: found", lock, lockmode);
4157 		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
4158 		Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
4159 		Assert(lock->nGranted <= lock->nRequested);
4160 	}
4161 
4162 	/*
4163 	 * Create the hash key for the proclock table.
4164 	 */
4165 	proclocktag.myLock = lock;
4166 	proclocktag.myProc = proc;
4167 
4168 	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
4169 
4170 	/*
4171 	 * Find or create a proclock entry with this tag
4172 	 */
4173 	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
4174 														(void *) &proclocktag,
4175 														proclock_hashcode,
4176 														HASH_ENTER_NULL,
4177 														&found);
4178 	if (!proclock)
4179 	{
4180 		/* Ooops, not enough shmem for the proclock */
4181 		if (lock->nRequested == 0)
4182 		{
4183 			/*
4184 			 * There are no other requestors of this lock, so garbage-collect
4185 			 * the lock object.  We *must* do this to avoid a permanent leak
4186 			 * of shared memory, because there won't be anything to cause
4187 			 * anyone to release the lock object later.
4188 			 */
4189 			Assert(SHMQueueEmpty(&(lock->procLocks)));
4190 			if (!hash_search_with_hash_value(LockMethodLockHash,
4191 											 (void *) &(lock->tag),
4192 											 hashcode,
4193 											 HASH_REMOVE,
4194 											 NULL))
4195 				elog(PANIC, "lock table corrupted");
4196 		}
4197 		LWLockRelease(partitionLock);
4198 		ereport(ERROR,
4199 				(errcode(ERRCODE_OUT_OF_MEMORY),
4200 				 errmsg("out of shared memory"),
4201 		  errhint("You might need to increase max_locks_per_transaction.")));
4202 	}
4203 
4204 	/*
4205 	 * If new, initialize the new entry
4206 	 */
4207 	if (!found)
4208 	{
4209 		Assert(proc->lockGroupLeader == NULL);
4210 		proclock->groupLeader = proc;
4211 		proclock->holdMask = 0;
4212 		proclock->releaseMask = 0;
4213 		/* Add proclock to appropriate lists */
4214 		SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
4215 		SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
4216 							 &proclock->procLink);
4217 		PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
4218 	}
4219 	else
4220 	{
4221 		PROCLOCK_PRINT("lock_twophase_recover: found", proclock);
4222 		Assert((proclock->holdMask & ~lock->grantMask) == 0);
4223 	}
4224 
4225 	/*
4226 	 * lock->nRequested and lock->requested[] count the total number of
4227 	 * requests, whether granted or waiting, so increment those immediately.
4228 	 */
4229 	lock->nRequested++;
4230 	lock->requested[lockmode]++;
4231 	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
4232 
4233 	/*
4234 	 * We shouldn't already hold the desired lock.
4235 	 */
4236 	if (proclock->holdMask & LOCKBIT_ON(lockmode))
4237 		elog(ERROR, "lock %s on object %u/%u/%u is already held",
4238 			 lockMethodTable->lockModeNames[lockmode],
4239 			 lock->tag.locktag_field1, lock->tag.locktag_field2,
4240 			 lock->tag.locktag_field3);
4241 
4242 	/*
4243 	 * We ignore any possible conflicts and just grant ourselves the lock. Not
4244 	 * only because we don't bother, but also to avoid deadlocks when
4245 	 * switching from standby to normal mode. See function comment.
4246 	 */
4247 	GrantLock(lock, proclock, lockmode);
4248 
4249 	/*
4250 	 * Bump strong lock count, to make sure any fast-path lock requests won't
4251 	 * be granted without consulting the primary lock table.
4252 	 */
4253 	if (ConflictsWithRelationFastPath(&lock->tag, lockmode))
4254 	{
4255 		uint32		fasthashcode = FastPathStrongLockHashPartition(hashcode);
4256 
4257 		SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
4258 		FastPathStrongRelationLocks->count[fasthashcode]++;
4259 		SpinLockRelease(&FastPathStrongRelationLocks->mutex);
4260 	}
4261 
4262 	LWLockRelease(partitionLock);
4263 }
4264 
4265 /*
4266  * Re-acquire a lock belonging to a transaction that was prepared, when
4267  * starting up into hot standby mode.
4268  */
4269 void
4270 lock_twophase_standby_recover(TransactionId xid, uint16 info,
4271 							  void *recdata, uint32 len)
4272 {
4273 	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
4274 	LOCKTAG    *locktag;
4275 	LOCKMODE	lockmode;
4276 	LOCKMETHODID lockmethodid;
4277 
4278 	Assert(len == sizeof(TwoPhaseLockRecord));
4279 	locktag = &rec->locktag;
4280 	lockmode = rec->lockmode;
4281 	lockmethodid = locktag->locktag_lockmethodid;
4282 
4283 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4284 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4285 
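	/*
	 * Only relation-level AccessExclusiveLocks are re-acquired here; they are
	 * the only locks hot-standby queries can conflict with, so any other lock
	 * types recorded for the prepared transaction are simply ignored during
	 * standby recovery.
	 */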
4286 	if (lockmode == AccessExclusiveLock &&
4287 		locktag->locktag_type == LOCKTAG_RELATION)
4288 	{
4289 		StandbyAcquireAccessExclusiveLock(xid,
4290 										locktag->locktag_field1 /* dboid */ ,
4291 									  locktag->locktag_field2 /* reloid */ );
4292 	}
4293 }
4294 
4295 
4296 /*
4297  * 2PC processing routine for COMMIT PREPARED case.
4298  *
4299  * Find and release the lock indicated by the 2PC record.
4300  */
4301 void
4302 lock_twophase_postcommit(TransactionId xid, uint16 info,
4303 						 void *recdata, uint32 len)
4304 {
4305 	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
4306 	PGPROC	   *proc = TwoPhaseGetDummyProc(xid);
4307 	LOCKTAG    *locktag;
4308 	LOCKMETHODID lockmethodid;
4309 	LockMethod	lockMethodTable;
4310 
4311 	Assert(len == sizeof(TwoPhaseLockRecord));
4312 	locktag = &rec->locktag;
4313 	lockmethodid = locktag->locktag_lockmethodid;
4314 
4315 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4316 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4317 	lockMethodTable = LockMethods[lockmethodid];
4318 
4319 	LockRefindAndRelease(lockMethodTable, proc, locktag, rec->lockmode, true);
4320 }
4321 
4322 /*
4323  * 2PC processing routine for ROLLBACK PREPARED case.
4324  *
4325  * This is actually just the same as the COMMIT case.
4326  */
4327 void
4328 lock_twophase_postabort(TransactionId xid, uint16 info,
4329 						void *recdata, uint32 len)
4330 {
4331 	lock_twophase_postcommit(xid, info, recdata, len);
4332 }
4333 
4334 /*
4335  *		VirtualXactLockTableInsert
4336  *
4337  *		Take vxid lock via the fast-path.  There can't be any pre-existing
4338  *		lockers, as we haven't advertised this vxid via the ProcArray yet.
4339  *
4340  *		Since MyProc->fpLocalTransactionId will normally contain the same data
4341  *		as MyProc->lxid, you might wonder if we really need both.  The
4342  *		difference is that MyProc->lxid is set and cleared unlocked, and
4343  *		examined by procarray.c, while fpLocalTransactionId is protected by
4344  *		backendLock and is used only by the locking subsystem.  Doing it this
4345  *		way makes it easier to verify that there are no funny race conditions.
4346  *
4347  *		We don't bother recording this lock in the local lock table, since it's
4348  *		only ever released at the end of a transaction.  Instead,
4349  *		LockReleaseAll() calls VirtualXactLockTableCleanup().
4350  */
4351 void
4352 VirtualXactLockTableInsert(VirtualTransactionId vxid)
4353 {
4354 	Assert(VirtualTransactionIdIsValid(vxid));
4355 
4356 	LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
4357 
4358 	Assert(MyProc->backendId == vxid.backendId);
4359 	Assert(MyProc->fpLocalTransactionId == InvalidLocalTransactionId);
4360 	Assert(MyProc->fpVXIDLock == false);
4361 
4362 	MyProc->fpVXIDLock = true;
4363 	MyProc->fpLocalTransactionId = vxid.localTransactionId;
4364 
4365 	LWLockRelease(&MyProc->backendLock);
4366 }
4367 
4368 /*
4369  *		VirtualXactLockTableCleanup
4370  *
4371  *		Check whether a VXID lock has been materialized; if so, release it,
4372  *		unblocking waiters.
4373  */
4374 void
4375 VirtualXactLockTableCleanup(void)
4376 {
4377 	bool		fastpath;
4378 	LocalTransactionId lxid;
4379 
4380 	Assert(MyProc->backendId != InvalidBackendId);
4381 
4382 	/*
4383 	 * Clean up shared memory state.
4384 	 */
4385 	LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
4386 
4387 	fastpath = MyProc->fpVXIDLock;
4388 	lxid = MyProc->fpLocalTransactionId;
4389 	MyProc->fpVXIDLock = false;
4390 	MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
4391 
4392 	LWLockRelease(&MyProc->backendLock);
4393 
4394 	/*
4395 	 * If fpVXIDLock has been cleared without touching fpLocalTransactionId,
4396 	 * that means someone transferred the lock to the main lock table.
4397 	 */
4398 	if (!fastpath && LocalTransactionIdIsValid(lxid))
4399 	{
4400 		VirtualTransactionId vxid;
4401 		LOCKTAG		locktag;
4402 
4403 		vxid.backendId = MyBackendId;
4404 		vxid.localTransactionId = lxid;
4405 		SET_LOCKTAG_VIRTUALTRANSACTION(locktag, vxid);
4406 
4407 		LockRefindAndRelease(LockMethods[DEFAULT_LOCKMETHOD], MyProc,
4408 							 &locktag, ExclusiveLock, false);
4409 	}
4410 }
4411 
4412 /*
4413  *		XactLockForVirtualXact
4414  *
4415  * If TransactionIdIsValid(xid), this is essentially XactLockTableWait(xid,
4416  * NULL, NULL, XLTW_None) or ConditionalXactLockTableWait(xid).  Unlike those
4417  * functions, it assumes "xid" is never a subtransaction and that "xid" is
4418  * prepared, committed, or aborted.
4419  *
4420  * If !TransactionIdIsValid(xid), this locks every prepared XID having been
4421  * known as "vxid" before its PREPARE TRANSACTION.
4422  */
4423 static bool
4424 XactLockForVirtualXact(VirtualTransactionId vxid,
4425 					   TransactionId xid, bool wait)
4426 {
4427 	bool		more = false;
4428 
4429 	/* There is no point in waiting for 2PCs if you have no 2PCs. */
4430 	if (max_prepared_xacts == 0)
4431 		return true;
4432 
4433 	do
4434 	{
4435 		LockAcquireResult lar;
4436 		LOCKTAG		tag;
4437 
4438 		/* Clear state from previous iterations. */
4439 		if (more)
4440 		{
4441 			xid = InvalidTransactionId;
4442 			more = false;
4443 		}
4444 
4445 		/* If we have no xid, try to find one. */
4446 		if (!TransactionIdIsValid(xid))
4447 			xid = TwoPhaseGetXidByVirtualXID(vxid, &more);
4448 		if (!TransactionIdIsValid(xid))
4449 		{
4450 			Assert(!more);
4451 			return true;
4452 		}
4453 
4454 		/* Check or wait for XID completion. */
4455 		SET_LOCKTAG_TRANSACTION(tag, xid);
4456 		lar = LockAcquire(&tag, ShareLock, false, !wait);
4457 		if (lar == LOCKACQUIRE_NOT_AVAIL)
4458 			return false;
4459 		LockRelease(&tag, ShareLock, false);
4460 	} while (more);
4461 
4462 	return true;
4463 }
4464 
4465 /*
4466  *		VirtualXactLock
4467  *
4468  * If wait = true, wait as long as the given VXID or any XID acquired by the
4469  * same transaction is still running.  Then, return true.
4470  *
4471  * If wait = false, just check whether that VXID or one of those XIDs is still
4472  * running, and return true or false.
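 *
 * (Callers such as WaitForLockers() in lmgr.c use this to wait out the
 * virtual transactions returned by GetLockConflicts(), e.g. during
 * CREATE INDEX CONCURRENTLY.)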
4473  */
4474 bool
4475 VirtualXactLock(VirtualTransactionId vxid, bool wait)
4476 {
4477 	LOCKTAG		tag;
4478 	PGPROC	   *proc;
4479 	TransactionId xid = InvalidTransactionId;
4480 
4481 	Assert(VirtualTransactionIdIsValid(vxid));
4482 
4483 	if (VirtualTransactionIdIsRecoveredPreparedXact(vxid))
4484 		/* no vxid lock; localTransactionId is a normal, locked XID */
4485 		return XactLockForVirtualXact(vxid, vxid.localTransactionId, wait);
4486 
4487 	SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
4488 
4489 	/*
4490 	 * If a lock table entry must be made, this is the PGPROC on whose behalf
4491 	 * it must be done.  Note that the transaction might end or the PGPROC
4492 	 * might be reassigned to a new backend before we get around to examining
4493 	 * it, but it doesn't matter.  If we find upon examination that the
4494 	 * relevant lxid is no longer running here, that's enough to prove that
4495 	 * it's no longer running anywhere.
4496 	 */
4497 	proc = BackendIdGetProc(vxid.backendId);
4498 	if (proc == NULL)
4499 		return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
4500 
4501 	/*
4502 	 * We must acquire this lock before checking the backendId and lxid
4503 	 * against the ones we're waiting for.  The target backend will only set
4504 	 * or clear lxid while holding this lock.
4505 	 */
4506 	LWLockAcquire(&proc->backendLock, LW_EXCLUSIVE);
4507 
4508 	if (proc->backendId != vxid.backendId
4509 		|| proc->fpLocalTransactionId != vxid.localTransactionId)
4510 	{
4511 		/* VXID ended */
4512 		LWLockRelease(&proc->backendLock);
4513 		return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
4514 	}
4515 
4516 	/*
4517 	 * If we aren't asked to wait, there's no need to set up a lock table
4518 	 * entry.  The transaction is still in progress, so just return false.
4519 	 */
4520 	if (!wait)
4521 	{
4522 		LWLockRelease(&proc->backendLock);
4523 		return false;
4524 	}
4525 
4526 	/*
4527 	 * OK, we're going to need to sleep on the VXID.  But first, we must set
4528 	 * up the primary lock table entry, if needed (ie, convert the proc's
4529 	 * fast-path lock on its VXID to a regular lock).
4530 	 */
4531 	if (proc->fpVXIDLock)
4532 	{
4533 		PROCLOCK   *proclock;
4534 		uint32		hashcode;
4535 		LWLock	   *partitionLock;
4536 
4537 		hashcode = LockTagHashCode(&tag);
4538 
4539 		partitionLock = LockHashPartitionLock(hashcode);
4540 		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
4541 
4542 		proclock = SetupLockInTable(LockMethods[DEFAULT_LOCKMETHOD], proc,
4543 									&tag, hashcode, ExclusiveLock);
4544 		if (!proclock)
4545 		{
4546 			LWLockRelease(partitionLock);
4547 			LWLockRelease(&proc->backendLock);
4548 			ereport(ERROR,
4549 					(errcode(ERRCODE_OUT_OF_MEMORY),
4550 					 errmsg("out of shared memory"),
4551 					 errhint("You might need to increase max_locks_per_transaction.")));
4552 		}
4553 		GrantLock(proclock->tag.myLock, proclock, ExclusiveLock);
4554 
4555 		LWLockRelease(partitionLock);
4556 
4557 		proc->fpVXIDLock = false;
4558 	}
4559 
4560 	/*
4561 	 * If the proc has an XID now, we'll avoid a TwoPhaseGetXidByVirtualXID()
4562 	 * search.  The proc might have assigned this XID but not yet locked it,
4563 	 * in which case the proc will lock this XID before releasing the VXID.
4564 	 * The backendLock critical section excludes VirtualXactLockTableCleanup(),
4565 	 * so we won't save an XID of a different VXID.  It doesn't matter whether
4566 	 * we save this before or after setting up the primary lock table entry.
4567 	 */
4568 	xid = ProcGlobal->allPgXact[proc->pgprocno].xid;
4569 
4570 	/* Done with proc->fpLockBits */
4571 	LWLockRelease(&proc->backendLock);
4572 
4573 	/* Time to wait. */
4574 	(void) LockAcquire(&tag, ShareLock, false, false);
4575 
4576 	LockRelease(&tag, ShareLock, false);
4577 	return XactLockForVirtualXact(vxid, xid, wait);
4578 }
4579 
4580 /*
4581  * LockWaiterCount
4582  *
4583  * Find the number of lock requesters on this locktag
4584  */
4585 int
4586 LockWaiterCount(const LOCKTAG *locktag)
4587 {
4588 	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
4589 	LOCK	   *lock;
4590 	bool		found;
4591 	uint32		hashcode;
4592 	LWLock	   *partitionLock;
4593 	int			waiters = 0;
4594 
4595 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4596 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4597 
4598 	hashcode = LockTagHashCode(locktag);
4599 	partitionLock = LockHashPartitionLock(hashcode);
4600 	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
4601 
4602 	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
4603 												(const void *) locktag,
4604 												hashcode,
4605 												HASH_FIND,
4606 												&found);
4607 	if (found)
4608 	{
4609 		Assert(lock != NULL);
4610 		waiters = lock->nRequested;
4611 	}
4612 	LWLockRelease(partitionLock);
4613 
4614 	return waiters;
4615 }
4616