1 /*-------------------------------------------------------------------------
2  *
3  * snapbuild.c
4  *
5  *	  Infrastructure for building historic catalog snapshots based on contents
6  *	  of the WAL, for the purpose of decoding heapam.c style values in the
7  *	  WAL.
8  *
9  * NOTES:
10  *
11  * We build snapshots which can *only* be used to read catalog contents and we
12  * do so by reading and interpreting the WAL stream. The aim is to build a
13  * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14  * at the time the XLogRecord was generated.
15  *
16  * To build the snapshots we reuse the infrastructure built for Hot
17  * Standby. The in-memory snapshots we build look different than HS' because
18  * we have different needs. To successfully decode data from the WAL we only
19  * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20  * tables since the data we decode is wholly contained in the WAL
21  * records. Also, our snapshots need to be different in comparison to normal
22  * MVCC ones because in contrast to those we cannot fully rely on the clog and
23  * pg_subtrans for information about committed transactions because they might
24  * commit in the future from the POV of the WAL entry we're currently
25  * decoding. This definition has the advantage that we only need to prevent
26  * removal of catalog rows, while normal table's rows can still be
27  * removed. This is achieved by using the replication slot mechanism.
28  *
29  * As the percentage of transactions modifying the catalog normally is fairly
 * small in comparison to ones only manipulating user data, we keep track of
31  * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
32  * track of all running transactions like it's done in a normal snapshot. Note
33  * that we're generally only looking at transactions that have acquired an
34  * xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
35  * that we consider committed, everything else is considered aborted/in
36  * progress. That also allows us not to care about subtransactions before they
37  * have committed which means this module, in contrast to HS, doesn't have to
38  * care about suboverflowed subtransactions and similar.
39  *
40  * One complexity of doing this is that to e.g. handle mixed DDL/DML
41  * transactions we need Snapshots that see intermediate versions of the
42  * catalog in a transaction. During normal operation this is achieved by using
43  * CommandIds/cmin/cmax. The problem with that however is that for space
44  * efficiency reasons only one value of that is stored
45  * (cf. combocid.c). Since ComboCids are only available in memory we log
46  * additional information which allows us to get the original (cmin, cmax)
47  * pair during visibility checks. Check the reorderbuffer.c's comment above
48  * ResolveCminCmaxDuringDecoding() for details.
49  *
50  * To facilitate all this we need our own visibility routine, as the normal
51  * ones are optimized for different usecases.
52  *
53  * To replace the normal catalog snapshots with decoding ones use the
54  * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
55  *
56  *
57  *
58  * The snapbuild machinery is starting up in several stages, as illustrated
59  * by the following graph describing the SnapBuild->state transitions:
60  *
61  *		   +-------------------------+
62  *	  +----|		 START			 |-------------+
63  *	  |    +-------------------------+			   |
64  *	  |					|						   |
65  *	  |					|						   |
66  *	  |		   running_xacts #1					   |
67  *	  |					|						   |
68  *	  |					|						   |
69  *	  |					v						   |
70  *	  |    +-------------------------+			   v
71  *	  |    |   BUILDING_SNAPSHOT	 |------------>|
72  *	  |    +-------------------------+			   |
73  *	  |					|						   |
74  *	  |					|						   |
75  *	  | running_xacts #2, xacts from #1 finished   |
76  *	  |					|						   |
77  *	  |					|						   |
78  *	  |					v						   |
79  *	  |    +-------------------------+			   v
80  *	  |    |	   FULL_SNAPSHOT	 |------------>|
81  *	  |    +-------------------------+			   |
82  *	  |					|						   |
83  * running_xacts		|					   saved snapshot
84  * with zero xacts		|				  at running_xacts's lsn
85  *	  |					|						   |
86  *	  | running_xacts with xacts from #2 finished  |
87  *	  |					|						   |
88  *	  |					v						   |
89  *	  |    +-------------------------+			   |
90  *	  +--->|SNAPBUILD_CONSISTENT	 |<------------+
91  *		   +-------------------------+
92  *
 * Initially the machinery is in the START stage. When an xl_running_xacts
 * record is read that is sufficiently new (above the safe xmin horizon),
 * there's a state transition. If there were no running xacts when the
 * running_xacts record was generated, we'll directly go into CONSISTENT
 * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Once a later
 * running_xacts record shows that the transactions running at the first
 * record have finished, we switch to FULL_SNAPSHOT. Having a full
 * snapshot means that all transactions that start henceforth can be decoded
 * in their entirety, but transactions that started previously can't. In
 * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
 * running transactions have committed or aborted.
102  *
103  * Only transactions that commit after CONSISTENT state has been reached will
104  * be replayed, even though they might have started while still in
105  * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
 * changes have been exported, but all the following ones will be. That point
107  * is a convenient point to initialize replication from, which is why we
108  * export a snapshot at that point, which *can* be used to read normal data.
109  *
110  * Copyright (c) 2012-2019, PostgreSQL Global Development Group
111  *
112  * IDENTIFICATION
113  *	  src/backend/replication/snapbuild.c
114  *
115  *-------------------------------------------------------------------------
116  */
117 
118 #include "postgres.h"
119 
120 #include <sys/stat.h>
121 #include <unistd.h>
122 
123 #include "miscadmin.h"
124 
125 #include "access/heapam_xlog.h"
126 #include "access/transam.h"
127 #include "access/xact.h"
128 
129 #include "pgstat.h"
130 
131 #include "replication/logical.h"
132 #include "replication/reorderbuffer.h"
133 #include "replication/snapbuild.h"
134 
135 #include "utils/builtins.h"
136 #include "utils/memutils.h"
137 #include "utils/snapshot.h"
138 #include "utils/snapmgr.h"
139 
140 #include "storage/block.h"		/* debugging output */
141 #include "storage/fd.h"
142 #include "storage/lmgr.h"
143 #include "storage/proc.h"
144 #include "storage/procarray.h"
145 #include "storage/standby.h"
146 
/*
 * This struct contains the current state of the snapshot building
 * machinery. Besides a forward declaration in the header, it is not exposed
 * to the public, so we can easily change its contents.
 *
 * NB: per the note on was_running below, at least part of this struct is
 * stored on disk, so fields must not be reordered or resized casually.
 */
struct SnapBuild
{
	/* how far along we are in building our first full snapshot */
	SnapBuildState state;

	/* private memory context used to allocate memory for this module. */
	MemoryContext context;

	/* all transactions with an xid smaller than this have committed/aborted */
	TransactionId xmin;

	/* all transactions with an xid >= this are treated as uncommitted */
	TransactionId xmax;

	/*
	 * Don't replay commits from an LSN < this LSN. This can be set externally
	 * but it will also be advanced (never retreat) from within snapbuild.c.
	 * SnapBuildXactNeedsSkip() compares against this value.
	 */
	XLogRecPtr	start_decoding_at;

	/*
	 * Don't start decoding WAL until the "xl_running_xacts" information
	 * indicates there are no running xids with an xid smaller than this.
	 */
	TransactionId initial_xmin_horizon;

	/* Indicates if we are building full snapshot or just catalog one. */
	bool		building_full_snapshot;

	/*
	 * Snapshot that's valid to see the catalog state seen at this moment.
	 * NULL until one has been built; the builder holds its own reference
	 * (see the SnapBuildSnapIncRefcount() calls made when it is assigned).
	 */
	Snapshot	snapshot;

	/*
	 * LSN of the last location we are sure a snapshot has been serialized to.
	 */
	XLogRecPtr	last_serialized_snapshot;

	/*
	 * The reorderbuffer we need to update with usable snapshots et al.
	 */
	ReorderBuffer *reorder;

	/*
	 * Outdated: This struct isn't used for its original purpose anymore, but
	 * can't be removed / changed in a minor version, because it's stored
	 * on-disk.
	 */
	struct
	{
		/*
		 * NB: This struct is misused, until a major version can break on-disk
		 * compatibility. See SnapBuildNextPhaseAt() /
		 * SnapBuildStartNextPhaseAt(), which read and write was_xmax (not
		 * was_xmin) to hold the xid at which the next build phase starts.
		 */
		TransactionId was_xmin;
		TransactionId was_xmax;

		size_t		was_xcnt;	/* number of used xip entries */
		size_t		was_xcnt_space; /* allocated size of xip */
		TransactionId *was_xip; /* running xacts array, xidComparator-sorted */
	}			was_running;

	/*
	 * Array of transactions which could have catalog changes that committed
	 * between xmin and xmax.
	 */
	struct
	{
		/* number of committed transactions currently stored in xip */
		size_t		xcnt;

		/* number of entries xip has room for */
		size_t		xcnt_space;

		/*
		 * Until we reach a CONSISTENT state, we record commits of all
		 * transactions, not just the catalog changing ones. Record when that
		 * changes so we know we cannot export a snapshot safely anymore.
		 */
		bool		includes_all_transactions;

		/*
		 * Array of committed transactions that have modified the catalog.
		 *
		 * As this array is frequently modified we do *not* keep it in
		 * xidComparator order. Instead we sort the array when building &
		 * distributing a snapshot.
		 *
		 * TODO: It's unclear whether that reasoning has much merit. Every
		 * time we add something here after becoming consistent will also
		 * require distributing a snapshot. Storing them sorted would
		 * potentially also make it easier to purge (but more complicated wrt
		 * wraparound?). Should be improved if sorting while building the
		 * snapshot shows up in profiles.
		 */
		TransactionId *xip;
	}			committed;
};
252 
/*
 * Starting a transaction -- which we need to do while exporting a snapshot --
 * removes knowledge about the previously used resowner, so we save it here.
 * SnapBuildExportSnapshot() stores CurrentResourceOwner in this variable
 * before starting its transaction; SnapBuildClearExportedSnapshot() puts it
 * back after aborting that transaction.
 */
static ResourceOwner SavedResourceOwnerDuringExport = NULL;

/*
 * Set by SnapBuildExportSnapshot() when it begins an export and cleared by
 * SnapBuildResetExportedSnapshotState().
 */
static bool ExportInProgress = false;

/* ->committed manipulation */
static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);

/* snapshot building/manipulation/distribution functions */
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);

static void SnapBuildFreeSnapshot(Snapshot snap);

static void SnapBuildSnapIncRefcount(Snapshot snap);

static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);

/* xlog reading helper functions for SnapBuildProcessRecord */
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);

/* serialization functions (bodies are defined further down in this file) */
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
279 
280 /*
281  * Return TransactionId after which the next phase of initial snapshot
282  * building will happen.
283  */
284 static inline TransactionId
SnapBuildNextPhaseAt(SnapBuild * builder)285 SnapBuildNextPhaseAt(SnapBuild *builder)
286 {
287 	/*
288 	 * For backward compatibility reasons this has to be stored in the wrongly
289 	 * named field.  Will be fixed in next major version.
290 	 */
291 	return builder->was_running.was_xmax;
292 }
293 
294 /*
295  * Set TransactionId after which the next phase of initial snapshot building
296  * will happen.
297  */
298 static inline void
SnapBuildStartNextPhaseAt(SnapBuild * builder,TransactionId at)299 SnapBuildStartNextPhaseAt(SnapBuild *builder, TransactionId at)
300 {
301 	/*
302 	 * For backward compatibility reasons this has to be stored in the wrongly
303 	 * named field.  Will be fixed in next major version.
304 	 */
305 	builder->was_running.was_xmax = at;
306 }
307 
308 /*
309  * Allocate a new snapshot builder.
310  *
311  * xmin_horizon is the xid >= which we can be sure no catalog rows have been
312  * removed, start_lsn is the LSN >= we want to replay commits.
313  */
314 SnapBuild *
AllocateSnapshotBuilder(ReorderBuffer * reorder,TransactionId xmin_horizon,XLogRecPtr start_lsn,bool need_full_snapshot)315 AllocateSnapshotBuilder(ReorderBuffer *reorder,
316 						TransactionId xmin_horizon,
317 						XLogRecPtr start_lsn,
318 						bool need_full_snapshot)
319 {
320 	MemoryContext context;
321 	MemoryContext oldcontext;
322 	SnapBuild  *builder;
323 
324 	/* allocate memory in own context, to have better accountability */
325 	context = AllocSetContextCreate(CurrentMemoryContext,
326 									"snapshot builder context",
327 									ALLOCSET_DEFAULT_SIZES);
328 	oldcontext = MemoryContextSwitchTo(context);
329 
330 	builder = palloc0(sizeof(SnapBuild));
331 
332 	builder->state = SNAPBUILD_START;
333 	builder->context = context;
334 	builder->reorder = reorder;
335 	/* Other struct members initialized by zeroing via palloc0 above */
336 
337 	builder->committed.xcnt = 0;
338 	builder->committed.xcnt_space = 128;	/* arbitrary number */
339 	builder->committed.xip =
340 		palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
341 	builder->committed.includes_all_transactions = true;
342 
343 	builder->initial_xmin_horizon = xmin_horizon;
344 	builder->start_decoding_at = start_lsn;
345 	builder->building_full_snapshot = need_full_snapshot;
346 
347 	MemoryContextSwitchTo(oldcontext);
348 
349 	return builder;
350 }
351 
352 /*
353  * Free a snapshot builder.
354  */
355 void
FreeSnapshotBuilder(SnapBuild * builder)356 FreeSnapshotBuilder(SnapBuild *builder)
357 {
358 	MemoryContext context = builder->context;
359 
360 	/* free snapshot explicitly, that contains some error checking */
361 	if (builder->snapshot != NULL)
362 	{
363 		SnapBuildSnapDecRefcount(builder->snapshot);
364 		builder->snapshot = NULL;
365 	}
366 
367 	/* other resources are deallocated via memory context reset */
368 	MemoryContextDelete(context);
369 }
370 
371 /*
372  * Free an unreferenced snapshot that has previously been built by us.
373  */
374 static void
SnapBuildFreeSnapshot(Snapshot snap)375 SnapBuildFreeSnapshot(Snapshot snap)
376 {
377 	/* make sure we don't get passed an external snapshot */
378 	Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
379 
380 	/* make sure nobody modified our snapshot */
381 	Assert(snap->curcid == FirstCommandId);
382 	Assert(!snap->suboverflowed);
383 	Assert(!snap->takenDuringRecovery);
384 	Assert(snap->regd_count == 0);
385 
386 	/* slightly more likely, so it's checked even without c-asserts */
387 	if (snap->copied)
388 		elog(ERROR, "cannot free a copied snapshot");
389 
390 	if (snap->active_count)
391 		elog(ERROR, "cannot free an active snapshot");
392 
393 	pfree(snap);
394 }
395 
396 /*
397  * In which state of snapshot building are we?
398  */
399 SnapBuildState
SnapBuildCurrentState(SnapBuild * builder)400 SnapBuildCurrentState(SnapBuild *builder)
401 {
402 	return builder->state;
403 }
404 
405 /*
406  * Should the contents of transaction ending at 'ptr' be decoded?
407  */
408 bool
SnapBuildXactNeedsSkip(SnapBuild * builder,XLogRecPtr ptr)409 SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
410 {
411 	return ptr < builder->start_decoding_at;
412 }
413 
414 /*
415  * Increase refcount of a snapshot.
416  *
417  * This is used when handing out a snapshot to some external resource or when
418  * adding a Snapshot as builder->snapshot.
419  */
420 static void
SnapBuildSnapIncRefcount(Snapshot snap)421 SnapBuildSnapIncRefcount(Snapshot snap)
422 {
423 	snap->active_count++;
424 }
425 
426 /*
427  * Decrease refcount of a snapshot and free if the refcount reaches zero.
428  *
429  * Externally visible, so that external resources that have been handed an
430  * IncRef'ed Snapshot can adjust its refcount easily.
431  */
432 void
SnapBuildSnapDecRefcount(Snapshot snap)433 SnapBuildSnapDecRefcount(Snapshot snap)
434 {
435 	/* make sure we don't get passed an external snapshot */
436 	Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
437 
438 	/* make sure nobody modified our snapshot */
439 	Assert(snap->curcid == FirstCommandId);
440 	Assert(!snap->suboverflowed);
441 	Assert(!snap->takenDuringRecovery);
442 
443 	Assert(snap->regd_count == 0);
444 
445 	Assert(snap->active_count > 0);
446 
447 	/* slightly more likely, so it's checked even without casserts */
448 	if (snap->copied)
449 		elog(ERROR, "cannot free a copied snapshot");
450 
451 	snap->active_count--;
452 	if (snap->active_count == 0)
453 		SnapBuildFreeSnapshot(snap);
454 }
455 
456 /*
457  * Build a new snapshot, based on currently committed catalog-modifying
458  * transactions.
459  *
460  * In-progress transactions with catalog access are *not* allowed to modify
461  * these snapshots; they have to copy them and fill in appropriate ->curcid
462  * and ->subxip/subxcnt values.
463  */
464 static Snapshot
SnapBuildBuildSnapshot(SnapBuild * builder)465 SnapBuildBuildSnapshot(SnapBuild *builder)
466 {
467 	Snapshot	snapshot;
468 	Size		ssize;
469 
470 	Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
471 
472 	ssize = sizeof(SnapshotData)
473 		+ sizeof(TransactionId) * builder->committed.xcnt
474 		+ sizeof(TransactionId) * 1 /* toplevel xid */ ;
475 
476 	snapshot = MemoryContextAllocZero(builder->context, ssize);
477 
478 	snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
479 
480 	/*
481 	 * We misuse the original meaning of SnapshotData's xip and subxip fields
482 	 * to make the more fitting for our needs.
483 	 *
484 	 * In the 'xip' array we store transactions that have to be treated as
485 	 * committed. Since we will only ever look at tuples from transactions
486 	 * that have modified the catalog it's more efficient to store those few
487 	 * that exist between xmin and xmax (frequently there are none).
488 	 *
489 	 * Snapshots that are used in transactions that have modified the catalog
490 	 * also use the 'subxip' array to store their toplevel xid and all the
491 	 * subtransaction xids so we can recognize when we need to treat rows as
492 	 * visible that are not in xip but still need to be visible. Subxip only
493 	 * gets filled when the transaction is copied into the context of a
494 	 * catalog modifying transaction since we otherwise share a snapshot
495 	 * between transactions. As long as a txn hasn't modified the catalog it
496 	 * doesn't need to treat any uncommitted rows as visible, so there is no
497 	 * need for those xids.
498 	 *
499 	 * Both arrays are qsort'ed so that we can use bsearch() on them.
500 	 */
501 	Assert(TransactionIdIsNormal(builder->xmin));
502 	Assert(TransactionIdIsNormal(builder->xmax));
503 
504 	snapshot->xmin = builder->xmin;
505 	snapshot->xmax = builder->xmax;
506 
507 	/* store all transactions to be treated as committed by this snapshot */
508 	snapshot->xip =
509 		(TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
510 	snapshot->xcnt = builder->committed.xcnt;
511 	memcpy(snapshot->xip,
512 		   builder->committed.xip,
513 		   builder->committed.xcnt * sizeof(TransactionId));
514 
515 	/* sort so we can bsearch() */
516 	qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
517 
518 	/*
519 	 * Initially, subxip is empty, i.e. it's a snapshot to be used by
520 	 * transactions that don't modify the catalog. Will be filled by
521 	 * ReorderBufferCopySnap() if necessary.
522 	 */
523 	snapshot->subxcnt = 0;
524 	snapshot->subxip = NULL;
525 
526 	snapshot->suboverflowed = false;
527 	snapshot->takenDuringRecovery = false;
528 	snapshot->copied = false;
529 	snapshot->curcid = FirstCommandId;
530 	snapshot->active_count = 0;
531 	snapshot->regd_count = 0;
532 
533 	return snapshot;
534 }
535 
536 /*
537  * Build the initial slot snapshot and convert it to a normal snapshot that
538  * is understood by HeapTupleSatisfiesMVCC.
539  *
540  * The snapshot will be usable directly in current transaction or exported
541  * for loading in different transaction.
542  */
543 Snapshot
SnapBuildInitialSnapshot(SnapBuild * builder)544 SnapBuildInitialSnapshot(SnapBuild *builder)
545 {
546 	Snapshot	snap;
547 	TransactionId xid;
548 	TransactionId *newxip;
549 	int			newxcnt = 0;
550 
551 	Assert(!FirstSnapshotSet);
552 	Assert(XactIsoLevel == XACT_REPEATABLE_READ);
553 
554 	if (builder->state != SNAPBUILD_CONSISTENT)
555 		elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
556 
557 	if (!builder->committed.includes_all_transactions)
558 		elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
559 
560 	/* so we don't overwrite the existing value */
561 	if (TransactionIdIsValid(MyPgXact->xmin))
562 		elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid");
563 
564 	snap = SnapBuildBuildSnapshot(builder);
565 
566 	/*
567 	 * We know that snap->xmin is alive, enforced by the logical xmin
568 	 * mechanism. Due to that we can do this without locks, we're only
569 	 * changing our own value.
570 	 */
571 #ifdef USE_ASSERT_CHECKING
572 	{
573 		TransactionId safeXid;
574 
575 		LWLockAcquire(ProcArrayLock, LW_SHARED);
576 		safeXid = GetOldestSafeDecodingTransactionId(false);
577 		LWLockRelease(ProcArrayLock);
578 
579 		Assert(TransactionIdPrecedesOrEquals(safeXid, snap->xmin));
580 	}
581 #endif
582 
583 	MyPgXact->xmin = snap->xmin;
584 
585 	/* allocate in transaction context */
586 	newxip = (TransactionId *)
587 		palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
588 
589 	/*
590 	 * snapbuild.c builds transactions in an "inverted" manner, which means it
591 	 * stores committed transactions in ->xip, not ones in progress. Build a
592 	 * classical snapshot by marking all non-committed transactions as
593 	 * in-progress. This can be expensive.
594 	 */
595 	for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
596 	{
597 		void	   *test;
598 
599 		/*
600 		 * Check whether transaction committed using the decoding snapshot
601 		 * meaning of ->xip.
602 		 */
603 		test = bsearch(&xid, snap->xip, snap->xcnt,
604 					   sizeof(TransactionId), xidComparator);
605 
606 		if (test == NULL)
607 		{
608 			if (newxcnt >= GetMaxSnapshotXidCount())
609 				ereport(ERROR,
610 						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
611 						 errmsg("initial slot snapshot too large")));
612 
613 			newxip[newxcnt++] = xid;
614 		}
615 
616 		TransactionIdAdvance(xid);
617 	}
618 
619 	/* adjust remaining snapshot fields as needed */
620 	snap->snapshot_type = SNAPSHOT_MVCC;
621 	snap->xcnt = newxcnt;
622 	snap->xip = newxip;
623 
624 	return snap;
625 }
626 
627 /*
628  * Export a snapshot so it can be set in another session with SET TRANSACTION
629  * SNAPSHOT.
630  *
631  * For that we need to start a transaction in the current backend as the
632  * importing side checks whether the source transaction is still open to make
633  * sure the xmin horizon hasn't advanced since then.
634  */
635 const char *
SnapBuildExportSnapshot(SnapBuild * builder)636 SnapBuildExportSnapshot(SnapBuild *builder)
637 {
638 	Snapshot	snap;
639 	char	   *snapname;
640 
641 	if (IsTransactionOrTransactionBlock())
642 		elog(ERROR, "cannot export a snapshot from within a transaction");
643 
644 	if (SavedResourceOwnerDuringExport)
645 		elog(ERROR, "can only export one snapshot at a time");
646 
647 	SavedResourceOwnerDuringExport = CurrentResourceOwner;
648 	ExportInProgress = true;
649 
650 	StartTransactionCommand();
651 
652 	/* There doesn't seem to a nice API to set these */
653 	XactIsoLevel = XACT_REPEATABLE_READ;
654 	XactReadOnly = true;
655 
656 	snap = SnapBuildInitialSnapshot(builder);
657 
658 	/*
659 	 * now that we've built a plain snapshot, make it active and use the
660 	 * normal mechanisms for exporting it
661 	 */
662 	snapname = ExportSnapshot(snap);
663 
664 	ereport(LOG,
665 			(errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
666 						   "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
667 						   snap->xcnt,
668 						   snapname, snap->xcnt)));
669 	return snapname;
670 }
671 
672 /*
673  * Ensure there is a snapshot and if not build one for current transaction.
674  */
675 Snapshot
SnapBuildGetOrBuildSnapshot(SnapBuild * builder,TransactionId xid)676 SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
677 {
678 	Assert(builder->state == SNAPBUILD_CONSISTENT);
679 
680 	/* only build a new snapshot if we don't have a prebuilt one */
681 	if (builder->snapshot == NULL)
682 	{
683 		builder->snapshot = SnapBuildBuildSnapshot(builder);
684 		/* increase refcount for the snapshot builder */
685 		SnapBuildSnapIncRefcount(builder->snapshot);
686 	}
687 
688 	return builder->snapshot;
689 }
690 
691 /*
692  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
693  * any. Aborts the previously started transaction and resets the resource
694  * owner back to its original value.
695  */
696 void
SnapBuildClearExportedSnapshot(void)697 SnapBuildClearExportedSnapshot(void)
698 {
699 	ResourceOwner tmpResOwner;
700 
701 	/* nothing exported, that is the usual case */
702 	if (!ExportInProgress)
703 		return;
704 
705 	if (!IsTransactionState())
706 		elog(ERROR, "clearing exported snapshot in wrong transaction state");
707 
708 	/*
709 	 * AbortCurrentTransaction() takes care of resetting the snapshot state,
710 	 * so remember SavedResourceOwnerDuringExport.
711 	 */
712 	tmpResOwner = SavedResourceOwnerDuringExport;
713 
714 	/* make sure nothing could have ever happened */
715 	AbortCurrentTransaction();
716 
717 	CurrentResourceOwner = tmpResOwner;
718 }
719 
720 /*
721  * Clear snapshot export state during transaction abort.
722  */
723 void
SnapBuildResetExportedSnapshotState(void)724 SnapBuildResetExportedSnapshotState(void)
725 {
726 	SavedResourceOwnerDuringExport = NULL;
727 	ExportInProgress = false;
728 }
729 
730 /*
731  * Handle the effects of a single heap change, appropriate to the current state
732  * of the snapshot builder and returns whether changes made at (xid, lsn) can
733  * be decoded.
734  */
735 bool
SnapBuildProcessChange(SnapBuild * builder,TransactionId xid,XLogRecPtr lsn)736 SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
737 {
738 	/*
739 	 * We can't handle data in transactions if we haven't built a snapshot
740 	 * yet, so don't store them.
741 	 */
742 	if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
743 		return false;
744 
745 	/*
746 	 * No point in keeping track of changes in transactions that we don't have
747 	 * enough information about to decode. This means that they started before
748 	 * we got into the SNAPBUILD_FULL_SNAPSHOT state.
749 	 */
750 	if (builder->state < SNAPBUILD_CONSISTENT &&
751 		TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder)))
752 		return false;
753 
754 	/*
755 	 * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
756 	 * be needed to decode the change we're currently processing.
757 	 */
758 	if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
759 	{
760 		/* only build a new snapshot if we don't have a prebuilt one */
761 		if (builder->snapshot == NULL)
762 		{
763 			builder->snapshot = SnapBuildBuildSnapshot(builder);
764 			/* increase refcount for the snapshot builder */
765 			SnapBuildSnapIncRefcount(builder->snapshot);
766 		}
767 
768 		/*
769 		 * Increase refcount for the transaction we're handing the snapshot
770 		 * out to.
771 		 */
772 		SnapBuildSnapIncRefcount(builder->snapshot);
773 		ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
774 									 builder->snapshot);
775 	}
776 
777 	return true;
778 }
779 
780 /*
781  * Do CommandId/ComboCid handling after reading an xl_heap_new_cid record.
782  * This implies that a transaction has done some form of write to system
783  * catalogs.
784  */
785 void
SnapBuildProcessNewCid(SnapBuild * builder,TransactionId xid,XLogRecPtr lsn,xl_heap_new_cid * xlrec)786 SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
787 					   XLogRecPtr lsn, xl_heap_new_cid *xlrec)
788 {
789 	CommandId	cid;
790 
791 	/*
792 	 * we only log new_cid's if a catalog tuple was modified, so mark the
793 	 * transaction as containing catalog modifications
794 	 */
795 	ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
796 
797 	ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
798 								 xlrec->target_node, xlrec->target_tid,
799 								 xlrec->cmin, xlrec->cmax,
800 								 xlrec->combocid);
801 
802 	/* figure out new command id */
803 	if (xlrec->cmin != InvalidCommandId &&
804 		xlrec->cmax != InvalidCommandId)
805 		cid = Max(xlrec->cmin, xlrec->cmax);
806 	else if (xlrec->cmax != InvalidCommandId)
807 		cid = xlrec->cmax;
808 	else if (xlrec->cmin != InvalidCommandId)
809 		cid = xlrec->cmin;
810 	else
811 	{
812 		cid = InvalidCommandId; /* silence compiler */
813 		elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
814 	}
815 
816 	ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
817 }
818 
819 /*
820  * Add a new Snapshot to all transactions we're decoding that currently are
821  * in-progress so they can see new catalog contents made by the transaction
822  * that just committed. This is necessary because those in-progress
823  * transactions will use the new catalog's contents from here on (at the very
824  * least everything they do needs to be compatible with newer catalog
825  * contents).
826  */
827 static void
SnapBuildDistributeNewCatalogSnapshot(SnapBuild * builder,XLogRecPtr lsn)828 SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
829 {
830 	dlist_iter	txn_i;
831 	ReorderBufferTXN *txn;
832 
833 	/*
834 	 * Iterate through all toplevel transactions. This can include
835 	 * subtransactions which we just don't yet know to be that, but that's
836 	 * fine, they will just get an unnecessary snapshot queued.
837 	 */
838 	dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
839 	{
840 		txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
841 
842 		Assert(TransactionIdIsValid(txn->xid));
843 
844 		/*
845 		 * If we don't have a base snapshot yet, there are no changes in this
846 		 * transaction which in turn implies we don't yet need a snapshot at
847 		 * all. We'll add a snapshot when the first change gets queued.
848 		 *
849 		 * NB: This works correctly even for subtransactions because
850 		 * ReorderBufferAssignChild() takes care to transfer the base snapshot
851 		 * to the top-level transaction, and while iterating the changequeue
852 		 * we'll get the change from the subtxn.
853 		 */
854 		if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
855 			continue;
856 
857 		elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
858 			 txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
859 
860 		/*
861 		 * increase the snapshot's refcount for the transaction we are handing
862 		 * it out to
863 		 */
864 		SnapBuildSnapIncRefcount(builder->snapshot);
865 		ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
866 								 builder->snapshot);
867 	}
868 }
869 
870 /*
871  * Keep track of a new catalog changing transaction that has committed.
872  */
873 static void
SnapBuildAddCommittedTxn(SnapBuild * builder,TransactionId xid)874 SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
875 {
876 	Assert(TransactionIdIsValid(xid));
877 
878 	if (builder->committed.xcnt == builder->committed.xcnt_space)
879 	{
880 		builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
881 
882 		elog(DEBUG1, "increasing space for committed transactions to %u",
883 			 (uint32) builder->committed.xcnt_space);
884 
885 		builder->committed.xip = repalloc(builder->committed.xip,
886 										  builder->committed.xcnt_space * sizeof(TransactionId));
887 	}
888 
889 	/*
890 	 * TODO: It might make sense to keep the array sorted here instead of
891 	 * doing it every time we build a new snapshot. On the other hand this
892 	 * gets called repeatedly when a transaction with subtransactions commits.
893 	 */
894 	builder->committed.xip[builder->committed.xcnt++] = xid;
895 }
896 
897 /*
898  * Remove knowledge about transactions we treat as committed that are smaller
899  * than ->xmin. Those won't ever get checked via the ->committed array but via
900  * the clog machinery, so we don't need to waste memory on them.
901  */
902 static void
SnapBuildPurgeCommittedTxn(SnapBuild * builder)903 SnapBuildPurgeCommittedTxn(SnapBuild *builder)
904 {
905 	int			off;
906 	TransactionId *workspace;
907 	int			surviving_xids = 0;
908 
909 	/* not ready yet */
910 	if (!TransactionIdIsNormal(builder->xmin))
911 		return;
912 
913 	/* TODO: Neater algorithm than just copying and iterating? */
914 	workspace =
915 		MemoryContextAlloc(builder->context,
916 						   builder->committed.xcnt * sizeof(TransactionId));
917 
918 	/* copy xids that still are interesting to workspace */
919 	for (off = 0; off < builder->committed.xcnt; off++)
920 	{
921 		if (NormalTransactionIdPrecedes(builder->committed.xip[off],
922 										builder->xmin))
923 			;					/* remove */
924 		else
925 			workspace[surviving_xids++] = builder->committed.xip[off];
926 	}
927 
928 	/* copy workspace back to persistent state */
929 	memcpy(builder->committed.xip, workspace,
930 		   surviving_xids * sizeof(TransactionId));
931 
932 	elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
933 		 (uint32) builder->committed.xcnt, (uint32) surviving_xids,
934 		 builder->xmin, builder->xmax);
935 	builder->committed.xcnt = surviving_xids;
936 
937 	pfree(workspace);
938 }
939 
940 /*
941  * Handle everything that needs to be done when a transaction commits
942  */
943 void
SnapBuildCommitTxn(SnapBuild * builder,XLogRecPtr lsn,TransactionId xid,int nsubxacts,TransactionId * subxacts)944 SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
945 				   int nsubxacts, TransactionId *subxacts)
946 {
947 	int			nxact;
948 
949 	bool		needs_snapshot = false;
950 	bool		needs_timetravel = false;
951 	bool		sub_needs_timetravel = false;
952 
953 	TransactionId xmax = xid;
954 
955 	/*
956 	 * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
957 	 * will they be part of a snapshot.  So we don't need to record anything.
958 	 */
959 	if (builder->state == SNAPBUILD_START ||
960 		(builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
961 		 TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder))))
962 	{
963 		/* ensure that only commits after this are getting replayed */
964 		if (builder->start_decoding_at <= lsn)
965 			builder->start_decoding_at = lsn + 1;
966 		return;
967 	}
968 
969 	if (builder->state < SNAPBUILD_CONSISTENT)
970 	{
971 		/* ensure that only commits after this are getting replayed */
972 		if (builder->start_decoding_at <= lsn)
973 			builder->start_decoding_at = lsn + 1;
974 
975 		/*
976 		 * If building an exportable snapshot, force xid to be tracked, even
977 		 * if the transaction didn't modify the catalog.
978 		 */
979 		if (builder->building_full_snapshot)
980 		{
981 			needs_timetravel = true;
982 		}
983 	}
984 
985 	for (nxact = 0; nxact < nsubxacts; nxact++)
986 	{
987 		TransactionId subxid = subxacts[nxact];
988 
989 		/*
990 		 * Add subtransaction to base snapshot if catalog modifying, we don't
991 		 * distinguish to toplevel transactions there.
992 		 */
993 		if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
994 		{
995 			sub_needs_timetravel = true;
996 			needs_snapshot = true;
997 
998 			elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
999 				 xid, subxid);
1000 
1001 			SnapBuildAddCommittedTxn(builder, subxid);
1002 
1003 			if (NormalTransactionIdFollows(subxid, xmax))
1004 				xmax = subxid;
1005 		}
1006 
1007 		/*
1008 		 * If we're forcing timetravel we also need visibility information
1009 		 * about subtransaction, so keep track of subtransaction's state, even
1010 		 * if not catalog modifying.  Don't need to distribute a snapshot in
1011 		 * that case.
1012 		 */
1013 		else if (needs_timetravel)
1014 		{
1015 			SnapBuildAddCommittedTxn(builder, subxid);
1016 			if (NormalTransactionIdFollows(subxid, xmax))
1017 				xmax = subxid;
1018 		}
1019 	}
1020 
1021 	/* if top-level modified catalog, it'll need a snapshot */
1022 	if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1023 	{
1024 		elog(DEBUG2, "found top level transaction %u, with catalog changes",
1025 			 xid);
1026 		needs_snapshot = true;
1027 		needs_timetravel = true;
1028 		SnapBuildAddCommittedTxn(builder, xid);
1029 	}
1030 	else if (sub_needs_timetravel)
1031 	{
1032 		/* track toplevel txn as well, subxact alone isn't meaningful */
1033 		SnapBuildAddCommittedTxn(builder, xid);
1034 	}
1035 	else if (needs_timetravel)
1036 	{
1037 		elog(DEBUG2, "forced transaction %u to do timetravel", xid);
1038 
1039 		SnapBuildAddCommittedTxn(builder, xid);
1040 	}
1041 
1042 	if (!needs_timetravel)
1043 	{
1044 		/* record that we cannot export a general snapshot anymore */
1045 		builder->committed.includes_all_transactions = false;
1046 	}
1047 
1048 	Assert(!needs_snapshot || needs_timetravel);
1049 
1050 	/*
1051 	 * Adjust xmax of the snapshot builder, we only do that for committed,
1052 	 * catalog modifying, transactions, everything else isn't interesting for
1053 	 * us since we'll never look at the respective rows.
1054 	 */
1055 	if (needs_timetravel &&
1056 		(!TransactionIdIsValid(builder->xmax) ||
1057 		 TransactionIdFollowsOrEquals(xmax, builder->xmax)))
1058 	{
1059 		builder->xmax = xmax;
1060 		TransactionIdAdvance(builder->xmax);
1061 	}
1062 
1063 	/* if there's any reason to build a historic snapshot, do so now */
1064 	if (needs_snapshot)
1065 	{
1066 		/*
1067 		 * If we haven't built a complete snapshot yet there's no need to hand
1068 		 * it out, it wouldn't (and couldn't) be used anyway.
1069 		 */
1070 		if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
1071 			return;
1072 
1073 		/*
1074 		 * Decrease the snapshot builder's refcount of the old snapshot, note
1075 		 * that it still will be used if it has been handed out to the
1076 		 * reorderbuffer earlier.
1077 		 */
1078 		if (builder->snapshot)
1079 			SnapBuildSnapDecRefcount(builder->snapshot);
1080 
1081 		builder->snapshot = SnapBuildBuildSnapshot(builder);
1082 
1083 		/* we might need to execute invalidations, add snapshot */
1084 		if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1085 		{
1086 			SnapBuildSnapIncRefcount(builder->snapshot);
1087 			ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1088 										 builder->snapshot);
1089 		}
1090 
1091 		/* refcount of the snapshot builder for the new snapshot */
1092 		SnapBuildSnapIncRefcount(builder->snapshot);
1093 
1094 		/* add a new catalog snapshot to all currently running transactions */
1095 		SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
1096 	}
1097 }
1098 
1099 
1100 /* -----------------------------------
1101  * Snapshot building functions dealing with xlog records
1102  * -----------------------------------
1103  */
1104 
1105 /*
1106  * Process a running xacts record, and use its information to first build a
1107  * historic snapshot and later to release resources that aren't needed
1108  * anymore.
1109  */
1110 void
SnapBuildProcessRunningXacts(SnapBuild * builder,XLogRecPtr lsn,xl_running_xacts * running)1111 SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1112 {
1113 	ReorderBufferTXN *txn;
1114 	TransactionId xmin;
1115 
1116 	/*
1117 	 * If we're not consistent yet, inspect the record to see whether it
1118 	 * allows to get closer to being consistent. If we are consistent, dump
1119 	 * our snapshot so others or we, after a restart, can use it.
1120 	 */
1121 	if (builder->state < SNAPBUILD_CONSISTENT)
1122 	{
1123 		/* returns false if there's no point in performing cleanup just yet */
1124 		if (!SnapBuildFindSnapshot(builder, lsn, running))
1125 			return;
1126 	}
1127 	else
1128 		SnapBuildSerialize(builder, lsn);
1129 
1130 	/*
1131 	 * Update range of interesting xids based on the running xacts
1132 	 * information. We don't increase ->xmax using it, because once we are in
1133 	 * a consistent state we can do that ourselves and much more efficiently
1134 	 * so, because we only need to do it for catalog transactions since we
1135 	 * only ever look at those.
1136 	 *
1137 	 * NB: We only increase xmax when a catalog modifying transaction commits
1138 	 * (see SnapBuildCommitTxn).  Because of this, xmax can be lower than
1139 	 * xmin, which looks odd but is correct and actually more efficient, since
1140 	 * we hit fast paths in heapam_visibility.c.
1141 	 */
1142 	builder->xmin = running->oldestRunningXid;
1143 
1144 	/* Remove transactions we don't need to keep track off anymore */
1145 	SnapBuildPurgeCommittedTxn(builder);
1146 
1147 	/*
1148 	 * Advance the xmin limit for the current replication slot, to allow
1149 	 * vacuum to clean up the tuples this slot has been protecting.
1150 	 *
1151 	 * The reorderbuffer might have an xmin among the currently running
1152 	 * snapshots; use it if so.  If not, we need only consider the snapshots
1153 	 * we'll produce later, which can't be less than the oldest running xid in
1154 	 * the record we're reading now.
1155 	 */
1156 	xmin = ReorderBufferGetOldestXmin(builder->reorder);
1157 	if (xmin == InvalidTransactionId)
1158 		xmin = running->oldestRunningXid;
1159 	elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
1160 		 builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
1161 	LogicalIncreaseXminForSlot(lsn, xmin);
1162 
1163 	/*
1164 	 * Also tell the slot where we can restart decoding from. We don't want to
1165 	 * do that after every commit because changing that implies an fsync of
1166 	 * the logical slot's state file, so we only do it every time we see a
1167 	 * running xacts record.
1168 	 *
1169 	 * Do so by looking for the oldest in progress transaction (determined by
1170 	 * the first LSN of any of its relevant records). Every transaction
1171 	 * remembers the last location we stored the snapshot to disk before its
1172 	 * beginning. That point is where we can restart from.
1173 	 */
1174 
1175 	/*
1176 	 * Can't know about a serialized snapshot's location if we're not
1177 	 * consistent.
1178 	 */
1179 	if (builder->state < SNAPBUILD_CONSISTENT)
1180 		return;
1181 
1182 	txn = ReorderBufferGetOldestTXN(builder->reorder);
1183 
1184 	/*
1185 	 * oldest ongoing txn might have started when we didn't yet serialize
1186 	 * anything because we hadn't reached a consistent state yet.
1187 	 */
1188 	if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
1189 		LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
1190 
1191 	/*
1192 	 * No in-progress transaction, can reuse the last serialized snapshot if
1193 	 * we have one.
1194 	 */
1195 	else if (txn == NULL &&
1196 			 builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
1197 			 builder->last_serialized_snapshot != InvalidXLogRecPtr)
1198 		LogicalIncreaseRestartDecodingForSlot(lsn,
1199 											  builder->last_serialized_snapshot);
1200 }
1201 
1202 
1203 /*
1204  * Build the start of a snapshot that's capable of decoding the catalog.
1205  *
1206  * Helper function for SnapBuildProcessRunningXacts() while we're not yet
1207  * consistent.
1208  *
1209  * Returns true if there is a point in performing internal maintenance/cleanup
1210  * using the xl_running_xacts record.
1211  */
1212 static bool
SnapBuildFindSnapshot(SnapBuild * builder,XLogRecPtr lsn,xl_running_xacts * running)1213 SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1214 {
1215 	/* ---
1216 	 * Build catalog decoding snapshot incrementally using information about
1217 	 * the currently running transactions. There are several ways to do that:
1218 	 *
1219 	 * a) There were no running transactions when the xl_running_xacts record
1220 	 *	  was inserted, jump to CONSISTENT immediately. We might find such a
1221 	 *	  state while waiting on c)'s sub-states.
1222 	 *
1223 	 * b) This (in a previous run) or another decoding slot serialized a
1224 	 *	  snapshot to disk that we can use.  Can't use this method for the
1225 	 *	  initial snapshot when slot is being created and needs full snapshot
1226 	 *	  for export or direct use, as that snapshot will only contain catalog
1227 	 *	  modifying transactions.
1228 	 *
1229 	 * c) First incrementally build a snapshot for catalog tuples
1230 	 *	  (BUILDING_SNAPSHOT), that requires all, already in-progress,
1231 	 *	  transactions to finish.  Every transaction starting after that
1232 	 *	  (FULL_SNAPSHOT state), has enough information to be decoded.  But
1233 	 *	  for older running transactions no viable snapshot exists yet, so
1234 	 *	  CONSISTENT will only be reached once all of those have finished.
1235 	 * ---
1236 	 */
1237 
1238 	/*
1239 	 * xl_running_xact record is older than what we can use, we might not have
1240 	 * all necessary catalog rows anymore.
1241 	 */
1242 	if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
1243 		NormalTransactionIdPrecedes(running->oldestRunningXid,
1244 									builder->initial_xmin_horizon))
1245 	{
1246 		ereport(DEBUG1,
1247 				(errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
1248 								 (uint32) (lsn >> 32), (uint32) lsn),
1249 				 errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
1250 									builder->initial_xmin_horizon, running->oldestRunningXid)));
1251 
1252 
1253 		SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
1254 
1255 		return true;
1256 	}
1257 
1258 	/*
1259 	 * a) No transaction were running, we can jump to consistent.
1260 	 *
1261 	 * This is not affected by races around xl_running_xacts, because we can
1262 	 * miss transaction commits, but currently not transactions starting.
1263 	 *
1264 	 * NB: We might have already started to incrementally assemble a snapshot,
1265 	 * so we need to be careful to deal with that.
1266 	 */
1267 	if (running->oldestRunningXid == running->nextXid)
1268 	{
1269 		if (builder->start_decoding_at == InvalidXLogRecPtr ||
1270 			builder->start_decoding_at <= lsn)
1271 			/* can decode everything after this */
1272 			builder->start_decoding_at = lsn + 1;
1273 
1274 		/* As no transactions were running xmin/xmax can be trivially set. */
1275 		builder->xmin = running->nextXid;	/* < are finished */
1276 		builder->xmax = running->nextXid;	/* >= are running */
1277 
1278 		/* so we can safely use the faster comparisons */
1279 		Assert(TransactionIdIsNormal(builder->xmin));
1280 		Assert(TransactionIdIsNormal(builder->xmax));
1281 
1282 		builder->state = SNAPBUILD_CONSISTENT;
1283 		SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
1284 
1285 		ereport(LOG,
1286 				(errmsg("logical decoding found consistent point at %X/%X",
1287 						(uint32) (lsn >> 32), (uint32) lsn),
1288 				 errdetail("There are no running transactions.")));
1289 
1290 		return false;
1291 	}
1292 	/* b) valid on disk state and not building full snapshot */
1293 	else if (!builder->building_full_snapshot &&
1294 			 SnapBuildRestore(builder, lsn))
1295 	{
1296 		/* there won't be any state to cleanup */
1297 		return false;
1298 	}
1299 
1300 	/*
1301 	 * c) transition from START to BUILDING_SNAPSHOT.
1302 	 *
1303 	 * In START state, and a xl_running_xacts record with running xacts is
1304 	 * encountered.  In that case, switch to BUILDING_SNAPSHOT state, and
1305 	 * record xl_running_xacts->nextXid.  Once all running xacts have finished
1306 	 * (i.e. they're all >= nextXid), we have a complete catalog snapshot.  It
1307 	 * might look that we could use xl_running_xact's ->xids information to
1308 	 * get there quicker, but that is problematic because transactions marked
1309 	 * as running, might already have inserted their commit record - it's
1310 	 * infeasible to change that with locking.
1311 	 */
1312 	else if (builder->state == SNAPBUILD_START)
1313 	{
1314 		builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
1315 		SnapBuildStartNextPhaseAt(builder, running->nextXid);
1316 
1317 		/*
1318 		 * Start with an xmin/xmax that's correct for future, when all the
1319 		 * currently running transactions have finished. We'll update both
1320 		 * while waiting for the pending transactions to finish.
1321 		 */
1322 		builder->xmin = running->nextXid;	/* < are finished */
1323 		builder->xmax = running->nextXid;	/* >= are running */
1324 
1325 		/* so we can safely use the faster comparisons */
1326 		Assert(TransactionIdIsNormal(builder->xmin));
1327 		Assert(TransactionIdIsNormal(builder->xmax));
1328 
1329 		ereport(LOG,
1330 				(errmsg("logical decoding found initial starting point at %X/%X",
1331 						(uint32) (lsn >> 32), (uint32) lsn),
1332 				 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1333 						   running->xcnt, running->nextXid)));
1334 
1335 		SnapBuildWaitSnapshot(running, running->nextXid);
1336 	}
1337 
1338 	/*
1339 	 * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
1340 	 *
1341 	 * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
1342 	 * is >= than nextXid from when we switched to BUILDING_SNAPSHOT.  This
1343 	 * means all transactions starting afterwards have enough information to
1344 	 * be decoded.  Switch to FULL_SNAPSHOT.
1345 	 */
1346 	else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
1347 			 TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder),
1348 										   running->oldestRunningXid))
1349 	{
1350 		builder->state = SNAPBUILD_FULL_SNAPSHOT;
1351 		SnapBuildStartNextPhaseAt(builder, running->nextXid);
1352 
1353 		ereport(LOG,
1354 				(errmsg("logical decoding found initial consistent point at %X/%X",
1355 						(uint32) (lsn >> 32), (uint32) lsn),
1356 				 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1357 						   running->xcnt, running->nextXid)));
1358 
1359 		SnapBuildWaitSnapshot(running, running->nextXid);
1360 	}
1361 
1362 	/*
1363 	 * c) transition from FULL_SNAPSHOT to CONSISTENT.
1364 	 *
1365 	 * In FULL_SNAPSHOT state (see d) ), and this xl_running_xacts'
1366 	 * oldestRunningXid is >= than nextXid from when we switched to
1367 	 * FULL_SNAPSHOT.  This means all transactions that are currently in
1368 	 * progress have a catalog snapshot, and all their changes have been
1369 	 * collected.  Switch to CONSISTENT.
1370 	 */
1371 	else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
1372 			 TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder),
1373 										   running->oldestRunningXid))
1374 	{
1375 		builder->state = SNAPBUILD_CONSISTENT;
1376 		SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
1377 
1378 		ereport(LOG,
1379 				(errmsg("logical decoding found consistent point at %X/%X",
1380 						(uint32) (lsn >> 32), (uint32) lsn),
1381 				 errdetail("There are no old transactions anymore.")));
1382 	}
1383 
1384 	/*
1385 	 * We already started to track running xacts and need to wait for all
1386 	 * in-progress ones to finish. We fall through to the normal processing of
1387 	 * records so incremental cleanup can be performed.
1388 	 */
1389 	return true;
1390 
1391 }
1392 
1393 /* ---
1394  * Iterate through xids in record, wait for all older than the cutoff to
1395  * finish.  Then, if possible, log a new xl_running_xacts record.
1396  *
1397  * This isn't required for the correctness of decoding, but to:
1398  * a) allow isolationtester to notice that we're currently waiting for
1399  *	  something.
1400  * b) log a new xl_running_xacts record where it'd be helpful, without having
1401  *	  to write for bgwriter or checkpointer.
1402  * ---
1403  */
1404 static void
SnapBuildWaitSnapshot(xl_running_xacts * running,TransactionId cutoff)1405 SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
1406 {
1407 	int			off;
1408 
1409 	for (off = 0; off < running->xcnt; off++)
1410 	{
1411 		TransactionId xid = running->xids[off];
1412 
1413 		/*
1414 		 * Upper layers should prevent that we ever need to wait on ourselves.
1415 		 * Check anyway, since failing to do so would either result in an
1416 		 * endless wait or an Assert() failure.
1417 		 */
1418 		if (TransactionIdIsCurrentTransactionId(xid))
1419 			elog(ERROR, "waiting for ourselves");
1420 
1421 		if (TransactionIdFollows(xid, cutoff))
1422 			continue;
1423 
1424 		XactLockTableWait(xid, NULL, NULL, XLTW_None);
1425 	}
1426 
1427 	/*
1428 	 * All transactions we needed to finish finished - try to ensure there is
1429 	 * another xl_running_xacts record in a timely manner, without having to
1430 	 * write for bgwriter or checkpointer to log one.  During recovery we
1431 	 * can't enforce that, so we'll have to wait.
1432 	 */
1433 	if (!RecoveryInProgress())
1434 	{
1435 		LogStandbySnapshot();
1436 	}
1437 }
1438 
1439 /* -----------------------------------
1440  * Snapshot serialization support
1441  * -----------------------------------
1442  */
1443 
/*
 * We store current state of struct SnapBuild on disk in the following manner:
 *
 * struct SnapBuildOnDisk;
 * TransactionId * running.xcnt_space;
 * TransactionId * committed.xcnt; (*not xcnt_space*)
 *
 * The header fields in front of 'builder' (magic, checksum, version, length)
 * form the constant-size part; magic and checksum themselves are excluded
 * from the checksum calculation, everything after them is covered by it.
 */
typedef struct SnapBuildOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;			/* expected to be SNAPBUILD_MAGIC */
	pg_crc32c	checksum;		/* CRC-32C over the checksummed part */

	/* data covered by checksum */

	/* version, in case we want to support pg_upgrade */
	uint32		version;		/* expected to be SNAPBUILD_VERSION */

	/*
	 * how large is the on disk data, excluding the constant sized part
	 *
	 * NOTE(review): SnapBuildSerialize() stores the size of the whole
	 * allocation here (sizeof(SnapBuildOnDisk) plus the xid array), which
	 * does not match "excluding the constant sized part" -- confirm which
	 * meaning readers of this field rely on.
	 */
	uint32		length;

	/* version dependent part */
	SnapBuild	builder;

	/* variable amount of TransactionIds follows */
} SnapBuildOnDisk;

/* number of bytes preceding 'builder': magic, checksum, version, length */
#define SnapBuildOnDiskConstantSize \
	offsetof(SnapBuildOnDisk, builder)
/* number of bytes preceding 'version': magic and checksum */
#define SnapBuildOnDiskNotChecksummedSize \
	offsetof(SnapBuildOnDisk, version)

/* identifies a file as a serialized snapshot builder state */
#define SNAPBUILD_MAGIC 0x51A1E001
/* version number written into the on-disk header */
#define SNAPBUILD_VERSION 2
1480 
1481 /*
1482  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
1483  *
1484  * Supposed to be used by external (i.e. not snapbuild.c) code that just read
1485  * a record that's a potential location for a serialized snapshot.
1486  */
1487 void
SnapBuildSerializationPoint(SnapBuild * builder,XLogRecPtr lsn)1488 SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
1489 {
1490 	if (builder->state < SNAPBUILD_CONSISTENT)
1491 		SnapBuildRestore(builder, lsn);
1492 	else
1493 		SnapBuildSerialize(builder, lsn);
1494 }
1495 
/*
 * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
 * been done by another decoding process.
 *
 * Does nothing unless the builder is in SNAPBUILD_CONSISTENT state.  The
 * state is written to a pid-specific temporary file below
 * pg_logical/snapshots, fsynced, and then renamed to "<lsn>.snap".  If a
 * file for 'lsn' already exists it is only fsynced, not rewritten.  On
 * either path builder->last_serialized_snapshot is set to 'lsn' and passed
 * on to the reorder buffer as restart point.  Any filesystem failure is
 * reported with ereport(ERROR).
 */
static void
SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
{
	Size		needed_length;
	SnapBuildOnDisk *ondisk = NULL;
	char	   *ondisk_c;
	int			fd;
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	int			ret;
	struct stat stat_buf;
	Size		sz;

	/* serialization points must not move backwards */
	Assert(lsn != InvalidXLogRecPtr);
	Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
		   builder->last_serialized_snapshot <= lsn);

	/*
	 * no point in serializing if we cannot continue to work immediately after
	 * restoring the snapshot
	 */
	if (builder->state < SNAPBUILD_CONSISTENT)
		return;

	/*
	 * We identify snapshots by the LSN they are valid for. We don't need to
	 * include timelines in the name as each LSN maps to exactly one timeline
	 * unless the user used pg_resetwal or similar. If a user did so, there's
	 * no hope continuing to decode anyway.
	 */
	sprintf(path, "pg_logical/snapshots/%X-%X.snap",
			(uint32) (lsn >> 32), (uint32) lsn);

	/*
	 * first check whether some other backend already has written the snapshot
	 * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
	 * as a valid state. Everything else is an unexpected error.
	 */
	ret = stat(path, &stat_buf);

	if (ret != 0 && errno != ENOENT)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not stat file \"%s\": %m", path)));

	else if (ret == 0)
	{
		/*
		 * somebody else has already serialized to this point, don't overwrite
		 * but remember location, so we don't need to read old data again.
		 *
		 * To be sure it has been synced to disk after the rename() from the
		 * tempfile filename to the real filename, we just repeat the fsync.
		 * That ought to be cheap because in most scenarios it should already
		 * be safely on disk.
		 */
		fsync_fname(path, false);
		fsync_fname("pg_logical/snapshots", true);

		builder->last_serialized_snapshot = lsn;
		/* 'ondisk' is still NULL here, so the cleanup below frees nothing */
		goto out;
	}

	/*
	 * there is an obvious race condition here between the time we stat(2) the
	 * file and us writing the file. But we rename the file into place
	 * atomically and all files created need to contain the same data anyway,
	 * so this is perfectly fine, although a bit of a resource waste. Locking
	 * seems like pointless complication.
	 */
	elog(DEBUG1, "serializing snapshot to %s", path);

	/*
	 * to make sure only we will write to this tempfile, include pid
	 *
	 * NOTE(review): MyProcPid is formatted with %u; confirm that matches its
	 * declared type.
	 */
	sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%u.tmp",
			(uint32) (lsn >> 32), (uint32) lsn, MyProcPid);

	/*
	 * Unlink temporary file if it already exists, needs to have been before a
	 * crash/error since we won't enter this function twice from within a
	 * single decoding slot/backend and the temporary file contains the pid of
	 * the current process.
	 */
	if (unlink(tmppath) != 0 && errno != ENOENT)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m", tmppath)));

	/* one buffer holds the header, the builder and the committed xids */
	needed_length = sizeof(SnapBuildOnDisk) +
		sizeof(TransactionId) * builder->committed.xcnt;

	ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
	ondisk = (SnapBuildOnDisk *) ondisk_c;
	ondisk->magic = SNAPBUILD_MAGIC;
	ondisk->version = SNAPBUILD_VERSION;
	ondisk->length = needed_length;

	/*
	 * The checksum is accumulated piecewise: first the header fields after
	 * magic/checksum (version and length), then the builder struct, then
	 * the xid array.
	 */
	INIT_CRC32C(ondisk->checksum);
	COMP_CRC32C(ondisk->checksum,
				((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
				SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
	/* from here on ondisk_c points at the variable-length xid area */
	ondisk_c += sizeof(SnapBuildOnDisk);

	memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
	/* NULL-ify memory-only data */
	ondisk->builder.context = NULL;
	ondisk->builder.snapshot = NULL;
	ondisk->builder.reorder = NULL;
	ondisk->builder.committed.xip = NULL;

	/* checksum the builder only after the pointers have been cleared */
	COMP_CRC32C(ondisk->checksum,
				&ondisk->builder,
				sizeof(SnapBuild));

	/* there shouldn't be any running xacts */
	Assert(builder->was_running.was_xcnt == 0);

	/* copy committed xacts */
	sz = sizeof(TransactionId) * builder->committed.xcnt;
	memcpy(ondisk_c, builder->committed.xip, sz);
	COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
	ondisk_c += sz;

	FIN_CRC32C(ondisk->checksum);

	/*
	 * we have valid data now, open tempfile and write it there; O_EXCL makes
	 * the open fail if the file exists despite the unlink above
	 */
	fd = OpenTransientFile(tmppath,
						   O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
	if (fd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", tmppath)));

	/* clear errno so a short write that leaves it unset can be detected */
	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
	if ((write(fd, ondisk, needed_length)) != needed_length)
	{
		/* preserve errno across the close, which may overwrite it */
		int			save_errno = errno;

		CloseTransientFile(fd);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m", tmppath)));
	}
	pgstat_report_wait_end();

	/*
	 * fsync the file before renaming so that even if we crash after this we
	 * have either a fully valid file or nothing.
	 *
	 * It's safe to just ERROR on fsync() here because we'll retry the whole
	 * operation including the writes.
	 *
	 * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
	 * some noticeable overhead since it's performed synchronously during
	 * decoding?
	 */
	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
	if (pg_fsync(fd) != 0)
	{
		/* preserve errno across the close, which may overwrite it */
		int			save_errno = errno;

		CloseTransientFile(fd);
		errno = save_errno;
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m", tmppath)));
	}
	pgstat_report_wait_end();

	if (CloseTransientFile(fd))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", tmppath)));

	/* sync the directory so the tempfile's entry is durable before renaming */
	fsync_fname("pg_logical/snapshots", true);

	/*
	 * We may overwrite the work from some other backend, but that's ok, our
	 * snapshot is valid as well, we'll just have done some superfluous work.
	 */
	if (rename(tmppath, path) != 0)
	{
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));
	}

	/* make sure we persist */
	fsync_fname(path, false);
	fsync_fname("pg_logical/snapshots", true);

	/*
	 * Now there's no way we can lose the dumped state anymore, remember this
	 * as a serialization point.
	 */
	builder->last_serialized_snapshot = lsn;

out:
	/* reached both after writing a new file and after finding an existing one */
	ReorderBufferSetRestartPoint(builder->reorder,
								 builder->last_serialized_snapshot);
	/* be tidy */
	if (ondisk)
		pfree(ondisk);
}
1707 
1708 /*
1709  * Restore a snapshot into 'builder' if previously one has been stored at the
1710  * location indicated by 'lsn'. Returns true if successful, false otherwise.
1711  */
1712 static bool
SnapBuildRestore(SnapBuild * builder,XLogRecPtr lsn)1713 SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
1714 {
1715 	SnapBuildOnDisk ondisk;
1716 	int			fd;
1717 	char		path[MAXPGPATH];
1718 	Size		sz;
1719 	int			readBytes;
1720 	pg_crc32c	checksum;
1721 
1722 	/* no point in loading a snapshot if we're already there */
1723 	if (builder->state == SNAPBUILD_CONSISTENT)
1724 		return false;
1725 
1726 	sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1727 			(uint32) (lsn >> 32), (uint32) lsn);
1728 
1729 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1730 
1731 	if (fd < 0 && errno == ENOENT)
1732 		return false;
1733 	else if (fd < 0)
1734 		ereport(ERROR,
1735 				(errcode_for_file_access(),
1736 				 errmsg("could not open file \"%s\": %m", path)));
1737 
1738 	/* ----
1739 	 * Make sure the snapshot had been stored safely to disk, that's normally
1740 	 * cheap.
1741 	 * Note that we do not need PANIC here, nobody will be able to use the
1742 	 * slot without fsyncing, and saving it won't succeed without an fsync()
1743 	 * either...
1744 	 * ----
1745 	 */
1746 	fsync_fname(path, false);
1747 	fsync_fname("pg_logical/snapshots", true);
1748 
1749 
1750 	/* read statically sized portion of snapshot */
1751 	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1752 	readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
1753 	pgstat_report_wait_end();
1754 	if (readBytes != SnapBuildOnDiskConstantSize)
1755 	{
1756 		int			save_errno = errno;
1757 
1758 		CloseTransientFile(fd);
1759 
1760 		if (readBytes < 0)
1761 		{
1762 			errno = save_errno;
1763 			ereport(ERROR,
1764 					(errcode_for_file_access(),
1765 					 errmsg("could not read file \"%s\": %m", path)));
1766 		}
1767 		else
1768 			ereport(ERROR,
1769 					(errcode(ERRCODE_DATA_CORRUPTED),
1770 					 errmsg("could not read file \"%s\": read %d of %zu",
1771 							path, readBytes,
1772 							(Size) SnapBuildOnDiskConstantSize)));
1773 	}
1774 
1775 	if (ondisk.magic != SNAPBUILD_MAGIC)
1776 		ereport(ERROR,
1777 				(errcode(ERRCODE_DATA_CORRUPTED),
1778 				 errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1779 						path, ondisk.magic, SNAPBUILD_MAGIC)));
1780 
1781 	if (ondisk.version != SNAPBUILD_VERSION)
1782 		ereport(ERROR,
1783 				(errcode(ERRCODE_DATA_CORRUPTED),
1784 				 errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1785 						path, ondisk.version, SNAPBUILD_VERSION)));
1786 
1787 	INIT_CRC32C(checksum);
1788 	COMP_CRC32C(checksum,
1789 				((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
1790 				SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1791 
1792 	/* read SnapBuild */
1793 	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1794 	readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
1795 	pgstat_report_wait_end();
1796 	if (readBytes != sizeof(SnapBuild))
1797 	{
1798 		int			save_errno = errno;
1799 
1800 		CloseTransientFile(fd);
1801 
1802 		if (readBytes < 0)
1803 		{
1804 			errno = save_errno;
1805 			ereport(ERROR,
1806 					(errcode_for_file_access(),
1807 					 errmsg("could not read file \"%s\": %m", path)));
1808 		}
1809 		else
1810 			ereport(ERROR,
1811 					(errcode(ERRCODE_DATA_CORRUPTED),
1812 					 errmsg("could not read file \"%s\": read %d of %zu",
1813 							path, readBytes, sizeof(SnapBuild))));
1814 	}
1815 	COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
1816 
1817 	/* restore running xacts (dead, but kept for backward compat) */
1818 	sz = sizeof(TransactionId) * ondisk.builder.was_running.was_xcnt_space;
1819 	ondisk.builder.was_running.was_xip =
1820 		MemoryContextAllocZero(builder->context, sz);
1821 	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1822 	readBytes = read(fd, ondisk.builder.was_running.was_xip, sz);
1823 	pgstat_report_wait_end();
1824 	if (readBytes != sz)
1825 	{
1826 		int			save_errno = errno;
1827 
1828 		CloseTransientFile(fd);
1829 
1830 		if (readBytes < 0)
1831 		{
1832 			errno = save_errno;
1833 			ereport(ERROR,
1834 					(errcode_for_file_access(),
1835 					 errmsg("could not read file \"%s\": %m", path)));
1836 		}
1837 		else
1838 			ereport(ERROR,
1839 					(errcode(ERRCODE_DATA_CORRUPTED),
1840 					 errmsg("could not read file \"%s\": read %d of %zu",
1841 							path, readBytes, sz)));
1842 	}
1843 	COMP_CRC32C(checksum, ondisk.builder.was_running.was_xip, sz);
1844 
1845 	/* restore committed xacts information */
1846 	sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
1847 	ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
1848 	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1849 	readBytes = read(fd, ondisk.builder.committed.xip, sz);
1850 	pgstat_report_wait_end();
1851 	if (readBytes != sz)
1852 	{
1853 		int			save_errno = errno;
1854 
1855 		CloseTransientFile(fd);
1856 
1857 		if (readBytes < 0)
1858 		{
1859 			errno = save_errno;
1860 			ereport(ERROR,
1861 					(errcode_for_file_access(),
1862 					 errmsg("could not read file \"%s\": %m", path)));
1863 		}
1864 		else
1865 			ereport(ERROR,
1866 					(errcode(ERRCODE_DATA_CORRUPTED),
1867 					 errmsg("could not read file \"%s\": read %d of %zu",
1868 							path, readBytes, sz)));
1869 	}
1870 	COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
1871 
1872 	if (CloseTransientFile(fd))
1873 		ereport(ERROR,
1874 				(errcode_for_file_access(),
1875 				 errmsg("could not close file \"%s\": %m", path)));
1876 
1877 	FIN_CRC32C(checksum);
1878 
1879 	/* verify checksum of what we've read */
1880 	if (!EQ_CRC32C(checksum, ondisk.checksum))
1881 		ereport(ERROR,
1882 				(errcode(ERRCODE_DATA_CORRUPTED),
1883 				 errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1884 						path, checksum, ondisk.checksum)));
1885 
1886 	/*
1887 	 * ok, we now have a sensible snapshot here, figure out if it has more
1888 	 * information than we have.
1889 	 */
1890 
1891 	/*
1892 	 * We are only interested in consistent snapshots for now, comparing
1893 	 * whether one incomplete snapshot is more "advanced" seems to be
1894 	 * unnecessarily complex.
1895 	 */
1896 	if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1897 		goto snapshot_not_interesting;
1898 
1899 	/*
1900 	 * Don't use a snapshot that requires an xmin that we cannot guarantee to
1901 	 * be available.
1902 	 */
1903 	if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
1904 		goto snapshot_not_interesting;
1905 
1906 
1907 	/* ok, we think the snapshot is sensible, copy over everything important */
1908 	builder->xmin = ondisk.builder.xmin;
1909 	builder->xmax = ondisk.builder.xmax;
1910 	builder->state = ondisk.builder.state;
1911 
1912 	builder->committed.xcnt = ondisk.builder.committed.xcnt;
1913 	/* We only allocated/stored xcnt, not xcnt_space xids ! */
1914 	/* don't overwrite preallocated xip, if we don't have anything here */
1915 	if (builder->committed.xcnt > 0)
1916 	{
1917 		pfree(builder->committed.xip);
1918 		builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1919 		builder->committed.xip = ondisk.builder.committed.xip;
1920 	}
1921 	ondisk.builder.committed.xip = NULL;
1922 
1923 	/* our snapshot is not interesting anymore, build a new one */
1924 	if (builder->snapshot != NULL)
1925 	{
1926 		SnapBuildSnapDecRefcount(builder->snapshot);
1927 	}
1928 	builder->snapshot = SnapBuildBuildSnapshot(builder);
1929 	SnapBuildSnapIncRefcount(builder->snapshot);
1930 
1931 	ReorderBufferSetRestartPoint(builder->reorder, lsn);
1932 
1933 	Assert(builder->state == SNAPBUILD_CONSISTENT);
1934 
1935 	ereport(LOG,
1936 			(errmsg("logical decoding found consistent point at %X/%X",
1937 					(uint32) (lsn >> 32), (uint32) lsn),
1938 			 errdetail("Logical decoding will begin using saved snapshot.")));
1939 	return true;
1940 
1941 snapshot_not_interesting:
1942 	if (ondisk.builder.committed.xip != NULL)
1943 		pfree(ondisk.builder.committed.xip);
1944 	return false;
1945 }
1946 
1947 /*
1948  * Remove all serialized snapshots that are not required anymore because no
1949  * slot can need them. This doesn't actually have to run during a checkpoint,
1950  * but it's a convenient point to schedule this.
1951  *
1952  * NB: We run this during checkpoints even if logical decoding is disabled so
1953  * we cleanup old slots at some point after it got disabled.
1954  */
1955 void
CheckPointSnapBuild(void)1956 CheckPointSnapBuild(void)
1957 {
1958 	XLogRecPtr	cutoff;
1959 	XLogRecPtr	redo;
1960 	DIR		   *snap_dir;
1961 	struct dirent *snap_de;
1962 	char		path[MAXPGPATH + 21];
1963 
1964 	/*
1965 	 * We start off with a minimum of the last redo pointer. No new
1966 	 * replication slot will start before that, so that's a safe upper bound
1967 	 * for removal.
1968 	 */
1969 	redo = GetRedoRecPtr();
1970 
1971 	/* now check for the restart ptrs from existing slots */
1972 	cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1973 
1974 	/* don't start earlier than the restart lsn */
1975 	if (redo < cutoff)
1976 		cutoff = redo;
1977 
1978 	snap_dir = AllocateDir("pg_logical/snapshots");
1979 	while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
1980 	{
1981 		uint32		hi;
1982 		uint32		lo;
1983 		XLogRecPtr	lsn;
1984 		struct stat statbuf;
1985 
1986 		if (strcmp(snap_de->d_name, ".") == 0 ||
1987 			strcmp(snap_de->d_name, "..") == 0)
1988 			continue;
1989 
1990 		snprintf(path, sizeof(path), "pg_logical/snapshots/%s", snap_de->d_name);
1991 
1992 		if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1993 		{
1994 			elog(DEBUG1, "only regular files expected: %s", path);
1995 			continue;
1996 		}
1997 
1998 		/*
1999 		 * temporary filenames from SnapBuildSerialize() include the LSN and
2000 		 * everything but are postfixed by .$pid.tmp. We can just remove them
2001 		 * the same as other files because there can be none that are
2002 		 * currently being written that are older than cutoff.
2003 		 *
2004 		 * We just log a message if a file doesn't fit the pattern, it's
2005 		 * probably some editors lock/state file or similar...
2006 		 */
2007 		if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
2008 		{
2009 			ereport(LOG,
2010 					(errmsg("could not parse file name \"%s\"", path)));
2011 			continue;
2012 		}
2013 
2014 		lsn = ((uint64) hi) << 32 | lo;
2015 
2016 		/* check whether we still need it */
2017 		if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
2018 		{
2019 			elog(DEBUG1, "removing snapbuild snapshot %s", path);
2020 
2021 			/*
2022 			 * It's not particularly harmful, though strange, if we can't
2023 			 * remove the file here. Don't prevent the checkpoint from
2024 			 * completing, that'd be a cure worse than the disease.
2025 			 */
2026 			if (unlink(path) < 0)
2027 			{
2028 				ereport(LOG,
2029 						(errcode_for_file_access(),
2030 						 errmsg("could not remove file \"%s\": %m",
2031 								path)));
2032 				continue;
2033 			}
2034 		}
2035 	}
2036 	FreeDir(snap_dir);
2037 }
2038