/*-------------------------------------------------------------------------
 *
 * lock.c
 *    POSTGRES primary lock mechanism
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/storage/lmgr/lock.c
 *
 * NOTES
 *    A lock table is a shared memory hash table.  When
 *    a process tries to acquire a lock of a type that conflicts
 *    with existing locks, it is put to sleep using the routines
 *    in storage/lmgr/proc.c.
 *
 *    For the most part, this code should be invoked via lmgr.c
 *    or another lock-management module, not directly.
 *
 *  Interface:
 *
 *  InitLocks(), GetLocksMethodTable(), GetLockTagsMethodTable(),
 *  LockAcquire(), LockRelease(), LockReleaseAll(),
 *  LockCheckConflicts(), GrantLock()
 *
 *-------------------------------------------------------------------------
 */
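
/*
 * A minimal usage sketch (illustrative only, not part of this module's
 * code): callers normally go through lmgr.c wrappers such as
 * LockRelationOid(), but the underlying pattern against this interface,
 * assuming a relation OID "relid" in the current database, looks like:
 *
 *     LOCKTAG     tag;
 *
 *     SET_LOCKTAG_RELATION(tag, MyDatabaseId, relid);
 *     (void) LockAcquire(&tag, AccessShareLock, false, false);
 *     ... work with the relation ...
 *     (void) LockRelease(&tag, AccessShareLock, false);
 */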
#include "postgres.h"

#include <signal.h>
#include <unistd.h>

#include "access/transam.h"
#include "access/twophase.h"
#include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
#include "storage/standby.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/resowner_private.h"


/* This configuration variable is used to set the lock table size */
int         max_locks_per_xact; /* set by guc.c */

#define NLOCKENTS() \
    mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
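
/*
 * As a worked example of the sizing formula above (the numbers are assumed
 * for illustration, not mandated here): with the default
 * max_locks_per_xact = 64, MaxBackends around 110 (max_connections = 100
 * plus auxiliary and autovacuum workers), and max_prepared_xacts = 0,
 * NLOCKENTS() sizes the shared lock table for 64 * (110 + 0) = 7040 entries.
 */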


/*
 * Data structures defining the semantics of the standard lock methods.
 *
 * The conflict table defines the semantics of the various lock modes.
 */
static const LOCKMASK LockConflicts[] = {
    0,

    /* AccessShareLock */
    LOCKBIT_ON(AccessExclusiveLock),

    /* RowShareLock */
    LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

    /* RowExclusiveLock */
    LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
    LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

    /* ShareUpdateExclusiveLock */
    LOCKBIT_ON(ShareUpdateExclusiveLock) |
    LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
    LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

    /* ShareLock */
    LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
    LOCKBIT_ON(ShareRowExclusiveLock) |
    LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

    /* ShareRowExclusiveLock */
    LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
    LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
    LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

    /* ExclusiveLock */
    LOCKBIT_ON(RowShareLock) |
    LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
    LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
    LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

    /* AccessExclusiveLock */
    LOCKBIT_ON(AccessShareLock) | LOCKBIT_ON(RowShareLock) |
    LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
    LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
    LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock)

};
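
/*
 * Reading the conflict table above, for example: RowExclusiveLock (ordinary
 * writers) conflicts with ShareLock but not with itself, so concurrent
 * writers never block each other at this level:
 *
 *     LockConflicts[RowExclusiveLock] & LOCKBIT_ON(ShareLock)        != 0
 *     LockConflicts[RowExclusiveLock] & LOCKBIT_ON(RowExclusiveLock) == 0
 */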

/* Names of lock modes, for debug printouts */
static const char *const lock_mode_names[] =
{
    "INVALID",
    "AccessShareLock",
    "RowShareLock",
    "RowExclusiveLock",
    "ShareUpdateExclusiveLock",
    "ShareLock",
    "ShareRowExclusiveLock",
    "ExclusiveLock",
    "AccessExclusiveLock"
};

#ifndef LOCK_DEBUG
static bool Dummy_trace = false;
#endif

static const LockMethodData default_lockmethod = {
    AccessExclusiveLock,        /* highest valid lock mode number */
    LockConflicts,
    lock_mode_names,
#ifdef LOCK_DEBUG
    &Trace_locks
#else
    &Dummy_trace
#endif
};

static const LockMethodData user_lockmethod = {
    AccessExclusiveLock,        /* highest valid lock mode number */
    LockConflicts,
    lock_mode_names,
#ifdef LOCK_DEBUG
    &Trace_userlocks
#else
    &Dummy_trace
#endif
};

/*
 * map from lock method id to the lock table data structures
 */
static const LockMethod LockMethods[] = {
    NULL,
    &default_lockmethod,
    &user_lockmethod
};


/* Record that's written to 2PC state file when a lock is persisted */
typedef struct TwoPhaseLockRecord
{
    LOCKTAG     locktag;
    LOCKMODE    lockmode;
} TwoPhaseLockRecord;


/*
 * Count of the number of fast path lock slots we believe to be used.  This
 * might be higher than the real number if another backend has transferred
 * our locks to the primary lock table, but it can never be lower than the
 * real value, since only we can acquire locks on our own behalf.
 */
static int  FastPathLocalUseCount = 0;

/* Macros for manipulating proc->fpLockBits */
#define FAST_PATH_BITS_PER_SLOT         3
#define FAST_PATH_LOCKNUMBER_OFFSET     1
#define FAST_PATH_MASK                  ((1 << FAST_PATH_BITS_PER_SLOT) - 1)
#define FAST_PATH_GET_BITS(proc, n) \
    (((proc)->fpLockBits >> (FAST_PATH_BITS_PER_SLOT * n)) & FAST_PATH_MASK)
#define FAST_PATH_BIT_POSITION(n, l) \
    (AssertMacro((l) >= FAST_PATH_LOCKNUMBER_OFFSET), \
     AssertMacro((l) < FAST_PATH_BITS_PER_SLOT+FAST_PATH_LOCKNUMBER_OFFSET), \
     AssertMacro((n) < FP_LOCK_SLOTS_PER_BACKEND), \
     ((l) - FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT * (n)))
#define FAST_PATH_SET_LOCKMODE(proc, n, l) \
    (proc)->fpLockBits |= UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)
#define FAST_PATH_CLEAR_LOCKMODE(proc, n, l) \
    (proc)->fpLockBits &= ~(UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l))
#define FAST_PATH_CHECK_LOCKMODE(proc, n, l) \
    ((proc)->fpLockBits & (UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)))
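
/*
 * Worked example of the fast-path bit layout above: each slot gets
 * FAST_PATH_BITS_PER_SLOT = 3 bits, one per eligible lock mode.  For slot
 * n = 2 and l = AccessShareLock (lock mode 1),
 * FAST_PATH_BIT_POSITION(2, 1) = (1 - 1) + 3 * 2 = 6, so
 * FAST_PATH_SET_LOCKMODE(proc, 2, 1) sets bit 6 of proc->fpLockBits and
 * FAST_PATH_GET_BITS(proc, 2) extracts bits 6..8 as the slot's 3-bit group.
 */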

/*
 * The fast-path lock mechanism is concerned only with relation locks on
 * unshared relations by backends bound to a database.  The fast-path
 * mechanism exists mostly to accelerate acquisition and release of locks
 * that rarely conflict.  Because ShareUpdateExclusiveLock is
 * self-conflicting, it can't use the fast-path mechanism; but it also does
 * not conflict with any of the locks that do, so we can ignore it completely.
 */
#define EligibleForRelationFastPath(locktag, mode) \
    ((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
    (locktag)->locktag_type == LOCKTAG_RELATION && \
    (locktag)->locktag_field1 == MyDatabaseId && \
    MyDatabaseId != InvalidOid && \
    (mode) < ShareUpdateExclusiveLock)
#define ConflictsWithRelationFastPath(locktag, mode) \
    ((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
    (locktag)->locktag_type == LOCKTAG_RELATION && \
    (locktag)->locktag_field1 != InvalidOid && \
    (mode) > ShareUpdateExclusiveLock)

static bool FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode);
static bool FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode);
static bool FastPathTransferRelationLocks(LockMethod lockMethodTable,
                              const LOCKTAG *locktag, uint32 hashcode);
static PROCLOCK *FastPathGetRelationLockEntry(LOCALLOCK *locallock);

/*
 * To make the fast-path lock mechanism work, we must have some way of
 * preventing the use of the fast-path when a conflicting lock might be
 * present.  We partition the locktag space into
 * FAST_PATH_STRONG_LOCK_HASH_PARTITIONS partitions, and maintain an integer
 * count of the number of "strong" lockers in each partition.  When any
 * "strong" lockers are present (which is hopefully not very often), the
 * fast-path mechanism can't be used, and we must fall back to the slower
 * method of pushing matching locks directly into the main lock tables.
 *
 * The deadlock detector does not know anything about the fast path mechanism,
 * so any locks that might be involved in a deadlock must be transferred from
 * the fast-path queues to the main lock table.
 */

#define FAST_PATH_STRONG_LOCK_HASH_BITS         10
#define FAST_PATH_STRONG_LOCK_HASH_PARTITIONS \
    (1 << FAST_PATH_STRONG_LOCK_HASH_BITS)
#define FastPathStrongLockHashPartition(hashcode) \
    ((hashcode) % FAST_PATH_STRONG_LOCK_HASH_PARTITIONS)
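
/*
 * A sketch of how these pieces interact (control flow only; the real logic
 * is in LockAcquireExtended below): a backend taking a "weak" relation lock
 * consults the strong-lock count for the tag's partition before using its
 * fast-path slots, while a "strong" locker bumps that count first and then
 * migrates any matching fast-path locks into the main lock table:
 *
 *     uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode);
 *
 *     if (FastPathStrongRelationLocks->count[fasthashcode] == 0)
 *         ... safe to record the lock in MyProc's fast-path slots ...
 *     else
 *         ... fall through to the shared lock table ...
 */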

typedef struct
{
    slock_t     mutex;
    uint32      count[FAST_PATH_STRONG_LOCK_HASH_PARTITIONS];
} FastPathStrongRelationLockData;

static volatile FastPathStrongRelationLockData *FastPathStrongRelationLocks;


/*
 * Pointers to hash tables containing lock state
 *
 * The LockMethodLockHash and LockMethodProcLockHash hash tables are in
 * shared memory; LockMethodLocalHash is local to each backend.
 */
static HTAB *LockMethodLockHash;
static HTAB *LockMethodProcLockHash;
static HTAB *LockMethodLocalHash;


/* private state for error cleanup */
static LOCALLOCK *StrongLockInProgress;
static LOCALLOCK *awaitedLock;
static ResourceOwner awaitedOwner;


#ifdef LOCK_DEBUG

/*------
 * The following configuration options are available for lock debugging:
 *
 *     TRACE_LOCKS      -- give a bunch of output about what's going on in this file
 *     TRACE_USERLOCKS  -- same but for user locks
 *     TRACE_LOCK_OIDMIN -- do not trace locks for tables below this oid
 *                          (use to avoid output on system tables)
 *     TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
 *     DEBUG_DEADLOCKS  -- currently dumps locks at untimely occasions ;)
 *
 * Furthermore, but in storage/lmgr/lwlock.c:
 *     TRACE_LWLOCKS    -- trace lightweight locks (pretty useless)
 *
 * Define LOCK_DEBUG at compile time to get all these enabled.
 * --------
 */

int         Trace_lock_oidmin = FirstNormalObjectId;
bool        Trace_locks = false;
bool        Trace_userlocks = false;
int         Trace_lock_table = 0;
bool        Debug_deadlocks = false;


inline static bool
LOCK_DEBUG_ENABLED(const LOCKTAG *tag)
{
    return
        (*(LockMethods[tag->locktag_lockmethodid]->trace_flag) &&
         ((Oid) tag->locktag_field2 >= (Oid) Trace_lock_oidmin))
        || (Trace_lock_table &&
            (tag->locktag_field2 == Trace_lock_table));
}


inline static void
LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
{
    if (LOCK_DEBUG_ENABLED(&lock->tag))
        elog(LOG,
             "%s: lock(%p) id(%u,%u,%u,%u,%u,%u) grantMask(%x) "
             "req(%d,%d,%d,%d,%d,%d,%d)=%d "
             "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
             where, lock,
             lock->tag.locktag_field1, lock->tag.locktag_field2,
             lock->tag.locktag_field3, lock->tag.locktag_field4,
             lock->tag.locktag_type, lock->tag.locktag_lockmethodid,
             lock->grantMask,
             lock->requested[1], lock->requested[2], lock->requested[3],
             lock->requested[4], lock->requested[5], lock->requested[6],
             lock->requested[7], lock->nRequested,
             lock->granted[1], lock->granted[2], lock->granted[3],
             lock->granted[4], lock->granted[5], lock->granted[6],
             lock->granted[7], lock->nGranted,
             lock->waitProcs.size,
             LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]);
}


inline static void
PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
{
    if (LOCK_DEBUG_ENABLED(&proclockP->tag.myLock->tag))
        elog(LOG,
             "%s: proclock(%p) lock(%p) method(%u) proc(%p) hold(%x)",
             where, proclockP, proclockP->tag.myLock,
             PROCLOCK_LOCKMETHOD(*(proclockP)),
             proclockP->tag.myProc, (int) proclockP->holdMask);
}
#else                           /* not LOCK_DEBUG */

#define LOCK_PRINT(where, lock, type)  ((void) 0)
#define PROCLOCK_PRINT(where, proclockP)  ((void) 0)
#endif                          /* not LOCK_DEBUG */


static uint32 proclock_hash(const void *key, Size keysize);
static void RemoveLocalLock(LOCALLOCK *locallock);
static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
                 const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
static void FinishStrongLockAcquire(void);
static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock);
static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent);
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
            PROCLOCK *proclock, LockMethod lockMethodTable);
static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
            LockMethod lockMethodTable, uint32 hashcode,
            bool wakeupNeeded);
static void LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
                     LOCKTAG *locktag, LOCKMODE lockmode,
                     bool decrement_strong_lock_count);
static void GetSingleProcBlockerStatusData(PGPROC *blocked_proc,
                               BlockedProcsData *data);


/*
 * InitLocks -- Initialize the lock manager's data structures.
 *
 * This is called from CreateSharedMemoryAndSemaphores(), which see for
 * more comments.  In the normal postmaster case, the shared hash tables
 * are created here, as well as a locallock hash table that will remain
 * unused and empty in the postmaster itself.  Backends inherit the pointers
 * to the shared tables via fork(), and also inherit an image of the locallock
 * hash table, which they proceed to use.  In the EXEC_BACKEND case, each
 * backend re-executes this code to obtain pointers to the already existing
 * shared hash tables and to create its locallock hash table.
 */
void
InitLocks(void)
{
    HASHCTL     info;
    long        init_table_size,
                max_table_size;
    bool        found;

    /*
     * Compute init/max size to request for lock hashtables.  Note these
     * calculations must agree with LockShmemSize!
     */
    max_table_size = NLOCKENTS();
    init_table_size = max_table_size / 2;

    /*
     * Allocate hash table for LOCK structs.  This stores per-locked-object
     * information.
     */
    MemSet(&info, 0, sizeof(info));
    info.keysize = sizeof(LOCKTAG);
    info.entrysize = sizeof(LOCK);
    info.num_partitions = NUM_LOCK_PARTITIONS;

    LockMethodLockHash = ShmemInitHash("LOCK hash",
                                       init_table_size,
                                       max_table_size,
                                       &info,
                                       HASH_ELEM | HASH_BLOBS | HASH_PARTITION);

    /* Assume an average of 2 holders per lock */
    max_table_size *= 2;
    init_table_size *= 2;

    /*
     * Allocate hash table for PROCLOCK structs.  This stores
     * per-lock-per-holder information.
     */
    info.keysize = sizeof(PROCLOCKTAG);
    info.entrysize = sizeof(PROCLOCK);
    info.hash = proclock_hash;
    info.num_partitions = NUM_LOCK_PARTITIONS;

    LockMethodProcLockHash = ShmemInitHash("PROCLOCK hash",
                                           init_table_size,
                                           max_table_size,
                                           &info,
                                           HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);

    /*
     * Allocate fast-path structures.
     */
    FastPathStrongRelationLocks =
        ShmemInitStruct("Fast Path Strong Relation Lock Data",
                        sizeof(FastPathStrongRelationLockData), &found);
    if (!found)
        SpinLockInit(&FastPathStrongRelationLocks->mutex);

    /*
     * Allocate non-shared hash table for LOCALLOCK structs.  This stores lock
     * counts and resource owner information.
     *
     * The non-shared table could already exist in this process (this occurs
     * when the postmaster is recreating shared memory after a backend crash).
     * If so, delete and recreate it.  (We could simply leave it, since it
     * ought to be empty in the postmaster, but for safety let's zap it.)
     */
    if (LockMethodLocalHash)
        hash_destroy(LockMethodLocalHash);

    info.keysize = sizeof(LOCALLOCKTAG);
    info.entrysize = sizeof(LOCALLOCK);

    LockMethodLocalHash = hash_create("LOCALLOCK hash",
                                      16,
                                      &info,
                                      HASH_ELEM | HASH_BLOBS);
}


/*
 * Fetch the lock method table associated with a given lock
 */
LockMethod
GetLocksMethodTable(const LOCK *lock)
{
    LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);

    Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
    return LockMethods[lockmethodid];
}

/*
 * Fetch the lock method table associated with a given locktag
 */
LockMethod
GetLockTagsMethodTable(const LOCKTAG *locktag)
{
    LOCKMETHODID lockmethodid = (LOCKMETHODID) locktag->locktag_lockmethodid;

    Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
    return LockMethods[lockmethodid];
}


/*
 * Compute the hash code associated with a LOCKTAG.
 *
 * To avoid unnecessary recomputations of the hash code, we try to do this
 * just once per function, and then pass it around as needed.  Aside from
 * passing the hashcode to hash_search_with_hash_value(), we can extract
 * the lock partition number from the hashcode.
 */
uint32
LockTagHashCode(const LOCKTAG *locktag)
{
    return get_hash_value(LockMethodLockHash, (const void *) locktag);
}

/*
 * Compute the hash code associated with a PROCLOCKTAG.
 *
 * Because we want to use just one set of partition locks for both the
 * LOCK and PROCLOCK hash tables, we have to make sure that PROCLOCKs
 * fall into the same partition number as their associated LOCKs.
 * dynahash.c expects the partition number to be the low-order bits of
 * the hash code, and therefore a PROCLOCKTAG's hash code must have the
 * same low-order bits as the associated LOCKTAG's hash code.  We achieve
 * this with this specialized hash function.
 */
static uint32
proclock_hash(const void *key, Size keysize)
{
    const PROCLOCKTAG *proclocktag = (const PROCLOCKTAG *) key;
    uint32      lockhash;
    Datum       procptr;

    Assert(keysize == sizeof(PROCLOCKTAG));

    /* Look into the associated LOCK object, and compute its hash code */
    lockhash = LockTagHashCode(&proclocktag->myLock->tag);

    /*
     * To make the hash code also depend on the PGPROC, we xor the proc
     * struct's address into the hash code, left-shifted so that the
     * partition-number bits don't change.  Since this is only a hash, we
     * don't care if we lose high-order bits of the address; use an
     * intermediate variable to suppress cast-pointer-to-int warnings.
     */
    procptr = PointerGetDatum(proclocktag->myProc);
    lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;

    return lockhash;
}
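
/*
 * For example (a sketch using the usual defaults, NUM_LOCK_PARTITIONS = 16
 * and LOG2_NUM_LOCK_PARTITIONS = 4): if a LOCK's hash code is 0x12345678,
 * its partition number is the low 4 bits, 0x8.  XORing in (procptr << 4)
 * can only change bits 4 and above, so the PROCLOCK hash code still ends
 * in 0x8 and falls into the same partition as its LOCK.
 */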

/*
 * Compute the hash code associated with a PROCLOCKTAG, given the hashcode
 * for its underlying LOCK.
 *
 * We use this just to avoid redundant calls of LockTagHashCode().
 */
static inline uint32
ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode)
{
    uint32      lockhash = hashcode;
    Datum       procptr;

    /*
     * This must match proclock_hash()!
     */
    procptr = PointerGetDatum(proclocktag->myProc);
    lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;

    return lockhash;
}

/*
 * Given two lock modes, return whether they would conflict.
 */
bool
DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2)
{
    LockMethod  lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];

    if (lockMethodTable->conflictTab[mode1] & LOCKBIT_ON(mode2))
        return true;

    return false;
}

/*
 * LockHasWaiters -- look up 'locktag' and check if releasing this
 *      lock would wake up other processes waiting for it.
 */
bool
LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
{
    LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
    LockMethod  lockMethodTable;
    LOCALLOCKTAG localtag;
    LOCALLOCK  *locallock;
    LOCK       *lock;
    PROCLOCK   *proclock;
    LWLock     *partitionLock;
    bool        hasWaiters = false;

    if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
        elog(ERROR, "unrecognized lock method: %d", lockmethodid);
    lockMethodTable = LockMethods[lockmethodid];
    if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
        elog(ERROR, "unrecognized lock mode: %d", lockmode);

#ifdef LOCK_DEBUG
    if (LOCK_DEBUG_ENABLED(locktag))
        elog(LOG, "LockHasWaiters: lock [%u,%u] %s",
             locktag->locktag_field1, locktag->locktag_field2,
             lockMethodTable->lockModeNames[lockmode]);
#endif

    /*
     * Find the LOCALLOCK entry for this lock and lockmode
     */
    MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
    localtag.lock = *locktag;
    localtag.mode = lockmode;

    locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
                                          (void *) &localtag,
                                          HASH_FIND, NULL);

    /*
     * let the caller print its own error message, too.  Do not ereport(ERROR).
     */
    if (!locallock || locallock->nLocks <= 0)
    {
        elog(WARNING, "you don't own a lock of type %s",
             lockMethodTable->lockModeNames[lockmode]);
        return false;
    }

    /*
     * Check the shared lock table.
     */
    partitionLock = LockHashPartitionLock(locallock->hashcode);

    LWLockAcquire(partitionLock, LW_SHARED);

    /*
     * We don't need to re-find the lock or proclock, since we kept their
     * addresses in the locallock table, and they couldn't have been removed
     * while we were holding a lock on them.
     */
    lock = locallock->lock;
    LOCK_PRINT("LockHasWaiters: found", lock, lockmode);
    proclock = locallock->proclock;
    PROCLOCK_PRINT("LockHasWaiters: found", proclock);

    /*
     * Double-check that we are actually holding a lock of the type we want to
     * release.
     */
    if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
    {
        PROCLOCK_PRINT("LockHasWaiters: WRONGTYPE", proclock);
        LWLockRelease(partitionLock);
        elog(WARNING, "you don't own a lock of type %s",
             lockMethodTable->lockModeNames[lockmode]);
        RemoveLocalLock(locallock);
        return false;
    }

    /*
     * Do the checking.
     */
    if ((lockMethodTable->conflictTab[lockmode] & lock->waitMask) != 0)
        hasWaiters = true;

    LWLockRelease(partitionLock);

    return hasWaiters;
}

/*
 * LockAcquire -- Check for lock conflicts, sleep if conflict found,
 *      set lock if/when no conflicts.
 *
 * Inputs:
 *  locktag: unique identifier for the lockable object
 *  lockmode: lock mode to acquire
 *  sessionLock: if true, acquire lock for session not current transaction
 *  dontWait: if true, don't wait to acquire lock
 *
 * Returns one of:
 *      LOCKACQUIRE_NOT_AVAIL       lock not available, and dontWait=true
 *      LOCKACQUIRE_OK              lock successfully acquired
 *      LOCKACQUIRE_ALREADY_HELD    incremented count for lock already held
 *      LOCKACQUIRE_ALREADY_CLEAR   incremented count for lock already clear
 *
 * In the normal case where dontWait=false and the caller doesn't need to
 * distinguish a freshly acquired lock from one already taken earlier in
 * this same transaction, there is no need to examine the return value.
 *
 * Side Effects: The lock is acquired and recorded in lock tables.
 *
 * NOTE: if we wait for the lock, there is no way to abort the wait
 * short of aborting the transaction.
 */
LockAcquireResult
LockAcquire(const LOCKTAG *locktag,
            LOCKMODE lockmode,
            bool sessionLock,
            bool dontWait)
{
    return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
                               true, NULL);
}

/*
 * LockAcquireExtended - allows us to specify additional options
 *
 * reportMemoryError specifies whether a lock request that fills the lock
 * table should generate an ERROR or not.  Passing "false" allows the caller
 * to attempt to recover from lock-table-full situations, perhaps by forcibly
 * cancelling other lock holders and then retrying.  Note, however, that the
 * return code for that is LOCKACQUIRE_NOT_AVAIL, so that it's unsafe to use
 * in combination with dontWait = true, as the cause of failure couldn't be
 * distinguished.
 *
 * If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK
 * table entry if a lock is successfully acquired, or NULL if not.
 */
LockAcquireResult
LockAcquireExtended(const LOCKTAG *locktag,
                    LOCKMODE lockmode,
                    bool sessionLock,
                    bool dontWait,
                    bool reportMemoryError,
                    LOCALLOCK **locallockp)
{
    LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
    LockMethod  lockMethodTable;
    LOCALLOCKTAG localtag;
    LOCALLOCK  *locallock;
    LOCK       *lock;
    PROCLOCK   *proclock;
    bool        found;
    ResourceOwner owner;
    uint32      hashcode;
    LWLock     *partitionLock;
    int         status;
    bool        log_lock = false;

    if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
        elog(ERROR, "unrecognized lock method: %d", lockmethodid);
    lockMethodTable = LockMethods[lockmethodid];
    if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
        elog(ERROR, "unrecognized lock mode: %d", lockmode);

    if (RecoveryInProgress() && !InRecovery &&
        (locktag->locktag_type == LOCKTAG_OBJECT ||
         locktag->locktag_type == LOCKTAG_RELATION) &&
        lockmode > RowExclusiveLock)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("cannot acquire lock mode %s on database objects while recovery is in progress",
                        lockMethodTable->lockModeNames[lockmode]),
                 errhint("Only RowExclusiveLock or less can be acquired on database objects during recovery.")));

#ifdef LOCK_DEBUG
    if (LOCK_DEBUG_ENABLED(locktag))
        elog(LOG, "LockAcquire: lock [%u,%u] %s",
             locktag->locktag_field1, locktag->locktag_field2,
             lockMethodTable->lockModeNames[lockmode]);
#endif

    /* Identify owner for lock */
    if (sessionLock)
        owner = NULL;
    else
        owner = CurrentResourceOwner;

    /*
     * Find or create a LOCALLOCK entry for this lock and lockmode
     */
    MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
    localtag.lock = *locktag;
    localtag.mode = lockmode;

    locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
                                          (void *) &localtag,
                                          HASH_ENTER, &found);

    /*
     * if it's a new locallock object, initialize it
     */
    if (!found)
    {
        locallock->lock = NULL;
        locallock->proclock = NULL;
        locallock->hashcode = LockTagHashCode(&(localtag.lock));
        locallock->nLocks = 0;
        locallock->numLockOwners = 0;
        locallock->maxLockOwners = 8;
        locallock->holdsStrongLockCount = FALSE;
        locallock->lockCleared = false;
        locallock->lockOwners = NULL;   /* in case next line fails */
        locallock->lockOwners = (LOCALLOCKOWNER *)
            MemoryContextAlloc(TopMemoryContext,
                               locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
    }
    else
    {
        /* Make sure there will be room to remember the lock */
        if (locallock->numLockOwners >= locallock->maxLockOwners)
        {
            int         newsize = locallock->maxLockOwners * 2;

            locallock->lockOwners = (LOCALLOCKOWNER *)
                repalloc(locallock->lockOwners,
                         newsize * sizeof(LOCALLOCKOWNER));
            locallock->maxLockOwners = newsize;
        }
    }
    hashcode = locallock->hashcode;

    if (locallockp)
        *locallockp = locallock;

    /*
     * If we already hold the lock, we can just increase the count locally.
     *
     * If lockCleared is already set, caller need not worry about absorbing
     * sinval messages related to the lock's object.
     */
    if (locallock->nLocks > 0)
    {
        GrantLockLocal(locallock, owner);
        if (locallock->lockCleared)
            return LOCKACQUIRE_ALREADY_CLEAR;
        else
            return LOCKACQUIRE_ALREADY_HELD;
    }

    /*
     * Prepare to emit a WAL record if acquisition of this lock needs to be
     * replayed in a standby server.
     *
     * Here we prepare to log; after lock is acquired we'll issue log record.
     * This arrangement simplifies error recovery in case the preparation step
     * fails.
     *
     * Only AccessExclusiveLocks can conflict with lock types that read-only
     * transactions can acquire in a standby server.  Make sure this definition
     * matches the one in GetRunningTransactionLocks().
     */
    if (lockmode >= AccessExclusiveLock &&
        locktag->locktag_type == LOCKTAG_RELATION &&
        !RecoveryInProgress() &&
        XLogStandbyInfoActive())
    {
        LogAccessExclusiveLockPrepare();
        log_lock = true;
    }

    /*
     * Attempt to take lock via fast path, if eligible.  But if we remember
     * having filled up the fast path array, we don't attempt to make any
     * further use of it until we release some locks.  It's possible that some
     * other backend has transferred some of those locks to the shared hash
     * table, leaving space free, but it's not worth acquiring the LWLock just
     * to check.  It's also possible that we're acquiring a second or third
     * lock type on a relation we have already locked using the fast-path, but
     * for now we don't worry about that case either.
     */
    if (EligibleForRelationFastPath(locktag, lockmode) &&
        FastPathLocalUseCount < FP_LOCK_SLOTS_PER_BACKEND)
    {
        uint32      fasthashcode = FastPathStrongLockHashPartition(hashcode);
        bool        acquired;

        /*
         * LWLockAcquire acts as a memory sequencing point, so it's safe to
         * assume that any strong locker whose increment to
         * FastPathStrongRelationLocks->count becomes visible after we test
         * it has yet to begin to transfer fast-path locks.
         */
        LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
        if (FastPathStrongRelationLocks->count[fasthashcode] != 0)
            acquired = false;
        else
            acquired = FastPathGrantRelationLock(locktag->locktag_field2,
                                                 lockmode);
        LWLockRelease(&MyProc->backendLock);
        if (acquired)
        {
            /*
             * The locallock might contain stale pointers to some old shared
             * objects; we MUST reset these to null before considering the
             * lock to be acquired via fast-path.
             */
            locallock->lock = NULL;
            locallock->proclock = NULL;
            GrantLockLocal(locallock, owner);
            return LOCKACQUIRE_OK;
        }
    }

    /*
     * If this lock could potentially have been taken via the fast-path by
     * some other backend, we must (temporarily) disable further use of the
     * fast-path for this lock tag, and migrate any locks already taken via
     * this method to the main lock table.
     */
    if (ConflictsWithRelationFastPath(locktag, lockmode))
    {
        uint32      fasthashcode = FastPathStrongLockHashPartition(hashcode);

        BeginStrongLockAcquire(locallock, fasthashcode);
        if (!FastPathTransferRelationLocks(lockMethodTable, locktag,
                                           hashcode))
        {
            AbortStrongLockAcquire();
            if (locallock->nLocks == 0)
                RemoveLocalLock(locallock);
            if (locallockp)
                *locallockp = NULL;
            if (reportMemoryError)
                ereport(ERROR,
                        (errcode(ERRCODE_OUT_OF_MEMORY),
                         errmsg("out of shared memory"),
                         errhint("You might need to increase max_locks_per_transaction.")));
            else
                return LOCKACQUIRE_NOT_AVAIL;
        }
    }

    /*
     * We didn't find the lock in our LOCALLOCK table, and we didn't manage to
     * take it via the fast-path, either, so we've got to mess with the shared
     * lock table.
     */
    partitionLock = LockHashPartitionLock(hashcode);

    LWLockAcquire(partitionLock, LW_EXCLUSIVE);

    /*
     * Find or create lock and proclock entries with this tag
     *
     * Note: if the locallock object already existed, it might have a pointer
     * to the lock already ... but we should not assume that that pointer is
     * valid, since a lock object with zero hold and request counts can go
     * away anytime.  So we have to use SetupLockInTable() to recompute the
     * lock and proclock pointers, even if they're already set.
     */
    proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
                                hashcode, lockmode);
    if (!proclock)
    {
        AbortStrongLockAcquire();
        LWLockRelease(partitionLock);
        if (locallock->nLocks == 0)
            RemoveLocalLock(locallock);
        if (locallockp)
            *locallockp = NULL;
        if (reportMemoryError)
            ereport(ERROR,
                    (errcode(ERRCODE_OUT_OF_MEMORY),
                     errmsg("out of shared memory"),
                     errhint("You might need to increase max_locks_per_transaction.")));
        else
            return LOCKACQUIRE_NOT_AVAIL;
    }
    locallock->proclock = proclock;
    lock = proclock->tag.myLock;
    locallock->lock = lock;

    /*
     * If lock requested conflicts with locks requested by waiters, must join
     * wait queue.  Otherwise, check for conflict with already-held locks.
     * (That check comes last because it is the most complex.)
     */
    if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
        status = STATUS_FOUND;
    else
        status = LockCheckConflicts(lockMethodTable, lockmode,
                                    lock, proclock);

    if (status == STATUS_OK)
    {
        /* No conflict with held or previously requested locks */
        GrantLock(lock, proclock, lockmode);
        GrantLockLocal(locallock, owner);
    }
    else
    {
        Assert(status == STATUS_FOUND);

        /*
         * We can't acquire the lock immediately.  If caller specified no
         * blocking, remove useless table entries and return NOT_AVAIL without
         * waiting.
         */
        if (dontWait)
        {
            AbortStrongLockAcquire();
            if (proclock->holdMask == 0)
            {
                uint32      proclock_hashcode;

                proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
                SHMQueueDelete(&proclock->lockLink);
                SHMQueueDelete(&proclock->procLink);
                if (!hash_search_with_hash_value(LockMethodProcLockHash,
                                                 (void *) &(proclock->tag),
                                                 proclock_hashcode,
                                                 HASH_REMOVE,
                                                 NULL))
                    elog(PANIC, "proclock table corrupted");
            }
            else
                PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
            lock->nRequested--;
            lock->requested[lockmode]--;
            LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
            Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
            Assert(lock->nGranted <= lock->nRequested);
            LWLockRelease(partitionLock);
            if (locallock->nLocks == 0)
                RemoveLocalLock(locallock);
            if (locallockp)
                *locallockp = NULL;
            return LOCKACQUIRE_NOT_AVAIL;
        }

        /*
         * Set bitmask of locks this process already holds on this object.
         */
        MyProc->heldLocks = proclock->holdMask;

        /*
         * Sleep till someone wakes me up.
         */

        TRACE_POSTGRESQL_LOCK_WAIT_START(locktag->locktag_field1,
                                         locktag->locktag_field2,
                                         locktag->locktag_field3,
                                         locktag->locktag_field4,
                                         locktag->locktag_type,
                                         lockmode);

        WaitOnLock(locallock, owner);

        TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1,
                                        locktag->locktag_field2,
                                        locktag->locktag_field3,
                                        locktag->locktag_field4,
                                        locktag->locktag_type,
                                        lockmode);

        /*
         * NOTE: do not do any material change of state between here and
         * return.  All required changes in locktable state must have been
         * done when the lock was granted to us --- see notes in WaitOnLock.
         */

        /*
         * Check the proclock entry status, in case something in the ipc
         * communication doesn't work correctly.
         */
        if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
        {
            AbortStrongLockAcquire();
            PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
            LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
            /* Should we retry? */
            LWLockRelease(partitionLock);
            elog(ERROR, "LockAcquire failed");
        }
        PROCLOCK_PRINT("LockAcquire: granted", proclock);
        LOCK_PRINT("LockAcquire: granted", lock, lockmode);
    }

    /*
     * Lock state is fully up-to-date now; if we error out after this, no
     * special error cleanup is required.
     */
    FinishStrongLockAcquire();

    LWLockRelease(partitionLock);

    /*
     * Emit a WAL record if acquisition of this lock needs to be replayed in a
     * standby server.
     */
    if (log_lock)
    {
        /*
         * Decode the locktag back to the original values, to avoid sending
         * lots of empty bytes with every message.  See lock.h to check how a
         * locktag is defined for LOCKTAG_RELATION
         */
        LogAccessExclusiveLock(locktag->locktag_field1,
                               locktag->locktag_field2);
    }

    return LOCKACQUIRE_OK;
}
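
/*
 * A minimal caller-side sketch of the dontWait path above (illustrative
 * only; real callers usually go through the ConditionalLockRelation()
 * family in lmgr.c).  The return value is the only indication that the
 * lock was not taken, so conditional callers must check it:
 *
 *     if (LockAcquire(&tag, ExclusiveLock, false, true) ==
 *         LOCKACQUIRE_NOT_AVAIL)
 *     {
 *         ... someone else holds a conflicting lock; skip and retry later ...
 *     }
 */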

/*
 * Find or create LOCK and PROCLOCK objects as needed for a new lock
 * request.
 *
 * Returns the PROCLOCK object, or NULL if we failed to create the objects
 * for lack of shared memory.
 *
 * The appropriate partition lock must be held at entry, and will be
 * held at exit.
 */
static PROCLOCK *
SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
                 const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode)
{
    LOCK       *lock;
    PROCLOCK   *proclock;
    PROCLOCKTAG proclocktag;
    uint32      proclock_hashcode;
    bool        found;

    /*
     * Find or create a lock with this tag.
     */
    lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
                                                (const void *) locktag,
                                                hashcode,
                                                HASH_ENTER_NULL,
                                                &found);
    if (!lock)
        return NULL;

    /*
     * if it's a new lock object, initialize it
     */
    if (!found)
    {
        lock->grantMask = 0;
        lock->waitMask = 0;
        SHMQueueInit(&(lock->procLocks));
        ProcQueueInit(&(lock->waitProcs));
        lock->nRequested = 0;
        lock->nGranted = 0;
        MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
        MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
        LOCK_PRINT("LockAcquire: new", lock, lockmode);
    }
    else
    {
        LOCK_PRINT("LockAcquire: found", lock, lockmode);
        Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
        Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
        Assert(lock->nGranted <= lock->nRequested);
    }

    /*
     * Create the hash key for the proclock table.
     */
    proclocktag.myLock = lock;
    proclocktag.myProc = proc;

    proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);

    /*
     * Find or create a proclock entry with this tag
     */
    proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
                                                        (void *) &proclocktag,
                                                        proclock_hashcode,
                                                        HASH_ENTER_NULL,
                                                        &found);
    if (!proclock)
    {
        /* Ooops, not enough shmem for the proclock */
        if (lock->nRequested == 0)
        {
            /*
             * There are no other requestors of this lock, so garbage-collect
             * the lock object.  We *must* do this to avoid a permanent leak
             * of shared memory, because there won't be anything to cause
             * anyone to release the lock object later.
             */
            Assert(SHMQueueEmpty(&(lock->procLocks)));
            if (!hash_search_with_hash_value(LockMethodLockHash,
                                             (void *) &(lock->tag),
                                             hashcode,
                                             HASH_REMOVE,
                                             NULL))
                elog(PANIC, "lock table corrupted");
        }
        return NULL;
    }

    /*
     * If new, initialize the new entry
     */
    if (!found)
    {
        uint32      partition = LockHashPartition(hashcode);

        /*
         * It might seem unsafe to access proclock->groupLeader without a
         * lock, but it's not really.  Either we are initializing a proclock
         * on our own behalf, in which case our group leader isn't changing
         * because the group leader for a process can only ever be changed by
         * the process itself; or else we are transferring a fast-path lock to
         * the main lock table, in which case that process can't change its
         * lock group leader without first releasing all of its locks (and in
         * particular the one we are currently transferring).
         */
        proclock->groupLeader = proc->lockGroupLeader != NULL ?
            proc->lockGroupLeader : proc;
        proclock->holdMask = 0;
        proclock->releaseMask = 0;
        /* Add proclock to appropriate lists */
        SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
        SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
                             &proclock->procLink);
        PROCLOCK_PRINT("LockAcquire: new", proclock);
    }
    else
    {
        PROCLOCK_PRINT("LockAcquire: found", proclock);
        Assert((proclock->holdMask & ~lock->grantMask) == 0);

#ifdef CHECK_DEADLOCK_RISK

        /*
         * Issue warning if we already hold a lower-level lock on this object
         * and do not hold a lock of the requested level or higher.  This
         * indicates a deadlock-prone coding practice (eg, we'd have a
         * deadlock if another backend were following the same code path at
         * about the same time).
         *
         * This is not enabled by default, because it may generate log entries
         * about user-level coding practices that are in fact safe in context.
         * It can be enabled to help find system-level problems.
         *
         * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
         * better to use a table.  For now, though, this works.
         */
        {
            int         i;

            for (i = lockMethodTable->numLockModes; i > 0; i--)
            {
                if (proclock->holdMask & LOCKBIT_ON(i))
                {
                    if (i >= (int) lockmode)
                        break;  /* safe: we have a lock >= req level */
                    elog(LOG, "deadlock risk: raising lock level"
                         " from %s to %s on object %u/%u/%u",
                         lockMethodTable->lockModeNames[i],
                         lockMethodTable->lockModeNames[lockmode],
                         lock->tag.locktag_field1, lock->tag.locktag_field2,
                         lock->tag.locktag_field3);
                    break;
                }
            }
        }
#endif                          /* CHECK_DEADLOCK_RISK */
    }

    /*
     * lock->nRequested and lock->requested[] count the total number of
     * requests, whether granted or waiting, so increment those immediately.
     * The other counts don't increment till we get the lock.
     */
    lock->nRequested++;
    lock->requested[lockmode]++;
    Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));

    /*
     * We shouldn't already hold the desired lock; else locallock table is
     * broken.
     */
    if (proclock->holdMask & LOCKBIT_ON(lockmode))
        elog(ERROR, "lock %s on object %u/%u/%u is already held",
             lockMethodTable->lockModeNames[lockmode],
             lock->tag.locktag_field1, lock->tag.locktag_field2,
             lock->tag.locktag_field3);

    return proclock;
}

/*
 * Subroutine to free a locallock entry
 */
static void
RemoveLocalLock(LOCALLOCK *locallock)
{
    int         i;

    for (i = locallock->numLockOwners - 1; i >= 0; i--)
    {
        if (locallock->lockOwners[i].owner != NULL)
            ResourceOwnerForgetLock(locallock->lockOwners[i].owner, locallock);
    }
    locallock->numLockOwners = 0;
    if (locallock->lockOwners != NULL)
        pfree(locallock->lockOwners);
    locallock->lockOwners = NULL;

    if (locallock->holdsStrongLockCount)
    {
        uint32      fasthashcode;

        fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);

        SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
        Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
        FastPathStrongRelationLocks->count[fasthashcode]--;
        locallock->holdsStrongLockCount = FALSE;
        SpinLockRelease(&FastPathStrongRelationLocks->mutex);
    }

    if (!hash_search(LockMethodLocalHash,
                     (void *) &(locallock->tag),
                     HASH_REMOVE, NULL))
        elog(WARNING, "locallock table corrupted");
}

/*
 * LockCheckConflicts -- test whether requested lock conflicts
 *      with those already granted
 *
 * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
 *
 * NOTES:
 *      Here's what makes this complicated: one process's locks don't
 * conflict with one another, no matter what purpose they are held for
 * (eg, session and transaction locks do not conflict).  Nor do the locks
 * of one process in a lock group conflict with those of another process in
 * the same group.  So, we must subtract off these locks when determining
 * whether the requested new lock conflicts with those already held.
 */
int
LockCheckConflicts(LockMethod lockMethodTable,
                   LOCKMODE lockmode,
                   LOCK *lock,
                   PROCLOCK *proclock)
{
    int         numLockModes = lockMethodTable->numLockModes;
    LOCKMASK    myLocks;
    int         conflictMask = lockMethodTable->conflictTab[lockmode];
    int         conflictsRemaining[MAX_LOCKMODES];
    int         totalConflictsRemaining = 0;
    int         i;
    SHM_QUEUE  *procLocks;
    PROCLOCK   *otherproclock;

    /*
     * first check for global conflicts: If no locks conflict with my request,
     * then I get the lock.
     *
     * Checking for conflict: lock->grantMask represents the types of
     * currently held locks.  conflictTable[lockmode] has a bit set for each
     * type of lock that conflicts with request.  Bitwise compare tells if
     * there is a conflict.
     */
    if (!(conflictMask & lock->grantMask))
    {
        PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
        return STATUS_OK;
    }

    /*
     * Rats.  Something conflicts.  But it could still be my own lock, or a
     * lock held by another member of my locking group.  First, figure out how
     * many conflicts remain after subtracting out any locks I hold myself.
     */
    myLocks = proclock->holdMask;
    for (i = 1; i <= numLockModes; i++)
    {
        if ((conflictMask & LOCKBIT_ON(i)) == 0)
        {
            conflictsRemaining[i] = 0;
            continue;
        }
        conflictsRemaining[i] = lock->granted[i];
        if (myLocks & LOCKBIT_ON(i))
            --conflictsRemaining[i];
        totalConflictsRemaining += conflictsRemaining[i];
    }

    /* If no conflicts remain, we get the lock. */
    if (totalConflictsRemaining == 0)
    {
        PROCLOCK_PRINT("LockCheckConflicts: resolved (simple)", proclock);
        return STATUS_OK;
    }

    /* If no group locking, it's definitely a conflict. */
    if (proclock->groupLeader == MyProc && MyProc->lockGroupLeader == NULL)
    {
        Assert(proclock->tag.myProc == MyProc);
        PROCLOCK_PRINT("LockCheckConflicts: conflicting (simple)",
                       proclock);
        return STATUS_FOUND;
    }

    /*
     * Locks held in conflicting modes by members of our own lock group are
     * not real conflicts; we can subtract those out and see if we still have
     * a conflict.  This is O(N) in the number of processes holding or
     * awaiting locks on this object.  We could improve that by making the
     * shared memory state more complex (and larger) but it doesn't seem worth
     * it.
     */
    procLocks = &(lock->procLocks);
    otherproclock = (PROCLOCK *)
        SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink));
    while (otherproclock != NULL)
    {
        if (proclock != otherproclock &&
            proclock->groupLeader == otherproclock->groupLeader &&
            (otherproclock->holdMask & conflictMask) != 0)
        {
            int         intersectMask = otherproclock->holdMask & conflictMask;

            for (i = 1; i <= numLockModes; i++)
            {
                if ((intersectMask & LOCKBIT_ON(i)) != 0)
                {
                    if (conflictsRemaining[i] <= 0)
                        elog(PANIC, "proclocks held do not match lock");
                    conflictsRemaining[i]--;
                    totalConflictsRemaining--;
                }
            }

            if (totalConflictsRemaining == 0)
            {
                PROCLOCK_PRINT("LockCheckConflicts: resolved (group)",
                               proclock);
                return STATUS_OK;
            }
        }
        otherproclock = (PROCLOCK *)
            SHMQueueNext(procLocks, &otherproclock->lockLink,
                         offsetof(PROCLOCK, lockLink));
    }

    /* Nope, it's a real conflict. */
    PROCLOCK_PRINT("LockCheckConflicts: conflicting (group)", proclock);
    return STATUS_FOUND;
}
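
/*
 * Worked example of the subtraction logic above (values assumed): suppose
 * we already hold RowExclusiveLock on a relation and now request ShareLock.
 * conflictTab[ShareLock] includes RowExclusiveLock and grantMask shows one
 * granted, so the quick grantMask test reports a conflict; but
 * conflictsRemaining[RowExclusiveLock] starts at lock->granted[...] = 1,
 * our own holdMask bit subtracts it back to 0, and with
 * totalConflictsRemaining == 0 the request is granted.  The per-PROCLOCK
 * loop applies the same subtraction for locks held by other members of our
 * lock group, when needed.
 */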

/*
 * GrantLock -- update the lock and proclock data structures to show
 *      the lock request has been granted.
 *
 * NOTE: if proc was blocked, it also needs to be removed from the wait list
 * and have its waitLock/waitProcLock fields cleared.  That's not done here.
 *
 * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
 * table entry; but since we may be awaking some other process, we can't do
 * that here; it's done by GrantLockLocal, instead.
 */
void
GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
{
    lock->nGranted++;
    lock->granted[lockmode]++;
    lock->grantMask |= LOCKBIT_ON(lockmode);
    if (lock->granted[lockmode] == lock->requested[lockmode])
        lock->waitMask &= LOCKBIT_OFF(lockmode);
    proclock->holdMask |= LOCKBIT_ON(lockmode);
    LOCK_PRINT("GrantLock", lock, lockmode);
    Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
    Assert(lock->nGranted <= lock->nRequested);
}

/*
 * UnGrantLock -- opposite of GrantLock.
 *
 * Updates the lock and proclock data structures to show that the lock
 * is no longer held nor requested by the current holder.
 *
 * Returns true if there were any waiters waiting on the lock that
 * should now be woken up with ProcLockWakeup.
 */
static bool
UnGrantLock(LOCK *lock, LOCKMODE lockmode,
            PROCLOCK *proclock, LockMethod lockMethodTable)
{
    bool        wakeupNeeded = false;

    Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
    Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
    Assert(lock->nGranted <= lock->nRequested);

    /*
     * fix the general lock stats
     */
    lock->nRequested--;
    lock->requested[lockmode]--;
    lock->nGranted--;
    lock->granted[lockmode]--;

    if (lock->granted[lockmode] == 0)
    {
        /* change the conflict mask.  No more of this lock type. */
        lock->grantMask &= LOCKBIT_OFF(lockmode);
    }

    LOCK_PRINT("UnGrantLock: updated", lock, lockmode);

    /*
     * We need only run ProcLockWakeup if the released lock conflicts with at
     * least one of the lock types requested by waiter(s).  Otherwise whatever
     * conflict made them wait must still exist.  NOTE: before MVCC, we could
     * skip wakeup if lock->granted[lockmode] was still positive.  But that's
     * not true anymore, because the remaining granted locks might belong to
     * some waiter, who could now be awakened because he doesn't conflict with
     * his own locks.
     */
    if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
        wakeupNeeded = true;

    /*
     * Now fix the per-proclock state.
     */
    proclock->holdMask &= LOCKBIT_OFF(lockmode);
    PROCLOCK_PRINT("UnGrantLock: updated", proclock);

    return wakeupNeeded;
}

/*
 * CleanUpLock -- clean up after releasing a lock.  We garbage-collect the
 * proclock and lock objects if possible, and call ProcLockWakeup if there
 * are remaining requests and the caller says it's OK.  (Normally, this
 * should be called after UnGrantLock, and wakeupNeeded is the result from
 * UnGrantLock.)
 *
 * The appropriate partition lock must be held at entry, and will be
 * held at exit.
 */
static void
CleanUpLock(LOCK *lock, PROCLOCK *proclock,
            LockMethod lockMethodTable, uint32 hashcode,
            bool wakeupNeeded)
{
    /*
     * If this was my last hold on this lock, delete my entry in the proclock
     * table.
     */
    if (proclock->holdMask == 0)
    {
        uint32      proclock_hashcode;

        PROCLOCK_PRINT("CleanUpLock: deleting", proclock);
        SHMQueueDelete(&proclock->lockLink);
        SHMQueueDelete(&proclock->procLink);
        proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
        if (!hash_search_with_hash_value(LockMethodProcLockHash,
                                         (void *) &(proclock->tag),
                                         proclock_hashcode,
                                         HASH_REMOVE,
                                         NULL))
            elog(PANIC, "proclock table corrupted");
    }

    if (lock->nRequested == 0)
    {
        /*
         * The caller just released the last lock, so garbage-collect the lock
         * object.
         */
        LOCK_PRINT("CleanUpLock: deleting", lock, 0);
        Assert(SHMQueueEmpty(&(lock->procLocks)));
        if (!hash_search_with_hash_value(LockMethodLockHash,
                                         (void *) &(lock->tag),
                                         hashcode,
                                         HASH_REMOVE,
                                         NULL))
            elog(PANIC, "lock table corrupted");
    }
    else if (wakeupNeeded)
    {
        /* There are waiters on this lock, so wake them up. */
        ProcLockWakeup(lockMethodTable, lock);
    }
}

/*
 * GrantLockLocal -- update the locallock data structures to show
 *      the lock request has been granted.
 *
 * We expect that LockAcquire made sure there is room to add a new
 * ResourceOwner entry.
 */
static void
GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
{
    LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
    int         i;

    Assert(locallock->numLockOwners < locallock->maxLockOwners);
    /* Count the total */
    locallock->nLocks++;
    /* Count the per-owner lock */
    for (i = 0; i < locallock->numLockOwners; i++)
    {
        if (lockOwners[i].owner == owner)
        {
            lockOwners[i].nLocks++;
            return;
        }
    }
    lockOwners[i].owner = owner;
    lockOwners[i].nLocks = 1;
    locallock->numLockOwners++;
    if (owner != NULL)
        ResourceOwnerRememberLock(owner, locallock);
}

/*
 * BeginStrongLockAcquire - inhibit use of fastpath for a given LOCALLOCK,
 * and arrange for error cleanup if it fails
 */
static void
BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode)
{
    Assert(StrongLockInProgress == NULL);
    Assert(locallock->holdsStrongLockCount == FALSE);

    /*
     * Adding to a memory location is not atomic, so we take a spinlock to
     * ensure we don't collide with someone else trying to bump the count at
     * the same time.
     *
     * XXX: It might be worth considering using an atomic fetch-and-add
     * instruction here, on architectures where that is supported.
     */

    SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
    FastPathStrongRelationLocks->count[fasthashcode]++;
    locallock->holdsStrongLockCount = TRUE;
    StrongLockInProgress = locallock;
    SpinLockRelease(&FastPathStrongRelationLocks->mutex);
}
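
/*
 * As the XXX note above suggests, the counter bump could plausibly become a
 * lock-free atomic.  A hedged sketch, assuming the count array were
 * redeclared as pg_atomic_uint32 from port/atomics.h (it is not, here):
 *
 *     pg_atomic_fetch_add_u32(&FastPathStrongRelationLocks->count[fasthashcode], 1);
 *
 * Note that the spinlock above also covers the holdsStrongLockCount and
 * StrongLockInProgress bookkeeping, so a conversion would have to re-examine
 * the cleanup ordering in AbortStrongLockAcquire().
 */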
1626
1627 /*
1628 * FinishStrongLockAcquire - cancel pending cleanup for a strong lock
1629 * acquisition once it's no longer needed
1630 */
1631 static void
FinishStrongLockAcquire(void)1632 FinishStrongLockAcquire(void)
1633 {
1634 StrongLockInProgress = NULL;
1635 }
1636
1637 /*
1638 * AbortStrongLockAcquire - undo strong lock state changes performed by
1639 * BeginStrongLockAcquire.
1640 */
1641 void
AbortStrongLockAcquire(void)1642 AbortStrongLockAcquire(void)
1643 {
1644 uint32 fasthashcode;
1645 LOCALLOCK *locallock = StrongLockInProgress;
1646
1647 if (locallock == NULL)
1648 return;
1649
1650 fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);
1651 Assert(locallock->holdsStrongLockCount == TRUE);
1652 SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
1653 Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
1654 FastPathStrongRelationLocks->count[fasthashcode]--;
1655 locallock->holdsStrongLockCount = FALSE;
1656 StrongLockInProgress = NULL;
1657 SpinLockRelease(&FastPathStrongRelationLocks->mutex);
1658 }
1659
1660 /*
1661 * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
1662 * WaitOnLock on.
1663 *
1664 * proc.c needs this for the case where we are booted off the lock by
1665 * timeout, but discover that someone granted us the lock anyway.
1666 *
1667 * We could just export GrantLockLocal, but that would require including
1668 * resowner.h in lock.h, which creates circularity.
1669 */
1670 void
1671 GrantAwaitedLock(void)
1672 {
1673 GrantLockLocal(awaitedLock, awaitedOwner);
1674 }
1675
1676 /*
1677 * MarkLockClear -- mark an acquired lock as "clear"
1678 *
1679 * This means that we know we have absorbed all sinval messages that other
1680 * sessions generated before we acquired this lock, and so we can confidently
1681 * assume we know about any catalog changes protected by this lock.
1682 */
1683 void
1684 MarkLockClear(LOCALLOCK *locallock)
1685 {
1686 Assert(locallock->nLocks > 0);
1687 locallock->lockCleared = true;
1688 }
1689
1690 /*
1691 * WaitOnLock -- wait to acquire a lock
1692 *
1693 * Caller must have set MyProc->heldLocks to reflect locks already held
1694 * on the lockable object by this process.
1695 *
1696 * The appropriate partition lock must be held at entry.
1697 */
1698 static void
1699 WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
1700 {
1701 LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
1702 LockMethod lockMethodTable = LockMethods[lockmethodid];
1703 char *volatile new_status = NULL;
1704
1705 LOCK_PRINT("WaitOnLock: sleeping on lock",
1706 locallock->lock, locallock->tag.mode);
1707
1708 /* Report change to waiting status */
1709 if (update_process_title)
1710 {
1711 const char *old_status;
1712 int len;
1713
1714 old_status = get_ps_display(&len);
1715 new_status = (char *) palloc(len + 8 + 1);
1716 memcpy(new_status, old_status, len);
1717 strcpy(new_status + len, " waiting");
1718 set_ps_display(new_status, false);
1719 new_status[len] = '\0'; /* truncate off " waiting" */
1720 }
1721 pgstat_report_wait_start(WAIT_LOCK, locallock->tag.lock.locktag_type);
1722
1723 awaitedLock = locallock;
1724 awaitedOwner = owner;
1725
1726 /*
1727 * NOTE: Think not to put any shared-state cleanup after the call to
1728 * ProcSleep, in either the normal or failure path. The lock state must
1729 * be fully set by the lock grantor, or by CheckDeadLock if we give up
1730 * waiting for the lock. This is necessary because of the possibility
1731 * that a cancel/die interrupt will interrupt ProcSleep after someone else
1732 * grants us the lock, but before we've noticed it. Hence, after granting,
1733 * the locktable state must fully reflect the fact that we own the lock;
1734 * we can't do additional work on return.
1735 *
1736 * We can and do use a PG_TRY block to try to clean up after failure, but
1737 * this still has a major limitation: elog(FATAL) can occur while waiting
1738 * (eg, a "die" interrupt), and then control won't come back here. So all
1739 * cleanup of essential state should happen in LockErrorCleanup, not here.
1740 * We can use PG_TRY to clear the "waiting" status flags, since doing that
1741 * is unimportant if the process exits.
1742 */
1743 PG_TRY();
1744 {
1745 if (ProcSleep(locallock, lockMethodTable) != STATUS_OK)
1746 {
1747 /*
1748 * We failed as a result of a deadlock, see CheckDeadLock(). Quit
1749 * now.
1750 */
1751 awaitedLock = NULL;
1752 LOCK_PRINT("WaitOnLock: aborting on lock",
1753 locallock->lock, locallock->tag.mode);
1754 LWLockRelease(LockHashPartitionLock(locallock->hashcode));
1755
1756 /*
1757 * Now that we aren't holding the partition lock, we can give an
1758 * error report including details about the detected deadlock.
1759 */
1760 DeadLockReport();
1761 /* not reached */
1762 }
1763 }
1764 PG_CATCH();
1765 {
1766 /* In this path, awaitedLock remains set until LockErrorCleanup */
1767
1768 /* Report change to non-waiting status */
1769 pgstat_report_wait_end();
1770 if (update_process_title)
1771 {
1772 set_ps_display(new_status, false);
1773 pfree(new_status);
1774 }
1775
1776 /* and propagate the error */
1777 PG_RE_THROW();
1778 }
1779 PG_END_TRY();
1780
1781 awaitedLock = NULL;
1782
1783 /* Report change to non-waiting status */
1784 pgstat_report_wait_end();
1785 if (update_process_title)
1786 {
1787 set_ps_display(new_status, false);
1788 pfree(new_status);
1789 }
1790
1791 LOCK_PRINT("WaitOnLock: wakeup on lock",
1792 locallock->lock, locallock->tag.mode);
1793 }
1794
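/*
 * Editor's note: the net effect of the ps_status updates in WaitOnLock,
 * assuming a typical process title (the exact format varies by platform
 * and settings):
 *
 *     postgres: alice mydb [local] SELECT          -- before sleeping
 *     postgres: alice mydb [local] SELECT waiting  -- while blocked
 *
 * The saved copy is truncated and re-displayed on wakeup or error.
 */
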
1795 /*
1796 * Remove a proc from the wait-queue it is on (caller must know it is on one).
1797 * This is only used when the proc has failed to get the lock, so we set its
1798 * waitStatus to STATUS_ERROR.
1799 *
1800 * Appropriate partition lock must be held by caller. Also, caller is
1801 * responsible for signaling the proc if needed.
1802 *
1803 * NB: this does not clean up any locallock object that may exist for the lock.
1804 */
1805 void
1806 RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
1807 {
1808 LOCK *waitLock = proc->waitLock;
1809 PROCLOCK *proclock = proc->waitProcLock;
1810 LOCKMODE lockmode = proc->waitLockMode;
1811 LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);
1812
1813 /* Make sure proc is waiting */
1814 Assert(proc->waitStatus == STATUS_WAITING);
1815 Assert(proc->links.next != NULL);
1816 Assert(waitLock);
1817 Assert(waitLock->waitProcs.size > 0);
1818 Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
1819
1820 /* Remove proc from lock's wait queue */
1821 SHMQueueDelete(&(proc->links));
1822 waitLock->waitProcs.size--;
1823
1824 /* Undo increments of request counts by waiting process */
1825 Assert(waitLock->nRequested > 0);
1826 Assert(waitLock->nRequested > proc->waitLock->nGranted);
1827 waitLock->nRequested--;
1828 Assert(waitLock->requested[lockmode] > 0);
1829 waitLock->requested[lockmode]--;
1830 /* don't forget to clear waitMask bit if appropriate */
1831 if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
1832 waitLock->waitMask &= LOCKBIT_OFF(lockmode);
1833
1834 /* Clean up the proc's own state, and pass it the ok/fail signal */
1835 proc->waitLock = NULL;
1836 proc->waitProcLock = NULL;
1837 proc->waitStatus = STATUS_ERROR;
1838
1839 /*
1840 * Delete the proclock immediately if it represents no already-held locks.
1841 * (This must happen now because if the owner of the lock decides to
1842 * release it, and the requested/granted counts then go to zero,
1843 * LockRelease expects there to be no remaining proclocks.) Then see if
1844 * any other waiters for the lock can be woken up now.
1845 */
1846 CleanUpLock(waitLock, proclock,
1847 LockMethods[lockmethodid], hashcode,
1848 true);
1849 }
1850
1851 /*
1852 * LockRelease -- look up 'locktag' and release one 'lockmode' lock on it.
1853 * Release a session lock if 'sessionLock' is true, else release a
1854 * regular transaction lock.
1855 *
1856 * Side Effects: find any waiting processes that are now wakable,
1857 * grant them their requested locks and awaken them.
1858 * (We have to grant the lock here to avoid a race between
1859 * the waking process and any new process to
1860 * come along and request the lock.)
1861 */
1862 bool
1863 LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
1864 {
1865 LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
1866 LockMethod lockMethodTable;
1867 LOCALLOCKTAG localtag;
1868 LOCALLOCK *locallock;
1869 LOCK *lock;
1870 PROCLOCK *proclock;
1871 LWLock *partitionLock;
1872 bool wakeupNeeded;
1873
1874 if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
1875 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
1876 lockMethodTable = LockMethods[lockmethodid];
1877 if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
1878 elog(ERROR, "unrecognized lock mode: %d", lockmode);
1879
1880 #ifdef LOCK_DEBUG
1881 if (LOCK_DEBUG_ENABLED(locktag))
1882 elog(LOG, "LockRelease: lock [%u,%u] %s",
1883 locktag->locktag_field1, locktag->locktag_field2,
1884 lockMethodTable->lockModeNames[lockmode]);
1885 #endif
1886
1887 /*
1888 * Find the LOCALLOCK entry for this lock and lockmode
1889 */
1890 MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
1891 localtag.lock = *locktag;
1892 localtag.mode = lockmode;
1893
1894 locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
1895 (void *) &localtag,
1896 HASH_FIND, NULL);
1897
1898 /*
1899 * Complain with only a WARNING, not ereport(ERROR), so that the caller can print its own error message, too.
1900 */
1901 if (!locallock || locallock->nLocks <= 0)
1902 {
1903 elog(WARNING, "you don't own a lock of type %s",
1904 lockMethodTable->lockModeNames[lockmode]);
1905 return FALSE;
1906 }
1907
1908 /*
1909 * Decrease the count for the resource owner.
1910 */
1911 {
1912 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1913 ResourceOwner owner;
1914 int i;
1915
1916 /* Identify owner for lock */
1917 if (sessionLock)
1918 owner = NULL;
1919 else
1920 owner = CurrentResourceOwner;
1921
1922 for (i = locallock->numLockOwners - 1; i >= 0; i--)
1923 {
1924 if (lockOwners[i].owner == owner)
1925 {
1926 Assert(lockOwners[i].nLocks > 0);
1927 if (--lockOwners[i].nLocks == 0)
1928 {
1929 if (owner != NULL)
1930 ResourceOwnerForgetLock(owner, locallock);
1931 /* compact out unused slot */
1932 locallock->numLockOwners--;
1933 if (i < locallock->numLockOwners)
1934 lockOwners[i] = lockOwners[locallock->numLockOwners];
1935 }
1936 break;
1937 }
1938 }
1939 if (i < 0)
1940 {
1941 /* don't release a lock belonging to another owner */
1942 elog(WARNING, "you don't own a lock of type %s",
1943 lockMethodTable->lockModeNames[lockmode]);
1944 return FALSE;
1945 }
1946 }
1947
1948 /*
1949 * Decrease the total local count. If we're still holding the lock, we're
1950 * done.
1951 */
1952 locallock->nLocks--;
1953
1954 if (locallock->nLocks > 0)
1955 return TRUE;
1956
1957 /*
1958 * At this point we can no longer suppose we are clear of invalidation
1959 * messages related to this lock. Although we'll delete the LOCALLOCK
1960 * object before any intentional return from this routine, it seems worth
1961 * the trouble to explicitly reset lockCleared right now, just in case
1962 * some error prevents us from deleting the LOCALLOCK.
1963 */
1964 locallock->lockCleared = false;
1965
1966 /* Attempt fast release of any lock eligible for the fast path. */
1967 if (EligibleForRelationFastPath(locktag, lockmode) &&
1968 FastPathLocalUseCount > 0)
1969 {
1970 bool released;
1971
1972 /*
1973 * We might not find the lock here, even if we originally entered it
1974 * here. Another backend may have moved it to the main table.
1975 */
1976 LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
1977 released = FastPathUnGrantRelationLock(locktag->locktag_field2,
1978 lockmode);
1979 LWLockRelease(&MyProc->backendLock);
1980 if (released)
1981 {
1982 RemoveLocalLock(locallock);
1983 return TRUE;
1984 }
1985 }
1986
1987 /*
1988 * Otherwise we've got to mess with the shared lock table.
1989 */
1990 partitionLock = LockHashPartitionLock(locallock->hashcode);
1991
1992 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
1993
1994 /*
1995 * Normally, we don't need to re-find the lock or proclock, since we kept
1996 * their addresses in the locallock table, and they couldn't have been
1997 * removed while we were holding a lock on them. But it's possible that
1998 * the lock was taken fast-path and has since been moved to the main hash
1999 * table by another backend, in which case we will need to look up the
2000 * objects here. We assume the lock field is NULL if so.
2001 */
2002 lock = locallock->lock;
2003 if (!lock)
2004 {
2005 PROCLOCKTAG proclocktag;
2006
2007 Assert(EligibleForRelationFastPath(locktag, lockmode));
2008 lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
2009 (const void *) locktag,
2010 locallock->hashcode,
2011 HASH_FIND,
2012 NULL);
2013 if (!lock)
2014 elog(ERROR, "failed to re-find shared lock object");
2015 locallock->lock = lock;
2016
2017 proclocktag.myLock = lock;
2018 proclocktag.myProc = MyProc;
2019 locallock->proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
2020 (void *) &proclocktag,
2021 HASH_FIND,
2022 NULL);
2023 if (!locallock->proclock)
2024 elog(ERROR, "failed to re-find shared proclock object");
2025 }
2026 LOCK_PRINT("LockRelease: found", lock, lockmode);
2027 proclock = locallock->proclock;
2028 PROCLOCK_PRINT("LockRelease: found", proclock);
2029
2030 /*
2031 * Double-check that we are actually holding a lock of the type we want to
2032 * release.
2033 */
2034 if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
2035 {
2036 PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
2037 LWLockRelease(partitionLock);
2038 elog(WARNING, "you don't own a lock of type %s",
2039 lockMethodTable->lockModeNames[lockmode]);
2040 RemoveLocalLock(locallock);
2041 return FALSE;
2042 }
2043
2044 /*
2045 * Do the releasing. CleanUpLock will waken any now-wakable waiters.
2046 */
2047 wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
2048
2049 CleanUpLock(lock, proclock,
2050 lockMethodTable, locallock->hashcode,
2051 wakeupNeeded);
2052
2053 LWLockRelease(partitionLock);
2054
2055 RemoveLocalLock(locallock);
2056 return TRUE;
2057 }
2058
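/*
 * Editor's note: a sketch of the typical caller-side pairing; lmgr.c's
 * UnlockRelationOid does the equivalent.  "relid" is an assumed variable
 * used only for illustration.
 *
 *     LOCKTAG     tag;
 *
 *     SET_LOCKTAG_RELATION(tag, MyDatabaseId, relid);
 *     if (!LockRelease(&tag, AccessShareLock, false))
 *         elog(WARNING, "lock on relation %u was not held", relid);
 */
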
2059 /*
2060 * LockReleaseAll -- Release all locks of the specified lock method that
2061 * are held by the current process.
2062 *
2063 * Well, not necessarily *all* locks. The available behaviors are:
2064 * allLocks == true: release all locks including session locks.
2065 * allLocks == false: release all non-session locks.
2066 */
2067 void
2068 LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
2069 {
2070 HASH_SEQ_STATUS status;
2071 LockMethod lockMethodTable;
2072 int i,
2073 numLockModes;
2074 LOCALLOCK *locallock;
2075 LOCK *lock;
2076 PROCLOCK *proclock;
2077 int partition;
2078 bool have_fast_path_lwlock = false;
2079
2080 if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
2081 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
2082 lockMethodTable = LockMethods[lockmethodid];
2083
2084 #ifdef LOCK_DEBUG
2085 if (*(lockMethodTable->trace_flag))
2086 elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
2087 #endif
2088
2089 /*
2090 * Get rid of our fast-path VXID lock, if appropriate. Note that this is
2091 * the only way that the lock we hold on our own VXID can ever get
2092 * released: it is always and only released when a toplevel transaction
2093 * ends.
2094 */
2095 if (lockmethodid == DEFAULT_LOCKMETHOD)
2096 VirtualXactLockTableCleanup();
2097
2098 numLockModes = lockMethodTable->numLockModes;
2099
2100 /*
2101 * First we run through the locallock table and get rid of unwanted
2102 * entries, then we scan the process's proclocks and get rid of those. We
2103 * do this separately because we may have multiple locallock entries
2104 * pointing to the same proclock, and we daren't end up with any dangling
2105 * pointers. Fast-path locks are cleaned up during the locallock table
2106 * scan, though.
2107 */
2108 hash_seq_init(&status, LockMethodLocalHash);
2109
2110 while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2111 {
2112 /*
2113 * If the LOCALLOCK entry is unused, we must've run out of shared
2114 * memory while trying to set up this lock. Just forget the local
2115 * entry.
2116 */
2117 if (locallock->nLocks == 0)
2118 {
2119 RemoveLocalLock(locallock);
2120 continue;
2121 }
2122
2123 /* Ignore items that are not of the lockmethod to be removed */
2124 if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
2125 continue;
2126
2127 /*
2128 * If we are asked to release all locks, we can just zap the entry.
2129 * Otherwise, must scan to see if there are session locks. We assume
2130 * there is at most one lockOwners entry for session locks.
2131 */
2132 if (!allLocks)
2133 {
2134 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
2135
2136 /* If session lock is above array position 0, move it down to 0 */
2137 for (i = 0; i < locallock->numLockOwners; i++)
2138 {
2139 if (lockOwners[i].owner == NULL)
2140 lockOwners[0] = lockOwners[i];
2141 else
2142 ResourceOwnerForgetLock(lockOwners[i].owner, locallock);
2143 }
2144
2145 if (locallock->numLockOwners > 0 &&
2146 lockOwners[0].owner == NULL &&
2147 lockOwners[0].nLocks > 0)
2148 {
2149 /* Fix the locallock to show just the session locks */
2150 locallock->nLocks = lockOwners[0].nLocks;
2151 locallock->numLockOwners = 1;
2152 /* We aren't deleting this locallock, so done */
2153 continue;
2154 }
2155 else
2156 locallock->numLockOwners = 0;
2157 }
2158
2159 /*
2160 * If the lock or proclock pointers are NULL, this lock was taken via
2161 * the relation fast-path (and is not known to have been transferred).
2162 */
2163 if (locallock->proclock == NULL || locallock->lock == NULL)
2164 {
2165 LOCKMODE lockmode = locallock->tag.mode;
2166 Oid relid;
2167
2168 /* Verify that a fast-path lock is what we've got. */
2169 if (!EligibleForRelationFastPath(&locallock->tag.lock, lockmode))
2170 elog(PANIC, "locallock table corrupted");
2171
2172 /*
2173 * If we don't currently hold the LWLock that protects our
2174 * fast-path data structures, we must acquire it before attempting
2175 * to release the lock via the fast-path. We will continue to
2176 * hold the LWLock until we're done scanning the locallock table,
2177 * unless we hit a transferred fast-path lock. (XXX is this
2178 * really such a good idea? There could be a lot of entries ...)
2179 */
2180 if (!have_fast_path_lwlock)
2181 {
2182 LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
2183 have_fast_path_lwlock = true;
2184 }
2185
2186 /* Attempt fast-path release. */
2187 relid = locallock->tag.lock.locktag_field2;
2188 if (FastPathUnGrantRelationLock(relid, lockmode))
2189 {
2190 RemoveLocalLock(locallock);
2191 continue;
2192 }
2193
2194 /*
2195 * Our lock, originally taken via the fast path, has been
2196 * transferred to the main lock table. That's going to require
2197 * some extra work, so release our fast-path lock before starting.
2198 */
2199 LWLockRelease(&MyProc->backendLock);
2200 have_fast_path_lwlock = false;
2201
2202 /*
2203 * Now dump the lock. We haven't got a pointer to the LOCK or
2204 * PROCLOCK in this case, so we have to handle this a bit
2205 * differently than a normal lock release. Unfortunately, this
2206 * requires an extra LWLock acquire-and-release cycle on the
2207 * partitionLock, but hopefully it shouldn't happen often.
2208 */
2209 LockRefindAndRelease(lockMethodTable, MyProc,
2210 &locallock->tag.lock, lockmode, false);
2211 RemoveLocalLock(locallock);
2212 continue;
2213 }
2214
2215 /* Mark the proclock to show we need to release this lockmode */
2216 if (locallock->nLocks > 0)
2217 locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
2218
2219 /* And remove the locallock hashtable entry */
2220 RemoveLocalLock(locallock);
2221 }
2222
2223 /* Done with the fast-path data structures */
2224 if (have_fast_path_lwlock)
2225 LWLockRelease(&MyProc->backendLock);
2226
2227 /*
2228 * Now, scan each lock partition separately.
2229 */
2230 for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
2231 {
2232 LWLock *partitionLock;
2233 SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]);
2234 PROCLOCK *nextplock;
2235
2236 partitionLock = LockHashPartitionLockByIndex(partition);
2237
2238 /*
2239 * If the proclock list for this partition is empty, we can skip
2240 * acquiring the partition lock. This optimization is trickier than
2241 * it looks, because another backend could be in process of adding
2242 * something to our proclock list due to promoting one of our
2243 * fast-path locks. However, any such lock must be one that we
2244 * decided not to delete above, so it's okay to skip it again now;
2245 * we'd just decide not to delete it again. We must, however, be
2246 * careful to re-fetch the list header once we've acquired the
2247 * partition lock, to be sure we have a valid, up-to-date pointer.
2248 * (There is probably no significant risk if pointer fetch/store is
2249 * atomic, but we don't wish to assume that.)
2250 *
2251 * XXX This argument assumes that the locallock table correctly
2252 * represents all of our fast-path locks. While allLocks mode
2253 * guarantees to clean up all of our normal locks regardless of the
2254 * locallock situation, we lose that guarantee for fast-path locks.
2255 * This is not ideal.
2256 */
2257 if (SHMQueueNext(procLocks, procLocks,
2258 offsetof(PROCLOCK, procLink)) == NULL)
2259 continue; /* needn't examine this partition */
2260
2261 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2262
2263 for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
2264 offsetof(PROCLOCK, procLink));
2265 proclock;
2266 proclock = nextplock)
2267 {
2268 bool wakeupNeeded = false;
2269
2270 /* Get link first, since we may unlink/delete this proclock */
2271 nextplock = (PROCLOCK *)
2272 SHMQueueNext(procLocks, &proclock->procLink,
2273 offsetof(PROCLOCK, procLink));
2274
2275 Assert(proclock->tag.myProc == MyProc);
2276
2277 lock = proclock->tag.myLock;
2278
2279 /* Ignore items that are not of the lockmethod to be removed */
2280 if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
2281 continue;
2282
2283 /*
2284 * In allLocks mode, force release of all locks even if locallock
2285 * table had problems
2286 */
2287 if (allLocks)
2288 proclock->releaseMask = proclock->holdMask;
2289 else
2290 Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
2291
2292 /*
2293 * Ignore items that have nothing to be released, unless they have
2294 * holdMask == 0 and are therefore recyclable
2295 */
2296 if (proclock->releaseMask == 0 && proclock->holdMask != 0)
2297 continue;
2298
2299 PROCLOCK_PRINT("LockReleaseAll", proclock);
2300 LOCK_PRINT("LockReleaseAll", lock, 0);
2301 Assert(lock->nRequested >= 0);
2302 Assert(lock->nGranted >= 0);
2303 Assert(lock->nGranted <= lock->nRequested);
2304 Assert((proclock->holdMask & ~lock->grantMask) == 0);
2305
2306 /*
2307 * Release the previously-marked lock modes
2308 */
2309 for (i = 1; i <= numLockModes; i++)
2310 {
2311 if (proclock->releaseMask & LOCKBIT_ON(i))
2312 wakeupNeeded |= UnGrantLock(lock, i, proclock,
2313 lockMethodTable);
2314 }
2315 Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
2316 Assert(lock->nGranted <= lock->nRequested);
2317 LOCK_PRINT("LockReleaseAll: updated", lock, 0);
2318
2319 proclock->releaseMask = 0;
2320
2321 /* CleanUpLock will wake up waiters if needed. */
2322 CleanUpLock(lock, proclock,
2323 lockMethodTable,
2324 LockTagHashCode(&lock->tag),
2325 wakeupNeeded);
2326 } /* loop over PROCLOCKs within this partition */
2327
2328 LWLockRelease(partitionLock);
2329 } /* loop over partitions */
2330
2331 #ifdef LOCK_DEBUG
2332 if (*(lockMethodTable->trace_flag))
2333 elog(LOG, "LockReleaseAll done");
2334 #endif
2335 }
2336
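/*
 * Editor's note: the two behaviors map onto the existing call sites,
 * paraphrased:
 *
 *     LockReleaseAll(DEFAULT_LOCKMETHOD, false);  -- end of xact: session locks survive
 *     LockReleaseAll(USER_LOCKMETHOD, true);      -- backend shutdown: advisory locks too
 */
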
2337 /*
2338 * LockReleaseSession -- Release all session locks of the specified lock method
2339 * that are held by the current process.
2340 */
2341 void
2342 LockReleaseSession(LOCKMETHODID lockmethodid)
2343 {
2344 HASH_SEQ_STATUS status;
2345 LOCALLOCK *locallock;
2346
2347 if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
2348 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
2349
2350 hash_seq_init(&status, LockMethodLocalHash);
2351
2352 while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2353 {
2354 /* Ignore items that are not of the specified lock method */
2355 if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
2356 continue;
2357
2358 ReleaseLockIfHeld(locallock, true);
2359 }
2360 }
2361
2362 /*
2363 * LockReleaseCurrentOwner
2364 * Release all locks belonging to CurrentResourceOwner
2365 *
2366 * If the caller knows what those locks are, it can pass them as an array.
2367 * That speeds up the call significantly, when a lot of locks are held.
2368 * Otherwise, pass NULL for locallocks, and we'll traverse through our hash
2369 * table to find them.
2370 */
2371 void
2372 LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks)
2373 {
2374 if (locallocks == NULL)
2375 {
2376 HASH_SEQ_STATUS status;
2377 LOCALLOCK *locallock;
2378
2379 hash_seq_init(&status, LockMethodLocalHash);
2380
2381 while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2382 ReleaseLockIfHeld(locallock, false);
2383 }
2384 else
2385 {
2386 int i;
2387
2388 for (i = nlocks - 1; i >= 0; i--)
2389 ReleaseLockIfHeld(locallocks[i], false);
2390 }
2391 }
2392
2393 /*
2394 * ReleaseLockIfHeld
2395 * Release any session-level locks on this lockable object if sessionLock
2396 * is true; else, release any locks held by CurrentResourceOwner.
2397 *
2398 * It is tempting to pass this a ResourceOwner pointer (or NULL for session
2399 * locks), but without refactoring LockRelease() we cannot support releasing
2400 * locks belonging to resource owners other than CurrentResourceOwner.
2401 * If we were to refactor, it'd be a good idea to fix it so we don't have to
2402 * do a hashtable lookup of the locallock, too. However, currently this
2403 * function isn't used heavily enough to justify refactoring for its
2404 * convenience.
2405 */
2406 static void
2407 ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
2408 {
2409 ResourceOwner owner;
2410 LOCALLOCKOWNER *lockOwners;
2411 int i;
2412
2413 /* Identify owner for lock (must match LockRelease!) */
2414 if (sessionLock)
2415 owner = NULL;
2416 else
2417 owner = CurrentResourceOwner;
2418
2419 /* Scan to see if there are any locks belonging to the target owner */
2420 lockOwners = locallock->lockOwners;
2421 for (i = locallock->numLockOwners - 1; i >= 0; i--)
2422 {
2423 if (lockOwners[i].owner == owner)
2424 {
2425 Assert(lockOwners[i].nLocks > 0);
2426 if (lockOwners[i].nLocks < locallock->nLocks)
2427 {
2428 /*
2429 * We will still hold this lock after forgetting this
2430 * ResourceOwner.
2431 */
2432 locallock->nLocks -= lockOwners[i].nLocks;
2433 /* compact out unused slot */
2434 locallock->numLockOwners--;
2435 if (owner != NULL)
2436 ResourceOwnerForgetLock(owner, locallock);
2437 if (i < locallock->numLockOwners)
2438 lockOwners[i] = lockOwners[locallock->numLockOwners];
2439 }
2440 else
2441 {
2442 Assert(lockOwners[i].nLocks == locallock->nLocks);
2443 /* We want to call LockRelease just once */
2444 lockOwners[i].nLocks = 1;
2445 locallock->nLocks = 1;
2446 if (!LockRelease(&locallock->tag.lock,
2447 locallock->tag.mode,
2448 sessionLock))
2449 elog(WARNING, "ReleaseLockIfHeld: failed??");
2450 }
2451 break;
2452 }
2453 }
2454 }
2455
2456 /*
2457 * LockReassignCurrentOwner
2458 * Reassign all locks belonging to CurrentResourceOwner to belong
2459 * to its parent resource owner.
2460 *
2461 * If the caller knows what those locks are, it can pass them as an array.
2462 * That speeds up the call significantly, when a lot of locks are held
2463 * (e.g. pg_dump with a large schema). Otherwise, pass NULL for locallocks,
2464 * and we'll traverse through our hash table to find them.
2465 */
2466 void
2467 LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks)
2468 {
2469 ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
2470
2471 Assert(parent != NULL);
2472
2473 if (locallocks == NULL)
2474 {
2475 HASH_SEQ_STATUS status;
2476 LOCALLOCK *locallock;
2477
2478 hash_seq_init(&status, LockMethodLocalHash);
2479
2480 while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2481 LockReassignOwner(locallock, parent);
2482 }
2483 else
2484 {
2485 int i;
2486
2487 for (i = nlocks - 1; i >= 0; i--)
2488 LockReassignOwner(locallocks[i], parent);
2489 }
2490 }
2491
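/*
 * Editor's note: resowner.c drives the reassign-vs-release choice at
 * subtransaction end, conceptually (paraphrased; "locks"/"nlocks" come
 * from the owner's remembered-lock array, or are NULL/0 if it
 * overflowed):
 *
 *     if (isCommit)
 *         LockReassignCurrentOwner(locks, nlocks);  -- parent inherits
 *     else
 *         LockReleaseCurrentOwner(locks, nlocks);   -- locks are dropped
 */
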
2492 /*
2493 * Subroutine of LockReassignCurrentOwner. Reassigns a given lock belonging to
2494 * CurrentResourceOwner to its parent.
2495 */
2496 static void
2497 LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent)
2498 {
2499 LOCALLOCKOWNER *lockOwners;
2500 int i;
2501 int ic = -1;
2502 int ip = -1;
2503
2504 /*
2505 * Scan to see if there are any locks belonging to current owner or its
2506 * parent
2507 */
2508 lockOwners = locallock->lockOwners;
2509 for (i = locallock->numLockOwners - 1; i >= 0; i--)
2510 {
2511 if (lockOwners[i].owner == CurrentResourceOwner)
2512 ic = i;
2513 else if (lockOwners[i].owner == parent)
2514 ip = i;
2515 }
2516
2517 if (ic < 0)
2518 return; /* no current locks */
2519
2520 if (ip < 0)
2521 {
2522 /* Parent has no slot, so just give it the child's slot */
2523 lockOwners[ic].owner = parent;
2524 ResourceOwnerRememberLock(parent, locallock);
2525 }
2526 else
2527 {
2528 /* Merge child's count with parent's */
2529 lockOwners[ip].nLocks += lockOwners[ic].nLocks;
2530 /* compact out unused slot */
2531 locallock->numLockOwners--;
2532 if (ic < locallock->numLockOwners)
2533 lockOwners[ic] = lockOwners[locallock->numLockOwners];
2534 }
2535 ResourceOwnerForgetLock(CurrentResourceOwner, locallock);
2536 }
2537
2538 /*
2539 * FastPathGrantRelationLock
2540 * Grant lock using per-backend fast-path array, if there is space.
2541 */
2542 static bool
2543 FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode)
2544 {
2545 uint32 f;
2546 uint32 unused_slot = FP_LOCK_SLOTS_PER_BACKEND;
2547
2548 /* Scan for existing entry for this relid, remembering empty slot. */
2549 for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2550 {
2551 if (FAST_PATH_GET_BITS(MyProc, f) == 0)
2552 unused_slot = f;
2553 else if (MyProc->fpRelId[f] == relid)
2554 {
2555 Assert(!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode));
2556 FAST_PATH_SET_LOCKMODE(MyProc, f, lockmode);
2557 return true;
2558 }
2559 }
2560
2561 /* If no existing entry, use any empty slot. */
2562 if (unused_slot < FP_LOCK_SLOTS_PER_BACKEND)
2563 {
2564 MyProc->fpRelId[unused_slot] = relid;
2565 FAST_PATH_SET_LOCKMODE(MyProc, unused_slot, lockmode);
2566 ++FastPathLocalUseCount;
2567 return true;
2568 }
2569
2570 /* No existing entry, and no empty slot. */
2571 return false;
2572 }
2573
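/*
 * Editor's note: an illustration of the per-slot bit packing, assuming
 * the layout established by the FAST_PATH_* macros earlier in this file
 * (FAST_PATH_BITS_PER_SLOT mode bits per slot, starting at
 * AccessShareLock):
 *
 *     FAST_PATH_SET_LOCKMODE(MyProc, f, AccessShareLock);   -- sets bit 0
 *     FAST_PATH_SET_LOCKMODE(MyProc, f, RowExclusiveLock);  -- sets bit 2
 *     Assert(FAST_PATH_GET_BITS(MyProc, f) == 0x5);
 */
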
2574 /*
2575 * FastPathUnGrantRelationLock
2576 * Release fast-path lock, if present. Update backend-private local
2577 * use count, while we're at it.
2578 */
2579 static bool
2580 FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode)
2581 {
2582 uint32 f;
2583 bool result = false;
2584
2585 FastPathLocalUseCount = 0;
2586 for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2587 {
2588 if (MyProc->fpRelId[f] == relid
2589 && FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode))
2590 {
2591 Assert(!result);
2592 FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode);
2593 result = true;
2594 /* we continue iterating so as to update FastPathLocalUseCount */
2595 }
2596 if (FAST_PATH_GET_BITS(MyProc, f) != 0)
2597 ++FastPathLocalUseCount;
2598 }
2599 return result;
2600 }
2601
2602 /*
2603 * FastPathTransferRelationLocks
2604 * Transfer locks matching the given lock tag from per-backend fast-path
2605 * arrays to the shared hash table.
2606 *
2607 * Returns true if successful, false if ran out of shared memory.
2608 */
2609 static bool
2610 FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag,
2611 uint32 hashcode)
2612 {
2613 LWLock *partitionLock = LockHashPartitionLock(hashcode);
2614 Oid relid = locktag->locktag_field2;
2615 uint32 i;
2616
2617 /*
2618 * Every PGPROC that can potentially hold a fast-path lock is present in
2619 * ProcGlobal->allProcs. Prepared transactions are not, but any
2620 * outstanding fast-path locks held by prepared transactions are
2621 * transferred to the main lock table.
2622 */
2623 for (i = 0; i < ProcGlobal->allProcCount; i++)
2624 {
2625 PGPROC *proc = &ProcGlobal->allProcs[i];
2626 uint32 f;
2627
2628 LWLockAcquire(&proc->backendLock, LW_EXCLUSIVE);
2629
2630 /*
2631 * If the target backend isn't referencing the same database as the
2632 * lock, then we needn't examine the individual relation IDs at all;
2633 * none of them can be relevant.
2634 *
2635 * proc->databaseId is set at backend startup time and never changes
2636 * thereafter, so it might be safe to perform this test before
2637 * acquiring &proc->backendLock. In particular, it's certainly safe
2638 * to assume that if the target backend holds any fast-path locks, it
2639 * must have performed a memory-fencing operation (in particular, an
2640 * LWLock acquisition) since setting proc->databaseId. However, it's
2641 * less clear that our backend is certain to have performed a memory
2642 * fencing operation since the other backend set proc->databaseId. So
2643 * for now, we test it after acquiring the LWLock just to be safe.
2644 */
2645 if (proc->databaseId != locktag->locktag_field1)
2646 {
2647 LWLockRelease(&proc->backendLock);
2648 continue;
2649 }
2650
2651 for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2652 {
2653 uint32 lockmode;
2654
2655 /* Look for an allocated slot matching the given relid. */
2656 if (relid != proc->fpRelId[f] || FAST_PATH_GET_BITS(proc, f) == 0)
2657 continue;
2658
2659 /* Find or create lock object. */
2660 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2661 for (lockmode = FAST_PATH_LOCKNUMBER_OFFSET;
2662 lockmode < FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT;
2663 ++lockmode)
2664 {
2665 PROCLOCK *proclock;
2666
2667 if (!FAST_PATH_CHECK_LOCKMODE(proc, f, lockmode))
2668 continue;
2669 proclock = SetupLockInTable(lockMethodTable, proc, locktag,
2670 hashcode, lockmode);
2671 if (!proclock)
2672 {
2673 LWLockRelease(partitionLock);
2674 LWLockRelease(&proc->backendLock);
2675 return false;
2676 }
2677 GrantLock(proclock->tag.myLock, proclock, lockmode);
2678 FAST_PATH_CLEAR_LOCKMODE(proc, f, lockmode);
2679 }
2680 LWLockRelease(partitionLock);
2681
2682 /* No need to examine remaining slots. */
2683 break;
2684 }
2685 LWLockRelease(&proc->backendLock);
2686 }
2687 return true;
2688 }
2689
2690 /*
2691 * FastPathGetRelationLockEntry
2692 * Return the PROCLOCK for a lock originally taken via the fast-path,
2693 * transferring it to the primary lock table if necessary.
2694 *
2695 * Note: caller takes care of updating the locallock object.
2696 */
2697 static PROCLOCK *
2698 FastPathGetRelationLockEntry(LOCALLOCK *locallock)
2699 {
2700 LockMethod lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];
2701 LOCKTAG *locktag = &locallock->tag.lock;
2702 PROCLOCK *proclock = NULL;
2703 LWLock *partitionLock = LockHashPartitionLock(locallock->hashcode);
2704 Oid relid = locktag->locktag_field2;
2705 uint32 f;
2706
2707 LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
2708
2709 for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2710 {
2711 uint32 lockmode;
2712
2713 /* Look for an allocated slot matching the given relid. */
2714 if (relid != MyProc->fpRelId[f] || FAST_PATH_GET_BITS(MyProc, f) == 0)
2715 continue;
2716
2717 /* If we don't have a lock of the given mode, forget it! */
2718 lockmode = locallock->tag.mode;
2719 if (!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode))
2720 break;
2721
2722 /* Find or create lock object. */
2723 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2724
2725 proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
2726 locallock->hashcode, lockmode);
2727 if (!proclock)
2728 {
2729 LWLockRelease(partitionLock);
2730 LWLockRelease(&MyProc->backendLock);
2731 ereport(ERROR,
2732 (errcode(ERRCODE_OUT_OF_MEMORY),
2733 errmsg("out of shared memory"),
2734 errhint("You might need to increase max_locks_per_transaction.")));
2735 }
2736 GrantLock(proclock->tag.myLock, proclock, lockmode);
2737 FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode);
2738
2739 LWLockRelease(partitionLock);
2740
2741 /* No need to examine remaining slots. */
2742 break;
2743 }
2744
2745 LWLockRelease(&MyProc->backendLock);
2746
2747 /* Lock may have already been transferred by some other backend. */
2748 if (proclock == NULL)
2749 {
2750 LOCK *lock;
2751 PROCLOCKTAG proclocktag;
2752 uint32 proclock_hashcode;
2753
2754 LWLockAcquire(partitionLock, LW_SHARED);
2755
2756 lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
2757 (void *) locktag,
2758 locallock->hashcode,
2759 HASH_FIND,
2760 NULL);
2761 if (!lock)
2762 elog(ERROR, "failed to re-find shared lock object");
2763
2764 proclocktag.myLock = lock;
2765 proclocktag.myProc = MyProc;
2766
2767 proclock_hashcode = ProcLockHashCode(&proclocktag, locallock->hashcode);
2768 proclock = (PROCLOCK *)
2769 hash_search_with_hash_value(LockMethodProcLockHash,
2770 (void *) &proclocktag,
2771 proclock_hashcode,
2772 HASH_FIND,
2773 NULL);
2774 if (!proclock)
2775 elog(ERROR, "failed to re-find shared proclock object");
2776 LWLockRelease(partitionLock);
2777 }
2778
2779 return proclock;
2780 }
2781
2782 /*
2783 * GetLockConflicts
2784 * Get an array of VirtualTransactionIds of xacts currently holding locks
2785 * that would conflict with the specified lock/lockmode.
2786 * xacts merely awaiting such a lock are NOT reported.
2787 *
2788 * The result array is palloc'd and is terminated with an invalid VXID.
2789 *
2790 * Of course, the result could be out of date by the time it's returned, so
2791 * use of this function has to be thought about carefully. Similarly, a
2792 * PGPROC with no "lxid" will be considered non-conflicting regardless of any
2793 * lock it holds. Existing callers don't care about a locker after that
2794 * locker's pg_xact updates complete. CommitTransaction() clears "lxid" after
2795 * pg_xact updates and before releasing locks.
2796 *
2797 * Note we never include the current xact's vxid in the result array,
2798 * since an xact never blocks itself.
2799 */
2800 VirtualTransactionId *
2801 GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
2802 {
2803 static VirtualTransactionId *vxids;
2804 LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
2805 LockMethod lockMethodTable;
2806 LOCK *lock;
2807 LOCKMASK conflictMask;
2808 SHM_QUEUE *procLocks;
2809 PROCLOCK *proclock;
2810 uint32 hashcode;
2811 LWLock *partitionLock;
2812 int count = 0;
2813 int fast_count = 0;
2814
2815 if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
2816 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
2817 lockMethodTable = LockMethods[lockmethodid];
2818 if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
2819 elog(ERROR, "unrecognized lock mode: %d", lockmode);
2820
2821 /*
2822 * Allocate memory to store results, and fill with InvalidVXID. We only
2823 * need enough space for MaxBackends + max_prepared_xacts + a terminator.
2824 * When InHotStandby, allocate the array just once, in TopMemoryContext.
2825 */
2826 if (InHotStandby)
2827 {
2828 if (vxids == NULL)
2829 vxids = (VirtualTransactionId *)
2830 MemoryContextAlloc(TopMemoryContext,
2831 sizeof(VirtualTransactionId) *
2832 (MaxBackends + max_prepared_xacts + 1));
2833 }
2834 else
2835 vxids = (VirtualTransactionId *)
2836 palloc0(sizeof(VirtualTransactionId) *
2837 (MaxBackends + max_prepared_xacts + 1));
2838
2839 /* Compute hash code and partition lock, and look up conflicting modes. */
2840 hashcode = LockTagHashCode(locktag);
2841 partitionLock = LockHashPartitionLock(hashcode);
2842 conflictMask = lockMethodTable->conflictTab[lockmode];
2843
2844 /*
2845 * Fast path locks might not have been entered in the primary lock table.
2846 * If the lock we're dealing with could conflict with such a lock, we must
2847 * examine each backend's fast-path array for conflicts.
2848 */
2849 if (ConflictsWithRelationFastPath(locktag, lockmode))
2850 {
2851 int i;
2852 Oid relid = locktag->locktag_field2;
2853 VirtualTransactionId vxid;
2854
2855 /*
2856 * Iterate over relevant PGPROCs. Anything held by a prepared
2857 * transaction will have been transferred to the primary lock table,
2858 * so we need not worry about those. This is all a bit fuzzy, because
2859 * new locks could be taken after we've visited a particular
2860 * backend, but the callers had better be prepared to deal with that
2861 * anyway, since the locks could equally well be taken between the
2862 * time we return the value and the time the caller does something
2863 * with it.
2864 */
2865 for (i = 0; i < ProcGlobal->allProcCount; i++)
2866 {
2867 PGPROC *proc = &ProcGlobal->allProcs[i];
2868 uint32 f;
2869
2870 /* A backend never blocks itself */
2871 if (proc == MyProc)
2872 continue;
2873
2874 LWLockAcquire(&proc->backendLock, LW_SHARED);
2875
2876 /*
2877 * If the target backend isn't referencing the same database as
2878 * the lock, then we needn't examine the individual relation IDs
2879 * at all; none of them can be relevant.
2880 *
2881 * See FastPathTransferRelationLocks() for discussion of why we do this
2882 * test after acquiring the lock.
2883 */
2884 if (proc->databaseId != locktag->locktag_field1)
2885 {
2886 LWLockRelease(&proc->backendLock);
2887 continue;
2888 }
2889
2890 for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2891 {
2892 uint32 lockmask;
2893
2894 /* Look for an allocated slot matching the given relid. */
2895 if (relid != proc->fpRelId[f])
2896 continue;
2897 lockmask = FAST_PATH_GET_BITS(proc, f);
2898 if (!lockmask)
2899 continue;
2900 lockmask <<= FAST_PATH_LOCKNUMBER_OFFSET;
2901
2902 /*
2903 * There can only be one entry per relation, so if we found it
2904 * and it doesn't conflict, we can skip the rest of the slots.
2905 */
2906 if ((lockmask & conflictMask) == 0)
2907 break;
2908
2909 /* Conflict! */
2910 GET_VXID_FROM_PGPROC(vxid, *proc);
2911
2912 if (VirtualTransactionIdIsValid(vxid))
2913 vxids[count++] = vxid;
2914 /* else, xact already committed or aborted */
2915
2916 /* No need to examine remaining slots. */
2917 break;
2918 }
2919
2920 LWLockRelease(&proc->backendLock);
2921 }
2922 }
2923
2924 /* Remember how many fast-path conflicts we found. */
2925 fast_count = count;
2926
2927 /*
2928 * Look up the lock object matching the tag.
2929 */
2930 LWLockAcquire(partitionLock, LW_SHARED);
2931
2932 lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
2933 (const void *) locktag,
2934 hashcode,
2935 HASH_FIND,
2936 NULL);
2937 if (!lock)
2938 {
2939 /*
2940 * If the lock object doesn't exist, there is nothing holding a lock
2941 * on this lockable object.
2942 */
2943 LWLockRelease(partitionLock);
2944 vxids[count].backendId = InvalidBackendId;
2945 vxids[count].localTransactionId = InvalidLocalTransactionId;
2946 return vxids;
2947 }
2948
2949 /*
2950 * Examine each existing holder (or awaiter) of the lock.
2951 */
2952
2953 procLocks = &(lock->procLocks);
2954
2955 proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
2956 offsetof(PROCLOCK, lockLink));
2957
2958 while (proclock)
2959 {
2960 if (conflictMask & proclock->holdMask)
2961 {
2962 PGPROC *proc = proclock->tag.myProc;
2963
2964 /* A backend never blocks itself */
2965 if (proc != MyProc)
2966 {
2967 VirtualTransactionId vxid;
2968
2969 GET_VXID_FROM_PGPROC(vxid, *proc);
2970
2971 if (VirtualTransactionIdIsValid(vxid))
2972 {
2973 int i;
2974
2975 /* Avoid duplicate entries. */
2976 for (i = 0; i < fast_count; ++i)
2977 if (VirtualTransactionIdEquals(vxids[i], vxid))
2978 break;
2979 if (i >= fast_count)
2980 vxids[count++] = vxid;
2981 }
2982 /* else, xact already committed or aborted */
2983 }
2984 }
2985
2986 proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
2987 offsetof(PROCLOCK, lockLink));
2988 }
2989
2990 LWLockRelease(partitionLock);
2991
2992 if (count > MaxBackends + max_prepared_xacts) /* should never happen */
2993 elog(PANIC, "too many conflicting locks found");
2994
2995 vxids[count].backendId = InvalidBackendId;
2996 vxids[count].localTransactionId = InvalidLocalTransactionId;
2997 return vxids;
2998 }
2999
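/*
 * Editor's note: the canonical consumer loop, paraphrased from the
 * wait-for-lockers logic used by index builds; "heaplocktag" is an
 * assumed, already-set LOCKTAG:
 *
 *     VirtualTransactionId *old_lockholders;
 *
 *     old_lockholders = GetLockConflicts(&heaplocktag, AccessExclusiveLock);
 *     while (VirtualTransactionIdIsValid(*old_lockholders))
 *     {
 *         VirtualXactLock(*old_lockholders, true);  -- wait for each holder
 *         old_lockholders++;
 *     }
 */
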
3000 /*
3001 * Find a lock in the shared lock table and release it. It is the caller's
3002 * responsibility to verify that this is a sane thing to do. (For example, it
3003 * would be bad to release a lock here if there might still be a LOCALLOCK
3004 * object with pointers to it.)
3005 *
3006 * We currently use this in two situations: first, to release locks held by
3007 * prepared transactions on commit (see lock_twophase_postcommit); and second,
3008 * to release locks taken via the fast-path, transferred to the main hash
3009 * table, and then released (see LockReleaseAll).
3010 */
3011 static void
3012 LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
3013 LOCKTAG *locktag, LOCKMODE lockmode,
3014 bool decrement_strong_lock_count)
3015 {
3016 LOCK *lock;
3017 PROCLOCK *proclock;
3018 PROCLOCKTAG proclocktag;
3019 uint32 hashcode;
3020 uint32 proclock_hashcode;
3021 LWLock *partitionLock;
3022 bool wakeupNeeded;
3023
3024 hashcode = LockTagHashCode(locktag);
3025 partitionLock = LockHashPartitionLock(hashcode);
3026
3027 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3028
3029 /*
3030 * Re-find the lock object (it had better be there).
3031 */
3032 lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
3033 (void *) locktag,
3034 hashcode,
3035 HASH_FIND,
3036 NULL);
3037 if (!lock)
3038 elog(PANIC, "failed to re-find shared lock object");
3039
3040 /*
3041 * Re-find the proclock object (ditto).
3042 */
3043 proclocktag.myLock = lock;
3044 proclocktag.myProc = proc;
3045
3046 proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
3047
3048 proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
3049 (void *) &proclocktag,
3050 proclock_hashcode,
3051 HASH_FIND,
3052 NULL);
3053 if (!proclock)
3054 elog(PANIC, "failed to re-find shared proclock object");
3055
3056 /*
3057 * Double-check that we are actually holding a lock of the type we want to
3058 * release.
3059 */
3060 if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
3061 {
3062 PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
3063 LWLockRelease(partitionLock);
3064 elog(WARNING, "you don't own a lock of type %s",
3065 lockMethodTable->lockModeNames[lockmode]);
3066 return;
3067 }
3068
3069 /*
3070 * Do the releasing. CleanUpLock will waken any now-wakable waiters.
3071 */
3072 wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
3073
3074 CleanUpLock(lock, proclock,
3075 lockMethodTable, hashcode,
3076 wakeupNeeded);
3077
3078 LWLockRelease(partitionLock);
3079
3080 /*
3081 * Decrement strong lock count. This logic is needed only for 2PC.
3082 */
3083 if (decrement_strong_lock_count
3084 && ConflictsWithRelationFastPath(locktag, lockmode))
3085 {
3086 uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode);
3087
3088 SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
3089 Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
3090 FastPathStrongRelationLocks->count[fasthashcode]--;
3091 SpinLockRelease(&FastPathStrongRelationLocks->mutex);
3092 }
3093 }
3094
3095 /*
3096 * CheckForSessionAndXactLocks
3097 * Check to see if transaction holds both session-level and xact-level
3098 * locks on the same object; if so, throw an error.
3099 *
3100 * If we have both session- and transaction-level locks on the same object,
3101 * PREPARE TRANSACTION must fail. This should never happen with regular
3102 * locks, since we only take those at session level in some special operations
3103 * like VACUUM. It's possible to hit this with advisory locks, though.
3104 *
3105 * It would be nice if we could keep the session hold and give away the
3106 * transactional hold to the prepared xact. However, that would require two
3107 * PROCLOCK objects, and we cannot be sure that another PROCLOCK will be
3108 * available when it comes time for PostPrepare_Locks to do the deed.
3109 * So for now, we error out while we can still do so safely.
3110 *
3111 * Since the LOCALLOCK table stores a separate entry for each lockmode,
3112 * we can't implement this check by examining LOCALLOCK entries in isolation.
3113 * We must build a transient hashtable that is indexed by locktag only.
3114 */
3115 static void
3116 CheckForSessionAndXactLocks(void)
3117 {
3118 typedef struct
3119 {
3120 LOCKTAG lock; /* identifies the lockable object */
3121 bool sessLock; /* is any lockmode held at session level? */
3122 bool xactLock; /* is any lockmode held at xact level? */
3123 } PerLockTagEntry;
3124
3125 HASHCTL hash_ctl;
3126 HTAB *lockhtab;
3127 HASH_SEQ_STATUS status;
3128 LOCALLOCK *locallock;
3129
3130 /* Create a local hash table keyed by LOCKTAG only */
3131 hash_ctl.keysize = sizeof(LOCKTAG);
3132 hash_ctl.entrysize = sizeof(PerLockTagEntry);
3133 hash_ctl.hcxt = CurrentMemoryContext;
3134
3135 lockhtab = hash_create("CheckForSessionAndXactLocks table",
3136 256, /* arbitrary initial size */
3137 &hash_ctl,
3138 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
3139
3140 /* Scan local lock table to find entries for each LOCKTAG */
3141 hash_seq_init(&status, LockMethodLocalHash);
3142
3143 while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
3144 {
3145 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
3146 PerLockTagEntry *hentry;
3147 bool found;
3148 int i;
3149
3150 /*
3151 * Ignore VXID locks. We don't want those to be held by prepared
3152 * transactions, since they aren't meaningful after a restart.
3153 */
3154 if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3155 continue;
3156
3157 /* Ignore it if we don't actually hold the lock */
3158 if (locallock->nLocks <= 0)
3159 continue;
3160
3161 /* Otherwise, find or make an entry in lockhtab */
3162 hentry = (PerLockTagEntry *) hash_search(lockhtab,
3163 (void *) &locallock->tag.lock,
3164 HASH_ENTER, &found);
3165 if (!found) /* initialize, if newly created */
3166 hentry->sessLock = hentry->xactLock = false;
3167
3168 /* Scan to see if we hold lock at session or xact level or both */
3169 for (i = locallock->numLockOwners - 1; i >= 0; i--)
3170 {
3171 if (lockOwners[i].owner == NULL)
3172 hentry->sessLock = true;
3173 else
3174 hentry->xactLock = true;
3175 }
3176
3177 /*
3178 * We can throw error immediately when we see both types of locks; no
3179 * need to wait around to see if there are more violations.
3180 */
3181 if (hentry->sessLock && hentry->xactLock)
3182 ereport(ERROR,
3183 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3184 errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
3185 }
3186
3187 /* Success, so clean up */
3188 hash_destroy(lockhtab);
3189 }
3190
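/*
 * Editor's note: the advisory-lock case this function guards against is
 * easy to reproduce from SQL (session-level and xact-level holds on the
 * same lockable object):
 *
 *     BEGIN;
 *     SELECT pg_advisory_lock(42);       -- session-level hold
 *     SELECT pg_advisory_xact_lock(42);  -- xact-level hold, same object
 *     PREPARE TRANSACTION 'p1';          -- fails with the error above
 */
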
3191 /*
3192 * AtPrepare_Locks
3193 * Do the preparatory work for a PREPARE: make 2PC state file records
3194 * for all locks currently held.
3195 *
3196 * Session-level locks are ignored, as are VXID locks.
3197 *
3198 * For the most part, we don't need to touch shared memory for this ---
3199 * all the necessary state information is in the locallock table.
3200 * Fast-path locks are an exception, however: we move any such locks to
3201 * the main table before allowing PREPARE TRANSACTION to succeed.
3202 */
3203 void
3204 AtPrepare_Locks(void)
3205 {
3206 HASH_SEQ_STATUS status;
3207 LOCALLOCK *locallock;
3208
3209 /* First, verify there aren't locks of both xact and session level */
3210 CheckForSessionAndXactLocks();
3211
3212 /* Now do the per-locallock cleanup work */
3213 hash_seq_init(&status, LockMethodLocalHash);
3214
3215 while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
3216 {
3217 TwoPhaseLockRecord record;
3218 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
3219 bool haveSessionLock;
3220 bool haveXactLock;
3221 int i;
3222
3223 /*
3224 * Ignore VXID locks. We don't want those to be held by prepared
3225 * transactions, since they aren't meaningful after a restart.
3226 */
3227 if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3228 continue;
3229
3230 /* Ignore it if we don't actually hold the lock */
3231 if (locallock->nLocks <= 0)
3232 continue;
3233
3234 /* Scan to see whether we hold it at session or transaction level */
3235 haveSessionLock = haveXactLock = false;
3236 for (i = locallock->numLockOwners - 1; i >= 0; i--)
3237 {
3238 if (lockOwners[i].owner == NULL)
3239 haveSessionLock = true;
3240 else
3241 haveXactLock = true;
3242 }
3243
3244 /* Ignore it if we have only session lock */
3245 if (!haveXactLock)
3246 continue;
3247
3248 /* This can't happen, because we already checked it */
3249 if (haveSessionLock)
3250 ereport(ERROR,
3251 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3252 errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
3253
3254 /*
3255 * If the local lock was taken via the fast-path, we need to move it
3256 * to the primary lock table, or just get a pointer to the existing
3257 * primary lock table entry if by chance it's already been
3258 * transferred.
3259 */
3260 if (locallock->proclock == NULL)
3261 {
3262 locallock->proclock = FastPathGetRelationLockEntry(locallock);
3263 locallock->lock = locallock->proclock->tag.myLock;
3264 }
3265
3266 /*
3267 * Arrange to not release any strong lock count held by this lock
3268 * entry. We must retain the count until the prepared transaction is
3269 * committed or rolled back.
3270 */
3271 locallock->holdsStrongLockCount = FALSE;
3272
3273 /*
3274 * Create a 2PC record.
3275 */
3276 memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG));
3277 record.lockmode = locallock->tag.mode;
3278
3279 RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0,
3280 &record, sizeof(TwoPhaseLockRecord));
3281 }
3282 }
3283
3284 /*
3285 * PostPrepare_Locks
3286 * Clean up after successful PREPARE
3287 *
3288 * Here, we want to transfer ownership of our locks to a dummy PGPROC
3289 * that's now associated with the prepared transaction, and we want to
3290 * clean out the corresponding entries in the LOCALLOCK table.
3291 *
3292 * Note: by removing the LOCALLOCK entries, we are leaving dangling
3293 * pointers in the transaction's resource owner. This is OK at the
3294 * moment since resowner.c doesn't try to free locks retail at a toplevel
3295 * transaction commit or abort. We could alternatively zero out nLocks
3296 * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
3297 * but that probably costs more cycles.
3298 */
3299 void
3300 PostPrepare_Locks(TransactionId xid)
3301 {
3302 PGPROC *newproc = TwoPhaseGetDummyProc(xid);
3303 HASH_SEQ_STATUS status;
3304 LOCALLOCK *locallock;
3305 LOCK *lock;
3306 PROCLOCK *proclock;
3307 PROCLOCKTAG proclocktag;
3308 int partition;
3309
3310 /* Can't prepare a lock group follower. */
3311 Assert(MyProc->lockGroupLeader == NULL ||
3312 MyProc->lockGroupLeader == MyProc);
3313
3314 /* This is a critical section: any error means big trouble */
3315 START_CRIT_SECTION();
3316
3317 /*
3318 * First we run through the locallock table and get rid of unwanted
3319 * entries, then we scan the process's proclocks and transfer them to the
3320 * target proc.
3321 *
3322 * We do this separately because we may have multiple locallock entries
3323 * pointing to the same proclock, and we daren't end up with any dangling
3324 * pointers.
3325 */
3326 hash_seq_init(&status, LockMethodLocalHash);
3327
3328 while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
3329 {
3330 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
3331 bool haveSessionLock;
3332 bool haveXactLock;
3333 int i;
3334
3335 if (locallock->proclock == NULL || locallock->lock == NULL)
3336 {
3337 /*
3338 * We must've run out of shared memory while trying to set up this
3339 * lock. Just forget the local entry.
3340 */
3341 Assert(locallock->nLocks == 0);
3342 RemoveLocalLock(locallock);
3343 continue;
3344 }
3345
3346 /* Ignore VXID locks */
3347 if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3348 continue;
3349
3350 /* Scan to see whether we hold it at session or transaction level */
3351 haveSessionLock = haveXactLock = false;
3352 for (i = locallock->numLockOwners - 1; i >= 0; i--)
3353 {
3354 if (lockOwners[i].owner == NULL)
3355 haveSessionLock = true;
3356 else
3357 haveXactLock = true;
3358 }
3359
3360 /* Ignore it if we have only session lock */
3361 if (!haveXactLock)
3362 continue;
3363
3364 /* This can't happen, because we already checked it */
3365 if (haveSessionLock)
3366 ereport(PANIC,
3367 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3368 errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
3369
3370 /* Mark the proclock to show we need to release this lockmode */
3371 if (locallock->nLocks > 0)
3372 locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
3373
3374 /* And remove the locallock hashtable entry */
3375 RemoveLocalLock(locallock);
3376 }
3377
3378 /*
3379 * Now, scan each lock partition separately.
3380 */
3381 for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
3382 {
3383 LWLock *partitionLock;
3384 SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]);
3385 PROCLOCK *nextplock;
3386
3387 partitionLock = LockHashPartitionLockByIndex(partition);
3388
3389 /*
3390 * If the proclock list for this partition is empty, we can skip
3391 * acquiring the partition lock. This optimization is safer than the
3392 * situation in LockReleaseAll, because we got rid of any fast-path
3393 * locks during AtPrepare_Locks, so there cannot be any case where
3394 * another backend is adding something to our lists now. For safety,
3395 * though, we code this the same way as in LockReleaseAll.
3396 */
3397 if (SHMQueueNext(procLocks, procLocks,
3398 offsetof(PROCLOCK, procLink)) == NULL)
3399 continue; /* needn't examine this partition */
3400
3401 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3402
3403 for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
3404 offsetof(PROCLOCK, procLink));
3405 proclock;
3406 proclock = nextplock)
3407 {
3408 /* Get link first, since we may unlink/relink this proclock */
3409 nextplock = (PROCLOCK *)
3410 SHMQueueNext(procLocks, &proclock->procLink,
3411 offsetof(PROCLOCK, procLink));
3412
3413 Assert(proclock->tag.myProc == MyProc);
3414
3415 lock = proclock->tag.myLock;
3416
3417 /* Ignore VXID locks */
3418 if (lock->tag.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3419 continue;
3420
3421 PROCLOCK_PRINT("PostPrepare_Locks", proclock);
3422 LOCK_PRINT("PostPrepare_Locks", lock, 0);
3423 Assert(lock->nRequested >= 0);
3424 Assert(lock->nGranted >= 0);
3425 Assert(lock->nGranted <= lock->nRequested);
3426 Assert((proclock->holdMask & ~lock->grantMask) == 0);
3427
3428 /* Ignore it if nothing to release (must be a session lock) */
3429 if (proclock->releaseMask == 0)
3430 continue;
3431
3432 /* Else we should be releasing all locks */
3433 if (proclock->releaseMask != proclock->holdMask)
3434 elog(PANIC, "we seem to have dropped a bit somewhere");
3435
3436 /*
3437 * We cannot simply modify proclock->tag.myProc to reassign
3438 * ownership of the lock, because that's part of the hash key and
3439 * the proclock would then be in the wrong hash chain. Instead
3440 * use hash_update_hash_key. (We used to create a new hash entry,
3441 * but that risks out-of-memory failure if other processes are
3442 * busy making proclocks too.) We must unlink the proclock from
3443 * our procLink chain and put it into the new proc's chain, too.
3444 *
3445 * Note: the updated proclock hash key will still belong to the
3446 * same hash partition, cf proclock_hash(). So the partition lock
3447 * we already hold is sufficient for this.
3448 */
3449 SHMQueueDelete(&proclock->procLink);
3450
3451 /*
3452 * Create the new hash key for the proclock.
3453 */
3454 proclocktag.myLock = lock;
3455 proclocktag.myProc = newproc;
3456
3457 /*
3458 * Update groupLeader pointer to point to the new proc. (We'd
3459 * better not be a member of somebody else's lock group!)
3460 */
3461 Assert(proclock->groupLeader == proclock->tag.myProc);
3462 proclock->groupLeader = newproc;
3463
3464 /*
3465 * Update the proclock. We should not find any existing entry for
3466 * the same hash key, since there can be only one entry for any
3467 * given lock with my own proc.
3468 */
3469 if (!hash_update_hash_key(LockMethodProcLockHash,
3470 (void *) proclock,
3471 (void *) &proclocktag))
3472 elog(PANIC, "duplicate entry found while reassigning a prepared transaction's locks");
3473
3474 /* Re-link into the new proc's proclock list */
3475 SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
3476 &proclock->procLink);
3477
3478 PROCLOCK_PRINT("PostPrepare_Locks: updated", proclock);
3479 } /* loop over PROCLOCKs within this partition */
3480
3481 LWLockRelease(partitionLock);
3482 } /* loop over partitions */
3483
3484 END_CRIT_SECTION();
3485 }
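
/*
 * For orientation, a compressed sketch of the PREPARE-time call sequence
 * (the real driver is PrepareTransaction() in access/transam/xact.c; this
 * is illustrative, not the exact code):
 *
 *		AtPrepare_Locks();			-- registers the TwoPhaseLockRecords
 *		... write the two-phase state data ...
 *		PostPrepare_Locks(xid);		-- hands the locks to the dummy PGPROC
 */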
3486
3487
3488 /*
3489 * Estimate shared-memory space used for lock tables
3490 */
3491 Size
3492 LockShmemSize(void)
3493 {
3494 Size size = 0;
3495 long max_table_size;
3496
3497 /* lock hash table */
3498 max_table_size = NLOCKENTS();
3499 size = add_size(size, hash_estimate_size(max_table_size, sizeof(LOCK)));
3500
3501 /* proclock hash table */
3502 max_table_size *= 2;
3503 size = add_size(size, hash_estimate_size(max_table_size, sizeof(PROCLOCK)));
3504
3505 /*
3506 * Since NLOCKENTS is only an estimate, add 10% safety margin.
3507 */
3508 size = add_size(size, size / 10);
3509
3510 return size;
3511 }
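
/*
 * Worked example, assuming stock defaults (max_locks_per_transaction = 64,
 * max_prepared_transactions = 0) and MaxBackends of roughly 100: NLOCKENTS()
 * comes to about 64 * 100 = 6400 LOCK entries, the proclock table is sized
 * for twice that, 12800 PROCLOCK entries, and after hash_estimate_size() is
 * applied to each count the total is padded by 10%.
 */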
3512
3513 /*
3514 * GetLockStatusData - Return a summary of the lock manager's internal
3515 * status, for use in a user-level reporting function.
3516 *
3517 * The return data consists of an array of LockInstanceData objects,
3518 * which are a lightly abstracted version of the PROCLOCK data structures,
3519 * i.e. there is one entry for each unique lock and interested PGPROC.
3520 * It is the caller's responsibility to match up related items (such as
3521 * references to the same lockable object or PGPROC) if wanted.
3522 *
3523 * The design goal is to hold the LWLocks for as short a time as possible;
3524 * thus, this function simply makes a copy of the necessary data and releases
3525 * the locks, allowing the caller to contemplate and format the data for as
3526 * long as it pleases.
3527 */
3528 LockData *
3529 GetLockStatusData(void)
3530 {
3531 LockData *data;
3532 PROCLOCK *proclock;
3533 HASH_SEQ_STATUS seqstat;
3534 int els;
3535 int el;
3536 int i;
3537
3538 data = (LockData *) palloc(sizeof(LockData));
3539
3540 /* Guess how much space we'll need. */
3541 els = MaxBackends;
3542 el = 0;
3543 data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * els);
3544
3545 /*
3546 * First, we iterate through the per-backend fast-path arrays, locking
3547 * them one at a time. This might produce an inconsistent picture of the
3548 * system state, but taking all of those LWLocks at the same time seems
3549 * impractical (in particular, note MAX_SIMUL_LWLOCKS). It shouldn't
3550 * matter too much, because none of these locks can be involved in lock
3551 * conflicts anyway; anything that might conflict must be present in the
3552 * main lock table. (For the same reason, we don't sweat about making
3553 * leaderPid completely valid. We cannot safely dereference another
3554 * backend's lockGroupLeader field without holding all lock partition
3555 * locks, and it's not worth that.)
3556 */
3557 for (i = 0; i < ProcGlobal->allProcCount; ++i)
3558 {
3559 PGPROC *proc = &ProcGlobal->allProcs[i];
3560 uint32 f;
3561
3562 LWLockAcquire(&proc->backendLock, LW_SHARED);
3563
3564 for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; ++f)
3565 {
3566 LockInstanceData *instance;
3567 uint32 lockbits = FAST_PATH_GET_BITS(proc, f);
3568
3569 /* Skip unallocated slots. */
3570 if (!lockbits)
3571 continue;
3572
3573 if (el >= els)
3574 {
3575 els += MaxBackends;
3576 data->locks = (LockInstanceData *)
3577 repalloc(data->locks, sizeof(LockInstanceData) * els);
3578 }
3579
3580 instance = &data->locks[el];
3581 SET_LOCKTAG_RELATION(instance->locktag, proc->databaseId,
3582 proc->fpRelId[f]);
3583 instance->holdMask = lockbits << FAST_PATH_LOCKNUMBER_OFFSET;
3584 instance->waitLockMode = NoLock;
3585 instance->backend = proc->backendId;
3586 instance->lxid = proc->lxid;
3587 instance->pid = proc->pid;
3588 instance->leaderPid = proc->pid;
3589 instance->fastpath = true;
3590
3591 el++;
3592 }
3593
3594 if (proc->fpVXIDLock)
3595 {
3596 VirtualTransactionId vxid;
3597 LockInstanceData *instance;
3598
3599 if (el >= els)
3600 {
3601 els += MaxBackends;
3602 data->locks = (LockInstanceData *)
3603 repalloc(data->locks, sizeof(LockInstanceData) * els);
3604 }
3605
3606 vxid.backendId = proc->backendId;
3607 vxid.localTransactionId = proc->fpLocalTransactionId;
3608
3609 instance = &data->locks[el];
3610 SET_LOCKTAG_VIRTUALTRANSACTION(instance->locktag, vxid);
3611 instance->holdMask = LOCKBIT_ON(ExclusiveLock);
3612 instance->waitLockMode = NoLock;
3613 instance->backend = proc->backendId;
3614 instance->lxid = proc->lxid;
3615 instance->pid = proc->pid;
3616 instance->leaderPid = proc->pid;
3617 instance->fastpath = true;
3618
3619 el++;
3620 }
3621
3622 LWLockRelease(&proc->backendLock);
3623 }
3624
3625 /*
3626 * Next, acquire lock on the entire shared lock data structure. We do
3627 * this so that, at least for locks in the primary lock table, the state
3628 * will be self-consistent.
3629 *
3630 * Since this is a read-only operation, we take shared instead of
3631 * exclusive lock. There's not a whole lot of point to this, because all
3632 * the normal operations require exclusive lock, but it doesn't hurt
3633 * anything either. It will at least allow two backends to do
3634 * GetLockStatusData in parallel.
3635 *
3636 * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
3637 */
3638 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
3639 LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
3640
3641 /* Now we can safely count the number of proclocks */
3642 data->nelements = el + hash_get_num_entries(LockMethodProcLockHash);
3643 if (data->nelements > els)
3644 {
3645 els = data->nelements;
3646 data->locks = (LockInstanceData *)
3647 repalloc(data->locks, sizeof(LockInstanceData) * els);
3648 }
3649
3650 /* Now scan the tables to copy the data */
3651 hash_seq_init(&seqstat, LockMethodProcLockHash);
3652
3653 while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
3654 {
3655 PGPROC *proc = proclock->tag.myProc;
3656 LOCK *lock = proclock->tag.myLock;
3657 LockInstanceData *instance = &data->locks[el];
3658
3659 memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG));
3660 instance->holdMask = proclock->holdMask;
3661 if (proc->waitLock == proclock->tag.myLock)
3662 instance->waitLockMode = proc->waitLockMode;
3663 else
3664 instance->waitLockMode = NoLock;
3665 instance->backend = proc->backendId;
3666 instance->lxid = proc->lxid;
3667 instance->pid = proc->pid;
3668 instance->leaderPid = proclock->groupLeader->pid;
3669 instance->fastpath = false;
3670
3671 el++;
3672 }
3673
3674 /*
3675 * And release locks. We do this in reverse order for two reasons: (1)
3676 * Anyone else who needs more than one of the locks will be trying to lock
3677 * them in increasing order; we don't want to release the other process
3678 * until it can get all the locks it needs. (2) This avoids O(N^2)
3679 * behavior inside LWLockRelease.
3680 */
3681 for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
3682 LWLockRelease(LockHashPartitionLockByIndex(i));
3683
3684 Assert(el == data->nelements);
3685
3686 return data;
3687 }
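
/*
 * A minimal consumer sketch (the real consumer is pg_lock_status() in
 * utils/adt/lockfuncs.c; formatting and error handling omitted):
 *
 *		LockData   *lockData = GetLockStatusData();
 *		int			i;
 *
 *		for (i = 0; i < lockData->nelements; i++)
 *		{
 *			LockInstanceData *instance = &lockData->locks[i];
 *
 *			...instance->holdMask gives the granted modes; when
 *			instance->waitLockMode != NoLock, the backend is also
 *			waiting for that mode on instance->locktag...
 *		}
 */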
3688
3689 /*
3690 * GetBlockerStatusData - Return a summary of the lock manager's state
3691 * concerning locks that are blocking the specified PID or any member of
3692 * the PID's lock group, for use in a user-level reporting function.
3693 *
3694 * For each PID within the lock group that is awaiting some heavyweight lock,
3695 * the return data includes an array of LockInstanceData objects, which are
3696 * the same data structure used by GetLockStatusData; but unlike that function,
3697 * this one reports only the PROCLOCKs associated with the lock that that PID
3698 * is blocked on. (Hence, all the locktags should be the same for any one
3699 * blocked PID.) In addition, we return an array of the PIDs of those backends
3700 * that are ahead of the blocked PID in the lock's wait queue. These can be
3701 * compared with the PIDs in the LockInstanceData objects to determine which
3702 * waiters are ahead of or behind the blocked PID in the queue.
3703 *
3704 * If blocked_pid isn't a valid backend PID or nothing in its lock group is
3705 * waiting on any heavyweight lock, return empty arrays.
3706 *
3707 * The design goal is to hold the LWLocks for as short a time as possible;
3708 * thus, this function simply makes a copy of the necessary data and releases
3709 * the locks, allowing the caller to contemplate and format the data for as
3710 * long as it pleases.
3711 */
3712 BlockedProcsData *
3713 GetBlockerStatusData(int blocked_pid)
3714 {
3715 BlockedProcsData *data;
3716 PGPROC *proc;
3717 int i;
3718
3719 data = (BlockedProcsData *) palloc(sizeof(BlockedProcsData));
3720
3721 /*
3722 * Guess how much space we'll need, and preallocate. Most of the time
3723 * this will avoid needing to do repalloc while holding the LWLocks. (We
3724 * assume, but check with an Assert, that MaxBackends is enough entries
3725 * for the procs[] array; the other two could need enlargement, though.)
3726 */
3727 data->nprocs = data->nlocks = data->npids = 0;
3728 data->maxprocs = data->maxlocks = data->maxpids = MaxBackends;
3729 data->procs = (BlockedProcData *) palloc(sizeof(BlockedProcData) * data->maxprocs);
3730 data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * data->maxlocks);
3731 data->waiter_pids = (int *) palloc(sizeof(int) * data->maxpids);
3732
3733 /*
3734 * In order to search the ProcArray for blocked_pid and assume that that
3735 * entry won't immediately disappear under us, we must hold ProcArrayLock.
3736 * In addition, to examine the lock grouping fields of any other backend,
3737 * we must hold all the hash partition locks. (Only one of those locks is
3738 * actually relevant for any one lock group, but we can't know which one
3739 * ahead of time.) It's fairly annoying to hold all those locks
3740 * throughout this, but it's no worse than GetLockStatusData(), and it
3741 * does have the advantage that we're guaranteed to return a
3742 * self-consistent instantaneous state.
3743 */
3744 LWLockAcquire(ProcArrayLock, LW_SHARED);
3745
3746 proc = BackendPidGetProcWithLock(blocked_pid);
3747
3748 /* Nothing to do if it's gone */
3749 if (proc != NULL)
3750 {
3751 /*
3752 * Acquire lock on the entire shared lock data structure. See notes
3753 * in GetLockStatusData().
3754 */
3755 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
3756 LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
3757
3758 if (proc->lockGroupLeader == NULL)
3759 {
3760 /* Easy case, proc is not a lock group member */
3761 GetSingleProcBlockerStatusData(proc, data);
3762 }
3763 else
3764 {
3765 /* Examine all procs in proc's lock group */
3766 dlist_iter iter;
3767
3768 dlist_foreach(iter, &proc->lockGroupLeader->lockGroupMembers)
3769 {
3770 PGPROC *memberProc;
3771
3772 memberProc = dlist_container(PGPROC, lockGroupLink, iter.cur);
3773 GetSingleProcBlockerStatusData(memberProc, data);
3774 }
3775 }
3776
3777 /*
3778 * And release locks. See notes in GetLockStatusData().
3779 */
3780 for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
3781 LWLockRelease(LockHashPartitionLockByIndex(i));
3782
3783 Assert(data->nprocs <= data->maxprocs);
3784 }
3785
3786 LWLockRelease(ProcArrayLock);
3787
3788 return data;
3789 }
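
/*
 * A minimal consumer sketch (cf. pg_blocking_pids() in
 * utils/adt/lockfuncs.c). Each procs[] element addresses its own slice of
 * the shared locks[] and waiter_pids[] arrays:
 *
 *		BlockedProcsData *data = GetBlockerStatusData(blocked_pid);
 *		int			i;
 *
 *		for (i = 0; i < data->nprocs; i++)
 *		{
 *			BlockedProcData *bproc = &data->procs[i];
 *			LockInstanceData *locks = &data->locks[bproc->first_lock];
 *			int		   *pids = &data->waiter_pids[bproc->first_waiter];
 *
 *			...examine bproc->num_locks locks and bproc->num_waiters pids...
 *		}
 */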
3790
3791 /* Accumulate data about one possibly-blocked proc for GetBlockerStatusData */
3792 static void
3793 GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data)
3794 {
3795 LOCK *theLock = blocked_proc->waitLock;
3796 BlockedProcData *bproc;
3797 SHM_QUEUE *procLocks;
3798 PROCLOCK *proclock;
3799 PROC_QUEUE *waitQueue;
3800 PGPROC *proc;
3801 int queue_size;
3802 int i;
3803
3804 /* Nothing to do if this proc is not blocked */
3805 if (theLock == NULL)
3806 return;
3807
3808 /* Set up a procs[] element */
3809 bproc = &data->procs[data->nprocs++];
3810 bproc->pid = blocked_proc->pid;
3811 bproc->first_lock = data->nlocks;
3812 bproc->first_waiter = data->npids;
3813
3814 /*
3815 * We may ignore the proc's fast-path arrays, since nothing in those could
3816 * be related to a contended lock.
3817 */
3818
3819 /* Collect all PROCLOCKs associated with theLock */
3820 procLocks = &(theLock->procLocks);
3821 proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
3822 offsetof(PROCLOCK, lockLink));
3823 while (proclock)
3824 {
3825 PGPROC *proc = proclock->tag.myProc;
3826 LOCK *lock = proclock->tag.myLock;
3827 LockInstanceData *instance;
3828
3829 if (data->nlocks >= data->maxlocks)
3830 {
3831 data->maxlocks += MaxBackends;
3832 data->locks = (LockInstanceData *)
3833 repalloc(data->locks, sizeof(LockInstanceData) * data->maxlocks);
3834 }
3835
3836 instance = &data->locks[data->nlocks];
3837 memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG));
3838 instance->holdMask = proclock->holdMask;
3839 if (proc->waitLock == lock)
3840 instance->waitLockMode = proc->waitLockMode;
3841 else
3842 instance->waitLockMode = NoLock;
3843 instance->backend = proc->backendId;
3844 instance->lxid = proc->lxid;
3845 instance->pid = proc->pid;
3846 instance->leaderPid = proclock->groupLeader->pid;
3847 instance->fastpath = false;
3848 data->nlocks++;
3849
3850 proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
3851 offsetof(PROCLOCK, lockLink));
3852 }
3853
3854 /* Enlarge waiter_pids[] if it's too small to hold all wait queue PIDs */
3855 waitQueue = &(theLock->waitProcs);
3856 queue_size = waitQueue->size;
3857
3858 if (queue_size > data->maxpids - data->npids)
3859 {
3860 data->maxpids = Max(data->maxpids + MaxBackends,
3861 data->npids + queue_size);
3862 data->waiter_pids = (int *) repalloc(data->waiter_pids,
3863 sizeof(int) * data->maxpids);
3864 }
3865
3866 /* Collect PIDs from the lock's wait queue, stopping at blocked_proc */
3867 proc = (PGPROC *) waitQueue->links.next;
3868 for (i = 0; i < queue_size; i++)
3869 {
3870 if (proc == blocked_proc)
3871 break;
3872 data->waiter_pids[data->npids++] = proc->pid;
3873 proc = (PGPROC *) proc->links.next;
3874 }
3875
3876 bproc->num_locks = data->nlocks - bproc->first_lock;
3877 bproc->num_waiters = data->npids - bproc->first_waiter;
3878 }
3879
3880 /*
3881 * Returns a list of currently held AccessExclusiveLocks, for use by
3882 * LogStandbySnapshot(). The result is a palloc'd array,
3883 * with the number of elements returned into *nlocks.
3884 *
3885 * XXX This currently takes a lock on all partitions of the lock table,
3886 * but it's possible to do better. By reference counting locks and storing
3887 * the value in the ProcArray entry for each backend we could tell if any
3888 * locks need recording without having to acquire the partition locks and
3889 * scan the lock table. Whether that's worth the additional overhead
3890 * is pretty dubious though.
3891 */
3892 xl_standby_lock *
3893 GetRunningTransactionLocks(int *nlocks)
3894 {
3895 xl_standby_lock *accessExclusiveLocks;
3896 PROCLOCK *proclock;
3897 HASH_SEQ_STATUS seqstat;
3898 int i;
3899 int index;
3900 int els;
3901
3902 /*
3903 * Acquire lock on the entire shared lock data structure.
3904 *
3905 * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
3906 */
3907 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
3908 LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
3909
3910 /* Now we can safely count the number of proclocks */
3911 els = hash_get_num_entries(LockMethodProcLockHash);
3912
3913 /*
3914 * Allocating enough space for all locks in the lock table is overkill,
3915 * but it's more convenient and faster than having to enlarge the array.
3916 */
3917 accessExclusiveLocks = palloc(els * sizeof(xl_standby_lock));
3918
3919 /* Now scan the tables to copy the data */
3920 hash_seq_init(&seqstat, LockMethodProcLockHash);
3921
3922 /*
3923 * If lock is a currently granted AccessExclusiveLock then it will have
3924 * just one proclock holder, so locks are never accessed twice in this
3925 * particular case. Don't copy this code for use elsewhere because in the
3926 * general case this will give you duplicate locks when looking at
3927 * non-exclusive lock types.
3928 */
3929 index = 0;
3930 while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
3931 {
3932 /* make sure this definition matches the one used in LockAcquire */
3933 if ((proclock->holdMask & LOCKBIT_ON(AccessExclusiveLock)) &&
3934 proclock->tag.myLock->tag.locktag_type == LOCKTAG_RELATION)
3935 {
3936 PGPROC *proc = proclock->tag.myProc;
3937 PGXACT *pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
3938 LOCK *lock = proclock->tag.myLock;
3939 TransactionId xid = pgxact->xid;
3940
3941 /*
3942 * Don't record locks for transactions if we know they have
3943 * already issued their WAL record for commit but not yet released
3944 * lock. It is still possible that we see locks held by already
3945 * complete transactions, if they haven't yet zeroed their xids.
3946 */
3947 if (!TransactionIdIsValid(xid))
3948 continue;
3949
3950 accessExclusiveLocks[index].xid = xid;
3951 accessExclusiveLocks[index].dbOid = lock->tag.locktag_field1;
3952 accessExclusiveLocks[index].relOid = lock->tag.locktag_field2;
3953
3954 index++;
3955 }
3956 }
3957
3958 Assert(index <= els);
3959
3960 /*
3961 * And release locks. We do this in reverse order for two reasons: (1)
3962 * Anyone else who needs more than one of the locks will be trying to lock
3963 * them in increasing order; we don't want to release the other process
3964 * until it can get all the locks it needs. (2) This avoids O(N^2)
3965 * behavior inside LWLockRelease.
3966 */
3967 for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
3968 LWLockRelease(LockHashPartitionLockByIndex(i));
3969
3970 *nlocks = index;
3971 return accessExclusiveLocks;
3972 }
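
/*
 * Sketch of the intended caller (roughly what LogStandbySnapshot() in
 * storage/ipc/standby.c does; simplified):
 *
 *		xl_standby_lock *locks;
 *		int			nlocks;
 *
 *		locks = GetRunningTransactionLocks(&nlocks);
 *		if (nlocks > 0)
 *			LogAccessExclusiveLocks(nlocks, locks);
 *		pfree(locks);
 */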
3973
3974 /* Provide the textual name of any lock mode */
3975 const char *
3976 GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
3977 {
3978 Assert(lockmethodid > 0 && lockmethodid < lengthof(LockMethods));
3979 Assert(mode > 0 && mode <= LockMethods[lockmethodid]->numLockModes);
3980 return LockMethods[lockmethodid]->lockModeNames[mode];
3981 }
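
/*
 * Typical usage sketch, e.g. when assembling a log message about a LOCK
 * (LOCK_LOCKMETHOD() extracts the method id from the lock's tag):
 *
 *		const char *modename = GetLockmodeName(LOCK_LOCKMETHOD(*lock),
 *											   lockmode);
 */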
3982
3983 #ifdef LOCK_DEBUG
3984 /*
3985 * Dump all locks in the given proc's myProcLocks lists.
3986 *
3987 * Caller is responsible for having acquired appropriate LWLocks.
3988 */
3989 void
3990 DumpLocks(PGPROC *proc)
3991 {
3992 SHM_QUEUE *procLocks;
3993 PROCLOCK *proclock;
3994 LOCK *lock;
3995 int i;
3996
3997 if (proc == NULL)
3998 return;
3999
4000 if (proc->waitLock)
4001 LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
4002
4003 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
4004 {
4005 procLocks = &(proc->myProcLocks[i]);
4006
4007 proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
4008 offsetof(PROCLOCK, procLink));
4009
4010 while (proclock)
4011 {
4012 Assert(proclock->tag.myProc == proc);
4013
4014 lock = proclock->tag.myLock;
4015
4016 PROCLOCK_PRINT("DumpLocks", proclock);
4017 LOCK_PRINT("DumpLocks", lock, 0);
4018
4019 proclock = (PROCLOCK *)
4020 SHMQueueNext(procLocks, &proclock->procLink,
4021 offsetof(PROCLOCK, procLink));
4022 }
4023 }
4024 }
4025
4026 /*
4027 * Dump all lmgr locks.
4028 *
4029 * Caller is responsible for having acquired appropriate LWLocks.
4030 */
4031 void
4032 DumpAllLocks(void)
4033 {
4034 PGPROC *proc;
4035 PROCLOCK *proclock;
4036 LOCK *lock;
4037 HASH_SEQ_STATUS status;
4038
4039 proc = MyProc;
4040
4041 if (proc && proc->waitLock)
4042 LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
4043
4044 hash_seq_init(&status, LockMethodProcLockHash);
4045
4046 while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
4047 {
4048 PROCLOCK_PRINT("DumpAllLocks", proclock);
4049
4050 lock = proclock->tag.myLock;
4051 if (lock)
4052 LOCK_PRINT("DumpAllLocks", lock, 0);
4053 else
4054 elog(LOG, "DumpAllLocks: proclock->tag.myLock = NULL");
4055 }
4056 }
4057 #endif /* LOCK_DEBUG */
4058
4059 /*
4060 * LOCK 2PC resource manager's routines
4061 */
4062
4063 /*
4064 * Re-acquire a lock belonging to a transaction that was prepared.
4065 *
4066 * Because this function is run at db startup, re-acquiring the locks should
4067 * never conflict with running transactions because there are none. We
4068 * assume that the lock state represented by the stored 2PC files is legal.
4069 *
4070 * When switching from Hot Standby mode to normal operation, the locks will
4071 * be already held by the startup process. The locks are acquired for the new
4072 * procs without checking for conflicts, so we don't get a conflict between the
4073 * startup process and the dummy procs, even though we will momentarily have
4074 * a situation where two procs are holding the same AccessExclusiveLock,
4075 * which isn't normally possible because of the conflict. If we're in standby
4076 * mode, but a recovery snapshot hasn't been established yet, it's possible
4077 * that some but not all of the locks are already held by the startup process.
4078 *
4079 * This approach is simple, but also a bit dangerous, because if there isn't
4080 * enough shared memory to acquire the locks, an error will be thrown, which
4081 * is promoted to FATAL, so recovery aborts and brings down the postmaster.
4082 * A safer approach would be to transfer the locks like we do in
4083 * AtPrepare_Locks, but then again, in hot standby mode it's possible for
4084 * read-only backends to use up all the shared lock memory anyway, so that
4085 * replaying the WAL record that needs to acquire a lock will throw an error
4086 * and PANIC anyway.
4087 */
4088 void
4089 lock_twophase_recover(TransactionId xid, uint16 info,
4090 void *recdata, uint32 len)
4091 {
4092 TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
4093 PGPROC *proc = TwoPhaseGetDummyProc(xid);
4094 LOCKTAG *locktag;
4095 LOCKMODE lockmode;
4096 LOCKMETHODID lockmethodid;
4097 LOCK *lock;
4098 PROCLOCK *proclock;
4099 PROCLOCKTAG proclocktag;
4100 bool found;
4101 uint32 hashcode;
4102 uint32 proclock_hashcode;
4103 int partition;
4104 LWLock *partitionLock;
4105 LockMethod lockMethodTable;
4106
4107 Assert(len == sizeof(TwoPhaseLockRecord));
4108 locktag = &rec->locktag;
4109 lockmode = rec->lockmode;
4110 lockmethodid = locktag->locktag_lockmethodid;
4111
4112 if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4113 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4114 lockMethodTable = LockMethods[lockmethodid];
4115
4116 hashcode = LockTagHashCode(locktag);
4117 partition = LockHashPartition(hashcode);
4118 partitionLock = LockHashPartitionLock(hashcode);
4119
4120 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
4121
4122 /*
4123 * Find or create a lock with this tag.
4124 */
4125 lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
4126 (void *) locktag,
4127 hashcode,
4128 HASH_ENTER_NULL,
4129 &found);
4130 if (!lock)
4131 {
4132 LWLockRelease(partitionLock);
4133 ereport(ERROR,
4134 (errcode(ERRCODE_OUT_OF_MEMORY),
4135 errmsg("out of shared memory"),
4136 errhint("You might need to increase max_locks_per_transaction.")));
4137 }
4138
4139 /*
4140 * if it's a new lock object, initialize it
4141 */
4142 if (!found)
4143 {
4144 lock->grantMask = 0;
4145 lock->waitMask = 0;
4146 SHMQueueInit(&(lock->procLocks));
4147 ProcQueueInit(&(lock->waitProcs));
4148 lock->nRequested = 0;
4149 lock->nGranted = 0;
4150 MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
4151 MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
4152 LOCK_PRINT("lock_twophase_recover: new", lock, lockmode);
4153 }
4154 else
4155 {
4156 LOCK_PRINT("lock_twophase_recover: found", lock, lockmode);
4157 Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
4158 Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
4159 Assert(lock->nGranted <= lock->nRequested);
4160 }
4161
4162 /*
4163 * Create the hash key for the proclock table.
4164 */
4165 proclocktag.myLock = lock;
4166 proclocktag.myProc = proc;
4167
4168 proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
4169
4170 /*
4171 * Find or create a proclock entry with this tag
4172 */
4173 proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
4174 (void *) &proclocktag,
4175 proclock_hashcode,
4176 HASH_ENTER_NULL,
4177 &found);
4178 if (!proclock)
4179 {
4180 /* Oops, not enough shmem for the proclock */
4181 if (lock->nRequested == 0)
4182 {
4183 /*
4184 * There are no other requestors of this lock, so garbage-collect
4185 * the lock object. We *must* do this to avoid a permanent leak
4186 * of shared memory, because there won't be anything to cause
4187 * anyone to release the lock object later.
4188 */
4189 Assert(SHMQueueEmpty(&(lock->procLocks)));
4190 if (!hash_search_with_hash_value(LockMethodLockHash,
4191 (void *) &(lock->tag),
4192 hashcode,
4193 HASH_REMOVE,
4194 NULL))
4195 elog(PANIC, "lock table corrupted");
4196 }
4197 LWLockRelease(partitionLock);
4198 ereport(ERROR,
4199 (errcode(ERRCODE_OUT_OF_MEMORY),
4200 errmsg("out of shared memory"),
4201 errhint("You might need to increase max_locks_per_transaction.")));
4202 }
4203
4204 /*
4205 * If new, initialize the new entry
4206 */
4207 if (!found)
4208 {
4209 Assert(proc->lockGroupLeader == NULL);
4210 proclock->groupLeader = proc;
4211 proclock->holdMask = 0;
4212 proclock->releaseMask = 0;
4213 /* Add proclock to appropriate lists */
4214 SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
4215 SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
4216 &proclock->procLink);
4217 PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
4218 }
4219 else
4220 {
4221 PROCLOCK_PRINT("lock_twophase_recover: found", proclock);
4222 Assert((proclock->holdMask & ~lock->grantMask) == 0);
4223 }
4224
4225 /*
4226 * lock->nRequested and lock->requested[] count the total number of
4227 * requests, whether granted or waiting, so increment those immediately.
4228 */
4229 lock->nRequested++;
4230 lock->requested[lockmode]++;
4231 Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
4232
4233 /*
4234 * We shouldn't already hold the desired lock.
4235 */
4236 if (proclock->holdMask & LOCKBIT_ON(lockmode))
4237 elog(ERROR, "lock %s on object %u/%u/%u is already held",
4238 lockMethodTable->lockModeNames[lockmode],
4239 lock->tag.locktag_field1, lock->tag.locktag_field2,
4240 lock->tag.locktag_field3);
4241
4242 /*
4243 * We ignore any possible conflicts and just grant ourselves the lock,
4244 * partly for simplicity and partly to avoid deadlocks when switching
4245 * from standby to normal mode. See function comment.
4246 */
4247 GrantLock(lock, proclock, lockmode);
4248
4249 /*
4250 * Bump strong lock count, to make sure any fast-path lock requests won't
4251 * be granted without consulting the primary lock table.
4252 */
4253 if (ConflictsWithRelationFastPath(&lock->tag, lockmode))
4254 {
4255 uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode);
4256
4257 SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
4258 FastPathStrongRelationLocks->count[fasthashcode]++;
4259 SpinLockRelease(&FastPathStrongRelationLocks->mutex);
4260 }
4261
4262 LWLockRelease(partitionLock);
4263 }
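
/*
 * A sketch of how this callback is reached: the two-phase resource
 * managers are wired up in access/transam/twophase_rmgr.c, roughly
 *
 *		const TwoPhaseCallback twophase_recover_callbacks[] = {
 *			NULL,					-- TWOPHASE_RM_END_ID
 *			lock_twophase_recover,	-- TWOPHASE_RM_LOCK_ID
 *			...
 *		};
 *
 * and RecoverPreparedTransactions() invokes the matching callback for each
 * record found in a prepared transaction's state data.
 */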
4264
4265 /*
4266 * Re-acquire a lock belonging to a transaction that was prepared, when
4267 * starting up into hot standby mode.
4268 */
4269 void
4270 lock_twophase_standby_recover(TransactionId xid, uint16 info,
4271 void *recdata, uint32 len)
4272 {
4273 TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
4274 LOCKTAG *locktag;
4275 LOCKMODE lockmode;
4276 LOCKMETHODID lockmethodid;
4277
4278 Assert(len == sizeof(TwoPhaseLockRecord));
4279 locktag = &rec->locktag;
4280 lockmode = rec->lockmode;
4281 lockmethodid = locktag->locktag_lockmethodid;
4282
4283 if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4284 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4285
4286 if (lockmode == AccessExclusiveLock &&
4287 locktag->locktag_type == LOCKTAG_RELATION)
4288 {
4289 StandbyAcquireAccessExclusiveLock(xid,
4290 locktag->locktag_field1 /* dboid */ ,
4291 locktag->locktag_field2 /* reloid */ );
4292 }
4293 }
4294
4295
4296 /*
4297 * 2PC processing routine for COMMIT PREPARED case.
4298 *
4299 * Find and release the lock indicated by the 2PC record.
4300 */
4301 void
4302 lock_twophase_postcommit(TransactionId xid, uint16 info,
4303 void *recdata, uint32 len)
4304 {
4305 TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
4306 PGPROC *proc = TwoPhaseGetDummyProc(xid);
4307 LOCKTAG *locktag;
4308 LOCKMETHODID lockmethodid;
4309 LockMethod lockMethodTable;
4310
4311 Assert(len == sizeof(TwoPhaseLockRecord));
4312 locktag = &rec->locktag;
4313 lockmethodid = locktag->locktag_lockmethodid;
4314
4315 if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4316 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4317 lockMethodTable = LockMethods[lockmethodid];
4318
4319 LockRefindAndRelease(lockMethodTable, proc, locktag, rec->lockmode, true);
4320 }
4321
4322 /*
4323 * 2PC processing routine for ROLLBACK PREPARED case.
4324 *
4325 * This is actually just the same as the COMMIT case.
4326 */
4327 void
4328 lock_twophase_postabort(TransactionId xid, uint16 info,
4329 void *recdata, uint32 len)
4330 {
4331 lock_twophase_postcommit(xid, info, recdata, len);
4332 }
4333
4334 /*
4335 * VirtualXactLockTableInsert
4336 *
4337 * Take vxid lock via the fast-path. There can't be any pre-existing
4338 * lockers, as we haven't advertised this vxid via the ProcArray yet.
4339 *
4340 * Since MyProc->fpLocalTransactionId will normally contain the same data
4341 * as MyProc->lxid, you might wonder if we really need both. The
4342 * difference is that MyProc->lxid is set and cleared unlocked, and
4343 * examined by procarray.c, while fpLocalTransactionId is protected by
4344 * backendLock and is used only by the locking subsystem. Doing it this
4345 * way makes it easier to verify that there are no funny race conditions.
4346 *
4347 * We don't bother recording this lock in the local lock table, since it's
4348 * only ever released at the end of a transaction. Instead,
4349 * LockReleaseAll() calls VirtualXactLockTableCleanup().
4350 */
4351 void
4352 VirtualXactLockTableInsert(VirtualTransactionId vxid)
4353 {
4354 Assert(VirtualTransactionIdIsValid(vxid));
4355
4356 LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
4357
4358 Assert(MyProc->backendId == vxid.backendId);
4359 Assert(MyProc->fpLocalTransactionId == InvalidLocalTransactionId);
4360 Assert(MyProc->fpVXIDLock == false);
4361
4362 MyProc->fpVXIDLock = true;
4363 MyProc->fpLocalTransactionId = vxid.localTransactionId;
4364
4365 LWLockRelease(&MyProc->backendLock);
4366 }
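
/*
 * Usage sketch: this is called once per transaction start, roughly as in
 * StartTransaction() (access/transam/xact.c):
 *
 *		VirtualTransactionId vxid;
 *
 *		vxid.backendId = MyBackendId;
 *		vxid.localTransactionId = GetNextLocalTransactionId();
 *		VirtualXactLockTableInsert(vxid);
 */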
4367
4368 /*
4369 * VirtualXactLockTableCleanup
4370 *
4371 * Check whether a VXID lock has been materialized; if so, release it,
4372 * unblocking waiters.
4373 */
4374 void
4375 VirtualXactLockTableCleanup(void)
4376 {
4377 bool fastpath;
4378 LocalTransactionId lxid;
4379
4380 Assert(MyProc->backendId != InvalidBackendId);
4381
4382 /*
4383 * Clean up shared memory state.
4384 */
4385 LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
4386
4387 fastpath = MyProc->fpVXIDLock;
4388 lxid = MyProc->fpLocalTransactionId;
4389 MyProc->fpVXIDLock = false;
4390 MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
4391
4392 LWLockRelease(&MyProc->backendLock);
4393
4394 /*
4395 * If fpVXIDLock has been cleared without touching fpLocalTransactionId,
4396 * that means someone transferred the lock to the main lock table.
4397 */
4398 if (!fastpath && LocalTransactionIdIsValid(lxid))
4399 {
4400 VirtualTransactionId vxid;
4401 LOCKTAG locktag;
4402
4403 vxid.backendId = MyBackendId;
4404 vxid.localTransactionId = lxid;
4405 SET_LOCKTAG_VIRTUALTRANSACTION(locktag, vxid);
4406
4407 LockRefindAndRelease(LockMethods[DEFAULT_LOCKMETHOD], MyProc,
4408 &locktag, ExclusiveLock, false);
4409 }
4410 }
4411
4412 /*
4413 * XactLockForVirtualXact
4414 *
4415 * If TransactionIdIsValid(xid), this is essentially XactLockTableWait(xid,
4416 * NULL, NULL, XLTW_None) or ConditionalXactLockTableWait(xid). Unlike those
4417 * functions, it assumes "xid" is never a subtransaction and that "xid" is
4418 * prepared, committed, or aborted.
4419 *
4420 * If !TransactionIdIsValid(xid), this locks every prepared XID that was
4421 * known as "vxid" before its PREPARE TRANSACTION.
4422 */
4423 static bool
4424 XactLockForVirtualXact(VirtualTransactionId vxid,
4425 TransactionId xid, bool wait)
4426 {
4427 bool more = false;
4428
4429 /* There is no point in waiting for 2PCs if none can exist. */
4430 if (max_prepared_xacts == 0)
4431 return true;
4432
4433 do
4434 {
4435 LockAcquireResult lar;
4436 LOCKTAG tag;
4437
4438 /* Clear state from previous iterations. */
4439 if (more)
4440 {
4441 xid = InvalidTransactionId;
4442 more = false;
4443 }
4444
4445 /* If we have no xid, try to find one. */
4446 if (!TransactionIdIsValid(xid))
4447 xid = TwoPhaseGetXidByVirtualXID(vxid, &more);
4448 if (!TransactionIdIsValid(xid))
4449 {
4450 Assert(!more);
4451 return true;
4452 }
4453
4454 /* Check or wait for XID completion. */
4455 SET_LOCKTAG_TRANSACTION(tag, xid);
4456 lar = LockAcquire(&tag, ShareLock, false, !wait);
4457 if (lar == LOCKACQUIRE_NOT_AVAIL)
4458 return false;
4459 LockRelease(&tag, ShareLock, false);
4460 } while (more);
4461
4462 return true;
4463 }
4464
4465 /*
4466 * VirtualXactLock
4467 *
4468 * If wait = true, wait as long as the given VXID or any XID acquired by the
4469 * same transaction is still running. Then, return true.
4470 *
4471 * If wait = false, just check whether that VXID or one of those XIDs is still
4472 * running, and return true or false.
4473 */
4474 bool
4475 VirtualXactLock(VirtualTransactionId vxid, bool wait)
4476 {
4477 LOCKTAG tag;
4478 PGPROC *proc;
4479 TransactionId xid = InvalidTransactionId;
4480
4481 Assert(VirtualTransactionIdIsValid(vxid));
4482
4483 if (VirtualTransactionIdIsRecoveredPreparedXact(vxid))
4484 /* no vxid lock; localTransactionId is a normal, locked XID */
4485 return XactLockForVirtualXact(vxid, vxid.localTransactionId, wait);
4486
4487 SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
4488
4489 /*
4490 * If a lock table entry must be made, this is the PGPROC on whose behalf
4491 * it must be done. Note that the transaction might end or the PGPROC
4492 * might be reassigned to a new backend before we get around to examining
4493 * it, but it doesn't matter. If we find upon examination that the
4494 * relevant lxid is no longer running here, that's enough to prove that
4495 * it's no longer running anywhere.
4496 */
4497 proc = BackendIdGetProc(vxid.backendId);
4498 if (proc == NULL)
4499 return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
4500
4501 /*
4502 * We must acquire this lock before checking the backendId and lxid
4503 * against the ones we're waiting for. The target backend will only set
4504 * or clear lxid while holding this lock.
4505 */
4506 LWLockAcquire(&proc->backendLock, LW_EXCLUSIVE);
4507
4508 if (proc->backendId != vxid.backendId
4509 || proc->fpLocalTransactionId != vxid.localTransactionId)
4510 {
4511 /* VXID ended */
4512 LWLockRelease(&proc->backendLock);
4513 return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
4514 }
4515
4516 /*
4517 * If we aren't asked to wait, there's no need to set up a lock table
4518 * entry. The transaction is still in progress, so just return false.
4519 */
4520 if (!wait)
4521 {
4522 LWLockRelease(&proc->backendLock);
4523 return false;
4524 }
4525
4526 /*
4527 * OK, we're going to need to sleep on the VXID. But first, we must set
4528 * up the primary lock table entry, if needed (ie, convert the proc's
4529 * fast-path lock on its VXID to a regular lock).
4530 */
4531 if (proc->fpVXIDLock)
4532 {
4533 PROCLOCK *proclock;
4534 uint32 hashcode;
4535 LWLock *partitionLock;
4536
4537 hashcode = LockTagHashCode(&tag);
4538
4539 partitionLock = LockHashPartitionLock(hashcode);
4540 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
4541
4542 proclock = SetupLockInTable(LockMethods[DEFAULT_LOCKMETHOD], proc,
4543 &tag, hashcode, ExclusiveLock);
4544 if (!proclock)
4545 {
4546 LWLockRelease(partitionLock);
4547 LWLockRelease(&proc->backendLock);
4548 ereport(ERROR,
4549 (errcode(ERRCODE_OUT_OF_MEMORY),
4550 errmsg("out of shared memory"),
4551 errhint("You might need to increase max_locks_per_transaction.")));
4552 }
4553 GrantLock(proclock->tag.myLock, proclock, ExclusiveLock);
4554
4555 LWLockRelease(partitionLock);
4556
4557 proc->fpVXIDLock = false;
4558 }
4559
4560 /*
4561 * If the proc has an XID now, we'll avoid a TwoPhaseGetXidByVirtualXID()
4562 * search. The proc might have assigned this XID but not yet locked it,
4563 * in which case the proc will lock this XID before releasing the VXID.
4564 * The backendLock critical section excludes VirtualXactLockTableCleanup(),
4565 * so we won't save an XID of a different VXID. It doesn't matter whether
4566 * we save this before or after setting up the primary lock table entry.
4567 */
4568 xid = ProcGlobal->allPgXact[proc->pgprocno].xid;
4569
4570 /* Done with proc->fpLockBits */
4571 LWLockRelease(&proc->backendLock);
4572
4573 /* Time to wait. */
4574 (void) LockAcquire(&tag, ShareLock, false, false);
4575
4576 LockRelease(&tag, ShareLock, false);
4577 return XactLockForVirtualXact(vxid, xid, wait);
4578 }
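
/*
 * Usage sketch: callers that must wait out every transaction that might
 * still see an old snapshot (e.g. DefineIndex() in commands/indexcmds.c,
 * for CREATE INDEX CONCURRENTLY) do roughly:
 *
 *		VirtualTransactionId *old_snapshots;
 *		int			n_old_snapshots,
 *					i;
 *
 *		old_snapshots = GetCurrentVirtualXIDs(limitXmin, true, false,
 *											  0, &n_old_snapshots);
 *		for (i = 0; i < n_old_snapshots; i++)
 *			VirtualXactLock(old_snapshots[i], true);
 */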
4579
4580 /*
4581 * LockWaiterCount
4582 *
4583 * Find the number of lock requesters on this locktag
4584 */
4585 int
4586 LockWaiterCount(const LOCKTAG *locktag)
4587 {
4588 LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
4589 LOCK *lock;
4590 bool found;
4591 uint32 hashcode;
4592 LWLock *partitionLock;
4593 int waiters = 0;
4594
4595 if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4596 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4597
4598 hashcode = LockTagHashCode(locktag);
4599 partitionLock = LockHashPartitionLock(hashcode);
4600 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
4601
4602 lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
4603 (const void *) locktag,
4604 hashcode,
4605 HASH_FIND,
4606 &found);
4607 if (found)
4608 {
4609 Assert(lock != NULL);
4610 waiters = lock->nRequested;
4611 }
4612 LWLockRelease(partitionLock);
4613
4614 return waiters;
4615 }
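
/*
 * Usage sketch: the expected caller pattern is counting requesters of a
 * relation extension lock (cf. RelationExtensionLockWaiterCount() in
 * storage/lmgr/lmgr.c):
 *
 *		LOCKTAG		tag;
 *
 *		SET_LOCKTAG_RELATION_EXTEND(tag,
 *									relation->rd_lockInfo.lockRelId.dbId,
 *									relation->rd_lockInfo.lockRelId.relId);
 *		return LockWaiterCount(&tag);
 */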
4616