1 /*-------------------------------------------------------------------------
2 *
3 * lmgr.c
4 * POSTGRES lock manager code
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/storage/lmgr/lmgr.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16 #include "postgres.h"
17
18 #include "access/subtrans.h"
19 #include "access/transam.h"
20 #include "access/xact.h"
21 #include "catalog/catalog.h"
22 #include "commands/progress.h"
23 #include "miscadmin.h"
24 #include "pgstat.h"
25 #include "storage/lmgr.h"
26 #include "storage/procarray.h"
27 #include "storage/sinvaladt.h"
28 #include "utils/inval.h"
29
30
31 /*
32 * Per-backend counter for generating speculative insertion tokens.
33 *
34 * This may wrap around, but that's OK as it's only used for the short
35 * duration between inserting a tuple and checking that there are no (unique)
36 * constraint violations. It's theoretically possible that a backend sees a
37 * tuple that was speculatively inserted by another backend, but before it has
38 * started waiting on the token, the other backend completes its insertion,
39 * and then performs 2^32 unrelated insertions. And after all that, the
40 * first backend finally calls SpeculativeInsertionLockAcquire(), with the
41 * intention of waiting for the first insertion to complete, but ends up
42 * waiting for the latest unrelated insertion instead. Even then, nothing
43 * particularly bad happens: in the worst case they deadlock, causing one of
44 * the transactions to abort.
45 */
46 static uint32 speculativeInsertionToken = 0;
47
48
49 /*
50 * Struct to hold context info for transaction lock waits.
51 *
52 * 'oper' is the operation that needs to wait for the other transaction; 'rel'
53 * and 'ctid' specify the address of the tuple being waited for.
54 */
55 typedef struct XactLockTableWaitInfo
56 {
57 XLTW_Oper oper;
58 Relation rel;
59 ItemPointer ctid;
60 } XactLockTableWaitInfo;
61
62 static void XactLockTableWaitErrorCb(void *arg);
63
64 /*
65 * RelationInitLockInfo
66 * Initializes the lock information in a relation descriptor.
67 *
68 * relcache.c must call this during creation of any reldesc.
69 */
70 void
RelationInitLockInfo(Relation relation)71 RelationInitLockInfo(Relation relation)
72 {
73 Assert(RelationIsValid(relation));
74 Assert(OidIsValid(RelationGetRelid(relation)));
75
76 relation->rd_lockInfo.lockRelId.relId = RelationGetRelid(relation);
77
78 if (relation->rd_rel->relisshared)
79 relation->rd_lockInfo.lockRelId.dbId = InvalidOid;
80 else
81 relation->rd_lockInfo.lockRelId.dbId = MyDatabaseId;
82 }
83
84 /*
85 * SetLocktagRelationOid
86 * Set up a locktag for a relation, given only relation OID
87 */
88 static inline void
SetLocktagRelationOid(LOCKTAG * tag,Oid relid)89 SetLocktagRelationOid(LOCKTAG *tag, Oid relid)
90 {
91 Oid dbid;
92
93 if (IsSharedRelation(relid))
94 dbid = InvalidOid;
95 else
96 dbid = MyDatabaseId;
97
98 SET_LOCKTAG_RELATION(*tag, dbid, relid);
99 }
100
101 /*
102 * LockRelationOid
103 *
104 * Lock a relation given only its OID. This should generally be used
105 * before attempting to open the relation's relcache entry.
106 */
107 void
LockRelationOid(Oid relid,LOCKMODE lockmode)108 LockRelationOid(Oid relid, LOCKMODE lockmode)
109 {
110 LOCKTAG tag;
111 LOCALLOCK *locallock;
112 LockAcquireResult res;
113
114 SetLocktagRelationOid(&tag, relid);
115
116 res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
117
118 /*
119 * Now that we have the lock, check for invalidation messages, so that we
120 * will update or flush any stale relcache entry before we try to use it.
121 * RangeVarGetRelid() specifically relies on us for this. We can skip
122 * this in the not-uncommon case that we already had the same type of lock
123 * being requested, since then no one else could have modified the
124 * relcache entry in an undesirable way. (In the case where our own xact
125 * modifies the rel, the relcache update happens via
126 * CommandCounterIncrement, not here.)
127 *
128 * However, in corner cases where code acts on tables (usually catalogs)
129 * recursively, we might get here while still processing invalidation
130 * messages in some outer execution of this function or a sibling. The
131 * "cleared" status of the lock tells us whether we really are done
132 * absorbing relevant inval messages.
133 */
134 if (res != LOCKACQUIRE_ALREADY_CLEAR)
135 {
136 AcceptInvalidationMessages();
137 MarkLockClear(locallock);
138 }
139 }
140
141 /*
142 * ConditionalLockRelationOid
143 *
144 * As above, but only lock if we can get the lock without blocking.
145 * Returns true iff the lock was acquired.
146 *
147 * NOTE: we do not currently need conditional versions of all the
148 * LockXXX routines in this file, but they could easily be added if needed.
149 */
150 bool
ConditionalLockRelationOid(Oid relid,LOCKMODE lockmode)151 ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
152 {
153 LOCKTAG tag;
154 LOCALLOCK *locallock;
155 LockAcquireResult res;
156
157 SetLocktagRelationOid(&tag, relid);
158
159 res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
160
161 if (res == LOCKACQUIRE_NOT_AVAIL)
162 return false;
163
164 /*
165 * Now that we have the lock, check for invalidation messages; see notes
166 * in LockRelationOid.
167 */
168 if (res != LOCKACQUIRE_ALREADY_CLEAR)
169 {
170 AcceptInvalidationMessages();
171 MarkLockClear(locallock);
172 }
173
174 return true;
175 }
176
177 /*
178 * UnlockRelationId
179 *
180 * Unlock, given a LockRelId. This is preferred over UnlockRelationOid
181 * for speed reasons.
182 */
183 void
UnlockRelationId(LockRelId * relid,LOCKMODE lockmode)184 UnlockRelationId(LockRelId *relid, LOCKMODE lockmode)
185 {
186 LOCKTAG tag;
187
188 SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
189
190 LockRelease(&tag, lockmode, false);
191 }
192
193 /*
194 * UnlockRelationOid
195 *
196 * Unlock, given only a relation Oid. Use UnlockRelationId if you can.
197 */
198 void
UnlockRelationOid(Oid relid,LOCKMODE lockmode)199 UnlockRelationOid(Oid relid, LOCKMODE lockmode)
200 {
201 LOCKTAG tag;
202
203 SetLocktagRelationOid(&tag, relid);
204
205 LockRelease(&tag, lockmode, false);
206 }
207
208 /*
209 * LockRelation
210 *
211 * This is a convenience routine for acquiring an additional lock on an
212 * already-open relation. Never try to do "relation_open(foo, NoLock)"
213 * and then lock with this.
214 */
215 void
LockRelation(Relation relation,LOCKMODE lockmode)216 LockRelation(Relation relation, LOCKMODE lockmode)
217 {
218 LOCKTAG tag;
219 LOCALLOCK *locallock;
220 LockAcquireResult res;
221
222 SET_LOCKTAG_RELATION(tag,
223 relation->rd_lockInfo.lockRelId.dbId,
224 relation->rd_lockInfo.lockRelId.relId);
225
226 res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
227
228 /*
229 * Now that we have the lock, check for invalidation messages; see notes
230 * in LockRelationOid.
231 */
232 if (res != LOCKACQUIRE_ALREADY_CLEAR)
233 {
234 AcceptInvalidationMessages();
235 MarkLockClear(locallock);
236 }
237 }
238
/*
 *		ConditionalLockRelation
 *
 * As LockRelation, but only lock if we can get the lock without blocking.
 * Returns true iff the lock was acquired.
 *
 * Like LockRelation, this is a convenience routine for acquiring an
 * additional lock on an already-open relation.  Never try to do
 * "relation_open(foo, NoLock)" and then lock with this.
 */
246 bool
ConditionalLockRelation(Relation relation,LOCKMODE lockmode)247 ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
248 {
249 LOCKTAG tag;
250 LOCALLOCK *locallock;
251 LockAcquireResult res;
252
253 SET_LOCKTAG_RELATION(tag,
254 relation->rd_lockInfo.lockRelId.dbId,
255 relation->rd_lockInfo.lockRelId.relId);
256
257 res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
258
259 if (res == LOCKACQUIRE_NOT_AVAIL)
260 return false;
261
262 /*
263 * Now that we have the lock, check for invalidation messages; see notes
264 * in LockRelationOid.
265 */
266 if (res != LOCKACQUIRE_ALREADY_CLEAR)
267 {
268 AcceptInvalidationMessages();
269 MarkLockClear(locallock);
270 }
271
272 return true;
273 }
274
275 /*
276 * UnlockRelation
277 *
278 * This is a convenience routine for unlocking a relation without also
279 * closing it.
280 */
281 void
UnlockRelation(Relation relation,LOCKMODE lockmode)282 UnlockRelation(Relation relation, LOCKMODE lockmode)
283 {
284 LOCKTAG tag;
285
286 SET_LOCKTAG_RELATION(tag,
287 relation->rd_lockInfo.lockRelId.dbId,
288 relation->rd_lockInfo.lockRelId.relId);
289
290 LockRelease(&tag, lockmode, false);
291 }
292
293 /*
294 * CheckRelationLockedByMe
295 *
296 * Returns true if current transaction holds a lock on 'relation' of mode
297 * 'lockmode'. If 'orstronger' is true, a stronger lockmode is also OK.
298 * ("Stronger" is defined as "numerically higher", which is a bit
299 * semantically dubious but is OK for the purposes we use this for.)
300 */
301 bool
CheckRelationLockedByMe(Relation relation,LOCKMODE lockmode,bool orstronger)302 CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, bool orstronger)
303 {
304 LOCKTAG tag;
305
306 SET_LOCKTAG_RELATION(tag,
307 relation->rd_lockInfo.lockRelId.dbId,
308 relation->rd_lockInfo.lockRelId.relId);
309
310 if (LockHeldByMe(&tag, lockmode))
311 return true;
312
313 if (orstronger)
314 {
315 LOCKMODE slockmode;
316
317 for (slockmode = lockmode + 1;
318 slockmode <= MaxLockMode;
319 slockmode++)
320 {
321 if (LockHeldByMe(&tag, slockmode))
322 {
323 #ifdef NOT_USED
324 /* Sometimes this might be useful for debugging purposes */
325 elog(WARNING, "lock mode %s substituted for %s on relation %s",
326 GetLockmodeName(tag.locktag_lockmethodid, slockmode),
327 GetLockmodeName(tag.locktag_lockmethodid, lockmode),
328 RelationGetRelationName(relation));
329 #endif
330 return true;
331 }
332 }
333 }
334
335 return false;
336 }
337
338 /*
339 * LockHasWaitersRelation
340 *
341 * This is a function to check whether someone else is waiting for a
342 * lock which we are currently holding.
343 */
344 bool
LockHasWaitersRelation(Relation relation,LOCKMODE lockmode)345 LockHasWaitersRelation(Relation relation, LOCKMODE lockmode)
346 {
347 LOCKTAG tag;
348
349 SET_LOCKTAG_RELATION(tag,
350 relation->rd_lockInfo.lockRelId.dbId,
351 relation->rd_lockInfo.lockRelId.relId);
352
353 return LockHasWaiters(&tag, lockmode, false);
354 }
355
356 /*
357 * LockRelationIdForSession
358 *
359 * This routine grabs a session-level lock on the target relation. The
360 * session lock persists across transaction boundaries. It will be removed
361 * when UnlockRelationIdForSession() is called, or if an ereport(ERROR) occurs,
362 * or if the backend exits.
363 *
364 * Note that one should also grab a transaction-level lock on the rel
365 * in any transaction that actually uses the rel, to ensure that the
366 * relcache entry is up to date.
367 */
368 void
LockRelationIdForSession(LockRelId * relid,LOCKMODE lockmode)369 LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
370 {
371 LOCKTAG tag;
372
373 SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
374
375 (void) LockAcquire(&tag, lockmode, true, false);
376 }
377
378 /*
379 * UnlockRelationIdForSession
380 */
381 void
UnlockRelationIdForSession(LockRelId * relid,LOCKMODE lockmode)382 UnlockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
383 {
384 LOCKTAG tag;
385
386 SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
387
388 LockRelease(&tag, lockmode, true);
389 }
390
391 /*
392 * LockRelationForExtension
393 *
394 * This lock tag is used to interlock addition of pages to relations.
395 * We need such locking because bufmgr/smgr definition of P_NEW is not
396 * race-condition-proof.
397 *
398 * We assume the caller is already holding some type of regular lock on
399 * the relation, so no AcceptInvalidationMessages call is needed here.
400 */
401 void
LockRelationForExtension(Relation relation,LOCKMODE lockmode)402 LockRelationForExtension(Relation relation, LOCKMODE lockmode)
403 {
404 LOCKTAG tag;
405
406 SET_LOCKTAG_RELATION_EXTEND(tag,
407 relation->rd_lockInfo.lockRelId.dbId,
408 relation->rd_lockInfo.lockRelId.relId);
409
410 (void) LockAcquire(&tag, lockmode, false, false);
411 }
412
413 /*
414 * ConditionalLockRelationForExtension
415 *
416 * As above, but only lock if we can get the lock without blocking.
417 * Returns true iff the lock was acquired.
418 */
419 bool
ConditionalLockRelationForExtension(Relation relation,LOCKMODE lockmode)420 ConditionalLockRelationForExtension(Relation relation, LOCKMODE lockmode)
421 {
422 LOCKTAG tag;
423
424 SET_LOCKTAG_RELATION_EXTEND(tag,
425 relation->rd_lockInfo.lockRelId.dbId,
426 relation->rd_lockInfo.lockRelId.relId);
427
428 return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
429 }
430
431 /*
432 * RelationExtensionLockWaiterCount
433 *
434 * Count the number of processes waiting for the given relation extension lock.
435 */
436 int
RelationExtensionLockWaiterCount(Relation relation)437 RelationExtensionLockWaiterCount(Relation relation)
438 {
439 LOCKTAG tag;
440
441 SET_LOCKTAG_RELATION_EXTEND(tag,
442 relation->rd_lockInfo.lockRelId.dbId,
443 relation->rd_lockInfo.lockRelId.relId);
444
445 return LockWaiterCount(&tag);
446 }
447
448 /*
449 * UnlockRelationForExtension
450 */
451 void
UnlockRelationForExtension(Relation relation,LOCKMODE lockmode)452 UnlockRelationForExtension(Relation relation, LOCKMODE lockmode)
453 {
454 LOCKTAG tag;
455
456 SET_LOCKTAG_RELATION_EXTEND(tag,
457 relation->rd_lockInfo.lockRelId.dbId,
458 relation->rd_lockInfo.lockRelId.relId);
459
460 LockRelease(&tag, lockmode, false);
461 }
462
463 /*
464 * LockDatabaseFrozenIds
465 *
466 * This allows one backend per database to execute vac_update_datfrozenxid().
467 */
468 void
LockDatabaseFrozenIds(LOCKMODE lockmode)469 LockDatabaseFrozenIds(LOCKMODE lockmode)
470 {
471 LOCKTAG tag;
472
473 SET_LOCKTAG_DATABASE_FROZEN_IDS(tag, MyDatabaseId);
474
475 (void) LockAcquire(&tag, lockmode, false, false);
476 }
477
478 /*
479 * LockPage
480 *
481 * Obtain a page-level lock. This is currently used by some index access
482 * methods to lock individual index pages.
483 */
484 void
LockPage(Relation relation,BlockNumber blkno,LOCKMODE lockmode)485 LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
486 {
487 LOCKTAG tag;
488
489 SET_LOCKTAG_PAGE(tag,
490 relation->rd_lockInfo.lockRelId.dbId,
491 relation->rd_lockInfo.lockRelId.relId,
492 blkno);
493
494 (void) LockAcquire(&tag, lockmode, false, false);
495 }
496
497 /*
498 * ConditionalLockPage
499 *
500 * As above, but only lock if we can get the lock without blocking.
501 * Returns true iff the lock was acquired.
502 */
503 bool
ConditionalLockPage(Relation relation,BlockNumber blkno,LOCKMODE lockmode)504 ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
505 {
506 LOCKTAG tag;
507
508 SET_LOCKTAG_PAGE(tag,
509 relation->rd_lockInfo.lockRelId.dbId,
510 relation->rd_lockInfo.lockRelId.relId,
511 blkno);
512
513 return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
514 }
515
516 /*
517 * UnlockPage
518 */
519 void
UnlockPage(Relation relation,BlockNumber blkno,LOCKMODE lockmode)520 UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
521 {
522 LOCKTAG tag;
523
524 SET_LOCKTAG_PAGE(tag,
525 relation->rd_lockInfo.lockRelId.dbId,
526 relation->rd_lockInfo.lockRelId.relId,
527 blkno);
528
529 LockRelease(&tag, lockmode, false);
530 }
531
532 /*
533 * LockTuple
534 *
535 * Obtain a tuple-level lock. This is used in a less-than-intuitive fashion
536 * because we can't afford to keep a separate lock in shared memory for every
537 * tuple. See heap_lock_tuple before using this!
538 */
539 void
LockTuple(Relation relation,ItemPointer tid,LOCKMODE lockmode)540 LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
541 {
542 LOCKTAG tag;
543
544 SET_LOCKTAG_TUPLE(tag,
545 relation->rd_lockInfo.lockRelId.dbId,
546 relation->rd_lockInfo.lockRelId.relId,
547 ItemPointerGetBlockNumber(tid),
548 ItemPointerGetOffsetNumber(tid));
549
550 (void) LockAcquire(&tag, lockmode, false, false);
551 }
552
553 /*
554 * ConditionalLockTuple
555 *
556 * As above, but only lock if we can get the lock without blocking.
557 * Returns true iff the lock was acquired.
558 */
559 bool
ConditionalLockTuple(Relation relation,ItemPointer tid,LOCKMODE lockmode)560 ConditionalLockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
561 {
562 LOCKTAG tag;
563
564 SET_LOCKTAG_TUPLE(tag,
565 relation->rd_lockInfo.lockRelId.dbId,
566 relation->rd_lockInfo.lockRelId.relId,
567 ItemPointerGetBlockNumber(tid),
568 ItemPointerGetOffsetNumber(tid));
569
570 return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
571 }
572
573 /*
574 * UnlockTuple
575 */
576 void
UnlockTuple(Relation relation,ItemPointer tid,LOCKMODE lockmode)577 UnlockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
578 {
579 LOCKTAG tag;
580
581 SET_LOCKTAG_TUPLE(tag,
582 relation->rd_lockInfo.lockRelId.dbId,
583 relation->rd_lockInfo.lockRelId.relId,
584 ItemPointerGetBlockNumber(tid),
585 ItemPointerGetOffsetNumber(tid));
586
587 LockRelease(&tag, lockmode, false);
588 }
589
590 /*
591 * XactLockTableInsert
592 *
593 * Insert a lock showing that the given transaction ID is running ---
594 * this is done when an XID is acquired by a transaction or subtransaction.
595 * The lock can then be used to wait for the transaction to finish.
596 */
597 void
XactLockTableInsert(TransactionId xid)598 XactLockTableInsert(TransactionId xid)
599 {
600 LOCKTAG tag;
601
602 SET_LOCKTAG_TRANSACTION(tag, xid);
603
604 (void) LockAcquire(&tag, ExclusiveLock, false, false);
605 }
606
607 /*
608 * XactLockTableDelete
609 *
610 * Delete the lock showing that the given transaction ID is running.
611 * (This is never used for main transaction IDs; those locks are only
612 * released implicitly at transaction end. But we do use it for subtrans IDs.)
613 */
614 void
XactLockTableDelete(TransactionId xid)615 XactLockTableDelete(TransactionId xid)
616 {
617 LOCKTAG tag;
618
619 SET_LOCKTAG_TRANSACTION(tag, xid);
620
621 LockRelease(&tag, ExclusiveLock, false);
622 }
623
624 /*
625 * XactLockTableWait
626 *
627 * Wait for the specified transaction to commit or abort. If an operation
628 * is specified, an error context callback is set up. If 'oper' is passed as
629 * None, no error context callback is set up.
630 *
631 * Note that this does the right thing for subtransactions: if we wait on a
632 * subtransaction, we will exit as soon as it aborts or its top parent commits.
633 * It takes some extra work to ensure this, because to save on shared memory
634 * the XID lock of a subtransaction is released when it ends, whether
635 * successfully or unsuccessfully. So we have to check if it's "still running"
636 * and if so wait for its parent.
637 */
638 void
XactLockTableWait(TransactionId xid,Relation rel,ItemPointer ctid,XLTW_Oper oper)639 XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
640 XLTW_Oper oper)
641 {
642 LOCKTAG tag;
643 XactLockTableWaitInfo info;
644 ErrorContextCallback callback;
645 bool first = true;
646
647 /*
648 * If an operation is specified, set up our verbose error context
649 * callback.
650 */
651 if (oper != XLTW_None)
652 {
653 Assert(RelationIsValid(rel));
654 Assert(ItemPointerIsValid(ctid));
655
656 info.rel = rel;
657 info.ctid = ctid;
658 info.oper = oper;
659
660 callback.callback = XactLockTableWaitErrorCb;
661 callback.arg = &info;
662 callback.previous = error_context_stack;
663 error_context_stack = &callback;
664 }
665
666 for (;;)
667 {
668 Assert(TransactionIdIsValid(xid));
669 Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
670
671 SET_LOCKTAG_TRANSACTION(tag, xid);
672
673 (void) LockAcquire(&tag, ShareLock, false, false);
674
675 LockRelease(&tag, ShareLock, false);
676
677 if (!TransactionIdIsInProgress(xid))
678 break;
679
680 /*
681 * If the Xid belonged to a subtransaction, then the lock would have
682 * gone away as soon as it was finished; for correct tuple visibility,
683 * the right action is to wait on its parent transaction to go away.
684 * But instead of going levels up one by one, we can just wait for the
685 * topmost transaction to finish with the same end result, which also
686 * incurs less locktable traffic.
687 *
688 * Some uses of this function don't involve tuple visibility -- such
689 * as when building snapshots for logical decoding. It is possible to
690 * see a transaction in ProcArray before it registers itself in the
691 * locktable. The topmost transaction in that case is the same xid,
692 * so we try again after a short sleep. (Don't sleep the first time
693 * through, to avoid slowing down the normal case.)
694 */
695 if (!first)
696 pg_usleep(1000L);
697 first = false;
698 xid = SubTransGetTopmostTransaction(xid);
699 }
700
701 if (oper != XLTW_None)
702 error_context_stack = callback.previous;
703 }
704
705 /*
706 * ConditionalXactLockTableWait
707 *
708 * As above, but only lock if we can get the lock without blocking.
709 * Returns true if the lock was acquired.
710 */
711 bool
ConditionalXactLockTableWait(TransactionId xid)712 ConditionalXactLockTableWait(TransactionId xid)
713 {
714 LOCKTAG tag;
715 bool first = true;
716
717 for (;;)
718 {
719 Assert(TransactionIdIsValid(xid));
720 Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
721
722 SET_LOCKTAG_TRANSACTION(tag, xid);
723
724 if (LockAcquire(&tag, ShareLock, false, true) == LOCKACQUIRE_NOT_AVAIL)
725 return false;
726
727 LockRelease(&tag, ShareLock, false);
728
729 if (!TransactionIdIsInProgress(xid))
730 break;
731
732 /* See XactLockTableWait about this case */
733 if (!first)
734 pg_usleep(1000L);
735 first = false;
736 xid = SubTransGetTopmostTransaction(xid);
737 }
738
739 return true;
740 }
741
742 /*
743 * SpeculativeInsertionLockAcquire
744 *
745 * Insert a lock showing that the given transaction ID is inserting a tuple,
746 * but hasn't yet decided whether it's going to keep it. The lock can then be
747 * used to wait for the decision to go ahead with the insertion, or aborting
748 * it.
749 *
750 * The token is used to distinguish multiple insertions by the same
751 * transaction. It is returned to caller.
752 */
753 uint32
SpeculativeInsertionLockAcquire(TransactionId xid)754 SpeculativeInsertionLockAcquire(TransactionId xid)
755 {
756 LOCKTAG tag;
757
758 speculativeInsertionToken++;
759
760 /*
761 * Check for wrap-around. Zero means no token is held, so don't use that.
762 */
763 if (speculativeInsertionToken == 0)
764 speculativeInsertionToken = 1;
765
766 SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, speculativeInsertionToken);
767
768 (void) LockAcquire(&tag, ExclusiveLock, false, false);
769
770 return speculativeInsertionToken;
771 }
772
773 /*
774 * SpeculativeInsertionLockRelease
775 *
776 * Delete the lock showing that the given transaction is speculatively
777 * inserting a tuple.
778 */
779 void
SpeculativeInsertionLockRelease(TransactionId xid)780 SpeculativeInsertionLockRelease(TransactionId xid)
781 {
782 LOCKTAG tag;
783
784 SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, speculativeInsertionToken);
785
786 LockRelease(&tag, ExclusiveLock, false);
787 }
788
789 /*
790 * SpeculativeInsertionWait
791 *
792 * Wait for the specified transaction to finish or abort the insertion of a
793 * tuple.
794 */
795 void
SpeculativeInsertionWait(TransactionId xid,uint32 token)796 SpeculativeInsertionWait(TransactionId xid, uint32 token)
797 {
798 LOCKTAG tag;
799
800 SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, token);
801
802 Assert(TransactionIdIsValid(xid));
803 Assert(token != 0);
804
805 (void) LockAcquire(&tag, ShareLock, false, false);
806 LockRelease(&tag, ShareLock, false);
807 }
808
809 /*
 * XactLockTableWaitErrorCb
811 * Error context callback for transaction lock waits.
812 */
813 static void
XactLockTableWaitErrorCb(void * arg)814 XactLockTableWaitErrorCb(void *arg)
815 {
816 XactLockTableWaitInfo *info = (XactLockTableWaitInfo *) arg;
817
818 /*
819 * We would like to print schema name too, but that would require a
820 * syscache lookup.
821 */
822 if (info->oper != XLTW_None &&
823 ItemPointerIsValid(info->ctid) && RelationIsValid(info->rel))
824 {
825 const char *cxt;
826
827 switch (info->oper)
828 {
829 case XLTW_Update:
830 cxt = gettext_noop("while updating tuple (%u,%u) in relation \"%s\"");
831 break;
832 case XLTW_Delete:
833 cxt = gettext_noop("while deleting tuple (%u,%u) in relation \"%s\"");
834 break;
835 case XLTW_Lock:
836 cxt = gettext_noop("while locking tuple (%u,%u) in relation \"%s\"");
837 break;
838 case XLTW_LockUpdated:
839 cxt = gettext_noop("while locking updated version (%u,%u) of tuple in relation \"%s\"");
840 break;
841 case XLTW_InsertIndex:
842 cxt = gettext_noop("while inserting index tuple (%u,%u) in relation \"%s\"");
843 break;
844 case XLTW_InsertIndexUnique:
845 cxt = gettext_noop("while checking uniqueness of tuple (%u,%u) in relation \"%s\"");
846 break;
847 case XLTW_FetchUpdated:
848 cxt = gettext_noop("while rechecking updated tuple (%u,%u) in relation \"%s\"");
849 break;
850 case XLTW_RecheckExclusionConstr:
851 cxt = gettext_noop("while checking exclusion constraint on tuple (%u,%u) in relation \"%s\"");
852 break;
853
854 default:
855 return;
856 }
857
858 errcontext(cxt,
859 ItemPointerGetBlockNumber(info->ctid),
860 ItemPointerGetOffsetNumber(info->ctid),
861 RelationGetRelationName(info->rel));
862 }
863 }
864
865 /*
866 * WaitForLockersMultiple
867 * Wait until no transaction holds locks that conflict with the given
868 * locktags at the given lockmode.
869 *
870 * To do this, obtain the current list of lockers, and wait on their VXIDs
871 * until they are finished.
872 *
873 * Note we don't try to acquire the locks on the given locktags, only the
874 * VXIDs and XIDs of their lock holders; if somebody grabs a conflicting lock
875 * on the objects after we obtained our initial list of lockers, we will not
876 * wait for them.
877 */
878 void
WaitForLockersMultiple(List * locktags,LOCKMODE lockmode,bool progress)879 WaitForLockersMultiple(List *locktags, LOCKMODE lockmode, bool progress)
880 {
881 List *holders = NIL;
882 ListCell *lc;
883 int total = 0;
884 int done = 0;
885
886 /* Done if no locks to wait for */
887 if (list_length(locktags) == 0)
888 return;
889
890 /* Collect the transactions we need to wait on */
891 foreach(lc, locktags)
892 {
893 LOCKTAG *locktag = lfirst(lc);
894 int count;
895
896 holders = lappend(holders,
897 GetLockConflicts(locktag, lockmode,
898 progress ? &count : NULL));
899 if (progress)
900 total += count;
901 }
902
903 if (progress)
904 pgstat_progress_update_param(PROGRESS_WAITFOR_TOTAL, total);
905
906 /*
907 * Note: GetLockConflicts() never reports our own xid, hence we need not
908 * check for that. Also, prepared xacts are reported and awaited.
909 */
910
911 /* Finally wait for each such transaction to complete */
912 foreach(lc, holders)
913 {
914 VirtualTransactionId *lockholders = lfirst(lc);
915
916 while (VirtualTransactionIdIsValid(*lockholders))
917 {
918 /* If requested, publish who we're going to wait for. */
919 if (progress)
920 {
921 PGPROC *holder = BackendIdGetProc(lockholders->backendId);
922
923 if (holder)
924 pgstat_progress_update_param(PROGRESS_WAITFOR_CURRENT_PID,
925 holder->pid);
926 }
927 VirtualXactLock(*lockholders, true);
928 lockholders++;
929
930 if (progress)
931 pgstat_progress_update_param(PROGRESS_WAITFOR_DONE, ++done);
932 }
933 }
934 if (progress)
935 {
936 const int index[] = {
937 PROGRESS_WAITFOR_TOTAL,
938 PROGRESS_WAITFOR_DONE,
939 PROGRESS_WAITFOR_CURRENT_PID
940 };
941 const int64 values[] = {
942 0, 0, 0
943 };
944
945 pgstat_progress_update_multi_param(3, index, values);
946 }
947
948 list_free_deep(holders);
949 }
950
951 /*
952 * WaitForLockers
953 *
954 * Same as WaitForLockersMultiple, for a single lock tag.
955 */
956 void
WaitForLockers(LOCKTAG heaplocktag,LOCKMODE lockmode,bool progress)957 WaitForLockers(LOCKTAG heaplocktag, LOCKMODE lockmode, bool progress)
958 {
959 List *l;
960
961 l = list_make1(&heaplocktag);
962 WaitForLockersMultiple(l, lockmode, progress);
963 list_free(l);
964 }
965
966
967 /*
968 * LockDatabaseObject
969 *
970 * Obtain a lock on a general object of the current database. Don't use
971 * this for shared objects (such as tablespaces). It's unwise to apply it
972 * to relations, also, since a lock taken this way will NOT conflict with
973 * locks taken via LockRelation and friends.
974 */
975 void
LockDatabaseObject(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)976 LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
977 LOCKMODE lockmode)
978 {
979 LOCKTAG tag;
980
981 SET_LOCKTAG_OBJECT(tag,
982 MyDatabaseId,
983 classid,
984 objid,
985 objsubid);
986
987 (void) LockAcquire(&tag, lockmode, false, false);
988
989 /* Make sure syscaches are up-to-date with any changes we waited for */
990 AcceptInvalidationMessages();
991 }
992
993 /*
994 * UnlockDatabaseObject
995 */
996 void
UnlockDatabaseObject(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)997 UnlockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
998 LOCKMODE lockmode)
999 {
1000 LOCKTAG tag;
1001
1002 SET_LOCKTAG_OBJECT(tag,
1003 MyDatabaseId,
1004 classid,
1005 objid,
1006 objsubid);
1007
1008 LockRelease(&tag, lockmode, false);
1009 }
1010
1011 /*
1012 * LockSharedObject
1013 *
1014 * Obtain a lock on a shared-across-databases object.
1015 */
1016 void
LockSharedObject(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)1017 LockSharedObject(Oid classid, Oid objid, uint16 objsubid,
1018 LOCKMODE lockmode)
1019 {
1020 LOCKTAG tag;
1021
1022 SET_LOCKTAG_OBJECT(tag,
1023 InvalidOid,
1024 classid,
1025 objid,
1026 objsubid);
1027
1028 (void) LockAcquire(&tag, lockmode, false, false);
1029
1030 /* Make sure syscaches are up-to-date with any changes we waited for */
1031 AcceptInvalidationMessages();
1032 }
1033
1034 /*
1035 * UnlockSharedObject
1036 */
1037 void
UnlockSharedObject(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)1038 UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid,
1039 LOCKMODE lockmode)
1040 {
1041 LOCKTAG tag;
1042
1043 SET_LOCKTAG_OBJECT(tag,
1044 InvalidOid,
1045 classid,
1046 objid,
1047 objsubid);
1048
1049 LockRelease(&tag, lockmode, false);
1050 }
1051
1052 /*
1053 * LockSharedObjectForSession
1054 *
1055 * Obtain a session-level lock on a shared-across-databases object.
1056 * See LockRelationIdForSession for notes about session-level locks.
1057 */
1058 void
LockSharedObjectForSession(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)1059 LockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
1060 LOCKMODE lockmode)
1061 {
1062 LOCKTAG tag;
1063
1064 SET_LOCKTAG_OBJECT(tag,
1065 InvalidOid,
1066 classid,
1067 objid,
1068 objsubid);
1069
1070 (void) LockAcquire(&tag, lockmode, true, false);
1071 }
1072
1073 /*
1074 * UnlockSharedObjectForSession
1075 */
1076 void
UnlockSharedObjectForSession(Oid classid,Oid objid,uint16 objsubid,LOCKMODE lockmode)1077 UnlockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
1078 LOCKMODE lockmode)
1079 {
1080 LOCKTAG tag;
1081
1082 SET_LOCKTAG_OBJECT(tag,
1083 InvalidOid,
1084 classid,
1085 objid,
1086 objsubid);
1087
1088 LockRelease(&tag, lockmode, true);
1089 }
1090
1091
1092 /*
1093 * Append a description of a lockable object to buf.
1094 *
1095 * Ideally we would print names for the numeric values, but that requires
1096 * getting locks on system tables, which might cause problems since this is
1097 * typically used to report deadlock situations.
1098 */
1099 void
DescribeLockTag(StringInfo buf,const LOCKTAG * tag)1100 DescribeLockTag(StringInfo buf, const LOCKTAG *tag)
1101 {
1102 switch ((LockTagType) tag->locktag_type)
1103 {
1104 case LOCKTAG_RELATION:
1105 appendStringInfo(buf,
1106 _("relation %u of database %u"),
1107 tag->locktag_field2,
1108 tag->locktag_field1);
1109 break;
1110 case LOCKTAG_RELATION_EXTEND:
1111 appendStringInfo(buf,
1112 _("extension of relation %u of database %u"),
1113 tag->locktag_field2,
1114 tag->locktag_field1);
1115 break;
1116 case LOCKTAG_DATABASE_FROZEN_IDS:
1117 appendStringInfo(buf,
1118 _("pg_database.datfrozenxid of database %u"),
1119 tag->locktag_field1);
1120 break;
1121 case LOCKTAG_PAGE:
1122 appendStringInfo(buf,
1123 _("page %u of relation %u of database %u"),
1124 tag->locktag_field3,
1125 tag->locktag_field2,
1126 tag->locktag_field1);
1127 break;
1128 case LOCKTAG_TUPLE:
1129 appendStringInfo(buf,
1130 _("tuple (%u,%u) of relation %u of database %u"),
1131 tag->locktag_field3,
1132 tag->locktag_field4,
1133 tag->locktag_field2,
1134 tag->locktag_field1);
1135 break;
1136 case LOCKTAG_TRANSACTION:
1137 appendStringInfo(buf,
1138 _("transaction %u"),
1139 tag->locktag_field1);
1140 break;
1141 case LOCKTAG_VIRTUALTRANSACTION:
1142 appendStringInfo(buf,
1143 _("virtual transaction %d/%u"),
1144 tag->locktag_field1,
1145 tag->locktag_field2);
1146 break;
1147 case LOCKTAG_SPECULATIVE_TOKEN:
1148 appendStringInfo(buf,
1149 _("speculative token %u of transaction %u"),
1150 tag->locktag_field2,
1151 tag->locktag_field1);
1152 break;
1153 case LOCKTAG_OBJECT:
1154 appendStringInfo(buf,
1155 _("object %u of class %u of database %u"),
1156 tag->locktag_field3,
1157 tag->locktag_field2,
1158 tag->locktag_field1);
1159 break;
1160 case LOCKTAG_USERLOCK:
1161 /* reserved for old contrib code, now on pgfoundry */
1162 appendStringInfo(buf,
1163 _("user lock [%u,%u,%u]"),
1164 tag->locktag_field1,
1165 tag->locktag_field2,
1166 tag->locktag_field3);
1167 break;
1168 case LOCKTAG_ADVISORY:
1169 appendStringInfo(buf,
1170 _("advisory lock [%u,%u,%u,%u]"),
1171 tag->locktag_field1,
1172 tag->locktag_field2,
1173 tag->locktag_field3,
1174 tag->locktag_field4);
1175 break;
1176 default:
1177 appendStringInfo(buf,
1178 _("unrecognized locktag type %d"),
1179 (int) tag->locktag_type);
1180 break;
1181 }
1182 }
1183
1184 /*
1185 * GetLockNameFromTagType
1186 *
1187 * Given locktag type, return the corresponding lock name.
1188 */
1189 const char *
GetLockNameFromTagType(uint16 locktag_type)1190 GetLockNameFromTagType(uint16 locktag_type)
1191 {
1192 if (locktag_type > LOCKTAG_LAST_TYPE)
1193 return "???";
1194 return LockTagTypeNames[locktag_type];
1195 }
1196