1 /*-------------------------------------------------------------------------
2 *
3 * snapbuild.c
4 *
5 * Infrastructure for building historic catalog snapshots based on contents
6 * of the WAL, for the purpose of decoding heapam.c style values in the
7 * WAL.
8 *
9 * NOTES:
10 *
11 * We build snapshots which can *only* be used to read catalog contents and we
12 * do so by reading and interpreting the WAL stream. The aim is to build a
13 * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14 * at the time the XLogRecord was generated.
15 *
16 * To build the snapshots we reuse the infrastructure built for Hot
17 * Standby. The in-memory snapshots we build look different than HS' because
18 * we have different needs. To successfully decode data from the WAL we only
19 * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20 * tables since the data we decode is wholly contained in the WAL
21 * records. Also, our snapshots need to be different in comparison to normal
22 * MVCC ones because in contrast to those we cannot fully rely on the clog and
23 * pg_subtrans for information about committed transactions because they might
24 * commit in the future from the POV of the WAL entry we're currently
25 * decoding. This definition has the advantage that we only need to prevent
 * removal of catalog rows, while normal tables' rows can still be
27 * removed. This is achieved by using the replication slot mechanism.
28 *
29 * As the percentage of transactions modifying the catalog normally is fairly
 * small in comparison to ones only manipulating user data, we keep track of
31 * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
32 * track of all running transactions like it's done in a normal snapshot. Note
33 * that we're generally only looking at transactions that have acquired an
 * xid. That is, we keep a list of transactions between snapshot->(xmin, xmax)
35 * that we consider committed, everything else is considered aborted/in
36 * progress. That also allows us not to care about subtransactions before they
37 * have committed which means this module, in contrast to HS, doesn't have to
38 * care about suboverflowed subtransactions and similar.
39 *
40 * One complexity of doing this is that to e.g. handle mixed DDL/DML
41 * transactions we need Snapshots that see intermediate versions of the
42 * catalog in a transaction. During normal operation this is achieved by using
43 * CommandIds/cmin/cmax. The problem with that however is that for space
44 * efficiency reasons only one value of that is stored
45 * (cf. combocid.c). Since ComboCids are only available in memory we log
46 * additional information which allows us to get the original (cmin, cmax)
47 * pair during visibility checks. Check the reorderbuffer.c's comment above
48 * ResolveCminCmaxDuringDecoding() for details.
49 *
50 * To facilitate all this we need our own visibility routine, as the normal
 * ones are optimized for different use cases.
52 *
53 * To replace the normal catalog snapshots with decoding ones use the
54 * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
55 *
56 *
57 *
 * The snapbuild machinery starts up in several stages, as illustrated
59 * by the following graph describing the SnapBuild->state transitions:
60 *
 *           +-------------------------+
 *      +----|          START          |-------------+
 *      |    +-------------------------+             |
 *      |                 |                          |
 *      |                 |                          |
 *      |         running_xacts #1                   |
 *      |                 |                          |
 *      |                 |                          |
 *      |                 v                          |
 *      |    +-------------------------+             v
 *      |    |    BUILDING_SNAPSHOT    |------------>|
 *      |    +-------------------------+             |
 *      |                 |                          |
 *      |                 |                          |
 *      | running_xacts #2, xacts from #1 finished   |
 *      |                 |                          |
 *      |                 |                          |
 *      |                 v                          |
 *      |    +-------------------------+             v
 *      |    |      FULL_SNAPSHOT      |------------>|
 *      |    +-------------------------+             |
 *      |                 |                          |
 * running_xacts          |                   saved snapshot
 * with zero xacts        |               at running_xacts's lsn
 *      |                 |                          |
 *      | running_xacts with xacts from #2 finished  |
 *      |                 |                          |
 *      |                 v                          |
 *      |    +-------------------------+             |
 *      +--->|  SNAPBUILD_CONSISTENT   |<------------+
 *           +-------------------------+
92 *
 * Initially the machinery is in the START stage. When an xl_running_xacts
 * record is read that is sufficiently new (above the safe xmin horizon),
 * there's a state transition. If there were no running xacts when the
 * running_xacts record was generated, we'll directly go into CONSISTENT
 * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Once a later
 * running_xacts record shows that all transactions running at that first
 * record have finished, we switch to the FULL_SNAPSHOT state. Having a full
 * snapshot means that all transactions that start henceforth can be decoded
 * in their entirety, but transactions that started previously can't. In
 * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
 * running transactions have committed or aborted.
102 *
 * Only transactions that commit after CONSISTENT state has been reached will
 * be replayed, even though they might have started while still in
 * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
 * changes have been exported, but all the following ones will be. That point
 * is a convenient point to initialize replication from, which is why we
 * export a snapshot at that point, which *can* be used to read normal data.
109 *
110 * Copyright (c) 2012-2019, PostgreSQL Global Development Group
111 *
112 * IDENTIFICATION
113 * src/backend/replication/snapbuild.c
114 *
115 *-------------------------------------------------------------------------
116 */
117
118 #include "postgres.h"
119
120 #include <sys/stat.h>
121 #include <unistd.h>
122
123 #include "miscadmin.h"
124
125 #include "access/heapam_xlog.h"
126 #include "access/transam.h"
127 #include "access/xact.h"
128
129 #include "pgstat.h"
130
131 #include "replication/logical.h"
132 #include "replication/reorderbuffer.h"
133 #include "replication/snapbuild.h"
134
135 #include "utils/builtins.h"
136 #include "utils/memutils.h"
137 #include "utils/snapshot.h"
138 #include "utils/snapmgr.h"
139
140 #include "storage/block.h" /* debugging output */
141 #include "storage/fd.h"
142 #include "storage/lmgr.h"
143 #include "storage/proc.h"
144 #include "storage/procarray.h"
145 #include "storage/standby.h"
146
147 /*
148 * This struct contains the current state of the snapshot building
149 * machinery. Besides a forward declaration in the header, it is not exposed
150 * to the public, so we can easily change its contents.
151 */
152 struct SnapBuild
153 {
154 /* how far are we along building our first full snapshot */
155 SnapBuildState state;
156
157 /* private memory context used to allocate memory for this module. */
158 MemoryContext context;
159
160 /* all transactions < than this have committed/aborted */
161 TransactionId xmin;
162
163 /* all transactions >= than this are uncommitted */
164 TransactionId xmax;
165
166 /*
167 * Don't replay commits from an LSN < this LSN. This can be set externally
168 * but it will also be advanced (never retreat) from within snapbuild.c.
169 */
170 XLogRecPtr start_decoding_at;
171
172 /*
173 * Don't start decoding WAL until the "xl_running_xacts" information
174 * indicates there are no running xids with an xid smaller than this.
175 */
176 TransactionId initial_xmin_horizon;
177
178 /* Indicates if we are building full snapshot or just catalog one. */
179 bool building_full_snapshot;
180
181 /*
182 * Snapshot that's valid to see the catalog state seen at this moment.
183 */
184 Snapshot snapshot;
185
186 /*
187 * LSN of the last location we are sure a snapshot has been serialized to.
188 */
189 XLogRecPtr last_serialized_snapshot;
190
191 /*
192 * The reorderbuffer we need to update with usable snapshots et al.
193 */
194 ReorderBuffer *reorder;
195
196 /*
197 * Outdated: This struct isn't used for its original purpose anymore, but
198 * can't be removed / changed in a minor version, because it's stored
199 * on-disk.
200 */
201 struct
202 {
203 /*
204 * NB: This field is misused, until a major version can break on-disk
205 * compatibility. See SnapBuildNextPhaseAt() /
206 * SnapBuildStartNextPhaseAt().
207 */
208 TransactionId was_xmin;
209 TransactionId was_xmax;
210
211 size_t was_xcnt; /* number of used xip entries */
212 size_t was_xcnt_space; /* allocated size of xip */
213 TransactionId *was_xip; /* running xacts array, xidComparator-sorted */
214 } was_running;
215
216 /*
217 * Array of transactions which could have catalog changes that committed
218 * between xmin and xmax.
219 */
220 struct
221 {
222 /* number of committed transactions */
223 size_t xcnt;
224
225 /* available space for committed transactions */
226 size_t xcnt_space;
227
228 /*
229 * Until we reach a CONSISTENT state, we record commits of all
230 * transactions, not just the catalog changing ones. Record when that
231 * changes so we know we cannot export a snapshot safely anymore.
232 */
233 bool includes_all_transactions;
234
235 /*
236 * Array of committed transactions that have modified the catalog.
237 *
238 * As this array is frequently modified we do *not* keep it in
239 * xidComparator order. Instead we sort the array when building &
240 * distributing a snapshot.
241 *
242 * TODO: It's unclear whether that reasoning has much merit. Every
243 * time we add something here after becoming consistent will also
244 * require distributing a snapshot. Storing them sorted would
245 * potentially also make it easier to purge (but more complicated wrt
246 * wraparound?). Should be improved if sorting while building the
247 * snapshot shows up in profiles.
248 */
249 TransactionId *xip;
250 } committed;
251 };
252
253 /*
254 * Starting a transaction -- which we need to do while exporting a snapshot --
255 * removes knowledge about the previously used resowner, so we save it here.
256 */
257 static ResourceOwner SavedResourceOwnerDuringExport = NULL;
258 static bool ExportInProgress = false;
259
260 /* ->committed manipulation */
261 static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
262
263 /* snapshot building/manipulation/distribution functions */
264 static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
265
266 static void SnapBuildFreeSnapshot(Snapshot snap);
267
268 static void SnapBuildSnapIncRefcount(Snapshot snap);
269
270 static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
271
272 /* xlog reading helper functions for SnapBuildProcessRecord */
273 static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
274 static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
275
276 /* serialization functions */
277 static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
278 static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
279
280 /*
281 * Return TransactionId after which the next phase of initial snapshot
282 * building will happen.
283 */
284 static inline TransactionId
SnapBuildNextPhaseAt(SnapBuild * builder)285 SnapBuildNextPhaseAt(SnapBuild *builder)
286 {
287 /*
288 * For backward compatibility reasons this has to be stored in the wrongly
289 * named field. Will be fixed in next major version.
290 */
291 return builder->was_running.was_xmax;
292 }
293
294 /*
295 * Set TransactionId after which the next phase of initial snapshot building
296 * will happen.
297 */
298 static inline void
SnapBuildStartNextPhaseAt(SnapBuild * builder,TransactionId at)299 SnapBuildStartNextPhaseAt(SnapBuild *builder, TransactionId at)
300 {
301 /*
302 * For backward compatibility reasons this has to be stored in the wrongly
303 * named field. Will be fixed in next major version.
304 */
305 builder->was_running.was_xmax = at;
306 }
307
308 /*
309 * Allocate a new snapshot builder.
310 *
311 * xmin_horizon is the xid >= which we can be sure no catalog rows have been
312 * removed, start_lsn is the LSN >= we want to replay commits.
313 */
314 SnapBuild *
AllocateSnapshotBuilder(ReorderBuffer * reorder,TransactionId xmin_horizon,XLogRecPtr start_lsn,bool need_full_snapshot)315 AllocateSnapshotBuilder(ReorderBuffer *reorder,
316 TransactionId xmin_horizon,
317 XLogRecPtr start_lsn,
318 bool need_full_snapshot)
319 {
320 MemoryContext context;
321 MemoryContext oldcontext;
322 SnapBuild *builder;
323
324 /* allocate memory in own context, to have better accountability */
325 context = AllocSetContextCreate(CurrentMemoryContext,
326 "snapshot builder context",
327 ALLOCSET_DEFAULT_SIZES);
328 oldcontext = MemoryContextSwitchTo(context);
329
330 builder = palloc0(sizeof(SnapBuild));
331
332 builder->state = SNAPBUILD_START;
333 builder->context = context;
334 builder->reorder = reorder;
335 /* Other struct members initialized by zeroing via palloc0 above */
336
337 builder->committed.xcnt = 0;
338 builder->committed.xcnt_space = 128; /* arbitrary number */
339 builder->committed.xip =
340 palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
341 builder->committed.includes_all_transactions = true;
342
343 builder->initial_xmin_horizon = xmin_horizon;
344 builder->start_decoding_at = start_lsn;
345 builder->building_full_snapshot = need_full_snapshot;
346
347 MemoryContextSwitchTo(oldcontext);
348
349 return builder;
350 }
351
352 /*
353 * Free a snapshot builder.
354 */
355 void
FreeSnapshotBuilder(SnapBuild * builder)356 FreeSnapshotBuilder(SnapBuild *builder)
357 {
358 MemoryContext context = builder->context;
359
360 /* free snapshot explicitly, that contains some error checking */
361 if (builder->snapshot != NULL)
362 {
363 SnapBuildSnapDecRefcount(builder->snapshot);
364 builder->snapshot = NULL;
365 }
366
367 /* other resources are deallocated via memory context reset */
368 MemoryContextDelete(context);
369 }
370
371 /*
372 * Free an unreferenced snapshot that has previously been built by us.
373 */
374 static void
SnapBuildFreeSnapshot(Snapshot snap)375 SnapBuildFreeSnapshot(Snapshot snap)
376 {
377 /* make sure we don't get passed an external snapshot */
378 Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
379
380 /* make sure nobody modified our snapshot */
381 Assert(snap->curcid == FirstCommandId);
382 Assert(!snap->suboverflowed);
383 Assert(!snap->takenDuringRecovery);
384 Assert(snap->regd_count == 0);
385
386 /* slightly more likely, so it's checked even without c-asserts */
387 if (snap->copied)
388 elog(ERROR, "cannot free a copied snapshot");
389
390 if (snap->active_count)
391 elog(ERROR, "cannot free an active snapshot");
392
393 pfree(snap);
394 }
395
396 /*
397 * In which state of snapshot building are we?
398 */
399 SnapBuildState
SnapBuildCurrentState(SnapBuild * builder)400 SnapBuildCurrentState(SnapBuild *builder)
401 {
402 return builder->state;
403 }
404
405 /*
406 * Should the contents of transaction ending at 'ptr' be decoded?
407 */
408 bool
SnapBuildXactNeedsSkip(SnapBuild * builder,XLogRecPtr ptr)409 SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
410 {
411 return ptr < builder->start_decoding_at;
412 }
413
414 /*
415 * Increase refcount of a snapshot.
416 *
417 * This is used when handing out a snapshot to some external resource or when
418 * adding a Snapshot as builder->snapshot.
419 */
420 static void
SnapBuildSnapIncRefcount(Snapshot snap)421 SnapBuildSnapIncRefcount(Snapshot snap)
422 {
423 snap->active_count++;
424 }
425
426 /*
427 * Decrease refcount of a snapshot and free if the refcount reaches zero.
428 *
429 * Externally visible, so that external resources that have been handed an
430 * IncRef'ed Snapshot can adjust its refcount easily.
431 */
432 void
SnapBuildSnapDecRefcount(Snapshot snap)433 SnapBuildSnapDecRefcount(Snapshot snap)
434 {
435 /* make sure we don't get passed an external snapshot */
436 Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
437
438 /* make sure nobody modified our snapshot */
439 Assert(snap->curcid == FirstCommandId);
440 Assert(!snap->suboverflowed);
441 Assert(!snap->takenDuringRecovery);
442
443 Assert(snap->regd_count == 0);
444
445 Assert(snap->active_count > 0);
446
447 /* slightly more likely, so it's checked even without casserts */
448 if (snap->copied)
449 elog(ERROR, "cannot free a copied snapshot");
450
451 snap->active_count--;
452 if (snap->active_count == 0)
453 SnapBuildFreeSnapshot(snap);
454 }
455
456 /*
457 * Build a new snapshot, based on currently committed catalog-modifying
458 * transactions.
459 *
460 * In-progress transactions with catalog access are *not* allowed to modify
461 * these snapshots; they have to copy them and fill in appropriate ->curcid
462 * and ->subxip/subxcnt values.
463 */
464 static Snapshot
SnapBuildBuildSnapshot(SnapBuild * builder)465 SnapBuildBuildSnapshot(SnapBuild *builder)
466 {
467 Snapshot snapshot;
468 Size ssize;
469
470 Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
471
472 ssize = sizeof(SnapshotData)
473 + sizeof(TransactionId) * builder->committed.xcnt
474 + sizeof(TransactionId) * 1 /* toplevel xid */ ;
475
476 snapshot = MemoryContextAllocZero(builder->context, ssize);
477
478 snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
479
480 /*
481 * We misuse the original meaning of SnapshotData's xip and subxip fields
482 * to make the more fitting for our needs.
483 *
484 * In the 'xip' array we store transactions that have to be treated as
485 * committed. Since we will only ever look at tuples from transactions
486 * that have modified the catalog it's more efficient to store those few
487 * that exist between xmin and xmax (frequently there are none).
488 *
489 * Snapshots that are used in transactions that have modified the catalog
490 * also use the 'subxip' array to store their toplevel xid and all the
491 * subtransaction xids so we can recognize when we need to treat rows as
492 * visible that are not in xip but still need to be visible. Subxip only
493 * gets filled when the transaction is copied into the context of a
494 * catalog modifying transaction since we otherwise share a snapshot
495 * between transactions. As long as a txn hasn't modified the catalog it
496 * doesn't need to treat any uncommitted rows as visible, so there is no
497 * need for those xids.
498 *
499 * Both arrays are qsort'ed so that we can use bsearch() on them.
500 */
501 Assert(TransactionIdIsNormal(builder->xmin));
502 Assert(TransactionIdIsNormal(builder->xmax));
503
504 snapshot->xmin = builder->xmin;
505 snapshot->xmax = builder->xmax;
506
507 /* store all transactions to be treated as committed by this snapshot */
508 snapshot->xip =
509 (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
510 snapshot->xcnt = builder->committed.xcnt;
511 memcpy(snapshot->xip,
512 builder->committed.xip,
513 builder->committed.xcnt * sizeof(TransactionId));
514
515 /* sort so we can bsearch() */
516 qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
517
518 /*
519 * Initially, subxip is empty, i.e. it's a snapshot to be used by
520 * transactions that don't modify the catalog. Will be filled by
521 * ReorderBufferCopySnap() if necessary.
522 */
523 snapshot->subxcnt = 0;
524 snapshot->subxip = NULL;
525
526 snapshot->suboverflowed = false;
527 snapshot->takenDuringRecovery = false;
528 snapshot->copied = false;
529 snapshot->curcid = FirstCommandId;
530 snapshot->active_count = 0;
531 snapshot->regd_count = 0;
532
533 return snapshot;
534 }
535
536 /*
537 * Build the initial slot snapshot and convert it to a normal snapshot that
538 * is understood by HeapTupleSatisfiesMVCC.
539 *
540 * The snapshot will be usable directly in current transaction or exported
541 * for loading in different transaction.
542 */
543 Snapshot
SnapBuildInitialSnapshot(SnapBuild * builder)544 SnapBuildInitialSnapshot(SnapBuild *builder)
545 {
546 Snapshot snap;
547 TransactionId xid;
548 TransactionId *newxip;
549 int newxcnt = 0;
550
551 Assert(!FirstSnapshotSet);
552 Assert(XactIsoLevel == XACT_REPEATABLE_READ);
553
554 if (builder->state != SNAPBUILD_CONSISTENT)
555 elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
556
557 if (!builder->committed.includes_all_transactions)
558 elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
559
560 /* so we don't overwrite the existing value */
561 if (TransactionIdIsValid(MyPgXact->xmin))
562 elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid");
563
564 snap = SnapBuildBuildSnapshot(builder);
565
566 /*
567 * We know that snap->xmin is alive, enforced by the logical xmin
568 * mechanism. Due to that we can do this without locks, we're only
569 * changing our own value.
570 */
571 #ifdef USE_ASSERT_CHECKING
572 {
573 TransactionId safeXid;
574
575 LWLockAcquire(ProcArrayLock, LW_SHARED);
576 safeXid = GetOldestSafeDecodingTransactionId(false);
577 LWLockRelease(ProcArrayLock);
578
579 Assert(TransactionIdPrecedesOrEquals(safeXid, snap->xmin));
580 }
581 #endif
582
583 MyPgXact->xmin = snap->xmin;
584
585 /* allocate in transaction context */
586 newxip = (TransactionId *)
587 palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
588
589 /*
590 * snapbuild.c builds transactions in an "inverted" manner, which means it
591 * stores committed transactions in ->xip, not ones in progress. Build a
592 * classical snapshot by marking all non-committed transactions as
593 * in-progress. This can be expensive.
594 */
595 for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
596 {
597 void *test;
598
599 /*
600 * Check whether transaction committed using the decoding snapshot
601 * meaning of ->xip.
602 */
603 test = bsearch(&xid, snap->xip, snap->xcnt,
604 sizeof(TransactionId), xidComparator);
605
606 if (test == NULL)
607 {
608 if (newxcnt >= GetMaxSnapshotXidCount())
609 ereport(ERROR,
610 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
611 errmsg("initial slot snapshot too large")));
612
613 newxip[newxcnt++] = xid;
614 }
615
616 TransactionIdAdvance(xid);
617 }
618
619 /* adjust remaining snapshot fields as needed */
620 snap->snapshot_type = SNAPSHOT_MVCC;
621 snap->xcnt = newxcnt;
622 snap->xip = newxip;
623
624 return snap;
625 }
626
627 /*
628 * Export a snapshot so it can be set in another session with SET TRANSACTION
629 * SNAPSHOT.
630 *
631 * For that we need to start a transaction in the current backend as the
632 * importing side checks whether the source transaction is still open to make
633 * sure the xmin horizon hasn't advanced since then.
634 */
635 const char *
SnapBuildExportSnapshot(SnapBuild * builder)636 SnapBuildExportSnapshot(SnapBuild *builder)
637 {
638 Snapshot snap;
639 char *snapname;
640
641 if (IsTransactionOrTransactionBlock())
642 elog(ERROR, "cannot export a snapshot from within a transaction");
643
644 if (SavedResourceOwnerDuringExport)
645 elog(ERROR, "can only export one snapshot at a time");
646
647 SavedResourceOwnerDuringExport = CurrentResourceOwner;
648 ExportInProgress = true;
649
650 StartTransactionCommand();
651
652 /* There doesn't seem to a nice API to set these */
653 XactIsoLevel = XACT_REPEATABLE_READ;
654 XactReadOnly = true;
655
656 snap = SnapBuildInitialSnapshot(builder);
657
658 /*
659 * now that we've built a plain snapshot, make it active and use the
660 * normal mechanisms for exporting it
661 */
662 snapname = ExportSnapshot(snap);
663
664 ereport(LOG,
665 (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
666 "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
667 snap->xcnt,
668 snapname, snap->xcnt)));
669 return snapname;
670 }
671
672 /*
673 * Ensure there is a snapshot and if not build one for current transaction.
674 */
675 Snapshot
SnapBuildGetOrBuildSnapshot(SnapBuild * builder,TransactionId xid)676 SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
677 {
678 Assert(builder->state == SNAPBUILD_CONSISTENT);
679
680 /* only build a new snapshot if we don't have a prebuilt one */
681 if (builder->snapshot == NULL)
682 {
683 builder->snapshot = SnapBuildBuildSnapshot(builder);
684 /* increase refcount for the snapshot builder */
685 SnapBuildSnapIncRefcount(builder->snapshot);
686 }
687
688 return builder->snapshot;
689 }
690
691 /*
692 * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
693 * any. Aborts the previously started transaction and resets the resource
694 * owner back to its original value.
695 */
696 void
SnapBuildClearExportedSnapshot(void)697 SnapBuildClearExportedSnapshot(void)
698 {
699 ResourceOwner tmpResOwner;
700
701 /* nothing exported, that is the usual case */
702 if (!ExportInProgress)
703 return;
704
705 if (!IsTransactionState())
706 elog(ERROR, "clearing exported snapshot in wrong transaction state");
707
708 /*
709 * AbortCurrentTransaction() takes care of resetting the snapshot state,
710 * so remember SavedResourceOwnerDuringExport.
711 */
712 tmpResOwner = SavedResourceOwnerDuringExport;
713
714 /* make sure nothing could have ever happened */
715 AbortCurrentTransaction();
716
717 CurrentResourceOwner = tmpResOwner;
718 }
719
720 /*
721 * Clear snapshot export state during transaction abort.
722 */
723 void
SnapBuildResetExportedSnapshotState(void)724 SnapBuildResetExportedSnapshotState(void)
725 {
726 SavedResourceOwnerDuringExport = NULL;
727 ExportInProgress = false;
728 }
729
730 /*
731 * Handle the effects of a single heap change, appropriate to the current state
732 * of the snapshot builder and returns whether changes made at (xid, lsn) can
733 * be decoded.
734 */
735 bool
SnapBuildProcessChange(SnapBuild * builder,TransactionId xid,XLogRecPtr lsn)736 SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
737 {
738 /*
739 * We can't handle data in transactions if we haven't built a snapshot
740 * yet, so don't store them.
741 */
742 if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
743 return false;
744
745 /*
746 * No point in keeping track of changes in transactions that we don't have
747 * enough information about to decode. This means that they started before
748 * we got into the SNAPBUILD_FULL_SNAPSHOT state.
749 */
750 if (builder->state < SNAPBUILD_CONSISTENT &&
751 TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder)))
752 return false;
753
754 /*
755 * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
756 * be needed to decode the change we're currently processing.
757 */
758 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
759 {
760 /* only build a new snapshot if we don't have a prebuilt one */
761 if (builder->snapshot == NULL)
762 {
763 builder->snapshot = SnapBuildBuildSnapshot(builder);
764 /* increase refcount for the snapshot builder */
765 SnapBuildSnapIncRefcount(builder->snapshot);
766 }
767
768 /*
769 * Increase refcount for the transaction we're handing the snapshot
770 * out to.
771 */
772 SnapBuildSnapIncRefcount(builder->snapshot);
773 ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
774 builder->snapshot);
775 }
776
777 return true;
778 }
779
780 /*
781 * Do CommandId/ComboCid handling after reading an xl_heap_new_cid record.
782 * This implies that a transaction has done some form of write to system
783 * catalogs.
784 */
785 void
SnapBuildProcessNewCid(SnapBuild * builder,TransactionId xid,XLogRecPtr lsn,xl_heap_new_cid * xlrec)786 SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
787 XLogRecPtr lsn, xl_heap_new_cid *xlrec)
788 {
789 CommandId cid;
790
791 /*
792 * we only log new_cid's if a catalog tuple was modified, so mark the
793 * transaction as containing catalog modifications
794 */
795 ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
796
797 ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
798 xlrec->target_node, xlrec->target_tid,
799 xlrec->cmin, xlrec->cmax,
800 xlrec->combocid);
801
802 /* figure out new command id */
803 if (xlrec->cmin != InvalidCommandId &&
804 xlrec->cmax != InvalidCommandId)
805 cid = Max(xlrec->cmin, xlrec->cmax);
806 else if (xlrec->cmax != InvalidCommandId)
807 cid = xlrec->cmax;
808 else if (xlrec->cmin != InvalidCommandId)
809 cid = xlrec->cmin;
810 else
811 {
812 cid = InvalidCommandId; /* silence compiler */
813 elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
814 }
815
816 ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
817 }
818
819 /*
820 * Add a new Snapshot to all transactions we're decoding that currently are
821 * in-progress so they can see new catalog contents made by the transaction
822 * that just committed. This is necessary because those in-progress
823 * transactions will use the new catalog's contents from here on (at the very
824 * least everything they do needs to be compatible with newer catalog
825 * contents).
826 */
827 static void
SnapBuildDistributeNewCatalogSnapshot(SnapBuild * builder,XLogRecPtr lsn)828 SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
829 {
830 dlist_iter txn_i;
831 ReorderBufferTXN *txn;
832
833 /*
834 * Iterate through all toplevel transactions. This can include
835 * subtransactions which we just don't yet know to be that, but that's
836 * fine, they will just get an unnecessary snapshot queued.
837 */
838 dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
839 {
840 txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
841
842 Assert(TransactionIdIsValid(txn->xid));
843
844 /*
845 * If we don't have a base snapshot yet, there are no changes in this
846 * transaction which in turn implies we don't yet need a snapshot at
847 * all. We'll add a snapshot when the first change gets queued.
848 *
849 * NB: This works correctly even for subtransactions because
850 * ReorderBufferAssignChild() takes care to transfer the base snapshot
851 * to the top-level transaction, and while iterating the changequeue
852 * we'll get the change from the subtxn.
853 */
854 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
855 continue;
856
857 elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
858 txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
859
860 /*
861 * increase the snapshot's refcount for the transaction we are handing
862 * it out to
863 */
864 SnapBuildSnapIncRefcount(builder->snapshot);
865 ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
866 builder->snapshot);
867 }
868 }
869
870 /*
871 * Keep track of a new catalog changing transaction that has committed.
872 */
873 static void
SnapBuildAddCommittedTxn(SnapBuild * builder,TransactionId xid)874 SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
875 {
876 Assert(TransactionIdIsValid(xid));
877
878 if (builder->committed.xcnt == builder->committed.xcnt_space)
879 {
880 builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
881
882 elog(DEBUG1, "increasing space for committed transactions to %u",
883 (uint32) builder->committed.xcnt_space);
884
885 builder->committed.xip = repalloc(builder->committed.xip,
886 builder->committed.xcnt_space * sizeof(TransactionId));
887 }
888
889 /*
890 * TODO: It might make sense to keep the array sorted here instead of
891 * doing it every time we build a new snapshot. On the other hand this
892 * gets called repeatedly when a transaction with subtransactions commits.
893 */
894 builder->committed.xip[builder->committed.xcnt++] = xid;
895 }
896
897 /*
898 * Remove knowledge about transactions we treat as committed that are smaller
899 * than ->xmin. Those won't ever get checked via the ->committed array but via
900 * the clog machinery, so we don't need to waste memory on them.
901 */
902 static void
SnapBuildPurgeCommittedTxn(SnapBuild * builder)903 SnapBuildPurgeCommittedTxn(SnapBuild *builder)
904 {
905 int off;
906 TransactionId *workspace;
907 int surviving_xids = 0;
908
909 /* not ready yet */
910 if (!TransactionIdIsNormal(builder->xmin))
911 return;
912
913 /* TODO: Neater algorithm than just copying and iterating? */
914 workspace =
915 MemoryContextAlloc(builder->context,
916 builder->committed.xcnt * sizeof(TransactionId));
917
918 /* copy xids that still are interesting to workspace */
919 for (off = 0; off < builder->committed.xcnt; off++)
920 {
921 if (NormalTransactionIdPrecedes(builder->committed.xip[off],
922 builder->xmin))
923 ; /* remove */
924 else
925 workspace[surviving_xids++] = builder->committed.xip[off];
926 }
927
928 /* copy workspace back to persistent state */
929 memcpy(builder->committed.xip, workspace,
930 surviving_xids * sizeof(TransactionId));
931
932 elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
933 (uint32) builder->committed.xcnt, (uint32) surviving_xids,
934 builder->xmin, builder->xmax);
935 builder->committed.xcnt = surviving_xids;
936
937 pfree(workspace);
938 }
939
940 /*
941 * Handle everything that needs to be done when a transaction commits
942 */
943 void
SnapBuildCommitTxn(SnapBuild * builder,XLogRecPtr lsn,TransactionId xid,int nsubxacts,TransactionId * subxacts)944 SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
945 int nsubxacts, TransactionId *subxacts)
946 {
947 int nxact;
948
949 bool needs_snapshot = false;
950 bool needs_timetravel = false;
951 bool sub_needs_timetravel = false;
952
953 TransactionId xmax = xid;
954
955 /*
956 * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
957 * will they be part of a snapshot. So we don't need to record anything.
958 */
959 if (builder->state == SNAPBUILD_START ||
960 (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
961 TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder))))
962 {
963 /* ensure that only commits after this are getting replayed */
964 if (builder->start_decoding_at <= lsn)
965 builder->start_decoding_at = lsn + 1;
966 return;
967 }
968
969 if (builder->state < SNAPBUILD_CONSISTENT)
970 {
971 /* ensure that only commits after this are getting replayed */
972 if (builder->start_decoding_at <= lsn)
973 builder->start_decoding_at = lsn + 1;
974
975 /*
976 * If building an exportable snapshot, force xid to be tracked, even
977 * if the transaction didn't modify the catalog.
978 */
979 if (builder->building_full_snapshot)
980 {
981 needs_timetravel = true;
982 }
983 }
984
985 for (nxact = 0; nxact < nsubxacts; nxact++)
986 {
987 TransactionId subxid = subxacts[nxact];
988
989 /*
990 * Add subtransaction to base snapshot if catalog modifying, we don't
991 * distinguish to toplevel transactions there.
992 */
993 if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
994 {
995 sub_needs_timetravel = true;
996 needs_snapshot = true;
997
998 elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
999 xid, subxid);
1000
1001 SnapBuildAddCommittedTxn(builder, subxid);
1002
1003 if (NormalTransactionIdFollows(subxid, xmax))
1004 xmax = subxid;
1005 }
1006
1007 /*
1008 * If we're forcing timetravel we also need visibility information
1009 * about subtransaction, so keep track of subtransaction's state, even
1010 * if not catalog modifying. Don't need to distribute a snapshot in
1011 * that case.
1012 */
1013 else if (needs_timetravel)
1014 {
1015 SnapBuildAddCommittedTxn(builder, subxid);
1016 if (NormalTransactionIdFollows(subxid, xmax))
1017 xmax = subxid;
1018 }
1019 }
1020
1021 /* if top-level modified catalog, it'll need a snapshot */
1022 if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1023 {
1024 elog(DEBUG2, "found top level transaction %u, with catalog changes",
1025 xid);
1026 needs_snapshot = true;
1027 needs_timetravel = true;
1028 SnapBuildAddCommittedTxn(builder, xid);
1029 }
1030 else if (sub_needs_timetravel)
1031 {
1032 /* track toplevel txn as well, subxact alone isn't meaningful */
1033 SnapBuildAddCommittedTxn(builder, xid);
1034 }
1035 else if (needs_timetravel)
1036 {
1037 elog(DEBUG2, "forced transaction %u to do timetravel", xid);
1038
1039 SnapBuildAddCommittedTxn(builder, xid);
1040 }
1041
1042 if (!needs_timetravel)
1043 {
1044 /* record that we cannot export a general snapshot anymore */
1045 builder->committed.includes_all_transactions = false;
1046 }
1047
1048 Assert(!needs_snapshot || needs_timetravel);
1049
1050 /*
1051 * Adjust xmax of the snapshot builder, we only do that for committed,
1052 * catalog modifying, transactions, everything else isn't interesting for
1053 * us since we'll never look at the respective rows.
1054 */
1055 if (needs_timetravel &&
1056 (!TransactionIdIsValid(builder->xmax) ||
1057 TransactionIdFollowsOrEquals(xmax, builder->xmax)))
1058 {
1059 builder->xmax = xmax;
1060 TransactionIdAdvance(builder->xmax);
1061 }
1062
1063 /* if there's any reason to build a historic snapshot, do so now */
1064 if (needs_snapshot)
1065 {
1066 /*
1067 * If we haven't built a complete snapshot yet there's no need to hand
1068 * it out, it wouldn't (and couldn't) be used anyway.
1069 */
1070 if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
1071 return;
1072
1073 /*
1074 * Decrease the snapshot builder's refcount of the old snapshot, note
1075 * that it still will be used if it has been handed out to the
1076 * reorderbuffer earlier.
1077 */
1078 if (builder->snapshot)
1079 SnapBuildSnapDecRefcount(builder->snapshot);
1080
1081 builder->snapshot = SnapBuildBuildSnapshot(builder);
1082
1083 /* we might need to execute invalidations, add snapshot */
1084 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1085 {
1086 SnapBuildSnapIncRefcount(builder->snapshot);
1087 ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1088 builder->snapshot);
1089 }
1090
1091 /* refcount of the snapshot builder for the new snapshot */
1092 SnapBuildSnapIncRefcount(builder->snapshot);
1093
1094 /* add a new catalog snapshot to all currently running transactions */
1095 SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
1096 }
1097 }
1098
1099
1100 /* -----------------------------------
1101 * Snapshot building functions dealing with xlog records
1102 * -----------------------------------
1103 */
1104
1105 /*
1106 * Process a running xacts record, and use its information to first build a
1107 * historic snapshot and later to release resources that aren't needed
1108 * anymore.
1109 */
1110 void
SnapBuildProcessRunningXacts(SnapBuild * builder,XLogRecPtr lsn,xl_running_xacts * running)1111 SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1112 {
1113 ReorderBufferTXN *txn;
1114 TransactionId xmin;
1115
1116 /*
1117 * If we're not consistent yet, inspect the record to see whether it
1118 * allows to get closer to being consistent. If we are consistent, dump
1119 * our snapshot so others or we, after a restart, can use it.
1120 */
1121 if (builder->state < SNAPBUILD_CONSISTENT)
1122 {
1123 /* returns false if there's no point in performing cleanup just yet */
1124 if (!SnapBuildFindSnapshot(builder, lsn, running))
1125 return;
1126 }
1127 else
1128 SnapBuildSerialize(builder, lsn);
1129
1130 /*
1131 * Update range of interesting xids based on the running xacts
1132 * information. We don't increase ->xmax using it, because once we are in
1133 * a consistent state we can do that ourselves and much more efficiently
1134 * so, because we only need to do it for catalog transactions since we
1135 * only ever look at those.
1136 *
1137 * NB: We only increase xmax when a catalog modifying transaction commits
1138 * (see SnapBuildCommitTxn). Because of this, xmax can be lower than
1139 * xmin, which looks odd but is correct and actually more efficient, since
1140 * we hit fast paths in heapam_visibility.c.
1141 */
1142 builder->xmin = running->oldestRunningXid;
1143
1144 /* Remove transactions we don't need to keep track off anymore */
1145 SnapBuildPurgeCommittedTxn(builder);
1146
1147 /*
1148 * Advance the xmin limit for the current replication slot, to allow
1149 * vacuum to clean up the tuples this slot has been protecting.
1150 *
1151 * The reorderbuffer might have an xmin among the currently running
1152 * snapshots; use it if so. If not, we need only consider the snapshots
1153 * we'll produce later, which can't be less than the oldest running xid in
1154 * the record we're reading now.
1155 */
1156 xmin = ReorderBufferGetOldestXmin(builder->reorder);
1157 if (xmin == InvalidTransactionId)
1158 xmin = running->oldestRunningXid;
1159 elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
1160 builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
1161 LogicalIncreaseXminForSlot(lsn, xmin);
1162
1163 /*
1164 * Also tell the slot where we can restart decoding from. We don't want to
1165 * do that after every commit because changing that implies an fsync of
1166 * the logical slot's state file, so we only do it every time we see a
1167 * running xacts record.
1168 *
1169 * Do so by looking for the oldest in progress transaction (determined by
1170 * the first LSN of any of its relevant records). Every transaction
1171 * remembers the last location we stored the snapshot to disk before its
1172 * beginning. That point is where we can restart from.
1173 */
1174
1175 /*
1176 * Can't know about a serialized snapshot's location if we're not
1177 * consistent.
1178 */
1179 if (builder->state < SNAPBUILD_CONSISTENT)
1180 return;
1181
1182 txn = ReorderBufferGetOldestTXN(builder->reorder);
1183
1184 /*
1185 * oldest ongoing txn might have started when we didn't yet serialize
1186 * anything because we hadn't reached a consistent state yet.
1187 */
1188 if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
1189 LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
1190
1191 /*
1192 * No in-progress transaction, can reuse the last serialized snapshot if
1193 * we have one.
1194 */
1195 else if (txn == NULL &&
1196 builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
1197 builder->last_serialized_snapshot != InvalidXLogRecPtr)
1198 LogicalIncreaseRestartDecodingForSlot(lsn,
1199 builder->last_serialized_snapshot);
1200 }
1201
1202
1203 /*
1204 * Build the start of a snapshot that's capable of decoding the catalog.
1205 *
1206 * Helper function for SnapBuildProcessRunningXacts() while we're not yet
1207 * consistent.
1208 *
1209 * Returns true if there is a point in performing internal maintenance/cleanup
1210 * using the xl_running_xacts record.
1211 */
1212 static bool
SnapBuildFindSnapshot(SnapBuild * builder,XLogRecPtr lsn,xl_running_xacts * running)1213 SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1214 {
1215 /* ---
1216 * Build catalog decoding snapshot incrementally using information about
1217 * the currently running transactions. There are several ways to do that:
1218 *
1219 * a) There were no running transactions when the xl_running_xacts record
1220 * was inserted, jump to CONSISTENT immediately. We might find such a
1221 * state while waiting on c)'s sub-states.
1222 *
1223 * b) This (in a previous run) or another decoding slot serialized a
1224 * snapshot to disk that we can use. Can't use this method for the
1225 * initial snapshot when slot is being created and needs full snapshot
1226 * for export or direct use, as that snapshot will only contain catalog
1227 * modifying transactions.
1228 *
1229 * c) First incrementally build a snapshot for catalog tuples
1230 * (BUILDING_SNAPSHOT), that requires all, already in-progress,
1231 * transactions to finish. Every transaction starting after that
1232 * (FULL_SNAPSHOT state), has enough information to be decoded. But
1233 * for older running transactions no viable snapshot exists yet, so
1234 * CONSISTENT will only be reached once all of those have finished.
1235 * ---
1236 */
1237
1238 /*
1239 * xl_running_xact record is older than what we can use, we might not have
1240 * all necessary catalog rows anymore.
1241 */
1242 if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
1243 NormalTransactionIdPrecedes(running->oldestRunningXid,
1244 builder->initial_xmin_horizon))
1245 {
1246 ereport(DEBUG1,
1247 (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
1248 (uint32) (lsn >> 32), (uint32) lsn),
1249 errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
1250 builder->initial_xmin_horizon, running->oldestRunningXid)));
1251
1252
1253 SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
1254
1255 return true;
1256 }
1257
1258 /*
1259 * a) No transaction were running, we can jump to consistent.
1260 *
1261 * This is not affected by races around xl_running_xacts, because we can
1262 * miss transaction commits, but currently not transactions starting.
1263 *
1264 * NB: We might have already started to incrementally assemble a snapshot,
1265 * so we need to be careful to deal with that.
1266 */
1267 if (running->oldestRunningXid == running->nextXid)
1268 {
1269 if (builder->start_decoding_at == InvalidXLogRecPtr ||
1270 builder->start_decoding_at <= lsn)
1271 /* can decode everything after this */
1272 builder->start_decoding_at = lsn + 1;
1273
1274 /* As no transactions were running xmin/xmax can be trivially set. */
1275 builder->xmin = running->nextXid; /* < are finished */
1276 builder->xmax = running->nextXid; /* >= are running */
1277
1278 /* so we can safely use the faster comparisons */
1279 Assert(TransactionIdIsNormal(builder->xmin));
1280 Assert(TransactionIdIsNormal(builder->xmax));
1281
1282 builder->state = SNAPBUILD_CONSISTENT;
1283 SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
1284
1285 ereport(LOG,
1286 (errmsg("logical decoding found consistent point at %X/%X",
1287 (uint32) (lsn >> 32), (uint32) lsn),
1288 errdetail("There are no running transactions.")));
1289
1290 return false;
1291 }
1292 /* b) valid on disk state and not building full snapshot */
1293 else if (!builder->building_full_snapshot &&
1294 SnapBuildRestore(builder, lsn))
1295 {
1296 /* there won't be any state to cleanup */
1297 return false;
1298 }
1299
1300 /*
1301 * c) transition from START to BUILDING_SNAPSHOT.
1302 *
1303 * In START state, and a xl_running_xacts record with running xacts is
1304 * encountered. In that case, switch to BUILDING_SNAPSHOT state, and
1305 * record xl_running_xacts->nextXid. Once all running xacts have finished
1306 * (i.e. they're all >= nextXid), we have a complete catalog snapshot. It
1307 * might look that we could use xl_running_xact's ->xids information to
1308 * get there quicker, but that is problematic because transactions marked
1309 * as running, might already have inserted their commit record - it's
1310 * infeasible to change that with locking.
1311 */
1312 else if (builder->state == SNAPBUILD_START)
1313 {
1314 builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
1315 SnapBuildStartNextPhaseAt(builder, running->nextXid);
1316
1317 /*
1318 * Start with an xmin/xmax that's correct for future, when all the
1319 * currently running transactions have finished. We'll update both
1320 * while waiting for the pending transactions to finish.
1321 */
1322 builder->xmin = running->nextXid; /* < are finished */
1323 builder->xmax = running->nextXid; /* >= are running */
1324
1325 /* so we can safely use the faster comparisons */
1326 Assert(TransactionIdIsNormal(builder->xmin));
1327 Assert(TransactionIdIsNormal(builder->xmax));
1328
1329 ereport(LOG,
1330 (errmsg("logical decoding found initial starting point at %X/%X",
1331 (uint32) (lsn >> 32), (uint32) lsn),
1332 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1333 running->xcnt, running->nextXid)));
1334
1335 SnapBuildWaitSnapshot(running, running->nextXid);
1336 }
1337
1338 /*
1339 * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
1340 *
1341 * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
1342 * is >= than nextXid from when we switched to BUILDING_SNAPSHOT. This
1343 * means all transactions starting afterwards have enough information to
1344 * be decoded. Switch to FULL_SNAPSHOT.
1345 */
1346 else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
1347 TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder),
1348 running->oldestRunningXid))
1349 {
1350 builder->state = SNAPBUILD_FULL_SNAPSHOT;
1351 SnapBuildStartNextPhaseAt(builder, running->nextXid);
1352
1353 ereport(LOG,
1354 (errmsg("logical decoding found initial consistent point at %X/%X",
1355 (uint32) (lsn >> 32), (uint32) lsn),
1356 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1357 running->xcnt, running->nextXid)));
1358
1359 SnapBuildWaitSnapshot(running, running->nextXid);
1360 }
1361
1362 /*
1363 * c) transition from FULL_SNAPSHOT to CONSISTENT.
1364 *
1365 * In FULL_SNAPSHOT state (see d) ), and this xl_running_xacts'
1366 * oldestRunningXid is >= than nextXid from when we switched to
1367 * FULL_SNAPSHOT. This means all transactions that are currently in
1368 * progress have a catalog snapshot, and all their changes have been
1369 * collected. Switch to CONSISTENT.
1370 */
1371 else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
1372 TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder),
1373 running->oldestRunningXid))
1374 {
1375 builder->state = SNAPBUILD_CONSISTENT;
1376 SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
1377
1378 ereport(LOG,
1379 (errmsg("logical decoding found consistent point at %X/%X",
1380 (uint32) (lsn >> 32), (uint32) lsn),
1381 errdetail("There are no old transactions anymore.")));
1382 }
1383
1384 /*
1385 * We already started to track running xacts and need to wait for all
1386 * in-progress ones to finish. We fall through to the normal processing of
1387 * records so incremental cleanup can be performed.
1388 */
1389 return true;
1390
1391 }
1392
1393 /* ---
1394 * Iterate through xids in record, wait for all older than the cutoff to
1395 * finish. Then, if possible, log a new xl_running_xacts record.
1396 *
1397 * This isn't required for the correctness of decoding, but to:
1398 * a) allow isolationtester to notice that we're currently waiting for
1399 * something.
1400 * b) log a new xl_running_xacts record where it'd be helpful, without having
1401 * to write for bgwriter or checkpointer.
1402 * ---
1403 */
1404 static void
SnapBuildWaitSnapshot(xl_running_xacts * running,TransactionId cutoff)1405 SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
1406 {
1407 int off;
1408
1409 for (off = 0; off < running->xcnt; off++)
1410 {
1411 TransactionId xid = running->xids[off];
1412
1413 /*
1414 * Upper layers should prevent that we ever need to wait on ourselves.
1415 * Check anyway, since failing to do so would either result in an
1416 * endless wait or an Assert() failure.
1417 */
1418 if (TransactionIdIsCurrentTransactionId(xid))
1419 elog(ERROR, "waiting for ourselves");
1420
1421 if (TransactionIdFollows(xid, cutoff))
1422 continue;
1423
1424 XactLockTableWait(xid, NULL, NULL, XLTW_None);
1425 }
1426
1427 /*
1428 * All transactions we needed to finish finished - try to ensure there is
1429 * another xl_running_xacts record in a timely manner, without having to
1430 * write for bgwriter or checkpointer to log one. During recovery we
1431 * can't enforce that, so we'll have to wait.
1432 */
1433 if (!RecoveryInProgress())
1434 {
1435 LogStandbySnapshot();
1436 }
1437 }
1438
1439 /* -----------------------------------
1440 * Snapshot serialization support
1441 * -----------------------------------
1442 */
1443
1444 /*
1445 * We store current state of struct SnapBuild on disk in the following manner:
1446 *
1447 * struct SnapBuildOnDisk;
1448 * TransactionId * running.xcnt_space;
1449 * TransactionId * committed.xcnt; (*not xcnt_space*)
1450 *
1451 */
1452 typedef struct SnapBuildOnDisk
1453 {
1454 /* first part of this struct needs to be version independent */
1455
1456 /* data not covered by checksum */
1457 uint32 magic;
1458 pg_crc32c checksum;
1459
1460 /* data covered by checksum */
1461
1462 /* version, in case we want to support pg_upgrade */
1463 uint32 version;
1464 /* how large is the on disk data, excluding the constant sized part */
1465 uint32 length;
1466
1467 /* version dependent part */
1468 SnapBuild builder;
1469
1470 /* variable amount of TransactionIds follows */
1471 } SnapBuildOnDisk;
1472
1473 #define SnapBuildOnDiskConstantSize \
1474 offsetof(SnapBuildOnDisk, builder)
1475 #define SnapBuildOnDiskNotChecksummedSize \
1476 offsetof(SnapBuildOnDisk, version)
1477
1478 #define SNAPBUILD_MAGIC 0x51A1E001
1479 #define SNAPBUILD_VERSION 2
1480
1481 /*
1482 * Store/Load a snapshot from disk, depending on the snapshot builder's state.
1483 *
1484 * Supposed to be used by external (i.e. not snapbuild.c) code that just read
1485 * a record that's a potential location for a serialized snapshot.
1486 */
1487 void
SnapBuildSerializationPoint(SnapBuild * builder,XLogRecPtr lsn)1488 SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
1489 {
1490 if (builder->state < SNAPBUILD_CONSISTENT)
1491 SnapBuildRestore(builder, lsn);
1492 else
1493 SnapBuildSerialize(builder, lsn);
1494 }
1495
1496 /*
1497 * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
1498 * been done by another decoding process.
1499 */
1500 static void
SnapBuildSerialize(SnapBuild * builder,XLogRecPtr lsn)1501 SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
1502 {
1503 Size needed_length;
1504 SnapBuildOnDisk *ondisk = NULL;
1505 char *ondisk_c;
1506 int fd;
1507 char tmppath[MAXPGPATH];
1508 char path[MAXPGPATH];
1509 int ret;
1510 struct stat stat_buf;
1511 Size sz;
1512
1513 Assert(lsn != InvalidXLogRecPtr);
1514 Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
1515 builder->last_serialized_snapshot <= lsn);
1516
1517 /*
1518 * no point in serializing if we cannot continue to work immediately after
1519 * restoring the snapshot
1520 */
1521 if (builder->state < SNAPBUILD_CONSISTENT)
1522 return;
1523
1524 /*
1525 * We identify snapshots by the LSN they are valid for. We don't need to
1526 * include timelines in the name as each LSN maps to exactly one timeline
1527 * unless the user used pg_resetwal or similar. If a user did so, there's
1528 * no hope continuing to decode anyway.
1529 */
1530 sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1531 (uint32) (lsn >> 32), (uint32) lsn);
1532
1533 /*
1534 * first check whether some other backend already has written the snapshot
1535 * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1536 * as a valid state. Everything else is an unexpected error.
1537 */
1538 ret = stat(path, &stat_buf);
1539
1540 if (ret != 0 && errno != ENOENT)
1541 ereport(ERROR,
1542 (errcode_for_file_access(),
1543 errmsg("could not stat file \"%s\": %m", path)));
1544
1545 else if (ret == 0)
1546 {
1547 /*
1548 * somebody else has already serialized to this point, don't overwrite
1549 * but remember location, so we don't need to read old data again.
1550 *
1551 * To be sure it has been synced to disk after the rename() from the
1552 * tempfile filename to the real filename, we just repeat the fsync.
1553 * That ought to be cheap because in most scenarios it should already
1554 * be safely on disk.
1555 */
1556 fsync_fname(path, false);
1557 fsync_fname("pg_logical/snapshots", true);
1558
1559 builder->last_serialized_snapshot = lsn;
1560 goto out;
1561 }
1562
1563 /*
1564 * there is an obvious race condition here between the time we stat(2) the
1565 * file and us writing the file. But we rename the file into place
1566 * atomically and all files created need to contain the same data anyway,
1567 * so this is perfectly fine, although a bit of a resource waste. Locking
1568 * seems like pointless complication.
1569 */
1570 elog(DEBUG1, "serializing snapshot to %s", path);
1571
1572 /* to make sure only we will write to this tempfile, include pid */
1573 sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%u.tmp",
1574 (uint32) (lsn >> 32), (uint32) lsn, MyProcPid);
1575
1576 /*
1577 * Unlink temporary file if it already exists, needs to have been before a
1578 * crash/error since we won't enter this function twice from within a
1579 * single decoding slot/backend and the temporary file contains the pid of
1580 * the current process.
1581 */
1582 if (unlink(tmppath) != 0 && errno != ENOENT)
1583 ereport(ERROR,
1584 (errcode_for_file_access(),
1585 errmsg("could not remove file \"%s\": %m", tmppath)));
1586
1587 needed_length = sizeof(SnapBuildOnDisk) +
1588 sizeof(TransactionId) * builder->committed.xcnt;
1589
1590 ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
1591 ondisk = (SnapBuildOnDisk *) ondisk_c;
1592 ondisk->magic = SNAPBUILD_MAGIC;
1593 ondisk->version = SNAPBUILD_VERSION;
1594 ondisk->length = needed_length;
1595 INIT_CRC32C(ondisk->checksum);
1596 COMP_CRC32C(ondisk->checksum,
1597 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1598 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1599 ondisk_c += sizeof(SnapBuildOnDisk);
1600
1601 memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1602 /* NULL-ify memory-only data */
1603 ondisk->builder.context = NULL;
1604 ondisk->builder.snapshot = NULL;
1605 ondisk->builder.reorder = NULL;
1606 ondisk->builder.committed.xip = NULL;
1607
1608 COMP_CRC32C(ondisk->checksum,
1609 &ondisk->builder,
1610 sizeof(SnapBuild));
1611
1612 /* there shouldn't be any running xacts */
1613 Assert(builder->was_running.was_xcnt == 0);
1614
1615 /* copy committed xacts */
1616 sz = sizeof(TransactionId) * builder->committed.xcnt;
1617 memcpy(ondisk_c, builder->committed.xip, sz);
1618 COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1619 ondisk_c += sz;
1620
1621 FIN_CRC32C(ondisk->checksum);
1622
1623 /* we have valid data now, open tempfile and write it there */
1624 fd = OpenTransientFile(tmppath,
1625 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1626 if (fd < 0)
1627 ereport(ERROR,
1628 (errcode_for_file_access(),
1629 errmsg("could not open file \"%s\": %m", tmppath)));
1630
1631 errno = 0;
1632 pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
1633 if ((write(fd, ondisk, needed_length)) != needed_length)
1634 {
1635 int save_errno = errno;
1636
1637 CloseTransientFile(fd);
1638
1639 /* if write didn't set errno, assume problem is no disk space */
1640 errno = save_errno ? save_errno : ENOSPC;
1641 ereport(ERROR,
1642 (errcode_for_file_access(),
1643 errmsg("could not write to file \"%s\": %m", tmppath)));
1644 }
1645 pgstat_report_wait_end();
1646
1647 /*
1648 * fsync the file before renaming so that even if we crash after this we
1649 * have either a fully valid file or nothing.
1650 *
1651 * It's safe to just ERROR on fsync() here because we'll retry the whole
1652 * operation including the writes.
1653 *
1654 * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1655 * some noticeable overhead since it's performed synchronously during
1656 * decoding?
1657 */
1658 pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
1659 if (pg_fsync(fd) != 0)
1660 {
1661 int save_errno = errno;
1662
1663 CloseTransientFile(fd);
1664 errno = save_errno;
1665 ereport(ERROR,
1666 (errcode_for_file_access(),
1667 errmsg("could not fsync file \"%s\": %m", tmppath)));
1668 }
1669 pgstat_report_wait_end();
1670
1671 if (CloseTransientFile(fd))
1672 ereport(ERROR,
1673 (errcode_for_file_access(),
1674 errmsg("could not close file \"%s\": %m", tmppath)));
1675
1676 fsync_fname("pg_logical/snapshots", true);
1677
1678 /*
1679 * We may overwrite the work from some other backend, but that's ok, our
1680 * snapshot is valid as well, we'll just have done some superfluous work.
1681 */
1682 if (rename(tmppath, path) != 0)
1683 {
1684 ereport(ERROR,
1685 (errcode_for_file_access(),
1686 errmsg("could not rename file \"%s\" to \"%s\": %m",
1687 tmppath, path)));
1688 }
1689
1690 /* make sure we persist */
1691 fsync_fname(path, false);
1692 fsync_fname("pg_logical/snapshots", true);
1693
1694 /*
1695 * Now there's no way we can loose the dumped state anymore, remember this
1696 * as a serialization point.
1697 */
1698 builder->last_serialized_snapshot = lsn;
1699
1700 out:
1701 ReorderBufferSetRestartPoint(builder->reorder,
1702 builder->last_serialized_snapshot);
1703 /* be tidy */
1704 if (ondisk)
1705 pfree(ondisk);
1706 }
1707
1708 /*
1709 * Restore a snapshot into 'builder' if previously one has been stored at the
1710 * location indicated by 'lsn'. Returns true if successful, false otherwise.
1711 */
1712 static bool
SnapBuildRestore(SnapBuild * builder,XLogRecPtr lsn)1713 SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
1714 {
1715 SnapBuildOnDisk ondisk;
1716 int fd;
1717 char path[MAXPGPATH];
1718 Size sz;
1719 int readBytes;
1720 pg_crc32c checksum;
1721
1722 /* no point in loading a snapshot if we're already there */
1723 if (builder->state == SNAPBUILD_CONSISTENT)
1724 return false;
1725
1726 sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1727 (uint32) (lsn >> 32), (uint32) lsn);
1728
1729 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1730
1731 if (fd < 0 && errno == ENOENT)
1732 return false;
1733 else if (fd < 0)
1734 ereport(ERROR,
1735 (errcode_for_file_access(),
1736 errmsg("could not open file \"%s\": %m", path)));
1737
1738 /* ----
1739 * Make sure the snapshot had been stored safely to disk, that's normally
1740 * cheap.
1741 * Note that we do not need PANIC here, nobody will be able to use the
1742 * slot without fsyncing, and saving it won't succeed without an fsync()
1743 * either...
1744 * ----
1745 */
1746 fsync_fname(path, false);
1747 fsync_fname("pg_logical/snapshots", true);
1748
1749
1750 /* read statically sized portion of snapshot */
1751 pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1752 readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
1753 pgstat_report_wait_end();
1754 if (readBytes != SnapBuildOnDiskConstantSize)
1755 {
1756 int save_errno = errno;
1757
1758 CloseTransientFile(fd);
1759
1760 if (readBytes < 0)
1761 {
1762 errno = save_errno;
1763 ereport(ERROR,
1764 (errcode_for_file_access(),
1765 errmsg("could not read file \"%s\": %m", path)));
1766 }
1767 else
1768 ereport(ERROR,
1769 (errcode(ERRCODE_DATA_CORRUPTED),
1770 errmsg("could not read file \"%s\": read %d of %zu",
1771 path, readBytes,
1772 (Size) SnapBuildOnDiskConstantSize)));
1773 }
1774
1775 if (ondisk.magic != SNAPBUILD_MAGIC)
1776 ereport(ERROR,
1777 (errcode(ERRCODE_DATA_CORRUPTED),
1778 errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1779 path, ondisk.magic, SNAPBUILD_MAGIC)));
1780
1781 if (ondisk.version != SNAPBUILD_VERSION)
1782 ereport(ERROR,
1783 (errcode(ERRCODE_DATA_CORRUPTED),
1784 errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1785 path, ondisk.version, SNAPBUILD_VERSION)));
1786
1787 INIT_CRC32C(checksum);
1788 COMP_CRC32C(checksum,
1789 ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
1790 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1791
1792 /* read SnapBuild */
1793 pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1794 readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
1795 pgstat_report_wait_end();
1796 if (readBytes != sizeof(SnapBuild))
1797 {
1798 int save_errno = errno;
1799
1800 CloseTransientFile(fd);
1801
1802 if (readBytes < 0)
1803 {
1804 errno = save_errno;
1805 ereport(ERROR,
1806 (errcode_for_file_access(),
1807 errmsg("could not read file \"%s\": %m", path)));
1808 }
1809 else
1810 ereport(ERROR,
1811 (errcode(ERRCODE_DATA_CORRUPTED),
1812 errmsg("could not read file \"%s\": read %d of %zu",
1813 path, readBytes, sizeof(SnapBuild))));
1814 }
1815 COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
1816
1817 /* restore running xacts (dead, but kept for backward compat) */
1818 sz = sizeof(TransactionId) * ondisk.builder.was_running.was_xcnt_space;
1819 ondisk.builder.was_running.was_xip =
1820 MemoryContextAllocZero(builder->context, sz);
1821 pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1822 readBytes = read(fd, ondisk.builder.was_running.was_xip, sz);
1823 pgstat_report_wait_end();
1824 if (readBytes != sz)
1825 {
1826 int save_errno = errno;
1827
1828 CloseTransientFile(fd);
1829
1830 if (readBytes < 0)
1831 {
1832 errno = save_errno;
1833 ereport(ERROR,
1834 (errcode_for_file_access(),
1835 errmsg("could not read file \"%s\": %m", path)));
1836 }
1837 else
1838 ereport(ERROR,
1839 (errcode(ERRCODE_DATA_CORRUPTED),
1840 errmsg("could not read file \"%s\": read %d of %zu",
1841 path, readBytes, sz)));
1842 }
1843 COMP_CRC32C(checksum, ondisk.builder.was_running.was_xip, sz);
1844
1845 /* restore committed xacts information */
1846 sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
1847 ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
1848 pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1849 readBytes = read(fd, ondisk.builder.committed.xip, sz);
1850 pgstat_report_wait_end();
1851 if (readBytes != sz)
1852 {
1853 int save_errno = errno;
1854
1855 CloseTransientFile(fd);
1856
1857 if (readBytes < 0)
1858 {
1859 errno = save_errno;
1860 ereport(ERROR,
1861 (errcode_for_file_access(),
1862 errmsg("could not read file \"%s\": %m", path)));
1863 }
1864 else
1865 ereport(ERROR,
1866 (errcode(ERRCODE_DATA_CORRUPTED),
1867 errmsg("could not read file \"%s\": read %d of %zu",
1868 path, readBytes, sz)));
1869 }
1870 COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
1871
1872 if (CloseTransientFile(fd))
1873 ereport(ERROR,
1874 (errcode_for_file_access(),
1875 errmsg("could not close file \"%s\": %m", path)));
1876
1877 FIN_CRC32C(checksum);
1878
1879 /* verify checksum of what we've read */
1880 if (!EQ_CRC32C(checksum, ondisk.checksum))
1881 ereport(ERROR,
1882 (errcode(ERRCODE_DATA_CORRUPTED),
1883 errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1884 path, checksum, ondisk.checksum)));
1885
1886 /*
1887 * ok, we now have a sensible snapshot here, figure out if it has more
1888 * information than we have.
1889 */
1890
1891 /*
1892 * We are only interested in consistent snapshots for now, comparing
1893 * whether one incomplete snapshot is more "advanced" seems to be
1894 * unnecessarily complex.
1895 */
1896 if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1897 goto snapshot_not_interesting;
1898
1899 /*
1900 * Don't use a snapshot that requires an xmin that we cannot guarantee to
1901 * be available.
1902 */
1903 if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
1904 goto snapshot_not_interesting;
1905
1906
1907 /* ok, we think the snapshot is sensible, copy over everything important */
1908 builder->xmin = ondisk.builder.xmin;
1909 builder->xmax = ondisk.builder.xmax;
1910 builder->state = ondisk.builder.state;
1911
1912 builder->committed.xcnt = ondisk.builder.committed.xcnt;
1913 /* We only allocated/stored xcnt, not xcnt_space xids ! */
1914 /* don't overwrite preallocated xip, if we don't have anything here */
1915 if (builder->committed.xcnt > 0)
1916 {
1917 pfree(builder->committed.xip);
1918 builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1919 builder->committed.xip = ondisk.builder.committed.xip;
1920 }
1921 ondisk.builder.committed.xip = NULL;
1922
1923 /* our snapshot is not interesting anymore, build a new one */
1924 if (builder->snapshot != NULL)
1925 {
1926 SnapBuildSnapDecRefcount(builder->snapshot);
1927 }
1928 builder->snapshot = SnapBuildBuildSnapshot(builder);
1929 SnapBuildSnapIncRefcount(builder->snapshot);
1930
1931 ReorderBufferSetRestartPoint(builder->reorder, lsn);
1932
1933 Assert(builder->state == SNAPBUILD_CONSISTENT);
1934
1935 ereport(LOG,
1936 (errmsg("logical decoding found consistent point at %X/%X",
1937 (uint32) (lsn >> 32), (uint32) lsn),
1938 errdetail("Logical decoding will begin using saved snapshot.")));
1939 return true;
1940
1941 snapshot_not_interesting:
1942 if (ondisk.builder.committed.xip != NULL)
1943 pfree(ondisk.builder.committed.xip);
1944 return false;
1945 }
1946
1947 /*
1948 * Remove all serialized snapshots that are not required anymore because no
1949 * slot can need them. This doesn't actually have to run during a checkpoint,
1950 * but it's a convenient point to schedule this.
1951 *
1952 * NB: We run this during checkpoints even if logical decoding is disabled so
1953 * we cleanup old slots at some point after it got disabled.
1954 */
1955 void
CheckPointSnapBuild(void)1956 CheckPointSnapBuild(void)
1957 {
1958 XLogRecPtr cutoff;
1959 XLogRecPtr redo;
1960 DIR *snap_dir;
1961 struct dirent *snap_de;
1962 char path[MAXPGPATH + 21];
1963
1964 /*
1965 * We start off with a minimum of the last redo pointer. No new
1966 * replication slot will start before that, so that's a safe upper bound
1967 * for removal.
1968 */
1969 redo = GetRedoRecPtr();
1970
1971 /* now check for the restart ptrs from existing slots */
1972 cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1973
1974 /* don't start earlier than the restart lsn */
1975 if (redo < cutoff)
1976 cutoff = redo;
1977
1978 snap_dir = AllocateDir("pg_logical/snapshots");
1979 while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
1980 {
1981 uint32 hi;
1982 uint32 lo;
1983 XLogRecPtr lsn;
1984 struct stat statbuf;
1985
1986 if (strcmp(snap_de->d_name, ".") == 0 ||
1987 strcmp(snap_de->d_name, "..") == 0)
1988 continue;
1989
1990 snprintf(path, sizeof(path), "pg_logical/snapshots/%s", snap_de->d_name);
1991
1992 if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1993 {
1994 elog(DEBUG1, "only regular files expected: %s", path);
1995 continue;
1996 }
1997
1998 /*
1999 * temporary filenames from SnapBuildSerialize() include the LSN and
2000 * everything but are postfixed by .$pid.tmp. We can just remove them
2001 * the same as other files because there can be none that are
2002 * currently being written that are older than cutoff.
2003 *
2004 * We just log a message if a file doesn't fit the pattern, it's
2005 * probably some editors lock/state file or similar...
2006 */
2007 if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
2008 {
2009 ereport(LOG,
2010 (errmsg("could not parse file name \"%s\"", path)));
2011 continue;
2012 }
2013
2014 lsn = ((uint64) hi) << 32 | lo;
2015
2016 /* check whether we still need it */
2017 if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
2018 {
2019 elog(DEBUG1, "removing snapbuild snapshot %s", path);
2020
2021 /*
2022 * It's not particularly harmful, though strange, if we can't
2023 * remove the file here. Don't prevent the checkpoint from
2024 * completing, that'd be a cure worse than the disease.
2025 */
2026 if (unlink(path) < 0)
2027 {
2028 ereport(LOG,
2029 (errcode_for_file_access(),
2030 errmsg("could not remove file \"%s\": %m",
2031 path)));
2032 continue;
2033 }
2034 }
2035 }
2036 FreeDir(snap_dir);
2037 }
2038