1 /*-------------------------------------------------------------------------
2  *
3  * lmgr.c
4  *	  POSTGRES lock manager code
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/storage/lmgr/lmgr.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/subtrans.h"
19 #include "access/transam.h"
20 #include "access/xact.h"
21 #include "catalog/catalog.h"
22 #include "commands/progress.h"
23 #include "miscadmin.h"
24 #include "pgstat.h"
25 #include "storage/lmgr.h"
26 #include "storage/procarray.h"
27 #include "storage/sinvaladt.h"
28 #include "utils/inval.h"
29 
30 
/*
 * Per-backend counter for generating speculative insertion tokens.
 *
 * The counter starts at zero, and zero is reserved to mean "no token is
 * held": SpeculativeInsertionLockAcquire() increments the counter before
 * using it and skips over zero on wrap-around, so a token handed out to a
 * caller is never zero.
 *
 * This may wrap around, but that's OK as it's only used for the short
 * duration between inserting a tuple and checking that there are no (unique)
 * constraint violations.  It's theoretically possible that a backend sees a
 * tuple that was speculatively inserted by another backend, but before it has
 * started waiting on the token, the other backend completes its insertion,
 * and then performs 2^32 unrelated insertions.  And after all that, the
 * first backend finally calls SpeculativeInsertionLockAcquire(), with the
 * intention of waiting for the first insertion to complete, but ends up
 * waiting for the latest unrelated insertion instead.  Even then, nothing
 * particularly bad happens: in the worst case they deadlock, causing one of
 * the transactions to abort.
 */
static uint32 speculativeInsertionToken = 0;
47 
48 
/*
 * Struct to hold context info for transaction lock waits.
 *
 * 'oper' is the operation that needs to wait for the other transaction; 'rel'
 * and 'ctid' specify the address of the tuple being waited for.
 *
 * XactLockTableWait() fills one of these in on its stack and hands it to
 * XactLockTableWaitErrorCb() through the error context stack.
 */
typedef struct XactLockTableWaitInfo
{
	XLTW_Oper	oper;			/* operation that is doing the waiting */
	Relation	rel;			/* relation containing the tuple */
	ItemPointer ctid;			/* TID of the tuple being waited for */
} XactLockTableWaitInfo;

/* Error context callback used by XactLockTableWait(); defined below */
static void XactLockTableWaitErrorCb(void *arg);
63 
64 /*
65  * RelationInitLockInfo
66  *		Initializes the lock information in a relation descriptor.
67  *
68  *		relcache.c must call this during creation of any reldesc.
69  */
70 void
71 RelationInitLockInfo(Relation relation)
72 {
73 	Assert(RelationIsValid(relation));
74 	Assert(OidIsValid(RelationGetRelid(relation)));
75 
76 	relation->rd_lockInfo.lockRelId.relId = RelationGetRelid(relation);
77 
78 	if (relation->rd_rel->relisshared)
79 		relation->rd_lockInfo.lockRelId.dbId = InvalidOid;
80 	else
81 		relation->rd_lockInfo.lockRelId.dbId = MyDatabaseId;
82 }
83 
84 /*
85  * SetLocktagRelationOid
86  *		Set up a locktag for a relation, given only relation OID
87  */
88 static inline void
89 SetLocktagRelationOid(LOCKTAG *tag, Oid relid)
90 {
91 	Oid			dbid;
92 
93 	if (IsSharedRelation(relid))
94 		dbid = InvalidOid;
95 	else
96 		dbid = MyDatabaseId;
97 
98 	SET_LOCKTAG_RELATION(*tag, dbid, relid);
99 }
100 
101 /*
102  *		LockRelationOid
103  *
104  * Lock a relation given only its OID.  This should generally be used
105  * before attempting to open the relation's relcache entry.
106  */
107 void
108 LockRelationOid(Oid relid, LOCKMODE lockmode)
109 {
110 	LOCKTAG		tag;
111 	LOCALLOCK  *locallock;
112 	LockAcquireResult res;
113 
114 	SetLocktagRelationOid(&tag, relid);
115 
116 	res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
117 
118 	/*
119 	 * Now that we have the lock, check for invalidation messages, so that we
120 	 * will update or flush any stale relcache entry before we try to use it.
121 	 * RangeVarGetRelid() specifically relies on us for this.  We can skip
122 	 * this in the not-uncommon case that we already had the same type of lock
123 	 * being requested, since then no one else could have modified the
124 	 * relcache entry in an undesirable way.  (In the case where our own xact
125 	 * modifies the rel, the relcache update happens via
126 	 * CommandCounterIncrement, not here.)
127 	 *
128 	 * However, in corner cases where code acts on tables (usually catalogs)
129 	 * recursively, we might get here while still processing invalidation
130 	 * messages in some outer execution of this function or a sibling.  The
131 	 * "cleared" status of the lock tells us whether we really are done
132 	 * absorbing relevant inval messages.
133 	 */
134 	if (res != LOCKACQUIRE_ALREADY_CLEAR)
135 	{
136 		AcceptInvalidationMessages();
137 		MarkLockClear(locallock);
138 	}
139 }
140 
141 /*
142  *		ConditionalLockRelationOid
143  *
144  * As above, but only lock if we can get the lock without blocking.
145  * Returns true iff the lock was acquired.
146  *
147  * NOTE: we do not currently need conditional versions of all the
148  * LockXXX routines in this file, but they could easily be added if needed.
149  */
150 bool
151 ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
152 {
153 	LOCKTAG		tag;
154 	LOCALLOCK  *locallock;
155 	LockAcquireResult res;
156 
157 	SetLocktagRelationOid(&tag, relid);
158 
159 	res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
160 
161 	if (res == LOCKACQUIRE_NOT_AVAIL)
162 		return false;
163 
164 	/*
165 	 * Now that we have the lock, check for invalidation messages; see notes
166 	 * in LockRelationOid.
167 	 */
168 	if (res != LOCKACQUIRE_ALREADY_CLEAR)
169 	{
170 		AcceptInvalidationMessages();
171 		MarkLockClear(locallock);
172 	}
173 
174 	return true;
175 }
176 
177 /*
178  *		UnlockRelationId
179  *
180  * Unlock, given a LockRelId.  This is preferred over UnlockRelationOid
181  * for speed reasons.
182  */
183 void
184 UnlockRelationId(LockRelId *relid, LOCKMODE lockmode)
185 {
186 	LOCKTAG		tag;
187 
188 	SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
189 
190 	LockRelease(&tag, lockmode, false);
191 }
192 
193 /*
194  *		UnlockRelationOid
195  *
196  * Unlock, given only a relation Oid.  Use UnlockRelationId if you can.
197  */
198 void
199 UnlockRelationOid(Oid relid, LOCKMODE lockmode)
200 {
201 	LOCKTAG		tag;
202 
203 	SetLocktagRelationOid(&tag, relid);
204 
205 	LockRelease(&tag, lockmode, false);
206 }
207 
208 /*
209  *		LockRelation
210  *
211  * This is a convenience routine for acquiring an additional lock on an
212  * already-open relation.  Never try to do "relation_open(foo, NoLock)"
213  * and then lock with this.
214  */
215 void
216 LockRelation(Relation relation, LOCKMODE lockmode)
217 {
218 	LOCKTAG		tag;
219 	LOCALLOCK  *locallock;
220 	LockAcquireResult res;
221 
222 	SET_LOCKTAG_RELATION(tag,
223 						 relation->rd_lockInfo.lockRelId.dbId,
224 						 relation->rd_lockInfo.lockRelId.relId);
225 
226 	res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
227 
228 	/*
229 	 * Now that we have the lock, check for invalidation messages; see notes
230 	 * in LockRelationOid.
231 	 */
232 	if (res != LOCKACQUIRE_ALREADY_CLEAR)
233 	{
234 		AcceptInvalidationMessages();
235 		MarkLockClear(locallock);
236 	}
237 }
238 
239 /*
240  *		ConditionalLockRelation
241  *
242  * This is a convenience routine for acquiring an additional lock on an
243  * already-open relation.  Never try to do "relation_open(foo, NoLock)"
244  * and then lock with this.
245  */
246 bool
247 ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
248 {
249 	LOCKTAG		tag;
250 	LOCALLOCK  *locallock;
251 	LockAcquireResult res;
252 
253 	SET_LOCKTAG_RELATION(tag,
254 						 relation->rd_lockInfo.lockRelId.dbId,
255 						 relation->rd_lockInfo.lockRelId.relId);
256 
257 	res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
258 
259 	if (res == LOCKACQUIRE_NOT_AVAIL)
260 		return false;
261 
262 	/*
263 	 * Now that we have the lock, check for invalidation messages; see notes
264 	 * in LockRelationOid.
265 	 */
266 	if (res != LOCKACQUIRE_ALREADY_CLEAR)
267 	{
268 		AcceptInvalidationMessages();
269 		MarkLockClear(locallock);
270 	}
271 
272 	return true;
273 }
274 
275 /*
276  *		UnlockRelation
277  *
278  * This is a convenience routine for unlocking a relation without also
279  * closing it.
280  */
281 void
282 UnlockRelation(Relation relation, LOCKMODE lockmode)
283 {
284 	LOCKTAG		tag;
285 
286 	SET_LOCKTAG_RELATION(tag,
287 						 relation->rd_lockInfo.lockRelId.dbId,
288 						 relation->rd_lockInfo.lockRelId.relId);
289 
290 	LockRelease(&tag, lockmode, false);
291 }
292 
293 /*
294  *		CheckRelationLockedByMe
295  *
296  * Returns true if current transaction holds a lock on 'relation' of mode
297  * 'lockmode'.  If 'orstronger' is true, a stronger lockmode is also OK.
298  * ("Stronger" is defined as "numerically higher", which is a bit
299  * semantically dubious but is OK for the purposes we use this for.)
300  */
301 bool
302 CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, bool orstronger)
303 {
304 	LOCKTAG		tag;
305 
306 	SET_LOCKTAG_RELATION(tag,
307 						 relation->rd_lockInfo.lockRelId.dbId,
308 						 relation->rd_lockInfo.lockRelId.relId);
309 
310 	if (LockHeldByMe(&tag, lockmode))
311 		return true;
312 
313 	if (orstronger)
314 	{
315 		LOCKMODE	slockmode;
316 
317 		for (slockmode = lockmode + 1;
318 			 slockmode <= MaxLockMode;
319 			 slockmode++)
320 		{
321 			if (LockHeldByMe(&tag, slockmode))
322 			{
323 #ifdef NOT_USED
324 				/* Sometimes this might be useful for debugging purposes */
325 				elog(WARNING, "lock mode %s substituted for %s on relation %s",
326 					 GetLockmodeName(tag.locktag_lockmethodid, slockmode),
327 					 GetLockmodeName(tag.locktag_lockmethodid, lockmode),
328 					 RelationGetRelationName(relation));
329 #endif
330 				return true;
331 			}
332 		}
333 	}
334 
335 	return false;
336 }
337 
338 /*
339  *		LockHasWaitersRelation
340  *
341  * This is a function to check whether someone else is waiting for a
342  * lock which we are currently holding.
343  */
344 bool
345 LockHasWaitersRelation(Relation relation, LOCKMODE lockmode)
346 {
347 	LOCKTAG		tag;
348 
349 	SET_LOCKTAG_RELATION(tag,
350 						 relation->rd_lockInfo.lockRelId.dbId,
351 						 relation->rd_lockInfo.lockRelId.relId);
352 
353 	return LockHasWaiters(&tag, lockmode, false);
354 }
355 
356 /*
357  *		LockRelationIdForSession
358  *
359  * This routine grabs a session-level lock on the target relation.  The
360  * session lock persists across transaction boundaries.  It will be removed
361  * when UnlockRelationIdForSession() is called, or if an ereport(ERROR) occurs,
362  * or if the backend exits.
363  *
364  * Note that one should also grab a transaction-level lock on the rel
365  * in any transaction that actually uses the rel, to ensure that the
366  * relcache entry is up to date.
367  */
368 void
369 LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
370 {
371 	LOCKTAG		tag;
372 
373 	SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
374 
375 	(void) LockAcquire(&tag, lockmode, true, false);
376 }
377 
378 /*
379  *		UnlockRelationIdForSession
380  */
381 void
382 UnlockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
383 {
384 	LOCKTAG		tag;
385 
386 	SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
387 
388 	LockRelease(&tag, lockmode, true);
389 }
390 
391 /*
392  *		LockRelationForExtension
393  *
394  * This lock tag is used to interlock addition of pages to relations.
395  * We need such locking because bufmgr/smgr definition of P_NEW is not
396  * race-condition-proof.
397  *
398  * We assume the caller is already holding some type of regular lock on
399  * the relation, so no AcceptInvalidationMessages call is needed here.
400  */
401 void
402 LockRelationForExtension(Relation relation, LOCKMODE lockmode)
403 {
404 	LOCKTAG		tag;
405 
406 	SET_LOCKTAG_RELATION_EXTEND(tag,
407 								relation->rd_lockInfo.lockRelId.dbId,
408 								relation->rd_lockInfo.lockRelId.relId);
409 
410 	(void) LockAcquire(&tag, lockmode, false, false);
411 }
412 
413 /*
414  *		ConditionalLockRelationForExtension
415  *
416  * As above, but only lock if we can get the lock without blocking.
417  * Returns true iff the lock was acquired.
418  */
419 bool
420 ConditionalLockRelationForExtension(Relation relation, LOCKMODE lockmode)
421 {
422 	LOCKTAG		tag;
423 
424 	SET_LOCKTAG_RELATION_EXTEND(tag,
425 								relation->rd_lockInfo.lockRelId.dbId,
426 								relation->rd_lockInfo.lockRelId.relId);
427 
428 	return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
429 }
430 
431 /*
432  *		RelationExtensionLockWaiterCount
433  *
434  * Count the number of processes waiting for the given relation extension lock.
435  */
436 int
437 RelationExtensionLockWaiterCount(Relation relation)
438 {
439 	LOCKTAG		tag;
440 
441 	SET_LOCKTAG_RELATION_EXTEND(tag,
442 								relation->rd_lockInfo.lockRelId.dbId,
443 								relation->rd_lockInfo.lockRelId.relId);
444 
445 	return LockWaiterCount(&tag);
446 }
447 
448 /*
449  *		UnlockRelationForExtension
450  */
451 void
452 UnlockRelationForExtension(Relation relation, LOCKMODE lockmode)
453 {
454 	LOCKTAG		tag;
455 
456 	SET_LOCKTAG_RELATION_EXTEND(tag,
457 								relation->rd_lockInfo.lockRelId.dbId,
458 								relation->rd_lockInfo.lockRelId.relId);
459 
460 	LockRelease(&tag, lockmode, false);
461 }
462 
463 /*
464  *		LockDatabaseFrozenIds
465  *
466  * This allows one backend per database to execute vac_update_datfrozenxid().
467  */
468 void
469 LockDatabaseFrozenIds(LOCKMODE lockmode)
470 {
471 	LOCKTAG		tag;
472 
473 	SET_LOCKTAG_DATABASE_FROZEN_IDS(tag, MyDatabaseId);
474 
475 	(void) LockAcquire(&tag, lockmode, false, false);
476 }
477 
478 /*
479  *		LockPage
480  *
481  * Obtain a page-level lock.  This is currently used by some index access
482  * methods to lock individual index pages.
483  */
484 void
485 LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
486 {
487 	LOCKTAG		tag;
488 
489 	SET_LOCKTAG_PAGE(tag,
490 					 relation->rd_lockInfo.lockRelId.dbId,
491 					 relation->rd_lockInfo.lockRelId.relId,
492 					 blkno);
493 
494 	(void) LockAcquire(&tag, lockmode, false, false);
495 }
496 
497 /*
498  *		ConditionalLockPage
499  *
500  * As above, but only lock if we can get the lock without blocking.
501  * Returns true iff the lock was acquired.
502  */
503 bool
504 ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
505 {
506 	LOCKTAG		tag;
507 
508 	SET_LOCKTAG_PAGE(tag,
509 					 relation->rd_lockInfo.lockRelId.dbId,
510 					 relation->rd_lockInfo.lockRelId.relId,
511 					 blkno);
512 
513 	return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
514 }
515 
516 /*
517  *		UnlockPage
518  */
519 void
520 UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
521 {
522 	LOCKTAG		tag;
523 
524 	SET_LOCKTAG_PAGE(tag,
525 					 relation->rd_lockInfo.lockRelId.dbId,
526 					 relation->rd_lockInfo.lockRelId.relId,
527 					 blkno);
528 
529 	LockRelease(&tag, lockmode, false);
530 }
531 
532 /*
533  *		LockTuple
534  *
535  * Obtain a tuple-level lock.  This is used in a less-than-intuitive fashion
536  * because we can't afford to keep a separate lock in shared memory for every
537  * tuple.  See heap_lock_tuple before using this!
538  */
539 void
540 LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
541 {
542 	LOCKTAG		tag;
543 
544 	SET_LOCKTAG_TUPLE(tag,
545 					  relation->rd_lockInfo.lockRelId.dbId,
546 					  relation->rd_lockInfo.lockRelId.relId,
547 					  ItemPointerGetBlockNumber(tid),
548 					  ItemPointerGetOffsetNumber(tid));
549 
550 	(void) LockAcquire(&tag, lockmode, false, false);
551 }
552 
553 /*
554  *		ConditionalLockTuple
555  *
556  * As above, but only lock if we can get the lock without blocking.
557  * Returns true iff the lock was acquired.
558  */
559 bool
560 ConditionalLockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
561 {
562 	LOCKTAG		tag;
563 
564 	SET_LOCKTAG_TUPLE(tag,
565 					  relation->rd_lockInfo.lockRelId.dbId,
566 					  relation->rd_lockInfo.lockRelId.relId,
567 					  ItemPointerGetBlockNumber(tid),
568 					  ItemPointerGetOffsetNumber(tid));
569 
570 	return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
571 }
572 
573 /*
574  *		UnlockTuple
575  */
576 void
577 UnlockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
578 {
579 	LOCKTAG		tag;
580 
581 	SET_LOCKTAG_TUPLE(tag,
582 					  relation->rd_lockInfo.lockRelId.dbId,
583 					  relation->rd_lockInfo.lockRelId.relId,
584 					  ItemPointerGetBlockNumber(tid),
585 					  ItemPointerGetOffsetNumber(tid));
586 
587 	LockRelease(&tag, lockmode, false);
588 }
589 
590 /*
591  *		XactLockTableInsert
592  *
593  * Insert a lock showing that the given transaction ID is running ---
594  * this is done when an XID is acquired by a transaction or subtransaction.
595  * The lock can then be used to wait for the transaction to finish.
596  */
597 void
598 XactLockTableInsert(TransactionId xid)
599 {
600 	LOCKTAG		tag;
601 
602 	SET_LOCKTAG_TRANSACTION(tag, xid);
603 
604 	(void) LockAcquire(&tag, ExclusiveLock, false, false);
605 }
606 
607 /*
608  *		XactLockTableDelete
609  *
610  * Delete the lock showing that the given transaction ID is running.
611  * (This is never used for main transaction IDs; those locks are only
612  * released implicitly at transaction end.  But we do use it for subtrans IDs.)
613  */
614 void
615 XactLockTableDelete(TransactionId xid)
616 {
617 	LOCKTAG		tag;
618 
619 	SET_LOCKTAG_TRANSACTION(tag, xid);
620 
621 	LockRelease(&tag, ExclusiveLock, false);
622 }
623 
624 /*
625  *		XactLockTableWait
626  *
627  * Wait for the specified transaction to commit or abort.  If an operation
628  * is specified, an error context callback is set up.  If 'oper' is passed as
629  * None, no error context callback is set up.
630  *
631  * Note that this does the right thing for subtransactions: if we wait on a
632  * subtransaction, we will exit as soon as it aborts or its top parent commits.
633  * It takes some extra work to ensure this, because to save on shared memory
634  * the XID lock of a subtransaction is released when it ends, whether
635  * successfully or unsuccessfully.  So we have to check if it's "still running"
636  * and if so wait for its parent.
637  */
638 void
639 XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
640 				  XLTW_Oper oper)
641 {
642 	LOCKTAG		tag;
643 	XactLockTableWaitInfo info;
644 	ErrorContextCallback callback;
645 	bool		first = true;
646 
647 	/*
648 	 * If an operation is specified, set up our verbose error context
649 	 * callback.
650 	 */
651 	if (oper != XLTW_None)
652 	{
653 		Assert(RelationIsValid(rel));
654 		Assert(ItemPointerIsValid(ctid));
655 
656 		info.rel = rel;
657 		info.ctid = ctid;
658 		info.oper = oper;
659 
660 		callback.callback = XactLockTableWaitErrorCb;
661 		callback.arg = &info;
662 		callback.previous = error_context_stack;
663 		error_context_stack = &callback;
664 	}
665 
666 	for (;;)
667 	{
668 		Assert(TransactionIdIsValid(xid));
669 		Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
670 
671 		SET_LOCKTAG_TRANSACTION(tag, xid);
672 
673 		(void) LockAcquire(&tag, ShareLock, false, false);
674 
675 		LockRelease(&tag, ShareLock, false);
676 
677 		if (!TransactionIdIsInProgress(xid))
678 			break;
679 
680 		/*
681 		 * If the Xid belonged to a subtransaction, then the lock would have
682 		 * gone away as soon as it was finished; for correct tuple visibility,
683 		 * the right action is to wait on its parent transaction to go away.
684 		 * But instead of going levels up one by one, we can just wait for the
685 		 * topmost transaction to finish with the same end result, which also
686 		 * incurs less locktable traffic.
687 		 *
688 		 * Some uses of this function don't involve tuple visibility -- such
689 		 * as when building snapshots for logical decoding.  It is possible to
690 		 * see a transaction in ProcArray before it registers itself in the
691 		 * locktable.  The topmost transaction in that case is the same xid,
692 		 * so we try again after a short sleep.  (Don't sleep the first time
693 		 * through, to avoid slowing down the normal case.)
694 		 */
695 		if (!first)
696 			pg_usleep(1000L);
697 		first = false;
698 		xid = SubTransGetTopmostTransaction(xid);
699 	}
700 
701 	if (oper != XLTW_None)
702 		error_context_stack = callback.previous;
703 }
704 
705 /*
706  *		ConditionalXactLockTableWait
707  *
708  * As above, but only lock if we can get the lock without blocking.
709  * Returns true if the lock was acquired.
710  */
711 bool
712 ConditionalXactLockTableWait(TransactionId xid)
713 {
714 	LOCKTAG		tag;
715 	bool		first = true;
716 
717 	for (;;)
718 	{
719 		Assert(TransactionIdIsValid(xid));
720 		Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
721 
722 		SET_LOCKTAG_TRANSACTION(tag, xid);
723 
724 		if (LockAcquire(&tag, ShareLock, false, true) == LOCKACQUIRE_NOT_AVAIL)
725 			return false;
726 
727 		LockRelease(&tag, ShareLock, false);
728 
729 		if (!TransactionIdIsInProgress(xid))
730 			break;
731 
732 		/* See XactLockTableWait about this case */
733 		if (!first)
734 			pg_usleep(1000L);
735 		first = false;
736 		xid = SubTransGetTopmostTransaction(xid);
737 	}
738 
739 	return true;
740 }
741 
742 /*
743  *		SpeculativeInsertionLockAcquire
744  *
745  * Insert a lock showing that the given transaction ID is inserting a tuple,
746  * but hasn't yet decided whether it's going to keep it.  The lock can then be
747  * used to wait for the decision to go ahead with the insertion, or aborting
748  * it.
749  *
750  * The token is used to distinguish multiple insertions by the same
751  * transaction.  It is returned to caller.
752  */
753 uint32
754 SpeculativeInsertionLockAcquire(TransactionId xid)
755 {
756 	LOCKTAG		tag;
757 
758 	speculativeInsertionToken++;
759 
760 	/*
761 	 * Check for wrap-around. Zero means no token is held, so don't use that.
762 	 */
763 	if (speculativeInsertionToken == 0)
764 		speculativeInsertionToken = 1;
765 
766 	SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, speculativeInsertionToken);
767 
768 	(void) LockAcquire(&tag, ExclusiveLock, false, false);
769 
770 	return speculativeInsertionToken;
771 }
772 
773 /*
774  *		SpeculativeInsertionLockRelease
775  *
776  * Delete the lock showing that the given transaction is speculatively
777  * inserting a tuple.
778  */
779 void
780 SpeculativeInsertionLockRelease(TransactionId xid)
781 {
782 	LOCKTAG		tag;
783 
784 	SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, speculativeInsertionToken);
785 
786 	LockRelease(&tag, ExclusiveLock, false);
787 }
788 
789 /*
790  *		SpeculativeInsertionWait
791  *
792  * Wait for the specified transaction to finish or abort the insertion of a
793  * tuple.
794  */
795 void
796 SpeculativeInsertionWait(TransactionId xid, uint32 token)
797 {
798 	LOCKTAG		tag;
799 
800 	SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, token);
801 
802 	Assert(TransactionIdIsValid(xid));
803 	Assert(token != 0);
804 
805 	(void) LockAcquire(&tag, ShareLock, false, false);
806 	LockRelease(&tag, ShareLock, false);
807 }
808 
809 /*
810  * XactLockTableWaitErrorContextCb
811  *		Error context callback for transaction lock waits.
812  */
813 static void
814 XactLockTableWaitErrorCb(void *arg)
815 {
816 	XactLockTableWaitInfo *info = (XactLockTableWaitInfo *) arg;
817 
818 	/*
819 	 * We would like to print schema name too, but that would require a
820 	 * syscache lookup.
821 	 */
822 	if (info->oper != XLTW_None &&
823 		ItemPointerIsValid(info->ctid) && RelationIsValid(info->rel))
824 	{
825 		const char *cxt;
826 
827 		switch (info->oper)
828 		{
829 			case XLTW_Update:
830 				cxt = gettext_noop("while updating tuple (%u,%u) in relation \"%s\"");
831 				break;
832 			case XLTW_Delete:
833 				cxt = gettext_noop("while deleting tuple (%u,%u) in relation \"%s\"");
834 				break;
835 			case XLTW_Lock:
836 				cxt = gettext_noop("while locking tuple (%u,%u) in relation \"%s\"");
837 				break;
838 			case XLTW_LockUpdated:
839 				cxt = gettext_noop("while locking updated version (%u,%u) of tuple in relation \"%s\"");
840 				break;
841 			case XLTW_InsertIndex:
842 				cxt = gettext_noop("while inserting index tuple (%u,%u) in relation \"%s\"");
843 				break;
844 			case XLTW_InsertIndexUnique:
845 				cxt = gettext_noop("while checking uniqueness of tuple (%u,%u) in relation \"%s\"");
846 				break;
847 			case XLTW_FetchUpdated:
848 				cxt = gettext_noop("while rechecking updated tuple (%u,%u) in relation \"%s\"");
849 				break;
850 			case XLTW_RecheckExclusionConstr:
851 				cxt = gettext_noop("while checking exclusion constraint on tuple (%u,%u) in relation \"%s\"");
852 				break;
853 
854 			default:
855 				return;
856 		}
857 
858 		errcontext(cxt,
859 				   ItemPointerGetBlockNumber(info->ctid),
860 				   ItemPointerGetOffsetNumber(info->ctid),
861 				   RelationGetRelationName(info->rel));
862 	}
863 }
864 
865 /*
866  * WaitForLockersMultiple
867  *		Wait until no transaction holds locks that conflict with the given
868  *		locktags at the given lockmode.
869  *
870  * To do this, obtain the current list of lockers, and wait on their VXIDs
871  * until they are finished.
872  *
873  * Note we don't try to acquire the locks on the given locktags, only the
874  * VXIDs and XIDs of their lock holders; if somebody grabs a conflicting lock
875  * on the objects after we obtained our initial list of lockers, we will not
876  * wait for them.
877  */
878 void
879 WaitForLockersMultiple(List *locktags, LOCKMODE lockmode, bool progress)
880 {
881 	List	   *holders = NIL;
882 	ListCell   *lc;
883 	int			total = 0;
884 	int			done = 0;
885 
886 	/* Done if no locks to wait for */
887 	if (list_length(locktags) == 0)
888 		return;
889 
890 	/* Collect the transactions we need to wait on */
891 	foreach(lc, locktags)
892 	{
893 		LOCKTAG    *locktag = lfirst(lc);
894 		int			count;
895 
896 		holders = lappend(holders,
897 						  GetLockConflicts(locktag, lockmode,
898 										   progress ? &count : NULL));
899 		if (progress)
900 			total += count;
901 	}
902 
903 	if (progress)
904 		pgstat_progress_update_param(PROGRESS_WAITFOR_TOTAL, total);
905 
906 	/*
907 	 * Note: GetLockConflicts() never reports our own xid, hence we need not
908 	 * check for that.  Also, prepared xacts are reported and awaited.
909 	 */
910 
911 	/* Finally wait for each such transaction to complete */
912 	foreach(lc, holders)
913 	{
914 		VirtualTransactionId *lockholders = lfirst(lc);
915 
916 		while (VirtualTransactionIdIsValid(*lockholders))
917 		{
918 			/* If requested, publish who we're going to wait for. */
919 			if (progress)
920 			{
921 				PGPROC	   *holder = BackendIdGetProc(lockholders->backendId);
922 
923 				if (holder)
924 					pgstat_progress_update_param(PROGRESS_WAITFOR_CURRENT_PID,
925 												 holder->pid);
926 			}
927 			VirtualXactLock(*lockholders, true);
928 			lockholders++;
929 
930 			if (progress)
931 				pgstat_progress_update_param(PROGRESS_WAITFOR_DONE, ++done);
932 		}
933 	}
934 	if (progress)
935 	{
936 		const int	index[] = {
937 			PROGRESS_WAITFOR_TOTAL,
938 			PROGRESS_WAITFOR_DONE,
939 			PROGRESS_WAITFOR_CURRENT_PID
940 		};
941 		const int64 values[] = {
942 			0, 0, 0
943 		};
944 
945 		pgstat_progress_update_multi_param(3, index, values);
946 	}
947 
948 	list_free_deep(holders);
949 }
950 
951 /*
952  * WaitForLockers
953  *
954  * Same as WaitForLockersMultiple, for a single lock tag.
955  */
956 void
957 WaitForLockers(LOCKTAG heaplocktag, LOCKMODE lockmode, bool progress)
958 {
959 	List	   *l;
960 
961 	l = list_make1(&heaplocktag);
962 	WaitForLockersMultiple(l, lockmode, progress);
963 	list_free(l);
964 }
965 
966 
967 /*
968  *		LockDatabaseObject
969  *
970  * Obtain a lock on a general object of the current database.  Don't use
971  * this for shared objects (such as tablespaces).  It's unwise to apply it
972  * to relations, also, since a lock taken this way will NOT conflict with
973  * locks taken via LockRelation and friends.
974  */
975 void
976 LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
977 				   LOCKMODE lockmode)
978 {
979 	LOCKTAG		tag;
980 
981 	SET_LOCKTAG_OBJECT(tag,
982 					   MyDatabaseId,
983 					   classid,
984 					   objid,
985 					   objsubid);
986 
987 	(void) LockAcquire(&tag, lockmode, false, false);
988 
989 	/* Make sure syscaches are up-to-date with any changes we waited for */
990 	AcceptInvalidationMessages();
991 }
992 
993 /*
994  *		UnlockDatabaseObject
995  */
996 void
997 UnlockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
998 					 LOCKMODE lockmode)
999 {
1000 	LOCKTAG		tag;
1001 
1002 	SET_LOCKTAG_OBJECT(tag,
1003 					   MyDatabaseId,
1004 					   classid,
1005 					   objid,
1006 					   objsubid);
1007 
1008 	LockRelease(&tag, lockmode, false);
1009 }
1010 
1011 /*
1012  *		LockSharedObject
1013  *
1014  * Obtain a lock on a shared-across-databases object.
1015  */
1016 void
1017 LockSharedObject(Oid classid, Oid objid, uint16 objsubid,
1018 				 LOCKMODE lockmode)
1019 {
1020 	LOCKTAG		tag;
1021 
1022 	SET_LOCKTAG_OBJECT(tag,
1023 					   InvalidOid,
1024 					   classid,
1025 					   objid,
1026 					   objsubid);
1027 
1028 	(void) LockAcquire(&tag, lockmode, false, false);
1029 
1030 	/* Make sure syscaches are up-to-date with any changes we waited for */
1031 	AcceptInvalidationMessages();
1032 }
1033 
1034 /*
1035  *		UnlockSharedObject
1036  */
1037 void
1038 UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid,
1039 				   LOCKMODE lockmode)
1040 {
1041 	LOCKTAG		tag;
1042 
1043 	SET_LOCKTAG_OBJECT(tag,
1044 					   InvalidOid,
1045 					   classid,
1046 					   objid,
1047 					   objsubid);
1048 
1049 	LockRelease(&tag, lockmode, false);
1050 }
1051 
1052 /*
1053  *		LockSharedObjectForSession
1054  *
1055  * Obtain a session-level lock on a shared-across-databases object.
1056  * See LockRelationIdForSession for notes about session-level locks.
1057  */
1058 void
1059 LockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
1060 						   LOCKMODE lockmode)
1061 {
1062 	LOCKTAG		tag;
1063 
1064 	SET_LOCKTAG_OBJECT(tag,
1065 					   InvalidOid,
1066 					   classid,
1067 					   objid,
1068 					   objsubid);
1069 
1070 	(void) LockAcquire(&tag, lockmode, true, false);
1071 }
1072 
1073 /*
1074  *		UnlockSharedObjectForSession
1075  */
1076 void
1077 UnlockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
1078 							 LOCKMODE lockmode)
1079 {
1080 	LOCKTAG		tag;
1081 
1082 	SET_LOCKTAG_OBJECT(tag,
1083 					   InvalidOid,
1084 					   classid,
1085 					   objid,
1086 					   objsubid);
1087 
1088 	LockRelease(&tag, lockmode, true);
1089 }
1090 
1091 
1092 /*
1093  * Append a description of a lockable object to buf.
1094  *
1095  * Ideally we would print names for the numeric values, but that requires
1096  * getting locks on system tables, which might cause problems since this is
1097  * typically used to report deadlock situations.
1098  */
1099 void
1100 DescribeLockTag(StringInfo buf, const LOCKTAG *tag)
1101 {
1102 	switch ((LockTagType) tag->locktag_type)
1103 	{
1104 		case LOCKTAG_RELATION:
1105 			appendStringInfo(buf,
1106 							 _("relation %u of database %u"),
1107 							 tag->locktag_field2,
1108 							 tag->locktag_field1);
1109 			break;
1110 		case LOCKTAG_RELATION_EXTEND:
1111 			appendStringInfo(buf,
1112 							 _("extension of relation %u of database %u"),
1113 							 tag->locktag_field2,
1114 							 tag->locktag_field1);
1115 			break;
1116 		case LOCKTAG_DATABASE_FROZEN_IDS:
1117 			appendStringInfo(buf,
1118 							 _("pg_database.datfrozenxid of database %u"),
1119 							 tag->locktag_field1);
1120 			break;
1121 		case LOCKTAG_PAGE:
1122 			appendStringInfo(buf,
1123 							 _("page %u of relation %u of database %u"),
1124 							 tag->locktag_field3,
1125 							 tag->locktag_field2,
1126 							 tag->locktag_field1);
1127 			break;
1128 		case LOCKTAG_TUPLE:
1129 			appendStringInfo(buf,
1130 							 _("tuple (%u,%u) of relation %u of database %u"),
1131 							 tag->locktag_field3,
1132 							 tag->locktag_field4,
1133 							 tag->locktag_field2,
1134 							 tag->locktag_field1);
1135 			break;
1136 		case LOCKTAG_TRANSACTION:
1137 			appendStringInfo(buf,
1138 							 _("transaction %u"),
1139 							 tag->locktag_field1);
1140 			break;
1141 		case LOCKTAG_VIRTUALTRANSACTION:
1142 			appendStringInfo(buf,
1143 							 _("virtual transaction %d/%u"),
1144 							 tag->locktag_field1,
1145 							 tag->locktag_field2);
1146 			break;
1147 		case LOCKTAG_SPECULATIVE_TOKEN:
1148 			appendStringInfo(buf,
1149 							 _("speculative token %u of transaction %u"),
1150 							 tag->locktag_field2,
1151 							 tag->locktag_field1);
1152 			break;
1153 		case LOCKTAG_OBJECT:
1154 			appendStringInfo(buf,
1155 							 _("object %u of class %u of database %u"),
1156 							 tag->locktag_field3,
1157 							 tag->locktag_field2,
1158 							 tag->locktag_field1);
1159 			break;
1160 		case LOCKTAG_USERLOCK:
1161 			/* reserved for old contrib code, now on pgfoundry */
1162 			appendStringInfo(buf,
1163 							 _("user lock [%u,%u,%u]"),
1164 							 tag->locktag_field1,
1165 							 tag->locktag_field2,
1166 							 tag->locktag_field3);
1167 			break;
1168 		case LOCKTAG_ADVISORY:
1169 			appendStringInfo(buf,
1170 							 _("advisory lock [%u,%u,%u,%u]"),
1171 							 tag->locktag_field1,
1172 							 tag->locktag_field2,
1173 							 tag->locktag_field3,
1174 							 tag->locktag_field4);
1175 			break;
1176 		default:
1177 			appendStringInfo(buf,
1178 							 _("unrecognized locktag type %d"),
1179 							 (int) tag->locktag_type);
1180 			break;
1181 	}
1182 }
1183 
1184 /*
1185  * GetLockNameFromTagType
1186  *
1187  *	Given locktag type, return the corresponding lock name.
1188  */
1189 const char *
1190 GetLockNameFromTagType(uint16 locktag_type)
1191 {
1192 	if (locktag_type > LOCKTAG_LAST_TYPE)
1193 		return "???";
1194 	return LockTagTypeNames[locktag_type];
1195 }
1196