1 /*-------------------------------------------------------------------------
2 *
3 * snapmgr.c
4 * PostgreSQL snapshot manager
5 *
6 * We keep track of snapshots in two ways: those "registered" by resowner.c,
7 * and the "active snapshot" stack. All snapshots in either of them live in
8 * persistent memory. When a snapshot is no longer in any of these lists
9 * (tracked by separate refcounts on each snapshot), its memory can be freed.
10 *
11 * The FirstXactSnapshot, if any, is treated a bit specially: we increment its
12 * regd_count and list it in RegisteredSnapshots, but this reference is not
13 * tracked by a resource owner. We used to use the TopTransactionResourceOwner
14 * to track this snapshot reference, but that introduces logical circularity
15 * and thus makes it impossible to clean up in a sane fashion. It's better to
16 * handle this reference as an internally-tracked registration, so that this
17 * module is entirely lower-level than ResourceOwners.
18 *
19 * Likewise, any snapshots that have been exported by pg_export_snapshot
20 * have regd_count = 1 and are listed in RegisteredSnapshots, but are not
21 * tracked by any resource owner.
22 *
23 * Likewise, the CatalogSnapshot is listed in RegisteredSnapshots when it
24 * is valid, but is not tracked by any resource owner.
25 *
 * The same is true for historic snapshots used during logical decoding;
 * their lifetime is managed separately (as they live longer than one xact.c
 * transaction).
29 *
 * These arrangements let us reset MyProc->xmin when there are no snapshots
 * referenced by this transaction, and advance it when the one with oldest
 * Xmin is no longer referenced.  For simplicity, however, only registered
 * snapshots (not active snapshots) participate in tracking which one is
 * oldest; we don't try to change MyProc->xmin except when the active-snapshot
 * stack is empty.
36 *
37 *
38 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
39 * Portions Copyright (c) 1994, Regents of the University of California
40 *
41 * IDENTIFICATION
42 * src/backend/utils/time/snapmgr.c
43 *
44 *-------------------------------------------------------------------------
45 */
46 #include "postgres.h"
47
48 #include <sys/stat.h>
49 #include <unistd.h>
50
51 #include "access/subtrans.h"
52 #include "access/transam.h"
53 #include "access/xact.h"
54 #include "access/xlog.h"
55 #include "catalog/catalog.h"
56 #include "datatype/timestamp.h"
57 #include "lib/pairingheap.h"
58 #include "miscadmin.h"
59 #include "storage/predicate.h"
60 #include "storage/proc.h"
61 #include "storage/procarray.h"
62 #include "storage/sinval.h"
63 #include "storage/sinvaladt.h"
64 #include "storage/spin.h"
65 #include "utils/builtins.h"
66 #include "utils/memutils.h"
67 #include "utils/old_snapshot.h"
68 #include "utils/rel.h"
69 #include "utils/resowner_private.h"
70 #include "utils/snapmgr.h"
71 #include "utils/syscache.h"
72 #include "utils/timestamp.h"
73
74
/*
 * GUC parameters
 */
int			old_snapshot_threshold; /* number of minutes, -1 disables */

/*
 * State for old-snapshot detection.  Attached to (and, on first creation,
 * initialized) by SnapMgrInit(); its size is given by SnapMgrShmemSize().
 */
volatile OldSnapshotControlData *oldSnapshotControl;


/*
 * CurrentSnapshot points to the only snapshot taken in transaction-snapshot
 * mode, and to the latest one taken in a read-committed transaction.
 * SecondarySnapshot is a snapshot that's always up-to-date as of the current
 * instant, even in transaction-snapshot mode.  It should only be used for
 * special-purpose code (say, RI checking.)  CatalogSnapshot points to an
 * MVCC snapshot intended to be used for catalog scans; we must invalidate it
 * whenever a system catalog change occurs.
 *
 * These SnapshotData structs are statically allocated to simplify memory
 * allocation (see the hack in GetSnapshotData to avoid repeated malloc/free).
 *
 * SnapshotSelfData and SnapshotAnyData are initialized with nothing but their
 * snapshot type.
 */
static SnapshotData CurrentSnapshotData = {SNAPSHOT_MVCC};
static SnapshotData SecondarySnapshotData = {SNAPSHOT_MVCC};
SnapshotData CatalogSnapshotData = {SNAPSHOT_MVCC};
SnapshotData SnapshotSelfData = {SNAPSHOT_SELF};
SnapshotData SnapshotAnyData = {SNAPSHOT_ANY};

/*
 * Pointers to valid snapshots; NULL when the corresponding snapshot is not
 * currently valid.  CurrentSnapshot may point either at CurrentSnapshotData
 * or at a palloc'd copy of it (transaction-snapshot mode).
 */
static Snapshot CurrentSnapshot = NULL;
static Snapshot SecondarySnapshot = NULL;
static Snapshot CatalogSnapshot = NULL;
static Snapshot HistoricSnapshot = NULL;

/*
 * These are updated by GetSnapshotData.  We initialize them this way
 * for the convenience of TransactionIdIsInProgress: even in bootstrap
 * mode, we don't want it to say that BootstrapTransactionId is in progress.
 */
TransactionId TransactionXmin = FirstNormalTransactionId;
TransactionId RecentXmin = FirstNormalTransactionId;

/* (table, ctid) => (cmin, cmax) mapping during timetravel */
static HTAB *tuplecid_data = NULL;
117
/*
 * Elements of the active snapshot stack.
 *
 * Each element here accounts for exactly one active_count on SnapshotData.
 *
 * NB: the code assumes that elements in this list are in non-increasing
 * order of as_level; also, the list must be NULL-terminated.
 */
typedef struct ActiveSnapshotElt
{
	Snapshot	as_snap;		/* the snapshot this entry references */
	int			as_level;		/* transaction nesting level owning the entry */
	struct ActiveSnapshotElt *as_next;	/* next (older) entry, or NULL */
} ActiveSnapshotElt;

/* Top of the stack of active snapshots */
static ActiveSnapshotElt *ActiveSnapshot = NULL;

/*
 * Bottom of the stack of active snapshots.  Set when an element is pushed
 * onto an empty stack, and reset to NULL whenever the stack becomes empty.
 */
static ActiveSnapshotElt *OldestActiveSnapshot = NULL;
138
/*
 * Currently registered Snapshots.  Ordered in a heap by xmin, so that we can
 * quickly find the one with lowest xmin, to advance our MyProc->xmin.
 * Besides snapshots registered through a resource owner, this heap also
 * holds the internally tracked FirstXactSnapshot and CatalogSnapshot.
 */
static int	xmin_cmp(const pairingheap_node *a, const pairingheap_node *b,
					 void *arg);

static pairingheap RegisteredSnapshots = {&xmin_cmp, NULL, NULL};

/* first GetTransactionSnapshot call in a transaction? */
bool		FirstSnapshotSet = false;

/*
 * Remember the serializable transaction snapshot, if any.  We cannot trust
 * FirstSnapshotSet in combination with IsolationUsesXactSnapshot(), because
 * GUC may be reset before us, changing the value of IsolationUsesXactSnapshot.
 */
static Snapshot FirstXactSnapshot = NULL;

/* Define pathname of exported-snapshot files */
#define SNAPSHOT_EXPORT_DIR "pg_snapshots"

/* Structure holding info about exported snapshot. */
typedef struct ExportedSnapshot
{
	char	   *snapfile;		/* presumably the file written under
								 * SNAPSHOT_EXPORT_DIR -- TODO confirm */
	Snapshot	snapshot;		/* the exported snapshot */
} ExportedSnapshot;

/* Current xact's exported snapshots (a list of ExportedSnapshot structs) */
static List *exportedSnapshots = NIL;

/* Prototypes for local functions */
static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts);
static Snapshot CopySnapshot(Snapshot snapshot);
static void FreeSnapshot(Snapshot snapshot);
static void SnapshotResetXmin(void);
176
/*
 * Snapshot fields to be serialized.
 *
 * Only these fields need to be sent to the cooperating backend; the
 * remaining ones can (and must) be set by the receiver upon restore.
 *
 * NOTE(review): the xip/subxip arrays are not members of this struct;
 * presumably they are serialized immediately after it -- confirm against
 * the serialization code later in this file.
 */
typedef struct SerializedSnapshotData
{
	TransactionId xmin;
	TransactionId xmax;
	uint32		xcnt;
	int32		subxcnt;
	bool		suboverflowed;
	bool		takenDuringRecovery;
	CommandId	curcid;
	TimestampTz whenTaken;
	XLogRecPtr	lsn;
} SerializedSnapshotData;
195
196 Size
SnapMgrShmemSize(void)197 SnapMgrShmemSize(void)
198 {
199 Size size;
200
201 size = offsetof(OldSnapshotControlData, xid_by_minute);
202 if (old_snapshot_threshold > 0)
203 size = add_size(size, mul_size(sizeof(TransactionId),
204 OLD_SNAPSHOT_TIME_MAP_ENTRIES));
205
206 return size;
207 }
208
209 /*
210 * Initialize for managing old snapshot detection.
211 */
212 void
SnapMgrInit(void)213 SnapMgrInit(void)
214 {
215 bool found;
216
217 /*
218 * Create or attach to the OldSnapshotControlData structure.
219 */
220 oldSnapshotControl = (volatile OldSnapshotControlData *)
221 ShmemInitStruct("OldSnapshotControlData",
222 SnapMgrShmemSize(), &found);
223
224 if (!found)
225 {
226 SpinLockInit(&oldSnapshotControl->mutex_current);
227 oldSnapshotControl->current_timestamp = 0;
228 SpinLockInit(&oldSnapshotControl->mutex_latest_xmin);
229 oldSnapshotControl->latest_xmin = InvalidTransactionId;
230 oldSnapshotControl->next_map_update = 0;
231 SpinLockInit(&oldSnapshotControl->mutex_threshold);
232 oldSnapshotControl->threshold_timestamp = 0;
233 oldSnapshotControl->threshold_xid = InvalidTransactionId;
234 oldSnapshotControl->head_offset = 0;
235 oldSnapshotControl->head_timestamp = 0;
236 oldSnapshotControl->count_used = 0;
237 }
238 }
239
/*
 * GetTransactionSnapshot
 *		Get the appropriate snapshot for a new query in a transaction.
 *
 * Note that the return value may point at static storage that will be modified
 * by future calls and by CommandCounterIncrement().  Callers should call
 * RegisterSnapshot or PushActiveSnapshot on the returned snap if it is to be
 * used very long.
 *
 * Behavior:
 *	- While a historic snapshot is active (logical decoding), that snapshot is
 *	  returned as-is.
 *	- On the first call in a transaction, a snapshot is taken into
 *	  CurrentSnapshotData and FirstSnapshotSet is set.  In transaction-snapshot
 *	  isolation modes it is additionally copied (CopySnapshot), remembered as
 *	  FirstXactSnapshot, and pseudo-registered (regd_count++ plus an entry in
 *	  RegisteredSnapshots) without involving any resource owner.
 *	- On later calls, transaction-snapshot modes keep returning that same
 *	  snapshot, while other modes take a fresh one each time.
 *
 * Raises ERROR if the first snapshot would have to be taken in parallel mode.
 */
Snapshot
GetTransactionSnapshot(void)
{
	/*
	 * Return historic snapshot if doing logical decoding.  We'll never need a
	 * non-historic transaction snapshot in this (sub-)transaction, so there's
	 * no need to be careful to set one up for later calls to
	 * GetTransactionSnapshot().
	 */
	if (HistoricSnapshotActive())
	{
		Assert(!FirstSnapshotSet);
		return HistoricSnapshot;
	}

	/* First call in transaction? */
	if (!FirstSnapshotSet)
	{
		/*
		 * Don't allow catalog snapshot to be older than xact snapshot.  Must
		 * do this first to allow the empty-heap Assert to succeed.
		 */
		InvalidateCatalogSnapshot();

		Assert(pairingheap_is_empty(&RegisteredSnapshots));
		Assert(FirstXactSnapshot == NULL);

		if (IsInParallelMode())
			elog(ERROR,
				 "cannot take query snapshot during a parallel operation");

		/*
		 * In transaction-snapshot mode, the first snapshot must live until
		 * end of xact regardless of what the caller does with it, so we must
		 * make a copy of it rather than returning CurrentSnapshotData
		 * directly.  Furthermore, if we're running in serializable mode,
		 * predicate.c needs to wrap the snapshot fetch in its own processing.
		 */
		if (IsolationUsesXactSnapshot())
		{
			/* First, create the snapshot in CurrentSnapshotData */
			if (IsolationIsSerializable())
				CurrentSnapshot = GetSerializableTransactionSnapshot(&CurrentSnapshotData);
			else
				CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
			/* Make a saved copy */
			CurrentSnapshot = CopySnapshot(CurrentSnapshot);
			FirstXactSnapshot = CurrentSnapshot;
			/*
			 * Mark it as "registered" in FirstXactSnapshot.  This reference
			 * is tracked internally, not by a resource owner.
			 */
			FirstXactSnapshot->regd_count++;
			pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
		}
		else
			CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);

		FirstSnapshotSet = true;
		return CurrentSnapshot;
	}

	/* Transaction-snapshot mode: every query reuses the first snapshot. */
	if (IsolationUsesXactSnapshot())
		return CurrentSnapshot;

	/* Don't allow catalog snapshot to be older than xact snapshot. */
	InvalidateCatalogSnapshot();

	/* Otherwise take a fresh snapshot, overwriting CurrentSnapshotData. */
	CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);

	return CurrentSnapshot;
}
318
319 /*
320 * GetLatestSnapshot
321 * Get a snapshot that is up-to-date as of the current instant,
322 * even if we are executing in transaction-snapshot mode.
323 */
324 Snapshot
GetLatestSnapshot(void)325 GetLatestSnapshot(void)
326 {
327 /*
328 * We might be able to relax this, but nothing that could otherwise work
329 * needs it.
330 */
331 if (IsInParallelMode())
332 elog(ERROR,
333 "cannot update SecondarySnapshot during a parallel operation");
334
335 /*
336 * So far there are no cases requiring support for GetLatestSnapshot()
337 * during logical decoding, but it wouldn't be hard to add if required.
338 */
339 Assert(!HistoricSnapshotActive());
340
341 /* If first call in transaction, go ahead and set the xact snapshot */
342 if (!FirstSnapshotSet)
343 return GetTransactionSnapshot();
344
345 SecondarySnapshot = GetSnapshotData(&SecondarySnapshotData);
346
347 return SecondarySnapshot;
348 }
349
350 /*
351 * GetOldestSnapshot
352 *
353 * Get the transaction's oldest known snapshot, as judged by the LSN.
354 * Will return NULL if there are no active or registered snapshots.
355 */
356 Snapshot
GetOldestSnapshot(void)357 GetOldestSnapshot(void)
358 {
359 Snapshot OldestRegisteredSnapshot = NULL;
360 XLogRecPtr RegisteredLSN = InvalidXLogRecPtr;
361
362 if (!pairingheap_is_empty(&RegisteredSnapshots))
363 {
364 OldestRegisteredSnapshot = pairingheap_container(SnapshotData, ph_node,
365 pairingheap_first(&RegisteredSnapshots));
366 RegisteredLSN = OldestRegisteredSnapshot->lsn;
367 }
368
369 if (OldestActiveSnapshot != NULL)
370 {
371 XLogRecPtr ActiveLSN = OldestActiveSnapshot->as_snap->lsn;
372
373 if (XLogRecPtrIsInvalid(RegisteredLSN) || RegisteredLSN > ActiveLSN)
374 return OldestActiveSnapshot->as_snap;
375 }
376
377 return OldestRegisteredSnapshot;
378 }
379
380 /*
381 * GetCatalogSnapshot
382 * Get a snapshot that is sufficiently up-to-date for scan of the
383 * system catalog with the specified OID.
384 */
385 Snapshot
GetCatalogSnapshot(Oid relid)386 GetCatalogSnapshot(Oid relid)
387 {
388 /*
389 * Return historic snapshot while we're doing logical decoding, so we can
390 * see the appropriate state of the catalog.
391 *
392 * This is the primary reason for needing to reset the system caches after
393 * finishing decoding.
394 */
395 if (HistoricSnapshotActive())
396 return HistoricSnapshot;
397
398 return GetNonHistoricCatalogSnapshot(relid);
399 }
400
401 /*
402 * GetNonHistoricCatalogSnapshot
403 * Get a snapshot that is sufficiently up-to-date for scan of the system
404 * catalog with the specified OID, even while historic snapshots are set
405 * up.
406 */
407 Snapshot
GetNonHistoricCatalogSnapshot(Oid relid)408 GetNonHistoricCatalogSnapshot(Oid relid)
409 {
410 /*
411 * If the caller is trying to scan a relation that has no syscache, no
412 * catcache invalidations will be sent when it is updated. For a few key
413 * relations, snapshot invalidations are sent instead. If we're trying to
414 * scan a relation for which neither catcache nor snapshot invalidations
415 * are sent, we must refresh the snapshot every time.
416 */
417 if (CatalogSnapshot &&
418 !RelationInvalidatesSnapshotsOnly(relid) &&
419 !RelationHasSysCache(relid))
420 InvalidateCatalogSnapshot();
421
422 if (CatalogSnapshot == NULL)
423 {
424 /* Get new snapshot. */
425 CatalogSnapshot = GetSnapshotData(&CatalogSnapshotData);
426
427 /*
428 * Make sure the catalog snapshot will be accounted for in decisions
429 * about advancing PGPROC->xmin. We could apply RegisterSnapshot, but
430 * that would result in making a physical copy, which is overkill; and
431 * it would also create a dependency on some resource owner, which we
432 * do not want for reasons explained at the head of this file. Instead
433 * just shove the CatalogSnapshot into the pairing heap manually. This
434 * has to be reversed in InvalidateCatalogSnapshot, of course.
435 *
436 * NB: it had better be impossible for this to throw error, since the
437 * CatalogSnapshot pointer is already valid.
438 */
439 pairingheap_add(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
440 }
441
442 return CatalogSnapshot;
443 }
444
445 /*
446 * InvalidateCatalogSnapshot
447 * Mark the current catalog snapshot, if any, as invalid
448 *
449 * We could change this API to allow the caller to provide more fine-grained
450 * invalidation details, so that a change to relation A wouldn't prevent us
451 * from using our cached snapshot to scan relation B, but so far there's no
452 * evidence that the CPU cycles we spent tracking such fine details would be
453 * well-spent.
454 */
455 void
InvalidateCatalogSnapshot(void)456 InvalidateCatalogSnapshot(void)
457 {
458 if (CatalogSnapshot)
459 {
460 pairingheap_remove(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
461 CatalogSnapshot = NULL;
462 SnapshotResetXmin();
463 }
464 }
465
466 /*
467 * InvalidateCatalogSnapshotConditionally
468 * Drop catalog snapshot if it's the only one we have
469 *
470 * This is called when we are about to wait for client input, so we don't
471 * want to continue holding the catalog snapshot if it might mean that the
472 * global xmin horizon can't advance. However, if there are other snapshots
473 * still active or registered, the catalog snapshot isn't likely to be the
474 * oldest one, so we might as well keep it.
475 */
476 void
InvalidateCatalogSnapshotConditionally(void)477 InvalidateCatalogSnapshotConditionally(void)
478 {
479 if (CatalogSnapshot &&
480 ActiveSnapshot == NULL &&
481 pairingheap_is_singular(&RegisteredSnapshots))
482 InvalidateCatalogSnapshot();
483 }
484
485 /*
486 * SnapshotSetCommandId
487 * Propagate CommandCounterIncrement into the static snapshots, if set
488 */
489 void
SnapshotSetCommandId(CommandId curcid)490 SnapshotSetCommandId(CommandId curcid)
491 {
492 if (!FirstSnapshotSet)
493 return;
494
495 if (CurrentSnapshot)
496 CurrentSnapshot->curcid = curcid;
497 if (SecondarySnapshot)
498 SecondarySnapshot->curcid = curcid;
499 /* Should we do the same with CatalogSnapshot? */
500 }
501
/*
 * SetTransactionSnapshot
 *		Set the transaction's snapshot from an imported MVCC snapshot.
 *
 * Note that this is very closely tied to GetTransactionSnapshot --- it
 * must take care of all the same considerations as the first-snapshot case
 * in GetTransactionSnapshot.
 *
 * sourcesnap supplies xmin/xmax, the XID arrays and the suboverflowed /
 * takenDuringRecovery flags (curcid is deliberately not copied).  If
 * sourceproc is non-NULL, the xmin is installed with
 * ProcArrayInstallRestoredXmin against that PGPROC; otherwise it is
 * installed with ProcArrayInstallImportedXmin against sourcevxid.  sourcepid
 * is used in the error message and passed on to predicate.c in serializable
 * mode.
 *
 * Raises ERROR if the source transaction/process is no longer running.
 */
static void
SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
					   int sourcepid, PGPROC *sourceproc)
{
	/* Caller should have checked this already */
	Assert(!FirstSnapshotSet);

	/* Better do this to ensure following Assert succeeds. */
	InvalidateCatalogSnapshot();

	Assert(pairingheap_is_empty(&RegisteredSnapshots));
	Assert(FirstXactSnapshot == NULL);
	Assert(!HistoricSnapshotActive());

	/*
	 * Even though we are not going to use the snapshot it computes, we must
	 * call GetSnapshotData, for two reasons: (1) to be sure that
	 * CurrentSnapshotData's XID arrays have been allocated, and (2) to update
	 * the state for GlobalVis*.
	 */
	CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);

	/*
	 * Now copy appropriate fields from the source snapshot.  The Asserts
	 * guard the memcpy calls against overrunning the arrays that
	 * GetSnapshotData sized above.
	 */
	CurrentSnapshot->xmin = sourcesnap->xmin;
	CurrentSnapshot->xmax = sourcesnap->xmax;
	CurrentSnapshot->xcnt = sourcesnap->xcnt;
	Assert(sourcesnap->xcnt <= GetMaxSnapshotXidCount());
	memcpy(CurrentSnapshot->xip, sourcesnap->xip,
		   sourcesnap->xcnt * sizeof(TransactionId));
	CurrentSnapshot->subxcnt = sourcesnap->subxcnt;
	Assert(sourcesnap->subxcnt <= GetMaxSnapshotSubxidCount());
	memcpy(CurrentSnapshot->subxip, sourcesnap->subxip,
		   sourcesnap->subxcnt * sizeof(TransactionId));
	CurrentSnapshot->suboverflowed = sourcesnap->suboverflowed;
	CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery;
	/* NB: curcid should NOT be copied, it's a local matter */

	CurrentSnapshot->snapXactCompletionCount = 0;

	/*
	 * Now we have to fix what GetSnapshotData did with MyProc->xmin and
	 * TransactionXmin.  There is a race condition: to make sure we are not
	 * causing the global xmin to go backwards, we have to test that the
	 * source transaction is still running, and that has to be done
	 * atomically. So let procarray.c do it.
	 *
	 * Note: in serializable mode, predicate.c will do this a second time. It
	 * doesn't seem worth contorting the logic here to avoid two calls,
	 * especially since it's not clear that predicate.c *must* do this.
	 */
	if (sourceproc != NULL)
	{
		if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc))
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("could not import the requested snapshot"),
					 errdetail("The source transaction is not running anymore.")));
	}
	else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcevxid))
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("could not import the requested snapshot"),
				 errdetail("The source process with PID %d is not running anymore.",
						   sourcepid)));

	/*
	 * In transaction-snapshot mode, the first snapshot must live until end of
	 * xact, so we must make a copy of it.  Furthermore, if we're running in
	 * serializable mode, predicate.c needs to do its own processing.
	 */
	if (IsolationUsesXactSnapshot())
	{
		if (IsolationIsSerializable())
			SetSerializableTransactionSnapshot(CurrentSnapshot, sourcevxid,
											   sourcepid);
		/* Make a saved copy */
		CurrentSnapshot = CopySnapshot(CurrentSnapshot);
		FirstXactSnapshot = CurrentSnapshot;
		/* Mark it as "registered" in FirstXactSnapshot */
		FirstXactSnapshot->regd_count++;
		pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
	}

	FirstSnapshotSet = true;
}
597
598 /*
599 * CopySnapshot
600 * Copy the given snapshot.
601 *
602 * The copy is palloc'd in TopTransactionContext and has initial refcounts set
603 * to 0. The returned snapshot has the copied flag set.
604 */
605 static Snapshot
CopySnapshot(Snapshot snapshot)606 CopySnapshot(Snapshot snapshot)
607 {
608 Snapshot newsnap;
609 Size subxipoff;
610 Size size;
611
612 Assert(snapshot != InvalidSnapshot);
613
614 /* We allocate any XID arrays needed in the same palloc block. */
615 size = subxipoff = sizeof(SnapshotData) +
616 snapshot->xcnt * sizeof(TransactionId);
617 if (snapshot->subxcnt > 0)
618 size += snapshot->subxcnt * sizeof(TransactionId);
619
620 newsnap = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
621 memcpy(newsnap, snapshot, sizeof(SnapshotData));
622
623 newsnap->regd_count = 0;
624 newsnap->active_count = 0;
625 newsnap->copied = true;
626 newsnap->snapXactCompletionCount = 0;
627
628 /* setup XID array */
629 if (snapshot->xcnt > 0)
630 {
631 newsnap->xip = (TransactionId *) (newsnap + 1);
632 memcpy(newsnap->xip, snapshot->xip,
633 snapshot->xcnt * sizeof(TransactionId));
634 }
635 else
636 newsnap->xip = NULL;
637
638 /*
639 * Setup subXID array. Don't bother to copy it if it had overflowed,
640 * though, because it's not used anywhere in that case. Except if it's a
641 * snapshot taken during recovery; all the top-level XIDs are in subxip as
642 * well in that case, so we mustn't lose them.
643 */
644 if (snapshot->subxcnt > 0 &&
645 (!snapshot->suboverflowed || snapshot->takenDuringRecovery))
646 {
647 newsnap->subxip = (TransactionId *) ((char *) newsnap + subxipoff);
648 memcpy(newsnap->subxip, snapshot->subxip,
649 snapshot->subxcnt * sizeof(TransactionId));
650 }
651 else
652 newsnap->subxip = NULL;
653
654 return newsnap;
655 }
656
657 /*
658 * FreeSnapshot
659 * Free the memory associated with a snapshot.
660 */
661 static void
FreeSnapshot(Snapshot snapshot)662 FreeSnapshot(Snapshot snapshot)
663 {
664 Assert(snapshot->regd_count == 0);
665 Assert(snapshot->active_count == 0);
666 Assert(snapshot->copied);
667
668 pfree(snapshot);
669 }
670
671 /*
672 * PushActiveSnapshot
673 * Set the given snapshot as the current active snapshot
674 *
675 * If the passed snapshot is a statically-allocated one, or it is possibly
676 * subject to a future command counter update, create a new long-lived copy
677 * with active refcount=1. Otherwise, only increment the refcount.
678 */
679 void
PushActiveSnapshot(Snapshot snap)680 PushActiveSnapshot(Snapshot snap)
681 {
682 PushActiveSnapshotWithLevel(snap, GetCurrentTransactionNestLevel());
683 }
684
685 /*
686 * PushActiveSnapshotWithLevel
687 * Set the given snapshot as the current active snapshot
688 *
689 * Same as PushActiveSnapshot except that caller can specify the
690 * transaction nesting level that "owns" the snapshot. This level
691 * must not be deeper than the current top of the snapshot stack.
692 */
693 void
PushActiveSnapshotWithLevel(Snapshot snap,int snap_level)694 PushActiveSnapshotWithLevel(Snapshot snap, int snap_level)
695 {
696 ActiveSnapshotElt *newactive;
697
698 Assert(snap != InvalidSnapshot);
699 Assert(ActiveSnapshot == NULL || snap_level >= ActiveSnapshot->as_level);
700
701 newactive = MemoryContextAlloc(TopTransactionContext, sizeof(ActiveSnapshotElt));
702
703 /*
704 * Checking SecondarySnapshot is probably useless here, but it seems
705 * better to be sure.
706 */
707 if (snap == CurrentSnapshot || snap == SecondarySnapshot || !snap->copied)
708 newactive->as_snap = CopySnapshot(snap);
709 else
710 newactive->as_snap = snap;
711
712 newactive->as_next = ActiveSnapshot;
713 newactive->as_level = snap_level;
714
715 newactive->as_snap->active_count++;
716
717 ActiveSnapshot = newactive;
718 if (OldestActiveSnapshot == NULL)
719 OldestActiveSnapshot = ActiveSnapshot;
720 }
721
722 /*
723 * PushCopiedSnapshot
724 * As above, except forcibly copy the presented snapshot.
725 *
726 * This should be used when the ActiveSnapshot has to be modifiable, for
727 * example if the caller intends to call UpdateActiveSnapshotCommandId.
728 * The new snapshot will be released when popped from the stack.
729 */
730 void
PushCopiedSnapshot(Snapshot snapshot)731 PushCopiedSnapshot(Snapshot snapshot)
732 {
733 PushActiveSnapshot(CopySnapshot(snapshot));
734 }
735
736 /*
737 * UpdateActiveSnapshotCommandId
738 *
739 * Update the current CID of the active snapshot. This can only be applied
740 * to a snapshot that is not referenced elsewhere.
741 */
742 void
UpdateActiveSnapshotCommandId(void)743 UpdateActiveSnapshotCommandId(void)
744 {
745 CommandId save_curcid,
746 curcid;
747
748 Assert(ActiveSnapshot != NULL);
749 Assert(ActiveSnapshot->as_snap->active_count == 1);
750 Assert(ActiveSnapshot->as_snap->regd_count == 0);
751
752 /*
753 * Don't allow modification of the active snapshot during parallel
754 * operation. We share the snapshot to worker backends at the beginning
755 * of parallel operation, so any change to the snapshot can lead to
756 * inconsistencies. We have other defenses against
757 * CommandCounterIncrement, but there are a few places that call this
758 * directly, so we put an additional guard here.
759 */
760 save_curcid = ActiveSnapshot->as_snap->curcid;
761 curcid = GetCurrentCommandId(false);
762 if (IsInParallelMode() && save_curcid != curcid)
763 elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation");
764 ActiveSnapshot->as_snap->curcid = curcid;
765 }
766
767 /*
768 * PopActiveSnapshot
769 *
770 * Remove the topmost snapshot from the active snapshot stack, decrementing the
771 * reference count, and free it if this was the last reference.
772 */
773 void
PopActiveSnapshot(void)774 PopActiveSnapshot(void)
775 {
776 ActiveSnapshotElt *newstack;
777
778 newstack = ActiveSnapshot->as_next;
779
780 Assert(ActiveSnapshot->as_snap->active_count > 0);
781
782 ActiveSnapshot->as_snap->active_count--;
783
784 if (ActiveSnapshot->as_snap->active_count == 0 &&
785 ActiveSnapshot->as_snap->regd_count == 0)
786 FreeSnapshot(ActiveSnapshot->as_snap);
787
788 pfree(ActiveSnapshot);
789 ActiveSnapshot = newstack;
790 if (ActiveSnapshot == NULL)
791 OldestActiveSnapshot = NULL;
792
793 SnapshotResetXmin();
794 }
795
796 /*
797 * GetActiveSnapshot
798 * Return the topmost snapshot in the Active stack.
799 */
800 Snapshot
GetActiveSnapshot(void)801 GetActiveSnapshot(void)
802 {
803 Assert(ActiveSnapshot != NULL);
804
805 return ActiveSnapshot->as_snap;
806 }
807
808 /*
809 * ActiveSnapshotSet
810 * Return whether there is at least one snapshot in the Active stack
811 */
812 bool
ActiveSnapshotSet(void)813 ActiveSnapshotSet(void)
814 {
815 return ActiveSnapshot != NULL;
816 }
817
818 /*
819 * RegisterSnapshot
820 * Register a snapshot as being in use by the current resource owner
821 *
822 * If InvalidSnapshot is passed, it is not registered.
823 */
824 Snapshot
RegisterSnapshot(Snapshot snapshot)825 RegisterSnapshot(Snapshot snapshot)
826 {
827 if (snapshot == InvalidSnapshot)
828 return InvalidSnapshot;
829
830 return RegisterSnapshotOnOwner(snapshot, CurrentResourceOwner);
831 }
832
833 /*
834 * RegisterSnapshotOnOwner
835 * As above, but use the specified resource owner
836 */
837 Snapshot
RegisterSnapshotOnOwner(Snapshot snapshot,ResourceOwner owner)838 RegisterSnapshotOnOwner(Snapshot snapshot, ResourceOwner owner)
839 {
840 Snapshot snap;
841
842 if (snapshot == InvalidSnapshot)
843 return InvalidSnapshot;
844
845 /* Static snapshot? Create a persistent copy */
846 snap = snapshot->copied ? snapshot : CopySnapshot(snapshot);
847
848 /* and tell resowner.c about it */
849 ResourceOwnerEnlargeSnapshots(owner);
850 snap->regd_count++;
851 ResourceOwnerRememberSnapshot(owner, snap);
852
853 if (snap->regd_count == 1)
854 pairingheap_add(&RegisteredSnapshots, &snap->ph_node);
855
856 return snap;
857 }
858
859 /*
860 * UnregisterSnapshot
861 *
862 * Decrement the reference count of a snapshot, remove the corresponding
863 * reference from CurrentResourceOwner, and free the snapshot if no more
864 * references remain.
865 */
866 void
UnregisterSnapshot(Snapshot snapshot)867 UnregisterSnapshot(Snapshot snapshot)
868 {
869 if (snapshot == NULL)
870 return;
871
872 UnregisterSnapshotFromOwner(snapshot, CurrentResourceOwner);
873 }
874
875 /*
876 * UnregisterSnapshotFromOwner
877 * As above, but use the specified resource owner
878 */
879 void
UnregisterSnapshotFromOwner(Snapshot snapshot,ResourceOwner owner)880 UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner)
881 {
882 if (snapshot == NULL)
883 return;
884
885 Assert(snapshot->regd_count > 0);
886 Assert(!pairingheap_is_empty(&RegisteredSnapshots));
887
888 ResourceOwnerForgetSnapshot(owner, snapshot);
889
890 snapshot->regd_count--;
891 if (snapshot->regd_count == 0)
892 pairingheap_remove(&RegisteredSnapshots, &snapshot->ph_node);
893
894 if (snapshot->regd_count == 0 && snapshot->active_count == 0)
895 {
896 FreeSnapshot(snapshot);
897 SnapshotResetXmin();
898 }
899 }
900
901 /*
902 * Comparison function for RegisteredSnapshots heap. Snapshots are ordered
903 * by xmin, so that the snapshot with smallest xmin is at the top.
904 */
905 static int
xmin_cmp(const pairingheap_node * a,const pairingheap_node * b,void * arg)906 xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
907 {
908 const SnapshotData *asnap = pairingheap_const_container(SnapshotData, ph_node, a);
909 const SnapshotData *bsnap = pairingheap_const_container(SnapshotData, ph_node, b);
910
911 if (TransactionIdPrecedes(asnap->xmin, bsnap->xmin))
912 return 1;
913 else if (TransactionIdFollows(asnap->xmin, bsnap->xmin))
914 return -1;
915 else
916 return 0;
917 }
918
919 /*
920 * SnapshotResetXmin
921 *
922 * If there are no more snapshots, we can reset our PGPROC->xmin to InvalidXid.
923 * Note we can do this without locking because we assume that storing an Xid
924 * is atomic.
925 *
926 * Even if there are some remaining snapshots, we may be able to advance our
927 * PGPROC->xmin to some degree. This typically happens when a portal is
928 * dropped. For efficiency, we only consider recomputing PGPROC->xmin when
929 * the active snapshot stack is empty; this allows us not to need to track
930 * which active snapshot is oldest.
931 *
932 * Note: it's tempting to use GetOldestSnapshot() here so that we can include
933 * active snapshots in the calculation. However, that compares by LSN not
934 * xmin so it's not entirely clear that it's the same thing. Also, we'd be
935 * critically dependent on the assumption that the bottommost active snapshot
936 * stack entry has the oldest xmin. (Current uses of GetOldestSnapshot() are
937 * not actually critical, but this would be.)
938 */
939 static void
SnapshotResetXmin(void)940 SnapshotResetXmin(void)
941 {
942 Snapshot minSnapshot;
943
944 if (ActiveSnapshot != NULL)
945 return;
946
947 if (pairingheap_is_empty(&RegisteredSnapshots))
948 {
949 MyProc->xmin = InvalidTransactionId;
950 return;
951 }
952
953 minSnapshot = pairingheap_container(SnapshotData, ph_node,
954 pairingheap_first(&RegisteredSnapshots));
955
956 if (TransactionIdPrecedes(MyProc->xmin, minSnapshot->xmin))
957 MyProc->xmin = minSnapshot->xmin;
958 }
959
960 /*
961 * AtSubCommit_Snapshot
962 */
963 void
AtSubCommit_Snapshot(int level)964 AtSubCommit_Snapshot(int level)
965 {
966 ActiveSnapshotElt *active;
967
968 /*
969 * Relabel the active snapshots set in this subtransaction as though they
970 * are owned by the parent subxact.
971 */
972 for (active = ActiveSnapshot; active != NULL; active = active->as_next)
973 {
974 if (active->as_level < level)
975 break;
976 active->as_level = level - 1;
977 }
978 }
979
980 /*
981 * AtSubAbort_Snapshot
982 * Clean up snapshots after a subtransaction abort
983 */
984 void
AtSubAbort_Snapshot(int level)985 AtSubAbort_Snapshot(int level)
986 {
987 /* Forget the active snapshots set by this subtransaction */
988 while (ActiveSnapshot && ActiveSnapshot->as_level >= level)
989 {
990 ActiveSnapshotElt *next;
991
992 next = ActiveSnapshot->as_next;
993
994 /*
995 * Decrement the snapshot's active count. If it's still registered or
996 * marked as active by an outer subtransaction, we can't free it yet.
997 */
998 Assert(ActiveSnapshot->as_snap->active_count >= 1);
999 ActiveSnapshot->as_snap->active_count -= 1;
1000
1001 if (ActiveSnapshot->as_snap->active_count == 0 &&
1002 ActiveSnapshot->as_snap->regd_count == 0)
1003 FreeSnapshot(ActiveSnapshot->as_snap);
1004
1005 /* and free the stack element */
1006 pfree(ActiveSnapshot);
1007
1008 ActiveSnapshot = next;
1009 if (ActiveSnapshot == NULL)
1010 OldestActiveSnapshot = NULL;
1011 }
1012
1013 SnapshotResetXmin();
1014 }
1015
/*
 * AtEOXact_Snapshot
 *		Snapshot manager's cleanup function for end of transaction
 *
 * isCommit: true at commit.  Only then are leftovers reported: one WARNING if
 * any registered snapshots remain after cleanup, and one WARNING per unpopped
 * active-stack entry.  At abort they are discarded silently.
 *
 * resetXmin: if true, SnapshotResetXmin() is called after all state has been
 * cleared, which (with an empty active stack and an empty registered-snapshot
 * heap) sets MyProc->xmin to InvalidTransactionId.  If false, the caller must
 * already have cleared MyProc->xmin; this is asserted at the end.
 *
 * All of this module's per-transaction state (FirstXactSnapshot, exported
 * snapshots and their files, the catalog snapshot, the active stack, the
 * registered-snapshot heap, CurrentSnapshot/SecondarySnapshot and
 * FirstSnapshotSet) is reset.  Snapshot memory is not freed explicitly; it
 * goes away with TopTransactionContext.
 */
void
AtEOXact_Snapshot(bool isCommit, bool resetXmin)
{
	/*
	 * In transaction-snapshot mode we must release our privately-managed
	 * reference to the transaction snapshot.  We must remove it from
	 * RegisteredSnapshots to keep the check below happy.  But we don't bother
	 * to do FreeSnapshot, for two reasons: the memory will go away with
	 * TopTransactionContext anyway, and if someone has left the snapshot
	 * stacked as active, we don't want the code below to be chasing through a
	 * dangling pointer.
	 */
	if (FirstXactSnapshot != NULL)
	{
		Assert(FirstXactSnapshot->regd_count > 0);
		Assert(!pairingheap_is_empty(&RegisteredSnapshots));
		pairingheap_remove(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
	}
	FirstXactSnapshot = NULL;

	/*
	 * If we exported any snapshots, clean them up.
	 */
	if (exportedSnapshots != NIL)
	{
		ListCell   *lc;

		/*
		 * Get rid of the files.  Unlink failure is only a WARNING because (1)
		 * it's too late to abort the transaction, and (2) leaving a leaked
		 * file around has little real consequence anyway.
		 *
		 * We also need to remove the snapshots from RegisteredSnapshots to
		 * prevent a warning below.
		 *
		 * As with the FirstXactSnapshot, we don't need to free resources of
		 * the snapshot itself as it will go away with the memory context.
		 */
		foreach(lc, exportedSnapshots)
		{
			ExportedSnapshot *esnap = (ExportedSnapshot *) lfirst(lc);

			if (unlink(esnap->snapfile))
				elog(WARNING, "could not unlink file \"%s\": %m",
					 esnap->snapfile);

			pairingheap_remove(&RegisteredSnapshots,
							   &esnap->snapshot->ph_node);
		}

		/* the list cells live in TopTransactionContext too; just forget them */
		exportedSnapshots = NIL;
	}

	/* Drop catalog snapshot if any */
	InvalidateCatalogSnapshot();

	/*
	 * On commit, complain about leftover snapshots.  Everything this module
	 * registers internally has been removed from the heap above, so anything
	 * still there was leaked by some caller.
	 */
	if (isCommit)
	{
		ActiveSnapshotElt *active;

		if (!pairingheap_is_empty(&RegisteredSnapshots))
			elog(WARNING, "registered snapshots seem to remain after cleanup");

		/* complain about unpopped active snapshots */
		for (active = ActiveSnapshot; active != NULL; active = active->as_next)
			elog(WARNING, "snapshot %p still active", active);
	}

	/*
	 * And reset our state.  We don't need to free the memory explicitly --
	 * it'll go away with TopTransactionContext.
	 */
	ActiveSnapshot = NULL;
	OldestActiveSnapshot = NULL;
	pairingheap_reset(&RegisteredSnapshots);

	CurrentSnapshot = NULL;
	SecondarySnapshot = NULL;

	FirstSnapshotSet = false;

	/*
	 * During normal commit processing, we call ProcArrayEndTransaction() to
	 * reset the MyProc->xmin. That call happens prior to the call to
	 * AtEOXact_Snapshot(), so we need not touch xmin here at all.
	 */
	if (resetXmin)
		SnapshotResetXmin();

	/* when the caller asked us not to reset xmin, it must already be clear */
	Assert(resetXmin || MyProc->xmin == 0);
}
1112
1113
/*
 * ExportSnapshot
 *		Export the snapshot to a file so that other backends can import it.
 *		Returns the token (the file name) that can be used to import this
 *		snapshot.
 *
 * The file is written under SNAPSHOT_EXPORT_DIR.  Its name is built from our
 * backend ID, our local transaction ID and a 1-based sequence number within
 * the transaction.  A private copy of the snapshot is pseudo-registered: its
 * regd_count is bumped and it is added to RegisteredSnapshots without any
 * resource owner.  The copy is also remembered in exportedSnapshots, and
 * AtEOXact_Snapshot later removes it from the heap and unlinks the file.
 *
 * Raises ERROR if called inside a subtransaction, or if the file cannot be
 * created, written, closed or renamed into place.  The returned string is
 * palloc'd in the caller's current memory context.
 */
char *
ExportSnapshot(Snapshot snapshot)
{
	TransactionId topXid;
	TransactionId *children;
	ExportedSnapshot *esnap;
	int			nchildren;
	int			addTopXid;
	StringInfoData buf;
	FILE	   *f;
	int			i;
	MemoryContext oldcxt;
	char		path[MAXPGPATH];
	char		pathtmp[MAXPGPATH];

	/*
	 * It's tempting to call RequireTransactionBlock here, since it's not very
	 * useful to export a snapshot that will disappear immediately afterwards.
	 * However, we haven't got enough information to do that, since we don't
	 * know if we're at top level or not.  For example, we could be inside a
	 * plpgsql function that is going to fire off other transactions via
	 * dblink.  Rather than disallow perfectly legitimate usages, don't make a
	 * check.
	 *
	 * Also note that we don't make any restriction on the transaction's
	 * isolation level; however, importers must check the level if they are
	 * serializable.
	 */

	/*
	 * Get our transaction ID if there is one, to include in the snapshot.
	 */
	topXid = GetTopTransactionIdIfAny();

	/*
	 * We cannot export a snapshot from a subtransaction because there's no
	 * easy way for importers to verify that the same subtransaction is still
	 * running.
	 */
	if (IsSubTransaction())
		ereport(ERROR,
				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
				 errmsg("cannot export a snapshot from a subtransaction")));

	/*
	 * We do however allow previous committed subtransactions to exist.
	 * Importers of the snapshot must see them as still running, so get their
	 * XIDs to add them to the snapshot.
	 */
	nchildren = xactGetCommittedChildren(&children);

	/*
	 * Generate file path for the snapshot.  We start numbering of snapshots
	 * inside the transaction from 1.  The resulting name consists only of
	 * hex digits and hyphens, which is exactly what ImportSnapshot's
	 * identifier check accepts.
	 */
	snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%08X-%d",
			 MyProc->backendId, MyProc->lxid, list_length(exportedSnapshots) + 1);

	/*
	 * Copy the snapshot into TopTransactionContext, add it to the
	 * exportedSnapshots list, and mark it pseudo-registered.  We do this to
	 * ensure that the snapshot's xmin is honored for the rest of the
	 * transaction.
	 */
	snapshot = CopySnapshot(snapshot);

	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
	esnap = (ExportedSnapshot *) palloc(sizeof(ExportedSnapshot));
	esnap->snapfile = pstrdup(path);
	esnap->snapshot = snapshot;
	exportedSnapshots = lappend(exportedSnapshots, esnap);
	MemoryContextSwitchTo(oldcxt);

	/* registration not tracked by any resource owner; undone at end of xact */
	snapshot->regd_count++;
	pairingheap_add(&RegisteredSnapshots, &snapshot->ph_node);

	/*
	 * Fill buf with a text serialization of the snapshot, plus identification
	 * data about this transaction.  The format expected by ImportSnapshot is
	 * pretty rigid: each line must be fieldname:value.
	 */
	initStringInfo(&buf);

	appendStringInfo(&buf, "vxid:%d/%u\n", MyProc->backendId, MyProc->lxid);
	appendStringInfo(&buf, "pid:%d\n", MyProcPid);
	appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId);
	appendStringInfo(&buf, "iso:%d\n", XactIsoLevel);
	appendStringInfo(&buf, "ro:%d\n", XactReadOnly);

	appendStringInfo(&buf, "xmin:%u\n", snapshot->xmin);
	appendStringInfo(&buf, "xmax:%u\n", snapshot->xmax);

	/*
	 * We must include our own top transaction ID in the top-xid data, since
	 * by definition we will still be running when the importing transaction
	 * adopts the snapshot, but GetSnapshotData never includes our own XID in
	 * the snapshot.  (There must, therefore, be enough room to add it.)
	 *
	 * However, it could be that our topXid is after the xmax, in which case
	 * we shouldn't include it because xip[] members are expected to be before
	 * xmax.  (We need not make the same check for subxip[] members, see
	 * snapshot.h.)
	 */
	addTopXid = (TransactionIdIsValid(topXid) &&
				 TransactionIdPrecedes(topXid, snapshot->xmax)) ? 1 : 0;
	appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid);
	for (i = 0; i < snapshot->xcnt; i++)
		appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]);
	if (addTopXid)
		appendStringInfo(&buf, "xip:%u\n", topXid);

	/*
	 * Similarly, we add our subcommitted child XIDs to the subxid data. Here,
	 * we have to cope with possible overflow.  When the combined list would
	 * not fit, we only emit "sof:1" and no sxcnt/sxp lines.
	 */
	if (snapshot->suboverflowed ||
		snapshot->subxcnt + nchildren > GetMaxSnapshotSubxidCount())
		appendStringInfoString(&buf, "sof:1\n");
	else
	{
		appendStringInfoString(&buf, "sof:0\n");
		appendStringInfo(&buf, "sxcnt:%d\n", snapshot->subxcnt + nchildren);
		for (i = 0; i < snapshot->subxcnt; i++)
			appendStringInfo(&buf, "sxp:%u\n", snapshot->subxip[i]);
		for (i = 0; i < nchildren; i++)
			appendStringInfo(&buf, "sxp:%u\n", children[i]);
	}
	appendStringInfo(&buf, "rec:%u\n", snapshot->takenDuringRecovery);

	/*
	 * Now write the text representation into a file.  We first write to a
	 * ".tmp" filename, and rename to final filename if no error.  This
	 * ensures that no other backend can read an incomplete file
	 * (ImportSnapshot won't allow it because of its valid-characters check).
	 */
	snprintf(pathtmp, sizeof(pathtmp), "%s.tmp", path);
	if (!(f = AllocateFile(pathtmp, PG_BINARY_W)))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m", pathtmp)));

	if (fwrite(buf.data, buf.len, 1, f) != 1)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m", pathtmp)));

	/* no fsync() since file need not survive a system crash */

	if (FreeFile(f))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m", pathtmp)));

	/*
	 * Now that we have written everything into a .tmp file, rename the file
	 * to remove the .tmp suffix.
	 */
	if (rename(pathtmp, path) < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						pathtmp, path)));

	/*
	 * The basename of the file is what we return from pg_export_snapshot().
	 * It's already in path in a textual format and we know that the path
	 * starts with SNAPSHOT_EXPORT_DIR.  Skip over the prefix and the slash
	 * and pstrdup it so as not to return the address of a local variable.
	 */
	return pstrdup(path + strlen(SNAPSHOT_EXPORT_DIR) + 1);
}
1291
1292 /*
1293 * pg_export_snapshot
1294 * SQL-callable wrapper for ExportSnapshot.
1295 */
1296 Datum
pg_export_snapshot(PG_FUNCTION_ARGS)1297 pg_export_snapshot(PG_FUNCTION_ARGS)
1298 {
1299 char *snapshotName;
1300
1301 snapshotName = ExportSnapshot(GetActiveSnapshot());
1302 PG_RETURN_TEXT_P(cstring_to_text(snapshotName));
1303 }
1304
1305
1306 /*
1307 * Parsing subroutines for ImportSnapshot: parse a line with the given
1308 * prefix followed by a value, and advance *s to the next line. The
1309 * filename is provided for use in error messages.
1310 */
1311 static int
parseIntFromText(const char * prefix,char ** s,const char * filename)1312 parseIntFromText(const char *prefix, char **s, const char *filename)
1313 {
1314 char *ptr = *s;
1315 int prefixlen = strlen(prefix);
1316 int val;
1317
1318 if (strncmp(ptr, prefix, prefixlen) != 0)
1319 ereport(ERROR,
1320 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1321 errmsg("invalid snapshot data in file \"%s\"", filename)));
1322 ptr += prefixlen;
1323 if (sscanf(ptr, "%d", &val) != 1)
1324 ereport(ERROR,
1325 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1326 errmsg("invalid snapshot data in file \"%s\"", filename)));
1327 ptr = strchr(ptr, '\n');
1328 if (!ptr)
1329 ereport(ERROR,
1330 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1331 errmsg("invalid snapshot data in file \"%s\"", filename)));
1332 *s = ptr + 1;
1333 return val;
1334 }
1335
1336 static TransactionId
parseXidFromText(const char * prefix,char ** s,const char * filename)1337 parseXidFromText(const char *prefix, char **s, const char *filename)
1338 {
1339 char *ptr = *s;
1340 int prefixlen = strlen(prefix);
1341 TransactionId val;
1342
1343 if (strncmp(ptr, prefix, prefixlen) != 0)
1344 ereport(ERROR,
1345 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1346 errmsg("invalid snapshot data in file \"%s\"", filename)));
1347 ptr += prefixlen;
1348 if (sscanf(ptr, "%u", &val) != 1)
1349 ereport(ERROR,
1350 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1351 errmsg("invalid snapshot data in file \"%s\"", filename)));
1352 ptr = strchr(ptr, '\n');
1353 if (!ptr)
1354 ereport(ERROR,
1355 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1356 errmsg("invalid snapshot data in file \"%s\"", filename)));
1357 *s = ptr + 1;
1358 return val;
1359 }
1360
1361 static void
parseVxidFromText(const char * prefix,char ** s,const char * filename,VirtualTransactionId * vxid)1362 parseVxidFromText(const char *prefix, char **s, const char *filename,
1363 VirtualTransactionId *vxid)
1364 {
1365 char *ptr = *s;
1366 int prefixlen = strlen(prefix);
1367
1368 if (strncmp(ptr, prefix, prefixlen) != 0)
1369 ereport(ERROR,
1370 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1371 errmsg("invalid snapshot data in file \"%s\"", filename)));
1372 ptr += prefixlen;
1373 if (sscanf(ptr, "%d/%u", &vxid->backendId, &vxid->localTransactionId) != 2)
1374 ereport(ERROR,
1375 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1376 errmsg("invalid snapshot data in file \"%s\"", filename)));
1377 ptr = strchr(ptr, '\n');
1378 if (!ptr)
1379 ereport(ERROR,
1380 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1381 errmsg("invalid snapshot data in file \"%s\"", filename)));
1382 *s = ptr + 1;
1383 }
1384
/*
 * ImportSnapshot
 *		Import a previously exported snapshot.  The argument should be a
 *		filename in SNAPSHOT_EXPORT_DIR.  Load the snapshot from that file.
 *		This is called by "SET TRANSACTION SNAPSHOT 'foo'".
 *
 * idstr is the identifier returned by pg_export_snapshot(); only the
 * characters 0-9, A-F and '-' are accepted.  The file is parsed field by
 * field in exactly the order ExportSnapshot writes it.  The resulting
 * snapshot is then installed with SetTransactionSnapshot().
 *
 * Raises ERROR in any of these cases:
 *	- the current transaction has already taken a snapshot, has an XID, or is
 *	  a subtransaction;
 *	- its isolation level doesn't use a transaction snapshot;
 *	- the identifier is malformed, or the file can't be opened, read or parsed;
 *	- the file contents fail the sanity checks;
 *	- a serializable importer doesn't match the source's isolation level or
 *	  read-only setting;
 *	- the snapshot was taken in a different database.
 */
void
ImportSnapshot(const char *idstr)
{
	char		path[MAXPGPATH];
	FILE	   *f;
	struct stat stat_buf;
	char	   *filebuf;
	int			xcnt;
	int			i;
	VirtualTransactionId src_vxid;
	int			src_pid;
	Oid			src_dbid;
	int			src_isolevel;
	bool		src_readonly;
	SnapshotData snapshot;

	/*
	 * Must be at top level of a fresh transaction.  Note in particular that
	 * we check we haven't acquired an XID --- if we have, it's conceivable
	 * that the snapshot would show it as not running, making for very screwy
	 * behavior.
	 */
	if (FirstSnapshotSet ||
		GetTopTransactionIdIfAny() != InvalidTransactionId ||
		IsSubTransaction())
		ereport(ERROR,
				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
				 errmsg("SET TRANSACTION SNAPSHOT must be called before any query")));

	/*
	 * If we are in read committed mode then the next query would execute with
	 * a new snapshot thus making this function call quite useless.
	 */
	if (!IsolationUsesXactSnapshot())
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ")));

	/*
	 * Verify the identifier: only 0-9, A-F and hyphens are allowed.  We do
	 * this mainly to prevent reading arbitrary files.  This also rejects the
	 * ".tmp" files ExportSnapshot writes before renaming them into place.
	 */
	if (strspn(idstr, "0123456789ABCDEF-") != strlen(idstr))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid snapshot identifier: \"%s\"", idstr)));

	/* OK, read the file */
	snprintf(path, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", idstr);

	f = AllocateFile(path, PG_BINARY_R);
	if (!f)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid snapshot identifier: \"%s\"", idstr)));

	/* get the size of the file so that we know how much memory we need */
	if (fstat(fileno(f), &stat_buf))
		elog(ERROR, "could not stat file \"%s\": %m", path);

	/* and read the file into a palloc'd string */
	filebuf = (char *) palloc(stat_buf.st_size + 1);
	if (fread(filebuf, stat_buf.st_size, 1, f) != 1)
		elog(ERROR, "could not read file \"%s\": %m", path);

	/* NUL-terminate so the parse helpers can use string functions */
	filebuf[stat_buf.st_size] = '\0';

	FreeFile(f);

	/*
	 * Construct a snapshot struct by parsing the file content.  Each parse
	 * helper advances filebuf past the line it consumed, so the fields must
	 * be read in the same order ExportSnapshot wrote them.
	 */
	memset(&snapshot, 0, sizeof(snapshot));

	parseVxidFromText("vxid:", &filebuf, path, &src_vxid);
	src_pid = parseIntFromText("pid:", &filebuf, path);
	/* we abuse parseXidFromText a bit here ... (dbid is written with %u) */
	src_dbid = parseXidFromText("dbid:", &filebuf, path);
	src_isolevel = parseIntFromText("iso:", &filebuf, path);
	src_readonly = parseIntFromText("ro:", &filebuf, path);

	snapshot.snapshot_type = SNAPSHOT_MVCC;

	snapshot.xmin = parseXidFromText("xmin:", &filebuf, path);
	snapshot.xmax = parseXidFromText("xmax:", &filebuf, path);

	snapshot.xcnt = xcnt = parseIntFromText("xcnt:", &filebuf, path);

	/* sanity-check the xid count before palloc */
	if (xcnt < 0 || xcnt > GetMaxSnapshotXidCount())
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
				 errmsg("invalid snapshot data in file \"%s\"", path)));

	snapshot.xip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
	for (i = 0; i < xcnt; i++)
		snapshot.xip[i] = parseXidFromText("xip:", &filebuf, path);

	snapshot.suboverflowed = parseIntFromText("sof:", &filebuf, path);

	/* the subxid list is only present when it did not overflow */
	if (!snapshot.suboverflowed)
	{
		snapshot.subxcnt = xcnt = parseIntFromText("sxcnt:", &filebuf, path);

		/* sanity-check the xid count before palloc */
		if (xcnt < 0 || xcnt > GetMaxSnapshotSubxidCount())
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
					 errmsg("invalid snapshot data in file \"%s\"", path)));

		snapshot.subxip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
		for (i = 0; i < xcnt; i++)
			snapshot.subxip[i] = parseXidFromText("sxp:", &filebuf, path);
	}
	else
	{
		snapshot.subxcnt = 0;
		snapshot.subxip = NULL;
	}

	snapshot.takenDuringRecovery = parseIntFromText("rec:", &filebuf, path);

	/*
	 * Do some additional sanity checking, just to protect ourselves.  We
	 * don't trouble to check the array elements, just the most critical
	 * fields.
	 */
	if (!VirtualTransactionIdIsValid(src_vxid) ||
		!OidIsValid(src_dbid) ||
		!TransactionIdIsNormal(snapshot.xmin) ||
		!TransactionIdIsNormal(snapshot.xmax))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
				 errmsg("invalid snapshot data in file \"%s\"", path)));

	/*
	 * If we're serializable, the source transaction must be too, otherwise
	 * predicate.c has problems (SxactGlobalXmin could go backwards).  Also, a
	 * non-read-only transaction can't adopt a snapshot from a read-only
	 * transaction, as predicate.c handles the cases very differently.
	 */
	if (IsolationIsSerializable())
	{
		if (src_isolevel != XACT_SERIALIZABLE)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("a serializable transaction cannot import a snapshot from a non-serializable transaction")));
		if (src_readonly && !XactReadOnly)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("a non-read-only serializable transaction cannot import a snapshot from a read-only transaction")));
	}

	/*
	 * We cannot import a snapshot that was taken in a different database,
	 * because vacuum calculates OldestXmin on a per-database basis; so the
	 * source transaction's xmin doesn't protect us from data loss.  This
	 * restriction could be removed if the source transaction were to mark its
	 * xmin as being globally applicable.  But that would require some
	 * additional syntax, since that has to be known when the snapshot is
	 * initially taken.  (See pgsql-hackers discussion of 2011-10-21.)
	 */
	if (src_dbid != MyDatabaseId)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("cannot import a snapshot from a different database")));

	/* OK, install the snapshot */
	SetTransactionSnapshot(&snapshot, &src_vxid, src_pid, NULL);
}
1561
1562 /*
1563 * XactHasExportedSnapshots
1564 * Test whether current transaction has exported any snapshots.
1565 */
1566 bool
XactHasExportedSnapshots(void)1567 XactHasExportedSnapshots(void)
1568 {
1569 return (exportedSnapshots != NIL);
1570 }
1571
1572 /*
1573 * DeleteAllExportedSnapshotFiles
1574 * Clean up any files that have been left behind by a crashed backend
1575 * that had exported snapshots before it died.
1576 *
1577 * This should be called during database startup or crash recovery.
1578 */
1579 void
DeleteAllExportedSnapshotFiles(void)1580 DeleteAllExportedSnapshotFiles(void)
1581 {
1582 char buf[MAXPGPATH + sizeof(SNAPSHOT_EXPORT_DIR)];
1583 DIR *s_dir;
1584 struct dirent *s_de;
1585
1586 /*
1587 * Problems in reading the directory, or unlinking files, are reported at
1588 * LOG level. Since we're running in the startup process, ERROR level
1589 * would prevent database start, and it's not important enough for that.
1590 */
1591 s_dir = AllocateDir(SNAPSHOT_EXPORT_DIR);
1592
1593 while ((s_de = ReadDirExtended(s_dir, SNAPSHOT_EXPORT_DIR, LOG)) != NULL)
1594 {
1595 if (strcmp(s_de->d_name, ".") == 0 ||
1596 strcmp(s_de->d_name, "..") == 0)
1597 continue;
1598
1599 snprintf(buf, sizeof(buf), SNAPSHOT_EXPORT_DIR "/%s", s_de->d_name);
1600
1601 if (unlink(buf) != 0)
1602 ereport(LOG,
1603 (errcode_for_file_access(),
1604 errmsg("could not remove file \"%s\": %m", buf)));
1605 }
1606
1607 FreeDir(s_dir);
1608 }
1609
1610 /*
1611 * ThereAreNoPriorRegisteredSnapshots
1612 * Is the registered snapshot count less than or equal to one?
1613 *
1614 * Don't use this to settle important decisions. While zero registrations and
1615 * no ActiveSnapshot would confirm a certain idleness, the system makes no
1616 * guarantees about the significance of one registered snapshot.
1617 */
1618 bool
ThereAreNoPriorRegisteredSnapshots(void)1619 ThereAreNoPriorRegisteredSnapshots(void)
1620 {
1621 if (pairingheap_is_empty(&RegisteredSnapshots) ||
1622 pairingheap_is_singular(&RegisteredSnapshots))
1623 return true;
1624
1625 return false;
1626 }
1627
1628
1629 /*
1630 * Return a timestamp that is exactly on a minute boundary.
1631 *
1632 * If the argument is already aligned, return that value, otherwise move to
1633 * the next minute boundary following the given time.
1634 */
1635 static TimestampTz
AlignTimestampToMinuteBoundary(TimestampTz ts)1636 AlignTimestampToMinuteBoundary(TimestampTz ts)
1637 {
1638 TimestampTz retval = ts + (USECS_PER_MINUTE - 1);
1639
1640 return retval - (retval % USECS_PER_MINUTE);
1641 }
1642
1643 /*
1644 * Get current timestamp for snapshots
1645 *
1646 * This is basically GetCurrentTimestamp(), but with a guarantee that
1647 * the result never moves backward.
1648 */
1649 TimestampTz
GetSnapshotCurrentTimestamp(void)1650 GetSnapshotCurrentTimestamp(void)
1651 {
1652 TimestampTz now = GetCurrentTimestamp();
1653
1654 /*
1655 * Don't let time move backward; if it hasn't advanced, use the old value.
1656 */
1657 SpinLockAcquire(&oldSnapshotControl->mutex_current);
1658 if (now <= oldSnapshotControl->current_timestamp)
1659 now = oldSnapshotControl->current_timestamp;
1660 else
1661 oldSnapshotControl->current_timestamp = now;
1662 SpinLockRelease(&oldSnapshotControl->mutex_current);
1663
1664 return now;
1665 }
1666
1667 /*
1668 * Get timestamp through which vacuum may have processed based on last stored
1669 * value for threshold_timestamp.
1670 *
1671 * XXX: So far, we never trust that a 64-bit value can be read atomically; if
1672 * that ever changes, we could get rid of the spinlock here.
1673 */
1674 TimestampTz
GetOldSnapshotThresholdTimestamp(void)1675 GetOldSnapshotThresholdTimestamp(void)
1676 {
1677 TimestampTz threshold_timestamp;
1678
1679 SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1680 threshold_timestamp = oldSnapshotControl->threshold_timestamp;
1681 SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1682
1683 return threshold_timestamp;
1684 }
1685
1686 void
SetOldSnapshotThresholdTimestamp(TimestampTz ts,TransactionId xlimit)1687 SetOldSnapshotThresholdTimestamp(TimestampTz ts, TransactionId xlimit)
1688 {
1689 SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1690 Assert(oldSnapshotControl->threshold_timestamp <= ts);
1691 Assert(TransactionIdPrecedesOrEquals(oldSnapshotControl->threshold_xid, xlimit));
1692 oldSnapshotControl->threshold_timestamp = ts;
1693 oldSnapshotControl->threshold_xid = xlimit;
1694 SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1695 }
1696
1697 /*
1698 * XXX: Magic to keep old_snapshot_threshold tests appear "working". They
1699 * currently are broken, and discussion of what to do about them is
1700 * ongoing. See
1701 * https://www.postgresql.org/message-id/20200403001235.e6jfdll3gh2ygbuc%40alap3.anarazel.de
1702 */
1703 void
SnapshotTooOldMagicForTest(void)1704 SnapshotTooOldMagicForTest(void)
1705 {
1706 TimestampTz ts = GetSnapshotCurrentTimestamp();
1707
1708 Assert(old_snapshot_threshold == 0);
1709
1710 ts -= 5 * USECS_PER_SEC;
1711
1712 SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1713 oldSnapshotControl->threshold_timestamp = ts;
1714 SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1715 }
1716
1717 /*
1718 * If there is a valid mapping for the timestamp, set *xlimitp to
1719 * that. Returns whether there is such a mapping.
1720 */
1721 static bool
GetOldSnapshotFromTimeMapping(TimestampTz ts,TransactionId * xlimitp)1722 GetOldSnapshotFromTimeMapping(TimestampTz ts, TransactionId *xlimitp)
1723 {
1724 bool in_mapping = false;
1725
1726 Assert(ts == AlignTimestampToMinuteBoundary(ts));
1727
1728 LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);
1729
1730 if (oldSnapshotControl->count_used > 0
1731 && ts >= oldSnapshotControl->head_timestamp)
1732 {
1733 int offset;
1734
1735 offset = ((ts - oldSnapshotControl->head_timestamp)
1736 / USECS_PER_MINUTE);
1737 if (offset > oldSnapshotControl->count_used - 1)
1738 offset = oldSnapshotControl->count_used - 1;
1739 offset = (oldSnapshotControl->head_offset + offset)
1740 % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
1741
1742 *xlimitp = oldSnapshotControl->xid_by_minute[offset];
1743
1744 in_mapping = true;
1745 }
1746
1747 LWLockRelease(OldSnapshotTimeMapLock);
1748
1749 return in_mapping;
1750 }
1751
1752 /*
1753 * TransactionIdLimitedForOldSnapshots
1754 *
1755 * Apply old snapshot limit. This is intended to be called for page pruning
1756 * and table vacuuming, to allow old_snapshot_threshold to override the normal
1757 * global xmin value. Actual testing for snapshot too old will be based on
1758 * whether a snapshot timestamp is prior to the threshold timestamp set in
1759 * this function.
1760 *
1761 * If the limited horizon allows a cleanup action that otherwise would not be
1762 * possible, SetOldSnapshotThresholdTimestamp(*limit_ts, *limit_xid) needs to
1763 * be called before that cleanup action.
1764 */
1765 bool
TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,Relation relation,TransactionId * limit_xid,TimestampTz * limit_ts)1766 TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
1767 Relation relation,
1768 TransactionId *limit_xid,
1769 TimestampTz *limit_ts)
1770 {
1771 TimestampTz ts;
1772 TransactionId xlimit = recentXmin;
1773 TransactionId latest_xmin;
1774 TimestampTz next_map_update_ts;
1775 TransactionId threshold_timestamp;
1776 TransactionId threshold_xid;
1777
1778 Assert(TransactionIdIsNormal(recentXmin));
1779 Assert(OldSnapshotThresholdActive());
1780 Assert(limit_ts != NULL && limit_xid != NULL);
1781
1782 /*
1783 * TestForOldSnapshot() assumes early pruning advances the page LSN, so we
1784 * can't prune early when skipping WAL.
1785 */
1786 if (!RelationAllowsEarlyPruning(relation) || !RelationNeedsWAL(relation))
1787 return false;
1788
1789 ts = GetSnapshotCurrentTimestamp();
1790
1791 SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
1792 latest_xmin = oldSnapshotControl->latest_xmin;
1793 next_map_update_ts = oldSnapshotControl->next_map_update;
1794 SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
1795
1796 /*
1797 * Zero threshold always overrides to latest xmin, if valid. Without some
1798 * heuristic it will find its own snapshot too old on, for example, a
1799 * simple UPDATE -- which would make it useless for most testing, but
1800 * there is no principled way to ensure that it doesn't fail in this way.
1801 * Use a five-second delay to try to get useful testing behavior, but this
1802 * may need adjustment.
1803 */
1804 if (old_snapshot_threshold == 0)
1805 {
1806 if (TransactionIdPrecedes(latest_xmin, MyProc->xmin)
1807 && TransactionIdFollows(latest_xmin, xlimit))
1808 xlimit = latest_xmin;
1809
1810 ts -= 5 * USECS_PER_SEC;
1811 }
1812 else
1813 {
1814 ts = AlignTimestampToMinuteBoundary(ts)
1815 - (old_snapshot_threshold * USECS_PER_MINUTE);
1816
1817 /* Check for fast exit without LW locking. */
1818 SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1819 threshold_timestamp = oldSnapshotControl->threshold_timestamp;
1820 threshold_xid = oldSnapshotControl->threshold_xid;
1821 SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1822
1823 if (ts == threshold_timestamp)
1824 {
1825 /*
1826 * Current timestamp is in same bucket as the last limit that was
1827 * applied. Reuse.
1828 */
1829 xlimit = threshold_xid;
1830 }
1831 else if (ts == next_map_update_ts)
1832 {
1833 /*
1834 * FIXME: This branch is super iffy - but that should probably
1835 * fixed separately.
1836 */
1837 xlimit = latest_xmin;
1838 }
1839 else if (GetOldSnapshotFromTimeMapping(ts, &xlimit))
1840 {
1841 }
1842
1843 /*
1844 * Failsafe protection against vacuuming work of active transaction.
1845 *
1846 * This is not an assertion because we avoid the spinlock for
1847 * performance, leaving open the possibility that xlimit could advance
1848 * and be more current; but it seems prudent to apply this limit. It
1849 * might make pruning a tiny bit less aggressive than it could be, but
1850 * protects against data loss bugs.
1851 */
1852 if (TransactionIdIsNormal(latest_xmin)
1853 && TransactionIdPrecedes(latest_xmin, xlimit))
1854 xlimit = latest_xmin;
1855 }
1856
1857 if (TransactionIdIsValid(xlimit) &&
1858 TransactionIdFollowsOrEquals(xlimit, recentXmin))
1859 {
1860 *limit_ts = ts;
1861 *limit_xid = xlimit;
1862
1863 return true;
1864 }
1865
1866 return false;
1867 }
1868
1869 /*
1870 * Take care of the circular buffer that maps time to xid.
1871 */
1872 void
MaintainOldSnapshotTimeMapping(TimestampTz whenTaken,TransactionId xmin)1873 MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
1874 {
1875 TimestampTz ts;
1876 TransactionId latest_xmin;
1877 TimestampTz update_ts;
1878 bool map_update_required = false;
1879
1880 /* Never call this function when old snapshot checking is disabled. */
1881 Assert(old_snapshot_threshold >= 0);
1882
1883 ts = AlignTimestampToMinuteBoundary(whenTaken);
1884
1885 /*
1886 * Keep track of the latest xmin seen by any process. Update mapping with
1887 * a new value when we have crossed a bucket boundary.
1888 */
1889 SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
1890 latest_xmin = oldSnapshotControl->latest_xmin;
1891 update_ts = oldSnapshotControl->next_map_update;
1892 if (ts > update_ts)
1893 {
1894 oldSnapshotControl->next_map_update = ts;
1895 map_update_required = true;
1896 }
1897 if (TransactionIdFollows(xmin, latest_xmin))
1898 oldSnapshotControl->latest_xmin = xmin;
1899 SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
1900
1901 /* We only needed to update the most recent xmin value. */
1902 if (!map_update_required)
1903 return;
1904
1905 /* No further tracking needed for 0 (used for testing). */
1906 if (old_snapshot_threshold == 0)
1907 return;
1908
1909 /*
1910 * We don't want to do something stupid with unusual values, but we don't
1911 * want to litter the log with warnings or break otherwise normal
1912 * processing for this feature; so if something seems unreasonable, just
1913 * log at DEBUG level and return without doing anything.
1914 */
1915 if (whenTaken < 0)
1916 {
1917 elog(DEBUG1,
1918 "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
1919 (long) whenTaken);
1920 return;
1921 }
1922 if (!TransactionIdIsNormal(xmin))
1923 {
1924 elog(DEBUG1,
1925 "MaintainOldSnapshotTimeMapping called with xmin = %lu",
1926 (unsigned long) xmin);
1927 return;
1928 }
1929
1930 LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE);
1931
1932 Assert(oldSnapshotControl->head_offset >= 0);
1933 Assert(oldSnapshotControl->head_offset < OLD_SNAPSHOT_TIME_MAP_ENTRIES);
1934 Assert((oldSnapshotControl->head_timestamp % USECS_PER_MINUTE) == 0);
1935 Assert(oldSnapshotControl->count_used >= 0);
1936 Assert(oldSnapshotControl->count_used <= OLD_SNAPSHOT_TIME_MAP_ENTRIES);
1937
1938 if (oldSnapshotControl->count_used == 0)
1939 {
1940 /* set up first entry for empty mapping */
1941 oldSnapshotControl->head_offset = 0;
1942 oldSnapshotControl->head_timestamp = ts;
1943 oldSnapshotControl->count_used = 1;
1944 oldSnapshotControl->xid_by_minute[0] = xmin;
1945 }
1946 else if (ts < oldSnapshotControl->head_timestamp)
1947 {
1948 /* old ts; log it at DEBUG */
1949 LWLockRelease(OldSnapshotTimeMapLock);
1950 elog(DEBUG1,
1951 "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
1952 (long) whenTaken);
1953 return;
1954 }
1955 else if (ts <= (oldSnapshotControl->head_timestamp +
1956 ((oldSnapshotControl->count_used - 1)
1957 * USECS_PER_MINUTE)))
1958 {
1959 /* existing mapping; advance xid if possible */
1960 int bucket = (oldSnapshotControl->head_offset
1961 + ((ts - oldSnapshotControl->head_timestamp)
1962 / USECS_PER_MINUTE))
1963 % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
1964
1965 if (TransactionIdPrecedes(oldSnapshotControl->xid_by_minute[bucket], xmin))
1966 oldSnapshotControl->xid_by_minute[bucket] = xmin;
1967 }
1968 else
1969 {
1970 /* We need a new bucket, but it might not be the very next one. */
1971 int distance_to_new_tail;
1972 int distance_to_current_tail;
1973 int advance;
1974
1975 /*
1976 * Our goal is for the new "tail" of the mapping, that is, the entry
1977 * which is newest and thus furthest from the "head" entry, to
1978 * correspond to "ts". Since there's one entry per minute, the
1979 * distance between the current head and the new tail is just the
1980 * number of minutes of difference between ts and the current
1981 * head_timestamp.
1982 *
1983 * The distance from the current head to the current tail is one less
1984 * than the number of entries in the mapping, because the entry at the
1985 * head_offset is for 0 minutes after head_timestamp.
1986 *
1987 * The difference between these two values is the number of minutes by
1988 * which we need to advance the mapping, either adding new entries or
1989 * rotating old ones out.
1990 */
1991 distance_to_new_tail =
1992 (ts - oldSnapshotControl->head_timestamp) / USECS_PER_MINUTE;
1993 distance_to_current_tail =
1994 oldSnapshotControl->count_used - 1;
1995 advance = distance_to_new_tail - distance_to_current_tail;
1996 Assert(advance > 0);
1997
1998 if (advance >= OLD_SNAPSHOT_TIME_MAP_ENTRIES)
1999 {
2000 /* Advance is so far that all old data is junk; start over. */
2001 oldSnapshotControl->head_offset = 0;
2002 oldSnapshotControl->count_used = 1;
2003 oldSnapshotControl->xid_by_minute[0] = xmin;
2004 oldSnapshotControl->head_timestamp = ts;
2005 }
2006 else
2007 {
2008 /* Store the new value in one or more buckets. */
2009 int i;
2010
2011 for (i = 0; i < advance; i++)
2012 {
2013 if (oldSnapshotControl->count_used == OLD_SNAPSHOT_TIME_MAP_ENTRIES)
2014 {
2015 /* Map full and new value replaces old head. */
2016 int old_head = oldSnapshotControl->head_offset;
2017
2018 if (old_head == (OLD_SNAPSHOT_TIME_MAP_ENTRIES - 1))
2019 oldSnapshotControl->head_offset = 0;
2020 else
2021 oldSnapshotControl->head_offset = old_head + 1;
2022 oldSnapshotControl->xid_by_minute[old_head] = xmin;
2023 oldSnapshotControl->head_timestamp += USECS_PER_MINUTE;
2024 }
2025 else
2026 {
2027 /* Extend map to unused entry. */
2028 int new_tail = (oldSnapshotControl->head_offset
2029 + oldSnapshotControl->count_used)
2030 % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
2031
2032 oldSnapshotControl->count_used++;
2033 oldSnapshotControl->xid_by_minute[new_tail] = xmin;
2034 }
2035 }
2036 }
2037 }
2038
2039 LWLockRelease(OldSnapshotTimeMapLock);
2040 }
2041
2042
2043 /*
2044 * Setup a snapshot that replaces normal catalog snapshots that allows catalog
2045 * access to behave just like it did at a certain point in the past.
2046 *
2047 * Needed for logical decoding.
2048 */
2049 void
SetupHistoricSnapshot(Snapshot historic_snapshot,HTAB * tuplecids)2050 SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
2051 {
2052 Assert(historic_snapshot != NULL);
2053
2054 /* setup the timetravel snapshot */
2055 HistoricSnapshot = historic_snapshot;
2056
2057 /* setup (cmin, cmax) lookup hash */
2058 tuplecid_data = tuplecids;
2059 }
2060
2061
2062 /*
2063 * Make catalog snapshots behave normally again.
2064 */
2065 void
TeardownHistoricSnapshot(bool is_error)2066 TeardownHistoricSnapshot(bool is_error)
2067 {
2068 HistoricSnapshot = NULL;
2069 tuplecid_data = NULL;
2070 }
2071
2072 bool
HistoricSnapshotActive(void)2073 HistoricSnapshotActive(void)
2074 {
2075 return HistoricSnapshot != NULL;
2076 }
2077
2078 HTAB *
HistoricSnapshotGetTupleCids(void)2079 HistoricSnapshotGetTupleCids(void)
2080 {
2081 Assert(HistoricSnapshotActive());
2082 return tuplecid_data;
2083 }
2084
2085 /*
2086 * EstimateSnapshotSpace
2087 * Returns the size needed to store the given snapshot.
2088 *
2089 * We are exporting only required fields from the Snapshot, stored in
2090 * SerializedSnapshotData.
2091 */
2092 Size
EstimateSnapshotSpace(Snapshot snap)2093 EstimateSnapshotSpace(Snapshot snap)
2094 {
2095 Size size;
2096
2097 Assert(snap != InvalidSnapshot);
2098 Assert(snap->snapshot_type == SNAPSHOT_MVCC);
2099
2100 /* We allocate any XID arrays needed in the same palloc block. */
2101 size = add_size(sizeof(SerializedSnapshotData),
2102 mul_size(snap->xcnt, sizeof(TransactionId)));
2103 if (snap->subxcnt > 0 &&
2104 (!snap->suboverflowed || snap->takenDuringRecovery))
2105 size = add_size(size,
2106 mul_size(snap->subxcnt, sizeof(TransactionId)));
2107
2108 return size;
2109 }
2110
2111 /*
2112 * SerializeSnapshot
2113 * Dumps the serialized snapshot (extracted from given snapshot) onto the
2114 * memory location at start_address.
2115 */
2116 void
SerializeSnapshot(Snapshot snapshot,char * start_address)2117 SerializeSnapshot(Snapshot snapshot, char *start_address)
2118 {
2119 SerializedSnapshotData serialized_snapshot;
2120
2121 Assert(snapshot->subxcnt >= 0);
2122
2123 /* Copy all required fields */
2124 serialized_snapshot.xmin = snapshot->xmin;
2125 serialized_snapshot.xmax = snapshot->xmax;
2126 serialized_snapshot.xcnt = snapshot->xcnt;
2127 serialized_snapshot.subxcnt = snapshot->subxcnt;
2128 serialized_snapshot.suboverflowed = snapshot->suboverflowed;
2129 serialized_snapshot.takenDuringRecovery = snapshot->takenDuringRecovery;
2130 serialized_snapshot.curcid = snapshot->curcid;
2131 serialized_snapshot.whenTaken = snapshot->whenTaken;
2132 serialized_snapshot.lsn = snapshot->lsn;
2133
2134 /*
2135 * Ignore the SubXID array if it has overflowed, unless the snapshot was
2136 * taken during recovery - in that case, top-level XIDs are in subxip as
2137 * well, and we mustn't lose them.
2138 */
2139 if (serialized_snapshot.suboverflowed && !snapshot->takenDuringRecovery)
2140 serialized_snapshot.subxcnt = 0;
2141
2142 /* Copy struct to possibly-unaligned buffer */
2143 memcpy(start_address,
2144 &serialized_snapshot, sizeof(SerializedSnapshotData));
2145
2146 /* Copy XID array */
2147 if (snapshot->xcnt > 0)
2148 memcpy((TransactionId *) (start_address +
2149 sizeof(SerializedSnapshotData)),
2150 snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
2151
2152 /*
2153 * Copy SubXID array. Don't bother to copy it if it had overflowed,
2154 * though, because it's not used anywhere in that case. Except if it's a
2155 * snapshot taken during recovery; all the top-level XIDs are in subxip as
2156 * well in that case, so we mustn't lose them.
2157 */
2158 if (serialized_snapshot.subxcnt > 0)
2159 {
2160 Size subxipoff = sizeof(SerializedSnapshotData) +
2161 snapshot->xcnt * sizeof(TransactionId);
2162
2163 memcpy((TransactionId *) (start_address + subxipoff),
2164 snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId));
2165 }
2166 }
2167
2168 /*
2169 * RestoreSnapshot
2170 * Restore a serialized snapshot from the specified address.
2171 *
2172 * The copy is palloc'd in TopTransactionContext and has initial refcounts set
2173 * to 0. The returned snapshot has the copied flag set.
2174 */
2175 Snapshot
RestoreSnapshot(char * start_address)2176 RestoreSnapshot(char *start_address)
2177 {
2178 SerializedSnapshotData serialized_snapshot;
2179 Size size;
2180 Snapshot snapshot;
2181 TransactionId *serialized_xids;
2182
2183 memcpy(&serialized_snapshot, start_address,
2184 sizeof(SerializedSnapshotData));
2185 serialized_xids = (TransactionId *)
2186 (start_address + sizeof(SerializedSnapshotData));
2187
2188 /* We allocate any XID arrays needed in the same palloc block. */
2189 size = sizeof(SnapshotData)
2190 + serialized_snapshot.xcnt * sizeof(TransactionId)
2191 + serialized_snapshot.subxcnt * sizeof(TransactionId);
2192
2193 /* Copy all required fields */
2194 snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
2195 snapshot->snapshot_type = SNAPSHOT_MVCC;
2196 snapshot->xmin = serialized_snapshot.xmin;
2197 snapshot->xmax = serialized_snapshot.xmax;
2198 snapshot->xip = NULL;
2199 snapshot->xcnt = serialized_snapshot.xcnt;
2200 snapshot->subxip = NULL;
2201 snapshot->subxcnt = serialized_snapshot.subxcnt;
2202 snapshot->suboverflowed = serialized_snapshot.suboverflowed;
2203 snapshot->takenDuringRecovery = serialized_snapshot.takenDuringRecovery;
2204 snapshot->curcid = serialized_snapshot.curcid;
2205 snapshot->whenTaken = serialized_snapshot.whenTaken;
2206 snapshot->lsn = serialized_snapshot.lsn;
2207 snapshot->snapXactCompletionCount = 0;
2208
2209 /* Copy XIDs, if present. */
2210 if (serialized_snapshot.xcnt > 0)
2211 {
2212 snapshot->xip = (TransactionId *) (snapshot + 1);
2213 memcpy(snapshot->xip, serialized_xids,
2214 serialized_snapshot.xcnt * sizeof(TransactionId));
2215 }
2216
2217 /* Copy SubXIDs, if present. */
2218 if (serialized_snapshot.subxcnt > 0)
2219 {
2220 snapshot->subxip = ((TransactionId *) (snapshot + 1)) +
2221 serialized_snapshot.xcnt;
2222 memcpy(snapshot->subxip, serialized_xids + serialized_snapshot.xcnt,
2223 serialized_snapshot.subxcnt * sizeof(TransactionId));
2224 }
2225
2226 /* Set the copied flag so that the caller will set refcounts correctly. */
2227 snapshot->regd_count = 0;
2228 snapshot->active_count = 0;
2229 snapshot->copied = true;
2230
2231 return snapshot;
2232 }
2233
2234 /*
2235 * Install a restored snapshot as the transaction snapshot.
2236 *
2237 * The second argument is of type void * so that snapmgr.h need not include
2238 * the declaration for PGPROC.
2239 */
2240 void
RestoreTransactionSnapshot(Snapshot snapshot,void * source_pgproc)2241 RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
2242 {
2243 SetTransactionSnapshot(snapshot, NULL, InvalidPid, source_pgproc);
2244 }
2245
2246 /*
2247 * XidInMVCCSnapshot
2248 * Is the given XID still-in-progress according to the snapshot?
2249 *
2250 * Note: GetSnapshotData never stores either top xid or subxids of our own
2251 * backend into a snapshot, so these xids will not be reported as "running"
2252 * by this function. This is OK for current uses, because we always check
2253 * TransactionIdIsCurrentTransactionId first, except when it's known the
2254 * XID could not be ours anyway.
2255 */
2256 bool
XidInMVCCSnapshot(TransactionId xid,Snapshot snapshot)2257 XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
2258 {
2259 uint32 i;
2260
2261 /*
2262 * Make a quick range check to eliminate most XIDs without looking at the
2263 * xip arrays. Note that this is OK even if we convert a subxact XID to
2264 * its parent below, because a subxact with XID < xmin has surely also got
2265 * a parent with XID < xmin, while one with XID >= xmax must belong to a
2266 * parent that was not yet committed at the time of this snapshot.
2267 */
2268
2269 /* Any xid < xmin is not in-progress */
2270 if (TransactionIdPrecedes(xid, snapshot->xmin))
2271 return false;
2272 /* Any xid >= xmax is in-progress */
2273 if (TransactionIdFollowsOrEquals(xid, snapshot->xmax))
2274 return true;
2275
2276 /*
2277 * Snapshot information is stored slightly differently in snapshots taken
2278 * during recovery.
2279 */
2280 if (!snapshot->takenDuringRecovery)
2281 {
2282 /*
2283 * If the snapshot contains full subxact data, the fastest way to
2284 * check things is just to compare the given XID against both subxact
2285 * XIDs and top-level XIDs. If the snapshot overflowed, we have to
2286 * use pg_subtrans to convert a subxact XID to its parent XID, but
2287 * then we need only look at top-level XIDs not subxacts.
2288 */
2289 if (!snapshot->suboverflowed)
2290 {
2291 /* we have full data, so search subxip */
2292 int32 j;
2293
2294 for (j = 0; j < snapshot->subxcnt; j++)
2295 {
2296 if (TransactionIdEquals(xid, snapshot->subxip[j]))
2297 return true;
2298 }
2299
2300 /* not there, fall through to search xip[] */
2301 }
2302 else
2303 {
2304 /*
2305 * Snapshot overflowed, so convert xid to top-level. This is safe
2306 * because we eliminated too-old XIDs above.
2307 */
2308 xid = SubTransGetTopmostTransaction(xid);
2309
2310 /*
2311 * If xid was indeed a subxact, we might now have an xid < xmin,
2312 * so recheck to avoid an array scan. No point in rechecking
2313 * xmax.
2314 */
2315 if (TransactionIdPrecedes(xid, snapshot->xmin))
2316 return false;
2317 }
2318
2319 for (i = 0; i < snapshot->xcnt; i++)
2320 {
2321 if (TransactionIdEquals(xid, snapshot->xip[i]))
2322 return true;
2323 }
2324 }
2325 else
2326 {
2327 int32 j;
2328
2329 /*
2330 * In recovery we store all xids in the subxact array because it is by
2331 * far the bigger array, and we mostly don't know which xids are
2332 * top-level and which are subxacts. The xip array is empty.
2333 *
2334 * We start by searching subtrans, if we overflowed.
2335 */
2336 if (snapshot->suboverflowed)
2337 {
2338 /*
2339 * Snapshot overflowed, so convert xid to top-level. This is safe
2340 * because we eliminated too-old XIDs above.
2341 */
2342 xid = SubTransGetTopmostTransaction(xid);
2343
2344 /*
2345 * If xid was indeed a subxact, we might now have an xid < xmin,
2346 * so recheck to avoid an array scan. No point in rechecking
2347 * xmax.
2348 */
2349 if (TransactionIdPrecedes(xid, snapshot->xmin))
2350 return false;
2351 }
2352
2353 /*
2354 * We now have either a top-level xid higher than xmin or an
2355 * indeterminate xid. We don't know whether it's top level or subxact
2356 * but it doesn't matter. If it's present, the xid is visible.
2357 */
2358 for (j = 0; j < snapshot->subxcnt; j++)
2359 {
2360 if (TransactionIdEquals(xid, snapshot->subxip[j]))
2361 return true;
2362 }
2363 }
2364
2365 return false;
2366 }
2367