1 /*-------------------------------------------------------------------------
2  *
3  * lmgr.c
4  *	  POSTGRES lock manager code
5  *
6  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/storage/lmgr/lmgr.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/subtrans.h"
19 #include "access/transam.h"
20 #include "access/xact.h"
21 #include "catalog/catalog.h"
22 #include "miscadmin.h"
23 #include "storage/lmgr.h"
24 #include "storage/procarray.h"
25 #include "utils/inval.h"
26 
27 
28 /*
29  * Per-backend counter for generating speculative insertion tokens.
30  *
31  * This may wrap around, but that's OK as it's only used for the short
32  * duration between inserting a tuple and checking that there are no (unique)
33  * constraint violations.  It's theoretically possible that a backend sees a
34  * tuple that was speculatively inserted by another backend, but before it has
35  * started waiting on the token, the other backend completes its insertion,
36  * and then then performs 2^32 unrelated insertions.  And after all that, the
37  * first backend finally calls SpeculativeInsertionLockAcquire(), with the
38  * intention of waiting for the first insertion to complete, but ends up
39  * waiting for the latest unrelated insertion instead.  Even then, nothing
40  * particularly bad happens: in the worst case they deadlock, causing one of
41  * the transactions to abort.
42  */
43 static uint32 speculativeInsertionToken = 0;
44 
45 
46 /*
47  * Struct to hold context info for transaction lock waits.
48  *
49  * 'oper' is the operation that needs to wait for the other transaction; 'rel'
50  * and 'ctid' specify the address of the tuple being waited for.
51  */
52 typedef struct XactLockTableWaitInfo
53 {
54 	XLTW_Oper	oper;
55 	Relation	rel;
56 	ItemPointer ctid;
57 } XactLockTableWaitInfo;
58 
59 static void XactLockTableWaitErrorCb(void *arg);
60 
61 /*
62  * RelationInitLockInfo
63  *		Initializes the lock information in a relation descriptor.
64  *
65  *		relcache.c must call this during creation of any reldesc.
66  */
67 void
RelationInitLockInfo(Relation relation)68 RelationInitLockInfo(Relation relation)
69 {
70 	Assert(RelationIsValid(relation));
71 	Assert(OidIsValid(RelationGetRelid(relation)));
72 
73 	relation->rd_lockInfo.lockRelId.relId = RelationGetRelid(relation);
74 
75 	if (relation->rd_rel->relisshared)
76 		relation->rd_lockInfo.lockRelId.dbId = InvalidOid;
77 	else
78 		relation->rd_lockInfo.lockRelId.dbId = MyDatabaseId;
79 }
80 
81 /*
82  * SetLocktagRelationOid
83  *		Set up a locktag for a relation, given only relation OID
84  */
85 static inline void
SetLocktagRelationOid(LOCKTAG * tag,Oid relid)86 SetLocktagRelationOid(LOCKTAG *tag, Oid relid)
87 {
88 	Oid			dbid;
89 
90 	if (IsSharedRelation(relid))
91 		dbid = InvalidOid;
92 	else
93 		dbid = MyDatabaseId;
94 
95 	SET_LOCKTAG_RELATION(*tag, dbid, relid);
96 }
97 
98 /*
99  *		LockRelationOid
100  *
101  * Lock a relation given only its OID.  This should generally be used
102  * before attempting to open the relation's relcache entry.
103  */
104 void
LockRelationOid(Oid relid,LOCKMODE lockmode)105 LockRelationOid(Oid relid, LOCKMODE lockmode)
106 {
107 	LOCKTAG		tag;
108 	LOCALLOCK  *locallock;
109 	LockAcquireResult res;
110 
111 	SetLocktagRelationOid(&tag, relid);
112 
113 	res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
114 
115 	/*
116 	 * Now that we have the lock, check for invalidation messages, so that we
117 	 * will update or flush any stale relcache entry before we try to use it.
118 	 * RangeVarGetRelid() specifically relies on us for this.  We can skip
119 	 * this in the not-uncommon case that we already had the same type of lock
120 	 * being requested, since then no one else could have modified the
121 	 * relcache entry in an undesirable way.  (In the case where our own xact
122 	 * modifies the rel, the relcache update happens via
123 	 * CommandCounterIncrement, not here.)
124 	 *
125 	 * However, in corner cases where code acts on tables (usually catalogs)
126 	 * recursively, we might get here while still processing invalidation
127 	 * messages in some outer execution of this function or a sibling.  The
128 	 * "cleared" status of the lock tells us whether we really are done
129 	 * absorbing relevant inval messages.
130 	 */
131 	if (res != LOCKACQUIRE_ALREADY_CLEAR)
132 	{
133 		AcceptInvalidationMessages();
134 		MarkLockClear(locallock);
135 	}
136 }
137 
138 /*
139  *		ConditionalLockRelationOid
140  *
141  * As above, but only lock if we can get the lock without blocking.
142  * Returns TRUE iff the lock was acquired.
143  *
144  * NOTE: we do not currently need conditional versions of all the
145  * LockXXX routines in this file, but they could easily be added if needed.
146  */
147 bool
ConditionalLockRelationOid(Oid relid,LOCKMODE lockmode)148 ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
149 {
150 	LOCKTAG		tag;
151 	LOCALLOCK  *locallock;
152 	LockAcquireResult res;
153 
154 	SetLocktagRelationOid(&tag, relid);
155 
156 	res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
157 
158 	if (res == LOCKACQUIRE_NOT_AVAIL)
159 		return false;
160 
161 	/*
162 	 * Now that we have the lock, check for invalidation messages; see notes
163 	 * in LockRelationOid.
164 	 */
165 	if (res != LOCKACQUIRE_ALREADY_CLEAR)
166 	{
167 		AcceptInvalidationMessages();
168 		MarkLockClear(locallock);
169 	}
170 
171 	return true;
172 }
173 
174 /*
175  *		UnlockRelationId
176  *
177  * Unlock, given a LockRelId.  This is preferred over UnlockRelationOid
178  * for speed reasons.
179  */
180 void
UnlockRelationId(LockRelId * relid,LOCKMODE lockmode)181 UnlockRelationId(LockRelId *relid, LOCKMODE lockmode)
182 {
183 	LOCKTAG		tag;
184 
185 	SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
186 
187 	LockRelease(&tag, lockmode, false);
188 }
189 
190 /*
191  *		UnlockRelationOid
192  *
193  * Unlock, given only a relation Oid.  Use UnlockRelationId if you can.
194  */
195 void
UnlockRelationOid(Oid relid,LOCKMODE lockmode)196 UnlockRelationOid(Oid relid, LOCKMODE lockmode)
197 {
198 	LOCKTAG		tag;
199 
200 	SetLocktagRelationOid(&tag, relid);
201 
202 	LockRelease(&tag, lockmode, false);
203 }
204 
205 /*
206  *		LockRelation
207  *
208  * This is a convenience routine for acquiring an additional lock on an
209  * already-open relation.  Never try to do "relation_open(foo, NoLock)"
210  * and then lock with this.
211  */
212 void
LockRelation(Relation relation,LOCKMODE lockmode)213 LockRelation(Relation relation, LOCKMODE lockmode)
214 {
215 	LOCKTAG		tag;
216 	LOCALLOCK  *locallock;
217 	LockAcquireResult res;
218 
219 	SET_LOCKTAG_RELATION(tag,
220 						 relation->rd_lockInfo.lockRelId.dbId,
221 						 relation->rd_lockInfo.lockRelId.relId);
222 
223 	res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
224 
225 	/*
226 	 * Now that we have the lock, check for invalidation messages; see notes
227 	 * in LockRelationOid.
228 	 */
229 	if (res != LOCKACQUIRE_ALREADY_CLEAR)
230 	{
231 		AcceptInvalidationMessages();
232 		MarkLockClear(locallock);
233 	}
234 }
235 
236 /*
237  *		ConditionalLockRelation
238  *
239  * This is a convenience routine for acquiring an additional lock on an
240  * already-open relation.  Never try to do "relation_open(foo, NoLock)"
241  * and then lock with this.
242  */
243 bool
ConditionalLockRelation(Relation relation,LOCKMODE lockmode)244 ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
245 {
246 	LOCKTAG		tag;
247 	LOCALLOCK  *locallock;
248 	LockAcquireResult res;
249 
250 	SET_LOCKTAG_RELATION(tag,
251 						 relation->rd_lockInfo.lockRelId.dbId,
252 						 relation->rd_lockInfo.lockRelId.relId);
253 
254 	res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
255 
256 	if (res == LOCKACQUIRE_NOT_AVAIL)
257 		return false;
258 
259 	/*
260 	 * Now that we have the lock, check for invalidation messages; see notes
261 	 * in LockRelationOid.
262 	 */
263 	if (res != LOCKACQUIRE_ALREADY_CLEAR)
264 	{
265 		AcceptInvalidationMessages();
266 		MarkLockClear(locallock);
267 	}
268 
269 	return true;
270 }
271 
272 /*
273  *		UnlockRelation
274  *
275  * This is a convenience routine for unlocking a relation without also
276  * closing it.
277  */
278 void
UnlockRelation(Relation relation,LOCKMODE lockmode)279 UnlockRelation(Relation relation, LOCKMODE lockmode)
280 {
281 	LOCKTAG		tag;
282 
283 	SET_LOCKTAG_RELATION(tag,
284 						 relation->rd_lockInfo.lockRelId.dbId,
285 						 relation->rd_lockInfo.lockRelId.relId);
286 
287 	LockRelease(&tag, lockmode, false);
288 }
289 
290 /*
291  *		LockHasWaitersRelation
292  *
293  * This is a functiion to check if someone else is waiting on a
294  * lock, we are currently holding.
295  */
296 bool
LockHasWaitersRelation(Relation relation,LOCKMODE lockmode)297 LockHasWaitersRelation(Relation relation, LOCKMODE lockmode)
298 {
299 	LOCKTAG		tag;
300 
301 	SET_LOCKTAG_RELATION(tag,
302 						 relation->rd_lockInfo.lockRelId.dbId,
303 						 relation->rd_lockInfo.lockRelId.relId);
304 
305 	return LockHasWaiters(&tag, lockmode, false);
306 }
307 
308 /*
309  *		LockRelationIdForSession
310  *
311  * This routine grabs a session-level lock on the target relation.  The
312  * session lock persists across transaction boundaries.  It will be removed
313  * when UnlockRelationIdForSession() is called, or if an ereport(ERROR) occurs,
314  * or if the backend exits.
315  *
316  * Note that one should also grab a transaction-level lock on the rel
317  * in any transaction that actually uses the rel, to ensure that the
318  * relcache entry is up to date.
319  */
320 void
LockRelationIdForSession(LockRelId * relid,LOCKMODE lockmode)321 LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
322 {
323 	LOCKTAG		tag;
324 
325 	SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
326 
327 	(void) LockAcquire(&tag, lockmode, true, false);
328 }
329 
330 /*
331  *		UnlockRelationIdForSession
332  */
333 void
UnlockRelationIdForSession(LockRelId * relid,LOCKMODE lockmode)334 UnlockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
335 {
336 	LOCKTAG		tag;
337 
338 	SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
339 
340 	LockRelease(&tag, lockmode, true);
341 }
342 
343 /*
344  *		LockRelationForExtension
345  *
346  * This lock tag is used to interlock addition of pages to relations.
347  * We need such locking because bufmgr/smgr definition of P_NEW is not
348  * race-condition-proof.
349  *
350  * We assume the caller is already holding some type of regular lock on
351  * the relation, so no AcceptInvalidationMessages call is needed here.
352  */
353 void
LockRelationForExtension(Relation relation,LOCKMODE lockmode)354 LockRelationForExtension(Relation relation, LOCKMODE lockmode)
355 {
356 	LOCKTAG		tag;
357 
358 	SET_LOCKTAG_RELATION_EXTEND(tag,
359 								relation->rd_lockInfo.lockRelId.dbId,
360 								relation->rd_lockInfo.lockRelId.relId);
361 
362 	(void) LockAcquire(&tag, lockmode, false, false);
363 }
364 
365 /*
366  *		ConditionalLockRelationForExtension
367  *
368  * As above, but only lock if we can get the lock without blocking.
369  * Returns TRUE iff the lock was acquired.
370  */
371 bool
ConditionalLockRelationForExtension(Relation relation,LOCKMODE lockmode)372 ConditionalLockRelationForExtension(Relation relation, LOCKMODE lockmode)
373 {
374 	LOCKTAG		tag;
375 
376 	SET_LOCKTAG_RELATION_EXTEND(tag,
377 								relation->rd_lockInfo.lockRelId.dbId,
378 								relation->rd_lockInfo.lockRelId.relId);
379 
380 	return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
381 }
382 
383 /*
384  *		RelationExtensionLockWaiterCount
385  *
386  * Count the number of processes waiting for the given relation extension lock.
387  */
388 int
RelationExtensionLockWaiterCount(Relation relation)389 RelationExtensionLockWaiterCount(Relation relation)
390 {
391 	LOCKTAG		tag;
392 
393 	SET_LOCKTAG_RELATION_EXTEND(tag,
394 								relation->rd_lockInfo.lockRelId.dbId,
395 								relation->rd_lockInfo.lockRelId.relId);
396 
397 	return LockWaiterCount(&tag);
398 }
399 
400 /*
401  *		UnlockRelationForExtension
402  */
403 void
UnlockRelationForExtension(Relation relation,LOCKMODE lockmode)404 UnlockRelationForExtension(Relation relation, LOCKMODE lockmode)
405 {
406 	LOCKTAG		tag;
407 
408 	SET_LOCKTAG_RELATION_EXTEND(tag,
409 								relation->rd_lockInfo.lockRelId.dbId,
410 								relation->rd_lockInfo.lockRelId.relId);
411 
412 	LockRelease(&tag, lockmode, false);
413 }
414 
415 /*
416  *		LockDatabaseFrozenIds
417  *
418  * This allows one backend per database to execute vac_update_datfrozenxid().
419  */
420 void
LockDatabaseFrozenIds(LOCKMODE lockmode)421 LockDatabaseFrozenIds(LOCKMODE lockmode)
422 {
423 	LOCKTAG		tag;
424 
425 	SET_LOCKTAG_DATABASE_FROZEN_IDS(tag, MyDatabaseId);
426 
427 	(void) LockAcquire(&tag, lockmode, false, false);
428 }
429 
430 /*
431  *		LockPage
432  *
433  * Obtain a page-level lock.  This is currently used by some index access
434  * methods to lock individual index pages.
435  */
436 void
LockPage(Relation relation,BlockNumber blkno,LOCKMODE lockmode)437 LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
438 {
439 	LOCKTAG		tag;
440 
441 	SET_LOCKTAG_PAGE(tag,
442 					 relation->rd_lockInfo.lockRelId.dbId,
443 					 relation->rd_lockInfo.lockRelId.relId,
444 					 blkno);
445 
446 	(void) LockAcquire(&tag, lockmode, false, false);
447 }
448 
449 /*
450  *		ConditionalLockPage
451  *
452  * As above, but only lock if we can get the lock without blocking.
453  * Returns TRUE iff the lock was acquired.
454  */
455 bool
ConditionalLockPage(Relation relation,BlockNumber blkno,LOCKMODE lockmode)456 ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
457 {
458 	LOCKTAG		tag;
459 
460 	SET_LOCKTAG_PAGE(tag,
461 					 relation->rd_lockInfo.lockRelId.dbId,
462 					 relation->rd_lockInfo.lockRelId.relId,
463 					 blkno);
464 
465 	return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
466 }
467 
468 /*
469  *		UnlockPage
470  */
471 void
UnlockPage(Relation relation,BlockNumber blkno,LOCKMODE lockmode)472 UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
473 {
474 	LOCKTAG		tag;
475 
476 	SET_LOCKTAG_PAGE(tag,
477 					 relation->rd_lockInfo.lockRelId.dbId,
478 					 relation->rd_lockInfo.lockRelId.relId,
479 					 blkno);
480 
481 	LockRelease(&tag, lockmode, false);
482 }
483 
484 /*
485  *		LockTuple
486  *
487  * Obtain a tuple-level lock.  This is used in a less-than-intuitive fashion
488  * because we can't afford to keep a separate lock in shared memory for every
489  * tuple.  See heap_lock_tuple before using this!
490  */
491 void
LockTuple(Relation relation,ItemPointer tid,LOCKMODE lockmode)492 LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
493 {
494 	LOCKTAG		tag;
495 
496 	SET_LOCKTAG_TUPLE(tag,
497 					  relation->rd_lockInfo.lockRelId.dbId,
498 					  relation->rd_lockInfo.lockRelId.relId,
499 					  ItemPointerGetBlockNumber(tid),
500 					  ItemPointerGetOffsetNumber(tid));
501 
502 	(void) LockAcquire(&tag, lockmode, false, false);
503 }
504 
505 /*
506  *		ConditionalLockTuple
507  *
508  * As above, but only lock if we can get the lock without blocking.
509  * Returns TRUE iff the lock was acquired.
510  */
511 bool
ConditionalLockTuple(Relation relation,ItemPointer tid,LOCKMODE lockmode)512 ConditionalLockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
513 {
514 	LOCKTAG		tag;
515 
516 	SET_LOCKTAG_TUPLE(tag,
517 					  relation->rd_lockInfo.lockRelId.dbId,
518 					  relation->rd_lockInfo.lockRelId.relId,
519 					  ItemPointerGetBlockNumber(tid),
520 					  ItemPointerGetOffsetNumber(tid));
521 
522 	return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
523 }
524 
525 /*
526  *		UnlockTuple
527  */
528 void
UnlockTuple(Relation relation,ItemPointer tid,LOCKMODE lockmode)529 UnlockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
530 {
531 	LOCKTAG		tag;
532 
533 	SET_LOCKTAG_TUPLE(tag,
534 					  relation->rd_lockInfo.lockRelId.dbId,
535 					  relation->rd_lockInfo.lockRelId.relId,
536 					  ItemPointerGetBlockNumber(tid),
537 					  ItemPointerGetOffsetNumber(tid));
538 
539 	LockRelease(&tag, lockmode, false);
540 }
541 
542 /*
543  *		XactLockTableInsert
544  *
545  * Insert a lock showing that the given transaction ID is running ---
546  * this is done when an XID is acquired by a transaction or subtransaction.
547  * The lock can then be used to wait for the transaction to finish.
548  */
549 void
XactLockTableInsert(TransactionId xid)550 XactLockTableInsert(TransactionId xid)
551 {
552 	LOCKTAG		tag;
553 
554 	SET_LOCKTAG_TRANSACTION(tag, xid);
555 
556 	(void) LockAcquire(&tag, ExclusiveLock, false, false);
557 }
558 
559 /*
560  *		XactLockTableDelete
561  *
562  * Delete the lock showing that the given transaction ID is running.
563  * (This is never used for main transaction IDs; those locks are only
564  * released implicitly at transaction end.  But we do use it for subtrans IDs.)
565  */
566 void
XactLockTableDelete(TransactionId xid)567 XactLockTableDelete(TransactionId xid)
568 {
569 	LOCKTAG		tag;
570 
571 	SET_LOCKTAG_TRANSACTION(tag, xid);
572 
573 	LockRelease(&tag, ExclusiveLock, false);
574 }
575 
576 /*
577  *		XactLockTableWait
578  *
579  * Wait for the specified transaction to commit or abort.  If an operation
580  * is specified, an error context callback is set up.  If 'oper' is passed as
581  * None, no error context callback is set up.
582  *
583  * Note that this does the right thing for subtransactions: if we wait on a
584  * subtransaction, we will exit as soon as it aborts or its top parent commits.
585  * It takes some extra work to ensure this, because to save on shared memory
586  * the XID lock of a subtransaction is released when it ends, whether
587  * successfully or unsuccessfully.  So we have to check if it's "still running"
588  * and if so wait for its parent.
589  */
590 void
XactLockTableWait(TransactionId xid,Relation rel,ItemPointer ctid,XLTW_Oper oper)591 XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
592 				  XLTW_Oper oper)
593 {
594 	LOCKTAG		tag;
595 	XactLockTableWaitInfo info;
596 	ErrorContextCallback callback;
597 	bool		first = true;
598 
599 	/*
600 	 * If an operation is specified, set up our verbose error context
601 	 * callback.
602 	 */
603 	if (oper != XLTW_None)
604 	{
605 		Assert(RelationIsValid(rel));
606 		Assert(ItemPointerIsValid(ctid));
607 
608 		info.rel = rel;
609 		info.ctid = ctid;
610 		info.oper = oper;
611 
612 		callback.callback = XactLockTableWaitErrorCb;
613 		callback.arg = &info;
614 		callback.previous = error_context_stack;
615 		error_context_stack = &callback;
616 	}
617 
618 	for (;;)
619 	{
620 		Assert(TransactionIdIsValid(xid));
621 		Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
622 
623 		SET_LOCKTAG_TRANSACTION(tag, xid);
624 
625 		(void) LockAcquire(&tag, ShareLock, false, false);
626 
627 		LockRelease(&tag, ShareLock, false);
628 
629 		if (!TransactionIdIsInProgress(xid))
630 			break;
631 
632 		/*
633 		 * If the Xid belonged to a subtransaction, then the lock would have
634 		 * gone away as soon as it was finished; for correct tuple visibility,
635 		 * the right action is to wait on its parent transaction to go away.
636 		 * But instead of going levels up one by one, we can just wait for the
637 		 * topmost transaction to finish with the same end result, which also
638 		 * incurs less locktable traffic.
639 		 *
640 		 * Some uses of this function don't involve tuple visibility -- such
641 		 * as when building snapshots for logical decoding.  It is possible to
642 		 * see a transaction in ProcArray before it registers itself in the
643 		 * locktable.  The topmost transaction in that case is the same xid,
644 		 * so we try again after a short sleep.  (Don't sleep the first time
645 		 * through, to avoid slowing down the normal case.)
646 		 */
647 		if (!first)
648 			pg_usleep(1000L);
649 		first = false;
650 		xid = SubTransGetTopmostTransaction(xid);
651 	}
652 
653 	if (oper != XLTW_None)
654 		error_context_stack = callback.previous;
655 }
656 
657 /*
658  *		ConditionalXactLockTableWait
659  *
660  * As above, but only lock if we can get the lock without blocking.
661  * Returns TRUE if the lock was acquired.
662  */
663 bool
ConditionalXactLockTableWait(TransactionId xid)664 ConditionalXactLockTableWait(TransactionId xid)
665 {
666 	LOCKTAG		tag;
667 	bool		first = true;
668 
669 	for (;;)
670 	{
671 		Assert(TransactionIdIsValid(xid));
672 		Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
673 
674 		SET_LOCKTAG_TRANSACTION(tag, xid);
675 
676 		if (LockAcquire(&tag, ShareLock, false, true) == LOCKACQUIRE_NOT_AVAIL)
677 			return false;
678 
679 		LockRelease(&tag, ShareLock, false);
680 
681 		if (!TransactionIdIsInProgress(xid))
682 			break;
683 
684 		/* See XactLockTableWait about this case */
685 		if (!first)
686 			pg_usleep(1000L);
687 		first = false;
688 		xid = SubTransGetTopmostTransaction(xid);
689 	}
690 
691 	return true;
692 }
693 
694 /*
695  *		SpeculativeInsertionLockAcquire
696  *
697  * Insert a lock showing that the given transaction ID is inserting a tuple,
698  * but hasn't yet decided whether it's going to keep it.  The lock can then be
699  * used to wait for the decision to go ahead with the insertion, or aborting
700  * it.
701  *
702  * The token is used to distinguish multiple insertions by the same
703  * transaction.  It is returned to caller.
704  */
705 uint32
SpeculativeInsertionLockAcquire(TransactionId xid)706 SpeculativeInsertionLockAcquire(TransactionId xid)
707 {
708 	LOCKTAG		tag;
709 
710 	speculativeInsertionToken++;
711 
712 	/*
713 	 * Check for wrap-around. Zero means no token is held, so don't use that.
714 	 */
715 	if (speculativeInsertionToken == 0)
716 		speculativeInsertionToken = 1;
717 
718 	SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, speculativeInsertionToken);
719 
720 	(void) LockAcquire(&tag, ExclusiveLock, false, false);
721 
722 	return speculativeInsertionToken;
723 }
724 
725 /*
726  *		SpeculativeInsertionLockRelease
727  *
728  * Delete the lock showing that the given transaction is speculatively
729  * inserting a tuple.
730  */
731 void
SpeculativeInsertionLockRelease(TransactionId xid)732 SpeculativeInsertionLockRelease(TransactionId xid)
733 {
734 	LOCKTAG		tag;
735 
736 	SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, speculativeInsertionToken);
737 
738 	LockRelease(&tag, ExclusiveLock, false);
739 }
740 
741 /*
742  *		SpeculativeInsertionWait
743  *
744  * Wait for the specified transaction to finish or abort the insertion of a
745  * tuple.
746  */
747 void
SpeculativeInsertionWait(TransactionId xid,uint32 token)748 SpeculativeInsertionWait(TransactionId xid, uint32 token)
749 {
750 	LOCKTAG		tag;
751 
752 	SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, token);
753 
754 	Assert(TransactionIdIsValid(xid));
755 	Assert(token != 0);
756 
757 	(void) LockAcquire(&tag, ShareLock, false, false);
758 	LockRelease(&tag, ShareLock, false);
759 }
760 
761 /*
762  * XactLockTableWaitErrorContextCb
763  *		Error context callback for transaction lock waits.
764  */
765 static void
XactLockTableWaitErrorCb(void * arg)766 XactLockTableWaitErrorCb(void *arg)
767 {
768 	XactLockTableWaitInfo *info = (XactLockTableWaitInfo *) arg;
769 
770 	/*
771 	 * We would like to print schema name too, but that would require a
772 	 * syscache lookup.
773 	 */
774 	if (info->oper != XLTW_None &&
775 		ItemPointerIsValid(info->ctid) && RelationIsValid(info->rel))
776 	{
777 		const char *cxt;
778 
779 		switch (info->oper)
780 		{
781 			case XLTW_Update:
782 				cxt = gettext_noop("while updating tuple (%u,%u) in relation \"%s\"");
783 				break;
784 			case XLTW_Delete:
785 				cxt = gettext_noop("while deleting tuple (%u,%u) in relation \"%s\"");
786 				break;
787 			case XLTW_Lock:
788 				cxt = gettext_noop("while locking tuple (%u,%u) in relation \"%s\"");
789 				break;
790 			case XLTW_LockUpdated:
791 				cxt = gettext_noop("while locking updated version (%u,%u) of tuple in relation \"%s\"");
792 				break;
793 			case XLTW_InsertIndex:
794 				cxt = gettext_noop("while inserting index tuple (%u,%u) in relation \"%s\"");
795 				break;
796 			case XLTW_InsertIndexUnique:
797 				cxt = gettext_noop("while checking uniqueness of tuple (%u,%u) in relation \"%s\"");
798 				break;
799 			case XLTW_FetchUpdated:
800 				cxt = gettext_noop("while rechecking updated tuple (%u,%u) in relation \"%s\"");
801 				break;
802 			case XLTW_RecheckExclusionConstr:
803 				cxt = gettext_noop("while checking exclusion constraint on tuple (%u,%u) in relation \"%s\"");
804 				break;
805 
806 			default:
807 				return;
808 		}
809 
810 		errcontext(cxt,
811 				   ItemPointerGetBlockNumber(info->ctid),
812 				   ItemPointerGetOffsetNumber(info->ctid),
813 				   RelationGetRelationName(info->rel));
814 	}
815 }
816 
817 /*
818  * WaitForLockersMultiple
819  *		Wait until no transaction holds locks that conflict with the given
820  *		locktags at the given lockmode.
821  *
822  * To do this, obtain the current list of lockers, and wait on their VXIDs
823  * until they are finished.
824  *
825  * Note we don't try to acquire the locks on the given locktags, only the
826  * VXIDs and XIDs of their lock holders; if somebody grabs a conflicting lock
827  * on the objects after we obtained our initial list of lockers, we will not
828  * wait for them.
829  */
830 void
WaitForLockersMultiple(List * locktags,LOCKMODE lockmode)831 WaitForLockersMultiple(List *locktags, LOCKMODE lockmode)
832 {
833 	List	   *holders = NIL;
834 	ListCell   *lc;
835 
836 	/* Done if no locks to wait for */
837 	if (list_length(locktags) == 0)
838 		return;
839 
840 	/* Collect the transactions we need to wait on */
841 	foreach(lc, locktags)
842 	{
843 		LOCKTAG    *locktag = lfirst(lc);
844 
845 		holders = lappend(holders, GetLockConflicts(locktag, lockmode));
846 	}
847 
848 	/*
849 	 * Note: GetLockConflicts() never reports our own xid, hence we need not
850 	 * check for that.  Also, prepared xacts are reported and awaited.
851 	 */
852 
853 	/* Finally wait for each such transaction to complete */
854 	foreach(lc, holders)
855 	{
856 		VirtualTransactionId *lockholders = lfirst(lc);
857 
858 		while (VirtualTransactionIdIsValid(*lockholders))
859 		{
860 			VirtualXactLock(*lockholders, true);
861 			lockholders++;
862 		}
863 	}
864 
865 	list_free_deep(holders);
866 }
867 
868 /*
869  * WaitForLockers
870  *
871  * Same as WaitForLockersMultiple, for a single lock tag.
872  */
873 void
WaitForLockers(LOCKTAG heaplocktag,LOCKMODE lockmode)874 WaitForLockers(LOCKTAG heaplocktag, LOCKMODE lockmode)
875 {
876 	List	   *l;
877 
878 	l = list_make1(&heaplocktag);
879 	WaitForLockersMultiple(l, lockmode);
880 	list_free(l);
881 }
882 
883 
884 /*
885  *		LockDatabaseObject
886  *
887  * Obtain a lock on a general object of the current database.  Don't use
888  * this for shared objects (such as tablespaces).  It's unwise to apply it
889  * to relations, also, since a lock taken this way will NOT conflict with
890  * locks taken via LockRelation and friends.
891  */
892 void
LockDatabaseObject(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)893 LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
894 				   LOCKMODE lockmode)
895 {
896 	LOCKTAG		tag;
897 
898 	SET_LOCKTAG_OBJECT(tag,
899 					   MyDatabaseId,
900 					   classid,
901 					   objid,
902 					   objsubid);
903 
904 	(void) LockAcquire(&tag, lockmode, false, false);
905 
906 	/* Make sure syscaches are up-to-date with any changes we waited for */
907 	AcceptInvalidationMessages();
908 }
909 
910 /*
911  *		UnlockDatabaseObject
912  */
913 void
UnlockDatabaseObject(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)914 UnlockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
915 					 LOCKMODE lockmode)
916 {
917 	LOCKTAG		tag;
918 
919 	SET_LOCKTAG_OBJECT(tag,
920 					   MyDatabaseId,
921 					   classid,
922 					   objid,
923 					   objsubid);
924 
925 	LockRelease(&tag, lockmode, false);
926 }
927 
928 /*
929  *		LockSharedObject
930  *
931  * Obtain a lock on a shared-across-databases object.
932  */
933 void
LockSharedObject(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)934 LockSharedObject(Oid classid, Oid objid, uint16 objsubid,
935 				 LOCKMODE lockmode)
936 {
937 	LOCKTAG		tag;
938 
939 	SET_LOCKTAG_OBJECT(tag,
940 					   InvalidOid,
941 					   classid,
942 					   objid,
943 					   objsubid);
944 
945 	(void) LockAcquire(&tag, lockmode, false, false);
946 
947 	/* Make sure syscaches are up-to-date with any changes we waited for */
948 	AcceptInvalidationMessages();
949 }
950 
951 /*
952  *		UnlockSharedObject
953  */
954 void
UnlockSharedObject(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)955 UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid,
956 				   LOCKMODE lockmode)
957 {
958 	LOCKTAG		tag;
959 
960 	SET_LOCKTAG_OBJECT(tag,
961 					   InvalidOid,
962 					   classid,
963 					   objid,
964 					   objsubid);
965 
966 	LockRelease(&tag, lockmode, false);
967 }
968 
969 /*
970  *		LockSharedObjectForSession
971  *
972  * Obtain a session-level lock on a shared-across-databases object.
973  * See LockRelationIdForSession for notes about session-level locks.
974  */
975 void
LockSharedObjectForSession(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)976 LockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
977 						   LOCKMODE lockmode)
978 {
979 	LOCKTAG		tag;
980 
981 	SET_LOCKTAG_OBJECT(tag,
982 					   InvalidOid,
983 					   classid,
984 					   objid,
985 					   objsubid);
986 
987 	(void) LockAcquire(&tag, lockmode, true, false);
988 }
989 
990 /*
991  *		UnlockSharedObjectForSession
992  */
993 void
UnlockSharedObjectForSession(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)994 UnlockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
995 							 LOCKMODE lockmode)
996 {
997 	LOCKTAG		tag;
998 
999 	SET_LOCKTAG_OBJECT(tag,
1000 					   InvalidOid,
1001 					   classid,
1002 					   objid,
1003 					   objsubid);
1004 
1005 	LockRelease(&tag, lockmode, true);
1006 }
1007 
1008 
1009 /*
1010  * Append a description of a lockable object to buf.
1011  *
1012  * Ideally we would print names for the numeric values, but that requires
1013  * getting locks on system tables, which might cause problems since this is
1014  * typically used to report deadlock situations.
1015  */
1016 void
DescribeLockTag(StringInfo buf,const LOCKTAG * tag)1017 DescribeLockTag(StringInfo buf, const LOCKTAG *tag)
1018 {
1019 	switch ((LockTagType) tag->locktag_type)
1020 	{
1021 		case LOCKTAG_RELATION:
1022 			appendStringInfo(buf,
1023 							 _("relation %u of database %u"),
1024 							 tag->locktag_field2,
1025 							 tag->locktag_field1);
1026 			break;
1027 		case LOCKTAG_RELATION_EXTEND:
1028 			appendStringInfo(buf,
1029 							 _("extension of relation %u of database %u"),
1030 							 tag->locktag_field2,
1031 							 tag->locktag_field1);
1032 			break;
1033 		case LOCKTAG_DATABASE_FROZEN_IDS:
1034 			appendStringInfo(buf,
1035 							 _("pg_database.datfrozenxid of database %u"),
1036 							 tag->locktag_field1);
1037 			break;
1038 		case LOCKTAG_PAGE:
1039 			appendStringInfo(buf,
1040 							 _("page %u of relation %u of database %u"),
1041 							 tag->locktag_field3,
1042 							 tag->locktag_field2,
1043 							 tag->locktag_field1);
1044 			break;
1045 		case LOCKTAG_TUPLE:
1046 			appendStringInfo(buf,
1047 							 _("tuple (%u,%u) of relation %u of database %u"),
1048 							 tag->locktag_field3,
1049 							 tag->locktag_field4,
1050 							 tag->locktag_field2,
1051 							 tag->locktag_field1);
1052 			break;
1053 		case LOCKTAG_TRANSACTION:
1054 			appendStringInfo(buf,
1055 							 _("transaction %u"),
1056 							 tag->locktag_field1);
1057 			break;
1058 		case LOCKTAG_VIRTUALTRANSACTION:
1059 			appendStringInfo(buf,
1060 							 _("virtual transaction %d/%u"),
1061 							 tag->locktag_field1,
1062 							 tag->locktag_field2);
1063 			break;
1064 		case LOCKTAG_SPECULATIVE_TOKEN:
1065 			appendStringInfo(buf,
1066 							 _("speculative token %u of transaction %u"),
1067 							 tag->locktag_field2,
1068 							 tag->locktag_field1);
1069 			break;
1070 		case LOCKTAG_OBJECT:
1071 			appendStringInfo(buf,
1072 							 _("object %u of class %u of database %u"),
1073 							 tag->locktag_field3,
1074 							 tag->locktag_field2,
1075 							 tag->locktag_field1);
1076 			break;
1077 		case LOCKTAG_USERLOCK:
1078 			/* reserved for old contrib code, now on pgfoundry */
1079 			appendStringInfo(buf,
1080 							 _("user lock [%u,%u,%u]"),
1081 							 tag->locktag_field1,
1082 							 tag->locktag_field2,
1083 							 tag->locktag_field3);
1084 			break;
1085 		case LOCKTAG_ADVISORY:
1086 			appendStringInfo(buf,
1087 							 _("advisory lock [%u,%u,%u,%u]"),
1088 							 tag->locktag_field1,
1089 							 tag->locktag_field2,
1090 							 tag->locktag_field3,
1091 							 tag->locktag_field4);
1092 			break;
1093 		default:
1094 			appendStringInfo(buf,
1095 							 _("unrecognized locktag type %d"),
1096 							 (int) tag->locktag_type);
1097 			break;
1098 	}
1099 }
1100 
1101 /*
1102  * GetLockNameFromTagType
1103  *
1104  *	Given locktag type, return the corresponding lock name.
1105  */
1106 const char *
GetLockNameFromTagType(uint16 locktag_type)1107 GetLockNameFromTagType(uint16 locktag_type)
1108 {
1109 	if (locktag_type > LOCKTAG_LAST_TYPE)
1110 		return "???";
1111 	return LockTagTypeNames[locktag_type];
1112 }
1113