1 /*-------------------------------------------------------------------------
2  *
3  * snapbuild.c
4  *
5  *	  Infrastructure for building historic catalog snapshots based on contents
6  *	  of the WAL, for the purpose of decoding heapam.c style values in the
7  *	  WAL.
8  *
9  * NOTES:
10  *
11  * We build snapshots which can *only* be used to read catalog contents and we
12  * do so by reading and interpreting the WAL stream. The aim is to build a
13  * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14  * at the time the XLogRecord was generated.
15  *
16  * To build the snapshots we reuse the infrastructure built for Hot
17  * Standby. The in-memory snapshots we build look different than HS' because
18  * we have different needs. To successfully decode data from the WAL we only
19  * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20  * tables since the data we decode is wholly contained in the WAL
21  * records. Also, our snapshots need to be different in comparison to normal
22  * MVCC ones because in contrast to those we cannot fully rely on the clog and
23  * pg_subtrans for information about committed transactions because they might
24  * commit in the future from the POV of the WAL entry we're currently
25  * decoding. This definition has the advantage that we only need to prevent
26  * removal of catalog rows, while normal table's rows can still be
27  * removed. This is achieved by using the replication slot mechanism.
28  *
29  * As the percentage of transactions modifying the catalog normally is fairly
30  * small in comparison to ones only manipulating user data, we keep track of
31  * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
32  * track of all running transactions like it's done in a normal snapshot. Note
33  * that we're generally only looking at transactions that have acquired an
34  * xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
35  * that we consider committed, everything else is considered aborted/in
36  * progress. That also allows us not to care about subtransactions before they
37  * have committed which means this module, in contrast to HS, doesn't have to
38  * care about suboverflowed subtransactions and similar.
39  *
40  * One complexity of doing this is that to e.g. handle mixed DDL/DML
41  * transactions we need Snapshots that see intermediate versions of the
42  * catalog in a transaction. During normal operation this is achieved by using
43  * CommandIds/cmin/cmax. The problem with that however is that for space
44  * efficiency reasons only one value of that is stored
45  * (cf. combocid.c). Since combo CIDs are only available in memory we log
46  * additional information which allows us to get the original (cmin, cmax)
47  * pair during visibility checks. Check the reorderbuffer.c's comment above
48  * ResolveCminCmaxDuringDecoding() for details.
49  *
50  * To facilitate all this we need our own visibility routine, as the normal
51  * ones are optimized for different usecases.
52  *
53  * To replace the normal catalog snapshots with decoding ones use the
54  * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
55  *
56  *
57  *
58  * The snapbuild machinery is starting up in several stages, as illustrated
59  * by the following graph describing the SnapBuild->state transitions:
60  *
61  *		   +-------------------------+
62  *	  +----|		 START			 |-------------+
63  *	  |    +-------------------------+			   |
64  *	  |					|						   |
65  *	  |					|						   |
66  *	  |		   running_xacts #1					   |
67  *	  |					|						   |
68  *	  |					|						   |
69  *	  |					v						   |
70  *	  |    +-------------------------+			   v
71  *	  |    |   BUILDING_SNAPSHOT	 |------------>|
72  *	  |    +-------------------------+			   |
73  *	  |					|						   |
74  *	  |					|						   |
75  *	  | running_xacts #2, xacts from #1 finished   |
76  *	  |					|						   |
77  *	  |					|						   |
78  *	  |					v						   |
79  *	  |    +-------------------------+			   v
80  *	  |    |	   FULL_SNAPSHOT	 |------------>|
81  *	  |    +-------------------------+			   |
82  *	  |					|						   |
83  * running_xacts		|					   saved snapshot
84  * with zero xacts		|				  at running_xacts's lsn
85  *	  |					|						   |
86  *	  | running_xacts with xacts from #2 finished  |
87  *	  |					|						   |
88  *	  |					v						   |
89  *	  |    +-------------------------+			   |
90  *	  +--->|SNAPBUILD_CONSISTENT	 |<------------+
91  *		   +-------------------------+
92  *
93  * Initially the machinery is in the START stage. When an xl_running_xacts
94  * record is read that is sufficiently new (above the safe xmin horizon),
95  * there's a state transition. If there were no running xacts when the
96  * running_xacts record was generated, we'll directly go into CONSISTENT
97  * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full
98  * snapshot means that all transactions that start henceforth can be decoded
99  * in their entirety, but transactions that started previously can't. In
100  * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
101  * running transactions have committed or aborted.
102  *
103  * Only transactions that commit after CONSISTENT state has been reached will
104  * be replayed, even though they might have started while still in
105  * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
106  * changes have been exported, but all the following ones will be. That point
107  * is a convenient point to initialize replication from, which is why we
108  * export a snapshot at that point, which *can* be used to read normal data.
109  *
110  * Copyright (c) 2012-2021, PostgreSQL Global Development Group
111  *
112  * IDENTIFICATION
113  *	  src/backend/replication/snapbuild.c
114  *
115  *-------------------------------------------------------------------------
116  */
117 
118 #include "postgres.h"
119 
120 #include <sys/stat.h>
121 #include <unistd.h>
122 
123 #include "access/heapam_xlog.h"
124 #include "access/transam.h"
125 #include "access/xact.h"
126 #include "miscadmin.h"
127 #include "pgstat.h"
128 #include "replication/logical.h"
129 #include "replication/reorderbuffer.h"
130 #include "replication/snapbuild.h"
131 #include "storage/block.h"		/* debugging output */
132 #include "storage/fd.h"
133 #include "storage/lmgr.h"
134 #include "storage/proc.h"
135 #include "storage/procarray.h"
136 #include "storage/standby.h"
137 #include "utils/builtins.h"
138 #include "utils/memutils.h"
139 #include "utils/snapmgr.h"
140 #include "utils/snapshot.h"
141 
142 /*
143  * This struct contains the current state of the snapshot building
144  * machinery. Besides a forward declaration in the header, it is not exposed
145  * to the public, so we can easily change its contents.
146  */
147 struct SnapBuild
148 {
149 	/* how far are we along building our first full snapshot */
150 	SnapBuildState state;
151 
152 	/* private memory context used to allocate memory for this module. */
153 	MemoryContext context;
154 
155 	/* all transactions < than this have committed/aborted */
156 	TransactionId xmin;
157 
158 	/* all transactions >= than this are uncommitted */
159 	TransactionId xmax;
160 
161 	/*
162 	 * Don't replay commits from an LSN < this LSN. This can be set externally
163 	 * but it will also be advanced (never retreat) from within snapbuild.c.
164 	 */
165 	XLogRecPtr	start_decoding_at;
166 
167 	/*
168 	 * LSN at which we found a consistent point at the time of slot creation.
169 	 * This is also the point where we have exported a snapshot for the
170 	 * initial copy.
171 	 *
172 	 * The prepared transactions that are not covered by initial snapshot
173 	 * needs to be sent later along with commit prepared and they must be
174 	 * before this point.
175 	 */
176 	XLogRecPtr	initial_consistent_point;
177 
178 	/*
179 	 * Don't start decoding WAL until the "xl_running_xacts" information
180 	 * indicates there are no running xids with an xid smaller than this.
181 	 */
182 	TransactionId initial_xmin_horizon;
183 
184 	/* Indicates if we are building full snapshot or just catalog one. */
185 	bool		building_full_snapshot;
186 
187 	/*
188 	 * Snapshot that's valid to see the catalog state seen at this moment.
189 	 */
190 	Snapshot	snapshot;
191 
192 	/*
193 	 * LSN of the last location we are sure a snapshot has been serialized to.
194 	 */
195 	XLogRecPtr	last_serialized_snapshot;
196 
197 	/*
198 	 * The reorderbuffer we need to update with usable snapshots et al.
199 	 */
200 	ReorderBuffer *reorder;
201 
202 	/*
203 	 * TransactionId at which the next phase of initial snapshot building will
204 	 * happen. InvalidTransactionId if not known (i.e. SNAPBUILD_START), or
205 	 * when no next phase necessary (SNAPBUILD_CONSISTENT).
206 	 */
207 	TransactionId next_phase_at;
208 
209 	/*
210 	 * Array of transactions which could have catalog changes that committed
211 	 * between xmin and xmax.
212 	 */
213 	struct
214 	{
215 		/* number of committed transactions */
216 		size_t		xcnt;
217 
218 		/* available space for committed transactions */
219 		size_t		xcnt_space;
220 
221 		/*
222 		 * Until we reach a CONSISTENT state, we record commits of all
223 		 * transactions, not just the catalog changing ones. Record when that
224 		 * changes so we know we cannot export a snapshot safely anymore.
225 		 */
226 		bool		includes_all_transactions;
227 
228 		/*
229 		 * Array of committed transactions that have modified the catalog.
230 		 *
231 		 * As this array is frequently modified we do *not* keep it in
232 		 * xidComparator order. Instead we sort the array when building &
233 		 * distributing a snapshot.
234 		 *
235 		 * TODO: It's unclear whether that reasoning has much merit. Every
236 		 * time we add something here after becoming consistent will also
237 		 * require distributing a snapshot. Storing them sorted would
238 		 * potentially also make it easier to purge (but more complicated wrt
239 		 * wraparound?). Should be improved if sorting while building the
240 		 * snapshot shows up in profiles.
241 		 */
242 		TransactionId *xip;
243 	}			committed;
244 };
245 
246 /*
247  * Starting a transaction -- which we need to do while exporting a snapshot --
248  * removes knowledge about the previously used resowner, so we save it here.
249  */
250 static ResourceOwner SavedResourceOwnerDuringExport = NULL;
251 static bool ExportInProgress = false;
252 
253 /* ->committed manipulation */
254 static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
255 
256 /* snapshot building/manipulation/distribution functions */
257 static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
258 
259 static void SnapBuildFreeSnapshot(Snapshot snap);
260 
261 static void SnapBuildSnapIncRefcount(Snapshot snap);
262 
263 static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
264 
265 /* xlog reading helper functions for SnapBuildProcessRunningXacts */
266 static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
267 static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
268 
269 /* serialization functions */
270 static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
271 static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
272 
273 /*
274  * Allocate a new snapshot builder.
275  *
276  * xmin_horizon is the xid >= which we can be sure no catalog rows have been
277  * removed, start_lsn is the LSN >= we want to replay commits.
278  */
279 SnapBuild *
AllocateSnapshotBuilder(ReorderBuffer * reorder,TransactionId xmin_horizon,XLogRecPtr start_lsn,bool need_full_snapshot,XLogRecPtr initial_consistent_point)280 AllocateSnapshotBuilder(ReorderBuffer *reorder,
281 						TransactionId xmin_horizon,
282 						XLogRecPtr start_lsn,
283 						bool need_full_snapshot,
284 						XLogRecPtr initial_consistent_point)
285 {
286 	MemoryContext context;
287 	MemoryContext oldcontext;
288 	SnapBuild  *builder;
289 
290 	/* allocate memory in own context, to have better accountability */
291 	context = AllocSetContextCreate(CurrentMemoryContext,
292 									"snapshot builder context",
293 									ALLOCSET_DEFAULT_SIZES);
294 	oldcontext = MemoryContextSwitchTo(context);
295 
296 	builder = palloc0(sizeof(SnapBuild));
297 
298 	builder->state = SNAPBUILD_START;
299 	builder->context = context;
300 	builder->reorder = reorder;
301 	/* Other struct members initialized by zeroing via palloc0 above */
302 
303 	builder->committed.xcnt = 0;
304 	builder->committed.xcnt_space = 128;	/* arbitrary number */
305 	builder->committed.xip =
306 		palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
307 	builder->committed.includes_all_transactions = true;
308 
309 	builder->initial_xmin_horizon = xmin_horizon;
310 	builder->start_decoding_at = start_lsn;
311 	builder->building_full_snapshot = need_full_snapshot;
312 	builder->initial_consistent_point = initial_consistent_point;
313 
314 	MemoryContextSwitchTo(oldcontext);
315 
316 	return builder;
317 }
318 
319 /*
320  * Free a snapshot builder.
321  */
322 void
FreeSnapshotBuilder(SnapBuild * builder)323 FreeSnapshotBuilder(SnapBuild *builder)
324 {
325 	MemoryContext context = builder->context;
326 
327 	/* free snapshot explicitly, that contains some error checking */
328 	if (builder->snapshot != NULL)
329 	{
330 		SnapBuildSnapDecRefcount(builder->snapshot);
331 		builder->snapshot = NULL;
332 	}
333 
334 	/* other resources are deallocated via memory context reset */
335 	MemoryContextDelete(context);
336 }
337 
338 /*
339  * Free an unreferenced snapshot that has previously been built by us.
340  */
341 static void
SnapBuildFreeSnapshot(Snapshot snap)342 SnapBuildFreeSnapshot(Snapshot snap)
343 {
344 	/* make sure we don't get passed an external snapshot */
345 	Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
346 
347 	/* make sure nobody modified our snapshot */
348 	Assert(snap->curcid == FirstCommandId);
349 	Assert(!snap->suboverflowed);
350 	Assert(!snap->takenDuringRecovery);
351 	Assert(snap->regd_count == 0);
352 
353 	/* slightly more likely, so it's checked even without c-asserts */
354 	if (snap->copied)
355 		elog(ERROR, "cannot free a copied snapshot");
356 
357 	if (snap->active_count)
358 		elog(ERROR, "cannot free an active snapshot");
359 
360 	pfree(snap);
361 }
362 
363 /*
364  * In which state of snapshot building are we?
365  */
366 SnapBuildState
SnapBuildCurrentState(SnapBuild * builder)367 SnapBuildCurrentState(SnapBuild *builder)
368 {
369 	return builder->state;
370 }
371 
372 /*
373  * Return the LSN at which the snapshot was exported
374  */
375 XLogRecPtr
SnapBuildInitialConsistentPoint(SnapBuild * builder)376 SnapBuildInitialConsistentPoint(SnapBuild *builder)
377 {
378 	return builder->initial_consistent_point;
379 }
380 
381 /*
382  * Should the contents of transaction ending at 'ptr' be decoded?
383  */
384 bool
SnapBuildXactNeedsSkip(SnapBuild * builder,XLogRecPtr ptr)385 SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
386 {
387 	return ptr < builder->start_decoding_at;
388 }
389 
390 /*
391  * Increase refcount of a snapshot.
392  *
393  * This is used when handing out a snapshot to some external resource or when
394  * adding a Snapshot as builder->snapshot.
395  */
396 static void
SnapBuildSnapIncRefcount(Snapshot snap)397 SnapBuildSnapIncRefcount(Snapshot snap)
398 {
399 	snap->active_count++;
400 }
401 
402 /*
403  * Decrease refcount of a snapshot and free if the refcount reaches zero.
404  *
405  * Externally visible, so that external resources that have been handed an
406  * IncRef'ed Snapshot can adjust its refcount easily.
407  */
408 void
SnapBuildSnapDecRefcount(Snapshot snap)409 SnapBuildSnapDecRefcount(Snapshot snap)
410 {
411 	/* make sure we don't get passed an external snapshot */
412 	Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
413 
414 	/* make sure nobody modified our snapshot */
415 	Assert(snap->curcid == FirstCommandId);
416 	Assert(!snap->suboverflowed);
417 	Assert(!snap->takenDuringRecovery);
418 
419 	Assert(snap->regd_count == 0);
420 
421 	Assert(snap->active_count > 0);
422 
423 	/* slightly more likely, so it's checked even without casserts */
424 	if (snap->copied)
425 		elog(ERROR, "cannot free a copied snapshot");
426 
427 	snap->active_count--;
428 	if (snap->active_count == 0)
429 		SnapBuildFreeSnapshot(snap);
430 }
431 
432 /*
433  * Build a new snapshot, based on currently committed catalog-modifying
434  * transactions.
435  *
436  * In-progress transactions with catalog access are *not* allowed to modify
437  * these snapshots; they have to copy them and fill in appropriate ->curcid
438  * and ->subxip/subxcnt values.
439  */
440 static Snapshot
SnapBuildBuildSnapshot(SnapBuild * builder)441 SnapBuildBuildSnapshot(SnapBuild *builder)
442 {
443 	Snapshot	snapshot;
444 	Size		ssize;
445 
446 	Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
447 
448 	ssize = sizeof(SnapshotData)
449 		+ sizeof(TransactionId) * builder->committed.xcnt
450 		+ sizeof(TransactionId) * 1 /* toplevel xid */ ;
451 
452 	snapshot = MemoryContextAllocZero(builder->context, ssize);
453 
454 	snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
455 
456 	/*
457 	 * We misuse the original meaning of SnapshotData's xip and subxip fields
458 	 * to make the more fitting for our needs.
459 	 *
460 	 * In the 'xip' array we store transactions that have to be treated as
461 	 * committed. Since we will only ever look at tuples from transactions
462 	 * that have modified the catalog it's more efficient to store those few
463 	 * that exist between xmin and xmax (frequently there are none).
464 	 *
465 	 * Snapshots that are used in transactions that have modified the catalog
466 	 * also use the 'subxip' array to store their toplevel xid and all the
467 	 * subtransaction xids so we can recognize when we need to treat rows as
468 	 * visible that are not in xip but still need to be visible. Subxip only
469 	 * gets filled when the transaction is copied into the context of a
470 	 * catalog modifying transaction since we otherwise share a snapshot
471 	 * between transactions. As long as a txn hasn't modified the catalog it
472 	 * doesn't need to treat any uncommitted rows as visible, so there is no
473 	 * need for those xids.
474 	 *
475 	 * Both arrays are qsort'ed so that we can use bsearch() on them.
476 	 */
477 	Assert(TransactionIdIsNormal(builder->xmin));
478 	Assert(TransactionIdIsNormal(builder->xmax));
479 
480 	snapshot->xmin = builder->xmin;
481 	snapshot->xmax = builder->xmax;
482 
483 	/* store all transactions to be treated as committed by this snapshot */
484 	snapshot->xip =
485 		(TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
486 	snapshot->xcnt = builder->committed.xcnt;
487 	memcpy(snapshot->xip,
488 		   builder->committed.xip,
489 		   builder->committed.xcnt * sizeof(TransactionId));
490 
491 	/* sort so we can bsearch() */
492 	qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
493 
494 	/*
495 	 * Initially, subxip is empty, i.e. it's a snapshot to be used by
496 	 * transactions that don't modify the catalog. Will be filled by
497 	 * ReorderBufferCopySnap() if necessary.
498 	 */
499 	snapshot->subxcnt = 0;
500 	snapshot->subxip = NULL;
501 
502 	snapshot->suboverflowed = false;
503 	snapshot->takenDuringRecovery = false;
504 	snapshot->copied = false;
505 	snapshot->curcid = FirstCommandId;
506 	snapshot->active_count = 0;
507 	snapshot->regd_count = 0;
508 	snapshot->snapXactCompletionCount = 0;
509 
510 	return snapshot;
511 }
512 
513 /*
514  * Build the initial slot snapshot and convert it to a normal snapshot that
515  * is understood by HeapTupleSatisfiesMVCC.
516  *
517  * The snapshot will be usable directly in current transaction or exported
518  * for loading in different transaction.
519  */
520 Snapshot
SnapBuildInitialSnapshot(SnapBuild * builder)521 SnapBuildInitialSnapshot(SnapBuild *builder)
522 {
523 	Snapshot	snap;
524 	TransactionId xid;
525 	TransactionId *newxip;
526 	int			newxcnt = 0;
527 
528 	Assert(!FirstSnapshotSet);
529 	Assert(XactIsoLevel == XACT_REPEATABLE_READ);
530 
531 	if (builder->state != SNAPBUILD_CONSISTENT)
532 		elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
533 
534 	if (!builder->committed.includes_all_transactions)
535 		elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
536 
537 	/* so we don't overwrite the existing value */
538 	if (TransactionIdIsValid(MyProc->xmin))
539 		elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
540 
541 	snap = SnapBuildBuildSnapshot(builder);
542 
543 	/*
544 	 * We know that snap->xmin is alive, enforced by the logical xmin
545 	 * mechanism. Due to that we can do this without locks, we're only
546 	 * changing our own value.
547 	 */
548 #ifdef USE_ASSERT_CHECKING
549 	{
550 		TransactionId safeXid;
551 
552 		LWLockAcquire(ProcArrayLock, LW_SHARED);
553 		safeXid = GetOldestSafeDecodingTransactionId(false);
554 		LWLockRelease(ProcArrayLock);
555 
556 		Assert(TransactionIdPrecedesOrEquals(safeXid, snap->xmin));
557 	}
558 #endif
559 
560 	MyProc->xmin = snap->xmin;
561 
562 	/* allocate in transaction context */
563 	newxip = (TransactionId *)
564 		palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
565 
566 	/*
567 	 * snapbuild.c builds transactions in an "inverted" manner, which means it
568 	 * stores committed transactions in ->xip, not ones in progress. Build a
569 	 * classical snapshot by marking all non-committed transactions as
570 	 * in-progress. This can be expensive.
571 	 */
572 	for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
573 	{
574 		void	   *test;
575 
576 		/*
577 		 * Check whether transaction committed using the decoding snapshot
578 		 * meaning of ->xip.
579 		 */
580 		test = bsearch(&xid, snap->xip, snap->xcnt,
581 					   sizeof(TransactionId), xidComparator);
582 
583 		if (test == NULL)
584 		{
585 			if (newxcnt >= GetMaxSnapshotXidCount())
586 				ereport(ERROR,
587 						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
588 						 errmsg("initial slot snapshot too large")));
589 
590 			newxip[newxcnt++] = xid;
591 		}
592 
593 		TransactionIdAdvance(xid);
594 	}
595 
596 	/* adjust remaining snapshot fields as needed */
597 	snap->snapshot_type = SNAPSHOT_MVCC;
598 	snap->xcnt = newxcnt;
599 	snap->xip = newxip;
600 
601 	return snap;
602 }
603 
604 /*
605  * Export a snapshot so it can be set in another session with SET TRANSACTION
606  * SNAPSHOT.
607  *
608  * For that we need to start a transaction in the current backend as the
609  * importing side checks whether the source transaction is still open to make
610  * sure the xmin horizon hasn't advanced since then.
611  */
612 const char *
SnapBuildExportSnapshot(SnapBuild * builder)613 SnapBuildExportSnapshot(SnapBuild *builder)
614 {
615 	Snapshot	snap;
616 	char	   *snapname;
617 
618 	if (IsTransactionOrTransactionBlock())
619 		elog(ERROR, "cannot export a snapshot from within a transaction");
620 
621 	if (SavedResourceOwnerDuringExport)
622 		elog(ERROR, "can only export one snapshot at a time");
623 
624 	SavedResourceOwnerDuringExport = CurrentResourceOwner;
625 	ExportInProgress = true;
626 
627 	StartTransactionCommand();
628 
629 	/* There doesn't seem to a nice API to set these */
630 	XactIsoLevel = XACT_REPEATABLE_READ;
631 	XactReadOnly = true;
632 
633 	snap = SnapBuildInitialSnapshot(builder);
634 
635 	/*
636 	 * now that we've built a plain snapshot, make it active and use the
637 	 * normal mechanisms for exporting it
638 	 */
639 	snapname = ExportSnapshot(snap);
640 
641 	ereport(LOG,
642 			(errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
643 						   "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
644 						   snap->xcnt,
645 						   snapname, snap->xcnt)));
646 	return snapname;
647 }
648 
649 /*
650  * Ensure there is a snapshot and if not build one for current transaction.
651  */
652 Snapshot
SnapBuildGetOrBuildSnapshot(SnapBuild * builder,TransactionId xid)653 SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
654 {
655 	Assert(builder->state == SNAPBUILD_CONSISTENT);
656 
657 	/* only build a new snapshot if we don't have a prebuilt one */
658 	if (builder->snapshot == NULL)
659 	{
660 		builder->snapshot = SnapBuildBuildSnapshot(builder);
661 		/* increase refcount for the snapshot builder */
662 		SnapBuildSnapIncRefcount(builder->snapshot);
663 	}
664 
665 	return builder->snapshot;
666 }
667 
668 /*
669  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
670  * any. Aborts the previously started transaction and resets the resource
671  * owner back to its original value.
672  */
673 void
SnapBuildClearExportedSnapshot(void)674 SnapBuildClearExportedSnapshot(void)
675 {
676 	ResourceOwner tmpResOwner;
677 
678 	/* nothing exported, that is the usual case */
679 	if (!ExportInProgress)
680 		return;
681 
682 	if (!IsTransactionState())
683 		elog(ERROR, "clearing exported snapshot in wrong transaction state");
684 
685 	/*
686 	 * AbortCurrentTransaction() takes care of resetting the snapshot state,
687 	 * so remember SavedResourceOwnerDuringExport.
688 	 */
689 	tmpResOwner = SavedResourceOwnerDuringExport;
690 
691 	/* make sure nothing could have ever happened */
692 	AbortCurrentTransaction();
693 
694 	CurrentResourceOwner = tmpResOwner;
695 }
696 
697 /*
698  * Clear snapshot export state during transaction abort.
699  */
700 void
SnapBuildResetExportedSnapshotState(void)701 SnapBuildResetExportedSnapshotState(void)
702 {
703 	SavedResourceOwnerDuringExport = NULL;
704 	ExportInProgress = false;
705 }
706 
707 /*
708  * Handle the effects of a single heap change, appropriate to the current state
709  * of the snapshot builder and returns whether changes made at (xid, lsn) can
710  * be decoded.
711  */
712 bool
SnapBuildProcessChange(SnapBuild * builder,TransactionId xid,XLogRecPtr lsn)713 SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
714 {
715 	/*
716 	 * We can't handle data in transactions if we haven't built a snapshot
717 	 * yet, so don't store them.
718 	 */
719 	if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
720 		return false;
721 
722 	/*
723 	 * No point in keeping track of changes in transactions that we don't have
724 	 * enough information about to decode. This means that they started before
725 	 * we got into the SNAPBUILD_FULL_SNAPSHOT state.
726 	 */
727 	if (builder->state < SNAPBUILD_CONSISTENT &&
728 		TransactionIdPrecedes(xid, builder->next_phase_at))
729 		return false;
730 
731 	/*
732 	 * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
733 	 * be needed to decode the change we're currently processing.
734 	 */
735 	if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
736 	{
737 		/* only build a new snapshot if we don't have a prebuilt one */
738 		if (builder->snapshot == NULL)
739 		{
740 			builder->snapshot = SnapBuildBuildSnapshot(builder);
741 			/* increase refcount for the snapshot builder */
742 			SnapBuildSnapIncRefcount(builder->snapshot);
743 		}
744 
745 		/*
746 		 * Increase refcount for the transaction we're handing the snapshot
747 		 * out to.
748 		 */
749 		SnapBuildSnapIncRefcount(builder->snapshot);
750 		ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
751 									 builder->snapshot);
752 	}
753 
754 	return true;
755 }
756 
757 /*
758  * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
759  * This implies that a transaction has done some form of write to system
760  * catalogs.
761  */
762 void
SnapBuildProcessNewCid(SnapBuild * builder,TransactionId xid,XLogRecPtr lsn,xl_heap_new_cid * xlrec)763 SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
764 					   XLogRecPtr lsn, xl_heap_new_cid *xlrec)
765 {
766 	CommandId	cid;
767 
768 	/*
769 	 * we only log new_cid's if a catalog tuple was modified, so mark the
770 	 * transaction as containing catalog modifications
771 	 */
772 	ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
773 
774 	ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
775 								 xlrec->target_node, xlrec->target_tid,
776 								 xlrec->cmin, xlrec->cmax,
777 								 xlrec->combocid);
778 
779 	/* figure out new command id */
780 	if (xlrec->cmin != InvalidCommandId &&
781 		xlrec->cmax != InvalidCommandId)
782 		cid = Max(xlrec->cmin, xlrec->cmax);
783 	else if (xlrec->cmax != InvalidCommandId)
784 		cid = xlrec->cmax;
785 	else if (xlrec->cmin != InvalidCommandId)
786 		cid = xlrec->cmin;
787 	else
788 	{
789 		cid = InvalidCommandId; /* silence compiler */
790 		elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
791 	}
792 
793 	ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
794 }
795 
796 /*
797  * Add a new Snapshot to all transactions we're decoding that currently are
798  * in-progress so they can see new catalog contents made by the transaction
799  * that just committed. This is necessary because those in-progress
800  * transactions will use the new catalog's contents from here on (at the very
801  * least everything they do needs to be compatible with newer catalog
802  * contents).
803  */
804 static void
SnapBuildDistributeNewCatalogSnapshot(SnapBuild * builder,XLogRecPtr lsn)805 SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
806 {
807 	dlist_iter	txn_i;
808 	ReorderBufferTXN *txn;
809 
810 	/*
811 	 * Iterate through all toplevel transactions. This can include
812 	 * subtransactions which we just don't yet know to be that, but that's
813 	 * fine, they will just get an unnecessary snapshot queued.
814 	 */
815 	dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
816 	{
817 		txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
818 
819 		Assert(TransactionIdIsValid(txn->xid));
820 
821 		/*
822 		 * If we don't have a base snapshot yet, there are no changes in this
823 		 * transaction which in turn implies we don't yet need a snapshot at
824 		 * all. We'll add a snapshot when the first change gets queued.
825 		 *
826 		 * NB: This works correctly even for subtransactions because
827 		 * ReorderBufferAssignChild() takes care to transfer the base snapshot
828 		 * to the top-level transaction, and while iterating the changequeue
829 		 * we'll get the change from the subtxn.
830 		 */
831 		if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
832 			continue;
833 
834 		/*
835 		 * We don't need to add snapshot to prepared transactions as they
836 		 * should not see the new catalog contents.
837 		 */
838 		if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
839 			continue;
840 
841 		elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
842 			 txn->xid, LSN_FORMAT_ARGS(lsn));
843 
844 		/*
845 		 * increase the snapshot's refcount for the transaction we are handing
846 		 * it out to
847 		 */
848 		SnapBuildSnapIncRefcount(builder->snapshot);
849 		ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
850 								 builder->snapshot);
851 	}
852 }
853 
854 /*
855  * Keep track of a new catalog changing transaction that has committed.
856  */
857 static void
SnapBuildAddCommittedTxn(SnapBuild * builder,TransactionId xid)858 SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
859 {
860 	Assert(TransactionIdIsValid(xid));
861 
862 	if (builder->committed.xcnt == builder->committed.xcnt_space)
863 	{
864 		builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
865 
866 		elog(DEBUG1, "increasing space for committed transactions to %u",
867 			 (uint32) builder->committed.xcnt_space);
868 
869 		builder->committed.xip = repalloc(builder->committed.xip,
870 										  builder->committed.xcnt_space * sizeof(TransactionId));
871 	}
872 
873 	/*
874 	 * TODO: It might make sense to keep the array sorted here instead of
875 	 * doing it every time we build a new snapshot. On the other hand this
876 	 * gets called repeatedly when a transaction with subtransactions commits.
877 	 */
878 	builder->committed.xip[builder->committed.xcnt++] = xid;
879 }
880 
881 /*
882  * Remove knowledge about transactions we treat as committed that are smaller
883  * than ->xmin. Those won't ever get checked via the ->committed array but via
884  * the clog machinery, so we don't need to waste memory on them.
885  */
886 static void
SnapBuildPurgeCommittedTxn(SnapBuild * builder)887 SnapBuildPurgeCommittedTxn(SnapBuild *builder)
888 {
889 	int			off;
890 	TransactionId *workspace;
891 	int			surviving_xids = 0;
892 
893 	/* not ready yet */
894 	if (!TransactionIdIsNormal(builder->xmin))
895 		return;
896 
897 	/* TODO: Neater algorithm than just copying and iterating? */
898 	workspace =
899 		MemoryContextAlloc(builder->context,
900 						   builder->committed.xcnt * sizeof(TransactionId));
901 
902 	/* copy xids that still are interesting to workspace */
903 	for (off = 0; off < builder->committed.xcnt; off++)
904 	{
905 		if (NormalTransactionIdPrecedes(builder->committed.xip[off],
906 										builder->xmin))
907 			;					/* remove */
908 		else
909 			workspace[surviving_xids++] = builder->committed.xip[off];
910 	}
911 
912 	/* copy workspace back to persistent state */
913 	memcpy(builder->committed.xip, workspace,
914 		   surviving_xids * sizeof(TransactionId));
915 
916 	elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
917 		 (uint32) builder->committed.xcnt, (uint32) surviving_xids,
918 		 builder->xmin, builder->xmax);
919 	builder->committed.xcnt = surviving_xids;
920 
921 	pfree(workspace);
922 }
923 
924 /*
925  * Handle everything that needs to be done when a transaction commits
926  */
927 void
SnapBuildCommitTxn(SnapBuild * builder,XLogRecPtr lsn,TransactionId xid,int nsubxacts,TransactionId * subxacts)928 SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
929 				   int nsubxacts, TransactionId *subxacts)
930 {
931 	int			nxact;
932 
933 	bool		needs_snapshot = false;
934 	bool		needs_timetravel = false;
935 	bool		sub_needs_timetravel = false;
936 
937 	TransactionId xmax = xid;
938 
939 	/*
940 	 * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
941 	 * will they be part of a snapshot.  So we don't need to record anything.
942 	 */
943 	if (builder->state == SNAPBUILD_START ||
944 		(builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
945 		 TransactionIdPrecedes(xid, builder->next_phase_at)))
946 	{
947 		/* ensure that only commits after this are getting replayed */
948 		if (builder->start_decoding_at <= lsn)
949 			builder->start_decoding_at = lsn + 1;
950 		return;
951 	}
952 
953 	if (builder->state < SNAPBUILD_CONSISTENT)
954 	{
955 		/* ensure that only commits after this are getting replayed */
956 		if (builder->start_decoding_at <= lsn)
957 			builder->start_decoding_at = lsn + 1;
958 
959 		/*
960 		 * If building an exportable snapshot, force xid to be tracked, even
961 		 * if the transaction didn't modify the catalog.
962 		 */
963 		if (builder->building_full_snapshot)
964 		{
965 			needs_timetravel = true;
966 		}
967 	}
968 
969 	for (nxact = 0; nxact < nsubxacts; nxact++)
970 	{
971 		TransactionId subxid = subxacts[nxact];
972 
973 		/*
974 		 * Add subtransaction to base snapshot if catalog modifying, we don't
975 		 * distinguish to toplevel transactions there.
976 		 */
977 		if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
978 		{
979 			sub_needs_timetravel = true;
980 			needs_snapshot = true;
981 
982 			elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
983 				 xid, subxid);
984 
985 			SnapBuildAddCommittedTxn(builder, subxid);
986 
987 			if (NormalTransactionIdFollows(subxid, xmax))
988 				xmax = subxid;
989 		}
990 
991 		/*
992 		 * If we're forcing timetravel we also need visibility information
993 		 * about subtransaction, so keep track of subtransaction's state, even
994 		 * if not catalog modifying.  Don't need to distribute a snapshot in
995 		 * that case.
996 		 */
997 		else if (needs_timetravel)
998 		{
999 			SnapBuildAddCommittedTxn(builder, subxid);
1000 			if (NormalTransactionIdFollows(subxid, xmax))
1001 				xmax = subxid;
1002 		}
1003 	}
1004 
1005 	/* if top-level modified catalog, it'll need a snapshot */
1006 	if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1007 	{
1008 		elog(DEBUG2, "found top level transaction %u, with catalog changes",
1009 			 xid);
1010 		needs_snapshot = true;
1011 		needs_timetravel = true;
1012 		SnapBuildAddCommittedTxn(builder, xid);
1013 	}
1014 	else if (sub_needs_timetravel)
1015 	{
1016 		/* track toplevel txn as well, subxact alone isn't meaningful */
1017 		SnapBuildAddCommittedTxn(builder, xid);
1018 	}
1019 	else if (needs_timetravel)
1020 	{
1021 		elog(DEBUG2, "forced transaction %u to do timetravel", xid);
1022 
1023 		SnapBuildAddCommittedTxn(builder, xid);
1024 	}
1025 
1026 	if (!needs_timetravel)
1027 	{
1028 		/* record that we cannot export a general snapshot anymore */
1029 		builder->committed.includes_all_transactions = false;
1030 	}
1031 
1032 	Assert(!needs_snapshot || needs_timetravel);
1033 
1034 	/*
1035 	 * Adjust xmax of the snapshot builder, we only do that for committed,
1036 	 * catalog modifying, transactions, everything else isn't interesting for
1037 	 * us since we'll never look at the respective rows.
1038 	 */
1039 	if (needs_timetravel &&
1040 		(!TransactionIdIsValid(builder->xmax) ||
1041 		 TransactionIdFollowsOrEquals(xmax, builder->xmax)))
1042 	{
1043 		builder->xmax = xmax;
1044 		TransactionIdAdvance(builder->xmax);
1045 	}
1046 
1047 	/* if there's any reason to build a historic snapshot, do so now */
1048 	if (needs_snapshot)
1049 	{
1050 		/*
1051 		 * If we haven't built a complete snapshot yet there's no need to hand
1052 		 * it out, it wouldn't (and couldn't) be used anyway.
1053 		 */
1054 		if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
1055 			return;
1056 
1057 		/*
1058 		 * Decrease the snapshot builder's refcount of the old snapshot, note
1059 		 * that it still will be used if it has been handed out to the
1060 		 * reorderbuffer earlier.
1061 		 */
1062 		if (builder->snapshot)
1063 			SnapBuildSnapDecRefcount(builder->snapshot);
1064 
1065 		builder->snapshot = SnapBuildBuildSnapshot(builder);
1066 
1067 		/* we might need to execute invalidations, add snapshot */
1068 		if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1069 		{
1070 			SnapBuildSnapIncRefcount(builder->snapshot);
1071 			ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1072 										 builder->snapshot);
1073 		}
1074 
1075 		/* refcount of the snapshot builder for the new snapshot */
1076 		SnapBuildSnapIncRefcount(builder->snapshot);
1077 
1078 		/* add a new catalog snapshot to all currently running transactions */
1079 		SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
1080 	}
1081 }
1082 
1083 
1084 /* -----------------------------------
1085  * Snapshot building functions dealing with xlog records
1086  * -----------------------------------
1087  */
1088 
1089 /*
1090  * Process a running xacts record, and use its information to first build a
1091  * historic snapshot and later to release resources that aren't needed
1092  * anymore.
1093  */
1094 void
SnapBuildProcessRunningXacts(SnapBuild * builder,XLogRecPtr lsn,xl_running_xacts * running)1095 SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1096 {
1097 	ReorderBufferTXN *txn;
1098 	TransactionId xmin;
1099 
1100 	/*
1101 	 * If we're not consistent yet, inspect the record to see whether it
1102 	 * allows to get closer to being consistent. If we are consistent, dump
1103 	 * our snapshot so others or we, after a restart, can use it.
1104 	 */
1105 	if (builder->state < SNAPBUILD_CONSISTENT)
1106 	{
1107 		/* returns false if there's no point in performing cleanup just yet */
1108 		if (!SnapBuildFindSnapshot(builder, lsn, running))
1109 			return;
1110 	}
1111 	else
1112 		SnapBuildSerialize(builder, lsn);
1113 
1114 	/*
1115 	 * Update range of interesting xids based on the running xacts
1116 	 * information. We don't increase ->xmax using it, because once we are in
1117 	 * a consistent state we can do that ourselves and much more efficiently
1118 	 * so, because we only need to do it for catalog transactions since we
1119 	 * only ever look at those.
1120 	 *
1121 	 * NB: We only increase xmax when a catalog modifying transaction commits
1122 	 * (see SnapBuildCommitTxn).  Because of this, xmax can be lower than
1123 	 * xmin, which looks odd but is correct and actually more efficient, since
1124 	 * we hit fast paths in heapam_visibility.c.
1125 	 */
1126 	builder->xmin = running->oldestRunningXid;
1127 
1128 	/* Remove transactions we don't need to keep track off anymore */
1129 	SnapBuildPurgeCommittedTxn(builder);
1130 
1131 	/*
1132 	 * Advance the xmin limit for the current replication slot, to allow
1133 	 * vacuum to clean up the tuples this slot has been protecting.
1134 	 *
1135 	 * The reorderbuffer might have an xmin among the currently running
1136 	 * snapshots; use it if so.  If not, we need only consider the snapshots
1137 	 * we'll produce later, which can't be less than the oldest running xid in
1138 	 * the record we're reading now.
1139 	 */
1140 	xmin = ReorderBufferGetOldestXmin(builder->reorder);
1141 	if (xmin == InvalidTransactionId)
1142 		xmin = running->oldestRunningXid;
1143 	elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
1144 		 builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
1145 	LogicalIncreaseXminForSlot(lsn, xmin);
1146 
1147 	/*
1148 	 * Also tell the slot where we can restart decoding from. We don't want to
1149 	 * do that after every commit because changing that implies an fsync of
1150 	 * the logical slot's state file, so we only do it every time we see a
1151 	 * running xacts record.
1152 	 *
1153 	 * Do so by looking for the oldest in progress transaction (determined by
1154 	 * the first LSN of any of its relevant records). Every transaction
1155 	 * remembers the last location we stored the snapshot to disk before its
1156 	 * beginning. That point is where we can restart from.
1157 	 */
1158 
1159 	/*
1160 	 * Can't know about a serialized snapshot's location if we're not
1161 	 * consistent.
1162 	 */
1163 	if (builder->state < SNAPBUILD_CONSISTENT)
1164 		return;
1165 
1166 	txn = ReorderBufferGetOldestTXN(builder->reorder);
1167 
1168 	/*
1169 	 * oldest ongoing txn might have started when we didn't yet serialize
1170 	 * anything because we hadn't reached a consistent state yet.
1171 	 */
1172 	if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
1173 		LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
1174 
1175 	/*
1176 	 * No in-progress transaction, can reuse the last serialized snapshot if
1177 	 * we have one.
1178 	 */
1179 	else if (txn == NULL &&
1180 			 builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
1181 			 builder->last_serialized_snapshot != InvalidXLogRecPtr)
1182 		LogicalIncreaseRestartDecodingForSlot(lsn,
1183 											  builder->last_serialized_snapshot);
1184 }
1185 
1186 
1187 /*
1188  * Build the start of a snapshot that's capable of decoding the catalog.
1189  *
1190  * Helper function for SnapBuildProcessRunningXacts() while we're not yet
1191  * consistent.
1192  *
1193  * Returns true if there is a point in performing internal maintenance/cleanup
1194  * using the xl_running_xacts record.
1195  */
1196 static bool
SnapBuildFindSnapshot(SnapBuild * builder,XLogRecPtr lsn,xl_running_xacts * running)1197 SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1198 {
1199 	/* ---
1200 	 * Build catalog decoding snapshot incrementally using information about
1201 	 * the currently running transactions. There are several ways to do that:
1202 	 *
1203 	 * a) There were no running transactions when the xl_running_xacts record
1204 	 *	  was inserted, jump to CONSISTENT immediately. We might find such a
1205 	 *	  state while waiting on c)'s sub-states.
1206 	 *
1207 	 * b) This (in a previous run) or another decoding slot serialized a
1208 	 *	  snapshot to disk that we can use.  Can't use this method for the
1209 	 *	  initial snapshot when slot is being created and needs full snapshot
1210 	 *	  for export or direct use, as that snapshot will only contain catalog
1211 	 *	  modifying transactions.
1212 	 *
1213 	 * c) First incrementally build a snapshot for catalog tuples
1214 	 *	  (BUILDING_SNAPSHOT), that requires all, already in-progress,
1215 	 *	  transactions to finish.  Every transaction starting after that
1216 	 *	  (FULL_SNAPSHOT state), has enough information to be decoded.  But
1217 	 *	  for older running transactions no viable snapshot exists yet, so
1218 	 *	  CONSISTENT will only be reached once all of those have finished.
1219 	 * ---
1220 	 */
1221 
1222 	/*
1223 	 * xl_running_xact record is older than what we can use, we might not have
1224 	 * all necessary catalog rows anymore.
1225 	 */
1226 	if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
1227 		NormalTransactionIdPrecedes(running->oldestRunningXid,
1228 									builder->initial_xmin_horizon))
1229 	{
1230 		ereport(DEBUG1,
1231 				(errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
1232 								 LSN_FORMAT_ARGS(lsn)),
1233 				 errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
1234 									builder->initial_xmin_horizon, running->oldestRunningXid)));
1235 
1236 
1237 		SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
1238 
1239 		return true;
1240 	}
1241 
1242 	/*
1243 	 * a) No transaction were running, we can jump to consistent.
1244 	 *
1245 	 * This is not affected by races around xl_running_xacts, because we can
1246 	 * miss transaction commits, but currently not transactions starting.
1247 	 *
1248 	 * NB: We might have already started to incrementally assemble a snapshot,
1249 	 * so we need to be careful to deal with that.
1250 	 */
1251 	if (running->oldestRunningXid == running->nextXid)
1252 	{
1253 		if (builder->start_decoding_at == InvalidXLogRecPtr ||
1254 			builder->start_decoding_at <= lsn)
1255 			/* can decode everything after this */
1256 			builder->start_decoding_at = lsn + 1;
1257 
1258 		/* As no transactions were running xmin/xmax can be trivially set. */
1259 		builder->xmin = running->nextXid;	/* < are finished */
1260 		builder->xmax = running->nextXid;	/* >= are running */
1261 
1262 		/* so we can safely use the faster comparisons */
1263 		Assert(TransactionIdIsNormal(builder->xmin));
1264 		Assert(TransactionIdIsNormal(builder->xmax));
1265 
1266 		builder->state = SNAPBUILD_CONSISTENT;
1267 		builder->next_phase_at = InvalidTransactionId;
1268 
1269 		ereport(LOG,
1270 				(errmsg("logical decoding found consistent point at %X/%X",
1271 						LSN_FORMAT_ARGS(lsn)),
1272 				 errdetail("There are no running transactions.")));
1273 
1274 		return false;
1275 	}
1276 	/* b) valid on disk state and not building full snapshot */
1277 	else if (!builder->building_full_snapshot &&
1278 			 SnapBuildRestore(builder, lsn))
1279 	{
1280 		/* there won't be any state to cleanup */
1281 		return false;
1282 	}
1283 
1284 	/*
1285 	 * c) transition from START to BUILDING_SNAPSHOT.
1286 	 *
1287 	 * In START state, and a xl_running_xacts record with running xacts is
1288 	 * encountered.  In that case, switch to BUILDING_SNAPSHOT state, and
1289 	 * record xl_running_xacts->nextXid.  Once all running xacts have finished
1290 	 * (i.e. they're all >= nextXid), we have a complete catalog snapshot.  It
1291 	 * might look that we could use xl_running_xact's ->xids information to
1292 	 * get there quicker, but that is problematic because transactions marked
1293 	 * as running, might already have inserted their commit record - it's
1294 	 * infeasible to change that with locking.
1295 	 */
1296 	else if (builder->state == SNAPBUILD_START)
1297 	{
1298 		builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
1299 		builder->next_phase_at = running->nextXid;
1300 
1301 		/*
1302 		 * Start with an xmin/xmax that's correct for future, when all the
1303 		 * currently running transactions have finished. We'll update both
1304 		 * while waiting for the pending transactions to finish.
1305 		 */
1306 		builder->xmin = running->nextXid;	/* < are finished */
1307 		builder->xmax = running->nextXid;	/* >= are running */
1308 
1309 		/* so we can safely use the faster comparisons */
1310 		Assert(TransactionIdIsNormal(builder->xmin));
1311 		Assert(TransactionIdIsNormal(builder->xmax));
1312 
1313 		ereport(LOG,
1314 				(errmsg("logical decoding found initial starting point at %X/%X",
1315 						LSN_FORMAT_ARGS(lsn)),
1316 				 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1317 						   running->xcnt, running->nextXid)));
1318 
1319 		SnapBuildWaitSnapshot(running, running->nextXid);
1320 	}
1321 
1322 	/*
1323 	 * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
1324 	 *
1325 	 * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
1326 	 * is >= than nextXid from when we switched to BUILDING_SNAPSHOT.  This
1327 	 * means all transactions starting afterwards have enough information to
1328 	 * be decoded.  Switch to FULL_SNAPSHOT.
1329 	 */
1330 	else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
1331 			 TransactionIdPrecedesOrEquals(builder->next_phase_at,
1332 										   running->oldestRunningXid))
1333 	{
1334 		builder->state = SNAPBUILD_FULL_SNAPSHOT;
1335 		builder->next_phase_at = running->nextXid;
1336 
1337 		ereport(LOG,
1338 				(errmsg("logical decoding found initial consistent point at %X/%X",
1339 						LSN_FORMAT_ARGS(lsn)),
1340 				 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1341 						   running->xcnt, running->nextXid)));
1342 
1343 		SnapBuildWaitSnapshot(running, running->nextXid);
1344 	}
1345 
1346 	/*
1347 	 * c) transition from FULL_SNAPSHOT to CONSISTENT.
1348 	 *
1349 	 * In FULL_SNAPSHOT state (see d) ), and this xl_running_xacts'
1350 	 * oldestRunningXid is >= than nextXid from when we switched to
1351 	 * FULL_SNAPSHOT.  This means all transactions that are currently in
1352 	 * progress have a catalog snapshot, and all their changes have been
1353 	 * collected.  Switch to CONSISTENT.
1354 	 */
1355 	else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
1356 			 TransactionIdPrecedesOrEquals(builder->next_phase_at,
1357 										   running->oldestRunningXid))
1358 	{
1359 		builder->state = SNAPBUILD_CONSISTENT;
1360 		builder->next_phase_at = InvalidTransactionId;
1361 
1362 		ereport(LOG,
1363 				(errmsg("logical decoding found consistent point at %X/%X",
1364 						LSN_FORMAT_ARGS(lsn)),
1365 				 errdetail("There are no old transactions anymore.")));
1366 	}
1367 
1368 	/*
1369 	 * We already started to track running xacts and need to wait for all
1370 	 * in-progress ones to finish. We fall through to the normal processing of
1371 	 * records so incremental cleanup can be performed.
1372 	 */
1373 	return true;
1374 
1375 }
1376 
1377 /* ---
1378  * Iterate through xids in record, wait for all older than the cutoff to
1379  * finish.  Then, if possible, log a new xl_running_xacts record.
1380  *
1381  * This isn't required for the correctness of decoding, but to:
1382  * a) allow isolationtester to notice that we're currently waiting for
1383  *	  something.
1384  * b) log a new xl_running_xacts record where it'd be helpful, without having
1385  *	  to wait for bgwriter or checkpointer.
1386  * ---
1387  */
1388 static void
SnapBuildWaitSnapshot(xl_running_xacts * running,TransactionId cutoff)1389 SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
1390 {
1391 	int			off;
1392 
1393 	for (off = 0; off < running->xcnt; off++)
1394 	{
1395 		TransactionId xid = running->xids[off];
1396 
1397 		/*
1398 		 * Upper layers should prevent that we ever need to wait on ourselves.
1399 		 * Check anyway, since failing to do so would either result in an
1400 		 * endless wait or an Assert() failure.
1401 		 */
1402 		if (TransactionIdIsCurrentTransactionId(xid))
1403 			elog(ERROR, "waiting for ourselves");
1404 
1405 		if (TransactionIdFollows(xid, cutoff))
1406 			continue;
1407 
1408 		XactLockTableWait(xid, NULL, NULL, XLTW_None);
1409 	}
1410 
1411 	/*
1412 	 * All transactions we needed to finish finished - try to ensure there is
1413 	 * another xl_running_xacts record in a timely manner, without having to
1414 	 * wait for bgwriter or checkpointer to log one.  During recovery we can't
1415 	 * enforce that, so we'll have to wait.
1416 	 */
1417 	if (!RecoveryInProgress())
1418 	{
1419 		LogStandbySnapshot();
1420 	}
1421 }
1422 
1423 /* -----------------------------------
1424  * Snapshot serialization support
1425  * -----------------------------------
1426  */
1427 
/*
 * We store current state of struct SnapBuild on disk in the following manner:
 *
 * struct SnapBuildOnDisk;
 * TransactionId * committed.xcnt; (*not xcnt_space*)
 *
 * That is, SnapBuildSerialize() writes the struct immediately followed by the
 * array of committed xids, and nothing else.
 */
typedef struct SnapBuildOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;
	pg_crc32c	checksum;

	/* data covered by checksum */

	/* version, in case we want to support pg_upgrade */
	uint32		version;

	/*
	 * how large is the on disk data, excluding the constant sized part
	 *
	 * NOTE(review): SnapBuildSerialize() stores the size of the whole
	 * allocation here (sizeof(SnapBuildOnDisk) plus the xid array), which
	 * does not match the description above - confirm which one is intended.
	 */
	uint32		length;

	/* version dependent part */
	SnapBuild	builder;

	/* variable amount of TransactionIds follows */
} SnapBuildOnDisk;

/* size of the leading, version independent fields (magic .. length) */
#define SnapBuildOnDiskConstantSize \
	offsetof(SnapBuildOnDisk, builder)
/* size of the leading fields that are not part of the checksum */
#define SnapBuildOnDiskNotChecksummedSize \
	offsetof(SnapBuildOnDisk, version)

/* expected values of SnapBuildOnDisk.magic and .version, checked on restore */
#define SNAPBUILD_MAGIC 0x51A1E001
#define SNAPBUILD_VERSION 4
1464 
1465 /*
1466  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
1467  *
1468  * Supposed to be used by external (i.e. not snapbuild.c) code that just read
1469  * a record that's a potential location for a serialized snapshot.
1470  */
1471 void
SnapBuildSerializationPoint(SnapBuild * builder,XLogRecPtr lsn)1472 SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
1473 {
1474 	if (builder->state < SNAPBUILD_CONSISTENT)
1475 		SnapBuildRestore(builder, lsn);
1476 	else
1477 		SnapBuildSerialize(builder, lsn);
1478 }
1479 
/*
 * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
 * been done by another decoding process.
 *
 * Nothing is done unless the builder has reached SNAPBUILD_CONSISTENT.
 *
 * The state is stored in a file in pg_logical/snapshots that is named after
 * the LSN.  It is first written to a temporary file carrying our pid, which
 * is fsynced and then renamed into place.  If a file for this LSN exists
 * already it is left alone and merely fsynced again.  Either way
 * builder->last_serialized_snapshot is set to 'lsn' and passed on to the
 * reorderbuffer as restart point.
 *
 * Failures to access the files are reported via ereport(ERROR).
 */
static void
SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
{
	Size		needed_length;
	SnapBuildOnDisk *ondisk = NULL; /* stays NULL if nothing gets written */
	char	   *ondisk_c;		/* write position inside *ondisk */
	int			fd;
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	int			ret;
	struct stat stat_buf;
	Size		sz;

	Assert(lsn != InvalidXLogRecPtr);
	Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
		   builder->last_serialized_snapshot <= lsn);

	/*
	 * no point in serializing if we cannot continue to work immediately after
	 * restoring the snapshot
	 */
	if (builder->state < SNAPBUILD_CONSISTENT)
		return;

	/* consistent snapshots have no next phase */
	Assert(builder->next_phase_at == InvalidTransactionId);

	/*
	 * We identify snapshots by the LSN they are valid for. We don't need to
	 * include timelines in the name as each LSN maps to exactly one timeline
	 * unless the user used pg_resetwal or similar. If a user did so, there's
	 * no hope continuing to decode anyway.
	 */
	sprintf(path, "pg_logical/snapshots/%X-%X.snap",
			LSN_FORMAT_ARGS(lsn));

	/*
	 * first check whether some other backend already has written the snapshot
	 * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
	 * as a valid state. Everything else is an unexpected error.
	 */
	ret = stat(path, &stat_buf);

	if (ret != 0 && errno != ENOENT)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not stat file \"%s\": %m", path)));

	else if (ret == 0)
	{
		/*
		 * somebody else has already serialized to this point, don't overwrite
		 * but remember location, so we don't need to read old data again.
		 *
		 * To be sure it has been synced to disk after the rename() from the
		 * tempfile filename to the real filename, we just repeat the fsync.
		 * That ought to be cheap because in most scenarios it should already
		 * be safely on disk.
		 */
		fsync_fname(path, false);
		fsync_fname("pg_logical/snapshots", true);

		builder->last_serialized_snapshot = lsn;
		goto out;
	}

	/*
	 * there is an obvious race condition here between the time we stat(2) the
	 * file and us writing the file. But we rename the file into place
	 * atomically and all files created need to contain the same data anyway,
	 * so this is perfectly fine, although a bit of a resource waste. Locking
	 * seems like pointless complication.
	 */
	elog(DEBUG1, "serializing snapshot to %s", path);

	/* to make sure only we will write to this tempfile, include pid */
	sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%d.tmp",
			LSN_FORMAT_ARGS(lsn), MyProcPid);

	/*
	 * Unlink temporary file if it already exists, needs to have been before a
	 * crash/error since we won't enter this function twice from within a
	 * single decoding slot/backend and the temporary file contains the pid of
	 * the current process.
	 */
	if (unlink(tmppath) != 0 && errno != ENOENT)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m", tmppath)));

	/* the struct itself plus one TransactionId per committed xact */
	needed_length = sizeof(SnapBuildOnDisk) +
		sizeof(TransactionId) * builder->committed.xcnt;

	/* assemble the complete file contents in memory first */
	ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
	ondisk = (SnapBuildOnDisk *) ondisk_c;
	ondisk->magic = SNAPBUILD_MAGIC;
	ondisk->version = SNAPBUILD_VERSION;
	/* NOTE(review): this is the total size, including the constant part */
	ondisk->length = needed_length;

	/*
	 * The checksum is computed piecewise; the first piece is the part of the
	 * constant sized header following the magic and the checksum itself.
	 */
	INIT_CRC32C(ondisk->checksum);
	COMP_CRC32C(ondisk->checksum,
				((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
				SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
	/* the xid array is placed directly behind the struct */
	ondisk_c += sizeof(SnapBuildOnDisk);

	memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
	/* NULL-ify memory-only data */
	ondisk->builder.context = NULL;
	ondisk->builder.snapshot = NULL;
	ondisk->builder.reorder = NULL;
	ondisk->builder.committed.xip = NULL;

	/* second piece: the builder, after its pointers have been cleared */
	COMP_CRC32C(ondisk->checksum,
				&ondisk->builder,
				sizeof(SnapBuild));

	/* copy committed xacts */
	sz = sizeof(TransactionId) * builder->committed.xcnt;
	memcpy(ondisk_c, builder->committed.xip, sz);
	COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
	ondisk_c += sz;

	FIN_CRC32C(ondisk->checksum);

	/* we have valid data now, open tempfile and write it there */
	fd = OpenTransientFile(tmppath,
						   O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
	if (fd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", tmppath)));

	/* reset errno so a short write that doesn't set it can be told apart */
	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
	if ((write(fd, ondisk, needed_length)) != needed_length)
	{
		/* closing the file may clobber errno, so save it first */
		int			save_errno = errno;

		CloseTransientFile(fd);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m", tmppath)));
	}
	pgstat_report_wait_end();

	/*
	 * fsync the file before renaming so that even if we crash after this we
	 * have either a fully valid file or nothing.
	 *
	 * It's safe to just ERROR on fsync() here because we'll retry the whole
	 * operation including the writes.
	 *
	 * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
	 * some noticeable overhead since it's performed synchronously during
	 * decoding?
	 */
	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
	if (pg_fsync(fd) != 0)
	{
		/* closing the file may clobber errno, so save it first */
		int			save_errno = errno;

		CloseTransientFile(fd);
		errno = save_errno;
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m", tmppath)));
	}
	pgstat_report_wait_end();

	if (CloseTransientFile(fd) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", tmppath)));

	/* sync the directory as well, it contains the new tempfile's entry */
	fsync_fname("pg_logical/snapshots", true);

	/*
	 * We may overwrite the work from some other backend, but that's ok, our
	 * snapshot is valid as well, we'll just have done some superfluous work.
	 */
	if (rename(tmppath, path) != 0)
	{
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));
	}

	/* make sure we persist */
	fsync_fname(path, false);
	fsync_fname("pg_logical/snapshots", true);

	/*
	 * Now there's no way we can lose the dumped state anymore, remember this
	 * as a serialization point.
	 */
	builder->last_serialized_snapshot = lsn;

out:
	/* reached both after writing the file and after finding an existing one */
	ReorderBufferSetRestartPoint(builder->reorder,
								 builder->last_serialized_snapshot);
	/* be tidy */
	if (ondisk)
		pfree(ondisk);
}
1691 
1692 /*
1693  * Restore a snapshot into 'builder' if previously one has been stored at the
1694  * location indicated by 'lsn'. Returns true if successful, false otherwise.
1695  */
1696 static bool
SnapBuildRestore(SnapBuild * builder,XLogRecPtr lsn)1697 SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
1698 {
1699 	SnapBuildOnDisk ondisk;
1700 	int			fd;
1701 	char		path[MAXPGPATH];
1702 	Size		sz;
1703 	int			readBytes;
1704 	pg_crc32c	checksum;
1705 
1706 	/* no point in loading a snapshot if we're already there */
1707 	if (builder->state == SNAPBUILD_CONSISTENT)
1708 		return false;
1709 
1710 	sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1711 			LSN_FORMAT_ARGS(lsn));
1712 
1713 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1714 
1715 	if (fd < 0 && errno == ENOENT)
1716 		return false;
1717 	else if (fd < 0)
1718 		ereport(ERROR,
1719 				(errcode_for_file_access(),
1720 				 errmsg("could not open file \"%s\": %m", path)));
1721 
1722 	/* ----
1723 	 * Make sure the snapshot had been stored safely to disk, that's normally
1724 	 * cheap.
1725 	 * Note that we do not need PANIC here, nobody will be able to use the
1726 	 * slot without fsyncing, and saving it won't succeed without an fsync()
1727 	 * either...
1728 	 * ----
1729 	 */
1730 	fsync_fname(path, false);
1731 	fsync_fname("pg_logical/snapshots", true);
1732 
1733 
1734 	/* read statically sized portion of snapshot */
1735 	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1736 	readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
1737 	pgstat_report_wait_end();
1738 	if (readBytes != SnapBuildOnDiskConstantSize)
1739 	{
1740 		int			save_errno = errno;
1741 
1742 		CloseTransientFile(fd);
1743 
1744 		if (readBytes < 0)
1745 		{
1746 			errno = save_errno;
1747 			ereport(ERROR,
1748 					(errcode_for_file_access(),
1749 					 errmsg("could not read file \"%s\": %m", path)));
1750 		}
1751 		else
1752 			ereport(ERROR,
1753 					(errcode(ERRCODE_DATA_CORRUPTED),
1754 					 errmsg("could not read file \"%s\": read %d of %zu",
1755 							path, readBytes,
1756 							(Size) SnapBuildOnDiskConstantSize)));
1757 	}
1758 
1759 	if (ondisk.magic != SNAPBUILD_MAGIC)
1760 		ereport(ERROR,
1761 				(errcode(ERRCODE_DATA_CORRUPTED),
1762 				 errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1763 						path, ondisk.magic, SNAPBUILD_MAGIC)));
1764 
1765 	if (ondisk.version != SNAPBUILD_VERSION)
1766 		ereport(ERROR,
1767 				(errcode(ERRCODE_DATA_CORRUPTED),
1768 				 errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1769 						path, ondisk.version, SNAPBUILD_VERSION)));
1770 
1771 	INIT_CRC32C(checksum);
1772 	COMP_CRC32C(checksum,
1773 				((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
1774 				SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1775 
1776 	/* read SnapBuild */
1777 	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1778 	readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
1779 	pgstat_report_wait_end();
1780 	if (readBytes != sizeof(SnapBuild))
1781 	{
1782 		int			save_errno = errno;
1783 
1784 		CloseTransientFile(fd);
1785 
1786 		if (readBytes < 0)
1787 		{
1788 			errno = save_errno;
1789 			ereport(ERROR,
1790 					(errcode_for_file_access(),
1791 					 errmsg("could not read file \"%s\": %m", path)));
1792 		}
1793 		else
1794 			ereport(ERROR,
1795 					(errcode(ERRCODE_DATA_CORRUPTED),
1796 					 errmsg("could not read file \"%s\": read %d of %zu",
1797 							path, readBytes, sizeof(SnapBuild))));
1798 	}
1799 	COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
1800 
1801 	/* restore committed xacts information */
1802 	sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
1803 	ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
1804 	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1805 	readBytes = read(fd, ondisk.builder.committed.xip, sz);
1806 	pgstat_report_wait_end();
1807 	if (readBytes != sz)
1808 	{
1809 		int			save_errno = errno;
1810 
1811 		CloseTransientFile(fd);
1812 
1813 		if (readBytes < 0)
1814 		{
1815 			errno = save_errno;
1816 			ereport(ERROR,
1817 					(errcode_for_file_access(),
1818 					 errmsg("could not read file \"%s\": %m", path)));
1819 		}
1820 		else
1821 			ereport(ERROR,
1822 					(errcode(ERRCODE_DATA_CORRUPTED),
1823 					 errmsg("could not read file \"%s\": read %d of %zu",
1824 							path, readBytes, sz)));
1825 	}
1826 	COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
1827 
1828 	if (CloseTransientFile(fd) != 0)
1829 		ereport(ERROR,
1830 				(errcode_for_file_access(),
1831 				 errmsg("could not close file \"%s\": %m", path)));
1832 
1833 	FIN_CRC32C(checksum);
1834 
1835 	/* verify checksum of what we've read */
1836 	if (!EQ_CRC32C(checksum, ondisk.checksum))
1837 		ereport(ERROR,
1838 				(errcode(ERRCODE_DATA_CORRUPTED),
1839 				 errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1840 						path, checksum, ondisk.checksum)));
1841 
1842 	/*
1843 	 * ok, we now have a sensible snapshot here, figure out if it has more
1844 	 * information than we have.
1845 	 */
1846 
1847 	/*
1848 	 * We are only interested in consistent snapshots for now, comparing
1849 	 * whether one incomplete snapshot is more "advanced" seems to be
1850 	 * unnecessarily complex.
1851 	 */
1852 	if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1853 		goto snapshot_not_interesting;
1854 
1855 	/*
1856 	 * Don't use a snapshot that requires an xmin that we cannot guarantee to
1857 	 * be available.
1858 	 */
1859 	if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
1860 		goto snapshot_not_interesting;
1861 
1862 	/* consistent snapshots have no next phase */
1863 	Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
1864 
1865 	/* ok, we think the snapshot is sensible, copy over everything important */
1866 	builder->xmin = ondisk.builder.xmin;
1867 	builder->xmax = ondisk.builder.xmax;
1868 	builder->state = ondisk.builder.state;
1869 
1870 	builder->committed.xcnt = ondisk.builder.committed.xcnt;
1871 	/* We only allocated/stored xcnt, not xcnt_space xids ! */
1872 	/* don't overwrite preallocated xip, if we don't have anything here */
1873 	if (builder->committed.xcnt > 0)
1874 	{
1875 		pfree(builder->committed.xip);
1876 		builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1877 		builder->committed.xip = ondisk.builder.committed.xip;
1878 	}
1879 	ondisk.builder.committed.xip = NULL;
1880 
1881 	/* our snapshot is not interesting anymore, build a new one */
1882 	if (builder->snapshot != NULL)
1883 	{
1884 		SnapBuildSnapDecRefcount(builder->snapshot);
1885 	}
1886 	builder->snapshot = SnapBuildBuildSnapshot(builder);
1887 	SnapBuildSnapIncRefcount(builder->snapshot);
1888 
1889 	ReorderBufferSetRestartPoint(builder->reorder, lsn);
1890 
1891 	Assert(builder->state == SNAPBUILD_CONSISTENT);
1892 
1893 	ereport(LOG,
1894 			(errmsg("logical decoding found consistent point at %X/%X",
1895 					LSN_FORMAT_ARGS(lsn)),
1896 			 errdetail("Logical decoding will begin using saved snapshot.")));
1897 	return true;
1898 
1899 snapshot_not_interesting:
1900 	if (ondisk.builder.committed.xip != NULL)
1901 		pfree(ondisk.builder.committed.xip);
1902 	return false;
1903 }
1904 
1905 /*
1906  * Remove all serialized snapshots that are not required anymore because no
1907  * slot can need them. This doesn't actually have to run during a checkpoint,
1908  * but it's a convenient point to schedule this.
1909  *
1910  * NB: We run this during checkpoints even if logical decoding is disabled so
1911  * we cleanup old slots at some point after it got disabled.
1912  */
1913 void
CheckPointSnapBuild(void)1914 CheckPointSnapBuild(void)
1915 {
1916 	XLogRecPtr	cutoff;
1917 	XLogRecPtr	redo;
1918 	DIR		   *snap_dir;
1919 	struct dirent *snap_de;
1920 	char		path[MAXPGPATH + 21];
1921 
1922 	/*
1923 	 * We start off with a minimum of the last redo pointer. No new
1924 	 * replication slot will start before that, so that's a safe upper bound
1925 	 * for removal.
1926 	 */
1927 	redo = GetRedoRecPtr();
1928 
1929 	/* now check for the restart ptrs from existing slots */
1930 	cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1931 
1932 	/* don't start earlier than the restart lsn */
1933 	if (redo < cutoff)
1934 		cutoff = redo;
1935 
1936 	snap_dir = AllocateDir("pg_logical/snapshots");
1937 	while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
1938 	{
1939 		uint32		hi;
1940 		uint32		lo;
1941 		XLogRecPtr	lsn;
1942 		struct stat statbuf;
1943 
1944 		if (strcmp(snap_de->d_name, ".") == 0 ||
1945 			strcmp(snap_de->d_name, "..") == 0)
1946 			continue;
1947 
1948 		snprintf(path, sizeof(path), "pg_logical/snapshots/%s", snap_de->d_name);
1949 
1950 		if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1951 		{
1952 			elog(DEBUG1, "only regular files expected: %s", path);
1953 			continue;
1954 		}
1955 
1956 		/*
1957 		 * temporary filenames from SnapBuildSerialize() include the LSN and
1958 		 * everything but are postfixed by .$pid.tmp. We can just remove them
1959 		 * the same as other files because there can be none that are
1960 		 * currently being written that are older than cutoff.
1961 		 *
1962 		 * We just log a message if a file doesn't fit the pattern, it's
1963 		 * probably some editors lock/state file or similar...
1964 		 */
1965 		if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
1966 		{
1967 			ereport(LOG,
1968 					(errmsg("could not parse file name \"%s\"", path)));
1969 			continue;
1970 		}
1971 
1972 		lsn = ((uint64) hi) << 32 | lo;
1973 
1974 		/* check whether we still need it */
1975 		if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1976 		{
1977 			elog(DEBUG1, "removing snapbuild snapshot %s", path);
1978 
1979 			/*
1980 			 * It's not particularly harmful, though strange, if we can't
1981 			 * remove the file here. Don't prevent the checkpoint from
1982 			 * completing, that'd be a cure worse than the disease.
1983 			 */
1984 			if (unlink(path) < 0)
1985 			{
1986 				ereport(LOG,
1987 						(errcode_for_file_access(),
1988 						 errmsg("could not remove file \"%s\": %m",
1989 								path)));
1990 				continue;
1991 			}
1992 		}
1993 	}
1994 	FreeDir(snap_dir);
1995 }
1996