1 /*-------------------------------------------------------------------------
2 *
3 * lock.c
4 * POSTGRES primary lock mechanism
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/storage/lmgr/lock.c
12 *
13 * NOTES
14 * A lock table is a shared memory hash table. When
15 * a process tries to acquire a lock of a type that conflicts
16 * with existing locks, it is put to sleep using the routines
17 * in storage/lmgr/proc.c.
18 *
19 * For the most part, this code should be invoked via lmgr.c
20 * or another lock-management module, not directly.
21 *
22 * Interface:
23 *
24 * InitLocks(), GetLocksMethodTable(), GetLockTagsMethodTable(),
25 * LockAcquire(), LockRelease(), LockReleaseAll(),
26 * LockCheckConflicts(), GrantLock()
27 *
28 *-------------------------------------------------------------------------
29 */
30 #include "postgres.h"
31
32 #include <signal.h>
33 #include <unistd.h>
34
35 #include "access/transam.h"
36 #include "access/twophase.h"
37 #include "access/twophase_rmgr.h"
38 #include "access/xact.h"
39 #include "access/xlog.h"
40 #include "miscadmin.h"
41 #include "pg_trace.h"
42 #include "pgstat.h"
43 #include "storage/proc.h"
44 #include "storage/procarray.h"
45 #include "storage/sinvaladt.h"
46 #include "storage/spin.h"
47 #include "storage/standby.h"
48 #include "utils/memutils.h"
49 #include "utils/ps_status.h"
50 #include "utils/resowner_private.h"
51
52
/*
 * This configuration variable is used to set the lock table size.  It is
 * read by NLOCKENTS() below.
 */
int			max_locks_per_xact; /* set by guc.c */

/*
 * Number of lock table entries to size for: max_locks_per_xact entries for
 * each of (MaxBackends + max_prepared_xacts) potential lock holders.  The
 * arithmetic goes through add_size()/mul_size() rather than raw operators
 * (presumably for overflow checking --- confirm against their definitions).
 */
#define NLOCKENTS() \
	mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
58
59
60 /*
61 * Data structures defining the semantics of the standard lock methods.
62 *
63 * The conflict table defines the semantics of the various lock modes.
64 */
65 static const LOCKMASK LockConflicts[] = {
66 0,
67
68 /* AccessShareLock */
69 LOCKBIT_ON(AccessExclusiveLock),
70
71 /* RowShareLock */
72 LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
73
74 /* RowExclusiveLock */
75 LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
76 LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
77
78 /* ShareUpdateExclusiveLock */
79 LOCKBIT_ON(ShareUpdateExclusiveLock) |
80 LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
81 LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
82
83 /* ShareLock */
84 LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
85 LOCKBIT_ON(ShareRowExclusiveLock) |
86 LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
87
88 /* ShareRowExclusiveLock */
89 LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
90 LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
91 LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
92
93 /* ExclusiveLock */
94 LOCKBIT_ON(RowShareLock) |
95 LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
96 LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
97 LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
98
99 /* AccessExclusiveLock */
100 LOCKBIT_ON(AccessShareLock) | LOCKBIT_ON(RowShareLock) |
101 LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
102 LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
103 LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock)
104
105 };
106
107 /* Names of lock modes, for debug printouts */
108 static const char *const lock_mode_names[] =
109 {
110 "INVALID",
111 "AccessShareLock",
112 "RowShareLock",
113 "RowExclusiveLock",
114 "ShareUpdateExclusiveLock",
115 "ShareLock",
116 "ShareRowExclusiveLock",
117 "ExclusiveLock",
118 "AccessExclusiveLock"
119 };
120
121 #ifndef LOCK_DEBUG
122 static bool Dummy_trace = false;
123 #endif
124
125 static const LockMethodData default_lockmethod = {
126 AccessExclusiveLock, /* highest valid lock mode number */
127 LockConflicts,
128 lock_mode_names,
129 #ifdef LOCK_DEBUG
130 &Trace_locks
131 #else
132 &Dummy_trace
133 #endif
134 };
135
136 static const LockMethodData user_lockmethod = {
137 AccessExclusiveLock, /* highest valid lock mode number */
138 LockConflicts,
139 lock_mode_names,
140 #ifdef LOCK_DEBUG
141 &Trace_userlocks
142 #else
143 &Dummy_trace
144 #endif
145 };
146
147 /*
148 * map from lock method id to the lock table data structures
149 */
150 static const LockMethod LockMethods[] = {
151 NULL,
152 &default_lockmethod,
153 &user_lockmethod
154 };
155
156
/*
 * Record that's written to 2PC state file when a lock is persisted.
 * It identifies one held lock by its tag and the mode it is held in.
 */
typedef struct TwoPhaseLockRecord
{
	LOCKTAG		locktag;		/* which object is locked */
	LOCKMODE	lockmode;		/* lock mode held on that object */
} TwoPhaseLockRecord;
163
164
/*
 * Count of the number of fast path lock slots we believe to be used.  This
 * might be higher than the real number if another backend has transferred
 * our locks to the primary lock table, but it can never be lower than the
 * real value, since only we can acquire locks on our own behalf.
 *
 * LockAcquireExtended() compares this against FP_LOCK_SLOTS_PER_BACKEND to
 * decide whether attempting the fast path is worthwhile at all.
 */
static int	FastPathLocalUseCount = 0;
172
/*
 * Macros for manipulating proc->fpLockBits.
 *
 * fpLockBits packs FAST_PATH_BITS_PER_SLOT bits for each fast-path slot:
 * slot n occupies bits [3n, 3n+2], and within a slot, bit (l - 1) stands for
 * lock mode l (FAST_PATH_LOCKNUMBER_OFFSET is the lowest representable lock
 * mode number).
 *
 * Every macro argument is parenthesized in the expansion so that expression
 * arguments such as "i + 1" are handled correctly, and the SET/CLEAR macros
 * expand to a single parenthesized expression.
 */
#define FAST_PATH_BITS_PER_SLOT			3
#define FAST_PATH_LOCKNUMBER_OFFSET		1
#define FAST_PATH_MASK					((1 << FAST_PATH_BITS_PER_SLOT) - 1)

/* Extract the FAST_PATH_BITS_PER_SLOT mode bits belonging to slot n */
#define FAST_PATH_GET_BITS(proc, n) \
	(((proc)->fpLockBits >> (FAST_PATH_BITS_PER_SLOT * (n))) & FAST_PATH_MASK)

/* Bit index within fpLockBits for lock mode l in slot n (with sanity checks) */
#define FAST_PATH_BIT_POSITION(n, l) \
	(AssertMacro((l) >= FAST_PATH_LOCKNUMBER_OFFSET), \
	 AssertMacro((l) < FAST_PATH_BITS_PER_SLOT+FAST_PATH_LOCKNUMBER_OFFSET), \
	 AssertMacro((n) < FP_LOCK_SLOTS_PER_BACKEND), \
	 ((l) - FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT * (n)))

/* Mark lock mode l as held in slot n */
#define FAST_PATH_SET_LOCKMODE(proc, n, l) \
	((proc)->fpLockBits |= UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l))

/* Mark lock mode l as not held in slot n */
#define FAST_PATH_CLEAR_LOCKMODE(proc, n, l) \
	((proc)->fpLockBits &= ~(UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)))

/* Nonzero iff lock mode l is marked as held in slot n */
#define FAST_PATH_CHECK_LOCKMODE(proc, n, l) \
	((proc)->fpLockBits & (UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)))
190
/*
 * The fast-path lock mechanism is concerned only with relation locks on
 * unshared relations by backends bound to a database.  The fast-path
 * mechanism exists mostly to accelerate acquisition and release of locks
 * that rarely conflict.  Because ShareUpdateExclusiveLock is
 * self-conflicting, it can't use the fast-path mechanism; but it also does
 * not conflict with any of the locks that do, so we can ignore it completely.
 *
 * EligibleForRelationFastPath: true if the request is a default-lock-method
 * relation lock whose locktag_field1 equals our (valid) MyDatabaseId and
 * whose mode is weaker than ShareUpdateExclusiveLock.
 *
 * ConflictsWithRelationFastPath: true if the request is a
 * default-lock-method relation lock with a valid (non-InvalidOid)
 * locktag_field1 and a mode stronger than ShareUpdateExclusiveLock, i.e. one
 * that could conflict with locks taken through the fast path.
 */
#define EligibleForRelationFastPath(locktag, mode) \
	((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
	(locktag)->locktag_type == LOCKTAG_RELATION && \
	(locktag)->locktag_field1 == MyDatabaseId && \
	MyDatabaseId != InvalidOid && \
	(mode) < ShareUpdateExclusiveLock)
#define ConflictsWithRelationFastPath(locktag, mode) \
	((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
	(locktag)->locktag_type == LOCKTAG_RELATION && \
	(locktag)->locktag_field1 != InvalidOid && \
	(mode) > ShareUpdateExclusiveLock)
210
/*
 * Forward declarations of the file-local fast-path helpers.  Their
 * definitions are not in this part of the file.
 */
static bool FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode);
static bool FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode);
static bool FastPathTransferRelationLocks(LockMethod lockMethodTable,
										  const LOCKTAG *locktag, uint32 hashcode);
static PROCLOCK *FastPathGetRelationLockEntry(LOCALLOCK *locallock);
216
/*
 * To make the fast-path lock mechanism work, we must have some way of
 * preventing the use of the fast-path when a conflicting lock might be
 * present.  We partition the locktag space into
 * FAST_PATH_STRONG_LOCK_HASH_PARTITIONS partitions, and maintain an integer
 * count of the number of "strong" lockers in each partition.  When any
 * "strong" lockers are present (which is hopefully not very often), the
 * fast-path mechanism can't be used, and we must fall back to the slower
 * method of pushing matching locks directly into the main lock tables.
 *
 * The deadlock detector does not know anything about the fast path mechanism,
 * so any locks that might be involved in a deadlock must be transferred from
 * the fast-path queues to the main lock table.
 */
231
/* log2 of the number of strong-lock counter partitions */
#define FAST_PATH_STRONG_LOCK_HASH_BITS			10
/* Number of strong-lock counter partitions (1 << 10 = 1024) */
#define FAST_PATH_STRONG_LOCK_HASH_PARTITIONS \
	(1 << FAST_PATH_STRONG_LOCK_HASH_BITS)
/* Map a locktag hash code to the index of its strong-lock counter */
#define FastPathStrongLockHashPartition(hashcode) \
	((hashcode) % FAST_PATH_STRONG_LOCK_HASH_PARTITIONS)

/*
 * Shared-memory state for the strong-lock counters: one counter per
 * partition plus a spinlock.  The spinlock is initialized in InitLocks();
 * presumably it protects updates of count[] --- the updating code is not in
 * this part of the file, so confirm there.
 */
typedef struct
{
	slock_t		mutex;
	uint32		count[FAST_PATH_STRONG_LOCK_HASH_PARTITIONS];
} FastPathStrongRelationLockData;

/* Points at the shared-memory instance; set by InitLocks() */
static volatile FastPathStrongRelationLockData *FastPathStrongRelationLocks;
245
246
/*
 * Pointers to hash tables containing lock state
 *
 * The LockMethodLockHash and LockMethodProcLockHash hash tables are in
 * shared memory; LockMethodLocalHash is local to each backend.
 *
 * All three are created by InitLocks().
 */
static HTAB *LockMethodLockHash;	/* "LOCK hash": LOCKTAG -> LOCK */
static HTAB *LockMethodProcLockHash;	/* "PROCLOCK hash": PROCLOCKTAG ->
										 * PROCLOCK */
static HTAB *LockMethodLocalHash;	/* "LOCALLOCK hash": LOCALLOCKTAG ->
									 * LOCALLOCK */
256
257
/*
 * Private state for error cleanup.
 *
 * NOTE(review): the code that sets and consumes these variables is not in
 * this part of the file.  Judging by the names, they track a strong lock
 * acquisition in progress and the lock/owner currently being waited for ---
 * confirm against BeginStrongLockAcquire()/WaitOnLock().
 */
static LOCALLOCK *StrongLockInProgress;
static LOCALLOCK *awaitedLock;
static ResourceOwner awaitedOwner;
262
263
264 #ifdef LOCK_DEBUG
265
266 /*------
267 * The following configuration options are available for lock debugging:
268 *
269 * TRACE_LOCKS -- give a bunch of output what's going on in this file
270 * TRACE_USERLOCKS -- same but for user locks
271 * TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
272 * (use to avoid output on system tables)
273 * TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
274 * DEBUG_DEADLOCKS -- currently dumps locks at untimely occasions ;)
275 *
276 * Furthermore, but in storage/lmgr/lwlock.c:
277 * TRACE_LWLOCKS -- trace lightweight locks (pretty useless)
278 *
279 * Define LOCK_DEBUG at compile time to get all these enabled.
280 * --------
281 */
282
/* Tracing via the per-method flag applies only to locktag_field2 >= this */
int			Trace_lock_oidmin = FirstNormalObjectId;
/* Trace flag of the default lock method (see default_lockmethod) */
bool		Trace_locks = false;
/* Trace flag of the user lock method (see user_lockmethod) */
bool		Trace_userlocks = false;
/* If nonzero, locks whose locktag_field2 equals this are always traced */
int			Trace_lock_table = 0;
/* Not referenced in this part of the file; see the option list above */
bool		Debug_deadlocks = false;
288
289
290 inline static bool
LOCK_DEBUG_ENABLED(const LOCKTAG * tag)291 LOCK_DEBUG_ENABLED(const LOCKTAG *tag)
292 {
293 return
294 (*(LockMethods[tag->locktag_lockmethodid]->trace_flag) &&
295 ((Oid) tag->locktag_field2 >= (Oid) Trace_lock_oidmin))
296 || (Trace_lock_table &&
297 (tag->locktag_field2 == Trace_lock_table));
298 }
299
300
301 inline static void
LOCK_PRINT(const char * where,const LOCK * lock,LOCKMODE type)302 LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
303 {
304 if (LOCK_DEBUG_ENABLED(&lock->tag))
305 elog(LOG,
306 "%s: lock(%p) id(%u,%u,%u,%u,%u,%u) grantMask(%x) "
307 "req(%d,%d,%d,%d,%d,%d,%d)=%d "
308 "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
309 where, lock,
310 lock->tag.locktag_field1, lock->tag.locktag_field2,
311 lock->tag.locktag_field3, lock->tag.locktag_field4,
312 lock->tag.locktag_type, lock->tag.locktag_lockmethodid,
313 lock->grantMask,
314 lock->requested[1], lock->requested[2], lock->requested[3],
315 lock->requested[4], lock->requested[5], lock->requested[6],
316 lock->requested[7], lock->nRequested,
317 lock->granted[1], lock->granted[2], lock->granted[3],
318 lock->granted[4], lock->granted[5], lock->granted[6],
319 lock->granted[7], lock->nGranted,
320 lock->waitProcs.size,
321 LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]);
322 }
323
324
325 inline static void
PROCLOCK_PRINT(const char * where,const PROCLOCK * proclockP)326 PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
327 {
328 if (LOCK_DEBUG_ENABLED(&proclockP->tag.myLock->tag))
329 elog(LOG,
330 "%s: proclock(%p) lock(%p) method(%u) proc(%p) hold(%x)",
331 where, proclockP, proclockP->tag.myLock,
332 PROCLOCK_LOCKMETHOD(*(proclockP)),
333 proclockP->tag.myProc, (int) proclockP->holdMask);
334 }
335 #else /* not LOCK_DEBUG */
336
/* Without LOCK_DEBUG the print routines expand to no-op expressions */
#define LOCK_PRINT(where, lock, type)  ((void) 0)
#define PROCLOCK_PRINT(where, proclockP)  ((void) 0)
339 #endif /* not LOCK_DEBUG */
340
341
/*
 * Forward declarations of file-local routines.  Apart from proclock_hash(),
 * their definitions are not in this part of the file.
 */
static uint32 proclock_hash(const void *key, Size keysize);
static void RemoveLocalLock(LOCALLOCK *locallock);
static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
								  const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
static void FinishStrongLockAcquire(void);
static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock);
static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent);
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
						PROCLOCK *proclock, LockMethod lockMethodTable);
static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
						LockMethod lockMethodTable, uint32 hashcode,
						bool wakeupNeeded);
static void LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
								 LOCKTAG *locktag, LOCKMODE lockmode,
								 bool decrement_strong_lock_count);
static void GetSingleProcBlockerStatusData(PGPROC *blocked_proc,
										   BlockedProcsData *data);
362
363
364 /*
365 * InitLocks -- Initialize the lock manager's data structures.
366 *
367 * This is called from CreateSharedMemoryAndSemaphores(), which see for
368 * more comments. In the normal postmaster case, the shared hash tables
369 * are created here, as well as a locallock hash table that will remain
370 * unused and empty in the postmaster itself. Backends inherit the pointers
371 * to the shared tables via fork(), and also inherit an image of the locallock
372 * hash table, which they proceed to use. In the EXEC_BACKEND case, each
373 * backend re-executes this code to obtain pointers to the already existing
374 * shared hash tables and to create its locallock hash table.
375 */
376 void
InitLocks(void)377 InitLocks(void)
378 {
379 HASHCTL info;
380 long init_table_size,
381 max_table_size;
382 bool found;
383
384 /*
385 * Compute init/max size to request for lock hashtables. Note these
386 * calculations must agree with LockShmemSize!
387 */
388 max_table_size = NLOCKENTS();
389 init_table_size = max_table_size / 2;
390
391 /*
392 * Allocate hash table for LOCK structs. This stores per-locked-object
393 * information.
394 */
395 MemSet(&info, 0, sizeof(info));
396 info.keysize = sizeof(LOCKTAG);
397 info.entrysize = sizeof(LOCK);
398 info.num_partitions = NUM_LOCK_PARTITIONS;
399
400 LockMethodLockHash = ShmemInitHash("LOCK hash",
401 init_table_size,
402 max_table_size,
403 &info,
404 HASH_ELEM | HASH_BLOBS | HASH_PARTITION);
405
406 /* Assume an average of 2 holders per lock */
407 max_table_size *= 2;
408 init_table_size *= 2;
409
410 /*
411 * Allocate hash table for PROCLOCK structs. This stores
412 * per-lock-per-holder information.
413 */
414 info.keysize = sizeof(PROCLOCKTAG);
415 info.entrysize = sizeof(PROCLOCK);
416 info.hash = proclock_hash;
417 info.num_partitions = NUM_LOCK_PARTITIONS;
418
419 LockMethodProcLockHash = ShmemInitHash("PROCLOCK hash",
420 init_table_size,
421 max_table_size,
422 &info,
423 HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
424
425 /*
426 * Allocate fast-path structures.
427 */
428 FastPathStrongRelationLocks =
429 ShmemInitStruct("Fast Path Strong Relation Lock Data",
430 sizeof(FastPathStrongRelationLockData), &found);
431 if (!found)
432 SpinLockInit(&FastPathStrongRelationLocks->mutex);
433
434 /*
435 * Allocate non-shared hash table for LOCALLOCK structs. This stores lock
436 * counts and resource owner information.
437 *
438 * The non-shared table could already exist in this process (this occurs
439 * when the postmaster is recreating shared memory after a backend crash).
440 * If so, delete and recreate it. (We could simply leave it, since it
441 * ought to be empty in the postmaster, but for safety let's zap it.)
442 */
443 if (LockMethodLocalHash)
444 hash_destroy(LockMethodLocalHash);
445
446 info.keysize = sizeof(LOCALLOCKTAG);
447 info.entrysize = sizeof(LOCALLOCK);
448
449 LockMethodLocalHash = hash_create("LOCALLOCK hash",
450 16,
451 &info,
452 HASH_ELEM | HASH_BLOBS);
453 }
454
455
456 /*
457 * Fetch the lock method table associated with a given lock
458 */
459 LockMethod
GetLocksMethodTable(const LOCK * lock)460 GetLocksMethodTable(const LOCK *lock)
461 {
462 LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);
463
464 Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
465 return LockMethods[lockmethodid];
466 }
467
468 /*
469 * Fetch the lock method table associated with a given locktag
470 */
471 LockMethod
GetLockTagsMethodTable(const LOCKTAG * locktag)472 GetLockTagsMethodTable(const LOCKTAG *locktag)
473 {
474 LOCKMETHODID lockmethodid = (LOCKMETHODID) locktag->locktag_lockmethodid;
475
476 Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
477 return LockMethods[lockmethodid];
478 }
479
480
481 /*
482 * Compute the hash code associated with a LOCKTAG.
483 *
484 * To avoid unnecessary recomputations of the hash code, we try to do this
485 * just once per function, and then pass it around as needed. Aside from
486 * passing the hashcode to hash_search_with_hash_value(), we can extract
487 * the lock partition number from the hashcode.
488 */
489 uint32
LockTagHashCode(const LOCKTAG * locktag)490 LockTagHashCode(const LOCKTAG *locktag)
491 {
492 return get_hash_value(LockMethodLockHash, (const void *) locktag);
493 }
494
495 /*
496 * Compute the hash code associated with a PROCLOCKTAG.
497 *
498 * Because we want to use just one set of partition locks for both the
499 * LOCK and PROCLOCK hash tables, we have to make sure that PROCLOCKs
500 * fall into the same partition number as their associated LOCKs.
501 * dynahash.c expects the partition number to be the low-order bits of
502 * the hash code, and therefore a PROCLOCKTAG's hash code must have the
503 * same low-order bits as the associated LOCKTAG's hash code. We achieve
504 * this with this specialized hash function.
505 */
506 static uint32
proclock_hash(const void * key,Size keysize)507 proclock_hash(const void *key, Size keysize)
508 {
509 const PROCLOCKTAG *proclocktag = (const PROCLOCKTAG *) key;
510 uint32 lockhash;
511 Datum procptr;
512
513 Assert(keysize == sizeof(PROCLOCKTAG));
514
515 /* Look into the associated LOCK object, and compute its hash code */
516 lockhash = LockTagHashCode(&proclocktag->myLock->tag);
517
518 /*
519 * To make the hash code also depend on the PGPROC, we xor the proc
520 * struct's address into the hash code, left-shifted so that the
521 * partition-number bits don't change. Since this is only a hash, we
522 * don't care if we lose high-order bits of the address; use an
523 * intermediate variable to suppress cast-pointer-to-int warnings.
524 */
525 procptr = PointerGetDatum(proclocktag->myProc);
526 lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;
527
528 return lockhash;
529 }
530
531 /*
532 * Compute the hash code associated with a PROCLOCKTAG, given the hashcode
533 * for its underlying LOCK.
534 *
535 * We use this just to avoid redundant calls of LockTagHashCode().
536 */
537 static inline uint32
ProcLockHashCode(const PROCLOCKTAG * proclocktag,uint32 hashcode)538 ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode)
539 {
540 uint32 lockhash = hashcode;
541 Datum procptr;
542
543 /*
544 * This must match proclock_hash()!
545 */
546 procptr = PointerGetDatum(proclocktag->myProc);
547 lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;
548
549 return lockhash;
550 }
551
552 /*
553 * Given two lock modes, return whether they would conflict.
554 */
555 bool
DoLockModesConflict(LOCKMODE mode1,LOCKMODE mode2)556 DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2)
557 {
558 LockMethod lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];
559
560 if (lockMethodTable->conflictTab[mode1] & LOCKBIT_ON(mode2))
561 return true;
562
563 return false;
564 }
565
566 /*
567 * LockHeldByMe -- test whether lock 'locktag' is held with mode 'lockmode'
568 * by the current transaction
569 */
570 bool
LockHeldByMe(const LOCKTAG * locktag,LOCKMODE lockmode)571 LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode)
572 {
573 LOCALLOCKTAG localtag;
574 LOCALLOCK *locallock;
575
576 /*
577 * See if there is a LOCALLOCK entry for this lock and lockmode
578 */
579 MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
580 localtag.lock = *locktag;
581 localtag.mode = lockmode;
582
583 locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
584 (void *) &localtag,
585 HASH_FIND, NULL);
586
587 return (locallock && locallock->nLocks > 0);
588 }
589
590 /*
591 * LockHasWaiters -- look up 'locktag' and check if releasing this
592 * lock would wake up other processes waiting for it.
593 */
594 bool
LockHasWaiters(const LOCKTAG * locktag,LOCKMODE lockmode,bool sessionLock)595 LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
596 {
597 LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
598 LockMethod lockMethodTable;
599 LOCALLOCKTAG localtag;
600 LOCALLOCK *locallock;
601 LOCK *lock;
602 PROCLOCK *proclock;
603 LWLock *partitionLock;
604 bool hasWaiters = false;
605
606 if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
607 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
608 lockMethodTable = LockMethods[lockmethodid];
609 if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
610 elog(ERROR, "unrecognized lock mode: %d", lockmode);
611
612 #ifdef LOCK_DEBUG
613 if (LOCK_DEBUG_ENABLED(locktag))
614 elog(LOG, "LockHasWaiters: lock [%u,%u] %s",
615 locktag->locktag_field1, locktag->locktag_field2,
616 lockMethodTable->lockModeNames[lockmode]);
617 #endif
618
619 /*
620 * Find the LOCALLOCK entry for this lock and lockmode
621 */
622 MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
623 localtag.lock = *locktag;
624 localtag.mode = lockmode;
625
626 locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
627 (void *) &localtag,
628 HASH_FIND, NULL);
629
630 /*
631 * let the caller print its own error message, too. Do not ereport(ERROR).
632 */
633 if (!locallock || locallock->nLocks <= 0)
634 {
635 elog(WARNING, "you don't own a lock of type %s",
636 lockMethodTable->lockModeNames[lockmode]);
637 return false;
638 }
639
640 /*
641 * Check the shared lock table.
642 */
643 partitionLock = LockHashPartitionLock(locallock->hashcode);
644
645 LWLockAcquire(partitionLock, LW_SHARED);
646
647 /*
648 * We don't need to re-find the lock or proclock, since we kept their
649 * addresses in the locallock table, and they couldn't have been removed
650 * while we were holding a lock on them.
651 */
652 lock = locallock->lock;
653 LOCK_PRINT("LockHasWaiters: found", lock, lockmode);
654 proclock = locallock->proclock;
655 PROCLOCK_PRINT("LockHasWaiters: found", proclock);
656
657 /*
658 * Double-check that we are actually holding a lock of the type we want to
659 * release.
660 */
661 if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
662 {
663 PROCLOCK_PRINT("LockHasWaiters: WRONGTYPE", proclock);
664 LWLockRelease(partitionLock);
665 elog(WARNING, "you don't own a lock of type %s",
666 lockMethodTable->lockModeNames[lockmode]);
667 RemoveLocalLock(locallock);
668 return false;
669 }
670
671 /*
672 * Do the checking.
673 */
674 if ((lockMethodTable->conflictTab[lockmode] & lock->waitMask) != 0)
675 hasWaiters = true;
676
677 LWLockRelease(partitionLock);
678
679 return hasWaiters;
680 }
681
682 /*
683 * LockAcquire -- Check for lock conflicts, sleep if conflict found,
684 * set lock if/when no conflicts.
685 *
686 * Inputs:
687 * locktag: unique identifier for the lockable object
688 * lockmode: lock mode to acquire
689 * sessionLock: if true, acquire lock for session not current transaction
690 * dontWait: if true, don't wait to acquire lock
691 *
692 * Returns one of:
693 * LOCKACQUIRE_NOT_AVAIL lock not available, and dontWait=true
694 * LOCKACQUIRE_OK lock successfully acquired
695 * LOCKACQUIRE_ALREADY_HELD incremented count for lock already held
696 * LOCKACQUIRE_ALREADY_CLEAR incremented count for lock already clear
697 *
698 * In the normal case where dontWait=false and the caller doesn't need to
699 * distinguish a freshly acquired lock from one already taken earlier in
700 * this same transaction, there is no need to examine the return value.
701 *
702 * Side Effects: The lock is acquired and recorded in lock tables.
703 *
704 * NOTE: if we wait for the lock, there is no way to abort the wait
705 * short of aborting the transaction.
706 */
707 LockAcquireResult
LockAcquire(const LOCKTAG * locktag,LOCKMODE lockmode,bool sessionLock,bool dontWait)708 LockAcquire(const LOCKTAG *locktag,
709 LOCKMODE lockmode,
710 bool sessionLock,
711 bool dontWait)
712 {
713 return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
714 true, NULL);
715 }
716
717 /*
718 * LockAcquireExtended - allows us to specify additional options
719 *
720 * reportMemoryError specifies whether a lock request that fills the lock
721 * table should generate an ERROR or not. Passing "false" allows the caller
722 * to attempt to recover from lock-table-full situations, perhaps by forcibly
723 * cancelling other lock holders and then retrying. Note, however, that the
724 * return code for that is LOCKACQUIRE_NOT_AVAIL, so that it's unsafe to use
725 * in combination with dontWait = true, as the cause of failure couldn't be
726 * distinguished.
727 *
728 * If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK
729 * table entry if a lock is successfully acquired, or NULL if not.
730 */
731 LockAcquireResult
LockAcquireExtended(const LOCKTAG * locktag,LOCKMODE lockmode,bool sessionLock,bool dontWait,bool reportMemoryError,LOCALLOCK ** locallockp)732 LockAcquireExtended(const LOCKTAG *locktag,
733 LOCKMODE lockmode,
734 bool sessionLock,
735 bool dontWait,
736 bool reportMemoryError,
737 LOCALLOCK **locallockp)
738 {
739 LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
740 LockMethod lockMethodTable;
741 LOCALLOCKTAG localtag;
742 LOCALLOCK *locallock;
743 LOCK *lock;
744 PROCLOCK *proclock;
745 bool found;
746 ResourceOwner owner;
747 uint32 hashcode;
748 LWLock *partitionLock;
749 int status;
750 bool log_lock = false;
751
752 if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
753 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
754 lockMethodTable = LockMethods[lockmethodid];
755 if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
756 elog(ERROR, "unrecognized lock mode: %d", lockmode);
757
758 if (RecoveryInProgress() && !InRecovery &&
759 (locktag->locktag_type == LOCKTAG_OBJECT ||
760 locktag->locktag_type == LOCKTAG_RELATION) &&
761 lockmode > RowExclusiveLock)
762 ereport(ERROR,
763 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
764 errmsg("cannot acquire lock mode %s on database objects while recovery is in progress",
765 lockMethodTable->lockModeNames[lockmode]),
766 errhint("Only RowExclusiveLock or less can be acquired on database objects during recovery.")));
767
768 #ifdef LOCK_DEBUG
769 if (LOCK_DEBUG_ENABLED(locktag))
770 elog(LOG, "LockAcquire: lock [%u,%u] %s",
771 locktag->locktag_field1, locktag->locktag_field2,
772 lockMethodTable->lockModeNames[lockmode]);
773 #endif
774
775 /* Identify owner for lock */
776 if (sessionLock)
777 owner = NULL;
778 else
779 owner = CurrentResourceOwner;
780
781 /*
782 * Find or create a LOCALLOCK entry for this lock and lockmode
783 */
784 MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
785 localtag.lock = *locktag;
786 localtag.mode = lockmode;
787
788 locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
789 (void *) &localtag,
790 HASH_ENTER, &found);
791
792 /*
793 * if it's a new locallock object, initialize it
794 */
795 if (!found)
796 {
797 locallock->lock = NULL;
798 locallock->proclock = NULL;
799 locallock->hashcode = LockTagHashCode(&(localtag.lock));
800 locallock->nLocks = 0;
801 locallock->holdsStrongLockCount = false;
802 locallock->lockCleared = false;
803 locallock->numLockOwners = 0;
804 locallock->maxLockOwners = 8;
805 locallock->lockOwners = NULL; /* in case next line fails */
806 locallock->lockOwners = (LOCALLOCKOWNER *)
807 MemoryContextAlloc(TopMemoryContext,
808 locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
809 }
810 else
811 {
812 /* Make sure there will be room to remember the lock */
813 if (locallock->numLockOwners >= locallock->maxLockOwners)
814 {
815 int newsize = locallock->maxLockOwners * 2;
816
817 locallock->lockOwners = (LOCALLOCKOWNER *)
818 repalloc(locallock->lockOwners,
819 newsize * sizeof(LOCALLOCKOWNER));
820 locallock->maxLockOwners = newsize;
821 }
822 }
823 hashcode = locallock->hashcode;
824
825 if (locallockp)
826 *locallockp = locallock;
827
828 /*
829 * If we already hold the lock, we can just increase the count locally.
830 *
831 * If lockCleared is already set, caller need not worry about absorbing
832 * sinval messages related to the lock's object.
833 */
834 if (locallock->nLocks > 0)
835 {
836 GrantLockLocal(locallock, owner);
837 if (locallock->lockCleared)
838 return LOCKACQUIRE_ALREADY_CLEAR;
839 else
840 return LOCKACQUIRE_ALREADY_HELD;
841 }
842
843 /*
844 * Prepare to emit a WAL record if acquisition of this lock needs to be
845 * replayed in a standby server.
846 *
847 * Here we prepare to log; after lock is acquired we'll issue log record.
848 * This arrangement simplifies error recovery in case the preparation step
849 * fails.
850 *
851 * Only AccessExclusiveLocks can conflict with lock types that read-only
852 * transactions can acquire in a standby server. Make sure this definition
853 * matches the one in GetRunningTransactionLocks().
854 */
855 if (lockmode >= AccessExclusiveLock &&
856 locktag->locktag_type == LOCKTAG_RELATION &&
857 !RecoveryInProgress() &&
858 XLogStandbyInfoActive())
859 {
860 LogAccessExclusiveLockPrepare();
861 log_lock = true;
862 }
863
864 /*
865 * Attempt to take lock via fast path, if eligible. But if we remember
866 * having filled up the fast path array, we don't attempt to make any
867 * further use of it until we release some locks. It's possible that some
868 * other backend has transferred some of those locks to the shared hash
869 * table, leaving space free, but it's not worth acquiring the LWLock just
870 * to check. It's also possible that we're acquiring a second or third
871 * lock type on a relation we have already locked using the fast-path, but
872 * for now we don't worry about that case either.
873 */
874 if (EligibleForRelationFastPath(locktag, lockmode) &&
875 FastPathLocalUseCount < FP_LOCK_SLOTS_PER_BACKEND)
876 {
877 uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode);
878 bool acquired;
879
880 /*
881 * LWLockAcquire acts as a memory sequencing point, so it's safe to
882 * assume that any strong locker whose increment to
883 * FastPathStrongRelationLocks->counts becomes visible after we test
884 * it has yet to begin to transfer fast-path locks.
885 */
886 LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
887 if (FastPathStrongRelationLocks->count[fasthashcode] != 0)
888 acquired = false;
889 else
890 acquired = FastPathGrantRelationLock(locktag->locktag_field2,
891 lockmode);
892 LWLockRelease(&MyProc->backendLock);
893 if (acquired)
894 {
895 /*
896 * The locallock might contain stale pointers to some old shared
897 * objects; we MUST reset these to null before considering the
898 * lock to be acquired via fast-path.
899 */
900 locallock->lock = NULL;
901 locallock->proclock = NULL;
902 GrantLockLocal(locallock, owner);
903 return LOCKACQUIRE_OK;
904 }
905 }
906
907 /*
908 * If this lock could potentially have been taken via the fast-path by
909 * some other backend, we must (temporarily) disable further use of the
910 * fast-path for this lock tag, and migrate any locks already taken via
911 * this method to the main lock table.
912 */
913 if (ConflictsWithRelationFastPath(locktag, lockmode))
914 {
915 uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode);
916
917 BeginStrongLockAcquire(locallock, fasthashcode);
918 if (!FastPathTransferRelationLocks(lockMethodTable, locktag,
919 hashcode))
920 {
921 AbortStrongLockAcquire();
922 if (locallock->nLocks == 0)
923 RemoveLocalLock(locallock);
924 if (locallockp)
925 *locallockp = NULL;
926 if (reportMemoryError)
927 ereport(ERROR,
928 (errcode(ERRCODE_OUT_OF_MEMORY),
929 errmsg("out of shared memory"),
930 errhint("You might need to increase max_locks_per_transaction.")));
931 else
932 return LOCKACQUIRE_NOT_AVAIL;
933 }
934 }
935
936 /*
937 * We didn't find the lock in our LOCALLOCK table, and we didn't manage to
938 * take it via the fast-path, either, so we've got to mess with the shared
939 * lock table.
940 */
941 partitionLock = LockHashPartitionLock(hashcode);
942
943 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
944
945 /*
946 * Find or create lock and proclock entries with this tag
947 *
948 * Note: if the locallock object already existed, it might have a pointer
949 * to the lock already ... but we should not assume that that pointer is
950 * valid, since a lock object with zero hold and request counts can go
951 * away anytime. So we have to use SetupLockInTable() to recompute the
952 * lock and proclock pointers, even if they're already set.
953 */
954 proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
955 hashcode, lockmode);
956 if (!proclock)
957 {
958 AbortStrongLockAcquire();
959 LWLockRelease(partitionLock);
960 if (locallock->nLocks == 0)
961 RemoveLocalLock(locallock);
962 if (locallockp)
963 *locallockp = NULL;
964 if (reportMemoryError)
965 ereport(ERROR,
966 (errcode(ERRCODE_OUT_OF_MEMORY),
967 errmsg("out of shared memory"),
968 errhint("You might need to increase max_locks_per_transaction.")));
969 else
970 return LOCKACQUIRE_NOT_AVAIL;
971 }
972 locallock->proclock = proclock;
973 lock = proclock->tag.myLock;
974 locallock->lock = lock;
975
976 /*
977 * If lock requested conflicts with locks requested by waiters, must join
978 * wait queue. Otherwise, check for conflict with already-held locks.
979 * (That's last because most complex check.)
980 */
981 if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
982 status = STATUS_FOUND;
983 else
984 status = LockCheckConflicts(lockMethodTable, lockmode,
985 lock, proclock);
986
987 if (status == STATUS_OK)
988 {
989 /* No conflict with held or previously requested locks */
990 GrantLock(lock, proclock, lockmode);
991 GrantLockLocal(locallock, owner);
992 }
993 else
994 {
995 Assert(status == STATUS_FOUND);
996
997 /*
998 * We can't acquire the lock immediately. If caller specified no
999 * blocking, remove useless table entries and return NOT_AVAIL without
1000 * waiting.
1001 */
1002 if (dontWait)
1003 {
1004 AbortStrongLockAcquire();
1005 if (proclock->holdMask == 0)
1006 {
1007 uint32 proclock_hashcode;
1008
1009 proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
1010 SHMQueueDelete(&proclock->lockLink);
1011 SHMQueueDelete(&proclock->procLink);
1012 if (!hash_search_with_hash_value(LockMethodProcLockHash,
1013 (void *) &(proclock->tag),
1014 proclock_hashcode,
1015 HASH_REMOVE,
1016 NULL))
1017 elog(PANIC, "proclock table corrupted");
1018 }
1019 else
1020 PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
1021 lock->nRequested--;
1022 lock->requested[lockmode]--;
1023 LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
1024 Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
1025 Assert(lock->nGranted <= lock->nRequested);
1026 LWLockRelease(partitionLock);
1027 if (locallock->nLocks == 0)
1028 RemoveLocalLock(locallock);
1029 if (locallockp)
1030 *locallockp = NULL;
1031 return LOCKACQUIRE_NOT_AVAIL;
1032 }
1033
1034 /*
1035 * Set bitmask of locks this process already holds on this object.
1036 */
1037 MyProc->heldLocks = proclock->holdMask;
1038
1039 /*
1040 * Sleep till someone wakes me up.
1041 */
1042
1043 TRACE_POSTGRESQL_LOCK_WAIT_START(locktag->locktag_field1,
1044 locktag->locktag_field2,
1045 locktag->locktag_field3,
1046 locktag->locktag_field4,
1047 locktag->locktag_type,
1048 lockmode);
1049
1050 WaitOnLock(locallock, owner);
1051
1052 TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1,
1053 locktag->locktag_field2,
1054 locktag->locktag_field3,
1055 locktag->locktag_field4,
1056 locktag->locktag_type,
1057 lockmode);
1058
1059 /*
1060 * NOTE: do not do any material change of state between here and
1061 * return. All required changes in locktable state must have been
1062 * done when the lock was granted to us --- see notes in WaitOnLock.
1063 */
1064
1065 /*
1066 * Check the proclock entry status, in case something in the ipc
1067 * communication doesn't work correctly.
1068 */
1069 if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
1070 {
1071 AbortStrongLockAcquire();
1072 PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
1073 LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
1074 /* Should we retry ? */
1075 LWLockRelease(partitionLock);
1076 elog(ERROR, "LockAcquire failed");
1077 }
1078 PROCLOCK_PRINT("LockAcquire: granted", proclock);
1079 LOCK_PRINT("LockAcquire: granted", lock, lockmode);
1080 }
1081
1082 /*
1083 * Lock state is fully up-to-date now; if we error out after this, no
1084 * special error cleanup is required.
1085 */
1086 FinishStrongLockAcquire();
1087
1088 LWLockRelease(partitionLock);
1089
1090 /*
1091 * Emit a WAL record if acquisition of this lock needs to be replayed in a
1092 * standby server.
1093 */
1094 if (log_lock)
1095 {
1096 /*
1097 * Decode the locktag back to the original values, to avoid sending
1098 * lots of empty bytes with every message. See lock.h to check how a
1099 * locktag is defined for LOCKTAG_RELATION
1100 */
1101 LogAccessExclusiveLock(locktag->locktag_field1,
1102 locktag->locktag_field2);
1103 }
1104
1105 return LOCKACQUIRE_OK;
1106 }
1107
1108 /*
1109 * Find or create LOCK and PROCLOCK objects as needed for a new lock
1110 * request.
1111 *
1112 * Returns the PROCLOCK object, or NULL if we failed to create the objects
1113 * for lack of shared memory.
1114 *
1115 * The appropriate partition lock must be held at entry, and will be
1116 * held at exit.
1117 */
1118 static PROCLOCK *
SetupLockInTable(LockMethod lockMethodTable,PGPROC * proc,const LOCKTAG * locktag,uint32 hashcode,LOCKMODE lockmode)1119 SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
1120 const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode)
1121 {
1122 LOCK *lock;
1123 PROCLOCK *proclock;
1124 PROCLOCKTAG proclocktag;
1125 uint32 proclock_hashcode;
1126 bool found;
1127
1128 /*
1129 * Find or create a lock with this tag.
1130 */
1131 lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
1132 (const void *) locktag,
1133 hashcode,
1134 HASH_ENTER_NULL,
1135 &found);
1136 if (!lock)
1137 return NULL;
1138
1139 /*
1140 * if it's a new lock object, initialize it
1141 */
1142 if (!found)
1143 {
1144 lock->grantMask = 0;
1145 lock->waitMask = 0;
1146 SHMQueueInit(&(lock->procLocks));
1147 ProcQueueInit(&(lock->waitProcs));
1148 lock->nRequested = 0;
1149 lock->nGranted = 0;
1150 MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
1151 MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
1152 LOCK_PRINT("LockAcquire: new", lock, lockmode);
1153 }
1154 else
1155 {
1156 LOCK_PRINT("LockAcquire: found", lock, lockmode);
1157 Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
1158 Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
1159 Assert(lock->nGranted <= lock->nRequested);
1160 }
1161
1162 /*
1163 * Create the hash key for the proclock table.
1164 */
1165 proclocktag.myLock = lock;
1166 proclocktag.myProc = proc;
1167
1168 proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
1169
1170 /*
1171 * Find or create a proclock entry with this tag
1172 */
1173 proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
1174 (void *) &proclocktag,
1175 proclock_hashcode,
1176 HASH_ENTER_NULL,
1177 &found);
1178 if (!proclock)
1179 {
1180 /* Oops, not enough shmem for the proclock */
1181 if (lock->nRequested == 0)
1182 {
1183 /*
1184 * There are no other requestors of this lock, so garbage-collect
1185 * the lock object. We *must* do this to avoid a permanent leak
1186 * of shared memory, because there won't be anything to cause
1187 * anyone to release the lock object later.
1188 */
1189 Assert(SHMQueueEmpty(&(lock->procLocks)));
1190 if (!hash_search_with_hash_value(LockMethodLockHash,
1191 (void *) &(lock->tag),
1192 hashcode,
1193 HASH_REMOVE,
1194 NULL))
1195 elog(PANIC, "lock table corrupted");
1196 }
1197 return NULL;
1198 }
1199
1200 /*
1201 * If new, initialize the new entry
1202 */
1203 if (!found)
1204 {
1205 uint32 partition = LockHashPartition(hashcode);
1206
1207 /*
1208 * It might seem unsafe to access proclock->groupLeader without a
1209 * lock, but it's not really. Either we are initializing a proclock
1210 * on our own behalf, in which case our group leader isn't changing
1211 * because the group leader for a process can only ever be changed by
1212 * the process itself; or else we are transferring a fast-path lock to
1213 * the main lock table, in which case that process can't change it's
1214 * lock group leader without first releasing all of its locks (and in
1215 * particular the one we are currently transferring).
1216 */
1217 proclock->groupLeader = proc->lockGroupLeader != NULL ?
1218 proc->lockGroupLeader : proc;
1219 proclock->holdMask = 0;
1220 proclock->releaseMask = 0;
1221 /* Add proclock to appropriate lists */
1222 SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
1223 SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
1224 &proclock->procLink);
1225 PROCLOCK_PRINT("LockAcquire: new", proclock);
1226 }
1227 else
1228 {
1229 PROCLOCK_PRINT("LockAcquire: found", proclock);
1230 Assert((proclock->holdMask & ~lock->grantMask) == 0);
1231
1232 #ifdef CHECK_DEADLOCK_RISK
1233
1234 /*
1235 * Issue warning if we already hold a lower-level lock on this object
1236 * and do not hold a lock of the requested level or higher. This
1237 * indicates a deadlock-prone coding practice (eg, we'd have a
1238 * deadlock if another backend were following the same code path at
1239 * about the same time).
1240 *
1241 * This is not enabled by default, because it may generate log entries
1242 * about user-level coding practices that are in fact safe in context.
1243 * It can be enabled to help find system-level problems.
1244 *
1245 * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
1246 * better to use a table. For now, though, this works.
1247 */
1248 {
1249 int i;
1250
1251 for (i = lockMethodTable->numLockModes; i > 0; i--)
1252 {
1253 if (proclock->holdMask & LOCKBIT_ON(i))
1254 {
1255 if (i >= (int) lockmode)
1256 break; /* safe: we have a lock >= req level */
1257 elog(LOG, "deadlock risk: raising lock level"
1258 " from %s to %s on object %u/%u/%u",
1259 lockMethodTable->lockModeNames[i],
1260 lockMethodTable->lockModeNames[lockmode],
1261 lock->tag.locktag_field1, lock->tag.locktag_field2,
1262 lock->tag.locktag_field3);
1263 break;
1264 }
1265 }
1266 }
1267 #endif /* CHECK_DEADLOCK_RISK */
1268 }
1269
1270 /*
1271 * lock->nRequested and lock->requested[] count the total number of
1272 * requests, whether granted or waiting, so increment those immediately.
1273 * The other counts don't increment till we get the lock.
1274 */
1275 lock->nRequested++;
1276 lock->requested[lockmode]++;
1277 Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
1278
1279 /*
1280 * We shouldn't already hold the desired lock; else locallock table is
1281 * broken.
1282 */
1283 if (proclock->holdMask & LOCKBIT_ON(lockmode))
1284 elog(ERROR, "lock %s on object %u/%u/%u is already held",
1285 lockMethodTable->lockModeNames[lockmode],
1286 lock->tag.locktag_field1, lock->tag.locktag_field2,
1287 lock->tag.locktag_field3);
1288
1289 return proclock;
1290 }
1291
1292 /*
1293 * Subroutine to free a locallock entry
1294 */
1295 static void
RemoveLocalLock(LOCALLOCK * locallock)1296 RemoveLocalLock(LOCALLOCK *locallock)
1297 {
1298 int i;
1299
1300 for (i = locallock->numLockOwners - 1; i >= 0; i--)
1301 {
1302 if (locallock->lockOwners[i].owner != NULL)
1303 ResourceOwnerForgetLock(locallock->lockOwners[i].owner, locallock);
1304 }
1305 locallock->numLockOwners = 0;
1306 if (locallock->lockOwners != NULL)
1307 pfree(locallock->lockOwners);
1308 locallock->lockOwners = NULL;
1309
1310 if (locallock->holdsStrongLockCount)
1311 {
1312 uint32 fasthashcode;
1313
1314 fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);
1315
1316 SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
1317 Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
1318 FastPathStrongRelationLocks->count[fasthashcode]--;
1319 locallock->holdsStrongLockCount = false;
1320 SpinLockRelease(&FastPathStrongRelationLocks->mutex);
1321 }
1322
1323 if (!hash_search(LockMethodLocalHash,
1324 (void *) &(locallock->tag),
1325 HASH_REMOVE, NULL))
1326 elog(WARNING, "locallock table corrupted");
1327 }
1328
1329 /*
1330 * LockCheckConflicts -- test whether requested lock conflicts
1331 * with those already granted
1332 *
1333 * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
1334 *
1335 * NOTES:
1336 * Here's what makes this complicated: one process's locks don't
1337 * conflict with one another, no matter what purpose they are held for
1338 * (eg, session and transaction locks do not conflict). Nor do the locks
1339 * of one process in a lock group conflict with those of another process in
1340 * the same group. So, we must subtract off these locks when determining
1341 * whether the requested new lock conflicts with those already held.
1342 */
1343 int
LockCheckConflicts(LockMethod lockMethodTable,LOCKMODE lockmode,LOCK * lock,PROCLOCK * proclock)1344 LockCheckConflicts(LockMethod lockMethodTable,
1345 LOCKMODE lockmode,
1346 LOCK *lock,
1347 PROCLOCK *proclock)
1348 {
1349 int numLockModes = lockMethodTable->numLockModes;
1350 LOCKMASK myLocks;
1351 int conflictMask = lockMethodTable->conflictTab[lockmode];
1352 int conflictsRemaining[MAX_LOCKMODES];
1353 int totalConflictsRemaining = 0;
1354 int i;
1355 SHM_QUEUE *procLocks;
1356 PROCLOCK *otherproclock;
1357
1358 /*
1359 * first check for global conflicts: If no locks conflict with my request,
1360 * then I get the lock.
1361 *
1362 * Checking for conflict: lock->grantMask represents the types of
1363 * currently held locks. conflictTable[lockmode] has a bit set for each
1364 * type of lock that conflicts with request. Bitwise compare tells if
1365 * there is a conflict.
1366 */
1367 if (!(conflictMask & lock->grantMask))
1368 {
1369 PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
1370 return STATUS_OK;
1371 }
1372
1373 /*
1374 * Rats. Something conflicts. But it could still be my own lock, or a
1375 * lock held by another member of my locking group. First, figure out how
1376 * many conflicts remain after subtracting out any locks I hold myself.
1377 */
1378 myLocks = proclock->holdMask;
1379 for (i = 1; i <= numLockModes; i++)
1380 {
1381 if ((conflictMask & LOCKBIT_ON(i)) == 0)
1382 {
1383 conflictsRemaining[i] = 0;
1384 continue;
1385 }
1386 conflictsRemaining[i] = lock->granted[i];
1387 if (myLocks & LOCKBIT_ON(i))
1388 --conflictsRemaining[i];
1389 totalConflictsRemaining += conflictsRemaining[i];
1390 }
1391
1392 /* If no conflicts remain, we get the lock. */
1393 if (totalConflictsRemaining == 0)
1394 {
1395 PROCLOCK_PRINT("LockCheckConflicts: resolved (simple)", proclock);
1396 return STATUS_OK;
1397 }
1398
1399 /* If no group locking, it's definitely a conflict. */
1400 if (proclock->groupLeader == MyProc && MyProc->lockGroupLeader == NULL)
1401 {
1402 Assert(proclock->tag.myProc == MyProc);
1403 PROCLOCK_PRINT("LockCheckConflicts: conflicting (simple)",
1404 proclock);
1405 return STATUS_FOUND;
1406 }
1407
1408 /*
1409 * Locks held in conflicting modes by members of our own lock group are
1410 * not real conflicts; we can subtract those out and see if we still have
1411 * a conflict. This is O(N) in the number of processes holding or
1412 * awaiting locks on this object. We could improve that by making the
1413 * shared memory state more complex (and larger) but it doesn't seem worth
1414 * it.
1415 */
1416 procLocks = &(lock->procLocks);
1417 otherproclock = (PROCLOCK *)
1418 SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink));
1419 while (otherproclock != NULL)
1420 {
1421 if (proclock != otherproclock &&
1422 proclock->groupLeader == otherproclock->groupLeader &&
1423 (otherproclock->holdMask & conflictMask) != 0)
1424 {
1425 int intersectMask = otherproclock->holdMask & conflictMask;
1426
1427 for (i = 1; i <= numLockModes; i++)
1428 {
1429 if ((intersectMask & LOCKBIT_ON(i)) != 0)
1430 {
1431 if (conflictsRemaining[i] <= 0)
1432 elog(PANIC, "proclocks held do not match lock");
1433 conflictsRemaining[i]--;
1434 totalConflictsRemaining--;
1435 }
1436 }
1437
1438 if (totalConflictsRemaining == 0)
1439 {
1440 PROCLOCK_PRINT("LockCheckConflicts: resolved (group)",
1441 proclock);
1442 return STATUS_OK;
1443 }
1444 }
1445 otherproclock = (PROCLOCK *)
1446 SHMQueueNext(procLocks, &otherproclock->lockLink,
1447 offsetof(PROCLOCK, lockLink));
1448 }
1449
1450 /* Nope, it's a real conflict. */
1451 PROCLOCK_PRINT("LockCheckConflicts: conflicting (group)", proclock);
1452 return STATUS_FOUND;
1453 }
1454
1455 /*
1456 * GrantLock -- update the lock and proclock data structures to show
1457 * the lock request has been granted.
1458 *
1459 * NOTE: if proc was blocked, it also needs to be removed from the wait list
1460 * and have its waitLock/waitProcLock fields cleared. That's not done here.
1461 *
1462 * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
1463 * table entry; but since we may be awaking some other process, we can't do
1464 * that here; it's done by GrantLockLocal, instead.
1465 */
1466 void
GrantLock(LOCK * lock,PROCLOCK * proclock,LOCKMODE lockmode)1467 GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
1468 {
1469 lock->nGranted++;
1470 lock->granted[lockmode]++;
1471 lock->grantMask |= LOCKBIT_ON(lockmode);
1472 if (lock->granted[lockmode] == lock->requested[lockmode])
1473 lock->waitMask &= LOCKBIT_OFF(lockmode);
1474 proclock->holdMask |= LOCKBIT_ON(lockmode);
1475 LOCK_PRINT("GrantLock", lock, lockmode);
1476 Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
1477 Assert(lock->nGranted <= lock->nRequested);
1478 }
1479
1480 /*
1481 * UnGrantLock -- opposite of GrantLock.
1482 *
1483 * Updates the lock and proclock data structures to show that the lock
1484 * is no longer held nor requested by the current holder.
1485 *
1486 * Returns true if there were any waiters waiting on the lock that
1487 * should now be woken up with ProcLockWakeup.
1488 */
1489 static bool
UnGrantLock(LOCK * lock,LOCKMODE lockmode,PROCLOCK * proclock,LockMethod lockMethodTable)1490 UnGrantLock(LOCK *lock, LOCKMODE lockmode,
1491 PROCLOCK *proclock, LockMethod lockMethodTable)
1492 {
1493 bool wakeupNeeded = false;
1494
1495 Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
1496 Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
1497 Assert(lock->nGranted <= lock->nRequested);
1498
1499 /*
1500 * fix the general lock stats
1501 */
1502 lock->nRequested--;
1503 lock->requested[lockmode]--;
1504 lock->nGranted--;
1505 lock->granted[lockmode]--;
1506
1507 if (lock->granted[lockmode] == 0)
1508 {
1509 /* change the conflict mask. No more of this lock type. */
1510 lock->grantMask &= LOCKBIT_OFF(lockmode);
1511 }
1512
1513 LOCK_PRINT("UnGrantLock: updated", lock, lockmode);
1514
1515 /*
1516 * We need only run ProcLockWakeup if the released lock conflicts with at
1517 * least one of the lock types requested by waiter(s). Otherwise whatever
1518 * conflict made them wait must still exist. NOTE: before MVCC, we could
1519 * skip wakeup if lock->granted[lockmode] was still positive. But that's
1520 * not true anymore, because the remaining granted locks might belong to
1521 * some waiter, who could now be awakened because he doesn't conflict with
1522 * his own locks.
1523 */
1524 if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
1525 wakeupNeeded = true;
1526
1527 /*
1528 * Now fix the per-proclock state.
1529 */
1530 proclock->holdMask &= LOCKBIT_OFF(lockmode);
1531 PROCLOCK_PRINT("UnGrantLock: updated", proclock);
1532
1533 return wakeupNeeded;
1534 }
1535
1536 /*
1537 * CleanUpLock -- clean up after releasing a lock. We garbage-collect the
1538 * proclock and lock objects if possible, and call ProcLockWakeup if there
1539 * are remaining requests and the caller says it's OK. (Normally, this
1540 * should be called after UnGrantLock, and wakeupNeeded is the result from
1541 * UnGrantLock.)
1542 *
1543 * The appropriate partition lock must be held at entry, and will be
1544 * held at exit.
1545 */
1546 static void
CleanUpLock(LOCK * lock,PROCLOCK * proclock,LockMethod lockMethodTable,uint32 hashcode,bool wakeupNeeded)1547 CleanUpLock(LOCK *lock, PROCLOCK *proclock,
1548 LockMethod lockMethodTable, uint32 hashcode,
1549 bool wakeupNeeded)
1550 {
1551 /*
1552 * If this was my last hold on this lock, delete my entry in the proclock
1553 * table.
1554 */
1555 if (proclock->holdMask == 0)
1556 {
1557 uint32 proclock_hashcode;
1558
1559 PROCLOCK_PRINT("CleanUpLock: deleting", proclock);
1560 SHMQueueDelete(&proclock->lockLink);
1561 SHMQueueDelete(&proclock->procLink);
1562 proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
1563 if (!hash_search_with_hash_value(LockMethodProcLockHash,
1564 (void *) &(proclock->tag),
1565 proclock_hashcode,
1566 HASH_REMOVE,
1567 NULL))
1568 elog(PANIC, "proclock table corrupted");
1569 }
1570
1571 if (lock->nRequested == 0)
1572 {
1573 /*
1574 * The caller just released the last lock, so garbage-collect the lock
1575 * object.
1576 */
1577 LOCK_PRINT("CleanUpLock: deleting", lock, 0);
1578 Assert(SHMQueueEmpty(&(lock->procLocks)));
1579 if (!hash_search_with_hash_value(LockMethodLockHash,
1580 (void *) &(lock->tag),
1581 hashcode,
1582 HASH_REMOVE,
1583 NULL))
1584 elog(PANIC, "lock table corrupted");
1585 }
1586 else if (wakeupNeeded)
1587 {
1588 /* There are waiters on this lock, so wake them up. */
1589 ProcLockWakeup(lockMethodTable, lock);
1590 }
1591 }
1592
1593 /*
1594 * GrantLockLocal -- update the locallock data structures to show
1595 * the lock request has been granted.
1596 *
1597 * We expect that LockAcquire made sure there is room to add a new
1598 * ResourceOwner entry.
1599 */
1600 static void
GrantLockLocal(LOCALLOCK * locallock,ResourceOwner owner)1601 GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
1602 {
1603 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1604 int i;
1605
1606 Assert(locallock->numLockOwners < locallock->maxLockOwners);
1607 /* Count the total */
1608 locallock->nLocks++;
1609 /* Count the per-owner lock */
1610 for (i = 0; i < locallock->numLockOwners; i++)
1611 {
1612 if (lockOwners[i].owner == owner)
1613 {
1614 lockOwners[i].nLocks++;
1615 return;
1616 }
1617 }
1618 lockOwners[i].owner = owner;
1619 lockOwners[i].nLocks = 1;
1620 locallock->numLockOwners++;
1621 if (owner != NULL)
1622 ResourceOwnerRememberLock(owner, locallock);
1623 }
1624
1625 /*
1626 * BeginStrongLockAcquire - inhibit use of fastpath for a given LOCALLOCK,
1627 * and arrange for error cleanup if it fails
1628 */
1629 static void
BeginStrongLockAcquire(LOCALLOCK * locallock,uint32 fasthashcode)1630 BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode)
1631 {
1632 Assert(StrongLockInProgress == NULL);
1633 Assert(locallock->holdsStrongLockCount == false);
1634
1635 /*
1636 * Adding to a memory location is not atomic, so we take a spinlock to
1637 * ensure we don't collide with someone else trying to bump the count at
1638 * the same time.
1639 *
1640 * XXX: It might be worth considering using an atomic fetch-and-add
1641 * instruction here, on architectures where that is supported.
1642 */
1643
1644 SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
1645 FastPathStrongRelationLocks->count[fasthashcode]++;
1646 locallock->holdsStrongLockCount = true;
1647 StrongLockInProgress = locallock;
1648 SpinLockRelease(&FastPathStrongRelationLocks->mutex);
1649 }
1650
/*
 * FinishStrongLockAcquire - cancel pending cleanup for a strong lock
 * acquisition once it's no longer needed
 *
 * This only clears StrongLockInProgress, the pointer that
 * AbortStrongLockAcquire consults to decide whether there is anything to
 * undo.  The strong-lock counter and the locallock's holdsStrongLockCount
 * flag are left as they are.
 */
static void
FinishStrongLockAcquire(void)
{
	/* With the pointer cleared, AbortStrongLockAcquire becomes a no-op */
	StrongLockInProgress = NULL;
}
1660
1661 /*
1662 * AbortStrongLockAcquire - undo strong lock state changes performed by
1663 * BeginStrongLockAcquire.
1664 */
1665 void
AbortStrongLockAcquire(void)1666 AbortStrongLockAcquire(void)
1667 {
1668 uint32 fasthashcode;
1669 LOCALLOCK *locallock = StrongLockInProgress;
1670
1671 if (locallock == NULL)
1672 return;
1673
1674 fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);
1675 Assert(locallock->holdsStrongLockCount == true);
1676 SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
1677 Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
1678 FastPathStrongRelationLocks->count[fasthashcode]--;
1679 locallock->holdsStrongLockCount = false;
1680 StrongLockInProgress = NULL;
1681 SpinLockRelease(&FastPathStrongRelationLocks->mutex);
1682 }
1683
/*
 * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
 *		WaitOnLock on.
 *
 * proc.c needs this for the case where we are booted off the lock by
 * timeout, but discover that someone granted us the lock anyway.
 *
 * We could just export GrantLockLocal, but that would require including
 * resowner.h in lock.h, which creates circularity.
 *
 * The lock and owner used are awaitedLock and awaitedOwner, which
 * WaitOnLock sets before it goes to sleep.
 */
void
GrantAwaitedLock(void)
{
	GrantLockLocal(awaitedLock, awaitedOwner);
}
1699
/*
 * MarkLockClear -- mark an acquired lock as "clear"
 *
 * This means that we know we have absorbed all sinval messages that other
 * sessions generated before we acquired this lock, and so we can confidently
 * assume we know about any catalog changes protected by this lock.
 *
 * The locallock must currently be held (nLocks > 0); the mark is stored in
 * its lockCleared flag.
 */
void
MarkLockClear(LOCALLOCK *locallock)
{
	/* Marking a lock we do not hold would be meaningless */
	Assert(locallock->nLocks > 0);
	locallock->lockCleared = true;
}
1713
/*
 * WaitOnLock -- wait to acquire a lock
 *
 * Sleeps in ProcSleep until the lock described by 'locallock' has been
 * granted.  While sleeping, the process title carries a " waiting" suffix
 * (if update_process_title is on), and awaitedLock/awaitedOwner identify
 * the lock being waited for.  If ProcSleep reports failure, the partition
 * lock is released and DeadLockReport is called; control does not come
 * back from that.
 *
 * Caller must have set MyProc->heldLocks to reflect locks already held
 * on the lockable object by this process.
 *
 * The appropriate partition lock must be held at entry.
 */
static void
WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
{
	LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
	LockMethod	lockMethodTable = LockMethods[lockmethodid];

	/*
	 * NOTE(review): presumably volatile because it is assigned before
	 * PG_TRY and read again inside PG_CATCH -- confirm against the PG_TRY
	 * usage rules.
	 */
	char	   *volatile new_status = NULL;

	LOCK_PRINT("WaitOnLock: sleeping on lock",
			   locallock->lock, locallock->tag.mode);

	/* Report change to waiting status */
	if (update_process_title)
	{
		const char *old_status;
		int			len;

		old_status = get_ps_display(&len);
		/* room for the old title, the 8 bytes of " waiting", and a NUL */
		new_status = (char *) palloc(len + 8 + 1);
		memcpy(new_status, old_status, len);
		strcpy(new_status + len, " waiting");
		set_ps_display(new_status, false);
		/* keep the buffer, cut back to the old title, for restoring later */
		new_status[len] = '\0'; /* truncate off " waiting" */
	}

	/* Remember what we are waiting for; GrantAwaitedLock relies on these */
	awaitedLock = locallock;
	awaitedOwner = owner;

	/*
	 * NOTE: Think not to put any shared-state cleanup after the call to
	 * ProcSleep, in either the normal or failure path.  The lock state must
	 * be fully set by the lock grantor, or by CheckDeadLock if we give up
	 * waiting for the lock.  This is necessary because of the possibility
	 * that a cancel/die interrupt will interrupt ProcSleep after someone else
	 * grants us the lock, but before we've noticed it.  Hence, after granting,
	 * the locktable state must fully reflect the fact that we own the lock;
	 * we can't do additional work on return.
	 *
	 * We can and do use a PG_TRY block to try to clean up after failure, but
	 * this still has a major limitation: elog(FATAL) can occur while waiting
	 * (eg, a "die" interrupt), and then control won't come back here.  So all
	 * cleanup of essential state should happen in LockErrorCleanup, not here.
	 * We can use PG_TRY to clear the "waiting" status flags, since doing that
	 * is unimportant if the process exits.
	 */
	PG_TRY();
	{
		if (ProcSleep(locallock, lockMethodTable) != STATUS_OK)
		{
			/*
			 * We failed as a result of a deadlock, see CheckDeadLock(). Quit
			 * now.
			 */
			awaitedLock = NULL;
			LOCK_PRINT("WaitOnLock: aborting on lock",
					   locallock->lock, locallock->tag.mode);
			LWLockRelease(LockHashPartitionLock(locallock->hashcode));

			/*
			 * Now that we aren't holding the partition lock, we can give an
			 * error report including details about the detected deadlock.
			 */
			DeadLockReport();
			/* not reached */
		}
	}
	PG_CATCH();
	{
		/* In this path, awaitedLock remains set until LockErrorCleanup */

		/* Report change to non-waiting status */
		if (update_process_title)
		{
			/* new_status was truncated above, so this restores the old title */
			set_ps_display(new_status, false);
			pfree(new_status);
		}

		/* and propagate the error */
		PG_RE_THROW();
	}
	PG_END_TRY();

	/* Normal wakeup: we are no longer waiting for anything */
	awaitedLock = NULL;

	/* Report change to non-waiting status */
	if (update_process_title)
	{
		/* same restore as in the error path */
		set_ps_display(new_status, false);
		pfree(new_status);
	}

	LOCK_PRINT("WaitOnLock: wakeup on lock",
			   locallock->lock, locallock->tag.mode);
}
1815
1816 /*
1817 * Remove a proc from the wait-queue it is on (caller must know it is on one).
1818 * This is only used when the proc has failed to get the lock, so we set its
1819 * waitStatus to STATUS_ERROR.
1820 *
1821 * Appropriate partition lock must be held by caller. Also, caller is
1822 * responsible for signaling the proc if needed.
1823 *
1824 * NB: this does not clean up any locallock object that may exist for the lock.
1825 */
1826 void
RemoveFromWaitQueue(PGPROC * proc,uint32 hashcode)1827 RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
1828 {
1829 LOCK *waitLock = proc->waitLock;
1830 PROCLOCK *proclock = proc->waitProcLock;
1831 LOCKMODE lockmode = proc->waitLockMode;
1832 LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);
1833
1834 /* Make sure proc is waiting */
1835 Assert(proc->waitStatus == STATUS_WAITING);
1836 Assert(proc->links.next != NULL);
1837 Assert(waitLock);
1838 Assert(waitLock->waitProcs.size > 0);
1839 Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
1840
1841 /* Remove proc from lock's wait queue */
1842 SHMQueueDelete(&(proc->links));
1843 waitLock->waitProcs.size--;
1844
1845 /* Undo increments of request counts by waiting process */
1846 Assert(waitLock->nRequested > 0);
1847 Assert(waitLock->nRequested > proc->waitLock->nGranted);
1848 waitLock->nRequested--;
1849 Assert(waitLock->requested[lockmode] > 0);
1850 waitLock->requested[lockmode]--;
1851 /* don't forget to clear waitMask bit if appropriate */
1852 if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
1853 waitLock->waitMask &= LOCKBIT_OFF(lockmode);
1854
1855 /* Clean up the proc's own state, and pass it the ok/fail signal */
1856 proc->waitLock = NULL;
1857 proc->waitProcLock = NULL;
1858 proc->waitStatus = STATUS_ERROR;
1859
1860 /*
1861 * Delete the proclock immediately if it represents no already-held locks.
1862 * (This must happen now because if the owner of the lock decides to
1863 * release it, and the requested/granted counts then go to zero,
1864 * LockRelease expects there to be no remaining proclocks.) Then see if
1865 * any other waiters for the lock can be woken up now.
1866 */
1867 CleanUpLock(waitLock, proclock,
1868 LockMethods[lockmethodid], hashcode,
1869 true);
1870 }
1871
1872 /*
1873 * LockRelease -- look up 'locktag' and release one 'lockmode' lock on it.
1874 * Release a session lock if 'sessionLock' is true, else release a
1875 * regular transaction lock.
1876 *
1877 * Side Effects: find any waiting processes that are now wakable,
1878 * grant them their requested locks and awaken them.
1879 * (We have to grant the lock here to avoid a race between
1880 * the waking process and any new process to
1881 * come along and request the lock.)
1882 */
1883 bool
LockRelease(const LOCKTAG * locktag,LOCKMODE lockmode,bool sessionLock)1884 LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
1885 {
1886 LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
1887 LockMethod lockMethodTable;
1888 LOCALLOCKTAG localtag;
1889 LOCALLOCK *locallock;
1890 LOCK *lock;
1891 PROCLOCK *proclock;
1892 LWLock *partitionLock;
1893 bool wakeupNeeded;
1894
1895 if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
1896 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
1897 lockMethodTable = LockMethods[lockmethodid];
1898 if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
1899 elog(ERROR, "unrecognized lock mode: %d", lockmode);
1900
1901 #ifdef LOCK_DEBUG
1902 if (LOCK_DEBUG_ENABLED(locktag))
1903 elog(LOG, "LockRelease: lock [%u,%u] %s",
1904 locktag->locktag_field1, locktag->locktag_field2,
1905 lockMethodTable->lockModeNames[lockmode]);
1906 #endif
1907
1908 /*
1909 * Find the LOCALLOCK entry for this lock and lockmode
1910 */
1911 MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
1912 localtag.lock = *locktag;
1913 localtag.mode = lockmode;
1914
1915 locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
1916 (void *) &localtag,
1917 HASH_FIND, NULL);
1918
1919 /*
1920 * let the caller print its own error message, too. Do not ereport(ERROR).
1921 */
1922 if (!locallock || locallock->nLocks <= 0)
1923 {
1924 elog(WARNING, "you don't own a lock of type %s",
1925 lockMethodTable->lockModeNames[lockmode]);
1926 return false;
1927 }
1928
1929 /*
1930 * Decrease the count for the resource owner.
1931 */
1932 {
1933 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1934 ResourceOwner owner;
1935 int i;
1936
1937 /* Identify owner for lock */
1938 if (sessionLock)
1939 owner = NULL;
1940 else
1941 owner = CurrentResourceOwner;
1942
1943 for (i = locallock->numLockOwners - 1; i >= 0; i--)
1944 {
1945 if (lockOwners[i].owner == owner)
1946 {
1947 Assert(lockOwners[i].nLocks > 0);
1948 if (--lockOwners[i].nLocks == 0)
1949 {
1950 if (owner != NULL)
1951 ResourceOwnerForgetLock(owner, locallock);
1952 /* compact out unused slot */
1953 locallock->numLockOwners--;
1954 if (i < locallock->numLockOwners)
1955 lockOwners[i] = lockOwners[locallock->numLockOwners];
1956 }
1957 break;
1958 }
1959 }
1960 if (i < 0)
1961 {
1962 /* don't release a lock belonging to another owner */
1963 elog(WARNING, "you don't own a lock of type %s",
1964 lockMethodTable->lockModeNames[lockmode]);
1965 return false;
1966 }
1967 }
1968
1969 /*
1970 * Decrease the total local count. If we're still holding the lock, we're
1971 * done.
1972 */
1973 locallock->nLocks--;
1974
1975 if (locallock->nLocks > 0)
1976 return true;
1977
1978 /*
1979 * At this point we can no longer suppose we are clear of invalidation
1980 * messages related to this lock. Although we'll delete the LOCALLOCK
1981 * object before any intentional return from this routine, it seems worth
1982 * the trouble to explicitly reset lockCleared right now, just in case
1983 * some error prevents us from deleting the LOCALLOCK.
1984 */
1985 locallock->lockCleared = false;
1986
1987 /* Attempt fast release of any lock eligible for the fast path. */
1988 if (EligibleForRelationFastPath(locktag, lockmode) &&
1989 FastPathLocalUseCount > 0)
1990 {
1991 bool released;
1992
1993 /*
1994 * We might not find the lock here, even if we originally entered it
1995 * here. Another backend may have moved it to the main table.
1996 */
1997 LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
1998 released = FastPathUnGrantRelationLock(locktag->locktag_field2,
1999 lockmode);
2000 LWLockRelease(&MyProc->backendLock);
2001 if (released)
2002 {
2003 RemoveLocalLock(locallock);
2004 return true;
2005 }
2006 }
2007
2008 /*
2009 * Otherwise we've got to mess with the shared lock table.
2010 */
2011 partitionLock = LockHashPartitionLock(locallock->hashcode);
2012
2013 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2014
2015 /*
2016 * Normally, we don't need to re-find the lock or proclock, since we kept
2017 * their addresses in the locallock table, and they couldn't have been
2018 * removed while we were holding a lock on them. But it's possible that
2019 * the lock was taken fast-path and has since been moved to the main hash
2020 * table by another backend, in which case we will need to look up the
2021 * objects here. We assume the lock field is NULL if so.
2022 */
2023 lock = locallock->lock;
2024 if (!lock)
2025 {
2026 PROCLOCKTAG proclocktag;
2027
2028 Assert(EligibleForRelationFastPath(locktag, lockmode));
2029 lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
2030 (const void *) locktag,
2031 locallock->hashcode,
2032 HASH_FIND,
2033 NULL);
2034 if (!lock)
2035 elog(ERROR, "failed to re-find shared lock object");
2036 locallock->lock = lock;
2037
2038 proclocktag.myLock = lock;
2039 proclocktag.myProc = MyProc;
2040 locallock->proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
2041 (void *) &proclocktag,
2042 HASH_FIND,
2043 NULL);
2044 if (!locallock->proclock)
2045 elog(ERROR, "failed to re-find shared proclock object");
2046 }
2047 LOCK_PRINT("LockRelease: found", lock, lockmode);
2048 proclock = locallock->proclock;
2049 PROCLOCK_PRINT("LockRelease: found", proclock);
2050
2051 /*
2052 * Double-check that we are actually holding a lock of the type we want to
2053 * release.
2054 */
2055 if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
2056 {
2057 PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
2058 LWLockRelease(partitionLock);
2059 elog(WARNING, "you don't own a lock of type %s",
2060 lockMethodTable->lockModeNames[lockmode]);
2061 RemoveLocalLock(locallock);
2062 return false;
2063 }
2064
2065 /*
2066 * Do the releasing. CleanUpLock will waken any now-wakable waiters.
2067 */
2068 wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
2069
2070 CleanUpLock(lock, proclock,
2071 lockMethodTable, locallock->hashcode,
2072 wakeupNeeded);
2073
2074 LWLockRelease(partitionLock);
2075
2076 RemoveLocalLock(locallock);
2077 return true;
2078 }
2079
2080 /*
2081 * LockReleaseAll -- Release all locks of the specified lock method that
2082 * are held by the current process.
2083 *
2084 * Well, not necessarily *all* locks. The available behaviors are:
2085 * allLocks == true: release all locks including session locks.
2086 * allLocks == false: release all non-session locks.
2087 */
2088 void
LockReleaseAll(LOCKMETHODID lockmethodid,bool allLocks)2089 LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
2090 {
2091 HASH_SEQ_STATUS status;
2092 LockMethod lockMethodTable;
2093 int i,
2094 numLockModes;
2095 LOCALLOCK *locallock;
2096 LOCK *lock;
2097 PROCLOCK *proclock;
2098 int partition;
2099 bool have_fast_path_lwlock = false;
2100
2101 if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
2102 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
2103 lockMethodTable = LockMethods[lockmethodid];
2104
2105 #ifdef LOCK_DEBUG
2106 if (*(lockMethodTable->trace_flag))
2107 elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
2108 #endif
2109
2110 /*
2111 * Get rid of our fast-path VXID lock, if appropriate. Note that this is
2112 * the only way that the lock we hold on our own VXID can ever get
2113 * released: it is always and only released when a toplevel transaction
2114 * ends.
2115 */
2116 if (lockmethodid == DEFAULT_LOCKMETHOD)
2117 VirtualXactLockTableCleanup();
2118
2119 numLockModes = lockMethodTable->numLockModes;
2120
2121 /*
2122 * First we run through the locallock table and get rid of unwanted
2123 * entries, then we scan the process's proclocks and get rid of those. We
2124 * do this separately because we may have multiple locallock entries
2125 * pointing to the same proclock, and we daren't end up with any dangling
2126 * pointers. Fast-path locks are cleaned up during the locallock table
2127 * scan, though.
2128 */
2129 hash_seq_init(&status, LockMethodLocalHash);
2130
2131 while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2132 {
2133 /*
2134 * If the LOCALLOCK entry is unused, we must've run out of shared
2135 * memory while trying to set up this lock. Just forget the local
2136 * entry.
2137 */
2138 if (locallock->nLocks == 0)
2139 {
2140 RemoveLocalLock(locallock);
2141 continue;
2142 }
2143
2144 /* Ignore items that are not of the lockmethod to be removed */
2145 if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
2146 continue;
2147
2148 /*
2149 * If we are asked to release all locks, we can just zap the entry.
2150 * Otherwise, must scan to see if there are session locks. We assume
2151 * there is at most one lockOwners entry for session locks.
2152 */
2153 if (!allLocks)
2154 {
2155 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
2156
2157 /* If session lock is above array position 0, move it down to 0 */
2158 for (i = 0; i < locallock->numLockOwners; i++)
2159 {
2160 if (lockOwners[i].owner == NULL)
2161 lockOwners[0] = lockOwners[i];
2162 else
2163 ResourceOwnerForgetLock(lockOwners[i].owner, locallock);
2164 }
2165
2166 if (locallock->numLockOwners > 0 &&
2167 lockOwners[0].owner == NULL &&
2168 lockOwners[0].nLocks > 0)
2169 {
2170 /* Fix the locallock to show just the session locks */
2171 locallock->nLocks = lockOwners[0].nLocks;
2172 locallock->numLockOwners = 1;
2173 /* We aren't deleting this locallock, so done */
2174 continue;
2175 }
2176 else
2177 locallock->numLockOwners = 0;
2178 }
2179
2180 /*
2181 * If the lock or proclock pointers are NULL, this lock was taken via
2182 * the relation fast-path (and is not known to have been transferred).
2183 */
2184 if (locallock->proclock == NULL || locallock->lock == NULL)
2185 {
2186 LOCKMODE lockmode = locallock->tag.mode;
2187 Oid relid;
2188
2189 /* Verify that a fast-path lock is what we've got. */
2190 if (!EligibleForRelationFastPath(&locallock->tag.lock, lockmode))
2191 elog(PANIC, "locallock table corrupted");
2192
2193 /*
2194 * If we don't currently hold the LWLock that protects our
2195 * fast-path data structures, we must acquire it before attempting
2196 * to release the lock via the fast-path. We will continue to
2197 * hold the LWLock until we're done scanning the locallock table,
2198 * unless we hit a transferred fast-path lock. (XXX is this
2199 * really such a good idea? There could be a lot of entries ...)
2200 */
2201 if (!have_fast_path_lwlock)
2202 {
2203 LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
2204 have_fast_path_lwlock = true;
2205 }
2206
2207 /* Attempt fast-path release. */
2208 relid = locallock->tag.lock.locktag_field2;
2209 if (FastPathUnGrantRelationLock(relid, lockmode))
2210 {
2211 RemoveLocalLock(locallock);
2212 continue;
2213 }
2214
2215 /*
2216 * Our lock, originally taken via the fast path, has been
2217 * transferred to the main lock table. That's going to require
2218 * some extra work, so release our fast-path lock before starting.
2219 */
2220 LWLockRelease(&MyProc->backendLock);
2221 have_fast_path_lwlock = false;
2222
2223 /*
2224 * Now dump the lock. We haven't got a pointer to the LOCK or
2225 * PROCLOCK in this case, so we have to handle this a bit
2226 * differently than a normal lock release. Unfortunately, this
2227 * requires an extra LWLock acquire-and-release cycle on the
2228 * partitionLock, but hopefully it shouldn't happen often.
2229 */
2230 LockRefindAndRelease(lockMethodTable, MyProc,
2231 &locallock->tag.lock, lockmode, false);
2232 RemoveLocalLock(locallock);
2233 continue;
2234 }
2235
2236 /* Mark the proclock to show we need to release this lockmode */
2237 if (locallock->nLocks > 0)
2238 locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
2239
2240 /* And remove the locallock hashtable entry */
2241 RemoveLocalLock(locallock);
2242 }
2243
2244 /* Done with the fast-path data structures */
2245 if (have_fast_path_lwlock)
2246 LWLockRelease(&MyProc->backendLock);
2247
2248 /*
2249 * Now, scan each lock partition separately.
2250 */
2251 for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
2252 {
2253 LWLock *partitionLock;
2254 SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]);
2255 PROCLOCK *nextplock;
2256
2257 partitionLock = LockHashPartitionLockByIndex(partition);
2258
2259 /*
2260 * If the proclock list for this partition is empty, we can skip
2261 * acquiring the partition lock. This optimization is trickier than
2262 * it looks, because another backend could be in process of adding
2263 * something to our proclock list due to promoting one of our
2264 * fast-path locks. However, any such lock must be one that we
2265 * decided not to delete above, so it's okay to skip it again now;
2266 * we'd just decide not to delete it again. We must, however, be
2267 * careful to re-fetch the list header once we've acquired the
2268 * partition lock, to be sure we have a valid, up-to-date pointer.
2269 * (There is probably no significant risk if pointer fetch/store is
2270 * atomic, but we don't wish to assume that.)
2271 *
2272 * XXX This argument assumes that the locallock table correctly
2273 * represents all of our fast-path locks. While allLocks mode
2274 * guarantees to clean up all of our normal locks regardless of the
2275 * locallock situation, we lose that guarantee for fast-path locks.
2276 * This is not ideal.
2277 */
2278 if (SHMQueueNext(procLocks, procLocks,
2279 offsetof(PROCLOCK, procLink)) == NULL)
2280 continue; /* needn't examine this partition */
2281
2282 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2283
2284 for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
2285 offsetof(PROCLOCK, procLink));
2286 proclock;
2287 proclock = nextplock)
2288 {
2289 bool wakeupNeeded = false;
2290
2291 /* Get link first, since we may unlink/delete this proclock */
2292 nextplock = (PROCLOCK *)
2293 SHMQueueNext(procLocks, &proclock->procLink,
2294 offsetof(PROCLOCK, procLink));
2295
2296 Assert(proclock->tag.myProc == MyProc);
2297
2298 lock = proclock->tag.myLock;
2299
2300 /* Ignore items that are not of the lockmethod to be removed */
2301 if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
2302 continue;
2303
2304 /*
2305 * In allLocks mode, force release of all locks even if locallock
2306 * table had problems
2307 */
2308 if (allLocks)
2309 proclock->releaseMask = proclock->holdMask;
2310 else
2311 Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
2312
2313 /*
2314 * Ignore items that have nothing to be released, unless they have
2315 * holdMask == 0 and are therefore recyclable
2316 */
2317 if (proclock->releaseMask == 0 && proclock->holdMask != 0)
2318 continue;
2319
2320 PROCLOCK_PRINT("LockReleaseAll", proclock);
2321 LOCK_PRINT("LockReleaseAll", lock, 0);
2322 Assert(lock->nRequested >= 0);
2323 Assert(lock->nGranted >= 0);
2324 Assert(lock->nGranted <= lock->nRequested);
2325 Assert((proclock->holdMask & ~lock->grantMask) == 0);
2326
2327 /*
2328 * Release the previously-marked lock modes
2329 */
2330 for (i = 1; i <= numLockModes; i++)
2331 {
2332 if (proclock->releaseMask & LOCKBIT_ON(i))
2333 wakeupNeeded |= UnGrantLock(lock, i, proclock,
2334 lockMethodTable);
2335 }
2336 Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
2337 Assert(lock->nGranted <= lock->nRequested);
2338 LOCK_PRINT("LockReleaseAll: updated", lock, 0);
2339
2340 proclock->releaseMask = 0;
2341
2342 /* CleanUpLock will wake up waiters if needed. */
2343 CleanUpLock(lock, proclock,
2344 lockMethodTable,
2345 LockTagHashCode(&lock->tag),
2346 wakeupNeeded);
2347 } /* loop over PROCLOCKs within this partition */
2348
2349 LWLockRelease(partitionLock);
2350 } /* loop over partitions */
2351
2352 #ifdef LOCK_DEBUG
2353 if (*(lockMethodTable->trace_flag))
2354 elog(LOG, "LockReleaseAll done");
2355 #endif
2356 }
2357
2358 /*
2359 * LockReleaseSession -- Release all session locks of the specified lock method
2360 * that are held by the current process.
2361 */
2362 void
LockReleaseSession(LOCKMETHODID lockmethodid)2363 LockReleaseSession(LOCKMETHODID lockmethodid)
2364 {
2365 HASH_SEQ_STATUS status;
2366 LOCALLOCK *locallock;
2367
2368 if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
2369 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
2370
2371 hash_seq_init(&status, LockMethodLocalHash);
2372
2373 while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2374 {
2375 /* Ignore items that are not of the specified lock method */
2376 if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
2377 continue;
2378
2379 ReleaseLockIfHeld(locallock, true);
2380 }
2381 }
2382
2383 /*
2384 * LockReleaseCurrentOwner
2385 * Release all locks belonging to CurrentResourceOwner
2386 *
2387 * If the caller knows what those locks are, it can pass them as an array.
2388 * That speeds up the call significantly, when a lot of locks are held.
2389 * Otherwise, pass NULL for locallocks, and we'll traverse through our hash
2390 * table to find them.
2391 */
2392 void
LockReleaseCurrentOwner(LOCALLOCK ** locallocks,int nlocks)2393 LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks)
2394 {
2395 if (locallocks == NULL)
2396 {
2397 HASH_SEQ_STATUS status;
2398 LOCALLOCK *locallock;
2399
2400 hash_seq_init(&status, LockMethodLocalHash);
2401
2402 while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2403 ReleaseLockIfHeld(locallock, false);
2404 }
2405 else
2406 {
2407 int i;
2408
2409 for (i = nlocks - 1; i >= 0; i--)
2410 ReleaseLockIfHeld(locallocks[i], false);
2411 }
2412 }
2413
2414 /*
2415 * ReleaseLockIfHeld
2416 * Release any session-level locks on this lockable object if sessionLock
2417 * is true; else, release any locks held by CurrentResourceOwner.
2418 *
2419 * It is tempting to pass this a ResourceOwner pointer (or NULL for session
2420 * locks), but without refactoring LockRelease() we cannot support releasing
2421 * locks belonging to resource owners other than CurrentResourceOwner.
2422 * If we were to refactor, it'd be a good idea to fix it so we don't have to
2423 * do a hashtable lookup of the locallock, too. However, currently this
2424 * function isn't used heavily enough to justify refactoring for its
2425 * convenience.
2426 */
2427 static void
ReleaseLockIfHeld(LOCALLOCK * locallock,bool sessionLock)2428 ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
2429 {
2430 ResourceOwner owner;
2431 LOCALLOCKOWNER *lockOwners;
2432 int i;
2433
2434 /* Identify owner for lock (must match LockRelease!) */
2435 if (sessionLock)
2436 owner = NULL;
2437 else
2438 owner = CurrentResourceOwner;
2439
2440 /* Scan to see if there are any locks belonging to the target owner */
2441 lockOwners = locallock->lockOwners;
2442 for (i = locallock->numLockOwners - 1; i >= 0; i--)
2443 {
2444 if (lockOwners[i].owner == owner)
2445 {
2446 Assert(lockOwners[i].nLocks > 0);
2447 if (lockOwners[i].nLocks < locallock->nLocks)
2448 {
2449 /*
2450 * We will still hold this lock after forgetting this
2451 * ResourceOwner.
2452 */
2453 locallock->nLocks -= lockOwners[i].nLocks;
2454 /* compact out unused slot */
2455 locallock->numLockOwners--;
2456 if (owner != NULL)
2457 ResourceOwnerForgetLock(owner, locallock);
2458 if (i < locallock->numLockOwners)
2459 lockOwners[i] = lockOwners[locallock->numLockOwners];
2460 }
2461 else
2462 {
2463 Assert(lockOwners[i].nLocks == locallock->nLocks);
2464 /* We want to call LockRelease just once */
2465 lockOwners[i].nLocks = 1;
2466 locallock->nLocks = 1;
2467 if (!LockRelease(&locallock->tag.lock,
2468 locallock->tag.mode,
2469 sessionLock))
2470 elog(WARNING, "ReleaseLockIfHeld: failed??");
2471 }
2472 break;
2473 }
2474 }
2475 }
2476
2477 /*
2478 * LockReassignCurrentOwner
2479 * Reassign all locks belonging to CurrentResourceOwner to belong
2480 * to its parent resource owner.
2481 *
2482 * If the caller knows what those locks are, it can pass them as an array.
2483 * That speeds up the call significantly, when a lot of locks are held
2484 * (e.g pg_dump with a large schema). Otherwise, pass NULL for locallocks,
2485 * and we'll traverse through our hash table to find them.
2486 */
2487 void
LockReassignCurrentOwner(LOCALLOCK ** locallocks,int nlocks)2488 LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks)
2489 {
2490 ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
2491
2492 Assert(parent != NULL);
2493
2494 if (locallocks == NULL)
2495 {
2496 HASH_SEQ_STATUS status;
2497 LOCALLOCK *locallock;
2498
2499 hash_seq_init(&status, LockMethodLocalHash);
2500
2501 while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2502 LockReassignOwner(locallock, parent);
2503 }
2504 else
2505 {
2506 int i;
2507
2508 for (i = nlocks - 1; i >= 0; i--)
2509 LockReassignOwner(locallocks[i], parent);
2510 }
2511 }
2512
2513 /*
2514 * Subroutine of LockReassignCurrentOwner. Reassigns a given lock belonging to
2515 * CurrentResourceOwner to its parent.
2516 */
2517 static void
LockReassignOwner(LOCALLOCK * locallock,ResourceOwner parent)2518 LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent)
2519 {
2520 LOCALLOCKOWNER *lockOwners;
2521 int i;
2522 int ic = -1;
2523 int ip = -1;
2524
2525 /*
2526 * Scan to see if there are any locks belonging to current owner or its
2527 * parent
2528 */
2529 lockOwners = locallock->lockOwners;
2530 for (i = locallock->numLockOwners - 1; i >= 0; i--)
2531 {
2532 if (lockOwners[i].owner == CurrentResourceOwner)
2533 ic = i;
2534 else if (lockOwners[i].owner == parent)
2535 ip = i;
2536 }
2537
2538 if (ic < 0)
2539 return; /* no current locks */
2540
2541 if (ip < 0)
2542 {
2543 /* Parent has no slot, so just give it the child's slot */
2544 lockOwners[ic].owner = parent;
2545 ResourceOwnerRememberLock(parent, locallock);
2546 }
2547 else
2548 {
2549 /* Merge child's count with parent's */
2550 lockOwners[ip].nLocks += lockOwners[ic].nLocks;
2551 /* compact out unused slot */
2552 locallock->numLockOwners--;
2553 if (ic < locallock->numLockOwners)
2554 lockOwners[ic] = lockOwners[locallock->numLockOwners];
2555 }
2556 ResourceOwnerForgetLock(CurrentResourceOwner, locallock);
2557 }
2558
2559 /*
2560 * FastPathGrantRelationLock
2561 * Grant lock using per-backend fast-path array, if there is space.
2562 */
2563 static bool
FastPathGrantRelationLock(Oid relid,LOCKMODE lockmode)2564 FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode)
2565 {
2566 uint32 f;
2567 uint32 unused_slot = FP_LOCK_SLOTS_PER_BACKEND;
2568
2569 /* Scan for existing entry for this relid, remembering empty slot. */
2570 for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2571 {
2572 if (FAST_PATH_GET_BITS(MyProc, f) == 0)
2573 unused_slot = f;
2574 else if (MyProc->fpRelId[f] == relid)
2575 {
2576 Assert(!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode));
2577 FAST_PATH_SET_LOCKMODE(MyProc, f, lockmode);
2578 return true;
2579 }
2580 }
2581
2582 /* If no existing entry, use any empty slot. */
2583 if (unused_slot < FP_LOCK_SLOTS_PER_BACKEND)
2584 {
2585 MyProc->fpRelId[unused_slot] = relid;
2586 FAST_PATH_SET_LOCKMODE(MyProc, unused_slot, lockmode);
2587 ++FastPathLocalUseCount;
2588 return true;
2589 }
2590
2591 /* No existing entry, and no empty slot. */
2592 return false;
2593 }
2594
2595 /*
2596 * FastPathUnGrantRelationLock
2597 * Release fast-path lock, if present. Update backend-private local
2598 * use count, while we're at it.
2599 */
2600 static bool
FastPathUnGrantRelationLock(Oid relid,LOCKMODE lockmode)2601 FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode)
2602 {
2603 uint32 f;
2604 bool result = false;
2605
2606 FastPathLocalUseCount = 0;
2607 for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2608 {
2609 if (MyProc->fpRelId[f] == relid
2610 && FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode))
2611 {
2612 Assert(!result);
2613 FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode);
2614 result = true;
2615 /* we continue iterating so as to update FastPathLocalUseCount */
2616 }
2617 if (FAST_PATH_GET_BITS(MyProc, f) != 0)
2618 ++FastPathLocalUseCount;
2619 }
2620 return result;
2621 }
2622
2623 /*
2624 * FastPathTransferRelationLocks
2625 * Transfer locks matching the given lock tag from per-backend fast-path
2626 * arrays to the shared hash table.
2627 *
2628 * Returns true if successful, false if ran out of shared memory.
2629 */
2630 static bool
FastPathTransferRelationLocks(LockMethod lockMethodTable,const LOCKTAG * locktag,uint32 hashcode)2631 FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag,
2632 uint32 hashcode)
2633 {
2634 LWLock *partitionLock = LockHashPartitionLock(hashcode);
2635 Oid relid = locktag->locktag_field2;
2636 uint32 i;
2637
2638 /*
2639 * Every PGPROC that can potentially hold a fast-path lock is present in
2640 * ProcGlobal->allProcs. Prepared transactions are not, but any
2641 * outstanding fast-path locks held by prepared transactions are
2642 * transferred to the main lock table.
2643 */
2644 for (i = 0; i < ProcGlobal->allProcCount; i++)
2645 {
2646 PGPROC *proc = &ProcGlobal->allProcs[i];
2647 uint32 f;
2648
2649 LWLockAcquire(&proc->backendLock, LW_EXCLUSIVE);
2650
2651 /*
2652 * If the target backend isn't referencing the same database as the
2653 * lock, then we needn't examine the individual relation IDs at all;
2654 * none of them can be relevant.
2655 *
2656 * proc->databaseId is set at backend startup time and never changes
2657 * thereafter, so it might be safe to perform this test before
2658 * acquiring &proc->backendLock. In particular, it's certainly safe
2659 * to assume that if the target backend holds any fast-path locks, it
2660 * must have performed a memory-fencing operation (in particular, an
2661 * LWLock acquisition) since setting proc->databaseId. However, it's
2662 * less clear that our backend is certain to have performed a memory
2663 * fencing operation since the other backend set proc->databaseId. So
2664 * for now, we test it after acquiring the LWLock just to be safe.
2665 */
2666 if (proc->databaseId != locktag->locktag_field1)
2667 {
2668 LWLockRelease(&proc->backendLock);
2669 continue;
2670 }
2671
2672 for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2673 {
2674 uint32 lockmode;
2675
2676 /* Look for an allocated slot matching the given relid. */
2677 if (relid != proc->fpRelId[f] || FAST_PATH_GET_BITS(proc, f) == 0)
2678 continue;
2679
2680 /* Find or create lock object. */
2681 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2682 for (lockmode = FAST_PATH_LOCKNUMBER_OFFSET;
2683 lockmode < FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT;
2684 ++lockmode)
2685 {
2686 PROCLOCK *proclock;
2687
2688 if (!FAST_PATH_CHECK_LOCKMODE(proc, f, lockmode))
2689 continue;
2690 proclock = SetupLockInTable(lockMethodTable, proc, locktag,
2691 hashcode, lockmode);
2692 if (!proclock)
2693 {
2694 LWLockRelease(partitionLock);
2695 LWLockRelease(&proc->backendLock);
2696 return false;
2697 }
2698 GrantLock(proclock->tag.myLock, proclock, lockmode);
2699 FAST_PATH_CLEAR_LOCKMODE(proc, f, lockmode);
2700 }
2701 LWLockRelease(partitionLock);
2702
2703 /* No need to examine remaining slots. */
2704 break;
2705 }
2706 LWLockRelease(&proc->backendLock);
2707 }
2708 return true;
2709 }
2710
2711 /*
2712 * FastPathGetLockEntry
2713 * Return the PROCLOCK for a lock originally taken via the fast-path,
2714 * transferring it to the primary lock table if necessary.
2715 *
2716 * Note: caller takes care of updating the locallock object.
2717 */
2718 static PROCLOCK *
FastPathGetRelationLockEntry(LOCALLOCK * locallock)2719 FastPathGetRelationLockEntry(LOCALLOCK *locallock)
2720 {
2721 LockMethod lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];
2722 LOCKTAG *locktag = &locallock->tag.lock;
2723 PROCLOCK *proclock = NULL;
2724 LWLock *partitionLock = LockHashPartitionLock(locallock->hashcode);
2725 Oid relid = locktag->locktag_field2;
2726 uint32 f;
2727
2728 LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
2729
2730 for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2731 {
2732 uint32 lockmode;
2733
2734 /* Look for an allocated slot matching the given relid. */
2735 if (relid != MyProc->fpRelId[f] || FAST_PATH_GET_BITS(MyProc, f) == 0)
2736 continue;
2737
2738 /* If we don't have a lock of the given mode, forget it! */
2739 lockmode = locallock->tag.mode;
2740 if (!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode))
2741 break;
2742
2743 /* Find or create lock object. */
2744 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2745
2746 proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
2747 locallock->hashcode, lockmode);
2748 if (!proclock)
2749 {
2750 LWLockRelease(partitionLock);
2751 LWLockRelease(&MyProc->backendLock);
2752 ereport(ERROR,
2753 (errcode(ERRCODE_OUT_OF_MEMORY),
2754 errmsg("out of shared memory"),
2755 errhint("You might need to increase max_locks_per_transaction.")));
2756 }
2757 GrantLock(proclock->tag.myLock, proclock, lockmode);
2758 FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode);
2759
2760 LWLockRelease(partitionLock);
2761
2762 /* No need to examine remaining slots. */
2763 break;
2764 }
2765
2766 LWLockRelease(&MyProc->backendLock);
2767
2768 /* Lock may have already been transferred by some other backend. */
2769 if (proclock == NULL)
2770 {
2771 LOCK *lock;
2772 PROCLOCKTAG proclocktag;
2773 uint32 proclock_hashcode;
2774
2775 LWLockAcquire(partitionLock, LW_SHARED);
2776
2777 lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
2778 (void *) locktag,
2779 locallock->hashcode,
2780 HASH_FIND,
2781 NULL);
2782 if (!lock)
2783 elog(ERROR, "failed to re-find shared lock object");
2784
2785 proclocktag.myLock = lock;
2786 proclocktag.myProc = MyProc;
2787
2788 proclock_hashcode = ProcLockHashCode(&proclocktag, locallock->hashcode);
2789 proclock = (PROCLOCK *)
2790 hash_search_with_hash_value(LockMethodProcLockHash,
2791 (void *) &proclocktag,
2792 proclock_hashcode,
2793 HASH_FIND,
2794 NULL);
2795 if (!proclock)
2796 elog(ERROR, "failed to re-find shared proclock object");
2797 LWLockRelease(partitionLock);
2798 }
2799
2800 return proclock;
2801 }
2802
2803 /*
2804 * GetLockConflicts
2805 * Get an array of VirtualTransactionIds of xacts currently holding locks
2806 * that would conflict with the specified lock/lockmode.
2807 * xacts merely awaiting such a lock are NOT reported.
2808 *
2809 * The result array is palloc'd and is terminated with an invalid VXID.
2810 * *countp, if not null, is updated to the number of items set.
2811 *
2812 * Of course, the result could be out of date by the time it's returned, so
2813 * use of this function has to be thought about carefully. Similarly, a
2814 * PGPROC with no "lxid" will be considered non-conflicting regardless of any
2815 * lock it holds. Existing callers don't care about a locker after that
2816 * locker's pg_xact updates complete. CommitTransaction() clears "lxid" after
2817 * pg_xact updates and before releasing locks.
2818 *
2819 * Note we never include the current xact's vxid in the result array,
2820 * since an xact never blocks itself.
2821 */
2822 VirtualTransactionId *
GetLockConflicts(const LOCKTAG * locktag,LOCKMODE lockmode,int * countp)2823 GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp)
2824 {
2825 static VirtualTransactionId *vxids;
2826 LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
2827 LockMethod lockMethodTable;
2828 LOCK *lock;
2829 LOCKMASK conflictMask;
2830 SHM_QUEUE *procLocks;
2831 PROCLOCK *proclock;
2832 uint32 hashcode;
2833 LWLock *partitionLock;
2834 int count = 0;
2835 int fast_count = 0;
2836
2837 if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
2838 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
2839 lockMethodTable = LockMethods[lockmethodid];
2840 if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
2841 elog(ERROR, "unrecognized lock mode: %d", lockmode);
2842
2843 /*
2844 * Allocate memory to store results, and fill with InvalidVXID. We only
2845 * need enough space for MaxBackends + max_prepared_xacts + a terminator.
2846 * InHotStandby allocate once in TopMemoryContext.
2847 */
2848 if (InHotStandby)
2849 {
2850 if (vxids == NULL)
2851 vxids = (VirtualTransactionId *)
2852 MemoryContextAlloc(TopMemoryContext,
2853 sizeof(VirtualTransactionId) *
2854 (MaxBackends + max_prepared_xacts + 1));
2855 }
2856 else
2857 vxids = (VirtualTransactionId *)
2858 palloc0(sizeof(VirtualTransactionId) *
2859 (MaxBackends + max_prepared_xacts + 1));
2860
2861 /* Compute hash code and partition lock, and look up conflicting modes. */
2862 hashcode = LockTagHashCode(locktag);
2863 partitionLock = LockHashPartitionLock(hashcode);
2864 conflictMask = lockMethodTable->conflictTab[lockmode];
2865
2866 /*
2867 * Fast path locks might not have been entered in the primary lock table.
2868 * If the lock we're dealing with could conflict with such a lock, we must
2869 * examine each backend's fast-path array for conflicts.
2870 */
2871 if (ConflictsWithRelationFastPath(locktag, lockmode))
2872 {
2873 int i;
2874 Oid relid = locktag->locktag_field2;
2875 VirtualTransactionId vxid;
2876
2877 /*
2878 * Iterate over relevant PGPROCs. Anything held by a prepared
2879 * transaction will have been transferred to the primary lock table,
2880 * so we need not worry about those. This is all a bit fuzzy, because
2881 * new locks could be taken after we've visited a particular
2882 * partition, but the callers had better be prepared to deal with that
2883 * anyway, since the locks could equally well be taken between the
2884 * time we return the value and the time the caller does something
2885 * with it.
2886 */
2887 for (i = 0; i < ProcGlobal->allProcCount; i++)
2888 {
2889 PGPROC *proc = &ProcGlobal->allProcs[i];
2890 uint32 f;
2891
2892 /* A backend never blocks itself */
2893 if (proc == MyProc)
2894 continue;
2895
2896 LWLockAcquire(&proc->backendLock, LW_SHARED);
2897
2898 /*
2899 * If the target backend isn't referencing the same database as
2900 * the lock, then we needn't examine the individual relation IDs
2901 * at all; none of them can be relevant.
2902 *
2903 * See FastPathTransferLocks() for discussion of why we do this
2904 * test after acquiring the lock.
2905 */
2906 if (proc->databaseId != locktag->locktag_field1)
2907 {
2908 LWLockRelease(&proc->backendLock);
2909 continue;
2910 }
2911
2912 for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2913 {
2914 uint32 lockmask;
2915
2916 /* Look for an allocated slot matching the given relid. */
2917 if (relid != proc->fpRelId[f])
2918 continue;
2919 lockmask = FAST_PATH_GET_BITS(proc, f);
2920 if (!lockmask)
2921 continue;
2922 lockmask <<= FAST_PATH_LOCKNUMBER_OFFSET;
2923
2924 /*
2925 * There can only be one entry per relation, so if we found it
2926 * and it doesn't conflict, we can skip the rest of the slots.
2927 */
2928 if ((lockmask & conflictMask) == 0)
2929 break;
2930
2931 /* Conflict! */
2932 GET_VXID_FROM_PGPROC(vxid, *proc);
2933
2934 if (VirtualTransactionIdIsValid(vxid))
2935 vxids[count++] = vxid;
2936 /* else, xact already committed or aborted */
2937
2938 /* No need to examine remaining slots. */
2939 break;
2940 }
2941
2942 LWLockRelease(&proc->backendLock);
2943 }
2944 }
2945
2946 /* Remember how many fast-path conflicts we found. */
2947 fast_count = count;
2948
2949 /*
2950 * Look up the lock object matching the tag.
2951 */
2952 LWLockAcquire(partitionLock, LW_SHARED);
2953
2954 lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
2955 (const void *) locktag,
2956 hashcode,
2957 HASH_FIND,
2958 NULL);
2959 if (!lock)
2960 {
2961 /*
2962 * If the lock object doesn't exist, there is nothing holding a lock
2963 * on this lockable object.
2964 */
2965 LWLockRelease(partitionLock);
2966 vxids[count].backendId = InvalidBackendId;
2967 vxids[count].localTransactionId = InvalidLocalTransactionId;
2968 if (countp)
2969 *countp = count;
2970 return vxids;
2971 }
2972
2973 /*
2974 * Examine each existing holder (or awaiter) of the lock.
2975 */
2976
2977 procLocks = &(lock->procLocks);
2978
2979 proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
2980 offsetof(PROCLOCK, lockLink));
2981
2982 while (proclock)
2983 {
2984 if (conflictMask & proclock->holdMask)
2985 {
2986 PGPROC *proc = proclock->tag.myProc;
2987
2988 /* A backend never blocks itself */
2989 if (proc != MyProc)
2990 {
2991 VirtualTransactionId vxid;
2992
2993 GET_VXID_FROM_PGPROC(vxid, *proc);
2994
2995 if (VirtualTransactionIdIsValid(vxid))
2996 {
2997 int i;
2998
2999 /* Avoid duplicate entries. */
3000 for (i = 0; i < fast_count; ++i)
3001 if (VirtualTransactionIdEquals(vxids[i], vxid))
3002 break;
3003 if (i >= fast_count)
3004 vxids[count++] = vxid;
3005 }
3006 /* else, xact already committed or aborted */
3007 }
3008 }
3009
3010 proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
3011 offsetof(PROCLOCK, lockLink));
3012 }
3013
3014 LWLockRelease(partitionLock);
3015
3016 if (count > MaxBackends + max_prepared_xacts) /* should never happen */
3017 elog(PANIC, "too many conflicting locks found");
3018
3019 vxids[count].backendId = InvalidBackendId;
3020 vxids[count].localTransactionId = InvalidLocalTransactionId;
3021 if (countp)
3022 *countp = count;
3023 return vxids;
3024 }
3025
3026 /*
3027 * Find a lock in the shared lock table and release it. It is the caller's
3028 * responsibility to verify that this is a sane thing to do. (For example, it
3029 * would be bad to release a lock here if there might still be a LOCALLOCK
3030 * object with pointers to it.)
3031 *
3032 * We currently use this in two situations: first, to release locks held by
3033 * prepared transactions on commit (see lock_twophase_postcommit); and second,
3034 * to release locks taken via the fast-path, transferred to the main hash
3035 * table, and then released (see LockReleaseAll).
3036 */
3037 static void
LockRefindAndRelease(LockMethod lockMethodTable,PGPROC * proc,LOCKTAG * locktag,LOCKMODE lockmode,bool decrement_strong_lock_count)3038 LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
3039 LOCKTAG *locktag, LOCKMODE lockmode,
3040 bool decrement_strong_lock_count)
3041 {
3042 LOCK *lock;
3043 PROCLOCK *proclock;
3044 PROCLOCKTAG proclocktag;
3045 uint32 hashcode;
3046 uint32 proclock_hashcode;
3047 LWLock *partitionLock;
3048 bool wakeupNeeded;
3049
3050 hashcode = LockTagHashCode(locktag);
3051 partitionLock = LockHashPartitionLock(hashcode);
3052
3053 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3054
3055 /*
3056 * Re-find the lock object (it had better be there).
3057 */
3058 lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
3059 (void *) locktag,
3060 hashcode,
3061 HASH_FIND,
3062 NULL);
3063 if (!lock)
3064 elog(PANIC, "failed to re-find shared lock object");
3065
3066 /*
3067 * Re-find the proclock object (ditto).
3068 */
3069 proclocktag.myLock = lock;
3070 proclocktag.myProc = proc;
3071
3072 proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
3073
3074 proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
3075 (void *) &proclocktag,
3076 proclock_hashcode,
3077 HASH_FIND,
3078 NULL);
3079 if (!proclock)
3080 elog(PANIC, "failed to re-find shared proclock object");
3081
3082 /*
3083 * Double-check that we are actually holding a lock of the type we want to
3084 * release.
3085 */
3086 if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
3087 {
3088 PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
3089 LWLockRelease(partitionLock);
3090 elog(WARNING, "you don't own a lock of type %s",
3091 lockMethodTable->lockModeNames[lockmode]);
3092 return;
3093 }
3094
3095 /*
3096 * Do the releasing. CleanUpLock will waken any now-wakable waiters.
3097 */
3098 wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
3099
3100 CleanUpLock(lock, proclock,
3101 lockMethodTable, hashcode,
3102 wakeupNeeded);
3103
3104 LWLockRelease(partitionLock);
3105
3106 /*
3107 * Decrement strong lock count. This logic is needed only for 2PC.
3108 */
3109 if (decrement_strong_lock_count
3110 && ConflictsWithRelationFastPath(locktag, lockmode))
3111 {
3112 uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode);
3113
3114 SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
3115 Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
3116 FastPathStrongRelationLocks->count[fasthashcode]--;
3117 SpinLockRelease(&FastPathStrongRelationLocks->mutex);
3118 }
3119 }
3120
3121 /*
3122 * CheckForSessionAndXactLocks
3123 * Check to see if transaction holds both session-level and xact-level
3124 * locks on the same object; if so, throw an error.
3125 *
3126 * If we have both session- and transaction-level locks on the same object,
3127 * PREPARE TRANSACTION must fail. This should never happen with regular
3128 * locks, since we only take those at session level in some special operations
3129 * like VACUUM. It's possible to hit this with advisory locks, though.
3130 *
3131 * It would be nice if we could keep the session hold and give away the
3132 * transactional hold to the prepared xact. However, that would require two
3133 * PROCLOCK objects, and we cannot be sure that another PROCLOCK will be
3134 * available when it comes time for PostPrepare_Locks to do the deed.
3135 * So for now, we error out while we can still do so safely.
3136 *
3137 * Since the LOCALLOCK table stores a separate entry for each lockmode,
3138 * we can't implement this check by examining LOCALLOCK entries in isolation.
3139 * We must build a transient hashtable that is indexed by locktag only.
3140 */
3141 static void
CheckForSessionAndXactLocks(void)3142 CheckForSessionAndXactLocks(void)
3143 {
3144 typedef struct
3145 {
3146 LOCKTAG lock; /* identifies the lockable object */
3147 bool sessLock; /* is any lockmode held at session level? */
3148 bool xactLock; /* is any lockmode held at xact level? */
3149 } PerLockTagEntry;
3150
3151 HASHCTL hash_ctl;
3152 HTAB *lockhtab;
3153 HASH_SEQ_STATUS status;
3154 LOCALLOCK *locallock;
3155
3156 /* Create a local hash table keyed by LOCKTAG only */
3157 hash_ctl.keysize = sizeof(LOCKTAG);
3158 hash_ctl.entrysize = sizeof(PerLockTagEntry);
3159 hash_ctl.hcxt = CurrentMemoryContext;
3160
3161 lockhtab = hash_create("CheckForSessionAndXactLocks table",
3162 256, /* arbitrary initial size */
3163 &hash_ctl,
3164 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
3165
3166 /* Scan local lock table to find entries for each LOCKTAG */
3167 hash_seq_init(&status, LockMethodLocalHash);
3168
3169 while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
3170 {
3171 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
3172 PerLockTagEntry *hentry;
3173 bool found;
3174 int i;
3175
3176 /*
3177 * Ignore VXID locks. We don't want those to be held by prepared
3178 * transactions, since they aren't meaningful after a restart.
3179 */
3180 if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3181 continue;
3182
3183 /* Ignore it if we don't actually hold the lock */
3184 if (locallock->nLocks <= 0)
3185 continue;
3186
3187 /* Otherwise, find or make an entry in lockhtab */
3188 hentry = (PerLockTagEntry *) hash_search(lockhtab,
3189 (void *) &locallock->tag.lock,
3190 HASH_ENTER, &found);
3191 if (!found) /* initialize, if newly created */
3192 hentry->sessLock = hentry->xactLock = false;
3193
3194 /* Scan to see if we hold lock at session or xact level or both */
3195 for (i = locallock->numLockOwners - 1; i >= 0; i--)
3196 {
3197 if (lockOwners[i].owner == NULL)
3198 hentry->sessLock = true;
3199 else
3200 hentry->xactLock = true;
3201 }
3202
3203 /*
3204 * We can throw error immediately when we see both types of locks; no
3205 * need to wait around to see if there are more violations.
3206 */
3207 if (hentry->sessLock && hentry->xactLock)
3208 ereport(ERROR,
3209 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3210 errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
3211 }
3212
3213 /* Success, so clean up */
3214 hash_destroy(lockhtab);
3215 }
3216
3217 /*
3218 * AtPrepare_Locks
3219 * Do the preparatory work for a PREPARE: make 2PC state file records
3220 * for all locks currently held.
3221 *
3222 * Session-level locks are ignored, as are VXID locks.
3223 *
3224 * For the most part, we don't need to touch shared memory for this ---
3225 * all the necessary state information is in the locallock table.
3226 * Fast-path locks are an exception, however: we move any such locks to
3227 * the main table before allowing PREPARE TRANSACTION to succeed.
3228 */
3229 void
AtPrepare_Locks(void)3230 AtPrepare_Locks(void)
3231 {
3232 HASH_SEQ_STATUS status;
3233 LOCALLOCK *locallock;
3234
3235 /* First, verify there aren't locks of both xact and session level */
3236 CheckForSessionAndXactLocks();
3237
3238 /* Now do the per-locallock cleanup work */
3239 hash_seq_init(&status, LockMethodLocalHash);
3240
3241 while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
3242 {
3243 TwoPhaseLockRecord record;
3244 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
3245 bool haveSessionLock;
3246 bool haveXactLock;
3247 int i;
3248
3249 /*
3250 * Ignore VXID locks. We don't want those to be held by prepared
3251 * transactions, since they aren't meaningful after a restart.
3252 */
3253 if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3254 continue;
3255
3256 /* Ignore it if we don't actually hold the lock */
3257 if (locallock->nLocks <= 0)
3258 continue;
3259
3260 /* Scan to see whether we hold it at session or transaction level */
3261 haveSessionLock = haveXactLock = false;
3262 for (i = locallock->numLockOwners - 1; i >= 0; i--)
3263 {
3264 if (lockOwners[i].owner == NULL)
3265 haveSessionLock = true;
3266 else
3267 haveXactLock = true;
3268 }
3269
3270 /* Ignore it if we have only session lock */
3271 if (!haveXactLock)
3272 continue;
3273
3274 /* This can't happen, because we already checked it */
3275 if (haveSessionLock)
3276 ereport(ERROR,
3277 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3278 errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
3279
3280 /*
3281 * If the local lock was taken via the fast-path, we need to move it
3282 * to the primary lock table, or just get a pointer to the existing
3283 * primary lock table entry if by chance it's already been
3284 * transferred.
3285 */
3286 if (locallock->proclock == NULL)
3287 {
3288 locallock->proclock = FastPathGetRelationLockEntry(locallock);
3289 locallock->lock = locallock->proclock->tag.myLock;
3290 }
3291
3292 /*
3293 * Arrange to not release any strong lock count held by this lock
3294 * entry. We must retain the count until the prepared transaction is
3295 * committed or rolled back.
3296 */
3297 locallock->holdsStrongLockCount = false;
3298
3299 /*
3300 * Create a 2PC record.
3301 */
3302 memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG));
3303 record.lockmode = locallock->tag.mode;
3304
3305 RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0,
3306 &record, sizeof(TwoPhaseLockRecord));
3307 }
3308 }
3309
3310 /*
3311 * PostPrepare_Locks
3312 * Clean up after successful PREPARE
3313 *
3314 * Here, we want to transfer ownership of our locks to a dummy PGPROC
3315 * that's now associated with the prepared transaction, and we want to
3316 * clean out the corresponding entries in the LOCALLOCK table.
3317 *
3318 * Note: by removing the LOCALLOCK entries, we are leaving dangling
3319 * pointers in the transaction's resource owner. This is OK at the
3320 * moment since resowner.c doesn't try to free locks retail at a toplevel
3321 * transaction commit or abort. We could alternatively zero out nLocks
3322 * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
3323 * but that probably costs more cycles.
3324 */
3325 void
PostPrepare_Locks(TransactionId xid)3326 PostPrepare_Locks(TransactionId xid)
3327 {
3328 PGPROC *newproc = TwoPhaseGetDummyProc(xid, false);
3329 HASH_SEQ_STATUS status;
3330 LOCALLOCK *locallock;
3331 LOCK *lock;
3332 PROCLOCK *proclock;
3333 PROCLOCKTAG proclocktag;
3334 int partition;
3335
3336 /* Can't prepare a lock group follower. */
3337 Assert(MyProc->lockGroupLeader == NULL ||
3338 MyProc->lockGroupLeader == MyProc);
3339
3340 /* This is a critical section: any error means big trouble */
3341 START_CRIT_SECTION();
3342
3343 /*
3344 * First we run through the locallock table and get rid of unwanted
3345 * entries, then we scan the process's proclocks and transfer them to the
3346 * target proc.
3347 *
3348 * We do this separately because we may have multiple locallock entries
3349 * pointing to the same proclock, and we daren't end up with any dangling
3350 * pointers.
3351 */
3352 hash_seq_init(&status, LockMethodLocalHash);
3353
3354 while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
3355 {
3356 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
3357 bool haveSessionLock;
3358 bool haveXactLock;
3359 int i;
3360
3361 if (locallock->proclock == NULL || locallock->lock == NULL)
3362 {
3363 /*
3364 * We must've run out of shared memory while trying to set up this
3365 * lock. Just forget the local entry.
3366 */
3367 Assert(locallock->nLocks == 0);
3368 RemoveLocalLock(locallock);
3369 continue;
3370 }
3371
3372 /* Ignore VXID locks */
3373 if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3374 continue;
3375
3376 /* Scan to see whether we hold it at session or transaction level */
3377 haveSessionLock = haveXactLock = false;
3378 for (i = locallock->numLockOwners - 1; i >= 0; i--)
3379 {
3380 if (lockOwners[i].owner == NULL)
3381 haveSessionLock = true;
3382 else
3383 haveXactLock = true;
3384 }
3385
3386 /* Ignore it if we have only session lock */
3387 if (!haveXactLock)
3388 continue;
3389
3390 /* This can't happen, because we already checked it */
3391 if (haveSessionLock)
3392 ereport(PANIC,
3393 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3394 errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
3395
3396 /* Mark the proclock to show we need to release this lockmode */
3397 if (locallock->nLocks > 0)
3398 locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
3399
3400 /* And remove the locallock hashtable entry */
3401 RemoveLocalLock(locallock);
3402 }
3403
3404 /*
3405 * Now, scan each lock partition separately.
3406 */
3407 for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
3408 {
3409 LWLock *partitionLock;
3410 SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]);
3411 PROCLOCK *nextplock;
3412
3413 partitionLock = LockHashPartitionLockByIndex(partition);
3414
3415 /*
3416 * If the proclock list for this partition is empty, we can skip
3417 * acquiring the partition lock. This optimization is safer than the
3418 * situation in LockReleaseAll, because we got rid of any fast-path
3419 * locks during AtPrepare_Locks, so there cannot be any case where
3420 * another backend is adding something to our lists now. For safety,
3421 * though, we code this the same way as in LockReleaseAll.
3422 */
3423 if (SHMQueueNext(procLocks, procLocks,
3424 offsetof(PROCLOCK, procLink)) == NULL)
3425 continue; /* needn't examine this partition */
3426
3427 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3428
3429 for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
3430 offsetof(PROCLOCK, procLink));
3431 proclock;
3432 proclock = nextplock)
3433 {
3434 /* Get link first, since we may unlink/relink this proclock */
3435 nextplock = (PROCLOCK *)
3436 SHMQueueNext(procLocks, &proclock->procLink,
3437 offsetof(PROCLOCK, procLink));
3438
3439 Assert(proclock->tag.myProc == MyProc);
3440
3441 lock = proclock->tag.myLock;
3442
3443 /* Ignore VXID locks */
3444 if (lock->tag.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3445 continue;
3446
3447 PROCLOCK_PRINT("PostPrepare_Locks", proclock);
3448 LOCK_PRINT("PostPrepare_Locks", lock, 0);
3449 Assert(lock->nRequested >= 0);
3450 Assert(lock->nGranted >= 0);
3451 Assert(lock->nGranted <= lock->nRequested);
3452 Assert((proclock->holdMask & ~lock->grantMask) == 0);
3453
3454 /* Ignore it if nothing to release (must be a session lock) */
3455 if (proclock->releaseMask == 0)
3456 continue;
3457
3458 /* Else we should be releasing all locks */
3459 if (proclock->releaseMask != proclock->holdMask)
3460 elog(PANIC, "we seem to have dropped a bit somewhere");
3461
3462 /*
3463 * We cannot simply modify proclock->tag.myProc to reassign
3464 * ownership of the lock, because that's part of the hash key and
3465 * the proclock would then be in the wrong hash chain. Instead
3466 * use hash_update_hash_key. (We used to create a new hash entry,
3467 * but that risks out-of-memory failure if other processes are
3468 * busy making proclocks too.) We must unlink the proclock from
3469 * our procLink chain and put it into the new proc's chain, too.
3470 *
3471 * Note: the updated proclock hash key will still belong to the
3472 * same hash partition, cf proclock_hash(). So the partition lock
3473 * we already hold is sufficient for this.
3474 */
3475 SHMQueueDelete(&proclock->procLink);
3476
3477 /*
3478 * Create the new hash key for the proclock.
3479 */
3480 proclocktag.myLock = lock;
3481 proclocktag.myProc = newproc;
3482
3483 /*
3484 * Update groupLeader pointer to point to the new proc. (We'd
3485 * better not be a member of somebody else's lock group!)
3486 */
3487 Assert(proclock->groupLeader == proclock->tag.myProc);
3488 proclock->groupLeader = newproc;
3489
3490 /*
3491 * Update the proclock. We should not find any existing entry for
3492 * the same hash key, since there can be only one entry for any
3493 * given lock with my own proc.
3494 */
3495 if (!hash_update_hash_key(LockMethodProcLockHash,
3496 (void *) proclock,
3497 (void *) &proclocktag))
3498 elog(PANIC, "duplicate entry found while reassigning a prepared transaction's locks");
3499
3500 /* Re-link into the new proc's proclock list */
3501 SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
3502 &proclock->procLink);
3503
3504 PROCLOCK_PRINT("PostPrepare_Locks: updated", proclock);
3505 } /* loop over PROCLOCKs within this partition */
3506
3507 LWLockRelease(partitionLock);
3508 } /* loop over partitions */
3509
3510 END_CRIT_SECTION();
3511 }
3512
3513
3514 /*
3515 * Estimate shared-memory space used for lock tables
3516 */
3517 Size
LockShmemSize(void)3518 LockShmemSize(void)
3519 {
3520 Size size = 0;
3521 long max_table_size;
3522
3523 /* lock hash table */
3524 max_table_size = NLOCKENTS();
3525 size = add_size(size, hash_estimate_size(max_table_size, sizeof(LOCK)));
3526
3527 /* proclock hash table */
3528 max_table_size *= 2;
3529 size = add_size(size, hash_estimate_size(max_table_size, sizeof(PROCLOCK)));
3530
3531 /*
3532 * Since NLOCKENTS is only an estimate, add 10% safety margin.
3533 */
3534 size = add_size(size, size / 10);
3535
3536 return size;
3537 }
3538
3539 /*
3540 * GetLockStatusData - Return a summary of the lock manager's internal
3541 * status, for use in a user-level reporting function.
3542 *
3543 * The return data consists of an array of LockInstanceData objects,
3544 * which are a lightly abstracted version of the PROCLOCK data structures,
3545 * i.e. there is one entry for each unique lock and interested PGPROC.
3546 * It is the caller's responsibility to match up related items (such as
3547 * references to the same lockable object or PGPROC) if wanted.
3548 *
3549 * The design goal is to hold the LWLocks for as short a time as possible;
3550 * thus, this function simply makes a copy of the necessary data and releases
3551 * the locks, allowing the caller to contemplate and format the data for as
3552 * long as it pleases.
3553 */
3554 LockData *
GetLockStatusData(void)3555 GetLockStatusData(void)
3556 {
3557 LockData *data;
3558 PROCLOCK *proclock;
3559 HASH_SEQ_STATUS seqstat;
3560 int els;
3561 int el;
3562 int i;
3563
3564 data = (LockData *) palloc(sizeof(LockData));
3565
3566 /* Guess how much space we'll need. */
3567 els = MaxBackends;
3568 el = 0;
3569 data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * els);
3570
3571 /*
3572 * First, we iterate through the per-backend fast-path arrays, locking
3573 * them one at a time. This might produce an inconsistent picture of the
3574 * system state, but taking all of those LWLocks at the same time seems
3575 * impractical (in particular, note MAX_SIMUL_LWLOCKS). It shouldn't
3576 * matter too much, because none of these locks can be involved in lock
3577 * conflicts anyway - anything that might must be present in the main lock
3578 * table. (For the same reason, we don't sweat about making leaderPid
3579 * completely valid. We cannot safely dereference another backend's
3580 * lockGroupLeader field without holding all lock partition locks, and
3581 * it's not worth that.)
3582 */
3583 for (i = 0; i < ProcGlobal->allProcCount; ++i)
3584 {
3585 PGPROC *proc = &ProcGlobal->allProcs[i];
3586 uint32 f;
3587
3588 LWLockAcquire(&proc->backendLock, LW_SHARED);
3589
3590 for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; ++f)
3591 {
3592 LockInstanceData *instance;
3593 uint32 lockbits = FAST_PATH_GET_BITS(proc, f);
3594
3595 /* Skip unallocated slots. */
3596 if (!lockbits)
3597 continue;
3598
3599 if (el >= els)
3600 {
3601 els += MaxBackends;
3602 data->locks = (LockInstanceData *)
3603 repalloc(data->locks, sizeof(LockInstanceData) * els);
3604 }
3605
3606 instance = &data->locks[el];
3607 SET_LOCKTAG_RELATION(instance->locktag, proc->databaseId,
3608 proc->fpRelId[f]);
3609 instance->holdMask = lockbits << FAST_PATH_LOCKNUMBER_OFFSET;
3610 instance->waitLockMode = NoLock;
3611 instance->backend = proc->backendId;
3612 instance->lxid = proc->lxid;
3613 instance->pid = proc->pid;
3614 instance->leaderPid = proc->pid;
3615 instance->fastpath = true;
3616
3617 el++;
3618 }
3619
3620 if (proc->fpVXIDLock)
3621 {
3622 VirtualTransactionId vxid;
3623 LockInstanceData *instance;
3624
3625 if (el >= els)
3626 {
3627 els += MaxBackends;
3628 data->locks = (LockInstanceData *)
3629 repalloc(data->locks, sizeof(LockInstanceData) * els);
3630 }
3631
3632 vxid.backendId = proc->backendId;
3633 vxid.localTransactionId = proc->fpLocalTransactionId;
3634
3635 instance = &data->locks[el];
3636 SET_LOCKTAG_VIRTUALTRANSACTION(instance->locktag, vxid);
3637 instance->holdMask = LOCKBIT_ON(ExclusiveLock);
3638 instance->waitLockMode = NoLock;
3639 instance->backend = proc->backendId;
3640 instance->lxid = proc->lxid;
3641 instance->pid = proc->pid;
3642 instance->leaderPid = proc->pid;
3643 instance->fastpath = true;
3644
3645 el++;
3646 }
3647
3648 LWLockRelease(&proc->backendLock);
3649 }
3650
3651 /*
3652 * Next, acquire lock on the entire shared lock data structure. We do
3653 * this so that, at least for locks in the primary lock table, the state
3654 * will be self-consistent.
3655 *
3656 * Since this is a read-only operation, we take shared instead of
3657 * exclusive lock. There's not a whole lot of point to this, because all
3658 * the normal operations require exclusive lock, but it doesn't hurt
3659 * anything either. It will at least allow two backends to do
3660 * GetLockStatusData in parallel.
3661 *
3662 * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
3663 */
3664 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
3665 LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
3666
3667 /* Now we can safely count the number of proclocks */
3668 data->nelements = el + hash_get_num_entries(LockMethodProcLockHash);
3669 if (data->nelements > els)
3670 {
3671 els = data->nelements;
3672 data->locks = (LockInstanceData *)
3673 repalloc(data->locks, sizeof(LockInstanceData) * els);
3674 }
3675
3676 /* Now scan the tables to copy the data */
3677 hash_seq_init(&seqstat, LockMethodProcLockHash);
3678
3679 while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
3680 {
3681 PGPROC *proc = proclock->tag.myProc;
3682 LOCK *lock = proclock->tag.myLock;
3683 LockInstanceData *instance = &data->locks[el];
3684
3685 memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG));
3686 instance->holdMask = proclock->holdMask;
3687 if (proc->waitLock == proclock->tag.myLock)
3688 instance->waitLockMode = proc->waitLockMode;
3689 else
3690 instance->waitLockMode = NoLock;
3691 instance->backend = proc->backendId;
3692 instance->lxid = proc->lxid;
3693 instance->pid = proc->pid;
3694 instance->leaderPid = proclock->groupLeader->pid;
3695 instance->fastpath = false;
3696
3697 el++;
3698 }
3699
3700 /*
3701 * And release locks. We do this in reverse order for two reasons: (1)
3702 * Anyone else who needs more than one of the locks will be trying to lock
3703 * them in increasing order; we don't want to release the other process
3704 * until it can get all the locks it needs. (2) This avoids O(N^2)
3705 * behavior inside LWLockRelease.
3706 */
3707 for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
3708 LWLockRelease(LockHashPartitionLockByIndex(i));
3709
3710 Assert(el == data->nelements);
3711
3712 return data;
3713 }
3714
3715 /*
3716 * GetBlockerStatusData - Return a summary of the lock manager's state
3717 * concerning locks that are blocking the specified PID or any member of
3718 * the PID's lock group, for use in a user-level reporting function.
3719 *
3720 * For each PID within the lock group that is awaiting some heavyweight lock,
3721 * the return data includes an array of LockInstanceData objects, which are
3722 * the same data structure used by GetLockStatusData; but unlike that function,
3723 * this one reports only the PROCLOCKs associated with the lock that that PID
3724 * is blocked on. (Hence, all the locktags should be the same for any one
3725 * blocked PID.) In addition, we return an array of the PIDs of those backends
3726 * that are ahead of the blocked PID in the lock's wait queue. These can be
3727 * compared with the PIDs in the LockInstanceData objects to determine which
3728 * waiters are ahead of or behind the blocked PID in the queue.
3729 *
3730 * If blocked_pid isn't a valid backend PID or nothing in its lock group is
3731 * waiting on any heavyweight lock, return empty arrays.
3732 *
3733 * The design goal is to hold the LWLocks for as short a time as possible;
3734 * thus, this function simply makes a copy of the necessary data and releases
3735 * the locks, allowing the caller to contemplate and format the data for as
3736 * long as it pleases.
3737 */
3738 BlockedProcsData *
GetBlockerStatusData(int blocked_pid)3739 GetBlockerStatusData(int blocked_pid)
3740 {
3741 BlockedProcsData *data;
3742 PGPROC *proc;
3743 int i;
3744
3745 data = (BlockedProcsData *) palloc(sizeof(BlockedProcsData));
3746
3747 /*
3748 * Guess how much space we'll need, and preallocate. Most of the time
3749 * this will avoid needing to do repalloc while holding the LWLocks. (We
3750 * assume, but check with an Assert, that MaxBackends is enough entries
3751 * for the procs[] array; the other two could need enlargement, though.)
3752 */
3753 data->nprocs = data->nlocks = data->npids = 0;
3754 data->maxprocs = data->maxlocks = data->maxpids = MaxBackends;
3755 data->procs = (BlockedProcData *) palloc(sizeof(BlockedProcData) * data->maxprocs);
3756 data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * data->maxlocks);
3757 data->waiter_pids = (int *) palloc(sizeof(int) * data->maxpids);
3758
3759 /*
3760 * In order to search the ProcArray for blocked_pid and assume that that
3761 * entry won't immediately disappear under us, we must hold ProcArrayLock.
3762 * In addition, to examine the lock grouping fields of any other backend,
3763 * we must hold all the hash partition locks. (Only one of those locks is
3764 * actually relevant for any one lock group, but we can't know which one
3765 * ahead of time.) It's fairly annoying to hold all those locks
3766 * throughout this, but it's no worse than GetLockStatusData(), and it
3767 * does have the advantage that we're guaranteed to return a
3768 * self-consistent instantaneous state.
3769 */
3770 LWLockAcquire(ProcArrayLock, LW_SHARED);
3771
3772 proc = BackendPidGetProcWithLock(blocked_pid);
3773
3774 /* Nothing to do if it's gone */
3775 if (proc != NULL)
3776 {
3777 /*
3778 * Acquire lock on the entire shared lock data structure. See notes
3779 * in GetLockStatusData().
3780 */
3781 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
3782 LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
3783
3784 if (proc->lockGroupLeader == NULL)
3785 {
3786 /* Easy case, proc is not a lock group member */
3787 GetSingleProcBlockerStatusData(proc, data);
3788 }
3789 else
3790 {
3791 /* Examine all procs in proc's lock group */
3792 dlist_iter iter;
3793
3794 dlist_foreach(iter, &proc->lockGroupLeader->lockGroupMembers)
3795 {
3796 PGPROC *memberProc;
3797
3798 memberProc = dlist_container(PGPROC, lockGroupLink, iter.cur);
3799 GetSingleProcBlockerStatusData(memberProc, data);
3800 }
3801 }
3802
3803 /*
3804 * And release locks. See notes in GetLockStatusData().
3805 */
3806 for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
3807 LWLockRelease(LockHashPartitionLockByIndex(i));
3808
3809 Assert(data->nprocs <= data->maxprocs);
3810 }
3811
3812 LWLockRelease(ProcArrayLock);
3813
3814 return data;
3815 }
3816
3817 /* Accumulate data about one possibly-blocked proc for GetBlockerStatusData */
3818 static void
GetSingleProcBlockerStatusData(PGPROC * blocked_proc,BlockedProcsData * data)3819 GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data)
3820 {
3821 LOCK *theLock = blocked_proc->waitLock;
3822 BlockedProcData *bproc;
3823 SHM_QUEUE *procLocks;
3824 PROCLOCK *proclock;
3825 PROC_QUEUE *waitQueue;
3826 PGPROC *proc;
3827 int queue_size;
3828 int i;
3829
3830 /* Nothing to do if this proc is not blocked */
3831 if (theLock == NULL)
3832 return;
3833
3834 /* Set up a procs[] element */
3835 bproc = &data->procs[data->nprocs++];
3836 bproc->pid = blocked_proc->pid;
3837 bproc->first_lock = data->nlocks;
3838 bproc->first_waiter = data->npids;
3839
3840 /*
3841 * We may ignore the proc's fast-path arrays, since nothing in those could
3842 * be related to a contended lock.
3843 */
3844
3845 /* Collect all PROCLOCKs associated with theLock */
3846 procLocks = &(theLock->procLocks);
3847 proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
3848 offsetof(PROCLOCK, lockLink));
3849 while (proclock)
3850 {
3851 PGPROC *proc = proclock->tag.myProc;
3852 LOCK *lock = proclock->tag.myLock;
3853 LockInstanceData *instance;
3854
3855 if (data->nlocks >= data->maxlocks)
3856 {
3857 data->maxlocks += MaxBackends;
3858 data->locks = (LockInstanceData *)
3859 repalloc(data->locks, sizeof(LockInstanceData) * data->maxlocks);
3860 }
3861
3862 instance = &data->locks[data->nlocks];
3863 memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG));
3864 instance->holdMask = proclock->holdMask;
3865 if (proc->waitLock == lock)
3866 instance->waitLockMode = proc->waitLockMode;
3867 else
3868 instance->waitLockMode = NoLock;
3869 instance->backend = proc->backendId;
3870 instance->lxid = proc->lxid;
3871 instance->pid = proc->pid;
3872 instance->leaderPid = proclock->groupLeader->pid;
3873 instance->fastpath = false;
3874 data->nlocks++;
3875
3876 proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
3877 offsetof(PROCLOCK, lockLink));
3878 }
3879
3880 /* Enlarge waiter_pids[] if it's too small to hold all wait queue PIDs */
3881 waitQueue = &(theLock->waitProcs);
3882 queue_size = waitQueue->size;
3883
3884 if (queue_size > data->maxpids - data->npids)
3885 {
3886 data->maxpids = Max(data->maxpids + MaxBackends,
3887 data->npids + queue_size);
3888 data->waiter_pids = (int *) repalloc(data->waiter_pids,
3889 sizeof(int) * data->maxpids);
3890 }
3891
3892 /* Collect PIDs from the lock's wait queue, stopping at blocked_proc */
3893 proc = (PGPROC *) waitQueue->links.next;
3894 for (i = 0; i < queue_size; i++)
3895 {
3896 if (proc == blocked_proc)
3897 break;
3898 data->waiter_pids[data->npids++] = proc->pid;
3899 proc = (PGPROC *) proc->links.next;
3900 }
3901
3902 bproc->num_locks = data->nlocks - bproc->first_lock;
3903 bproc->num_waiters = data->npids - bproc->first_waiter;
3904 }
3905
3906 /*
3907 * Returns a list of currently held AccessExclusiveLocks, for use by
3908 * LogStandbySnapshot(). The result is a palloc'd array,
3909 * with the number of elements returned into *nlocks.
3910 *
3911 * XXX This currently takes a lock on all partitions of the lock table,
3912 * but it's possible to do better. By reference counting locks and storing
3913 * the value in the ProcArray entry for each backend we could tell if any
3914 * locks need recording without having to acquire the partition locks and
3915 * scan the lock table. Whether that's worth the additional overhead
3916 * is pretty dubious though.
3917 */
3918 xl_standby_lock *
GetRunningTransactionLocks(int * nlocks)3919 GetRunningTransactionLocks(int *nlocks)
3920 {
3921 xl_standby_lock *accessExclusiveLocks;
3922 PROCLOCK *proclock;
3923 HASH_SEQ_STATUS seqstat;
3924 int i;
3925 int index;
3926 int els;
3927
3928 /*
3929 * Acquire lock on the entire shared lock data structure.
3930 *
3931 * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
3932 */
3933 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
3934 LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
3935
3936 /* Now we can safely count the number of proclocks */
3937 els = hash_get_num_entries(LockMethodProcLockHash);
3938
3939 /*
3940 * Allocating enough space for all locks in the lock table is overkill,
3941 * but it's more convenient and faster than having to enlarge the array.
3942 */
3943 accessExclusiveLocks = palloc(els * sizeof(xl_standby_lock));
3944
3945 /* Now scan the tables to copy the data */
3946 hash_seq_init(&seqstat, LockMethodProcLockHash);
3947
3948 /*
3949 * If lock is a currently granted AccessExclusiveLock then it will have
3950 * just one proclock holder, so locks are never accessed twice in this
3951 * particular case. Don't copy this code for use elsewhere because in the
3952 * general case this will give you duplicate locks when looking at
3953 * non-exclusive lock types.
3954 */
3955 index = 0;
3956 while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
3957 {
3958 /* make sure this definition matches the one used in LockAcquire */
3959 if ((proclock->holdMask & LOCKBIT_ON(AccessExclusiveLock)) &&
3960 proclock->tag.myLock->tag.locktag_type == LOCKTAG_RELATION)
3961 {
3962 PGPROC *proc = proclock->tag.myProc;
3963 PGXACT *pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
3964 LOCK *lock = proclock->tag.myLock;
3965 TransactionId xid = pgxact->xid;
3966
3967 /*
3968 * Don't record locks for transactions if we know they have
3969 * already issued their WAL record for commit but not yet released
3970 * lock. It is still possible that we see locks held by already
3971 * complete transactions, if they haven't yet zeroed their xids.
3972 */
3973 if (!TransactionIdIsValid(xid))
3974 continue;
3975
3976 accessExclusiveLocks[index].xid = xid;
3977 accessExclusiveLocks[index].dbOid = lock->tag.locktag_field1;
3978 accessExclusiveLocks[index].relOid = lock->tag.locktag_field2;
3979
3980 index++;
3981 }
3982 }
3983
3984 Assert(index <= els);
3985
3986 /*
3987 * And release locks. We do this in reverse order for two reasons: (1)
3988 * Anyone else who needs more than one of the locks will be trying to lock
3989 * them in increasing order; we don't want to release the other process
3990 * until it can get all the locks it needs. (2) This avoids O(N^2)
3991 * behavior inside LWLockRelease.
3992 */
3993 for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
3994 LWLockRelease(LockHashPartitionLockByIndex(i));
3995
3996 *nlocks = index;
3997 return accessExclusiveLocks;
3998 }
3999
4000 /* Provide the textual name of any lock mode */
4001 const char *
GetLockmodeName(LOCKMETHODID lockmethodid,LOCKMODE mode)4002 GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
4003 {
4004 Assert(lockmethodid > 0 && lockmethodid < lengthof(LockMethods));
4005 Assert(mode > 0 && mode <= LockMethods[lockmethodid]->numLockModes);
4006 return LockMethods[lockmethodid]->lockModeNames[mode];
4007 }
4008
4009 #ifdef LOCK_DEBUG
4010 /*
4011 * Dump all locks in the given proc's myProcLocks lists.
4012 *
4013 * Caller is responsible for having acquired appropriate LWLocks.
4014 */
4015 void
DumpLocks(PGPROC * proc)4016 DumpLocks(PGPROC *proc)
4017 {
4018 SHM_QUEUE *procLocks;
4019 PROCLOCK *proclock;
4020 LOCK *lock;
4021 int i;
4022
4023 if (proc == NULL)
4024 return;
4025
4026 if (proc->waitLock)
4027 LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
4028
4029 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
4030 {
4031 procLocks = &(proc->myProcLocks[i]);
4032
4033 proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
4034 offsetof(PROCLOCK, procLink));
4035
4036 while (proclock)
4037 {
4038 Assert(proclock->tag.myProc == proc);
4039
4040 lock = proclock->tag.myLock;
4041
4042 PROCLOCK_PRINT("DumpLocks", proclock);
4043 LOCK_PRINT("DumpLocks", lock, 0);
4044
4045 proclock = (PROCLOCK *)
4046 SHMQueueNext(procLocks, &proclock->procLink,
4047 offsetof(PROCLOCK, procLink));
4048 }
4049 }
4050 }
4051
4052 /*
4053 * Dump all lmgr locks.
4054 *
4055 * Caller is responsible for having acquired appropriate LWLocks.
4056 */
4057 void
DumpAllLocks(void)4058 DumpAllLocks(void)
4059 {
4060 PGPROC *proc;
4061 PROCLOCK *proclock;
4062 LOCK *lock;
4063 HASH_SEQ_STATUS status;
4064
4065 proc = MyProc;
4066
4067 if (proc && proc->waitLock)
4068 LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
4069
4070 hash_seq_init(&status, LockMethodProcLockHash);
4071
4072 while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
4073 {
4074 PROCLOCK_PRINT("DumpAllLocks", proclock);
4075
4076 lock = proclock->tag.myLock;
4077 if (lock)
4078 LOCK_PRINT("DumpAllLocks", lock, 0);
4079 else
4080 elog(LOG, "DumpAllLocks: proclock->tag.myLock = NULL");
4081 }
4082 }
4083 #endif /* LOCK_DEBUG */
4084
4085 /*
4086 * LOCK 2PC resource manager's routines
4087 */
4088
4089 /*
4090 * Re-acquire a lock belonging to a transaction that was prepared.
4091 *
4092 * Because this function is run at db startup, re-acquiring the locks should
4093 * never conflict with running transactions because there are none. We
4094 * assume that the lock state represented by the stored 2PC files is legal.
4095 *
4096 * When switching from Hot Standby mode to normal operation, the locks will
4097 * be already held by the startup process. The locks are acquired for the new
4098 * procs without checking for conflicts, so we don't get a conflict between the
4099 * startup process and the dummy procs, even though we will momentarily have
4100 * a situation where two procs are holding the same AccessExclusiveLock,
 * which isn't normally possible because of the conflict.  If we're in standby
4102 * mode, but a recovery snapshot hasn't been established yet, it's possible
4103 * that some but not all of the locks are already held by the startup process.
4104 *
4105 * This approach is simple, but also a bit dangerous, because if there isn't
4106 * enough shared memory to acquire the locks, an error will be thrown, which
4107 * is promoted to FATAL and recovery will abort, bringing down postmaster.
4108 * A safer approach would be to transfer the locks like we do in
4109 * AtPrepare_Locks, but then again, in hot standby mode it's possible for
4110 * read-only backends to use up all the shared lock memory anyway, so that
4111 * replaying the WAL record that needs to acquire a lock will throw an error
4112 * and PANIC anyway.
4113 */
4114 void
lock_twophase_recover(TransactionId xid,uint16 info,void * recdata,uint32 len)4115 lock_twophase_recover(TransactionId xid, uint16 info,
4116 void *recdata, uint32 len)
4117 {
4118 TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
4119 PGPROC *proc = TwoPhaseGetDummyProc(xid, false);
4120 LOCKTAG *locktag;
4121 LOCKMODE lockmode;
4122 LOCKMETHODID lockmethodid;
4123 LOCK *lock;
4124 PROCLOCK *proclock;
4125 PROCLOCKTAG proclocktag;
4126 bool found;
4127 uint32 hashcode;
4128 uint32 proclock_hashcode;
4129 int partition;
4130 LWLock *partitionLock;
4131 LockMethod lockMethodTable;
4132
4133 Assert(len == sizeof(TwoPhaseLockRecord));
4134 locktag = &rec->locktag;
4135 lockmode = rec->lockmode;
4136 lockmethodid = locktag->locktag_lockmethodid;
4137
4138 if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4139 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4140 lockMethodTable = LockMethods[lockmethodid];
4141
4142 hashcode = LockTagHashCode(locktag);
4143 partition = LockHashPartition(hashcode);
4144 partitionLock = LockHashPartitionLock(hashcode);
4145
4146 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
4147
4148 /*
4149 * Find or create a lock with this tag.
4150 */
4151 lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
4152 (void *) locktag,
4153 hashcode,
4154 HASH_ENTER_NULL,
4155 &found);
4156 if (!lock)
4157 {
4158 LWLockRelease(partitionLock);
4159 ereport(ERROR,
4160 (errcode(ERRCODE_OUT_OF_MEMORY),
4161 errmsg("out of shared memory"),
4162 errhint("You might need to increase max_locks_per_transaction.")));
4163 }
4164
4165 /*
4166 * if it's a new lock object, initialize it
4167 */
4168 if (!found)
4169 {
4170 lock->grantMask = 0;
4171 lock->waitMask = 0;
4172 SHMQueueInit(&(lock->procLocks));
4173 ProcQueueInit(&(lock->waitProcs));
4174 lock->nRequested = 0;
4175 lock->nGranted = 0;
4176 MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
4177 MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
4178 LOCK_PRINT("lock_twophase_recover: new", lock, lockmode);
4179 }
4180 else
4181 {
4182 LOCK_PRINT("lock_twophase_recover: found", lock, lockmode);
4183 Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
4184 Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
4185 Assert(lock->nGranted <= lock->nRequested);
4186 }
4187
4188 /*
4189 * Create the hash key for the proclock table.
4190 */
4191 proclocktag.myLock = lock;
4192 proclocktag.myProc = proc;
4193
4194 proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
4195
4196 /*
4197 * Find or create a proclock entry with this tag
4198 */
4199 proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
4200 (void *) &proclocktag,
4201 proclock_hashcode,
4202 HASH_ENTER_NULL,
4203 &found);
4204 if (!proclock)
4205 {
4206 /* Oops, not enough shmem for the proclock */
4207 if (lock->nRequested == 0)
4208 {
4209 /*
4210 * There are no other requestors of this lock, so garbage-collect
4211 * the lock object. We *must* do this to avoid a permanent leak
4212 * of shared memory, because there won't be anything to cause
4213 * anyone to release the lock object later.
4214 */
4215 Assert(SHMQueueEmpty(&(lock->procLocks)));
4216 if (!hash_search_with_hash_value(LockMethodLockHash,
4217 (void *) &(lock->tag),
4218 hashcode,
4219 HASH_REMOVE,
4220 NULL))
4221 elog(PANIC, "lock table corrupted");
4222 }
4223 LWLockRelease(partitionLock);
4224 ereport(ERROR,
4225 (errcode(ERRCODE_OUT_OF_MEMORY),
4226 errmsg("out of shared memory"),
4227 errhint("You might need to increase max_locks_per_transaction.")));
4228 }
4229
4230 /*
4231 * If new, initialize the new entry
4232 */
4233 if (!found)
4234 {
4235 Assert(proc->lockGroupLeader == NULL);
4236 proclock->groupLeader = proc;
4237 proclock->holdMask = 0;
4238 proclock->releaseMask = 0;
4239 /* Add proclock to appropriate lists */
4240 SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
4241 SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
4242 &proclock->procLink);
4243 PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
4244 }
4245 else
4246 {
4247 PROCLOCK_PRINT("lock_twophase_recover: found", proclock);
4248 Assert((proclock->holdMask & ~lock->grantMask) == 0);
4249 }
4250
4251 /*
4252 * lock->nRequested and lock->requested[] count the total number of
4253 * requests, whether granted or waiting, so increment those immediately.
4254 */
4255 lock->nRequested++;
4256 lock->requested[lockmode]++;
4257 Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
4258
4259 /*
4260 * We shouldn't already hold the desired lock.
4261 */
4262 if (proclock->holdMask & LOCKBIT_ON(lockmode))
4263 elog(ERROR, "lock %s on object %u/%u/%u is already held",
4264 lockMethodTable->lockModeNames[lockmode],
4265 lock->tag.locktag_field1, lock->tag.locktag_field2,
4266 lock->tag.locktag_field3);
4267
4268 /*
4269 * We ignore any possible conflicts and just grant ourselves the lock. Not
4270 * only because we don't bother, but also to avoid deadlocks when
4271 * switching from standby to normal mode. See function comment.
4272 */
4273 GrantLock(lock, proclock, lockmode);
4274
4275 /*
4276 * Bump strong lock count, to make sure any fast-path lock requests won't
4277 * be granted without consulting the primary lock table.
4278 */
4279 if (ConflictsWithRelationFastPath(&lock->tag, lockmode))
4280 {
4281 uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode);
4282
4283 SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
4284 FastPathStrongRelationLocks->count[fasthashcode]++;
4285 SpinLockRelease(&FastPathStrongRelationLocks->mutex);
4286 }
4287
4288 LWLockRelease(partitionLock);
4289 }
4290
4291 /*
4292 * Re-acquire a lock belonging to a transaction that was prepared, when
4293 * starting up into hot standby mode.
4294 */
4295 void
lock_twophase_standby_recover(TransactionId xid,uint16 info,void * recdata,uint32 len)4296 lock_twophase_standby_recover(TransactionId xid, uint16 info,
4297 void *recdata, uint32 len)
4298 {
4299 TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
4300 LOCKTAG *locktag;
4301 LOCKMODE lockmode;
4302 LOCKMETHODID lockmethodid;
4303
4304 Assert(len == sizeof(TwoPhaseLockRecord));
4305 locktag = &rec->locktag;
4306 lockmode = rec->lockmode;
4307 lockmethodid = locktag->locktag_lockmethodid;
4308
4309 if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4310 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4311
4312 if (lockmode == AccessExclusiveLock &&
4313 locktag->locktag_type == LOCKTAG_RELATION)
4314 {
4315 StandbyAcquireAccessExclusiveLock(xid,
4316 locktag->locktag_field1 /* dboid */ ,
4317 locktag->locktag_field2 /* reloid */ );
4318 }
4319 }
4320
4321
4322 /*
4323 * 2PC processing routine for COMMIT PREPARED case.
4324 *
4325 * Find and release the lock indicated by the 2PC record.
4326 */
4327 void
lock_twophase_postcommit(TransactionId xid,uint16 info,void * recdata,uint32 len)4328 lock_twophase_postcommit(TransactionId xid, uint16 info,
4329 void *recdata, uint32 len)
4330 {
4331 TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
4332 PGPROC *proc = TwoPhaseGetDummyProc(xid, true);
4333 LOCKTAG *locktag;
4334 LOCKMETHODID lockmethodid;
4335 LockMethod lockMethodTable;
4336
4337 Assert(len == sizeof(TwoPhaseLockRecord));
4338 locktag = &rec->locktag;
4339 lockmethodid = locktag->locktag_lockmethodid;
4340
4341 if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4342 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4343 lockMethodTable = LockMethods[lockmethodid];
4344
4345 LockRefindAndRelease(lockMethodTable, proc, locktag, rec->lockmode, true);
4346 }
4347
4348 /*
4349 * 2PC processing routine for ROLLBACK PREPARED case.
4350 *
4351 * This is actually just the same as the COMMIT case.
4352 */
4353 void
lock_twophase_postabort(TransactionId xid,uint16 info,void * recdata,uint32 len)4354 lock_twophase_postabort(TransactionId xid, uint16 info,
4355 void *recdata, uint32 len)
4356 {
4357 lock_twophase_postcommit(xid, info, recdata, len);
4358 }
4359
4360 /*
4361 * VirtualXactLockTableInsert
4362 *
4363 * Take vxid lock via the fast-path. There can't be any pre-existing
4364 * lockers, as we haven't advertised this vxid via the ProcArray yet.
4365 *
4366 * Since MyProc->fpLocalTransactionId will normally contain the same data
4367 * as MyProc->lxid, you might wonder if we really need both. The
4368 * difference is that MyProc->lxid is set and cleared unlocked, and
4369 * examined by procarray.c, while fpLocalTransactionId is protected by
4370 * backendLock and is used only by the locking subsystem. Doing it this
4371 * way makes it easier to verify that there are no funny race conditions.
4372 *
4373 * We don't bother recording this lock in the local lock table, since it's
4374 * only ever released at the end of a transaction. Instead,
4375 * LockReleaseAll() calls VirtualXactLockTableCleanup().
4376 */
4377 void
VirtualXactLockTableInsert(VirtualTransactionId vxid)4378 VirtualXactLockTableInsert(VirtualTransactionId vxid)
4379 {
4380 Assert(VirtualTransactionIdIsValid(vxid));
4381
4382 LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
4383
4384 Assert(MyProc->backendId == vxid.backendId);
4385 Assert(MyProc->fpLocalTransactionId == InvalidLocalTransactionId);
4386 Assert(MyProc->fpVXIDLock == false);
4387
4388 MyProc->fpVXIDLock = true;
4389 MyProc->fpLocalTransactionId = vxid.localTransactionId;
4390
4391 LWLockRelease(&MyProc->backendLock);
4392 }
4393
4394 /*
4395 * VirtualXactLockTableCleanup
4396 *
4397 * Check whether a VXID lock has been materialized; if so, release it,
4398 * unblocking waiters.
4399 */
4400 void
VirtualXactLockTableCleanup(void)4401 VirtualXactLockTableCleanup(void)
4402 {
4403 bool fastpath;
4404 LocalTransactionId lxid;
4405
4406 Assert(MyProc->backendId != InvalidBackendId);
4407
4408 /*
4409 * Clean up shared memory state.
4410 */
4411 LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
4412
4413 fastpath = MyProc->fpVXIDLock;
4414 lxid = MyProc->fpLocalTransactionId;
4415 MyProc->fpVXIDLock = false;
4416 MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
4417
4418 LWLockRelease(&MyProc->backendLock);
4419
4420 /*
4421 * If fpVXIDLock has been cleared without touching fpLocalTransactionId,
4422 * that means someone transferred the lock to the main lock table.
4423 */
4424 if (!fastpath && LocalTransactionIdIsValid(lxid))
4425 {
4426 VirtualTransactionId vxid;
4427 LOCKTAG locktag;
4428
4429 vxid.backendId = MyBackendId;
4430 vxid.localTransactionId = lxid;
4431 SET_LOCKTAG_VIRTUALTRANSACTION(locktag, vxid);
4432
4433 LockRefindAndRelease(LockMethods[DEFAULT_LOCKMETHOD], MyProc,
4434 &locktag, ExclusiveLock, false);
4435 }
4436 }
4437
4438 /*
4439 * XactLockForVirtualXact
4440 *
4441 * If TransactionIdIsValid(xid), this is essentially XactLockTableWait(xid,
4442 * NULL, NULL, XLTW_None) or ConditionalXactLockTableWait(xid). Unlike those
4443 * functions, it assumes "xid" is never a subtransaction and that "xid" is
4444 * prepared, committed, or aborted.
4445 *
4446 * If !TransactionIdIsValid(xid), this locks every prepared XID having been
4447 * known as "vxid" before its PREPARE TRANSACTION.
4448 */
4449 static bool
XactLockForVirtualXact(VirtualTransactionId vxid,TransactionId xid,bool wait)4450 XactLockForVirtualXact(VirtualTransactionId vxid,
4451 TransactionId xid, bool wait)
4452 {
4453 bool more = false;
4454
4455 /* There is no point to wait for 2PCs if you have no 2PCs. */
4456 if (max_prepared_xacts == 0)
4457 return true;
4458
4459 do
4460 {
4461 LockAcquireResult lar;
4462 LOCKTAG tag;
4463
4464 /* Clear state from previous iterations. */
4465 if (more)
4466 {
4467 xid = InvalidTransactionId;
4468 more = false;
4469 }
4470
4471 /* If we have no xid, try to find one. */
4472 if (!TransactionIdIsValid(xid))
4473 xid = TwoPhaseGetXidByVirtualXID(vxid, &more);
4474 if (!TransactionIdIsValid(xid))
4475 {
4476 Assert(!more);
4477 return true;
4478 }
4479
4480 /* Check or wait for XID completion. */
4481 SET_LOCKTAG_TRANSACTION(tag, xid);
4482 lar = LockAcquire(&tag, ShareLock, false, !wait);
4483 if (lar == LOCKACQUIRE_NOT_AVAIL)
4484 return false;
4485 LockRelease(&tag, ShareLock, false);
4486 } while (more);
4487
4488 return true;
4489 }
4490
4491 /*
4492 * VirtualXactLock
4493 *
4494 * If wait = true, wait as long as the given VXID or any XID acquired by the
4495 * same transaction is still running. Then, return true.
4496 *
4497 * If wait = false, just check whether that VXID or one of those XIDs is still
4498 * running, and return true or false.
4499 */
4500 bool
VirtualXactLock(VirtualTransactionId vxid,bool wait)4501 VirtualXactLock(VirtualTransactionId vxid, bool wait)
4502 {
4503 LOCKTAG tag;
4504 PGPROC *proc;
4505 TransactionId xid = InvalidTransactionId;
4506
4507 Assert(VirtualTransactionIdIsValid(vxid));
4508
4509 if (VirtualTransactionIdIsRecoveredPreparedXact(vxid))
4510 /* no vxid lock; localTransactionId is a normal, locked XID */
4511 return XactLockForVirtualXact(vxid, vxid.localTransactionId, wait);
4512
4513 SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
4514
4515 /*
4516 * If a lock table entry must be made, this is the PGPROC on whose behalf
4517 * it must be done. Note that the transaction might end or the PGPROC
4518 * might be reassigned to a new backend before we get around to examining
4519 * it, but it doesn't matter. If we find upon examination that the
4520 * relevant lxid is no longer running here, that's enough to prove that
4521 * it's no longer running anywhere.
4522 */
4523 proc = BackendIdGetProc(vxid.backendId);
4524 if (proc == NULL)
4525 return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
4526
4527 /*
4528 * We must acquire this lock before checking the backendId and lxid
4529 * against the ones we're waiting for. The target backend will only set
4530 * or clear lxid while holding this lock.
4531 */
4532 LWLockAcquire(&proc->backendLock, LW_EXCLUSIVE);
4533
4534 if (proc->backendId != vxid.backendId
4535 || proc->fpLocalTransactionId != vxid.localTransactionId)
4536 {
4537 /* VXID ended */
4538 LWLockRelease(&proc->backendLock);
4539 return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
4540 }
4541
4542 /*
4543 * If we aren't asked to wait, there's no need to set up a lock table
4544 * entry. The transaction is still in progress, so just return false.
4545 */
4546 if (!wait)
4547 {
4548 LWLockRelease(&proc->backendLock);
4549 return false;
4550 }
4551
4552 /*
4553 * OK, we're going to need to sleep on the VXID. But first, we must set
4554 * up the primary lock table entry, if needed (ie, convert the proc's
4555 * fast-path lock on its VXID to a regular lock).
4556 */
4557 if (proc->fpVXIDLock)
4558 {
4559 PROCLOCK *proclock;
4560 uint32 hashcode;
4561 LWLock *partitionLock;
4562
4563 hashcode = LockTagHashCode(&tag);
4564
4565 partitionLock = LockHashPartitionLock(hashcode);
4566 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
4567
4568 proclock = SetupLockInTable(LockMethods[DEFAULT_LOCKMETHOD], proc,
4569 &tag, hashcode, ExclusiveLock);
4570 if (!proclock)
4571 {
4572 LWLockRelease(partitionLock);
4573 LWLockRelease(&proc->backendLock);
4574 ereport(ERROR,
4575 (errcode(ERRCODE_OUT_OF_MEMORY),
4576 errmsg("out of shared memory"),
4577 errhint("You might need to increase max_locks_per_transaction.")));
4578 }
4579 GrantLock(proclock->tag.myLock, proclock, ExclusiveLock);
4580
4581 LWLockRelease(partitionLock);
4582
4583 proc->fpVXIDLock = false;
4584 }
4585
4586 /*
4587 * If the proc has an XID now, we'll avoid a TwoPhaseGetXidByVirtualXID()
4588 * search. The proc might have assigned this XID but not yet locked it,
4589 * in which case the proc will lock this XID before releasing the VXID.
4590 * The backendLock critical section excludes VirtualXactLockTableCleanup(),
4591 * so we won't save an XID of a different VXID. It doesn't matter whether
4592 * we save this before or after setting up the primary lock table entry.
4593 */
4594 xid = ProcGlobal->allPgXact[proc->pgprocno].xid;
4595
4596 /* Done with proc->fpLockBits */
4597 LWLockRelease(&proc->backendLock);
4598
4599 /* Time to wait. */
4600 (void) LockAcquire(&tag, ShareLock, false, false);
4601
4602 LockRelease(&tag, ShareLock, false);
4603 return XactLockForVirtualXact(vxid, xid, wait);
4604 }
4605
4606 /*
4607 * LockWaiterCount
4608 *
4609 * Find the number of lock requester on this locktag
4610 */
4611 int
LockWaiterCount(const LOCKTAG * locktag)4612 LockWaiterCount(const LOCKTAG *locktag)
4613 {
4614 LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
4615 LOCK *lock;
4616 bool found;
4617 uint32 hashcode;
4618 LWLock *partitionLock;
4619 int waiters = 0;
4620
4621 if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4622 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4623
4624 hashcode = LockTagHashCode(locktag);
4625 partitionLock = LockHashPartitionLock(hashcode);
4626 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
4627
4628 lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
4629 (const void *) locktag,
4630 hashcode,
4631 HASH_FIND,
4632 &found);
4633 if (found)
4634 {
4635 Assert(lock != NULL);
4636 waiters = lock->nRequested;
4637 }
4638 LWLockRelease(partitionLock);
4639
4640 return waiters;
4641 }
4642