1 /*-------------------------------------------------------------------------
2  *
3  * snapmgr.c
4  *		PostgreSQL snapshot manager
5  *
6  * We keep track of snapshots in two ways: those "registered" by resowner.c,
7  * and the "active snapshot" stack.  All snapshots in either of them live in
8  * persistent memory.  When a snapshot is no longer in any of these lists
9  * (tracked by separate refcounts on each snapshot), its memory can be freed.
10  *
11  * The FirstXactSnapshot, if any, is treated a bit specially: we increment its
12  * regd_count and list it in RegisteredSnapshots, but this reference is not
13  * tracked by a resource owner. We used to use the TopTransactionResourceOwner
14  * to track this snapshot reference, but that introduces logical circularity
15  * and thus makes it impossible to clean up in a sane fashion.  It's better to
16  * handle this reference as an internally-tracked registration, so that this
17  * module is entirely lower-level than ResourceOwners.
18  *
19  * Likewise, any snapshots that have been exported by pg_export_snapshot
20  * have regd_count = 1 and are listed in RegisteredSnapshots, but are not
21  * tracked by any resource owner.
22  *
23  * Likewise, the CatalogSnapshot is listed in RegisteredSnapshots when it
24  * is valid, but is not tracked by any resource owner.
25  *
26  * The same is true for historic snapshots used during logical decoding,
27  * their lifetime is managed separately (as they live longer than one xact.c
28  * transaction).
29  *
30  * These arrangements let us reset MyProc->xmin when there are no snapshots
31  * referenced by this transaction, and advance it when the one with oldest
32  * Xmin is no longer referenced.  For simplicity however, only registered
33  * snapshots not active snapshots participate in tracking which one is oldest;
34  * we don't try to change MyProc->xmin except when the active-snapshot
35  * stack is empty.
36  *
37  *
38  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
39  * Portions Copyright (c) 1994, Regents of the University of California
40  *
41  * IDENTIFICATION
42  *	  src/backend/utils/time/snapmgr.c
43  *
44  *-------------------------------------------------------------------------
45  */
46 #include "postgres.h"
47 
48 #include <sys/stat.h>
49 #include <unistd.h>
50 
51 #include "access/subtrans.h"
52 #include "access/transam.h"
53 #include "access/xact.h"
54 #include "access/xlog.h"
55 #include "catalog/catalog.h"
56 #include "datatype/timestamp.h"
57 #include "lib/pairingheap.h"
58 #include "miscadmin.h"
59 #include "storage/predicate.h"
60 #include "storage/proc.h"
61 #include "storage/procarray.h"
62 #include "storage/sinval.h"
63 #include "storage/sinvaladt.h"
64 #include "storage/spin.h"
65 #include "utils/builtins.h"
66 #include "utils/memutils.h"
67 #include "utils/old_snapshot.h"
68 #include "utils/rel.h"
69 #include "utils/resowner_private.h"
70 #include "utils/snapmgr.h"
71 #include "utils/syscache.h"
72 #include "utils/timestamp.h"
73 
74 
75 /*
76  * GUC parameters
77  */
78 int			old_snapshot_threshold; /* number of minutes, -1 disables */
79 
80 volatile OldSnapshotControlData *oldSnapshotControl;
81 
82 
83 /*
84  * CurrentSnapshot points to the only snapshot taken in transaction-snapshot
85  * mode, and to the latest one taken in a read-committed transaction.
86  * SecondarySnapshot is a snapshot that's always up-to-date as of the current
87  * instant, even in transaction-snapshot mode.  It should only be used for
88  * special-purpose code (say, RI checking.)  CatalogSnapshot points to an
89  * MVCC snapshot intended to be used for catalog scans; we must invalidate it
90  * whenever a system catalog change occurs.
91  *
92  * These SnapshotData structs are static to simplify memory allocation
93  * (see the hack in GetSnapshotData to avoid repeated malloc/free).
94  */
95 static SnapshotData CurrentSnapshotData = {SNAPSHOT_MVCC};
96 static SnapshotData SecondarySnapshotData = {SNAPSHOT_MVCC};
97 SnapshotData CatalogSnapshotData = {SNAPSHOT_MVCC};
98 SnapshotData SnapshotSelfData = {SNAPSHOT_SELF};
99 SnapshotData SnapshotAnyData = {SNAPSHOT_ANY};
100 
101 /* Pointers to valid snapshots */
102 static Snapshot CurrentSnapshot = NULL;
103 static Snapshot SecondarySnapshot = NULL;
104 static Snapshot CatalogSnapshot = NULL;
105 static Snapshot HistoricSnapshot = NULL;
106 
107 /*
108  * These are updated by GetSnapshotData.  We initialize them this way
109  * for the convenience of TransactionIdIsInProgress: even in bootstrap
110  * mode, we don't want it to say that BootstrapTransactionId is in progress.
111  */
112 TransactionId TransactionXmin = FirstNormalTransactionId;
113 TransactionId RecentXmin = FirstNormalTransactionId;
114 
115 /* (table, ctid) => (cmin, cmax) mapping during timetravel */
116 static HTAB *tuplecid_data = NULL;
117 
118 /*
119  * Elements of the active snapshot stack.
120  *
121  * Each element here accounts for exactly one active_count on SnapshotData.
122  *
123  * NB: the code assumes that elements in this list are in non-increasing
124  * order of as_level; also, the list must be NULL-terminated.
125  */
126 typedef struct ActiveSnapshotElt
127 {
128 	Snapshot	as_snap;
129 	int			as_level;
130 	struct ActiveSnapshotElt *as_next;
131 } ActiveSnapshotElt;
132 
133 /* Top of the stack of active snapshots */
134 static ActiveSnapshotElt *ActiveSnapshot = NULL;
135 
136 /* Bottom of the stack of active snapshots */
137 static ActiveSnapshotElt *OldestActiveSnapshot = NULL;
138 
139 /*
140  * Currently registered Snapshots.  Ordered in a heap by xmin, so that we can
141  * quickly find the one with lowest xmin, to advance our MyProc->xmin.
142  */
143 static int	xmin_cmp(const pairingheap_node *a, const pairingheap_node *b,
144 					 void *arg);
145 
146 static pairingheap RegisteredSnapshots = {&xmin_cmp, NULL, NULL};
147 
148 /* first GetTransactionSnapshot call in a transaction? */
149 bool		FirstSnapshotSet = false;
150 
151 /*
152  * Remember the serializable transaction snapshot, if any.  We cannot trust
153  * FirstSnapshotSet in combination with IsolationUsesXactSnapshot(), because
154  * GUC may be reset before us, changing the value of IsolationUsesXactSnapshot.
155  */
156 static Snapshot FirstXactSnapshot = NULL;
157 
158 /* Define pathname of exported-snapshot files */
159 #define SNAPSHOT_EXPORT_DIR "pg_snapshots"
160 
161 /* Structure holding info about exported snapshot. */
162 typedef struct ExportedSnapshot
163 {
164 	char	   *snapfile;
165 	Snapshot	snapshot;
166 } ExportedSnapshot;
167 
168 /* Current xact's exported snapshots (a list of ExportedSnapshot structs) */
169 static List *exportedSnapshots = NIL;
170 
171 /* Prototypes for local functions */
172 static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts);
173 static Snapshot CopySnapshot(Snapshot snapshot);
174 static void FreeSnapshot(Snapshot snapshot);
175 static void SnapshotResetXmin(void);
176 
177 /*
178  * Snapshot fields to be serialized.
179  *
180  * Only these fields need to be sent to the cooperating backend; the
181  * remaining ones can (and must) be set by the receiver upon restore.
182  */
183 typedef struct SerializedSnapshotData
184 {
185 	TransactionId xmin;
186 	TransactionId xmax;
187 	uint32		xcnt;
188 	int32		subxcnt;
189 	bool		suboverflowed;
190 	bool		takenDuringRecovery;
191 	CommandId	curcid;
192 	TimestampTz whenTaken;
193 	XLogRecPtr	lsn;
194 } SerializedSnapshotData;
195 
196 Size
SnapMgrShmemSize(void)197 SnapMgrShmemSize(void)
198 {
199 	Size		size;
200 
201 	size = offsetof(OldSnapshotControlData, xid_by_minute);
202 	if (old_snapshot_threshold > 0)
203 		size = add_size(size, mul_size(sizeof(TransactionId),
204 									   OLD_SNAPSHOT_TIME_MAP_ENTRIES));
205 
206 	return size;
207 }
208 
209 /*
210  * Initialize for managing old snapshot detection.
211  */
212 void
SnapMgrInit(void)213 SnapMgrInit(void)
214 {
215 	bool		found;
216 
217 	/*
218 	 * Create or attach to the OldSnapshotControlData structure.
219 	 */
220 	oldSnapshotControl = (volatile OldSnapshotControlData *)
221 		ShmemInitStruct("OldSnapshotControlData",
222 						SnapMgrShmemSize(), &found);
223 
224 	if (!found)
225 	{
226 		SpinLockInit(&oldSnapshotControl->mutex_current);
227 		oldSnapshotControl->current_timestamp = 0;
228 		SpinLockInit(&oldSnapshotControl->mutex_latest_xmin);
229 		oldSnapshotControl->latest_xmin = InvalidTransactionId;
230 		oldSnapshotControl->next_map_update = 0;
231 		SpinLockInit(&oldSnapshotControl->mutex_threshold);
232 		oldSnapshotControl->threshold_timestamp = 0;
233 		oldSnapshotControl->threshold_xid = InvalidTransactionId;
234 		oldSnapshotControl->head_offset = 0;
235 		oldSnapshotControl->head_timestamp = 0;
236 		oldSnapshotControl->count_used = 0;
237 	}
238 }
239 
240 /*
241  * GetTransactionSnapshot
242  *		Get the appropriate snapshot for a new query in a transaction.
243  *
244  * Note that the return value may point at static storage that will be modified
245  * by future calls and by CommandCounterIncrement().  Callers should call
246  * RegisterSnapshot or PushActiveSnapshot on the returned snap if it is to be
247  * used very long.
248  */
249 Snapshot
GetTransactionSnapshot(void)250 GetTransactionSnapshot(void)
251 {
252 	/*
253 	 * Return historic snapshot if doing logical decoding. We'll never need a
254 	 * non-historic transaction snapshot in this (sub-)transaction, so there's
255 	 * no need to be careful to set one up for later calls to
256 	 * GetTransactionSnapshot().
257 	 */
258 	if (HistoricSnapshotActive())
259 	{
260 		Assert(!FirstSnapshotSet);
261 		return HistoricSnapshot;
262 	}
263 
264 	/* First call in transaction? */
265 	if (!FirstSnapshotSet)
266 	{
267 		/*
268 		 * Don't allow catalog snapshot to be older than xact snapshot.  Must
269 		 * do this first to allow the empty-heap Assert to succeed.
270 		 */
271 		InvalidateCatalogSnapshot();
272 
273 		Assert(pairingheap_is_empty(&RegisteredSnapshots));
274 		Assert(FirstXactSnapshot == NULL);
275 
276 		if (IsInParallelMode())
277 			elog(ERROR,
278 				 "cannot take query snapshot during a parallel operation");
279 
280 		/*
281 		 * In transaction-snapshot mode, the first snapshot must live until
282 		 * end of xact regardless of what the caller does with it, so we must
283 		 * make a copy of it rather than returning CurrentSnapshotData
284 		 * directly.  Furthermore, if we're running in serializable mode,
285 		 * predicate.c needs to wrap the snapshot fetch in its own processing.
286 		 */
287 		if (IsolationUsesXactSnapshot())
288 		{
289 			/* First, create the snapshot in CurrentSnapshotData */
290 			if (IsolationIsSerializable())
291 				CurrentSnapshot = GetSerializableTransactionSnapshot(&CurrentSnapshotData);
292 			else
293 				CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
294 			/* Make a saved copy */
295 			CurrentSnapshot = CopySnapshot(CurrentSnapshot);
296 			FirstXactSnapshot = CurrentSnapshot;
297 			/* Mark it as "registered" in FirstXactSnapshot */
298 			FirstXactSnapshot->regd_count++;
299 			pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
300 		}
301 		else
302 			CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
303 
304 		FirstSnapshotSet = true;
305 		return CurrentSnapshot;
306 	}
307 
308 	if (IsolationUsesXactSnapshot())
309 		return CurrentSnapshot;
310 
311 	/* Don't allow catalog snapshot to be older than xact snapshot. */
312 	InvalidateCatalogSnapshot();
313 
314 	CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
315 
316 	return CurrentSnapshot;
317 }
318 
319 /*
320  * GetLatestSnapshot
321  *		Get a snapshot that is up-to-date as of the current instant,
322  *		even if we are executing in transaction-snapshot mode.
323  */
324 Snapshot
GetLatestSnapshot(void)325 GetLatestSnapshot(void)
326 {
327 	/*
328 	 * We might be able to relax this, but nothing that could otherwise work
329 	 * needs it.
330 	 */
331 	if (IsInParallelMode())
332 		elog(ERROR,
333 			 "cannot update SecondarySnapshot during a parallel operation");
334 
335 	/*
336 	 * So far there are no cases requiring support for GetLatestSnapshot()
337 	 * during logical decoding, but it wouldn't be hard to add if required.
338 	 */
339 	Assert(!HistoricSnapshotActive());
340 
341 	/* If first call in transaction, go ahead and set the xact snapshot */
342 	if (!FirstSnapshotSet)
343 		return GetTransactionSnapshot();
344 
345 	SecondarySnapshot = GetSnapshotData(&SecondarySnapshotData);
346 
347 	return SecondarySnapshot;
348 }
349 
350 /*
351  * GetOldestSnapshot
352  *
353  *		Get the transaction's oldest known snapshot, as judged by the LSN.
354  *		Will return NULL if there are no active or registered snapshots.
355  */
356 Snapshot
GetOldestSnapshot(void)357 GetOldestSnapshot(void)
358 {
359 	Snapshot	OldestRegisteredSnapshot = NULL;
360 	XLogRecPtr	RegisteredLSN = InvalidXLogRecPtr;
361 
362 	if (!pairingheap_is_empty(&RegisteredSnapshots))
363 	{
364 		OldestRegisteredSnapshot = pairingheap_container(SnapshotData, ph_node,
365 														 pairingheap_first(&RegisteredSnapshots));
366 		RegisteredLSN = OldestRegisteredSnapshot->lsn;
367 	}
368 
369 	if (OldestActiveSnapshot != NULL)
370 	{
371 		XLogRecPtr	ActiveLSN = OldestActiveSnapshot->as_snap->lsn;
372 
373 		if (XLogRecPtrIsInvalid(RegisteredLSN) || RegisteredLSN > ActiveLSN)
374 			return OldestActiveSnapshot->as_snap;
375 	}
376 
377 	return OldestRegisteredSnapshot;
378 }
379 
380 /*
381  * GetCatalogSnapshot
382  *		Get a snapshot that is sufficiently up-to-date for scan of the
383  *		system catalog with the specified OID.
384  */
385 Snapshot
GetCatalogSnapshot(Oid relid)386 GetCatalogSnapshot(Oid relid)
387 {
388 	/*
389 	 * Return historic snapshot while we're doing logical decoding, so we can
390 	 * see the appropriate state of the catalog.
391 	 *
392 	 * This is the primary reason for needing to reset the system caches after
393 	 * finishing decoding.
394 	 */
395 	if (HistoricSnapshotActive())
396 		return HistoricSnapshot;
397 
398 	return GetNonHistoricCatalogSnapshot(relid);
399 }
400 
401 /*
402  * GetNonHistoricCatalogSnapshot
403  *		Get a snapshot that is sufficiently up-to-date for scan of the system
404  *		catalog with the specified OID, even while historic snapshots are set
405  *		up.
406  */
407 Snapshot
GetNonHistoricCatalogSnapshot(Oid relid)408 GetNonHistoricCatalogSnapshot(Oid relid)
409 {
410 	/*
411 	 * If the caller is trying to scan a relation that has no syscache, no
412 	 * catcache invalidations will be sent when it is updated.  For a few key
413 	 * relations, snapshot invalidations are sent instead.  If we're trying to
414 	 * scan a relation for which neither catcache nor snapshot invalidations
415 	 * are sent, we must refresh the snapshot every time.
416 	 */
417 	if (CatalogSnapshot &&
418 		!RelationInvalidatesSnapshotsOnly(relid) &&
419 		!RelationHasSysCache(relid))
420 		InvalidateCatalogSnapshot();
421 
422 	if (CatalogSnapshot == NULL)
423 	{
424 		/* Get new snapshot. */
425 		CatalogSnapshot = GetSnapshotData(&CatalogSnapshotData);
426 
427 		/*
428 		 * Make sure the catalog snapshot will be accounted for in decisions
429 		 * about advancing PGPROC->xmin.  We could apply RegisterSnapshot, but
430 		 * that would result in making a physical copy, which is overkill; and
431 		 * it would also create a dependency on some resource owner, which we
432 		 * do not want for reasons explained at the head of this file. Instead
433 		 * just shove the CatalogSnapshot into the pairing heap manually. This
434 		 * has to be reversed in InvalidateCatalogSnapshot, of course.
435 		 *
436 		 * NB: it had better be impossible for this to throw error, since the
437 		 * CatalogSnapshot pointer is already valid.
438 		 */
439 		pairingheap_add(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
440 	}
441 
442 	return CatalogSnapshot;
443 }
444 
445 /*
446  * InvalidateCatalogSnapshot
447  *		Mark the current catalog snapshot, if any, as invalid
448  *
449  * We could change this API to allow the caller to provide more fine-grained
450  * invalidation details, so that a change to relation A wouldn't prevent us
451  * from using our cached snapshot to scan relation B, but so far there's no
452  * evidence that the CPU cycles we spent tracking such fine details would be
453  * well-spent.
454  */
455 void
InvalidateCatalogSnapshot(void)456 InvalidateCatalogSnapshot(void)
457 {
458 	if (CatalogSnapshot)
459 	{
460 		pairingheap_remove(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
461 		CatalogSnapshot = NULL;
462 		SnapshotResetXmin();
463 	}
464 }
465 
466 /*
467  * InvalidateCatalogSnapshotConditionally
468  *		Drop catalog snapshot if it's the only one we have
469  *
470  * This is called when we are about to wait for client input, so we don't
471  * want to continue holding the catalog snapshot if it might mean that the
472  * global xmin horizon can't advance.  However, if there are other snapshots
473  * still active or registered, the catalog snapshot isn't likely to be the
474  * oldest one, so we might as well keep it.
475  */
476 void
InvalidateCatalogSnapshotConditionally(void)477 InvalidateCatalogSnapshotConditionally(void)
478 {
479 	if (CatalogSnapshot &&
480 		ActiveSnapshot == NULL &&
481 		pairingheap_is_singular(&RegisteredSnapshots))
482 		InvalidateCatalogSnapshot();
483 }
484 
485 /*
486  * SnapshotSetCommandId
487  *		Propagate CommandCounterIncrement into the static snapshots, if set
488  */
489 void
SnapshotSetCommandId(CommandId curcid)490 SnapshotSetCommandId(CommandId curcid)
491 {
492 	if (!FirstSnapshotSet)
493 		return;
494 
495 	if (CurrentSnapshot)
496 		CurrentSnapshot->curcid = curcid;
497 	if (SecondarySnapshot)
498 		SecondarySnapshot->curcid = curcid;
499 	/* Should we do the same with CatalogSnapshot? */
500 }
501 
502 /*
503  * SetTransactionSnapshot
504  *		Set the transaction's snapshot from an imported MVCC snapshot.
505  *
506  * Note that this is very closely tied to GetTransactionSnapshot --- it
507  * must take care of all the same considerations as the first-snapshot case
508  * in GetTransactionSnapshot.
509  */
510 static void
SetTransactionSnapshot(Snapshot sourcesnap,VirtualTransactionId * sourcevxid,int sourcepid,PGPROC * sourceproc)511 SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
512 					   int sourcepid, PGPROC *sourceproc)
513 {
514 	/* Caller should have checked this already */
515 	Assert(!FirstSnapshotSet);
516 
517 	/* Better do this to ensure following Assert succeeds. */
518 	InvalidateCatalogSnapshot();
519 
520 	Assert(pairingheap_is_empty(&RegisteredSnapshots));
521 	Assert(FirstXactSnapshot == NULL);
522 	Assert(!HistoricSnapshotActive());
523 
524 	/*
525 	 * Even though we are not going to use the snapshot it computes, we must
526 	 * call GetSnapshotData, for two reasons: (1) to be sure that
527 	 * CurrentSnapshotData's XID arrays have been allocated, and (2) to update
528 	 * the state for GlobalVis*.
529 	 */
530 	CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
531 
532 	/*
533 	 * Now copy appropriate fields from the source snapshot.
534 	 */
535 	CurrentSnapshot->xmin = sourcesnap->xmin;
536 	CurrentSnapshot->xmax = sourcesnap->xmax;
537 	CurrentSnapshot->xcnt = sourcesnap->xcnt;
538 	Assert(sourcesnap->xcnt <= GetMaxSnapshotXidCount());
539 	memcpy(CurrentSnapshot->xip, sourcesnap->xip,
540 		   sourcesnap->xcnt * sizeof(TransactionId));
541 	CurrentSnapshot->subxcnt = sourcesnap->subxcnt;
542 	Assert(sourcesnap->subxcnt <= GetMaxSnapshotSubxidCount());
543 	memcpy(CurrentSnapshot->subxip, sourcesnap->subxip,
544 		   sourcesnap->subxcnt * sizeof(TransactionId));
545 	CurrentSnapshot->suboverflowed = sourcesnap->suboverflowed;
546 	CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery;
547 	/* NB: curcid should NOT be copied, it's a local matter */
548 
549 	CurrentSnapshot->snapXactCompletionCount = 0;
550 
551 	/*
552 	 * Now we have to fix what GetSnapshotData did with MyProc->xmin and
553 	 * TransactionXmin.  There is a race condition: to make sure we are not
554 	 * causing the global xmin to go backwards, we have to test that the
555 	 * source transaction is still running, and that has to be done
556 	 * atomically. So let procarray.c do it.
557 	 *
558 	 * Note: in serializable mode, predicate.c will do this a second time. It
559 	 * doesn't seem worth contorting the logic here to avoid two calls,
560 	 * especially since it's not clear that predicate.c *must* do this.
561 	 */
562 	if (sourceproc != NULL)
563 	{
564 		if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc))
565 			ereport(ERROR,
566 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
567 					 errmsg("could not import the requested snapshot"),
568 					 errdetail("The source transaction is not running anymore.")));
569 	}
570 	else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcevxid))
571 		ereport(ERROR,
572 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
573 				 errmsg("could not import the requested snapshot"),
574 				 errdetail("The source process with PID %d is not running anymore.",
575 						   sourcepid)));
576 
577 	/*
578 	 * In transaction-snapshot mode, the first snapshot must live until end of
579 	 * xact, so we must make a copy of it.  Furthermore, if we're running in
580 	 * serializable mode, predicate.c needs to do its own processing.
581 	 */
582 	if (IsolationUsesXactSnapshot())
583 	{
584 		if (IsolationIsSerializable())
585 			SetSerializableTransactionSnapshot(CurrentSnapshot, sourcevxid,
586 											   sourcepid);
587 		/* Make a saved copy */
588 		CurrentSnapshot = CopySnapshot(CurrentSnapshot);
589 		FirstXactSnapshot = CurrentSnapshot;
590 		/* Mark it as "registered" in FirstXactSnapshot */
591 		FirstXactSnapshot->regd_count++;
592 		pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
593 	}
594 
595 	FirstSnapshotSet = true;
596 }
597 
598 /*
599  * CopySnapshot
600  *		Copy the given snapshot.
601  *
602  * The copy is palloc'd in TopTransactionContext and has initial refcounts set
603  * to 0.  The returned snapshot has the copied flag set.
604  */
605 static Snapshot
CopySnapshot(Snapshot snapshot)606 CopySnapshot(Snapshot snapshot)
607 {
608 	Snapshot	newsnap;
609 	Size		subxipoff;
610 	Size		size;
611 
612 	Assert(snapshot != InvalidSnapshot);
613 
614 	/* We allocate any XID arrays needed in the same palloc block. */
615 	size = subxipoff = sizeof(SnapshotData) +
616 		snapshot->xcnt * sizeof(TransactionId);
617 	if (snapshot->subxcnt > 0)
618 		size += snapshot->subxcnt * sizeof(TransactionId);
619 
620 	newsnap = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
621 	memcpy(newsnap, snapshot, sizeof(SnapshotData));
622 
623 	newsnap->regd_count = 0;
624 	newsnap->active_count = 0;
625 	newsnap->copied = true;
626 	newsnap->snapXactCompletionCount = 0;
627 
628 	/* setup XID array */
629 	if (snapshot->xcnt > 0)
630 	{
631 		newsnap->xip = (TransactionId *) (newsnap + 1);
632 		memcpy(newsnap->xip, snapshot->xip,
633 			   snapshot->xcnt * sizeof(TransactionId));
634 	}
635 	else
636 		newsnap->xip = NULL;
637 
638 	/*
639 	 * Setup subXID array. Don't bother to copy it if it had overflowed,
640 	 * though, because it's not used anywhere in that case. Except if it's a
641 	 * snapshot taken during recovery; all the top-level XIDs are in subxip as
642 	 * well in that case, so we mustn't lose them.
643 	 */
644 	if (snapshot->subxcnt > 0 &&
645 		(!snapshot->suboverflowed || snapshot->takenDuringRecovery))
646 	{
647 		newsnap->subxip = (TransactionId *) ((char *) newsnap + subxipoff);
648 		memcpy(newsnap->subxip, snapshot->subxip,
649 			   snapshot->subxcnt * sizeof(TransactionId));
650 	}
651 	else
652 		newsnap->subxip = NULL;
653 
654 	return newsnap;
655 }
656 
657 /*
658  * FreeSnapshot
659  *		Free the memory associated with a snapshot.
660  */
661 static void
FreeSnapshot(Snapshot snapshot)662 FreeSnapshot(Snapshot snapshot)
663 {
664 	Assert(snapshot->regd_count == 0);
665 	Assert(snapshot->active_count == 0);
666 	Assert(snapshot->copied);
667 
668 	pfree(snapshot);
669 }
670 
671 /*
672  * PushActiveSnapshot
673  *		Set the given snapshot as the current active snapshot
674  *
675  * If the passed snapshot is a statically-allocated one, or it is possibly
676  * subject to a future command counter update, create a new long-lived copy
677  * with active refcount=1.  Otherwise, only increment the refcount.
678  */
679 void
PushActiveSnapshot(Snapshot snap)680 PushActiveSnapshot(Snapshot snap)
681 {
682 	PushActiveSnapshotWithLevel(snap, GetCurrentTransactionNestLevel());
683 }
684 
685 /*
686  * PushActiveSnapshotWithLevel
687  *		Set the given snapshot as the current active snapshot
688  *
689  * Same as PushActiveSnapshot except that caller can specify the
690  * transaction nesting level that "owns" the snapshot.  This level
691  * must not be deeper than the current top of the snapshot stack.
692  */
693 void
PushActiveSnapshotWithLevel(Snapshot snap,int snap_level)694 PushActiveSnapshotWithLevel(Snapshot snap, int snap_level)
695 {
696 	ActiveSnapshotElt *newactive;
697 
698 	Assert(snap != InvalidSnapshot);
699 	Assert(ActiveSnapshot == NULL || snap_level >= ActiveSnapshot->as_level);
700 
701 	newactive = MemoryContextAlloc(TopTransactionContext, sizeof(ActiveSnapshotElt));
702 
703 	/*
704 	 * Checking SecondarySnapshot is probably useless here, but it seems
705 	 * better to be sure.
706 	 */
707 	if (snap == CurrentSnapshot || snap == SecondarySnapshot || !snap->copied)
708 		newactive->as_snap = CopySnapshot(snap);
709 	else
710 		newactive->as_snap = snap;
711 
712 	newactive->as_next = ActiveSnapshot;
713 	newactive->as_level = snap_level;
714 
715 	newactive->as_snap->active_count++;
716 
717 	ActiveSnapshot = newactive;
718 	if (OldestActiveSnapshot == NULL)
719 		OldestActiveSnapshot = ActiveSnapshot;
720 }
721 
722 /*
723  * PushCopiedSnapshot
724  *		As above, except forcibly copy the presented snapshot.
725  *
726  * This should be used when the ActiveSnapshot has to be modifiable, for
727  * example if the caller intends to call UpdateActiveSnapshotCommandId.
728  * The new snapshot will be released when popped from the stack.
729  */
730 void
PushCopiedSnapshot(Snapshot snapshot)731 PushCopiedSnapshot(Snapshot snapshot)
732 {
733 	PushActiveSnapshot(CopySnapshot(snapshot));
734 }
735 
736 /*
737  * UpdateActiveSnapshotCommandId
738  *
739  * Update the current CID of the active snapshot.  This can only be applied
740  * to a snapshot that is not referenced elsewhere.
741  */
742 void
UpdateActiveSnapshotCommandId(void)743 UpdateActiveSnapshotCommandId(void)
744 {
745 	CommandId	save_curcid,
746 				curcid;
747 
748 	Assert(ActiveSnapshot != NULL);
749 	Assert(ActiveSnapshot->as_snap->active_count == 1);
750 	Assert(ActiveSnapshot->as_snap->regd_count == 0);
751 
752 	/*
753 	 * Don't allow modification of the active snapshot during parallel
754 	 * operation.  We share the snapshot to worker backends at the beginning
755 	 * of parallel operation, so any change to the snapshot can lead to
756 	 * inconsistencies.  We have other defenses against
757 	 * CommandCounterIncrement, but there are a few places that call this
758 	 * directly, so we put an additional guard here.
759 	 */
760 	save_curcid = ActiveSnapshot->as_snap->curcid;
761 	curcid = GetCurrentCommandId(false);
762 	if (IsInParallelMode() && save_curcid != curcid)
763 		elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation");
764 	ActiveSnapshot->as_snap->curcid = curcid;
765 }
766 
767 /*
768  * PopActiveSnapshot
769  *
770  * Remove the topmost snapshot from the active snapshot stack, decrementing the
771  * reference count, and free it if this was the last reference.
772  */
773 void
PopActiveSnapshot(void)774 PopActiveSnapshot(void)
775 {
776 	ActiveSnapshotElt *newstack;
777 
778 	newstack = ActiveSnapshot->as_next;
779 
780 	Assert(ActiveSnapshot->as_snap->active_count > 0);
781 
782 	ActiveSnapshot->as_snap->active_count--;
783 
784 	if (ActiveSnapshot->as_snap->active_count == 0 &&
785 		ActiveSnapshot->as_snap->regd_count == 0)
786 		FreeSnapshot(ActiveSnapshot->as_snap);
787 
788 	pfree(ActiveSnapshot);
789 	ActiveSnapshot = newstack;
790 	if (ActiveSnapshot == NULL)
791 		OldestActiveSnapshot = NULL;
792 
793 	SnapshotResetXmin();
794 }
795 
796 /*
797  * GetActiveSnapshot
798  *		Return the topmost snapshot in the Active stack.
799  */
800 Snapshot
GetActiveSnapshot(void)801 GetActiveSnapshot(void)
802 {
803 	Assert(ActiveSnapshot != NULL);
804 
805 	return ActiveSnapshot->as_snap;
806 }
807 
808 /*
809  * ActiveSnapshotSet
810  *		Return whether there is at least one snapshot in the Active stack
811  */
812 bool
ActiveSnapshotSet(void)813 ActiveSnapshotSet(void)
814 {
815 	return ActiveSnapshot != NULL;
816 }
817 
818 /*
819  * RegisterSnapshot
820  *		Register a snapshot as being in use by the current resource owner
821  *
822  * If InvalidSnapshot is passed, it is not registered.
823  */
824 Snapshot
RegisterSnapshot(Snapshot snapshot)825 RegisterSnapshot(Snapshot snapshot)
826 {
827 	if (snapshot == InvalidSnapshot)
828 		return InvalidSnapshot;
829 
830 	return RegisterSnapshotOnOwner(snapshot, CurrentResourceOwner);
831 }
832 
833 /*
834  * RegisterSnapshotOnOwner
835  *		As above, but use the specified resource owner
836  */
837 Snapshot
RegisterSnapshotOnOwner(Snapshot snapshot,ResourceOwner owner)838 RegisterSnapshotOnOwner(Snapshot snapshot, ResourceOwner owner)
839 {
840 	Snapshot	snap;
841 
842 	if (snapshot == InvalidSnapshot)
843 		return InvalidSnapshot;
844 
845 	/* Static snapshot?  Create a persistent copy */
846 	snap = snapshot->copied ? snapshot : CopySnapshot(snapshot);
847 
848 	/* and tell resowner.c about it */
849 	ResourceOwnerEnlargeSnapshots(owner);
850 	snap->regd_count++;
851 	ResourceOwnerRememberSnapshot(owner, snap);
852 
853 	if (snap->regd_count == 1)
854 		pairingheap_add(&RegisteredSnapshots, &snap->ph_node);
855 
856 	return snap;
857 }
858 
859 /*
860  * UnregisterSnapshot
861  *
862  * Decrement the reference count of a snapshot, remove the corresponding
863  * reference from CurrentResourceOwner, and free the snapshot if no more
864  * references remain.
865  */
866 void
UnregisterSnapshot(Snapshot snapshot)867 UnregisterSnapshot(Snapshot snapshot)
868 {
869 	if (snapshot == NULL)
870 		return;
871 
872 	UnregisterSnapshotFromOwner(snapshot, CurrentResourceOwner);
873 }
874 
875 /*
876  * UnregisterSnapshotFromOwner
877  *		As above, but use the specified resource owner
878  */
879 void
UnregisterSnapshotFromOwner(Snapshot snapshot,ResourceOwner owner)880 UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner)
881 {
882 	if (snapshot == NULL)
883 		return;
884 
885 	Assert(snapshot->regd_count > 0);
886 	Assert(!pairingheap_is_empty(&RegisteredSnapshots));
887 
888 	ResourceOwnerForgetSnapshot(owner, snapshot);
889 
890 	snapshot->regd_count--;
891 	if (snapshot->regd_count == 0)
892 		pairingheap_remove(&RegisteredSnapshots, &snapshot->ph_node);
893 
894 	if (snapshot->regd_count == 0 && snapshot->active_count == 0)
895 	{
896 		FreeSnapshot(snapshot);
897 		SnapshotResetXmin();
898 	}
899 }
900 
901 /*
902  * Comparison function for RegisteredSnapshots heap.  Snapshots are ordered
903  * by xmin, so that the snapshot with smallest xmin is at the top.
904  */
905 static int
xmin_cmp(const pairingheap_node * a,const pairingheap_node * b,void * arg)906 xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
907 {
908 	const SnapshotData *asnap = pairingheap_const_container(SnapshotData, ph_node, a);
909 	const SnapshotData *bsnap = pairingheap_const_container(SnapshotData, ph_node, b);
910 
911 	if (TransactionIdPrecedes(asnap->xmin, bsnap->xmin))
912 		return 1;
913 	else if (TransactionIdFollows(asnap->xmin, bsnap->xmin))
914 		return -1;
915 	else
916 		return 0;
917 }
918 
919 /*
920  * SnapshotResetXmin
921  *
922  * If there are no more snapshots, we can reset our PGPROC->xmin to InvalidXid.
923  * Note we can do this without locking because we assume that storing an Xid
924  * is atomic.
925  *
926  * Even if there are some remaining snapshots, we may be able to advance our
927  * PGPROC->xmin to some degree.  This typically happens when a portal is
928  * dropped.  For efficiency, we only consider recomputing PGPROC->xmin when
929  * the active snapshot stack is empty; this allows us not to need to track
930  * which active snapshot is oldest.
931  *
932  * Note: it's tempting to use GetOldestSnapshot() here so that we can include
933  * active snapshots in the calculation.  However, that compares by LSN not
934  * xmin so it's not entirely clear that it's the same thing.  Also, we'd be
935  * critically dependent on the assumption that the bottommost active snapshot
936  * stack entry has the oldest xmin.  (Current uses of GetOldestSnapshot() are
937  * not actually critical, but this would be.)
938  */
939 static void
SnapshotResetXmin(void)940 SnapshotResetXmin(void)
941 {
942 	Snapshot	minSnapshot;
943 
944 	if (ActiveSnapshot != NULL)
945 		return;
946 
947 	if (pairingheap_is_empty(&RegisteredSnapshots))
948 	{
949 		MyProc->xmin = InvalidTransactionId;
950 		return;
951 	}
952 
953 	minSnapshot = pairingheap_container(SnapshotData, ph_node,
954 										pairingheap_first(&RegisteredSnapshots));
955 
956 	if (TransactionIdPrecedes(MyProc->xmin, minSnapshot->xmin))
957 		MyProc->xmin = minSnapshot->xmin;
958 }
959 
960 /*
961  * AtSubCommit_Snapshot
962  */
963 void
AtSubCommit_Snapshot(int level)964 AtSubCommit_Snapshot(int level)
965 {
966 	ActiveSnapshotElt *active;
967 
968 	/*
969 	 * Relabel the active snapshots set in this subtransaction as though they
970 	 * are owned by the parent subxact.
971 	 */
972 	for (active = ActiveSnapshot; active != NULL; active = active->as_next)
973 	{
974 		if (active->as_level < level)
975 			break;
976 		active->as_level = level - 1;
977 	}
978 }
979 
980 /*
981  * AtSubAbort_Snapshot
982  *		Clean up snapshots after a subtransaction abort
983  */
984 void
AtSubAbort_Snapshot(int level)985 AtSubAbort_Snapshot(int level)
986 {
987 	/* Forget the active snapshots set by this subtransaction */
988 	while (ActiveSnapshot && ActiveSnapshot->as_level >= level)
989 	{
990 		ActiveSnapshotElt *next;
991 
992 		next = ActiveSnapshot->as_next;
993 
994 		/*
995 		 * Decrement the snapshot's active count.  If it's still registered or
996 		 * marked as active by an outer subtransaction, we can't free it yet.
997 		 */
998 		Assert(ActiveSnapshot->as_snap->active_count >= 1);
999 		ActiveSnapshot->as_snap->active_count -= 1;
1000 
1001 		if (ActiveSnapshot->as_snap->active_count == 0 &&
1002 			ActiveSnapshot->as_snap->regd_count == 0)
1003 			FreeSnapshot(ActiveSnapshot->as_snap);
1004 
1005 		/* and free the stack element */
1006 		pfree(ActiveSnapshot);
1007 
1008 		ActiveSnapshot = next;
1009 		if (ActiveSnapshot == NULL)
1010 			OldestActiveSnapshot = NULL;
1011 	}
1012 
1013 	SnapshotResetXmin();
1014 }
1015 
1016 /*
1017  * AtEOXact_Snapshot
1018  *		Snapshot manager's cleanup function for end of transaction
1019  */
1020 void
AtEOXact_Snapshot(bool isCommit,bool resetXmin)1021 AtEOXact_Snapshot(bool isCommit, bool resetXmin)
1022 {
1023 	/*
1024 	 * In transaction-snapshot mode we must release our privately-managed
1025 	 * reference to the transaction snapshot.  We must remove it from
1026 	 * RegisteredSnapshots to keep the check below happy.  But we don't bother
1027 	 * to do FreeSnapshot, for two reasons: the memory will go away with
1028 	 * TopTransactionContext anyway, and if someone has left the snapshot
1029 	 * stacked as active, we don't want the code below to be chasing through a
1030 	 * dangling pointer.
1031 	 */
1032 	if (FirstXactSnapshot != NULL)
1033 	{
1034 		Assert(FirstXactSnapshot->regd_count > 0);
1035 		Assert(!pairingheap_is_empty(&RegisteredSnapshots));
1036 		pairingheap_remove(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
1037 	}
1038 	FirstXactSnapshot = NULL;
1039 
1040 	/*
1041 	 * If we exported any snapshots, clean them up.
1042 	 */
1043 	if (exportedSnapshots != NIL)
1044 	{
1045 		ListCell   *lc;
1046 
1047 		/*
1048 		 * Get rid of the files.  Unlink failure is only a WARNING because (1)
1049 		 * it's too late to abort the transaction, and (2) leaving a leaked
1050 		 * file around has little real consequence anyway.
1051 		 *
1052 		 * We also need to remove the snapshots from RegisteredSnapshots to
1053 		 * prevent a warning below.
1054 		 *
1055 		 * As with the FirstXactSnapshot, we don't need to free resources of
1056 		 * the snapshot itself as it will go away with the memory context.
1057 		 */
1058 		foreach(lc, exportedSnapshots)
1059 		{
1060 			ExportedSnapshot *esnap = (ExportedSnapshot *) lfirst(lc);
1061 
1062 			if (unlink(esnap->snapfile))
1063 				elog(WARNING, "could not unlink file \"%s\": %m",
1064 					 esnap->snapfile);
1065 
1066 			pairingheap_remove(&RegisteredSnapshots,
1067 							   &esnap->snapshot->ph_node);
1068 		}
1069 
1070 		exportedSnapshots = NIL;
1071 	}
1072 
1073 	/* Drop catalog snapshot if any */
1074 	InvalidateCatalogSnapshot();
1075 
1076 	/* On commit, complain about leftover snapshots */
1077 	if (isCommit)
1078 	{
1079 		ActiveSnapshotElt *active;
1080 
1081 		if (!pairingheap_is_empty(&RegisteredSnapshots))
1082 			elog(WARNING, "registered snapshots seem to remain after cleanup");
1083 
1084 		/* complain about unpopped active snapshots */
1085 		for (active = ActiveSnapshot; active != NULL; active = active->as_next)
1086 			elog(WARNING, "snapshot %p still active", active);
1087 	}
1088 
1089 	/*
1090 	 * And reset our state.  We don't need to free the memory explicitly --
1091 	 * it'll go away with TopTransactionContext.
1092 	 */
1093 	ActiveSnapshot = NULL;
1094 	OldestActiveSnapshot = NULL;
1095 	pairingheap_reset(&RegisteredSnapshots);
1096 
1097 	CurrentSnapshot = NULL;
1098 	SecondarySnapshot = NULL;
1099 
1100 	FirstSnapshotSet = false;
1101 
1102 	/*
1103 	 * During normal commit processing, we call ProcArrayEndTransaction() to
1104 	 * reset the MyProc->xmin. That call happens prior to the call to
1105 	 * AtEOXact_Snapshot(), so we need not touch xmin here at all.
1106 	 */
1107 	if (resetXmin)
1108 		SnapshotResetXmin();
1109 
1110 	Assert(resetXmin || MyProc->xmin == 0);
1111 }
1112 
1113 
1114 /*
1115  * ExportSnapshot
1116  *		Export the snapshot to a file so that other backends can import it.
1117  *		Returns the token (the file name) that can be used to import this
1118  *		snapshot.
1119  */
1120 char *
ExportSnapshot(Snapshot snapshot)1121 ExportSnapshot(Snapshot snapshot)
1122 {
1123 	TransactionId topXid;
1124 	TransactionId *children;
1125 	ExportedSnapshot *esnap;
1126 	int			nchildren;
1127 	int			addTopXid;
1128 	StringInfoData buf;
1129 	FILE	   *f;
1130 	int			i;
1131 	MemoryContext oldcxt;
1132 	char		path[MAXPGPATH];
1133 	char		pathtmp[MAXPGPATH];
1134 
1135 	/*
1136 	 * It's tempting to call RequireTransactionBlock here, since it's not very
1137 	 * useful to export a snapshot that will disappear immediately afterwards.
1138 	 * However, we haven't got enough information to do that, since we don't
1139 	 * know if we're at top level or not.  For example, we could be inside a
1140 	 * plpgsql function that is going to fire off other transactions via
1141 	 * dblink.  Rather than disallow perfectly legitimate usages, don't make a
1142 	 * check.
1143 	 *
1144 	 * Also note that we don't make any restriction on the transaction's
1145 	 * isolation level; however, importers must check the level if they are
1146 	 * serializable.
1147 	 */
1148 
1149 	/*
1150 	 * Get our transaction ID if there is one, to include in the snapshot.
1151 	 */
1152 	topXid = GetTopTransactionIdIfAny();
1153 
1154 	/*
1155 	 * We cannot export a snapshot from a subtransaction because there's no
1156 	 * easy way for importers to verify that the same subtransaction is still
1157 	 * running.
1158 	 */
1159 	if (IsSubTransaction())
1160 		ereport(ERROR,
1161 				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1162 				 errmsg("cannot export a snapshot from a subtransaction")));
1163 
1164 	/*
1165 	 * We do however allow previous committed subtransactions to exist.
1166 	 * Importers of the snapshot must see them as still running, so get their
1167 	 * XIDs to add them to the snapshot.
1168 	 */
1169 	nchildren = xactGetCommittedChildren(&children);
1170 
1171 	/*
1172 	 * Generate file path for the snapshot.  We start numbering of snapshots
1173 	 * inside the transaction from 1.
1174 	 */
1175 	snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%08X-%d",
1176 			 MyProc->backendId, MyProc->lxid, list_length(exportedSnapshots) + 1);
1177 
1178 	/*
1179 	 * Copy the snapshot into TopTransactionContext, add it to the
1180 	 * exportedSnapshots list, and mark it pseudo-registered.  We do this to
1181 	 * ensure that the snapshot's xmin is honored for the rest of the
1182 	 * transaction.
1183 	 */
1184 	snapshot = CopySnapshot(snapshot);
1185 
1186 	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
1187 	esnap = (ExportedSnapshot *) palloc(sizeof(ExportedSnapshot));
1188 	esnap->snapfile = pstrdup(path);
1189 	esnap->snapshot = snapshot;
1190 	exportedSnapshots = lappend(exportedSnapshots, esnap);
1191 	MemoryContextSwitchTo(oldcxt);
1192 
1193 	snapshot->regd_count++;
1194 	pairingheap_add(&RegisteredSnapshots, &snapshot->ph_node);
1195 
1196 	/*
1197 	 * Fill buf with a text serialization of the snapshot, plus identification
1198 	 * data about this transaction.  The format expected by ImportSnapshot is
1199 	 * pretty rigid: each line must be fieldname:value.
1200 	 */
1201 	initStringInfo(&buf);
1202 
1203 	appendStringInfo(&buf, "vxid:%d/%u\n", MyProc->backendId, MyProc->lxid);
1204 	appendStringInfo(&buf, "pid:%d\n", MyProcPid);
1205 	appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId);
1206 	appendStringInfo(&buf, "iso:%d\n", XactIsoLevel);
1207 	appendStringInfo(&buf, "ro:%d\n", XactReadOnly);
1208 
1209 	appendStringInfo(&buf, "xmin:%u\n", snapshot->xmin);
1210 	appendStringInfo(&buf, "xmax:%u\n", snapshot->xmax);
1211 
1212 	/*
1213 	 * We must include our own top transaction ID in the top-xid data, since
1214 	 * by definition we will still be running when the importing transaction
1215 	 * adopts the snapshot, but GetSnapshotData never includes our own XID in
1216 	 * the snapshot.  (There must, therefore, be enough room to add it.)
1217 	 *
1218 	 * However, it could be that our topXid is after the xmax, in which case
1219 	 * we shouldn't include it because xip[] members are expected to be before
1220 	 * xmax.  (We need not make the same check for subxip[] members, see
1221 	 * snapshot.h.)
1222 	 */
1223 	addTopXid = (TransactionIdIsValid(topXid) &&
1224 				 TransactionIdPrecedes(topXid, snapshot->xmax)) ? 1 : 0;
1225 	appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid);
1226 	for (i = 0; i < snapshot->xcnt; i++)
1227 		appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]);
1228 	if (addTopXid)
1229 		appendStringInfo(&buf, "xip:%u\n", topXid);
1230 
1231 	/*
1232 	 * Similarly, we add our subcommitted child XIDs to the subxid data. Here,
1233 	 * we have to cope with possible overflow.
1234 	 */
1235 	if (snapshot->suboverflowed ||
1236 		snapshot->subxcnt + nchildren > GetMaxSnapshotSubxidCount())
1237 		appendStringInfoString(&buf, "sof:1\n");
1238 	else
1239 	{
1240 		appendStringInfoString(&buf, "sof:0\n");
1241 		appendStringInfo(&buf, "sxcnt:%d\n", snapshot->subxcnt + nchildren);
1242 		for (i = 0; i < snapshot->subxcnt; i++)
1243 			appendStringInfo(&buf, "sxp:%u\n", snapshot->subxip[i]);
1244 		for (i = 0; i < nchildren; i++)
1245 			appendStringInfo(&buf, "sxp:%u\n", children[i]);
1246 	}
1247 	appendStringInfo(&buf, "rec:%u\n", snapshot->takenDuringRecovery);
1248 
1249 	/*
1250 	 * Now write the text representation into a file.  We first write to a
1251 	 * ".tmp" filename, and rename to final filename if no error.  This
1252 	 * ensures that no other backend can read an incomplete file
1253 	 * (ImportSnapshot won't allow it because of its valid-characters check).
1254 	 */
1255 	snprintf(pathtmp, sizeof(pathtmp), "%s.tmp", path);
1256 	if (!(f = AllocateFile(pathtmp, PG_BINARY_W)))
1257 		ereport(ERROR,
1258 				(errcode_for_file_access(),
1259 				 errmsg("could not create file \"%s\": %m", pathtmp)));
1260 
1261 	if (fwrite(buf.data, buf.len, 1, f) != 1)
1262 		ereport(ERROR,
1263 				(errcode_for_file_access(),
1264 				 errmsg("could not write to file \"%s\": %m", pathtmp)));
1265 
1266 	/* no fsync() since file need not survive a system crash */
1267 
1268 	if (FreeFile(f))
1269 		ereport(ERROR,
1270 				(errcode_for_file_access(),
1271 				 errmsg("could not write to file \"%s\": %m", pathtmp)));
1272 
1273 	/*
1274 	 * Now that we have written everything into a .tmp file, rename the file
1275 	 * to remove the .tmp suffix.
1276 	 */
1277 	if (rename(pathtmp, path) < 0)
1278 		ereport(ERROR,
1279 				(errcode_for_file_access(),
1280 				 errmsg("could not rename file \"%s\" to \"%s\": %m",
1281 						pathtmp, path)));
1282 
1283 	/*
1284 	 * The basename of the file is what we return from pg_export_snapshot().
1285 	 * It's already in path in a textual format and we know that the path
1286 	 * starts with SNAPSHOT_EXPORT_DIR.  Skip over the prefix and the slash
1287 	 * and pstrdup it so as not to return the address of a local variable.
1288 	 */
1289 	return pstrdup(path + strlen(SNAPSHOT_EXPORT_DIR) + 1);
1290 }
1291 
1292 /*
1293  * pg_export_snapshot
1294  *		SQL-callable wrapper for ExportSnapshot.
1295  */
1296 Datum
pg_export_snapshot(PG_FUNCTION_ARGS)1297 pg_export_snapshot(PG_FUNCTION_ARGS)
1298 {
1299 	char	   *snapshotName;
1300 
1301 	snapshotName = ExportSnapshot(GetActiveSnapshot());
1302 	PG_RETURN_TEXT_P(cstring_to_text(snapshotName));
1303 }
1304 
1305 
1306 /*
1307  * Parsing subroutines for ImportSnapshot: parse a line with the given
1308  * prefix followed by a value, and advance *s to the next line.  The
1309  * filename is provided for use in error messages.
1310  */
1311 static int
parseIntFromText(const char * prefix,char ** s,const char * filename)1312 parseIntFromText(const char *prefix, char **s, const char *filename)
1313 {
1314 	char	   *ptr = *s;
1315 	int			prefixlen = strlen(prefix);
1316 	int			val;
1317 
1318 	if (strncmp(ptr, prefix, prefixlen) != 0)
1319 		ereport(ERROR,
1320 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1321 				 errmsg("invalid snapshot data in file \"%s\"", filename)));
1322 	ptr += prefixlen;
1323 	if (sscanf(ptr, "%d", &val) != 1)
1324 		ereport(ERROR,
1325 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1326 				 errmsg("invalid snapshot data in file \"%s\"", filename)));
1327 	ptr = strchr(ptr, '\n');
1328 	if (!ptr)
1329 		ereport(ERROR,
1330 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1331 				 errmsg("invalid snapshot data in file \"%s\"", filename)));
1332 	*s = ptr + 1;
1333 	return val;
1334 }
1335 
1336 static TransactionId
parseXidFromText(const char * prefix,char ** s,const char * filename)1337 parseXidFromText(const char *prefix, char **s, const char *filename)
1338 {
1339 	char	   *ptr = *s;
1340 	int			prefixlen = strlen(prefix);
1341 	TransactionId val;
1342 
1343 	if (strncmp(ptr, prefix, prefixlen) != 0)
1344 		ereport(ERROR,
1345 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1346 				 errmsg("invalid snapshot data in file \"%s\"", filename)));
1347 	ptr += prefixlen;
1348 	if (sscanf(ptr, "%u", &val) != 1)
1349 		ereport(ERROR,
1350 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1351 				 errmsg("invalid snapshot data in file \"%s\"", filename)));
1352 	ptr = strchr(ptr, '\n');
1353 	if (!ptr)
1354 		ereport(ERROR,
1355 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1356 				 errmsg("invalid snapshot data in file \"%s\"", filename)));
1357 	*s = ptr + 1;
1358 	return val;
1359 }
1360 
1361 static void
parseVxidFromText(const char * prefix,char ** s,const char * filename,VirtualTransactionId * vxid)1362 parseVxidFromText(const char *prefix, char **s, const char *filename,
1363 				  VirtualTransactionId *vxid)
1364 {
1365 	char	   *ptr = *s;
1366 	int			prefixlen = strlen(prefix);
1367 
1368 	if (strncmp(ptr, prefix, prefixlen) != 0)
1369 		ereport(ERROR,
1370 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1371 				 errmsg("invalid snapshot data in file \"%s\"", filename)));
1372 	ptr += prefixlen;
1373 	if (sscanf(ptr, "%d/%u", &vxid->backendId, &vxid->localTransactionId) != 2)
1374 		ereport(ERROR,
1375 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1376 				 errmsg("invalid snapshot data in file \"%s\"", filename)));
1377 	ptr = strchr(ptr, '\n');
1378 	if (!ptr)
1379 		ereport(ERROR,
1380 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1381 				 errmsg("invalid snapshot data in file \"%s\"", filename)));
1382 	*s = ptr + 1;
1383 }
1384 
1385 /*
1386  * ImportSnapshot
1387  *		Import a previously exported snapshot.  The argument should be a
1388  *		filename in SNAPSHOT_EXPORT_DIR.  Load the snapshot from that file.
1389  *		This is called by "SET TRANSACTION SNAPSHOT 'foo'".
1390  */
1391 void
ImportSnapshot(const char * idstr)1392 ImportSnapshot(const char *idstr)
1393 {
1394 	char		path[MAXPGPATH];
1395 	FILE	   *f;
1396 	struct stat stat_buf;
1397 	char	   *filebuf;
1398 	int			xcnt;
1399 	int			i;
1400 	VirtualTransactionId src_vxid;
1401 	int			src_pid;
1402 	Oid			src_dbid;
1403 	int			src_isolevel;
1404 	bool		src_readonly;
1405 	SnapshotData snapshot;
1406 
1407 	/*
1408 	 * Must be at top level of a fresh transaction.  Note in particular that
1409 	 * we check we haven't acquired an XID --- if we have, it's conceivable
1410 	 * that the snapshot would show it as not running, making for very screwy
1411 	 * behavior.
1412 	 */
1413 	if (FirstSnapshotSet ||
1414 		GetTopTransactionIdIfAny() != InvalidTransactionId ||
1415 		IsSubTransaction())
1416 		ereport(ERROR,
1417 				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1418 				 errmsg("SET TRANSACTION SNAPSHOT must be called before any query")));
1419 
1420 	/*
1421 	 * If we are in read committed mode then the next query would execute with
1422 	 * a new snapshot thus making this function call quite useless.
1423 	 */
1424 	if (!IsolationUsesXactSnapshot())
1425 		ereport(ERROR,
1426 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1427 				 errmsg("a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ")));
1428 
1429 	/*
1430 	 * Verify the identifier: only 0-9, A-F and hyphens are allowed.  We do
1431 	 * this mainly to prevent reading arbitrary files.
1432 	 */
1433 	if (strspn(idstr, "0123456789ABCDEF-") != strlen(idstr))
1434 		ereport(ERROR,
1435 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1436 				 errmsg("invalid snapshot identifier: \"%s\"", idstr)));
1437 
1438 	/* OK, read the file */
1439 	snprintf(path, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", idstr);
1440 
1441 	f = AllocateFile(path, PG_BINARY_R);
1442 	if (!f)
1443 		ereport(ERROR,
1444 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1445 				 errmsg("invalid snapshot identifier: \"%s\"", idstr)));
1446 
1447 	/* get the size of the file so that we know how much memory we need */
1448 	if (fstat(fileno(f), &stat_buf))
1449 		elog(ERROR, "could not stat file \"%s\": %m", path);
1450 
1451 	/* and read the file into a palloc'd string */
1452 	filebuf = (char *) palloc(stat_buf.st_size + 1);
1453 	if (fread(filebuf, stat_buf.st_size, 1, f) != 1)
1454 		elog(ERROR, "could not read file \"%s\": %m", path);
1455 
1456 	filebuf[stat_buf.st_size] = '\0';
1457 
1458 	FreeFile(f);
1459 
1460 	/*
1461 	 * Construct a snapshot struct by parsing the file content.
1462 	 */
1463 	memset(&snapshot, 0, sizeof(snapshot));
1464 
1465 	parseVxidFromText("vxid:", &filebuf, path, &src_vxid);
1466 	src_pid = parseIntFromText("pid:", &filebuf, path);
1467 	/* we abuse parseXidFromText a bit here ... */
1468 	src_dbid = parseXidFromText("dbid:", &filebuf, path);
1469 	src_isolevel = parseIntFromText("iso:", &filebuf, path);
1470 	src_readonly = parseIntFromText("ro:", &filebuf, path);
1471 
1472 	snapshot.snapshot_type = SNAPSHOT_MVCC;
1473 
1474 	snapshot.xmin = parseXidFromText("xmin:", &filebuf, path);
1475 	snapshot.xmax = parseXidFromText("xmax:", &filebuf, path);
1476 
1477 	snapshot.xcnt = xcnt = parseIntFromText("xcnt:", &filebuf, path);
1478 
1479 	/* sanity-check the xid count before palloc */
1480 	if (xcnt < 0 || xcnt > GetMaxSnapshotXidCount())
1481 		ereport(ERROR,
1482 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1483 				 errmsg("invalid snapshot data in file \"%s\"", path)));
1484 
1485 	snapshot.xip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
1486 	for (i = 0; i < xcnt; i++)
1487 		snapshot.xip[i] = parseXidFromText("xip:", &filebuf, path);
1488 
1489 	snapshot.suboverflowed = parseIntFromText("sof:", &filebuf, path);
1490 
1491 	if (!snapshot.suboverflowed)
1492 	{
1493 		snapshot.subxcnt = xcnt = parseIntFromText("sxcnt:", &filebuf, path);
1494 
1495 		/* sanity-check the xid count before palloc */
1496 		if (xcnt < 0 || xcnt > GetMaxSnapshotSubxidCount())
1497 			ereport(ERROR,
1498 					(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1499 					 errmsg("invalid snapshot data in file \"%s\"", path)));
1500 
1501 		snapshot.subxip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
1502 		for (i = 0; i < xcnt; i++)
1503 			snapshot.subxip[i] = parseXidFromText("sxp:", &filebuf, path);
1504 	}
1505 	else
1506 	{
1507 		snapshot.subxcnt = 0;
1508 		snapshot.subxip = NULL;
1509 	}
1510 
1511 	snapshot.takenDuringRecovery = parseIntFromText("rec:", &filebuf, path);
1512 
1513 	/*
1514 	 * Do some additional sanity checking, just to protect ourselves.  We
1515 	 * don't trouble to check the array elements, just the most critical
1516 	 * fields.
1517 	 */
1518 	if (!VirtualTransactionIdIsValid(src_vxid) ||
1519 		!OidIsValid(src_dbid) ||
1520 		!TransactionIdIsNormal(snapshot.xmin) ||
1521 		!TransactionIdIsNormal(snapshot.xmax))
1522 		ereport(ERROR,
1523 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1524 				 errmsg("invalid snapshot data in file \"%s\"", path)));
1525 
1526 	/*
1527 	 * If we're serializable, the source transaction must be too, otherwise
1528 	 * predicate.c has problems (SxactGlobalXmin could go backwards).  Also, a
1529 	 * non-read-only transaction can't adopt a snapshot from a read-only
1530 	 * transaction, as predicate.c handles the cases very differently.
1531 	 */
1532 	if (IsolationIsSerializable())
1533 	{
1534 		if (src_isolevel != XACT_SERIALIZABLE)
1535 			ereport(ERROR,
1536 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1537 					 errmsg("a serializable transaction cannot import a snapshot from a non-serializable transaction")));
1538 		if (src_readonly && !XactReadOnly)
1539 			ereport(ERROR,
1540 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1541 					 errmsg("a non-read-only serializable transaction cannot import a snapshot from a read-only transaction")));
1542 	}
1543 
1544 	/*
1545 	 * We cannot import a snapshot that was taken in a different database,
1546 	 * because vacuum calculates OldestXmin on a per-database basis; so the
1547 	 * source transaction's xmin doesn't protect us from data loss.  This
1548 	 * restriction could be removed if the source transaction were to mark its
1549 	 * xmin as being globally applicable.  But that would require some
1550 	 * additional syntax, since that has to be known when the snapshot is
1551 	 * initially taken.  (See pgsql-hackers discussion of 2011-10-21.)
1552 	 */
1553 	if (src_dbid != MyDatabaseId)
1554 		ereport(ERROR,
1555 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1556 				 errmsg("cannot import a snapshot from a different database")));
1557 
1558 	/* OK, install the snapshot */
1559 	SetTransactionSnapshot(&snapshot, &src_vxid, src_pid, NULL);
1560 }
1561 
1562 /*
1563  * XactHasExportedSnapshots
1564  *		Test whether current transaction has exported any snapshots.
1565  */
1566 bool
XactHasExportedSnapshots(void)1567 XactHasExportedSnapshots(void)
1568 {
1569 	return (exportedSnapshots != NIL);
1570 }
1571 
1572 /*
1573  * DeleteAllExportedSnapshotFiles
1574  *		Clean up any files that have been left behind by a crashed backend
1575  *		that had exported snapshots before it died.
1576  *
1577  * This should be called during database startup or crash recovery.
1578  */
1579 void
DeleteAllExportedSnapshotFiles(void)1580 DeleteAllExportedSnapshotFiles(void)
1581 {
1582 	char		buf[MAXPGPATH + sizeof(SNAPSHOT_EXPORT_DIR)];
1583 	DIR		   *s_dir;
1584 	struct dirent *s_de;
1585 
1586 	/*
1587 	 * Problems in reading the directory, or unlinking files, are reported at
1588 	 * LOG level.  Since we're running in the startup process, ERROR level
1589 	 * would prevent database start, and it's not important enough for that.
1590 	 */
1591 	s_dir = AllocateDir(SNAPSHOT_EXPORT_DIR);
1592 
1593 	while ((s_de = ReadDirExtended(s_dir, SNAPSHOT_EXPORT_DIR, LOG)) != NULL)
1594 	{
1595 		if (strcmp(s_de->d_name, ".") == 0 ||
1596 			strcmp(s_de->d_name, "..") == 0)
1597 			continue;
1598 
1599 		snprintf(buf, sizeof(buf), SNAPSHOT_EXPORT_DIR "/%s", s_de->d_name);
1600 
1601 		if (unlink(buf) != 0)
1602 			ereport(LOG,
1603 					(errcode_for_file_access(),
1604 					 errmsg("could not remove file \"%s\": %m", buf)));
1605 	}
1606 
1607 	FreeDir(s_dir);
1608 }
1609 
1610 /*
1611  * ThereAreNoPriorRegisteredSnapshots
1612  *		Is the registered snapshot count less than or equal to one?
1613  *
1614  * Don't use this to settle important decisions.  While zero registrations and
1615  * no ActiveSnapshot would confirm a certain idleness, the system makes no
1616  * guarantees about the significance of one registered snapshot.
1617  */
1618 bool
ThereAreNoPriorRegisteredSnapshots(void)1619 ThereAreNoPriorRegisteredSnapshots(void)
1620 {
1621 	if (pairingheap_is_empty(&RegisteredSnapshots) ||
1622 		pairingheap_is_singular(&RegisteredSnapshots))
1623 		return true;
1624 
1625 	return false;
1626 }
1627 
1628 
1629 /*
1630  * Return a timestamp that is exactly on a minute boundary.
1631  *
1632  * If the argument is already aligned, return that value, otherwise move to
1633  * the next minute boundary following the given time.
1634  */
1635 static TimestampTz
AlignTimestampToMinuteBoundary(TimestampTz ts)1636 AlignTimestampToMinuteBoundary(TimestampTz ts)
1637 {
1638 	TimestampTz retval = ts + (USECS_PER_MINUTE - 1);
1639 
1640 	return retval - (retval % USECS_PER_MINUTE);
1641 }
1642 
1643 /*
1644  * Get current timestamp for snapshots
1645  *
1646  * This is basically GetCurrentTimestamp(), but with a guarantee that
1647  * the result never moves backward.
1648  */
1649 TimestampTz
GetSnapshotCurrentTimestamp(void)1650 GetSnapshotCurrentTimestamp(void)
1651 {
1652 	TimestampTz now = GetCurrentTimestamp();
1653 
1654 	/*
1655 	 * Don't let time move backward; if it hasn't advanced, use the old value.
1656 	 */
1657 	SpinLockAcquire(&oldSnapshotControl->mutex_current);
1658 	if (now <= oldSnapshotControl->current_timestamp)
1659 		now = oldSnapshotControl->current_timestamp;
1660 	else
1661 		oldSnapshotControl->current_timestamp = now;
1662 	SpinLockRelease(&oldSnapshotControl->mutex_current);
1663 
1664 	return now;
1665 }
1666 
1667 /*
1668  * Get timestamp through which vacuum may have processed based on last stored
1669  * value for threshold_timestamp.
1670  *
1671  * XXX: So far, we never trust that a 64-bit value can be read atomically; if
1672  * that ever changes, we could get rid of the spinlock here.
1673  */
1674 TimestampTz
GetOldSnapshotThresholdTimestamp(void)1675 GetOldSnapshotThresholdTimestamp(void)
1676 {
1677 	TimestampTz threshold_timestamp;
1678 
1679 	SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1680 	threshold_timestamp = oldSnapshotControl->threshold_timestamp;
1681 	SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1682 
1683 	return threshold_timestamp;
1684 }
1685 
1686 void
SetOldSnapshotThresholdTimestamp(TimestampTz ts,TransactionId xlimit)1687 SetOldSnapshotThresholdTimestamp(TimestampTz ts, TransactionId xlimit)
1688 {
1689 	SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1690 	Assert(oldSnapshotControl->threshold_timestamp <= ts);
1691 	Assert(TransactionIdPrecedesOrEquals(oldSnapshotControl->threshold_xid, xlimit));
1692 	oldSnapshotControl->threshold_timestamp = ts;
1693 	oldSnapshotControl->threshold_xid = xlimit;
1694 	SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1695 }
1696 
1697 /*
1698  * XXX: Magic to keep old_snapshot_threshold tests appear "working". They
1699  * currently are broken, and discussion of what to do about them is
1700  * ongoing. See
1701  * https://www.postgresql.org/message-id/20200403001235.e6jfdll3gh2ygbuc%40alap3.anarazel.de
1702  */
1703 void
SnapshotTooOldMagicForTest(void)1704 SnapshotTooOldMagicForTest(void)
1705 {
1706 	TimestampTz ts = GetSnapshotCurrentTimestamp();
1707 
1708 	Assert(old_snapshot_threshold == 0);
1709 
1710 	ts -= 5 * USECS_PER_SEC;
1711 
1712 	SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1713 	oldSnapshotControl->threshold_timestamp = ts;
1714 	SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1715 }
1716 
1717 /*
1718  * If there is a valid mapping for the timestamp, set *xlimitp to
1719  * that. Returns whether there is such a mapping.
1720  */
1721 static bool
GetOldSnapshotFromTimeMapping(TimestampTz ts,TransactionId * xlimitp)1722 GetOldSnapshotFromTimeMapping(TimestampTz ts, TransactionId *xlimitp)
1723 {
1724 	bool		in_mapping = false;
1725 
1726 	Assert(ts == AlignTimestampToMinuteBoundary(ts));
1727 
1728 	LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);
1729 
1730 	if (oldSnapshotControl->count_used > 0
1731 		&& ts >= oldSnapshotControl->head_timestamp)
1732 	{
1733 		int			offset;
1734 
1735 		offset = ((ts - oldSnapshotControl->head_timestamp)
1736 				  / USECS_PER_MINUTE);
1737 		if (offset > oldSnapshotControl->count_used - 1)
1738 			offset = oldSnapshotControl->count_used - 1;
1739 		offset = (oldSnapshotControl->head_offset + offset)
1740 			% OLD_SNAPSHOT_TIME_MAP_ENTRIES;
1741 
1742 		*xlimitp = oldSnapshotControl->xid_by_minute[offset];
1743 
1744 		in_mapping = true;
1745 	}
1746 
1747 	LWLockRelease(OldSnapshotTimeMapLock);
1748 
1749 	return in_mapping;
1750 }
1751 
1752 /*
1753  * TransactionIdLimitedForOldSnapshots
1754  *
1755  * Apply old snapshot limit.  This is intended to be called for page pruning
1756  * and table vacuuming, to allow old_snapshot_threshold to override the normal
1757  * global xmin value.  Actual testing for snapshot too old will be based on
1758  * whether a snapshot timestamp is prior to the threshold timestamp set in
1759  * this function.
1760  *
1761  * If the limited horizon allows a cleanup action that otherwise would not be
1762  * possible, SetOldSnapshotThresholdTimestamp(*limit_ts, *limit_xid) needs to
1763  * be called before that cleanup action.
1764  */
1765 bool
TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,Relation relation,TransactionId * limit_xid,TimestampTz * limit_ts)1766 TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
1767 									Relation relation,
1768 									TransactionId *limit_xid,
1769 									TimestampTz *limit_ts)
1770 {
1771 	TimestampTz ts;
1772 	TransactionId xlimit = recentXmin;
1773 	TransactionId latest_xmin;
1774 	TimestampTz next_map_update_ts;
1775 	TransactionId threshold_timestamp;
1776 	TransactionId threshold_xid;
1777 
1778 	Assert(TransactionIdIsNormal(recentXmin));
1779 	Assert(OldSnapshotThresholdActive());
1780 	Assert(limit_ts != NULL && limit_xid != NULL);
1781 
1782 	/*
1783 	 * TestForOldSnapshot() assumes early pruning advances the page LSN, so we
1784 	 * can't prune early when skipping WAL.
1785 	 */
1786 	if (!RelationAllowsEarlyPruning(relation) || !RelationNeedsWAL(relation))
1787 		return false;
1788 
1789 	ts = GetSnapshotCurrentTimestamp();
1790 
1791 	SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
1792 	latest_xmin = oldSnapshotControl->latest_xmin;
1793 	next_map_update_ts = oldSnapshotControl->next_map_update;
1794 	SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
1795 
1796 	/*
1797 	 * Zero threshold always overrides to latest xmin, if valid.  Without some
1798 	 * heuristic it will find its own snapshot too old on, for example, a
1799 	 * simple UPDATE -- which would make it useless for most testing, but
1800 	 * there is no principled way to ensure that it doesn't fail in this way.
1801 	 * Use a five-second delay to try to get useful testing behavior, but this
1802 	 * may need adjustment.
1803 	 */
1804 	if (old_snapshot_threshold == 0)
1805 	{
1806 		if (TransactionIdPrecedes(latest_xmin, MyProc->xmin)
1807 			&& TransactionIdFollows(latest_xmin, xlimit))
1808 			xlimit = latest_xmin;
1809 
1810 		ts -= 5 * USECS_PER_SEC;
1811 	}
1812 	else
1813 	{
1814 		ts = AlignTimestampToMinuteBoundary(ts)
1815 			- (old_snapshot_threshold * USECS_PER_MINUTE);
1816 
1817 		/* Check for fast exit without LW locking. */
1818 		SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1819 		threshold_timestamp = oldSnapshotControl->threshold_timestamp;
1820 		threshold_xid = oldSnapshotControl->threshold_xid;
1821 		SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1822 
1823 		if (ts == threshold_timestamp)
1824 		{
1825 			/*
1826 			 * Current timestamp is in same bucket as the last limit that was
1827 			 * applied. Reuse.
1828 			 */
1829 			xlimit = threshold_xid;
1830 		}
1831 		else if (ts == next_map_update_ts)
1832 		{
1833 			/*
1834 			 * FIXME: This branch is super iffy - but that should probably
1835 			 * fixed separately.
1836 			 */
1837 			xlimit = latest_xmin;
1838 		}
1839 		else if (GetOldSnapshotFromTimeMapping(ts, &xlimit))
1840 		{
1841 		}
1842 
1843 		/*
1844 		 * Failsafe protection against vacuuming work of active transaction.
1845 		 *
1846 		 * This is not an assertion because we avoid the spinlock for
1847 		 * performance, leaving open the possibility that xlimit could advance
1848 		 * and be more current; but it seems prudent to apply this limit.  It
1849 		 * might make pruning a tiny bit less aggressive than it could be, but
1850 		 * protects against data loss bugs.
1851 		 */
1852 		if (TransactionIdIsNormal(latest_xmin)
1853 			&& TransactionIdPrecedes(latest_xmin, xlimit))
1854 			xlimit = latest_xmin;
1855 	}
1856 
1857 	if (TransactionIdIsValid(xlimit) &&
1858 		TransactionIdFollowsOrEquals(xlimit, recentXmin))
1859 	{
1860 		*limit_ts = ts;
1861 		*limit_xid = xlimit;
1862 
1863 		return true;
1864 	}
1865 
1866 	return false;
1867 }
1868 
1869 /*
1870  * Take care of the circular buffer that maps time to xid.
1871  */
1872 void
MaintainOldSnapshotTimeMapping(TimestampTz whenTaken,TransactionId xmin)1873 MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
1874 {
1875 	TimestampTz ts;
1876 	TransactionId latest_xmin;
1877 	TimestampTz update_ts;
1878 	bool		map_update_required = false;
1879 
1880 	/* Never call this function when old snapshot checking is disabled. */
1881 	Assert(old_snapshot_threshold >= 0);
1882 
1883 	ts = AlignTimestampToMinuteBoundary(whenTaken);
1884 
1885 	/*
1886 	 * Keep track of the latest xmin seen by any process. Update mapping with
1887 	 * a new value when we have crossed a bucket boundary.
1888 	 */
1889 	SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
1890 	latest_xmin = oldSnapshotControl->latest_xmin;
1891 	update_ts = oldSnapshotControl->next_map_update;
1892 	if (ts > update_ts)
1893 	{
1894 		oldSnapshotControl->next_map_update = ts;
1895 		map_update_required = true;
1896 	}
1897 	if (TransactionIdFollows(xmin, latest_xmin))
1898 		oldSnapshotControl->latest_xmin = xmin;
1899 	SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
1900 
1901 	/* We only needed to update the most recent xmin value. */
1902 	if (!map_update_required)
1903 		return;
1904 
1905 	/* No further tracking needed for 0 (used for testing). */
1906 	if (old_snapshot_threshold == 0)
1907 		return;
1908 
1909 	/*
1910 	 * We don't want to do something stupid with unusual values, but we don't
1911 	 * want to litter the log with warnings or break otherwise normal
1912 	 * processing for this feature; so if something seems unreasonable, just
1913 	 * log at DEBUG level and return without doing anything.
1914 	 */
1915 	if (whenTaken < 0)
1916 	{
1917 		elog(DEBUG1,
1918 			 "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
1919 			 (long) whenTaken);
1920 		return;
1921 	}
1922 	if (!TransactionIdIsNormal(xmin))
1923 	{
1924 		elog(DEBUG1,
1925 			 "MaintainOldSnapshotTimeMapping called with xmin = %lu",
1926 			 (unsigned long) xmin);
1927 		return;
1928 	}
1929 
1930 	LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE);
1931 
1932 	Assert(oldSnapshotControl->head_offset >= 0);
1933 	Assert(oldSnapshotControl->head_offset < OLD_SNAPSHOT_TIME_MAP_ENTRIES);
1934 	Assert((oldSnapshotControl->head_timestamp % USECS_PER_MINUTE) == 0);
1935 	Assert(oldSnapshotControl->count_used >= 0);
1936 	Assert(oldSnapshotControl->count_used <= OLD_SNAPSHOT_TIME_MAP_ENTRIES);
1937 
1938 	if (oldSnapshotControl->count_used == 0)
1939 	{
1940 		/* set up first entry for empty mapping */
1941 		oldSnapshotControl->head_offset = 0;
1942 		oldSnapshotControl->head_timestamp = ts;
1943 		oldSnapshotControl->count_used = 1;
1944 		oldSnapshotControl->xid_by_minute[0] = xmin;
1945 	}
1946 	else if (ts < oldSnapshotControl->head_timestamp)
1947 	{
1948 		/* old ts; log it at DEBUG */
1949 		LWLockRelease(OldSnapshotTimeMapLock);
1950 		elog(DEBUG1,
1951 			 "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
1952 			 (long) whenTaken);
1953 		return;
1954 	}
1955 	else if (ts <= (oldSnapshotControl->head_timestamp +
1956 					((oldSnapshotControl->count_used - 1)
1957 					 * USECS_PER_MINUTE)))
1958 	{
1959 		/* existing mapping; advance xid if possible */
1960 		int			bucket = (oldSnapshotControl->head_offset
1961 							  + ((ts - oldSnapshotControl->head_timestamp)
1962 								 / USECS_PER_MINUTE))
1963 		% OLD_SNAPSHOT_TIME_MAP_ENTRIES;
1964 
1965 		if (TransactionIdPrecedes(oldSnapshotControl->xid_by_minute[bucket], xmin))
1966 			oldSnapshotControl->xid_by_minute[bucket] = xmin;
1967 	}
1968 	else
1969 	{
1970 		/* We need a new bucket, but it might not be the very next one. */
1971 		int			distance_to_new_tail;
1972 		int			distance_to_current_tail;
1973 		int			advance;
1974 
1975 		/*
1976 		 * Our goal is for the new "tail" of the mapping, that is, the entry
1977 		 * which is newest and thus furthest from the "head" entry, to
1978 		 * correspond to "ts". Since there's one entry per minute, the
1979 		 * distance between the current head and the new tail is just the
1980 		 * number of minutes of difference between ts and the current
1981 		 * head_timestamp.
1982 		 *
1983 		 * The distance from the current head to the current tail is one less
1984 		 * than the number of entries in the mapping, because the entry at the
1985 		 * head_offset is for 0 minutes after head_timestamp.
1986 		 *
1987 		 * The difference between these two values is the number of minutes by
1988 		 * which we need to advance the mapping, either adding new entries or
1989 		 * rotating old ones out.
1990 		 */
1991 		distance_to_new_tail =
1992 			(ts - oldSnapshotControl->head_timestamp) / USECS_PER_MINUTE;
1993 		distance_to_current_tail =
1994 			oldSnapshotControl->count_used - 1;
1995 		advance = distance_to_new_tail - distance_to_current_tail;
1996 		Assert(advance > 0);
1997 
1998 		if (advance >= OLD_SNAPSHOT_TIME_MAP_ENTRIES)
1999 		{
2000 			/* Advance is so far that all old data is junk; start over. */
2001 			oldSnapshotControl->head_offset = 0;
2002 			oldSnapshotControl->count_used = 1;
2003 			oldSnapshotControl->xid_by_minute[0] = xmin;
2004 			oldSnapshotControl->head_timestamp = ts;
2005 		}
2006 		else
2007 		{
2008 			/* Store the new value in one or more buckets. */
2009 			int			i;
2010 
2011 			for (i = 0; i < advance; i++)
2012 			{
2013 				if (oldSnapshotControl->count_used == OLD_SNAPSHOT_TIME_MAP_ENTRIES)
2014 				{
2015 					/* Map full and new value replaces old head. */
2016 					int			old_head = oldSnapshotControl->head_offset;
2017 
2018 					if (old_head == (OLD_SNAPSHOT_TIME_MAP_ENTRIES - 1))
2019 						oldSnapshotControl->head_offset = 0;
2020 					else
2021 						oldSnapshotControl->head_offset = old_head + 1;
2022 					oldSnapshotControl->xid_by_minute[old_head] = xmin;
2023 					oldSnapshotControl->head_timestamp += USECS_PER_MINUTE;
2024 				}
2025 				else
2026 				{
2027 					/* Extend map to unused entry. */
2028 					int			new_tail = (oldSnapshotControl->head_offset
2029 											+ oldSnapshotControl->count_used)
2030 					% OLD_SNAPSHOT_TIME_MAP_ENTRIES;
2031 
2032 					oldSnapshotControl->count_used++;
2033 					oldSnapshotControl->xid_by_minute[new_tail] = xmin;
2034 				}
2035 			}
2036 		}
2037 	}
2038 
2039 	LWLockRelease(OldSnapshotTimeMapLock);
2040 }
2041 
2042 
2043 /*
2044  * Setup a snapshot that replaces normal catalog snapshots that allows catalog
2045  * access to behave just like it did at a certain point in the past.
2046  *
2047  * Needed for logical decoding.
2048  */
2049 void
SetupHistoricSnapshot(Snapshot historic_snapshot,HTAB * tuplecids)2050 SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
2051 {
2052 	Assert(historic_snapshot != NULL);
2053 
2054 	/* setup the timetravel snapshot */
2055 	HistoricSnapshot = historic_snapshot;
2056 
2057 	/* setup (cmin, cmax) lookup hash */
2058 	tuplecid_data = tuplecids;
2059 }
2060 
2061 
2062 /*
2063  * Make catalog snapshots behave normally again.
2064  */
2065 void
TeardownHistoricSnapshot(bool is_error)2066 TeardownHistoricSnapshot(bool is_error)
2067 {
2068 	HistoricSnapshot = NULL;
2069 	tuplecid_data = NULL;
2070 }
2071 
2072 bool
HistoricSnapshotActive(void)2073 HistoricSnapshotActive(void)
2074 {
2075 	return HistoricSnapshot != NULL;
2076 }
2077 
2078 HTAB *
HistoricSnapshotGetTupleCids(void)2079 HistoricSnapshotGetTupleCids(void)
2080 {
2081 	Assert(HistoricSnapshotActive());
2082 	return tuplecid_data;
2083 }
2084 
2085 /*
2086  * EstimateSnapshotSpace
2087  *		Returns the size needed to store the given snapshot.
2088  *
2089  * We are exporting only required fields from the Snapshot, stored in
2090  * SerializedSnapshotData.
2091  */
2092 Size
EstimateSnapshotSpace(Snapshot snap)2093 EstimateSnapshotSpace(Snapshot snap)
2094 {
2095 	Size		size;
2096 
2097 	Assert(snap != InvalidSnapshot);
2098 	Assert(snap->snapshot_type == SNAPSHOT_MVCC);
2099 
2100 	/* We allocate any XID arrays needed in the same palloc block. */
2101 	size = add_size(sizeof(SerializedSnapshotData),
2102 					mul_size(snap->xcnt, sizeof(TransactionId)));
2103 	if (snap->subxcnt > 0 &&
2104 		(!snap->suboverflowed || snap->takenDuringRecovery))
2105 		size = add_size(size,
2106 						mul_size(snap->subxcnt, sizeof(TransactionId)));
2107 
2108 	return size;
2109 }
2110 
2111 /*
2112  * SerializeSnapshot
2113  *		Dumps the serialized snapshot (extracted from given snapshot) onto the
2114  *		memory location at start_address.
2115  */
2116 void
SerializeSnapshot(Snapshot snapshot,char * start_address)2117 SerializeSnapshot(Snapshot snapshot, char *start_address)
2118 {
2119 	SerializedSnapshotData serialized_snapshot;
2120 
2121 	Assert(snapshot->subxcnt >= 0);
2122 
2123 	/* Copy all required fields */
2124 	serialized_snapshot.xmin = snapshot->xmin;
2125 	serialized_snapshot.xmax = snapshot->xmax;
2126 	serialized_snapshot.xcnt = snapshot->xcnt;
2127 	serialized_snapshot.subxcnt = snapshot->subxcnt;
2128 	serialized_snapshot.suboverflowed = snapshot->suboverflowed;
2129 	serialized_snapshot.takenDuringRecovery = snapshot->takenDuringRecovery;
2130 	serialized_snapshot.curcid = snapshot->curcid;
2131 	serialized_snapshot.whenTaken = snapshot->whenTaken;
2132 	serialized_snapshot.lsn = snapshot->lsn;
2133 
2134 	/*
2135 	 * Ignore the SubXID array if it has overflowed, unless the snapshot was
2136 	 * taken during recovery - in that case, top-level XIDs are in subxip as
2137 	 * well, and we mustn't lose them.
2138 	 */
2139 	if (serialized_snapshot.suboverflowed && !snapshot->takenDuringRecovery)
2140 		serialized_snapshot.subxcnt = 0;
2141 
2142 	/* Copy struct to possibly-unaligned buffer */
2143 	memcpy(start_address,
2144 		   &serialized_snapshot, sizeof(SerializedSnapshotData));
2145 
2146 	/* Copy XID array */
2147 	if (snapshot->xcnt > 0)
2148 		memcpy((TransactionId *) (start_address +
2149 								  sizeof(SerializedSnapshotData)),
2150 			   snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
2151 
2152 	/*
2153 	 * Copy SubXID array. Don't bother to copy it if it had overflowed,
2154 	 * though, because it's not used anywhere in that case. Except if it's a
2155 	 * snapshot taken during recovery; all the top-level XIDs are in subxip as
2156 	 * well in that case, so we mustn't lose them.
2157 	 */
2158 	if (serialized_snapshot.subxcnt > 0)
2159 	{
2160 		Size		subxipoff = sizeof(SerializedSnapshotData) +
2161 		snapshot->xcnt * sizeof(TransactionId);
2162 
2163 		memcpy((TransactionId *) (start_address + subxipoff),
2164 			   snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId));
2165 	}
2166 }
2167 
2168 /*
2169  * RestoreSnapshot
2170  *		Restore a serialized snapshot from the specified address.
2171  *
2172  * The copy is palloc'd in TopTransactionContext and has initial refcounts set
2173  * to 0.  The returned snapshot has the copied flag set.
2174  */
2175 Snapshot
RestoreSnapshot(char * start_address)2176 RestoreSnapshot(char *start_address)
2177 {
2178 	SerializedSnapshotData serialized_snapshot;
2179 	Size		size;
2180 	Snapshot	snapshot;
2181 	TransactionId *serialized_xids;
2182 
2183 	memcpy(&serialized_snapshot, start_address,
2184 		   sizeof(SerializedSnapshotData));
2185 	serialized_xids = (TransactionId *)
2186 		(start_address + sizeof(SerializedSnapshotData));
2187 
2188 	/* We allocate any XID arrays needed in the same palloc block. */
2189 	size = sizeof(SnapshotData)
2190 		+ serialized_snapshot.xcnt * sizeof(TransactionId)
2191 		+ serialized_snapshot.subxcnt * sizeof(TransactionId);
2192 
2193 	/* Copy all required fields */
2194 	snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
2195 	snapshot->snapshot_type = SNAPSHOT_MVCC;
2196 	snapshot->xmin = serialized_snapshot.xmin;
2197 	snapshot->xmax = serialized_snapshot.xmax;
2198 	snapshot->xip = NULL;
2199 	snapshot->xcnt = serialized_snapshot.xcnt;
2200 	snapshot->subxip = NULL;
2201 	snapshot->subxcnt = serialized_snapshot.subxcnt;
2202 	snapshot->suboverflowed = serialized_snapshot.suboverflowed;
2203 	snapshot->takenDuringRecovery = serialized_snapshot.takenDuringRecovery;
2204 	snapshot->curcid = serialized_snapshot.curcid;
2205 	snapshot->whenTaken = serialized_snapshot.whenTaken;
2206 	snapshot->lsn = serialized_snapshot.lsn;
2207 	snapshot->snapXactCompletionCount = 0;
2208 
2209 	/* Copy XIDs, if present. */
2210 	if (serialized_snapshot.xcnt > 0)
2211 	{
2212 		snapshot->xip = (TransactionId *) (snapshot + 1);
2213 		memcpy(snapshot->xip, serialized_xids,
2214 			   serialized_snapshot.xcnt * sizeof(TransactionId));
2215 	}
2216 
2217 	/* Copy SubXIDs, if present. */
2218 	if (serialized_snapshot.subxcnt > 0)
2219 	{
2220 		snapshot->subxip = ((TransactionId *) (snapshot + 1)) +
2221 			serialized_snapshot.xcnt;
2222 		memcpy(snapshot->subxip, serialized_xids + serialized_snapshot.xcnt,
2223 			   serialized_snapshot.subxcnt * sizeof(TransactionId));
2224 	}
2225 
2226 	/* Set the copied flag so that the caller will set refcounts correctly. */
2227 	snapshot->regd_count = 0;
2228 	snapshot->active_count = 0;
2229 	snapshot->copied = true;
2230 
2231 	return snapshot;
2232 }
2233 
2234 /*
2235  * Install a restored snapshot as the transaction snapshot.
2236  *
2237  * The second argument is of type void * so that snapmgr.h need not include
2238  * the declaration for PGPROC.
2239  */
2240 void
RestoreTransactionSnapshot(Snapshot snapshot,void * source_pgproc)2241 RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
2242 {
2243 	SetTransactionSnapshot(snapshot, NULL, InvalidPid, source_pgproc);
2244 }
2245 
2246 /*
2247  * XidInMVCCSnapshot
2248  *		Is the given XID still-in-progress according to the snapshot?
2249  *
2250  * Note: GetSnapshotData never stores either top xid or subxids of our own
2251  * backend into a snapshot, so these xids will not be reported as "running"
2252  * by this function.  This is OK for current uses, because we always check
2253  * TransactionIdIsCurrentTransactionId first, except when it's known the
2254  * XID could not be ours anyway.
2255  */
2256 bool
XidInMVCCSnapshot(TransactionId xid,Snapshot snapshot)2257 XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
2258 {
2259 	uint32		i;
2260 
2261 	/*
2262 	 * Make a quick range check to eliminate most XIDs without looking at the
2263 	 * xip arrays.  Note that this is OK even if we convert a subxact XID to
2264 	 * its parent below, because a subxact with XID < xmin has surely also got
2265 	 * a parent with XID < xmin, while one with XID >= xmax must belong to a
2266 	 * parent that was not yet committed at the time of this snapshot.
2267 	 */
2268 
2269 	/* Any xid < xmin is not in-progress */
2270 	if (TransactionIdPrecedes(xid, snapshot->xmin))
2271 		return false;
2272 	/* Any xid >= xmax is in-progress */
2273 	if (TransactionIdFollowsOrEquals(xid, snapshot->xmax))
2274 		return true;
2275 
2276 	/*
2277 	 * Snapshot information is stored slightly differently in snapshots taken
2278 	 * during recovery.
2279 	 */
2280 	if (!snapshot->takenDuringRecovery)
2281 	{
2282 		/*
2283 		 * If the snapshot contains full subxact data, the fastest way to
2284 		 * check things is just to compare the given XID against both subxact
2285 		 * XIDs and top-level XIDs.  If the snapshot overflowed, we have to
2286 		 * use pg_subtrans to convert a subxact XID to its parent XID, but
2287 		 * then we need only look at top-level XIDs not subxacts.
2288 		 */
2289 		if (!snapshot->suboverflowed)
2290 		{
2291 			/* we have full data, so search subxip */
2292 			int32		j;
2293 
2294 			for (j = 0; j < snapshot->subxcnt; j++)
2295 			{
2296 				if (TransactionIdEquals(xid, snapshot->subxip[j]))
2297 					return true;
2298 			}
2299 
2300 			/* not there, fall through to search xip[] */
2301 		}
2302 		else
2303 		{
2304 			/*
2305 			 * Snapshot overflowed, so convert xid to top-level.  This is safe
2306 			 * because we eliminated too-old XIDs above.
2307 			 */
2308 			xid = SubTransGetTopmostTransaction(xid);
2309 
2310 			/*
2311 			 * If xid was indeed a subxact, we might now have an xid < xmin,
2312 			 * so recheck to avoid an array scan.  No point in rechecking
2313 			 * xmax.
2314 			 */
2315 			if (TransactionIdPrecedes(xid, snapshot->xmin))
2316 				return false;
2317 		}
2318 
2319 		for (i = 0; i < snapshot->xcnt; i++)
2320 		{
2321 			if (TransactionIdEquals(xid, snapshot->xip[i]))
2322 				return true;
2323 		}
2324 	}
2325 	else
2326 	{
2327 		int32		j;
2328 
2329 		/*
2330 		 * In recovery we store all xids in the subxact array because it is by
2331 		 * far the bigger array, and we mostly don't know which xids are
2332 		 * top-level and which are subxacts. The xip array is empty.
2333 		 *
2334 		 * We start by searching subtrans, if we overflowed.
2335 		 */
2336 		if (snapshot->suboverflowed)
2337 		{
2338 			/*
2339 			 * Snapshot overflowed, so convert xid to top-level.  This is safe
2340 			 * because we eliminated too-old XIDs above.
2341 			 */
2342 			xid = SubTransGetTopmostTransaction(xid);
2343 
2344 			/*
2345 			 * If xid was indeed a subxact, we might now have an xid < xmin,
2346 			 * so recheck to avoid an array scan.  No point in rechecking
2347 			 * xmax.
2348 			 */
2349 			if (TransactionIdPrecedes(xid, snapshot->xmin))
2350 				return false;
2351 		}
2352 
2353 		/*
2354 		 * We now have either a top-level xid higher than xmin or an
2355 		 * indeterminate xid. We don't know whether it's top level or subxact
2356 		 * but it doesn't matter. If it's present, the xid is visible.
2357 		 */
2358 		for (j = 0; j < snapshot->subxcnt; j++)
2359 		{
2360 			if (TransactionIdEquals(xid, snapshot->subxip[j]))
2361 				return true;
2362 		}
2363 	}
2364 
2365 	return false;
2366 }
2367