1 /*-------------------------------------------------------------------------
2 *
3 * predicate.c
4 * POSTGRES predicate locking
5 * to support full serializable transaction isolation
6 *
7 *
8 * The approach taken is to implement Serializable Snapshot Isolation (SSI)
9 * as initially described in this paper:
10 *
11 * Michael J. Cahill, Uwe Röhm, and Alan D. Fekete. 2008.
12 * Serializable isolation for snapshot databases.
13 * In SIGMOD '08: Proceedings of the 2008 ACM SIGMOD
14 * international conference on Management of data,
15 * pages 729-738, New York, NY, USA. ACM.
16 * http://doi.acm.org/10.1145/1376616.1376690
17 *
18 * and further elaborated in Cahill's doctoral thesis:
19 *
20 * Michael James Cahill. 2009.
21 * Serializable Isolation for Snapshot Databases.
22 * Sydney Digital Theses.
23 * University of Sydney, School of Information Technologies.
24 * http://hdl.handle.net/2123/5353
25 *
26 *
27 * Predicate locks for Serializable Snapshot Isolation (SSI) are SIREAD
28 * locks, which are so different from normal locks that a distinct set of
29 * structures is required to handle them. They are needed to detect
30 * rw-conflicts when the read happens before the write. (When the write
31 * occurs first, the reading transaction can check for a conflict by
32 * examining the MVCC data.)
33 *
34 * (1) Besides tuples actually read, they must cover ranges of tuples
35 * which would have been read based on the predicate. This will
36 * require modelling the predicates through locks against database
37 * objects such as pages, index ranges, or entire tables.
38 *
39 * (2) They must be kept in RAM for quick access. Because of this, it
40 * isn't possible to always maintain tuple-level granularity -- when
41 * the space allocated to store these approaches exhaustion, a
42 * request for a lock may need to scan for situations where a single
43 * transaction holds many fine-grained locks which can be coalesced
44 * into a single coarser-grained lock.
45 *
46 * (3) They never block anything; they are more like flags than locks
47 * in that regard; although they refer to database objects and are
48 * used to identify rw-conflicts with normal write locks.
49 *
50 * (4) While they are associated with a transaction, they must survive
51 * a successful COMMIT of that transaction, and remain until all
52 * overlapping transactions complete. This even means that they
53 * must survive termination of the transaction's process. If a
54 * top level transaction is rolled back, however, it is immediately
55 * flagged so that it can be ignored, and its SIREAD locks can be
56 * released any time after that.
57 *
58 * (5) The only transactions which create SIREAD locks or check for
59 * conflicts with them are serializable transactions.
60 *
61 * (6) When a write lock for a top level transaction is found to cover
62 * an existing SIREAD lock for the same transaction, the SIREAD lock
63 * can be deleted.
64 *
65 * (7) A write from a serializable transaction must ensure that an xact
66 * record exists for the transaction, with the same lifespan (until
 *			all concurrent transactions complete or the transaction is rolled
68 * back) so that rw-dependencies to that transaction can be
69 * detected.
70 *
71 * We use an optimization for read-only transactions. Under certain
72 * circumstances, a read-only transaction's snapshot can be shown to
73 * never have conflicts with other transactions. This is referred to
74 * as a "safe" snapshot (and one known not to be is "unsafe").
75 * However, it can't be determined whether a snapshot is safe until
76 * all concurrent read/write transactions complete.
77 *
78 * Once a read-only transaction is known to have a safe snapshot, it
79 * can release its predicate locks and exempt itself from further
80 * predicate lock tracking. READ ONLY DEFERRABLE transactions run only
81 * on safe snapshots, waiting as necessary for one to be available.
82 *
83 *
84 * Lightweight locks to manage access to the predicate locking shared
85 * memory objects must be taken in this order, and should be released in
86 * reverse order:
87 *
88 * SerializableFinishedListLock
89 * - Protects the list of transactions which have completed but which
90 * may yet matter because they overlap still-active transactions.
91 *
92 * SerializablePredicateLockListLock
93 * - Protects the linked list of locks held by a transaction. Note
94 * that the locks themselves are also covered by the partition
95 * locks of their respective lock targets; this lock only affects
96 * the linked list connecting the locks related to a transaction.
97 * - All transactions share this single lock (with no partitioning).
98 * - There is never a need for a process other than the one running
99 * an active transaction to walk the list of locks held by that
100 * transaction.
101 * - It is relatively infrequent that another process needs to
102 * modify the list for a transaction, but it does happen for such
103 * things as index page splits for pages with predicate locks and
104 * freeing of predicate locked pages by a vacuum process. When
105 * removing a lock in such cases, the lock itself contains the
106 * pointers needed to remove it from the list. When adding a
107 * lock in such cases, the lock can be added using the anchor in
108 * the transaction structure. Neither requires walking the list.
109 * - Cleaning up the list for a terminated transaction is sometimes
110 * not done on a retail basis, in which case no lock is required.
111 * - Due to the above, a process accessing its active transaction's
112 * list always uses a shared lock, regardless of whether it is
113 * walking or maintaining the list. This improves concurrency
114 * for the common access patterns.
115 * - A process which needs to alter the list of a transaction other
116 * than its own active transaction must acquire an exclusive
117 * lock.
118 *
119 * PredicateLockHashPartitionLock(hashcode)
120 * - The same lock protects a target, all locks on that target, and
121 * the linked list of locks on the target.
122 * - When more than one is needed, acquire in ascending address order.
123 * - When all are needed (rare), acquire in ascending index order with
124 * PredicateLockHashPartitionLockByIndex(index).
125 *
126 * SerializableXactHashLock
127 * - Protects both PredXact and SerializableXidHash.
128 *
129 *
130 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
131 * Portions Copyright (c) 1994, Regents of the University of California
132 *
133 *
134 * IDENTIFICATION
135 * src/backend/storage/lmgr/predicate.c
136 *
137 *-------------------------------------------------------------------------
138 */
139 /*
140 * INTERFACE ROUTINES
141 *
142 * housekeeping for setting up shared memory predicate lock structures
143 * InitPredicateLocks(void)
144 * PredicateLockShmemSize(void)
145 *
146 * predicate lock reporting
147 * GetPredicateLockStatusData(void)
148 * PageIsPredicateLocked(Relation relation, BlockNumber blkno)
149 *
150 * predicate lock maintenance
151 * GetSerializableTransactionSnapshot(Snapshot snapshot)
152 * SetSerializableTransactionSnapshot(Snapshot snapshot,
153 * TransactionId sourcexid)
154 * RegisterPredicateLockingXid(void)
155 * PredicateLockRelation(Relation relation, Snapshot snapshot)
156 * PredicateLockPage(Relation relation, BlockNumber blkno,
157 * Snapshot snapshot)
158 * PredicateLockTuple(Relation relation, HeapTuple tuple,
159 * Snapshot snapshot)
160 * PredicateLockPageSplit(Relation relation, BlockNumber oldblkno,
161 * BlockNumber newblkno)
162 * PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
163 * BlockNumber newblkno)
164 * TransferPredicateLocksToHeapRelation(Relation relation)
165 * ReleasePredicateLocks(bool isCommit)
166 *
167 * conflict detection (may also trigger rollback)
168 * CheckForSerializableConflictOut(bool visible, Relation relation,
169 * HeapTupleData *tup, Buffer buffer,
170 * Snapshot snapshot)
171 * CheckForSerializableConflictIn(Relation relation, HeapTupleData *tup,
172 * Buffer buffer)
173 * CheckTableForSerializableConflictIn(Relation relation)
174 *
175 * final rollback checking
176 * PreCommit_CheckForSerializationFailure(void)
177 *
178 * two-phase commit support
179 * AtPrepare_PredicateLocks(void);
180 * PostPrepare_PredicateLocks(TransactionId xid);
181 * PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit);
182 * predicatelock_twophase_recover(TransactionId xid, uint16 info,
183 * void *recdata, uint32 len);
184 */
185
186 #include "postgres.h"
187
188 #include "access/htup_details.h"
189 #include "access/slru.h"
190 #include "access/subtrans.h"
191 #include "access/transam.h"
192 #include "access/twophase.h"
193 #include "access/twophase_rmgr.h"
194 #include "access/xact.h"
195 #include "access/xlog.h"
196 #include "miscadmin.h"
197 #include "storage/bufmgr.h"
198 #include "storage/predicate.h"
199 #include "storage/predicate_internals.h"
200 #include "storage/proc.h"
201 #include "storage/procarray.h"
202 #include "utils/rel.h"
203 #include "utils/snapmgr.h"
204 #include "utils/tqual.h"
205
206 /* Uncomment the next line to test the graceful degradation code. */
207 /* #define TEST_OLDSERXID */
208
/*
 * Test the most selective fields first, for performance.
 *
 * a is covered by b if all of the following hold:
 *	1) a.database = b.database
 *	2) a.relation = b.relation
 *	3) b.offset is invalid (b is page-granularity or higher)
 *	4) either of the following:
 *		4a) a.offset is valid (a is tuple-granularity) and a.page = b.page
 *	 or 4b) a.offset is invalid and b.page is invalid (a is
 *			page-granularity and b is relation-granularity)
 */
#define TargetTagIsCoveredBy(covered_target, covering_target)			\
	((GET_PREDICATELOCKTARGETTAG_RELATION(covered_target) == /* (2) */	\
	  GET_PREDICATELOCKTARGETTAG_RELATION(covering_target))				\
	 && (GET_PREDICATELOCKTARGETTAG_OFFSET(covering_target) ==			\
		 InvalidOffsetNumber)								 /* (3) */	\
	 && (((GET_PREDICATELOCKTARGETTAG_OFFSET(covered_target) !=			\
		   InvalidOffsetNumber)								 /* (4a) */	\
		  && (GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) ==		\
			  GET_PREDICATELOCKTARGETTAG_PAGE(covered_target)))			\
		 || ((GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) ==		\
			  InvalidBlockNumber)							 /* (4b) */	\
			 && (GET_PREDICATELOCKTARGETTAG_PAGE(covered_target)		\
				 != InvalidBlockNumber)))								\
	 && (GET_PREDICATELOCKTARGETTAG_DB(covered_target) ==	 /* (1) */	\
		 GET_PREDICATELOCKTARGETTAG_DB(covering_target)))
236
/*
 * The predicate locking target and lock shared hash tables are partitioned to
 * reduce contention.  To determine which partition a given target belongs to,
 * compute the tag's hash code with PredicateLockTargetTagHashCode(), then
 * apply one of these macros.
 * NB: NUM_PREDICATELOCK_PARTITIONS must be a power of 2!
 */
#define PredicateLockHashPartition(hashcode) \
	((hashcode) % NUM_PREDICATELOCK_PARTITIONS)
#define PredicateLockHashPartitionLock(hashcode) \
	(&MainLWLockArray[PREDICATELOCK_MANAGER_LWLOCK_OFFSET + \
		PredicateLockHashPartition(hashcode)].lock)
#define PredicateLockHashPartitionLockByIndex(i) \
	(&MainLWLockArray[PREDICATELOCK_MANAGER_LWLOCK_OFFSET + (i)].lock)

/*
 * Sizing for the predicate lock target hash table: each backend or prepared
 * transaction may hold up to max_predicate_locks_per_xact locks.
 */
#define NPREDICATELOCKTARGETENTS() \
	mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
254
/* Is this sxact still linked on the finished-transactions list? */
#define SxactIsOnFinishedList(sxact) (!SHMQueueIsDetached(&((sxact)->finishedLink)))

/*
 * Note that a sxact is marked "prepared" once it has passed
 * PreCommit_CheckForSerializationFailure, even if it isn't using
 * 2PC.  This is the point at which it can no longer be aborted.
 *
 * The PREPARED flag remains set after commit, so SxactIsCommitted
 * implies SxactIsPrepared.
 */
#define SxactIsCommitted(sxact) (((sxact)->flags & SXACT_FLAG_COMMITTED) != 0)
#define SxactIsPrepared(sxact) (((sxact)->flags & SXACT_FLAG_PREPARED) != 0)
#define SxactIsRolledBack(sxact) (((sxact)->flags & SXACT_FLAG_ROLLED_BACK) != 0)
#define SxactIsDoomed(sxact) (((sxact)->flags & SXACT_FLAG_DOOMED) != 0)
#define SxactIsReadOnly(sxact) (((sxact)->flags & SXACT_FLAG_READ_ONLY) != 0)
#define SxactHasSummaryConflictIn(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_IN) != 0)
#define SxactHasSummaryConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_OUT) != 0)
/*
 * The following macro actually means that the specified transaction has a
 * conflict out *to a transaction which committed ahead of it*.  It's hard
 * to get that into a name of a reasonable length.
 */
#define SxactHasConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_CONFLICT_OUT) != 0)
#define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0)
#define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
#define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
281
/*
 * Compute the hash code associated with a PREDICATELOCKTARGETTAG.
 *
 * To avoid unnecessary recomputations of the hash code, we try to do this
 * just once per function, and then pass it around as needed.  Aside from
 * passing the hashcode to hash_search_with_hash_value(), we can extract
 * the lock partition number from the hashcode.
 */
#define PredicateLockTargetTagHashCode(predicatelocktargettag) \
	get_hash_value(PredicateLockTargetHash, predicatelocktargettag)

/*
 * Given a predicate lock tag, and the hash for its target,
 * compute the lock hash.
 *
 * To make the hash code also depend on the transaction, we xor the sxid
 * struct's address into the hash code, left-shifted so that the
 * partition-number bits don't change.  Since this is only a hash, we
 * don't care if we lose high-order bits of the address; use an
 * intermediate variable to suppress cast-pointer-to-int warnings.
 *
 * NOTE(review): no named intermediate variable appears below; presumably
 * the PointerGetDatum()/(uint32) cast chain serves that role -- confirm.
 */
#define PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash) \
	((targethash) ^ ((uint32) PointerGetDatum((predicatelocktag)->myXact)) \
	 << LOG2_NUM_PREDICATELOCK_PARTITIONS)
306

/*
 * The SLRU buffer area through which we access the old xids.
 */
static SlruCtlData OldSerXidSlruCtlData;

#define OldSerXidSlruCtl		(&OldSerXidSlruCtlData)

#define OLDSERXID_PAGESIZE		BLCKSZ
#define OLDSERXID_ENTRYSIZE		sizeof(SerCommitSeqNo)
#define OLDSERXID_ENTRIESPERPAGE	(OLDSERXID_PAGESIZE / OLDSERXID_ENTRYSIZE)

/*
 * Set maximum pages based on the lesser of the number needed to track all
 * transactions and the maximum that SLRU supports.
 */
#define OLDSERXID_MAX_PAGE			(Min(SLRU_PAGES_PER_SEGMENT * 0x10000 - 1, \
								 (MaxTransactionId) / OLDSERXID_ENTRIESPERPAGE))

/* Advance to the next page number, wrapping around past the maximum. */
#define OldSerXidNextPage(page) (((page) >= OLDSERXID_MAX_PAGE) ? 0 : (page) + 1)

/* Locate (as an lvalue) the entry for an xid within a buffered page. */
#define OldSerXidValue(slotno, xid) (*((SerCommitSeqNo *) \
	(OldSerXidSlruCtl->shared->page_buffer[slotno] + \
	 ((((uint32) (xid)) % OLDSERXID_ENTRIESPERPAGE) * OLDSERXID_ENTRYSIZE))))

/* Map an xid to the SLRU page holding its entry, and a page to its segment. */
#define OldSerXidPage(xid)	((((uint32) (xid)) / OLDSERXID_ENTRIESPERPAGE) % (OLDSERXID_MAX_PAGE + 1))
#define OldSerXidSegment(page)	((page) / SLRU_PAGES_PER_SEGMENT)

/* Control structure for the old-serializable-xid SLRU, in shared memory. */
typedef struct OldSerXidControlData
{
	int			headPage;		/* newest initialized page */
	TransactionId headXid;		/* newest valid Xid in the SLRU */
	TransactionId tailXid;		/* oldest xmin we might be interested in */
	bool		warningIssued;	/* have we issued SLRU wrap-around warning? */
} OldSerXidControlData;

typedef struct OldSerXidControlData *OldSerXidControl;

static OldSerXidControl oldSerXidControl;
346
/*
 * When the oldest committed transaction on the "finished" list is moved to
 * SLRU, its predicate locks will be moved to this "dummy" transaction,
 * collapsing duplicate targets.  When a duplicate is found, the later
 * commitSeqNo is used.
 */
static SERIALIZABLEXACT *OldCommittedSxact;


/* This configuration variable is used to set the predicate lock table size */
int			max_predicate_locks_per_xact;	/* set by guc.c */

/*
 * This provides a list of objects in order to track transactions
 * participating in predicate locking.  Entries in the list are fixed size,
 * and reside in shared memory.  The memory address of an entry must remain
 * fixed during its lifetime.  The list will be protected from concurrent
 * update externally; no provision is made in this code to manage that.  The
 * number of entries in the list, and the size allowed for each entry is
 * fixed upon creation.
 */
static PredXactList PredXact;

/*
 * This provides a pool of RWConflict data elements to use in conflict lists
 * between transactions.
 */
static RWConflictPoolHeader RWConflictPool;

/*
 * The predicate locking hash tables are in shared memory.
 * Each backend keeps pointers to them.
 */
static HTAB *SerializableXidHash;
static HTAB *PredicateLockTargetHash;
static HTAB *PredicateLockHash;
static SHM_QUEUE *FinishedSerializableTransactions;

/*
 * Tag for a dummy entry in PredicateLockTargetHash.  By temporarily removing
 * this entry, you can ensure that there's enough scratch space available for
 * inserting one entry in the hash table.  This is an otherwise-invalid tag.
 */
static const PREDICATELOCKTARGETTAG ScratchTargetTag = {0, 0, 0, 0};
static uint32 ScratchTargetTagHash;
static LWLock *ScratchPartitionLock;

/*
 * The local hash table used to determine when to combine multiple fine-
 * grained locks into a single coarser-grained lock.
 */
static HTAB *LocalPredicateLockHash = NULL;

/*
 * Keep a pointer to the currently-running serializable transaction (if any)
 * for quick reference.  Also, remember if we have written anything that could
 * cause a rw-conflict.
 */
static SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact;
static bool MyXactDidWrite = false;
407
/* local functions -- forward declarations, grouped by purpose */

/* shared-memory list of SERIALIZABLEXACT entries */
static SERIALIZABLEXACT *CreatePredXact(void);
static void ReleasePredXact(SERIALIZABLEXACT *sxact);
static SERIALIZABLEXACT *FirstPredXact(void);
static SERIALIZABLEXACT *NextPredXact(SERIALIZABLEXACT *sxact);

/* rw-conflict pool and list primitives */
static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer);
static void SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
static void SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, SERIALIZABLEXACT *activeXact);
static void ReleaseRWConflict(RWConflict conflict);
static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact);

/* SLRU tracking of old committed serializable xids */
static bool OldSerXidPagePrecedesLogically(int page1, int page2);
static void OldSerXidInit(void);
static void OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo);
static SerCommitSeqNo OldSerXidGetMinConflictCommitSeqNo(TransactionId xid);
static void OldSerXidSetActiveSerXmin(TransactionId xid);

/* predicate lock maintenance, promotion, and conflict checking */
static uint32 predicatelock_hash(const void *key, Size keysize);
static void SummarizeOldestCommittedSxact(void);
static Snapshot GetSafeSnapshot(Snapshot snapshot);
static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot,
									  TransactionId sourcexid);
static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
						  PREDICATELOCKTARGETTAG *parent);
static bool CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag);
static void RemoveScratchTarget(bool lockheld);
static void RestoreScratchTarget(bool lockheld);
static void RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target,
						   uint32 targettaghash);
static void DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag);
static int	PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag);
static bool CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag);
static void DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag);
static void CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
					uint32 targettaghash,
					SERIALIZABLEXACT *sxact);
static void DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash);
static bool TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
								  PREDICATELOCKTARGETTAG newtargettag,
								  bool removeOld);
static void PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag);
static void DropAllPredicateLocksFromTable(Relation relation,
							   bool transfer);
static void SetNewSxactGlobalXmin(void);
static void ClearOldPredicateLocks(void);
static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
						   bool summarize);
static bool XidIsConcurrent(TransactionId xid);
static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag);
static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
									SERIALIZABLEXACT *writer);
464
465 /*------------------------------------------------------------------------*/
466
467 /*
468 * Does this relation participate in predicate locking? Temporary and system
469 * relations are exempt, as are materialized views.
470 */
471 static inline bool
PredicateLockingNeededForRelation(Relation relation)472 PredicateLockingNeededForRelation(Relation relation)
473 {
474 return !(relation->rd_id < FirstBootstrapObjectId ||
475 RelationUsesLocalBuffers(relation) ||
476 relation->rd_rel->relkind == RELKIND_MATVIEW);
477 }
478
479 /*
480 * When a public interface method is called for a read, this is the test to
481 * see if we should do a quick return.
482 *
483 * Note: this function has side-effects! If this transaction has been flagged
484 * as RO-safe since the last call, we release all predicate locks and reset
485 * MySerializableXact. That makes subsequent calls to return quickly.
486 *
487 * This is marked as 'inline' to make to eliminate the function call overhead
488 * in the common case that serialization is not needed.
489 */
490 static inline bool
SerializationNeededForRead(Relation relation,Snapshot snapshot)491 SerializationNeededForRead(Relation relation, Snapshot snapshot)
492 {
493 /* Nothing to do if this is not a serializable transaction */
494 if (MySerializableXact == InvalidSerializableXact)
495 return false;
496
497 /*
498 * Don't acquire locks or conflict when scanning with a special snapshot.
499 * This excludes things like CLUSTER and REINDEX. They use the wholesale
500 * functions TransferPredicateLocksToHeapRelation() and
501 * CheckTableForSerializableConflictIn() to participate in serialization,
502 * but the scans involved don't need serialization.
503 */
504 if (!IsMVCCSnapshot(snapshot))
505 return false;
506
507 /*
508 * Check if we have just become "RO-safe". If we have, immediately release
509 * all locks as they're not needed anymore. This also resets
510 * MySerializableXact, so that subsequent calls to this function can exit
511 * quickly.
512 *
513 * A transaction is flagged as RO_SAFE if all concurrent R/W transactions
514 * commit without having conflicts out to an earlier snapshot, thus
515 * ensuring that no conflicts are possible for this transaction.
516 */
517 if (SxactIsROSafe(MySerializableXact))
518 {
519 ReleasePredicateLocks(false);
520 return false;
521 }
522
523 /* Check if the relation doesn't participate in predicate locking */
524 if (!PredicateLockingNeededForRelation(relation))
525 return false;
526
527 return true; /* no excuse to skip predicate locking */
528 }
529
530 /*
531 * Like SerializationNeededForRead(), but called on writes.
532 * The logic is the same, but there is no snapshot and we can't be RO-safe.
533 */
534 static inline bool
SerializationNeededForWrite(Relation relation)535 SerializationNeededForWrite(Relation relation)
536 {
537 /* Nothing to do if this is not a serializable transaction */
538 if (MySerializableXact == InvalidSerializableXact)
539 return false;
540
541 /* Check if the relation doesn't participate in predicate locking */
542 if (!PredicateLockingNeededForRelation(relation))
543 return false;
544
545 return true; /* no excuse to skip predicate locking */
546 }
547
548
549 /*------------------------------------------------------------------------*/
550
551 /*
552 * These functions are a simple implementation of a list for this specific
553 * type of struct. If there is ever a generalized shared memory list, we
554 * should probably switch to that.
555 */
556 static SERIALIZABLEXACT *
CreatePredXact(void)557 CreatePredXact(void)
558 {
559 PredXactListElement ptle;
560
561 ptle = (PredXactListElement)
562 SHMQueueNext(&PredXact->availableList,
563 &PredXact->availableList,
564 offsetof(PredXactListElementData, link));
565 if (!ptle)
566 return NULL;
567
568 SHMQueueDelete(&ptle->link);
569 SHMQueueInsertBefore(&PredXact->activeList, &ptle->link);
570 return &ptle->sxact;
571 }
572
573 static void
ReleasePredXact(SERIALIZABLEXACT * sxact)574 ReleasePredXact(SERIALIZABLEXACT *sxact)
575 {
576 PredXactListElement ptle;
577
578 Assert(ShmemAddrIsValid(sxact));
579
580 ptle = (PredXactListElement)
581 (((char *) sxact)
582 - offsetof(PredXactListElementData, sxact)
583 + offsetof(PredXactListElementData, link));
584 SHMQueueDelete(&ptle->link);
585 SHMQueueInsertBefore(&PredXact->availableList, &ptle->link);
586 }
587
588 static SERIALIZABLEXACT *
FirstPredXact(void)589 FirstPredXact(void)
590 {
591 PredXactListElement ptle;
592
593 ptle = (PredXactListElement)
594 SHMQueueNext(&PredXact->activeList,
595 &PredXact->activeList,
596 offsetof(PredXactListElementData, link));
597 if (!ptle)
598 return NULL;
599
600 return &ptle->sxact;
601 }
602
603 static SERIALIZABLEXACT *
NextPredXact(SERIALIZABLEXACT * sxact)604 NextPredXact(SERIALIZABLEXACT *sxact)
605 {
606 PredXactListElement ptle;
607
608 Assert(ShmemAddrIsValid(sxact));
609
610 ptle = (PredXactListElement)
611 (((char *) sxact)
612 - offsetof(PredXactListElementData, sxact)
613 + offsetof(PredXactListElementData, link));
614 ptle = (PredXactListElement)
615 SHMQueueNext(&PredXact->activeList,
616 &ptle->link,
617 offsetof(PredXactListElementData, link));
618 if (!ptle)
619 return NULL;
620
621 return &ptle->sxact;
622 }
623
624 /*------------------------------------------------------------------------*/
625
626 /*
627 * These functions manage primitive access to the RWConflict pool and lists.
628 */
629 static bool
RWConflictExists(const SERIALIZABLEXACT * reader,const SERIALIZABLEXACT * writer)630 RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
631 {
632 RWConflict conflict;
633
634 Assert(reader != writer);
635
636 /* Check the ends of the purported conflict first. */
637 if (SxactIsDoomed(reader)
638 || SxactIsDoomed(writer)
639 || SHMQueueEmpty(&reader->outConflicts)
640 || SHMQueueEmpty(&writer->inConflicts))
641 return false;
642
643 /* A conflict is possible; walk the list to find out. */
644 conflict = (RWConflict)
645 SHMQueueNext(&reader->outConflicts,
646 &reader->outConflicts,
647 offsetof(RWConflictData, outLink));
648 while (conflict)
649 {
650 if (conflict->sxactIn == writer)
651 return true;
652 conflict = (RWConflict)
653 SHMQueueNext(&reader->outConflicts,
654 &conflict->outLink,
655 offsetof(RWConflictData, outLink));
656 }
657
658 /* No conflict found. */
659 return false;
660 }
661
662 static void
SetRWConflict(SERIALIZABLEXACT * reader,SERIALIZABLEXACT * writer)663 SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
664 {
665 RWConflict conflict;
666
667 Assert(reader != writer);
668 Assert(!RWConflictExists(reader, writer));
669
670 conflict = (RWConflict)
671 SHMQueueNext(&RWConflictPool->availableList,
672 &RWConflictPool->availableList,
673 offsetof(RWConflictData, outLink));
674 if (!conflict)
675 ereport(ERROR,
676 (errcode(ERRCODE_OUT_OF_MEMORY),
677 errmsg("not enough elements in RWConflictPool to record a read/write conflict"),
678 errhint("You might need to run fewer transactions at a time or increase max_connections.")));
679
680 SHMQueueDelete(&conflict->outLink);
681
682 conflict->sxactOut = reader;
683 conflict->sxactIn = writer;
684 SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
685 SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink);
686 }
687
688 static void
SetPossibleUnsafeConflict(SERIALIZABLEXACT * roXact,SERIALIZABLEXACT * activeXact)689 SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact,
690 SERIALIZABLEXACT *activeXact)
691 {
692 RWConflict conflict;
693
694 Assert(roXact != activeXact);
695 Assert(SxactIsReadOnly(roXact));
696 Assert(!SxactIsReadOnly(activeXact));
697
698 conflict = (RWConflict)
699 SHMQueueNext(&RWConflictPool->availableList,
700 &RWConflictPool->availableList,
701 offsetof(RWConflictData, outLink));
702 if (!conflict)
703 ereport(ERROR,
704 (errcode(ERRCODE_OUT_OF_MEMORY),
705 errmsg("not enough elements in RWConflictPool to record a potential read/write conflict"),
706 errhint("You might need to run fewer transactions at a time or increase max_connections.")));
707
708 SHMQueueDelete(&conflict->outLink);
709
710 conflict->sxactOut = activeXact;
711 conflict->sxactIn = roXact;
712 SHMQueueInsertBefore(&activeXact->possibleUnsafeConflicts,
713 &conflict->outLink);
714 SHMQueueInsertBefore(&roXact->possibleUnsafeConflicts,
715 &conflict->inLink);
716 }
717
718 static void
ReleaseRWConflict(RWConflict conflict)719 ReleaseRWConflict(RWConflict conflict)
720 {
721 SHMQueueDelete(&conflict->inLink);
722 SHMQueueDelete(&conflict->outLink);
723 SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink);
724 }
725
726 static void
FlagSxactUnsafe(SERIALIZABLEXACT * sxact)727 FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
728 {
729 RWConflict conflict,
730 nextConflict;
731
732 Assert(SxactIsReadOnly(sxact));
733 Assert(!SxactIsROSafe(sxact));
734
735 sxact->flags |= SXACT_FLAG_RO_UNSAFE;
736
737 /*
738 * We know this isn't a safe snapshot, so we can stop looking for other
739 * potential conflicts.
740 */
741 conflict = (RWConflict)
742 SHMQueueNext(&sxact->possibleUnsafeConflicts,
743 &sxact->possibleUnsafeConflicts,
744 offsetof(RWConflictData, inLink));
745 while (conflict)
746 {
747 nextConflict = (RWConflict)
748 SHMQueueNext(&sxact->possibleUnsafeConflicts,
749 &conflict->inLink,
750 offsetof(RWConflictData, inLink));
751
752 Assert(!SxactIsReadOnly(conflict->sxactOut));
753 Assert(sxact == conflict->sxactIn);
754
755 ReleaseRWConflict(conflict);
756
757 conflict = nextConflict;
758 }
759 }
760
761 /*------------------------------------------------------------------------*/
762
763 /*
764 * Decide whether an OldSerXid page number is "older" for truncation purposes.
765 * Analogous to CLOGPagePrecedes().
766 */
767 static bool
OldSerXidPagePrecedesLogically(int page1,int page2)768 OldSerXidPagePrecedesLogically(int page1, int page2)
769 {
770 TransactionId xid1;
771 TransactionId xid2;
772
773 xid1 = ((TransactionId) page1) * OLDSERXID_ENTRIESPERPAGE;
774 xid1 += FirstNormalTransactionId + 1;
775 xid2 = ((TransactionId) page2) * OLDSERXID_ENTRIESPERPAGE;
776 xid2 += FirstNormalTransactionId + 1;
777
778 return (TransactionIdPrecedes(xid1, xid2) &&
779 TransactionIdPrecedes(xid1, xid2 + OLDSERXID_ENTRIESPERPAGE - 1));
780 }
781
#ifdef USE_ASSERT_CHECKING
/*
 * Self-tests for OldSerXidPagePrecedesLogically(), exercising the xid
 * wraparound scenarios described below.  Compiled only in assert builds.
 */
static void
OldSerXidPagePrecedesLogicallyUnitTests(void)
{
	int			per_page = OLDSERXID_ENTRIESPERPAGE,
				offset = per_page / 2;
	int			newestPage,
				oldestPage,
				headPage,
				targetPage;
	TransactionId newestXact,
				oldestXact;

	/* GetNewTransactionId() has assigned the last XID it can safely use. */
	newestPage = 2 * SLRU_PAGES_PER_SEGMENT - 1;	/* nothing special */
	newestXact = newestPage * per_page + offset;
	Assert(newestXact / per_page == newestPage);
	oldestXact = newestXact + 1;
	oldestXact -= 1U << 31;		/* back up a full epoch (2^31 xids) */
	oldestPage = oldestXact / per_page;

	/*
	 * In this scenario, the SLRU headPage pertains to the last ~1000 XIDs
	 * assigned.  oldestXact finishes, ~2B XIDs having elapsed since it
	 * started.  Further transactions cause us to summarize oldestXact to
	 * tailPage.  Function must return false so OldSerXidAdd() doesn't zero
	 * tailPage (which may contain entries for other old, recently-finished
	 * XIDs) and half the SLRU.  Reaching this requires burning ~2B XIDs in
	 * single-user mode, a negligible possibility.
	 */
	headPage = newestPage;
	targetPage = oldestPage;
	Assert(!OldSerXidPagePrecedesLogically(headPage, targetPage));

	/*
	 * In this scenario, the SLRU headPage pertains to oldestXact.  We're
	 * summarizing an XID near newestXact.  (Assume few other XIDs used
	 * SERIALIZABLE, hence the minimal headPage advancement.  Assume
	 * oldestXact was long-running and only recently reached the SLRU.)
	 * Function must return true to make OldSerXidAdd() create targetPage.
	 *
	 * Today's implementation mishandles this case, but it doesn't matter
	 * enough to fix.  Verify that the defect affects just one page by
	 * asserting correct treatment of its prior page.  Reaching this case
	 * requires burning ~2B XIDs in single-user mode, a negligible
	 * possibility.  Moreover, if it does happen, the consequence would be
	 * mild, namely a new transaction failing in SimpleLruReadPage().
	 */
	headPage = oldestPage;
	targetPage = newestPage;
	Assert(OldSerXidPagePrecedesLogically(headPage, targetPage - 1));
#if 0
	/* Known-deficient case; see the comment above. */
	Assert(OldSerXidPagePrecedesLogically(headPage, targetPage));
#endif
}
#endif
838
/*
 * Initialize for the tracking of old serializable committed xids.
 *
 * Creates the "pg_serial" SLRU and its control structure in shared memory
 * (in the postmaster), or attaches to the already-existing structures (in
 * EXEC_BACKEND child processes).
 */
static void
OldSerXidInit(void)
{
	bool		found;

	/*
	 * Set up SLRU management of the pg_serial data.
	 */
	OldSerXidSlruCtl->PagePrecedes = OldSerXidPagePrecedesLogically;
	SimpleLruInit(OldSerXidSlruCtl, "oldserxid",
				  NUM_OLDSERXID_BUFFERS, 0, OldSerXidLock, "pg_serial",
				  LWTRANCHE_OLDSERXID_BUFFERS);
	/* Override default assumption that writes should be fsync'd */
	OldSerXidSlruCtl->do_fsync = false;
#ifdef USE_ASSERT_CHECKING
	OldSerXidPagePrecedesLogicallyUnitTests();
#endif
	SlruPagePrecedesUnitTests(OldSerXidSlruCtl, OLDSERXID_ENTRIESPERPAGE);

	/*
	 * Create or attach to the OldSerXidControl structure.
	 */
	oldSerXidControl = (OldSerXidControl)
		ShmemInitStruct("OldSerXidControlData", sizeof(OldSerXidControlData), &found);

	/* The struct should pre-exist exactly when we're a child process. */
	Assert(found == IsUnderPostmaster);
	if (!found)
	{
		/*
		 * Set control information to reflect empty SLRU.
		 */
		oldSerXidControl->headPage = -1;	/* negative means "not in use" */
		oldSerXidControl->headXid = InvalidTransactionId;
		oldSerXidControl->tailXid = InvalidTransactionId;
		oldSerXidControl->warningIssued = false;
	}
}
879
/*
 * Record a committed read write serializable xid and the minimum
 * commitSeqNo of any transactions to which this xid had a rw-conflict out.
 * An invalid seqNo means that there were no conflicts out from xid.
 *
 * Called while summarizing finished transactions into the pg_serial SLRU
 * when shared-memory detail must be released.
 */
static void
OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
{
	TransactionId tailXid;
	int			targetPage;
	int			slotno;
	int			firstZeroPage;
	bool		isNewPage;

	Assert(TransactionIdIsValid(xid));

	targetPage = OldSerXidPage(xid);

	LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);

	/*
	 * If no serializable transactions are active, there shouldn't be anything
	 * to push out to the SLRU.  Hitting this assert would mean there's
	 * something wrong with the earlier cleanup logic.
	 */
	tailXid = oldSerXidControl->tailXid;
	Assert(TransactionIdIsValid(tailXid));

	/*
	 * If the SLRU is currently unused, zero out the whole active region from
	 * tailXid to headXid before taking it into use. Otherwise zero out only
	 * any new pages that enter the tailXid-headXid range as we advance
	 * headXid.
	 */
	if (oldSerXidControl->headPage < 0)
	{
		firstZeroPage = OldSerXidPage(tailXid);
		isNewPage = true;
	}
	else
	{
		firstZeroPage = OldSerXidNextPage(oldSerXidControl->headPage);
		isNewPage = OldSerXidPagePrecedesLogically(oldSerXidControl->headPage,
												   targetPage);
	}

	/* Advance headXid/headPage markers if this xid lies beyond them. */
	if (!TransactionIdIsValid(oldSerXidControl->headXid)
		|| TransactionIdFollows(xid, oldSerXidControl->headXid))
		oldSerXidControl->headXid = xid;
	if (isNewPage)
		oldSerXidControl->headPage = targetPage;

	/*
	 * Give a warning if we're about to run out of SLRU pages.
	 *
	 * slru.c has a maximum of 64k segments, with 32 (SLRU_PAGES_PER_SEGMENT)
	 * pages each. We need to store a 64-bit integer for each Xid, and with
	 * default 8k block size, 65536*32 pages is only enough to cover 2^30
	 * XIDs. If we're about to hit that limit and wrap around, warn the user.
	 *
	 * To avoid spamming the user, we only give one warning when we've used 1
	 * billion XIDs, and stay silent until the situation is fixed and the
	 * number of XIDs used falls below 800 million again.
	 *
	 * XXX: We have no safeguard to actually *prevent* the wrap-around,
	 * though. All you get is a warning.
	 */
	if (oldSerXidControl->warningIssued)
	{
		TransactionId lowWatermark;

		/* Hysteresis: re-arm the warning once usage drops below ~800M xids. */
		lowWatermark = tailXid + 800000000;
		if (lowWatermark < FirstNormalTransactionId)
			lowWatermark = FirstNormalTransactionId;
		if (TransactionIdPrecedes(xid, lowWatermark))
			oldSerXidControl->warningIssued = false;
	}
	else
	{
		TransactionId highWatermark;

		highWatermark = tailXid + 1000000000;
		if (highWatermark < FirstNormalTransactionId)
			highWatermark = FirstNormalTransactionId;
		if (TransactionIdFollows(xid, highWatermark))
		{
			oldSerXidControl->warningIssued = true;
			ereport(WARNING,
					(errmsg("memory for serializable conflict tracking is nearly exhausted"),
					 errhint("There might be an idle transaction or a forgotten prepared transaction causing this.")));
		}
	}

	if (isNewPage)
	{
		/* Initialize intervening pages. */
		while (firstZeroPage != targetPage)
		{
			(void) SimpleLruZeroPage(OldSerXidSlruCtl, firstZeroPage);
			firstZeroPage = OldSerXidNextPage(firstZeroPage);
		}
		slotno = SimpleLruZeroPage(OldSerXidSlruCtl, targetPage);
	}
	else
		slotno = SimpleLruReadPage(OldSerXidSlruCtl, targetPage, true, xid);

	/* Record the conflict seqno in this xid's entry and mark page dirty. */
	OldSerXidValue(slotno, xid) = minConflictCommitSeqNo;
	OldSerXidSlruCtl->shared->page_dirty[slotno] = true;

	LWLockRelease(OldSerXidLock);
}
991
992 /*
993 * Get the minimum commitSeqNo for any conflict out for the given xid. For
994 * a transaction which exists but has no conflict out, InvalidSerCommitSeqNo
995 * will be returned.
996 */
997 static SerCommitSeqNo
OldSerXidGetMinConflictCommitSeqNo(TransactionId xid)998 OldSerXidGetMinConflictCommitSeqNo(TransactionId xid)
999 {
1000 TransactionId headXid;
1001 TransactionId tailXid;
1002 SerCommitSeqNo val;
1003 int slotno;
1004
1005 Assert(TransactionIdIsValid(xid));
1006
1007 LWLockAcquire(OldSerXidLock, LW_SHARED);
1008 headXid = oldSerXidControl->headXid;
1009 tailXid = oldSerXidControl->tailXid;
1010 LWLockRelease(OldSerXidLock);
1011
1012 if (!TransactionIdIsValid(headXid))
1013 return 0;
1014
1015 Assert(TransactionIdIsValid(tailXid));
1016
1017 if (TransactionIdPrecedes(xid, tailXid)
1018 || TransactionIdFollows(xid, headXid))
1019 return 0;
1020
1021 /*
1022 * The following function must be called without holding OldSerXidLock,
1023 * but will return with that lock held, which must then be released.
1024 */
1025 slotno = SimpleLruReadPage_ReadOnly(OldSerXidSlruCtl,
1026 OldSerXidPage(xid), xid);
1027 val = OldSerXidValue(slotno, xid);
1028 LWLockRelease(OldSerXidLock);
1029 return val;
1030 }
1031
/*
 * Call this whenever there is a new xmin for active serializable
 * transactions.  We don't need to keep information on transactions which
 * precede that.  InvalidTransactionId means none active, so everything in
 * the SLRU can be discarded.
 */
static void
OldSerXidSetActiveSerXmin(TransactionId xid)
{
	LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);

	/*
	 * When no sxacts are active, nothing overlaps, set the xid values to
	 * invalid to show that there are no valid entries.  Don't clear headPage,
	 * though.  A new xmin might still land on that page, and we don't want to
	 * repeatedly zero out the same page.
	 */
	if (!TransactionIdIsValid(xid))
	{
		oldSerXidControl->tailXid = InvalidTransactionId;
		oldSerXidControl->headXid = InvalidTransactionId;
		LWLockRelease(OldSerXidLock);
		return;
	}

	/*
	 * When we're recovering prepared transactions, the global xmin might move
	 * backwards depending on the order they're recovered. Normally that's not
	 * OK, but during recovery no serializable transactions will commit, so
	 * the SLRU is empty and we can get away with it.
	 */
	if (RecoveryInProgress())
	{
		/* SLRU must be empty during recovery for the above to hold. */
		Assert(oldSerXidControl->headPage < 0);
		if (!TransactionIdIsValid(oldSerXidControl->tailXid)
			|| TransactionIdPrecedes(xid, oldSerXidControl->tailXid))
		{
			oldSerXidControl->tailXid = xid;
		}
		LWLockRelease(OldSerXidLock);
		return;
	}

	/* Outside recovery, the tail may only advance, never move backwards. */
	Assert(!TransactionIdIsValid(oldSerXidControl->tailXid)
		   || TransactionIdFollows(xid, oldSerXidControl->tailXid));

	oldSerXidControl->tailXid = xid;

	LWLockRelease(OldSerXidLock);
}
1082
/*
 * Perform a checkpoint --- either during shutdown, or on-the-fly
 *
 * We don't have any data that needs to survive a restart, but this is a
 * convenient place to truncate the SLRU.
 */
void
CheckPointPredicate(void)
{
	int			tailPage;

	LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);

	/* Exit quickly if the SLRU is currently not in use. */
	if (oldSerXidControl->headPage < 0)
	{
		LWLockRelease(OldSerXidLock);
		return;
	}

	if (TransactionIdIsValid(oldSerXidControl->tailXid))
	{
		/* We can truncate the SLRU up to the page containing tailXid */
		tailPage = OldSerXidPage(oldSerXidControl->tailXid);
	}
	else
	{
		/*----------
		 * The SLRU is no longer needed. Truncate to head before we set head
		 * invalid.
		 *
		 * XXX: It's possible that the SLRU is not needed again until XID
		 * wrap-around has happened, so that the segment containing headPage
		 * that we leave behind will appear to be new again. In that case it
		 * won't be removed until XID horizon advances enough to make it
		 * current again.
		 *
		 * XXX: This should happen in vac_truncate_clog(), not in checkpoints.
		 * Consider this scenario, starting from a system with no in-progress
		 * transactions and VACUUM FREEZE having maximized oldestXact:
		 * - Start a SERIALIZABLE transaction.
		 * - Start, finish, and summarize a SERIALIZABLE transaction, creating
		 *   one SLRU page.
		 * - Consume XIDs to reach xidStopLimit.
		 * - Finish all transactions.  Due to the long-running SERIALIZABLE
		 *   transaction, earlier checkpoints did not touch headPage.  The
		 *   next checkpoint will change it, but that checkpoint happens after
		 *   the end of the scenario.
		 * - VACUUM to advance XID limits.
		 * - Consume ~2M XIDs, crossing the former xidWrapLimit.
		 * - Start, finish, and summarize a SERIALIZABLE transaction.
		 *   OldSerXidAdd() declines to create the targetPage, because
		 *   headPage is not regarded as in the past relative to that
		 *   targetPage.  The transaction instigating the summarize fails in
		 *   SimpleLruReadPage().
		 */
		tailPage = oldSerXidControl->headPage;
		oldSerXidControl->headPage = -1;	/* mark SLRU as unused */
	}

	/* Drop the lock before the (potentially slow) file operations below. */
	LWLockRelease(OldSerXidLock);

	/* Truncate away pages that are no longer required */
	SimpleLruTruncate(OldSerXidSlruCtl, tailPage);

	/*
	 * Flush dirty SLRU pages to disk
	 *
	 * This is not actually necessary from a correctness point of view. We do
	 * it merely as a debugging aid.
	 *
	 * We're doing this after the truncation to avoid writing pages right
	 * before deleting the file in which they sit, which would be completely
	 * pointless.
	 */
	SimpleLruFlush(OldSerXidSlruCtl, true);
}
1160
1161 /*------------------------------------------------------------------------*/
1162
/*
 * InitPredicateLocks -- Initialize the predicate locking data structures.
 *
 * This is called from CreateSharedMemoryAndSemaphores(), which see for
 * more comments.  In the normal postmaster case, the shared hash tables
 * are created here.  Backends inherit the pointers
 * to the shared tables via fork().  In the EXEC_BACKEND case, each
 * backend re-executes this code to obtain pointers to the already existing
 * shared hash tables.
 *
 * Sizing here must stay in sync with PredicateLockShmemSize().
 */
void
InitPredicateLocks(void)
{
	HASHCTL		info;
	long		max_table_size;
	Size		requestSize;
	bool		found;

#ifndef EXEC_BACKEND
	/* Without EXEC_BACKEND, only the postmaster runs this. */
	Assert(!IsUnderPostmaster);
#endif

	/*
	 * Compute size of predicate lock target hashtable.  Note these
	 * calculations must agree with PredicateLockShmemSize!
	 */
	max_table_size = NPREDICATELOCKTARGETENTS();

	/*
	 * Allocate hash table for PREDICATELOCKTARGET structs.  This stores
	 * per-predicate-lock-target information.
	 */
	MemSet(&info, 0, sizeof(info));
	info.keysize = sizeof(PREDICATELOCKTARGETTAG);
	info.entrysize = sizeof(PREDICATELOCKTARGET);
	info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;

	PredicateLockTargetHash = ShmemInitHash("PREDICATELOCKTARGET hash",
											max_table_size,
											max_table_size,
											&info,
											HASH_ELEM | HASH_BLOBS |
											HASH_PARTITION | HASH_FIXED_SIZE);

	/*
	 * Reserve a dummy entry in the hash table; we use it to make sure there's
	 * always one entry available when we need to split or combine a page,
	 * because running out of space there could mean aborting a
	 * non-serializable transaction.
	 */
	if (!IsUnderPostmaster)
	{
		(void) hash_search(PredicateLockTargetHash, &ScratchTargetTag,
						   HASH_ENTER, &found);
		Assert(!found);
	}

	/* Pre-calculate the hash and partition lock of the scratch entry */
	ScratchTargetTagHash = PredicateLockTargetTagHashCode(&ScratchTargetTag);
	ScratchPartitionLock = PredicateLockHashPartitionLock(ScratchTargetTagHash);

	/*
	 * Allocate hash table for PREDICATELOCK structs.  This stores per
	 * xact-lock-of-a-target information.
	 */
	MemSet(&info, 0, sizeof(info));
	info.keysize = sizeof(PREDICATELOCKTAG);
	info.entrysize = sizeof(PREDICATELOCK);
	/* custom hash keeps locks in the same partition as their targets */
	info.hash = predicatelock_hash;
	info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;

	/* Assume an average of 2 xacts per target */
	max_table_size *= 2;

	PredicateLockHash = ShmemInitHash("PREDICATELOCK hash",
									  max_table_size,
									  max_table_size,
									  &info,
									  HASH_ELEM | HASH_FUNCTION |
									  HASH_PARTITION | HASH_FIXED_SIZE);

	/*
	 * Compute size for serializable transaction hashtable.  Note these
	 * calculations must agree with PredicateLockShmemSize!
	 */
	max_table_size = (MaxBackends + max_prepared_xacts);

	/*
	 * Allocate a list to hold information on transactions participating in
	 * predicate locking.
	 *
	 * Assume an average of 10 predicate locking transactions per backend.
	 * This allows aggressive cleanup while detail is present before data must
	 * be summarized for storage in SLRU and the "dummy" transaction.
	 */
	max_table_size *= 10;

	PredXact = ShmemInitStruct("PredXactList",
							   PredXactListDataSize,
							   &found);
	Assert(found == IsUnderPostmaster);
	if (!found)
	{
		int			i;

		/* First creation: initialize the whole list structure. */
		SHMQueueInit(&PredXact->availableList);
		SHMQueueInit(&PredXact->activeList);
		PredXact->SxactGlobalXmin = InvalidTransactionId;
		PredXact->SxactGlobalXminCount = 0;
		PredXact->WritableSxactCount = 0;
		PredXact->LastSxactCommitSeqNo = FirstNormalSerCommitSeqNo - 1;
		PredXact->CanPartialClearThrough = 0;
		PredXact->HavePartialClearedThrough = 0;
		requestSize = mul_size((Size) max_table_size,
							   PredXactListElementDataSize);
		PredXact->element = ShmemAlloc(requestSize);
		if (PredXact->element == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("not enough shared memory for elements of data structure"
							" \"%s\" (%zu bytes requested)",
							"PredXactList", requestSize)));
		/* Add all elements to available list, clean. */
		memset(PredXact->element, 0, requestSize);
		for (i = 0; i < max_table_size; i++)
		{
			SHMQueueInsertBefore(&(PredXact->availableList),
								 &(PredXact->element[i].link));
		}
		/*
		 * Set up the single "old committed" sxact, which stands in for all
		 * committed transactions whose detail has been summarized away.
		 */
		PredXact->OldCommittedSxact = CreatePredXact();
		SetInvalidVirtualTransactionId(PredXact->OldCommittedSxact->vxid);
		PredXact->OldCommittedSxact->prepareSeqNo = 0;
		PredXact->OldCommittedSxact->commitSeqNo = 0;
		PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0;
		SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
		SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
		SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks);
		SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink);
		SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts);
		PredXact->OldCommittedSxact->topXid = InvalidTransactionId;
		PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId;
		PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
		PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED;
		PredXact->OldCommittedSxact->pid = 0;
	}
	/* This never changes, so let's keep a local copy. */
	OldCommittedSxact = PredXact->OldCommittedSxact;

	/*
	 * Allocate hash table for SERIALIZABLEXID structs.  This stores per-xid
	 * information for serializable transactions which have accessed data.
	 */
	MemSet(&info, 0, sizeof(info));
	info.keysize = sizeof(SERIALIZABLEXIDTAG);
	info.entrysize = sizeof(SERIALIZABLEXID);

	SerializableXidHash = ShmemInitHash("SERIALIZABLEXID hash",
										max_table_size,
										max_table_size,
										&info,
										HASH_ELEM | HASH_BLOBS |
										HASH_FIXED_SIZE);

	/*
	 * Allocate space for tracking rw-conflicts in lists attached to the
	 * transactions.
	 *
	 * Assume an average of 5 conflicts per transaction.  Calculations suggest
	 * that this will prevent resource exhaustion in even the most pessimal
	 * loads up to max_connections = 200 with all 200 connections pounding the
	 * database with serializable transactions.  Beyond that, there may be
	 * occasional transactions canceled when trying to flag conflicts. That's
	 * probably OK.
	 */
	max_table_size *= 5;

	RWConflictPool = ShmemInitStruct("RWConflictPool",
									 RWConflictPoolHeaderDataSize,
									 &found);
	Assert(found == IsUnderPostmaster);
	if (!found)
	{
		int			i;

		SHMQueueInit(&RWConflictPool->availableList);
		requestSize = mul_size((Size) max_table_size,
							   RWConflictDataSize);
		RWConflictPool->element = ShmemAlloc(requestSize);
		if (RWConflictPool->element == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("not enough shared memory for elements of data structure"
							" \"%s\" (%zu bytes requested)",
							"RWConflictPool", requestSize)));
		/* Add all elements to available list, clean. */
		memset(RWConflictPool->element, 0, requestSize);
		for (i = 0; i < max_table_size; i++)
		{
			SHMQueueInsertBefore(&(RWConflictPool->availableList),
								 &(RWConflictPool->element[i].outLink));
		}
	}

	/*
	 * Create or attach to the header for the list of finished serializable
	 * transactions.
	 */
	FinishedSerializableTransactions = (SHM_QUEUE *)
		ShmemInitStruct("FinishedSerializableTransactions",
						sizeof(SHM_QUEUE),
						&found);
	Assert(found == IsUnderPostmaster);
	if (!found)
		SHMQueueInit(FinishedSerializableTransactions);

	/*
	 * Initialize the SLRU storage for old committed serializable
	 * transactions.
	 */
	OldSerXidInit();
}
1384
1385 /*
1386 * Estimate shared-memory space used for predicate lock table
1387 */
1388 Size
PredicateLockShmemSize(void)1389 PredicateLockShmemSize(void)
1390 {
1391 Size size = 0;
1392 long max_table_size;
1393
1394 /* predicate lock target hash table */
1395 max_table_size = NPREDICATELOCKTARGETENTS();
1396 size = add_size(size, hash_estimate_size(max_table_size,
1397 sizeof(PREDICATELOCKTARGET)));
1398
1399 /* predicate lock hash table */
1400 max_table_size *= 2;
1401 size = add_size(size, hash_estimate_size(max_table_size,
1402 sizeof(PREDICATELOCK)));
1403
1404 /*
1405 * Since NPREDICATELOCKTARGETENTS is only an estimate, add 10% safety
1406 * margin.
1407 */
1408 size = add_size(size, size / 10);
1409
1410 /* transaction list */
1411 max_table_size = MaxBackends + max_prepared_xacts;
1412 max_table_size *= 10;
1413 size = add_size(size, PredXactListDataSize);
1414 size = add_size(size, mul_size((Size) max_table_size,
1415 PredXactListElementDataSize));
1416
1417 /* transaction xid table */
1418 size = add_size(size, hash_estimate_size(max_table_size,
1419 sizeof(SERIALIZABLEXID)));
1420
1421 /* rw-conflict pool */
1422 max_table_size *= 5;
1423 size = add_size(size, RWConflictPoolHeaderDataSize);
1424 size = add_size(size, mul_size((Size) max_table_size,
1425 RWConflictDataSize));
1426
1427 /* Head for list of finished serializable transactions. */
1428 size = add_size(size, sizeof(SHM_QUEUE));
1429
1430 /* Shared memory structures for SLRU tracking of old committed xids. */
1431 size = add_size(size, sizeof(OldSerXidControlData));
1432 size = add_size(size, SimpleLruShmemSize(NUM_OLDSERXID_BUFFERS, 0));
1433
1434 return size;
1435 }
1436
1437
1438 /*
1439 * Compute the hash code associated with a PREDICATELOCKTAG.
1440 *
1441 * Because we want to use just one set of partition locks for both the
1442 * PREDICATELOCKTARGET and PREDICATELOCK hash tables, we have to make sure
1443 * that PREDICATELOCKs fall into the same partition number as their
1444 * associated PREDICATELOCKTARGETs. dynahash.c expects the partition number
1445 * to be the low-order bits of the hash code, and therefore a
1446 * PREDICATELOCKTAG's hash code must have the same low-order bits as the
1447 * associated PREDICATELOCKTARGETTAG's hash code. We achieve this with this
1448 * specialized hash function.
1449 */
1450 static uint32
predicatelock_hash(const void * key,Size keysize)1451 predicatelock_hash(const void *key, Size keysize)
1452 {
1453 const PREDICATELOCKTAG *predicatelocktag = (const PREDICATELOCKTAG *) key;
1454 uint32 targethash;
1455
1456 Assert(keysize == sizeof(PREDICATELOCKTAG));
1457
1458 /* Look into the associated target object, and compute its hash code */
1459 targethash = PredicateLockTargetTagHashCode(&predicatelocktag->myTarget->tag);
1460
1461 return PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash);
1462 }
1463
1464
1465 /*
1466 * GetPredicateLockStatusData
1467 * Return a table containing the internal state of the predicate
1468 * lock manager for use in pg_lock_status.
1469 *
1470 * Like GetLockStatusData, this function tries to hold the partition LWLocks
1471 * for as short a time as possible by returning two arrays that simply
1472 * contain the PREDICATELOCKTARGETTAG and SERIALIZABLEXACT for each lock
1473 * table entry. Multiple copies of the same PREDICATELOCKTARGETTAG and
1474 * SERIALIZABLEXACT will likely appear.
1475 */
1476 PredicateLockData *
GetPredicateLockStatusData(void)1477 GetPredicateLockStatusData(void)
1478 {
1479 PredicateLockData *data;
1480 int i;
1481 int els,
1482 el;
1483 HASH_SEQ_STATUS seqstat;
1484 PREDICATELOCK *predlock;
1485
1486 data = (PredicateLockData *) palloc(sizeof(PredicateLockData));
1487
1488 /*
1489 * To ensure consistency, take simultaneous locks on all partition locks
1490 * in ascending order, then SerializableXactHashLock.
1491 */
1492 for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
1493 LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_SHARED);
1494 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
1495
1496 /* Get number of locks and allocate appropriately-sized arrays. */
1497 els = hash_get_num_entries(PredicateLockHash);
1498 data->nelements = els;
1499 data->locktags = (PREDICATELOCKTARGETTAG *)
1500 palloc(sizeof(PREDICATELOCKTARGETTAG) * els);
1501 data->xacts = (SERIALIZABLEXACT *)
1502 palloc(sizeof(SERIALIZABLEXACT) * els);
1503
1504
1505 /* Scan through PredicateLockHash and copy contents */
1506 hash_seq_init(&seqstat, PredicateLockHash);
1507
1508 el = 0;
1509
1510 while ((predlock = (PREDICATELOCK *) hash_seq_search(&seqstat)))
1511 {
1512 data->locktags[el] = predlock->tag.myTarget->tag;
1513 data->xacts[el] = *predlock->tag.myXact;
1514 el++;
1515 }
1516
1517 Assert(el == els);
1518
1519 /* Release locks in reverse order */
1520 LWLockRelease(SerializableXactHashLock);
1521 for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
1522 LWLockRelease(PredicateLockHashPartitionLockByIndex(i));
1523
1524 return data;
1525 }
1526
/*
 * Free up shared memory structures by pushing the oldest sxact (the one at
 * the front of the SummarizeOldestCommittedSxact queue) into summary form.
 * Each call will free exactly one SERIALIZABLEXACT structure and may also
 * free one or more of these structures: SERIALIZABLEXID, PREDICATELOCK,
 * PREDICATELOCKTARGET, RWConflictData.
 */
static void
SummarizeOldestCommittedSxact(void)
{
	SERIALIZABLEXACT *sxact;

	LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);

	/*
	 * This function is only called if there are no sxact slots available.
	 * Some of them must belong to old, already-finished transactions, so
	 * there should be something in FinishedSerializableTransactions list that
	 * we can summarize. However, there's a race condition: while we were not
	 * holding any locks, a transaction might have ended and cleaned up all
	 * the finished sxact entries already, freeing up their sxact slots. In
	 * that case, we have nothing to do here. The caller will find one of the
	 * slots released by the other backend when it retries.
	 */
	if (SHMQueueEmpty(FinishedSerializableTransactions))
	{
		LWLockRelease(SerializableFinishedListLock);
		return;
	}

	/*
	 * Grab the first sxact off the finished list -- this will be the earliest
	 * commit.  Remove it from the list.
	 */
	sxact = (SERIALIZABLEXACT *)
		SHMQueueNext(FinishedSerializableTransactions,
					 FinishedSerializableTransactions,
					 offsetof(SERIALIZABLEXACT, finishedLink));
	SHMQueueDelete(&(sxact->finishedLink));

	/* Add to SLRU summary information. */
	if (TransactionIdIsValid(sxact->topXid) && !SxactIsReadOnly(sxact))
		OldSerXidAdd(sxact->topXid, SxactHasConflictOut(sxact)
					 ? sxact->SeqNo.earliestOutConflictCommit : InvalidSerCommitSeqNo);

	/* Summarize and release the detail. */
	ReleaseOneSerializableXact(sxact, false, true);

	LWLockRelease(SerializableFinishedListLock);
}
1577
/*
 * GetSafeSnapshot
 *		Obtain and register a snapshot for a READ ONLY DEFERRABLE
 *		transaction.  Ensures that the snapshot is "safe", i.e.  a
 *		read-only transaction running on it can execute serializably
 *		without further checks.  This requires waiting for concurrent
 *		transactions to complete, and retrying with a new snapshot if
 *		one of them could possibly create a conflict.
 *
 * As with GetSerializableTransactionSnapshot (which this is a subroutine
 * for), the passed-in Snapshot pointer should reference a static data
 * area that can safely be passed to GetSnapshotData.
 */
static Snapshot
GetSafeSnapshot(Snapshot origSnapshot)
{
	Snapshot	snapshot;

	/* Only valid for READ ONLY DEFERRABLE transactions. */
	Assert(XactReadOnly && XactDeferrable);

	while (true)
	{
		/*
		 * GetSerializableTransactionSnapshotInt is going to call
		 * GetSnapshotData, so we need to provide it the static snapshot area
		 * our caller passed to us.  The pointer returned is actually the same
		 * one passed to it, but we avoid assuming that here.
		 */
		snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
														 InvalidTransactionId);

		if (MySerializableXact == InvalidSerializableXact)
			return snapshot;	/* no concurrent r/w xacts; it's safe */

		LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);

		/*
		 * Wait for concurrent transactions to finish.  Stop early if one of
		 * them marked us as conflicted.  The lock is dropped while sleeping
		 * and re-taken before each recheck of the exit conditions.
		 */
		MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
		while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
				 SxactIsROUnsafe(MySerializableXact)))
		{
			LWLockRelease(SerializableXactHashLock);
			ProcWaitForSignal();
			LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
		}
		MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;

		if (!SxactIsROUnsafe(MySerializableXact))
		{
			LWLockRelease(SerializableXactHashLock);
			break;				/* success */
		}

		LWLockRelease(SerializableXactHashLock);

		/* else, need to retry... */
		ereport(DEBUG2,
				(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
				 errmsg("deferrable snapshot was unsafe; trying a new one")));
		ReleasePredicateLocks(false);
	}

	/*
	 * Now we have a safe snapshot, so we don't need to do any further checks.
	 */
	Assert(SxactIsROSafe(MySerializableXact));
	ReleasePredicateLocks(false);

	return snapshot;
}
1651
/*
 * Acquire a snapshot that can be used for the current transaction.
 *
 * Make sure we have a SERIALIZABLEXACT reference in MySerializableXact.
 * It should be current for this process and be contained in PredXact.
 *
 * The passed-in Snapshot pointer should reference a static data area that
 * can safely be passed to GetSnapshotData.  The return value is actually
 * always this same pointer; no new snapshot data structure is allocated
 * within this function.
 *
 * Raises an ERROR if called during recovery (hot standby), where SSI
 * cannot operate.
 */
Snapshot
GetSerializableTransactionSnapshot(Snapshot snapshot)
{
	Assert(IsolationIsSerializable());

	/*
	 * Can't use serializable mode while recovery is still active, as it is,
	 * for example, on a hot standby.  We could get here despite the check in
	 * check_XactIsoLevel() if default_transaction_isolation is set to
	 * serializable, so phrase the hint accordingly.
	 */
	if (RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("cannot use serializable mode in a hot standby"),
				 errdetail("\"default_transaction_isolation\" is set to \"serializable\"."),
				 errhint("You can use \"SET default_transaction_isolation = 'repeatable read'\" to change the default.")));

	/*
	 * A special optimization is available for SERIALIZABLE READ ONLY
	 * DEFERRABLE transactions -- we can wait for a suitable snapshot and
	 * thereby avoid all SSI overhead once it's running.
	 */
	if (XactReadOnly && XactDeferrable)
		return GetSafeSnapshot(snapshot);

	return GetSerializableTransactionSnapshotInt(snapshot,
												 InvalidTransactionId);
}
1692
1693 /*
1694 * Import a snapshot to be used for the current transaction.
1695 *
1696 * This is nearly the same as GetSerializableTransactionSnapshot, except that
1697 * we don't take a new snapshot, but rather use the data we're handed.
1698 *
1699 * The caller must have verified that the snapshot came from a serializable
1700 * transaction; and if we're read-write, the source transaction must not be
1701 * read-only.
1702 */
1703 void
SetSerializableTransactionSnapshot(Snapshot snapshot,TransactionId sourcexid)1704 SetSerializableTransactionSnapshot(Snapshot snapshot,
1705 TransactionId sourcexid)
1706 {
1707 Assert(IsolationIsSerializable());
1708
1709 /*
1710 * We do not allow SERIALIZABLE READ ONLY DEFERRABLE transactions to
1711 * import snapshots, since there's no way to wait for a safe snapshot when
1712 * we're using the snap we're told to. (XXX instead of throwing an error,
1713 * we could just ignore the XactDeferrable flag?)
1714 */
1715 if (XactReadOnly && XactDeferrable)
1716 ereport(ERROR,
1717 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1718 errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
1719
1720 (void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid);
1721 }
1722
/*
 * Guts of GetSerializableTransactionSnapshot
 *
 * If sourcexid is valid, this is actually an import operation and we should
 * skip calling GetSnapshotData, because the snapshot contents are already
 * loaded up.  HOWEVER: to avoid race conditions, we must check that the
 * source xact is still running after we acquire SerializableXactHashLock.
 * We do that by calling ProcArrayInstallImportedXmin.
 *
 * On success, MySerializableXact points at a fully-initialized sxact (or
 * is left unset when the read-only opt-out applies) and the backend-local
 * parent-lock hash table has been created.
 */
static Snapshot
GetSerializableTransactionSnapshotInt(Snapshot snapshot,
									  TransactionId sourcexid)
{
	PGPROC	   *proc;
	VirtualTransactionId vxid;
	SERIALIZABLEXACT *sxact,
			   *othersxact;
	HASHCTL		hash_ctl;

	/* We only do this for serializable transactions.  Once. */
	Assert(MySerializableXact == InvalidSerializableXact);

	Assert(!RecoveryInProgress());

	/*
	 * Since all parts of a serializable transaction must use the same
	 * snapshot, it is too late to establish one after a parallel operation
	 * has begun.
	 */
	if (IsInParallelMode())
		elog(ERROR, "cannot establish serializable snapshot during a parallel operation");

	proc = MyProc;
	Assert(proc != NULL);
	GET_VXID_FROM_PGPROC(vxid, *proc);

	/*
	 * First we get the sxact structure, which may involve looping and access
	 * to the "finished" list to free a structure for use.
	 *
	 * We must hold SerializableXactHashLock when taking/checking the snapshot
	 * to avoid race conditions, for much the same reasons that
	 * GetSnapshotData takes the ProcArrayLock.  Since we might have to
	 * release SerializableXactHashLock to call SummarizeOldestCommittedSxact,
	 * this means we have to create the sxact first, which is a bit annoying
	 * (in particular, an elog(ERROR) in procarray.c would cause us to leak
	 * the sxact).  Consider refactoring to avoid this.
	 */
#ifdef TEST_OLDSERXID
	SummarizeOldestCommittedSxact();
#endif
	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
	do
	{
		sxact = CreatePredXact();
		/* If null, push out committed sxact to SLRU summary & retry. */
		if (!sxact)
		{
			LWLockRelease(SerializableXactHashLock);
			SummarizeOldestCommittedSxact();
			LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
		}
	} while (!sxact);

	/* Get the snapshot, or check that it's safe to use */
	if (!TransactionIdIsValid(sourcexid))
		snapshot = GetSnapshotData(snapshot);
	else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcexid))
	{
		/* The source xact exited; give back the sxact and fail the import. */
		ReleasePredXact(sxact);
		LWLockRelease(SerializableXactHashLock);
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("could not import the requested snapshot"),
				 errdetail("The source transaction %u is not running anymore.",
						   sourcexid)));
	}

	/*
	 * If there are no serializable transactions which are not read-only, we
	 * can "opt out" of predicate locking and conflict checking for a
	 * read-only transaction.
	 *
	 * The reason this is safe is that a read-only transaction can only become
	 * part of a dangerous structure if it overlaps a writable transaction
	 * which in turn overlaps a writable transaction which committed before
	 * the read-only transaction started.  A new writable transaction can
	 * overlap this one, but it can't meet the other condition of overlapping
	 * a transaction which committed before this one started.
	 */
	if (XactReadOnly && PredXact->WritableSxactCount == 0)
	{
		ReleasePredXact(sxact);
		LWLockRelease(SerializableXactHashLock);
		return snapshot;
	}

	/* Maintain serializable global xmin info. */
	if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
	{
		/* We're the first active serializable xact: establish the minimum. */
		Assert(PredXact->SxactGlobalXminCount == 0);
		PredXact->SxactGlobalXmin = snapshot->xmin;
		PredXact->SxactGlobalXminCount = 1;
		OldSerXidSetActiveSerXmin(snapshot->xmin);
	}
	else if (TransactionIdEquals(snapshot->xmin, PredXact->SxactGlobalXmin))
	{
		/* We share the current oldest xmin; bump its reference count. */
		Assert(PredXact->SxactGlobalXminCount > 0);
		PredXact->SxactGlobalXminCount++;
	}
	else
	{
		/* Our xmin must be strictly newer than the tracked global minimum. */
		Assert(TransactionIdFollows(snapshot->xmin, PredXact->SxactGlobalXmin));
	}

	/* Initialize the structure. */
	sxact->vxid = vxid;
	sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo;
	sxact->prepareSeqNo = InvalidSerCommitSeqNo;
	sxact->commitSeqNo = InvalidSerCommitSeqNo;
	SHMQueueInit(&(sxact->outConflicts));
	SHMQueueInit(&(sxact->inConflicts));
	SHMQueueInit(&(sxact->possibleUnsafeConflicts));
	sxact->topXid = GetTopTransactionIdIfAny();
	sxact->finishedBefore = InvalidTransactionId;
	sxact->xmin = snapshot->xmin;
	sxact->pid = MyProcPid;
	SHMQueueInit(&(sxact->predicateLocks));
	SHMQueueElemInit(&(sxact->finishedLink));
	sxact->flags = 0;
	if (XactReadOnly)
	{
		sxact->flags |= SXACT_FLAG_READ_ONLY;

		/*
		 * Register all concurrent r/w transactions as possible conflicts; if
		 * all of them commit without any outgoing conflicts to earlier
		 * transactions then this snapshot can be deemed safe (and we can run
		 * without tracking predicate locks).
		 */
		for (othersxact = FirstPredXact();
			 othersxact != NULL;
			 othersxact = NextPredXact(othersxact))
		{
			if (!SxactIsCommitted(othersxact)
				&& !SxactIsDoomed(othersxact)
				&& !SxactIsReadOnly(othersxact))
			{
				SetPossibleUnsafeConflict(sxact, othersxact);
			}
		}
	}
	else
	{
		++(PredXact->WritableSxactCount);
		Assert(PredXact->WritableSxactCount <=
			   (MaxBackends + max_prepared_xacts));
	}

	MySerializableXact = sxact;
	MyXactDidWrite = false;		/* haven't written anything yet */

	LWLockRelease(SerializableXactHashLock);

	/* Initialize the backend-local hash table of parent locks */
	Assert(LocalPredicateLockHash == NULL);
	MemSet(&hash_ctl, 0, sizeof(hash_ctl));
	hash_ctl.keysize = sizeof(PREDICATELOCKTARGETTAG);
	hash_ctl.entrysize = sizeof(LOCALPREDICATELOCK);
	LocalPredicateLockHash = hash_create("Local predicate lock",
										 max_predicate_locks_per_xact,
										 &hash_ctl,
										 HASH_ELEM | HASH_BLOBS);

	return snapshot;
}
1899
1900 /*
1901 * Register the top level XID in SerializableXidHash.
1902 * Also store it for easy reference in MySerializableXact.
1903 */
1904 void
RegisterPredicateLockingXid(TransactionId xid)1905 RegisterPredicateLockingXid(TransactionId xid)
1906 {
1907 SERIALIZABLEXIDTAG sxidtag;
1908 SERIALIZABLEXID *sxid;
1909 bool found;
1910
1911 /*
1912 * If we're not tracking predicate lock data for this transaction, we
1913 * should ignore the request and return quickly.
1914 */
1915 if (MySerializableXact == InvalidSerializableXact)
1916 return;
1917
1918 /* We should have a valid XID and be at the top level. */
1919 Assert(TransactionIdIsValid(xid));
1920
1921 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
1922
1923 /* This should only be done once per transaction. */
1924 Assert(MySerializableXact->topXid == InvalidTransactionId);
1925
1926 MySerializableXact->topXid = xid;
1927
1928 sxidtag.xid = xid;
1929 sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
1930 &sxidtag,
1931 HASH_ENTER, &found);
1932 Assert(!found);
1933
1934 /* Initialize the structure. */
1935 sxid->myXact = MySerializableXact;
1936 LWLockRelease(SerializableXactHashLock);
1937 }
1938
1939
1940 /*
1941 * Check whether there are any predicate locks held by any transaction
1942 * for the page at the given block number.
1943 *
1944 * Note that the transaction may be completed but not yet subject to
1945 * cleanup due to overlapping serializable transactions. This must
1946 * return valid information regardless of transaction isolation level.
1947 *
1948 * Also note that this doesn't check for a conflicting relation lock,
1949 * just a lock specifically on the given page.
1950 *
1951 * One use is to support proper behavior during GiST index vacuum.
1952 */
1953 bool
PageIsPredicateLocked(Relation relation,BlockNumber blkno)1954 PageIsPredicateLocked(Relation relation, BlockNumber blkno)
1955 {
1956 PREDICATELOCKTARGETTAG targettag;
1957 uint32 targettaghash;
1958 LWLock *partitionLock;
1959 PREDICATELOCKTARGET *target;
1960
1961 SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
1962 relation->rd_node.dbNode,
1963 relation->rd_id,
1964 blkno);
1965
1966 targettaghash = PredicateLockTargetTagHashCode(&targettag);
1967 partitionLock = PredicateLockHashPartitionLock(targettaghash);
1968 LWLockAcquire(partitionLock, LW_SHARED);
1969 target = (PREDICATELOCKTARGET *)
1970 hash_search_with_hash_value(PredicateLockTargetHash,
1971 &targettag, targettaghash,
1972 HASH_FIND, NULL);
1973 LWLockRelease(partitionLock);
1974
1975 return (target != NULL);
1976 }
1977
1978
1979 /*
1980 * Check whether a particular lock is held by this transaction.
1981 *
1982 * Important note: this function may return false even if the lock is
1983 * being held, because it uses the local lock table which is not
1984 * updated if another transaction modifies our lock list (e.g. to
1985 * split an index page). It can also return true when a coarser
1986 * granularity lock that covers this target is being held. Be careful
1987 * to only use this function in circumstances where such errors are
1988 * acceptable!
1989 */
1990 static bool
PredicateLockExists(const PREDICATELOCKTARGETTAG * targettag)1991 PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag)
1992 {
1993 LOCALPREDICATELOCK *lock;
1994
1995 /* check local hash table */
1996 lock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
1997 targettag,
1998 HASH_FIND, NULL);
1999
2000 if (!lock)
2001 return false;
2002
2003 /*
2004 * Found entry in the table, but still need to check whether it's actually
2005 * held -- it could just be a parent of some held lock.
2006 */
2007 return lock->held;
2008 }
2009
2010 /*
2011 * Return the parent lock tag in the lock hierarchy: the next coarser
2012 * lock that covers the provided tag.
2013 *
2014 * Returns true and sets *parent to the parent tag if one exists,
2015 * returns false if none exists.
2016 */
2017 static bool
GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG * tag,PREDICATELOCKTARGETTAG * parent)2018 GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
2019 PREDICATELOCKTARGETTAG *parent)
2020 {
2021 switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
2022 {
2023 case PREDLOCKTAG_RELATION:
2024 /* relation locks have no parent lock */
2025 return false;
2026
2027 case PREDLOCKTAG_PAGE:
2028 /* parent lock is relation lock */
2029 SET_PREDICATELOCKTARGETTAG_RELATION(*parent,
2030 GET_PREDICATELOCKTARGETTAG_DB(*tag),
2031 GET_PREDICATELOCKTARGETTAG_RELATION(*tag));
2032
2033 return true;
2034
2035 case PREDLOCKTAG_TUPLE:
2036 /* parent lock is page lock */
2037 SET_PREDICATELOCKTARGETTAG_PAGE(*parent,
2038 GET_PREDICATELOCKTARGETTAG_DB(*tag),
2039 GET_PREDICATELOCKTARGETTAG_RELATION(*tag),
2040 GET_PREDICATELOCKTARGETTAG_PAGE(*tag));
2041 return true;
2042 }
2043
2044 /* not reachable */
2045 Assert(false);
2046 return false;
2047 }
2048
2049 /*
2050 * Check whether the lock we are considering is already covered by a
2051 * coarser lock for our transaction.
2052 *
2053 * Like PredicateLockExists, this function might return a false
2054 * negative, but it will never return a false positive.
2055 */
2056 static bool
CoarserLockCovers(const PREDICATELOCKTARGETTAG * newtargettag)2057 CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag)
2058 {
2059 PREDICATELOCKTARGETTAG targettag,
2060 parenttag;
2061
2062 targettag = *newtargettag;
2063
2064 /* check parents iteratively until no more */
2065 while (GetParentPredicateLockTag(&targettag, &parenttag))
2066 {
2067 targettag = parenttag;
2068 if (PredicateLockExists(&targettag))
2069 return true;
2070 }
2071
2072 /* no more parents to check; lock is not covered */
2073 return false;
2074 }
2075
2076 /*
2077 * Remove the dummy entry from the predicate lock target hash, to free up some
2078 * scratch space. The caller must be holding SerializablePredicateLockListLock,
2079 * and must restore the entry with RestoreScratchTarget() before releasing the
2080 * lock.
2081 *
2082 * If lockheld is true, the caller is already holding the partition lock
2083 * of the partition containing the scratch entry.
2084 */
2085 static void
RemoveScratchTarget(bool lockheld)2086 RemoveScratchTarget(bool lockheld)
2087 {
2088 bool found;
2089
2090 Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
2091
2092 if (!lockheld)
2093 LWLockAcquire(ScratchPartitionLock, LW_EXCLUSIVE);
2094 hash_search_with_hash_value(PredicateLockTargetHash,
2095 &ScratchTargetTag,
2096 ScratchTargetTagHash,
2097 HASH_REMOVE, &found);
2098 Assert(found);
2099 if (!lockheld)
2100 LWLockRelease(ScratchPartitionLock);
2101 }
2102
2103 /*
2104 * Re-insert the dummy entry in predicate lock target hash.
2105 */
2106 static void
RestoreScratchTarget(bool lockheld)2107 RestoreScratchTarget(bool lockheld)
2108 {
2109 bool found;
2110
2111 Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
2112
2113 if (!lockheld)
2114 LWLockAcquire(ScratchPartitionLock, LW_EXCLUSIVE);
2115 hash_search_with_hash_value(PredicateLockTargetHash,
2116 &ScratchTargetTag,
2117 ScratchTargetTagHash,
2118 HASH_ENTER, &found);
2119 Assert(!found);
2120 if (!lockheld)
2121 LWLockRelease(ScratchPartitionLock);
2122 }
2123
2124 /*
2125 * Check whether the list of related predicate locks is empty for a
2126 * predicate lock target, and remove the target if it is.
2127 */
2128 static void
RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET * target,uint32 targettaghash)2129 RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
2130 {
2131 PREDICATELOCKTARGET *rmtarget PG_USED_FOR_ASSERTS_ONLY;
2132
2133 Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
2134
2135 /* Can't remove it until no locks at this target. */
2136 if (!SHMQueueEmpty(&target->predicateLocks))
2137 return;
2138
2139 /* Actually remove the target. */
2140 rmtarget = hash_search_with_hash_value(PredicateLockTargetHash,
2141 &target->tag,
2142 targettaghash,
2143 HASH_REMOVE, NULL);
2144 Assert(rmtarget == target);
2145 }
2146
/*
 * Delete child target locks owned by this process.
 * This implementation is assuming that the usage of each target tag field
 * is uniform.  No need to make this hard if we don't have to.
 *
 * We aren't acquiring lightweight locks for the predicate lock or lock
 * target structures associated with this transaction unless we're going
 * to modify them, because no other process is permitted to modify our
 * locks.
 */
static void
DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
{
	SERIALIZABLEXACT *sxact;
	PREDICATELOCK *predlock;

	LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
	sxact = MySerializableXact;
	/* Start at the head of our transaction's predicate lock list. */
	predlock = (PREDICATELOCK *)
		SHMQueueNext(&(sxact->predicateLocks),
					 &(sxact->predicateLocks),
					 offsetof(PREDICATELOCK, xactLink));
	while (predlock)
	{
		SHM_QUEUE  *predlocksxactlink;
		PREDICATELOCK *nextpredlock;
		PREDICATELOCKTAG oldlocktag;
		PREDICATELOCKTARGET *oldtarget;
		PREDICATELOCKTARGETTAG oldtargettag;

		/*
		 * Fetch the successor before we potentially delete the current
		 * entry, so the list walk can continue safely.
		 */
		predlocksxactlink = &(predlock->xactLink);
		nextpredlock = (PREDICATELOCK *)
			SHMQueueNext(&(sxact->predicateLocks),
						 predlocksxactlink,
						 offsetof(PREDICATELOCK, xactLink));

		oldlocktag = predlock->tag;
		Assert(oldlocktag.myXact == sxact);
		oldtarget = oldlocktag.myTarget;
		oldtargettag = oldtarget->tag;

		/* Delete only locks that the new, coarser lock makes redundant. */
		if (TargetTagIsCoveredBy(oldtargettag, *newtargettag))
		{
			uint32		oldtargettaghash;
			LWLock	   *partitionLock;
			PREDICATELOCK *rmpredlock PG_USED_FOR_ASSERTS_ONLY;

			oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
			partitionLock = PredicateLockHashPartitionLock(oldtargettaghash);

			/* About to modify shared state: take the partition lock. */
			LWLockAcquire(partitionLock, LW_EXCLUSIVE);

			/* Unlink from both the xact's and the target's lock lists. */
			SHMQueueDelete(predlocksxactlink);
			SHMQueueDelete(&(predlock->targetLink));
			rmpredlock = hash_search_with_hash_value
				(PredicateLockHash,
				 &oldlocktag,
				 PredicateLockHashCodeFromTargetHashCode(&oldlocktag,
														 oldtargettaghash),
				 HASH_REMOVE, NULL);
			Assert(rmpredlock == predlock);

			/* The target itself may now be unused; clean it up too. */
			RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);

			LWLockRelease(partitionLock);

			/* Keep ancestor child-counts in the local table accurate. */
			DecrementParentLocks(&oldtargettag);
		}

		predlock = nextpredlock;
	}
	LWLockRelease(SerializablePredicateLockListLock);
}
2220
2221 /*
2222 * Returns the promotion threshold for a given predicate lock
2223 * target. This is the number of descendant locks required to promote
2224 * to the specified tag. Note that the threshold includes non-direct
2225 * descendants, e.g. both tuples and pages for a relation lock.
2226 *
2227 * TODO SSI: We should do something more intelligent about what the
2228 * thresholds are, either making it proportional to the number of
2229 * tuples in a page & pages in a relation, or at least making it a
2230 * GUC. Currently the threshold is 3 for a page lock, and
2231 * max_pred_locks_per_transaction/2 for a relation lock, chosen
2232 * entirely arbitrarily (and without benchmarking).
2233 */
2234 static int
PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG * tag)2235 PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag)
2236 {
2237 switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
2238 {
2239 case PREDLOCKTAG_RELATION:
2240 return max_predicate_locks_per_xact / 2;
2241
2242 case PREDLOCKTAG_PAGE:
2243 return 3;
2244
2245 case PREDLOCKTAG_TUPLE:
2246
2247 /*
2248 * not reachable: nothing is finer-granularity than a tuple, so we
2249 * should never try to promote to it.
2250 */
2251 Assert(false);
2252 return 0;
2253 }
2254
2255 /* not reachable */
2256 Assert(false);
2257 return 0;
2258 }
2259
/*
 * For all ancestors of a newly-acquired predicate lock, increment
 * their child count in the parent hash table.  If any of them have
 * more descendants than their promotion threshold, acquire the
 * coarsest such lock.
 *
 * Returns true if a parent lock was acquired and false otherwise.
 */
static bool
CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag)
{
	PREDICATELOCKTARGETTAG targettag,
				nexttag,
				promotiontag;
	LOCALPREDICATELOCK *parentlock;
	bool		found,
				promote;

	promote = false;

	targettag = *reqtag;

	/* check parents iteratively */
	while (GetParentPredicateLockTag(&targettag, &nexttag))
	{
		targettag = nexttag;
		/* Find or create the local entry for this ancestor. */
		parentlock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
														&targettag,
														HASH_ENTER,
														&found);
		if (!found)
		{
			/* New entry: not held itself, counting one child (ours). */
			parentlock->held = false;
			parentlock->childLocks = 1;
		}
		else
			parentlock->childLocks++;

		if (parentlock->childLocks >=
			PredicateLockPromotionThreshold(&targettag))
		{
			/*
			 * We should promote to this parent lock.  Continue to check its
			 * ancestors, however, both to get their child counts right and to
			 * check whether we should just go ahead and promote to one of
			 * them.
			 */
			promotiontag = targettag;
			promote = true;
		}
	}

	if (promote)
	{
		/* acquire coarsest ancestor eligible for promotion */
		PredicateLockAcquire(&promotiontag);
		return true;
	}
	else
		return false;
}
2321
/*
 * When releasing a lock, decrement the child count on all ancestor
 * locks.
 *
 * This is called only when releasing a lock via
 * DeleteChildTargetLocks (i.e. when a lock becomes redundant because
 * we've acquired its parent, possibly due to promotion) or when a new
 * MVCC write lock makes the predicate lock unnecessary.  There's no
 * point in calling it when locks are released at transaction end, as
 * this information is no longer needed.
 */
static void
DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag)
{
	PREDICATELOCKTARGETTAG parenttag,
				nexttag;

	parenttag = *targettag;

	/* Walk up the granularity hierarchy, adjusting each ancestor. */
	while (GetParentPredicateLockTag(&parenttag, &nexttag))
	{
		uint32		targettaghash;
		LOCALPREDICATELOCK *parentlock,
				   *rmlock PG_USED_FOR_ASSERTS_ONLY;

		parenttag = nexttag;
		targettaghash = PredicateLockTargetTagHashCode(&parenttag);
		parentlock = (LOCALPREDICATELOCK *)
			hash_search_with_hash_value(LocalPredicateLockHash,
										&parenttag, targettaghash,
										HASH_FIND, NULL);

		/*
		 * There's a small chance the parent lock doesn't exist in the lock
		 * table.  This can happen if we prematurely removed it because an
		 * index split caused the child refcount to be off.
		 */
		if (parentlock == NULL)
			continue;

		parentlock->childLocks--;

		/*
		 * Under similar circumstances the parent lock's refcount might be
		 * zero.  This only happens if we're holding that lock (otherwise we
		 * would have removed the entry).
		 */
		if (parentlock->childLocks < 0)
		{
			Assert(parentlock->held);
			parentlock->childLocks = 0;	/* clamp the stale count */
		}

		/* Entry no longer held nor needed as a parent: discard it. */
		if ((parentlock->childLocks == 0) && (!parentlock->held))
		{
			rmlock = (LOCALPREDICATELOCK *)
				hash_search_with_hash_value(LocalPredicateLockHash,
											&parenttag, targettaghash,
											HASH_REMOVE, NULL);
			Assert(rmlock == parentlock);
		}
	}
}
2385
/*
 * Indicate that a predicate lock on the given target is held by the
 * specified transaction.  Has no effect if the lock is already held.
 *
 * This updates the lock table and the sxact's lock list, and creates
 * the lock target if necessary, but does *not* do anything related to
 * granularity promotion or the local lock table.  See
 * PredicateLockAcquire for that.
 */
static void
CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
					uint32 targettaghash,
					SERIALIZABLEXACT *sxact)
{
	PREDICATELOCKTARGET *target;
	PREDICATELOCKTAG locktag;
	PREDICATELOCK *lock;
	LWLock	   *partitionLock;
	bool		found;

	partitionLock = PredicateLockHashPartitionLock(targettaghash);

	/* Lock order: list lock first, then the hash partition lock. */
	LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	/* Make sure that the target is represented. */
	target = (PREDICATELOCKTARGET *)
		hash_search_with_hash_value(PredicateLockTargetHash,
									targettag, targettaghash,
									HASH_ENTER_NULL, &found);
	/* HASH_ENTER_NULL returns NULL instead of erroring out on OOM. */
	if (!target)
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of shared memory"),
				 errhint("You might need to increase max_pred_locks_per_transaction.")));
	if (!found)
		SHMQueueInit(&(target->predicateLocks));

	/* We've got the sxact and target, make sure they're joined. */
	locktag.myTarget = target;
	locktag.myXact = sxact;
	lock = (PREDICATELOCK *)
		hash_search_with_hash_value(PredicateLockHash, &locktag,
									PredicateLockHashCodeFromTargetHashCode(&locktag, targettaghash),
									HASH_ENTER_NULL, &found);
	if (!lock)
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of shared memory"),
				 errhint("You might need to increase max_pred_locks_per_transaction.")));

	/* If the lock already existed, there is nothing more to do. */
	if (!found)
	{
		SHMQueueInsertBefore(&(target->predicateLocks), &(lock->targetLink));
		SHMQueueInsertBefore(&(sxact->predicateLocks),
							 &(lock->xactLink));
		lock->commitSeqNo = InvalidSerCommitSeqNo;
	}

	LWLockRelease(partitionLock);
	LWLockRelease(SerializablePredicateLockListLock);
}
2448
/*
 * Acquire a predicate lock on the specified target for the current
 * connection if not already held.  This updates the local lock table
 * and uses it to implement granularity promotion.  It will consolidate
 * multiple locks into a coarser lock if warranted, and will release
 * any finer-grained locks covered by the new one.
 */
static void
PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag)
{
	uint32		targettaghash;
	bool		found;
	LOCALPREDICATELOCK *locallock;

	/* Do we have the lock already, or a covering lock? */
	if (PredicateLockExists(targettag))
		return;

	if (CoarserLockCovers(targettag))
		return;

	/* the same hash and LW lock apply to the lock target and the local lock. */
	targettaghash = PredicateLockTargetTagHashCode(targettag);

	/* Acquire lock in local table */
	locallock = (LOCALPREDICATELOCK *)
		hash_search_with_hash_value(LocalPredicateLockHash,
									targettag, targettaghash,
									HASH_ENTER, &found);
	/* Entry may pre-exist as a parent node; now it is held in its own right. */
	locallock->held = true;
	if (!found)
		locallock->childLocks = 0;

	/* Actually create the lock */
	CreatePredicateLock(targettag, targettaghash, MySerializableXact);

	/*
	 * Lock has been acquired.  Check whether it should be promoted to a
	 * coarser granularity, or whether there are finer-granularity locks to
	 * clean up.
	 */
	if (CheckAndPromotePredicateLockRequest(targettag))
	{
		/*
		 * Lock request was promoted to a coarser-granularity lock, and that
		 * lock was acquired.  It will delete this lock and any of its
		 * children, so we're done.
		 */
	}
	else
	{
		/* Clean up any finer-granularity locks */
		if (GET_PREDICATELOCKTARGETTAG_TYPE(*targettag) != PREDLOCKTAG_TUPLE)
			DeleteChildTargetLocks(targettag);
	}
}
2505
2506
2507 /*
2508 * PredicateLockRelation
2509 *
2510 * Gets a predicate lock at the relation level.
2511 * Skip if not in full serializable transaction isolation level.
2512 * Skip if this is a temporary table.
2513 * Clear any finer-grained predicate locks this session has on the relation.
2514 */
2515 void
PredicateLockRelation(Relation relation,Snapshot snapshot)2516 PredicateLockRelation(Relation relation, Snapshot snapshot)
2517 {
2518 PREDICATELOCKTARGETTAG tag;
2519
2520 if (!SerializationNeededForRead(relation, snapshot))
2521 return;
2522
2523 SET_PREDICATELOCKTARGETTAG_RELATION(tag,
2524 relation->rd_node.dbNode,
2525 relation->rd_id);
2526 PredicateLockAcquire(&tag);
2527 }
2528
2529 /*
2530 * PredicateLockPage
2531 *
2532 * Gets a predicate lock at the page level.
2533 * Skip if not in full serializable transaction isolation level.
2534 * Skip if this is a temporary table.
2535 * Skip if a coarser predicate lock already covers this page.
2536 * Clear any finer-grained predicate locks this session has on the relation.
2537 */
2538 void
PredicateLockPage(Relation relation,BlockNumber blkno,Snapshot snapshot)2539 PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot)
2540 {
2541 PREDICATELOCKTARGETTAG tag;
2542
2543 if (!SerializationNeededForRead(relation, snapshot))
2544 return;
2545
2546 SET_PREDICATELOCKTARGETTAG_PAGE(tag,
2547 relation->rd_node.dbNode,
2548 relation->rd_id,
2549 blkno);
2550 PredicateLockAcquire(&tag);
2551 }
2552
/*
 * PredicateLockTuple
 *
 * Gets a predicate lock at the tuple level.
 * Skip if not in full serializable transaction isolation level.
 * Skip if this is a temporary table.
 */
void
PredicateLockTuple(Relation relation, HeapTuple tuple, Snapshot snapshot)
{
	PREDICATELOCKTARGETTAG tag;
	ItemPointer tid;
	TransactionId targetxmin;

	if (!SerializationNeededForRead(relation, snapshot))
		return;

	/*
	 * If it's a heap tuple, return if this xact wrote it.  (An SIREAD lock
	 * on our own write would be redundant.)
	 */
	if (relation->rd_index == NULL)
	{
		TransactionId myxid;

		targetxmin = HeapTupleHeaderGetXmin(tuple->t_data);

		myxid = GetTopTransactionIdIfAny();
		if (TransactionIdIsValid(myxid))
		{
			/*
			 * Only consult pg_subtrans for xmins recent enough that the
			 * subtransaction data is still available.
			 */
			if (TransactionIdFollowsOrEquals(targetxmin, TransactionXmin))
			{
				TransactionId xid = SubTransGetTopmostTransaction(targetxmin);

				if (TransactionIdEquals(xid, myxid))
				{
					/* We wrote it; we already have a write lock. */
					return;
				}
			}
		}
	}

	/*
	 * Do quick-but-not-definitive test for a relation lock first.  This will
	 * never cause a return when the relation is *not* locked, but will
	 * occasionally let the check continue when there really *is* a relation
	 * level lock.
	 */
	SET_PREDICATELOCKTARGETTAG_RELATION(tag,
										relation->rd_node.dbNode,
										relation->rd_id);
	if (PredicateLockExists(&tag))
		return;

	/* Take the lock on the tuple identified by its TID. */
	tid = &(tuple->t_self);
	SET_PREDICATELOCKTARGETTAG_TUPLE(tag,
									 relation->rd_node.dbNode,
									 relation->rd_id,
									 ItemPointerGetBlockNumber(tid),
									 ItemPointerGetOffsetNumber(tid));
	PredicateLockAcquire(&tag);
}
2615
2616
/*
 * DeleteLockTarget
 *
 * Remove a predicate lock target along with any locks held for it.
 *
 * Caller must hold SerializablePredicateLockListLock and the
 * appropriate hash partition lock for the target.
 */
static void
DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
{
	PREDICATELOCK *predlock;
	SHM_QUEUE  *predlocktargetlink;
	PREDICATELOCK *nextpredlock;
	bool		found;

	Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
	Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash)));

	/* Walk the target's list of locks, deleting each one. */
	predlock = (PREDICATELOCK *)
		SHMQueueNext(&(target->predicateLocks),
					 &(target->predicateLocks),
					 offsetof(PREDICATELOCK, targetLink));
	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
	while (predlock)
	{
		/* Save the successor before deleting the current entry. */
		predlocktargetlink = &(predlock->targetLink);
		nextpredlock = (PREDICATELOCK *)
			SHMQueueNext(&(target->predicateLocks),
						 predlocktargetlink,
						 offsetof(PREDICATELOCK, targetLink));

		/* Unlink from the owning xact's list and the target's list. */
		SHMQueueDelete(&(predlock->xactLink));
		SHMQueueDelete(&(predlock->targetLink));

		hash_search_with_hash_value
			(PredicateLockHash,
			 &predlock->tag,
			 PredicateLockHashCodeFromTargetHashCode(&predlock->tag,
													 targettaghash),
			 HASH_REMOVE, &found);
		Assert(found);

		predlock = nextpredlock;
	}
	LWLockRelease(SerializableXactHashLock);

	/* Remove the target itself, if possible. */
	RemoveTargetIfNoLongerUsed(target, targettaghash);
}
2667
2668
/*
 * TransferPredicateLocksToNewTarget
 *
 * Move or copy all the predicate locks for a lock target, for use by
 * index page splits/combines and other things that create or replace
 * lock targets.  If 'removeOld' is true, the old locks and the target
 * will be removed.
 *
 * Returns true on success, or false if we ran out of shared memory to
 * allocate the new target or locks.  Guaranteed to always succeed if
 * removeOld is set (by using the scratch entry in PredicateLockTargetHash
 * for scratch space).
 *
 * Warning: the "removeOld" option should be used only with care,
 * because this function does not (indeed, can not) update other
 * backends' LocalPredicateLockHash.  If we are only adding new
 * entries, this is not a problem: the local lock table is used only
 * as a hint, so missing entries for locks that are held are
 * OK.  Having entries for locks that are no longer held, as can happen
 * when using "removeOld", is not in general OK.  We can only use it
 * safely when replacing a lock with a coarser-granularity lock that
 * covers it, or if we are absolutely certain that no one will need to
 * refer to that lock in the future.
 *
 * Caller must hold SerializablePredicateLockListLock.
 */
static bool
TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
								  PREDICATELOCKTARGETTAG newtargettag,
								  bool removeOld)
{
	uint32		oldtargettaghash;
	LWLock	   *oldpartitionLock;
	PREDICATELOCKTARGET *oldtarget;
	uint32		newtargettaghash;
	LWLock	   *newpartitionLock;
	bool		found;
	bool		outOfShmem = false;

	Assert(LWLockHeldByMe(SerializablePredicateLockListLock));

	oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
	newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
	oldpartitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
	newpartitionLock = PredicateLockHashPartitionLock(newtargettaghash);

	if (removeOld)
	{
		/*
		 * Remove the dummy entry to give us scratch space, so we know we'll
		 * be able to create the new lock target.
		 */
		RemoveScratchTarget(false);
	}

	/*
	 * We must get the partition locks in ascending sequence to avoid
	 * deadlocks.  If old and new partitions are the same, we must request the
	 * lock only once.
	 */
	if (oldpartitionLock < newpartitionLock)
	{
		/* Old partition needs exclusive mode only if we'll delete from it. */
		LWLockAcquire(oldpartitionLock,
					  (removeOld ? LW_EXCLUSIVE : LW_SHARED));
		LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
	}
	else if (oldpartitionLock > newpartitionLock)
	{
		LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
		LWLockAcquire(oldpartitionLock,
					  (removeOld ? LW_EXCLUSIVE : LW_SHARED));
	}
	else
		LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);

	/*
	 * Look for the old target.  If not found, that's OK; no predicate locks
	 * are affected, so we can just clean up and return.  If it does exist,
	 * walk its list of predicate locks and move or copy them to the new
	 * target.
	 */
	oldtarget = hash_search_with_hash_value(PredicateLockTargetHash,
											&oldtargettag,
											oldtargettaghash,
											HASH_FIND, NULL);

	if (oldtarget)
	{
		PREDICATELOCKTARGET *newtarget;
		PREDICATELOCK *oldpredlock;
		PREDICATELOCKTAG newpredlocktag;

		/*
		 * HASH_ENTER_NULL returns NULL instead of throwing an error when
		 * shared memory is exhausted, letting us report failure cleanly.
		 */
		newtarget = hash_search_with_hash_value(PredicateLockTargetHash,
												&newtargettag,
												newtargettaghash,
												HASH_ENTER_NULL, &found);

		if (!newtarget)
		{
			/* Failed to allocate due to insufficient shmem */
			outOfShmem = true;
			goto exit;
		}

		/* If we created a new entry, initialize it */
		if (!found)
			SHMQueueInit(&(newtarget->predicateLocks));

		newpredlocktag.myTarget = newtarget;

		/*
		 * Loop through all the locks on the old target, replacing them with
		 * locks on the new target.
		 */
		oldpredlock = (PREDICATELOCK *)
			SHMQueueNext(&(oldtarget->predicateLocks),
						 &(oldtarget->predicateLocks),
						 offsetof(PREDICATELOCK, targetLink));
		LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
		while (oldpredlock)
		{
			SHM_QUEUE  *predlocktargetlink;
			PREDICATELOCK *nextpredlock;
			PREDICATELOCK *newpredlock;
			SerCommitSeqNo oldCommitSeqNo = oldpredlock->commitSeqNo;

			/* Save the successor before (possibly) deleting this entry. */
			predlocktargetlink = &(oldpredlock->targetLink);
			nextpredlock = (PREDICATELOCK *)
				SHMQueueNext(&(oldtarget->predicateLocks),
							 predlocktargetlink,
							 offsetof(PREDICATELOCK, targetLink));
			newpredlocktag.myXact = oldpredlock->tag.myXact;

			if (removeOld)
			{
				SHMQueueDelete(&(oldpredlock->xactLink));
				SHMQueueDelete(&(oldpredlock->targetLink));

				hash_search_with_hash_value
					(PredicateLockHash,
					 &oldpredlock->tag,
					 PredicateLockHashCodeFromTargetHashCode(&oldpredlock->tag,
															 oldtargettaghash),
					 HASH_REMOVE, &found);
				Assert(found);
			}

			newpredlock = (PREDICATELOCK *)
				hash_search_with_hash_value(PredicateLockHash,
											&newpredlocktag,
											PredicateLockHashCodeFromTargetHashCode(&newpredlocktag,
																					newtargettaghash),
											HASH_ENTER_NULL,
											&found);
			if (!newpredlock)
			{
				/* Out of shared memory.  Undo what we've done so far. */
				LWLockRelease(SerializableXactHashLock);
				DeleteLockTarget(newtarget, newtargettaghash);
				outOfShmem = true;
				goto exit;
			}
			if (!found)
			{
				SHMQueueInsertBefore(&(newtarget->predicateLocks),
									 &(newpredlock->targetLink));
				SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
									 &(newpredlock->xactLink));
				newpredlock->commitSeqNo = oldCommitSeqNo;
			}
			else
			{
				/* Lock already existed; keep the later commit seqno. */
				if (newpredlock->commitSeqNo < oldCommitSeqNo)
					newpredlock->commitSeqNo = oldCommitSeqNo;
			}

			Assert(newpredlock->commitSeqNo != 0);
			Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo)
				   || (newpredlock->tag.myXact == OldCommittedSxact));

			oldpredlock = nextpredlock;
		}
		LWLockRelease(SerializableXactHashLock);

		if (removeOld)
		{
			Assert(SHMQueueEmpty(&oldtarget->predicateLocks));
			RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);
		}
	}


exit:
	/* Release partition locks in reverse order of acquisition. */
	if (oldpartitionLock < newpartitionLock)
	{
		LWLockRelease(newpartitionLock);
		LWLockRelease(oldpartitionLock);
	}
	else if (oldpartitionLock > newpartitionLock)
	{
		LWLockRelease(oldpartitionLock);
		LWLockRelease(newpartitionLock);
	}
	else
		LWLockRelease(newpartitionLock);

	if (removeOld)
	{
		/* We shouldn't run out of memory if we're moving locks */
		Assert(!outOfShmem);

		/* Put the scratch entry back */
		RestoreScratchTarget(false);
	}

	return !outOfShmem;
}
2887
/*
 * Drop all predicate locks of any granularity from the specified relation,
 * which can be a heap relation or an index relation.  If 'transfer' is true,
 * acquire a relation lock on the heap for any transactions with any lock(s)
 * on the specified relation.
 *
 * This requires grabbing a lot of LW locks and scanning the entire lock
 * target table for matches.  That makes this more expensive than most
 * predicate lock management functions, but it will only be called for DDL
 * type commands that are expensive anyway, and there are fast returns when
 * no serializable transactions are active or the relation is temporary.
 *
 * We don't use the TransferPredicateLocksToNewTarget function because it
 * acquires its own locks on the partitions of the two targets involved,
 * and we'll already be holding all partition locks.
 *
 * We can't throw an error from here, because the call could be from a
 * transaction which is not serializable.
 *
 * NOTE: This is currently only called with transfer set to true, but that may
 * change.  If we decide to clean up the locks from a table on commit of a
 * transaction which executed DROP TABLE, the false condition will be useful.
 */
static void
DropAllPredicateLocksFromTable(Relation relation, bool transfer)
{
	HASH_SEQ_STATUS seqstat;
	PREDICATELOCKTARGET *oldtarget;
	PREDICATELOCKTARGET *heaptarget;
	Oid			dbId;
	Oid			relId;
	Oid			heapId;
	int			i;
	bool		isIndex;
	bool		found;
	uint32		heaptargettaghash;

	/*
	 * Bail out quickly if there are no serializable transactions running.
	 * It's safe to check this without taking locks because the caller is
	 * holding an ACCESS EXCLUSIVE lock on the relation.  No new locks which
	 * would matter here can be acquired while that is held.
	 */
	if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
		return;

	if (!PredicateLockingNeededForRelation(relation))
		return;

	dbId = relation->rd_node.dbNode;
	relId = relation->rd_id;
	/* rd_index is non-NULL only for index relations. */
	if (relation->rd_index == NULL)
	{
		isIndex = false;
		heapId = relId;
	}
	else
	{
		isIndex = true;
		heapId = relation->rd_index->indrelid;
	}
	Assert(heapId != InvalidOid);
	Assert(transfer || !isIndex);	/* index OID only makes sense with
									 * transfer */

	/* Retrieve first time needed, then keep. */
	heaptargettaghash = 0;
	heaptarget = NULL;

	/* Acquire locks on all lock partitions */
	LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
	for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
		LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_EXCLUSIVE);
	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);

	/*
	 * Remove the dummy entry to give us scratch space, so we know we'll be
	 * able to create the new lock target.
	 */
	if (transfer)
		RemoveScratchTarget(true);

	/* Scan through target map */
	hash_seq_init(&seqstat, PredicateLockTargetHash);

	while ((oldtarget = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
	{
		PREDICATELOCK *oldpredlock;

		/*
		 * Check whether this is a target which needs attention.
		 */
		if (GET_PREDICATELOCKTARGETTAG_RELATION(oldtarget->tag) != relId)
			continue;			/* wrong relation id */
		if (GET_PREDICATELOCKTARGETTAG_DB(oldtarget->tag) != dbId)
			continue;			/* wrong database id */
		if (transfer && !isIndex
			&& GET_PREDICATELOCKTARGETTAG_TYPE(oldtarget->tag) == PREDLOCKTAG_RELATION)
			continue;			/* already the right lock */

		/*
		 * If we made it here, we have work to do.  We make sure the heap
		 * relation lock exists, then we walk the list of predicate locks for
		 * the old target we found, moving all locks to the heap relation lock
		 * -- unless they already hold that.
		 */

		/*
		 * First make sure we have the heap relation target.  We only need to
		 * do this once.
		 */
		if (transfer && heaptarget == NULL)
		{
			PREDICATELOCKTARGETTAG heaptargettag;

			SET_PREDICATELOCKTARGETTAG_RELATION(heaptargettag, dbId, heapId);
			heaptargettaghash = PredicateLockTargetTagHashCode(&heaptargettag);

			/*
			 * HASH_ENTER (rather than HASH_ENTER_NULL): removing the scratch
			 * entry above means this insertion cannot fail for lack of shmem.
			 */
			heaptarget = hash_search_with_hash_value(PredicateLockTargetHash,
													 &heaptargettag,
													 heaptargettaghash,
													 HASH_ENTER, &found);
			if (!found)
				SHMQueueInit(&heaptarget->predicateLocks);
		}

		/*
		 * Loop through all the locks on the old target, replacing them with
		 * locks on the new target.
		 */
		oldpredlock = (PREDICATELOCK *)
			SHMQueueNext(&(oldtarget->predicateLocks),
						 &(oldtarget->predicateLocks),
						 offsetof(PREDICATELOCK, targetLink));
		while (oldpredlock)
		{
			PREDICATELOCK *nextpredlock;
			PREDICATELOCK *newpredlock;
			SerCommitSeqNo oldCommitSeqNo;
			SERIALIZABLEXACT *oldXact;

			/* Save successor before removing this entry from the queue. */
			nextpredlock = (PREDICATELOCK *)
				SHMQueueNext(&(oldtarget->predicateLocks),
							 &(oldpredlock->targetLink),
							 offsetof(PREDICATELOCK, targetLink));

			/*
			 * Remove the old lock first.  This avoids the chance of running
			 * out of lock structure entries for the hash table.
			 */
			oldCommitSeqNo = oldpredlock->commitSeqNo;
			oldXact = oldpredlock->tag.myXact;

			SHMQueueDelete(&(oldpredlock->xactLink));

			/*
			 * No need for retail delete from oldtarget list, we're removing
			 * the whole target anyway.
			 */
			hash_search(PredicateLockHash,
						&oldpredlock->tag,
						HASH_REMOVE, &found);
			Assert(found);

			if (transfer)
			{
				PREDICATELOCKTAG newpredlocktag;

				newpredlocktag.myTarget = heaptarget;
				newpredlocktag.myXact = oldXact;
				newpredlock = (PREDICATELOCK *)
					hash_search_with_hash_value(PredicateLockHash,
												&newpredlocktag,
												PredicateLockHashCodeFromTargetHashCode(&newpredlocktag,
																						heaptargettaghash),
												HASH_ENTER,
												&found);
				if (!found)
				{
					SHMQueueInsertBefore(&(heaptarget->predicateLocks),
										 &(newpredlock->targetLink));
					SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
										 &(newpredlock->xactLink));
					newpredlock->commitSeqNo = oldCommitSeqNo;
				}
				else
				{
					/* Existing lock for this xact; keep the later seqno. */
					if (newpredlock->commitSeqNo < oldCommitSeqNo)
						newpredlock->commitSeqNo = oldCommitSeqNo;
				}

				Assert(newpredlock->commitSeqNo != 0);
				Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo)
					   || (newpredlock->tag.myXact == OldCommittedSxact));
			}

			oldpredlock = nextpredlock;
		}

		hash_search(PredicateLockTargetHash, &oldtarget->tag, HASH_REMOVE,
					&found);
		Assert(found);
	}

	/* Put the scratch entry back */
	if (transfer)
		RestoreScratchTarget(true);

	/* Release locks in reverse order */
	LWLockRelease(SerializableXactHashLock);
	for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
		LWLockRelease(PredicateLockHashPartitionLockByIndex(i));
	LWLockRelease(SerializablePredicateLockListLock);
}
3101
/*
 * TransferPredicateLocksToHeapRelation
 *		For all transactions, transfer all predicate locks for the given
 *		relation to a single relation lock on the heap.
 *
 * Thin wrapper: delegates to DropAllPredicateLocksFromTable with
 * transfer = true, which both removes the fine-grained locks and replaces
 * them with a heap-relation-level lock for each holding transaction.
 */
void
TransferPredicateLocksToHeapRelation(Relation relation)
{
	DropAllPredicateLocksFromTable(relation, true);
}
3112
3113
/*
 * PredicateLockPageSplit
 *
 * Copies any predicate locks for the old page to the new page.
 * Skip if this is a temporary table or toast table.
 *
 * NOTE: A page split (or overflow) affects all serializable transactions,
 * even if it occurs in the context of another transaction isolation level.
 *
 * NOTE: This currently leaves the local copy of the locks without
 * information on the new lock which is in shared memory.  This could cause
 * problems if enough page splits occur on locked pages without the processes
 * which hold the locks getting in and noticing.
 */
void
PredicateLockPageSplit(Relation relation, BlockNumber oldblkno,
					   BlockNumber newblkno)
{
	PREDICATELOCKTARGETTAG oldtargettag;
	PREDICATELOCKTARGETTAG newtargettag;
	bool		success;

	/*
	 * Bail out quickly if there are no serializable transactions running.
	 *
	 * It's safe to do this check without taking any additional locks. Even if
	 * a serializable transaction starts concurrently, we know it can't take
	 * any SIREAD locks on the page being split because the caller is holding
	 * the associated buffer page lock. Memory reordering isn't an issue; the
	 * memory barrier in the LWLock acquisition guarantees that this read
	 * occurs while the buffer page lock is held.
	 */
	if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
		return;

	if (!PredicateLockingNeededForRelation(relation))
		return;

	Assert(oldblkno != newblkno);
	Assert(BlockNumberIsValid(oldblkno));
	Assert(BlockNumberIsValid(newblkno));

	SET_PREDICATELOCKTARGETTAG_PAGE(oldtargettag,
									relation->rd_node.dbNode,
									relation->rd_id,
									oldblkno);
	SET_PREDICATELOCKTARGETTAG_PAGE(newtargettag,
									relation->rd_node.dbNode,
									relation->rd_id,
									newblkno);

	LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);

	/*
	 * Try copying the locks over to the new page's tag, creating it if
	 * necessary.  (removeOld = false: the old page keeps its locks too.)
	 */
	success = TransferPredicateLocksToNewTarget(oldtargettag,
												newtargettag,
												false);

	if (!success)
	{
		/*
		 * No more predicate lock entries are available. Failure isn't an
		 * option here, so promote the page lock to a relation lock.
		 */

		/* Get the parent relation lock's lock tag */
		success = GetParentPredicateLockTag(&oldtargettag,
											&newtargettag);
		Assert(success);

		/*
		 * Move the locks to the parent. This shouldn't fail.
		 *
		 * Note that here we are removing locks held by other backends,
		 * leading to a possible inconsistency in their local lock hash table.
		 * This is OK because we're replacing it with a lock that covers the
		 * old one.
		 */
		success = TransferPredicateLocksToNewTarget(oldtargettag,
													newtargettag,
													true);
		Assert(success);
	}

	LWLockRelease(SerializablePredicateLockListLock);
}
3203
/*
 * PredicateLockPageCombine
 *
 * Combines predicate locks for two existing pages.
 * Skip if this is a temporary table or toast table.
 *
 * NOTE: A page combine affects all serializable transactions, even if it
 * occurs in the context of another transaction isolation level.
 */
void
PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
						 BlockNumber newblkno)
{
	/*
	 * Page combines differ from page splits in that we ought to be able to
	 * remove the locks on the old page after transferring them to the new
	 * page, instead of duplicating them. However, because we can't edit other
	 * backends' local lock tables, removing the old lock would leave them
	 * with an entry in their LocalPredicateLockHash for a lock they're not
	 * holding, which isn't acceptable. So we wind up having to do the same
	 * work as a page split, acquiring a lock on the new page and keeping the
	 * old page locked too. That can lead to some false positives, but should
	 * be rare in practice.
	 */
	PredicateLockPageSplit(relation, oldblkno, newblkno);
}
3230
3231 /*
3232 * Walk the list of in-progress serializable transactions and find the new
3233 * xmin.
3234 */
3235 static void
SetNewSxactGlobalXmin(void)3236 SetNewSxactGlobalXmin(void)
3237 {
3238 SERIALIZABLEXACT *sxact;
3239
3240 Assert(LWLockHeldByMe(SerializableXactHashLock));
3241
3242 PredXact->SxactGlobalXmin = InvalidTransactionId;
3243 PredXact->SxactGlobalXminCount = 0;
3244
3245 for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
3246 {
3247 if (!SxactIsRolledBack(sxact)
3248 && !SxactIsCommitted(sxact)
3249 && sxact != OldCommittedSxact)
3250 {
3251 Assert(sxact->xmin != InvalidTransactionId);
3252 if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
3253 || TransactionIdPrecedes(sxact->xmin,
3254 PredXact->SxactGlobalXmin))
3255 {
3256 PredXact->SxactGlobalXmin = sxact->xmin;
3257 PredXact->SxactGlobalXminCount = 1;
3258 }
3259 else if (TransactionIdEquals(sxact->xmin,
3260 PredXact->SxactGlobalXmin))
3261 PredXact->SxactGlobalXminCount++;
3262 }
3263 }
3264
3265 OldSerXidSetActiveSerXmin(PredXact->SxactGlobalXmin);
3266 }
3267
/*
 * ReleasePredicateLocks
 *
 * Releases predicate locks based on completion of the current transaction,
 * whether committed or rolled back.  It can also be called for a read only
 * transaction when it becomes impossible for the transaction to become
 * part of a dangerous structure.
 *
 * We do nothing unless this is a serializable transaction.
 *
 * This method must ensure that shared memory hash tables are cleaned
 * up in some relatively timely fashion.
 *
 * If this transaction is committing and is holding any predicate locks,
 * it must be added to a list of completed serializable transactions still
 * holding locks.
 */
void
ReleasePredicateLocks(bool isCommit)
{
	bool		needToClear;
	RWConflict	conflict,
				nextConflict,
				possibleUnsafeConflict;
	SERIALIZABLEXACT *roXact;

	/*
	 * We can't trust XactReadOnly here, because a transaction which started
	 * as READ WRITE can show as READ ONLY later, e.g., within
	 * subtransactions.  We want to flag a transaction as READ ONLY if it
	 * commits without writing so that de facto READ ONLY transactions get the
	 * benefit of some RO optimizations, so we will use this local variable to
	 * get some cleanup logic right which is based on whether the transaction
	 * was declared READ ONLY at the top level.
	 */
	bool		topLevelIsDeclaredReadOnly;

	/* Fast exit when this backend has no serializable xact to clean up. */
	if (MySerializableXact == InvalidSerializableXact)
	{
		Assert(LocalPredicateLockHash == NULL);
		return;
	}

	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);

	Assert(!isCommit || SxactIsPrepared(MySerializableXact));
	Assert(!isCommit || !SxactIsDoomed(MySerializableXact));
	Assert(!SxactIsCommitted(MySerializableXact));
	Assert(!SxactIsRolledBack(MySerializableXact));

	/* may not be serializable during COMMIT/ROLLBACK PREPARED */
	Assert(MySerializableXact->pid == 0 || IsolationIsSerializable());

	/* We'd better not already be on the cleanup list. */
	Assert(!SxactIsOnFinishedList(MySerializableXact));

	topLevelIsDeclaredReadOnly = SxactIsReadOnly(MySerializableXact);

	/*
	 * We don't hold XidGenLock lock here, assuming that TransactionId is
	 * atomic!
	 *
	 * If this value is changing, we don't care that much whether we get the
	 * old or new value -- it is just used to determine how far
	 * GlobalSerializableXmin must advance before this transaction can be
	 * fully cleaned up.  The worst that could happen is we wait for one more
	 * transaction to complete before freeing some RAM; correctness of visible
	 * behavior is not affected.
	 */
	MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;

	/*
	 * If it's not a commit it's a rollback, and we can clear our locks
	 * immediately.
	 */
	if (isCommit)
	{
		MySerializableXact->flags |= SXACT_FLAG_COMMITTED;
		MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
		/* Recognize implicit read-only transaction (commit without write). */
		if (!MyXactDidWrite)
			MySerializableXact->flags |= SXACT_FLAG_READ_ONLY;
	}
	else
	{
		/*
		 * The DOOMED flag indicates that we intend to roll back this
		 * transaction and so it should not cause serialization failures for
		 * other transactions that conflict with it. Note that this flag might
		 * already be set, if another backend marked this transaction for
		 * abort.
		 *
		 * The ROLLED_BACK flag further indicates that ReleasePredicateLocks
		 * has been called, and so the SerializableXact is eligible for
		 * cleanup. This means it should not be considered when calculating
		 * SxactGlobalXmin.
		 */
		MySerializableXact->flags |= SXACT_FLAG_DOOMED;
		MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK;

		/*
		 * If the transaction was previously prepared, but is now failing due
		 * to a ROLLBACK PREPARED or (hopefully very rare) error after the
		 * prepare, clear the prepared flag.  This simplifies conflict
		 * checking.
		 */
		MySerializableXact->flags &= ~SXACT_FLAG_PREPARED;
	}

	if (!topLevelIsDeclaredReadOnly)
	{
		Assert(PredXact->WritableSxactCount > 0);
		if (--(PredXact->WritableSxactCount) == 0)
		{
			/*
			 * Release predicate locks and rw-conflicts in for all committed
			 * transactions.  There are no longer any transactions which might
			 * conflict with the locks and no chance for new transactions to
			 * overlap.  Similarly, existing conflicts in can't cause pivots,
			 * and any conflicts in which could have completed a dangerous
			 * structure would already have caused a rollback, so any
			 * remaining ones must be benign.
			 */
			PredXact->CanPartialClearThrough = PredXact->LastSxactCommitSeqNo;
		}
	}
	else
	{
		/*
		 * Read-only transactions: clear the list of transactions that might
		 * make us unsafe. Note that we use 'inLink' for the iteration as
		 * opposed to 'outLink' for the r/w xacts.
		 */
		possibleUnsafeConflict = (RWConflict)
			SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
						 &MySerializableXact->possibleUnsafeConflicts,
						 offsetof(RWConflictData, inLink));
		while (possibleUnsafeConflict)
		{
			/* Save successor before releasing (which unlinks) this entry. */
			nextConflict = (RWConflict)
				SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
							 &possibleUnsafeConflict->inLink,
							 offsetof(RWConflictData, inLink));

			Assert(!SxactIsReadOnly(possibleUnsafeConflict->sxactOut));
			Assert(MySerializableXact == possibleUnsafeConflict->sxactIn);

			ReleaseRWConflict(possibleUnsafeConflict);

			possibleUnsafeConflict = nextConflict;
		}
	}

	/* Check for conflict out to old committed transactions. */
	if (isCommit
		&& !SxactIsReadOnly(MySerializableXact)
		&& SxactHasSummaryConflictOut(MySerializableXact))
	{
		/*
		 * we don't know which old committed transaction we conflicted with,
		 * so be conservative and use FirstNormalSerCommitSeqNo here
		 */
		MySerializableXact->SeqNo.earliestOutConflictCommit =
			FirstNormalSerCommitSeqNo;
		MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
	}

	/*
	 * Release all outConflicts to committed transactions.  If we're rolling
	 * back clear them all.  Set SXACT_FLAG_CONFLICT_OUT if any point to
	 * previously committed transactions.
	 */
	conflict = (RWConflict)
		SHMQueueNext(&MySerializableXact->outConflicts,
					 &MySerializableXact->outConflicts,
					 offsetof(RWConflictData, outLink));
	while (conflict)
	{
		nextConflict = (RWConflict)
			SHMQueueNext(&MySerializableXact->outConflicts,
						 &conflict->outLink,
						 offsetof(RWConflictData, outLink));

		if (isCommit
			&& !SxactIsReadOnly(MySerializableXact)
			&& SxactIsCommitted(conflict->sxactIn))
		{
			/* Track earliest commit of a transaction we conflicted out to. */
			if ((MySerializableXact->flags & SXACT_FLAG_CONFLICT_OUT) == 0
				|| conflict->sxactIn->prepareSeqNo < MySerializableXact->SeqNo.earliestOutConflictCommit)
				MySerializableXact->SeqNo.earliestOutConflictCommit = conflict->sxactIn->prepareSeqNo;
			MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
		}

		if (!isCommit
			|| SxactIsCommitted(conflict->sxactIn)
			|| (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo))
			ReleaseRWConflict(conflict);

		conflict = nextConflict;
	}

	/*
	 * Release all inConflicts from committed and read-only transactions. If
	 * we're rolling back, clear them all.
	 */
	conflict = (RWConflict)
		SHMQueueNext(&MySerializableXact->inConflicts,
					 &MySerializableXact->inConflicts,
					 offsetof(RWConflictData, inLink));
	while (conflict)
	{
		nextConflict = (RWConflict)
			SHMQueueNext(&MySerializableXact->inConflicts,
						 &conflict->inLink,
						 offsetof(RWConflictData, inLink));

		if (!isCommit
			|| SxactIsCommitted(conflict->sxactOut)
			|| SxactIsReadOnly(conflict->sxactOut))
			ReleaseRWConflict(conflict);

		conflict = nextConflict;
	}

	if (!topLevelIsDeclaredReadOnly)
	{
		/*
		 * Remove ourselves from the list of possible conflicts for concurrent
		 * READ ONLY transactions, flagging them as unsafe if we have a
		 * conflict out. If any are waiting DEFERRABLE transactions, wake them
		 * up if they are known safe or known unsafe.
		 */
		possibleUnsafeConflict = (RWConflict)
			SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
						 &MySerializableXact->possibleUnsafeConflicts,
						 offsetof(RWConflictData, outLink));
		while (possibleUnsafeConflict)
		{
			nextConflict = (RWConflict)
				SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
							 &possibleUnsafeConflict->outLink,
							 offsetof(RWConflictData, outLink));

			roXact = possibleUnsafeConflict->sxactIn;
			Assert(MySerializableXact == possibleUnsafeConflict->sxactOut);
			Assert(SxactIsReadOnly(roXact));

			/* Mark conflicted if necessary. */
			if (isCommit
				&& MyXactDidWrite
				&& SxactHasConflictOut(MySerializableXact)
				&& (MySerializableXact->SeqNo.earliestOutConflictCommit
					<= roXact->SeqNo.lastCommitBeforeSnapshot))
			{
				/*
				 * This releases possibleUnsafeConflict (as well as all other
				 * possible conflicts for roXact)
				 */
				FlagSxactUnsafe(roXact);
			}
			else
			{
				ReleaseRWConflict(possibleUnsafeConflict);

				/*
				 * If we were the last possible conflict, flag it safe. The
				 * transaction can now safely release its predicate locks (but
				 * that transaction's backend has to do that itself).
				 */
				if (SHMQueueEmpty(&roXact->possibleUnsafeConflicts))
					roXact->flags |= SXACT_FLAG_RO_SAFE;
			}

			/*
			 * Wake up the process for a waiting DEFERRABLE transaction if we
			 * now know it's either safe or conflicted.
			 */
			if (SxactIsDeferrableWaiting(roXact) &&
				(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
				ProcSendSignal(roXact->pid);

			possibleUnsafeConflict = nextConflict;
		}
	}

	/*
	 * Check whether it's time to clean up old transactions. This can only be
	 * done when the last serializable transaction with the oldest xmin among
	 * serializable transactions completes.  We then find the "new oldest"
	 * xmin and purge any transactions which finished before this transaction
	 * was launched.
	 */
	needToClear = false;
	if (TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin))
	{
		Assert(PredXact->SxactGlobalXminCount > 0);
		if (--(PredXact->SxactGlobalXminCount) == 0)
		{
			SetNewSxactGlobalXmin();
			needToClear = true;
		}
	}

	LWLockRelease(SerializableXactHashLock);

	LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);

	/* Add this to the list of transactions to check for later cleanup. */
	if (isCommit)
		SHMQueueInsertBefore(FinishedSerializableTransactions,
							 &MySerializableXact->finishedLink);

	/* On rollback, release this xact's resources right away. */
	if (!isCommit)
		ReleaseOneSerializableXact(MySerializableXact, false, false);

	LWLockRelease(SerializableFinishedListLock);

	/* Deferred until after the locks above are released. */
	if (needToClear)
		ClearOldPredicateLocks();

	MySerializableXact = InvalidSerializableXact;
	MyXactDidWrite = false;

	/* Delete per-transaction lock table */
	if (LocalPredicateLockHash != NULL)
	{
		hash_destroy(LocalPredicateLockHash);
		LocalPredicateLockHash = NULL;
	}
}
3598
3599 /*
3600 * Clear old predicate locks, belonging to committed transactions that are no
3601 * longer interesting to any in-progress transaction.
3602 */
3603 static void
ClearOldPredicateLocks(void)3604 ClearOldPredicateLocks(void)
3605 {
3606 SERIALIZABLEXACT *finishedSxact;
3607 PREDICATELOCK *predlock;
3608
3609 /*
3610 * Loop through finished transactions. They are in commit order, so we can
3611 * stop as soon as we find one that's still interesting.
3612 */
3613 LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
3614 finishedSxact = (SERIALIZABLEXACT *)
3615 SHMQueueNext(FinishedSerializableTransactions,
3616 FinishedSerializableTransactions,
3617 offsetof(SERIALIZABLEXACT, finishedLink));
3618 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3619 while (finishedSxact)
3620 {
3621 SERIALIZABLEXACT *nextSxact;
3622
3623 nextSxact = (SERIALIZABLEXACT *)
3624 SHMQueueNext(FinishedSerializableTransactions,
3625 &(finishedSxact->finishedLink),
3626 offsetof(SERIALIZABLEXACT, finishedLink));
3627 if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
3628 || TransactionIdPrecedesOrEquals(finishedSxact->finishedBefore,
3629 PredXact->SxactGlobalXmin))
3630 {
3631 /*
3632 * This transaction committed before any in-progress transaction
3633 * took its snapshot. It's no longer interesting.
3634 */
3635 LWLockRelease(SerializableXactHashLock);
3636 SHMQueueDelete(&(finishedSxact->finishedLink));
3637 ReleaseOneSerializableXact(finishedSxact, false, false);
3638 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3639 }
3640 else if (finishedSxact->commitSeqNo > PredXact->HavePartialClearedThrough
3641 && finishedSxact->commitSeqNo <= PredXact->CanPartialClearThrough)
3642 {
3643 /*
3644 * Any active transactions that took their snapshot before this
3645 * transaction committed are read-only, so we can clear part of
3646 * its state.
3647 */
3648 LWLockRelease(SerializableXactHashLock);
3649
3650 if (SxactIsReadOnly(finishedSxact))
3651 {
3652 /* A read-only transaction can be removed entirely */
3653 SHMQueueDelete(&(finishedSxact->finishedLink));
3654 ReleaseOneSerializableXact(finishedSxact, false, false);
3655 }
3656 else
3657 {
3658 /*
3659 * A read-write transaction can only be partially cleared. We
3660 * need to keep the SERIALIZABLEXACT but can release the
3661 * SIREAD locks and conflicts in.
3662 */
3663 ReleaseOneSerializableXact(finishedSxact, true, false);
3664 }
3665
3666 PredXact->HavePartialClearedThrough = finishedSxact->commitSeqNo;
3667 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3668 }
3669 else
3670 {
3671 /* Still interesting. */
3672 break;
3673 }
3674 finishedSxact = nextSxact;
3675 }
3676 LWLockRelease(SerializableXactHashLock);
3677
3678 /*
3679 * Loop through predicate locks on dummy transaction for summarized data.
3680 */
3681 LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
3682 predlock = (PREDICATELOCK *)
3683 SHMQueueNext(&OldCommittedSxact->predicateLocks,
3684 &OldCommittedSxact->predicateLocks,
3685 offsetof(PREDICATELOCK, xactLink));
3686 while (predlock)
3687 {
3688 PREDICATELOCK *nextpredlock;
3689 bool canDoPartialCleanup;
3690
3691 nextpredlock = (PREDICATELOCK *)
3692 SHMQueueNext(&OldCommittedSxact->predicateLocks,
3693 &predlock->xactLink,
3694 offsetof(PREDICATELOCK, xactLink));
3695
3696 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3697 Assert(predlock->commitSeqNo != 0);
3698 Assert(predlock->commitSeqNo != InvalidSerCommitSeqNo);
3699 canDoPartialCleanup = (predlock->commitSeqNo <= PredXact->CanPartialClearThrough);
3700 LWLockRelease(SerializableXactHashLock);
3701
3702 /*
3703 * If this lock originally belonged to an old enough transaction, we
3704 * can release it.
3705 */
3706 if (canDoPartialCleanup)
3707 {
3708 PREDICATELOCKTAG tag;
3709 PREDICATELOCKTARGET *target;
3710 PREDICATELOCKTARGETTAG targettag;
3711 uint32 targettaghash;
3712 LWLock *partitionLock;
3713
3714 tag = predlock->tag;
3715 target = tag.myTarget;
3716 targettag = target->tag;
3717 targettaghash = PredicateLockTargetTagHashCode(&targettag);
3718 partitionLock = PredicateLockHashPartitionLock(targettaghash);
3719
3720 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3721
3722 SHMQueueDelete(&(predlock->targetLink));
3723 SHMQueueDelete(&(predlock->xactLink));
3724
3725 hash_search_with_hash_value(PredicateLockHash, &tag,
3726 PredicateLockHashCodeFromTargetHashCode(&tag,
3727 targettaghash),
3728 HASH_REMOVE, NULL);
3729 RemoveTargetIfNoLongerUsed(target, targettaghash);
3730
3731 LWLockRelease(partitionLock);
3732 }
3733
3734 predlock = nextpredlock;
3735 }
3736
3737 LWLockRelease(SerializablePredicateLockListLock);
3738 LWLockRelease(SerializableFinishedListLock);
3739 }
3740
3741 /*
3742 * This is the normal way to delete anything from any of the predicate
3743 * locking hash tables. Given a transaction which we know can be deleted:
3744 * delete all predicate locks held by that transaction and any predicate
3745 * lock targets which are now unreferenced by a lock; delete all conflicts
3746 * for the transaction; delete all xid values for the transaction; then
3747 * delete the transaction.
3748 *
3749 * When the partial flag is set, we can release all predicate locks and
3750 * in-conflict information -- we've established that there are no longer
3751 * any overlapping read write transactions for which this transaction could
3752 * matter -- but keep the transaction entry itself and any outConflicts.
3753 *
3754 * When the summarize flag is set, we've run short of room for sxact data
3755 * and must summarize to the SLRU. Predicate locks are transferred to a
3756 * dummy "old" transaction, with duplicate locks on a single target
3757 * collapsing to a single lock with the "latest" commitSeqNo from among
3758 * the conflicting locks..
3759 */
3760 static void
ReleaseOneSerializableXact(SERIALIZABLEXACT * sxact,bool partial,bool summarize)3761 ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
3762 bool summarize)
3763 {
3764 PREDICATELOCK *predlock;
3765 SERIALIZABLEXIDTAG sxidtag;
3766 RWConflict conflict,
3767 nextConflict;
3768
3769 Assert(sxact != NULL);
3770 Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact));
3771 Assert(partial || !SxactIsOnFinishedList(sxact));
3772 Assert(LWLockHeldByMe(SerializableFinishedListLock));
3773
3774 /*
3775 * First release all the predicate locks held by this xact (or transfer
3776 * them to OldCommittedSxact if summarize is true)
3777 */
3778 LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
3779 predlock = (PREDICATELOCK *)
3780 SHMQueueNext(&(sxact->predicateLocks),
3781 &(sxact->predicateLocks),
3782 offsetof(PREDICATELOCK, xactLink));
3783 while (predlock)
3784 {
3785 PREDICATELOCK *nextpredlock;
3786 PREDICATELOCKTAG tag;
3787 SHM_QUEUE *targetLink;
3788 PREDICATELOCKTARGET *target;
3789 PREDICATELOCKTARGETTAG targettag;
3790 uint32 targettaghash;
3791 LWLock *partitionLock;
3792
3793 nextpredlock = (PREDICATELOCK *)
3794 SHMQueueNext(&(sxact->predicateLocks),
3795 &(predlock->xactLink),
3796 offsetof(PREDICATELOCK, xactLink));
3797
3798 tag = predlock->tag;
3799 targetLink = &(predlock->targetLink);
3800 target = tag.myTarget;
3801 targettag = target->tag;
3802 targettaghash = PredicateLockTargetTagHashCode(&targettag);
3803 partitionLock = PredicateLockHashPartitionLock(targettaghash);
3804
3805 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3806
3807 SHMQueueDelete(targetLink);
3808
3809 hash_search_with_hash_value(PredicateLockHash, &tag,
3810 PredicateLockHashCodeFromTargetHashCode(&tag,
3811 targettaghash),
3812 HASH_REMOVE, NULL);
3813 if (summarize)
3814 {
3815 bool found;
3816
3817 /* Fold into dummy transaction list. */
3818 tag.myXact = OldCommittedSxact;
3819 predlock = hash_search_with_hash_value(PredicateLockHash, &tag,
3820 PredicateLockHashCodeFromTargetHashCode(&tag,
3821 targettaghash),
3822 HASH_ENTER_NULL, &found);
3823 if (!predlock)
3824 ereport(ERROR,
3825 (errcode(ERRCODE_OUT_OF_MEMORY),
3826 errmsg("out of shared memory"),
3827 errhint("You might need to increase max_pred_locks_per_transaction.")));
3828 if (found)
3829 {
3830 Assert(predlock->commitSeqNo != 0);
3831 Assert(predlock->commitSeqNo != InvalidSerCommitSeqNo);
3832 if (predlock->commitSeqNo < sxact->commitSeqNo)
3833 predlock->commitSeqNo = sxact->commitSeqNo;
3834 }
3835 else
3836 {
3837 SHMQueueInsertBefore(&(target->predicateLocks),
3838 &(predlock->targetLink));
3839 SHMQueueInsertBefore(&(OldCommittedSxact->predicateLocks),
3840 &(predlock->xactLink));
3841 predlock->commitSeqNo = sxact->commitSeqNo;
3842 }
3843 }
3844 else
3845 RemoveTargetIfNoLongerUsed(target, targettaghash);
3846
3847 LWLockRelease(partitionLock);
3848
3849 predlock = nextpredlock;
3850 }
3851
3852 /*
3853 * Rather than retail removal, just re-init the head after we've run
3854 * through the list.
3855 */
3856 SHMQueueInit(&sxact->predicateLocks);
3857
3858 LWLockRelease(SerializablePredicateLockListLock);
3859
3860 sxidtag.xid = sxact->topXid;
3861 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
3862
3863 /* Release all outConflicts (unless 'partial' is true) */
3864 if (!partial)
3865 {
3866 conflict = (RWConflict)
3867 SHMQueueNext(&sxact->outConflicts,
3868 &sxact->outConflicts,
3869 offsetof(RWConflictData, outLink));
3870 while (conflict)
3871 {
3872 nextConflict = (RWConflict)
3873 SHMQueueNext(&sxact->outConflicts,
3874 &conflict->outLink,
3875 offsetof(RWConflictData, outLink));
3876 if (summarize)
3877 conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
3878 ReleaseRWConflict(conflict);
3879 conflict = nextConflict;
3880 }
3881 }
3882
3883 /* Release all inConflicts. */
3884 conflict = (RWConflict)
3885 SHMQueueNext(&sxact->inConflicts,
3886 &sxact->inConflicts,
3887 offsetof(RWConflictData, inLink));
3888 while (conflict)
3889 {
3890 nextConflict = (RWConflict)
3891 SHMQueueNext(&sxact->inConflicts,
3892 &conflict->inLink,
3893 offsetof(RWConflictData, inLink));
3894 if (summarize)
3895 conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
3896 ReleaseRWConflict(conflict);
3897 conflict = nextConflict;
3898 }
3899
3900 /* Finally, get rid of the xid and the record of the transaction itself. */
3901 if (!partial)
3902 {
3903 if (sxidtag.xid != InvalidTransactionId)
3904 hash_search(SerializableXidHash, &sxidtag, HASH_REMOVE, NULL);
3905 ReleasePredXact(sxact);
3906 }
3907
3908 LWLockRelease(SerializableXactHashLock);
3909 }
3910
3911 /*
3912 * Tests whether the given top level transaction is concurrent with
3913 * (overlaps) our current transaction.
3914 *
3915 * We need to identify the top level transaction for SSI, anyway, so pass
3916 * that to this function to save the overhead of checking the snapshot's
3917 * subxip array.
3918 */
3919 static bool
XidIsConcurrent(TransactionId xid)3920 XidIsConcurrent(TransactionId xid)
3921 {
3922 Snapshot snap;
3923 uint32 i;
3924
3925 Assert(TransactionIdIsValid(xid));
3926 Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
3927
3928 snap = GetTransactionSnapshot();
3929
3930 if (TransactionIdPrecedes(xid, snap->xmin))
3931 return false;
3932
3933 if (TransactionIdFollowsOrEquals(xid, snap->xmax))
3934 return true;
3935
3936 for (i = 0; i < snap->xcnt; i++)
3937 {
3938 if (xid == snap->xip[i])
3939 return true;
3940 }
3941
3942 return false;
3943 }
3944
3945 /*
3946 * CheckForSerializableConflictOut
3947 * We are reading a tuple which has been modified. If it is visible to
3948 * us but has been deleted, that indicates a rw-conflict out. If it's
3949 * not visible and was created by a concurrent (overlapping)
3950 * serializable transaction, that is also a rw-conflict out,
3951 *
3952 * We will determine the top level xid of the writing transaction with which
3953 * we may be in conflict, and check for overlap with our own transaction.
3954 * If the transactions overlap (i.e., they cannot see each other's writes),
3955 * then we have a conflict out.
3956 *
3957 * This function should be called just about anywhere in heapam.c where a
3958 * tuple has been read. The caller must hold at least a shared lock on the
3959 * buffer, because this function might set hint bits on the tuple. There is
3960 * currently no known reason to call this function from an index AM.
3961 */
3962 void
CheckForSerializableConflictOut(bool visible,Relation relation,HeapTuple tuple,Buffer buffer,Snapshot snapshot)3963 CheckForSerializableConflictOut(bool visible, Relation relation,
3964 HeapTuple tuple, Buffer buffer,
3965 Snapshot snapshot)
3966 {
3967 TransactionId xid;
3968 SERIALIZABLEXIDTAG sxidtag;
3969 SERIALIZABLEXID *sxid;
3970 SERIALIZABLEXACT *sxact;
3971 HTSV_Result htsvResult;
3972
3973 if (!SerializationNeededForRead(relation, snapshot))
3974 return;
3975
3976 /* Check if someone else has already decided that we need to die */
3977 if (SxactIsDoomed(MySerializableXact))
3978 {
3979 ereport(ERROR,
3980 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
3981 errmsg("could not serialize access due to read/write dependencies among transactions"),
3982 errdetail_internal("Reason code: Canceled on identification as a pivot, during conflict out checking."),
3983 errhint("The transaction might succeed if retried.")));
3984 }
3985
3986 /*
3987 * Check to see whether the tuple has been written to by a concurrent
3988 * transaction, either to create it not visible to us, or to delete it
3989 * while it is visible to us. The "visible" bool indicates whether the
3990 * tuple is visible to us, while HeapTupleSatisfiesVacuum checks what else
3991 * is going on with it.
3992 *
3993 * In the event of a concurrently inserted tuple that also happens to have
3994 * been concurrently updated (by a separate transaction), the xmin of the
3995 * tuple will be used -- not the updater's xid.
3996 */
3997 htsvResult = HeapTupleSatisfiesVacuum(tuple, TransactionXmin, buffer);
3998 switch (htsvResult)
3999 {
4000 case HEAPTUPLE_LIVE:
4001 if (visible)
4002 return;
4003 xid = HeapTupleHeaderGetXmin(tuple->t_data);
4004 break;
4005 case HEAPTUPLE_RECENTLY_DEAD:
4006 case HEAPTUPLE_DELETE_IN_PROGRESS:
4007 if (visible)
4008 xid = HeapTupleHeaderGetUpdateXid(tuple->t_data);
4009 else
4010 xid = HeapTupleHeaderGetXmin(tuple->t_data);
4011
4012 if (TransactionIdPrecedes(xid, TransactionXmin))
4013 {
4014 /* This is like the HEAPTUPLE_DEAD case */
4015 Assert(!visible);
4016 return;
4017 }
4018 break;
4019 case HEAPTUPLE_INSERT_IN_PROGRESS:
4020 xid = HeapTupleHeaderGetXmin(tuple->t_data);
4021 break;
4022 case HEAPTUPLE_DEAD:
4023 Assert(!visible);
4024 return;
4025 default:
4026
4027 /*
4028 * The only way to get to this default clause is if a new value is
4029 * added to the enum type without adding it to this switch
4030 * statement. That's a bug, so elog.
4031 */
4032 elog(ERROR, "unrecognized return value from HeapTupleSatisfiesVacuum: %u", htsvResult);
4033
4034 /*
4035 * In spite of having all enum values covered and calling elog on
4036 * this default, some compilers think this is a code path which
4037 * allows xid to be used below without initialization. Silence
4038 * that warning.
4039 */
4040 xid = InvalidTransactionId;
4041 }
4042 Assert(TransactionIdIsValid(xid));
4043 Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin));
4044
4045 /*
4046 * Find top level xid. Bail out if xid is too early to be a conflict, or
4047 * if it's our own xid.
4048 */
4049 if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
4050 return;
4051 xid = SubTransGetTopmostTransaction(xid);
4052 if (TransactionIdPrecedes(xid, TransactionXmin))
4053 return;
4054 if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
4055 return;
4056
4057 /*
4058 * Find sxact or summarized info for the top level xid.
4059 */
4060 sxidtag.xid = xid;
4061 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
4062 sxid = (SERIALIZABLEXID *)
4063 hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
4064 if (!sxid)
4065 {
4066 /*
4067 * Transaction not found in "normal" SSI structures. Check whether it
4068 * got pushed out to SLRU storage for "old committed" transactions.
4069 */
4070 SerCommitSeqNo conflictCommitSeqNo;
4071
4072 conflictCommitSeqNo = OldSerXidGetMinConflictCommitSeqNo(xid);
4073 if (conflictCommitSeqNo != 0)
4074 {
4075 if (conflictCommitSeqNo != InvalidSerCommitSeqNo
4076 && (!SxactIsReadOnly(MySerializableXact)
4077 || conflictCommitSeqNo
4078 <= MySerializableXact->SeqNo.lastCommitBeforeSnapshot))
4079 ereport(ERROR,
4080 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4081 errmsg("could not serialize access due to read/write dependencies among transactions"),
4082 errdetail_internal("Reason code: Canceled on conflict out to old pivot %u.", xid),
4083 errhint("The transaction might succeed if retried.")));
4084
4085 if (SxactHasSummaryConflictIn(MySerializableXact)
4086 || !SHMQueueEmpty(&MySerializableXact->inConflicts))
4087 ereport(ERROR,
4088 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4089 errmsg("could not serialize access due to read/write dependencies among transactions"),
4090 errdetail_internal("Reason code: Canceled on identification as a pivot, with conflict out to old committed transaction %u.", xid),
4091 errhint("The transaction might succeed if retried.")));
4092
4093 MySerializableXact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
4094 }
4095
4096 /* It's not serializable or otherwise not important. */
4097 LWLockRelease(SerializableXactHashLock);
4098 return;
4099 }
4100 sxact = sxid->myXact;
4101 Assert(TransactionIdEquals(sxact->topXid, xid));
4102 if (sxact == MySerializableXact || SxactIsDoomed(sxact))
4103 {
4104 /* Can't conflict with ourself or a transaction that will roll back. */
4105 LWLockRelease(SerializableXactHashLock);
4106 return;
4107 }
4108
4109 /*
4110 * We have a conflict out to a transaction which has a conflict out to a
4111 * summarized transaction. That summarized transaction must have
4112 * committed first, and we can't tell when it committed in relation to our
4113 * snapshot acquisition, so something needs to be canceled.
4114 */
4115 if (SxactHasSummaryConflictOut(sxact))
4116 {
4117 if (!SxactIsPrepared(sxact))
4118 {
4119 sxact->flags |= SXACT_FLAG_DOOMED;
4120 LWLockRelease(SerializableXactHashLock);
4121 return;
4122 }
4123 else
4124 {
4125 LWLockRelease(SerializableXactHashLock);
4126 ereport(ERROR,
4127 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4128 errmsg("could not serialize access due to read/write dependencies among transactions"),
4129 errdetail_internal("Reason code: Canceled on conflict out to old pivot."),
4130 errhint("The transaction might succeed if retried.")));
4131 }
4132 }
4133
4134 /*
4135 * If this is a read-only transaction and the writing transaction has
4136 * committed, and it doesn't have a rw-conflict to a transaction which
4137 * committed before it, no conflict.
4138 */
4139 if (SxactIsReadOnly(MySerializableXact)
4140 && SxactIsCommitted(sxact)
4141 && !SxactHasSummaryConflictOut(sxact)
4142 && (!SxactHasConflictOut(sxact)
4143 || MySerializableXact->SeqNo.lastCommitBeforeSnapshot < sxact->SeqNo.earliestOutConflictCommit))
4144 {
4145 /* Read-only transaction will appear to run first. No conflict. */
4146 LWLockRelease(SerializableXactHashLock);
4147 return;
4148 }
4149
4150 if (!XidIsConcurrent(xid))
4151 {
4152 /* This write was already in our snapshot; no conflict. */
4153 LWLockRelease(SerializableXactHashLock);
4154 return;
4155 }
4156
4157 if (RWConflictExists(MySerializableXact, sxact))
4158 {
4159 /* We don't want duplicate conflict records in the list. */
4160 LWLockRelease(SerializableXactHashLock);
4161 return;
4162 }
4163
4164 /*
4165 * Flag the conflict. But first, if this conflict creates a dangerous
4166 * structure, ereport an error.
4167 */
4168 FlagRWConflict(MySerializableXact, sxact);
4169 LWLockRelease(SerializableXactHashLock);
4170 }
4171
4172 /*
4173 * Check a particular target for rw-dependency conflict in. A subroutine of
4174 * CheckForSerializableConflictIn().
4175 */
4176 static void
CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG * targettag)4177 CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
4178 {
4179 uint32 targettaghash;
4180 LWLock *partitionLock;
4181 PREDICATELOCKTARGET *target;
4182 PREDICATELOCK *predlock;
4183 PREDICATELOCK *mypredlock = NULL;
4184 PREDICATELOCKTAG mypredlocktag;
4185
4186 Assert(MySerializableXact != InvalidSerializableXact);
4187
4188 /*
4189 * The same hash and LW lock apply to the lock target and the lock itself.
4190 */
4191 targettaghash = PredicateLockTargetTagHashCode(targettag);
4192 partitionLock = PredicateLockHashPartitionLock(targettaghash);
4193 LWLockAcquire(partitionLock, LW_SHARED);
4194 target = (PREDICATELOCKTARGET *)
4195 hash_search_with_hash_value(PredicateLockTargetHash,
4196 targettag, targettaghash,
4197 HASH_FIND, NULL);
4198 if (!target)
4199 {
4200 /* Nothing has this target locked; we're done here. */
4201 LWLockRelease(partitionLock);
4202 return;
4203 }
4204
4205 /*
4206 * Each lock for an overlapping transaction represents a conflict: a
4207 * rw-dependency in to this transaction.
4208 */
4209 predlock = (PREDICATELOCK *)
4210 SHMQueueNext(&(target->predicateLocks),
4211 &(target->predicateLocks),
4212 offsetof(PREDICATELOCK, targetLink));
4213 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
4214 while (predlock)
4215 {
4216 SHM_QUEUE *predlocktargetlink;
4217 PREDICATELOCK *nextpredlock;
4218 SERIALIZABLEXACT *sxact;
4219
4220 predlocktargetlink = &(predlock->targetLink);
4221 nextpredlock = (PREDICATELOCK *)
4222 SHMQueueNext(&(target->predicateLocks),
4223 predlocktargetlink,
4224 offsetof(PREDICATELOCK, targetLink));
4225
4226 sxact = predlock->tag.myXact;
4227 if (sxact == MySerializableXact)
4228 {
4229 /*
4230 * If we're getting a write lock on a tuple, we don't need a
4231 * predicate (SIREAD) lock on the same tuple. We can safely remove
4232 * our SIREAD lock, but we'll defer doing so until after the loop
4233 * because that requires upgrading to an exclusive partition lock.
4234 *
4235 * We can't use this optimization within a subtransaction because
4236 * the subtransaction could roll back, and we would be left
4237 * without any lock at the top level.
4238 */
4239 if (!IsSubTransaction()
4240 && GET_PREDICATELOCKTARGETTAG_OFFSET(*targettag))
4241 {
4242 mypredlock = predlock;
4243 mypredlocktag = predlock->tag;
4244 }
4245 }
4246 else if (!SxactIsDoomed(sxact)
4247 && (!SxactIsCommitted(sxact)
4248 || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
4249 sxact->finishedBefore))
4250 && !RWConflictExists(sxact, MySerializableXact))
4251 {
4252 LWLockRelease(SerializableXactHashLock);
4253 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
4254
4255 /*
4256 * Re-check after getting exclusive lock because the other
4257 * transaction may have flagged a conflict.
4258 */
4259 if (!SxactIsDoomed(sxact)
4260 && (!SxactIsCommitted(sxact)
4261 || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
4262 sxact->finishedBefore))
4263 && !RWConflictExists(sxact, MySerializableXact))
4264 {
4265 FlagRWConflict(sxact, MySerializableXact);
4266 }
4267
4268 LWLockRelease(SerializableXactHashLock);
4269 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
4270 }
4271
4272 predlock = nextpredlock;
4273 }
4274 LWLockRelease(SerializableXactHashLock);
4275 LWLockRelease(partitionLock);
4276
4277 /*
4278 * If we found one of our own SIREAD locks to remove, remove it now.
4279 *
4280 * At this point our transaction already has an ExclusiveRowLock on the
4281 * relation, so we are OK to drop the predicate lock on the tuple, if
4282 * found, without fearing that another write against the tuple will occur
4283 * before the MVCC information makes it to the buffer.
4284 */
4285 if (mypredlock != NULL)
4286 {
4287 uint32 predlockhashcode;
4288 PREDICATELOCK *rmpredlock;
4289
4290 LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
4291 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
4292 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
4293
4294 /*
4295 * Remove the predicate lock from shared memory, if it wasn't removed
4296 * while the locks were released. One way that could happen is from
4297 * autovacuum cleaning up an index.
4298 */
4299 predlockhashcode = PredicateLockHashCodeFromTargetHashCode
4300 (&mypredlocktag, targettaghash);
4301 rmpredlock = (PREDICATELOCK *)
4302 hash_search_with_hash_value(PredicateLockHash,
4303 &mypredlocktag,
4304 predlockhashcode,
4305 HASH_FIND, NULL);
4306 if (rmpredlock != NULL)
4307 {
4308 Assert(rmpredlock == mypredlock);
4309
4310 SHMQueueDelete(&(mypredlock->targetLink));
4311 SHMQueueDelete(&(mypredlock->xactLink));
4312
4313 rmpredlock = (PREDICATELOCK *)
4314 hash_search_with_hash_value(PredicateLockHash,
4315 &mypredlocktag,
4316 predlockhashcode,
4317 HASH_REMOVE, NULL);
4318 Assert(rmpredlock == mypredlock);
4319
4320 RemoveTargetIfNoLongerUsed(target, targettaghash);
4321 }
4322
4323 LWLockRelease(SerializableXactHashLock);
4324 LWLockRelease(partitionLock);
4325 LWLockRelease(SerializablePredicateLockListLock);
4326
4327 if (rmpredlock != NULL)
4328 {
4329 /*
4330 * Remove entry in local lock table if it exists. It's OK if it
4331 * doesn't exist; that means the lock was transferred to a new
4332 * target by a different backend.
4333 */
4334 hash_search_with_hash_value(LocalPredicateLockHash,
4335 targettag, targettaghash,
4336 HASH_REMOVE, NULL);
4337
4338 DecrementParentLocks(targettag);
4339 }
4340 }
4341 }
4342
4343 /*
4344 * CheckForSerializableConflictIn
4345 * We are writing the given tuple. If that indicates a rw-conflict
4346 * in from another serializable transaction, take appropriate action.
4347 *
4348 * Skip checking for any granularity for which a parameter is missing.
4349 *
4350 * A tuple update or delete is in conflict if we have a predicate lock
4351 * against the relation or page in which the tuple exists, or against the
4352 * tuple itself.
4353 */
4354 void
CheckForSerializableConflictIn(Relation relation,HeapTuple tuple,Buffer buffer)4355 CheckForSerializableConflictIn(Relation relation, HeapTuple tuple,
4356 Buffer buffer)
4357 {
4358 PREDICATELOCKTARGETTAG targettag;
4359
4360 if (!SerializationNeededForWrite(relation))
4361 return;
4362
4363 /* Check if someone else has already decided that we need to die */
4364 if (SxactIsDoomed(MySerializableXact))
4365 ereport(ERROR,
4366 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4367 errmsg("could not serialize access due to read/write dependencies among transactions"),
4368 errdetail_internal("Reason code: Canceled on identification as a pivot, during conflict in checking."),
4369 errhint("The transaction might succeed if retried.")));
4370
4371 /*
4372 * We're doing a write which might cause rw-conflicts now or later.
4373 * Memorize that fact.
4374 */
4375 MyXactDidWrite = true;
4376
4377 /*
4378 * It is important that we check for locks from the finest granularity to
4379 * the coarsest granularity, so that granularity promotion doesn't cause
4380 * us to miss a lock. The new (coarser) lock will be acquired before the
4381 * old (finer) locks are released.
4382 *
4383 * It is not possible to take and hold a lock across the checks for all
4384 * granularities because each target could be in a separate partition.
4385 */
4386 if (tuple != NULL)
4387 {
4388 SET_PREDICATELOCKTARGETTAG_TUPLE(targettag,
4389 relation->rd_node.dbNode,
4390 relation->rd_id,
4391 ItemPointerGetBlockNumber(&(tuple->t_self)),
4392 ItemPointerGetOffsetNumber(&(tuple->t_self)));
4393 CheckTargetForConflictsIn(&targettag);
4394 }
4395
4396 if (BufferIsValid(buffer))
4397 {
4398 SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
4399 relation->rd_node.dbNode,
4400 relation->rd_id,
4401 BufferGetBlockNumber(buffer));
4402 CheckTargetForConflictsIn(&targettag);
4403 }
4404
4405 SET_PREDICATELOCKTARGETTAG_RELATION(targettag,
4406 relation->rd_node.dbNode,
4407 relation->rd_id);
4408 CheckTargetForConflictsIn(&targettag);
4409 }
4410
4411 /*
4412 * CheckTableForSerializableConflictIn
4413 * The entire table is going through a DDL-style logical mass delete
4414 * like TRUNCATE or DROP TABLE. If that causes a rw-conflict in from
4415 * another serializable transaction, take appropriate action.
4416 *
4417 * While these operations do not operate entirely within the bounds of
4418 * snapshot isolation, they can occur inside a serializable transaction, and
4419 * will logically occur after any reads which saw rows which were destroyed
4420 * by these operations, so we do what we can to serialize properly under
4421 * SSI.
4422 *
4423 * The relation passed in must be a heap relation. Any predicate lock of any
4424 * granularity on the heap will cause a rw-conflict in to this transaction.
4425 * Predicate locks on indexes do not matter because they only exist to guard
4426 * against conflicting inserts into the index, and this is a mass *delete*.
4427 * When a table is truncated or dropped, the index will also be truncated
4428 * or dropped, and we'll deal with locks on the index when that happens.
4429 *
4430 * Dropping or truncating a table also needs to drop any existing predicate
4431 * locks on heap tuples or pages, because they're about to go away. This
4432 * should be done before altering the predicate locks because the transaction
4433 * could be rolled back because of a conflict, in which case the lock changes
4434 * are not needed. (At the moment, we don't actually bother to drop the
4435 * existing locks on a dropped or truncated table at the moment. That might
4436 * lead to some false positives, but it doesn't seem worth the trouble.)
4437 */
4438 void
CheckTableForSerializableConflictIn(Relation relation)4439 CheckTableForSerializableConflictIn(Relation relation)
4440 {
4441 HASH_SEQ_STATUS seqstat;
4442 PREDICATELOCKTARGET *target;
4443 Oid dbId;
4444 Oid heapId;
4445 int i;
4446
4447 /*
4448 * Bail out quickly if there are no serializable transactions running.
4449 * It's safe to check this without taking locks because the caller is
4450 * holding an ACCESS EXCLUSIVE lock on the relation. No new locks which
4451 * would matter here can be acquired while that is held.
4452 */
4453 if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
4454 return;
4455
4456 if (!SerializationNeededForWrite(relation))
4457 return;
4458
4459 /*
4460 * We're doing a write which might cause rw-conflicts now or later.
4461 * Memorize that fact.
4462 */
4463 MyXactDidWrite = true;
4464
4465 Assert(relation->rd_index == NULL); /* not an index relation */
4466
4467 dbId = relation->rd_node.dbNode;
4468 heapId = relation->rd_id;
4469
4470 LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
4471 for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
4472 LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_SHARED);
4473 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
4474
4475 /* Scan through target list */
4476 hash_seq_init(&seqstat, PredicateLockTargetHash);
4477
4478 while ((target = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
4479 {
4480 PREDICATELOCK *predlock;
4481
4482 /*
4483 * Check whether this is a target which needs attention.
4484 */
4485 if (GET_PREDICATELOCKTARGETTAG_RELATION(target->tag) != heapId)
4486 continue; /* wrong relation id */
4487 if (GET_PREDICATELOCKTARGETTAG_DB(target->tag) != dbId)
4488 continue; /* wrong database id */
4489
4490 /*
4491 * Loop through locks for this target and flag conflicts.
4492 */
4493 predlock = (PREDICATELOCK *)
4494 SHMQueueNext(&(target->predicateLocks),
4495 &(target->predicateLocks),
4496 offsetof(PREDICATELOCK, targetLink));
4497 while (predlock)
4498 {
4499 PREDICATELOCK *nextpredlock;
4500
4501 nextpredlock = (PREDICATELOCK *)
4502 SHMQueueNext(&(target->predicateLocks),
4503 &(predlock->targetLink),
4504 offsetof(PREDICATELOCK, targetLink));
4505
4506 if (predlock->tag.myXact != MySerializableXact
4507 && !RWConflictExists(predlock->tag.myXact, MySerializableXact))
4508 {
4509 FlagRWConflict(predlock->tag.myXact, MySerializableXact);
4510 }
4511
4512 predlock = nextpredlock;
4513 }
4514 }
4515
4516 /* Release locks in reverse order */
4517 LWLockRelease(SerializableXactHashLock);
4518 for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
4519 LWLockRelease(PredicateLockHashPartitionLockByIndex(i));
4520 LWLockRelease(SerializablePredicateLockListLock);
4521 }
4522
4523
4524 /*
4525 * Flag a rw-dependency between two serializable transactions.
4526 *
4527 * The caller is responsible for ensuring that we have a LW lock on
4528 * the transaction hash table.
4529 */
4530 static void
FlagRWConflict(SERIALIZABLEXACT * reader,SERIALIZABLEXACT * writer)4531 FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
4532 {
4533 Assert(reader != writer);
4534
4535 /* First, see if this conflict causes failure. */
4536 OnConflict_CheckForSerializationFailure(reader, writer);
4537
4538 /* Actually do the conflict flagging. */
4539 if (reader == OldCommittedSxact)
4540 writer->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
4541 else if (writer == OldCommittedSxact)
4542 reader->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
4543 else
4544 SetRWConflict(reader, writer);
4545 }
4546
4547 /*----------------------------------------------------------------------------
4548 * We are about to add a RW-edge to the dependency graph - check that we don't
4549 * introduce a dangerous structure by doing so, and abort one of the
4550 * transactions if so.
4551 *
4552 * A serialization failure can only occur if there is a dangerous structure
4553 * in the dependency graph:
4554 *
4555 * Tin ------> Tpivot ------> Tout
4556 * rw rw
4557 *
4558 * Furthermore, Tout must commit first.
4559 *
4560 * One more optimization is that if Tin is declared READ ONLY (or commits
4561 * without writing), we can only have a problem if Tout committed before Tin
4562 * acquired its snapshot.
4563 *----------------------------------------------------------------------------
4564 */
static void
OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
										SERIALIZABLEXACT *writer)
{
	bool		failure;
	RWConflict	conflict;

	Assert(LWLockHeldByMe(SerializableXactHashLock));

	failure = false;

	/*------------------------------------------------------------------------
	 * Check for already-committed writer with rw-conflict out flagged
	 * (conflict-flag on W means that T2 committed before W):
	 *
	 *		R ------> W ------> T2
	 *			rw		  rw
	 *
	 * That is a dangerous structure, so we must abort. (Since the writer
	 * has already committed, we must be the reader)
	 *------------------------------------------------------------------------
	 */
	if (SxactIsCommitted(writer)
		&& (SxactHasConflictOut(writer) || SxactHasSummaryConflictOut(writer)))
		failure = true;

	/*------------------------------------------------------------------------
	 * Check whether the writer has become a pivot with an out-conflict
	 * committed transaction (T2), and T2 committed first:
	 *
	 *		R ------> W ------> T2
	 *			rw		  rw
	 *
	 * Because T2 must've committed first, there is no anomaly if:
	 * - the reader committed before T2
	 * - the writer committed before T2
	 * - the reader is a READ ONLY transaction and the reader was concurrent
	 *	 with T2 (= reader acquired its snapshot before T2 committed)
	 *
	 * We also handle the case that T2 is prepared but not yet committed
	 * here. In that case T2 has already checked for conflicts, so if it
	 * commits first, making the above conflict real, it's too late for it
	 * to abort.
	 *------------------------------------------------------------------------
	 */
	if (!failure)
	{
		if (SxactHasSummaryConflictOut(writer))
		{
			/* Precise edges were summarized away; assume the worst. */
			failure = true;
			conflict = NULL;
		}
		else
			conflict = (RWConflict)
				SHMQueueNext(&writer->outConflicts,
							 &writer->outConflicts,
							 offsetof(RWConflictData, outLink));
		while (conflict)
		{
			SERIALIZABLEXACT *t2 = conflict->sxactIn;

			/*
			 * T2 is dangerous only if it is (at least) prepared and none of
			 * the three "no anomaly" conditions above holds.
			 */
			if (SxactIsPrepared(t2)
				&& (!SxactIsCommitted(reader)
					|| t2->prepareSeqNo <= reader->commitSeqNo)
				&& (!SxactIsCommitted(writer)
					|| t2->prepareSeqNo <= writer->commitSeqNo)
				&& (!SxactIsReadOnly(reader)
					|| t2->prepareSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot))
			{
				failure = true;
				break;
			}
			conflict = (RWConflict)
				SHMQueueNext(&writer->outConflicts,
							 &conflict->outLink,
							 offsetof(RWConflictData, outLink));
		}
	}

	/*------------------------------------------------------------------------
	 * Check whether the reader has become a pivot with a writer
	 * that's committed (or prepared):
	 *
	 *		T0 ------> R ------> W
	 *			 rw		   rw
	 *
	 * Because W must've committed first for an anomaly to occur, there is no
	 * anomaly if:
	 * - T0 committed before the writer
	 * - T0 is READ ONLY, and overlaps the writer
	 *------------------------------------------------------------------------
	 */
	if (!failure && SxactIsPrepared(writer) && !SxactIsReadOnly(reader))
	{
		if (SxactHasSummaryConflictIn(reader))
		{
			/* Precise edges were summarized away; assume the worst. */
			failure = true;
			conflict = NULL;
		}
		else
			conflict = (RWConflict)
				SHMQueueNext(&reader->inConflicts,
							 &reader->inConflicts,
							 offsetof(RWConflictData, inLink));
		while (conflict)
		{
			SERIALIZABLEXACT *t0 = conflict->sxactOut;

			/*
			 * A doomed T0 will never commit, so it cannot complete the
			 * dangerous structure; otherwise apply the two exceptions above.
			 */
			if (!SxactIsDoomed(t0)
				&& (!SxactIsCommitted(t0)
					|| t0->commitSeqNo >= writer->prepareSeqNo)
				&& (!SxactIsReadOnly(t0)
					|| t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo))
			{
				failure = true;
				break;
			}
			conflict = (RWConflict)
				SHMQueueNext(&reader->inConflicts,
							 &conflict->inLink,
							 offsetof(RWConflictData, inLink));
		}
	}

	if (failure)
	{
		/*
		 * We have to kill a transaction to avoid a possible anomaly from
		 * occurring. If the writer is us, we can just ereport() to cause a
		 * transaction abort. Otherwise we flag the writer for termination,
		 * causing it to abort when it tries to commit. However, if the writer
		 * is a prepared transaction, already prepared, we can't abort it
		 * anymore, so we have to kill the reader instead.
		 */
		if (MySerializableXact == writer)
		{
			/* Release the lock before longjmp'ing out via ereport. */
			LWLockRelease(SerializableXactHashLock);
			ereport(ERROR,
					(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
					 errmsg("could not serialize access due to read/write dependencies among transactions"),
					 errdetail_internal("Reason code: Canceled on identification as a pivot, during write."),
					 errhint("The transaction might succeed if retried.")));
		}
		else if (SxactIsPrepared(writer))
		{
			LWLockRelease(SerializableXactHashLock);

			/* if we're not the writer, we have to be the reader */
			Assert(MySerializableXact == reader);
			ereport(ERROR,
					(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
					 errmsg("could not serialize access due to read/write dependencies among transactions"),
					 errdetail_internal("Reason code: Canceled on conflict out to pivot %u, during read.", writer->topXid),
					 errhint("The transaction might succeed if retried.")));
		}
		writer->flags |= SXACT_FLAG_DOOMED;
	}
}
4723
4724 /*
 *		PreCommit_CheckForSerializationFailure
4726 * Check for dangerous structures in a serializable transaction
4727 * at commit.
4728 *
4729 * We're checking for a dangerous structure as each conflict is recorded.
4730 * The only way we could have a problem at commit is if this is the "out"
4731 * side of a pivot, and neither the "in" side nor the pivot has yet
4732 * committed.
4733 *
4734 * If a dangerous structure is found, the pivot (the near conflict) is
4735 * marked for death, because rolling back another transaction might mean
4736 * that we flail without ever making progress. This transaction is
4737 * committing writes, so letting it commit ensures progress. If we
4738 * canceled the far conflict, it might immediately fail again on retry.
4739 */
void
PreCommit_CheckForSerializationFailure(void)
{
	RWConflict	nearConflict;

	/* Nothing to check unless this is a serializable transaction. */
	if (MySerializableXact == InvalidSerializableXact)
		return;

	Assert(IsolationIsSerializable());

	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);

	/* Check if someone else has already decided that we need to die */
	if (SxactIsDoomed(MySerializableXact))
	{
		LWLockRelease(SerializableXactHashLock);
		ereport(ERROR,
				(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
				 errmsg("could not serialize access due to read/write dependencies among transactions"),
				 errdetail_internal("Reason code: Canceled on identification as a pivot, during commit attempt."),
				 errhint("The transaction might succeed if retried.")));
	}

	/*
	 * Walk our in-conflicts (transactions that read data we wrote).  Each
	 * such "near" transaction is a candidate pivot; its own in-conflicts
	 * are the "far" side of a possible dangerous structure.
	 */
	nearConflict = (RWConflict)
		SHMQueueNext(&MySerializableXact->inConflicts,
					 &MySerializableXact->inConflicts,
					 offsetof(RWConflictData, inLink));
	while (nearConflict)
	{
		/* A committed or doomed near side cannot become a problem pivot. */
		if (!SxactIsCommitted(nearConflict->sxactOut)
			&& !SxactIsDoomed(nearConflict->sxactOut))
		{
			RWConflict	farConflict;

			farConflict = (RWConflict)
				SHMQueueNext(&nearConflict->sxactOut->inConflicts,
							 &nearConflict->sxactOut->inConflicts,
							 offsetof(RWConflictData, inLink));
			while (farConflict)
			{
				/*
				 * Dangerous if the far side is us (a cycle through the
				 * pivot), or is some other live read-write transaction.
				 */
				if (farConflict->sxactOut == MySerializableXact
					|| (!SxactIsCommitted(farConflict->sxactOut)
						&& !SxactIsReadOnly(farConflict->sxactOut)
						&& !SxactIsDoomed(farConflict->sxactOut)))
				{
					/*
					 * Normally, we kill the pivot transaction to make sure we
					 * make progress if the failing transaction is retried.
					 * However, we can't kill it if it's already prepared, so
					 * in that case we commit suicide instead.
					 */
					if (SxactIsPrepared(nearConflict->sxactOut))
					{
						LWLockRelease(SerializableXactHashLock);
						ereport(ERROR,
								(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
								 errmsg("could not serialize access due to read/write dependencies among transactions"),
								 errdetail_internal("Reason code: Canceled on commit attempt with conflict in from prepared pivot."),
								 errhint("The transaction might succeed if retried.")));
					}
					nearConflict->sxactOut->flags |= SXACT_FLAG_DOOMED;
					break;
				}
				farConflict = (RWConflict)
					SHMQueueNext(&nearConflict->sxactOut->inConflicts,
								 &farConflict->inLink,
								 offsetof(RWConflictData, inLink));
			}
		}

		nearConflict = (RWConflict)
			SHMQueueNext(&MySerializableXact->inConflicts,
						 &nearConflict->inLink,
						 offsetof(RWConflictData, inLink));
	}

	/* Mark ourselves prepared at the next commit sequence number. */
	MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo);
	MySerializableXact->flags |= SXACT_FLAG_PREPARED;

	LWLockRelease(SerializableXactHashLock);
}
4821
4822 /*------------------------------------------------------------------------*/
4823
4824 /*
4825 * Two-phase commit support
4826 */
4827
4828 /*
 *		AtPrepare_PredicateLocks
4830 * Do the preparatory work for a PREPARE: make 2PC state file
4831 * records for all predicate locks currently held.
4832 */
4833 void
AtPrepare_PredicateLocks(void)4834 AtPrepare_PredicateLocks(void)
4835 {
4836 PREDICATELOCK *predlock;
4837 SERIALIZABLEXACT *sxact;
4838 TwoPhasePredicateRecord record;
4839 TwoPhasePredicateXactRecord *xactRecord;
4840 TwoPhasePredicateLockRecord *lockRecord;
4841
4842 sxact = MySerializableXact;
4843 xactRecord = &(record.data.xactRecord);
4844 lockRecord = &(record.data.lockRecord);
4845
4846 if (MySerializableXact == InvalidSerializableXact)
4847 return;
4848
4849 /* Generate an xact record for our SERIALIZABLEXACT */
4850 record.type = TWOPHASEPREDICATERECORD_XACT;
4851 xactRecord->xmin = MySerializableXact->xmin;
4852 xactRecord->flags = MySerializableXact->flags;
4853
4854 /*
4855 * Note that we don't include the list of conflicts in our out in the
4856 * statefile, because new conflicts can be added even after the
4857 * transaction prepares. We'll just make a conservative assumption during
4858 * recovery instead.
4859 */
4860
4861 RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
4862 &record, sizeof(record));
4863
4864 /*
4865 * Generate a lock record for each lock.
4866 *
4867 * To do this, we need to walk the predicate lock list in our sxact rather
4868 * than using the local predicate lock table because the latter is not
4869 * guaranteed to be accurate.
4870 */
4871 LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
4872
4873 predlock = (PREDICATELOCK *)
4874 SHMQueueNext(&(sxact->predicateLocks),
4875 &(sxact->predicateLocks),
4876 offsetof(PREDICATELOCK, xactLink));
4877
4878 while (predlock != NULL)
4879 {
4880 record.type = TWOPHASEPREDICATERECORD_LOCK;
4881 lockRecord->target = predlock->tag.myTarget->tag;
4882
4883 RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
4884 &record, sizeof(record));
4885
4886 predlock = (PREDICATELOCK *)
4887 SHMQueueNext(&(sxact->predicateLocks),
4888 &(predlock->xactLink),
4889 offsetof(PREDICATELOCK, xactLink));
4890 }
4891
4892 LWLockRelease(SerializablePredicateLockListLock);
4893 }
4894
4895 /*
 *		PostPrepare_PredicateLocks
4897 * Clean up after successful PREPARE. Unlike the non-predicate
4898 * lock manager, we do not need to transfer locks to a dummy
4899 * PGPROC because our SERIALIZABLEXACT will stay around
4900 * anyway. We only need to clean up our local state.
4901 */
4902 void
PostPrepare_PredicateLocks(TransactionId xid)4903 PostPrepare_PredicateLocks(TransactionId xid)
4904 {
4905 if (MySerializableXact == InvalidSerializableXact)
4906 return;
4907
4908 Assert(SxactIsPrepared(MySerializableXact));
4909
4910 MySerializableXact->pid = 0;
4911
4912 hash_destroy(LocalPredicateLockHash);
4913 LocalPredicateLockHash = NULL;
4914
4915 MySerializableXact = InvalidSerializableXact;
4916 MyXactDidWrite = false;
4917 }
4918
4919 /*
4920 * PredicateLockTwoPhaseFinish
4921 * Release a prepared transaction's predicate locks once it
4922 * commits or aborts.
4923 */
4924 void
PredicateLockTwoPhaseFinish(TransactionId xid,bool isCommit)4925 PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
4926 {
4927 SERIALIZABLEXID *sxid;
4928 SERIALIZABLEXIDTAG sxidtag;
4929
4930 sxidtag.xid = xid;
4931
4932 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
4933 sxid = (SERIALIZABLEXID *)
4934 hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
4935 LWLockRelease(SerializableXactHashLock);
4936
4937 /* xid will not be found if it wasn't a serializable transaction */
4938 if (sxid == NULL)
4939 return;
4940
4941 /* Release its locks */
4942 MySerializableXact = sxid->myXact;
4943 MyXactDidWrite = true; /* conservatively assume that we wrote
4944 * something */
4945 ReleasePredicateLocks(isCommit);
4946 }
4947
4948 /*
4949 * Re-acquire a predicate lock belonging to a transaction that was prepared.
4950 */
void
predicatelock_twophase_recover(TransactionId xid, uint16 info,
							   void *recdata, uint32 len)
{
	TwoPhasePredicateRecord *record;

	Assert(len == sizeof(TwoPhasePredicateRecord));

	record = (TwoPhasePredicateRecord *) recdata;

	Assert((record->type == TWOPHASEPREDICATERECORD_XACT) ||
		   (record->type == TWOPHASEPREDICATERECORD_LOCK));

	if (record->type == TWOPHASEPREDICATERECORD_XACT)
	{
		/* Per-transaction record. Set up a SERIALIZABLEXACT. */
		TwoPhasePredicateXactRecord *xactRecord;
		SERIALIZABLEXACT *sxact;
		SERIALIZABLEXID *sxid;
		SERIALIZABLEXIDTAG sxidtag;
		bool		found;

		xactRecord = (TwoPhasePredicateXactRecord *) &record->data.xactRecord;

		LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
		sxact = CreatePredXact();
		if (!sxact)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of shared memory")));

		/* vxid for a prepared xact is InvalidBackendId/xid; no pid */
		sxact->vxid.backendId = InvalidBackendId;
		sxact->vxid.localTransactionId = (LocalTransactionId) xid;
		sxact->pid = 0;

		/* a prepared xact hasn't committed yet */
		sxact->prepareSeqNo = RecoverySerCommitSeqNo;
		sxact->commitSeqNo = InvalidSerCommitSeqNo;
		sxact->finishedBefore = InvalidTransactionId;

		sxact->SeqNo.lastCommitBeforeSnapshot = RecoverySerCommitSeqNo;

		/*
		 * Don't need to track this; no transactions running at the time the
		 * recovered xact started are still active, except possibly other
		 * prepared xacts and we don't care whether those are RO_SAFE or not.
		 */
		SHMQueueInit(&(sxact->possibleUnsafeConflicts));

		SHMQueueInit(&(sxact->predicateLocks));
		SHMQueueElemInit(&(sxact->finishedLink));

		sxact->topXid = xid;
		sxact->xmin = xactRecord->xmin;
		sxact->flags = xactRecord->flags;
		Assert(SxactIsPrepared(sxact));
		if (!SxactIsReadOnly(sxact))
		{
			++(PredXact->WritableSxactCount);
			Assert(PredXact->WritableSxactCount <=
				   (MaxBackends + max_prepared_xacts));
		}

		/*
		 * We don't know whether the transaction had any conflicts or not, so
		 * we'll conservatively assume that it had both a conflict in and a
		 * conflict out, and represent that with the summary conflict flags.
		 */
		SHMQueueInit(&(sxact->outConflicts));
		SHMQueueInit(&(sxact->inConflicts));
		sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
		sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;

		/* Register the transaction's xid */
		sxidtag.xid = xid;
		sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
											   &sxidtag,
											   HASH_ENTER, &found);
		Assert(sxid != NULL);
		Assert(!found);
		sxid->myXact = (SERIALIZABLEXACT *) sxact;

		/*
		 * Update global xmin. Note that this is a special case compared to
		 * registering a normal transaction, because the global xmin might go
		 * backwards. That's OK, because until recovery is over we're not
		 * going to complete any transactions or create any non-prepared
		 * transactions, so there's no danger of having already thrown away
		 * state that this (older) xmin would have kept.
		 */
		if ((!TransactionIdIsValid(PredXact->SxactGlobalXmin)) ||
			(TransactionIdFollows(PredXact->SxactGlobalXmin, sxact->xmin)))
		{
			PredXact->SxactGlobalXmin = sxact->xmin;
			PredXact->SxactGlobalXminCount = 1;
			OldSerXidSetActiveSerXmin(sxact->xmin);
		}
		else if (TransactionIdEquals(sxact->xmin, PredXact->SxactGlobalXmin))
		{
			Assert(PredXact->SxactGlobalXminCount > 0);
			PredXact->SxactGlobalXminCount++;
		}

		LWLockRelease(SerializableXactHashLock);
	}
	else if (record->type == TWOPHASEPREDICATERECORD_LOCK)
	{
		/* Lock record. Recreate the PREDICATELOCK */
		TwoPhasePredicateLockRecord *lockRecord;
		SERIALIZABLEXID *sxid;
		SERIALIZABLEXACT *sxact;
		SERIALIZABLEXIDTAG sxidtag;
		uint32		targettaghash;

		lockRecord = (TwoPhasePredicateLockRecord *) &record->data.lockRecord;
		targettaghash = PredicateLockTargetTagHashCode(&lockRecord->target);

		/* The XACT record for this xid must already have been replayed. */
		LWLockAcquire(SerializableXactHashLock, LW_SHARED);
		sxidtag.xid = xid;
		sxid = (SERIALIZABLEXID *)
			hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
		LWLockRelease(SerializableXactHashLock);

		Assert(sxid != NULL);
		sxact = sxid->myXact;
		Assert(sxact != InvalidSerializableXact);

		CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
	}
}
5081