1 /*-------------------------------------------------------------------------
2  *
3  * predicate.c
4  *	  POSTGRES predicate locking
5  *	  to support full serializable transaction isolation
6  *
7  *
8  * The approach taken is to implement Serializable Snapshot Isolation (SSI)
9  * as initially described in this paper:
10  *
11  *	Michael J. Cahill, Uwe Röhm, and Alan D. Fekete. 2008.
12  *	Serializable isolation for snapshot databases.
13  *	In SIGMOD '08: Proceedings of the 2008 ACM SIGMOD
14  *	international conference on Management of data,
15  *	pages 729-738, New York, NY, USA. ACM.
16  *	http://doi.acm.org/10.1145/1376616.1376690
17  *
18  * and further elaborated in Cahill's doctoral thesis:
19  *
20  *	Michael James Cahill. 2009.
21  *	Serializable Isolation for Snapshot Databases.
22  *	Sydney Digital Theses.
23  *	University of Sydney, School of Information Technologies.
24  *	http://hdl.handle.net/2123/5353
25  *
26  *
27  * Predicate locks for Serializable Snapshot Isolation (SSI) are SIREAD
28  * locks, which are so different from normal locks that a distinct set of
29  * structures is required to handle them.  They are needed to detect
30  * rw-conflicts when the read happens before the write.  (When the write
31  * occurs first, the reading transaction can check for a conflict by
32  * examining the MVCC data.)
33  *
34  * (1)	Besides tuples actually read, they must cover ranges of tuples
35  *		which would have been read based on the predicate.  This will
36  *		require modelling the predicates through locks against database
37  *		objects such as pages, index ranges, or entire tables.
38  *
39  * (2)	They must be kept in RAM for quick access.  Because of this, it
40  *		isn't possible to always maintain tuple-level granularity -- when
41  *		the space allocated to store these approaches exhaustion, a
42  *		request for a lock may need to scan for situations where a single
43  *		transaction holds many fine-grained locks which can be coalesced
44  *		into a single coarser-grained lock.
45  *
46  * (3)	They never block anything; they are more like flags than locks
47  *		in that regard; although they refer to database objects and are
48  *		used to identify rw-conflicts with normal write locks.
49  *
50  * (4)	While they are associated with a transaction, they must survive
51  *		a successful COMMIT of that transaction, and remain until all
52  *		overlapping transactions complete.  This even means that they
53  *		must survive termination of the transaction's process.  If a
54  *		top level transaction is rolled back, however, it is immediately
55  *		flagged so that it can be ignored, and its SIREAD locks can be
56  *		released any time after that.
57  *
58  * (5)	The only transactions which create SIREAD locks or check for
59  *		conflicts with them are serializable transactions.
60  *
61  * (6)	When a write lock for a top level transaction is found to cover
62  *		an existing SIREAD lock for the same transaction, the SIREAD lock
63  *		can be deleted.
64  *
65  * (7)	A write from a serializable transaction must ensure that an xact
66  *		record exists for the transaction, with the same lifespan (until
 *		all concurrent transactions complete or the transaction is rolled
68  *		back) so that rw-dependencies to that transaction can be
69  *		detected.
70  *
71  * We use an optimization for read-only transactions. Under certain
72  * circumstances, a read-only transaction's snapshot can be shown to
73  * never have conflicts with other transactions.  This is referred to
74  * as a "safe" snapshot (and one known not to be is "unsafe").
75  * However, it can't be determined whether a snapshot is safe until
76  * all concurrent read/write transactions complete.
77  *
78  * Once a read-only transaction is known to have a safe snapshot, it
79  * can release its predicate locks and exempt itself from further
80  * predicate lock tracking. READ ONLY DEFERRABLE transactions run only
81  * on safe snapshots, waiting as necessary for one to be available.
82  *
83  *
84  * Lightweight locks to manage access to the predicate locking shared
85  * memory objects must be taken in this order, and should be released in
86  * reverse order:
87  *
88  *	SerializableFinishedListLock
89  *		- Protects the list of transactions which have completed but which
90  *			may yet matter because they overlap still-active transactions.
91  *
92  *	SerializablePredicateLockListLock
93  *		- Protects the linked list of locks held by a transaction.  Note
94  *			that the locks themselves are also covered by the partition
95  *			locks of their respective lock targets; this lock only affects
96  *			the linked list connecting the locks related to a transaction.
97  *		- All transactions share this single lock (with no partitioning).
98  *		- There is never a need for a process other than the one running
99  *			an active transaction to walk the list of locks held by that
100  *			transaction.
101  *		- It is relatively infrequent that another process needs to
102  *			modify the list for a transaction, but it does happen for such
103  *			things as index page splits for pages with predicate locks and
104  *			freeing of predicate locked pages by a vacuum process.  When
105  *			removing a lock in such cases, the lock itself contains the
106  *			pointers needed to remove it from the list.  When adding a
107  *			lock in such cases, the lock can be added using the anchor in
108  *			the transaction structure.  Neither requires walking the list.
109  *		- Cleaning up the list for a terminated transaction is sometimes
110  *			not done on a retail basis, in which case no lock is required.
111  *		- Due to the above, a process accessing its active transaction's
112  *			list always uses a shared lock, regardless of whether it is
113  *			walking or maintaining the list.  This improves concurrency
114  *			for the common access patterns.
115  *		- A process which needs to alter the list of a transaction other
116  *			than its own active transaction must acquire an exclusive
117  *			lock.
118  *
119  *	PredicateLockHashPartitionLock(hashcode)
120  *		- The same lock protects a target, all locks on that target, and
121  *			the linked list of locks on the target.
122  *		- When more than one is needed, acquire in ascending address order.
123  *		- When all are needed (rare), acquire in ascending index order with
124  *			PredicateLockHashPartitionLockByIndex(index).
125  *
126  *	SerializableXactHashLock
127  *		- Protects both PredXact and SerializableXidHash.
128  *
129  *
130  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
131  * Portions Copyright (c) 1994, Regents of the University of California
132  *
133  *
134  * IDENTIFICATION
135  *	  src/backend/storage/lmgr/predicate.c
136  *
137  *-------------------------------------------------------------------------
138  */
139 /*
140  * INTERFACE ROUTINES
141  *
142  * housekeeping for setting up shared memory predicate lock structures
143  *		InitPredicateLocks(void)
144  *		PredicateLockShmemSize(void)
145  *
146  * predicate lock reporting
147  *		GetPredicateLockStatusData(void)
148  *		PageIsPredicateLocked(Relation relation, BlockNumber blkno)
149  *
150  * predicate lock maintenance
151  *		GetSerializableTransactionSnapshot(Snapshot snapshot)
152  *		SetSerializableTransactionSnapshot(Snapshot snapshot,
153  *										   TransactionId sourcexid)
154  *		RegisterPredicateLockingXid(void)
155  *		PredicateLockRelation(Relation relation, Snapshot snapshot)
156  *		PredicateLockPage(Relation relation, BlockNumber blkno,
157  *						Snapshot snapshot)
158  *		PredicateLockTuple(Relation relation, HeapTuple tuple,
159  *						Snapshot snapshot)
160  *		PredicateLockPageSplit(Relation relation, BlockNumber oldblkno,
161  *							   BlockNumber newblkno)
162  *		PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
163  *								 BlockNumber newblkno)
164  *		TransferPredicateLocksToHeapRelation(Relation relation)
165  *		ReleasePredicateLocks(bool isCommit)
166  *
167  * conflict detection (may also trigger rollback)
168  *		CheckForSerializableConflictOut(bool visible, Relation relation,
169  *										HeapTupleData *tup, Buffer buffer,
170  *										Snapshot snapshot)
171  *		CheckForSerializableConflictIn(Relation relation, HeapTupleData *tup,
172  *									   Buffer buffer)
173  *		CheckTableForSerializableConflictIn(Relation relation)
174  *
175  * final rollback checking
176  *		PreCommit_CheckForSerializationFailure(void)
177  *
178  * two-phase commit support
179  *		AtPrepare_PredicateLocks(void);
180  *		PostPrepare_PredicateLocks(TransactionId xid);
181  *		PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit);
182  *		predicatelock_twophase_recover(TransactionId xid, uint16 info,
183  *									   void *recdata, uint32 len);
184  */
185 
186 #include "postgres.h"
187 
188 #include "access/htup_details.h"
189 #include "access/slru.h"
190 #include "access/subtrans.h"
191 #include "access/transam.h"
192 #include "access/twophase.h"
193 #include "access/twophase_rmgr.h"
194 #include "access/xact.h"
195 #include "access/xlog.h"
196 #include "miscadmin.h"
197 #include "storage/bufmgr.h"
198 #include "storage/predicate.h"
199 #include "storage/predicate_internals.h"
200 #include "storage/proc.h"
201 #include "storage/procarray.h"
202 #include "utils/rel.h"
203 #include "utils/snapmgr.h"
204 #include "utils/tqual.h"
205 
206 /* Uncomment the next line to test the graceful degradation code. */
207 /* #define TEST_OLDSERXID */
208 
209 /*
210  * Test the most selective fields first, for performance.
211  *
212  * a is covered by b if all of the following hold:
213  *	1) a.database = b.database
214  *	2) a.relation = b.relation
215  *	3) b.offset is invalid (b is page-granularity or higher)
216  *	4) either of the following:
217  *		4a) a.offset is valid (a is tuple-granularity) and a.page = b.page
218  *	 or 4b) a.offset is invalid and b.page is invalid (a is
 *			page-granularity and b is relation-granularity)
220  */
221 #define TargetTagIsCoveredBy(covered_target, covering_target)			\
222 	((GET_PREDICATELOCKTARGETTAG_RELATION(covered_target) == /* (2) */	\
223 	  GET_PREDICATELOCKTARGETTAG_RELATION(covering_target))				\
224 	 && (GET_PREDICATELOCKTARGETTAG_OFFSET(covering_target) ==			\
225 		 InvalidOffsetNumber)								 /* (3) */	\
226 	 && (((GET_PREDICATELOCKTARGETTAG_OFFSET(covered_target) !=			\
227 		   InvalidOffsetNumber)								 /* (4a) */ \
228 		  && (GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) ==		\
229 			  GET_PREDICATELOCKTARGETTAG_PAGE(covered_target)))			\
230 		 || ((GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) ==		\
231 			  InvalidBlockNumber)							 /* (4b) */ \
232 			 && (GET_PREDICATELOCKTARGETTAG_PAGE(covered_target)		\
233 				 != InvalidBlockNumber)))								\
234 	 && (GET_PREDICATELOCKTARGETTAG_DB(covered_target) ==	 /* (1) */	\
235 		 GET_PREDICATELOCKTARGETTAG_DB(covering_target)))
236 
237 /*
238  * The predicate locking target and lock shared hash tables are partitioned to
239  * reduce contention.  To determine which partition a given target belongs to,
240  * compute the tag's hash code with PredicateLockTargetTagHashCode(), then
241  * apply one of these macros.
242  * NB: NUM_PREDICATELOCK_PARTITIONS must be a power of 2!
243  */
244 #define PredicateLockHashPartition(hashcode) \
245 	((hashcode) % NUM_PREDICATELOCK_PARTITIONS)
246 #define PredicateLockHashPartitionLock(hashcode) \
247 	(&MainLWLockArray[PREDICATELOCK_MANAGER_LWLOCK_OFFSET + \
248 		PredicateLockHashPartition(hashcode)].lock)
249 #define PredicateLockHashPartitionLockByIndex(i) \
250 	(&MainLWLockArray[PREDICATELOCK_MANAGER_LWLOCK_OFFSET + (i)].lock)
251 
252 #define NPREDICATELOCKTARGETENTS() \
253 	mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
254 
255 #define SxactIsOnFinishedList(sxact) (!SHMQueueIsDetached(&((sxact)->finishedLink)))
256 
257 /*
258  * Note that a sxact is marked "prepared" once it has passed
259  * PreCommit_CheckForSerializationFailure, even if it isn't using
260  * 2PC. This is the point at which it can no longer be aborted.
261  *
262  * The PREPARED flag remains set after commit, so SxactIsCommitted
263  * implies SxactIsPrepared.
264  */
265 #define SxactIsCommitted(sxact) (((sxact)->flags & SXACT_FLAG_COMMITTED) != 0)
266 #define SxactIsPrepared(sxact) (((sxact)->flags & SXACT_FLAG_PREPARED) != 0)
267 #define SxactIsRolledBack(sxact) (((sxact)->flags & SXACT_FLAG_ROLLED_BACK) != 0)
268 #define SxactIsDoomed(sxact) (((sxact)->flags & SXACT_FLAG_DOOMED) != 0)
269 #define SxactIsReadOnly(sxact) (((sxact)->flags & SXACT_FLAG_READ_ONLY) != 0)
270 #define SxactHasSummaryConflictIn(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_IN) != 0)
271 #define SxactHasSummaryConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_OUT) != 0)
272 /*
273  * The following macro actually means that the specified transaction has a
274  * conflict out *to a transaction which committed ahead of it*.  It's hard
275  * to get that into a name of a reasonable length.
276  */
277 #define SxactHasConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_CONFLICT_OUT) != 0)
278 #define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0)
279 #define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
280 #define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
281 
282 /*
283  * Compute the hash code associated with a PREDICATELOCKTARGETTAG.
284  *
285  * To avoid unnecessary recomputations of the hash code, we try to do this
286  * just once per function, and then pass it around as needed.  Aside from
287  * passing the hashcode to hash_search_with_hash_value(), we can extract
288  * the lock partition number from the hashcode.
289  */
290 #define PredicateLockTargetTagHashCode(predicatelocktargettag) \
291 	get_hash_value(PredicateLockTargetHash, predicatelocktargettag)
292 
293 /*
294  * Given a predicate lock tag, and the hash for its target,
295  * compute the lock hash.
296  *
297  * To make the hash code also depend on the transaction, we xor the sxid
298  * struct's address into the hash code, left-shifted so that the
299  * partition-number bits don't change.  Since this is only a hash, we
300  * don't care if we lose high-order bits of the address; use an
301  * intermediate variable to suppress cast-pointer-to-int warnings.
302  */
303 #define PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash) \
304 	((targethash) ^ ((uint32) PointerGetDatum((predicatelocktag)->myXact)) \
305 	 << LOG2_NUM_PREDICATELOCK_PARTITIONS)
306 
307 
308 /*
309  * The SLRU buffer area through which we access the old xids.
310  */
311 static SlruCtlData OldSerXidSlruCtlData;
312 
313 #define OldSerXidSlruCtl			(&OldSerXidSlruCtlData)
314 
315 #define OLDSERXID_PAGESIZE			BLCKSZ
316 #define OLDSERXID_ENTRYSIZE			sizeof(SerCommitSeqNo)
317 #define OLDSERXID_ENTRIESPERPAGE	(OLDSERXID_PAGESIZE / OLDSERXID_ENTRYSIZE)
318 
319 /*
320  * Set maximum pages based on the lesser of the number needed to track all
321  * transactions and the maximum that SLRU supports.
322  */
323 #define OLDSERXID_MAX_PAGE			Min(SLRU_PAGES_PER_SEGMENT * 0x10000 - 1, \
324 										(MaxTransactionId) / OLDSERXID_ENTRIESPERPAGE)
325 
326 #define OldSerXidNextPage(page) (((page) >= OLDSERXID_MAX_PAGE) ? 0 : (page) + 1)
327 
328 #define OldSerXidValue(slotno, xid) (*((SerCommitSeqNo *) \
329 	(OldSerXidSlruCtl->shared->page_buffer[slotno] + \
330 	((((uint32) (xid)) % OLDSERXID_ENTRIESPERPAGE) * OLDSERXID_ENTRYSIZE))))
331 
332 #define OldSerXidPage(xid)	((((uint32) (xid)) / OLDSERXID_ENTRIESPERPAGE) % (OLDSERXID_MAX_PAGE + 1))
333 #define OldSerXidSegment(page)	((page) / SLRU_PAGES_PER_SEGMENT)
334 
/*
 * Control information for the OldSerXid SLRU, kept in shared memory.
 * Tracks the window of xids (tailXid..headXid) whose conflict commit
 * sequence numbers are retained on disk.
 */
typedef struct OldSerXidControlData
{
	int			headPage;		/* newest initialized page */
	TransactionId headXid;		/* newest valid Xid in the SLRU */
	TransactionId tailXid;		/* oldest xmin we might be interested in */
	bool		warningIssued;	/* have we issued SLRU wrap-around warning? */
}	OldSerXidControlData;

typedef struct OldSerXidControlData *OldSerXidControl;
344 
345 static OldSerXidControl oldSerXidControl;
346 
347 /*
348  * When the oldest committed transaction on the "finished" list is moved to
349  * SLRU, its predicate locks will be moved to this "dummy" transaction,
350  * collapsing duplicate targets.  When a duplicate is found, the later
351  * commitSeqNo is used.
352  */
353 static SERIALIZABLEXACT *OldCommittedSxact;
354 
355 
356 /* This configuration variable is used to set the predicate lock table size */
357 int			max_predicate_locks_per_xact;		/* set by guc.c */
358 
359 /*
360  * This provides a list of objects in order to track transactions
361  * participating in predicate locking.  Entries in the list are fixed size,
362  * and reside in shared memory.  The memory address of an entry must remain
363  * fixed during its lifetime.  The list will be protected from concurrent
364  * update externally; no provision is made in this code to manage that.  The
365  * number of entries in the list, and the size allowed for each entry is
366  * fixed upon creation.
367  */
368 static PredXactList PredXact;
369 
370 /*
371  * This provides a pool of RWConflict data elements to use in conflict lists
372  * between transactions.
373  */
374 static RWConflictPoolHeader RWConflictPool;
375 
376 /*
377  * The predicate locking hash tables are in shared memory.
378  * Each backend keeps pointers to them.
379  */
380 static HTAB *SerializableXidHash;
381 static HTAB *PredicateLockTargetHash;
382 static HTAB *PredicateLockHash;
383 static SHM_QUEUE *FinishedSerializableTransactions;
384 
385 /*
386  * Tag for a dummy entry in PredicateLockTargetHash. By temporarily removing
387  * this entry, you can ensure that there's enough scratch space available for
388  * inserting one entry in the hash table. This is an otherwise-invalid tag.
389  */
390 static const PREDICATELOCKTARGETTAG ScratchTargetTag = {0, 0, 0, 0};
391 static uint32 ScratchTargetTagHash;
392 static LWLock *ScratchPartitionLock;
393 
394 /*
395  * The local hash table used to determine when to combine multiple fine-
 * grained locks into a single coarser-grained lock.
397  */
398 static HTAB *LocalPredicateLockHash = NULL;
399 
400 /*
401  * Keep a pointer to the currently-running serializable transaction (if any)
402  * for quick reference. Also, remember if we have written anything that could
403  * cause a rw-conflict.
404  */
405 static SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact;
406 static bool MyXactDidWrite = false;
407 
408 /* local functions */
409 
410 static SERIALIZABLEXACT *CreatePredXact(void);
411 static void ReleasePredXact(SERIALIZABLEXACT *sxact);
412 static SERIALIZABLEXACT *FirstPredXact(void);
413 static SERIALIZABLEXACT *NextPredXact(SERIALIZABLEXACT *sxact);
414 
415 static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer);
416 static void SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
417 static void SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, SERIALIZABLEXACT *activeXact);
418 static void ReleaseRWConflict(RWConflict conflict);
419 static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact);
420 
421 static bool OldSerXidPagePrecedesLogically(int page1, int page2);
422 static void OldSerXidInit(void);
423 static void OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo);
424 static SerCommitSeqNo OldSerXidGetMinConflictCommitSeqNo(TransactionId xid);
425 static void OldSerXidSetActiveSerXmin(TransactionId xid);
426 
427 static uint32 predicatelock_hash(const void *key, Size keysize);
428 static void SummarizeOldestCommittedSxact(void);
429 static Snapshot GetSafeSnapshot(Snapshot snapshot);
430 static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot,
431 									  TransactionId sourcexid);
432 static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
433 static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
434 						  PREDICATELOCKTARGETTAG *parent);
435 static bool CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag);
436 static void RemoveScratchTarget(bool lockheld);
437 static void RestoreScratchTarget(bool lockheld);
438 static void RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target,
439 						   uint32 targettaghash);
440 static void DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag);
441 static int	PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag);
442 static bool CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag);
443 static void DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag);
444 static void CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
445 					uint32 targettaghash,
446 					SERIALIZABLEXACT *sxact);
447 static void DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash);
448 static bool TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
449 								  PREDICATELOCKTARGETTAG newtargettag,
450 								  bool removeOld);
451 static void PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag);
452 static void DropAllPredicateLocksFromTable(Relation relation,
453 							   bool transfer);
454 static void SetNewSxactGlobalXmin(void);
455 static void ClearOldPredicateLocks(void);
456 static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
457 						   bool summarize);
458 static bool XidIsConcurrent(TransactionId xid);
459 static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag);
460 static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
461 static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
462 										SERIALIZABLEXACT *writer);
463 
464 
465 /*------------------------------------------------------------------------*/
466 
467 /*
468  * Does this relation participate in predicate locking? Temporary and system
469  * relations are exempt, as are materialized views.
470  */
471 static inline bool
PredicateLockingNeededForRelation(Relation relation)472 PredicateLockingNeededForRelation(Relation relation)
473 {
474 	return !(relation->rd_id < FirstBootstrapObjectId ||
475 			 RelationUsesLocalBuffers(relation) ||
476 			 relation->rd_rel->relkind == RELKIND_MATVIEW);
477 }
478 
479 /*
480  * When a public interface method is called for a read, this is the test to
481  * see if we should do a quick return.
482  *
483  * Note: this function has side-effects! If this transaction has been flagged
484  * as RO-safe since the last call, we release all predicate locks and reset
485  * MySerializableXact. That makes subsequent calls to return quickly.
486  *
 * This is marked as 'inline' to eliminate the function call overhead
488  * in the common case that serialization is not needed.
489  */
490 static inline bool
SerializationNeededForRead(Relation relation,Snapshot snapshot)491 SerializationNeededForRead(Relation relation, Snapshot snapshot)
492 {
493 	/* Nothing to do if this is not a serializable transaction */
494 	if (MySerializableXact == InvalidSerializableXact)
495 		return false;
496 
497 	/*
498 	 * Don't acquire locks or conflict when scanning with a special snapshot.
499 	 * This excludes things like CLUSTER and REINDEX. They use the wholesale
500 	 * functions TransferPredicateLocksToHeapRelation() and
501 	 * CheckTableForSerializableConflictIn() to participate in serialization,
502 	 * but the scans involved don't need serialization.
503 	 */
504 	if (!IsMVCCSnapshot(snapshot))
505 		return false;
506 
507 	/*
508 	 * Check if we have just become "RO-safe". If we have, immediately release
509 	 * all locks as they're not needed anymore. This also resets
510 	 * MySerializableXact, so that subsequent calls to this function can exit
511 	 * quickly.
512 	 *
513 	 * A transaction is flagged as RO_SAFE if all concurrent R/W transactions
514 	 * commit without having conflicts out to an earlier snapshot, thus
515 	 * ensuring that no conflicts are possible for this transaction.
516 	 */
517 	if (SxactIsROSafe(MySerializableXact))
518 	{
519 		ReleasePredicateLocks(false);
520 		return false;
521 	}
522 
523 	/* Check if the relation doesn't participate in predicate locking */
524 	if (!PredicateLockingNeededForRelation(relation))
525 		return false;
526 
527 	return true;				/* no excuse to skip predicate locking */
528 }
529 
530 /*
531  * Like SerializationNeededForRead(), but called on writes.
532  * The logic is the same, but there is no snapshot and we can't be RO-safe.
533  */
534 static inline bool
SerializationNeededForWrite(Relation relation)535 SerializationNeededForWrite(Relation relation)
536 {
537 	/* Nothing to do if this is not a serializable transaction */
538 	if (MySerializableXact == InvalidSerializableXact)
539 		return false;
540 
541 	/* Check if the relation doesn't participate in predicate locking */
542 	if (!PredicateLockingNeededForRelation(relation))
543 		return false;
544 
545 	return true;				/* no excuse to skip predicate locking */
546 }
547 
548 
549 /*------------------------------------------------------------------------*/
550 
551 /*
552  * These functions are a simple implementation of a list for this specific
553  * type of struct.  If there is ever a generalized shared memory list, we
554  * should probably switch to that.
555  */
556 static SERIALIZABLEXACT *
CreatePredXact(void)557 CreatePredXact(void)
558 {
559 	PredXactListElement ptle;
560 
561 	ptle = (PredXactListElement)
562 		SHMQueueNext(&PredXact->availableList,
563 					 &PredXact->availableList,
564 					 offsetof(PredXactListElementData, link));
565 	if (!ptle)
566 		return NULL;
567 
568 	SHMQueueDelete(&ptle->link);
569 	SHMQueueInsertBefore(&PredXact->activeList, &ptle->link);
570 	return &ptle->sxact;
571 }
572 
573 static void
ReleasePredXact(SERIALIZABLEXACT * sxact)574 ReleasePredXact(SERIALIZABLEXACT *sxact)
575 {
576 	PredXactListElement ptle;
577 
578 	Assert(ShmemAddrIsValid(sxact));
579 
580 	ptle = (PredXactListElement)
581 		(((char *) sxact)
582 		 - offsetof(PredXactListElementData, sxact)
583 		 + offsetof(PredXactListElementData, link));
584 	SHMQueueDelete(&ptle->link);
585 	SHMQueueInsertBefore(&PredXact->availableList, &ptle->link);
586 }
587 
588 static SERIALIZABLEXACT *
FirstPredXact(void)589 FirstPredXact(void)
590 {
591 	PredXactListElement ptle;
592 
593 	ptle = (PredXactListElement)
594 		SHMQueueNext(&PredXact->activeList,
595 					 &PredXact->activeList,
596 					 offsetof(PredXactListElementData, link));
597 	if (!ptle)
598 		return NULL;
599 
600 	return &ptle->sxact;
601 }
602 
603 static SERIALIZABLEXACT *
NextPredXact(SERIALIZABLEXACT * sxact)604 NextPredXact(SERIALIZABLEXACT *sxact)
605 {
606 	PredXactListElement ptle;
607 
608 	Assert(ShmemAddrIsValid(sxact));
609 
610 	ptle = (PredXactListElement)
611 		(((char *) sxact)
612 		 - offsetof(PredXactListElementData, sxact)
613 		 + offsetof(PredXactListElementData, link));
614 	ptle = (PredXactListElement)
615 		SHMQueueNext(&PredXact->activeList,
616 					 &ptle->link,
617 					 offsetof(PredXactListElementData, link));
618 	if (!ptle)
619 		return NULL;
620 
621 	return &ptle->sxact;
622 }
623 
624 /*------------------------------------------------------------------------*/
625 
626 /*
627  * These functions manage primitive access to the RWConflict pool and lists.
628  */
629 static bool
RWConflictExists(const SERIALIZABLEXACT * reader,const SERIALIZABLEXACT * writer)630 RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
631 {
632 	RWConflict	conflict;
633 
634 	Assert(reader != writer);
635 
636 	/* Check the ends of the purported conflict first. */
637 	if (SxactIsDoomed(reader)
638 		|| SxactIsDoomed(writer)
639 		|| SHMQueueEmpty(&reader->outConflicts)
640 		|| SHMQueueEmpty(&writer->inConflicts))
641 		return false;
642 
643 	/* A conflict is possible; walk the list to find out. */
644 	conflict = (RWConflict)
645 		SHMQueueNext(&reader->outConflicts,
646 					 &reader->outConflicts,
647 					 offsetof(RWConflictData, outLink));
648 	while (conflict)
649 	{
650 		if (conflict->sxactIn == writer)
651 			return true;
652 		conflict = (RWConflict)
653 			SHMQueueNext(&reader->outConflicts,
654 						 &conflict->outLink,
655 						 offsetof(RWConflictData, outLink));
656 	}
657 
658 	/* No conflict found. */
659 	return false;
660 }
661 
662 static void
SetRWConflict(SERIALIZABLEXACT * reader,SERIALIZABLEXACT * writer)663 SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
664 {
665 	RWConflict	conflict;
666 
667 	Assert(reader != writer);
668 	Assert(!RWConflictExists(reader, writer));
669 
670 	conflict = (RWConflict)
671 		SHMQueueNext(&RWConflictPool->availableList,
672 					 &RWConflictPool->availableList,
673 					 offsetof(RWConflictData, outLink));
674 	if (!conflict)
675 		ereport(ERROR,
676 				(errcode(ERRCODE_OUT_OF_MEMORY),
677 				 errmsg("not enough elements in RWConflictPool to record a read/write conflict"),
678 				 errhint("You might need to run fewer transactions at a time or increase max_connections.")));
679 
680 	SHMQueueDelete(&conflict->outLink);
681 
682 	conflict->sxactOut = reader;
683 	conflict->sxactIn = writer;
684 	SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
685 	SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink);
686 }
687 
688 static void
SetPossibleUnsafeConflict(SERIALIZABLEXACT * roXact,SERIALIZABLEXACT * activeXact)689 SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact,
690 						  SERIALIZABLEXACT *activeXact)
691 {
692 	RWConflict	conflict;
693 
694 	Assert(roXact != activeXact);
695 	Assert(SxactIsReadOnly(roXact));
696 	Assert(!SxactIsReadOnly(activeXact));
697 
698 	conflict = (RWConflict)
699 		SHMQueueNext(&RWConflictPool->availableList,
700 					 &RWConflictPool->availableList,
701 					 offsetof(RWConflictData, outLink));
702 	if (!conflict)
703 		ereport(ERROR,
704 				(errcode(ERRCODE_OUT_OF_MEMORY),
705 				 errmsg("not enough elements in RWConflictPool to record a potential read/write conflict"),
706 				 errhint("You might need to run fewer transactions at a time or increase max_connections.")));
707 
708 	SHMQueueDelete(&conflict->outLink);
709 
710 	conflict->sxactOut = activeXact;
711 	conflict->sxactIn = roXact;
712 	SHMQueueInsertBefore(&activeXact->possibleUnsafeConflicts,
713 						 &conflict->outLink);
714 	SHMQueueInsertBefore(&roXact->possibleUnsafeConflicts,
715 						 &conflict->inLink);
716 }
717 
static void
ReleaseRWConflict(RWConflict conflict)
{
	/* Unhook the conflict from both transactions' lists... */
	SHMQueueDelete(&conflict->inLink);
	SHMQueueDelete(&conflict->outLink);
	/* ...and return it to the pool's available list for reuse. */
	SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink);
}
725 
726 static void
FlagSxactUnsafe(SERIALIZABLEXACT * sxact)727 FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
728 {
729 	RWConflict	conflict,
730 				nextConflict;
731 
732 	Assert(SxactIsReadOnly(sxact));
733 	Assert(!SxactIsROSafe(sxact));
734 
735 	sxact->flags |= SXACT_FLAG_RO_UNSAFE;
736 
737 	/*
738 	 * We know this isn't a safe snapshot, so we can stop looking for other
739 	 * potential conflicts.
740 	 */
741 	conflict = (RWConflict)
742 		SHMQueueNext(&sxact->possibleUnsafeConflicts,
743 					 &sxact->possibleUnsafeConflicts,
744 					 offsetof(RWConflictData, inLink));
745 	while (conflict)
746 	{
747 		nextConflict = (RWConflict)
748 			SHMQueueNext(&sxact->possibleUnsafeConflicts,
749 						 &conflict->inLink,
750 						 offsetof(RWConflictData, inLink));
751 
752 		Assert(!SxactIsReadOnly(conflict->sxactOut));
753 		Assert(sxact == conflict->sxactIn);
754 
755 		ReleaseRWConflict(conflict);
756 
757 		conflict = nextConflict;
758 	}
759 }
760 
761 /*------------------------------------------------------------------------*/
762 
763 /*
764  * Decide whether an OldSerXid page number is "older" for truncation purposes.
765  * Analogous to CLOGPagePrecedes().
766  */
767 static bool
OldSerXidPagePrecedesLogically(int page1,int page2)768 OldSerXidPagePrecedesLogically(int page1, int page2)
769 {
770 	TransactionId xid1;
771 	TransactionId xid2;
772 
773 	xid1 = ((TransactionId) page1) * OLDSERXID_ENTRIESPERPAGE;
774 	xid1 += FirstNormalTransactionId + 1;
775 	xid2 = ((TransactionId) page2) * OLDSERXID_ENTRIESPERPAGE;
776 	xid2 += FirstNormalTransactionId + 1;
777 
778 	return (TransactionIdPrecedes(xid1, xid2) &&
779 			TransactionIdPrecedes(xid1, xid2 + OLDSERXID_ENTRIESPERPAGE - 1));
780 }
781 
#ifdef USE_ASSERT_CHECKING
/*
 * Self-tests for OldSerXidPagePrecedesLogically(), exercising the two
 * wrap-around scenarios that matter for SLRU truncation.  Called from
 * OldSerXidInit() in assert-enabled builds only; any regression trips an
 * Assert immediately.
 */
static void
OldSerXidPagePrecedesLogicallyUnitTests(void)
{
	int			per_page = OLDSERXID_ENTRIESPERPAGE,
				offset = per_page / 2;
	int			newestPage,
				oldestPage,
				headPage,
				targetPage;
	TransactionId newestXact,
				oldestXact;

	/* GetNewTransactionId() has assigned the last XID it can safely use. */
	newestPage = 2 * SLRU_PAGES_PER_SEGMENT - 1;	/* nothing special */
	newestXact = newestPage * per_page + offset;
	Assert(newestXact / per_page == newestPage);
	oldestXact = newestXact + 1;
	/* oldestXact is ~2^31 XIDs behind newestXact, i.e. maximally old. */
	oldestXact -= 1U << 31;
	oldestPage = oldestXact / per_page;

	/*
	 * In this scenario, the SLRU headPage pertains to the last ~1000 XIDs
	 * assigned.  oldestXact finishes, ~2B XIDs having elapsed since it
	 * started.  Further transactions cause us to summarize oldestXact to
	 * tailPage.  Function must return false so OldSerXidAdd() doesn't zero
	 * tailPage (which may contain entries for other old, recently-finished
	 * XIDs) and half the SLRU.  Reaching this requires burning ~2B XIDs in
	 * single-user mode, a negligible possibility.
	 */
	headPage = newestPage;
	targetPage = oldestPage;
	Assert(!OldSerXidPagePrecedesLogically(headPage, targetPage));

	/*
	 * In this scenario, the SLRU headPage pertains to oldestXact.  We're
	 * summarizing an XID near newestXact.  (Assume few other XIDs used
	 * SERIALIZABLE, hence the minimal headPage advancement.  Assume
	 * oldestXact was long-running and only recently reached the SLRU.)
	 * Function must return true to make OldSerXidAdd() create targetPage.
	 *
	 * Today's implementation mishandles this case, but it doesn't matter
	 * enough to fix.  Verify that the defect affects just one page by
	 * asserting correct treatment of its prior page.  Reaching this case
	 * requires burning ~2B XIDs in single-user mode, a negligible
	 * possibility.  Moreover, if it does happen, the consequence would be
	 * mild, namely a new transaction failing in SimpleLruReadPage().
	 */
	headPage = oldestPage;
	targetPage = newestPage;
	Assert(OldSerXidPagePrecedesLogically(headPage, targetPage - 1));
#if 0
	Assert(OldSerXidPagePrecedesLogically(headPage, targetPage));
#endif
}
#endif
838 
/*
 * Initialize for the tracking of old serializable committed xids.
 */
static void
OldSerXidInit(void)
{
	bool		found;

	/*
	 * Set up SLRU management of the pg_serial data.
	 */
	OldSerXidSlruCtl->PagePrecedes = OldSerXidPagePrecedesLogically;
	SimpleLruInit(OldSerXidSlruCtl, "oldserxid",
				  NUM_OLDSERXID_BUFFERS, 0, OldSerXidLock, "pg_serial",
				  LWTRANCHE_OLDSERXID_BUFFERS);
	/* Override default assumption that writes should be fsync'd */
	OldSerXidSlruCtl->do_fsync = false;
	/* Sanity-check the page-ordering callback installed just above. */
#ifdef USE_ASSERT_CHECKING
	OldSerXidPagePrecedesLogicallyUnitTests();
#endif
	SlruPagePrecedesUnitTests(OldSerXidSlruCtl, OLDSERXID_ENTRIESPERPAGE);

	/*
	 * Create or attach to the OldSerXidControl structure.
	 */
	oldSerXidControl = (OldSerXidControl)
		ShmemInitStruct("OldSerXidControlData", sizeof(OldSerXidControlData), &found);

	/* The postmaster creates the struct; child backends merely attach. */
	Assert(found == IsUnderPostmaster);
	if (!found)
	{
		/*
		 * Set control information to reflect empty SLRU.
		 */
		oldSerXidControl->headPage = -1;
		oldSerXidControl->headXid = InvalidTransactionId;
		oldSerXidControl->tailXid = InvalidTransactionId;
		oldSerXidControl->warningIssued = false;
	}
}
879 
/*
 * Record a committed read write serializable xid and the minimum
 * commitSeqNo of any transactions to which this xid had a rw-conflict out.
 * An invalid seqNo means that there were no conflicts out from xid.
 */
static void
OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
{
	TransactionId tailXid;
	int			targetPage;		/* SLRU page that must hold xid's entry */
	int			slotno;
	int			firstZeroPage;	/* first page needing zero-initialization */
	bool		isNewPage;		/* does targetPage lie beyond headPage? */

	Assert(TransactionIdIsValid(xid));

	targetPage = OldSerXidPage(xid);

	LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);

	/*
	 * If no serializable transactions are active, there shouldn't be anything
	 * to push out to the SLRU.  Hitting this assert would mean there's
	 * something wrong with the earlier cleanup logic.
	 */
	tailXid = oldSerXidControl->tailXid;
	Assert(TransactionIdIsValid(tailXid));

	/*
	 * If the SLRU is currently unused, zero out the whole active region from
	 * tailXid to headXid before taking it into use. Otherwise zero out only
	 * any new pages that enter the tailXid-headXid range as we advance
	 * headXid.
	 */
	if (oldSerXidControl->headPage < 0)
	{
		firstZeroPage = OldSerXidPage(tailXid);
		isNewPage = true;
	}
	else
	{
		firstZeroPage = OldSerXidNextPage(oldSerXidControl->headPage);
		isNewPage = OldSerXidPagePrecedesLogically(oldSerXidControl->headPage,
												   targetPage);
	}

	/* Advance headXid/headPage as needed to cover this xid's entry. */
	if (!TransactionIdIsValid(oldSerXidControl->headXid)
		|| TransactionIdFollows(xid, oldSerXidControl->headXid))
		oldSerXidControl->headXid = xid;
	if (isNewPage)
		oldSerXidControl->headPage = targetPage;

	/*
	 * Give a warning if we're about to run out of SLRU pages.
	 *
	 * slru.c has a maximum of 64k segments, with 32 (SLRU_PAGES_PER_SEGMENT)
	 * pages each. We need to store a 64-bit integer for each Xid, and with
	 * default 8k block size, 65536*32 pages is only enough to cover 2^30
	 * XIDs. If we're about to hit that limit and wrap around, warn the user.
	 *
	 * To avoid spamming the user, we only give one warning when we've used 1
	 * billion XIDs, and stay silent until the situation is fixed and the
	 * number of XIDs used falls below 800 million again.
	 *
	 * XXX: We have no safeguard to actually *prevent* the wrap-around,
	 * though. All you get is a warning.
	 */
	if (oldSerXidControl->warningIssued)
	{
		TransactionId lowWatermark;

		lowWatermark = tailXid + 800000000;
		if (lowWatermark < FirstNormalTransactionId)
			lowWatermark = FirstNormalTransactionId;
		if (TransactionIdPrecedes(xid, lowWatermark))
			oldSerXidControl->warningIssued = false;
	}
	else
	{
		TransactionId highWatermark;

		highWatermark = tailXid + 1000000000;
		if (highWatermark < FirstNormalTransactionId)
			highWatermark = FirstNormalTransactionId;
		if (TransactionIdFollows(xid, highWatermark))
		{
			oldSerXidControl->warningIssued = true;
			ereport(WARNING,
					(errmsg("memory for serializable conflict tracking is nearly exhausted"),
					 errhint("There might be an idle transaction or a forgotten prepared transaction causing this.")));
		}
	}

	if (isNewPage)
	{
		/* Initialize intervening pages. */
		while (firstZeroPage != targetPage)
		{
			(void) SimpleLruZeroPage(OldSerXidSlruCtl, firstZeroPage);
			firstZeroPage = OldSerXidNextPage(firstZeroPage);
		}
		slotno = SimpleLruZeroPage(OldSerXidSlruCtl, targetPage);
	}
	else
		slotno = SimpleLruReadPage(OldSerXidSlruCtl, targetPage, true, xid);

	/* Store the value in the page buffer and mark it dirty for write-back. */
	OldSerXidValue(slotno, xid) = minConflictCommitSeqNo;
	OldSerXidSlruCtl->shared->page_dirty[slotno] = true;

	LWLockRelease(OldSerXidLock);
}
991 
992 /*
993  * Get the minimum commitSeqNo for any conflict out for the given xid.  For
994  * a transaction which exists but has no conflict out, InvalidSerCommitSeqNo
995  * will be returned.
996  */
997 static SerCommitSeqNo
OldSerXidGetMinConflictCommitSeqNo(TransactionId xid)998 OldSerXidGetMinConflictCommitSeqNo(TransactionId xid)
999 {
1000 	TransactionId headXid;
1001 	TransactionId tailXid;
1002 	SerCommitSeqNo val;
1003 	int			slotno;
1004 
1005 	Assert(TransactionIdIsValid(xid));
1006 
1007 	LWLockAcquire(OldSerXidLock, LW_SHARED);
1008 	headXid = oldSerXidControl->headXid;
1009 	tailXid = oldSerXidControl->tailXid;
1010 	LWLockRelease(OldSerXidLock);
1011 
1012 	if (!TransactionIdIsValid(headXid))
1013 		return 0;
1014 
1015 	Assert(TransactionIdIsValid(tailXid));
1016 
1017 	if (TransactionIdPrecedes(xid, tailXid)
1018 		|| TransactionIdFollows(xid, headXid))
1019 		return 0;
1020 
1021 	/*
1022 	 * The following function must be called without holding OldSerXidLock,
1023 	 * but will return with that lock held, which must then be released.
1024 	 */
1025 	slotno = SimpleLruReadPage_ReadOnly(OldSerXidSlruCtl,
1026 										OldSerXidPage(xid), xid);
1027 	val = OldSerXidValue(slotno, xid);
1028 	LWLockRelease(OldSerXidLock);
1029 	return val;
1030 }
1031 
1032 /*
1033  * Call this whenever there is a new xmin for active serializable
1034  * transactions.  We don't need to keep information on transactions which
1035  * precede that.  InvalidTransactionId means none active, so everything in
1036  * the SLRU can be discarded.
1037  */
1038 static void
OldSerXidSetActiveSerXmin(TransactionId xid)1039 OldSerXidSetActiveSerXmin(TransactionId xid)
1040 {
1041 	LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
1042 
1043 	/*
1044 	 * When no sxacts are active, nothing overlaps, set the xid values to
1045 	 * invalid to show that there are no valid entries.  Don't clear headPage,
1046 	 * though.  A new xmin might still land on that page, and we don't want to
1047 	 * repeatedly zero out the same page.
1048 	 */
1049 	if (!TransactionIdIsValid(xid))
1050 	{
1051 		oldSerXidControl->tailXid = InvalidTransactionId;
1052 		oldSerXidControl->headXid = InvalidTransactionId;
1053 		LWLockRelease(OldSerXidLock);
1054 		return;
1055 	}
1056 
1057 	/*
1058 	 * When we're recovering prepared transactions, the global xmin might move
1059 	 * backwards depending on the order they're recovered. Normally that's not
1060 	 * OK, but during recovery no serializable transactions will commit, so
1061 	 * the SLRU is empty and we can get away with it.
1062 	 */
1063 	if (RecoveryInProgress())
1064 	{
1065 		Assert(oldSerXidControl->headPage < 0);
1066 		if (!TransactionIdIsValid(oldSerXidControl->tailXid)
1067 			|| TransactionIdPrecedes(xid, oldSerXidControl->tailXid))
1068 		{
1069 			oldSerXidControl->tailXid = xid;
1070 		}
1071 		LWLockRelease(OldSerXidLock);
1072 		return;
1073 	}
1074 
1075 	Assert(!TransactionIdIsValid(oldSerXidControl->tailXid)
1076 		   || TransactionIdFollows(xid, oldSerXidControl->tailXid));
1077 
1078 	oldSerXidControl->tailXid = xid;
1079 
1080 	LWLockRelease(OldSerXidLock);
1081 }
1082 
/*
 * Perform a checkpoint --- either during shutdown, or on-the-fly
 *
 * We don't have any data that needs to survive a restart, but this is a
 * convenient place to truncate the SLRU.
 */
void
CheckPointPredicate(void)
{
	int			tailPage;

	LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);

	/* Exit quickly if the SLRU is currently not in use. */
	if (oldSerXidControl->headPage < 0)
	{
		LWLockRelease(OldSerXidLock);
		return;
	}

	if (TransactionIdIsValid(oldSerXidControl->tailXid))
	{
		/* We can truncate the SLRU up to the page containing tailXid */
		tailPage = OldSerXidPage(oldSerXidControl->tailXid);
	}
	else
	{
		/*----------
		 * The SLRU is no longer needed. Truncate to head before we set head
		 * invalid.
		 *
		 * XXX: It's possible that the SLRU is not needed again until XID
		 * wrap-around has happened, so that the segment containing headPage
		 * that we leave behind will appear to be new again. In that case it
		 * won't be removed until XID horizon advances enough to make it
		 * current again.
		 *
		 * XXX: This should happen in vac_truncate_clog(), not in checkpoints.
		 * Consider this scenario, starting from a system with no in-progress
		 * transactions and VACUUM FREEZE having maximized oldestXact:
		 * - Start a SERIALIZABLE transaction.
		 * - Start, finish, and summarize a SERIALIZABLE transaction, creating
		 *   one SLRU page.
		 * - Consume XIDs to reach xidStopLimit.
		 * - Finish all transactions.  Due to the long-running SERIALIZABLE
		 *   transaction, earlier checkpoints did not touch headPage.  The
		 *   next checkpoint will change it, but that checkpoint happens after
		 *   the end of the scenario.
		 * - VACUUM to advance XID limits.
		 * - Consume ~2M XIDs, crossing the former xidWrapLimit.
		 * - Start, finish, and summarize a SERIALIZABLE transaction.
		 *   OldSerXidAdd() declines to create the targetPage, because
		 *   headPage is not regarded as in the past relative to that
		 *   targetPage.  The transaction instigating the summarize fails in
		 *   SimpleLruReadPage().
		 */
		tailPage = oldSerXidControl->headPage;
		oldSerXidControl->headPage = -1;
	}

	/* Truncation and flushing below run without OldSerXidLock held. */
	LWLockRelease(OldSerXidLock);

	/* Truncate away pages that are no longer required */
	SimpleLruTruncate(OldSerXidSlruCtl, tailPage);

	/*
	 * Flush dirty SLRU pages to disk
	 *
	 * This is not actually necessary from a correctness point of view. We do
	 * it merely as a debugging aid.
	 *
	 * We're doing this after the truncation to avoid writing pages right
	 * before deleting the file in which they sit, which would be completely
	 * pointless.
	 */
	SimpleLruFlush(OldSerXidSlruCtl, true);
}
1160 
1161 /*------------------------------------------------------------------------*/
1162 
/*
 * InitPredicateLocks -- Initialize the predicate locking data structures.
 *
 * This is called from CreateSharedMemoryAndSemaphores(), which see for
 * more comments.  In the normal postmaster case, the shared hash tables
 * are created here.  Backends inherit the pointers
 * to the shared tables via fork().  In the EXEC_BACKEND case, each
 * backend re-executes this code to obtain pointers to the already existing
 * shared hash tables.
 */
void
InitPredicateLocks(void)
{
	HASHCTL		info;
	long		max_table_size;	/* reused: targets, then xacts, each scaled */
	Size		requestSize;
	bool		found;

#ifndef EXEC_BACKEND
	Assert(!IsUnderPostmaster);
#endif

	/*
	 * Compute size of predicate lock target hashtable. Note these
	 * calculations must agree with PredicateLockShmemSize!
	 */
	max_table_size = NPREDICATELOCKTARGETENTS();

	/*
	 * Allocate hash table for PREDICATELOCKTARGET structs.  This stores
	 * per-predicate-lock-target information.
	 */
	MemSet(&info, 0, sizeof(info));
	info.keysize = sizeof(PREDICATELOCKTARGETTAG);
	info.entrysize = sizeof(PREDICATELOCKTARGET);
	info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;

	PredicateLockTargetHash = ShmemInitHash("PREDICATELOCKTARGET hash",
											max_table_size,
											max_table_size,
											&info,
											HASH_ELEM | HASH_BLOBS |
											HASH_PARTITION | HASH_FIXED_SIZE);

	/*
	 * Reserve a dummy entry in the hash table; we use it to make sure there's
	 * always one entry available when we need to split or combine a page,
	 * because running out of space there could mean aborting a
	 * non-serializable transaction.
	 */
	if (!IsUnderPostmaster)
	{
		(void) hash_search(PredicateLockTargetHash, &ScratchTargetTag,
						   HASH_ENTER, &found);
		Assert(!found);
	}

	/* Pre-calculate the hash and partition lock of the scratch entry */
	ScratchTargetTagHash = PredicateLockTargetTagHashCode(&ScratchTargetTag);
	ScratchPartitionLock = PredicateLockHashPartitionLock(ScratchTargetTagHash);

	/*
	 * Allocate hash table for PREDICATELOCK structs.  This stores per
	 * xact-lock-of-a-target information.
	 */
	MemSet(&info, 0, sizeof(info));
	info.keysize = sizeof(PREDICATELOCKTAG);
	info.entrysize = sizeof(PREDICATELOCK);
	/* custom hash keeps each lock in the same partition as its target */
	info.hash = predicatelock_hash;
	info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;

	/* Assume an average of 2 xacts per target */
	max_table_size *= 2;

	PredicateLockHash = ShmemInitHash("PREDICATELOCK hash",
									  max_table_size,
									  max_table_size,
									  &info,
									  HASH_ELEM | HASH_FUNCTION |
									  HASH_PARTITION | HASH_FIXED_SIZE);

	/*
	 * Compute size for serializable transaction hashtable. Note these
	 * calculations must agree with PredicateLockShmemSize!
	 */
	max_table_size = (MaxBackends + max_prepared_xacts);

	/*
	 * Allocate a list to hold information on transactions participating in
	 * predicate locking.
	 *
	 * Assume an average of 10 predicate locking transactions per backend.
	 * This allows aggressive cleanup while detail is present before data must
	 * be summarized for storage in SLRU and the "dummy" transaction.
	 */
	max_table_size *= 10;

	PredXact = ShmemInitStruct("PredXactList",
							   PredXactListDataSize,
							   &found);
	/* The postmaster creates this struct; child backends only attach. */
	Assert(found == IsUnderPostmaster);
	if (!found)
	{
		int			i;

		SHMQueueInit(&PredXact->availableList);
		SHMQueueInit(&PredXact->activeList);
		PredXact->SxactGlobalXmin = InvalidTransactionId;
		PredXact->SxactGlobalXminCount = 0;
		PredXact->WritableSxactCount = 0;
		PredXact->LastSxactCommitSeqNo = FirstNormalSerCommitSeqNo - 1;
		PredXact->CanPartialClearThrough = 0;
		PredXact->HavePartialClearedThrough = 0;
		requestSize = mul_size((Size) max_table_size,
							   PredXactListElementDataSize);
		PredXact->element = ShmemAlloc(requestSize);
		if (PredXact->element == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
			 errmsg("not enough shared memory for elements of data structure"
					" \"%s\" (%zu bytes requested)",
					"PredXactList", requestSize)));
		/* Add all elements to available list, clean. */
		memset(PredXact->element, 0, requestSize);
		for (i = 0; i < max_table_size; i++)
		{
			SHMQueueInsertBefore(&(PredXact->availableList),
								 &(PredXact->element[i].link));
		}
		/*
		 * Set up the dummy sxact that summarized old committed transactions
		 * are attributed to once their detail has been pushed to the SLRU.
		 */
		PredXact->OldCommittedSxact = CreatePredXact();
		SetInvalidVirtualTransactionId(PredXact->OldCommittedSxact->vxid);
		PredXact->OldCommittedSxact->prepareSeqNo = 0;
		PredXact->OldCommittedSxact->commitSeqNo = 0;
		PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0;
		SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
		SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
		SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks);
		SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink);
		SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts);
		PredXact->OldCommittedSxact->topXid = InvalidTransactionId;
		PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId;
		PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
		PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED;
		PredXact->OldCommittedSxact->pid = 0;
	}
	/* This never changes, so let's keep a local copy. */
	OldCommittedSxact = PredXact->OldCommittedSxact;

	/*
	 * Allocate hash table for SERIALIZABLEXID structs.  This stores per-xid
	 * information for serializable transactions which have accessed data.
	 */
	MemSet(&info, 0, sizeof(info));
	info.keysize = sizeof(SERIALIZABLEXIDTAG);
	info.entrysize = sizeof(SERIALIZABLEXID);

	SerializableXidHash = ShmemInitHash("SERIALIZABLEXID hash",
										max_table_size,
										max_table_size,
										&info,
										HASH_ELEM | HASH_BLOBS |
										HASH_FIXED_SIZE);

	/*
	 * Allocate space for tracking rw-conflicts in lists attached to the
	 * transactions.
	 *
	 * Assume an average of 5 conflicts per transaction.  Calculations suggest
	 * that this will prevent resource exhaustion in even the most pessimal
	 * loads up to max_connections = 200 with all 200 connections pounding the
	 * database with serializable transactions.  Beyond that, there may be
	 * occasional transactions canceled when trying to flag conflicts. That's
	 * probably OK.
	 */
	max_table_size *= 5;

	RWConflictPool = ShmemInitStruct("RWConflictPool",
									 RWConflictPoolHeaderDataSize,
									 &found);
	Assert(found == IsUnderPostmaster);
	if (!found)
	{
		int			i;

		SHMQueueInit(&RWConflictPool->availableList);
		requestSize = mul_size((Size) max_table_size,
							   RWConflictDataSize);
		RWConflictPool->element = ShmemAlloc(requestSize);
		if (RWConflictPool->element == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
			 errmsg("not enough shared memory for elements of data structure"
					" \"%s\" (%zu bytes requested)",
					"RWConflictPool", requestSize)));
		/* Add all elements to available list, clean. */
		memset(RWConflictPool->element, 0, requestSize);
		for (i = 0; i < max_table_size; i++)
		{
			SHMQueueInsertBefore(&(RWConflictPool->availableList),
								 &(RWConflictPool->element[i].outLink));
		}
	}

	/*
	 * Create or attach to the header for the list of finished serializable
	 * transactions.
	 */
	FinishedSerializableTransactions = (SHM_QUEUE *)
		ShmemInitStruct("FinishedSerializableTransactions",
						sizeof(SHM_QUEUE),
						&found);
	Assert(found == IsUnderPostmaster);
	if (!found)
		SHMQueueInit(FinishedSerializableTransactions);

	/*
	 * Initialize the SLRU storage for old committed serializable
	 * transactions.
	 */
	OldSerXidInit();
}
1384 
1385 /*
1386  * Estimate shared-memory space used for predicate lock table
1387  */
1388 Size
PredicateLockShmemSize(void)1389 PredicateLockShmemSize(void)
1390 {
1391 	Size		size = 0;
1392 	long		max_table_size;
1393 
1394 	/* predicate lock target hash table */
1395 	max_table_size = NPREDICATELOCKTARGETENTS();
1396 	size = add_size(size, hash_estimate_size(max_table_size,
1397 											 sizeof(PREDICATELOCKTARGET)));
1398 
1399 	/* predicate lock hash table */
1400 	max_table_size *= 2;
1401 	size = add_size(size, hash_estimate_size(max_table_size,
1402 											 sizeof(PREDICATELOCK)));
1403 
1404 	/*
1405 	 * Since NPREDICATELOCKTARGETENTS is only an estimate, add 10% safety
1406 	 * margin.
1407 	 */
1408 	size = add_size(size, size / 10);
1409 
1410 	/* transaction list */
1411 	max_table_size = MaxBackends + max_prepared_xacts;
1412 	max_table_size *= 10;
1413 	size = add_size(size, PredXactListDataSize);
1414 	size = add_size(size, mul_size((Size) max_table_size,
1415 								   PredXactListElementDataSize));
1416 
1417 	/* transaction xid table */
1418 	size = add_size(size, hash_estimate_size(max_table_size,
1419 											 sizeof(SERIALIZABLEXID)));
1420 
1421 	/* rw-conflict pool */
1422 	max_table_size *= 5;
1423 	size = add_size(size, RWConflictPoolHeaderDataSize);
1424 	size = add_size(size, mul_size((Size) max_table_size,
1425 								   RWConflictDataSize));
1426 
1427 	/* Head for list of finished serializable transactions. */
1428 	size = add_size(size, sizeof(SHM_QUEUE));
1429 
1430 	/* Shared memory structures for SLRU tracking of old committed xids. */
1431 	size = add_size(size, sizeof(OldSerXidControlData));
1432 	size = add_size(size, SimpleLruShmemSize(NUM_OLDSERXID_BUFFERS, 0));
1433 
1434 	return size;
1435 }
1436 
1437 
1438 /*
1439  * Compute the hash code associated with a PREDICATELOCKTAG.
1440  *
1441  * Because we want to use just one set of partition locks for both the
1442  * PREDICATELOCKTARGET and PREDICATELOCK hash tables, we have to make sure
1443  * that PREDICATELOCKs fall into the same partition number as their
1444  * associated PREDICATELOCKTARGETs.  dynahash.c expects the partition number
1445  * to be the low-order bits of the hash code, and therefore a
1446  * PREDICATELOCKTAG's hash code must have the same low-order bits as the
1447  * associated PREDICATELOCKTARGETTAG's hash code.  We achieve this with this
1448  * specialized hash function.
1449  */
1450 static uint32
predicatelock_hash(const void * key,Size keysize)1451 predicatelock_hash(const void *key, Size keysize)
1452 {
1453 	const PREDICATELOCKTAG *predicatelocktag = (const PREDICATELOCKTAG *) key;
1454 	uint32		targethash;
1455 
1456 	Assert(keysize == sizeof(PREDICATELOCKTAG));
1457 
1458 	/* Look into the associated target object, and compute its hash code */
1459 	targethash = PredicateLockTargetTagHashCode(&predicatelocktag->myTarget->tag);
1460 
1461 	return PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash);
1462 }
1463 
1464 
1465 /*
1466  * GetPredicateLockStatusData
1467  *		Return a table containing the internal state of the predicate
1468  *		lock manager for use in pg_lock_status.
1469  *
1470  * Like GetLockStatusData, this function tries to hold the partition LWLocks
1471  * for as short a time as possible by returning two arrays that simply
1472  * contain the PREDICATELOCKTARGETTAG and SERIALIZABLEXACT for each lock
1473  * table entry. Multiple copies of the same PREDICATELOCKTARGETTAG and
1474  * SERIALIZABLEXACT will likely appear.
1475  */
1476 PredicateLockData *
GetPredicateLockStatusData(void)1477 GetPredicateLockStatusData(void)
1478 {
1479 	PredicateLockData *data;
1480 	int			i;
1481 	int			els,
1482 				el;
1483 	HASH_SEQ_STATUS seqstat;
1484 	PREDICATELOCK *predlock;
1485 
1486 	data = (PredicateLockData *) palloc(sizeof(PredicateLockData));
1487 
1488 	/*
1489 	 * To ensure consistency, take simultaneous locks on all partition locks
1490 	 * in ascending order, then SerializableXactHashLock.
1491 	 */
1492 	for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
1493 		LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_SHARED);
1494 	LWLockAcquire(SerializableXactHashLock, LW_SHARED);
1495 
1496 	/* Get number of locks and allocate appropriately-sized arrays. */
1497 	els = hash_get_num_entries(PredicateLockHash);
1498 	data->nelements = els;
1499 	data->locktags = (PREDICATELOCKTARGETTAG *)
1500 		palloc(sizeof(PREDICATELOCKTARGETTAG) * els);
1501 	data->xacts = (SERIALIZABLEXACT *)
1502 		palloc(sizeof(SERIALIZABLEXACT) * els);
1503 
1504 
1505 	/* Scan through PredicateLockHash and copy contents */
1506 	hash_seq_init(&seqstat, PredicateLockHash);
1507 
1508 	el = 0;
1509 
1510 	while ((predlock = (PREDICATELOCK *) hash_seq_search(&seqstat)))
1511 	{
1512 		data->locktags[el] = predlock->tag.myTarget->tag;
1513 		data->xacts[el] = *predlock->tag.myXact;
1514 		el++;
1515 	}
1516 
1517 	Assert(el == els);
1518 
1519 	/* Release locks in reverse order */
1520 	LWLockRelease(SerializableXactHashLock);
1521 	for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
1522 		LWLockRelease(PredicateLockHashPartitionLockByIndex(i));
1523 
1524 	return data;
1525 }
1526 
/*
 * Free up shared memory structures by pushing the oldest sxact (the one at
 * the front of the SummarizeOldestCommittedSxact queue) into summary form.
 * Each call will free exactly one SERIALIZABLEXACT structure and may also
 * free one or more of these structures: SERIALIZABLEXID, PREDICATELOCK,
 * PREDICATELOCKTARGET, RWConflictData.
 */
static void
SummarizeOldestCommittedSxact(void)
{
	SERIALIZABLEXACT *sxact;

	LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);

	/*
	 * This function is only called if there are no sxact slots available.
	 * Some of them must belong to old, already-finished transactions, so
	 * there should be something in FinishedSerializableTransactions list that
	 * we can summarize. However, there's a race condition: while we were not
	 * holding any locks, a transaction might have ended and cleaned up all
	 * the finished sxact entries already, freeing up their sxact slots. In
	 * that case, we have nothing to do here. The caller will find one of the
	 * slots released by the other backend when it retries.
	 */
	if (SHMQueueEmpty(FinishedSerializableTransactions))
	{
		LWLockRelease(SerializableFinishedListLock);
		return;
	}

	/*
	 * Grab the first sxact off the finished list -- this will be the earliest
	 * commit.  Remove it from the list.
	 */
	sxact = (SERIALIZABLEXACT *)
		SHMQueueNext(FinishedSerializableTransactions,
					 FinishedSerializableTransactions,
					 offsetof(SERIALIZABLEXACT, finishedLink));
	SHMQueueDelete(&(sxact->finishedLink));

	/* Add to SLRU summary information. */
	/* Read-only and xid-less sxacts leave nothing behind for the SLRU. */
	if (TransactionIdIsValid(sxact->topXid) && !SxactIsReadOnly(sxact))
		OldSerXidAdd(sxact->topXid, SxactHasConflictOut(sxact)
		   ? sxact->SeqNo.earliestOutConflictCommit : InvalidSerCommitSeqNo);

	/* Summarize and release the detail. */
	ReleaseOneSerializableXact(sxact, false, true);

	LWLockRelease(SerializableFinishedListLock);
}
1577 
1578 /*
1579  * GetSafeSnapshot
1580  *		Obtain and register a snapshot for a READ ONLY DEFERRABLE
1581  *		transaction. Ensures that the snapshot is "safe", i.e. a
1582  *		read-only transaction running on it can execute serializably
1583  *		without further checks. This requires waiting for concurrent
1584  *		transactions to complete, and retrying with a new snapshot if
1585  *		one of them could possibly create a conflict.
1586  *
1587  *		As with GetSerializableTransactionSnapshot (which this is a subroutine
1588  *		for), the passed-in Snapshot pointer should reference a static data
1589  *		area that can safely be passed to GetSnapshotData.
1590  */
static Snapshot
GetSafeSnapshot(Snapshot origSnapshot)
{
	Snapshot	snapshot;

	/* Only READ ONLY DEFERRABLE transactions may come through here. */
	Assert(XactReadOnly && XactDeferrable);

	while (true)
	{
		/*
		 * GetSerializableTransactionSnapshotInt is going to call
		 * GetSnapshotData, so we need to provide it the static snapshot area
		 * our caller passed to us.  The pointer returned is actually the same
		 * one passed to it, but we avoid assuming that here.
		 */
		snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
													   InvalidTransactionId);

		if (MySerializableXact == InvalidSerializableXact)
			return snapshot;	/* no concurrent r/w xacts; it's safe */

		LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);

		/*
		 * Wait for concurrent transactions to finish. Stop early if one of
		 * them marked us as conflicted.  The DEFERRABLE_WAITING flag is set
		 * while we wait (presumably so that other backends know to signal
		 * us as they clear our possible-unsafe-conflict entries -- confirm
		 * against ReleasePredicateLocks) and cleared again below.
		 */
		MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
		while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
				 SxactIsROUnsafe(MySerializableXact)))
		{
			/* Drop the lock while sleeping so others can make progress. */
			LWLockRelease(SerializableXactHashLock);
			ProcWaitForSignal();
			LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
		}
		MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;

		if (!SxactIsROUnsafe(MySerializableXact))
		{
			/* Nobody flagged us as unsafe; this snapshot will do. */
			LWLockRelease(SerializableXactHashLock);
			break;				/* success */
		}

		LWLockRelease(SerializableXactHashLock);

		/* else, need to retry... */
		ereport(DEBUG2,
				(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
				 errmsg("deferrable snapshot was unsafe; trying a new one")));
		/* Discard this attempt's sxact and predicate-lock state, then loop. */
		ReleasePredicateLocks(false);
	}

	/*
	 * Now we have a safe snapshot, so we don't need to do any further checks.
	 * Release our SSI state; the transaction runs free of SSI overhead.
	 */
	Assert(SxactIsROSafe(MySerializableXact));
	ReleasePredicateLocks(false);

	return snapshot;
}
1651 
1652 /*
1653  * Acquire a snapshot that can be used for the current transaction.
1654  *
1655  * Make sure we have a SERIALIZABLEXACT reference in MySerializableXact.
1656  * It should be current for this process and be contained in PredXact.
1657  *
1658  * The passed-in Snapshot pointer should reference a static data area that
1659  * can safely be passed to GetSnapshotData.  The return value is actually
1660  * always this same pointer; no new snapshot data structure is allocated
1661  * within this function.
1662  */
1663 Snapshot
GetSerializableTransactionSnapshot(Snapshot snapshot)1664 GetSerializableTransactionSnapshot(Snapshot snapshot)
1665 {
1666 	Assert(IsolationIsSerializable());
1667 
1668 	/*
1669 	 * Can't use serializable mode while recovery is still active, as it is,
1670 	 * for example, on a hot standby.  We could get here despite the check in
1671 	 * check_XactIsoLevel() if default_transaction_isolation is set to
1672 	 * serializable, so phrase the hint accordingly.
1673 	 */
1674 	if (RecoveryInProgress())
1675 		ereport(ERROR,
1676 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1677 				 errmsg("cannot use serializable mode in a hot standby"),
1678 				 errdetail("\"default_transaction_isolation\" is set to \"serializable\"."),
1679 				 errhint("You can use \"SET default_transaction_isolation = 'repeatable read'\" to change the default.")));
1680 
1681 	/*
1682 	 * A special optimization is available for SERIALIZABLE READ ONLY
1683 	 * DEFERRABLE transactions -- we can wait for a suitable snapshot and
1684 	 * thereby avoid all SSI overhead once it's running.
1685 	 */
1686 	if (XactReadOnly && XactDeferrable)
1687 		return GetSafeSnapshot(snapshot);
1688 
1689 	return GetSerializableTransactionSnapshotInt(snapshot,
1690 												 InvalidTransactionId);
1691 }
1692 
1693 /*
1694  * Import a snapshot to be used for the current transaction.
1695  *
1696  * This is nearly the same as GetSerializableTransactionSnapshot, except that
1697  * we don't take a new snapshot, but rather use the data we're handed.
1698  *
1699  * The caller must have verified that the snapshot came from a serializable
1700  * transaction; and if we're read-write, the source transaction must not be
1701  * read-only.
1702  */
1703 void
SetSerializableTransactionSnapshot(Snapshot snapshot,TransactionId sourcexid)1704 SetSerializableTransactionSnapshot(Snapshot snapshot,
1705 								   TransactionId sourcexid)
1706 {
1707 	Assert(IsolationIsSerializable());
1708 
1709 	/*
1710 	 * We do not allow SERIALIZABLE READ ONLY DEFERRABLE transactions to
1711 	 * import snapshots, since there's no way to wait for a safe snapshot when
1712 	 * we're using the snap we're told to.  (XXX instead of throwing an error,
1713 	 * we could just ignore the XactDeferrable flag?)
1714 	 */
1715 	if (XactReadOnly && XactDeferrable)
1716 		ereport(ERROR,
1717 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1718 				 errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
1719 
1720 	(void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid);
1721 }
1722 
1723 /*
1724  * Guts of GetSerializableTransactionSnapshot
1725  *
1726  * If sourcexid is valid, this is actually an import operation and we should
1727  * skip calling GetSnapshotData, because the snapshot contents are already
1728  * loaded up.  HOWEVER: to avoid race conditions, we must check that the
1729  * source xact is still running after we acquire SerializableXactHashLock.
1730  * We do that by calling ProcArrayInstallImportedXmin.
1731  */
static Snapshot
GetSerializableTransactionSnapshotInt(Snapshot snapshot,
									  TransactionId sourcexid)
{
	PGPROC	   *proc;
	VirtualTransactionId vxid;
	SERIALIZABLEXACT *sxact,
			   *othersxact;
	HASHCTL		hash_ctl;

	/* We only do this for serializable transactions.  Once. */
	Assert(MySerializableXact == InvalidSerializableXact);

	Assert(!RecoveryInProgress());

	/*
	 * Since all parts of a serializable transaction must use the same
	 * snapshot, it is too late to establish one after a parallel operation
	 * has begun.
	 */
	if (IsInParallelMode())
		elog(ERROR, "cannot establish serializable snapshot during a parallel operation");

	proc = MyProc;
	Assert(proc != NULL);
	GET_VXID_FROM_PGPROC(vxid, *proc);

	/*
	 * First we get the sxact structure, which may involve looping and access
	 * to the "finished" list to free a structure for use.
	 *
	 * We must hold SerializableXactHashLock when taking/checking the snapshot
	 * to avoid race conditions, for much the same reasons that
	 * GetSnapshotData takes the ProcArrayLock.  Since we might have to
	 * release SerializableXactHashLock to call SummarizeOldestCommittedSxact,
	 * this means we have to create the sxact first, which is a bit annoying
	 * (in particular, an elog(ERROR) in procarray.c would cause us to leak
	 * the sxact).  Consider refactoring to avoid this.
	 */
#ifdef TEST_OLDSERXID
	SummarizeOldestCommittedSxact();
#endif
	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
	do
	{
		sxact = CreatePredXact();
		/* If null, push out committed sxact to SLRU summary & retry. */
		if (!sxact)
		{
			LWLockRelease(SerializableXactHashLock);
			SummarizeOldestCommittedSxact();
			LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
		}
	} while (!sxact);

	/*
	 * We now hold SerializableXactHashLock and own an (uninitialized) sxact.
	 *
	 * Get the snapshot, or check that it's safe to use
	 */
	if (!TransactionIdIsValid(sourcexid))
		snapshot = GetSnapshotData(snapshot);
	else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcexid))
	{
		/* Import failed: give the sxact back before raising the error. */
		ReleasePredXact(sxact);
		LWLockRelease(SerializableXactHashLock);
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("could not import the requested snapshot"),
			   errdetail("The source transaction %u is not running anymore.",
						 sourcexid)));
	}

	/*
	 * If there are no serializable transactions which are not read-only, we
	 * can "opt out" of predicate locking and conflict checking for a
	 * read-only transaction.
	 *
	 * The reason this is safe is that a read-only transaction can only become
	 * part of a dangerous structure if it overlaps a writable transaction
	 * which in turn overlaps a writable transaction which committed before
	 * the read-only transaction started.  A new writable transaction can
	 * overlap this one, but it can't meet the other condition of overlapping
	 * a transaction which committed before this one started.
	 */
	if (XactReadOnly && PredXact->WritableSxactCount == 0)
	{
		/* Opt out: MySerializableXact stays InvalidSerializableXact. */
		ReleasePredXact(sxact);
		LWLockRelease(SerializableXactHashLock);
		return snapshot;
	}

	/* Maintain serializable global xmin info. */
	if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
	{
		/* First active serializable xact: establish the global xmin. */
		Assert(PredXact->SxactGlobalXminCount == 0);
		PredXact->SxactGlobalXmin = snapshot->xmin;
		PredXact->SxactGlobalXminCount = 1;
		OldSerXidSetActiveSerXmin(snapshot->xmin);
	}
	else if (TransactionIdEquals(snapshot->xmin, PredXact->SxactGlobalXmin))
	{
		/* Same xmin as the current global minimum: bump its refcount. */
		Assert(PredXact->SxactGlobalXminCount > 0);
		PredXact->SxactGlobalXminCount++;
	}
	else
	{
		/* Our xmin must be later; the global minimum is unaffected. */
		Assert(TransactionIdFollows(snapshot->xmin, PredXact->SxactGlobalXmin));
	}

	/* Initialize the structure. */
	sxact->vxid = vxid;
	sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo;
	sxact->prepareSeqNo = InvalidSerCommitSeqNo;
	sxact->commitSeqNo = InvalidSerCommitSeqNo;
	SHMQueueInit(&(sxact->outConflicts));
	SHMQueueInit(&(sxact->inConflicts));
	SHMQueueInit(&(sxact->possibleUnsafeConflicts));
	sxact->topXid = GetTopTransactionIdIfAny();
	sxact->finishedBefore = InvalidTransactionId;
	sxact->xmin = snapshot->xmin;
	sxact->pid = MyProcPid;
	SHMQueueInit(&(sxact->predicateLocks));
	SHMQueueElemInit(&(sxact->finishedLink));
	sxact->flags = 0;
	if (XactReadOnly)
	{
		sxact->flags |= SXACT_FLAG_READ_ONLY;

		/*
		 * Register all concurrent r/w transactions as possible conflicts; if
		 * all of them commit without any outgoing conflicts to earlier
		 * transactions then this snapshot can be deemed safe (and we can run
		 * without tracking predicate locks).
		 */
		for (othersxact = FirstPredXact();
			 othersxact != NULL;
			 othersxact = NextPredXact(othersxact))
		{
			if (!SxactIsCommitted(othersxact)
				&& !SxactIsDoomed(othersxact)
				&& !SxactIsReadOnly(othersxact))
			{
				SetPossibleUnsafeConflict(sxact, othersxact);
			}
		}
	}
	else
	{
		/* Writable transaction: count it against the shared-memory cap. */
		++(PredXact->WritableSxactCount);
		Assert(PredXact->WritableSxactCount <=
			   (MaxBackends + max_prepared_xacts));
	}

	MySerializableXact = sxact;
	MyXactDidWrite = false;		/* haven't written anything yet */

	LWLockRelease(SerializableXactHashLock);

	/* Initialize the backend-local hash table of parent locks */
	Assert(LocalPredicateLockHash == NULL);
	MemSet(&hash_ctl, 0, sizeof(hash_ctl));
	hash_ctl.keysize = sizeof(PREDICATELOCKTARGETTAG);
	hash_ctl.entrysize = sizeof(LOCALPREDICATELOCK);
	LocalPredicateLockHash = hash_create("Local predicate lock",
										 max_predicate_locks_per_xact,
										 &hash_ctl,
										 HASH_ELEM | HASH_BLOBS);

	return snapshot;
}
1899 
1900 /*
1901  * Register the top level XID in SerializableXidHash.
1902  * Also store it for easy reference in MySerializableXact.
1903  */
1904 void
RegisterPredicateLockingXid(TransactionId xid)1905 RegisterPredicateLockingXid(TransactionId xid)
1906 {
1907 	SERIALIZABLEXIDTAG sxidtag;
1908 	SERIALIZABLEXID *sxid;
1909 	bool		found;
1910 
1911 	/*
1912 	 * If we're not tracking predicate lock data for this transaction, we
1913 	 * should ignore the request and return quickly.
1914 	 */
1915 	if (MySerializableXact == InvalidSerializableXact)
1916 		return;
1917 
1918 	/* We should have a valid XID and be at the top level. */
1919 	Assert(TransactionIdIsValid(xid));
1920 
1921 	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
1922 
1923 	/* This should only be done once per transaction. */
1924 	Assert(MySerializableXact->topXid == InvalidTransactionId);
1925 
1926 	MySerializableXact->topXid = xid;
1927 
1928 	sxidtag.xid = xid;
1929 	sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
1930 										   &sxidtag,
1931 										   HASH_ENTER, &found);
1932 	Assert(!found);
1933 
1934 	/* Initialize the structure. */
1935 	sxid->myXact = MySerializableXact;
1936 	LWLockRelease(SerializableXactHashLock);
1937 }
1938 
1939 
1940 /*
1941  * Check whether there are any predicate locks held by any transaction
1942  * for the page at the given block number.
1943  *
1944  * Note that the transaction may be completed but not yet subject to
1945  * cleanup due to overlapping serializable transactions.  This must
1946  * return valid information regardless of transaction isolation level.
1947  *
1948  * Also note that this doesn't check for a conflicting relation lock,
1949  * just a lock specifically on the given page.
1950  *
1951  * One use is to support proper behavior during GiST index vacuum.
1952  */
1953 bool
PageIsPredicateLocked(Relation relation,BlockNumber blkno)1954 PageIsPredicateLocked(Relation relation, BlockNumber blkno)
1955 {
1956 	PREDICATELOCKTARGETTAG targettag;
1957 	uint32		targettaghash;
1958 	LWLock	   *partitionLock;
1959 	PREDICATELOCKTARGET *target;
1960 
1961 	SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
1962 									relation->rd_node.dbNode,
1963 									relation->rd_id,
1964 									blkno);
1965 
1966 	targettaghash = PredicateLockTargetTagHashCode(&targettag);
1967 	partitionLock = PredicateLockHashPartitionLock(targettaghash);
1968 	LWLockAcquire(partitionLock, LW_SHARED);
1969 	target = (PREDICATELOCKTARGET *)
1970 		hash_search_with_hash_value(PredicateLockTargetHash,
1971 									&targettag, targettaghash,
1972 									HASH_FIND, NULL);
1973 	LWLockRelease(partitionLock);
1974 
1975 	return (target != NULL);
1976 }
1977 
1978 
1979 /*
1980  * Check whether a particular lock is held by this transaction.
1981  *
1982  * Important note: this function may return false even if the lock is
1983  * being held, because it uses the local lock table which is not
1984  * updated if another transaction modifies our lock list (e.g. to
1985  * split an index page). It can also return true when a coarser
1986  * granularity lock that covers this target is being held. Be careful
1987  * to only use this function in circumstances where such errors are
1988  * acceptable!
1989  */
1990 static bool
PredicateLockExists(const PREDICATELOCKTARGETTAG * targettag)1991 PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag)
1992 {
1993 	LOCALPREDICATELOCK *lock;
1994 
1995 	/* check local hash table */
1996 	lock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
1997 											  targettag,
1998 											  HASH_FIND, NULL);
1999 
2000 	if (!lock)
2001 		return false;
2002 
2003 	/*
2004 	 * Found entry in the table, but still need to check whether it's actually
2005 	 * held -- it could just be a parent of some held lock.
2006 	 */
2007 	return lock->held;
2008 }
2009 
2010 /*
2011  * Return the parent lock tag in the lock hierarchy: the next coarser
2012  * lock that covers the provided tag.
2013  *
2014  * Returns true and sets *parent to the parent tag if one exists,
2015  * returns false if none exists.
2016  */
2017 static bool
GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG * tag,PREDICATELOCKTARGETTAG * parent)2018 GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
2019 						  PREDICATELOCKTARGETTAG *parent)
2020 {
2021 	switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
2022 	{
2023 		case PREDLOCKTAG_RELATION:
2024 			/* relation locks have no parent lock */
2025 			return false;
2026 
2027 		case PREDLOCKTAG_PAGE:
2028 			/* parent lock is relation lock */
2029 			SET_PREDICATELOCKTARGETTAG_RELATION(*parent,
2030 										 GET_PREDICATELOCKTARGETTAG_DB(*tag),
2031 								  GET_PREDICATELOCKTARGETTAG_RELATION(*tag));
2032 
2033 			return true;
2034 
2035 		case PREDLOCKTAG_TUPLE:
2036 			/* parent lock is page lock */
2037 			SET_PREDICATELOCKTARGETTAG_PAGE(*parent,
2038 										 GET_PREDICATELOCKTARGETTAG_DB(*tag),
2039 								   GET_PREDICATELOCKTARGETTAG_RELATION(*tag),
2040 									  GET_PREDICATELOCKTARGETTAG_PAGE(*tag));
2041 			return true;
2042 	}
2043 
2044 	/* not reachable */
2045 	Assert(false);
2046 	return false;
2047 }
2048 
2049 /*
2050  * Check whether the lock we are considering is already covered by a
2051  * coarser lock for our transaction.
2052  *
2053  * Like PredicateLockExists, this function might return a false
2054  * negative, but it will never return a false positive.
2055  */
2056 static bool
CoarserLockCovers(const PREDICATELOCKTARGETTAG * newtargettag)2057 CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag)
2058 {
2059 	PREDICATELOCKTARGETTAG targettag,
2060 				parenttag;
2061 
2062 	targettag = *newtargettag;
2063 
2064 	/* check parents iteratively until no more */
2065 	while (GetParentPredicateLockTag(&targettag, &parenttag))
2066 	{
2067 		targettag = parenttag;
2068 		if (PredicateLockExists(&targettag))
2069 			return true;
2070 	}
2071 
2072 	/* no more parents to check; lock is not covered */
2073 	return false;
2074 }
2075 
2076 /*
2077  * Remove the dummy entry from the predicate lock target hash, to free up some
2078  * scratch space. The caller must be holding SerializablePredicateLockListLock,
2079  * and must restore the entry with RestoreScratchTarget() before releasing the
2080  * lock.
2081  *
2082  * If lockheld is true, the caller is already holding the partition lock
2083  * of the partition containing the scratch entry.
2084  */
2085 static void
RemoveScratchTarget(bool lockheld)2086 RemoveScratchTarget(bool lockheld)
2087 {
2088 	bool		found;
2089 
2090 	Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
2091 
2092 	if (!lockheld)
2093 		LWLockAcquire(ScratchPartitionLock, LW_EXCLUSIVE);
2094 	hash_search_with_hash_value(PredicateLockTargetHash,
2095 								&ScratchTargetTag,
2096 								ScratchTargetTagHash,
2097 								HASH_REMOVE, &found);
2098 	Assert(found);
2099 	if (!lockheld)
2100 		LWLockRelease(ScratchPartitionLock);
2101 }
2102 
2103 /*
2104  * Re-insert the dummy entry in predicate lock target hash.
2105  */
2106 static void
RestoreScratchTarget(bool lockheld)2107 RestoreScratchTarget(bool lockheld)
2108 {
2109 	bool		found;
2110 
2111 	Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
2112 
2113 	if (!lockheld)
2114 		LWLockAcquire(ScratchPartitionLock, LW_EXCLUSIVE);
2115 	hash_search_with_hash_value(PredicateLockTargetHash,
2116 								&ScratchTargetTag,
2117 								ScratchTargetTagHash,
2118 								HASH_ENTER, &found);
2119 	Assert(!found);
2120 	if (!lockheld)
2121 		LWLockRelease(ScratchPartitionLock);
2122 }
2123 
2124 /*
2125  * Check whether the list of related predicate locks is empty for a
2126  * predicate lock target, and remove the target if it is.
2127  */
2128 static void
RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET * target,uint32 targettaghash)2129 RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
2130 {
2131 	PREDICATELOCKTARGET *rmtarget PG_USED_FOR_ASSERTS_ONLY;
2132 
2133 	Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
2134 
2135 	/* Can't remove it until no locks at this target. */
2136 	if (!SHMQueueEmpty(&target->predicateLocks))
2137 		return;
2138 
2139 	/* Actually remove the target. */
2140 	rmtarget = hash_search_with_hash_value(PredicateLockTargetHash,
2141 										   &target->tag,
2142 										   targettaghash,
2143 										   HASH_REMOVE, NULL);
2144 	Assert(rmtarget == target);
2145 }
2146 
2147 /*
2148  * Delete child target locks owned by this process.
2149  * This implementation is assuming that the usage of each target tag field
2150  * is uniform.  No need to make this hard if we don't have to.
2151  *
2152  * We aren't acquiring lightweight locks for the predicate lock or lock
2153  * target structures associated with this transaction unless we're going
2154  * to modify them, because no other process is permitted to modify our
2155  * locks.
2156  */
static void
DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
{
	SERIALIZABLEXACT *sxact;
	PREDICATELOCK *predlock;

	/*
	 * Shared mode suffices here (see header comment): no other process may
	 * modify our own lock list, so we only need to keep readers consistent.
	 */
	LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
	sxact = MySerializableXact;
	predlock = (PREDICATELOCK *)
		SHMQueueNext(&(sxact->predicateLocks),
					 &(sxact->predicateLocks),
					 offsetof(PREDICATELOCK, xactLink));
	while (predlock)
	{
		SHM_QUEUE  *predlocksxactlink;
		PREDICATELOCK *nextpredlock;
		PREDICATELOCKTAG oldlocktag;
		PREDICATELOCKTARGET *oldtarget;
		PREDICATELOCKTARGETTAG oldtargettag;

		/*
		 * Capture the next link before we potentially delete this lock, so
		 * the traversal survives removal of the current element.
		 */
		predlocksxactlink = &(predlock->xactLink);
		nextpredlock = (PREDICATELOCK *)
			SHMQueueNext(&(sxact->predicateLocks),
						 predlocksxactlink,
						 offsetof(PREDICATELOCK, xactLink));

		oldlocktag = predlock->tag;
		Assert(oldlocktag.myXact == sxact);
		oldtarget = oldlocktag.myTarget;
		oldtargettag = oldtarget->tag;

		/* Only locks at finer granularity than the new one are dropped. */
		if (TargetTagIsCoveredBy(oldtargettag, *newtargettag))
		{
			uint32		oldtargettaghash;
			LWLock	   *partitionLock;
			PREDICATELOCK *rmpredlock PG_USED_FOR_ASSERTS_ONLY;

			oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
			partitionLock = PredicateLockHashPartitionLock(oldtargettaghash);

			/* Modifying shared target/lock state requires the partition lock. */
			LWLockAcquire(partitionLock, LW_EXCLUSIVE);

			/* Unlink from both the sxact's list and the target's list. */
			SHMQueueDelete(predlocksxactlink);
			SHMQueueDelete(&(predlock->targetLink));
			rmpredlock = hash_search_with_hash_value
				(PredicateLockHash,
				 &oldlocktag,
				 PredicateLockHashCodeFromTargetHashCode(&oldlocktag,
														 oldtargettaghash),
				 HASH_REMOVE, NULL);
			Assert(rmpredlock == predlock);

			/* The target itself may now be garbage too. */
			RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);

			LWLockRelease(partitionLock);

			/* Keep the local-table ancestor child counts consistent. */
			DecrementParentLocks(&oldtargettag);
		}

		predlock = nextpredlock;
	}
	LWLockRelease(SerializablePredicateLockListLock);
}
2220 
2221 /*
2222  * Returns the promotion threshold for a given predicate lock
2223  * target. This is the number of descendant locks required to promote
2224  * to the specified tag. Note that the threshold includes non-direct
2225  * descendants, e.g. both tuples and pages for a relation lock.
2226  *
2227  * TODO SSI: We should do something more intelligent about what the
2228  * thresholds are, either making it proportional to the number of
2229  * tuples in a page & pages in a relation, or at least making it a
2230  * GUC. Currently the threshold is 3 for a page lock, and
2231  * max_pred_locks_per_transaction/2 for a relation lock, chosen
2232  * entirely arbitrarily (and without benchmarking).
2233  */
2234 static int
PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG * tag)2235 PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag)
2236 {
2237 	switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
2238 	{
2239 		case PREDLOCKTAG_RELATION:
2240 			return max_predicate_locks_per_xact / 2;
2241 
2242 		case PREDLOCKTAG_PAGE:
2243 			return 3;
2244 
2245 		case PREDLOCKTAG_TUPLE:
2246 
2247 			/*
2248 			 * not reachable: nothing is finer-granularity than a tuple, so we
2249 			 * should never try to promote to it.
2250 			 */
2251 			Assert(false);
2252 			return 0;
2253 	}
2254 
2255 	/* not reachable */
2256 	Assert(false);
2257 	return 0;
2258 }
2259 
2260 /*
2261  * For all ancestors of a newly-acquired predicate lock, increment
2262  * their child count in the parent hash table. If any of them have
2263  * more descendants than their promotion threshold, acquire the
2264  * coarsest such lock.
2265  *
2266  * Returns true if a parent lock was acquired and false otherwise.
2267  */
2268 static bool
CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG * reqtag)2269 CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag)
2270 {
2271 	PREDICATELOCKTARGETTAG targettag,
2272 				nexttag,
2273 				promotiontag;
2274 	LOCALPREDICATELOCK *parentlock;
2275 	bool		found,
2276 				promote;
2277 
2278 	promote = false;
2279 
2280 	targettag = *reqtag;
2281 
2282 	/* check parents iteratively */
2283 	while (GetParentPredicateLockTag(&targettag, &nexttag))
2284 	{
2285 		targettag = nexttag;
2286 		parentlock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
2287 														&targettag,
2288 														HASH_ENTER,
2289 														&found);
2290 		if (!found)
2291 		{
2292 			parentlock->held = false;
2293 			parentlock->childLocks = 1;
2294 		}
2295 		else
2296 			parentlock->childLocks++;
2297 
2298 		if (parentlock->childLocks >=
2299 			PredicateLockPromotionThreshold(&targettag))
2300 		{
2301 			/*
2302 			 * We should promote to this parent lock. Continue to check its
2303 			 * ancestors, however, both to get their child counts right and to
2304 			 * check whether we should just go ahead and promote to one of
2305 			 * them.
2306 			 */
2307 			promotiontag = targettag;
2308 			promote = true;
2309 		}
2310 	}
2311 
2312 	if (promote)
2313 	{
2314 		/* acquire coarsest ancestor eligible for promotion */
2315 		PredicateLockAcquire(&promotiontag);
2316 		return true;
2317 	}
2318 	else
2319 		return false;
2320 }
2321 
2322 /*
2323  * When releasing a lock, decrement the child count on all ancestor
2324  * locks.
2325  *
2326  * This is called only when releasing a lock via
2327  * DeleteChildTargetLocks (i.e. when a lock becomes redundant because
2328  * we've acquired its parent, possibly due to promotion) or when a new
2329  * MVCC write lock makes the predicate lock unnecessary. There's no
2330  * point in calling it when locks are released at transaction end, as
2331  * this information is no longer needed.
2332  */
static void
DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag)
{
	PREDICATELOCKTARGETTAG parenttag,
				nexttag;

	parenttag = *targettag;

	/* Walk every ancestor of the released lock's tag. */
	while (GetParentPredicateLockTag(&parenttag, &nexttag))
	{
		uint32		targettaghash;
		LOCALPREDICATELOCK *parentlock,
				   *rmlock PG_USED_FOR_ASSERTS_ONLY;

		parenttag = nexttag;
		targettaghash = PredicateLockTargetTagHashCode(&parenttag);
		parentlock = (LOCALPREDICATELOCK *)
			hash_search_with_hash_value(LocalPredicateLockHash,
										&parenttag, targettaghash,
										HASH_FIND, NULL);

		/*
		 * There's a small chance the parent lock doesn't exist in the lock
		 * table. This can happen if we prematurely removed it because an
		 * index split caused the child refcount to be off.
		 */
		if (parentlock == NULL)
			continue;

		parentlock->childLocks--;

		/*
		 * Under similar circumstances the parent lock's refcount might be
		 * zero. This only happens if we're holding that lock (otherwise we
		 * would have removed the entry).  Clamp the count back to zero.
		 */
		if (parentlock->childLocks < 0)
		{
			Assert(parentlock->held);
			parentlock->childLocks = 0;
		}

		/*
		 * Drop the entry once it neither represents a held lock nor has any
		 * remaining children -- it no longer carries any information.
		 */
		if ((parentlock->childLocks == 0) && (!parentlock->held))
		{
			rmlock = (LOCALPREDICATELOCK *)
				hash_search_with_hash_value(LocalPredicateLockHash,
											&parenttag, targettaghash,
											HASH_REMOVE, NULL);
			Assert(rmlock == parentlock);
		}
	}
}
2385 
2386 /*
2387  * Indicate that a predicate lock on the given target is held by the
2388  * specified transaction. Has no effect if the lock is already held.
2389  *
2390  * This updates the lock table and the sxact's lock list, and creates
2391  * the lock target if necessary, but does *not* do anything related to
2392  * granularity promotion or the local lock table. See
2393  * PredicateLockAcquire for that.
2394  */
static void
CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
					uint32 targettaghash,
					SERIALIZABLEXACT *sxact)
{
	PREDICATELOCKTARGET *target;
	PREDICATELOCKTAG locktag;
	PREDICATELOCK *lock;
	LWLock	   *partitionLock;
	bool		found;

	partitionLock = PredicateLockHashPartitionLock(targettaghash);

	/*
	 * Lock order: the list lock (shared) first, then the target's hash
	 * partition lock (exclusive) for the actual hash-table modifications.
	 */
	LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	/*
	 * Make sure that the target is represented.  HASH_ENTER_NULL yields
	 * NULL instead of erroring when shared memory is exhausted, letting us
	 * raise a friendlier error here.
	 */
	target = (PREDICATELOCKTARGET *)
		hash_search_with_hash_value(PredicateLockTargetHash,
									targettag, targettaghash,
									HASH_ENTER_NULL, &found);
	if (!target)
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of shared memory"),
				 errhint("You might need to increase max_pred_locks_per_transaction.")));
	if (!found)
		SHMQueueInit(&(target->predicateLocks));

	/* We've got the sxact and target, make sure they're joined. */
	locktag.myTarget = target;
	locktag.myXact = sxact;
	lock = (PREDICATELOCK *)
		hash_search_with_hash_value(PredicateLockHash, &locktag,
			PredicateLockHashCodeFromTargetHashCode(&locktag, targettaghash),
									HASH_ENTER_NULL, &found);
	if (!lock)
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of shared memory"),
				 errhint("You might need to increase max_pred_locks_per_transaction.")));

	/* If the lock already existed, there is nothing more to do. */
	if (!found)
	{
		/* Link the new lock into both the target's and the sxact's lists. */
		SHMQueueInsertBefore(&(target->predicateLocks), &(lock->targetLink));
		SHMQueueInsertBefore(&(sxact->predicateLocks),
							 &(lock->xactLink));
		lock->commitSeqNo = InvalidSerCommitSeqNo;
	}

	LWLockRelease(partitionLock);
	LWLockRelease(SerializablePredicateLockListLock);
}
2448 
2449 /*
2450  * Acquire a predicate lock on the specified target for the current
2451  * connection if not already held. This updates the local lock table
2452  * and uses it to implement granularity promotion. It will consolidate
2453  * multiple locks into a coarser lock if warranted, and will release
2454  * any finer-grained locks covered by the new one.
2455  */
2456 static void
PredicateLockAcquire(const PREDICATELOCKTARGETTAG * targettag)2457 PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag)
2458 {
2459 	uint32		targettaghash;
2460 	bool		found;
2461 	LOCALPREDICATELOCK *locallock;
2462 
2463 	/* Do we have the lock already, or a covering lock? */
2464 	if (PredicateLockExists(targettag))
2465 		return;
2466 
2467 	if (CoarserLockCovers(targettag))
2468 		return;
2469 
2470 	/* the same hash and LW lock apply to the lock target and the local lock. */
2471 	targettaghash = PredicateLockTargetTagHashCode(targettag);
2472 
2473 	/* Acquire lock in local table */
2474 	locallock = (LOCALPREDICATELOCK *)
2475 		hash_search_with_hash_value(LocalPredicateLockHash,
2476 									targettag, targettaghash,
2477 									HASH_ENTER, &found);
2478 	locallock->held = true;
2479 	if (!found)
2480 		locallock->childLocks = 0;
2481 
2482 	/* Actually create the lock */
2483 	CreatePredicateLock(targettag, targettaghash, MySerializableXact);
2484 
2485 	/*
2486 	 * Lock has been acquired. Check whether it should be promoted to a
2487 	 * coarser granularity, or whether there are finer-granularity locks to
2488 	 * clean up.
2489 	 */
2490 	if (CheckAndPromotePredicateLockRequest(targettag))
2491 	{
2492 		/*
2493 		 * Lock request was promoted to a coarser-granularity lock, and that
2494 		 * lock was acquired. It will delete this lock and any of its
2495 		 * children, so we're done.
2496 		 */
2497 	}
2498 	else
2499 	{
2500 		/* Clean up any finer-granularity locks */
2501 		if (GET_PREDICATELOCKTARGETTAG_TYPE(*targettag) != PREDLOCKTAG_TUPLE)
2502 			DeleteChildTargetLocks(targettag);
2503 	}
2504 }
2505 
2506 
2507 /*
2508  *		PredicateLockRelation
2509  *
2510  * Gets a predicate lock at the relation level.
2511  * Skip if not in full serializable transaction isolation level.
2512  * Skip if this is a temporary table.
2513  * Clear any finer-grained predicate locks this session has on the relation.
2514  */
2515 void
PredicateLockRelation(Relation relation,Snapshot snapshot)2516 PredicateLockRelation(Relation relation, Snapshot snapshot)
2517 {
2518 	PREDICATELOCKTARGETTAG tag;
2519 
2520 	if (!SerializationNeededForRead(relation, snapshot))
2521 		return;
2522 
2523 	SET_PREDICATELOCKTARGETTAG_RELATION(tag,
2524 										relation->rd_node.dbNode,
2525 										relation->rd_id);
2526 	PredicateLockAcquire(&tag);
2527 }
2528 
2529 /*
2530  *		PredicateLockPage
2531  *
2532  * Gets a predicate lock at the page level.
2533  * Skip if not in full serializable transaction isolation level.
2534  * Skip if this is a temporary table.
2535  * Skip if a coarser predicate lock already covers this page.
2536  * Clear any finer-grained predicate locks this session has on the relation.
2537  */
2538 void
PredicateLockPage(Relation relation,BlockNumber blkno,Snapshot snapshot)2539 PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot)
2540 {
2541 	PREDICATELOCKTARGETTAG tag;
2542 
2543 	if (!SerializationNeededForRead(relation, snapshot))
2544 		return;
2545 
2546 	SET_PREDICATELOCKTARGETTAG_PAGE(tag,
2547 									relation->rd_node.dbNode,
2548 									relation->rd_id,
2549 									blkno);
2550 	PredicateLockAcquire(&tag);
2551 }
2552 
2553 /*
2554  *		PredicateLockTuple
2555  *
2556  * Gets a predicate lock at the tuple level.
2557  * Skip if not in full serializable transaction isolation level.
2558  * Skip if this is a temporary table.
2559  */
2560 void
PredicateLockTuple(Relation relation,HeapTuple tuple,Snapshot snapshot)2561 PredicateLockTuple(Relation relation, HeapTuple tuple, Snapshot snapshot)
2562 {
2563 	PREDICATELOCKTARGETTAG tag;
2564 	ItemPointer tid;
2565 	TransactionId targetxmin;
2566 
2567 	if (!SerializationNeededForRead(relation, snapshot))
2568 		return;
2569 
2570 	/*
2571 	 * If it's a heap tuple, return if this xact wrote it.
2572 	 */
2573 	if (relation->rd_index == NULL)
2574 	{
2575 		TransactionId myxid;
2576 
2577 		targetxmin = HeapTupleHeaderGetXmin(tuple->t_data);
2578 
2579 		myxid = GetTopTransactionIdIfAny();
2580 		if (TransactionIdIsValid(myxid))
2581 		{
2582 			if (TransactionIdFollowsOrEquals(targetxmin, TransactionXmin))
2583 			{
2584 				TransactionId xid = SubTransGetTopmostTransaction(targetxmin);
2585 
2586 				if (TransactionIdEquals(xid, myxid))
2587 				{
2588 					/* We wrote it; we already have a write lock. */
2589 					return;
2590 				}
2591 			}
2592 		}
2593 	}
2594 
2595 	/*
2596 	 * Do quick-but-not-definitive test for a relation lock first.  This will
2597 	 * never cause a return when the relation is *not* locked, but will
2598 	 * occasionally let the check continue when there really *is* a relation
2599 	 * level lock.
2600 	 */
2601 	SET_PREDICATELOCKTARGETTAG_RELATION(tag,
2602 										relation->rd_node.dbNode,
2603 										relation->rd_id);
2604 	if (PredicateLockExists(&tag))
2605 		return;
2606 
2607 	tid = &(tuple->t_self);
2608 	SET_PREDICATELOCKTARGETTAG_TUPLE(tag,
2609 									 relation->rd_node.dbNode,
2610 									 relation->rd_id,
2611 									 ItemPointerGetBlockNumber(tid),
2612 									 ItemPointerGetOffsetNumber(tid));
2613 	PredicateLockAcquire(&tag);
2614 }
2615 
2616 
2617 /*
2618  *		DeleteLockTarget
2619  *
2620  * Remove a predicate lock target along with any locks held for it.
2621  *
2622  * Caller must hold SerializablePredicateLockListLock and the
2623  * appropriate hash partition lock for the target.
2624  */
2625 static void
DeleteLockTarget(PREDICATELOCKTARGET * target,uint32 targettaghash)2626 DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
2627 {
2628 	PREDICATELOCK *predlock;
2629 	SHM_QUEUE  *predlocktargetlink;
2630 	PREDICATELOCK *nextpredlock;
2631 	bool		found;
2632 
2633 	Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
2634 	Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash)));
2635 
2636 	predlock = (PREDICATELOCK *)
2637 		SHMQueueNext(&(target->predicateLocks),
2638 					 &(target->predicateLocks),
2639 					 offsetof(PREDICATELOCK, targetLink));
2640 	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
2641 	while (predlock)
2642 	{
2643 		predlocktargetlink = &(predlock->targetLink);
2644 		nextpredlock = (PREDICATELOCK *)
2645 			SHMQueueNext(&(target->predicateLocks),
2646 						 predlocktargetlink,
2647 						 offsetof(PREDICATELOCK, targetLink));
2648 
2649 		SHMQueueDelete(&(predlock->xactLink));
2650 		SHMQueueDelete(&(predlock->targetLink));
2651 
2652 		hash_search_with_hash_value
2653 			(PredicateLockHash,
2654 			 &predlock->tag,
2655 			 PredicateLockHashCodeFromTargetHashCode(&predlock->tag,
2656 													 targettaghash),
2657 			 HASH_REMOVE, &found);
2658 		Assert(found);
2659 
2660 		predlock = nextpredlock;
2661 	}
2662 	LWLockRelease(SerializableXactHashLock);
2663 
2664 	/* Remove the target itself, if possible. */
2665 	RemoveTargetIfNoLongerUsed(target, targettaghash);
2666 }
2667 
2668 
2669 /*
2670  *		TransferPredicateLocksToNewTarget
2671  *
2672  * Move or copy all the predicate locks for a lock target, for use by
2673  * index page splits/combines and other things that create or replace
2674  * lock targets. If 'removeOld' is true, the old locks and the target
2675  * will be removed.
2676  *
2677  * Returns true on success, or false if we ran out of shared memory to
2678  * allocate the new target or locks. Guaranteed to always succeed if
2679  * removeOld is set (by using the scratch entry in PredicateLockTargetHash
2680  * for scratch space).
2681  *
2682  * Warning: the "removeOld" option should be used only with care,
2683  * because this function does not (indeed, can not) update other
2684  * backends' LocalPredicateLockHash. If we are only adding new
2685  * entries, this is not a problem: the local lock table is used only
2686  * as a hint, so missing entries for locks that are held are
2687  * OK. Having entries for locks that are no longer held, as can happen
2688  * when using "removeOld", is not in general OK. We can only use it
2689  * safely when replacing a lock with a coarser-granularity lock that
2690  * covers it, or if we are absolutely certain that no one will need to
2691  * refer to that lock in the future.
2692  *
2693  * Caller must hold SerializablePredicateLockListLock.
2694  */
2695 static bool
TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,PREDICATELOCKTARGETTAG newtargettag,bool removeOld)2696 TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
2697 								  PREDICATELOCKTARGETTAG newtargettag,
2698 								  bool removeOld)
2699 {
2700 	uint32		oldtargettaghash;
2701 	LWLock	   *oldpartitionLock;
2702 	PREDICATELOCKTARGET *oldtarget;
2703 	uint32		newtargettaghash;
2704 	LWLock	   *newpartitionLock;
2705 	bool		found;
2706 	bool		outOfShmem = false;
2707 
2708 	Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
2709 
2710 	oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
2711 	newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
2712 	oldpartitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
2713 	newpartitionLock = PredicateLockHashPartitionLock(newtargettaghash);
2714 
2715 	if (removeOld)
2716 	{
2717 		/*
2718 		 * Remove the dummy entry to give us scratch space, so we know we'll
2719 		 * be able to create the new lock target.
2720 		 */
2721 		RemoveScratchTarget(false);
2722 	}
2723 
2724 	/*
2725 	 * We must get the partition locks in ascending sequence to avoid
2726 	 * deadlocks. If old and new partitions are the same, we must request the
2727 	 * lock only once.
2728 	 */
2729 	if (oldpartitionLock < newpartitionLock)
2730 	{
2731 		LWLockAcquire(oldpartitionLock,
2732 					  (removeOld ? LW_EXCLUSIVE : LW_SHARED));
2733 		LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
2734 	}
2735 	else if (oldpartitionLock > newpartitionLock)
2736 	{
2737 		LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
2738 		LWLockAcquire(oldpartitionLock,
2739 					  (removeOld ? LW_EXCLUSIVE : LW_SHARED));
2740 	}
2741 	else
2742 		LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
2743 
2744 	/*
2745 	 * Look for the old target.  If not found, that's OK; no predicate locks
2746 	 * are affected, so we can just clean up and return. If it does exist,
2747 	 * walk its list of predicate locks and move or copy them to the new
2748 	 * target.
2749 	 */
2750 	oldtarget = hash_search_with_hash_value(PredicateLockTargetHash,
2751 											&oldtargettag,
2752 											oldtargettaghash,
2753 											HASH_FIND, NULL);
2754 
2755 	if (oldtarget)
2756 	{
2757 		PREDICATELOCKTARGET *newtarget;
2758 		PREDICATELOCK *oldpredlock;
2759 		PREDICATELOCKTAG newpredlocktag;
2760 
2761 		newtarget = hash_search_with_hash_value(PredicateLockTargetHash,
2762 												&newtargettag,
2763 												newtargettaghash,
2764 												HASH_ENTER_NULL, &found);
2765 
2766 		if (!newtarget)
2767 		{
2768 			/* Failed to allocate due to insufficient shmem */
2769 			outOfShmem = true;
2770 			goto exit;
2771 		}
2772 
2773 		/* If we created a new entry, initialize it */
2774 		if (!found)
2775 			SHMQueueInit(&(newtarget->predicateLocks));
2776 
2777 		newpredlocktag.myTarget = newtarget;
2778 
2779 		/*
2780 		 * Loop through all the locks on the old target, replacing them with
2781 		 * locks on the new target.
2782 		 */
2783 		oldpredlock = (PREDICATELOCK *)
2784 			SHMQueueNext(&(oldtarget->predicateLocks),
2785 						 &(oldtarget->predicateLocks),
2786 						 offsetof(PREDICATELOCK, targetLink));
2787 		LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
2788 		while (oldpredlock)
2789 		{
2790 			SHM_QUEUE  *predlocktargetlink;
2791 			PREDICATELOCK *nextpredlock;
2792 			PREDICATELOCK *newpredlock;
2793 			SerCommitSeqNo oldCommitSeqNo = oldpredlock->commitSeqNo;
2794 
2795 			predlocktargetlink = &(oldpredlock->targetLink);
2796 			nextpredlock = (PREDICATELOCK *)
2797 				SHMQueueNext(&(oldtarget->predicateLocks),
2798 							 predlocktargetlink,
2799 							 offsetof(PREDICATELOCK, targetLink));
2800 			newpredlocktag.myXact = oldpredlock->tag.myXact;
2801 
2802 			if (removeOld)
2803 			{
2804 				SHMQueueDelete(&(oldpredlock->xactLink));
2805 				SHMQueueDelete(&(oldpredlock->targetLink));
2806 
2807 				hash_search_with_hash_value
2808 					(PredicateLockHash,
2809 					 &oldpredlock->tag,
2810 				   PredicateLockHashCodeFromTargetHashCode(&oldpredlock->tag,
2811 														   oldtargettaghash),
2812 					 HASH_REMOVE, &found);
2813 				Assert(found);
2814 			}
2815 
2816 			newpredlock = (PREDICATELOCK *)
2817 				hash_search_with_hash_value(PredicateLockHash,
2818 											&newpredlocktag,
2819 					 PredicateLockHashCodeFromTargetHashCode(&newpredlocktag,
2820 														   newtargettaghash),
2821 											HASH_ENTER_NULL,
2822 											&found);
2823 			if (!newpredlock)
2824 			{
2825 				/* Out of shared memory. Undo what we've done so far. */
2826 				LWLockRelease(SerializableXactHashLock);
2827 				DeleteLockTarget(newtarget, newtargettaghash);
2828 				outOfShmem = true;
2829 				goto exit;
2830 			}
2831 			if (!found)
2832 			{
2833 				SHMQueueInsertBefore(&(newtarget->predicateLocks),
2834 									 &(newpredlock->targetLink));
2835 				SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
2836 									 &(newpredlock->xactLink));
2837 				newpredlock->commitSeqNo = oldCommitSeqNo;
2838 			}
2839 			else
2840 			{
2841 				if (newpredlock->commitSeqNo < oldCommitSeqNo)
2842 					newpredlock->commitSeqNo = oldCommitSeqNo;
2843 			}
2844 
2845 			Assert(newpredlock->commitSeqNo != 0);
2846 			Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo)
2847 				   || (newpredlock->tag.myXact == OldCommittedSxact));
2848 
2849 			oldpredlock = nextpredlock;
2850 		}
2851 		LWLockRelease(SerializableXactHashLock);
2852 
2853 		if (removeOld)
2854 		{
2855 			Assert(SHMQueueEmpty(&oldtarget->predicateLocks));
2856 			RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);
2857 		}
2858 	}
2859 
2860 
2861 exit:
2862 	/* Release partition locks in reverse order of acquisition. */
2863 	if (oldpartitionLock < newpartitionLock)
2864 	{
2865 		LWLockRelease(newpartitionLock);
2866 		LWLockRelease(oldpartitionLock);
2867 	}
2868 	else if (oldpartitionLock > newpartitionLock)
2869 	{
2870 		LWLockRelease(oldpartitionLock);
2871 		LWLockRelease(newpartitionLock);
2872 	}
2873 	else
2874 		LWLockRelease(newpartitionLock);
2875 
2876 	if (removeOld)
2877 	{
2878 		/* We shouldn't run out of memory if we're moving locks */
2879 		Assert(!outOfShmem);
2880 
2881 		/* Put the scrach entry back */
2882 		RestoreScratchTarget(false);
2883 	}
2884 
2885 	return !outOfShmem;
2886 }
2887 
2888 /*
2889  * Drop all predicate locks of any granularity from the specified relation,
2890  * which can be a heap relation or an index relation.  If 'transfer' is true,
2891  * acquire a relation lock on the heap for any transactions with any lock(s)
2892  * on the specified relation.
2893  *
2894  * This requires grabbing a lot of LW locks and scanning the entire lock
2895  * target table for matches.  That makes this more expensive than most
2896  * predicate lock management functions, but it will only be called for DDL
2897  * type commands that are expensive anyway, and there are fast returns when
2898  * no serializable transactions are active or the relation is temporary.
2899  *
2900  * We don't use the TransferPredicateLocksToNewTarget function because it
2901  * acquires its own locks on the partitions of the two targets involved,
2902  * and we'll already be holding all partition locks.
2903  *
2904  * We can't throw an error from here, because the call could be from a
2905  * transaction which is not serializable.
2906  *
2907  * NOTE: This is currently only called with transfer set to true, but that may
2908  * change.  If we decide to clean up the locks from a table on commit of a
2909  * transaction which executed DROP TABLE, the false condition will be useful.
2910  */
2911 static void
DropAllPredicateLocksFromTable(Relation relation,bool transfer)2912 DropAllPredicateLocksFromTable(Relation relation, bool transfer)
2913 {
2914 	HASH_SEQ_STATUS seqstat;
2915 	PREDICATELOCKTARGET *oldtarget;
2916 	PREDICATELOCKTARGET *heaptarget;
2917 	Oid			dbId;
2918 	Oid			relId;
2919 	Oid			heapId;
2920 	int			i;
2921 	bool		isIndex;
2922 	bool		found;
2923 	uint32		heaptargettaghash;
2924 
2925 	/*
2926 	 * Bail out quickly if there are no serializable transactions running.
2927 	 * It's safe to check this without taking locks because the caller is
2928 	 * holding an ACCESS EXCLUSIVE lock on the relation.  No new locks which
2929 	 * would matter here can be acquired while that is held.
2930 	 */
2931 	if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
2932 		return;
2933 
2934 	if (!PredicateLockingNeededForRelation(relation))
2935 		return;
2936 
2937 	dbId = relation->rd_node.dbNode;
2938 	relId = relation->rd_id;
2939 	if (relation->rd_index == NULL)
2940 	{
2941 		isIndex = false;
2942 		heapId = relId;
2943 	}
2944 	else
2945 	{
2946 		isIndex = true;
2947 		heapId = relation->rd_index->indrelid;
2948 	}
2949 	Assert(heapId != InvalidOid);
2950 	Assert(transfer || !isIndex);		/* index OID only makes sense with
2951 										 * transfer */
2952 
2953 	/* Retrieve first time needed, then keep. */
2954 	heaptargettaghash = 0;
2955 	heaptarget = NULL;
2956 
2957 	/* Acquire locks on all lock partitions */
2958 	LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
2959 	for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
2960 		LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_EXCLUSIVE);
2961 	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
2962 
2963 	/*
2964 	 * Remove the dummy entry to give us scratch space, so we know we'll be
2965 	 * able to create the new lock target.
2966 	 */
2967 	if (transfer)
2968 		RemoveScratchTarget(true);
2969 
2970 	/* Scan through target map */
2971 	hash_seq_init(&seqstat, PredicateLockTargetHash);
2972 
2973 	while ((oldtarget = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
2974 	{
2975 		PREDICATELOCK *oldpredlock;
2976 
2977 		/*
2978 		 * Check whether this is a target which needs attention.
2979 		 */
2980 		if (GET_PREDICATELOCKTARGETTAG_RELATION(oldtarget->tag) != relId)
2981 			continue;			/* wrong relation id */
2982 		if (GET_PREDICATELOCKTARGETTAG_DB(oldtarget->tag) != dbId)
2983 			continue;			/* wrong database id */
2984 		if (transfer && !isIndex
2985 			&& GET_PREDICATELOCKTARGETTAG_TYPE(oldtarget->tag) == PREDLOCKTAG_RELATION)
2986 			continue;			/* already the right lock */
2987 
2988 		/*
2989 		 * If we made it here, we have work to do.  We make sure the heap
2990 		 * relation lock exists, then we walk the list of predicate locks for
2991 		 * the old target we found, moving all locks to the heap relation lock
2992 		 * -- unless they already hold that.
2993 		 */
2994 
2995 		/*
2996 		 * First make sure we have the heap relation target.  We only need to
2997 		 * do this once.
2998 		 */
2999 		if (transfer && heaptarget == NULL)
3000 		{
3001 			PREDICATELOCKTARGETTAG heaptargettag;
3002 
3003 			SET_PREDICATELOCKTARGETTAG_RELATION(heaptargettag, dbId, heapId);
3004 			heaptargettaghash = PredicateLockTargetTagHashCode(&heaptargettag);
3005 			heaptarget = hash_search_with_hash_value(PredicateLockTargetHash,
3006 													 &heaptargettag,
3007 													 heaptargettaghash,
3008 													 HASH_ENTER, &found);
3009 			if (!found)
3010 				SHMQueueInit(&heaptarget->predicateLocks);
3011 		}
3012 
3013 		/*
3014 		 * Loop through all the locks on the old target, replacing them with
3015 		 * locks on the new target.
3016 		 */
3017 		oldpredlock = (PREDICATELOCK *)
3018 			SHMQueueNext(&(oldtarget->predicateLocks),
3019 						 &(oldtarget->predicateLocks),
3020 						 offsetof(PREDICATELOCK, targetLink));
3021 		while (oldpredlock)
3022 		{
3023 			PREDICATELOCK *nextpredlock;
3024 			PREDICATELOCK *newpredlock;
3025 			SerCommitSeqNo oldCommitSeqNo;
3026 			SERIALIZABLEXACT *oldXact;
3027 
3028 			nextpredlock = (PREDICATELOCK *)
3029 				SHMQueueNext(&(oldtarget->predicateLocks),
3030 							 &(oldpredlock->targetLink),
3031 							 offsetof(PREDICATELOCK, targetLink));
3032 
3033 			/*
3034 			 * Remove the old lock first. This avoids the chance of running
3035 			 * out of lock structure entries for the hash table.
3036 			 */
3037 			oldCommitSeqNo = oldpredlock->commitSeqNo;
3038 			oldXact = oldpredlock->tag.myXact;
3039 
3040 			SHMQueueDelete(&(oldpredlock->xactLink));
3041 
3042 			/*
3043 			 * No need for retail delete from oldtarget list, we're removing
3044 			 * the whole target anyway.
3045 			 */
3046 			hash_search(PredicateLockHash,
3047 						&oldpredlock->tag,
3048 						HASH_REMOVE, &found);
3049 			Assert(found);
3050 
3051 			if (transfer)
3052 			{
3053 				PREDICATELOCKTAG newpredlocktag;
3054 
3055 				newpredlocktag.myTarget = heaptarget;
3056 				newpredlocktag.myXact = oldXact;
3057 				newpredlock = (PREDICATELOCK *)
3058 					hash_search_with_hash_value(PredicateLockHash,
3059 												&newpredlocktag,
3060 					 PredicateLockHashCodeFromTargetHashCode(&newpredlocktag,
3061 														  heaptargettaghash),
3062 												HASH_ENTER,
3063 												&found);
3064 				if (!found)
3065 				{
3066 					SHMQueueInsertBefore(&(heaptarget->predicateLocks),
3067 										 &(newpredlock->targetLink));
3068 					SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
3069 										 &(newpredlock->xactLink));
3070 					newpredlock->commitSeqNo = oldCommitSeqNo;
3071 				}
3072 				else
3073 				{
3074 					if (newpredlock->commitSeqNo < oldCommitSeqNo)
3075 						newpredlock->commitSeqNo = oldCommitSeqNo;
3076 				}
3077 
3078 				Assert(newpredlock->commitSeqNo != 0);
3079 				Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo)
3080 					   || (newpredlock->tag.myXact == OldCommittedSxact));
3081 			}
3082 
3083 			oldpredlock = nextpredlock;
3084 		}
3085 
3086 		hash_search(PredicateLockTargetHash, &oldtarget->tag, HASH_REMOVE,
3087 					&found);
3088 		Assert(found);
3089 	}
3090 
3091 	/* Put the scratch entry back */
3092 	if (transfer)
3093 		RestoreScratchTarget(true);
3094 
3095 	/* Release locks in reverse order */
3096 	LWLockRelease(SerializableXactHashLock);
3097 	for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
3098 		LWLockRelease(PredicateLockHashPartitionLockByIndex(i));
3099 	LWLockRelease(SerializablePredicateLockListLock);
3100 }
3101 
3102 /*
3103  * TransferPredicateLocksToHeapRelation
3104  *		For all transactions, transfer all predicate locks for the given
3105  *		relation to a single relation lock on the heap.
3106  */
3107 void
TransferPredicateLocksToHeapRelation(Relation relation)3108 TransferPredicateLocksToHeapRelation(Relation relation)
3109 {
3110 	DropAllPredicateLocksFromTable(relation, true);
3111 }
3112 
3113 
3114 /*
3115  *		PredicateLockPageSplit
3116  *
3117  * Copies any predicate locks for the old page to the new page.
3118  * Skip if this is a temporary table or toast table.
3119  *
3120  * NOTE: A page split (or overflow) affects all serializable transactions,
3121  * even if it occurs in the context of another transaction isolation level.
3122  *
3123  * NOTE: This currently leaves the local copy of the locks without
3124  * information on the new lock which is in shared memory.  This could cause
3125  * problems if enough page splits occur on locked pages without the processes
3126  * which hold the locks getting in and noticing.
3127  */
3128 void
PredicateLockPageSplit(Relation relation,BlockNumber oldblkno,BlockNumber newblkno)3129 PredicateLockPageSplit(Relation relation, BlockNumber oldblkno,
3130 					   BlockNumber newblkno)
3131 {
3132 	PREDICATELOCKTARGETTAG oldtargettag;
3133 	PREDICATELOCKTARGETTAG newtargettag;
3134 	bool		success;
3135 
3136 	/*
3137 	 * Bail out quickly if there are no serializable transactions running.
3138 	 *
3139 	 * It's safe to do this check without taking any additional locks. Even if
3140 	 * a serializable transaction starts concurrently, we know it can't take
3141 	 * any SIREAD locks on the page being split because the caller is holding
3142 	 * the associated buffer page lock. Memory reordering isn't an issue; the
3143 	 * memory barrier in the LWLock acquisition guarantees that this read
3144 	 * occurs while the buffer page lock is held.
3145 	 */
3146 	if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
3147 		return;
3148 
3149 	if (!PredicateLockingNeededForRelation(relation))
3150 		return;
3151 
3152 	Assert(oldblkno != newblkno);
3153 	Assert(BlockNumberIsValid(oldblkno));
3154 	Assert(BlockNumberIsValid(newblkno));
3155 
3156 	SET_PREDICATELOCKTARGETTAG_PAGE(oldtargettag,
3157 									relation->rd_node.dbNode,
3158 									relation->rd_id,
3159 									oldblkno);
3160 	SET_PREDICATELOCKTARGETTAG_PAGE(newtargettag,
3161 									relation->rd_node.dbNode,
3162 									relation->rd_id,
3163 									newblkno);
3164 
3165 	LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
3166 
3167 	/*
3168 	 * Try copying the locks over to the new page's tag, creating it if
3169 	 * necessary.
3170 	 */
3171 	success = TransferPredicateLocksToNewTarget(oldtargettag,
3172 												newtargettag,
3173 												false);
3174 
3175 	if (!success)
3176 	{
3177 		/*
3178 		 * No more predicate lock entries are available. Failure isn't an
3179 		 * option here, so promote the page lock to a relation lock.
3180 		 */
3181 
3182 		/* Get the parent relation lock's lock tag */
3183 		success = GetParentPredicateLockTag(&oldtargettag,
3184 											&newtargettag);
3185 		Assert(success);
3186 
3187 		/*
3188 		 * Move the locks to the parent. This shouldn't fail.
3189 		 *
3190 		 * Note that here we are removing locks held by other backends,
3191 		 * leading to a possible inconsistency in their local lock hash table.
3192 		 * This is OK because we're replacing it with a lock that covers the
3193 		 * old one.
3194 		 */
3195 		success = TransferPredicateLocksToNewTarget(oldtargettag,
3196 													newtargettag,
3197 													true);
3198 		Assert(success);
3199 	}
3200 
3201 	LWLockRelease(SerializablePredicateLockListLock);
3202 }
3203 
3204 /*
3205  *		PredicateLockPageCombine
3206  *
3207  * Combines predicate locks for two existing pages.
3208  * Skip if this is a temporary table or toast table.
3209  *
3210  * NOTE: A page combine affects all serializable transactions, even if it
3211  * occurs in the context of another transaction isolation level.
3212  */
3213 void
PredicateLockPageCombine(Relation relation,BlockNumber oldblkno,BlockNumber newblkno)3214 PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
3215 						 BlockNumber newblkno)
3216 {
3217 	/*
3218 	 * Page combines differ from page splits in that we ought to be able to
3219 	 * remove the locks on the old page after transferring them to the new
3220 	 * page, instead of duplicating them. However, because we can't edit other
3221 	 * backends' local lock tables, removing the old lock would leave them
3222 	 * with an entry in their LocalPredicateLockHash for a lock they're not
3223 	 * holding, which isn't acceptable. So we wind up having to do the same
3224 	 * work as a page split, acquiring a lock on the new page and keeping the
3225 	 * old page locked too. That can lead to some false positives, but should
3226 	 * be rare in practice.
3227 	 */
3228 	PredicateLockPageSplit(relation, oldblkno, newblkno);
3229 }
3230 
3231 /*
3232  * Walk the list of in-progress serializable transactions and find the new
3233  * xmin.
3234  */
3235 static void
SetNewSxactGlobalXmin(void)3236 SetNewSxactGlobalXmin(void)
3237 {
3238 	SERIALIZABLEXACT *sxact;
3239 
3240 	Assert(LWLockHeldByMe(SerializableXactHashLock));
3241 
3242 	PredXact->SxactGlobalXmin = InvalidTransactionId;
3243 	PredXact->SxactGlobalXminCount = 0;
3244 
3245 	for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
3246 	{
3247 		if (!SxactIsRolledBack(sxact)
3248 			&& !SxactIsCommitted(sxact)
3249 			&& sxact != OldCommittedSxact)
3250 		{
3251 			Assert(sxact->xmin != InvalidTransactionId);
3252 			if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
3253 				|| TransactionIdPrecedes(sxact->xmin,
3254 										 PredXact->SxactGlobalXmin))
3255 			{
3256 				PredXact->SxactGlobalXmin = sxact->xmin;
3257 				PredXact->SxactGlobalXminCount = 1;
3258 			}
3259 			else if (TransactionIdEquals(sxact->xmin,
3260 										 PredXact->SxactGlobalXmin))
3261 				PredXact->SxactGlobalXminCount++;
3262 		}
3263 	}
3264 
3265 	OldSerXidSetActiveSerXmin(PredXact->SxactGlobalXmin);
3266 }
3267 
3268 /*
3269  *		ReleasePredicateLocks
3270  *
3271  * Releases predicate locks based on completion of the current transaction,
3272  * whether committed or rolled back.  It can also be called for a read only
3273  * transaction when it becomes impossible for the transaction to become
3274  * part of a dangerous structure.
3275  *
3276  * We do nothing unless this is a serializable transaction.
3277  *
3278  * This method must ensure that shared memory hash tables are cleaned
3279  * up in some relatively timely fashion.
3280  *
3281  * If this transaction is committing and is holding any predicate locks,
3282  * it must be added to a list of completed serializable transactions still
3283  * holding locks.
3284  */
3285 void
ReleasePredicateLocks(bool isCommit)3286 ReleasePredicateLocks(bool isCommit)
3287 {
3288 	bool		needToClear;
3289 	RWConflict	conflict,
3290 				nextConflict,
3291 				possibleUnsafeConflict;
3292 	SERIALIZABLEXACT *roXact;
3293 
3294 	/*
3295 	 * We can't trust XactReadOnly here, because a transaction which started
3296 	 * as READ WRITE can show as READ ONLY later, e.g., within
3297 	 * subtransactions.  We want to flag a transaction as READ ONLY if it
3298 	 * commits without writing so that de facto READ ONLY transactions get the
3299 	 * benefit of some RO optimizations, so we will use this local variable to
3300 	 * get some cleanup logic right which is based on whether the transaction
3301 	 * was declared READ ONLY at the top level.
3302 	 */
3303 	bool		topLevelIsDeclaredReadOnly;
3304 
3305 	if (MySerializableXact == InvalidSerializableXact)
3306 	{
3307 		Assert(LocalPredicateLockHash == NULL);
3308 		return;
3309 	}
3310 
3311 	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
3312 
3313 	Assert(!isCommit || SxactIsPrepared(MySerializableXact));
3314 	Assert(!isCommit || !SxactIsDoomed(MySerializableXact));
3315 	Assert(!SxactIsCommitted(MySerializableXact));
3316 	Assert(!SxactIsRolledBack(MySerializableXact));
3317 
3318 	/* may not be serializable during COMMIT/ROLLBACK PREPARED */
3319 	Assert(MySerializableXact->pid == 0 || IsolationIsSerializable());
3320 
3321 	/* We'd better not already be on the cleanup list. */
3322 	Assert(!SxactIsOnFinishedList(MySerializableXact));
3323 
3324 	topLevelIsDeclaredReadOnly = SxactIsReadOnly(MySerializableXact);
3325 
3326 	/*
3327 	 * We don't hold XidGenLock lock here, assuming that TransactionId is
3328 	 * atomic!
3329 	 *
3330 	 * If this value is changing, we don't care that much whether we get the
3331 	 * old or new value -- it is just used to determine how far
3332 	 * GlobalSerializableXmin must advance before this transaction can be
3333 	 * fully cleaned up.  The worst that could happen is we wait for one more
3334 	 * transaction to complete before freeing some RAM; correctness of visible
3335 	 * behavior is not affected.
3336 	 */
3337 	MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
3338 
3339 	/*
3340 	 * If it's not a commit it's a rollback, and we can clear our locks
3341 	 * immediately.
3342 	 */
3343 	if (isCommit)
3344 	{
3345 		MySerializableXact->flags |= SXACT_FLAG_COMMITTED;
3346 		MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
3347 		/* Recognize implicit read-only transaction (commit without write). */
3348 		if (!MyXactDidWrite)
3349 			MySerializableXact->flags |= SXACT_FLAG_READ_ONLY;
3350 	}
3351 	else
3352 	{
3353 		/*
3354 		 * The DOOMED flag indicates that we intend to roll back this
3355 		 * transaction and so it should not cause serialization failures for
3356 		 * other transactions that conflict with it. Note that this flag might
3357 		 * already be set, if another backend marked this transaction for
3358 		 * abort.
3359 		 *
3360 		 * The ROLLED_BACK flag further indicates that ReleasePredicateLocks
3361 		 * has been called, and so the SerializableXact is eligible for
3362 		 * cleanup. This means it should not be considered when calculating
3363 		 * SxactGlobalXmin.
3364 		 */
3365 		MySerializableXact->flags |= SXACT_FLAG_DOOMED;
3366 		MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK;
3367 
3368 		/*
3369 		 * If the transaction was previously prepared, but is now failing due
3370 		 * to a ROLLBACK PREPARED or (hopefully very rare) error after the
3371 		 * prepare, clear the prepared flag.  This simplifies conflict
3372 		 * checking.
3373 		 */
3374 		MySerializableXact->flags &= ~SXACT_FLAG_PREPARED;
3375 	}
3376 
3377 	if (!topLevelIsDeclaredReadOnly)
3378 	{
3379 		Assert(PredXact->WritableSxactCount > 0);
3380 		if (--(PredXact->WritableSxactCount) == 0)
3381 		{
3382 			/*
3383 			 * Release predicate locks and rw-conflicts in for all committed
3384 			 * transactions.  There are no longer any transactions which might
3385 			 * conflict with the locks and no chance for new transactions to
3386 			 * overlap.  Similarly, existing conflicts in can't cause pivots,
3387 			 * and any conflicts in which could have completed a dangerous
3388 			 * structure would already have caused a rollback, so any
3389 			 * remaining ones must be benign.
3390 			 */
3391 			PredXact->CanPartialClearThrough = PredXact->LastSxactCommitSeqNo;
3392 		}
3393 	}
3394 	else
3395 	{
3396 		/*
3397 		 * Read-only transactions: clear the list of transactions that might
3398 		 * make us unsafe. Note that we use 'inLink' for the iteration as
3399 		 * opposed to 'outLink' for the r/w xacts.
3400 		 */
3401 		possibleUnsafeConflict = (RWConflict)
3402 			SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
3403 						 &MySerializableXact->possibleUnsafeConflicts,
3404 						 offsetof(RWConflictData, inLink));
3405 		while (possibleUnsafeConflict)
3406 		{
3407 			nextConflict = (RWConflict)
3408 				SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
3409 							 &possibleUnsafeConflict->inLink,
3410 							 offsetof(RWConflictData, inLink));
3411 
3412 			Assert(!SxactIsReadOnly(possibleUnsafeConflict->sxactOut));
3413 			Assert(MySerializableXact == possibleUnsafeConflict->sxactIn);
3414 
3415 			ReleaseRWConflict(possibleUnsafeConflict);
3416 
3417 			possibleUnsafeConflict = nextConflict;
3418 		}
3419 	}
3420 
3421 	/* Check for conflict out to old committed transactions. */
3422 	if (isCommit
3423 		&& !SxactIsReadOnly(MySerializableXact)
3424 		&& SxactHasSummaryConflictOut(MySerializableXact))
3425 	{
3426 		/*
3427 		 * we don't know which old committed transaction we conflicted with,
3428 		 * so be conservative and use FirstNormalSerCommitSeqNo here
3429 		 */
3430 		MySerializableXact->SeqNo.earliestOutConflictCommit =
3431 			FirstNormalSerCommitSeqNo;
3432 		MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
3433 	}
3434 
3435 	/*
3436 	 * Release all outConflicts to committed transactions.  If we're rolling
3437 	 * back clear them all.  Set SXACT_FLAG_CONFLICT_OUT if any point to
3438 	 * previously committed transactions.
3439 	 */
3440 	conflict = (RWConflict)
3441 		SHMQueueNext(&MySerializableXact->outConflicts,
3442 					 &MySerializableXact->outConflicts,
3443 					 offsetof(RWConflictData, outLink));
3444 	while (conflict)
3445 	{
3446 		nextConflict = (RWConflict)
3447 			SHMQueueNext(&MySerializableXact->outConflicts,
3448 						 &conflict->outLink,
3449 						 offsetof(RWConflictData, outLink));
3450 
3451 		if (isCommit
3452 			&& !SxactIsReadOnly(MySerializableXact)
3453 			&& SxactIsCommitted(conflict->sxactIn))
3454 		{
3455 			if ((MySerializableXact->flags & SXACT_FLAG_CONFLICT_OUT) == 0
3456 				|| conflict->sxactIn->prepareSeqNo < MySerializableXact->SeqNo.earliestOutConflictCommit)
3457 				MySerializableXact->SeqNo.earliestOutConflictCommit = conflict->sxactIn->prepareSeqNo;
3458 			MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
3459 		}
3460 
3461 		if (!isCommit
3462 			|| SxactIsCommitted(conflict->sxactIn)
3463 			|| (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo))
3464 			ReleaseRWConflict(conflict);
3465 
3466 		conflict = nextConflict;
3467 	}
3468 
3469 	/*
3470 	 * Release all inConflicts from committed and read-only transactions. If
3471 	 * we're rolling back, clear them all.
3472 	 */
3473 	conflict = (RWConflict)
3474 		SHMQueueNext(&MySerializableXact->inConflicts,
3475 					 &MySerializableXact->inConflicts,
3476 					 offsetof(RWConflictData, inLink));
3477 	while (conflict)
3478 	{
3479 		nextConflict = (RWConflict)
3480 			SHMQueueNext(&MySerializableXact->inConflicts,
3481 						 &conflict->inLink,
3482 						 offsetof(RWConflictData, inLink));
3483 
3484 		if (!isCommit
3485 			|| SxactIsCommitted(conflict->sxactOut)
3486 			|| SxactIsReadOnly(conflict->sxactOut))
3487 			ReleaseRWConflict(conflict);
3488 
3489 		conflict = nextConflict;
3490 	}
3491 
3492 	if (!topLevelIsDeclaredReadOnly)
3493 	{
3494 		/*
3495 		 * Remove ourselves from the list of possible conflicts for concurrent
3496 		 * READ ONLY transactions, flagging them as unsafe if we have a
3497 		 * conflict out. If any are waiting DEFERRABLE transactions, wake them
3498 		 * up if they are known safe or known unsafe.
3499 		 */
3500 		possibleUnsafeConflict = (RWConflict)
3501 			SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
3502 						 &MySerializableXact->possibleUnsafeConflicts,
3503 						 offsetof(RWConflictData, outLink));
3504 		while (possibleUnsafeConflict)
3505 		{
3506 			nextConflict = (RWConflict)
3507 				SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
3508 							 &possibleUnsafeConflict->outLink,
3509 							 offsetof(RWConflictData, outLink));
3510 
3511 			roXact = possibleUnsafeConflict->sxactIn;
3512 			Assert(MySerializableXact == possibleUnsafeConflict->sxactOut);
3513 			Assert(SxactIsReadOnly(roXact));
3514 
3515 			/* Mark conflicted if necessary. */
3516 			if (isCommit
3517 				&& MyXactDidWrite
3518 				&& SxactHasConflictOut(MySerializableXact)
3519 				&& (MySerializableXact->SeqNo.earliestOutConflictCommit
3520 					<= roXact->SeqNo.lastCommitBeforeSnapshot))
3521 			{
3522 				/*
3523 				 * This releases possibleUnsafeConflict (as well as all other
3524 				 * possible conflicts for roXact)
3525 				 */
3526 				FlagSxactUnsafe(roXact);
3527 			}
3528 			else
3529 			{
3530 				ReleaseRWConflict(possibleUnsafeConflict);
3531 
3532 				/*
3533 				 * If we were the last possible conflict, flag it safe. The
3534 				 * transaction can now safely release its predicate locks (but
3535 				 * that transaction's backend has to do that itself).
3536 				 */
3537 				if (SHMQueueEmpty(&roXact->possibleUnsafeConflicts))
3538 					roXact->flags |= SXACT_FLAG_RO_SAFE;
3539 			}
3540 
3541 			/*
3542 			 * Wake up the process for a waiting DEFERRABLE transaction if we
3543 			 * now know it's either safe or conflicted.
3544 			 */
3545 			if (SxactIsDeferrableWaiting(roXact) &&
3546 				(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
3547 				ProcSendSignal(roXact->pid);
3548 
3549 			possibleUnsafeConflict = nextConflict;
3550 		}
3551 	}
3552 
3553 	/*
3554 	 * Check whether it's time to clean up old transactions. This can only be
3555 	 * done when the last serializable transaction with the oldest xmin among
3556 	 * serializable transactions completes.  We then find the "new oldest"
3557 	 * xmin and purge any transactions which finished before this transaction
3558 	 * was launched.
3559 	 */
3560 	needToClear = false;
3561 	if (TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin))
3562 	{
3563 		Assert(PredXact->SxactGlobalXminCount > 0);
3564 		if (--(PredXact->SxactGlobalXminCount) == 0)
3565 		{
3566 			SetNewSxactGlobalXmin();
3567 			needToClear = true;
3568 		}
3569 	}
3570 
3571 	LWLockRelease(SerializableXactHashLock);
3572 
3573 	LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
3574 
3575 	/* Add this to the list of transactions to check for later cleanup. */
3576 	if (isCommit)
3577 		SHMQueueInsertBefore(FinishedSerializableTransactions,
3578 							 &MySerializableXact->finishedLink);
3579 
3580 	if (!isCommit)
3581 		ReleaseOneSerializableXact(MySerializableXact, false, false);
3582 
3583 	LWLockRelease(SerializableFinishedListLock);
3584 
3585 	if (needToClear)
3586 		ClearOldPredicateLocks();
3587 
3588 	MySerializableXact = InvalidSerializableXact;
3589 	MyXactDidWrite = false;
3590 
3591 	/* Delete per-transaction lock table */
3592 	if (LocalPredicateLockHash != NULL)
3593 	{
3594 		hash_destroy(LocalPredicateLockHash);
3595 		LocalPredicateLockHash = NULL;
3596 	}
3597 }
3598 
/*
 * Clear old predicate locks, belonging to committed transactions that are no
 * longer interesting to any in-progress transaction.
 *
 * Acquires and releases SerializableFinishedListLock,
 * SerializableXactHashLock, and SerializablePredicateLockListLock, so the
 * caller must not already hold any of them.
 */
static void
ClearOldPredicateLocks(void)
{
	SERIALIZABLEXACT *finishedSxact;
	PREDICATELOCK *predlock;

	/*
	 * Loop through finished transactions. They are in commit order, so we can
	 * stop as soon as we find one that's still interesting.
	 */
	LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
	finishedSxact = (SERIALIZABLEXACT *)
		SHMQueueNext(FinishedSerializableTransactions,
					 FinishedSerializableTransactions,
					 offsetof(SERIALIZABLEXACT, finishedLink));
	/* Shared hash lock covers the PredXact fields consulted below. */
	LWLockAcquire(SerializableXactHashLock, LW_SHARED);
	while (finishedSxact)
	{
		SERIALIZABLEXACT *nextSxact;

		/* Grab the successor first; finishedSxact may be unlinked below. */
		nextSxact = (SERIALIZABLEXACT *)
			SHMQueueNext(FinishedSerializableTransactions,
						 &(finishedSxact->finishedLink),
						 offsetof(SERIALIZABLEXACT, finishedLink));
		if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
			|| TransactionIdPrecedesOrEquals(finishedSxact->finishedBefore,
											 PredXact->SxactGlobalXmin))
		{
			/*
			 * This transaction committed before any in-progress transaction
			 * took its snapshot. It's no longer interesting.
			 *
			 * Drop the hash lock first: ReleaseOneSerializableXact acquires
			 * it (exclusively) itself.
			 */
			LWLockRelease(SerializableXactHashLock);
			SHMQueueDelete(&(finishedSxact->finishedLink));
			ReleaseOneSerializableXact(finishedSxact, false, false);
			LWLockAcquire(SerializableXactHashLock, LW_SHARED);
		}
		else if (finishedSxact->commitSeqNo > PredXact->HavePartialClearedThrough
		   && finishedSxact->commitSeqNo <= PredXact->CanPartialClearThrough)
		{
			/*
			 * Any active transactions that took their snapshot before this
			 * transaction committed are read-only, so we can clear part of
			 * its state.
			 */
			LWLockRelease(SerializableXactHashLock);

			if (SxactIsReadOnly(finishedSxact))
			{
				/* A read-only transaction can be removed entirely */
				SHMQueueDelete(&(finishedSxact->finishedLink));
				ReleaseOneSerializableXact(finishedSxact, false, false);
			}
			else
			{
				/*
				 * A read-write transaction can only be partially cleared. We
				 * need to keep the SERIALIZABLEXACT but can release the
				 * SIREAD locks and conflicts in.
				 */
				ReleaseOneSerializableXact(finishedSxact, true, false);
			}

			/* Remember how far we've cleared, so we don't repeat the work. */
			PredXact->HavePartialClearedThrough = finishedSxact->commitSeqNo;
			LWLockAcquire(SerializableXactHashLock, LW_SHARED);
		}
		else
		{
			/* Still interesting. */
			break;
		}
		finishedSxact = nextSxact;
	}
	LWLockRelease(SerializableXactHashLock);

	/*
	 * Loop through predicate locks on dummy transaction for summarized data.
	 * These are locks that were transferred to OldCommittedSxact when sxact
	 * data was summarized to SLRU storage.
	 */
	LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
	predlock = (PREDICATELOCK *)
		SHMQueueNext(&OldCommittedSxact->predicateLocks,
					 &OldCommittedSxact->predicateLocks,
					 offsetof(PREDICATELOCK, xactLink));
	while (predlock)
	{
		PREDICATELOCK *nextpredlock;
		bool		canDoPartialCleanup;

		/* As above, fetch the successor before possibly deleting predlock. */
		nextpredlock = (PREDICATELOCK *)
			SHMQueueNext(&OldCommittedSxact->predicateLocks,
						 &predlock->xactLink,
						 offsetof(PREDICATELOCK, xactLink));

		/* commitSeqNo is protected by the hash lock; sample it briefly. */
		LWLockAcquire(SerializableXactHashLock, LW_SHARED);
		Assert(predlock->commitSeqNo != 0);
		Assert(predlock->commitSeqNo != InvalidSerCommitSeqNo);
		canDoPartialCleanup = (predlock->commitSeqNo <= PredXact->CanPartialClearThrough);
		LWLockRelease(SerializableXactHashLock);

		/*
		 * If this lock originally belonged to an old enough transaction, we
		 * can release it.
		 */
		if (canDoPartialCleanup)
		{
			PREDICATELOCKTAG tag;
			PREDICATELOCKTARGET *target;
			PREDICATELOCKTARGETTAG targettag;
			uint32		targettaghash;
			LWLock	   *partitionLock;

			tag = predlock->tag;
			target = tag.myTarget;
			targettag = target->tag;
			targettaghash = PredicateLockTargetTagHashCode(&targettag);
			/* Lock and lock target live in the same hash partition. */
			partitionLock = PredicateLockHashPartitionLock(targettaghash);

			LWLockAcquire(partitionLock, LW_EXCLUSIVE);

			SHMQueueDelete(&(predlock->targetLink));
			SHMQueueDelete(&(predlock->xactLink));

			hash_search_with_hash_value(PredicateLockHash, &tag,
								PredicateLockHashCodeFromTargetHashCode(&tag,
															  targettaghash),
										HASH_REMOVE, NULL);
			/* The target may now be unreferenced; if so, drop it too. */
			RemoveTargetIfNoLongerUsed(target, targettaghash);

			LWLockRelease(partitionLock);
		}

		predlock = nextpredlock;
	}

	LWLockRelease(SerializablePredicateLockListLock);
	LWLockRelease(SerializableFinishedListLock);
}
3740 
/*
 * This is the normal way to delete anything from any of the predicate
 * locking hash tables.  Given a transaction which we know can be deleted:
 * delete all predicate locks held by that transaction and any predicate
 * lock targets which are now unreferenced by a lock; delete all conflicts
 * for the transaction; delete all xid values for the transaction; then
 * delete the transaction.
 *
 * When the partial flag is set, we can release all predicate locks and
 * in-conflict information -- we've established that there are no longer
 * any overlapping read write transactions for which this transaction could
 * matter -- but keep the transaction entry itself and any outConflicts.
 *
 * When the summarize flag is set, we've run short of room for sxact data
 * and must summarize to the SLRU.  Predicate locks are transferred to a
 * dummy "old" transaction, with duplicate locks on a single target
 * collapsing to a single lock with the "latest" commitSeqNo from among
 * the conflicting locks.
 *
 * Caller must hold SerializableFinishedListLock (asserted below); this
 * function acquires SerializablePredicateLockListLock and
 * SerializableXactHashLock internally.
 */
static void
ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
						   bool summarize)
{
	PREDICATELOCK *predlock;
	SERIALIZABLEXIDTAG sxidtag;
	RWConflict	conflict,
				nextConflict;

	Assert(sxact != NULL);
	Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact));
	Assert(partial || !SxactIsOnFinishedList(sxact));
	Assert(LWLockHeldByMe(SerializableFinishedListLock));

	/*
	 * First release all the predicate locks held by this xact (or transfer
	 * them to OldCommittedSxact if summarize is true)
	 */
	LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
	predlock = (PREDICATELOCK *)
		SHMQueueNext(&(sxact->predicateLocks),
					 &(sxact->predicateLocks),
					 offsetof(PREDICATELOCK, xactLink));
	while (predlock)
	{
		PREDICATELOCK *nextpredlock;
		PREDICATELOCKTAG tag;
		SHM_QUEUE  *targetLink;
		PREDICATELOCKTARGET *target;
		PREDICATELOCKTARGETTAG targettag;
		uint32		targettaghash;
		LWLock	   *partitionLock;

		/* Fetch the successor first; predlock is removed (or reused) below. */
		nextpredlock = (PREDICATELOCK *)
			SHMQueueNext(&(sxact->predicateLocks),
						 &(predlock->xactLink),
						 offsetof(PREDICATELOCK, xactLink));

		tag = predlock->tag;
		targetLink = &(predlock->targetLink);
		target = tag.myTarget;
		targettag = target->tag;
		targettaghash = PredicateLockTargetTagHashCode(&targettag);
		partitionLock = PredicateLockHashPartitionLock(targettaghash);

		LWLockAcquire(partitionLock, LW_EXCLUSIVE);

		SHMQueueDelete(targetLink);

		/* Remove the lock entry under this xact's tag ... */
		hash_search_with_hash_value(PredicateLockHash, &tag,
								PredicateLockHashCodeFromTargetHashCode(&tag,
															  targettaghash),
									HASH_REMOVE, NULL);
		if (summarize)
		{
			bool		found;

			/* Fold into dummy transaction list. */
			tag.myXact = OldCommittedSxact;
			predlock = hash_search_with_hash_value(PredicateLockHash, &tag,
								PredicateLockHashCodeFromTargetHashCode(&tag,
															  targettaghash),
												   HASH_ENTER_NULL, &found);
			if (!predlock)
				ereport(ERROR,
						(errcode(ERRCODE_OUT_OF_MEMORY),
						 errmsg("out of shared memory"),
						 errhint("You might need to increase max_pred_locks_per_transaction.")));
			if (found)
			{
				/*
				 * Dummy xact already holds a lock on this target; keep the
				 * later of the two commitSeqNos.
				 */
				Assert(predlock->commitSeqNo != 0);
				Assert(predlock->commitSeqNo != InvalidSerCommitSeqNo);
				if (predlock->commitSeqNo < sxact->commitSeqNo)
					predlock->commitSeqNo = sxact->commitSeqNo;
			}
			else
			{
				/* New entry: link it into the target's and dummy's queues. */
				SHMQueueInsertBefore(&(target->predicateLocks),
									 &(predlock->targetLink));
				SHMQueueInsertBefore(&(OldCommittedSxact->predicateLocks),
									 &(predlock->xactLink));
				predlock->commitSeqNo = sxact->commitSeqNo;
			}
		}
		else
			RemoveTargetIfNoLongerUsed(target, targettaghash);

		LWLockRelease(partitionLock);

		predlock = nextpredlock;
	}

	/*
	 * Rather than retail removal, just re-init the head after we've run
	 * through the list.
	 */
	SHMQueueInit(&sxact->predicateLocks);

	LWLockRelease(SerializablePredicateLockListLock);

	sxidtag.xid = sxact->topXid;
	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);

	/* Release all outConflicts (unless 'partial' is true) */
	if (!partial)
	{
		conflict = (RWConflict)
			SHMQueueNext(&sxact->outConflicts,
						 &sxact->outConflicts,
						 offsetof(RWConflictData, outLink));
		while (conflict)
		{
			nextConflict = (RWConflict)
				SHMQueueNext(&sxact->outConflicts,
							 &conflict->outLink,
							 offsetof(RWConflictData, outLink));
			/* When summarizing, preserve the conflict as a summary flag. */
			if (summarize)
				conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
			ReleaseRWConflict(conflict);
			conflict = nextConflict;
		}
	}

	/* Release all inConflicts. */
	conflict = (RWConflict)
		SHMQueueNext(&sxact->inConflicts,
					 &sxact->inConflicts,
					 offsetof(RWConflictData, inLink));
	while (conflict)
	{
		nextConflict = (RWConflict)
			SHMQueueNext(&sxact->inConflicts,
						 &conflict->inLink,
						 offsetof(RWConflictData, inLink));
		/* When summarizing, preserve the conflict as a summary flag. */
		if (summarize)
			conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
		ReleaseRWConflict(conflict);
		conflict = nextConflict;
	}

	/* Finally, get rid of the xid and the record of the transaction itself. */
	if (!partial)
	{
		if (sxidtag.xid != InvalidTransactionId)
			hash_search(SerializableXidHash, &sxidtag, HASH_REMOVE, NULL);
		ReleasePredXact(sxact);
	}

	LWLockRelease(SerializableXactHashLock);
}
3910 
3911 /*
3912  * Tests whether the given top level transaction is concurrent with
3913  * (overlaps) our current transaction.
3914  *
3915  * We need to identify the top level transaction for SSI, anyway, so pass
3916  * that to this function to save the overhead of checking the snapshot's
3917  * subxip array.
3918  */
3919 static bool
XidIsConcurrent(TransactionId xid)3920 XidIsConcurrent(TransactionId xid)
3921 {
3922 	Snapshot	snap;
3923 	uint32		i;
3924 
3925 	Assert(TransactionIdIsValid(xid));
3926 	Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
3927 
3928 	snap = GetTransactionSnapshot();
3929 
3930 	if (TransactionIdPrecedes(xid, snap->xmin))
3931 		return false;
3932 
3933 	if (TransactionIdFollowsOrEquals(xid, snap->xmax))
3934 		return true;
3935 
3936 	for (i = 0; i < snap->xcnt; i++)
3937 	{
3938 		if (xid == snap->xip[i])
3939 			return true;
3940 	}
3941 
3942 	return false;
3943 }
3944 
/*
 * CheckForSerializableConflictOut
 *		We are reading a tuple which has been modified.  If it is visible to
 *		us but has been deleted, that indicates a rw-conflict out.  If it's
 *		not visible and was created by a concurrent (overlapping)
 *		serializable transaction, that is also a rw-conflict out.
 *
 * We will determine the top level xid of the writing transaction with which
 * we may be in conflict, and check for overlap with our own transaction.
 * If the transactions overlap (i.e., they cannot see each other's writes),
 * then we have a conflict out.
 *
 * This function should be called just about anywhere in heapam.c where a
 * tuple has been read. The caller must hold at least a shared lock on the
 * buffer, because this function might set hint bits on the tuple. There is
 * currently no known reason to call this function from an index AM.
 */
void
CheckForSerializableConflictOut(bool visible, Relation relation,
								HeapTuple tuple, Buffer buffer,
								Snapshot snapshot)
{
	TransactionId xid;
	SERIALIZABLEXIDTAG sxidtag;
	SERIALIZABLEXID *sxid;
	SERIALIZABLEXACT *sxact;
	HTSV_Result htsvResult;

	/* Fast exit if SSI bookkeeping isn't needed for this read. */
	if (!SerializationNeededForRead(relation, snapshot))
		return;

	/* Check if someone else has already decided that we need to die */
	if (SxactIsDoomed(MySerializableXact))
	{
		ereport(ERROR,
				(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
				 errmsg("could not serialize access due to read/write dependencies among transactions"),
				 errdetail_internal("Reason code: Canceled on identification as a pivot, during conflict out checking."),
				 errhint("The transaction might succeed if retried.")));
	}

	/*
	 * Check to see whether the tuple has been written to by a concurrent
	 * transaction, either to create it not visible to us, or to delete it
	 * while it is visible to us.  The "visible" bool indicates whether the
	 * tuple is visible to us, while HeapTupleSatisfiesVacuum checks what else
	 * is going on with it.
	 *
	 * In the event of a concurrently inserted tuple that also happens to have
	 * been concurrently updated (by a separate transaction), the xmin of the
	 * tuple will be used -- not the updater's xid.
	 */
	htsvResult = HeapTupleSatisfiesVacuum(tuple, TransactionXmin, buffer);
	switch (htsvResult)
	{
		case HEAPTUPLE_LIVE:
			/* A live, visible tuple poses no conflict. */
			if (visible)
				return;
			xid = HeapTupleHeaderGetXmin(tuple->t_data);
			break;
		case HEAPTUPLE_RECENTLY_DEAD:
		case HEAPTUPLE_DELETE_IN_PROGRESS:
			/* Visible: the deleter conflicts; not visible: the inserter. */
			if (visible)
				xid = HeapTupleHeaderGetUpdateXid(tuple->t_data);
			else
				xid = HeapTupleHeaderGetXmin(tuple->t_data);

			if (TransactionIdPrecedes(xid, TransactionXmin))
			{
				/* This is like the HEAPTUPLE_DEAD case */
				Assert(!visible);
				return;
			}
			break;
		case HEAPTUPLE_INSERT_IN_PROGRESS:
			xid = HeapTupleHeaderGetXmin(tuple->t_data);
			break;
		case HEAPTUPLE_DEAD:
			Assert(!visible);
			return;
		default:

			/*
			 * The only way to get to this default clause is if a new value is
			 * added to the enum type without adding it to this switch
			 * statement.  That's a bug, so elog.
			 */
			elog(ERROR, "unrecognized return value from HeapTupleSatisfiesVacuum: %u", htsvResult);

			/*
			 * In spite of having all enum values covered and calling elog on
			 * this default, some compilers think this is a code path which
			 * allows xid to be used below without initialization. Silence
			 * that warning.
			 */
			xid = InvalidTransactionId;
	}
	Assert(TransactionIdIsValid(xid));
	Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin));

	/*
	 * Find top level xid.  Bail out if xid is too early to be a conflict, or
	 * if it's our own xid.
	 */
	if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
		return;
	xid = SubTransGetTopmostTransaction(xid);
	if (TransactionIdPrecedes(xid, TransactionXmin))
		return;
	/* Re-check against our own xid after mapping to the top-level xid. */
	if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
		return;

	/*
	 * Find sxact or summarized info for the top level xid.
	 */
	sxidtag.xid = xid;
	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
	sxid = (SERIALIZABLEXID *)
		hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
	if (!sxid)
	{
		/*
		 * Transaction not found in "normal" SSI structures.  Check whether it
		 * got pushed out to SLRU storage for "old committed" transactions.
		 */
		SerCommitSeqNo conflictCommitSeqNo;

		conflictCommitSeqNo = OldSerXidGetMinConflictCommitSeqNo(xid);
		if (conflictCommitSeqNo != 0)
		{
			/*
			 * The writer was itself flagged with a conflict out; if it could
			 * have committed before our snapshot, that makes us a doomed
			 * middle member of a dangerous structure.
			 */
			if (conflictCommitSeqNo != InvalidSerCommitSeqNo
				&& (!SxactIsReadOnly(MySerializableXact)
					|| conflictCommitSeqNo
					<= MySerializableXact->SeqNo.lastCommitBeforeSnapshot))
				ereport(ERROR,
						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
						 errmsg("could not serialize access due to read/write dependencies among transactions"),
						 errdetail_internal("Reason code: Canceled on conflict out to old pivot %u.", xid),
					  errhint("The transaction might succeed if retried.")));

			/* If we also have a conflict in, we'd complete a pivot. */
			if (SxactHasSummaryConflictIn(MySerializableXact)
				|| !SHMQueueEmpty(&MySerializableXact->inConflicts))
				ereport(ERROR,
						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
						 errmsg("could not serialize access due to read/write dependencies among transactions"),
						 errdetail_internal("Reason code: Canceled on identification as a pivot, with conflict out to old committed transaction %u.", xid),
					  errhint("The transaction might succeed if retried.")));

			MySerializableXact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
		}

		/* It's not serializable or otherwise not important. */
		LWLockRelease(SerializableXactHashLock);
		return;
	}
	sxact = sxid->myXact;
	Assert(TransactionIdEquals(sxact->topXid, xid));
	if (sxact == MySerializableXact || SxactIsDoomed(sxact))
	{
		/* Can't conflict with ourself or a transaction that will roll back. */
		LWLockRelease(SerializableXactHashLock);
		return;
	}

	/*
	 * We have a conflict out to a transaction which has a conflict out to a
	 * summarized transaction.  That summarized transaction must have
	 * committed first, and we can't tell when it committed in relation to our
	 * snapshot acquisition, so something needs to be canceled.
	 */
	if (SxactHasSummaryConflictOut(sxact))
	{
		if (!SxactIsPrepared(sxact))
		{
			/* Not yet prepared: cheaper to cancel the other transaction. */
			sxact->flags |= SXACT_FLAG_DOOMED;
			LWLockRelease(SerializableXactHashLock);
			return;
		}
		else
		{
			/* Already prepared: it can't be canceled, so we must be. */
			LWLockRelease(SerializableXactHashLock);
			ereport(ERROR,
					(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
					 errmsg("could not serialize access due to read/write dependencies among transactions"),
					 errdetail_internal("Reason code: Canceled on conflict out to old pivot."),
					 errhint("The transaction might succeed if retried.")));
		}
	}

	/*
	 * If this is a read-only transaction and the writing transaction has
	 * committed, and it doesn't have a rw-conflict to a transaction which
	 * committed before it, no conflict.
	 */
	if (SxactIsReadOnly(MySerializableXact)
		&& SxactIsCommitted(sxact)
		&& !SxactHasSummaryConflictOut(sxact)
		&& (!SxactHasConflictOut(sxact)
			|| MySerializableXact->SeqNo.lastCommitBeforeSnapshot < sxact->SeqNo.earliestOutConflictCommit))
	{
		/* Read-only transaction will appear to run first.  No conflict. */
		LWLockRelease(SerializableXactHashLock);
		return;
	}

	if (!XidIsConcurrent(xid))
	{
		/* This write was already in our snapshot; no conflict. */
		LWLockRelease(SerializableXactHashLock);
		return;
	}

	if (RWConflictExists(MySerializableXact, sxact))
	{
		/* We don't want duplicate conflict records in the list. */
		LWLockRelease(SerializableXactHashLock);
		return;
	}

	/*
	 * Flag the conflict.  But first, if this conflict creates a dangerous
	 * structure, ereport an error.
	 */
	FlagRWConflict(MySerializableXact, sxact);
	LWLockRelease(SerializableXactHashLock);
}
4171 
4172 /*
4173  * Check a particular target for rw-dependency conflict in. A subroutine of
4174  * CheckForSerializableConflictIn().
4175  */
4176 static void
CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG * targettag)4177 CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
4178 {
4179 	uint32		targettaghash;
4180 	LWLock	   *partitionLock;
4181 	PREDICATELOCKTARGET *target;
4182 	PREDICATELOCK *predlock;
4183 	PREDICATELOCK *mypredlock = NULL;
4184 	PREDICATELOCKTAG mypredlocktag;
4185 
4186 	Assert(MySerializableXact != InvalidSerializableXact);
4187 
4188 	/*
4189 	 * The same hash and LW lock apply to the lock target and the lock itself.
4190 	 */
4191 	targettaghash = PredicateLockTargetTagHashCode(targettag);
4192 	partitionLock = PredicateLockHashPartitionLock(targettaghash);
4193 	LWLockAcquire(partitionLock, LW_SHARED);
4194 	target = (PREDICATELOCKTARGET *)
4195 		hash_search_with_hash_value(PredicateLockTargetHash,
4196 									targettag, targettaghash,
4197 									HASH_FIND, NULL);
4198 	if (!target)
4199 	{
4200 		/* Nothing has this target locked; we're done here. */
4201 		LWLockRelease(partitionLock);
4202 		return;
4203 	}
4204 
4205 	/*
4206 	 * Each lock for an overlapping transaction represents a conflict: a
4207 	 * rw-dependency in to this transaction.
4208 	 */
4209 	predlock = (PREDICATELOCK *)
4210 		SHMQueueNext(&(target->predicateLocks),
4211 					 &(target->predicateLocks),
4212 					 offsetof(PREDICATELOCK, targetLink));
4213 	LWLockAcquire(SerializableXactHashLock, LW_SHARED);
4214 	while (predlock)
4215 	{
4216 		SHM_QUEUE  *predlocktargetlink;
4217 		PREDICATELOCK *nextpredlock;
4218 		SERIALIZABLEXACT *sxact;
4219 
4220 		predlocktargetlink = &(predlock->targetLink);
4221 		nextpredlock = (PREDICATELOCK *)
4222 			SHMQueueNext(&(target->predicateLocks),
4223 						 predlocktargetlink,
4224 						 offsetof(PREDICATELOCK, targetLink));
4225 
4226 		sxact = predlock->tag.myXact;
4227 		if (sxact == MySerializableXact)
4228 		{
4229 			/*
4230 			 * If we're getting a write lock on a tuple, we don't need a
4231 			 * predicate (SIREAD) lock on the same tuple. We can safely remove
4232 			 * our SIREAD lock, but we'll defer doing so until after the loop
4233 			 * because that requires upgrading to an exclusive partition lock.
4234 			 *
4235 			 * We can't use this optimization within a subtransaction because
4236 			 * the subtransaction could roll back, and we would be left
4237 			 * without any lock at the top level.
4238 			 */
4239 			if (!IsSubTransaction()
4240 				&& GET_PREDICATELOCKTARGETTAG_OFFSET(*targettag))
4241 			{
4242 				mypredlock = predlock;
4243 				mypredlocktag = predlock->tag;
4244 			}
4245 		}
4246 		else if (!SxactIsDoomed(sxact)
4247 				 && (!SxactIsCommitted(sxact)
4248 					 || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
4249 											  sxact->finishedBefore))
4250 				 && !RWConflictExists(sxact, MySerializableXact))
4251 		{
4252 			LWLockRelease(SerializableXactHashLock);
4253 			LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
4254 
4255 			/*
4256 			 * Re-check after getting exclusive lock because the other
4257 			 * transaction may have flagged a conflict.
4258 			 */
4259 			if (!SxactIsDoomed(sxact)
4260 				&& (!SxactIsCommitted(sxact)
4261 					|| TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
4262 											 sxact->finishedBefore))
4263 				&& !RWConflictExists(sxact, MySerializableXact))
4264 			{
4265 				FlagRWConflict(sxact, MySerializableXact);
4266 			}
4267 
4268 			LWLockRelease(SerializableXactHashLock);
4269 			LWLockAcquire(SerializableXactHashLock, LW_SHARED);
4270 		}
4271 
4272 		predlock = nextpredlock;
4273 	}
4274 	LWLockRelease(SerializableXactHashLock);
4275 	LWLockRelease(partitionLock);
4276 
4277 	/*
4278 	 * If we found one of our own SIREAD locks to remove, remove it now.
4279 	 *
4280 	 * At this point our transaction already has an ExclusiveRowLock on the
4281 	 * relation, so we are OK to drop the predicate lock on the tuple, if
4282 	 * found, without fearing that another write against the tuple will occur
4283 	 * before the MVCC information makes it to the buffer.
4284 	 */
4285 	if (mypredlock != NULL)
4286 	{
4287 		uint32		predlockhashcode;
4288 		PREDICATELOCK *rmpredlock;
4289 
4290 		LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
4291 		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
4292 		LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
4293 
4294 		/*
4295 		 * Remove the predicate lock from shared memory, if it wasn't removed
4296 		 * while the locks were released.  One way that could happen is from
4297 		 * autovacuum cleaning up an index.
4298 		 */
4299 		predlockhashcode = PredicateLockHashCodeFromTargetHashCode
4300 			(&mypredlocktag, targettaghash);
4301 		rmpredlock = (PREDICATELOCK *)
4302 			hash_search_with_hash_value(PredicateLockHash,
4303 										&mypredlocktag,
4304 										predlockhashcode,
4305 										HASH_FIND, NULL);
4306 		if (rmpredlock != NULL)
4307 		{
4308 			Assert(rmpredlock == mypredlock);
4309 
4310 			SHMQueueDelete(&(mypredlock->targetLink));
4311 			SHMQueueDelete(&(mypredlock->xactLink));
4312 
4313 			rmpredlock = (PREDICATELOCK *)
4314 				hash_search_with_hash_value(PredicateLockHash,
4315 											&mypredlocktag,
4316 											predlockhashcode,
4317 											HASH_REMOVE, NULL);
4318 			Assert(rmpredlock == mypredlock);
4319 
4320 			RemoveTargetIfNoLongerUsed(target, targettaghash);
4321 		}
4322 
4323 		LWLockRelease(SerializableXactHashLock);
4324 		LWLockRelease(partitionLock);
4325 		LWLockRelease(SerializablePredicateLockListLock);
4326 
4327 		if (rmpredlock != NULL)
4328 		{
4329 			/*
4330 			 * Remove entry in local lock table if it exists. It's OK if it
4331 			 * doesn't exist; that means the lock was transferred to a new
4332 			 * target by a different backend.
4333 			 */
4334 			hash_search_with_hash_value(LocalPredicateLockHash,
4335 										targettag, targettaghash,
4336 										HASH_REMOVE, NULL);
4337 
4338 			DecrementParentLocks(targettag);
4339 		}
4340 	}
4341 }
4342 
4343 /*
4344  * CheckForSerializableConflictIn
4345  *		We are writing the given tuple.  If that indicates a rw-conflict
4346  *		in from another serializable transaction, take appropriate action.
4347  *
4348  * Skip checking for any granularity for which a parameter is missing.
4349  *
4350  * A tuple update or delete is in conflict if we have a predicate lock
4351  * against the relation or page in which the tuple exists, or against the
4352  * tuple itself.
4353  */
4354 void
CheckForSerializableConflictIn(Relation relation,HeapTuple tuple,Buffer buffer)4355 CheckForSerializableConflictIn(Relation relation, HeapTuple tuple,
4356 							   Buffer buffer)
4357 {
4358 	PREDICATELOCKTARGETTAG targettag;
4359 
4360 	if (!SerializationNeededForWrite(relation))
4361 		return;
4362 
4363 	/* Check if someone else has already decided that we need to die */
4364 	if (SxactIsDoomed(MySerializableXact))
4365 		ereport(ERROR,
4366 				(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4367 				 errmsg("could not serialize access due to read/write dependencies among transactions"),
4368 				 errdetail_internal("Reason code: Canceled on identification as a pivot, during conflict in checking."),
4369 				 errhint("The transaction might succeed if retried.")));
4370 
4371 	/*
4372 	 * We're doing a write which might cause rw-conflicts now or later.
4373 	 * Memorize that fact.
4374 	 */
4375 	MyXactDidWrite = true;
4376 
4377 	/*
4378 	 * It is important that we check for locks from the finest granularity to
4379 	 * the coarsest granularity, so that granularity promotion doesn't cause
4380 	 * us to miss a lock.  The new (coarser) lock will be acquired before the
4381 	 * old (finer) locks are released.
4382 	 *
4383 	 * It is not possible to take and hold a lock across the checks for all
4384 	 * granularities because each target could be in a separate partition.
4385 	 */
4386 	if (tuple != NULL)
4387 	{
4388 		SET_PREDICATELOCKTARGETTAG_TUPLE(targettag,
4389 										 relation->rd_node.dbNode,
4390 										 relation->rd_id,
4391 								 ItemPointerGetBlockNumber(&(tuple->t_self)),
4392 							   ItemPointerGetOffsetNumber(&(tuple->t_self)));
4393 		CheckTargetForConflictsIn(&targettag);
4394 	}
4395 
4396 	if (BufferIsValid(buffer))
4397 	{
4398 		SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
4399 										relation->rd_node.dbNode,
4400 										relation->rd_id,
4401 										BufferGetBlockNumber(buffer));
4402 		CheckTargetForConflictsIn(&targettag);
4403 	}
4404 
4405 	SET_PREDICATELOCKTARGETTAG_RELATION(targettag,
4406 										relation->rd_node.dbNode,
4407 										relation->rd_id);
4408 	CheckTargetForConflictsIn(&targettag);
4409 }
4410 
4411 /*
4412  * CheckTableForSerializableConflictIn
4413  *		The entire table is going through a DDL-style logical mass delete
4414  *		like TRUNCATE or DROP TABLE.  If that causes a rw-conflict in from
4415  *		another serializable transaction, take appropriate action.
4416  *
4417  * While these operations do not operate entirely within the bounds of
4418  * snapshot isolation, they can occur inside a serializable transaction, and
4419  * will logically occur after any reads which saw rows which were destroyed
4420  * by these operations, so we do what we can to serialize properly under
4421  * SSI.
4422  *
4423  * The relation passed in must be a heap relation. Any predicate lock of any
4424  * granularity on the heap will cause a rw-conflict in to this transaction.
4425  * Predicate locks on indexes do not matter because they only exist to guard
4426  * against conflicting inserts into the index, and this is a mass *delete*.
4427  * When a table is truncated or dropped, the index will also be truncated
4428  * or dropped, and we'll deal with locks on the index when that happens.
4429  *
4430  * Dropping or truncating a table also needs to drop any existing predicate
4431  * locks on heap tuples or pages, because they're about to go away. This
4432  * should be done before altering the predicate locks because the transaction
4433  * could be rolled back because of a conflict, in which case the lock changes
4434  * are not needed. (At the moment, we don't actually bother to drop the
4435  * existing locks on a dropped or truncated table at the moment. That might
4436  * lead to some false positives, but it doesn't seem worth the trouble.)
4437  */
4438 void
CheckTableForSerializableConflictIn(Relation relation)4439 CheckTableForSerializableConflictIn(Relation relation)
4440 {
4441 	HASH_SEQ_STATUS seqstat;
4442 	PREDICATELOCKTARGET *target;
4443 	Oid			dbId;
4444 	Oid			heapId;
4445 	int			i;
4446 
4447 	/*
4448 	 * Bail out quickly if there are no serializable transactions running.
4449 	 * It's safe to check this without taking locks because the caller is
4450 	 * holding an ACCESS EXCLUSIVE lock on the relation.  No new locks which
4451 	 * would matter here can be acquired while that is held.
4452 	 */
4453 	if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
4454 		return;
4455 
4456 	if (!SerializationNeededForWrite(relation))
4457 		return;
4458 
4459 	/*
4460 	 * We're doing a write which might cause rw-conflicts now or later.
4461 	 * Memorize that fact.
4462 	 */
4463 	MyXactDidWrite = true;
4464 
4465 	Assert(relation->rd_index == NULL); /* not an index relation */
4466 
4467 	dbId = relation->rd_node.dbNode;
4468 	heapId = relation->rd_id;
4469 
4470 	LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
4471 	for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
4472 		LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_SHARED);
4473 	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
4474 
4475 	/* Scan through target list */
4476 	hash_seq_init(&seqstat, PredicateLockTargetHash);
4477 
4478 	while ((target = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
4479 	{
4480 		PREDICATELOCK *predlock;
4481 
4482 		/*
4483 		 * Check whether this is a target which needs attention.
4484 		 */
4485 		if (GET_PREDICATELOCKTARGETTAG_RELATION(target->tag) != heapId)
4486 			continue;			/* wrong relation id */
4487 		if (GET_PREDICATELOCKTARGETTAG_DB(target->tag) != dbId)
4488 			continue;			/* wrong database id */
4489 
4490 		/*
4491 		 * Loop through locks for this target and flag conflicts.
4492 		 */
4493 		predlock = (PREDICATELOCK *)
4494 			SHMQueueNext(&(target->predicateLocks),
4495 						 &(target->predicateLocks),
4496 						 offsetof(PREDICATELOCK, targetLink));
4497 		while (predlock)
4498 		{
4499 			PREDICATELOCK *nextpredlock;
4500 
4501 			nextpredlock = (PREDICATELOCK *)
4502 				SHMQueueNext(&(target->predicateLocks),
4503 							 &(predlock->targetLink),
4504 							 offsetof(PREDICATELOCK, targetLink));
4505 
4506 			if (predlock->tag.myXact != MySerializableXact
4507 			  && !RWConflictExists(predlock->tag.myXact, MySerializableXact))
4508 			{
4509 				FlagRWConflict(predlock->tag.myXact, MySerializableXact);
4510 			}
4511 
4512 			predlock = nextpredlock;
4513 		}
4514 	}
4515 
4516 	/* Release locks in reverse order */
4517 	LWLockRelease(SerializableXactHashLock);
4518 	for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
4519 		LWLockRelease(PredicateLockHashPartitionLockByIndex(i));
4520 	LWLockRelease(SerializablePredicateLockListLock);
4521 }
4522 
4523 
4524 /*
4525  * Flag a rw-dependency between two serializable transactions.
4526  *
4527  * The caller is responsible for ensuring that we have a LW lock on
4528  * the transaction hash table.
4529  */
4530 static void
FlagRWConflict(SERIALIZABLEXACT * reader,SERIALIZABLEXACT * writer)4531 FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
4532 {
4533 	Assert(reader != writer);
4534 
4535 	/* First, see if this conflict causes failure. */
4536 	OnConflict_CheckForSerializationFailure(reader, writer);
4537 
4538 	/* Actually do the conflict flagging. */
4539 	if (reader == OldCommittedSxact)
4540 		writer->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
4541 	else if (writer == OldCommittedSxact)
4542 		reader->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
4543 	else
4544 		SetRWConflict(reader, writer);
4545 }
4546 
4547 /*----------------------------------------------------------------------------
4548  * We are about to add a RW-edge to the dependency graph - check that we don't
4549  * introduce a dangerous structure by doing so, and abort one of the
4550  * transactions if so.
4551  *
4552  * A serialization failure can only occur if there is a dangerous structure
4553  * in the dependency graph:
4554  *
4555  *		Tin ------> Tpivot ------> Tout
4556  *			  rw			 rw
4557  *
4558  * Furthermore, Tout must commit first.
4559  *
4560  * One more optimization is that if Tin is declared READ ONLY (or commits
4561  * without writing), we can only have a problem if Tout committed before Tin
4562  * acquired its snapshot.
4563  *----------------------------------------------------------------------------
4564  */
4565 static void
OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT * reader,SERIALIZABLEXACT * writer)4566 OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
4567 										SERIALIZABLEXACT *writer)
4568 {
4569 	bool		failure;
4570 	RWConflict	conflict;
4571 
4572 	Assert(LWLockHeldByMe(SerializableXactHashLock));
4573 
4574 	failure = false;
4575 
4576 	/*------------------------------------------------------------------------
4577 	 * Check for already-committed writer with rw-conflict out flagged
4578 	 * (conflict-flag on W means that T2 committed before W):
4579 	 *
4580 	 *		R ------> W ------> T2
4581 	 *			rw		  rw
4582 	 *
4583 	 * That is a dangerous structure, so we must abort. (Since the writer
4584 	 * has already committed, we must be the reader)
4585 	 *------------------------------------------------------------------------
4586 	 */
4587 	if (SxactIsCommitted(writer)
4588 	  && (SxactHasConflictOut(writer) || SxactHasSummaryConflictOut(writer)))
4589 		failure = true;
4590 
4591 	/*------------------------------------------------------------------------
4592 	 * Check whether the writer has become a pivot with an out-conflict
4593 	 * committed transaction (T2), and T2 committed first:
4594 	 *
4595 	 *		R ------> W ------> T2
4596 	 *			rw		  rw
4597 	 *
4598 	 * Because T2 must've committed first, there is no anomaly if:
4599 	 * - the reader committed before T2
4600 	 * - the writer committed before T2
4601 	 * - the reader is a READ ONLY transaction and the reader was concurrent
4602 	 *	 with T2 (= reader acquired its snapshot before T2 committed)
4603 	 *
4604 	 * We also handle the case that T2 is prepared but not yet committed
4605 	 * here. In that case T2 has already checked for conflicts, so if it
4606 	 * commits first, making the above conflict real, it's too late for it
4607 	 * to abort.
4608 	 *------------------------------------------------------------------------
4609 	 */
4610 	if (!failure)
4611 	{
4612 		if (SxactHasSummaryConflictOut(writer))
4613 		{
4614 			failure = true;
4615 			conflict = NULL;
4616 		}
4617 		else
4618 			conflict = (RWConflict)
4619 				SHMQueueNext(&writer->outConflicts,
4620 							 &writer->outConflicts,
4621 							 offsetof(RWConflictData, outLink));
4622 		while (conflict)
4623 		{
4624 			SERIALIZABLEXACT *t2 = conflict->sxactIn;
4625 
4626 			if (SxactIsPrepared(t2)
4627 				&& (!SxactIsCommitted(reader)
4628 					|| t2->prepareSeqNo <= reader->commitSeqNo)
4629 				&& (!SxactIsCommitted(writer)
4630 					|| t2->prepareSeqNo <= writer->commitSeqNo)
4631 				&& (!SxactIsReadOnly(reader)
4632 			  || t2->prepareSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot))
4633 			{
4634 				failure = true;
4635 				break;
4636 			}
4637 			conflict = (RWConflict)
4638 				SHMQueueNext(&writer->outConflicts,
4639 							 &conflict->outLink,
4640 							 offsetof(RWConflictData, outLink));
4641 		}
4642 	}
4643 
4644 	/*------------------------------------------------------------------------
4645 	 * Check whether the reader has become a pivot with a writer
4646 	 * that's committed (or prepared):
4647 	 *
4648 	 *		T0 ------> R ------> W
4649 	 *			 rw		   rw
4650 	 *
4651 	 * Because W must've committed first for an anomaly to occur, there is no
4652 	 * anomaly if:
4653 	 * - T0 committed before the writer
4654 	 * - T0 is READ ONLY, and overlaps the writer
4655 	 *------------------------------------------------------------------------
4656 	 */
4657 	if (!failure && SxactIsPrepared(writer) && !SxactIsReadOnly(reader))
4658 	{
4659 		if (SxactHasSummaryConflictIn(reader))
4660 		{
4661 			failure = true;
4662 			conflict = NULL;
4663 		}
4664 		else
4665 			conflict = (RWConflict)
4666 				SHMQueueNext(&reader->inConflicts,
4667 							 &reader->inConflicts,
4668 							 offsetof(RWConflictData, inLink));
4669 		while (conflict)
4670 		{
4671 			SERIALIZABLEXACT *t0 = conflict->sxactOut;
4672 
4673 			if (!SxactIsDoomed(t0)
4674 				&& (!SxactIsCommitted(t0)
4675 					|| t0->commitSeqNo >= writer->prepareSeqNo)
4676 				&& (!SxactIsReadOnly(t0)
4677 			  || t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo))
4678 			{
4679 				failure = true;
4680 				break;
4681 			}
4682 			conflict = (RWConflict)
4683 				SHMQueueNext(&reader->inConflicts,
4684 							 &conflict->inLink,
4685 							 offsetof(RWConflictData, inLink));
4686 		}
4687 	}
4688 
4689 	if (failure)
4690 	{
4691 		/*
4692 		 * We have to kill a transaction to avoid a possible anomaly from
4693 		 * occurring. If the writer is us, we can just ereport() to cause a
4694 		 * transaction abort. Otherwise we flag the writer for termination,
4695 		 * causing it to abort when it tries to commit. However, if the writer
4696 		 * is a prepared transaction, already prepared, we can't abort it
4697 		 * anymore, so we have to kill the reader instead.
4698 		 */
4699 		if (MySerializableXact == writer)
4700 		{
4701 			LWLockRelease(SerializableXactHashLock);
4702 			ereport(ERROR,
4703 					(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4704 					 errmsg("could not serialize access due to read/write dependencies among transactions"),
4705 					 errdetail_internal("Reason code: Canceled on identification as a pivot, during write."),
4706 					 errhint("The transaction might succeed if retried.")));
4707 		}
4708 		else if (SxactIsPrepared(writer))
4709 		{
4710 			LWLockRelease(SerializableXactHashLock);
4711 
4712 			/* if we're not the writer, we have to be the reader */
4713 			Assert(MySerializableXact == reader);
4714 			ereport(ERROR,
4715 					(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4716 					 errmsg("could not serialize access due to read/write dependencies among transactions"),
4717 					 errdetail_internal("Reason code: Canceled on conflict out to pivot %u, during read.", writer->topXid),
4718 					 errhint("The transaction might succeed if retried.")));
4719 		}
4720 		writer->flags |= SXACT_FLAG_DOOMED;
4721 	}
4722 }
4723 
4724 /*
4725  * PreCommit_CheckForSerializableConflicts
4726  *		Check for dangerous structures in a serializable transaction
4727  *		at commit.
4728  *
4729  * We're checking for a dangerous structure as each conflict is recorded.
4730  * The only way we could have a problem at commit is if this is the "out"
4731  * side of a pivot, and neither the "in" side nor the pivot has yet
4732  * committed.
4733  *
4734  * If a dangerous structure is found, the pivot (the near conflict) is
4735  * marked for death, because rolling back another transaction might mean
4736  * that we flail without ever making progress.  This transaction is
4737  * committing writes, so letting it commit ensures progress.  If we
4738  * canceled the far conflict, it might immediately fail again on retry.
4739  */
4740 void
PreCommit_CheckForSerializationFailure(void)4741 PreCommit_CheckForSerializationFailure(void)
4742 {
4743 	RWConflict	nearConflict;
4744 
4745 	if (MySerializableXact == InvalidSerializableXact)
4746 		return;
4747 
4748 	Assert(IsolationIsSerializable());
4749 
4750 	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
4751 
4752 	/* Check if someone else has already decided that we need to die */
4753 	if (SxactIsDoomed(MySerializableXact))
4754 	{
4755 		LWLockRelease(SerializableXactHashLock);
4756 		ereport(ERROR,
4757 				(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4758 				 errmsg("could not serialize access due to read/write dependencies among transactions"),
4759 				 errdetail_internal("Reason code: Canceled on identification as a pivot, during commit attempt."),
4760 				 errhint("The transaction might succeed if retried.")));
4761 	}
4762 
4763 	nearConflict = (RWConflict)
4764 		SHMQueueNext(&MySerializableXact->inConflicts,
4765 					 &MySerializableXact->inConflicts,
4766 					 offsetof(RWConflictData, inLink));
4767 	while (nearConflict)
4768 	{
4769 		if (!SxactIsCommitted(nearConflict->sxactOut)
4770 			&& !SxactIsDoomed(nearConflict->sxactOut))
4771 		{
4772 			RWConflict	farConflict;
4773 
4774 			farConflict = (RWConflict)
4775 				SHMQueueNext(&nearConflict->sxactOut->inConflicts,
4776 							 &nearConflict->sxactOut->inConflicts,
4777 							 offsetof(RWConflictData, inLink));
4778 			while (farConflict)
4779 			{
4780 				if (farConflict->sxactOut == MySerializableXact
4781 					|| (!SxactIsCommitted(farConflict->sxactOut)
4782 						&& !SxactIsReadOnly(farConflict->sxactOut)
4783 						&& !SxactIsDoomed(farConflict->sxactOut)))
4784 				{
4785 					/*
4786 					 * Normally, we kill the pivot transaction to make sure we
4787 					 * make progress if the failing transaction is retried.
4788 					 * However, we can't kill it if it's already prepared, so
4789 					 * in that case we commit suicide instead.
4790 					 */
4791 					if (SxactIsPrepared(nearConflict->sxactOut))
4792 					{
4793 						LWLockRelease(SerializableXactHashLock);
4794 						ereport(ERROR,
4795 								(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4796 								 errmsg("could not serialize access due to read/write dependencies among transactions"),
4797 								 errdetail_internal("Reason code: Canceled on commit attempt with conflict in from prepared pivot."),
4798 								 errhint("The transaction might succeed if retried.")));
4799 					}
4800 					nearConflict->sxactOut->flags |= SXACT_FLAG_DOOMED;
4801 					break;
4802 				}
4803 				farConflict = (RWConflict)
4804 					SHMQueueNext(&nearConflict->sxactOut->inConflicts,
4805 								 &farConflict->inLink,
4806 								 offsetof(RWConflictData, inLink));
4807 			}
4808 		}
4809 
4810 		nearConflict = (RWConflict)
4811 			SHMQueueNext(&MySerializableXact->inConflicts,
4812 						 &nearConflict->inLink,
4813 						 offsetof(RWConflictData, inLink));
4814 	}
4815 
4816 	MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo);
4817 	MySerializableXact->flags |= SXACT_FLAG_PREPARED;
4818 
4819 	LWLockRelease(SerializableXactHashLock);
4820 }
4821 
4822 /*------------------------------------------------------------------------*/
4823 
4824 /*
4825  * Two-phase commit support
4826  */
4827 
4828 /*
4829  * AtPrepare_Locks
4830  *		Do the preparatory work for a PREPARE: make 2PC state file
4831  *		records for all predicate locks currently held.
4832  */
4833 void
AtPrepare_PredicateLocks(void)4834 AtPrepare_PredicateLocks(void)
4835 {
4836 	PREDICATELOCK *predlock;
4837 	SERIALIZABLEXACT *sxact;
4838 	TwoPhasePredicateRecord record;
4839 	TwoPhasePredicateXactRecord *xactRecord;
4840 	TwoPhasePredicateLockRecord *lockRecord;
4841 
4842 	sxact = MySerializableXact;
4843 	xactRecord = &(record.data.xactRecord);
4844 	lockRecord = &(record.data.lockRecord);
4845 
4846 	if (MySerializableXact == InvalidSerializableXact)
4847 		return;
4848 
4849 	/* Generate an xact record for our SERIALIZABLEXACT */
4850 	record.type = TWOPHASEPREDICATERECORD_XACT;
4851 	xactRecord->xmin = MySerializableXact->xmin;
4852 	xactRecord->flags = MySerializableXact->flags;
4853 
4854 	/*
4855 	 * Note that we don't include the list of conflicts in our out in the
4856 	 * statefile, because new conflicts can be added even after the
4857 	 * transaction prepares. We'll just make a conservative assumption during
4858 	 * recovery instead.
4859 	 */
4860 
4861 	RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
4862 						   &record, sizeof(record));
4863 
4864 	/*
4865 	 * Generate a lock record for each lock.
4866 	 *
4867 	 * To do this, we need to walk the predicate lock list in our sxact rather
4868 	 * than using the local predicate lock table because the latter is not
4869 	 * guaranteed to be accurate.
4870 	 */
4871 	LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
4872 
4873 	predlock = (PREDICATELOCK *)
4874 		SHMQueueNext(&(sxact->predicateLocks),
4875 					 &(sxact->predicateLocks),
4876 					 offsetof(PREDICATELOCK, xactLink));
4877 
4878 	while (predlock != NULL)
4879 	{
4880 		record.type = TWOPHASEPREDICATERECORD_LOCK;
4881 		lockRecord->target = predlock->tag.myTarget->tag;
4882 
4883 		RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
4884 							   &record, sizeof(record));
4885 
4886 		predlock = (PREDICATELOCK *)
4887 			SHMQueueNext(&(sxact->predicateLocks),
4888 						 &(predlock->xactLink),
4889 						 offsetof(PREDICATELOCK, xactLink));
4890 	}
4891 
4892 	LWLockRelease(SerializablePredicateLockListLock);
4893 }
4894 
4895 /*
4896  * PostPrepare_Locks
4897  *		Clean up after successful PREPARE. Unlike the non-predicate
4898  *		lock manager, we do not need to transfer locks to a dummy
4899  *		PGPROC because our SERIALIZABLEXACT will stay around
4900  *		anyway. We only need to clean up our local state.
4901  */
4902 void
PostPrepare_PredicateLocks(TransactionId xid)4903 PostPrepare_PredicateLocks(TransactionId xid)
4904 {
4905 	if (MySerializableXact == InvalidSerializableXact)
4906 		return;
4907 
4908 	Assert(SxactIsPrepared(MySerializableXact));
4909 
4910 	MySerializableXact->pid = 0;
4911 
4912 	hash_destroy(LocalPredicateLockHash);
4913 	LocalPredicateLockHash = NULL;
4914 
4915 	MySerializableXact = InvalidSerializableXact;
4916 	MyXactDidWrite = false;
4917 }
4918 
4919 /*
4920  * PredicateLockTwoPhaseFinish
4921  *		Release a prepared transaction's predicate locks once it
4922  *		commits or aborts.
4923  */
4924 void
PredicateLockTwoPhaseFinish(TransactionId xid,bool isCommit)4925 PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
4926 {
4927 	SERIALIZABLEXID *sxid;
4928 	SERIALIZABLEXIDTAG sxidtag;
4929 
4930 	sxidtag.xid = xid;
4931 
4932 	LWLockAcquire(SerializableXactHashLock, LW_SHARED);
4933 	sxid = (SERIALIZABLEXID *)
4934 		hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
4935 	LWLockRelease(SerializableXactHashLock);
4936 
4937 	/* xid will not be found if it wasn't a serializable transaction */
4938 	if (sxid == NULL)
4939 		return;
4940 
4941 	/* Release its locks */
4942 	MySerializableXact = sxid->myXact;
4943 	MyXactDidWrite = true;		/* conservatively assume that we wrote
4944 								 * something */
4945 	ReleasePredicateLocks(isCommit);
4946 }
4947 
4948 /*
4949  * Re-acquire a predicate lock belonging to a transaction that was prepared.
4950  */
4951 void
predicatelock_twophase_recover(TransactionId xid,uint16 info,void * recdata,uint32 len)4952 predicatelock_twophase_recover(TransactionId xid, uint16 info,
4953 							   void *recdata, uint32 len)
4954 {
4955 	TwoPhasePredicateRecord *record;
4956 
4957 	Assert(len == sizeof(TwoPhasePredicateRecord));
4958 
4959 	record = (TwoPhasePredicateRecord *) recdata;
4960 
4961 	Assert((record->type == TWOPHASEPREDICATERECORD_XACT) ||
4962 		   (record->type == TWOPHASEPREDICATERECORD_LOCK));
4963 
4964 	if (record->type == TWOPHASEPREDICATERECORD_XACT)
4965 	{
4966 		/* Per-transaction record. Set up a SERIALIZABLEXACT. */
4967 		TwoPhasePredicateXactRecord *xactRecord;
4968 		SERIALIZABLEXACT *sxact;
4969 		SERIALIZABLEXID *sxid;
4970 		SERIALIZABLEXIDTAG sxidtag;
4971 		bool		found;
4972 
4973 		xactRecord = (TwoPhasePredicateXactRecord *) &record->data.xactRecord;
4974 
4975 		LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
4976 		sxact = CreatePredXact();
4977 		if (!sxact)
4978 			ereport(ERROR,
4979 					(errcode(ERRCODE_OUT_OF_MEMORY),
4980 					 errmsg("out of shared memory")));
4981 
4982 		/* vxid for a prepared xact is InvalidBackendId/xid; no pid */
4983 		sxact->vxid.backendId = InvalidBackendId;
4984 		sxact->vxid.localTransactionId = (LocalTransactionId) xid;
4985 		sxact->pid = 0;
4986 
4987 		/* a prepared xact hasn't committed yet */
4988 		sxact->prepareSeqNo = RecoverySerCommitSeqNo;
4989 		sxact->commitSeqNo = InvalidSerCommitSeqNo;
4990 		sxact->finishedBefore = InvalidTransactionId;
4991 
4992 		sxact->SeqNo.lastCommitBeforeSnapshot = RecoverySerCommitSeqNo;
4993 
4994 		/*
4995 		 * Don't need to track this; no transactions running at the time the
4996 		 * recovered xact started are still active, except possibly other
4997 		 * prepared xacts and we don't care whether those are RO_SAFE or not.
4998 		 */
4999 		SHMQueueInit(&(sxact->possibleUnsafeConflicts));
5000 
5001 		SHMQueueInit(&(sxact->predicateLocks));
5002 		SHMQueueElemInit(&(sxact->finishedLink));
5003 
5004 		sxact->topXid = xid;
5005 		sxact->xmin = xactRecord->xmin;
5006 		sxact->flags = xactRecord->flags;
5007 		Assert(SxactIsPrepared(sxact));
5008 		if (!SxactIsReadOnly(sxact))
5009 		{
5010 			++(PredXact->WritableSxactCount);
5011 			Assert(PredXact->WritableSxactCount <=
5012 				   (MaxBackends + max_prepared_xacts));
5013 		}
5014 
5015 		/*
5016 		 * We don't know whether the transaction had any conflicts or not, so
5017 		 * we'll conservatively assume that it had both a conflict in and a
5018 		 * conflict out, and represent that with the summary conflict flags.
5019 		 */
5020 		SHMQueueInit(&(sxact->outConflicts));
5021 		SHMQueueInit(&(sxact->inConflicts));
5022 		sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
5023 		sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
5024 
5025 		/* Register the transaction's xid */
5026 		sxidtag.xid = xid;
5027 		sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
5028 											   &sxidtag,
5029 											   HASH_ENTER, &found);
5030 		Assert(sxid != NULL);
5031 		Assert(!found);
5032 		sxid->myXact = (SERIALIZABLEXACT *) sxact;
5033 
5034 		/*
5035 		 * Update global xmin. Note that this is a special case compared to
5036 		 * registering a normal transaction, because the global xmin might go
5037 		 * backwards. That's OK, because until recovery is over we're not
5038 		 * going to complete any transactions or create any non-prepared
5039 		 * transactions, so there's no danger of throwing away.
5040 		 */
5041 		if ((!TransactionIdIsValid(PredXact->SxactGlobalXmin)) ||
5042 			(TransactionIdFollows(PredXact->SxactGlobalXmin, sxact->xmin)))
5043 		{
5044 			PredXact->SxactGlobalXmin = sxact->xmin;
5045 			PredXact->SxactGlobalXminCount = 1;
5046 			OldSerXidSetActiveSerXmin(sxact->xmin);
5047 		}
5048 		else if (TransactionIdEquals(sxact->xmin, PredXact->SxactGlobalXmin))
5049 		{
5050 			Assert(PredXact->SxactGlobalXminCount > 0);
5051 			PredXact->SxactGlobalXminCount++;
5052 		}
5053 
5054 		LWLockRelease(SerializableXactHashLock);
5055 	}
5056 	else if (record->type == TWOPHASEPREDICATERECORD_LOCK)
5057 	{
5058 		/* Lock record. Recreate the PREDICATELOCK */
5059 		TwoPhasePredicateLockRecord *lockRecord;
5060 		SERIALIZABLEXID *sxid;
5061 		SERIALIZABLEXACT *sxact;
5062 		SERIALIZABLEXIDTAG sxidtag;
5063 		uint32		targettaghash;
5064 
5065 		lockRecord = (TwoPhasePredicateLockRecord *) &record->data.lockRecord;
5066 		targettaghash = PredicateLockTargetTagHashCode(&lockRecord->target);
5067 
5068 		LWLockAcquire(SerializableXactHashLock, LW_SHARED);
5069 		sxidtag.xid = xid;
5070 		sxid = (SERIALIZABLEXID *)
5071 			hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
5072 		LWLockRelease(SerializableXactHashLock);
5073 
5074 		Assert(sxid != NULL);
5075 		sxact = sxid->myXact;
5076 		Assert(sxact != InvalidSerializableXact);
5077 
5078 		CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
5079 	}
5080 }
5081