1 /*-------------------------------------------------------------------------
2 *
3 * reorderbuffer.c
4 * PostgreSQL logical replay/reorder buffer management
5 *
6 *
7 * Copyright (c) 2012-2021, PostgreSQL Global Development Group
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/replication/reorderbuffer.c
12 *
13 * NOTES
14 * This module gets handed individual pieces of transactions in the order
15 * they are written to the WAL and is responsible to reassemble them into
16 * toplevel transaction sized pieces. When a transaction is completely
17 * reassembled - signaled by reading the transaction commit record - it
18 * will then call the output plugin (cf. ReorderBufferCommit()) with the
19 * individual changes. The output plugins rely on snapshots built by
20 * snapbuild.c which hands them to us.
21 *
22 * Transactions and subtransactions/savepoints in postgres are not
23 * immediately linked to each other from outside the performing
24 * backend. Only at commit/abort (or special xact_assignment records) they
25 * are linked together. Which means that we will have to splice together a
26 * toplevel transaction from its subtransactions. To do that efficiently we
27 * build a binary heap indexed by the smallest current lsn of the individual
28 * subtransactions' changestreams. As the individual streams are inherently
29 * ordered by LSN - since that is where we build them from - the transaction
30 * can easily be reassembled by always using the subtransaction with the
31 * smallest current LSN from the heap.
32 *
33 * In order to cope with large transactions - which can be several times as
34 * big as the available memory - this module supports spooling the contents
35 * of a large transactions to disk. When the transaction is replayed the
36 * contents of individual (sub-)transactions will be read from disk in
37 * chunks.
38 *
39 * This module also has to deal with reassembling toast records from the
40 * individual chunks stored in WAL. When a new (or initial) version of a
41 * tuple is stored in WAL it will always be preceded by the toast chunks
42 * emitted for the columns stored out of line. Within a single toplevel
43 * transaction there will be no other data carrying records between a row's
44 * toast chunks and the row data itself. See ReorderBufferToast* for
45 * details.
46 *
47 * ReorderBuffer uses two special memory context types - SlabContext for
48 * allocations of fixed-length structures (changes and transactions), and
49 * GenerationContext for the variable-length transaction data (allocated
50 * and freed in groups with similar lifespans).
51 *
52 * To limit the amount of memory used by decoded changes, we track memory
53 * used at the reorder buffer level (i.e. total amount of memory), and for
54 * each transaction. When the total amount of used memory exceeds the
55 * limit, the transaction consuming the most memory is then serialized to
56 * disk.
57 *
58 * Only decoded changes are evicted from memory (spilled to disk), not the
59 * transaction records. The number of toplevel transactions is limited,
60 * but a transaction with many subtransactions may still consume significant
61 * amounts of memory. However, the transaction records are fairly small and
62 * are not included in the memory limit.
63 *
64 * The current eviction algorithm is very simple - the transaction is
65 * picked merely by size, while it might be useful to also consider age
66 * (LSN) of the changes for example. With the new Generational memory
67 * allocator, evicting the oldest changes would make it more likely the
68 * memory gets actually freed.
69 *
70 * We still rely on max_changes_in_memory when loading serialized changes
71 * back into memory. At that point we can't use the memory limit directly
72 * as we load the subxacts independently. One option to deal with this
73 * would be to count the subxacts, and allow each to allocate 1/N of the
74 * memory limit. That however does not seem very appealing, because with
75 * many subtransactions it may easily cause thrashing (short cycles of
76 * deserializing and applying very few changes). We probably should give
77 * a bit more memory to the oldest subtransactions, because it's likely
78 * they are the source for the next sequence of changes.
79 *
80 * -------------------------------------------------------------------------
81 */
82 #include "postgres.h"
83
84 #include <unistd.h>
85 #include <sys/stat.h>
86
87 #include "access/detoast.h"
88 #include "access/heapam.h"
89 #include "access/rewriteheap.h"
90 #include "access/transam.h"
91 #include "access/xact.h"
92 #include "access/xlog_internal.h"
93 #include "catalog/catalog.h"
94 #include "lib/binaryheap.h"
95 #include "miscadmin.h"
96 #include "pgstat.h"
97 #include "replication/logical.h"
98 #include "replication/reorderbuffer.h"
99 #include "replication/slot.h"
100 #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
101 #include "storage/bufmgr.h"
102 #include "storage/fd.h"
103 #include "storage/sinval.h"
104 #include "utils/builtins.h"
105 #include "utils/combocid.h"
106 #include "utils/memdebug.h"
107 #include "utils/memutils.h"
108 #include "utils/rel.h"
109 #include "utils/relfilenodemap.h"
110
111
/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
{
    TransactionId xid;          /* hash key */
    ReorderBufferTXN *txn;      /* transaction state for that xid */
} ReorderBufferTXNByIdEnt;

/* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
typedef struct ReorderBufferTupleCidKey
{
    RelFileNode relnode;        /* physical identity of the relation */
    ItemPointerData tid;        /* tuple position within that relation */
} ReorderBufferTupleCidKey;

typedef struct ReorderBufferTupleCidEnt
{
    ReorderBufferTupleCidKey key;   /* hash key */
    CommandId cmin;
    CommandId cmax;
    CommandId combocid;         /* just for debugging */
} ReorderBufferTupleCidEnt;

/* Virtual file descriptor with file offset tracking */
typedef struct TXNEntryFile
{
    File vfd;                   /* -1 when the file is closed */
    off_t curOffset;            /* offset for next write or read. Reset to 0
                                 * when vfd is opened. */
} TXNEntryFile;

/* k-way in-order change iteration support structures */
typedef struct ReorderBufferIterTXNEntry
{
    XLogRecPtr lsn;             /* LSN of the change currently at hand */
    ReorderBufferChange *change;    /* that change itself */
    ReorderBufferTXN *txn;      /* (sub)transaction the change belongs to */
    TXNEntryFile file;          /* spill file, when changes were serialized */
    XLogSegNo segno;            /* segment of the spill file */
} ReorderBufferIterTXNEntry;

typedef struct ReorderBufferIterTXNState
{
    binaryheap *heap;           /* heap of entries, ordered by change LSN */
    Size nr_txns;               /* number of (sub)transactions merged */
    dlist_head old_change;      /* changes awaiting cleanup */
    ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
} ReorderBufferIterTXNState;

/* toast datastructures */
typedef struct ReorderBufferToastEnt
{
    Oid chunk_id;               /* toast_table.chunk_id */
    int32 last_chunk_seq;       /* toast_table.chunk_seq of the last chunk we
                                 * have seen */
    Size num_chunks;            /* number of chunks we've already seen */
    Size size;                  /* combined size of chunks seen */
    dlist_head chunks;          /* linked list of chunks */
    struct varlena *reconstructed;  /* reconstructed varlena now pointed to in
                                     * main tup */
} ReorderBufferToastEnt;

/* Disk serialization support datastructures */
typedef struct ReorderBufferDiskChange
{
    Size size;                  /* total on-disk size, including trailing data */
    ReorderBufferChange change;
    /* data follows */
} ReorderBufferDiskChange;

/* Does 'action' denote a speculative insertion? */
#define IsSpecInsert(action) \
( \
    ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
)
/* Does 'action' resolve a pending speculative insertion (confirm or abort)? */
#define IsSpecConfirmOrAbort(action) \
( \
    (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
    ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
)
/* Does 'action' insert or update a row (incl. speculative insertion)? */
#define IsInsertOrUpdate(action) \
( \
    (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
    ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
    ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
)
196
/*
 * Maximum number of changes kept in memory, per transaction. After that,
 * changes are spooled to disk.
 *
 * The current value should be sufficient to decode the entire transaction
 * without hitting disk in OLTP workloads, while starting to spool to disk in
 * other workloads reasonably fast.
 *
 * At some point in the future it probably makes sense to have a more elaborate
 * resource management here, but it's not entirely clear what that would look
 * like.
 */
int logical_decoding_work_mem;  /* memory budget for decoded changes */
static const Size max_changes_in_memory = 4096; /* XXX for restore only */

/* ---------------------------------------
 * primary reorderbuffer support routines
 * ---------------------------------------
 */
static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
                                               TransactionId xid, bool create, bool *is_new,
                                               XLogRecPtr lsn, bool create_as_top);
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
                                              ReorderBufferTXN *subtxn);

static void AssertTXNLsnOrder(ReorderBuffer *rb);

/* ---------------------------------------
 * support functions for lsn-order iterating over the ->changes of a
 * transaction and its subtransactions
 *
 * used for iteration over the k-way heap merge of a transaction and its
 * subtransactions
 * ---------------------------------------
 */
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                     ReorderBufferIterTXNState *volatile *iter_state);
static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
                                       ReorderBufferIterTXNState *state);
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs);

/*
 * ---------------------------------------
 * Disk serialization support functions
 * ---------------------------------------
 */
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb);
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                         int fd, ReorderBufferChange *change);
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                        TXNEntryFile *file, XLogSegNo *segno);
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                       char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                     bool txn_prepared);
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
                                        TransactionId xid, XLogSegNo segno);

static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
                                      ReorderBufferTXN *txn, CommandId cid);

/*
 * ---------------------------------------
 * Streaming support functions
 * ---------------------------------------
 */
static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn);

/* ---------------------------------------
 * toast reassembly support
 * ---------------------------------------
 */
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                      Relation relation, ReorderBufferChange *change);
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                          Relation relation, ReorderBufferChange *change);

/*
 * ---------------------------------------
 * memory accounting
 * ---------------------------------------
 */
static Size ReorderBufferChangeSize(ReorderBufferChange *change);
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
                                            ReorderBufferChange *change,
                                            bool addition, Size sz);
295
/*
 * Allocate a new ReorderBuffer and clean out any old serialized state from
 * prior ReorderBuffer instances for the same slot.
 */
ReorderBuffer *
ReorderBufferAllocate(void)
{
    ReorderBuffer *buffer;
    HASHCTL hash_ctl;
    MemoryContext new_ctx;

    /* a reorder buffer is always tied to the slot currently being decoded */
    Assert(MyReplicationSlot != NULL);

    /* allocate memory in own context, to have better accountability */
    new_ctx = AllocSetContextCreate(CurrentMemoryContext,
                                    "ReorderBuffer",
                                    ALLOCSET_DEFAULT_SIZES);

    buffer =
        (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));

    memset(&hash_ctl, 0, sizeof(hash_ctl));

    buffer->context = new_ctx;

    /* slab context for the fixed-size change structs */
    buffer->change_context = SlabContextCreate(new_ctx,
                                               "Change",
                                               SLAB_DEFAULT_BLOCK_SIZE,
                                               sizeof(ReorderBufferChange));

    /* slab context for the fixed-size transaction structs */
    buffer->txn_context = SlabContextCreate(new_ctx,
                                            "TXN",
                                            SLAB_DEFAULT_BLOCK_SIZE,
                                            sizeof(ReorderBufferTXN));

    /* generation context for variable-length tuple data */
    buffer->tup_context = GenerationContextCreate(new_ctx,
                                                  "Tuples",
                                                  SLAB_LARGE_BLOCK_SIZE);

    hash_ctl.keysize = sizeof(TransactionId);
    hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
    hash_ctl.hcxt = buffer->context;

    /* xid => ReorderBufferTXN lookup table */
    buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
                                 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

    /* one-entry lookup cache starts out empty */
    buffer->by_txn_last_xid = InvalidTransactionId;
    buffer->by_txn_last_txn = NULL;

    buffer->outbuf = NULL;
    buffer->outbufsize = 0;
    buffer->size = 0;

    /* reset spill/stream/total statistics counters */
    buffer->spillTxns = 0;
    buffer->spillCount = 0;
    buffer->spillBytes = 0;
    buffer->streamTxns = 0;
    buffer->streamCount = 0;
    buffer->streamBytes = 0;
    buffer->totalTxns = 0;
    buffer->totalBytes = 0;

    buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;

    dlist_init(&buffer->toplevel_by_lsn);
    dlist_init(&buffer->txns_by_base_snapshot_lsn);

    /*
     * Ensure there's no stale data from prior uses of this slot, in case some
     * prior exit avoided calling ReorderBufferFree. Failure to do this can
     * produce duplicated txns, and it's very cheap if there's nothing there.
     */
    ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));

    return buffer;
}
372
373 /*
374 * Free a ReorderBuffer
375 */
376 void
ReorderBufferFree(ReorderBuffer * rb)377 ReorderBufferFree(ReorderBuffer *rb)
378 {
379 MemoryContext context = rb->context;
380
381 /*
382 * We free separately allocated data by entirely scrapping reorderbuffer's
383 * memory context.
384 */
385 MemoryContextDelete(context);
386
387 /* Free disk space used by unconsumed reorder buffers */
388 ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
389 }
390
391 /*
392 * Get an unused, possibly preallocated, ReorderBufferTXN.
393 */
394 static ReorderBufferTXN *
ReorderBufferGetTXN(ReorderBuffer * rb)395 ReorderBufferGetTXN(ReorderBuffer *rb)
396 {
397 ReorderBufferTXN *txn;
398
399 txn = (ReorderBufferTXN *)
400 MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
401
402 memset(txn, 0, sizeof(ReorderBufferTXN));
403
404 dlist_init(&txn->changes);
405 dlist_init(&txn->tuplecids);
406 dlist_init(&txn->subtxns);
407
408 /* InvalidCommandId is not zero, so set it explicitly */
409 txn->command_id = InvalidCommandId;
410 txn->output_plugin_private = NULL;
411
412 return txn;
413 }
414
415 /*
416 * Free a ReorderBufferTXN.
417 */
418 static void
ReorderBufferReturnTXN(ReorderBuffer * rb,ReorderBufferTXN * txn)419 ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
420 {
421 /* clean the lookup cache if we were cached (quite likely) */
422 if (rb->by_txn_last_xid == txn->xid)
423 {
424 rb->by_txn_last_xid = InvalidTransactionId;
425 rb->by_txn_last_txn = NULL;
426 }
427
428 /* free data that's contained */
429
430 if (txn->gid != NULL)
431 {
432 pfree(txn->gid);
433 txn->gid = NULL;
434 }
435
436 if (txn->tuplecid_hash != NULL)
437 {
438 hash_destroy(txn->tuplecid_hash);
439 txn->tuplecid_hash = NULL;
440 }
441
442 if (txn->invalidations)
443 {
444 pfree(txn->invalidations);
445 txn->invalidations = NULL;
446 }
447
448 /* Reset the toast hash */
449 ReorderBufferToastReset(rb, txn);
450
451 pfree(txn);
452 }
453
454 /*
455 * Get an fresh ReorderBufferChange.
456 */
457 ReorderBufferChange *
ReorderBufferGetChange(ReorderBuffer * rb)458 ReorderBufferGetChange(ReorderBuffer *rb)
459 {
460 ReorderBufferChange *change;
461
462 change = (ReorderBufferChange *)
463 MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
464
465 memset(change, 0, sizeof(ReorderBufferChange));
466 return change;
467 }
468
/*
 * Free a ReorderBufferChange and update memory accounting, if requested.
 */
void
ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
                          bool upd_mem)
{
    /* update memory accounting info */
    if (upd_mem)
        ReorderBufferChangeMemoryUpdate(rb, change, false,
                                        ReorderBufferChangeSize(change));

    /* free data contained in the change, depending on its kind */
    switch (change->action)
    {
            /* row changes may carry separately allocated tuple versions */
        case REORDER_BUFFER_CHANGE_INSERT:
        case REORDER_BUFFER_CHANGE_UPDATE:
        case REORDER_BUFFER_CHANGE_DELETE:
        case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
            if (change->data.tp.newtuple)
            {
                ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
                change->data.tp.newtuple = NULL;
            }

            if (change->data.tp.oldtuple)
            {
                ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
                change->data.tp.oldtuple = NULL;
            }
            break;
            /* logical messages own copies of their prefix and payload */
        case REORDER_BUFFER_CHANGE_MESSAGE:
            if (change->data.msg.prefix != NULL)
                pfree(change->data.msg.prefix);
            change->data.msg.prefix = NULL;
            if (change->data.msg.message != NULL)
                pfree(change->data.msg.message);
            change->data.msg.message = NULL;
            break;
        case REORDER_BUFFER_CHANGE_INVALIDATION:
            if (change->data.inval.invalidations)
                pfree(change->data.inval.invalidations);
            change->data.inval.invalidations = NULL;
            break;
        case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
            if (change->data.snapshot)
            {
                /* frees the snapshot or drops a reference to it */
                ReorderBufferFreeSnap(rb, change->data.snapshot);
                change->data.snapshot = NULL;
            }
            break;
        case REORDER_BUFFER_CHANGE_TRUNCATE:
            if (change->data.truncate.relids != NULL)
            {
                ReorderBufferReturnRelids(rb, change->data.truncate.relids);
                change->data.truncate.relids = NULL;
            }
            break;
            /* no data in addition to the struct itself */
        case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
        case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
        case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
        case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
            break;
    }

    pfree(change);
}
537
538 /*
539 * Get a fresh ReorderBufferTupleBuf fitting at least a tuple of size
540 * tuple_len (excluding header overhead).
541 */
542 ReorderBufferTupleBuf *
ReorderBufferGetTupleBuf(ReorderBuffer * rb,Size tuple_len)543 ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
544 {
545 ReorderBufferTupleBuf *tuple;
546 Size alloc_len;
547
548 alloc_len = tuple_len + SizeofHeapTupleHeader;
549
550 tuple = (ReorderBufferTupleBuf *)
551 MemoryContextAlloc(rb->tup_context,
552 sizeof(ReorderBufferTupleBuf) +
553 MAXIMUM_ALIGNOF + alloc_len);
554 tuple->alloc_tuple_size = alloc_len;
555 tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
556
557 return tuple;
558 }
559
/*
 * Free a ReorderBufferTupleBuf.
 */
void
ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
{
    /* header and tuple data were allocated as one chunk; one pfree suffices */
    pfree(tuple);
}
568
569 /*
570 * Get an array for relids of truncated relations.
571 *
572 * We use the global memory context (for the whole reorder buffer), because
573 * none of the existing ones seems like a good match (some are SLAB, so we
574 * can't use those, and tup_context is meant for tuple data, not relids). We
575 * could add yet another context, but it seems like an overkill - TRUNCATE is
576 * not particularly common operation, so it does not seem worth it.
577 */
578 Oid *
ReorderBufferGetRelids(ReorderBuffer * rb,int nrelids)579 ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
580 {
581 Oid *relids;
582 Size alloc_len;
583
584 alloc_len = sizeof(Oid) * nrelids;
585
586 relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
587
588 return relids;
589 }
590
/*
 * Free an array of relids.
 */
void
ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
{
    /* allocated from rb->context; a plain pfree is all that's needed */
    pfree(relids);
}
599
/*
 * Return the ReorderBufferTXN from the given buffer, specified by Xid.
 * If create is true, and a transaction doesn't already exist, create it
 * (with the given LSN, and as top transaction if that's specified);
 * when this happens, is_new is set to true.
 *
 * Returns NULL only when create is false and no matching txn exists.
 */
static ReorderBufferTXN *
ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
                      bool *is_new, XLogRecPtr lsn, bool create_as_top)
{
    ReorderBufferTXN *txn;
    ReorderBufferTXNByIdEnt *ent;
    bool found;

    Assert(TransactionIdIsValid(xid));

    /*
     * Check the one-entry lookup cache first
     */
    if (TransactionIdIsValid(rb->by_txn_last_xid) &&
        rb->by_txn_last_xid == xid)
    {
        txn = rb->by_txn_last_txn;

        if (txn != NULL)
        {
            /* found it, and it's valid */
            if (is_new)
                *is_new = false;
            return txn;
        }

        /*
         * cached as non-existent, and asked not to create? Then nothing else
         * to do.
         */
        if (!create)
            return NULL;
        /* otherwise fall through to create it */
    }

    /*
     * If the cache wasn't hit or it yielded a "does-not-exist" and we want
     * to create an entry.
     */

    /* search the lookup table; HASH_ENTER inserts the key if not present */
    ent = (ReorderBufferTXNByIdEnt *)
        hash_search(rb->by_txn,
                    (void *) &xid,
                    create ? HASH_ENTER : HASH_FIND,
                    &found);
    if (found)
        txn = ent->txn;
    else if (create)
    {
        /* initialize the new entry, if creation was requested */
        Assert(ent != NULL);
        Assert(lsn != InvalidXLogRecPtr);

        ent->txn = ReorderBufferGetTXN(rb);
        ent->txn->xid = xid;
        txn = ent->txn;
        txn->first_lsn = lsn;
        txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;

        if (create_as_top)
        {
            /* new toplevel txns arrive in LSN order, so append at the tail */
            dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
            AssertTXNLsnOrder(rb);
        }
    }
    else
        txn = NULL;             /* not found and not asked to create */

    /* update cache; a NULL txn caches the xid as known-non-existent */
    rb->by_txn_last_xid = xid;
    rb->by_txn_last_txn = txn;

    if (is_new)
        *is_new = !found;

    Assert(!create || txn != NULL);
    return txn;
}
685
/*
 * Record the partial change for the streaming of in-progress transactions. We
 * can stream only complete changes so if we have a partial change like toast
 * table insert or speculative insert then we mark such a 'txn' so that it
 * can't be streamed. We also ensure that if the changes in such a 'txn' are
 * above logical_decoding_work_mem threshold then we stream them as soon as we
 * have a complete change.
 *
 * The partial-change flag is always tracked on the toplevel transaction,
 * since streaming happens at toplevel granularity.
 */
static void
ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                  ReorderBufferChange *change,
                                  bool toast_insert)
{
    ReorderBufferTXN *toptxn;

    /*
     * The partial changes need to be processed only while streaming
     * in-progress transactions.
     */
    if (!ReorderBufferCanStream(rb))
        return;

    /* Get the top transaction. */
    if (txn->toptxn != NULL)
        toptxn = txn->toptxn;
    else
        toptxn = txn;

    /*
     * Indicate a partial change for toast inserts. The change will be
     * considered as complete once we get the insert or update on the main
     * table and we are sure that the pending toast chunks are not required
     * anymore.
     *
     * If we allow streaming when there are pending toast chunks then such
     * chunks won't be released till the insert (multi_insert) is complete and
     * we expect the txn to have streamed all changes after streaming. This
     * restriction is mainly to ensure the correctness of streamed
     * transactions and it doesn't seem worth uplifting such a restriction
     * just to allow this case because anyway we will stream the transaction
     * once such an insert is complete.
     */
    if (toast_insert)
        toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
    else if (rbtxn_has_partial_change(toptxn) &&
             IsInsertOrUpdate(change->action) &&
             change->data.tp.clear_toast_afterwards)
        toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;

    /*
     * Indicate a partial change for speculative inserts. The change will be
     * considered as complete once we get the speculative confirm or abort
     * token.
     */
    if (IsSpecInsert(change->action))
        toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
    else if (rbtxn_has_partial_change(toptxn) &&
             IsSpecConfirmOrAbort(change->action))
        toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;

    /*
     * Stream the transaction if it is serialized before and the changes are
     * now complete in the top-level transaction.
     *
     * The reason for doing the streaming of such a transaction as soon as we
     * get the complete change for it is that previously it would have reached
     * the memory threshold and wouldn't get streamed because of incomplete
     * changes. Delaying such transactions would increase apply lag for them.
     */
    if (ReorderBufferCanStartStreaming(rb) &&
        !(rbtxn_has_partial_change(toptxn)) &&
        rbtxn_is_serialized(txn))
        ReorderBufferStreamTXN(rb, toptxn);
}
760
/*
 * Queue a change into a transaction so it can be replayed upon commit or will be
 * streamed when we reach logical_decoding_work_mem threshold.
 *
 * Takes ownership of 'change'; it is either queued onto the txn or freed
 * (when the txn was concurrently aborted).
 */
void
ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
                         ReorderBufferChange *change, bool toast_insert)
{
    ReorderBufferTXN *txn;

    /* look the txn up, creating it if this is the first change we see */
    txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);

    /*
     * While streaming the previous changes we have detected that the
     * transaction is aborted. So there is no point in collecting further
     * changes for it.
     */
    if (txn->concurrent_abort)
    {
        /*
         * We don't need to update memory accounting for this change as we
         * have not added it to the queue yet.
         */
        ReorderBufferReturnChange(rb, change, false);
        return;
    }

    change->lsn = lsn;
    change->txn = txn;

    Assert(InvalidXLogRecPtr != lsn);
    dlist_push_tail(&txn->changes, &change->node);
    txn->nentries++;
    txn->nentries_mem++;

    /* update memory accounting information */
    ReorderBufferChangeMemoryUpdate(rb, change, true,
                                    ReorderBufferChangeSize(change));

    /* process partial change */
    ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);

    /* check the memory limits and evict something if needed */
    ReorderBufferCheckMemoryLimit(rb);
}
806
/*
 * A transactional message is queued to be processed upon commit and a
 * non-transactional message gets processed immediately.
 *
 * For the non-transactional path, 'snapshot' is installed as the historic
 * snapshot while the output plugin's message callback runs.
 */
void
ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
                          Snapshot snapshot, XLogRecPtr lsn,
                          bool transactional, const char *prefix,
                          Size message_size, const char *message)
{
    if (transactional)
    {
        MemoryContext oldcontext;
        ReorderBufferChange *change;

        Assert(xid != InvalidTransactionId);

        oldcontext = MemoryContextSwitchTo(rb->context);

        /* copy prefix and payload so the change owns its data */
        change = ReorderBufferGetChange(rb);
        change->action = REORDER_BUFFER_CHANGE_MESSAGE;
        change->data.msg.prefix = pstrdup(prefix);
        change->data.msg.message_size = message_size;
        change->data.msg.message = palloc(message_size);
        memcpy(change->data.msg.message, message, message_size);

        ReorderBufferQueueChange(rb, xid, lsn, change, false);

        MemoryContextSwitchTo(oldcontext);
    }
    else
    {
        ReorderBufferTXN *txn = NULL;
        /* volatile to be safe across the PG_TRY/PG_CATCH longjmp */
        volatile Snapshot snapshot_now = snapshot;

        if (xid != InvalidTransactionId)
            txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);

        /* setup snapshot to allow catalog access */
        SetupHistoricSnapshot(snapshot_now, NULL);
        PG_TRY();
        {
            /* invoke the output plugin's message callback immediately */
            rb->message(rb, txn, lsn, false, prefix, message_size, message);

            TeardownHistoricSnapshot(false);
        }
        PG_CATCH();
        {
            /* tear down the snapshot in error state, then propagate */
            TeardownHistoricSnapshot(true);
            PG_RE_THROW();
        }
        PG_END_TRY();
    }
}
861
/*
 * AssertTXNLsnOrder
 *      Verify LSN ordering of transaction lists in the reorderbuffer
 *
 * Other LSN-related invariants are checked too.
 *
 * No-op if assertions are not in use.
 */
static void
AssertTXNLsnOrder(ReorderBuffer *rb)
{
#ifdef USE_ASSERT_CHECKING
    dlist_iter iter;
    XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
    XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;

    /* check the list of toplevel transactions, ordered by first_lsn */
    dlist_foreach(iter, &rb->toplevel_by_lsn)
    {
        ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
                                                    iter.cur);

        /* start LSN must be set */
        Assert(cur_txn->first_lsn != InvalidXLogRecPtr);

        /* If there is an end LSN, it must be higher than start LSN */
        if (cur_txn->end_lsn != InvalidXLogRecPtr)
            Assert(cur_txn->first_lsn <= cur_txn->end_lsn);

        /* Current initial LSN must be strictly higher than previous */
        if (prev_first_lsn != InvalidXLogRecPtr)
            Assert(prev_first_lsn < cur_txn->first_lsn);

        /* known-as-subtxn txns must not be listed */
        Assert(!rbtxn_is_known_subxact(cur_txn));

        prev_first_lsn = cur_txn->first_lsn;
    }

    /* check the list of txns with base snapshot, ordered by snapshot LSN */
    dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
    {
        ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
                                                    base_snapshot_node,
                                                    iter.cur);

        /* base snapshot (and its LSN) must be set */
        Assert(cur_txn->base_snapshot != NULL);
        Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);

        /* current LSN must be strictly higher than previous */
        if (prev_base_snap_lsn != InvalidXLogRecPtr)
            Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);

        /* known-as-subtxn txns must not be listed */
        Assert(!rbtxn_is_known_subxact(cur_txn));

        prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
    }
#endif
}
921
922 /*
923 * AssertChangeLsnOrder
924 *
925 * Check ordering of changes in the (sub)transaction.
926 */
927 static void
AssertChangeLsnOrder(ReorderBufferTXN * txn)928 AssertChangeLsnOrder(ReorderBufferTXN *txn)
929 {
930 #ifdef USE_ASSERT_CHECKING
931 dlist_iter iter;
932 XLogRecPtr prev_lsn = txn->first_lsn;
933
934 dlist_foreach(iter, &txn->changes)
935 {
936 ReorderBufferChange *cur_change;
937
938 cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
939
940 Assert(txn->first_lsn != InvalidXLogRecPtr);
941 Assert(cur_change->lsn != InvalidXLogRecPtr);
942 Assert(txn->first_lsn <= cur_change->lsn);
943
944 if (txn->end_lsn != InvalidXLogRecPtr)
945 Assert(cur_change->lsn <= txn->end_lsn);
946
947 Assert(prev_lsn <= cur_change->lsn);
948
949 prev_lsn = cur_change->lsn;
950 }
951 #endif
952 }
953
954 /*
955 * ReorderBufferGetOldestTXN
956 * Return oldest transaction in reorderbuffer
957 */
958 ReorderBufferTXN *
ReorderBufferGetOldestTXN(ReorderBuffer * rb)959 ReorderBufferGetOldestTXN(ReorderBuffer *rb)
960 {
961 ReorderBufferTXN *txn;
962
963 AssertTXNLsnOrder(rb);
964
965 if (dlist_is_empty(&rb->toplevel_by_lsn))
966 return NULL;
967
968 txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
969
970 Assert(!rbtxn_is_known_subxact(txn));
971 Assert(txn->first_lsn != InvalidXLogRecPtr);
972 return txn;
973 }
974
975 /*
976 * ReorderBufferGetOldestXmin
977 * Return oldest Xmin in reorderbuffer
978 *
979 * Returns oldest possibly running Xid from the point of view of snapshots
980 * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
981 * there are none.
982 *
983 * Since snapshots are assigned monotonically, this equals the Xmin of the
984 * base snapshot with minimal base_snapshot_lsn.
985 */
986 TransactionId
ReorderBufferGetOldestXmin(ReorderBuffer * rb)987 ReorderBufferGetOldestXmin(ReorderBuffer *rb)
988 {
989 ReorderBufferTXN *txn;
990
991 AssertTXNLsnOrder(rb);
992
993 if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
994 return InvalidTransactionId;
995
996 txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
997 &rb->txns_by_base_snapshot_lsn);
998 return txn->base_snapshot->xmin;
999 }
1000
void
ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
{
	/* Remember the LSN we'd have to restart decoding from. */
	rb->current_restart_decoding_lsn = ptr;
}
1006
1007 /*
1008 * ReorderBufferAssignChild
1009 *
1010 * Make note that we know that subxid is a subtransaction of xid, seen as of
1011 * the given lsn.
1012 */
void
ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
						 TransactionId subxid, XLogRecPtr lsn)
{
	ReorderBufferTXN *txn;
	ReorderBufferTXN *subtxn;
	bool		new_top;
	bool		new_sub;

	/* Look up both txns, creating them if they aren't known yet. */
	txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
	subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);

	if (!new_sub)
	{
		if (rbtxn_is_known_subxact(subtxn))
		{
			/* already associated, nothing to do */
			return;
		}
		else
		{
			/*
			 * We already saw this transaction, but initially added it to the
			 * list of top-level txns.  Now that we know it's not top-level,
			 * remove it from there.
			 */
			dlist_delete(&subtxn->node);
		}
	}

	/* Mark it as a subxact and record which toplevel txn it belongs to. */
	subtxn->txn_flags |= RBTXN_IS_SUBXACT;
	subtxn->toplevel_xid = xid;
	/* Subxacts are flattened: a subxact must not have subxacts of its own. */
	Assert(subtxn->nsubtxns == 0);

	/* set the reference to top-level transaction */
	subtxn->toptxn = txn;

	/* add to subtransaction list */
	dlist_push_tail(&txn->subtxns, &subtxn->node);
	txn->nsubtxns++;

	/* Possibly transfer the subtxn's snapshot to its top-level txn. */
	ReorderBufferTransferSnapToParent(txn, subtxn);

	/* Verify LSN-ordering invariant */
	AssertTXNLsnOrder(rb);
}
1060
1061 /*
1062 * ReorderBufferTransferSnapToParent
1063 * Transfer base snapshot from subtxn to top-level txn, if needed
1064 *
1065 * This is done if the top-level txn doesn't have a base snapshot, or if the
1066 * subtxn's base snapshot has an earlier LSN than the top-level txn's base
1067 * snapshot's LSN. This can happen if there are no changes in the toplevel
1068 * txn but there are some in the subtxn, or the first change in subtxn has
1069 * earlier LSN than first change in the top-level txn and we learned about
1070 * their kinship only now.
1071 *
1072 * The subtransaction's snapshot is cleared regardless of the transfer
1073 * happening, since it's not needed anymore in either case.
1074 *
1075 * We do this as soon as we become aware of their kinship, to avoid queueing
1076 * extra snapshots to txns known-as-subtxns -- only top-level txns will
1077 * receive further snapshots.
1078 */
static void
ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
								  ReorderBufferTXN *subtxn)
{
	/* Caller must have already linked the subtxn to this toplevel txn. */
	Assert(subtxn->toplevel_xid == txn->xid);

	if (subtxn->base_snapshot != NULL)
	{
		if (txn->base_snapshot == NULL ||
			subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
		{
			/*
			 * If the toplevel transaction already has a base snapshot but
			 * it's newer than the subxact's, purge it.
			 */
			if (txn->base_snapshot != NULL)
			{
				SnapBuildSnapDecRefcount(txn->base_snapshot);
				dlist_delete(&txn->base_snapshot_node);
			}

			/*
			 * The snapshot is now the top transaction's; transfer it, and
			 * adjust the list position of the top transaction in the list by
			 * moving it to where the subtransaction is.
			 */
			txn->base_snapshot = subtxn->base_snapshot;
			txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
			/* Insert before, then unlink subtxn: keeps the list LSN-sorted. */
			dlist_insert_before(&subtxn->base_snapshot_node,
								&txn->base_snapshot_node);

			/*
			 * The subtransaction doesn't have a snapshot anymore (so it
			 * mustn't be in the list.)
			 */
			subtxn->base_snapshot = NULL;
			subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
			dlist_delete(&subtxn->base_snapshot_node);
		}
		else
		{
			/* Base snap of toplevel is fine, so subxact's is not needed */
			SnapBuildSnapDecRefcount(subtxn->base_snapshot);
			dlist_delete(&subtxn->base_snapshot_node);
			subtxn->base_snapshot = NULL;
			subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
		}
	}
}
1128
1129 /*
1130 * Associate a subtransaction with its toplevel transaction at commit
1131 * time. There may be no further changes added after this.
1132 */
1133 void
ReorderBufferCommitChild(ReorderBuffer * rb,TransactionId xid,TransactionId subxid,XLogRecPtr commit_lsn,XLogRecPtr end_lsn)1134 ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
1135 TransactionId subxid, XLogRecPtr commit_lsn,
1136 XLogRecPtr end_lsn)
1137 {
1138 ReorderBufferTXN *subtxn;
1139
1140 subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1141 InvalidXLogRecPtr, false);
1142
1143 /*
1144 * No need to do anything if that subtxn didn't contain any changes
1145 */
1146 if (!subtxn)
1147 return;
1148
1149 subtxn->final_lsn = commit_lsn;
1150 subtxn->end_lsn = end_lsn;
1151
1152 /*
1153 * Assign this subxact as a child of the toplevel xact (no-op if already
1154 * done.)
1155 */
1156 ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1157 }
1158
1159
1160 /*
1161 * Support for efficiently iterating over a transaction's and its
1162 * subtransactions' changes.
1163 *
 * We do this by performing a k-way merge between transactions/subtransactions. For that
1165 * we model the current heads of the different transactions as a binary heap
1166 * so we easily know which (sub-)transaction has the change with the smallest
1167 * lsn next.
1168 *
1169 * We assume the changes in individual transactions are already sorted by LSN.
1170 */
1171
1172 /*
1173 * Binary heap comparison function.
1174 */
1175 static int
ReorderBufferIterCompare(Datum a,Datum b,void * arg)1176 ReorderBufferIterCompare(Datum a, Datum b, void *arg)
1177 {
1178 ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
1179 XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1180 XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1181
1182 if (pos_a < pos_b)
1183 return 1;
1184 else if (pos_a == pos_b)
1185 return 0;
1186 return -1;
1187 }
1188
1189 /*
1190 * Allocate & initialize an iterator which iterates in lsn order over a
1191 * transaction and all its subtransactions.
1192 *
1193 * Note: The iterator state is returned through iter_state parameter rather
1194 * than the function's return value. This is because the state gets cleaned up
1195 * in a PG_CATCH block in the caller, so we want to make sure the caller gets
1196 * back the state even if this function throws an exception.
1197 */
static void
ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
						 ReorderBufferIterTXNState *volatile *iter_state)
{
	Size		nr_txns = 0;	/* number of (sub)txns that have changes */
	ReorderBufferIterTXNState *state;
	dlist_iter	cur_txn_i;
	int32		off;

	/* Make sure the caller doesn't see a stale pointer if we error out. */
	*iter_state = NULL;

	/* Check ordering of changes in the toplevel transaction. */
	AssertChangeLsnOrder(txn);

	/*
	 * Calculate the size of our heap: one element for every transaction that
	 * contains changes.  (Besides the transactions already in the reorder
	 * buffer, we count the one we were directly passed.)
	 */
	if (txn->nentries > 0)
		nr_txns++;

	dlist_foreach(cur_txn_i, &txn->subtxns)
	{
		ReorderBufferTXN *cur_txn;

		cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);

		/* Check ordering of changes in this subtransaction. */
		AssertChangeLsnOrder(cur_txn);

		if (cur_txn->nentries > 0)
			nr_txns++;
	}

	/* allocate iteration state (zeroed, with trailing entries array) */
	state = (ReorderBufferIterTXNState *)
		MemoryContextAllocZero(rb->context,
							   sizeof(ReorderBufferIterTXNState) +
							   sizeof(ReorderBufferIterTXNEntry) * nr_txns);

	state->nr_txns = nr_txns;
	dlist_init(&state->old_change);

	/* Mark all entry files as not-open, so Finish can close only real ones. */
	for (off = 0; off < state->nr_txns; off++)
	{
		state->entries[off].file.vfd = -1;
		state->entries[off].segno = 0;
	}

	/* allocate heap */
	state->heap = binaryheap_allocate(state->nr_txns,
									  ReorderBufferIterCompare,
									  state);

	/* Now that the state fields are initialized, it is safe to return it. */
	*iter_state = state;

	/*
	 * Now insert items into the binary heap, in an unordered fashion.  (We
	 * will run a heap assembly step at the end; this is more efficient.)
	 */

	off = 0;

	/* add toplevel transaction if it contains changes */
	if (txn->nentries > 0)
	{
		ReorderBufferChange *cur_change;

		if (rbtxn_is_serialized(txn))
		{
			/* serialize remaining changes */
			ReorderBufferSerializeTXN(rb, txn);
			/* then read the first batch back from disk */
			ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
										&state->entries[off].segno);
		}

		cur_change = dlist_head_element(ReorderBufferChange, node,
										&txn->changes);

		/* Seed the entry with the txn's first (lowest-LSN) change. */
		state->entries[off].lsn = cur_change->lsn;
		state->entries[off].change = cur_change;
		state->entries[off].txn = txn;

		binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
	}

	/* add subtransactions if they contain changes */
	dlist_foreach(cur_txn_i, &txn->subtxns)
	{
		ReorderBufferTXN *cur_txn;

		cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);

		if (cur_txn->nentries > 0)
		{
			ReorderBufferChange *cur_change;

			if (rbtxn_is_serialized(cur_txn))
			{
				/* serialize remaining changes */
				ReorderBufferSerializeTXN(rb, cur_txn);
				ReorderBufferRestoreChanges(rb, cur_txn,
											&state->entries[off].file,
											&state->entries[off].segno);
			}
			cur_change = dlist_head_element(ReorderBufferChange, node,
											&cur_txn->changes);

			state->entries[off].lsn = cur_change->lsn;
			state->entries[off].change = cur_change;
			state->entries[off].txn = cur_txn;

			binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
		}
	}

	/* assemble a valid binary heap */
	binaryheap_build(state->heap);
}
1319
1320 /*
1321 * Return the next change when iterating over a transaction and its
1322 * subtransactions.
1323 *
1324 * Returns NULL when no further changes exist.
1325 */
static ReorderBufferChange *
ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
{
	ReorderBufferChange *change;
	ReorderBufferIterTXNEntry *entry;
	int32		off;

	/* nothing there anymore */
	if (state->heap->bh_size == 0)
		return NULL;

	/* The heap top is the (sub)txn with the smallest next-change LSN. */
	off = DatumGetInt32(binaryheap_first(state->heap));
	entry = &state->entries[off];

	/* free memory we might have "leaked" in the previous *Next call */
	if (!dlist_is_empty(&state->old_change))
	{
		change = dlist_container(ReorderBufferChange, node,
								 dlist_pop_head_node(&state->old_change));
		ReorderBufferReturnChange(rb, change, true);
		Assert(dlist_is_empty(&state->old_change));
	}

	change = entry->change;

	/*
	 * update heap with information about which transaction has the next
	 * relevant change in LSN order
	 */

	/* there are in-memory changes */
	if (dlist_has_next(&entry->txn->changes, &entry->change->node))
	{
		dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
		ReorderBufferChange *next_change =
		dlist_container(ReorderBufferChange, node, next);

		/* txn stays the same */
		state->entries[off].lsn = next_change->lsn;
		state->entries[off].change = next_change;

		binaryheap_replace_first(state->heap, Int32GetDatum(off));
		return change;
	}

	/* try to load changes from disk */
	if (entry->txn->nentries != entry->txn->nentries_mem)
	{
		/*
		 * Ugly: restoring changes will reuse *Change records, thus delete the
		 * current one from the per-tx list and only free in the next call.
		 */
		dlist_delete(&change->node);
		dlist_push_tail(&state->old_change, &change->node);

		/*
		 * Update the total bytes processed by the txn for which we are
		 * releasing the current set of changes and restoring the new set of
		 * changes.
		 */
		rb->totalBytes += entry->txn->size;
		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
										&state->entries[off].segno))
		{
			/* successfully restored changes from disk */
			ReorderBufferChange *next_change =
			dlist_head_element(ReorderBufferChange, node,
							   &entry->txn->changes);

			elog(DEBUG2, "restored %u/%u changes from disk",
				 (uint32) entry->txn->nentries_mem,
				 (uint32) entry->txn->nentries);

			Assert(entry->txn->nentries_mem);
			/* txn stays the same */
			state->entries[off].lsn = next_change->lsn;
			state->entries[off].change = next_change;
			binaryheap_replace_first(state->heap, Int32GetDatum(off));

			return change;
		}
	}

	/* ok, no changes there anymore, remove */
	binaryheap_remove_first(state->heap);

	return change;
}
1414
1415 /*
1416 * Deallocate the iterator
1417 */
1418 static void
ReorderBufferIterTXNFinish(ReorderBuffer * rb,ReorderBufferIterTXNState * state)1419 ReorderBufferIterTXNFinish(ReorderBuffer *rb,
1420 ReorderBufferIterTXNState *state)
1421 {
1422 int32 off;
1423
1424 for (off = 0; off < state->nr_txns; off++)
1425 {
1426 if (state->entries[off].file.vfd != -1)
1427 FileClose(state->entries[off].file.vfd);
1428 }
1429
1430 /* free memory we might have "leaked" in the last *Next call */
1431 if (!dlist_is_empty(&state->old_change))
1432 {
1433 ReorderBufferChange *change;
1434
1435 change = dlist_container(ReorderBufferChange, node,
1436 dlist_pop_head_node(&state->old_change));
1437 ReorderBufferReturnChange(rb, change, true);
1438 Assert(dlist_is_empty(&state->old_change));
1439 }
1440
1441 binaryheap_free(state->heap);
1442 pfree(state);
1443 }
1444
1445 /*
1446 * Cleanup the contents of a transaction, usually after the transaction
1447 * committed or aborted.
1448 */
static void
ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	bool		found;
	dlist_mutable_iter iter;

	/* cleanup subtransactions & their changes */
	dlist_foreach_modify(iter, &txn->subtxns)
	{
		ReorderBufferTXN *subtxn;

		subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);

		/*
		 * Subtransactions are always associated to the toplevel TXN, even if
		 * they originally were happening inside another subtxn, so we won't
		 * ever recurse more than one level deep here.
		 */
		Assert(rbtxn_is_known_subxact(subtxn));
		Assert(subtxn->nsubtxns == 0);

		ReorderBufferCleanupTXN(rb, subtxn);
	}

	/* cleanup changes in the txn */
	dlist_foreach_modify(iter, &txn->changes)
	{
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node, iter.cur);

		/* Check we're not mixing changes from different transactions. */
		Assert(change->txn == txn);

		ReorderBufferReturnChange(rb, change, true);
	}

	/*
	 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
	 * They are always stored in the toplevel transaction.
	 */
	dlist_foreach_modify(iter, &txn->tuplecids)
	{
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node, iter.cur);

		/* Check we're not mixing changes from different transactions. */
		Assert(change->txn == txn);
		Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);

		ReorderBufferReturnChange(rb, change, true);
	}

	/*
	 * Cleanup the base snapshot, if set.
	 */
	if (txn->base_snapshot != NULL)
	{
		SnapBuildSnapDecRefcount(txn->base_snapshot);
		dlist_delete(&txn->base_snapshot_node);
	}

	/*
	 * Cleanup the snapshot for the last streamed run.
	 */
	if (txn->snapshot_now != NULL)
	{
		Assert(rbtxn_is_streamed(txn));
		ReorderBufferFreeSnap(rb, txn->snapshot_now);
	}

	/*
	 * Remove TXN from its containing list.
	 *
	 * Note: if txn is known as subxact, we are deleting the TXN from its
	 * parent's list of known subxacts; this leaves the parent's nsubxacts
	 * count too high, but we don't care.  Otherwise, we are deleting the TXN
	 * from the LSN-ordered list of toplevel TXNs.
	 */
	dlist_delete(&txn->node);

	/* now remove reference from buffer */
	hash_search(rb->by_txn,
				(void *) &txn->xid,
				HASH_REMOVE,
				&found);
	/* The txn must have been in the hash table, or bookkeeping is broken. */
	Assert(found);

	/* remove entries spilled to disk */
	if (rbtxn_is_serialized(txn))
		ReorderBufferRestoreCleanup(rb, txn);

	/* deallocate */
	ReorderBufferReturnTXN(rb, txn);
}
1545
1546 /*
1547 * Discard changes from a transaction (and subtransactions), either after
1548 * streaming or decoding them at PREPARE. Keep the remaining info -
1549 * transactions, tuplecids, invalidations and snapshots.
1550 *
 * We additionally remove tuplecids after decoding the transaction at prepare
1552 * time as we only need to perform invalidation at rollback or commit prepared.
1553 *
1554 * 'txn_prepared' indicates that we have decoded the transaction at prepare
1555 * time.
1556 */
static void
ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
{
	dlist_mutable_iter iter;

	/* cleanup subtransactions & their changes */
	dlist_foreach_modify(iter, &txn->subtxns)
	{
		ReorderBufferTXN *subtxn;

		subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);

		/*
		 * Subtransactions are always associated to the toplevel TXN, even if
		 * they originally were happening inside another subtxn, so we won't
		 * ever recurse more than one level deep here.
		 */
		Assert(rbtxn_is_known_subxact(subtxn));
		Assert(subtxn->nsubtxns == 0);

		ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
	}

	/* cleanup changes in the txn */
	dlist_foreach_modify(iter, &txn->changes)
	{
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node, iter.cur);

		/* Check we're not mixing changes from different transactions. */
		Assert(change->txn == txn);

		/* remove the change from its containing list */
		dlist_delete(&change->node);

		ReorderBufferReturnChange(rb, change, true);
	}

	/*
	 * Mark the transaction as streamed.
	 *
	 * The toplevel transaction, identified by (toptxn==NULL), is marked as
	 * streamed always, even if it does not contain any changes (that is, when
	 * all the changes are in subtransactions).
	 *
	 * For subtransactions, we only mark them as streamed when there are
	 * changes in them.
	 *
	 * We do it this way because of aborts - we don't want to send aborts for
	 * XIDs the downstream is not aware of. And of course, it always knows
	 * about the toplevel xact (we send the XID in all messages), but we never
	 * stream XIDs of empty subxacts.
	 */
	if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
		txn->txn_flags |= RBTXN_IS_STREAMED;

	if (txn_prepared)
	{
		/*
		 * If this is a prepared txn, cleanup the tuplecids we stored for
		 * decoding catalog snapshot access. They are always stored in the
		 * toplevel transaction.
		 */
		dlist_foreach_modify(iter, &txn->tuplecids)
		{
			ReorderBufferChange *change;

			change = dlist_container(ReorderBufferChange, node, iter.cur);

			/* Check we're not mixing changes from different transactions. */
			Assert(change->txn == txn);
			Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);

			/* Remove the change from its containing list. */
			dlist_delete(&change->node);

			ReorderBufferReturnChange(rb, change, true);
		}
	}

	/*
	 * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any
	 * memory. We could also keep the hash table and update it with new ctid
	 * values, but this seems simpler and good enough for now.
	 */
	if (txn->tuplecid_hash != NULL)
	{
		hash_destroy(txn->tuplecid_hash);
		txn->tuplecid_hash = NULL;
	}

	/* If this txn is serialized then clean the disk space. */
	if (rbtxn_is_serialized(txn))
	{
		ReorderBufferRestoreCleanup(rb, txn);
		txn->txn_flags &= ~RBTXN_IS_SERIALIZED;

		/*
		 * We set this flag to indicate if the transaction is ever serialized.
		 * We need this to accurately update the stats as otherwise the same
		 * transaction can be counted as serialized multiple times.
		 */
		txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR;
	}

	/* also reset the number of entries in the transaction */
	txn->nentries_mem = 0;
	txn->nentries = 0;
}
1667
1668 /*
1669 * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1670 * HeapTupleSatisfiesHistoricMVCC.
1671 */
static void
ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	dlist_iter	iter;
	HASHCTL		hash_ctl;

	/* Only needed for txns that actually modified the catalogs. */
	if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
		return;

	hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
	hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
	hash_ctl.hcxt = rb->context;

	/*
	 * create the hash with the exact number of to-be-stored tuplecids from
	 * the start
	 */
	txn->tuplecid_hash =
		hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

	dlist_foreach(iter, &txn->tuplecids)
	{
		ReorderBufferTupleCidKey key;
		ReorderBufferTupleCidEnt *ent;
		bool		found;
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node, iter.cur);

		Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);

		/* be careful about padding */
		memset(&key, 0, sizeof(ReorderBufferTupleCidKey));

		key.relnode = change->data.tuplecid.node;

		ItemPointerCopy(&change->data.tuplecid.tid,
						&key.tid);

		ent = (ReorderBufferTupleCidEnt *)
			hash_search(txn->tuplecid_hash,
						(void *) &key,
						HASH_ENTER,
						&found);
		if (!found)
		{
			/* First sighting of this tuple: record its cmin/cmax as-is. */
			ent->cmin = change->data.tuplecid.cmin;
			ent->cmax = change->data.tuplecid.cmax;
			ent->combocid = change->data.tuplecid.combocid;
		}
		else
		{
			/*
			 * Maybe we already saw this tuple before in this transaction, but
			 * if so it must have the same cmin.
			 */
			Assert(ent->cmin == change->data.tuplecid.cmin);

			/*
			 * cmax may be initially invalid, but once set it can only grow,
			 * and never become invalid again.
			 */
			Assert((ent->cmax == InvalidCommandId) ||
				   ((change->data.tuplecid.cmax != InvalidCommandId) &&
					(change->data.tuplecid.cmax > ent->cmax)));
			ent->cmax = change->data.tuplecid.cmax;
		}
	}
}
1742
1743 /*
1744 * Copy a provided snapshot so we can modify it privately. This is needed so
1745 * that catalog modifying transactions can look into intermediate catalog
1746 * states.
1747 */
static Snapshot
ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
					  ReorderBufferTXN *txn, CommandId cid)
{
	Snapshot	snap;
	dlist_iter	iter;
	int			i = 0;
	Size		size;

	/*
	 * One allocation holds the SnapshotData struct plus both xid arrays:
	 * xip (xcnt entries) and subxip (toplevel xid + all subxact xids).
	 */
	size = sizeof(SnapshotData) +
		sizeof(TransactionId) * orig_snap->xcnt +
		sizeof(TransactionId) * (txn->nsubtxns + 1);

	snap = MemoryContextAllocZero(rb->context, size);
	memcpy(snap, orig_snap, sizeof(SnapshotData));

	snap->copied = true;
	snap->active_count = 1;		/* mark as active so nobody frees it */
	snap->regd_count = 0;
	/* xip array lives immediately after the struct */
	snap->xip = (TransactionId *) (snap + 1);

	memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);

	/*
	 * snap->subxip contains all txids that belong to our transaction which we
	 * need to check via cmin/cmax. That's why we store the toplevel
	 * transaction in there as well.
	 */
	snap->subxip = snap->xip + snap->xcnt;
	snap->subxip[i++] = txn->xid;

	/*
	 * subxcnt isn't decreased when subtransactions abort, so count manually.
	 * Since it's an upper boundary it is safe to use it for the allocation
	 * above.
	 */
	snap->subxcnt = 1;

	dlist_foreach(iter, &txn->subtxns)
	{
		ReorderBufferTXN *sub_txn;

		sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
		snap->subxip[i++] = sub_txn->xid;
		snap->subxcnt++;
	}

	/* sort so we can bsearch() later */
	qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);

	/* store the specified current CommandId */
	snap->curcid = cid;

	return snap;
}
1803
1804 /*
1805 * Free a previously ReorderBufferCopySnap'ed snapshot
1806 */
1807 static void
ReorderBufferFreeSnap(ReorderBuffer * rb,Snapshot snap)1808 ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
1809 {
1810 if (snap->copied)
1811 pfree(snap);
1812 else
1813 SnapBuildSnapDecRefcount(snap);
1814 }
1815
1816 /*
1817 * If the transaction was (partially) streamed, we need to prepare or commit
1818 * it in a 'streamed' way. That is, we first stream the remaining part of the
1819 * transaction, and then invoke stream_prepare or stream_commit message as per
1820 * the case.
1821 */
static void
ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	/* we should only call this for previously streamed transactions */
	Assert(rbtxn_is_streamed(txn));

	/* First stream whatever remains of the transaction. */
	ReorderBufferStreamTXN(rb, txn);

	if (rbtxn_prepared(txn))
	{
		/*
		 * Note, we send stream prepare even if a concurrent abort is
		 * detected. See DecodePrepare for more information.
		 */
		rb->stream_prepare(rb, txn, txn->final_lsn);

		/*
		 * This is a PREPARED transaction, part of a two-phase commit. The
		 * full cleanup will happen as part of the COMMIT PREPAREDs, so now
		 * just truncate txn by removing changes and tuple_cids.
		 */
		ReorderBufferTruncateTXN(rb, txn, true);
		/* Reset the CheckXidAlive */
		CheckXidAlive = InvalidTransactionId;
	}
	else
	{
		/* Plain commit: tell the output plugin, then discard the txn. */
		rb->stream_commit(rb, txn, txn->final_lsn);
		ReorderBufferCleanupTXN(rb, txn);
	}
}
1853
1854 /*
1855 * Set xid to detect concurrent aborts.
1856 *
1857 * While streaming an in-progress transaction or decoding a prepared
1858 * transaction there is a possibility that the (sub)transaction might get
1859 * aborted concurrently. In such case if the (sub)transaction has catalog
1860 * update then we might decode the tuple using wrong catalog version. For
1861 * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now,
1862 * the transaction 501 updates the catalog tuple and after that we will have
1863 * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). Now, if 501 is
1864 * aborted and some other transaction say 502 updates the same catalog tuple
1865 * then the first tuple will be changed to (xmin: 500, xmax: 502). So, the
1866 * problem is that when we try to decode the tuple inserted/updated in 501
1867 * after the catalog update, we will see the catalog tuple with (xmin: 500,
1868 * xmax: 502) as visible because it will consider that the tuple is deleted by
1869 * xid 502 which is not visible to our snapshot. And when we will try to
1870 * decode with that catalog tuple, it can lead to a wrong result or a crash.
1871 * So, it is necessary to detect concurrent aborts to allow streaming of
1872 * in-progress transactions or decoding of prepared transactions.
1873 *
1874 * For detecting the concurrent abort we set CheckXidAlive to the current
1875 * (sub)transaction's xid for which this change belongs to. And, during
1876 * catalog scan we can check the status of the xid and if it is aborted we will
1877 * report a specific error so that we can stop streaming current transaction
1878 * and discard the already streamed changes on such an error. We might have
1879 * already streamed some of the changes for the aborted (sub)transaction, but
1880 * that is fine because when we decode the abort we will stream abort message
1881 * to truncate the changes in the subscriber. Similarly, for prepared
1882 * transactions, we stop decoding if concurrent abort is detected and then
1883 * rollback the changes when rollback prepared is encountered. See
1884 * DecodePrepare.
1885 */
1886 static inline void
SetupCheckXidLive(TransactionId xid)1887 SetupCheckXidLive(TransactionId xid)
1888 {
1889 /*
1890 * If the input transaction id is already set as a CheckXidAlive then
1891 * nothing to do.
1892 */
1893 if (TransactionIdEquals(CheckXidAlive, xid))
1894 return;
1895
1896 /*
1897 * setup CheckXidAlive if it's not committed yet. We don't check if the
1898 * xid is aborted. That will happen during catalog access.
1899 */
1900 if (!TransactionIdDidCommit(xid))
1901 CheckXidAlive = xid;
1902 else
1903 CheckXidAlive = InvalidTransactionId;
1904 }
1905
1906 /*
1907 * Helper function for ReorderBufferProcessTXN for applying change.
1908 */
1909 static inline void
ReorderBufferApplyChange(ReorderBuffer * rb,ReorderBufferTXN * txn,Relation relation,ReorderBufferChange * change,bool streaming)1910 ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
1911 Relation relation, ReorderBufferChange *change,
1912 bool streaming)
1913 {
1914 if (streaming)
1915 rb->stream_change(rb, txn, relation, change);
1916 else
1917 rb->apply_change(rb, txn, relation, change);
1918 }
1919
1920 /*
1921 * Helper function for ReorderBufferProcessTXN for applying the truncate.
1922 */
1923 static inline void
ReorderBufferApplyTruncate(ReorderBuffer * rb,ReorderBufferTXN * txn,int nrelations,Relation * relations,ReorderBufferChange * change,bool streaming)1924 ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn,
1925 int nrelations, Relation *relations,
1926 ReorderBufferChange *change, bool streaming)
1927 {
1928 if (streaming)
1929 rb->stream_truncate(rb, txn, nrelations, relations, change);
1930 else
1931 rb->apply_truncate(rb, txn, nrelations, relations, change);
1932 }
1933
1934 /*
1935 * Helper function for ReorderBufferProcessTXN for applying the message.
1936 */
1937 static inline void
ReorderBufferApplyMessage(ReorderBuffer * rb,ReorderBufferTXN * txn,ReorderBufferChange * change,bool streaming)1938 ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
1939 ReorderBufferChange *change, bool streaming)
1940 {
1941 if (streaming)
1942 rb->stream_message(rb, txn, change->lsn, true,
1943 change->data.msg.prefix,
1944 change->data.msg.message_size,
1945 change->data.msg.message);
1946 else
1947 rb->message(rb, txn, change->lsn, true,
1948 change->data.msg.prefix,
1949 change->data.msg.message_size,
1950 change->data.msg.message);
1951 }
1952
1953 /*
1954 * Function to store the command id and snapshot at the end of the current
1955 * stream so that we can reuse the same while sending the next stream.
1956 */
1957 static inline void
ReorderBufferSaveTXNSnapshot(ReorderBuffer * rb,ReorderBufferTXN * txn,Snapshot snapshot_now,CommandId command_id)1958 ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn,
1959 Snapshot snapshot_now, CommandId command_id)
1960 {
1961 txn->command_id = command_id;
1962
1963 /* Avoid copying if it's already copied. */
1964 if (snapshot_now->copied)
1965 txn->snapshot_now = snapshot_now;
1966 else
1967 txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1968 txn, command_id);
1969 }
1970
1971 /*
1972 * Helper function for ReorderBufferProcessTXN to handle the concurrent
1973 * abort of the streaming transaction. This resets the TXN such that it
1974 * can be used to stream the remaining data of transaction being processed.
1975 * This can happen when the subtransaction is aborted and we still want to
1976 * continue processing the main or other subtransactions data.
1977 */
1978 static void
ReorderBufferResetTXN(ReorderBuffer * rb,ReorderBufferTXN * txn,Snapshot snapshot_now,CommandId command_id,XLogRecPtr last_lsn,ReorderBufferChange * specinsert)1979 ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
1980 Snapshot snapshot_now,
1981 CommandId command_id,
1982 XLogRecPtr last_lsn,
1983 ReorderBufferChange *specinsert)
1984 {
1985 /* Discard the changes that we just streamed */
1986 ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
1987
1988 /* Free all resources allocated for toast reconstruction */
1989 ReorderBufferToastReset(rb, txn);
1990
1991 /* Return the spec insert change if it is not NULL */
1992 if (specinsert != NULL)
1993 {
1994 ReorderBufferReturnChange(rb, specinsert, true);
1995 specinsert = NULL;
1996 }
1997
1998 /*
1999 * For the streaming case, stop the stream and remember the command ID and
2000 * snapshot for the streaming run.
2001 */
2002 if (rbtxn_is_streamed(txn))
2003 {
2004 rb->stream_stop(rb, txn, last_lsn);
2005 ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2006 }
2007 }
2008
/*
 * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
 *
 * Send data of a transaction (and its subtransactions) to the
 * output plugin. We iterate over the top and subtransactions (using a k-way
 * merge) and replay the changes in lsn order.
 *
 * If streaming is true then data will be sent using stream API.
 *
 * On exit the TXN is truncated (streaming/prepared case) or fully cleaned
 * up (committed case).  A concurrent abort detected via CheckXidAlive is
 * caught in the PG_CATCH block and handled by resetting the TXN; any
 * other error is re-thrown after cleanup.
 *
 * Note: "volatile" markers on some parameters are to avoid trouble with
 * PG_TRY inside the function.
 */
static void
ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
						XLogRecPtr commit_lsn,
						volatile Snapshot snapshot_now,
						volatile CommandId command_id,
						bool streaming)
{
	bool		using_subtxn;
	MemoryContext ccxt = CurrentMemoryContext;	/* restored in PG_CATCH */
	ReorderBufferIterTXNState *volatile iterstate = NULL;
	volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
	ReorderBufferChange *volatile specinsert = NULL;	/* pending spec insert */
	volatile bool stream_started = false;
	ReorderBufferTXN *volatile curtxn = NULL;	/* (sub)txn of current change */

	/* build data to be able to lookup the CommandIds of catalog tuples */
	ReorderBufferBuildTupleCidHash(rb, txn);

	/* setup the initial snapshot */
	SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);

	/*
	 * Decoding needs access to syscaches et al., which in turn use
	 * heavyweight locks and such. Thus we need to have enough state around to
	 * keep track of those. The easiest way is to simply use a transaction
	 * internally. That also allows us to easily enforce that nothing writes
	 * to the database by checking for xid assignments.
	 *
	 * When we're called via the SQL SRF there's already a transaction
	 * started, so start an explicit subtransaction there.
	 */
	using_subtxn = IsTransactionOrTransactionBlock();

	PG_TRY();
	{
		ReorderBufferChange *change;

		if (using_subtxn)
			BeginInternalSubTransaction(streaming ? "stream" : "replay");
		else
			StartTransactionCommand();

		/*
		 * We only need to send begin/begin-prepare for non-streamed
		 * transactions.
		 */
		if (!streaming)
		{
			if (rbtxn_prepared(txn))
				rb->begin_prepare(rb, txn);
			else
				rb->begin(rb, txn);
		}

		ReorderBufferIterTXNInit(rb, txn, &iterstate);
		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
		{
			Relation	relation = NULL;
			Oid			reloid;

			/*
			 * We can't call start stream callback before processing first
			 * change.
			 */
			if (prev_lsn == InvalidXLogRecPtr)
			{
				if (streaming)
				{
					txn->origin_id = change->origin_id;
					rb->stream_start(rb, txn, change->lsn);
					stream_started = true;
				}
			}

			/*
			 * Enforce correct ordering of changes, merged from multiple
			 * subtransactions. The changes may have the same LSN due to
			 * MULTI_INSERT xlog records.
			 */
			Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);

			prev_lsn = change->lsn;

			/*
			 * Set the current xid to detect concurrent aborts. This is
			 * required for the cases when we decode the changes before the
			 * COMMIT record is processed.
			 */
			if (streaming || rbtxn_prepared(change->txn))
			{
				curtxn = change->txn;
				SetupCheckXidLive(curtxn->xid);
			}

			switch (change->action)
			{
				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:

					/*
					 * Confirmation for speculative insertion arrived. Simply
					 * use as a normal record. It'll be cleaned up at the end
					 * of INSERT processing.
					 */
					if (specinsert == NULL)
						elog(ERROR, "invalid ordering of speculative insertion changes");
					Assert(specinsert->data.tp.oldtuple == NULL);
					change = specinsert;
					change->action = REORDER_BUFFER_CHANGE_INSERT;

					/* intentionally fall through */
				case REORDER_BUFFER_CHANGE_INSERT:
				case REORDER_BUFFER_CHANGE_UPDATE:
				case REORDER_BUFFER_CHANGE_DELETE:
					Assert(snapshot_now);

					reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
												change->data.tp.relnode.relNode);

					/*
					 * Mapped catalog tuple without data, emitted while
					 * catalog table was in the process of being rewritten. We
					 * can fail to look up the relfilenode, because the
					 * relmapper has no "historic" view, in contrast to the
					 * normal catalog during decoding. Thus repeated rewrites
					 * can cause a lookup failure. That's OK because we do not
					 * decode catalog changes anyway. Normally such tuples
					 * would be skipped over below, but we can't identify
					 * whether the table should be logically logged without
					 * mapping the relfilenode to the oid.
					 */
					if (reloid == InvalidOid &&
						change->data.tp.newtuple == NULL &&
						change->data.tp.oldtuple == NULL)
						goto change_done;
					else if (reloid == InvalidOid)
						elog(ERROR, "could not map filenode \"%s\" to relation OID",
							 relpathperm(change->data.tp.relnode,
										 MAIN_FORKNUM));

					relation = RelationIdGetRelation(reloid);

					if (!RelationIsValid(relation))
						elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
							 reloid,
							 relpathperm(change->data.tp.relnode,
										 MAIN_FORKNUM));

					if (!RelationIsLogicallyLogged(relation))
						goto change_done;

					/*
					 * Ignore temporary heaps created during DDL unless the
					 * plugin has asked for them.
					 */
					if (relation->rd_rel->relrewrite && !rb->output_rewrites)
						goto change_done;

					/*
					 * For now ignore sequence changes entirely. Most of the
					 * time they don't log changes using records we
					 * understand, so it doesn't make sense to handle the few
					 * cases we do.
					 */
					if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
						goto change_done;

					/* user-triggered change */
					if (!IsToastRelation(relation))
					{
						ReorderBufferToastReplace(rb, txn, relation, change);
						ReorderBufferApplyChange(rb, txn, relation, change,
												 streaming);

						/*
						 * Only clear reassembled toast chunks if we're sure
						 * they're not required anymore. The creator of the
						 * tuple tells us.
						 */
						if (change->data.tp.clear_toast_afterwards)
							ReorderBufferToastReset(rb, txn);
					}
					/* we're not interested in toast deletions */
					else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
					{
						/*
						 * Need to reassemble the full toasted Datum in
						 * memory, to ensure the chunks don't get reused till
						 * we're done remove it from the list of this
						 * transaction's changes. Otherwise it will get
						 * freed/reused while restoring spooled data from
						 * disk.
						 */
						Assert(change->data.tp.newtuple != NULL);

						dlist_delete(&change->node);
						ReorderBufferToastAppendChunk(rb, txn, relation,
													  change);
					}

			change_done:

					/*
					 * If speculative insertion was confirmed, the record
					 * isn't needed anymore.
					 */
					if (specinsert != NULL)
					{
						ReorderBufferReturnChange(rb, specinsert, true);
						specinsert = NULL;
					}

					if (RelationIsValid(relation))
					{
						RelationClose(relation);
						relation = NULL;
					}
					break;

				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:

					/*
					 * Speculative insertions are dealt with by delaying the
					 * processing of the insert until the confirmation record
					 * arrives. For that we simply unlink the record from the
					 * chain, so it does not get freed/reused while restoring
					 * spooled data from disk.
					 *
					 * This is safe in the face of concurrent catalog changes
					 * because the relevant relation can't be changed between
					 * speculative insertion and confirmation due to
					 * CheckTableNotInUse() and locking.
					 */

					/* clear out a pending (and thus failed) speculation */
					if (specinsert != NULL)
					{
						ReorderBufferReturnChange(rb, specinsert, true);
						specinsert = NULL;
					}

					/* and memorize the pending insertion */
					dlist_delete(&change->node);
					specinsert = change;
					break;

				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:

					/*
					 * Abort for speculative insertion arrived. So cleanup the
					 * specinsert tuple and toast hash.
					 *
					 * Note that we get the spec abort change for each toast
					 * entry but we need to perform the cleanup only the first
					 * time we get it for the main table.
					 */
					if (specinsert != NULL)
					{
						/*
						 * We must clean the toast hash before processing a
						 * completely new tuple to avoid confusion about the
						 * previous tuple's toast chunks.
						 */
						Assert(change->data.tp.clear_toast_afterwards);
						ReorderBufferToastReset(rb, txn);

						/* We don't need this record anymore. */
						ReorderBufferReturnChange(rb, specinsert, true);
						specinsert = NULL;
					}
					break;

				case REORDER_BUFFER_CHANGE_TRUNCATE:
					{
						int			i;
						int			nrelids = change->data.truncate.nrelids;
						int			nrelations = 0;
						Relation   *relations;

						relations = palloc0(nrelids * sizeof(Relation));
						for (i = 0; i < nrelids; i++)
						{
							Oid			relid = change->data.truncate.relids[i];
							Relation	relation;

							relation = RelationIdGetRelation(relid);

							if (!RelationIsValid(relation))
								elog(ERROR, "could not open relation with OID %u", relid);

							/* skip relations the plugin shouldn't see */
							if (!RelationIsLogicallyLogged(relation))
								continue;

							relations[nrelations++] = relation;
						}

						/* Apply the truncate. */
						ReorderBufferApplyTruncate(rb, txn, nrelations,
												   relations, change,
												   streaming);

						for (i = 0; i < nrelations; i++)
							RelationClose(relations[i]);

						break;
					}

				case REORDER_BUFFER_CHANGE_MESSAGE:
					ReorderBufferApplyMessage(rb, txn, change, streaming);
					break;

				case REORDER_BUFFER_CHANGE_INVALIDATION:
					/* Execute the invalidation messages locally */
					ReorderBufferExecuteInvalidations(
													  change->data.inval.ninvalidations,
													  change->data.inval.invalidations);
					break;

				case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
					/* get rid of the old */
					TeardownHistoricSnapshot(false);

					if (snapshot_now->copied)
					{
						/* we own the old copy, so free it before replacing */
						ReorderBufferFreeSnap(rb, snapshot_now);
						snapshot_now =
							ReorderBufferCopySnap(rb, change->data.snapshot,
												  txn, command_id);
					}

					/*
					 * Restored from disk, need to be careful not to double
					 * free. We could introduce refcounting for that, but for
					 * now this seems infrequent enough not to care.
					 */
					else if (change->data.snapshot->copied)
					{
						snapshot_now =
							ReorderBufferCopySnap(rb, change->data.snapshot,
												  txn, command_id);
					}
					else
					{
						snapshot_now = change->data.snapshot;
					}

					/* and continue with the new one */
					SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
					break;

				case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
					Assert(change->data.command_id != InvalidCommandId);

					/* command ids only ever advance within a transaction */
					if (command_id < change->data.command_id)
					{
						command_id = change->data.command_id;

						if (!snapshot_now->copied)
						{
							/* we don't use the global one anymore */
							snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
																 txn, command_id);
						}

						snapshot_now->curcid = command_id;

						TeardownHistoricSnapshot(false);
						SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
					}

					break;

				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
					/* tuplecids live in txn->tuplecids, never in the queue */
					elog(ERROR, "tuplecid value in changequeue");
					break;
			}
		}

		/* speculative insertion record must be freed by now */
		Assert(!specinsert);

		/* clean up the iterator */
		ReorderBufferIterTXNFinish(rb, iterstate);
		iterstate = NULL;

		/*
		 * Update total transaction count and total bytes processed by the
		 * transaction and its subtransactions. Ensure to not count the
		 * streamed transaction multiple times.
		 *
		 * Note that the statistics computation has to be done after
		 * ReorderBufferIterTXNFinish as it releases the serialized change
		 * which we have already accounted in ReorderBufferIterTXNNext.
		 */
		if (!rbtxn_is_streamed(txn))
			rb->totalTxns++;

		rb->totalBytes += txn->total_size;

		/*
		 * Done with current changes, send the last message for this set of
		 * changes depending upon streaming mode.
		 */
		if (streaming)
		{
			if (stream_started)
			{
				rb->stream_stop(rb, txn, prev_lsn);
				stream_started = false;
			}
		}
		else
		{
			/*
			 * Call either PREPARE (for two-phase transactions) or COMMIT (for
			 * regular ones).
			 */
			if (rbtxn_prepared(txn))
				rb->prepare(rb, txn, commit_lsn);
			else
				rb->commit(rb, txn, commit_lsn);
		}

		/* this is just a sanity check against bad output plugin behaviour */
		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
			elog(ERROR, "output plugin used XID %u",
				 GetCurrentTransactionId());

		/*
		 * Remember the command ID and snapshot for the next set of changes in
		 * streaming mode.
		 */
		if (streaming)
			ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
		else if (snapshot_now->copied)
			ReorderBufferFreeSnap(rb, snapshot_now);

		/* cleanup */
		TeardownHistoricSnapshot(false);

		/*
		 * Aborting the current (sub-)transaction as a whole has the right
		 * semantics. We want all locks acquired in here to be released, not
		 * reassigned to the parent and we do not want any database access
		 * have persistent effects.
		 */
		AbortCurrentTransaction();

		/* make sure there's no cache pollution */
		ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);

		if (using_subtxn)
			RollbackAndReleaseCurrentSubTransaction();

		/*
		 * We are here due to one of the four reasons: 1. Decoding an
		 * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
		 * prepared txn that was (partially) streamed. 4. Decoding a committed
		 * txn.
		 *
		 * For 1, we allow truncation of txn data by removing the changes
		 * already streamed but still keeping other things like invalidations,
		 * snapshot, and tuplecids. For 2 and 3, we indicate
		 * ReorderBufferTruncateTXN to do more elaborate truncation of txn
		 * data as the entire transaction has been decoded except for commit.
		 * For 4, as the entire txn has been decoded, we can fully clean up
		 * the TXN reorder buffer.
		 */
		if (streaming || rbtxn_prepared(txn))
		{
			ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
			/* Reset the CheckXidAlive */
			CheckXidAlive = InvalidTransactionId;
		}
		else
			ReorderBufferCleanupTXN(rb, txn);
	}
	PG_CATCH();
	{
		/* Switch back to the caller's context to copy the error data. */
		MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
		ErrorData  *errdata = CopyErrorData();

		/* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
		if (iterstate)
			ReorderBufferIterTXNFinish(rb, iterstate);

		TeardownHistoricSnapshot(true);

		/*
		 * Force cache invalidation to happen outside of a valid transaction
		 * to prevent catalog access as we just caught an error.
		 */
		AbortCurrentTransaction();

		/* make sure there's no cache pollution */
		ReorderBufferExecuteInvalidations(txn->ninvalidations,
										  txn->invalidations);

		if (using_subtxn)
			RollbackAndReleaseCurrentSubTransaction();

		/*
		 * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
		 * abort of the (sub)transaction we are streaming or preparing. We
		 * need to do the cleanup and return gracefully on this error, see
		 * SetupCheckXidLive.
		 *
		 * This error code can be thrown by one of the callbacks we call
		 * during decoding so we need to ensure that we return gracefully only
		 * when we are sending the data in streaming mode and the streaming is
		 * not finished yet or when we are sending the data out on a PREPARE
		 * during a two-phase commit.
		 */
		if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
			(stream_started || rbtxn_prepared(txn)))
		{
			/* curtxn must be set for streaming or prepared transactions */
			Assert(curtxn);

			/* Cleanup the temporary error state. */
			FlushErrorState();
			FreeErrorData(errdata);
			errdata = NULL;
			curtxn->concurrent_abort = true;

			/* Reset the TXN so that it is allowed to stream remaining data. */
			ReorderBufferResetTXN(rb, txn, snapshot_now,
								  command_id, prev_lsn,
								  specinsert);
		}
		else
		{
			/* Not a tolerated concurrent abort: clean up and re-throw. */
			ReorderBufferCleanupTXN(rb, txn);
			MemoryContextSwitchTo(ecxt);
			PG_RE_THROW();
		}
	}
	PG_END_TRY();
}
2559
2560 /*
2561 * Perform the replay of a transaction and its non-aborted subtransactions.
2562 *
2563 * Subtransactions previously have to be processed by
2564 * ReorderBufferCommitChild(), even if previously assigned to the toplevel
2565 * transaction with ReorderBufferAssignChild.
2566 *
2567 * This interface is called once a prepare or toplevel commit is read for both
2568 * streamed as well as non-streamed transactions.
2569 */
2570 static void
ReorderBufferReplay(ReorderBufferTXN * txn,ReorderBuffer * rb,TransactionId xid,XLogRecPtr commit_lsn,XLogRecPtr end_lsn,TimestampTz commit_time,RepOriginId origin_id,XLogRecPtr origin_lsn)2571 ReorderBufferReplay(ReorderBufferTXN *txn,
2572 ReorderBuffer *rb, TransactionId xid,
2573 XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2574 TimestampTz commit_time,
2575 RepOriginId origin_id, XLogRecPtr origin_lsn)
2576 {
2577 Snapshot snapshot_now;
2578 CommandId command_id = FirstCommandId;
2579
2580 txn->final_lsn = commit_lsn;
2581 txn->end_lsn = end_lsn;
2582 txn->commit_time = commit_time;
2583 txn->origin_id = origin_id;
2584 txn->origin_lsn = origin_lsn;
2585
2586 /*
2587 * If the transaction was (partially) streamed, we need to commit it in a
2588 * 'streamed' way. That is, we first stream the remaining part of the
2589 * transaction, and then invoke stream_commit message.
2590 *
2591 * Called after everything (origin ID, LSN, ...) is stored in the
2592 * transaction to avoid passing that information directly.
2593 */
2594 if (rbtxn_is_streamed(txn))
2595 {
2596 ReorderBufferStreamCommit(rb, txn);
2597 return;
2598 }
2599
2600 /*
2601 * If this transaction has no snapshot, it didn't make any changes to the
2602 * database, so there's nothing to decode. Note that
2603 * ReorderBufferCommitChild will have transferred any snapshots from
2604 * subtransactions if there were any.
2605 */
2606 if (txn->base_snapshot == NULL)
2607 {
2608 Assert(txn->ninvalidations == 0);
2609
2610 /*
2611 * Removing this txn before a commit might result in the computation
2612 * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2613 */
2614 if (!rbtxn_prepared(txn))
2615 ReorderBufferCleanupTXN(rb, txn);
2616 return;
2617 }
2618
2619 snapshot_now = txn->base_snapshot;
2620
2621 /* Process and send the changes to output plugin. */
2622 ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2623 command_id, false);
2624 }
2625
2626 /*
2627 * Commit a transaction.
2628 *
2629 * See comments for ReorderBufferReplay().
2630 */
2631 void
ReorderBufferCommit(ReorderBuffer * rb,TransactionId xid,XLogRecPtr commit_lsn,XLogRecPtr end_lsn,TimestampTz commit_time,RepOriginId origin_id,XLogRecPtr origin_lsn)2632 ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
2633 XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2634 TimestampTz commit_time,
2635 RepOriginId origin_id, XLogRecPtr origin_lsn)
2636 {
2637 ReorderBufferTXN *txn;
2638
2639 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2640 false);
2641
2642 /* unknown transaction, nothing to replay */
2643 if (txn == NULL)
2644 return;
2645
2646 ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2647 origin_id, origin_lsn);
2648 }
2649
2650 /*
2651 * Record the prepare information for a transaction.
2652 */
2653 bool
ReorderBufferRememberPrepareInfo(ReorderBuffer * rb,TransactionId xid,XLogRecPtr prepare_lsn,XLogRecPtr end_lsn,TimestampTz prepare_time,RepOriginId origin_id,XLogRecPtr origin_lsn)2654 ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
2655 XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
2656 TimestampTz prepare_time,
2657 RepOriginId origin_id, XLogRecPtr origin_lsn)
2658 {
2659 ReorderBufferTXN *txn;
2660
2661 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2662
2663 /* unknown transaction, nothing to do */
2664 if (txn == NULL)
2665 return false;
2666
2667 /*
2668 * Remember the prepare information to be later used by commit prepared in
2669 * case we skip doing prepare.
2670 */
2671 txn->final_lsn = prepare_lsn;
2672 txn->end_lsn = end_lsn;
2673 txn->commit_time = prepare_time;
2674 txn->origin_id = origin_id;
2675 txn->origin_lsn = origin_lsn;
2676
2677 return true;
2678 }
2679
2680 /* Remember that we have skipped prepare */
2681 void
ReorderBufferSkipPrepare(ReorderBuffer * rb,TransactionId xid)2682 ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
2683 {
2684 ReorderBufferTXN *txn;
2685
2686 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2687
2688 /* unknown transaction, nothing to do */
2689 if (txn == NULL)
2690 return;
2691
2692 txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
2693 }
2694
2695 /*
2696 * Prepare a two-phase transaction.
2697 *
2698 * See comments for ReorderBufferReplay().
2699 */
2700 void
ReorderBufferPrepare(ReorderBuffer * rb,TransactionId xid,char * gid)2701 ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
2702 char *gid)
2703 {
2704 ReorderBufferTXN *txn;
2705
2706 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2707 false);
2708
2709 /* unknown transaction, nothing to replay */
2710 if (txn == NULL)
2711 return;
2712
2713 txn->txn_flags |= RBTXN_PREPARE;
2714 txn->gid = pstrdup(gid);
2715
2716 /* The prepare info must have been updated in txn by now. */
2717 Assert(txn->final_lsn != InvalidXLogRecPtr);
2718
2719 ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2720 txn->commit_time, txn->origin_id, txn->origin_lsn);
2721
2722 /*
2723 * We send the prepare for the concurrently aborted xacts so that later
2724 * when rollback prepared is decoded and sent, the downstream should be
2725 * able to rollback such a xact. See comments atop DecodePrepare.
2726 *
2727 * Note, for the concurrent_abort + streaming case a stream_prepare was
2728 * already sent within the ReorderBufferReplay call above.
2729 */
2730 if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2731 rb->prepare(rb, txn, txn->final_lsn);
2732 }
2733
/*
 * This is used to handle COMMIT/ROLLBACK PREPARED.
 *
 * Depending on is_commit, replays the transaction if needed and then sends
 * either commit_prepared or rollback_prepared to the output plugin, finally
 * cleaning up the txn.  Does nothing if the transaction is unknown here.
 */
void
ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
							XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
							XLogRecPtr initial_consistent_point,
							TimestampTz commit_time, RepOriginId origin_id,
							XLogRecPtr origin_lsn, char *gid, bool is_commit)
{
	ReorderBufferTXN *txn;
	XLogRecPtr	prepare_end_lsn;
	TimestampTz prepare_time;

	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);

	/* unknown transaction, nothing to do */
	if (txn == NULL)
		return;

	/*
	 * By this time the txn has the prepare record information, remember it to
	 * be later used for rollback.  (It must be saved now because the txn
	 * fields get overwritten with commit/rollback info further below.)
	 */
	prepare_end_lsn = txn->end_lsn;
	prepare_time = txn->commit_time;

	/* add the gid in the txn */
	txn->gid = pstrdup(gid);

	/*
	 * It is possible that this transaction is not decoded at prepare time
	 * either because by that time we didn't have a consistent snapshot or it
	 * was decoded earlier but we have restarted. We only need to send the
	 * prepare if it was not decoded earlier. We don't need to decode the xact
	 * for aborts if it is not done already.
	 */
	if ((txn->final_lsn < initial_consistent_point) && is_commit)
	{
		txn->txn_flags |= RBTXN_PREPARE;

		/*
		 * The prepare info must have been updated in txn even if we skip
		 * prepare.
		 */
		Assert(txn->final_lsn != InvalidXLogRecPtr);

		/*
		 * By this time the txn has the prepare record information and it is
		 * important to use that so that downstream gets the accurate
		 * information. If instead, we have passed commit information here
		 * then downstream can behave as it has already replayed commit
		 * prepared after the restart.
		 */
		ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
							txn->commit_time, txn->origin_id, txn->origin_lsn);
	}

	/* Now overwrite with the commit/rollback record's information. */
	txn->final_lsn = commit_lsn;
	txn->end_lsn = end_lsn;
	txn->commit_time = commit_time;
	txn->origin_id = origin_id;
	txn->origin_lsn = origin_lsn;

	if (is_commit)
		rb->commit_prepared(rb, txn, commit_lsn);
	else
		rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);

	/* cleanup: make sure there's no cache pollution */
	ReorderBufferExecuteInvalidations(txn->ninvalidations,
									  txn->invalidations);
	ReorderBufferCleanupTXN(rb, txn);
}
2808
2809 /*
2810 * Abort a transaction that possibly has previous changes. Needs to be first
2811 * called for subtransactions and then for the toplevel xid.
2812 *
2813 * NB: Transactions handled here have to have actively aborted (i.e. have
2814 * produced an abort record). Implicitly aborted transactions are handled via
2815 * ReorderBufferAbortOld(); transactions we're just not interested in, but
2816 * which have committed are handled in ReorderBufferForget().
2817 *
2818 * This function purges this transaction and its contents from memory and
2819 * disk.
2820 */
2821 void
ReorderBufferAbort(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn)2822 ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
2823 {
2824 ReorderBufferTXN *txn;
2825
2826 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2827 false);
2828
2829 /* unknown, nothing to remove */
2830 if (txn == NULL)
2831 return;
2832
2833 /* For streamed transactions notify the remote node about the abort. */
2834 if (rbtxn_is_streamed(txn))
2835 {
2836 rb->stream_abort(rb, txn, lsn);
2837
2838 /*
2839 * We might have decoded changes for this transaction that could load
2840 * the cache as per the current transaction's view (consider DDL's
2841 * happened in this transaction). We don't want the decoding of future
2842 * transactions to use those cache entries so execute invalidations.
2843 */
2844 if (txn->ninvalidations > 0)
2845 ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2846 txn->invalidations);
2847 }
2848
2849 /* cosmetic... */
2850 txn->final_lsn = lsn;
2851
2852 /* remove potential on-disk data, and deallocate */
2853 ReorderBufferCleanupTXN(rb, txn);
2854 }
2855
2856 /*
2857 * Abort all transactions that aren't actually running anymore because the
2858 * server restarted.
2859 *
2860 * NB: These really have to be transactions that have aborted due to a server
2861 * crash/immediate restart, as we don't deal with invalidations here.
2862 */
2863 void
ReorderBufferAbortOld(ReorderBuffer * rb,TransactionId oldestRunningXid)2864 ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
2865 {
2866 dlist_mutable_iter it;
2867
2868 /*
2869 * Iterate through all (potential) toplevel TXNs and abort all that are
2870 * older than what possibly can be running. Once we've found the first
2871 * that is alive we stop, there might be some that acquired an xid earlier
2872 * but started writing later, but it's unlikely and they will be cleaned
2873 * up in a later call to this function.
2874 */
2875 dlist_foreach_modify(it, &rb->toplevel_by_lsn)
2876 {
2877 ReorderBufferTXN *txn;
2878
2879 txn = dlist_container(ReorderBufferTXN, node, it.cur);
2880
2881 if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2882 {
2883 elog(DEBUG2, "aborting old transaction %u", txn->xid);
2884
2885 /* remove potential on-disk data, and deallocate this tx */
2886 ReorderBufferCleanupTXN(rb, txn);
2887 }
2888 else
2889 return;
2890 }
2891 }
2892
2893 /*
2894 * Forget the contents of a transaction if we aren't interested in its
2895 * contents. Needs to be first called for subtransactions and then for the
2896 * toplevel xid.
2897 *
2898 * This is significantly different to ReorderBufferAbort() because
2899 * transactions that have committed need to be treated differently from aborted
2900 * ones since they may have modified the catalog.
2901 *
2902 * Note that this is only allowed to be called in the moment a transaction
2903 * commit has just been read, not earlier; otherwise later records referring
2904 * to this xid might re-create the transaction incompletely.
2905 */
2906 void
ReorderBufferForget(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn)2907 ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
2908 {
2909 ReorderBufferTXN *txn;
2910
2911 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2912 false);
2913
2914 /* unknown, nothing to forget */
2915 if (txn == NULL)
2916 return;
2917
2918 /* For streamed transactions notify the remote node about the abort. */
2919 if (rbtxn_is_streamed(txn))
2920 rb->stream_abort(rb, txn, lsn);
2921
2922 /* cosmetic... */
2923 txn->final_lsn = lsn;
2924
2925 /*
2926 * Process cache invalidation messages if there are any. Even if we're not
2927 * interested in the transaction's contents, it could have manipulated the
2928 * catalog and we need to update the caches according to that.
2929 */
2930 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2931 ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2932 txn->invalidations);
2933 else
2934 Assert(txn->ninvalidations == 0);
2935
2936 /* remove potential on-disk data, and deallocate */
2937 ReorderBufferCleanupTXN(rb, txn);
2938 }
2939
2940 /*
2941 * Invalidate cache for those transactions that need to be skipped just in case
2942 * catalogs were manipulated as part of the transaction.
2943 *
2944 * Note that this is a special-purpose function for prepared transactions where
2945 * we don't want to clean up the TXN even when we decide to skip it. See
2946 * DecodePrepare.
2947 */
2948 void
ReorderBufferInvalidate(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn)2949 ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
2950 {
2951 ReorderBufferTXN *txn;
2952
2953 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2954 false);
2955
2956 /* unknown, nothing to do */
2957 if (txn == NULL)
2958 return;
2959
2960 /*
2961 * Process cache invalidation messages if there are any. Even if we're not
2962 * interested in the transaction's contents, it could have manipulated the
2963 * catalog and we need to update the caches according to that.
2964 */
2965 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2966 ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2967 txn->invalidations);
2968 else
2969 Assert(txn->ninvalidations == 0);
2970 }
2971
2972
2973 /*
2974 * Execute invalidations happening outside the context of a decoded
2975 * transaction. That currently happens either for xid-less commits
2976 * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
2977 * transactions (via ReorderBufferForget()).
2978 */
2979 void
ReorderBufferImmediateInvalidation(ReorderBuffer * rb,uint32 ninvalidations,SharedInvalidationMessage * invalidations)2980 ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
2981 SharedInvalidationMessage *invalidations)
2982 {
2983 bool use_subtxn = IsTransactionOrTransactionBlock();
2984 int i;
2985
2986 if (use_subtxn)
2987 BeginInternalSubTransaction("replay");
2988
2989 /*
2990 * Force invalidations to happen outside of a valid transaction - that way
2991 * entries will just be marked as invalid without accessing the catalog.
2992 * That's advantageous because we don't need to setup the full state
2993 * necessary for catalog access.
2994 */
2995 if (use_subtxn)
2996 AbortCurrentTransaction();
2997
2998 for (i = 0; i < ninvalidations; i++)
2999 LocalExecuteInvalidationMessage(&invalidations[i]);
3000
3001 if (use_subtxn)
3002 RollbackAndReleaseCurrentSubTransaction();
3003 }
3004
3005 /*
3006 * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
3007 * least once for every xid in XLogRecord->xl_xid (other places in records
3008 * may, but do not have to be passed through here).
3009 *
3010 * Reorderbuffer keeps some datastructures about transactions in LSN order,
3011 * for efficiency. To do that it has to know about when transactions are seen
3012 * first in the WAL. As many types of records are not actually interesting for
3013 * logical decoding, they do not necessarily pass though here.
3014 */
3015 void
ReorderBufferProcessXid(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn)3016 ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
3017 {
3018 /* many records won't have an xid assigned, centralize check here */
3019 if (xid != InvalidTransactionId)
3020 ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3021 }
3022
3023 /*
3024 * Add a new snapshot to this transaction that may only used after lsn 'lsn'
3025 * because the previous snapshot doesn't describe the catalog correctly for
3026 * following rows.
3027 */
3028 void
ReorderBufferAddSnapshot(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,Snapshot snap)3029 ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
3030 XLogRecPtr lsn, Snapshot snap)
3031 {
3032 ReorderBufferChange *change = ReorderBufferGetChange(rb);
3033
3034 change->data.snapshot = snap;
3035 change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
3036
3037 ReorderBufferQueueChange(rb, xid, lsn, change, false);
3038 }
3039
3040 /*
3041 * Set up the transaction's base snapshot.
3042 *
3043 * If we know that xid is a subtransaction, set the base snapshot on the
3044 * top-level transaction instead.
3045 */
3046 void
ReorderBufferSetBaseSnapshot(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,Snapshot snap)3047 ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
3048 XLogRecPtr lsn, Snapshot snap)
3049 {
3050 ReorderBufferTXN *txn;
3051 bool is_new;
3052
3053 AssertArg(snap != NULL);
3054
3055 /*
3056 * Fetch the transaction to operate on. If we know it's a subtransaction,
3057 * operate on its top-level transaction instead.
3058 */
3059 txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3060 if (rbtxn_is_known_subxact(txn))
3061 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3062 NULL, InvalidXLogRecPtr, false);
3063 Assert(txn->base_snapshot == NULL);
3064
3065 txn->base_snapshot = snap;
3066 txn->base_snapshot_lsn = lsn;
3067 dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
3068
3069 AssertTXNLsnOrder(rb);
3070 }
3071
3072 /*
3073 * Access the catalog with this CommandId at this point in the changestream.
3074 *
3075 * May only be called for command ids > 1
3076 */
3077 void
ReorderBufferAddNewCommandId(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,CommandId cid)3078 ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
3079 XLogRecPtr lsn, CommandId cid)
3080 {
3081 ReorderBufferChange *change = ReorderBufferGetChange(rb);
3082
3083 change->data.command_id = cid;
3084 change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
3085
3086 ReorderBufferQueueChange(rb, xid, lsn, change, false);
3087 }
3088
3089 /*
3090 * Update memory counters to account for the new or removed change.
3091 *
3092 * We update two counters - in the reorder buffer, and in the transaction
3093 * containing the change. The reorder buffer counter allows us to quickly
3094 * decide if we reached the memory limit, the transaction counter allows
3095 * us to quickly pick the largest transaction for eviction.
3096 *
3097 * When streaming is enabled, we need to update the toplevel transaction
3098 * counters instead - we don't really care about subtransactions as we
3099 * can't stream them individually anyway, and we only pick toplevel
3100 * transactions for eviction. So only toplevel transactions matter.
3101 */
3102 static void
ReorderBufferChangeMemoryUpdate(ReorderBuffer * rb,ReorderBufferChange * change,bool addition,Size sz)3103 ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
3104 ReorderBufferChange *change,
3105 bool addition, Size sz)
3106 {
3107 ReorderBufferTXN *txn;
3108 ReorderBufferTXN *toptxn;
3109
3110 Assert(change->txn);
3111
3112 /*
3113 * Ignore tuple CID changes, because those are not evicted when reaching
3114 * memory limit. So we just don't count them, because it might easily
3115 * trigger a pointless attempt to spill.
3116 */
3117 if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
3118 return;
3119
3120 txn = change->txn;
3121
3122 /*
3123 * Update the total size in top level as well. This is later used to
3124 * compute the decoding stats.
3125 */
3126 if (txn->toptxn != NULL)
3127 toptxn = txn->toptxn;
3128 else
3129 toptxn = txn;
3130
3131 if (addition)
3132 {
3133 txn->size += sz;
3134 rb->size += sz;
3135
3136 /* Update the total size in the top transaction. */
3137 toptxn->total_size += sz;
3138 }
3139 else
3140 {
3141 Assert((rb->size >= sz) && (txn->size >= sz));
3142 txn->size -= sz;
3143 rb->size -= sz;
3144
3145 /* Update the total size in the top transaction. */
3146 toptxn->total_size -= sz;
3147 }
3148
3149 Assert(txn->size <= rb->size);
3150 }
3151
3152 /*
3153 * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
3154 *
3155 * We do not include this change type in memory accounting, because we
3156 * keep CIDs in a separate list and do not evict them when reaching
3157 * the memory limit.
3158 */
3159 void
ReorderBufferAddNewTupleCids(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,RelFileNode node,ItemPointerData tid,CommandId cmin,CommandId cmax,CommandId combocid)3160 ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
3161 XLogRecPtr lsn, RelFileNode node,
3162 ItemPointerData tid, CommandId cmin,
3163 CommandId cmax, CommandId combocid)
3164 {
3165 ReorderBufferChange *change = ReorderBufferGetChange(rb);
3166 ReorderBufferTXN *txn;
3167
3168 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3169
3170 change->data.tuplecid.node = node;
3171 change->data.tuplecid.tid = tid;
3172 change->data.tuplecid.cmin = cmin;
3173 change->data.tuplecid.cmax = cmax;
3174 change->data.tuplecid.combocid = combocid;
3175 change->lsn = lsn;
3176 change->txn = txn;
3177 change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
3178
3179 dlist_push_tail(&txn->tuplecids, &change->node);
3180 txn->ntuplecids++;
3181 }
3182
/*
 * Setup the invalidation of the toplevel transaction.
 *
 * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
 * accumulates all the invalidation messages in the toplevel transaction as
 * well as in the form of change in reorder buffer. We require to record it in
 * form of the change so that we can execute only the required invalidations
 * instead of executing all the invalidations on each CommandId increment. We
 * also need to accumulate these in the toplevel transaction because in some
 * cases we skip processing the transaction (see ReorderBufferForget), we need
 * to execute all the invalidations together.
 */
void
ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
							  XLogRecPtr lsn, Size nmsgs,
							  SharedInvalidationMessage *msgs)
{
	ReorderBufferTXN *txn;
	MemoryContext oldcontext;
	ReorderBufferChange *change;

	/* look up the transaction, creating its entry if not known yet */
	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);

	/* the copies made below must live as long as the reorderbuffer itself */
	oldcontext = MemoryContextSwitchTo(rb->context);

	/*
	 * Collect all the invalidations under the top transaction so that we can
	 * execute them all together. See comment atop this function
	 */
	if (txn->toptxn)
		txn = txn->toptxn;

	Assert(nmsgs > 0);

	/* Accumulate invalidations. */
	if (txn->ninvalidations == 0)
	{
		/* first invalidations for this transaction: allocate a fresh array */
		txn->ninvalidations = nmsgs;
		txn->invalidations = (SharedInvalidationMessage *)
			palloc(sizeof(SharedInvalidationMessage) * nmsgs);
		memcpy(txn->invalidations, msgs,
			   sizeof(SharedInvalidationMessage) * nmsgs);
	}
	else
	{
		/* grow the existing array and append the new messages at the end */
		txn->invalidations = (SharedInvalidationMessage *)
			repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
					 (txn->ninvalidations + nmsgs));

		memcpy(txn->invalidations + txn->ninvalidations, msgs,
			   nmsgs * sizeof(SharedInvalidationMessage));
		txn->ninvalidations += nmsgs;
	}

	/*
	 * Also queue a change carrying its own copy of just these messages, so
	 * that replay can execute exactly the invalidations required at this
	 * point in the changestream.
	 */
	change = ReorderBufferGetChange(rb);
	change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
	change->data.inval.ninvalidations = nmsgs;
	change->data.inval.invalidations = (SharedInvalidationMessage *)
		palloc(sizeof(SharedInvalidationMessage) * nmsgs);
	memcpy(change->data.inval.invalidations, msgs,
		   sizeof(SharedInvalidationMessage) * nmsgs);

	ReorderBufferQueueChange(rb, xid, lsn, change, false);

	MemoryContextSwitchTo(oldcontext);
}
3249
3250 /*
3251 * Apply all invalidations we know. Possibly we only need parts at this point
3252 * in the changestream but we don't know which those are.
3253 */
3254 static void
ReorderBufferExecuteInvalidations(uint32 nmsgs,SharedInvalidationMessage * msgs)3255 ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
3256 {
3257 int i;
3258
3259 for (i = 0; i < nmsgs; i++)
3260 LocalExecuteInvalidationMessage(&msgs[i]);
3261 }
3262
3263 /*
3264 * Mark a transaction as containing catalog changes
3265 */
3266 void
ReorderBufferXidSetCatalogChanges(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn)3267 ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
3268 XLogRecPtr lsn)
3269 {
3270 ReorderBufferTXN *txn;
3271
3272 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3273
3274 txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3275
3276 /*
3277 * Mark top-level transaction as having catalog changes too if one of its
3278 * children has so that the ReorderBufferBuildTupleCidHash can
3279 * conveniently check just top-level transaction and decide whether to
3280 * build the hash table or not.
3281 */
3282 if (txn->toptxn != NULL)
3283 txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3284 }
3285
3286 /*
3287 * Query whether a transaction is already *known* to contain catalog
3288 * changes. This can be wrong until directly before the commit!
3289 */
3290 bool
ReorderBufferXidHasCatalogChanges(ReorderBuffer * rb,TransactionId xid)3291 ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
3292 {
3293 ReorderBufferTXN *txn;
3294
3295 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3296 false);
3297 if (txn == NULL)
3298 return false;
3299
3300 return rbtxn_has_catalog_changes(txn);
3301 }
3302
3303 /*
3304 * ReorderBufferXidHasBaseSnapshot
3305 * Have we already set the base snapshot for the given txn/subtxn?
3306 */
3307 bool
ReorderBufferXidHasBaseSnapshot(ReorderBuffer * rb,TransactionId xid)3308 ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
3309 {
3310 ReorderBufferTXN *txn;
3311
3312 txn = ReorderBufferTXNByXid(rb, xid, false,
3313 NULL, InvalidXLogRecPtr, false);
3314
3315 /* transaction isn't known yet, ergo no snapshot */
3316 if (txn == NULL)
3317 return false;
3318
3319 /* a known subtxn? operate on top-level txn instead */
3320 if (rbtxn_is_known_subxact(txn))
3321 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3322 NULL, InvalidXLogRecPtr, false);
3323
3324 return txn->base_snapshot != NULL;
3325 }
3326
3327
3328 /*
3329 * ---------------------------------------
3330 * Disk serialization support
3331 * ---------------------------------------
3332 */
3333
3334 /*
3335 * Ensure the IO buffer is >= sz.
3336 */
3337 static void
ReorderBufferSerializeReserve(ReorderBuffer * rb,Size sz)3338 ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
3339 {
3340 if (!rb->outbufsize)
3341 {
3342 rb->outbuf = MemoryContextAlloc(rb->context, sz);
3343 rb->outbufsize = sz;
3344 }
3345 else if (rb->outbufsize < sz)
3346 {
3347 rb->outbuf = repalloc(rb->outbuf, sz);
3348 rb->outbufsize = sz;
3349 }
3350 }
3351
3352 /*
3353 * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
3354 *
3355 * XXX With many subtransactions this might be quite slow, because we'll have
3356 * to walk through all of them. There are some options how we could improve
3357 * that: (a) maintain some secondary structure with transactions sorted by
3358 * amount of changes, (b) not looking for the entirely largest transaction,
3359 * but e.g. for transaction using at least some fraction of the memory limit,
3360 * and (c) evicting multiple transactions at once, e.g. to free a given portion
3361 * of the memory limit (e.g. 50%).
3362 */
3363 static ReorderBufferTXN *
ReorderBufferLargestTXN(ReorderBuffer * rb)3364 ReorderBufferLargestTXN(ReorderBuffer *rb)
3365 {
3366 HASH_SEQ_STATUS hash_seq;
3367 ReorderBufferTXNByIdEnt *ent;
3368 ReorderBufferTXN *largest = NULL;
3369
3370 hash_seq_init(&hash_seq, rb->by_txn);
3371 while ((ent = hash_seq_search(&hash_seq)) != NULL)
3372 {
3373 ReorderBufferTXN *txn = ent->txn;
3374
3375 /* if the current transaction is larger, remember it */
3376 if ((!largest) || (txn->size > largest->size))
3377 largest = txn;
3378 }
3379
3380 Assert(largest);
3381 Assert(largest->size > 0);
3382 Assert(largest->size <= rb->size);
3383
3384 return largest;
3385 }
3386
3387 /*
3388 * Find the largest toplevel transaction to evict (by streaming).
3389 *
3390 * This can be seen as an optimized version of ReorderBufferLargestTXN, which
3391 * should give us the same transaction (because we don't update memory account
3392 * for subtransaction with streaming, so it's always 0). But we can simply
3393 * iterate over the limited number of toplevel transactions that have a base
3394 * snapshot. There is no use of selecting a transaction that doesn't have base
3395 * snapshot because we don't decode such transactions.
3396 *
3397 * Note that, we skip transactions that contains incomplete changes. There
3398 * is a scope of optimization here such that we can select the largest
3399 * transaction which has incomplete changes. But that will make the code and
3400 * design quite complex and that might not be worth the benefit. If we plan to
3401 * stream the transactions that contains incomplete changes then we need to
3402 * find a way to partially stream/truncate the transaction changes in-memory
3403 * and build a mechanism to partially truncate the spilled files.
3404 * Additionally, whenever we partially stream the transaction we need to
3405 * maintain the last streamed lsn and next time we need to restore from that
3406 * segment and the offset in WAL. As we stream the changes from the top
3407 * transaction and restore them subtransaction wise, we need to even remember
3408 * the subxact from where we streamed the last change.
3409 */
3410 static ReorderBufferTXN *
ReorderBufferLargestTopTXN(ReorderBuffer * rb)3411 ReorderBufferLargestTopTXN(ReorderBuffer *rb)
3412 {
3413 dlist_iter iter;
3414 Size largest_size = 0;
3415 ReorderBufferTXN *largest = NULL;
3416
3417 /* Find the largest top-level transaction having a base snapshot. */
3418 dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
3419 {
3420 ReorderBufferTXN *txn;
3421
3422 txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3423
3424 /* must not be a subtxn */
3425 Assert(!rbtxn_is_known_subxact(txn));
3426 /* base_snapshot must be set */
3427 Assert(txn->base_snapshot != NULL);
3428
3429 if ((largest == NULL || txn->total_size > largest_size) &&
3430 (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
3431 {
3432 largest = txn;
3433 largest_size = txn->total_size;
3434 }
3435 }
3436
3437 return largest;
3438 }
3439
/*
 * Check whether the logical_decoding_work_mem limit was reached, and if yes
 * pick the largest (sub)transaction at-a-time to evict and spill its changes to
 * disk until we reach under the memory limit.
 *
 * XXX At this point we select the transactions until we reach under the memory
 * limit, but we might also adapt a more elaborate eviction strategy - for example
 * evicting enough transactions to free certain fraction (e.g. 50%) of the memory
 * limit.
 */
static void
ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
{
	ReorderBufferTXN *txn;

	/* bail out if we haven't exceeded the memory limit */
	if (rb->size < logical_decoding_work_mem * 1024L)
		return;

	/*
	 * Loop until we reach under the memory limit. One might think that just
	 * by evicting the largest (sub)transaction we will come under the memory
	 * limit based on assumption that the selected transaction is at least as
	 * large as the most recent change (which caused us to go over the memory
	 * limit). However, that is not true because a user can reduce the
	 * logical_decoding_work_mem to a smaller value before the most recent
	 * change.
	 */
	while (rb->size >= logical_decoding_work_mem * 1024L)
	{
		/*
		 * Pick the largest transaction (or subtransaction) and evict it from
		 * memory by streaming, if possible. Otherwise, spill to disk.
		 */
		if (ReorderBufferCanStartStreaming(rb) &&
			(txn = ReorderBufferLargestTopTXN(rb)) != NULL)
		{
			/* we know there has to be one, because the size is not zero */
			Assert(txn && !txn->toptxn);
			Assert(txn->total_size > 0);
			Assert(rb->size >= txn->total_size);

			ReorderBufferStreamTXN(rb, txn);
		}
		else
		{
			/*
			 * Pick the largest transaction (or subtransaction) and evict it
			 * from memory by serializing it to disk.
			 */
			txn = ReorderBufferLargestTXN(rb);

			/* we know there has to be one, because the size is not zero */
			Assert(txn);
			Assert(txn->size > 0);
			Assert(rb->size >= txn->size);

			ReorderBufferSerializeTXN(rb, txn);
		}

		/*
		 * After eviction, the transaction should have no entries in memory,
		 * and should use 0 bytes for changes.
		 */
		Assert(txn->size == 0);
		Assert(txn->nentries_mem == 0);
	}

	/* We must be under the memory limit now. */
	Assert(rb->size < logical_decoding_work_mem * 1024L);
}
3511
/*
 * Spill data of a large transaction (and its subtransactions) to disk.
 *
 * Changes are appended to per-(xid, WAL segment) spill files; afterwards the
 * in-memory changestream is empty and the TXN is flagged as serialized.
 */
static void
ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	dlist_iter	subtxn_i;
	dlist_mutable_iter change_i;
	int			fd = -1;		/* currently open spill segment, -1 if none */
	XLogSegNo	curOpenSegNo = 0;	/* segment number fd corresponds to */
	Size		spilled = 0;	/* number of changes written out */
	Size		size = txn->size;	/* accounted size before we release changes */

	elog(DEBUG2, "spill %u changes in XID %u to disk",
		 (uint32) txn->nentries_mem, txn->xid);

	/* do the same to all child TXs */
	dlist_foreach(subtxn_i, &txn->subtxns)
	{
		ReorderBufferTXN *subtxn;

		subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
		ReorderBufferSerializeTXN(rb, subtxn);
	}

	/* serialize changestream */
	dlist_foreach_modify(change_i, &txn->changes)
	{
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node, change_i.cur);

		/*
		 * store in segment in which it belongs by start lsn, don't split over
		 * multiple segments tho
		 */
		if (fd == -1 ||
			!XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
		{
			char		path[MAXPGPATH];

			/* switching segments: close the previous one first */
			if (fd != -1)
				CloseTransientFile(fd);

			XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);

			/*
			 * No need to care about TLIs here, only used during a single run,
			 * so each LSN only maps to a specific WAL record.
			 */
			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
										curOpenSegNo);

			/* open segment, create it if necessary */
			fd = OpenTransientFile(path,
								   O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);

			if (fd < 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not open file \"%s\": %m", path)));
		}

		/* write the change out, then release its in-memory representation */
		ReorderBufferSerializeChange(rb, txn, fd, change);
		dlist_delete(&change->node);
		ReorderBufferReturnChange(rb, change, true);

		spilled++;
	}

	/* update the statistics iff we have spilled anything */
	if (spilled)
	{
		rb->spillCount += 1;
		rb->spillBytes += size;

		/* don't consider already serialized transactions */
		rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;

		/* update the decoding stats */
		UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
	}

	/* every in-memory change must have been written and released */
	Assert(spilled == txn->nentries_mem);
	Assert(dlist_is_empty(&txn->changes));
	txn->nentries_mem = 0;
	txn->txn_flags |= RBTXN_IS_SERIALIZED;

	if (fd != -1)
		CloseTransientFile(fd);
}
3603
/*
 * Serialize individual change to disk.
 *
 * The on-disk format is a ReorderBufferDiskChange header (which embeds a
 * verbatim copy of the ReorderBufferChange struct and the total record size)
 * followed by the change's variable-length payload, if any.  The record is
 * staged in rb->outbuf and written with a single write().
 */
static void
ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
							 int fd, ReorderBufferChange *change)
{
	ReorderBufferDiskChange *ondisk;
	Size		sz = sizeof(ReorderBufferDiskChange);

	ReorderBufferSerializeReserve(rb, sz);

	ondisk = (ReorderBufferDiskChange *) rb->outbuf;
	memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));

	switch (change->action)
	{
			/* fall through these, they're all similar enough */
		case REORDER_BUFFER_CHANGE_INSERT:
		case REORDER_BUFFER_CHANGE_UPDATE:
		case REORDER_BUFFER_CHANGE_DELETE:
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
			{
				char	   *data;
				ReorderBufferTupleBuf *oldtup,
						   *newtup;
				Size		oldlen = 0;
				Size		newlen = 0;

				oldtup = change->data.tp.oldtuple;
				newtup = change->data.tp.newtuple;

				/* payload: HeapTupleData header + tuple data, per tuple */
				if (oldtup)
				{
					sz += sizeof(HeapTupleData);
					oldlen = oldtup->tuple.t_len;
					sz += oldlen;
				}

				if (newtup)
				{
					sz += sizeof(HeapTupleData);
					newlen = newtup->tuple.t_len;
					sz += newlen;
				}

				/* make sure we have enough space */
				ReorderBufferSerializeReserve(rb, sz);

				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
				/* might have been reallocated above */
				ondisk = (ReorderBufferDiskChange *) rb->outbuf;

				if (oldlen)
				{
					memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
					data += sizeof(HeapTupleData);

					memcpy(data, oldtup->tuple.t_data, oldlen);
					data += oldlen;
				}

				if (newlen)
				{
					memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
					data += sizeof(HeapTupleData);

					memcpy(data, newtup->tuple.t_data, newlen);
					data += newlen;
				}
				break;
			}
		case REORDER_BUFFER_CHANGE_MESSAGE:
			{
				char	   *data;
				Size		prefix_size = strlen(change->data.msg.prefix) + 1;

				/* payload: prefix and message, each preceded by its size */
				sz += prefix_size + change->data.msg.message_size +
					sizeof(Size) + sizeof(Size);
				ReorderBufferSerializeReserve(rb, sz);

				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);

				/* might have been reallocated above */
				ondisk = (ReorderBufferDiskChange *) rb->outbuf;

				/* write the prefix including the size */
				memcpy(data, &prefix_size, sizeof(Size));
				data += sizeof(Size);
				memcpy(data, change->data.msg.prefix,
					   prefix_size);
				data += prefix_size;

				/* write the message including the size */
				memcpy(data, &change->data.msg.message_size, sizeof(Size));
				data += sizeof(Size);
				memcpy(data, change->data.msg.message,
					   change->data.msg.message_size);
				data += change->data.msg.message_size;

				break;
			}
		case REORDER_BUFFER_CHANGE_INVALIDATION:
			{
				char	   *data;
				Size		inval_size = sizeof(SharedInvalidationMessage) *
				change->data.inval.ninvalidations;

				/* payload: the raw array of invalidation messages */
				sz += inval_size;

				ReorderBufferSerializeReserve(rb, sz);
				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);

				/* might have been reallocated above */
				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
				memcpy(data, change->data.inval.invalidations, inval_size);
				data += inval_size;

				break;
			}
		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
			{
				Snapshot	snap;
				char	   *data;

				snap = change->data.snapshot;

				/* payload: SnapshotData plus the xip and subxip arrays */
				sz += sizeof(SnapshotData) +
					sizeof(TransactionId) * snap->xcnt +
					sizeof(TransactionId) * snap->subxcnt;

				/* make sure we have enough space */
				ReorderBufferSerializeReserve(rb, sz);
				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
				/* might have been reallocated above */
				ondisk = (ReorderBufferDiskChange *) rb->outbuf;

				memcpy(data, snap, sizeof(SnapshotData));
				data += sizeof(SnapshotData);

				if (snap->xcnt)
				{
					memcpy(data, snap->xip,
						   sizeof(TransactionId) * snap->xcnt);
					data += sizeof(TransactionId) * snap->xcnt;
				}

				if (snap->subxcnt)
				{
					memcpy(data, snap->subxip,
						   sizeof(TransactionId) * snap->subxcnt);
					data += sizeof(TransactionId) * snap->subxcnt;
				}
				break;
			}
		case REORDER_BUFFER_CHANGE_TRUNCATE:
			{
				Size		size;
				char	   *data;

				/* account for the OIDs of truncated relations */
				size = sizeof(Oid) * change->data.truncate.nrelids;
				sz += size;

				/* make sure we have enough space */
				ReorderBufferSerializeReserve(rb, sz);

				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
				/* might have been reallocated above */
				ondisk = (ReorderBufferDiskChange *) rb->outbuf;

				memcpy(data, change->data.truncate.relids, size);
				data += size;

				break;
			}
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
			/* ReorderBufferChange contains everything important */
			break;
	}

	/* record the final total size in the header */
	ondisk->size = sz;

	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
	if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
	{
		int			save_errno = errno;

		CloseTransientFile(fd);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to data file for XID %u: %m",
						txn->xid)));
	}
	pgstat_report_wait_end();

	/*
	 * Keep the transaction's final_lsn up to date with each change we send to
	 * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
	 * only do this on commit and abort records, but that doesn't work if a
	 * system crash leaves a transaction without its abort record).
	 *
	 * Make sure not to move it backwards.
	 */
	if (txn->final_lsn < change->lsn)
		txn->final_lsn = change->lsn;

	Assert(ondisk->change.action == change->action);
}
3820
3821 /* Returns true, if the output plugin supports streaming, false, otherwise. */
3822 static inline bool
ReorderBufferCanStream(ReorderBuffer * rb)3823 ReorderBufferCanStream(ReorderBuffer *rb)
3824 {
3825 LogicalDecodingContext *ctx = rb->private_data;
3826
3827 return ctx->streaming;
3828 }
3829
3830 /* Returns true, if the streaming can be started now, false, otherwise. */
3831 static inline bool
ReorderBufferCanStartStreaming(ReorderBuffer * rb)3832 ReorderBufferCanStartStreaming(ReorderBuffer *rb)
3833 {
3834 LogicalDecodingContext *ctx = rb->private_data;
3835 SnapBuild *builder = ctx->snapshot_builder;
3836
3837 /* We can't start streaming unless a consistent state is reached. */
3838 if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
3839 return false;
3840
3841 /*
3842 * We can't start streaming immediately even if the streaming is enabled
3843 * because we previously decoded this transaction and now just are
3844 * restarting.
3845 */
3846 if (ReorderBufferCanStream(rb) &&
3847 !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
3848 return true;
3849
3850 return false;
3851 }
3852
3853 /*
3854 * Send data of a large transaction (and its subtransactions) to the
3855 * output plugin, but using the stream API.
3856 */
3857 static void
ReorderBufferStreamTXN(ReorderBuffer * rb,ReorderBufferTXN * txn)3858 ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
3859 {
3860 Snapshot snapshot_now;
3861 CommandId command_id;
3862 Size stream_bytes;
3863 bool txn_is_streamed;
3864
3865 /* We can never reach here for a subtransaction. */
3866 Assert(txn->toptxn == NULL);
3867
3868 /*
3869 * We can't make any assumptions about base snapshot here, similar to what
3870 * ReorderBufferCommit() does. That relies on base_snapshot getting
3871 * transferred from subxact in ReorderBufferCommitChild(), but that was
3872 * not yet called as the transaction is in-progress.
3873 *
3874 * So just walk the subxacts and use the same logic here. But we only need
3875 * to do that once, when the transaction is streamed for the first time.
3876 * After that we need to reuse the snapshot from the previous run.
3877 *
3878 * Unlike DecodeCommit which adds xids of all the subtransactions in
3879 * snapshot's xip array via SnapBuildCommittedTxn, we can't do that here
3880 * but we do add them to subxip array instead via ReorderBufferCopySnap.
3881 * This allows the catalog changes made in subtransactions decoded till
3882 * now to be visible.
3883 */
3884 if (txn->snapshot_now == NULL)
3885 {
3886 dlist_iter subxact_i;
3887
3888 /* make sure this transaction is streamed for the first time */
3889 Assert(!rbtxn_is_streamed(txn));
3890
3891 /* at the beginning we should have invalid command ID */
3892 Assert(txn->command_id == InvalidCommandId);
3893
3894 dlist_foreach(subxact_i, &txn->subtxns)
3895 {
3896 ReorderBufferTXN *subtxn;
3897
3898 subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
3899 ReorderBufferTransferSnapToParent(txn, subtxn);
3900 }
3901
3902 /*
3903 * If this transaction has no snapshot, it didn't make any changes to
3904 * the database till now, so there's nothing to decode.
3905 */
3906 if (txn->base_snapshot == NULL)
3907 {
3908 Assert(txn->ninvalidations == 0);
3909 return;
3910 }
3911
3912 command_id = FirstCommandId;
3913 snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
3914 txn, command_id);
3915 }
3916 else
3917 {
3918 /* the transaction must have been already streamed */
3919 Assert(rbtxn_is_streamed(txn));
3920
3921 /*
3922 * Nah, we already have snapshot from the previous streaming run. We
3923 * assume new subxacts can't move the LSN backwards, and so can't beat
3924 * the LSN condition in the previous branch (so no need to walk
3925 * through subxacts again). In fact, we must not do that as we may be
3926 * using snapshot half-way through the subxact.
3927 */
3928 command_id = txn->command_id;
3929
3930 /*
3931 * We can't use txn->snapshot_now directly because after the last
3932 * streaming run, we might have got some new sub-transactions. So we
3933 * need to add them to the snapshot.
3934 */
3935 snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
3936 txn, command_id);
3937
3938 /* Free the previously copied snapshot. */
3939 Assert(txn->snapshot_now->copied);
3940 ReorderBufferFreeSnap(rb, txn->snapshot_now);
3941 txn->snapshot_now = NULL;
3942 }
3943
3944 /*
3945 * Remember this information to be used later to update stats. We can't
3946 * update the stats here as an error while processing the changes would
3947 * lead to the accumulation of stats even though we haven't streamed all
3948 * the changes.
3949 */
3950 txn_is_streamed = rbtxn_is_streamed(txn);
3951 stream_bytes = txn->total_size;
3952
3953 /* Process and send the changes to output plugin. */
3954 ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
3955 command_id, true);
3956
3957 rb->streamCount += 1;
3958 rb->streamBytes += stream_bytes;
3959
3960 /* Don't consider already streamed transaction. */
3961 rb->streamTxns += (txn_is_streamed) ? 0 : 1;
3962
3963 /* update the decoding stats */
3964 UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
3965
3966 Assert(dlist_is_empty(&txn->changes));
3967 Assert(txn->nentries == 0);
3968 Assert(txn->nentries_mem == 0);
3969 }
3970
3971 /*
3972 * Size of a change in memory.
3973 */
3974 static Size
ReorderBufferChangeSize(ReorderBufferChange * change)3975 ReorderBufferChangeSize(ReorderBufferChange *change)
3976 {
3977 Size sz = sizeof(ReorderBufferChange);
3978
3979 switch (change->action)
3980 {
3981 /* fall through these, they're all similar enough */
3982 case REORDER_BUFFER_CHANGE_INSERT:
3983 case REORDER_BUFFER_CHANGE_UPDATE:
3984 case REORDER_BUFFER_CHANGE_DELETE:
3985 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
3986 {
3987 ReorderBufferTupleBuf *oldtup,
3988 *newtup;
3989 Size oldlen = 0;
3990 Size newlen = 0;
3991
3992 oldtup = change->data.tp.oldtuple;
3993 newtup = change->data.tp.newtuple;
3994
3995 if (oldtup)
3996 {
3997 sz += sizeof(HeapTupleData);
3998 oldlen = oldtup->tuple.t_len;
3999 sz += oldlen;
4000 }
4001
4002 if (newtup)
4003 {
4004 sz += sizeof(HeapTupleData);
4005 newlen = newtup->tuple.t_len;
4006 sz += newlen;
4007 }
4008
4009 break;
4010 }
4011 case REORDER_BUFFER_CHANGE_MESSAGE:
4012 {
4013 Size prefix_size = strlen(change->data.msg.prefix) + 1;
4014
4015 sz += prefix_size + change->data.msg.message_size +
4016 sizeof(Size) + sizeof(Size);
4017
4018 break;
4019 }
4020 case REORDER_BUFFER_CHANGE_INVALIDATION:
4021 {
4022 sz += sizeof(SharedInvalidationMessage) *
4023 change->data.inval.ninvalidations;
4024 break;
4025 }
4026 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4027 {
4028 Snapshot snap;
4029
4030 snap = change->data.snapshot;
4031
4032 sz += sizeof(SnapshotData) +
4033 sizeof(TransactionId) * snap->xcnt +
4034 sizeof(TransactionId) * snap->subxcnt;
4035
4036 break;
4037 }
4038 case REORDER_BUFFER_CHANGE_TRUNCATE:
4039 {
4040 sz += sizeof(Oid) * change->data.truncate.nrelids;
4041
4042 break;
4043 }
4044 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4045 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4046 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4047 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
4048 /* ReorderBufferChange contains everything important */
4049 break;
4050 }
4051
4052 return sz;
4053 }
4054
4055
4056 /*
4057 * Restore a number of changes spilled to disk back into memory.
4058 */
4059 static Size
ReorderBufferRestoreChanges(ReorderBuffer * rb,ReorderBufferTXN * txn,TXNEntryFile * file,XLogSegNo * segno)4060 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
4061 TXNEntryFile *file, XLogSegNo *segno)
4062 {
4063 Size restored = 0;
4064 XLogSegNo last_segno;
4065 dlist_mutable_iter cleanup_iter;
4066 File *fd = &file->vfd;
4067
4068 Assert(txn->first_lsn != InvalidXLogRecPtr);
4069 Assert(txn->final_lsn != InvalidXLogRecPtr);
4070
4071 /* free current entries, so we have memory for more */
4072 dlist_foreach_modify(cleanup_iter, &txn->changes)
4073 {
4074 ReorderBufferChange *cleanup =
4075 dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4076
4077 dlist_delete(&cleanup->node);
4078 ReorderBufferReturnChange(rb, cleanup, true);
4079 }
4080 txn->nentries_mem = 0;
4081 Assert(dlist_is_empty(&txn->changes));
4082
4083 XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4084
4085 while (restored < max_changes_in_memory && *segno <= last_segno)
4086 {
4087 int readBytes;
4088 ReorderBufferDiskChange *ondisk;
4089
4090 if (*fd == -1)
4091 {
4092 char path[MAXPGPATH];
4093
4094 /* first time in */
4095 if (*segno == 0)
4096 XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4097
4098 Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4099
4100 /*
4101 * No need to care about TLIs here, only used during a single run,
4102 * so each LSN only maps to a specific WAL record.
4103 */
4104 ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
4105 *segno);
4106
4107 *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4108
4109 /* No harm in resetting the offset even in case of failure */
4110 file->curOffset = 0;
4111
4112 if (*fd < 0 && errno == ENOENT)
4113 {
4114 *fd = -1;
4115 (*segno)++;
4116 continue;
4117 }
4118 else if (*fd < 0)
4119 ereport(ERROR,
4120 (errcode_for_file_access(),
4121 errmsg("could not open file \"%s\": %m",
4122 path)));
4123 }
4124
4125 /*
4126 * Read the statically sized part of a change which has information
4127 * about the total size. If we couldn't read a record, we're at the
4128 * end of this file.
4129 */
4130 ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
4131 readBytes = FileRead(file->vfd, rb->outbuf,
4132 sizeof(ReorderBufferDiskChange),
4133 file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4134
4135 /* eof */
4136 if (readBytes == 0)
4137 {
4138 FileClose(*fd);
4139 *fd = -1;
4140 (*segno)++;
4141 continue;
4142 }
4143 else if (readBytes < 0)
4144 ereport(ERROR,
4145 (errcode_for_file_access(),
4146 errmsg("could not read from reorderbuffer spill file: %m")));
4147 else if (readBytes != sizeof(ReorderBufferDiskChange))
4148 ereport(ERROR,
4149 (errcode_for_file_access(),
4150 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4151 readBytes,
4152 (uint32) sizeof(ReorderBufferDiskChange))));
4153
4154 file->curOffset += readBytes;
4155
4156 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4157
4158 ReorderBufferSerializeReserve(rb,
4159 sizeof(ReorderBufferDiskChange) + ondisk->size);
4160 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4161
4162 readBytes = FileRead(file->vfd,
4163 rb->outbuf + sizeof(ReorderBufferDiskChange),
4164 ondisk->size - sizeof(ReorderBufferDiskChange),
4165 file->curOffset,
4166 WAIT_EVENT_REORDER_BUFFER_READ);
4167
4168 if (readBytes < 0)
4169 ereport(ERROR,
4170 (errcode_for_file_access(),
4171 errmsg("could not read from reorderbuffer spill file: %m")));
4172 else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4173 ereport(ERROR,
4174 (errcode_for_file_access(),
4175 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4176 readBytes,
4177 (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4178
4179 file->curOffset += readBytes;
4180
4181 /*
4182 * ok, read a full change from disk, now restore it into proper
4183 * in-memory format
4184 */
4185 ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4186 restored++;
4187 }
4188
4189 return restored;
4190 }
4191
4192 /*
4193 * Convert change from its on-disk format to in-memory format and queue it onto
4194 * the TXN's ->changes list.
4195 *
4196 * Note: although "data" is declared char*, at entry it points to a
4197 * maxalign'd buffer, making it safe in most of this function to assume
4198 * that the pointed-to data is suitably aligned for direct access.
4199 */
4200 static void
ReorderBufferRestoreChange(ReorderBuffer * rb,ReorderBufferTXN * txn,char * data)4201 ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
4202 char *data)
4203 {
4204 ReorderBufferDiskChange *ondisk;
4205 ReorderBufferChange *change;
4206
4207 ondisk = (ReorderBufferDiskChange *) data;
4208
4209 change = ReorderBufferGetChange(rb);
4210
4211 /* copy static part */
4212 memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4213
4214 data += sizeof(ReorderBufferDiskChange);
4215
4216 /* restore individual stuff */
4217 switch (change->action)
4218 {
4219 /* fall through these, they're all similar enough */
4220 case REORDER_BUFFER_CHANGE_INSERT:
4221 case REORDER_BUFFER_CHANGE_UPDATE:
4222 case REORDER_BUFFER_CHANGE_DELETE:
4223 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
4224 if (change->data.tp.oldtuple)
4225 {
4226 uint32 tuplelen = ((HeapTuple) data)->t_len;
4227
4228 change->data.tp.oldtuple =
4229 ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4230
4231 /* restore ->tuple */
4232 memcpy(&change->data.tp.oldtuple->tuple, data,
4233 sizeof(HeapTupleData));
4234 data += sizeof(HeapTupleData);
4235
4236 /* reset t_data pointer into the new tuplebuf */
4237 change->data.tp.oldtuple->tuple.t_data =
4238 ReorderBufferTupleBufData(change->data.tp.oldtuple);
4239
4240 /* restore tuple data itself */
4241 memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
4242 data += tuplelen;
4243 }
4244
4245 if (change->data.tp.newtuple)
4246 {
4247 /* here, data might not be suitably aligned! */
4248 uint32 tuplelen;
4249
4250 memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4251 sizeof(uint32));
4252
4253 change->data.tp.newtuple =
4254 ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4255
4256 /* restore ->tuple */
4257 memcpy(&change->data.tp.newtuple->tuple, data,
4258 sizeof(HeapTupleData));
4259 data += sizeof(HeapTupleData);
4260
4261 /* reset t_data pointer into the new tuplebuf */
4262 change->data.tp.newtuple->tuple.t_data =
4263 ReorderBufferTupleBufData(change->data.tp.newtuple);
4264
4265 /* restore tuple data itself */
4266 memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
4267 data += tuplelen;
4268 }
4269
4270 break;
4271 case REORDER_BUFFER_CHANGE_MESSAGE:
4272 {
4273 Size prefix_size;
4274
4275 /* read prefix */
4276 memcpy(&prefix_size, data, sizeof(Size));
4277 data += sizeof(Size);
4278 change->data.msg.prefix = MemoryContextAlloc(rb->context,
4279 prefix_size);
4280 memcpy(change->data.msg.prefix, data, prefix_size);
4281 Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4282 data += prefix_size;
4283
4284 /* read the message */
4285 memcpy(&change->data.msg.message_size, data, sizeof(Size));
4286 data += sizeof(Size);
4287 change->data.msg.message = MemoryContextAlloc(rb->context,
4288 change->data.msg.message_size);
4289 memcpy(change->data.msg.message, data,
4290 change->data.msg.message_size);
4291 data += change->data.msg.message_size;
4292
4293 break;
4294 }
4295 case REORDER_BUFFER_CHANGE_INVALIDATION:
4296 {
4297 Size inval_size = sizeof(SharedInvalidationMessage) *
4298 change->data.inval.ninvalidations;
4299
4300 change->data.inval.invalidations =
4301 MemoryContextAlloc(rb->context, inval_size);
4302
4303 /* read the message */
4304 memcpy(change->data.inval.invalidations, data, inval_size);
4305
4306 break;
4307 }
4308 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4309 {
4310 Snapshot oldsnap;
4311 Snapshot newsnap;
4312 Size size;
4313
4314 oldsnap = (Snapshot) data;
4315
4316 size = sizeof(SnapshotData) +
4317 sizeof(TransactionId) * oldsnap->xcnt +
4318 sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4319
4320 change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4321
4322 newsnap = change->data.snapshot;
4323
4324 memcpy(newsnap, data, size);
4325 newsnap->xip = (TransactionId *)
4326 (((char *) newsnap) + sizeof(SnapshotData));
4327 newsnap->subxip = newsnap->xip + newsnap->xcnt;
4328 newsnap->copied = true;
4329 break;
4330 }
4331 /* the base struct contains all the data, easy peasy */
4332 case REORDER_BUFFER_CHANGE_TRUNCATE:
4333 {
4334 Oid *relids;
4335
4336 relids = ReorderBufferGetRelids(rb,
4337 change->data.truncate.nrelids);
4338 memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4339 change->data.truncate.relids = relids;
4340
4341 break;
4342 }
4343 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4344 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4345 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4346 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
4347 break;
4348 }
4349
4350 dlist_push_tail(&txn->changes, &change->node);
4351 txn->nentries_mem++;
4352
4353 /*
4354 * Update memory accounting for the restored change. We need to do this
4355 * although we don't check the memory limit when restoring the changes in
4356 * this branch (we only do that when initially queueing the changes after
4357 * decoding), because we will release the changes later, and that will
4358 * update the accounting too (subtracting the size from the counters). And
4359 * we don't want to underflow there.
4360 */
4361 ReorderBufferChangeMemoryUpdate(rb, change, true,
4362 ReorderBufferChangeSize(change));
4363 }
4364
4365 /*
4366 * Remove all on-disk stored for the passed in transaction.
4367 */
4368 static void
ReorderBufferRestoreCleanup(ReorderBuffer * rb,ReorderBufferTXN * txn)4369 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
4370 {
4371 XLogSegNo first;
4372 XLogSegNo cur;
4373 XLogSegNo last;
4374
4375 Assert(txn->first_lsn != InvalidXLogRecPtr);
4376 Assert(txn->final_lsn != InvalidXLogRecPtr);
4377
4378 XLByteToSeg(txn->first_lsn, first, wal_segment_size);
4379 XLByteToSeg(txn->final_lsn, last, wal_segment_size);
4380
4381 /* iterate over all possible filenames, and delete them */
4382 for (cur = first; cur <= last; cur++)
4383 {
4384 char path[MAXPGPATH];
4385
4386 ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
4387 if (unlink(path) != 0 && errno != ENOENT)
4388 ereport(ERROR,
4389 (errcode_for_file_access(),
4390 errmsg("could not remove file \"%s\": %m", path)));
4391 }
4392 }
4393
4394 /*
4395 * Remove any leftover serialized reorder buffers from a slot directory after a
4396 * prior crash or decoding session exit.
4397 */
4398 static void
ReorderBufferCleanupSerializedTXNs(const char * slotname)4399 ReorderBufferCleanupSerializedTXNs(const char *slotname)
4400 {
4401 DIR *spill_dir;
4402 struct dirent *spill_de;
4403 struct stat statbuf;
4404 char path[MAXPGPATH * 2 + 12];
4405
4406 sprintf(path, "pg_replslot/%s", slotname);
4407
4408 /* we're only handling directories here, skip if it's not ours */
4409 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4410 return;
4411
4412 spill_dir = AllocateDir(path);
4413 while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4414 {
4415 /* only look at names that can be ours */
4416 if (strncmp(spill_de->d_name, "xid", 3) == 0)
4417 {
4418 snprintf(path, sizeof(path),
4419 "pg_replslot/%s/%s", slotname,
4420 spill_de->d_name);
4421
4422 if (unlink(path) != 0)
4423 ereport(ERROR,
4424 (errcode_for_file_access(),
4425 errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4426 path, slotname)));
4427 }
4428 }
4429 FreeDir(spill_dir);
4430 }
4431
4432 /*
4433 * Given a replication slot, transaction ID and segment number, fill in the
4434 * corresponding spill file into 'path', which is a caller-owned buffer of size
4435 * at least MAXPGPATH.
4436 */
4437 static void
ReorderBufferSerializedPath(char * path,ReplicationSlot * slot,TransactionId xid,XLogSegNo segno)4438 ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
4439 XLogSegNo segno)
4440 {
4441 XLogRecPtr recptr;
4442
4443 XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4444
4445 snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
4446 NameStr(MyReplicationSlot->data.name),
4447 xid, LSN_FORMAT_ARGS(recptr));
4448 }
4449
4450 /*
4451 * Delete all data spilled to disk after we've restarted/crashed. It will be
4452 * recreated when the respective slots are reused.
4453 */
4454 void
StartupReorderBuffer(void)4455 StartupReorderBuffer(void)
4456 {
4457 DIR *logical_dir;
4458 struct dirent *logical_de;
4459
4460 logical_dir = AllocateDir("pg_replslot");
4461 while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
4462 {
4463 if (strcmp(logical_de->d_name, ".") == 0 ||
4464 strcmp(logical_de->d_name, "..") == 0)
4465 continue;
4466
4467 /* if it cannot be a slot, skip the directory */
4468 if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4469 continue;
4470
4471 /*
4472 * ok, has to be a surviving logical slot, iterate and delete
4473 * everything starting with xid-*
4474 */
4475 ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
4476 }
4477 FreeDir(logical_dir);
4478 }
4479
4480 /* ---------------------------------------
4481 * toast reassembly support
4482 * ---------------------------------------
4483 */
4484
4485 /*
4486 * Initialize per tuple toast reconstruction support.
4487 */
4488 static void
ReorderBufferToastInitHash(ReorderBuffer * rb,ReorderBufferTXN * txn)4489 ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
4490 {
4491 HASHCTL hash_ctl;
4492
4493 Assert(txn->toast_hash == NULL);
4494
4495 hash_ctl.keysize = sizeof(Oid);
4496 hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
4497 hash_ctl.hcxt = rb->context;
4498 txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
4499 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4500 }
4501
4502 /*
4503 * Per toast-chunk handling for toast reconstruction
4504 *
4505 * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
4506 * toasted Datum comes along.
4507 */
4508 static void
ReorderBufferToastAppendChunk(ReorderBuffer * rb,ReorderBufferTXN * txn,Relation relation,ReorderBufferChange * change)4509 ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
4510 Relation relation, ReorderBufferChange *change)
4511 {
4512 ReorderBufferToastEnt *ent;
4513 ReorderBufferTupleBuf *newtup;
4514 bool found;
4515 int32 chunksize;
4516 bool isnull;
4517 Pointer chunk;
4518 TupleDesc desc = RelationGetDescr(relation);
4519 Oid chunk_id;
4520 int32 chunk_seq;
4521
4522 if (txn->toast_hash == NULL)
4523 ReorderBufferToastInitHash(rb, txn);
4524
4525 Assert(IsToastRelation(relation));
4526
4527 newtup = change->data.tp.newtuple;
4528 chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
4529 Assert(!isnull);
4530 chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
4531 Assert(!isnull);
4532
4533 ent = (ReorderBufferToastEnt *)
4534 hash_search(txn->toast_hash,
4535 (void *) &chunk_id,
4536 HASH_ENTER,
4537 &found);
4538
4539 if (!found)
4540 {
4541 Assert(ent->chunk_id == chunk_id);
4542 ent->num_chunks = 0;
4543 ent->last_chunk_seq = 0;
4544 ent->size = 0;
4545 ent->reconstructed = NULL;
4546 dlist_init(&ent->chunks);
4547
4548 if (chunk_seq != 0)
4549 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
4550 chunk_seq, chunk_id);
4551 }
4552 else if (found && chunk_seq != ent->last_chunk_seq + 1)
4553 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
4554 chunk_seq, chunk_id, ent->last_chunk_seq + 1);
4555
4556 chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
4557 Assert(!isnull);
4558
4559 /* calculate size so we can allocate the right size at once later */
4560 if (!VARATT_IS_EXTENDED(chunk))
4561 chunksize = VARSIZE(chunk) - VARHDRSZ;
4562 else if (VARATT_IS_SHORT(chunk))
4563 /* could happen due to heap_form_tuple doing its thing */
4564 chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
4565 else
4566 elog(ERROR, "unexpected type of toast chunk");
4567
4568 ent->size += chunksize;
4569 ent->last_chunk_seq = chunk_seq;
4570 ent->num_chunks++;
4571 dlist_push_tail(&ent->chunks, &change->node);
4572 }
4573
4574 /*
4575 * Rejigger change->newtuple to point to in-memory toast tuples instead to
4576 * on-disk toast tuples that may not longer exist (think DROP TABLE or VACUUM).
4577 *
4578 * We cannot replace unchanged toast tuples though, so those will still point
4579 * to on-disk toast data.
4580 *
4581 * While updating the existing change with detoasted tuple data, we need to
4582 * update the memory accounting info, because the change size will differ.
4583 * Otherwise the accounting may get out of sync, triggering serialization
4584 * at unexpected times.
4585 *
4586 * We simply subtract size of the change before rejiggering the tuple, and
4587 * then adding the new size. This makes it look like the change was removed
4588 * and then added back, except it only tweaks the accounting info.
4589 *
4590 * In particular it can't trigger serialization, which would be pointless
4591 * anyway as it happens during commit processing right before handing
4592 * the change to the output plugin.
4593 */
4594 static void
ReorderBufferToastReplace(ReorderBuffer * rb,ReorderBufferTXN * txn,Relation relation,ReorderBufferChange * change)4595 ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
4596 Relation relation, ReorderBufferChange *change)
4597 {
4598 TupleDesc desc;
4599 int natt;
4600 Datum *attrs;
4601 bool *isnull;
4602 bool *free;
4603 HeapTuple tmphtup;
4604 Relation toast_rel;
4605 TupleDesc toast_desc;
4606 MemoryContext oldcontext;
4607 ReorderBufferTupleBuf *newtup;
4608 Size old_size;
4609
4610 /* no toast tuples changed */
4611 if (txn->toast_hash == NULL)
4612 return;
4613
4614 /*
4615 * We're going to modify the size of the change. So, to make sure the
4616 * accounting is correct we record the current change size and then after
4617 * re-computing the change we'll subtract the recorded size and then
4618 * re-add the new change size at the end. We don't immediately subtract
4619 * the old size because if there is any error before we add the new size,
4620 * we will release the changes and that will update the accounting info
4621 * (subtracting the size from the counters). And we don't want to
4622 * underflow there.
4623 */
4624 old_size = ReorderBufferChangeSize(change);
4625
4626 oldcontext = MemoryContextSwitchTo(rb->context);
4627
4628 /* we should only have toast tuples in an INSERT or UPDATE */
4629 Assert(change->data.tp.newtuple);
4630
4631 desc = RelationGetDescr(relation);
4632
4633 toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
4634 if (!RelationIsValid(toast_rel))
4635 elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
4636 relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
4637
4638 toast_desc = RelationGetDescr(toast_rel);
4639
4640 /* should we allocate from stack instead? */
4641 attrs = palloc0(sizeof(Datum) * desc->natts);
4642 isnull = palloc0(sizeof(bool) * desc->natts);
4643 free = palloc0(sizeof(bool) * desc->natts);
4644
4645 newtup = change->data.tp.newtuple;
4646
4647 heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
4648
4649 for (natt = 0; natt < desc->natts; natt++)
4650 {
4651 Form_pg_attribute attr = TupleDescAttr(desc, natt);
4652 ReorderBufferToastEnt *ent;
4653 struct varlena *varlena;
4654
4655 /* va_rawsize is the size of the original datum -- including header */
4656 struct varatt_external toast_pointer;
4657 struct varatt_indirect redirect_pointer;
4658 struct varlena *new_datum = NULL;
4659 struct varlena *reconstructed;
4660 dlist_iter it;
4661 Size data_done = 0;
4662
4663 /* system columns aren't toasted */
4664 if (attr->attnum < 0)
4665 continue;
4666
4667 if (attr->attisdropped)
4668 continue;
4669
4670 /* not a varlena datatype */
4671 if (attr->attlen != -1)
4672 continue;
4673
4674 /* no data */
4675 if (isnull[natt])
4676 continue;
4677
4678 /* ok, we know we have a toast datum */
4679 varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
4680
4681 /* no need to do anything if the tuple isn't external */
4682 if (!VARATT_IS_EXTERNAL(varlena))
4683 continue;
4684
4685 VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
4686
4687 /*
4688 * Check whether the toast tuple changed, replace if so.
4689 */
4690 ent = (ReorderBufferToastEnt *)
4691 hash_search(txn->toast_hash,
4692 (void *) &toast_pointer.va_valueid,
4693 HASH_FIND,
4694 NULL);
4695 if (ent == NULL)
4696 continue;
4697
4698 new_datum =
4699 (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
4700
4701 free[natt] = true;
4702
4703 reconstructed = palloc0(toast_pointer.va_rawsize);
4704
4705 ent->reconstructed = reconstructed;
4706
4707 /* stitch toast tuple back together from its parts */
4708 dlist_foreach(it, &ent->chunks)
4709 {
4710 bool isnull;
4711 ReorderBufferChange *cchange;
4712 ReorderBufferTupleBuf *ctup;
4713 Pointer chunk;
4714
4715 cchange = dlist_container(ReorderBufferChange, node, it.cur);
4716 ctup = cchange->data.tp.newtuple;
4717 chunk = DatumGetPointer(fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
4718
4719 Assert(!isnull);
4720 Assert(!VARATT_IS_EXTERNAL(chunk));
4721 Assert(!VARATT_IS_SHORT(chunk));
4722
4723 memcpy(VARDATA(reconstructed) + data_done,
4724 VARDATA(chunk),
4725 VARSIZE(chunk) - VARHDRSZ);
4726 data_done += VARSIZE(chunk) - VARHDRSZ;
4727 }
4728 Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));
4729
4730 /* make sure its marked as compressed or not */
4731 if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
4732 SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
4733 else
4734 SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
4735
4736 memset(&redirect_pointer, 0, sizeof(redirect_pointer));
4737 redirect_pointer.pointer = reconstructed;
4738
4739 SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
4740 memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
4741 sizeof(redirect_pointer));
4742
4743 attrs[natt] = PointerGetDatum(new_datum);
4744 }
4745
4746 /*
4747 * Build tuple in separate memory & copy tuple back into the tuplebuf
4748 * passed to the output plugin. We can't directly heap_fill_tuple() into
4749 * the tuplebuf because attrs[] will point back into the current content.
4750 */
4751 tmphtup = heap_form_tuple(desc, attrs, isnull);
4752 Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
4753 Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
4754
4755 memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
4756 newtup->tuple.t_len = tmphtup->t_len;
4757
4758 /*
4759 * free resources we won't further need, more persistent stuff will be
4760 * free'd in ReorderBufferToastReset().
4761 */
4762 RelationClose(toast_rel);
4763 pfree(tmphtup);
4764 for (natt = 0; natt < desc->natts; natt++)
4765 {
4766 if (free[natt])
4767 pfree(DatumGetPointer(attrs[natt]));
4768 }
4769 pfree(attrs);
4770 pfree(free);
4771 pfree(isnull);
4772
4773 MemoryContextSwitchTo(oldcontext);
4774
4775 /* subtract the old change size */
4776 ReorderBufferChangeMemoryUpdate(rb, change, false, old_size);
4777 /* now add the change back, with the correct size */
4778 ReorderBufferChangeMemoryUpdate(rb, change, true,
4779 ReorderBufferChangeSize(change));
4780 }
4781
4782 /*
4783 * Free all resources allocated for toast reconstruction.
4784 */
4785 static void
ReorderBufferToastReset(ReorderBuffer * rb,ReorderBufferTXN * txn)4786 ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
4787 {
4788 HASH_SEQ_STATUS hstat;
4789 ReorderBufferToastEnt *ent;
4790
4791 if (txn->toast_hash == NULL)
4792 return;
4793
4794 /* sequentially walk over the hash and free everything */
4795 hash_seq_init(&hstat, txn->toast_hash);
4796 while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
4797 {
4798 dlist_mutable_iter it;
4799
4800 if (ent->reconstructed != NULL)
4801 pfree(ent->reconstructed);
4802
4803 dlist_foreach_modify(it, &ent->chunks)
4804 {
4805 ReorderBufferChange *change =
4806 dlist_container(ReorderBufferChange, node, it.cur);
4807
4808 dlist_delete(&change->node);
4809 ReorderBufferReturnChange(rb, change, true);
4810 }
4811 }
4812
4813 hash_destroy(txn->toast_hash);
4814 txn->toast_hash = NULL;
4815 }
4816
4817
4818 /* ---------------------------------------
4819 * Visibility support for logical decoding
4820 *
4821 *
4822 * Lookup actual cmin/cmax values when using decoding snapshot. We can't
4823 * always rely on stored cmin/cmax values because of two scenarios:
4824 *
4825 * * A tuple got changed multiple times during a single transaction and thus
4826 * has got a combo CID. Combo CIDs are only valid for the duration of a
4827 * single transaction.
4828 * * A tuple with a cmin but no cmax (and thus no combo CID) got
4829 * deleted/updated in another transaction than the one which created it
4830 * which we are looking at right now. As only one of cmin, cmax or combo CID
4831 * is actually stored in the heap we don't have access to the value we
4832 * need anymore.
4833 *
4834 * To resolve those problems we have a per-transaction hash of (cmin,
4835 * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
4836 * (cmin, cmax) values. That also takes care of combo CIDs by simply
4837 * not caring about them at all. As we have the real cmin/cmax values
4838 * combo CIDs aren't interesting.
4839 *
4840 * As we only care about catalog tuples here the overhead of this
4841 * hashtable should be acceptable.
4842 *
4843 * Heap rewrites complicate this a bit, check rewriteheap.c for
4844 * details.
4845 * -------------------------------------------------------------------------
4846 */
4847
/* struct for sorting mapping files by LSN efficiently */
typedef struct RewriteMappingFile
{
	XLogRecPtr	lsn;			/* LSN parsed from the file name; sort key */
	char		fname[MAXPGPATH];	/* file name within pg_logical/mappings */
} RewriteMappingFile;
4854
#ifdef NOT_USED
/*
 * Debug helper: dump every (relfilenode, tid) -> (cmin, cmax) entry of a
 * tuplecid hash at DEBUG3.  Compiled out by default.
 */
static void
DisplayMapping(HTAB *tuplecid_data)
{
	HASH_SEQ_STATUS seq;
	ReorderBufferTupleCidEnt *entry;

	hash_seq_init(&seq, tuplecid_data);
	while ((entry = (ReorderBufferTupleCidEnt *) hash_seq_search(&seq)) != NULL)
	{
		elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
			 entry->key.relnode.dbNode,
			 entry->key.relnode.spcNode,
			 entry->key.relnode.relNode,
			 ItemPointerGetBlockNumber(&entry->key.tid),
			 ItemPointerGetOffsetNumber(&entry->key.tid),
			 entry->cmin,
			 entry->cmax);
	}
}
#endif
4877
/*
 * Apply a single mapping file to tuplecid_data.
 *
 * The mapping file has to have been verified to be a) committed b) for our
 * transaction c) applied in LSN order.
 *
 * Each LogicalRewriteMappingData record maps a tuple's pre-rewrite identity
 * (old_node, old_tid) to its post-rewrite identity (new_node, new_tid).  For
 * every record whose old identity has a cmin/cmax entry in tuplecid_data, we
 * copy that entry under the new identity, so later visibility lookups using
 * post-rewrite tids still find it.
 */
static void
ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
{
	char		path[MAXPGPATH];
	int			fd;
	int			readBytes;
	LogicalRewriteMappingData map;

	sprintf(path, "pg_logical/mappings/%s", fname);
	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
	if (fd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	while (true)
	{
		ReorderBufferTupleCidKey key;
		ReorderBufferTupleCidEnt *ent;
		ReorderBufferTupleCidEnt *new_ent;
		bool		found;

		/* be careful about padding */
		memset(&key, 0, sizeof(ReorderBufferTupleCidKey));

		/* read all mappings till the end of the file */
		pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
		readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
		pgstat_report_wait_end();

		if (readBytes < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m",
							path)));
		else if (readBytes == 0)	/* EOF */
			break;
		else if (readBytes != sizeof(LogicalRewriteMappingData))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from file \"%s\": read %d instead of %d bytes",
							path, readBytes,
							(int32) sizeof(LogicalRewriteMappingData))));

		/* look up any existing cmin/cmax entry under the old identity */
		key.relnode = map.old_node;
		ItemPointerCopy(&map.old_tid,
						&key.tid);


		ent = (ReorderBufferTupleCidEnt *)
			hash_search(tuplecid_data,
						(void *) &key,
						HASH_FIND,
						NULL);

		/* no existing mapping, no need to update */
		if (!ent)
			continue;

		/* re-key under the tuple's post-rewrite identity */
		key.relnode = map.new_node;
		ItemPointerCopy(&map.new_tid,
						&key.tid);

		new_ent = (ReorderBufferTupleCidEnt *)
			hash_search(tuplecid_data,
						(void *) &key,
						HASH_ENTER,
						&found);

		if (found)
		{
			/*
			 * Make sure the existing mapping makes sense. We sometime update
			 * old records that did not yet have a cmax (e.g. pg_class' own
			 * entry while rewriting it) during rewrites, so allow that.
			 */
			Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
			Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
		}
		else
		{
			/* update mapping */
			new_ent->cmin = ent->cmin;
			new_ent->cmax = ent->cmax;
			new_ent->combocid = ent->combocid;
		}
	}

	if (CloseTransientFile(fd) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", path)));
}
4977
4978
4979 /*
4980 * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
4981 */
4982 static bool
TransactionIdInArray(TransactionId xid,TransactionId * xip,Size num)4983 TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
4984 {
4985 return bsearch(&xid, xip, num,
4986 sizeof(TransactionId), xidComparator) != NULL;
4987 }
4988
4989 /*
4990 * list_sort() comparator for sorting RewriteMappingFiles in LSN order.
4991 */
4992 static int
file_sort_by_lsn(const ListCell * a_p,const ListCell * b_p)4993 file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
4994 {
4995 RewriteMappingFile *a = (RewriteMappingFile *) lfirst(a_p);
4996 RewriteMappingFile *b = (RewriteMappingFile *) lfirst(b_p);
4997
4998 if (a->lsn < b->lsn)
4999 return -1;
5000 else if (a->lsn > b->lsn)
5001 return 1;
5002 return 0;
5003 }
5004
/*
 * Apply any existing logical remapping files if there are any targeted at our
 * transaction for relid.
 *
 * Scans pg_logical/mappings for "map-*" files whose name encodes (dboid,
 * relid, lsn, mapped_xid, create_xid), keeps the ones that are for our
 * database/relation, were written by a committed transaction, and target an
 * xid in the snapshot's subxip array; then applies them in LSN order via
 * ApplyLogicalMappingFile().
 */
static void
UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
{
	DIR		   *mapping_dir;
	struct dirent *mapping_de;
	List	   *files = NIL;
	ListCell   *file;
	/* shared catalogs use InvalidOid as the database part of the file name */
	Oid			dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;

	mapping_dir = AllocateDir("pg_logical/mappings");
	while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
	{
		Oid			f_dboid;
		Oid			f_relid;
		TransactionId f_mapped_xid;
		TransactionId f_create_xid;
		XLogRecPtr	f_lsn;
		uint32		f_hi,
					f_lo;
		RewriteMappingFile *f;

		if (strcmp(mapping_de->d_name, ".") == 0 ||
			strcmp(mapping_de->d_name, "..") == 0)
			continue;

		/* Ignore files that aren't ours */
		if (strncmp(mapping_de->d_name, "map-", 4) != 0)
			continue;

		/* all remaining metadata is encoded in the file name itself */
		if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
				   &f_dboid, &f_relid, &f_hi, &f_lo,
				   &f_mapped_xid, &f_create_xid) != 6)
			elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);

		/* reassemble the 64-bit LSN from its two 32-bit halves */
		f_lsn = ((uint64) f_hi) << 32 | f_lo;

		/* mapping for another database */
		if (f_dboid != dboid)
			continue;

		/* mapping for another relation */
		if (f_relid != relid)
			continue;

		/* did the creating transaction abort? */
		if (!TransactionIdDidCommit(f_create_xid))
			continue;

		/* not for our transaction */
		if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
			continue;

		/* ok, relevant, queue for apply */
		f = palloc(sizeof(RewriteMappingFile));
		f->lsn = f_lsn;
		strcpy(f->fname, mapping_de->d_name);
		files = lappend(files, f);
	}
	FreeDir(mapping_dir);

	/* sort files so we apply them in LSN order */
	list_sort(files, file_sort_by_lsn);

	foreach(file, files)
	{
		RewriteMappingFile *f = (RewriteMappingFile *) lfirst(file);

		/* NOTE(review): subxip[0] appears to identify the transaction for
		 * the log message; assumes subxcnt > 0 here — confirm with callers. */
		elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
			 snapshot->subxip[0]);
		ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
		pfree(f);
	}
}
5082
/*
 * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
 * combo CIDs.
 *
 * Returns true and fills *cmin / *cmax (each optional, may be NULL) when a
 * mapping for the tuple exists in tuplecid_data; returns false when the CID
 * cannot be resolved.  If the initial lookup fails, any logical rewrite
 * mapping files for the relation are applied once and the lookup is retried,
 * since a heap rewrite may have changed the tuple's (relfilenode, tid)
 * identity.
 */
bool
ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
							  Snapshot snapshot,
							  HeapTuple htup, Buffer buffer,
							  CommandId *cmin, CommandId *cmax)
{
	ReorderBufferTupleCidKey key;
	ReorderBufferTupleCidEnt *ent;
	ForkNumber	forkno;
	BlockNumber blockno;
	bool		updated_mapping = false;

	/*
	 * Return unresolved if tuplecid_data is not valid. That's because when
	 * streaming in-progress transactions we may run into tuples with the CID
	 * before actually decoding them. Think e.g. about INSERT followed by
	 * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
	 * INSERT. So in such cases, we assume the CID is from the future
	 * command.
	 */
	if (tuplecid_data == NULL)
		return false;

	/* be careful about padding */
	memset(&key, 0, sizeof(key));

	Assert(!BufferIsLocal(buffer));

	/*
	 * get relfilenode from the buffer, no convenient way to access it other
	 * than that.
	 */
	BufferGetTag(buffer, &key.relnode, &forkno, &blockno);

	/* tuples can only be in the main fork */
	Assert(forkno == MAIN_FORKNUM);
	Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));

	ItemPointerCopy(&htup->t_self,
					&key.tid);

restart:
	ent = (ReorderBufferTupleCidEnt *)
		hash_search(tuplecid_data,
					(void *) &key,
					HASH_FIND,
					NULL);

	/*
	 * failed to find a mapping, check whether the table was rewritten and
	 * apply mapping if so, but only do that once - there can be no new
	 * mappings while we are in here since we have to hold a lock on the
	 * relation.
	 */
	if (ent == NULL && !updated_mapping)
	{
		UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
		/* now check but don't update for a mapping again */
		updated_mapping = true;
		goto restart;
	}
	else if (ent == NULL)
		return false;

	/* found a mapping; hand back whichever CIDs the caller asked for */
	if (cmin)
		*cmin = ent->cmin;
	if (cmax)
		*cmax = ent->cmax;
	return true;
}
5157