1 /*-------------------------------------------------------------------------
2 *
3 * reorderbuffer.c
4 * PostgreSQL logical replay/reorder buffer management
5 *
6 *
7 * Copyright (c) 2012-2018, PostgreSQL Global Development Group
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/replication/reorderbuffer.c
12 *
13 * NOTES
14 * This module gets handed individual pieces of transactions in the order
* they are written to the WAL and is responsible for reassembling them into
16 * toplevel transaction sized pieces. When a transaction is completely
17 * reassembled - signalled by reading the transaction commit record - it
18 * will then call the output plugin (cf. ReorderBufferCommit()) with the
19 * individual changes. The output plugins rely on snapshots built by
20 * snapbuild.c which hands them to us.
21 *
22 * Transactions and subtransactions/savepoints in postgres are not
23 * immediately linked to each other from outside the performing
* backend. Only at commit/abort (or special xact_assignment records) are
* they linked together, which means that we will have to splice together a
26 * toplevel transaction from its subtransactions. To do that efficiently we
27 * build a binary heap indexed by the smallest current lsn of the individual
28 * subtransactions' changestreams. As the individual streams are inherently
29 * ordered by LSN - since that is where we build them from - the transaction
30 * can easily be reassembled by always using the subtransaction with the
31 * smallest current LSN from the heap.
32 *
33 * In order to cope with large transactions - which can be several times as
34 * big as the available memory - this module supports spooling the contents
* of large transactions to disk. When the transaction is replayed, the
36 * contents of individual (sub-)transactions will be read from disk in
37 * chunks.
38 *
39 * This module also has to deal with reassembling toast records from the
40 * individual chunks stored in WAL. When a new (or initial) version of a
41 * tuple is stored in WAL it will always be preceded by the toast chunks
42 * emitted for the columns stored out of line. Within a single toplevel
43 * transaction there will be no other data carrying records between a row's
44 * toast chunks and the row data itself. See ReorderBufferToast* for
45 * details.
46 *
47 * ReorderBuffer uses two special memory context types - SlabContext for
48 * allocations of fixed-length structures (changes and transactions), and
49 * GenerationContext for the variable-length transaction data (allocated
50 * and freed in groups with similar lifespan).
51 *
52 * -------------------------------------------------------------------------
53 */
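
/*
 * Illustrative sketch only (not part of the build): the overall flow, as
 * driven by the WAL decoding layer; all variables used here are
 * hypothetical.
 *
 *	ReorderBufferQueueChange(rb, xid, lsn, change);	  repeated, per WAL record
 *	ReorderBufferCommitChild(rb, xid, subxid, commit_lsn, end_lsn);
 *	ReorderBufferCommit(rb, xid, commit_lsn, end_lsn,
 *						commit_time, origin_id, origin_lsn);
 */
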
54 #include "postgres.h"
55
56 #include <unistd.h>
57 #include <sys/stat.h>
58
59 #include "access/rewriteheap.h"
60 #include "access/transam.h"
61 #include "access/tuptoaster.h"
62 #include "access/xact.h"
63 #include "access/xlog_internal.h"
64 #include "catalog/catalog.h"
65 #include "lib/binaryheap.h"
66 #include "miscadmin.h"
67 #include "pgstat.h"
68 #include "replication/logical.h"
69 #include "replication/reorderbuffer.h"
70 #include "replication/slot.h"
71 #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
72 #include "storage/bufmgr.h"
73 #include "storage/fd.h"
74 #include "storage/sinval.h"
75 #include "utils/builtins.h"
76 #include "utils/combocid.h"
77 #include "utils/memdebug.h"
78 #include "utils/memutils.h"
79 #include "utils/rel.h"
80 #include "utils/relfilenodemap.h"
81 #include "utils/tqual.h"
82
83
84 /* entry for a hash table we use to map from xid to our transaction state */
85 typedef struct ReorderBufferTXNByIdEnt
86 {
87 TransactionId xid;
88 ReorderBufferTXN *txn;
89 } ReorderBufferTXNByIdEnt;
90
91 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
92 typedef struct ReorderBufferTupleCidKey
93 {
94 RelFileNode relnode;
95 ItemPointerData tid;
96 } ReorderBufferTupleCidKey;
97
98 typedef struct ReorderBufferTupleCidEnt
99 {
100 ReorderBufferTupleCidKey key;
101 CommandId cmin;
102 CommandId cmax;
103 CommandId combocid; /* just for debugging */
104 } ReorderBufferTupleCidEnt;
105
106 /* k-way in-order change iteration support structures */
107 typedef struct ReorderBufferIterTXNEntry
108 {
109 XLogRecPtr lsn;
110 ReorderBufferChange *change;
111 ReorderBufferTXN *txn;
112 File fd;
113 XLogSegNo segno;
114 } ReorderBufferIterTXNEntry;
115
116 typedef struct ReorderBufferIterTXNState
117 {
118 binaryheap *heap;
119 Size nr_txns;
120 dlist_head old_change;
121 ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
122 } ReorderBufferIterTXNState;
123
124 /* toast datastructures */
125 typedef struct ReorderBufferToastEnt
126 {
127 Oid chunk_id; /* toast_table.chunk_id */
128 int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
129 * have seen */
130 Size num_chunks; /* number of chunks we've already seen */
131 Size size; /* combined size of chunks seen */
132 dlist_head chunks; /* linked list of chunks */
133 struct varlena *reconstructed; /* reconstructed varlena now pointed to in
134 * main tup */
135 } ReorderBufferToastEnt;
136
137 /* Disk serialization support datastructures */
138 typedef struct ReorderBufferDiskChange
139 {
140 Size size;
141 ReorderBufferChange change;
142 /* data follows */
143 } ReorderBufferDiskChange;
144
145 /*
146 * Maximum number of changes kept in memory, per transaction. After that,
147 * changes are spooled to disk.
148 *
149 * The current value should be sufficient to decode the entire transaction
* without hitting disk in OLTP workloads, while in other workloads spooling
* to disk starts reasonably early.
152 *
153 * At some point in the future it probably makes sense to have a more elaborate
154 * resource management here, but it's not entirely clear what that would look
155 * like.
156 */
157 static const Size max_changes_in_memory = 4096;
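
/*
 * Illustrative sketch only (not part of the build): the limit above is
 * enforced after each queued change, roughly like the check in
 * ReorderBufferCheckSerializeTXN() further below.
 *
 *	if (txn->nentries_mem >= max_changes_in_memory)
 *	{
 *		ReorderBufferSerializeTXN(rb, txn);
 *		Assert(txn->nentries_mem == 0);
 *	}
 */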
158
159 /* ---------------------------------------
160 * primary reorderbuffer support routines
161 * ---------------------------------------
162 */
163 static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
164 static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
165 static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
166 TransactionId xid, bool create, bool *is_new,
167 XLogRecPtr lsn, bool create_as_top);
168 static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
169 ReorderBufferTXN *subtxn);
170
171 static void AssertTXNLsnOrder(ReorderBuffer *rb);
172
173 /* ---------------------------------------
174 * support functions for lsn-order iterating over the ->changes of a
175 * transaction and its subtransactions
176 *
177 * used for iteration over the k-way heap merge of a transaction and its
178 * subtransactions
179 * ---------------------------------------
180 */
181 static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
182 ReorderBufferIterTXNState *volatile *iter_state);
183 static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
184 static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
185 ReorderBufferIterTXNState *state);
186 static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
187
188 /*
189 * ---------------------------------------
190 * Disk serialization support functions
191 * ---------------------------------------
192 */
193 static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
194 static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
195 static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
196 int fd, ReorderBufferChange *change);
197 static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
198 File *fd, XLogSegNo *segno);
199 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
200 char *change);
201 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
202 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
203 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
204 TransactionId xid, XLogSegNo segno);
205
206 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
207 static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
208 ReorderBufferTXN *txn, CommandId cid);
209
210 /* ---------------------------------------
211 * toast reassembly support
212 * ---------------------------------------
213 */
214 static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
215 static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
216 static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
217 Relation relation, ReorderBufferChange *change);
218 static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
219 Relation relation, ReorderBufferChange *change);
220
221
222 /*
223 * Allocate a new ReorderBuffer and clean out any old serialized state from
224 * prior ReorderBuffer instances for the same slot.
225 */
226 ReorderBuffer *
ReorderBufferAllocate(void)
228 {
229 ReorderBuffer *buffer;
230 HASHCTL hash_ctl;
231 MemoryContext new_ctx;
232
233 Assert(MyReplicationSlot != NULL);
234
235 /* allocate memory in own context, to have better accountability */
236 new_ctx = AllocSetContextCreate(CurrentMemoryContext,
237 "ReorderBuffer",
238 ALLOCSET_DEFAULT_SIZES);
239
240 buffer =
241 (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
242
243 memset(&hash_ctl, 0, sizeof(hash_ctl));
244
245 buffer->context = new_ctx;
246
247 buffer->change_context = SlabContextCreate(new_ctx,
248 "Change",
249 SLAB_DEFAULT_BLOCK_SIZE,
250 sizeof(ReorderBufferChange));
251
252 buffer->txn_context = SlabContextCreate(new_ctx,
253 "TXN",
254 SLAB_DEFAULT_BLOCK_SIZE,
255 sizeof(ReorderBufferTXN));
256
257 buffer->tup_context = GenerationContextCreate(new_ctx,
258 "Tuples",
259 SLAB_LARGE_BLOCK_SIZE);
260
261 hash_ctl.keysize = sizeof(TransactionId);
262 hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
263 hash_ctl.hcxt = buffer->context;
264
265 buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
266 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
267
268 buffer->by_txn_last_xid = InvalidTransactionId;
269 buffer->by_txn_last_txn = NULL;
270
271 buffer->outbuf = NULL;
272 buffer->outbufsize = 0;
273
274 buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
275
276 dlist_init(&buffer->toplevel_by_lsn);
277 dlist_init(&buffer->txns_by_base_snapshot_lsn);
278
279 /*
280 * Ensure there's no stale data from prior uses of this slot, in case some
281 * prior exit avoided calling ReorderBufferFree. Failure to do this can
282 * produce duplicated txns, and it's very cheap if there's nothing there.
283 */
284 ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
285
286 return buffer;
287 }
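
/*
 * Illustrative sketch only (not part of the build): the typical lifecycle,
 * as seen from logical decoding setup code; the surrounding variables are
 * hypothetical.
 *
 *	ReorderBuffer *rb = ReorderBufferAllocate();
 *
 *	... hand rb to the decoding machinery, queue and replay changes ...
 *
 *	ReorderBufferFree(rb);	  also removes any serialized state of the slot
 */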
288
289 /*
290 * Free a ReorderBuffer
291 */
292 void
ReorderBufferFree(ReorderBuffer *rb)
294 {
295 MemoryContext context = rb->context;
296
297 /*
298 * We free separately allocated data by entirely scrapping reorderbuffer's
299 * memory context.
300 */
301 MemoryContextDelete(context);
302
303 /* Free disk space used by unconsumed reorder buffers */
304 ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
305 }
306
307 /*
308 * Get an unused, possibly preallocated, ReorderBufferTXN.
309 */
310 static ReorderBufferTXN *
ReorderBufferGetTXN(ReorderBuffer *rb)
312 {
313 ReorderBufferTXN *txn;
314
315 txn = (ReorderBufferTXN *)
316 MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
317
318 memset(txn, 0, sizeof(ReorderBufferTXN));
319
320 dlist_init(&txn->changes);
321 dlist_init(&txn->tuplecids);
322 dlist_init(&txn->subtxns);
323
324 return txn;
325 }
326
327 /*
328 * Free a ReorderBufferTXN.
329 */
330 static void
ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
332 {
333 /* clean the lookup cache if we were cached (quite likely) */
334 if (rb->by_txn_last_xid == txn->xid)
335 {
336 rb->by_txn_last_xid = InvalidTransactionId;
337 rb->by_txn_last_txn = NULL;
338 }
339
340 /* free data that's contained */
341
342 if (txn->tuplecid_hash != NULL)
343 {
344 hash_destroy(txn->tuplecid_hash);
345 txn->tuplecid_hash = NULL;
346 }
347
348 if (txn->invalidations)
349 {
350 pfree(txn->invalidations);
351 txn->invalidations = NULL;
352 }
353
354 /* Reset the toast hash */
355 ReorderBufferToastReset(rb, txn);
356
357 pfree(txn);
358 }
359
360 /*
* Get a fresh ReorderBufferChange.
362 */
363 ReorderBufferChange *
ReorderBufferGetChange(ReorderBuffer *rb)
365 {
366 ReorderBufferChange *change;
367
368 change = (ReorderBufferChange *)
369 MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
370
371 memset(change, 0, sizeof(ReorderBufferChange));
372 return change;
373 }
374
375 /*
* Free a ReorderBufferChange.
377 */
378 void
ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
380 {
381 /* free contained data */
382 switch (change->action)
383 {
384 case REORDER_BUFFER_CHANGE_INSERT:
385 case REORDER_BUFFER_CHANGE_UPDATE:
386 case REORDER_BUFFER_CHANGE_DELETE:
387 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
388 if (change->data.tp.newtuple)
389 {
390 ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
391 change->data.tp.newtuple = NULL;
392 }
393
394 if (change->data.tp.oldtuple)
395 {
396 ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
397 change->data.tp.oldtuple = NULL;
398 }
399 break;
400 case REORDER_BUFFER_CHANGE_MESSAGE:
401 if (change->data.msg.prefix != NULL)
402 pfree(change->data.msg.prefix);
403 change->data.msg.prefix = NULL;
404 if (change->data.msg.message != NULL)
405 pfree(change->data.msg.message);
406 change->data.msg.message = NULL;
407 break;
408 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
409 if (change->data.snapshot)
410 {
411 ReorderBufferFreeSnap(rb, change->data.snapshot);
412 change->data.snapshot = NULL;
413 }
414 break;
case REORDER_BUFFER_CHANGE_TRUNCATE:
if (change->data.truncate.relids != NULL)
{
ReorderBufferReturnRelids(rb, change->data.truncate.relids);
change->data.truncate.relids = NULL;
}
break;
/* the following cases carry no data in addition to the struct itself */
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
424 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
425 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
426 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
427 break;
428 }
429
430 pfree(change);
431 }
432
433 /*
434 * Get a fresh ReorderBufferTupleBuf fitting at least a tuple of size
435 * tuple_len (excluding header overhead).
436 */
437 ReorderBufferTupleBuf *
ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
439 {
440 ReorderBufferTupleBuf *tuple;
441 Size alloc_len;
442
443 alloc_len = tuple_len + SizeofHeapTupleHeader;
444
445 tuple = (ReorderBufferTupleBuf *)
446 MemoryContextAlloc(rb->tup_context,
447 sizeof(ReorderBufferTupleBuf) +
448 MAXIMUM_ALIGNOF + alloc_len);
449 tuple->alloc_tuple_size = alloc_len;
450 tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
451
452 return tuple;
453 }
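
/*
 * Illustrative sketch only (not part of the build): a decode routine that
 * found "datalen" bytes of tuple data in a WAL record would typically fill
 * such a buffer roughly like this; "datalen" and "data" are hypothetical.
 *
 *	ReorderBufferTupleBuf *buf = ReorderBufferGetTupleBuf(rb, datalen);
 *
 *	buf->tuple.t_len = datalen + SizeofHeapTupleHeader;
 *	... copy the header and data from the WAL record into buf->tuple.t_data ...
 */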
454
455 /*
* Free a ReorderBufferTupleBuf.
457 */
458 void
ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
460 {
461 pfree(tuple);
462 }
463
464 /*
465 * Get an array for relids of truncated relations.
466 *
467 * We use the global memory context (for the whole reorder buffer), because
468 * none of the existing ones seems like a good match (some are SLAB, so we
469 * can't use those, and tup_context is meant for tuple data, not relids). We
* could add yet another context, but it seems like overkill - TRUNCATE is
* not a particularly common operation, so it does not seem worth it.
472 */
473 Oid *
ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
475 {
476 Oid *relids;
477 Size alloc_len;
478
479 alloc_len = sizeof(Oid) * nrelids;
480
481 relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
482
483 return relids;
484 }
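
/*
 * Illustrative sketch only (not part of the build): decoding of a TRUNCATE
 * record fills such an array roughly like this; "xlrec" stands for the
 * decoded xl_heap_truncate record and is hypothetical here.
 *
 *	change->data.truncate.nrelids = xlrec->nrelids;
 *	change->data.truncate.relids = ReorderBufferGetRelids(rb, xlrec->nrelids);
 *	memcpy(change->data.truncate.relids, xlrec->relids,
 *		   xlrec->nrelids * sizeof(Oid));
 */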
485
486 /*
487 * Free an array of relids.
488 */
489 void
ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
491 {
492 pfree(relids);
493 }
494
495 /*
496 * Return the ReorderBufferTXN from the given buffer, specified by Xid.
497 * If create is true, and a transaction doesn't already exist, create it
498 * (with the given LSN, and as top transaction if that's specified);
499 * when this happens, is_new is set to true.
500 */
501 static ReorderBufferTXN *
ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
503 bool *is_new, XLogRecPtr lsn, bool create_as_top)
504 {
505 ReorderBufferTXN *txn;
506 ReorderBufferTXNByIdEnt *ent;
507 bool found;
508
509 Assert(TransactionIdIsValid(xid));
510
511 /*
512 * Check the one-entry lookup cache first
513 */
514 if (TransactionIdIsValid(rb->by_txn_last_xid) &&
515 rb->by_txn_last_xid == xid)
516 {
517 txn = rb->by_txn_last_txn;
518
519 if (txn != NULL)
520 {
521 /* found it, and it's valid */
522 if (is_new)
523 *is_new = false;
524 return txn;
525 }
526
527 /*
528 * cached as non-existent, and asked not to create? Then nothing else
529 * to do.
530 */
531 if (!create)
532 return NULL;
533 /* otherwise fall through to create it */
534 }
535
536 /*
* If the cache wasn't hit, or if it yielded a "does not exist" entry and we
* want to create one, search the lookup table.
539 */
540
541 /* search the lookup table */
542 ent = (ReorderBufferTXNByIdEnt *)
543 hash_search(rb->by_txn,
544 (void *) &xid,
545 create ? HASH_ENTER : HASH_FIND,
546 &found);
547 if (found)
548 txn = ent->txn;
549 else if (create)
550 {
551 /* initialize the new entry, if creation was requested */
552 Assert(ent != NULL);
553 Assert(lsn != InvalidXLogRecPtr);
554
555 ent->txn = ReorderBufferGetTXN(rb);
556 ent->txn->xid = xid;
557 txn = ent->txn;
558 txn->first_lsn = lsn;
559 txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;
560
561 if (create_as_top)
562 {
563 dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
564 AssertTXNLsnOrder(rb);
565 }
566 }
567 else
568 txn = NULL; /* not found and not asked to create */
569
570 /* update cache */
571 rb->by_txn_last_xid = xid;
572 rb->by_txn_last_txn = txn;
573
574 if (is_new)
575 *is_new = !found;
576
577 Assert(!create || txn != NULL);
578 return txn;
579 }
580
581 /*
582 * Queue a change into a transaction so it can be replayed upon commit.
583 */
584 void
ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
586 ReorderBufferChange *change)
587 {
588 ReorderBufferTXN *txn;
589
590 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
591
592 change->lsn = lsn;
593 Assert(InvalidXLogRecPtr != lsn);
594 dlist_push_tail(&txn->changes, &change->node);
595 txn->nentries++;
596 txn->nentries_mem++;
597
598 ReorderBufferCheckSerializeTXN(rb, txn);
599 }
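
/*
 * Illustrative sketch only (not part of the build): this is roughly how a
 * decode routine queues a newly decoded INSERT; "xid", "lsn", "target_node"
 * and "tuplelen" are hypothetical stand-ins for data from the WAL record.
 *
 *	ReorderBufferChange *change = ReorderBufferGetChange(rb);
 *
 *	change->action = REORDER_BUFFER_CHANGE_INSERT;
 *	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
 *	change->data.tp.clear_toast_afterwards = true;
 *	change->data.tp.newtuple = ReorderBufferGetTupleBuf(rb, tuplelen);
 *	... fill change->data.tp.newtuple from the WAL record ...
 *
 *	ReorderBufferQueueChange(rb, xid, lsn, change);
 */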
600
601 /*
* Queue a message into a transaction so it can be processed upon commit.
603 */
604 void
ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
606 Snapshot snapshot, XLogRecPtr lsn,
607 bool transactional, const char *prefix,
608 Size message_size, const char *message)
609 {
610 if (transactional)
611 {
612 MemoryContext oldcontext;
613 ReorderBufferChange *change;
614
615 Assert(xid != InvalidTransactionId);
616
617 oldcontext = MemoryContextSwitchTo(rb->context);
618
619 change = ReorderBufferGetChange(rb);
620 change->action = REORDER_BUFFER_CHANGE_MESSAGE;
621 change->data.msg.prefix = pstrdup(prefix);
622 change->data.msg.message_size = message_size;
623 change->data.msg.message = palloc(message_size);
624 memcpy(change->data.msg.message, message, message_size);
625
626 ReorderBufferQueueChange(rb, xid, lsn, change);
627
628 MemoryContextSwitchTo(oldcontext);
629 }
630 else
631 {
632 ReorderBufferTXN *txn = NULL;
633 volatile Snapshot snapshot_now = snapshot;
634
635 if (xid != InvalidTransactionId)
636 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
637
638 /* setup snapshot to allow catalog access */
639 SetupHistoricSnapshot(snapshot_now, NULL);
640 PG_TRY();
641 {
642 rb->message(rb, txn, lsn, false, prefix, message_size, message);
643
644 TeardownHistoricSnapshot(false);
645 }
646 PG_CATCH();
647 {
648 TeardownHistoricSnapshot(true);
649 PG_RE_THROW();
650 }
651 PG_END_TRY();
652 }
653 }
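
/*
 * Illustrative sketch only (not part of the build): messages arrive here
 * because something on the primary emitted them, e.g. via the C API (or the
 * pg_logical_emit_message() SQL function); "payload" is hypothetical.
 *
 *	LogLogicalMessage("my-prefix", payload, payload_size, true);
 *
 * Decoding of the resulting WAL record then calls this function with the
 * same prefix and payload.
 */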
654
655 /*
656 * AssertTXNLsnOrder
657 * Verify LSN ordering of transaction lists in the reorderbuffer
658 *
659 * Other LSN-related invariants are checked too.
660 *
661 * No-op if assertions are not in use.
662 */
663 static void
AssertTXNLsnOrder(ReorderBuffer *rb)
665 {
666 #ifdef USE_ASSERT_CHECKING
667 dlist_iter iter;
668 XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
669 XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
670
671 dlist_foreach(iter, &rb->toplevel_by_lsn)
672 {
673 ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
674 iter.cur);
675
676 /* start LSN must be set */
677 Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
678
679 /* If there is an end LSN, it must be higher than start LSN */
680 if (cur_txn->end_lsn != InvalidXLogRecPtr)
681 Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
682
683 /* Current initial LSN must be strictly higher than previous */
684 if (prev_first_lsn != InvalidXLogRecPtr)
685 Assert(prev_first_lsn < cur_txn->first_lsn);
686
687 /* known-as-subtxn txns must not be listed */
688 Assert(!cur_txn->is_known_as_subxact);
689
690 prev_first_lsn = cur_txn->first_lsn;
691 }
692
693 dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
694 {
695 ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
696 base_snapshot_node,
697 iter.cur);
698
699 /* base snapshot (and its LSN) must be set */
700 Assert(cur_txn->base_snapshot != NULL);
701 Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);
702
703 /* current LSN must be strictly higher than previous */
704 if (prev_base_snap_lsn != InvalidXLogRecPtr)
705 Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
706
707 /* known-as-subtxn txns must not be listed */
708 Assert(!cur_txn->is_known_as_subxact);
709
710 prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
711 }
712 #endif
713 }
714
715 /*
716 * ReorderBufferGetOldestTXN
717 * Return oldest transaction in reorderbuffer
718 */
719 ReorderBufferTXN *
ReorderBufferGetOldestTXN(ReorderBuffer *rb)
721 {
722 ReorderBufferTXN *txn;
723
724 AssertTXNLsnOrder(rb);
725
726 if (dlist_is_empty(&rb->toplevel_by_lsn))
727 return NULL;
728
729 txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
730
731 Assert(!txn->is_known_as_subxact);
732 Assert(txn->first_lsn != InvalidXLogRecPtr);
733 return txn;
734 }
735
736 /*
737 * ReorderBufferGetOldestXmin
738 * Return oldest Xmin in reorderbuffer
739 *
740 * Returns oldest possibly running Xid from the point of view of snapshots
741 * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
742 * there are none.
743 *
744 * Since snapshots are assigned monotonically, this equals the Xmin of the
745 * base snapshot with minimal base_snapshot_lsn.
746 */
747 TransactionId
ReorderBufferGetOldestXmin(ReorderBuffer *rb)
749 {
750 ReorderBufferTXN *txn;
751
752 AssertTXNLsnOrder(rb);
753
754 if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
755 return InvalidTransactionId;
756
757 txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
758 &rb->txns_by_base_snapshot_lsn);
759 return txn->base_snapshot->xmin;
760 }
761
762 void
ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
764 {
765 rb->current_restart_decoding_lsn = ptr;
766 }
767
768 /*
769 * ReorderBufferAssignChild
770 *
771 * Make note that we know that subxid is a subtransaction of xid, seen as of
772 * the given lsn.
773 */
774 void
ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
776 TransactionId subxid, XLogRecPtr lsn)
777 {
778 ReorderBufferTXN *txn;
779 ReorderBufferTXN *subtxn;
780 bool new_top;
781 bool new_sub;
782
783 txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
784 subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
785
786 if (!new_sub)
787 {
788 if (subtxn->is_known_as_subxact)
789 {
790 /* already associated, nothing to do */
791 return;
792 }
793 else
794 {
795 /*
796 * We already saw this transaction, but initially added it to the
797 * list of top-level txns. Now that we know it's not top-level,
798 * remove it from there.
799 */
800 dlist_delete(&subtxn->node);
801 }
802 }
803
804 subtxn->is_known_as_subxact = true;
805 subtxn->toplevel_xid = xid;
806 Assert(subtxn->nsubtxns == 0);
807
808 /* add to subtransaction list */
809 dlist_push_tail(&txn->subtxns, &subtxn->node);
810 txn->nsubtxns++;
811
812 /* Possibly transfer the subtxn's snapshot to its top-level txn. */
813 ReorderBufferTransferSnapToParent(txn, subtxn);
814
815 /* Verify LSN-ordering invariant */
816 AssertTXNLsnOrder(rb);
817 }
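
/*
 * Illustrative sketch only (not part of the build): an xl_xact_assignment
 * record maps onto repeated calls of the above, roughly like this; "xlrec"
 * stands for the decoded assignment record.
 *
 *	for (i = 0; i < xlrec->nsubxacts; i++)
 *		ReorderBufferAssignChild(rb, xlrec->xtop, xlrec->xsub[i], lsn);
 */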
818
819 /*
820 * ReorderBufferTransferSnapToParent
821 * Transfer base snapshot from subtxn to top-level txn, if needed
822 *
823 * This is done if the top-level txn doesn't have a base snapshot, or if the
824 * subtxn's base snapshot has an earlier LSN than the top-level txn's base
825 * snapshot's LSN. This can happen if there are no changes in the toplevel
826 * txn but there are some in the subtxn, or the first change in subtxn has
827 * earlier LSN than first change in the top-level txn and we learned about
828 * their kinship only now.
829 *
830 * The subtransaction's snapshot is cleared regardless of the transfer
831 * happening, since it's not needed anymore in either case.
832 *
833 * We do this as soon as we become aware of their kinship, to avoid queueing
834 * extra snapshots to txns known-as-subtxns -- only top-level txns will
835 * receive further snapshots.
836 */
837 static void
ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
839 ReorderBufferTXN *subtxn)
840 {
841 Assert(subtxn->toplevel_xid == txn->xid);
842
843 if (subtxn->base_snapshot != NULL)
844 {
845 if (txn->base_snapshot == NULL ||
846 subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
847 {
848 /*
849 * If the toplevel transaction already has a base snapshot but
850 * it's newer than the subxact's, purge it.
851 */
852 if (txn->base_snapshot != NULL)
853 {
854 SnapBuildSnapDecRefcount(txn->base_snapshot);
855 dlist_delete(&txn->base_snapshot_node);
856 }
857
858 /*
859 * The snapshot is now the top transaction's; transfer it, and
860 * adjust the list position of the top transaction in the list by
861 * moving it to where the subtransaction is.
862 */
863 txn->base_snapshot = subtxn->base_snapshot;
864 txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
865 dlist_insert_before(&subtxn->base_snapshot_node,
866 &txn->base_snapshot_node);
867
868 /*
869 * The subtransaction doesn't have a snapshot anymore (so it
870 * mustn't be in the list.)
871 */
872 subtxn->base_snapshot = NULL;
873 subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
874 dlist_delete(&subtxn->base_snapshot_node);
875 }
876 else
877 {
878 /* Base snap of toplevel is fine, so subxact's is not needed */
879 SnapBuildSnapDecRefcount(subtxn->base_snapshot);
880 dlist_delete(&subtxn->base_snapshot_node);
881 subtxn->base_snapshot = NULL;
882 subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
883 }
884 }
885 }
886
887 /*
888 * Associate a subtransaction with its toplevel transaction at commit
889 * time. There may be no further changes added after this.
890 */
891 void
ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
893 TransactionId subxid, XLogRecPtr commit_lsn,
894 XLogRecPtr end_lsn)
895 {
896 ReorderBufferTXN *subtxn;
897
898 subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
899 InvalidXLogRecPtr, false);
900
901 /*
902 * No need to do anything if that subtxn didn't contain any changes
903 */
904 if (!subtxn)
905 return;
906
907 subtxn->final_lsn = commit_lsn;
908 subtxn->end_lsn = end_lsn;
909
910 /*
911 * Assign this subxact as a child of the toplevel xact (no-op if already
912 * done.)
913 */
914 ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
915 }
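
/*
 * Illustrative sketch only (not part of the build): commit decoding calls
 * the above for every subtransaction before replaying the toplevel
 * transaction, roughly like this; "parsed" stands for the decoded commit
 * record.
 *
 *	for (i = 0; i < parsed->nsubxacts; i++)
 *		ReorderBufferCommitChild(rb, xid, parsed->subxacts[i],
 *								 commit_lsn, end_lsn);
 *
 *	ReorderBufferCommit(rb, xid, commit_lsn, end_lsn,
 *						commit_time, origin_id, origin_lsn);
 */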
916
917
918 /*
919 * Support for efficiently iterating over a transaction's and its
920 * subtransactions' changes.
921 *
* We do this by doing a k-way merge between transactions/subtransactions. For that
923 * we model the current heads of the different transactions as a binary heap
924 * so we easily know which (sub-)transaction has the change with the smallest
925 * lsn next.
926 *
927 * We assume the changes in individual transactions are already sorted by LSN.
928 */
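
/*
 * Illustrative sketch only (not part of the build): the iteration protocol,
 * as used by ReorderBufferCommit() below.
 *
 *	ReorderBufferIterTXNState *volatile iterstate = NULL;
 *	ReorderBufferChange *change;
 *
 *	ReorderBufferIterTXNInit(rb, txn, &iterstate);
 *	while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
 *	{
 *		... hand the change to the output plugin callbacks ...
 *	}
 *	ReorderBufferIterTXNFinish(rb, iterstate);
 */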
929
930 /*
* Binary heap comparison function. The comparison is inverted so that the
* (max-)heap returns the entry with the smallest LSN first.
932 */
933 static int
ReorderBufferIterCompare(Datum a, Datum b, void *arg)
935 {
936 ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
937 XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
938 XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
939
940 if (pos_a < pos_b)
941 return 1;
942 else if (pos_a == pos_b)
943 return 0;
944 return -1;
945 }
946
947 /*
948 * Allocate & initialize an iterator which iterates in lsn order over a
949 * transaction and all its subtransactions.
950 *
951 * Note: The iterator state is returned through iter_state parameter rather
952 * than the function's return value. This is because the state gets cleaned up
953 * in a PG_CATCH block in the caller, so we want to make sure the caller gets
954 * back the state even if this function throws an exception.
955 */
956 static void
ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
958 ReorderBufferIterTXNState *volatile *iter_state)
959 {
960 Size nr_txns = 0;
961 ReorderBufferIterTXNState *state;
962 dlist_iter cur_txn_i;
963 int32 off;
964
965 *iter_state = NULL;
966
967 /*
968 * Calculate the size of our heap: one element for every transaction that
969 * contains changes. (Besides the transactions already in the reorder
970 * buffer, we count the one we were directly passed.)
971 */
972 if (txn->nentries > 0)
973 nr_txns++;
974
975 dlist_foreach(cur_txn_i, &txn->subtxns)
976 {
977 ReorderBufferTXN *cur_txn;
978
979 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
980
981 if (cur_txn->nentries > 0)
982 nr_txns++;
983 }
984
985 /*
986 * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
987 * need to allocate/build a heap then.
988 */
989
990 /* allocate iteration state */
991 state = (ReorderBufferIterTXNState *)
992 MemoryContextAllocZero(rb->context,
993 sizeof(ReorderBufferIterTXNState) +
994 sizeof(ReorderBufferIterTXNEntry) * nr_txns);
995
996 state->nr_txns = nr_txns;
997 dlist_init(&state->old_change);
998
999 for (off = 0; off < state->nr_txns; off++)
1000 {
1001 state->entries[off].fd = -1;
1002 state->entries[off].segno = 0;
1003 }
1004
1005 /* allocate heap */
1006 state->heap = binaryheap_allocate(state->nr_txns,
1007 ReorderBufferIterCompare,
1008 state);
1009
1010 /* Now that the state fields are initialized, it is safe to return it. */
1011 *iter_state = state;
1012
1013 /*
1014 * Now insert items into the binary heap, in an unordered fashion. (We
1015 * will run a heap assembly step at the end; this is more efficient.)
1016 */
1017
1018 off = 0;
1019
1020 /* add toplevel transaction if it contains changes */
1021 if (txn->nentries > 0)
1022 {
1023 ReorderBufferChange *cur_change;
1024
1025 if (txn->serialized)
1026 {
1027 /* serialize remaining changes */
1028 ReorderBufferSerializeTXN(rb, txn);
1029 ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
1030 &state->entries[off].segno);
1031 }
1032
1033 cur_change = dlist_head_element(ReorderBufferChange, node,
1034 &txn->changes);
1035
1036 state->entries[off].lsn = cur_change->lsn;
1037 state->entries[off].change = cur_change;
1038 state->entries[off].txn = txn;
1039
1040 binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1041 }
1042
1043 /* add subtransactions if they contain changes */
1044 dlist_foreach(cur_txn_i, &txn->subtxns)
1045 {
1046 ReorderBufferTXN *cur_txn;
1047
1048 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1049
1050 if (cur_txn->nentries > 0)
1051 {
1052 ReorderBufferChange *cur_change;
1053
1054 if (cur_txn->serialized)
1055 {
1056 /* serialize remaining changes */
1057 ReorderBufferSerializeTXN(rb, cur_txn);
1058 ReorderBufferRestoreChanges(rb, cur_txn,
1059 &state->entries[off].fd,
1060 &state->entries[off].segno);
1061 }
1062 cur_change = dlist_head_element(ReorderBufferChange, node,
1063 &cur_txn->changes);
1064
1065 state->entries[off].lsn = cur_change->lsn;
1066 state->entries[off].change = cur_change;
1067 state->entries[off].txn = cur_txn;
1068
1069 binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1070 }
1071 }
1072
1073 /* assemble a valid binary heap */
1074 binaryheap_build(state->heap);
1075 }
1076
1077 /*
1078 * Return the next change when iterating over a transaction and its
1079 * subtransactions.
1080 *
1081 * Returns NULL when no further changes exist.
1082 */
1083 static ReorderBufferChange *
ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
1085 {
1086 ReorderBufferChange *change;
1087 ReorderBufferIterTXNEntry *entry;
1088 int32 off;
1089
1090 /* nothing there anymore */
1091 if (state->heap->bh_size == 0)
1092 return NULL;
1093
1094 off = DatumGetInt32(binaryheap_first(state->heap));
1095 entry = &state->entries[off];
1096
1097 /* free memory we might have "leaked" in the previous *Next call */
1098 if (!dlist_is_empty(&state->old_change))
1099 {
1100 change = dlist_container(ReorderBufferChange, node,
1101 dlist_pop_head_node(&state->old_change));
1102 ReorderBufferReturnChange(rb, change);
1103 Assert(dlist_is_empty(&state->old_change));
1104 }
1105
1106 change = entry->change;
1107
1108 /*
1109 * update heap with information about which transaction has the next
1110 * relevant change in LSN order
1111 */
1112
1113 /* there are in-memory changes */
1114 if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1115 {
1116 dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1117 ReorderBufferChange *next_change =
1118 dlist_container(ReorderBufferChange, node, next);
1119
1120 /* txn stays the same */
1121 state->entries[off].lsn = next_change->lsn;
1122 state->entries[off].change = next_change;
1123
1124 binaryheap_replace_first(state->heap, Int32GetDatum(off));
1125 return change;
1126 }
1127
1128 /* try to load changes from disk */
1129 if (entry->txn->nentries != entry->txn->nentries_mem)
1130 {
1131 /*
1132 * Ugly: restoring changes will reuse *Change records, thus delete the
* current one from the per-tx list and only free it in the next call.
1134 */
1135 dlist_delete(&change->node);
1136 dlist_push_tail(&state->old_change, &change->node);
1137
1138 if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
1139 &state->entries[off].segno))
1140 {
1141 /* successfully restored changes from disk */
1142 ReorderBufferChange *next_change =
1143 dlist_head_element(ReorderBufferChange, node,
1144 &entry->txn->changes);
1145
1146 elog(DEBUG2, "restored %u/%u changes from disk",
1147 (uint32) entry->txn->nentries_mem,
1148 (uint32) entry->txn->nentries);
1149
1150 Assert(entry->txn->nentries_mem);
1151 /* txn stays the same */
1152 state->entries[off].lsn = next_change->lsn;
1153 state->entries[off].change = next_change;
1154 binaryheap_replace_first(state->heap, Int32GetDatum(off));
1155
1156 return change;
1157 }
1158 }
1159
1160 /* ok, no changes there anymore, remove */
1161 binaryheap_remove_first(state->heap);
1162
1163 return change;
1164 }
1165
1166 /*
1167 * Deallocate the iterator
1168 */
1169 static void
ReorderBufferIterTXNFinish(ReorderBuffer *rb,
1171 ReorderBufferIterTXNState *state)
1172 {
1173 int32 off;
1174
1175 for (off = 0; off < state->nr_txns; off++)
1176 {
1177 if (state->entries[off].fd != -1)
1178 FileClose(state->entries[off].fd);
1179 }
1180
1181 /* free memory we might have "leaked" in the last *Next call */
1182 if (!dlist_is_empty(&state->old_change))
1183 {
1184 ReorderBufferChange *change;
1185
1186 change = dlist_container(ReorderBufferChange, node,
1187 dlist_pop_head_node(&state->old_change));
1188 ReorderBufferReturnChange(rb, change);
1189 Assert(dlist_is_empty(&state->old_change));
1190 }
1191
1192 binaryheap_free(state->heap);
1193 pfree(state);
1194 }
1195
1196 /*
1197 * Cleanup the contents of a transaction, usually after the transaction
1198 * committed or aborted.
1199 */
1200 static void
ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1202 {
1203 bool found;
1204 dlist_mutable_iter iter;
1205
1206 /* cleanup subtransactions & their changes */
1207 dlist_foreach_modify(iter, &txn->subtxns)
1208 {
1209 ReorderBufferTXN *subtxn;
1210
1211 subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1212
1213 /*
* Subtransactions are always associated with the toplevel TXN, even if
1215 * they originally were happening inside another subtxn, so we won't
1216 * ever recurse more than one level deep here.
1217 */
1218 Assert(subtxn->is_known_as_subxact);
1219 Assert(subtxn->nsubtxns == 0);
1220
1221 ReorderBufferCleanupTXN(rb, subtxn);
1222 }
1223
1224 /* cleanup changes in the toplevel txn */
1225 dlist_foreach_modify(iter, &txn->changes)
1226 {
1227 ReorderBufferChange *change;
1228
1229 change = dlist_container(ReorderBufferChange, node, iter.cur);
1230
1231 ReorderBufferReturnChange(rb, change);
1232 }
1233
1234 /*
1235 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1236 * They are always stored in the toplevel transaction.
1237 */
1238 dlist_foreach_modify(iter, &txn->tuplecids)
1239 {
1240 ReorderBufferChange *change;
1241
1242 change = dlist_container(ReorderBufferChange, node, iter.cur);
1243 Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1244 ReorderBufferReturnChange(rb, change);
1245 }
1246
1247 /*
1248 * Cleanup the base snapshot, if set.
1249 */
1250 if (txn->base_snapshot != NULL)
1251 {
1252 SnapBuildSnapDecRefcount(txn->base_snapshot);
1253 dlist_delete(&txn->base_snapshot_node);
1254 }
1255
1256 /*
1257 * Remove TXN from its containing list.
1258 *
1259 * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1260 * parent's list of known subxacts; this leaves the parent's nsubxacts
1261 * count too high, but we don't care. Otherwise, we are deleting the TXN
1262 * from the LSN-ordered list of toplevel TXNs.
1263 */
1264 dlist_delete(&txn->node);
1265
1266 /* now remove reference from buffer */
1267 hash_search(rb->by_txn,
1268 (void *) &txn->xid,
1269 HASH_REMOVE,
1270 &found);
1271 Assert(found);
1272
1273 /* remove entries spilled to disk */
1274 if (txn->serialized)
1275 ReorderBufferRestoreCleanup(rb, txn);
1276
1277 /* deallocate */
1278 ReorderBufferReturnTXN(rb, txn);
1279 }
1280
1281 /*
1282 * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1283 * tqual.c's HeapTupleSatisfiesHistoricMVCC.
1284 */
1285 static void
ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
1287 {
1288 dlist_iter iter;
1289 HASHCTL hash_ctl;
1290
1291 if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1292 return;
1293
1294 memset(&hash_ctl, 0, sizeof(hash_ctl));
1295
1296 hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1297 hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1298 hash_ctl.hcxt = rb->context;
1299
1300 /*
1301 * create the hash with the exact number of to-be-stored tuplecids from
1302 * the start
1303 */
1304 txn->tuplecid_hash =
1305 hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1306 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1307
1308 dlist_foreach(iter, &txn->tuplecids)
1309 {
1310 ReorderBufferTupleCidKey key;
1311 ReorderBufferTupleCidEnt *ent;
1312 bool found;
1313 ReorderBufferChange *change;
1314
1315 change = dlist_container(ReorderBufferChange, node, iter.cur);
1316
1317 Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1318
1319 /* be careful about padding */
1320 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1321
1322 key.relnode = change->data.tuplecid.node;
1323
1324 ItemPointerCopy(&change->data.tuplecid.tid,
1325 &key.tid);
1326
1327 ent = (ReorderBufferTupleCidEnt *)
1328 hash_search(txn->tuplecid_hash,
1329 (void *) &key,
1330 HASH_ENTER,
1331 &found);
1332 if (!found)
1333 {
1334 ent->cmin = change->data.tuplecid.cmin;
1335 ent->cmax = change->data.tuplecid.cmax;
1336 ent->combocid = change->data.tuplecid.combocid;
1337 }
1338 else
1339 {
1340 /*
1341 * Maybe we already saw this tuple before in this transaction,
1342 * but if so it must have the same cmin.
1343 */
1344 Assert(ent->cmin == change->data.tuplecid.cmin);
1345
1346 /*
1347 * cmax may be initially invalid, but once set it can only grow,
1348 * and never become invalid again.
1349 */
1350 Assert((ent->cmax == InvalidCommandId) ||
1351 ((change->data.tuplecid.cmax != InvalidCommandId) &&
1352 (change->data.tuplecid.cmax > ent->cmax)));
1353 ent->cmax = change->data.tuplecid.cmax;
1354 }
1355 }
1356 }
1357
1358 /*
1359 * Copy a provided snapshot so we can modify it privately. This is needed so
1360 * that catalog modifying transactions can look into intermediate catalog
1361 * states.
1362 */
1363 static Snapshot
ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
1365 ReorderBufferTXN *txn, CommandId cid)
1366 {
1367 Snapshot snap;
1368 dlist_iter iter;
1369 int i = 0;
1370 Size size;
1371
1372 size = sizeof(SnapshotData) +
1373 sizeof(TransactionId) * orig_snap->xcnt +
1374 sizeof(TransactionId) * (txn->nsubtxns + 1);
1375
1376 snap = MemoryContextAllocZero(rb->context, size);
1377 memcpy(snap, orig_snap, sizeof(SnapshotData));
1378
1379 snap->copied = true;
1380 snap->active_count = 1; /* mark as active so nobody frees it */
1381 snap->regd_count = 0;
1382 snap->xip = (TransactionId *) (snap + 1);
1383
1384 memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1385
1386 /*
1387 * snap->subxip contains all txids that belong to our transaction which we
1388 * need to check via cmin/cmax. That's why we store the toplevel
1389 * transaction in there as well.
1390 */
1391 snap->subxip = snap->xip + snap->xcnt;
1392 snap->subxip[i++] = txn->xid;
1393
1394 /*
1395 * nsubxcnt isn't decreased when subtransactions abort, so count manually.
1396 * Since it's an upper boundary it is safe to use it for the allocation
1397 * above.
1398 */
1399 snap->subxcnt = 1;
1400
1401 dlist_foreach(iter, &txn->subtxns)
1402 {
1403 ReorderBufferTXN *sub_txn;
1404
1405 sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1406 snap->subxip[i++] = sub_txn->xid;
1407 snap->subxcnt++;
1408 }
1409
1410 /* sort so we can bsearch() later */
1411 qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1412
1413 /* store the specified current CommandId */
1414 snap->curcid = cid;
1415
1416 return snap;
1417 }
1418
1419 /*
1420 * Free a previously ReorderBufferCopySnap'ed snapshot
1421 */
1422 static void
ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
1424 {
1425 if (snap->copied)
1426 pfree(snap);
1427 else
1428 SnapBuildSnapDecRefcount(snap);
1429 }
1430
1431 /*
1432 * Perform the replay of a transaction and its non-aborted subtransactions.
1433 *
* Subtransactions have to be processed beforehand by
* ReorderBufferCommitChild(), even if they were previously assigned to the
* toplevel transaction with ReorderBufferAssignChild.
1437 *
1438 * We currently can only decode a transaction's contents when its commit
1439 * record is read because that's the only place where we know about cache
1440 * invalidations. Thus, once a toplevel commit is read, we iterate over the top
1441 * and subtransactions (using a k-way merge) and replay the changes in lsn
1442 * order.
1443 */
1444 void
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
1446 XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
1447 TimestampTz commit_time,
1448 RepOriginId origin_id, XLogRecPtr origin_lsn)
1449 {
1450 ReorderBufferTXN *txn;
1451 volatile Snapshot snapshot_now;
1452 volatile CommandId command_id = FirstCommandId;
1453 bool using_subtxn;
1454 ReorderBufferIterTXNState *volatile iterstate = NULL;
1455
1456 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1457 false);
1458
1459 /* unknown transaction, nothing to replay */
1460 if (txn == NULL)
1461 return;
1462
1463 txn->final_lsn = commit_lsn;
1464 txn->end_lsn = end_lsn;
1465 txn->commit_time = commit_time;
1466 txn->origin_id = origin_id;
1467 txn->origin_lsn = origin_lsn;
1468
1469 /*
1470 * If this transaction has no snapshot, it didn't make any changes to the
1471 * database, so there's nothing to decode. Note that
1472 * ReorderBufferCommitChild will have transferred any snapshots from
1473 * subtransactions if there were any.
1474 */
1475 if (txn->base_snapshot == NULL)
1476 {
1477 Assert(txn->ninvalidations == 0);
1478 ReorderBufferCleanupTXN(rb, txn);
1479 return;
1480 }
1481
1482 snapshot_now = txn->base_snapshot;
1483
1484 /* build data to be able to lookup the CommandIds of catalog tuples */
1485 ReorderBufferBuildTupleCidHash(rb, txn);
1486
1487 /* setup the initial snapshot */
1488 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1489
1490 /*
1491 * Decoding needs access to syscaches et al., which in turn use
1492 * heavyweight locks and such. Thus we need to have enough state around to
1493 * keep track of those. The easiest way is to simply use a transaction
1494 * internally. That also allows us to easily enforce that nothing writes
1495 * to the database by checking for xid assignments.
1496 *
1497 * When we're called via the SQL SRF there's already a transaction
1498 * started, so start an explicit subtransaction there.
1499 */
1500 using_subtxn = IsTransactionOrTransactionBlock();
1501
1502 PG_TRY();
1503 {
1504 ReorderBufferChange *change;
1505 ReorderBufferChange *specinsert = NULL;
1506
1507 if (using_subtxn)
1508 BeginInternalSubTransaction("replay");
1509 else
1510 StartTransactionCommand();
1511
1512 rb->begin(rb, txn);
1513
1514 ReorderBufferIterTXNInit(rb, txn, &iterstate);
1515 while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1516 {
1517 Relation relation = NULL;
1518 Oid reloid;
1519
1520 switch (change->action)
1521 {
1522 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
1523
1524 /*
1525 * Confirmation for speculative insertion arrived. Simply
* use it as a normal record. It'll be cleaned up at the end
1527 * of INSERT processing.
1528 */
1529 if (specinsert == NULL)
1530 elog(ERROR, "invalid ordering of speculative insertion changes");
1531 Assert(specinsert->data.tp.oldtuple == NULL);
1532 change = specinsert;
1533 change->action = REORDER_BUFFER_CHANGE_INSERT;
1534
1535 /* intentionally fall through */
1536 case REORDER_BUFFER_CHANGE_INSERT:
1537 case REORDER_BUFFER_CHANGE_UPDATE:
1538 case REORDER_BUFFER_CHANGE_DELETE:
1539 Assert(snapshot_now);
1540
1541 reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1542 change->data.tp.relnode.relNode);
1543
1544 /*
1545 * Mapped catalog tuple without data, emitted while
1546 * catalog table was in the process of being rewritten. We
* can fail to look up the relfilenode, because the
* relmapper has no "historic" view, in contrast to the
* normal catalog during decoding. Thus repeated
1550 * rewrites can cause a lookup failure. That's OK because
1551 * we do not decode catalog changes anyway. Normally such
1552 * tuples would be skipped over below, but we can't
1553 * identify whether the table should be logically logged
1554 * without mapping the relfilenode to the oid.
1555 */
1556 if (reloid == InvalidOid &&
1557 change->data.tp.newtuple == NULL &&
1558 change->data.tp.oldtuple == NULL)
1559 goto change_done;
1560 else if (reloid == InvalidOid)
1561 elog(ERROR, "could not map filenode \"%s\" to relation OID",
1562 relpathperm(change->data.tp.relnode,
1563 MAIN_FORKNUM));
1564
1565 relation = RelationIdGetRelation(reloid);
1566
1567 if (!RelationIsValid(relation))
1568 elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1569 reloid,
1570 relpathperm(change->data.tp.relnode,
1571 MAIN_FORKNUM));
1572
1573 if (!RelationIsLogicallyLogged(relation))
1574 goto change_done;
1575
1576 /*
1577 * Ignore temporary heaps created during DDL unless the
1578 * plugin has asked for them.
1579 */
1580 if (relation->rd_rel->relrewrite && !rb->output_rewrites)
1581 goto change_done;
1582
1583 /*
1584 * For now ignore sequence changes entirely. Most of the
1585 * time they don't log changes using records we
1586 * understand, so it doesn't make sense to handle the few
1587 * cases we do.
1588 */
1589 if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1590 goto change_done;
1591
1592 /* user-triggered change */
1593 if (!IsToastRelation(relation))
1594 {
1595 ReorderBufferToastReplace(rb, txn, relation, change);
1596 rb->apply_change(rb, txn, relation, change);
1597
1598 /*
1599 * Only clear reassembled toast chunks if we're sure
1600 * they're not required anymore. The creator of the
1601 * tuple tells us.
1602 */
1603 if (change->data.tp.clear_toast_afterwards)
1604 ReorderBufferToastReset(rb, txn);
1605 }
1606 /* we're not interested in toast deletions */
1607 else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1608 {
1609 /*
1610 * Need to reassemble the full toasted Datum in
1611 * memory, to ensure the chunks don't get reused till
* we're done; remove it from the list of this
1613 * transaction's changes. Otherwise it will get
1614 * freed/reused while restoring spooled data from
1615 * disk.
1616 */
1617 Assert(change->data.tp.newtuple != NULL);
1618
1619 dlist_delete(&change->node);
1620 ReorderBufferToastAppendChunk(rb, txn, relation,
1621 change);
1622 }
1623
1624 change_done:
1625
1626 /*
1627 * If speculative insertion was confirmed, the record isn't
1628 * needed anymore.
1629 */
1630 if (specinsert != NULL)
1631 {
1632 ReorderBufferReturnChange(rb, specinsert);
1633 specinsert = NULL;
1634 }
1635
1636 if (relation != NULL)
1637 {
1638 RelationClose(relation);
1639 relation = NULL;
1640 }
1641 break;
1642
1643 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
1644
1645 /*
1646 * Speculative insertions are dealt with by delaying the
1647 * processing of the insert until the confirmation record
1648 * arrives. For that we simply unlink the record from the
1649 * chain, so it does not get freed/reused while restoring
1650 * spooled data from disk.
1651 *
1652 * This is safe in the face of concurrent catalog changes
1653 * because the relevant relation can't be changed between
1654 * speculative insertion and confirmation due to
1655 * CheckTableNotInUse() and locking.
1656 */
1657
1658 /* clear out a pending (and thus failed) speculation */
1659 if (specinsert != NULL)
1660 {
1661 ReorderBufferReturnChange(rb, specinsert);
1662 specinsert = NULL;
1663 }
1664
1665 /* and memorize the pending insertion */
1666 dlist_delete(&change->node);
1667 specinsert = change;
1668 break;
1669
1670 case REORDER_BUFFER_CHANGE_TRUNCATE:
1671 {
1672 int i;
1673 int nrelids = change->data.truncate.nrelids;
1674 int nrelations = 0;
1675 Relation *relations;
1676
1677 relations = palloc0(nrelids * sizeof(Relation));
1678 for (i = 0; i < nrelids; i++)
1679 {
1680 Oid relid = change->data.truncate.relids[i];
1681 Relation relation;
1682
1683 relation = RelationIdGetRelation(relid);
1684
1685 if (!RelationIsValid(relation))
1686 elog(ERROR, "could not open relation with OID %u", relid);
1687
1688 if (!RelationIsLogicallyLogged(relation))
1689 continue;
1690
1691 relations[nrelations++] = relation;
1692 }
1693
1694 rb->apply_truncate(rb, txn, nrelations, relations, change);
1695
1696 for (i = 0; i < nrelations; i++)
1697 RelationClose(relations[i]);
1698
1699 break;
1700 }
1701
1702 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
1703
1704 /*
* An abort for the speculative insertion arrived, so clean up
* the specinsert tuple and the toast hash.
1707 *
1708 * Note that we get the spec abort change for each toast
1709 * entry but we need to perform the cleanup only the first
1710 * time we get it for the main table.
1711 */
1712 if (specinsert != NULL)
1713 {
1714 /*
1715 * We must clean the toast hash before processing a
1716 * completely new tuple to avoid confusion about the
1717 * previous tuple's toast chunks.
1718 */
1719 Assert(change->data.tp.clear_toast_afterwards);
1720 ReorderBufferToastReset(rb, txn);
1721
1722 /* We don't need this record anymore. */
1723 ReorderBufferReturnChange(rb, specinsert);
1724 specinsert = NULL;
1725 }
1726 break;
1727
1728 case REORDER_BUFFER_CHANGE_MESSAGE:
1729 rb->message(rb, txn, change->lsn, true,
1730 change->data.msg.prefix,
1731 change->data.msg.message_size,
1732 change->data.msg.message);
1733 break;
1734
1735 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
1736 /* get rid of the old */
1737 TeardownHistoricSnapshot(false);
1738
1739 if (snapshot_now->copied)
1740 {
1741 ReorderBufferFreeSnap(rb, snapshot_now);
1742 snapshot_now =
1743 ReorderBufferCopySnap(rb, change->data.snapshot,
1744 txn, command_id);
1745 }
1746
1747 /*
1748 * Restored from disk, need to be careful not to double
1749 * free. We could introduce refcounting for that, but for
1750 * now this seems infrequent enough not to care.
1751 */
1752 else if (change->data.snapshot->copied)
1753 {
1754 snapshot_now =
1755 ReorderBufferCopySnap(rb, change->data.snapshot,
1756 txn, command_id);
1757 }
1758 else
1759 {
1760 snapshot_now = change->data.snapshot;
1761 }
1762
1763
1764 /* and continue with the new one */
1765 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1766 break;
1767
1768 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
1769 Assert(change->data.command_id != InvalidCommandId);
1770
1771 if (command_id < change->data.command_id)
1772 {
1773 command_id = change->data.command_id;
1774
1775 if (!snapshot_now->copied)
1776 {
1777 /* we don't use the global one anymore */
1778 snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1779 txn, command_id);
1780 }
1781
1782 snapshot_now->curcid = command_id;
1783
1784 TeardownHistoricSnapshot(false);
1785 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1786
1787 /*
1788 * Every time the CommandId is incremented, we could
1789 * see new catalog contents, so execute all
1790 * invalidations.
1791 */
1792 ReorderBufferExecuteInvalidations(rb, txn);
1793 }
1794
1795 break;
1796
1797 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
1798 elog(ERROR, "tuplecid value in changequeue");
1799 break;
1800 }
1801 }
1802
1803 /* speculative insertion record must be freed by now */
1804 Assert(!specinsert);
1805
1806 /* clean up the iterator */
1807 ReorderBufferIterTXNFinish(rb, iterstate);
1808 iterstate = NULL;
1809
1810 /* call commit callback */
1811 rb->commit(rb, txn, commit_lsn);
1812
1813 /* this is just a sanity check against bad output plugin behaviour */
1814 if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
1815 elog(ERROR, "output plugin used XID %u",
1816 GetCurrentTransactionId());
1817
1818 /* cleanup */
1819 TeardownHistoricSnapshot(false);
1820
1821 /*
1822 * Aborting the current (sub-)transaction as a whole has the right
1823 * semantics. We want all locks acquired in here to be released, not
1824 * reassigned to the parent and we do not want any database access
1825 * to have persistent effects.
1826 */
1827 AbortCurrentTransaction();
1828
1829 /* make sure there's no cache pollution */
1830 ReorderBufferExecuteInvalidations(rb, txn);
1831
1832 if (using_subtxn)
1833 RollbackAndReleaseCurrentSubTransaction();
1834
1835 if (snapshot_now->copied)
1836 ReorderBufferFreeSnap(rb, snapshot_now);
1837
1838 /* remove potential on-disk data, and deallocate */
1839 ReorderBufferCleanupTXN(rb, txn);
1840 }
1841 PG_CATCH();
1842 {
1843 /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1844 if (iterstate)
1845 ReorderBufferIterTXNFinish(rb, iterstate);
1846
1847 TeardownHistoricSnapshot(true);
1848
1849 /*
1850 * Force cache invalidation to happen outside of a valid transaction
1851 * to prevent catalog access as we just caught an error.
1852 */
1853 AbortCurrentTransaction();
1854
1855 /* make sure there's no cache pollution */
1856 ReorderBufferExecuteInvalidations(rb, txn);
1857
1858 if (using_subtxn)
1859 RollbackAndReleaseCurrentSubTransaction();
1860
1861 if (snapshot_now->copied)
1862 ReorderBufferFreeSnap(rb, snapshot_now);
1863
1864 /* remove potential on-disk data, and deallocate */
1865 ReorderBufferCleanupTXN(rb, txn);
1866
1867 PG_RE_THROW();
1868 }
1869 PG_END_TRY();
1870 }
1871
1872 /*
1873 * Abort a transaction that possibly has previous changes. Needs to be called
1874 * first for subtransactions and then for the toplevel xid.
1875 *
1876 * NB: Transactions handled here have to have actively aborted (i.e. have
1877 * produced an abort record). Implicitly aborted transactions are handled via
1878 * ReorderBufferAbortOld(); transactions we're just not interested in, but
1879 * which have committed are handled in ReorderBufferForget().
1880 *
1881 * This function purges this transaction and its contents from memory and
1882 * disk.
1883 */
1884 void
1885 ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1886 {
1887 ReorderBufferTXN *txn;
1888
1889 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1890 false);
1891
1892 /* unknown, nothing to remove */
1893 if (txn == NULL)
1894 return;
1895
1896 /* cosmetic... */
1897 txn->final_lsn = lsn;
1898
1899 /* remove potential on-disk data, and deallocate */
1900 ReorderBufferCleanupTXN(rb, txn);
1901 }
1902
1903 /*
1904 * Abort all transactions that aren't actually running anymore because the
1905 * server restarted.
1906 *
1907 * NB: These really have to be transactions that have aborted due to a server
1908 * crash/immediate restart, as we don't deal with invalidations here.
1909 */
1910 void
1911 ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
1912 {
1913 dlist_mutable_iter it;
1914
1915 /*
1916 * Iterate through all (potential) toplevel TXNs and abort all that are
1917 * older than what can possibly be running. Once we've found the first
1918 * that is alive we stop; there might be some that acquired an xid earlier
1919 * but started writing later, but it's unlikely and they will be cleaned
1920 * up in a later call to this function.
1921 */
1922 dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1923 {
1924 ReorderBufferTXN *txn;
1925
1926 txn = dlist_container(ReorderBufferTXN, node, it.cur);
1927
1928 if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1929 {
1930 elog(DEBUG2, "aborting old transaction %u", txn->xid);
1931
1932 /* remove potential on-disk data, and deallocate this tx */
1933 ReorderBufferCleanupTXN(rb, txn);
1934 }
1935 else
1936 return;
1937 }
1938 }
1939
1940 /*
1941 * Forget the contents of a transaction if we aren't interested in its
1942 * contents. Needs to be called first for subtransactions and then for the
1943 * toplevel xid.
1944 *
1945 * This is significantly different from ReorderBufferAbort() because
1946 * transactions that have committed need to be treated differently from aborted
1947 * ones since they may have modified the catalog.
1948 *
1949 * Note that this is only allowed to be called at the moment a transaction
1950 * commit has just been read, not earlier; otherwise later records referring
1951 * to this xid might re-create the transaction incompletely.
1952 */
1953 void
1954 ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1955 {
1956 ReorderBufferTXN *txn;
1957
1958 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1959 false);
1960
1961 /* unknown, nothing to forget */
1962 if (txn == NULL)
1963 return;
1964
1965 /* cosmetic... */
1966 txn->final_lsn = lsn;
1967
1968 /*
1969 * Process cache invalidation messages if there are any. Even if we're not
1970 * interested in the transaction's contents, it could have manipulated the
1971 * catalog and we need to update the caches according to that.
1972 */
1973 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1974 ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
1975 txn->invalidations);
1976 else
1977 Assert(txn->ninvalidations == 0);
1978
1979 /* remove potential on-disk data, and deallocate */
1980 ReorderBufferCleanupTXN(rb, txn);
1981 }
1982
1983 /*
1984 * Execute invalidations happening outside the context of a decoded
1985 * transaction. That currently happens either for xid-less commits
1986 * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
1987 * transactions (via ReorderBufferForget()).
1988 */
1989 void
1990 ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
1991 SharedInvalidationMessage *invalidations)
1992 {
1993 bool use_subtxn = IsTransactionOrTransactionBlock();
1994 int i;
1995
1996 if (use_subtxn)
1997 BeginInternalSubTransaction("replay");
1998
1999 /*
2000 * Force invalidations to happen outside of a valid transaction - that way
2001 * entries will just be marked as invalid without accessing the catalog.
2002 * That's advantageous because we don't need to set up the full state
2003 * necessary for catalog access.
2004 */
2005 if (use_subtxn)
2006 AbortCurrentTransaction();
2007
2008 for (i = 0; i < ninvalidations; i++)
2009 LocalExecuteInvalidationMessage(&invalidations[i]);
2010
2011 if (use_subtxn)
2012 RollbackAndReleaseCurrentSubTransaction();
2013 }
2014
2015 /*
2016 * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
2017 * least once for every xid in XLogRecord->xl_xid (other places in records
2018 * may, but do not have to be passed through here).
2019 *
2020 * Reorderbuffer keeps some data structures about transactions in LSN order,
2021 * for efficiency. To do that it has to know when transactions are seen
2022 * first in the WAL. As many types of records are not actually interesting for
2023 * logical decoding, they do not necessarily pass through here.
2024 */
2025 void
2026 ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
2027 {
2028 /* many records won't have an xid assigned, centralize check here */
2029 if (xid != InvalidTransactionId)
2030 ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2031 }
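/*
 * A minimal sketch of the calling convention assumed above (hypothetical
 * caller, not part of this module): the decoding layer hands every record's
 * xl_xid and the record's start LSN to ReorderBufferProcessXid before doing
 * anything else with the record, so the LSN-ordered bookkeeping stays
 * complete even for records that are otherwise uninteresting.
 */
#if NOT_USED
static void
ExampleDecodeRecord(ReorderBuffer *rb, XLogReaderState *record,
					XLogRecPtr origptr)
{
	/* register the xid first; InvalidTransactionId is filtered out above */
	ReorderBufferProcessXid(rb, XLogRecGetXid(record), origptr);

	/* ... actual per-record decoding would follow here ... */
}
#endif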
2032
2033 /*
2034 * Add a new snapshot to this transaction that may only be used after lsn 'lsn'
2035 * because the previous snapshot doesn't describe the catalog correctly for
2036 * following rows.
2037 */
2038 void
2039 ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
2040 XLogRecPtr lsn, Snapshot snap)
2041 {
2042 ReorderBufferChange *change = ReorderBufferGetChange(rb);
2043
2044 change->data.snapshot = snap;
2045 change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
2046
2047 ReorderBufferQueueChange(rb, xid, lsn, change);
2048 }
2049
2050 /*
2051 * Set up the transaction's base snapshot.
2052 *
2053 * If we know that xid is a subtransaction, set the base snapshot on the
2054 * top-level transaction instead.
2055 */
2056 void
2057 ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
2058 XLogRecPtr lsn, Snapshot snap)
2059 {
2060 ReorderBufferTXN *txn;
2061 bool is_new;
2062
2063 AssertArg(snap != NULL);
2064
2065 /*
2066 * Fetch the transaction to operate on. If we know it's a subtransaction,
2067 * operate on its top-level transaction instead.
2068 */
2069 txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
2070 if (txn->is_known_as_subxact)
2071 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2072 NULL, InvalidXLogRecPtr, false);
2073 Assert(txn->base_snapshot == NULL);
2074
2075 txn->base_snapshot = snap;
2076 txn->base_snapshot_lsn = lsn;
2077 dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
2078
2079 AssertTXNLsnOrder(rb);
2080 }
2081
2082 /*
2083 * Access the catalog with this CommandId at this point in the changestream.
2084 *
2085 * May only be called for command ids > 1
2086 */
2087 void
2088 ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
2089 XLogRecPtr lsn, CommandId cid)
2090 {
2091 ReorderBufferChange *change = ReorderBufferGetChange(rb);
2092
2093 change->data.command_id = cid;
2094 change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
2095
2096 ReorderBufferQueueChange(rb, xid, lsn, change);
2097 }
2098
2099
2100 /*
2101 * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
2102 */
2103 void
2104 ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
2105 XLogRecPtr lsn, RelFileNode node,
2106 ItemPointerData tid, CommandId cmin,
2107 CommandId cmax, CommandId combocid)
2108 {
2109 ReorderBufferChange *change = ReorderBufferGetChange(rb);
2110 ReorderBufferTXN *txn;
2111
2112 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2113
2114 change->data.tuplecid.node = node;
2115 change->data.tuplecid.tid = tid;
2116 change->data.tuplecid.cmin = cmin;
2117 change->data.tuplecid.cmax = cmax;
2118 change->data.tuplecid.combocid = combocid;
2119 change->lsn = lsn;
2120 change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
2121
2122 dlist_push_tail(&txn->tuplecids, &change->node);
2123 txn->ntuplecids++;
2124 }
2125
2126 /*
2127 * Set up the invalidations of the toplevel transaction.
2128 *
2129 * This needs to be done before ReorderBufferCommit is called!
2130 */
2131 void
2132 ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
2133 XLogRecPtr lsn, Size nmsgs,
2134 SharedInvalidationMessage *msgs)
2135 {
2136 ReorderBufferTXN *txn;
2137
2138 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2139
2140 if (txn->ninvalidations != 0)
2141 elog(ERROR, "only ever add one set of invalidations");
2142
2143 Assert(nmsgs > 0);
2144
2145 txn->ninvalidations = nmsgs;
2146 txn->invalidations = (SharedInvalidationMessage *)
2147 MemoryContextAlloc(rb->context,
2148 sizeof(SharedInvalidationMessage) * nmsgs);
2149 memcpy(txn->invalidations, msgs,
2150 sizeof(SharedInvalidationMessage) * nmsgs);
2151 }
2152
2153 /*
2154 * Apply all invalidations we know. Possibly we only need parts at this point
2155 * in the changestream but we don't know which those are.
2156 */
2157 static void
2158 ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
2159 {
2160 int i;
2161
2162 for (i = 0; i < txn->ninvalidations; i++)
2163 LocalExecuteInvalidationMessage(&txn->invalidations[i]);
2164 }
2165
2166 /*
2167 * Mark a transaction as containing catalog changes
2168 */
2169 void
2170 ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
2171 XLogRecPtr lsn)
2172 {
2173 ReorderBufferTXN *txn;
2174
2175 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2176
2177 txn->has_catalog_changes = true;
2178 }
2179
2180 /*
2181 * Query whether a transaction is already *known* to contain catalog
2182 * changes. This can be wrong until directly before the commit!
2183 */
2184 bool
2185 ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
2186 {
2187 ReorderBufferTXN *txn;
2188
2189 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2190 false);
2191 if (txn == NULL)
2192 return false;
2193
2194 return txn->has_catalog_changes;
2195 }
2196
2197 /*
2198 * ReorderBufferXidHasBaseSnapshot
2199 * Have we already set the base snapshot for the given txn/subtxn?
2200 */
2201 bool
2202 ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
2203 {
2204 ReorderBufferTXN *txn;
2205
2206 txn = ReorderBufferTXNByXid(rb, xid, false,
2207 NULL, InvalidXLogRecPtr, false);
2208
2209 /* transaction isn't known yet, ergo no snapshot */
2210 if (txn == NULL)
2211 return false;
2212
2213 /* a known subtxn? operate on top-level txn instead */
2214 if (txn->is_known_as_subxact)
2215 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2216 NULL, InvalidXLogRecPtr, false);
2217
2218 return txn->base_snapshot != NULL;
2219 }
2220
2221
2222 /*
2223 * ---------------------------------------
2224 * Disk serialization support
2225 * ---------------------------------------
2226 */
2227
2228 /*
2229 * Ensure the IO buffer is >= sz.
2230 */
2231 static void
2232 ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
2233 {
2234 if (!rb->outbufsize)
2235 {
2236 rb->outbuf = MemoryContextAlloc(rb->context, sz);
2237 rb->outbufsize = sz;
2238 }
2239 else if (rb->outbufsize < sz)
2240 {
2241 rb->outbuf = repalloc(rb->outbuf, sz);
2242 rb->outbufsize = sz;
2243 }
2244 }
2245
2246 /*
2247 * Check whether the transaction txn should spill its data to disk.
2248 */
2249 static void
2250 ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2251 {
2252 /*
2253 * TODO: improve accounting so we can cheaply take subtransactions into
2254 * account here.
2255 */
2256 if (txn->nentries_mem >= max_changes_in_memory)
2257 {
2258 ReorderBufferSerializeTXN(rb, txn);
2259 Assert(txn->nentries_mem == 0);
2260 }
2261 }
2262
2263 /*
2264 * Spill data of a large transaction (and its subtransactions) to disk.
2265 */
2266 static void
2267 ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2268 {
2269 dlist_iter subtxn_i;
2270 dlist_mutable_iter change_i;
2271 int fd = -1;
2272 XLogSegNo curOpenSegNo = 0;
2273 Size spilled = 0;
2274
2275 elog(DEBUG2, "spill %u changes in XID %u to disk",
2276 (uint32) txn->nentries_mem, txn->xid);
2277
2278 /* do the same to all child TXs */
2279 dlist_foreach(subtxn_i, &txn->subtxns)
2280 {
2281 ReorderBufferTXN *subtxn;
2282
2283 subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2284 ReorderBufferSerializeTXN(rb, subtxn);
2285 }
2286
2287 /* serialize changestream */
2288 dlist_foreach_modify(change_i, &txn->changes)
2289 {
2290 ReorderBufferChange *change;
2291
2292 change = dlist_container(ReorderBufferChange, node, change_i.cur);
2293
2294 /*
2295 * store in the segment in which it belongs by start lsn, don't split over
2296 * multiple segments though
2297 */
2298 if (fd == -1 ||
2299 !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
2300 {
2301 char path[MAXPGPATH];
2302
2303 if (fd != -1)
2304 CloseTransientFile(fd);
2305
2306 XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
2307
2308 /*
2309 * No need to care about TLIs here, only used during a single run,
2310 * so each LSN only maps to a specific WAL record.
2311 */
2312 ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2313 curOpenSegNo);
2314
2315 /* open segment, create it if necessary */
2316 fd = OpenTransientFile(path,
2317 O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
2318
2319 if (fd < 0)
2320 ereport(ERROR,
2321 (errcode_for_file_access(),
2322 errmsg("could not open file \"%s\": %m", path)));
2323 }
2324
2325 ReorderBufferSerializeChange(rb, txn, fd, change);
2326 dlist_delete(&change->node);
2327 ReorderBufferReturnChange(rb, change);
2328
2329 spilled++;
2330 }
2331
2332 Assert(spilled == txn->nentries_mem);
2333 Assert(dlist_is_empty(&txn->changes));
2334 txn->nentries_mem = 0;
2335 txn->serialized = true;
2336
2337 if (fd != -1)
2338 CloseTransientFile(fd);
2339 }
2340
2341 /*
2342 * Serialize individual change to disk.
2343 */
2344 static void
2345 ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2346 int fd, ReorderBufferChange *change)
2347 {
2348 ReorderBufferDiskChange *ondisk;
2349 Size sz = sizeof(ReorderBufferDiskChange);
2350
2351 ReorderBufferSerializeReserve(rb, sz);
2352
2353 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2354 memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2355
2356 switch (change->action)
2357 {
2358 /* fall through these, they're all similar enough */
2359 case REORDER_BUFFER_CHANGE_INSERT:
2360 case REORDER_BUFFER_CHANGE_UPDATE:
2361 case REORDER_BUFFER_CHANGE_DELETE:
2362 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2363 {
2364 char *data;
2365 ReorderBufferTupleBuf *oldtup,
2366 *newtup;
2367 Size oldlen = 0;
2368 Size newlen = 0;
2369
2370 oldtup = change->data.tp.oldtuple;
2371 newtup = change->data.tp.newtuple;
2372
2373 if (oldtup)
2374 {
2375 sz += sizeof(HeapTupleData);
2376 oldlen = oldtup->tuple.t_len;
2377 sz += oldlen;
2378 }
2379
2380 if (newtup)
2381 {
2382 sz += sizeof(HeapTupleData);
2383 newlen = newtup->tuple.t_len;
2384 sz += newlen;
2385 }
2386
2387 /* make sure we have enough space */
2388 ReorderBufferSerializeReserve(rb, sz);
2389
2390 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2391 /* might have been reallocated above */
2392 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2393
2394 if (oldlen)
2395 {
2396 memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2397 data += sizeof(HeapTupleData);
2398
2399 memcpy(data, oldtup->tuple.t_data, oldlen);
2400 data += oldlen;
2401 }
2402
2403 if (newlen)
2404 {
2405 memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2406 data += sizeof(HeapTupleData);
2407
2408 memcpy(data, newtup->tuple.t_data, newlen);
2409 data += newlen;
2410 }
2411 break;
2412 }
2413 case REORDER_BUFFER_CHANGE_MESSAGE:
2414 {
2415 char *data;
2416 Size prefix_size = strlen(change->data.msg.prefix) + 1;
2417
2418 sz += prefix_size + change->data.msg.message_size +
2419 sizeof(Size) + sizeof(Size);
2420 ReorderBufferSerializeReserve(rb, sz);
2421
2422 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2423
2424 /* might have been reallocated above */
2425 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2426
2427 /* write the prefix including the size */
2428 memcpy(data, &prefix_size, sizeof(Size));
2429 data += sizeof(Size);
2430 memcpy(data, change->data.msg.prefix,
2431 prefix_size);
2432 data += prefix_size;
2433
2434 /* write the message including the size */
2435 memcpy(data, &change->data.msg.message_size, sizeof(Size));
2436 data += sizeof(Size);
2437 memcpy(data, change->data.msg.message,
2438 change->data.msg.message_size);
2439 data += change->data.msg.message_size;
2440
2441 break;
2442 }
2443 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2444 {
2445 Snapshot snap;
2446 char *data;
2447
2448 snap = change->data.snapshot;
2449
2450 sz += sizeof(SnapshotData) +
2451 sizeof(TransactionId) * snap->xcnt +
2452 sizeof(TransactionId) * snap->subxcnt;
2453
2454 /* make sure we have enough space */
2455 ReorderBufferSerializeReserve(rb, sz);
2456 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2457 /* might have been reallocated above */
2458 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2459
2460 memcpy(data, snap, sizeof(SnapshotData));
2461 data += sizeof(SnapshotData);
2462
2463 if (snap->xcnt)
2464 {
2465 memcpy(data, snap->xip,
2466 sizeof(TransactionId) * snap->xcnt);
2467 data += sizeof(TransactionId) * snap->xcnt;
2468 }
2469
2470 if (snap->subxcnt)
2471 {
2472 memcpy(data, snap->subxip,
2473 sizeof(TransactionId) * snap->subxcnt);
2474 data += sizeof(TransactionId) * snap->subxcnt;
2475 }
2476 break;
2477 }
2478 case REORDER_BUFFER_CHANGE_TRUNCATE:
2479 {
2480 Size size;
2481 char *data;
2482
2483 /* account for the OIDs of truncated relations */
2484 size = sizeof(Oid) * change->data.truncate.nrelids;
2485 sz += size;
2486
2487 /* make sure we have enough space */
2488 ReorderBufferSerializeReserve(rb, sz);
2489
2490 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2491 /* might have been reallocated above */
2492 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2493
2494 memcpy(data, change->data.truncate.relids, size);
2495 data += size;
2496
2497 break;
2498 }
2499 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2500 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
2501 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2502 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2503 /* ReorderBufferChange contains everything important */
2504 break;
2505 }
2506
2507 ondisk->size = sz;
2508
2509 errno = 0;
2510 pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
2511 if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2512 {
2513 int save_errno = errno;
2514
2515 CloseTransientFile(fd);
2516
2517 /* if write didn't set errno, assume problem is no disk space */
2518 errno = save_errno ? save_errno : ENOSPC;
2519 ereport(ERROR,
2520 (errcode_for_file_access(),
2521 errmsg("could not write to data file for XID %u: %m",
2522 txn->xid)));
2523 }
2524 pgstat_report_wait_end();
2525
2526 /*
2527 * Keep the transaction's final_lsn up to date with each change we send to
2528 * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
2529 * only do this on commit and abort records, but that doesn't work if a
2530 * system crash leaves a transaction without its abort record).
2531 *
2532 * Make sure not to move it backwards.
2533 */
2534 if (txn->final_lsn < change->lsn)
2535 txn->final_lsn = change->lsn;
2536
2537 Assert(ondisk->change.action == change->action);
2538 }
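/*
 * For orientation, a sketch of the encoding the function above produces (a
 * description of the code above, not an additional format definition): every
 * spilled record starts with a ReorderBufferDiskChange header - a verbatim
 * copy of the in-memory ReorderBufferChange plus the total record length in
 * ->size - followed by an action-specific variable-length payload:
 * HeapTupleData plus tuple contents for the old and/or new tuple of DML
 * changes, the length-prefixed prefix and message for logical messages,
 * SnapshotData plus the xip/subxip arrays for snapshots, and the relid array
 * for TRUNCATE.  E.g. an INSERT carrying only a new tuple occupies
 * sizeof(ReorderBufferDiskChange) + sizeof(HeapTupleData) + t_len bytes.
 * ReorderBufferRestoreChange() below decodes exactly this layout.
 */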
2539
2540 /*
2541 * Restore a number of changes spilled to disk back into memory.
2542 */
2543 static Size
2544 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2545 File *fd, XLogSegNo *segno)
2546 {
2547 Size restored = 0;
2548 XLogSegNo last_segno;
2549 dlist_mutable_iter cleanup_iter;
2550
2551 Assert(txn->first_lsn != InvalidXLogRecPtr);
2552 Assert(txn->final_lsn != InvalidXLogRecPtr);
2553
2554 /* free current entries, so we have memory for more */
2555 dlist_foreach_modify(cleanup_iter, &txn->changes)
2556 {
2557 ReorderBufferChange *cleanup =
2558 dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2559
2560 dlist_delete(&cleanup->node);
2561 ReorderBufferReturnChange(rb, cleanup);
2562 }
2563 txn->nentries_mem = 0;
2564 Assert(dlist_is_empty(&txn->changes));
2565
2566 XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
2567
2568 while (restored < max_changes_in_memory && *segno <= last_segno)
2569 {
2570 int readBytes;
2571 ReorderBufferDiskChange *ondisk;
2572
2573 if (*fd == -1)
2574 {
2575 char path[MAXPGPATH];
2576
2577 /* first time in */
2578 if (*segno == 0)
2579 XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
2580
2581 Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2582
2583 /*
2584 * No need to care about TLIs here, only used during a single run,
2585 * so each LSN only maps to a specific WAL record.
2586 */
2587 ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2588 *segno);
2589
2590 *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
2591 if (*fd < 0 && errno == ENOENT)
2592 {
2593 *fd = -1;
2594 (*segno)++;
2595 continue;
2596 }
2597 else if (*fd < 0)
2598 ereport(ERROR,
2599 (errcode_for_file_access(),
2600 errmsg("could not open file \"%s\": %m",
2601 path)));
2602 }
2603
2604 /*
2605 * Read the statically sized part of a change which has information
2606 * about the total size. If we couldn't read a record, we're at the
2607 * end of this file.
2608 */
2609 ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
2610 readBytes = FileRead(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange),
2611 WAIT_EVENT_REORDER_BUFFER_READ);
2612
2613 /* eof */
2614 if (readBytes == 0)
2615 {
2616 FileClose(*fd);
2617 *fd = -1;
2618 (*segno)++;
2619 continue;
2620 }
2621 else if (readBytes < 0)
2622 ereport(ERROR,
2623 (errcode_for_file_access(),
2624 errmsg("could not read from reorderbuffer spill file: %m")));
2625 else if (readBytes != sizeof(ReorderBufferDiskChange))
2626 ereport(ERROR,
2627 (errcode_for_file_access(),
2628 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2629 readBytes,
2630 (uint32) sizeof(ReorderBufferDiskChange))));
2631
2632 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2633
2634 ReorderBufferSerializeReserve(rb,
2635 sizeof(ReorderBufferDiskChange) + ondisk->size);
2636 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2637
2638 readBytes = FileRead(*fd,
2639 rb->outbuf + sizeof(ReorderBufferDiskChange),
2640 ondisk->size - sizeof(ReorderBufferDiskChange),
2641 WAIT_EVENT_REORDER_BUFFER_READ);
2642
2643 if (readBytes < 0)
2644 ereport(ERROR,
2645 (errcode_for_file_access(),
2646 errmsg("could not read from reorderbuffer spill file: %m")));
2647 else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2648 ereport(ERROR,
2649 (errcode_for_file_access(),
2650 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2651 readBytes,
2652 (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2653
2654 /*
2655 * ok, read a full change from disk, now restore it into proper
2656 * in-memory format
2657 */
2658 ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2659 restored++;
2660 }
2661
2662 return restored;
2663 }
2664
2665 /*
2666 * Convert change from its on-disk format to in-memory format and queue it onto
2667 * the TXN's ->changes list.
2668 *
2669 * Note: although "data" is declared char*, at entry it points to a
2670 * maxalign'd buffer, making it safe in most of this function to assume
2671 * that the pointed-to data is suitably aligned for direct access.
2672 */
2673 static void
2674 ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2675 char *data)
2676 {
2677 ReorderBufferDiskChange *ondisk;
2678 ReorderBufferChange *change;
2679
2680 ondisk = (ReorderBufferDiskChange *) data;
2681
2682 change = ReorderBufferGetChange(rb);
2683
2684 /* copy static part */
2685 memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2686
2687 data += sizeof(ReorderBufferDiskChange);
2688
2689 /* restore individual stuff */
2690 switch (change->action)
2691 {
2692 /* fall through these, they're all similar enough */
2693 case REORDER_BUFFER_CHANGE_INSERT:
2694 case REORDER_BUFFER_CHANGE_UPDATE:
2695 case REORDER_BUFFER_CHANGE_DELETE:
2696 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2697 if (change->data.tp.oldtuple)
2698 {
2699 uint32 tuplelen = ((HeapTuple) data)->t_len;
2700
2701 change->data.tp.oldtuple =
2702 ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2703
2704 /* restore ->tuple */
2705 memcpy(&change->data.tp.oldtuple->tuple, data,
2706 sizeof(HeapTupleData));
2707 data += sizeof(HeapTupleData);
2708
2709 /* reset t_data pointer into the new tuplebuf */
2710 change->data.tp.oldtuple->tuple.t_data =
2711 ReorderBufferTupleBufData(change->data.tp.oldtuple);
2712
2713 /* restore tuple data itself */
2714 memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2715 data += tuplelen;
2716 }
2717
2718 if (change->data.tp.newtuple)
2719 {
2720 /* here, data might not be suitably aligned! */
2721 uint32 tuplelen;
2722
2723 memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2724 sizeof(uint32));
2725
2726 change->data.tp.newtuple =
2727 ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2728
2729 /* restore ->tuple */
2730 memcpy(&change->data.tp.newtuple->tuple, data,
2731 sizeof(HeapTupleData));
2732 data += sizeof(HeapTupleData);
2733
2734 /* reset t_data pointer into the new tuplebuf */
2735 change->data.tp.newtuple->tuple.t_data =
2736 ReorderBufferTupleBufData(change->data.tp.newtuple);
2737
2738 /* restore tuple data itself */
2739 memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2740 data += tuplelen;
2741 }
2742
2743 break;
2744 case REORDER_BUFFER_CHANGE_MESSAGE:
2745 {
2746 Size prefix_size;
2747
2748 /* read prefix */
2749 memcpy(&prefix_size, data, sizeof(Size));
2750 data += sizeof(Size);
2751 change->data.msg.prefix = MemoryContextAlloc(rb->context,
2752 prefix_size);
2753 memcpy(change->data.msg.prefix, data, prefix_size);
2754 Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2755 data += prefix_size;
2756
2757 /* read the message */
2758 memcpy(&change->data.msg.message_size, data, sizeof(Size));
2759 data += sizeof(Size);
2760 change->data.msg.message = MemoryContextAlloc(rb->context,
2761 change->data.msg.message_size);
2762 memcpy(change->data.msg.message, data,
2763 change->data.msg.message_size);
2764 data += change->data.msg.message_size;
2765
2766 break;
2767 }
2768 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2769 {
2770 Snapshot oldsnap;
2771 Snapshot newsnap;
2772 Size size;
2773
2774 oldsnap = (Snapshot) data;
2775
2776 size = sizeof(SnapshotData) +
2777 sizeof(TransactionId) * oldsnap->xcnt +
2778 sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2779
2780 change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2781
2782 newsnap = change->data.snapshot;
2783
2784 memcpy(newsnap, data, size);
2785 newsnap->xip = (TransactionId *)
2786 (((char *) newsnap) + sizeof(SnapshotData));
2787 newsnap->subxip = newsnap->xip + newsnap->xcnt;
2788 newsnap->copied = true;
2789 break;
2790 }
2791 /* the base struct contains all the data, easy peasy */
2792 case REORDER_BUFFER_CHANGE_TRUNCATE:
2793 {
2794 Oid *relids;
2795
2796 relids = ReorderBufferGetRelids(rb,
2797 change->data.truncate.nrelids);
2798 memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
2799 change->data.truncate.relids = relids;
2800
2801 break;
2802 }
2803 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2804 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
2805 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2806 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2807 break;
2808 }
2809
2810 dlist_push_tail(&txn->changes, &change->node);
2811 txn->nentries_mem++;
2812 }
2813
2814 /*
2815 * Remove all on-disk data stored for the passed-in transaction.
2816 */
2817 static void
2818 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
2819 {
2820 XLogSegNo first;
2821 XLogSegNo cur;
2822 XLogSegNo last;
2823
2824 Assert(txn->first_lsn != InvalidXLogRecPtr);
2825 Assert(txn->final_lsn != InvalidXLogRecPtr);
2826
2827 XLByteToSeg(txn->first_lsn, first, wal_segment_size);
2828 XLByteToSeg(txn->final_lsn, last, wal_segment_size);
2829
2830 /* iterate over all possible filenames, and delete them */
2831 for (cur = first; cur <= last; cur++)
2832 {
2833 char path[MAXPGPATH];
2834
2835 ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
2836 if (unlink(path) != 0 && errno != ENOENT)
2837 ereport(ERROR,
2838 (errcode_for_file_access(),
2839 errmsg("could not remove file \"%s\": %m", path)));
2840 }
2841 }
2842
2843 /*
2844 * Remove any leftover serialized reorder buffers from a slot directory after a
2845 * prior crash or decoding session exit.
2846 */
2847 static void
2848 ReorderBufferCleanupSerializedTXNs(const char *slotname)
2849 {
2850 DIR *spill_dir;
2851 struct dirent *spill_de;
2852 struct stat statbuf;
2853 char path[MAXPGPATH * 2 + 12];
2854
2855 sprintf(path, "pg_replslot/%s", slotname);
2856
2857 /* we're only handling directories here, skip if it's not ours */
2858 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2859 return;
2860
2861 spill_dir = AllocateDir(path);
2862 while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
2863 {
2864 /* only look at names that can be ours */
2865 if (strncmp(spill_de->d_name, "xid", 3) == 0)
2866 {
2867 snprintf(path, sizeof(path),
2868 "pg_replslot/%s/%s", slotname,
2869 spill_de->d_name);
2870
2871 if (unlink(path) != 0)
2872 ereport(ERROR,
2873 (errcode_for_file_access(),
2874 errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/*.xid: %m",
2875 path, slotname)));
2876 }
2877 }
2878 FreeDir(spill_dir);
2879 }
2880
2881 /*
2882 * Given a replication slot, transaction ID and segment number, fill in the
2883 * path of the corresponding spill file into 'path', which is a caller-owned
2884 * buffer of size at least MAXPGPATH.
2885 */
2886 static void
2887 ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
2888 XLogSegNo segno)
2889 {
2890 XLogRecPtr recptr;
2891
2892 XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
2893
2894 snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2895 NameStr(MyReplicationSlot->data.name),
2896 xid,
2897 (uint32) (recptr >> 32), (uint32) recptr);
2898 }
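/*
 * Worked example (slot name, xid and segment number are made up): with the
 * default 16MB WAL segment size, slot "s1", xid 724 and segno 5, the segment
 * start LSN is 0/5000000, so the resulting file name is
 * "pg_replslot/s1/xid-724-lsn-0-5000000.snap".
 */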
2899
2900 /*
2901 * Delete all data spilled to disk after we've restarted/crashed. It will be
2902 * recreated when the respective slots are reused.
2903 */
2904 void
2905 StartupReorderBuffer(void)
2906 {
2907 DIR *logical_dir;
2908 struct dirent *logical_de;
2909
2910 logical_dir = AllocateDir("pg_replslot");
2911 while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2912 {
2913 if (strcmp(logical_de->d_name, ".") == 0 ||
2914 strcmp(logical_de->d_name, "..") == 0)
2915 continue;
2916
2917 /* if it cannot be a slot, skip the directory */
2918 if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2919 continue;
2920
2921 /*
2922 * ok, has to be a surviving logical slot, iterate and delete
2923 * everything starting with xid-*
2924 */
2925 ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
2926 }
2927 FreeDir(logical_dir);
2928 }
2929
2930 /* ---------------------------------------
2931 * toast reassembly support
2932 * ---------------------------------------
2933 */
2934
2935 /*
2936 * Initialize per-tuple toast reconstruction support.
2937 */
2938 static void
2939 ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
2940 {
2941 HASHCTL hash_ctl;
2942
2943 Assert(txn->toast_hash == NULL);
2944
2945 memset(&hash_ctl, 0, sizeof(hash_ctl));
2946 hash_ctl.keysize = sizeof(Oid);
2947 hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2948 hash_ctl.hcxt = rb->context;
2949 txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2950 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
2951 }
2952
2953 /*
2954 * Per toast-chunk handling for toast reconstruction
2955 *
2956 * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
2957 * toasted Datum comes along.
2958 */
2959 static void
2960 ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
2961 Relation relation, ReorderBufferChange *change)
2962 {
2963 ReorderBufferToastEnt *ent;
2964 ReorderBufferTupleBuf *newtup;
2965 bool found;
2966 int32 chunksize;
2967 bool isnull;
2968 Pointer chunk;
2969 TupleDesc desc = RelationGetDescr(relation);
2970 Oid chunk_id;
2971 int32 chunk_seq;
2972
2973 if (txn->toast_hash == NULL)
2974 ReorderBufferToastInitHash(rb, txn);
2975
2976 Assert(IsToastRelation(relation));
2977
2978 newtup = change->data.tp.newtuple;
2979 chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2980 Assert(!isnull);
2981 chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2982 Assert(!isnull);
2983
2984 ent = (ReorderBufferToastEnt *)
2985 hash_search(txn->toast_hash,
2986 (void *) &chunk_id,
2987 HASH_ENTER,
2988 &found);
2989
2990 if (!found)
2991 {
2992 Assert(ent->chunk_id == chunk_id);
2993 ent->num_chunks = 0;
2994 ent->last_chunk_seq = 0;
2995 ent->size = 0;
2996 ent->reconstructed = NULL;
2997 dlist_init(&ent->chunks);
2998
2999 if (chunk_seq != 0)
3000 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
3001 chunk_seq, chunk_id);
3002 }
3003 else if (found && chunk_seq != ent->last_chunk_seq + 1)
3004 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
3005 chunk_seq, chunk_id, ent->last_chunk_seq + 1);
3006
3007 chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
3008 Assert(!isnull);
3009
3010 /* calculate size so we can allocate the right size at once later */
3011 if (!VARATT_IS_EXTENDED(chunk))
3012 chunksize = VARSIZE(chunk) - VARHDRSZ;
3013 else if (VARATT_IS_SHORT(chunk))
3014 /* could happen due to heap_form_tuple doing its thing */
3015 chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
3016 else
3017 elog(ERROR, "unexpected type of toast chunk");
3018
3019 ent->size += chunksize;
3020 ent->last_chunk_seq = chunk_seq;
3021 ent->num_chunks++;
3022 dlist_push_tail(&ent->chunks, &change->node);
3023 }
3024
3025 /*
3026 * Rejigger change->newtuple to point to in-memory toast tuples instead of
3027 * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
3028 *
3029 * We cannot replace unchanged toast tuples though, so those will still point
3030 * to on-disk toast data.
3031 */
3032 static void
3033 ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
3034 Relation relation, ReorderBufferChange *change)
3035 {
3036 TupleDesc desc;
3037 int natt;
3038 Datum *attrs;
3039 bool *isnull;
3040 bool *free;
3041 HeapTuple tmphtup;
3042 Relation toast_rel;
3043 TupleDesc toast_desc;
3044 MemoryContext oldcontext;
3045 ReorderBufferTupleBuf *newtup;
3046
3047 /* no toast tuples changed */
3048 if (txn->toast_hash == NULL)
3049 return;
3050
3051 oldcontext = MemoryContextSwitchTo(rb->context);
3052
3053 /* we should only have toast tuples in an INSERT or UPDATE */
3054 Assert(change->data.tp.newtuple);
3055
3056 desc = RelationGetDescr(relation);
3057
3058 toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
3059 if (!RelationIsValid(toast_rel))
3060 elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
3061 relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
3062
3063 toast_desc = RelationGetDescr(toast_rel);
3064
3065 /* should we allocate from stack instead? */
3066 attrs = palloc0(sizeof(Datum) * desc->natts);
3067 isnull = palloc0(sizeof(bool) * desc->natts);
3068 free = palloc0(sizeof(bool) * desc->natts);
3069
3070 newtup = change->data.tp.newtuple;
3071
3072 heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
3073
3074 for (natt = 0; natt < desc->natts; natt++)
3075 {
3076 Form_pg_attribute attr = TupleDescAttr(desc, natt);
3077 ReorderBufferToastEnt *ent;
3078 struct varlena *varlena;
3079
3080 /* va_rawsize is the size of the original datum -- including header */
3081 struct varatt_external toast_pointer;
3082 struct varatt_indirect redirect_pointer;
3083 struct varlena *new_datum = NULL;
3084 struct varlena *reconstructed;
3085 dlist_iter it;
3086 Size data_done = 0;
3087
3088 /* system columns aren't toasted */
3089 if (attr->attnum < 0)
3090 continue;
3091
3092 if (attr->attisdropped)
3093 continue;
3094
3095 /* not a varlena datatype */
3096 if (attr->attlen != -1)
3097 continue;
3098
3099 /* no data */
3100 if (isnull[natt])
3101 continue;
3102
3103 /* ok, we know we have a toast datum */
3104 varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
3105
3106 /* no need to do anything if the tuple isn't external */
3107 if (!VARATT_IS_EXTERNAL(varlena))
3108 continue;
3109
3110 VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
3111
3112 /*
3113 * Check whether the toast tuple changed, replace if so.
3114 */
3115 ent = (ReorderBufferToastEnt *)
3116 hash_search(txn->toast_hash,
3117 (void *) &toast_pointer.va_valueid,
3118 HASH_FIND,
3119 NULL);
3120 if (ent == NULL)
3121 continue;
3122
3123 new_datum =
3124 (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
3125
3126 free[natt] = true;
3127
3128 reconstructed = palloc0(toast_pointer.va_rawsize);
3129
3130 ent->reconstructed = reconstructed;
3131
3132 /* stitch toast tuple back together from its parts */
3133 dlist_foreach(it, &ent->chunks)
3134 {
3135 bool isnull;
3136 ReorderBufferChange *cchange;
3137 ReorderBufferTupleBuf *ctup;
3138 Pointer chunk;
3139
3140 cchange = dlist_container(ReorderBufferChange, node, it.cur);
3141 ctup = cchange->data.tp.newtuple;
3142 chunk = DatumGetPointer(
3143 fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
3144
3145 Assert(!isnull);
3146 Assert(!VARATT_IS_EXTERNAL(chunk));
3147 Assert(!VARATT_IS_SHORT(chunk));
3148
3149 memcpy(VARDATA(reconstructed) + data_done,
3150 VARDATA(chunk),
3151 VARSIZE(chunk) - VARHDRSZ);
3152 data_done += VARSIZE(chunk) - VARHDRSZ;
3153 }
3154 Assert(data_done == toast_pointer.va_extsize);
3155
3156 /* make sure it's marked as compressed or not */
3157 if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
3158 SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
3159 else
3160 SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
3161
3162 memset(&redirect_pointer, 0, sizeof(redirect_pointer));
3163 redirect_pointer.pointer = reconstructed;
3164
3165 SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
3166 memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
3167 sizeof(redirect_pointer));
3168
3169 attrs[natt] = PointerGetDatum(new_datum);
3170 }
3171
3172 /*
3173 * Build tuple in separate memory & copy tuple back into the tuplebuf
3174 * passed to the output plugin. We can't directly heap_fill_tuple() into
3175 * the tuplebuf because attrs[] will point back into the current content.
3176 */
3177 tmphtup = heap_form_tuple(desc, attrs, isnull);
3178 Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
3179 Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
3180
3181 memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
3182 newtup->tuple.t_len = tmphtup->t_len;
3183
3184 /*
3185 * free resources we won't further need, more persistent stuff will be
3186 * free'd in ReorderBufferToastReset().
3187 */
3188 RelationClose(toast_rel);
3189 pfree(tmphtup);
3190 for (natt = 0; natt < desc->natts; natt++)
3191 {
3192 if (free[natt])
3193 pfree(DatumGetPointer(attrs[natt]));
3194 }
3195 pfree(attrs);
3196 pfree(free);
3197 pfree(isnull);
3198
3199 MemoryContextSwitchTo(oldcontext);
3200 }
3201
3202 /*
3203 * Free all resources allocated for toast reconstruction.
3204 */
3205 static void
3206 ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
3207 {
3208 HASH_SEQ_STATUS hstat;
3209 ReorderBufferToastEnt *ent;
3210
3211 if (txn->toast_hash == NULL)
3212 return;
3213
3214 /* sequentially walk over the hash and free everything */
3215 hash_seq_init(&hstat, txn->toast_hash);
3216 while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
3217 {
3218 dlist_mutable_iter it;
3219
3220 if (ent->reconstructed != NULL)
3221 pfree(ent->reconstructed);
3222
3223 dlist_foreach_modify(it, &ent->chunks)
3224 {
3225 ReorderBufferChange *change =
3226 dlist_container(ReorderBufferChange, node, it.cur);
3227
3228 dlist_delete(&change->node);
3229 ReorderBufferReturnChange(rb, change);
3230 }
3231 }
3232
3233 hash_destroy(txn->toast_hash);
3234 txn->toast_hash = NULL;
3235 }
3236
3237
3238 /* ---------------------------------------
3239 * Visibility support for logical decoding
3240 *
3241 *
3242 * Look up actual cmin/cmax values when using a decoding snapshot. We can't
3243 * always rely on stored cmin/cmax values because of two scenarios:
3244 *
3245 * * A tuple got changed multiple times during a single transaction and thus
3246 * has got a combocid. Combocids are only valid for the duration of a
3247 * single transaction.
3248 * * A tuple with a cmin but no cmax (and thus no combocid) got
3249 * deleted/updated in a transaction other than the one which created it,
3250 * which is the one we are looking at right now. As only one of cmin, cmax or
3251 * combocid is actually stored in the heap we don't have access to the value we
3252 * need anymore.
3253 *
3254 * To resolve those problems we have a per-transaction hash of (cmin,
3255 * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
3256 * (cmin, cmax) values. That also takes care of combocids by simply
3257 * not caring about them at all. As we have the real cmin/cmax values
3258 * combocids aren't interesting.
3259 *
3260 * As we only care about catalog tuples here the overhead of this
3261 * hashtable should be acceptable.
3262 *
3263 * Heap rewrites complicate this a bit, check rewriteheap.c for
3264 * details.
3265 * -------------------------------------------------------------------------
3266 */
3267
3268 /* struct for qsort()ing mapping files by lsn somewhat efficiently */
3269 typedef struct RewriteMappingFile
3270 {
3271 XLogRecPtr lsn;
3272 char fname[MAXPGPATH];
3273 } RewriteMappingFile;
3274
3275 #if NOT_USED
3276 static void
3277 DisplayMapping(HTAB *tuplecid_data)
3278 {
3279 HASH_SEQ_STATUS hstat;
3280 ReorderBufferTupleCidEnt *ent;
3281
3282 hash_seq_init(&hstat, tuplecid_data);
3283 while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
3284 {
3285 elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
3286 ent->key.relnode.dbNode,
3287 ent->key.relnode.spcNode,
3288 ent->key.relnode.relNode,
3289 ItemPointerGetBlockNumber(&ent->key.tid),
3290 ItemPointerGetOffsetNumber(&ent->key.tid),
3291 ent->cmin,
3292 ent->cmax
3293 );
3294 }
3295 }
3296 #endif
3297
3298 /*
3299 * Apply a single mapping file to tuplecid_data.
3300 *
3301 * The mapping file has to have been verified to be a) committed b) for our
3302 * transaction c) applied in LSN order.
3303 */
3304 static void
3305 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
3306 {
3307 char path[MAXPGPATH];
3308 int fd;
3309 int readBytes;
3310 LogicalRewriteMappingData map;
3311
3312 sprintf(path, "pg_logical/mappings/%s", fname);
3313 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
3314 if (fd < 0)
3315 ereport(ERROR,
3316 (errcode_for_file_access(),
3317 errmsg("could not open file \"%s\": %m", path)));
3318
3319 while (true)
3320 {
3321 ReorderBufferTupleCidKey key;
3322 ReorderBufferTupleCidEnt *ent;
3323 ReorderBufferTupleCidEnt *new_ent;
3324 bool found;
3325
3326 /* be careful about padding */
3327 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3328
3329 /* read all mappings till the end of the file */
3330 pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
3331 readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3332 pgstat_report_wait_end();
3333
3334 if (readBytes < 0)
3335 ereport(ERROR,
3336 (errcode_for_file_access(),
3337 errmsg("could not read file \"%s\": %m",
3338 path)));
3339 else if (readBytes == 0) /* EOF */
3340 break;
3341 else if (readBytes != sizeof(LogicalRewriteMappingData))
3342 ereport(ERROR,
3343 (errcode_for_file_access(),
3344 errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3345 path, readBytes,
3346 (int32) sizeof(LogicalRewriteMappingData))));
3347
3348 key.relnode = map.old_node;
3349 ItemPointerCopy(&map.old_tid,
3350 &key.tid);
3351
3352
3353 ent = (ReorderBufferTupleCidEnt *)
3354 hash_search(tuplecid_data,
3355 (void *) &key,
3356 HASH_FIND,
3357 NULL);
3358
3359 /* no existing mapping, no need to update */
3360 if (!ent)
3361 continue;
3362
3363 key.relnode = map.new_node;
3364 ItemPointerCopy(&map.new_tid,
3365 &key.tid);
3366
3367 new_ent = (ReorderBufferTupleCidEnt *)
3368 hash_search(tuplecid_data,
3369 (void *) &key,
3370 HASH_ENTER,
3371 &found);
3372
3373 if (found)
3374 {
3375 /*
3376 * Make sure the existing mapping makes sense. We sometimes update
3377 * old records that did not yet have a cmax (e.g. pg_class' own
3378 * entry while rewriting it) during rewrites, so allow that.
3379 */
3380 Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3381 Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3382 }
3383 else
3384 {
3385 /* update mapping */
3386 new_ent->cmin = ent->cmin;
3387 new_ent->cmax = ent->cmax;
3388 new_ent->combocid = ent->combocid;
3389 }
3390 }
3391
3392 CloseTransientFile(fd);
3393 }
3394
3395
3396 /*
3397 * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
3398 */
3399 static bool
3400 TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
3401 {
3402 return bsearch(&xid, xip, num,
3403 sizeof(TransactionId), xidComparator) != NULL;
3404 }
3405
3406 /*
3407 * qsort() comparator for sorting RewriteMappingFiles in LSN order.
3408 */
3409 static int
3410 file_sort_by_lsn(const void *a_p, const void *b_p)
3411 {
3412 RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
3413 RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
3414
3415 if (a->lsn < b->lsn)
3416 return -1;
3417 else if (a->lsn > b->lsn)
3418 return 1;
3419 return 0;
3420 }
3421
3422 /*
3423 * Apply any existing logical remapping files that are targeted at our
3424 * transaction for relid.
3425 */
3426 static void
3427 UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
3428 {
3429 DIR *mapping_dir;
3430 struct dirent *mapping_de;
3431 List *files = NIL;
3432 ListCell *file;
3433 RewriteMappingFile **files_a;
3434 size_t off;
3435 Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
3436
3437 mapping_dir = AllocateDir("pg_logical/mappings");
3438 while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
3439 {
3440 Oid f_dboid;
3441 Oid f_relid;
3442 TransactionId f_mapped_xid;
3443 TransactionId f_create_xid;
3444 XLogRecPtr f_lsn;
3445 uint32 f_hi,
3446 f_lo;
3447 RewriteMappingFile *f;
3448
3449 if (strcmp(mapping_de->d_name, ".") == 0 ||
3450 strcmp(mapping_de->d_name, "..") == 0)
3451 continue;
3452
3453 /* Ignore files that aren't ours */
3454 if (strncmp(mapping_de->d_name, "map-", 4) != 0)
3455 continue;
3456
3457 if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
3458 &f_dboid, &f_relid, &f_hi, &f_lo,
3459 &f_mapped_xid, &f_create_xid) != 6)
3460 elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
3461
3462 f_lsn = ((uint64) f_hi) << 32 | f_lo;
3463
3464 /* mapping for another database */
3465 if (f_dboid != dboid)
3466 continue;
3467
3468 /* mapping for another relation */
3469 if (f_relid != relid)
3470 continue;
3471
3472 /* did the creating transaction abort? */
3473 if (!TransactionIdDidCommit(f_create_xid))
3474 continue;
3475
3476 /* not for our transaction */
3477 if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
3478 continue;
3479
3480 /* ok, relevant, queue for apply */
3481 f = palloc(sizeof(RewriteMappingFile));
3482 f->lsn = f_lsn;
3483 strcpy(f->fname, mapping_de->d_name);
3484 files = lappend(files, f);
3485 }
3486 FreeDir(mapping_dir);
3487
3488 /* build array we can easily sort */
3489 files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
3490 off = 0;
3491 foreach(file, files)
3492 {
3493 files_a[off++] = lfirst(file);
3494 }
3495
3496 /* sort files so we apply them in LSN order */
3497 qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
3498 file_sort_by_lsn);
3499
3500 for (off = 0; off < list_length(files); off++)
3501 {
3502 RewriteMappingFile *f = files_a[off];
3503
3504 elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
3505 snapshot->subxip[0]);
3506 ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
3507 pfree(f);
3508 }
3509 }
3510
3511 /*
3512 * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
3513 * combocids.
3514 */
3515 bool
3516 ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
3517 Snapshot snapshot,
3518 HeapTuple htup, Buffer buffer,
3519 CommandId *cmin, CommandId *cmax)
3520 {
3521 ReorderBufferTupleCidKey key;
3522 ReorderBufferTupleCidEnt *ent;
3523 ForkNumber forkno;
3524 BlockNumber blockno;
3525 bool updated_mapping = false;
3526
3527 /* be careful about padding */
3528 memset(&key, 0, sizeof(key));
3529
3530 Assert(!BufferIsLocal(buffer));
3531
3532 /*
3533 * get relfilenode from the buffer, no convenient way to access it other
3534 * than that.
3535 */
3536 BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3537
3538 /* tuples can only be in the main fork */
3539 Assert(forkno == MAIN_FORKNUM);
3540 Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3541
3542 ItemPointerCopy(&htup->t_self,
3543 &key.tid);
3544
3545 restart:
3546 ent = (ReorderBufferTupleCidEnt *)
3547 hash_search(tuplecid_data,
3548 (void *) &key,
3549 HASH_FIND,
3550 NULL);
3551
3552 /*
3553 * failed to find a mapping, check whether the table was rewritten and
3554 * apply mapping if so, but only do that once - there can be no new
3555 * mappings while we are in here since we have to hold a lock on the
3556 * relation.
3557 */
3558 if (ent == NULL && !updated_mapping)
3559 {
3560 UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3561 /* now check but don't update for a mapping again */
3562 updated_mapping = true;
3563 goto restart;
3564 }
3565 else if (ent == NULL)
3566 return false;
3567
3568 if (cmin)
3569 *cmin = ent->cmin;
3570 if (cmax)
3571 *cmax = ent->cmax;
3572 return true;
3573 }
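/*
 * A minimal usage sketch (hypothetical caller, guarded out; assumes
 * utils/snapmgr.h for HistoricSnapshotGetTupleCids()): visibility code
 * running under a historic snapshot resolves a catalog tuple's real cmin
 * roughly like this, treating a missing mapping as "not visible yet".
 */
#if NOT_USED
static bool
ExampleTupleCreatedBeforeCursor(Snapshot snapshot, HeapTuple htup, Buffer buffer)
{
	CommandId	cmin;

	if (!ResolveCminCmaxDuringDecoding(HistoricSnapshotGetTupleCids(), snapshot,
									   htup, buffer, &cmin, NULL))
		return false;

	return cmin < snapshot->curcid;
}
#endif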
3574