1 /*-------------------------------------------------------------------------
2  *
3  * lmgr.c
4  *	  POSTGRES lock manager code
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/storage/lmgr/lmgr.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/subtrans.h"
19 #include "access/transam.h"
20 #include "access/xact.h"
21 #include "catalog/catalog.h"
22 #include "commands/progress.h"
23 #include "miscadmin.h"
24 #include "pgstat.h"
25 #include "storage/lmgr.h"
26 #include "storage/proc.h"
27 #include "storage/procarray.h"
28 #include "storage/sinvaladt.h"
29 #include "utils/inval.h"
30 
31 
32 /*
33  * Per-backend counter for generating speculative insertion tokens.
34  *
35  * This may wrap around, but that's OK as it's only used for the short
36  * duration between inserting a tuple and checking that there are no (unique)
37  * constraint violations.  It's theoretically possible that a backend sees a
38  * tuple that was speculatively inserted by another backend, but before it has
39  * started waiting on the token, the other backend completes its insertion,
40  * and then performs 2^32 unrelated insertions.  And after all that, the
41  * first backend finally calls SpeculativeInsertionLockAcquire(), with the
42  * intention of waiting for the first insertion to complete, but ends up
43  * waiting for the latest unrelated insertion instead.  Even then, nothing
44  * particularly bad happens: in the worst case they deadlock, causing one of
45  * the transactions to abort.
46  */
47 static uint32 speculativeInsertionToken = 0;
48 
49 
50 /*
51  * Struct to hold context info for transaction lock waits.
52  *
53  * 'oper' is the operation that needs to wait for the other transaction; 'rel'
54  * and 'ctid' specify the address of the tuple being waited for.
55  */
56 typedef struct XactLockTableWaitInfo
57 {
58 	XLTW_Oper	oper;
59 	Relation	rel;
60 	ItemPointer ctid;
61 } XactLockTableWaitInfo;
62 
63 static void XactLockTableWaitErrorCb(void *arg);
64 
65 /*
66  * RelationInitLockInfo
67  *		Initializes the lock information in a relation descriptor.
68  *
69  *		relcache.c must call this during creation of any reldesc.
70  */
71 void
RelationInitLockInfo(Relation relation)72 RelationInitLockInfo(Relation relation)
73 {
74 	Assert(RelationIsValid(relation));
75 	Assert(OidIsValid(RelationGetRelid(relation)));
76 
77 	relation->rd_lockInfo.lockRelId.relId = RelationGetRelid(relation);
78 
79 	if (relation->rd_rel->relisshared)
80 		relation->rd_lockInfo.lockRelId.dbId = InvalidOid;
81 	else
82 		relation->rd_lockInfo.lockRelId.dbId = MyDatabaseId;
83 }
84 
85 /*
86  * SetLocktagRelationOid
87  *		Set up a locktag for a relation, given only relation OID
88  */
89 static inline void
SetLocktagRelationOid(LOCKTAG * tag,Oid relid)90 SetLocktagRelationOid(LOCKTAG *tag, Oid relid)
91 {
92 	Oid			dbid;
93 
94 	if (IsSharedRelation(relid))
95 		dbid = InvalidOid;
96 	else
97 		dbid = MyDatabaseId;
98 
99 	SET_LOCKTAG_RELATION(*tag, dbid, relid);
100 }
101 
102 /*
103  *		LockRelationOid
104  *
105  * Lock a relation given only its OID.  This should generally be used
106  * before attempting to open the relation's relcache entry.
107  */
108 void
LockRelationOid(Oid relid,LOCKMODE lockmode)109 LockRelationOid(Oid relid, LOCKMODE lockmode)
110 {
111 	LOCKTAG		tag;
112 	LOCALLOCK  *locallock;
113 	LockAcquireResult res;
114 
115 	SetLocktagRelationOid(&tag, relid);
116 
117 	res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
118 
119 	/*
120 	 * Now that we have the lock, check for invalidation messages, so that we
121 	 * will update or flush any stale relcache entry before we try to use it.
122 	 * RangeVarGetRelid() specifically relies on us for this.  We can skip
123 	 * this in the not-uncommon case that we already had the same type of lock
124 	 * being requested, since then no one else could have modified the
125 	 * relcache entry in an undesirable way.  (In the case where our own xact
126 	 * modifies the rel, the relcache update happens via
127 	 * CommandCounterIncrement, not here.)
128 	 *
129 	 * However, in corner cases where code acts on tables (usually catalogs)
130 	 * recursively, we might get here while still processing invalidation
131 	 * messages in some outer execution of this function or a sibling.  The
132 	 * "cleared" status of the lock tells us whether we really are done
133 	 * absorbing relevant inval messages.
134 	 */
135 	if (res != LOCKACQUIRE_ALREADY_CLEAR)
136 	{
137 		AcceptInvalidationMessages();
138 		MarkLockClear(locallock);
139 	}
140 }
141 
142 /*
143  *		ConditionalLockRelationOid
144  *
145  * As above, but only lock if we can get the lock without blocking.
146  * Returns true iff the lock was acquired.
147  *
148  * NOTE: we do not currently need conditional versions of all the
149  * LockXXX routines in this file, but they could easily be added if needed.
150  */
151 bool
ConditionalLockRelationOid(Oid relid,LOCKMODE lockmode)152 ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
153 {
154 	LOCKTAG		tag;
155 	LOCALLOCK  *locallock;
156 	LockAcquireResult res;
157 
158 	SetLocktagRelationOid(&tag, relid);
159 
160 	res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
161 
162 	if (res == LOCKACQUIRE_NOT_AVAIL)
163 		return false;
164 
165 	/*
166 	 * Now that we have the lock, check for invalidation messages; see notes
167 	 * in LockRelationOid.
168 	 */
169 	if (res != LOCKACQUIRE_ALREADY_CLEAR)
170 	{
171 		AcceptInvalidationMessages();
172 		MarkLockClear(locallock);
173 	}
174 
175 	return true;
176 }
177 
178 /*
179  *		UnlockRelationId
180  *
181  * Unlock, given a LockRelId.  This is preferred over UnlockRelationOid
182  * for speed reasons.
183  */
184 void
UnlockRelationId(LockRelId * relid,LOCKMODE lockmode)185 UnlockRelationId(LockRelId *relid, LOCKMODE lockmode)
186 {
187 	LOCKTAG		tag;
188 
189 	SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
190 
191 	LockRelease(&tag, lockmode, false);
192 }
193 
194 /*
195  *		UnlockRelationOid
196  *
197  * Unlock, given only a relation Oid.  Use UnlockRelationId if you can.
198  */
199 void
UnlockRelationOid(Oid relid,LOCKMODE lockmode)200 UnlockRelationOid(Oid relid, LOCKMODE lockmode)
201 {
202 	LOCKTAG		tag;
203 
204 	SetLocktagRelationOid(&tag, relid);
205 
206 	LockRelease(&tag, lockmode, false);
207 }
208 
209 /*
210  *		LockRelation
211  *
212  * This is a convenience routine for acquiring an additional lock on an
213  * already-open relation.  Never try to do "relation_open(foo, NoLock)"
214  * and then lock with this.
215  */
216 void
LockRelation(Relation relation,LOCKMODE lockmode)217 LockRelation(Relation relation, LOCKMODE lockmode)
218 {
219 	LOCKTAG		tag;
220 	LOCALLOCK  *locallock;
221 	LockAcquireResult res;
222 
223 	SET_LOCKTAG_RELATION(tag,
224 						 relation->rd_lockInfo.lockRelId.dbId,
225 						 relation->rd_lockInfo.lockRelId.relId);
226 
227 	res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
228 
229 	/*
230 	 * Now that we have the lock, check for invalidation messages; see notes
231 	 * in LockRelationOid.
232 	 */
233 	if (res != LOCKACQUIRE_ALREADY_CLEAR)
234 	{
235 		AcceptInvalidationMessages();
236 		MarkLockClear(locallock);
237 	}
238 }
239 
240 /*
241  *		ConditionalLockRelation
242  *
243  * This is a convenience routine for acquiring an additional lock on an
244  * already-open relation.  Never try to do "relation_open(foo, NoLock)"
245  * and then lock with this.
246  */
247 bool
ConditionalLockRelation(Relation relation,LOCKMODE lockmode)248 ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
249 {
250 	LOCKTAG		tag;
251 	LOCALLOCK  *locallock;
252 	LockAcquireResult res;
253 
254 	SET_LOCKTAG_RELATION(tag,
255 						 relation->rd_lockInfo.lockRelId.dbId,
256 						 relation->rd_lockInfo.lockRelId.relId);
257 
258 	res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
259 
260 	if (res == LOCKACQUIRE_NOT_AVAIL)
261 		return false;
262 
263 	/*
264 	 * Now that we have the lock, check for invalidation messages; see notes
265 	 * in LockRelationOid.
266 	 */
267 	if (res != LOCKACQUIRE_ALREADY_CLEAR)
268 	{
269 		AcceptInvalidationMessages();
270 		MarkLockClear(locallock);
271 	}
272 
273 	return true;
274 }
275 
276 /*
277  *		UnlockRelation
278  *
279  * This is a convenience routine for unlocking a relation without also
280  * closing it.
281  */
282 void
UnlockRelation(Relation relation,LOCKMODE lockmode)283 UnlockRelation(Relation relation, LOCKMODE lockmode)
284 {
285 	LOCKTAG		tag;
286 
287 	SET_LOCKTAG_RELATION(tag,
288 						 relation->rd_lockInfo.lockRelId.dbId,
289 						 relation->rd_lockInfo.lockRelId.relId);
290 
291 	LockRelease(&tag, lockmode, false);
292 }
293 
294 /*
295  *		CheckRelationLockedByMe
296  *
297  * Returns true if current transaction holds a lock on 'relation' of mode
298  * 'lockmode'.  If 'orstronger' is true, a stronger lockmode is also OK.
299  * ("Stronger" is defined as "numerically higher", which is a bit
300  * semantically dubious but is OK for the purposes we use this for.)
301  */
302 bool
CheckRelationLockedByMe(Relation relation,LOCKMODE lockmode,bool orstronger)303 CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, bool orstronger)
304 {
305 	LOCKTAG		tag;
306 
307 	SET_LOCKTAG_RELATION(tag,
308 						 relation->rd_lockInfo.lockRelId.dbId,
309 						 relation->rd_lockInfo.lockRelId.relId);
310 
311 	if (LockHeldByMe(&tag, lockmode))
312 		return true;
313 
314 	if (orstronger)
315 	{
316 		LOCKMODE	slockmode;
317 
318 		for (slockmode = lockmode + 1;
319 			 slockmode <= MaxLockMode;
320 			 slockmode++)
321 		{
322 			if (LockHeldByMe(&tag, slockmode))
323 			{
324 #ifdef NOT_USED
325 				/* Sometimes this might be useful for debugging purposes */
326 				elog(WARNING, "lock mode %s substituted for %s on relation %s",
327 					 GetLockmodeName(tag.locktag_lockmethodid, slockmode),
328 					 GetLockmodeName(tag.locktag_lockmethodid, lockmode),
329 					 RelationGetRelationName(relation));
330 #endif
331 				return true;
332 			}
333 		}
334 	}
335 
336 	return false;
337 }
338 
339 /*
340  *		LockHasWaitersRelation
341  *
342  * This is a function to check whether someone else is waiting for a
343  * lock which we are currently holding.
344  */
345 bool
LockHasWaitersRelation(Relation relation,LOCKMODE lockmode)346 LockHasWaitersRelation(Relation relation, LOCKMODE lockmode)
347 {
348 	LOCKTAG		tag;
349 
350 	SET_LOCKTAG_RELATION(tag,
351 						 relation->rd_lockInfo.lockRelId.dbId,
352 						 relation->rd_lockInfo.lockRelId.relId);
353 
354 	return LockHasWaiters(&tag, lockmode, false);
355 }
356 
357 /*
358  *		LockRelationIdForSession
359  *
360  * This routine grabs a session-level lock on the target relation.  The
361  * session lock persists across transaction boundaries.  It will be removed
362  * when UnlockRelationIdForSession() is called, or if an ereport(ERROR) occurs,
363  * or if the backend exits.
364  *
365  * Note that one should also grab a transaction-level lock on the rel
366  * in any transaction that actually uses the rel, to ensure that the
367  * relcache entry is up to date.
368  */
369 void
LockRelationIdForSession(LockRelId * relid,LOCKMODE lockmode)370 LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
371 {
372 	LOCKTAG		tag;
373 
374 	SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
375 
376 	(void) LockAcquire(&tag, lockmode, true, false);
377 }
378 
379 /*
380  *		UnlockRelationIdForSession
381  */
382 void
UnlockRelationIdForSession(LockRelId * relid,LOCKMODE lockmode)383 UnlockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
384 {
385 	LOCKTAG		tag;
386 
387 	SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
388 
389 	LockRelease(&tag, lockmode, true);
390 }
391 
392 /*
393  *		LockRelationForExtension
394  *
395  * This lock tag is used to interlock addition of pages to relations.
396  * We need such locking because bufmgr/smgr definition of P_NEW is not
397  * race-condition-proof.
398  *
399  * We assume the caller is already holding some type of regular lock on
400  * the relation, so no AcceptInvalidationMessages call is needed here.
401  */
402 void
LockRelationForExtension(Relation relation,LOCKMODE lockmode)403 LockRelationForExtension(Relation relation, LOCKMODE lockmode)
404 {
405 	LOCKTAG		tag;
406 
407 	SET_LOCKTAG_RELATION_EXTEND(tag,
408 								relation->rd_lockInfo.lockRelId.dbId,
409 								relation->rd_lockInfo.lockRelId.relId);
410 
411 	(void) LockAcquire(&tag, lockmode, false, false);
412 }
413 
414 /*
415  *		ConditionalLockRelationForExtension
416  *
417  * As above, but only lock if we can get the lock without blocking.
418  * Returns true iff the lock was acquired.
419  */
420 bool
ConditionalLockRelationForExtension(Relation relation,LOCKMODE lockmode)421 ConditionalLockRelationForExtension(Relation relation, LOCKMODE lockmode)
422 {
423 	LOCKTAG		tag;
424 
425 	SET_LOCKTAG_RELATION_EXTEND(tag,
426 								relation->rd_lockInfo.lockRelId.dbId,
427 								relation->rd_lockInfo.lockRelId.relId);
428 
429 	return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
430 }
431 
432 /*
433  *		RelationExtensionLockWaiterCount
434  *
435  * Count the number of processes waiting for the given relation extension lock.
436  */
437 int
RelationExtensionLockWaiterCount(Relation relation)438 RelationExtensionLockWaiterCount(Relation relation)
439 {
440 	LOCKTAG		tag;
441 
442 	SET_LOCKTAG_RELATION_EXTEND(tag,
443 								relation->rd_lockInfo.lockRelId.dbId,
444 								relation->rd_lockInfo.lockRelId.relId);
445 
446 	return LockWaiterCount(&tag);
447 }
448 
449 /*
450  *		UnlockRelationForExtension
451  */
452 void
UnlockRelationForExtension(Relation relation,LOCKMODE lockmode)453 UnlockRelationForExtension(Relation relation, LOCKMODE lockmode)
454 {
455 	LOCKTAG		tag;
456 
457 	SET_LOCKTAG_RELATION_EXTEND(tag,
458 								relation->rd_lockInfo.lockRelId.dbId,
459 								relation->rd_lockInfo.lockRelId.relId);
460 
461 	LockRelease(&tag, lockmode, false);
462 }
463 
464 /*
465  *		LockDatabaseFrozenIds
466  *
467  * This allows one backend per database to execute vac_update_datfrozenxid().
468  */
469 void
LockDatabaseFrozenIds(LOCKMODE lockmode)470 LockDatabaseFrozenIds(LOCKMODE lockmode)
471 {
472 	LOCKTAG		tag;
473 
474 	SET_LOCKTAG_DATABASE_FROZEN_IDS(tag, MyDatabaseId);
475 
476 	(void) LockAcquire(&tag, lockmode, false, false);
477 }
478 
479 /*
480  *		LockPage
481  *
482  * Obtain a page-level lock.  This is currently used by some index access
483  * methods to lock individual index pages.
484  */
485 void
LockPage(Relation relation,BlockNumber blkno,LOCKMODE lockmode)486 LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
487 {
488 	LOCKTAG		tag;
489 
490 	SET_LOCKTAG_PAGE(tag,
491 					 relation->rd_lockInfo.lockRelId.dbId,
492 					 relation->rd_lockInfo.lockRelId.relId,
493 					 blkno);
494 
495 	(void) LockAcquire(&tag, lockmode, false, false);
496 }
497 
498 /*
499  *		ConditionalLockPage
500  *
501  * As above, but only lock if we can get the lock without blocking.
502  * Returns true iff the lock was acquired.
503  */
504 bool
ConditionalLockPage(Relation relation,BlockNumber blkno,LOCKMODE lockmode)505 ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
506 {
507 	LOCKTAG		tag;
508 
509 	SET_LOCKTAG_PAGE(tag,
510 					 relation->rd_lockInfo.lockRelId.dbId,
511 					 relation->rd_lockInfo.lockRelId.relId,
512 					 blkno);
513 
514 	return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
515 }
516 
517 /*
518  *		UnlockPage
519  */
520 void
UnlockPage(Relation relation,BlockNumber blkno,LOCKMODE lockmode)521 UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
522 {
523 	LOCKTAG		tag;
524 
525 	SET_LOCKTAG_PAGE(tag,
526 					 relation->rd_lockInfo.lockRelId.dbId,
527 					 relation->rd_lockInfo.lockRelId.relId,
528 					 blkno);
529 
530 	LockRelease(&tag, lockmode, false);
531 }
532 
533 /*
534  *		LockTuple
535  *
536  * Obtain a tuple-level lock.  This is used in a less-than-intuitive fashion
537  * because we can't afford to keep a separate lock in shared memory for every
538  * tuple.  See heap_lock_tuple before using this!
539  */
540 void
LockTuple(Relation relation,ItemPointer tid,LOCKMODE lockmode)541 LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
542 {
543 	LOCKTAG		tag;
544 
545 	SET_LOCKTAG_TUPLE(tag,
546 					  relation->rd_lockInfo.lockRelId.dbId,
547 					  relation->rd_lockInfo.lockRelId.relId,
548 					  ItemPointerGetBlockNumber(tid),
549 					  ItemPointerGetOffsetNumber(tid));
550 
551 	(void) LockAcquire(&tag, lockmode, false, false);
552 }
553 
554 /*
555  *		ConditionalLockTuple
556  *
557  * As above, but only lock if we can get the lock without blocking.
558  * Returns true iff the lock was acquired.
559  */
560 bool
ConditionalLockTuple(Relation relation,ItemPointer tid,LOCKMODE lockmode)561 ConditionalLockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
562 {
563 	LOCKTAG		tag;
564 
565 	SET_LOCKTAG_TUPLE(tag,
566 					  relation->rd_lockInfo.lockRelId.dbId,
567 					  relation->rd_lockInfo.lockRelId.relId,
568 					  ItemPointerGetBlockNumber(tid),
569 					  ItemPointerGetOffsetNumber(tid));
570 
571 	return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
572 }
573 
574 /*
575  *		UnlockTuple
576  */
577 void
UnlockTuple(Relation relation,ItemPointer tid,LOCKMODE lockmode)578 UnlockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
579 {
580 	LOCKTAG		tag;
581 
582 	SET_LOCKTAG_TUPLE(tag,
583 					  relation->rd_lockInfo.lockRelId.dbId,
584 					  relation->rd_lockInfo.lockRelId.relId,
585 					  ItemPointerGetBlockNumber(tid),
586 					  ItemPointerGetOffsetNumber(tid));
587 
588 	LockRelease(&tag, lockmode, false);
589 }
590 
591 /*
592  *		XactLockTableInsert
593  *
594  * Insert a lock showing that the given transaction ID is running ---
595  * this is done when an XID is acquired by a transaction or subtransaction.
596  * The lock can then be used to wait for the transaction to finish.
597  */
598 void
XactLockTableInsert(TransactionId xid)599 XactLockTableInsert(TransactionId xid)
600 {
601 	LOCKTAG		tag;
602 
603 	SET_LOCKTAG_TRANSACTION(tag, xid);
604 
605 	(void) LockAcquire(&tag, ExclusiveLock, false, false);
606 }
607 
608 /*
609  *		XactLockTableDelete
610  *
611  * Delete the lock showing that the given transaction ID is running.
612  * (This is never used for main transaction IDs; those locks are only
613  * released implicitly at transaction end.  But we do use it for subtrans IDs.)
614  */
615 void
XactLockTableDelete(TransactionId xid)616 XactLockTableDelete(TransactionId xid)
617 {
618 	LOCKTAG		tag;
619 
620 	SET_LOCKTAG_TRANSACTION(tag, xid);
621 
622 	LockRelease(&tag, ExclusiveLock, false);
623 }
624 
625 /*
626  *		XactLockTableWait
627  *
628  * Wait for the specified transaction to commit or abort.  If an operation
629  * is specified, an error context callback is set up.  If 'oper' is passed as
630  * None, no error context callback is set up.
631  *
632  * Note that this does the right thing for subtransactions: if we wait on a
633  * subtransaction, we will exit as soon as it aborts or its top parent commits.
634  * It takes some extra work to ensure this, because to save on shared memory
635  * the XID lock of a subtransaction is released when it ends, whether
636  * successfully or unsuccessfully.  So we have to check if it's "still running"
637  * and if so wait for its parent.
638  */
639 void
XactLockTableWait(TransactionId xid,Relation rel,ItemPointer ctid,XLTW_Oper oper)640 XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
641 				  XLTW_Oper oper)
642 {
643 	LOCKTAG		tag;
644 	XactLockTableWaitInfo info;
645 	ErrorContextCallback callback;
646 	bool		first = true;
647 
648 	/*
649 	 * If an operation is specified, set up our verbose error context
650 	 * callback.
651 	 */
652 	if (oper != XLTW_None)
653 	{
654 		Assert(RelationIsValid(rel));
655 		Assert(ItemPointerIsValid(ctid));
656 
657 		info.rel = rel;
658 		info.ctid = ctid;
659 		info.oper = oper;
660 
661 		callback.callback = XactLockTableWaitErrorCb;
662 		callback.arg = &info;
663 		callback.previous = error_context_stack;
664 		error_context_stack = &callback;
665 	}
666 
667 	for (;;)
668 	{
669 		Assert(TransactionIdIsValid(xid));
670 		Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
671 
672 		SET_LOCKTAG_TRANSACTION(tag, xid);
673 
674 		(void) LockAcquire(&tag, ShareLock, false, false);
675 
676 		LockRelease(&tag, ShareLock, false);
677 
678 		if (!TransactionIdIsInProgress(xid))
679 			break;
680 
681 		/*
682 		 * If the Xid belonged to a subtransaction, then the lock would have
683 		 * gone away as soon as it was finished; for correct tuple visibility,
684 		 * the right action is to wait on its parent transaction to go away.
685 		 * But instead of going levels up one by one, we can just wait for the
686 		 * topmost transaction to finish with the same end result, which also
687 		 * incurs less locktable traffic.
688 		 *
689 		 * Some uses of this function don't involve tuple visibility -- such
690 		 * as when building snapshots for logical decoding.  It is possible to
691 		 * see a transaction in ProcArray before it registers itself in the
692 		 * locktable.  The topmost transaction in that case is the same xid,
693 		 * so we try again after a short sleep.  (Don't sleep the first time
694 		 * through, to avoid slowing down the normal case.)
695 		 */
696 		if (!first)
697 			pg_usleep(1000L);
698 		first = false;
699 		xid = SubTransGetTopmostTransaction(xid);
700 	}
701 
702 	if (oper != XLTW_None)
703 		error_context_stack = callback.previous;
704 }
705 
706 /*
707  *		ConditionalXactLockTableWait
708  *
709  * As above, but only lock if we can get the lock without blocking.
710  * Returns true if the lock was acquired.
711  */
712 bool
ConditionalXactLockTableWait(TransactionId xid)713 ConditionalXactLockTableWait(TransactionId xid)
714 {
715 	LOCKTAG		tag;
716 	bool		first = true;
717 
718 	for (;;)
719 	{
720 		Assert(TransactionIdIsValid(xid));
721 		Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
722 
723 		SET_LOCKTAG_TRANSACTION(tag, xid);
724 
725 		if (LockAcquire(&tag, ShareLock, false, true) == LOCKACQUIRE_NOT_AVAIL)
726 			return false;
727 
728 		LockRelease(&tag, ShareLock, false);
729 
730 		if (!TransactionIdIsInProgress(xid))
731 			break;
732 
733 		/* See XactLockTableWait about this case */
734 		if (!first)
735 			pg_usleep(1000L);
736 		first = false;
737 		xid = SubTransGetTopmostTransaction(xid);
738 	}
739 
740 	return true;
741 }
742 
743 /*
744  *		SpeculativeInsertionLockAcquire
745  *
746  * Insert a lock showing that the given transaction ID is inserting a tuple,
747  * but hasn't yet decided whether it's going to keep it.  The lock can then be
748  * used to wait for the decision to go ahead with the insertion, or aborting
749  * it.
750  *
751  * The token is used to distinguish multiple insertions by the same
752  * transaction.  It is returned to caller.
753  */
754 uint32
SpeculativeInsertionLockAcquire(TransactionId xid)755 SpeculativeInsertionLockAcquire(TransactionId xid)
756 {
757 	LOCKTAG		tag;
758 
759 	speculativeInsertionToken++;
760 
761 	/*
762 	 * Check for wrap-around. Zero means no token is held, so don't use that.
763 	 */
764 	if (speculativeInsertionToken == 0)
765 		speculativeInsertionToken = 1;
766 
767 	SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, speculativeInsertionToken);
768 
769 	(void) LockAcquire(&tag, ExclusiveLock, false, false);
770 
771 	return speculativeInsertionToken;
772 }
773 
774 /*
775  *		SpeculativeInsertionLockRelease
776  *
777  * Delete the lock showing that the given transaction is speculatively
778  * inserting a tuple.
779  */
780 void
SpeculativeInsertionLockRelease(TransactionId xid)781 SpeculativeInsertionLockRelease(TransactionId xid)
782 {
783 	LOCKTAG		tag;
784 
785 	SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, speculativeInsertionToken);
786 
787 	LockRelease(&tag, ExclusiveLock, false);
788 }
789 
790 /*
791  *		SpeculativeInsertionWait
792  *
793  * Wait for the specified transaction to finish or abort the insertion of a
794  * tuple.
795  */
796 void
SpeculativeInsertionWait(TransactionId xid,uint32 token)797 SpeculativeInsertionWait(TransactionId xid, uint32 token)
798 {
799 	LOCKTAG		tag;
800 
801 	SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, token);
802 
803 	Assert(TransactionIdIsValid(xid));
804 	Assert(token != 0);
805 
806 	(void) LockAcquire(&tag, ShareLock, false, false);
807 	LockRelease(&tag, ShareLock, false);
808 }
809 
810 /*
811  * XactLockTableWaitErrorCb
812  *		Error context callback for transaction lock waits.
813  */
814 static void
XactLockTableWaitErrorCb(void * arg)815 XactLockTableWaitErrorCb(void *arg)
816 {
817 	XactLockTableWaitInfo *info = (XactLockTableWaitInfo *) arg;
818 
819 	/*
820 	 * We would like to print schema name too, but that would require a
821 	 * syscache lookup.
822 	 */
823 	if (info->oper != XLTW_None &&
824 		ItemPointerIsValid(info->ctid) && RelationIsValid(info->rel))
825 	{
826 		const char *cxt;
827 
828 		switch (info->oper)
829 		{
830 			case XLTW_Update:
831 				cxt = gettext_noop("while updating tuple (%u,%u) in relation \"%s\"");
832 				break;
833 			case XLTW_Delete:
834 				cxt = gettext_noop("while deleting tuple (%u,%u) in relation \"%s\"");
835 				break;
836 			case XLTW_Lock:
837 				cxt = gettext_noop("while locking tuple (%u,%u) in relation \"%s\"");
838 				break;
839 			case XLTW_LockUpdated:
840 				cxt = gettext_noop("while locking updated version (%u,%u) of tuple in relation \"%s\"");
841 				break;
842 			case XLTW_InsertIndex:
843 				cxt = gettext_noop("while inserting index tuple (%u,%u) in relation \"%s\"");
844 				break;
845 			case XLTW_InsertIndexUnique:
846 				cxt = gettext_noop("while checking uniqueness of tuple (%u,%u) in relation \"%s\"");
847 				break;
848 			case XLTW_FetchUpdated:
849 				cxt = gettext_noop("while rechecking updated tuple (%u,%u) in relation \"%s\"");
850 				break;
851 			case XLTW_RecheckExclusionConstr:
852 				cxt = gettext_noop("while checking exclusion constraint on tuple (%u,%u) in relation \"%s\"");
853 				break;
854 
855 			default:
856 				return;
857 		}
858 
859 		errcontext(cxt,
860 				   ItemPointerGetBlockNumber(info->ctid),
861 				   ItemPointerGetOffsetNumber(info->ctid),
862 				   RelationGetRelationName(info->rel));
863 	}
864 }
865 
866 /*
867  * WaitForLockersMultiple
868  *		Wait until no transaction holds locks that conflict with the given
869  *		locktags at the given lockmode.
870  *
871  * To do this, obtain the current list of lockers, and wait on their VXIDs
872  * until they are finished.
873  *
874  * Note we don't try to acquire the locks on the given locktags, only the
875  * VXIDs and XIDs of their lock holders; if somebody grabs a conflicting lock
876  * on the objects after we obtained our initial list of lockers, we will not
877  * wait for them.
878  */
879 void
WaitForLockersMultiple(List * locktags,LOCKMODE lockmode,bool progress)880 WaitForLockersMultiple(List *locktags, LOCKMODE lockmode, bool progress)
881 {
882 	List	   *holders = NIL;
883 	ListCell   *lc;
884 	int			total = 0;
885 	int			done = 0;
886 
887 	/* Done if no locks to wait for */
888 	if (list_length(locktags) == 0)
889 		return;
890 
891 	/* Collect the transactions we need to wait on */
892 	foreach(lc, locktags)
893 	{
894 		LOCKTAG    *locktag = lfirst(lc);
895 		int			count;
896 
897 		holders = lappend(holders,
898 						  GetLockConflicts(locktag, lockmode,
899 										   progress ? &count : NULL));
900 		if (progress)
901 			total += count;
902 	}
903 
904 	if (progress)
905 		pgstat_progress_update_param(PROGRESS_WAITFOR_TOTAL, total);
906 
907 	/*
908 	 * Note: GetLockConflicts() never reports our own xid, hence we need not
909 	 * check for that.  Also, prepared xacts are reported and awaited.
910 	 */
911 
912 	/* Finally wait for each such transaction to complete */
913 	foreach(lc, holders)
914 	{
915 		VirtualTransactionId *lockholders = lfirst(lc);
916 
917 		while (VirtualTransactionIdIsValid(*lockholders))
918 		{
919 			/* If requested, publish who we're going to wait for. */
920 			if (progress)
921 			{
922 				PGPROC	   *holder = BackendIdGetProc(lockholders->backendId);
923 
924 				if (holder)
925 					pgstat_progress_update_param(PROGRESS_WAITFOR_CURRENT_PID,
926 												 holder->pid);
927 			}
928 			VirtualXactLock(*lockholders, true);
929 			lockholders++;
930 
931 			if (progress)
932 				pgstat_progress_update_param(PROGRESS_WAITFOR_DONE, ++done);
933 		}
934 	}
935 	if (progress)
936 	{
937 		const int	index[] = {
938 			PROGRESS_WAITFOR_TOTAL,
939 			PROGRESS_WAITFOR_DONE,
940 			PROGRESS_WAITFOR_CURRENT_PID
941 		};
942 		const int64 values[] = {
943 			0, 0, 0
944 		};
945 
946 		pgstat_progress_update_multi_param(3, index, values);
947 	}
948 
949 	list_free_deep(holders);
950 }
951 
952 /*
953  * WaitForLockers
954  *
955  * Same as WaitForLockersMultiple, for a single lock tag.
956  */
957 void
WaitForLockers(LOCKTAG heaplocktag,LOCKMODE lockmode,bool progress)958 WaitForLockers(LOCKTAG heaplocktag, LOCKMODE lockmode, bool progress)
959 {
960 	List	   *l;
961 
962 	l = list_make1(&heaplocktag);
963 	WaitForLockersMultiple(l, lockmode, progress);
964 	list_free(l);
965 }
966 
967 
968 /*
969  *		LockDatabaseObject
970  *
971  * Obtain a lock on a general object of the current database.  Don't use
972  * this for shared objects (such as tablespaces).  It's unwise to apply it
973  * to relations, also, since a lock taken this way will NOT conflict with
974  * locks taken via LockRelation and friends.
975  */
976 void
LockDatabaseObject(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)977 LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
978 				   LOCKMODE lockmode)
979 {
980 	LOCKTAG		tag;
981 
982 	SET_LOCKTAG_OBJECT(tag,
983 					   MyDatabaseId,
984 					   classid,
985 					   objid,
986 					   objsubid);
987 
988 	(void) LockAcquire(&tag, lockmode, false, false);
989 
990 	/* Make sure syscaches are up-to-date with any changes we waited for */
991 	AcceptInvalidationMessages();
992 }
993 
994 /*
995  *		UnlockDatabaseObject
996  */
997 void
UnlockDatabaseObject(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)998 UnlockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
999 					 LOCKMODE lockmode)
1000 {
1001 	LOCKTAG		tag;
1002 
1003 	SET_LOCKTAG_OBJECT(tag,
1004 					   MyDatabaseId,
1005 					   classid,
1006 					   objid,
1007 					   objsubid);
1008 
1009 	LockRelease(&tag, lockmode, false);
1010 }
1011 
1012 /*
1013  *		LockSharedObject
1014  *
1015  * Obtain a lock on a shared-across-databases object.
1016  */
1017 void
LockSharedObject(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)1018 LockSharedObject(Oid classid, Oid objid, uint16 objsubid,
1019 				 LOCKMODE lockmode)
1020 {
1021 	LOCKTAG		tag;
1022 
1023 	SET_LOCKTAG_OBJECT(tag,
1024 					   InvalidOid,
1025 					   classid,
1026 					   objid,
1027 					   objsubid);
1028 
1029 	(void) LockAcquire(&tag, lockmode, false, false);
1030 
1031 	/* Make sure syscaches are up-to-date with any changes we waited for */
1032 	AcceptInvalidationMessages();
1033 }
1034 
1035 /*
1036  *		UnlockSharedObject
1037  */
1038 void
UnlockSharedObject(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)1039 UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid,
1040 				   LOCKMODE lockmode)
1041 {
1042 	LOCKTAG		tag;
1043 
1044 	SET_LOCKTAG_OBJECT(tag,
1045 					   InvalidOid,
1046 					   classid,
1047 					   objid,
1048 					   objsubid);
1049 
1050 	LockRelease(&tag, lockmode, false);
1051 }
1052 
1053 /*
1054  *		LockSharedObjectForSession
1055  *
1056  * Obtain a session-level lock on a shared-across-databases object.
1057  * See LockRelationIdForSession for notes about session-level locks.
1058  */
1059 void
LockSharedObjectForSession(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)1060 LockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
1061 						   LOCKMODE lockmode)
1062 {
1063 	LOCKTAG		tag;
1064 
1065 	SET_LOCKTAG_OBJECT(tag,
1066 					   InvalidOid,
1067 					   classid,
1068 					   objid,
1069 					   objsubid);
1070 
1071 	(void) LockAcquire(&tag, lockmode, true, false);
1072 }
1073 
1074 /*
1075  *		UnlockSharedObjectForSession
1076  */
1077 void
UnlockSharedObjectForSession(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)1078 UnlockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
1079 							 LOCKMODE lockmode)
1080 {
1081 	LOCKTAG		tag;
1082 
1083 	SET_LOCKTAG_OBJECT(tag,
1084 					   InvalidOid,
1085 					   classid,
1086 					   objid,
1087 					   objsubid);
1088 
1089 	LockRelease(&tag, lockmode, true);
1090 }
1091 
1092 
1093 /*
1094  * Append a description of a lockable object to buf.
1095  *
1096  * Ideally we would print names for the numeric values, but that requires
1097  * getting locks on system tables, which might cause problems since this is
1098  * typically used to report deadlock situations.
1099  */
1100 void
DescribeLockTag(StringInfo buf,const LOCKTAG * tag)1101 DescribeLockTag(StringInfo buf, const LOCKTAG *tag)
1102 {
1103 	switch ((LockTagType) tag->locktag_type)
1104 	{
1105 		case LOCKTAG_RELATION:
1106 			appendStringInfo(buf,
1107 							 _("relation %u of database %u"),
1108 							 tag->locktag_field2,
1109 							 tag->locktag_field1);
1110 			break;
1111 		case LOCKTAG_RELATION_EXTEND:
1112 			appendStringInfo(buf,
1113 							 _("extension of relation %u of database %u"),
1114 							 tag->locktag_field2,
1115 							 tag->locktag_field1);
1116 			break;
1117 		case LOCKTAG_DATABASE_FROZEN_IDS:
1118 			appendStringInfo(buf,
1119 							 _("pg_database.datfrozenxid of database %u"),
1120 							 tag->locktag_field1);
1121 			break;
1122 		case LOCKTAG_PAGE:
1123 			appendStringInfo(buf,
1124 							 _("page %u of relation %u of database %u"),
1125 							 tag->locktag_field3,
1126 							 tag->locktag_field2,
1127 							 tag->locktag_field1);
1128 			break;
1129 		case LOCKTAG_TUPLE:
1130 			appendStringInfo(buf,
1131 							 _("tuple (%u,%u) of relation %u of database %u"),
1132 							 tag->locktag_field3,
1133 							 tag->locktag_field4,
1134 							 tag->locktag_field2,
1135 							 tag->locktag_field1);
1136 			break;
1137 		case LOCKTAG_TRANSACTION:
1138 			appendStringInfo(buf,
1139 							 _("transaction %u"),
1140 							 tag->locktag_field1);
1141 			break;
1142 		case LOCKTAG_VIRTUALTRANSACTION:
1143 			appendStringInfo(buf,
1144 							 _("virtual transaction %d/%u"),
1145 							 tag->locktag_field1,
1146 							 tag->locktag_field2);
1147 			break;
1148 		case LOCKTAG_SPECULATIVE_TOKEN:
1149 			appendStringInfo(buf,
1150 							 _("speculative token %u of transaction %u"),
1151 							 tag->locktag_field2,
1152 							 tag->locktag_field1);
1153 			break;
1154 		case LOCKTAG_OBJECT:
1155 			appendStringInfo(buf,
1156 							 _("object %u of class %u of database %u"),
1157 							 tag->locktag_field3,
1158 							 tag->locktag_field2,
1159 							 tag->locktag_field1);
1160 			break;
1161 		case LOCKTAG_USERLOCK:
1162 			/* reserved for old contrib code, now on pgfoundry */
1163 			appendStringInfo(buf,
1164 							 _("user lock [%u,%u,%u]"),
1165 							 tag->locktag_field1,
1166 							 tag->locktag_field2,
1167 							 tag->locktag_field3);
1168 			break;
1169 		case LOCKTAG_ADVISORY:
1170 			appendStringInfo(buf,
1171 							 _("advisory lock [%u,%u,%u,%u]"),
1172 							 tag->locktag_field1,
1173 							 tag->locktag_field2,
1174 							 tag->locktag_field3,
1175 							 tag->locktag_field4);
1176 			break;
1177 		default:
1178 			appendStringInfo(buf,
1179 							 _("unrecognized locktag type %d"),
1180 							 (int) tag->locktag_type);
1181 			break;
1182 	}
1183 }
1184 
1185 /*
1186  * GetLockNameFromTagType
1187  *
1188  *	Given locktag type, return the corresponding lock name.
1189  */
1190 const char *
GetLockNameFromTagType(uint16 locktag_type)1191 GetLockNameFromTagType(uint16 locktag_type)
1192 {
1193 	if (locktag_type > LOCKTAG_LAST_TYPE)
1194 		return "???";
1195 	return LockTagTypeNames[locktag_type];
1196 }
1197