1 /*-------------------------------------------------------------------------
2 *
3 * lmgr.c
4 * POSTGRES lock manager code
5 *
6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/storage/lmgr/lmgr.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16 #include "postgres.h"
17
18 #include "access/subtrans.h"
19 #include "access/transam.h"
20 #include "access/xact.h"
21 #include "catalog/catalog.h"
22 #include "commands/progress.h"
23 #include "miscadmin.h"
24 #include "pgstat.h"
25 #include "storage/lmgr.h"
26 #include "storage/proc.h"
27 #include "storage/procarray.h"
28 #include "storage/sinvaladt.h"
29 #include "utils/inval.h"
30
31
32 /*
33 * Per-backend counter for generating speculative insertion tokens.
34 *
35 * This may wrap around, but that's OK as it's only used for the short
36 * duration between inserting a tuple and checking that there are no (unique)
37 * constraint violations. It's theoretically possible that a backend sees a
38 * tuple that was speculatively inserted by another backend, but before it has
39 * started waiting on the token, the other backend completes its insertion,
40 * and then performs 2^32 unrelated insertions. And after all that, the
41 * first backend finally calls SpeculativeInsertionLockAcquire(), with the
42 * intention of waiting for the first insertion to complete, but ends up
43 * waiting for the latest unrelated insertion instead. Even then, nothing
44 * particularly bad happens: in the worst case they deadlock, causing one of
45 * the transactions to abort.
46 */
47 static uint32 speculativeInsertionToken = 0;
48
49
50 /*
51 * Struct to hold context info for transaction lock waits.
52 *
53 * 'oper' is the operation that needs to wait for the other transaction; 'rel'
54 * and 'ctid' specify the address of the tuple being waited for.
55 */
56 typedef struct XactLockTableWaitInfo
57 {
58 XLTW_Oper oper;
59 Relation rel;
60 ItemPointer ctid;
61 } XactLockTableWaitInfo;
62
63 static void XactLockTableWaitErrorCb(void *arg);
64
65 /*
66 * RelationInitLockInfo
67 * Initializes the lock information in a relation descriptor.
68 *
69 * relcache.c must call this during creation of any reldesc.
70 */
71 void
RelationInitLockInfo(Relation relation)72 RelationInitLockInfo(Relation relation)
73 {
74 Assert(RelationIsValid(relation));
75 Assert(OidIsValid(RelationGetRelid(relation)));
76
77 relation->rd_lockInfo.lockRelId.relId = RelationGetRelid(relation);
78
79 if (relation->rd_rel->relisshared)
80 relation->rd_lockInfo.lockRelId.dbId = InvalidOid;
81 else
82 relation->rd_lockInfo.lockRelId.dbId = MyDatabaseId;
83 }
84
85 /*
86 * SetLocktagRelationOid
87 * Set up a locktag for a relation, given only relation OID
88 */
89 static inline void
SetLocktagRelationOid(LOCKTAG * tag,Oid relid)90 SetLocktagRelationOid(LOCKTAG *tag, Oid relid)
91 {
92 Oid dbid;
93
94 if (IsSharedRelation(relid))
95 dbid = InvalidOid;
96 else
97 dbid = MyDatabaseId;
98
99 SET_LOCKTAG_RELATION(*tag, dbid, relid);
100 }
101
102 /*
103 * LockRelationOid
104 *
105 * Lock a relation given only its OID. This should generally be used
106 * before attempting to open the relation's relcache entry.
107 */
108 void
LockRelationOid(Oid relid,LOCKMODE lockmode)109 LockRelationOid(Oid relid, LOCKMODE lockmode)
110 {
111 LOCKTAG tag;
112 LOCALLOCK *locallock;
113 LockAcquireResult res;
114
115 SetLocktagRelationOid(&tag, relid);
116
117 res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
118
119 /*
120 * Now that we have the lock, check for invalidation messages, so that we
121 * will update or flush any stale relcache entry before we try to use it.
122 * RangeVarGetRelid() specifically relies on us for this. We can skip
123 * this in the not-uncommon case that we already had the same type of lock
124 * being requested, since then no one else could have modified the
125 * relcache entry in an undesirable way. (In the case where our own xact
126 * modifies the rel, the relcache update happens via
127 * CommandCounterIncrement, not here.)
128 *
129 * However, in corner cases where code acts on tables (usually catalogs)
130 * recursively, we might get here while still processing invalidation
131 * messages in some outer execution of this function or a sibling. The
132 * "cleared" status of the lock tells us whether we really are done
133 * absorbing relevant inval messages.
134 */
135 if (res != LOCKACQUIRE_ALREADY_CLEAR)
136 {
137 AcceptInvalidationMessages();
138 MarkLockClear(locallock);
139 }
140 }
141
142 /*
143 * ConditionalLockRelationOid
144 *
145 * As above, but only lock if we can get the lock without blocking.
146 * Returns true iff the lock was acquired.
147 *
148 * NOTE: we do not currently need conditional versions of all the
149 * LockXXX routines in this file, but they could easily be added if needed.
150 */
151 bool
ConditionalLockRelationOid(Oid relid,LOCKMODE lockmode)152 ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
153 {
154 LOCKTAG tag;
155 LOCALLOCK *locallock;
156 LockAcquireResult res;
157
158 SetLocktagRelationOid(&tag, relid);
159
160 res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
161
162 if (res == LOCKACQUIRE_NOT_AVAIL)
163 return false;
164
165 /*
166 * Now that we have the lock, check for invalidation messages; see notes
167 * in LockRelationOid.
168 */
169 if (res != LOCKACQUIRE_ALREADY_CLEAR)
170 {
171 AcceptInvalidationMessages();
172 MarkLockClear(locallock);
173 }
174
175 return true;
176 }
177
178 /*
179 * UnlockRelationId
180 *
181 * Unlock, given a LockRelId. This is preferred over UnlockRelationOid
182 * for speed reasons.
183 */
184 void
UnlockRelationId(LockRelId * relid,LOCKMODE lockmode)185 UnlockRelationId(LockRelId *relid, LOCKMODE lockmode)
186 {
187 LOCKTAG tag;
188
189 SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
190
191 LockRelease(&tag, lockmode, false);
192 }
193
194 /*
195 * UnlockRelationOid
196 *
197 * Unlock, given only a relation Oid. Use UnlockRelationId if you can.
198 */
199 void
UnlockRelationOid(Oid relid,LOCKMODE lockmode)200 UnlockRelationOid(Oid relid, LOCKMODE lockmode)
201 {
202 LOCKTAG tag;
203
204 SetLocktagRelationOid(&tag, relid);
205
206 LockRelease(&tag, lockmode, false);
207 }
208
209 /*
210 * LockRelation
211 *
212 * This is a convenience routine for acquiring an additional lock on an
213 * already-open relation. Never try to do "relation_open(foo, NoLock)"
214 * and then lock with this.
215 */
216 void
LockRelation(Relation relation,LOCKMODE lockmode)217 LockRelation(Relation relation, LOCKMODE lockmode)
218 {
219 LOCKTAG tag;
220 LOCALLOCK *locallock;
221 LockAcquireResult res;
222
223 SET_LOCKTAG_RELATION(tag,
224 relation->rd_lockInfo.lockRelId.dbId,
225 relation->rd_lockInfo.lockRelId.relId);
226
227 res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
228
229 /*
230 * Now that we have the lock, check for invalidation messages; see notes
231 * in LockRelationOid.
232 */
233 if (res != LOCKACQUIRE_ALREADY_CLEAR)
234 {
235 AcceptInvalidationMessages();
236 MarkLockClear(locallock);
237 }
238 }
239
240 /*
241 * ConditionalLockRelation
242 *
243 * This is a convenience routine for acquiring an additional lock on an
244 * already-open relation. Never try to do "relation_open(foo, NoLock)"
245 * and then lock with this.
246 */
247 bool
ConditionalLockRelation(Relation relation,LOCKMODE lockmode)248 ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
249 {
250 LOCKTAG tag;
251 LOCALLOCK *locallock;
252 LockAcquireResult res;
253
254 SET_LOCKTAG_RELATION(tag,
255 relation->rd_lockInfo.lockRelId.dbId,
256 relation->rd_lockInfo.lockRelId.relId);
257
258 res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
259
260 if (res == LOCKACQUIRE_NOT_AVAIL)
261 return false;
262
263 /*
264 * Now that we have the lock, check for invalidation messages; see notes
265 * in LockRelationOid.
266 */
267 if (res != LOCKACQUIRE_ALREADY_CLEAR)
268 {
269 AcceptInvalidationMessages();
270 MarkLockClear(locallock);
271 }
272
273 return true;
274 }
275
276 /*
277 * UnlockRelation
278 *
279 * This is a convenience routine for unlocking a relation without also
280 * closing it.
281 */
282 void
UnlockRelation(Relation relation,LOCKMODE lockmode)283 UnlockRelation(Relation relation, LOCKMODE lockmode)
284 {
285 LOCKTAG tag;
286
287 SET_LOCKTAG_RELATION(tag,
288 relation->rd_lockInfo.lockRelId.dbId,
289 relation->rd_lockInfo.lockRelId.relId);
290
291 LockRelease(&tag, lockmode, false);
292 }
293
294 /*
295 * CheckRelationLockedByMe
296 *
297 * Returns true if current transaction holds a lock on 'relation' of mode
298 * 'lockmode'. If 'orstronger' is true, a stronger lockmode is also OK.
299 * ("Stronger" is defined as "numerically higher", which is a bit
300 * semantically dubious but is OK for the purposes we use this for.)
301 */
302 bool
CheckRelationLockedByMe(Relation relation,LOCKMODE lockmode,bool orstronger)303 CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, bool orstronger)
304 {
305 LOCKTAG tag;
306
307 SET_LOCKTAG_RELATION(tag,
308 relation->rd_lockInfo.lockRelId.dbId,
309 relation->rd_lockInfo.lockRelId.relId);
310
311 if (LockHeldByMe(&tag, lockmode))
312 return true;
313
314 if (orstronger)
315 {
316 LOCKMODE slockmode;
317
318 for (slockmode = lockmode + 1;
319 slockmode <= MaxLockMode;
320 slockmode++)
321 {
322 if (LockHeldByMe(&tag, slockmode))
323 {
324 #ifdef NOT_USED
325 /* Sometimes this might be useful for debugging purposes */
326 elog(WARNING, "lock mode %s substituted for %s on relation %s",
327 GetLockmodeName(tag.locktag_lockmethodid, slockmode),
328 GetLockmodeName(tag.locktag_lockmethodid, lockmode),
329 RelationGetRelationName(relation));
330 #endif
331 return true;
332 }
333 }
334 }
335
336 return false;
337 }
338
339 /*
340 * LockHasWaitersRelation
341 *
342 * This is a function to check whether someone else is waiting for a
343 * lock which we are currently holding.
344 */
345 bool
LockHasWaitersRelation(Relation relation,LOCKMODE lockmode)346 LockHasWaitersRelation(Relation relation, LOCKMODE lockmode)
347 {
348 LOCKTAG tag;
349
350 SET_LOCKTAG_RELATION(tag,
351 relation->rd_lockInfo.lockRelId.dbId,
352 relation->rd_lockInfo.lockRelId.relId);
353
354 return LockHasWaiters(&tag, lockmode, false);
355 }
356
357 /*
358 * LockRelationIdForSession
359 *
360 * This routine grabs a session-level lock on the target relation. The
361 * session lock persists across transaction boundaries. It will be removed
362 * when UnlockRelationIdForSession() is called, or if an ereport(ERROR) occurs,
363 * or if the backend exits.
364 *
365 * Note that one should also grab a transaction-level lock on the rel
366 * in any transaction that actually uses the rel, to ensure that the
367 * relcache entry is up to date.
368 */
369 void
LockRelationIdForSession(LockRelId * relid,LOCKMODE lockmode)370 LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
371 {
372 LOCKTAG tag;
373
374 SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
375
376 (void) LockAcquire(&tag, lockmode, true, false);
377 }
378
379 /*
380 * UnlockRelationIdForSession
381 */
382 void
UnlockRelationIdForSession(LockRelId * relid,LOCKMODE lockmode)383 UnlockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
384 {
385 LOCKTAG tag;
386
387 SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
388
389 LockRelease(&tag, lockmode, true);
390 }
391
392 /*
393 * LockRelationForExtension
394 *
395 * This lock tag is used to interlock addition of pages to relations.
396 * We need such locking because bufmgr/smgr definition of P_NEW is not
397 * race-condition-proof.
398 *
399 * We assume the caller is already holding some type of regular lock on
400 * the relation, so no AcceptInvalidationMessages call is needed here.
401 */
402 void
LockRelationForExtension(Relation relation,LOCKMODE lockmode)403 LockRelationForExtension(Relation relation, LOCKMODE lockmode)
404 {
405 LOCKTAG tag;
406
407 SET_LOCKTAG_RELATION_EXTEND(tag,
408 relation->rd_lockInfo.lockRelId.dbId,
409 relation->rd_lockInfo.lockRelId.relId);
410
411 (void) LockAcquire(&tag, lockmode, false, false);
412 }
413
414 /*
415 * ConditionalLockRelationForExtension
416 *
417 * As above, but only lock if we can get the lock without blocking.
418 * Returns true iff the lock was acquired.
419 */
420 bool
ConditionalLockRelationForExtension(Relation relation,LOCKMODE lockmode)421 ConditionalLockRelationForExtension(Relation relation, LOCKMODE lockmode)
422 {
423 LOCKTAG tag;
424
425 SET_LOCKTAG_RELATION_EXTEND(tag,
426 relation->rd_lockInfo.lockRelId.dbId,
427 relation->rd_lockInfo.lockRelId.relId);
428
429 return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
430 }
431
432 /*
433 * RelationExtensionLockWaiterCount
434 *
435 * Count the number of processes waiting for the given relation extension lock.
436 */
437 int
RelationExtensionLockWaiterCount(Relation relation)438 RelationExtensionLockWaiterCount(Relation relation)
439 {
440 LOCKTAG tag;
441
442 SET_LOCKTAG_RELATION_EXTEND(tag,
443 relation->rd_lockInfo.lockRelId.dbId,
444 relation->rd_lockInfo.lockRelId.relId);
445
446 return LockWaiterCount(&tag);
447 }
448
449 /*
450 * UnlockRelationForExtension
451 */
452 void
UnlockRelationForExtension(Relation relation,LOCKMODE lockmode)453 UnlockRelationForExtension(Relation relation, LOCKMODE lockmode)
454 {
455 LOCKTAG tag;
456
457 SET_LOCKTAG_RELATION_EXTEND(tag,
458 relation->rd_lockInfo.lockRelId.dbId,
459 relation->rd_lockInfo.lockRelId.relId);
460
461 LockRelease(&tag, lockmode, false);
462 }
463
464 /*
465 * LockDatabaseFrozenIds
466 *
467 * This allows one backend per database to execute vac_update_datfrozenxid().
468 */
469 void
LockDatabaseFrozenIds(LOCKMODE lockmode)470 LockDatabaseFrozenIds(LOCKMODE lockmode)
471 {
472 LOCKTAG tag;
473
474 SET_LOCKTAG_DATABASE_FROZEN_IDS(tag, MyDatabaseId);
475
476 (void) LockAcquire(&tag, lockmode, false, false);
477 }
478
479 /*
480 * LockPage
481 *
482 * Obtain a page-level lock. This is currently used by some index access
483 * methods to lock individual index pages.
484 */
485 void
LockPage(Relation relation,BlockNumber blkno,LOCKMODE lockmode)486 LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
487 {
488 LOCKTAG tag;
489
490 SET_LOCKTAG_PAGE(tag,
491 relation->rd_lockInfo.lockRelId.dbId,
492 relation->rd_lockInfo.lockRelId.relId,
493 blkno);
494
495 (void) LockAcquire(&tag, lockmode, false, false);
496 }
497
498 /*
499 * ConditionalLockPage
500 *
501 * As above, but only lock if we can get the lock without blocking.
502 * Returns true iff the lock was acquired.
503 */
504 bool
ConditionalLockPage(Relation relation,BlockNumber blkno,LOCKMODE lockmode)505 ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
506 {
507 LOCKTAG tag;
508
509 SET_LOCKTAG_PAGE(tag,
510 relation->rd_lockInfo.lockRelId.dbId,
511 relation->rd_lockInfo.lockRelId.relId,
512 blkno);
513
514 return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
515 }
516
517 /*
518 * UnlockPage
519 */
520 void
UnlockPage(Relation relation,BlockNumber blkno,LOCKMODE lockmode)521 UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
522 {
523 LOCKTAG tag;
524
525 SET_LOCKTAG_PAGE(tag,
526 relation->rd_lockInfo.lockRelId.dbId,
527 relation->rd_lockInfo.lockRelId.relId,
528 blkno);
529
530 LockRelease(&tag, lockmode, false);
531 }
532
533 /*
534 * LockTuple
535 *
536 * Obtain a tuple-level lock. This is used in a less-than-intuitive fashion
537 * because we can't afford to keep a separate lock in shared memory for every
538 * tuple. See heap_lock_tuple before using this!
539 */
540 void
LockTuple(Relation relation,ItemPointer tid,LOCKMODE lockmode)541 LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
542 {
543 LOCKTAG tag;
544
545 SET_LOCKTAG_TUPLE(tag,
546 relation->rd_lockInfo.lockRelId.dbId,
547 relation->rd_lockInfo.lockRelId.relId,
548 ItemPointerGetBlockNumber(tid),
549 ItemPointerGetOffsetNumber(tid));
550
551 (void) LockAcquire(&tag, lockmode, false, false);
552 }
553
554 /*
555 * ConditionalLockTuple
556 *
557 * As above, but only lock if we can get the lock without blocking.
558 * Returns true iff the lock was acquired.
559 */
560 bool
ConditionalLockTuple(Relation relation,ItemPointer tid,LOCKMODE lockmode)561 ConditionalLockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
562 {
563 LOCKTAG tag;
564
565 SET_LOCKTAG_TUPLE(tag,
566 relation->rd_lockInfo.lockRelId.dbId,
567 relation->rd_lockInfo.lockRelId.relId,
568 ItemPointerGetBlockNumber(tid),
569 ItemPointerGetOffsetNumber(tid));
570
571 return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
572 }
573
574 /*
575 * UnlockTuple
576 */
577 void
UnlockTuple(Relation relation,ItemPointer tid,LOCKMODE lockmode)578 UnlockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
579 {
580 LOCKTAG tag;
581
582 SET_LOCKTAG_TUPLE(tag,
583 relation->rd_lockInfo.lockRelId.dbId,
584 relation->rd_lockInfo.lockRelId.relId,
585 ItemPointerGetBlockNumber(tid),
586 ItemPointerGetOffsetNumber(tid));
587
588 LockRelease(&tag, lockmode, false);
589 }
590
591 /*
592 * XactLockTableInsert
593 *
594 * Insert a lock showing that the given transaction ID is running ---
595 * this is done when an XID is acquired by a transaction or subtransaction.
596 * The lock can then be used to wait for the transaction to finish.
597 */
598 void
XactLockTableInsert(TransactionId xid)599 XactLockTableInsert(TransactionId xid)
600 {
601 LOCKTAG tag;
602
603 SET_LOCKTAG_TRANSACTION(tag, xid);
604
605 (void) LockAcquire(&tag, ExclusiveLock, false, false);
606 }
607
608 /*
609 * XactLockTableDelete
610 *
611 * Delete the lock showing that the given transaction ID is running.
612 * (This is never used for main transaction IDs; those locks are only
613 * released implicitly at transaction end. But we do use it for subtrans IDs.)
614 */
615 void
XactLockTableDelete(TransactionId xid)616 XactLockTableDelete(TransactionId xid)
617 {
618 LOCKTAG tag;
619
620 SET_LOCKTAG_TRANSACTION(tag, xid);
621
622 LockRelease(&tag, ExclusiveLock, false);
623 }
624
625 /*
626 * XactLockTableWait
627 *
628 * Wait for the specified transaction to commit or abort. If an operation
629 * is specified, an error context callback is set up. If 'oper' is passed as
630 * None, no error context callback is set up.
631 *
632 * Note that this does the right thing for subtransactions: if we wait on a
633 * subtransaction, we will exit as soon as it aborts or its top parent commits.
634 * It takes some extra work to ensure this, because to save on shared memory
635 * the XID lock of a subtransaction is released when it ends, whether
636 * successfully or unsuccessfully. So we have to check if it's "still running"
637 * and if so wait for its parent.
638 */
639 void
XactLockTableWait(TransactionId xid,Relation rel,ItemPointer ctid,XLTW_Oper oper)640 XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
641 XLTW_Oper oper)
642 {
643 LOCKTAG tag;
644 XactLockTableWaitInfo info;
645 ErrorContextCallback callback;
646 bool first = true;
647
648 /*
649 * If an operation is specified, set up our verbose error context
650 * callback.
651 */
652 if (oper != XLTW_None)
653 {
654 Assert(RelationIsValid(rel));
655 Assert(ItemPointerIsValid(ctid));
656
657 info.rel = rel;
658 info.ctid = ctid;
659 info.oper = oper;
660
661 callback.callback = XactLockTableWaitErrorCb;
662 callback.arg = &info;
663 callback.previous = error_context_stack;
664 error_context_stack = &callback;
665 }
666
667 for (;;)
668 {
669 Assert(TransactionIdIsValid(xid));
670 Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
671
672 SET_LOCKTAG_TRANSACTION(tag, xid);
673
674 (void) LockAcquire(&tag, ShareLock, false, false);
675
676 LockRelease(&tag, ShareLock, false);
677
678 if (!TransactionIdIsInProgress(xid))
679 break;
680
681 /*
682 * If the Xid belonged to a subtransaction, then the lock would have
683 * gone away as soon as it was finished; for correct tuple visibility,
684 * the right action is to wait on its parent transaction to go away.
685 * But instead of going levels up one by one, we can just wait for the
686 * topmost transaction to finish with the same end result, which also
687 * incurs less locktable traffic.
688 *
689 * Some uses of this function don't involve tuple visibility -- such
690 * as when building snapshots for logical decoding. It is possible to
691 * see a transaction in ProcArray before it registers itself in the
692 * locktable. The topmost transaction in that case is the same xid,
693 * so we try again after a short sleep. (Don't sleep the first time
694 * through, to avoid slowing down the normal case.)
695 */
696 if (!first)
697 pg_usleep(1000L);
698 first = false;
699 xid = SubTransGetTopmostTransaction(xid);
700 }
701
702 if (oper != XLTW_None)
703 error_context_stack = callback.previous;
704 }
705
706 /*
707 * ConditionalXactLockTableWait
708 *
709 * As above, but only lock if we can get the lock without blocking.
710 * Returns true if the lock was acquired.
711 */
712 bool
ConditionalXactLockTableWait(TransactionId xid)713 ConditionalXactLockTableWait(TransactionId xid)
714 {
715 LOCKTAG tag;
716 bool first = true;
717
718 for (;;)
719 {
720 Assert(TransactionIdIsValid(xid));
721 Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
722
723 SET_LOCKTAG_TRANSACTION(tag, xid);
724
725 if (LockAcquire(&tag, ShareLock, false, true) == LOCKACQUIRE_NOT_AVAIL)
726 return false;
727
728 LockRelease(&tag, ShareLock, false);
729
730 if (!TransactionIdIsInProgress(xid))
731 break;
732
733 /* See XactLockTableWait about this case */
734 if (!first)
735 pg_usleep(1000L);
736 first = false;
737 xid = SubTransGetTopmostTransaction(xid);
738 }
739
740 return true;
741 }
742
743 /*
744 * SpeculativeInsertionLockAcquire
745 *
746 * Insert a lock showing that the given transaction ID is inserting a tuple,
747 * but hasn't yet decided whether it's going to keep it. The lock can then be
748 * used to wait for the decision to go ahead with the insertion, or aborting
749 * it.
750 *
751 * The token is used to distinguish multiple insertions by the same
752 * transaction. It is returned to caller.
753 */
754 uint32
SpeculativeInsertionLockAcquire(TransactionId xid)755 SpeculativeInsertionLockAcquire(TransactionId xid)
756 {
757 LOCKTAG tag;
758
759 speculativeInsertionToken++;
760
761 /*
762 * Check for wrap-around. Zero means no token is held, so don't use that.
763 */
764 if (speculativeInsertionToken == 0)
765 speculativeInsertionToken = 1;
766
767 SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, speculativeInsertionToken);
768
769 (void) LockAcquire(&tag, ExclusiveLock, false, false);
770
771 return speculativeInsertionToken;
772 }
773
774 /*
775 * SpeculativeInsertionLockRelease
776 *
777 * Delete the lock showing that the given transaction is speculatively
778 * inserting a tuple.
779 */
780 void
SpeculativeInsertionLockRelease(TransactionId xid)781 SpeculativeInsertionLockRelease(TransactionId xid)
782 {
783 LOCKTAG tag;
784
785 SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, speculativeInsertionToken);
786
787 LockRelease(&tag, ExclusiveLock, false);
788 }
789
790 /*
791 * SpeculativeInsertionWait
792 *
793 * Wait for the specified transaction to finish or abort the insertion of a
794 * tuple.
795 */
796 void
SpeculativeInsertionWait(TransactionId xid,uint32 token)797 SpeculativeInsertionWait(TransactionId xid, uint32 token)
798 {
799 LOCKTAG tag;
800
801 SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, token);
802
803 Assert(TransactionIdIsValid(xid));
804 Assert(token != 0);
805
806 (void) LockAcquire(&tag, ShareLock, false, false);
807 LockRelease(&tag, ShareLock, false);
808 }
809
810 /*
811 * XactLockTableWaitErrorCb
812 * Error context callback for transaction lock waits.
813 */
814 static void
XactLockTableWaitErrorCb(void * arg)815 XactLockTableWaitErrorCb(void *arg)
816 {
817 XactLockTableWaitInfo *info = (XactLockTableWaitInfo *) arg;
818
819 /*
820 * We would like to print schema name too, but that would require a
821 * syscache lookup.
822 */
823 if (info->oper != XLTW_None &&
824 ItemPointerIsValid(info->ctid) && RelationIsValid(info->rel))
825 {
826 const char *cxt;
827
828 switch (info->oper)
829 {
830 case XLTW_Update:
831 cxt = gettext_noop("while updating tuple (%u,%u) in relation \"%s\"");
832 break;
833 case XLTW_Delete:
834 cxt = gettext_noop("while deleting tuple (%u,%u) in relation \"%s\"");
835 break;
836 case XLTW_Lock:
837 cxt = gettext_noop("while locking tuple (%u,%u) in relation \"%s\"");
838 break;
839 case XLTW_LockUpdated:
840 cxt = gettext_noop("while locking updated version (%u,%u) of tuple in relation \"%s\"");
841 break;
842 case XLTW_InsertIndex:
843 cxt = gettext_noop("while inserting index tuple (%u,%u) in relation \"%s\"");
844 break;
845 case XLTW_InsertIndexUnique:
846 cxt = gettext_noop("while checking uniqueness of tuple (%u,%u) in relation \"%s\"");
847 break;
848 case XLTW_FetchUpdated:
849 cxt = gettext_noop("while rechecking updated tuple (%u,%u) in relation \"%s\"");
850 break;
851 case XLTW_RecheckExclusionConstr:
852 cxt = gettext_noop("while checking exclusion constraint on tuple (%u,%u) in relation \"%s\"");
853 break;
854
855 default:
856 return;
857 }
858
859 errcontext(cxt,
860 ItemPointerGetBlockNumber(info->ctid),
861 ItemPointerGetOffsetNumber(info->ctid),
862 RelationGetRelationName(info->rel));
863 }
864 }
865
866 /*
867 * WaitForLockersMultiple
868 * Wait until no transaction holds locks that conflict with the given
869 * locktags at the given lockmode.
870 *
871 * To do this, obtain the current list of lockers, and wait on their VXIDs
872 * until they are finished.
873 *
874 * Note we don't try to acquire the locks on the given locktags, only the
875 * VXIDs and XIDs of their lock holders; if somebody grabs a conflicting lock
876 * on the objects after we obtained our initial list of lockers, we will not
877 * wait for them.
878 */
879 void
WaitForLockersMultiple(List * locktags,LOCKMODE lockmode,bool progress)880 WaitForLockersMultiple(List *locktags, LOCKMODE lockmode, bool progress)
881 {
882 List *holders = NIL;
883 ListCell *lc;
884 int total = 0;
885 int done = 0;
886
887 /* Done if no locks to wait for */
888 if (list_length(locktags) == 0)
889 return;
890
891 /* Collect the transactions we need to wait on */
892 foreach(lc, locktags)
893 {
894 LOCKTAG *locktag = lfirst(lc);
895 int count;
896
897 holders = lappend(holders,
898 GetLockConflicts(locktag, lockmode,
899 progress ? &count : NULL));
900 if (progress)
901 total += count;
902 }
903
904 if (progress)
905 pgstat_progress_update_param(PROGRESS_WAITFOR_TOTAL, total);
906
907 /*
908 * Note: GetLockConflicts() never reports our own xid, hence we need not
909 * check for that. Also, prepared xacts are reported and awaited.
910 */
911
912 /* Finally wait for each such transaction to complete */
913 foreach(lc, holders)
914 {
915 VirtualTransactionId *lockholders = lfirst(lc);
916
917 while (VirtualTransactionIdIsValid(*lockholders))
918 {
919 /* If requested, publish who we're going to wait for. */
920 if (progress)
921 {
922 PGPROC *holder = BackendIdGetProc(lockholders->backendId);
923
924 if (holder)
925 pgstat_progress_update_param(PROGRESS_WAITFOR_CURRENT_PID,
926 holder->pid);
927 }
928 VirtualXactLock(*lockholders, true);
929 lockholders++;
930
931 if (progress)
932 pgstat_progress_update_param(PROGRESS_WAITFOR_DONE, ++done);
933 }
934 }
935 if (progress)
936 {
937 const int index[] = {
938 PROGRESS_WAITFOR_TOTAL,
939 PROGRESS_WAITFOR_DONE,
940 PROGRESS_WAITFOR_CURRENT_PID
941 };
942 const int64 values[] = {
943 0, 0, 0
944 };
945
946 pgstat_progress_update_multi_param(3, index, values);
947 }
948
949 list_free_deep(holders);
950 }
951
952 /*
953 * WaitForLockers
954 *
955 * Same as WaitForLockersMultiple, for a single lock tag.
956 */
957 void
WaitForLockers(LOCKTAG heaplocktag,LOCKMODE lockmode,bool progress)958 WaitForLockers(LOCKTAG heaplocktag, LOCKMODE lockmode, bool progress)
959 {
960 List *l;
961
962 l = list_make1(&heaplocktag);
963 WaitForLockersMultiple(l, lockmode, progress);
964 list_free(l);
965 }
966
967
968 /*
969 * LockDatabaseObject
970 *
971 * Obtain a lock on a general object of the current database. Don't use
972 * this for shared objects (such as tablespaces). It's unwise to apply it
973 * to relations, also, since a lock taken this way will NOT conflict with
974 * locks taken via LockRelation and friends.
975 */
976 void
LockDatabaseObject(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)977 LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
978 LOCKMODE lockmode)
979 {
980 LOCKTAG tag;
981
982 SET_LOCKTAG_OBJECT(tag,
983 MyDatabaseId,
984 classid,
985 objid,
986 objsubid);
987
988 (void) LockAcquire(&tag, lockmode, false, false);
989
990 /* Make sure syscaches are up-to-date with any changes we waited for */
991 AcceptInvalidationMessages();
992 }
993
994 /*
995 * UnlockDatabaseObject
996 */
997 void
UnlockDatabaseObject(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)998 UnlockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
999 LOCKMODE lockmode)
1000 {
1001 LOCKTAG tag;
1002
1003 SET_LOCKTAG_OBJECT(tag,
1004 MyDatabaseId,
1005 classid,
1006 objid,
1007 objsubid);
1008
1009 LockRelease(&tag, lockmode, false);
1010 }
1011
1012 /*
1013 * LockSharedObject
1014 *
1015 * Obtain a lock on a shared-across-databases object.
1016 */
1017 void
LockSharedObject(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)1018 LockSharedObject(Oid classid, Oid objid, uint16 objsubid,
1019 LOCKMODE lockmode)
1020 {
1021 LOCKTAG tag;
1022
1023 SET_LOCKTAG_OBJECT(tag,
1024 InvalidOid,
1025 classid,
1026 objid,
1027 objsubid);
1028
1029 (void) LockAcquire(&tag, lockmode, false, false);
1030
1031 /* Make sure syscaches are up-to-date with any changes we waited for */
1032 AcceptInvalidationMessages();
1033 }
1034
1035 /*
1036 * UnlockSharedObject
1037 */
1038 void
UnlockSharedObject(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)1039 UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid,
1040 LOCKMODE lockmode)
1041 {
1042 LOCKTAG tag;
1043
1044 SET_LOCKTAG_OBJECT(tag,
1045 InvalidOid,
1046 classid,
1047 objid,
1048 objsubid);
1049
1050 LockRelease(&tag, lockmode, false);
1051 }
1052
1053 /*
1054 * LockSharedObjectForSession
1055 *
1056 * Obtain a session-level lock on a shared-across-databases object.
1057 * See LockRelationIdForSession for notes about session-level locks.
1058 */
1059 void
LockSharedObjectForSession(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)1060 LockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
1061 LOCKMODE lockmode)
1062 {
1063 LOCKTAG tag;
1064
1065 SET_LOCKTAG_OBJECT(tag,
1066 InvalidOid,
1067 classid,
1068 objid,
1069 objsubid);
1070
1071 (void) LockAcquire(&tag, lockmode, true, false);
1072 }
1073
1074 /*
1075 * UnlockSharedObjectForSession
1076 */
1077 void
UnlockSharedObjectForSession(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)1078 UnlockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
1079 LOCKMODE lockmode)
1080 {
1081 LOCKTAG tag;
1082
1083 SET_LOCKTAG_OBJECT(tag,
1084 InvalidOid,
1085 classid,
1086 objid,
1087 objsubid);
1088
1089 LockRelease(&tag, lockmode, true);
1090 }
1091
1092
1093 /*
1094 * Append a description of a lockable object to buf.
1095 *
1096 * Ideally we would print names for the numeric values, but that requires
1097 * getting locks on system tables, which might cause problems since this is
1098 * typically used to report deadlock situations.
1099 */
1100 void
DescribeLockTag(StringInfo buf,const LOCKTAG * tag)1101 DescribeLockTag(StringInfo buf, const LOCKTAG *tag)
1102 {
1103 switch ((LockTagType) tag->locktag_type)
1104 {
1105 case LOCKTAG_RELATION:
1106 appendStringInfo(buf,
1107 _("relation %u of database %u"),
1108 tag->locktag_field2,
1109 tag->locktag_field1);
1110 break;
1111 case LOCKTAG_RELATION_EXTEND:
1112 appendStringInfo(buf,
1113 _("extension of relation %u of database %u"),
1114 tag->locktag_field2,
1115 tag->locktag_field1);
1116 break;
1117 case LOCKTAG_DATABASE_FROZEN_IDS:
1118 appendStringInfo(buf,
1119 _("pg_database.datfrozenxid of database %u"),
1120 tag->locktag_field1);
1121 break;
1122 case LOCKTAG_PAGE:
1123 appendStringInfo(buf,
1124 _("page %u of relation %u of database %u"),
1125 tag->locktag_field3,
1126 tag->locktag_field2,
1127 tag->locktag_field1);
1128 break;
1129 case LOCKTAG_TUPLE:
1130 appendStringInfo(buf,
1131 _("tuple (%u,%u) of relation %u of database %u"),
1132 tag->locktag_field3,
1133 tag->locktag_field4,
1134 tag->locktag_field2,
1135 tag->locktag_field1);
1136 break;
1137 case LOCKTAG_TRANSACTION:
1138 appendStringInfo(buf,
1139 _("transaction %u"),
1140 tag->locktag_field1);
1141 break;
1142 case LOCKTAG_VIRTUALTRANSACTION:
1143 appendStringInfo(buf,
1144 _("virtual transaction %d/%u"),
1145 tag->locktag_field1,
1146 tag->locktag_field2);
1147 break;
1148 case LOCKTAG_SPECULATIVE_TOKEN:
1149 appendStringInfo(buf,
1150 _("speculative token %u of transaction %u"),
1151 tag->locktag_field2,
1152 tag->locktag_field1);
1153 break;
1154 case LOCKTAG_OBJECT:
1155 appendStringInfo(buf,
1156 _("object %u of class %u of database %u"),
1157 tag->locktag_field3,
1158 tag->locktag_field2,
1159 tag->locktag_field1);
1160 break;
1161 case LOCKTAG_USERLOCK:
1162 /* reserved for old contrib code, now on pgfoundry */
1163 appendStringInfo(buf,
1164 _("user lock [%u,%u,%u]"),
1165 tag->locktag_field1,
1166 tag->locktag_field2,
1167 tag->locktag_field3);
1168 break;
1169 case LOCKTAG_ADVISORY:
1170 appendStringInfo(buf,
1171 _("advisory lock [%u,%u,%u,%u]"),
1172 tag->locktag_field1,
1173 tag->locktag_field2,
1174 tag->locktag_field3,
1175 tag->locktag_field4);
1176 break;
1177 default:
1178 appendStringInfo(buf,
1179 _("unrecognized locktag type %d"),
1180 (int) tag->locktag_type);
1181 break;
1182 }
1183 }
1184
1185 /*
1186 * GetLockNameFromTagType
1187 *
1188 * Given locktag type, return the corresponding lock name.
1189 */
1190 const char *
GetLockNameFromTagType(uint16 locktag_type)1191 GetLockNameFromTagType(uint16 locktag_type)
1192 {
1193 if (locktag_type > LOCKTAG_LAST_TYPE)
1194 return "???";
1195 return LockTagTypeNames[locktag_type];
1196 }
1197