1 /*-------------------------------------------------------------------------
2 *
3 * reorderbuffer.c
4 * PostgreSQL logical replay/reorder buffer management
5 *
6 *
7 * Copyright (c) 2012-2016, PostgreSQL Global Development Group
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/replication/reorderbuffer.c
12 *
13 * NOTES
14 * This module gets handed individual pieces of transactions in the order
15 * they are written to the WAL and is responsible to reassemble them into
16 * toplevel transaction sized pieces. When a transaction is completely
17 * reassembled - signalled by reading the transaction commit record - it
18 * will then call the output plugin (c.f. ReorderBufferCommit()) with the
19 * individual changes. The output plugins rely on snapshots built by
20 * snapbuild.c which hands them to us.
21 *
22 * Transactions and subtransactions/savepoints in postgres are not
23 * immediately linked to each other from outside the performing
24 * backend. Only at commit/abort (or special xact_assignment records) they
25 * are linked together. Which means that we will have to splice together a
26 * toplevel transaction from its subtransactions. To do that efficiently we
27 * build a binary heap indexed by the smallest current lsn of the individual
28 * subtransactions' changestreams. As the individual streams are inherently
29 * ordered by LSN - since that is where we build them from - the transaction
30 * can easily be reassembled by always using the subtransaction with the
31 * smallest current LSN from the heap.
32 *
33 * In order to cope with large transactions - which can be several times as
34 * big as the available memory - this module supports spooling the contents
 * of large transactions to disk. When the transaction is replayed the
36 * contents of individual (sub-)transactions will be read from disk in
37 * chunks.
38 *
39 * This module also has to deal with reassembling toast records from the
40 * individual chunks stored in WAL. When a new (or initial) version of a
41 * tuple is stored in WAL it will always be preceded by the toast chunks
42 * emitted for the columns stored out of line. Within a single toplevel
43 * transaction there will be no other data carrying records between a row's
44 * toast chunks and the row data itself. See ReorderBufferToast* for
45 * details.
46 * -------------------------------------------------------------------------
47 */
48 #include "postgres.h"
49
50 #include <unistd.h>
51 #include <sys/stat.h>
52
53 #include "access/rewriteheap.h"
54 #include "access/transam.h"
55 #include "access/tuptoaster.h"
56 #include "access/xact.h"
57 #include "access/xlog_internal.h"
58 #include "catalog/catalog.h"
59 #include "lib/binaryheap.h"
60 #include "miscadmin.h"
61 #include "replication/logical.h"
62 #include "replication/reorderbuffer.h"
63 #include "replication/slot.h"
64 #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
65 #include "storage/bufmgr.h"
66 #include "storage/fd.h"
67 #include "storage/sinval.h"
68 #include "utils/builtins.h"
69 #include "utils/combocid.h"
70 #include "utils/memdebug.h"
71 #include "utils/memutils.h"
72 #include "utils/rel.h"
73 #include "utils/relfilenodemap.h"
74 #include "utils/tqual.h"
75
76
/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
{
	TransactionId xid;			/* hash key */
	ReorderBufferTXN *txn;		/* transaction state for that xid */
} ReorderBufferTXNByIdEnt;

/* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
typedef struct ReorderBufferTupleCidKey
{
	RelFileNode relnode;		/* relation the tuple lives in */
	ItemPointerData tid;		/* tuple's physical location therein */
} ReorderBufferTupleCidKey;

typedef struct ReorderBufferTupleCidEnt
{
	ReorderBufferTupleCidKey key;	/* hash key: (relfilenode, ctid) */
	CommandId	cmin;			/* command that created the tuple version */
	CommandId	cmax;			/* command that deleted it, if any */
	CommandId	combocid;		/* just for debugging */
} ReorderBufferTupleCidEnt;

/* k-way in-order change iteration support structures */
typedef struct ReorderBufferIterTXNEntry
{
	XLogRecPtr	lsn;			/* LSN of 'change', the heap sort key */
	ReorderBufferChange *change;	/* current head change of this stream */
	ReorderBufferTXN *txn;		/* (sub)transaction the stream belongs to */
	File		fd;				/* spill file, if restoring from disk */
	XLogSegNo	segno;			/* segment currently being restored */
} ReorderBufferIterTXNEntry;

typedef struct ReorderBufferIterTXNState
{
	binaryheap *heap;			/* min-heap over entries[], ordered by lsn */
	Size		nr_txns;		/* number of streams being merged */
	dlist_head	old_change;		/* changes to be freed on next iteration */
	ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
} ReorderBufferIterTXNState;

/* toast datastructures */
typedef struct ReorderBufferToastEnt
{
	Oid			chunk_id;		/* toast_table.chunk_id */
	int32		last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
								 * have seen */
	Size		num_chunks;		/* number of chunks we've already seen */
	Size		size;			/* combined size of chunks seen */
	dlist_head	chunks;			/* linked list of chunks */
	struct varlena *reconstructed;	/* reconstructed varlena now pointed
									 * to in main tup */
} ReorderBufferToastEnt;

/* Disk serialization support datastructures */
typedef struct ReorderBufferDiskChange
{
	Size		size;			/* size of the on-disk record incl. payload */
	ReorderBufferChange change;
	/* data follows */
} ReorderBufferDiskChange;

/*
 * Maximum number of changes kept in memory, per transaction. After that,
 * changes are spooled to disk.
 *
 * The current value should be sufficient to decode the entire transaction
 * without hitting disk in OLTP workloads, while starting to spool to disk in
 * other workloads reasonably fast.
 *
 * At some point in the future it probably makes sense to have a more elaborate
 * resource management here, but it's not entirely clear what that would look
 * like.
 */
static const Size max_changes_in_memory = 4096;

/*
 * We use a very simple form of a slab allocator for frequently allocated
 * objects, simply keeping a fixed number in a linked list when unused,
 * instead pfree()ing them. Without that in many workloads aset.c becomes a
 * major bottleneck, especially when spilling to disk while decoding batch
 * workloads.
 */
static const Size max_cached_changes = 4096 * 2;
static const Size max_cached_tuplebufs = 4096 * 2;	/* ~64MB */
static const Size max_cached_transactions = 512;
162
163
/* ---------------------------------------
 * primary reorderbuffer support routines
 * (transaction lookup/creation and lifecycle management)
 * ---------------------------------------
 */
static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
					  TransactionId xid, bool create, bool *is_new,
					  XLogRecPtr lsn, bool create_as_top);
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
								  ReorderBufferTXN *subtxn);

static void AssertTXNLsnOrder(ReorderBuffer *rb);

/* ---------------------------------------
 * support functions for lsn-order iterating over the ->changes of a
 * transaction and its subtransactions
 *
 * used for iteration over the k-way heap merge of a transaction and its
 * subtransactions
 * ---------------------------------------
 */
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
						 ReorderBufferIterTXNState *volatile *iter_state);
static ReorderBufferChange *
			ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
						   ReorderBufferIterTXNState *state);
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);

/*
 * ---------------------------------------
 * Disk serialization support functions
 * (spilling over-sized transactions to files, and restoring them)
 * ---------------------------------------
 */
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
							 int fd, ReorderBufferChange *change);
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
							File *fd, XLogSegNo *segno);
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
						   char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
							TransactionId xid, XLogSegNo segno);

static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
					  ReorderBufferTXN *txn, CommandId cid);

/* ---------------------------------------
 * toast reassembly support
 * ---------------------------------------
 */
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
						  Relation relation, ReorderBufferChange *change);
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
							  Relation relation, ReorderBufferChange *change);
227
/*
 * Allocate a new ReorderBuffer and clean out any old serialized state from
 * prior ReorderBuffer instances for the same slot.
 *
 * Requires MyReplicationSlot to be set; the returned buffer and everything
 * it will ever allocate live in a dedicated memory context, so the whole
 * structure can be torn down by deleting that context (see ReorderBufferFree).
 */
ReorderBuffer *
ReorderBufferAllocate(void)
{
	ReorderBuffer *buffer;
	HASHCTL		hash_ctl;
	MemoryContext new_ctx;

	Assert(MyReplicationSlot != NULL);

	/* allocate memory in own context, to have better accountability */
	new_ctx = AllocSetContextCreate(CurrentMemoryContext,
									"ReorderBuffer",
									ALLOCSET_DEFAULT_SIZES);

	buffer =
		(ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));

	memset(&hash_ctl, 0, sizeof(hash_ctl));

	buffer->context = new_ctx;

	/* xid => ReorderBufferTXN lookup table, kept in the buffer's context */
	hash_ctl.keysize = sizeof(TransactionId);
	hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
	hash_ctl.hcxt = buffer->context;

	buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

	/* the one-entry lookup cache starts out invalid */
	buffer->by_txn_last_xid = InvalidTransactionId;
	buffer->by_txn_last_txn = NULL;

	/* slab caches for txns/changes/tuplebufs start out empty */
	buffer->nr_cached_transactions = 0;
	buffer->nr_cached_changes = 0;
	buffer->nr_cached_tuplebufs = 0;

	/* serialization output buffer is allocated lazily, on first use */
	buffer->outbuf = NULL;
	buffer->outbufsize = 0;

	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;

	dlist_init(&buffer->toplevel_by_lsn);
	dlist_init(&buffer->txns_by_base_snapshot_lsn);
	dlist_init(&buffer->cached_transactions);
	dlist_init(&buffer->cached_changes);
	slist_init(&buffer->cached_tuplebufs);

	/*
	 * Ensure there's no stale data from prior uses of this slot, in case some
	 * prior exit avoided calling ReorderBufferFree. Failure to do this can
	 * produce duplicated txns, and it's very cheap if there's nothing there.
	 */
	ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));

	return buffer;
}
287
288 /*
289 * Free a ReorderBuffer
290 */
291 void
ReorderBufferFree(ReorderBuffer * rb)292 ReorderBufferFree(ReorderBuffer *rb)
293 {
294 MemoryContext context = rb->context;
295
296 /*
297 * We free separately allocated data by entirely scrapping reorderbuffer's
298 * memory context.
299 */
300 MemoryContextDelete(context);
301
302 /* Free disk space used by unconsumed reorder buffers */
303 ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
304 }
305
306 /*
307 * Get an unused, possibly preallocated, ReorderBufferTXN.
308 */
309 static ReorderBufferTXN *
ReorderBufferGetTXN(ReorderBuffer * rb)310 ReorderBufferGetTXN(ReorderBuffer *rb)
311 {
312 ReorderBufferTXN *txn;
313
314 /* check the slab cache */
315 if (rb->nr_cached_transactions > 0)
316 {
317 rb->nr_cached_transactions--;
318 txn = (ReorderBufferTXN *)
319 dlist_container(ReorderBufferTXN, node,
320 dlist_pop_head_node(&rb->cached_transactions));
321 }
322 else
323 {
324 txn = (ReorderBufferTXN *)
325 MemoryContextAlloc(rb->context, sizeof(ReorderBufferTXN));
326 }
327
328 memset(txn, 0, sizeof(ReorderBufferTXN));
329
330 dlist_init(&txn->changes);
331 dlist_init(&txn->tuplecids);
332 dlist_init(&txn->subtxns);
333
334 return txn;
335 }
336
/*
 * Free a ReorderBufferTXN.
 *
 * Deallocation might be delayed for efficiency purposes, for details check
 * the comments above max_cached_changes's definition.
 */
static void
ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	/* clean the lookup cache if we were cached (quite likely) */
	if (rb->by_txn_last_xid == txn->xid)
	{
		rb->by_txn_last_xid = InvalidTransactionId;
		rb->by_txn_last_txn = NULL;
	}

	/* free data that's contained */

	if (txn->tuplecid_hash != NULL)
	{
		hash_destroy(txn->tuplecid_hash);
		txn->tuplecid_hash = NULL;
	}

	if (txn->invalidations)
	{
		pfree(txn->invalidations);
		txn->invalidations = NULL;
	}

	/* Reset the toast hash */
	ReorderBufferToastReset(rb, txn);

	/* check whether to put into the slab cache */
	if (rb->nr_cached_transactions < max_cached_transactions)
	{
		rb->nr_cached_transactions++;
		dlist_push_head(&rb->cached_transactions, &txn->node);

		/*
		 * Poison the cached struct so stale reads are caught under Valgrind,
		 * but keep the list node (which the cache itself uses) defined.  The
		 * order matters: the node is re-marked defined after the whole-struct
		 * poisoning.
		 */
		VALGRIND_MAKE_MEM_UNDEFINED(txn, sizeof(ReorderBufferTXN));
		VALGRIND_MAKE_MEM_DEFINED(&txn->node, sizeof(txn->node));
	}
	else
	{
		pfree(txn);
	}
}
383
384 /*
385 * Get an unused, possibly preallocated, ReorderBufferChange.
386 */
387 ReorderBufferChange *
ReorderBufferGetChange(ReorderBuffer * rb)388 ReorderBufferGetChange(ReorderBuffer *rb)
389 {
390 ReorderBufferChange *change;
391
392 /* check the slab cache */
393 if (rb->nr_cached_changes)
394 {
395 rb->nr_cached_changes--;
396 change = (ReorderBufferChange *)
397 dlist_container(ReorderBufferChange, node,
398 dlist_pop_head_node(&rb->cached_changes));
399 }
400 else
401 {
402 change = (ReorderBufferChange *)
403 MemoryContextAlloc(rb->context, sizeof(ReorderBufferChange));
404 }
405
406 memset(change, 0, sizeof(ReorderBufferChange));
407 return change;
408 }
409
/*
 * Free an ReorderBufferChange.
 *
 * Deallocation might be delayed for efficiency purposes, for details check
 * the comments above max_cached_changes's definition.
 */
void
ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
{
	/* free contained data; what that is depends on the change's action */
	switch (change->action)
	{
		case REORDER_BUFFER_CHANGE_INSERT:
		case REORDER_BUFFER_CHANGE_UPDATE:
		case REORDER_BUFFER_CHANGE_DELETE:
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
			/* DML changes may carry old and/or new tuple versions */
			if (change->data.tp.newtuple)
			{
				ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
				change->data.tp.newtuple = NULL;
			}

			if (change->data.tp.oldtuple)
			{
				ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
				change->data.tp.oldtuple = NULL;
			}
			break;
		case REORDER_BUFFER_CHANGE_MESSAGE:
			/* logical messages own two separately palloc'd strings */
			if (change->data.msg.prefix != NULL)
				pfree(change->data.msg.prefix);
			change->data.msg.prefix = NULL;
			if (change->data.msg.message != NULL)
				pfree(change->data.msg.message);
			change->data.msg.message = NULL;
			break;
		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
			if (change->data.snapshot)
			{
				ReorderBufferFreeSnap(rb, change->data.snapshot);
				change->data.snapshot = NULL;
			}
			break;
			/* no data in addition to the struct itself */
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
			break;
	}

	/* check whether to put into the slab cache */
	if (rb->nr_cached_changes < max_cached_changes)
	{
		rb->nr_cached_changes++;
		dlist_push_head(&rb->cached_changes, &change->node);

		/* poison cached memory; keep the cache's own list node defined */
		VALGRIND_MAKE_MEM_UNDEFINED(change, sizeof(ReorderBufferChange));
		VALGRIND_MAKE_MEM_DEFINED(&change->node, sizeof(change->node));
	}
	else
	{
		pfree(change);
	}
}
474
475
/*
 * Get an unused, possibly preallocated, ReorderBufferTupleBuf fitting at
 * least a tuple of size tuple_len (excluding header overhead).
 */
ReorderBufferTupleBuf *
ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
{
	ReorderBufferTupleBuf *tuple;
	Size		alloc_len;

	alloc_len = tuple_len + SizeofHeapTupleHeader;

	/*
	 * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
	 * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
	 * generated for oldtuples can be bigger, as they don't have out-of-line
	 * toast columns.
	 */
	if (alloc_len < MaxHeapTupleSize)
		alloc_len = MaxHeapTupleSize;


	/*
	 * If small enough, check the slab cache.  Because of the clamp above,
	 * alloc_len <= MaxHeapTupleSize here implies alloc_len ==
	 * MaxHeapTupleSize, matching the fixed size of cached entries.
	 */
	if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
	{
		rb->nr_cached_tuplebufs--;
		tuple = slist_container(ReorderBufferTupleBuf, node,
								slist_pop_head_node(&rb->cached_tuplebufs));
		Assert(tuple->alloc_tuple_size == MaxHeapTupleSize);
#ifdef USE_ASSERT_CHECKING
		/* poison the recycled header so stale-field reads are detectable */
		memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
		VALGRIND_MAKE_MEM_UNDEFINED(&tuple->tuple, sizeof(HeapTupleData));
#endif
		tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
#ifdef USE_ASSERT_CHECKING
		/* likewise poison the recycled tuple data area */
		memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
		VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
#endif
	}
	else
	{
		/* oversized (or cache empty): allocate fresh, with alignment slop */
		tuple = (ReorderBufferTupleBuf *)
			MemoryContextAlloc(rb->context,
							   sizeof(ReorderBufferTupleBuf) +
							   MAXIMUM_ALIGNOF + alloc_len);
		tuple->alloc_tuple_size = alloc_len;
		tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
	}

	return tuple;
}
527
528 /*
529 * Free an ReorderBufferTupleBuf.
530 *
531 * Deallocation might be delayed for efficiency purposes, for details check
532 * the comments above max_cached_changes's definition.
533 */
534 void
ReorderBufferReturnTupleBuf(ReorderBuffer * rb,ReorderBufferTupleBuf * tuple)535 ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
536 {
537 /* check whether to put into the slab cache, oversized tuples never are */
538 if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
539 rb->nr_cached_tuplebufs < max_cached_tuplebufs)
540 {
541 rb->nr_cached_tuplebufs++;
542 slist_push_head(&rb->cached_tuplebufs, &tuple->node);
543 VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
544 VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf));
545 VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
546 VALGRIND_MAKE_MEM_DEFINED(&tuple->alloc_tuple_size, sizeof(tuple->alloc_tuple_size));
547 }
548 else
549 {
550 pfree(tuple);
551 }
552 }
553
/*
 * Return the ReorderBufferTXN from the given buffer, specified by Xid.
 * If create is true, and a transaction doesn't already exist, create it
 * (with the given LSN, and as top transaction if that's specified);
 * when this happens, is_new is set to true.
 *
 * Note the one-entry lookup cache can also hold a "negative" entry: the last
 * looked-up xid with a NULL txn, recording that no such transaction exists.
 */
static ReorderBufferTXN *
ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
					  bool *is_new, XLogRecPtr lsn, bool create_as_top)
{
	ReorderBufferTXN *txn;
	ReorderBufferTXNByIdEnt *ent;
	bool		found;

	Assert(TransactionIdIsValid(xid));

	/*
	 * Check the one-entry lookup cache first
	 */
	if (TransactionIdIsValid(rb->by_txn_last_xid) &&
		rb->by_txn_last_xid == xid)
	{
		txn = rb->by_txn_last_txn;

		if (txn != NULL)
		{
			/* found it, and it's valid */
			if (is_new)
				*is_new = false;
			return txn;
		}

		/*
		 * cached as non-existent, and asked not to create? Then nothing else
		 * to do.
		 */
		if (!create)
			return NULL;
		/* otherwise fall through to create it */
	}

	/*
	 * If the cache wasn't hit or it yielded an "does-not-exist" and we want
	 * to create an entry.
	 */

	/* search the lookup table */
	ent = (ReorderBufferTXNByIdEnt *)
		hash_search(rb->by_txn,
					(void *) &xid,
					create ? HASH_ENTER : HASH_FIND,
					&found);
	if (found)
		txn = ent->txn;
	else if (create)
	{
		/* initialize the new entry, if creation was requested */
		Assert(ent != NULL);
		Assert(lsn != InvalidXLogRecPtr);

		ent->txn = ReorderBufferGetTXN(rb);
		ent->txn->xid = xid;
		txn = ent->txn;
		txn->first_lsn = lsn;
		txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;

		/* only top-level transactions live on the by-LSN list */
		if (create_as_top)
		{
			dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
			AssertTXNLsnOrder(rb);
		}
	}
	else
		txn = NULL;				/* not found and not asked to create */

	/* update cache; txn may be NULL here, creating a negative cache entry */
	rb->by_txn_last_xid = xid;
	rb->by_txn_last_txn = txn;

	if (is_new)
		*is_new = !found;

	Assert(!create || txn != NULL);
	return txn;
}
639
640 /*
641 * Queue a change into a transaction so it can be replayed upon commit.
642 */
643 void
ReorderBufferQueueChange(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,ReorderBufferChange * change)644 ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
645 ReorderBufferChange *change)
646 {
647 ReorderBufferTXN *txn;
648
649 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
650
651 change->lsn = lsn;
652 Assert(InvalidXLogRecPtr != lsn);
653 dlist_push_tail(&txn->changes, &change->node);
654 txn->nentries++;
655 txn->nentries_mem++;
656
657 ReorderBufferCheckSerializeTXN(rb, txn);
658 }
659
/*
 * Queue message into a transaction so it can be processed upon commit.
 *
 * Transactional messages are copied into the reorderbuffer's context and
 * queued like any other change.  Non-transactional messages are delivered to
 * the output plugin immediately, under a historic snapshot.
 */
void
ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
						  Snapshot snapshot, XLogRecPtr lsn,
						  bool transactional, const char *prefix,
						  Size message_size, const char *message)
{
	if (transactional)
	{
		MemoryContext oldcontext;
		ReorderBufferChange *change;

		Assert(xid != InvalidTransactionId);

		/* copy prefix/message into rb's context so they live until commit */
		oldcontext = MemoryContextSwitchTo(rb->context);

		change = ReorderBufferGetChange(rb);
		change->action = REORDER_BUFFER_CHANGE_MESSAGE;
		change->data.msg.prefix = pstrdup(prefix);
		change->data.msg.message_size = message_size;
		change->data.msg.message = palloc(message_size);
		memcpy(change->data.msg.message, message, message_size);

		ReorderBufferQueueChange(rb, xid, lsn, change);

		MemoryContextSwitchTo(oldcontext);
	}
	else
	{
		ReorderBufferTXN *txn = NULL;
		/* volatile: survives longjmp from PG_CATCH */
		volatile Snapshot snapshot_now = snapshot;

		if (xid != InvalidTransactionId)
			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);

		/* setup snapshot to allow catalog access */
		SetupHistoricSnapshot(snapshot_now, NULL);
		PG_TRY();
		{
			/* invoke the output plugin's message callback directly */
			rb->message(rb, txn, lsn, false, prefix, message_size, message);

			TeardownHistoricSnapshot(false);
		}
		PG_CATCH();
		{
			/* tear down the snapshot on error, then propagate it */
			TeardownHistoricSnapshot(true);
			PG_RE_THROW();
		}
		PG_END_TRY();
	}
}
713
/*
 * AssertTXNLsnOrder
 *		Verify LSN ordering of transaction lists in the reorderbuffer
 *
 * Other LSN-related invariants are checked too.
 *
 * No-op if assertions are not in use.
 */
static void
AssertTXNLsnOrder(ReorderBuffer *rb)
{
#ifdef USE_ASSERT_CHECKING
	dlist_iter	iter;
	XLogRecPtr	prev_first_lsn = InvalidXLogRecPtr;
	XLogRecPtr	prev_base_snap_lsn = InvalidXLogRecPtr;

	/* toplevel_by_lsn must be strictly ordered by each txn's first LSN */
	dlist_foreach(iter, &rb->toplevel_by_lsn)
	{
		ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
													iter.cur);

		/* start LSN must be set */
		Assert(cur_txn->first_lsn != InvalidXLogRecPtr);

		/* If there is an end LSN, it must be higher than start LSN */
		if (cur_txn->end_lsn != InvalidXLogRecPtr)
			Assert(cur_txn->first_lsn <= cur_txn->end_lsn);

		/* Current initial LSN must be strictly higher than previous */
		if (prev_first_lsn != InvalidXLogRecPtr)
			Assert(prev_first_lsn < cur_txn->first_lsn);

		/* known-as-subtxn txns must not be listed */
		Assert(!cur_txn->is_known_as_subxact);

		prev_first_lsn = cur_txn->first_lsn;
	}

	/* likewise, txns_by_base_snapshot_lsn is ordered by base snapshot LSN */
	dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
	{
		ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
													base_snapshot_node,
													iter.cur);

		/* base snapshot (and its LSN) must be set */
		Assert(cur_txn->base_snapshot != NULL);
		Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);

		/* current LSN must be strictly higher than previous */
		if (prev_base_snap_lsn != InvalidXLogRecPtr)
			Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);

		/* known-as-subtxn txns must not be listed */
		Assert(!cur_txn->is_known_as_subxact);

		prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
	}
#endif
}
773
774 /*
775 * ReorderBufferGetOldestTXN
776 * Return oldest transaction in reorderbuffer
777 */
778 ReorderBufferTXN *
ReorderBufferGetOldestTXN(ReorderBuffer * rb)779 ReorderBufferGetOldestTXN(ReorderBuffer *rb)
780 {
781 ReorderBufferTXN *txn;
782
783 AssertTXNLsnOrder(rb);
784
785 if (dlist_is_empty(&rb->toplevel_by_lsn))
786 return NULL;
787
788 txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
789
790 Assert(!txn->is_known_as_subxact);
791 Assert(txn->first_lsn != InvalidXLogRecPtr);
792 return txn;
793 }
794
795 /*
796 * ReorderBufferGetOldestXmin
797 * Return oldest Xmin in reorderbuffer
798 *
799 * Returns oldest possibly running Xid from the point of view of snapshots
800 * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
801 * there are none.
802 *
803 * Since snapshots are assigned monotonically, this equals the Xmin of the
804 * base snapshot with minimal base_snapshot_lsn.
805 */
806 TransactionId
ReorderBufferGetOldestXmin(ReorderBuffer * rb)807 ReorderBufferGetOldestXmin(ReorderBuffer *rb)
808 {
809 ReorderBufferTXN *txn;
810
811 AssertTXNLsnOrder(rb);
812
813 if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
814 return InvalidTransactionId;
815
816 txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
817 &rb->txns_by_base_snapshot_lsn);
818 return txn->base_snapshot->xmin;
819 }
820
/*
 * ReorderBufferSetRestartPoint
 *		Remember the LSN decoding would have to restart from.
 *
 * Transactions created afterwards record this value as their
 * restart_decoding_lsn (see ReorderBufferTXNByXid).
 */
void
ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
{
	rb->current_restart_decoding_lsn = ptr;
}
826
827 /*
828 * ReorderBufferAssignChild
829 *
830 * Make note that we know that subxid is a subtransaction of xid, seen as of
831 * the given lsn.
832 */
833 void
ReorderBufferAssignChild(ReorderBuffer * rb,TransactionId xid,TransactionId subxid,XLogRecPtr lsn)834 ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
835 TransactionId subxid, XLogRecPtr lsn)
836 {
837 ReorderBufferTXN *txn;
838 ReorderBufferTXN *subtxn;
839 bool new_top;
840 bool new_sub;
841
842 txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
843 subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
844
845 if (!new_sub)
846 {
847 if (subtxn->is_known_as_subxact)
848 {
849 /* already associated, nothing to do */
850 return;
851 }
852 else
853 {
854 /*
855 * We already saw this transaction, but initially added it to the list
856 * of top-level txns. Now that we know it's not top-level, remove
857 * it from there.
858 */
859 dlist_delete(&subtxn->node);
860 }
861 }
862
863 subtxn->is_known_as_subxact = true;
864 subtxn->toplevel_xid = xid;
865 Assert(subtxn->nsubtxns == 0);
866
867 /* add to subtransaction list */
868 dlist_push_tail(&txn->subtxns, &subtxn->node);
869 txn->nsubtxns++;
870
871 /* Possibly transfer the subtxn's snapshot to its top-level txn. */
872 ReorderBufferTransferSnapToParent(txn, subtxn);
873
874 /* Verify LSN-ordering invariant */
875 AssertTXNLsnOrder(rb);
876 }
877
/*
 * ReorderBufferTransferSnapToParent
 *		Transfer base snapshot from subtxn to top-level txn, if needed
 *
 * This is done if the top-level txn doesn't have a base snapshot, or if the
 * subtxn's base snapshot has an earlier LSN than the top-level txn's base
 * snapshot's LSN. This can happen if there are no changes in the toplevel
 * txn but there are some in the subtxn, or the first change in subtxn has
 * earlier LSN than first change in the top-level txn and we learned about
 * their kinship only now.
 *
 * The subtransaction's snapshot is cleared regardless of the transfer
 * happening, since it's not needed anymore in either case.
 *
 * We do this as soon as we become aware of their kinship, to avoid queueing
 * extra snapshots to txns known-as-subtxns -- only top-level txns will
 * receive further snapshots.
 */
static void
ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
								  ReorderBufferTXN *subtxn)
{
	Assert(subtxn->toplevel_xid == txn->xid);

	if (subtxn->base_snapshot != NULL)
	{
		if (txn->base_snapshot == NULL ||
			subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
		{
			/*
			 * If the toplevel transaction already has a base snapshot but
			 * it's newer than the subxact's, purge it.
			 */
			if (txn->base_snapshot != NULL)
			{
				SnapBuildSnapDecRefcount(txn->base_snapshot);
				dlist_delete(&txn->base_snapshot_node);
			}

			/*
			 * The snapshot is now the top transaction's; transfer it, and
			 * adjust the list position of the top transaction in the list by
			 * moving it to where the subtransaction is.
			 */
			txn->base_snapshot = subtxn->base_snapshot;
			txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
			dlist_insert_before(&subtxn->base_snapshot_node,
								&txn->base_snapshot_node);

			/*
			 * The subtransaction doesn't have a snapshot anymore (so it
			 * mustn't be in the list.)  Note the insert above has to happen
			 * before this delete, since it positions relative to subtxn's
			 * node.
			 */
			subtxn->base_snapshot = NULL;
			subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
			dlist_delete(&subtxn->base_snapshot_node);
		}
		else
		{
			/* Base snap of toplevel is fine, so subxact's is not needed */
			SnapBuildSnapDecRefcount(subtxn->base_snapshot);
			dlist_delete(&subtxn->base_snapshot_node);
			subtxn->base_snapshot = NULL;
			subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
		}
	}
}
945
946 /*
947 * Associate a subtransaction with its toplevel transaction at commit
948 * time. There may be no further changes added after this.
949 */
950 void
ReorderBufferCommitChild(ReorderBuffer * rb,TransactionId xid,TransactionId subxid,XLogRecPtr commit_lsn,XLogRecPtr end_lsn)951 ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
952 TransactionId subxid, XLogRecPtr commit_lsn,
953 XLogRecPtr end_lsn)
954 {
955 ReorderBufferTXN *subtxn;
956
957 subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
958 InvalidXLogRecPtr, false);
959
960 /*
961 * No need to do anything if that subtxn didn't contain any changes
962 */
963 if (!subtxn)
964 return;
965
966 subtxn->final_lsn = commit_lsn;
967 subtxn->end_lsn = end_lsn;
968
969 /*
970 * Assign this subxact as a child of the toplevel xact (no-op if already
971 * done.)
972 */
973 ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
974 }
975
976
977 /*
978 * Support for efficiently iterating over a transaction's and its
979 * subtransactions' changes.
980 *
 * We do this by doing a k-way merge between transactions/subtransactions. For that
982 * we model the current heads of the different transactions as a binary heap
983 * so we easily know which (sub-)transaction has the change with the smallest
984 * lsn next.
985 *
986 * We assume the changes in individual transactions are already sorted by LSN.
987 */
988
989 /*
990 * Binary heap comparison function.
991 */
992 static int
ReorderBufferIterCompare(Datum a,Datum b,void * arg)993 ReorderBufferIterCompare(Datum a, Datum b, void *arg)
994 {
995 ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
996 XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
997 XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
998
999 if (pos_a < pos_b)
1000 return 1;
1001 else if (pos_a == pos_b)
1002 return 0;
1003 return -1;
1004 }
1005
1006 /*
1007 * Allocate & initialize an iterator which iterates in lsn order over a
1008 * transaction and all its subtransactions.
1009 *
1010 * Note: The iterator state is returned through iter_state parameter rather
1011 * than the function's return value. This is because the state gets cleaned up
1012 * in a PG_CATCH block in the caller, so we want to make sure the caller gets
1013 * back the state even if this function throws an exception.
1014 */
1015 static void
ReorderBufferIterTXNInit(ReorderBuffer * rb,ReorderBufferTXN * txn,ReorderBufferIterTXNState * volatile * iter_state)1016 ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
1017 ReorderBufferIterTXNState *volatile *iter_state)
1018 {
1019 Size nr_txns = 0;
1020 ReorderBufferIterTXNState *state;
1021 dlist_iter cur_txn_i;
1022 int32 off;
1023
1024 *iter_state = NULL;
1025
1026 /*
1027 * Calculate the size of our heap: one element for every transaction that
1028 * contains changes. (Besides the transactions already in the reorder
1029 * buffer, we count the one we were directly passed.)
1030 */
1031 if (txn->nentries > 0)
1032 nr_txns++;
1033
1034 dlist_foreach(cur_txn_i, &txn->subtxns)
1035 {
1036 ReorderBufferTXN *cur_txn;
1037
1038 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1039
1040 if (cur_txn->nentries > 0)
1041 nr_txns++;
1042 }
1043
1044 /*
1045 * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
1046 * need to allocate/build a heap then.
1047 */
1048
1049 /* allocate iteration state */
1050 state = (ReorderBufferIterTXNState *)
1051 MemoryContextAllocZero(rb->context,
1052 sizeof(ReorderBufferIterTXNState) +
1053 sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1054
1055 state->nr_txns = nr_txns;
1056 dlist_init(&state->old_change);
1057
1058 for (off = 0; off < state->nr_txns; off++)
1059 {
1060 state->entries[off].fd = -1;
1061 state->entries[off].segno = 0;
1062 }
1063
1064 /* allocate heap */
1065 state->heap = binaryheap_allocate(state->nr_txns,
1066 ReorderBufferIterCompare,
1067 state);
1068
1069 /* Now that the state fields are initialized, it is safe to return it. */
1070 *iter_state = state;
1071
1072 /*
1073 * Now insert items into the binary heap, in an unordered fashion. (We
1074 * will run a heap assembly step at the end; this is more efficient.)
1075 */
1076
1077 off = 0;
1078
1079 /* add toplevel transaction if it contains changes */
1080 if (txn->nentries > 0)
1081 {
1082 ReorderBufferChange *cur_change;
1083
1084 if (txn->serialized)
1085 {
1086 /* serialize remaining changes */
1087 ReorderBufferSerializeTXN(rb, txn);
1088 ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
1089 &state->entries[off].segno);
1090 }
1091
1092 cur_change = dlist_head_element(ReorderBufferChange, node,
1093 &txn->changes);
1094
1095 state->entries[off].lsn = cur_change->lsn;
1096 state->entries[off].change = cur_change;
1097 state->entries[off].txn = txn;
1098
1099 binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1100 }
1101
1102 /* add subtransactions if they contain changes */
1103 dlist_foreach(cur_txn_i, &txn->subtxns)
1104 {
1105 ReorderBufferTXN *cur_txn;
1106
1107 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1108
1109 if (cur_txn->nentries > 0)
1110 {
1111 ReorderBufferChange *cur_change;
1112
1113 if (cur_txn->serialized)
1114 {
1115 /* serialize remaining changes */
1116 ReorderBufferSerializeTXN(rb, cur_txn);
1117 ReorderBufferRestoreChanges(rb, cur_txn,
1118 &state->entries[off].fd,
1119 &state->entries[off].segno);
1120 }
1121 cur_change = dlist_head_element(ReorderBufferChange, node,
1122 &cur_txn->changes);
1123
1124 state->entries[off].lsn = cur_change->lsn;
1125 state->entries[off].change = cur_change;
1126 state->entries[off].txn = cur_txn;
1127
1128 binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1129 }
1130 }
1131
1132 /* assemble a valid binary heap */
1133 binaryheap_build(state->heap);
1134 }
1135
1136 /*
1137 * Return the next change when iterating over a transaction and its
1138 * subtransactions.
1139 *
1140 * Returns NULL when no further changes exist.
1141 */
1142 static ReorderBufferChange *
ReorderBufferIterTXNNext(ReorderBuffer * rb,ReorderBufferIterTXNState * state)1143 ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
1144 {
1145 ReorderBufferChange *change;
1146 ReorderBufferIterTXNEntry *entry;
1147 int32 off;
1148
1149 /* nothing there anymore */
1150 if (state->heap->bh_size == 0)
1151 return NULL;
1152
1153 off = DatumGetInt32(binaryheap_first(state->heap));
1154 entry = &state->entries[off];
1155
1156 /* free memory we might have "leaked" in the previous *Next call */
1157 if (!dlist_is_empty(&state->old_change))
1158 {
1159 change = dlist_container(ReorderBufferChange, node,
1160 dlist_pop_head_node(&state->old_change));
1161 ReorderBufferReturnChange(rb, change);
1162 Assert(dlist_is_empty(&state->old_change));
1163 }
1164
1165 change = entry->change;
1166
1167 /*
1168 * update heap with information about which transaction has the next
1169 * relevant change in LSN order
1170 */
1171
1172 /* there are in-memory changes */
1173 if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1174 {
1175 dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1176 ReorderBufferChange *next_change =
1177 dlist_container(ReorderBufferChange, node, next);
1178
1179 /* txn stays the same */
1180 state->entries[off].lsn = next_change->lsn;
1181 state->entries[off].change = next_change;
1182
1183 binaryheap_replace_first(state->heap, Int32GetDatum(off));
1184 return change;
1185 }
1186
1187 /* try to load changes from disk */
1188 if (entry->txn->nentries != entry->txn->nentries_mem)
1189 {
1190 /*
1191 * Ugly: restoring changes will reuse *Change records, thus delete the
1192 * current one from the per-tx list and only free in the next call.
1193 */
1194 dlist_delete(&change->node);
1195 dlist_push_tail(&state->old_change, &change->node);
1196
1197 if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
1198 &state->entries[off].segno))
1199 {
1200 /* successfully restored changes from disk */
1201 ReorderBufferChange *next_change =
1202 dlist_head_element(ReorderBufferChange, node,
1203 &entry->txn->changes);
1204
1205 elog(DEBUG2, "restored %u/%u changes from disk",
1206 (uint32) entry->txn->nentries_mem,
1207 (uint32) entry->txn->nentries);
1208
1209 Assert(entry->txn->nentries_mem);
1210 /* txn stays the same */
1211 state->entries[off].lsn = next_change->lsn;
1212 state->entries[off].change = next_change;
1213 binaryheap_replace_first(state->heap, Int32GetDatum(off));
1214
1215 return change;
1216 }
1217 }
1218
1219 /* ok, no changes there anymore, remove */
1220 binaryheap_remove_first(state->heap);
1221
1222 return change;
1223 }
1224
1225 /*
1226 * Deallocate the iterator
1227 */
1228 static void
ReorderBufferIterTXNFinish(ReorderBuffer * rb,ReorderBufferIterTXNState * state)1229 ReorderBufferIterTXNFinish(ReorderBuffer *rb,
1230 ReorderBufferIterTXNState *state)
1231 {
1232 int32 off;
1233
1234 for (off = 0; off < state->nr_txns; off++)
1235 {
1236 if (state->entries[off].fd != -1)
1237 FileClose(state->entries[off].fd);
1238 }
1239
1240 /* free memory we might have "leaked" in the last *Next call */
1241 if (!dlist_is_empty(&state->old_change))
1242 {
1243 ReorderBufferChange *change;
1244
1245 change = dlist_container(ReorderBufferChange, node,
1246 dlist_pop_head_node(&state->old_change));
1247 ReorderBufferReturnChange(rb, change);
1248 Assert(dlist_is_empty(&state->old_change));
1249 }
1250
1251 binaryheap_free(state->heap);
1252 pfree(state);
1253 }
1254
1255 /*
1256 * Cleanup the contents of a transaction, usually after the transaction
1257 * committed or aborted.
1258 */
1259 static void
ReorderBufferCleanupTXN(ReorderBuffer * rb,ReorderBufferTXN * txn)1260 ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1261 {
1262 bool found;
1263 dlist_mutable_iter iter;
1264
1265 /* cleanup subtransactions & their changes */
1266 dlist_foreach_modify(iter, &txn->subtxns)
1267 {
1268 ReorderBufferTXN *subtxn;
1269
1270 subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1271
1272 /*
1273 * Subtransactions are always associated to the toplevel TXN, even if
1274 * they originally were happening inside another subtxn, so we won't
1275 * ever recurse more than one level deep here.
1276 */
1277 Assert(subtxn->is_known_as_subxact);
1278 Assert(subtxn->nsubtxns == 0);
1279
1280 ReorderBufferCleanupTXN(rb, subtxn);
1281 }
1282
1283 /* cleanup changes in the toplevel txn */
1284 dlist_foreach_modify(iter, &txn->changes)
1285 {
1286 ReorderBufferChange *change;
1287
1288 change = dlist_container(ReorderBufferChange, node, iter.cur);
1289
1290 ReorderBufferReturnChange(rb, change);
1291 }
1292
1293 /*
1294 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1295 * They are always stored in the toplevel transaction.
1296 */
1297 dlist_foreach_modify(iter, &txn->tuplecids)
1298 {
1299 ReorderBufferChange *change;
1300
1301 change = dlist_container(ReorderBufferChange, node, iter.cur);
1302 Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1303 ReorderBufferReturnChange(rb, change);
1304 }
1305
1306 /*
1307 * Cleanup the base snapshot, if set.
1308 */
1309 if (txn->base_snapshot != NULL)
1310 {
1311 SnapBuildSnapDecRefcount(txn->base_snapshot);
1312 dlist_delete(&txn->base_snapshot_node);
1313 }
1314
1315 /* delete from list of known subxacts */
1316 if (txn->is_known_as_subxact)
1317 {
1318 /* NB: nsubxacts count of parent will be too high now */
1319 dlist_delete(&txn->node);
1320 }
1321 /* delete from LSN ordered list of toplevel TXNs */
1322 else
1323 {
1324 dlist_delete(&txn->node);
1325 }
1326
1327 /* now remove reference from buffer */
1328 hash_search(rb->by_txn,
1329 (void *) &txn->xid,
1330 HASH_REMOVE,
1331 &found);
1332 Assert(found);
1333
1334 /* remove entries spilled to disk */
1335 if (txn->serialized)
1336 ReorderBufferRestoreCleanup(rb, txn);
1337
1338 /* deallocate */
1339 ReorderBufferReturnTXN(rb, txn);
1340 }
1341
1342 /*
1343 * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1344 * tqual.c's HeapTupleSatisfiesHistoricMVCC.
1345 */
1346 static void
ReorderBufferBuildTupleCidHash(ReorderBuffer * rb,ReorderBufferTXN * txn)1347 ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
1348 {
1349 dlist_iter iter;
1350 HASHCTL hash_ctl;
1351
1352 if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1353 return;
1354
1355 memset(&hash_ctl, 0, sizeof(hash_ctl));
1356
1357 hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1358 hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1359 hash_ctl.hcxt = rb->context;
1360
1361 /*
1362 * create the hash with the exact number of to-be-stored tuplecids from
1363 * the start
1364 */
1365 txn->tuplecid_hash =
1366 hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1367 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1368
1369 dlist_foreach(iter, &txn->tuplecids)
1370 {
1371 ReorderBufferTupleCidKey key;
1372 ReorderBufferTupleCidEnt *ent;
1373 bool found;
1374 ReorderBufferChange *change;
1375
1376 change = dlist_container(ReorderBufferChange, node, iter.cur);
1377
1378 Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1379
1380 /* be careful about padding */
1381 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1382
1383 key.relnode = change->data.tuplecid.node;
1384
1385 ItemPointerCopy(&change->data.tuplecid.tid,
1386 &key.tid);
1387
1388 ent = (ReorderBufferTupleCidEnt *)
1389 hash_search(txn->tuplecid_hash,
1390 (void *) &key,
1391 HASH_ENTER | HASH_FIND,
1392 &found);
1393 if (!found)
1394 {
1395 ent->cmin = change->data.tuplecid.cmin;
1396 ent->cmax = change->data.tuplecid.cmax;
1397 ent->combocid = change->data.tuplecid.combocid;
1398 }
1399 else
1400 {
1401 /*
1402 * Maybe we already saw this tuple before in this transaction,
1403 * but if so it must have the same cmin.
1404 */
1405 Assert(ent->cmin == change->data.tuplecid.cmin);
1406
1407 /*
1408 * cmax may be initially invalid, but once set it can only grow,
1409 * and never become invalid again.
1410 */
1411 Assert((ent->cmax == InvalidCommandId) ||
1412 ((change->data.tuplecid.cmax != InvalidCommandId) &&
1413 (change->data.tuplecid.cmax > ent->cmax)));
1414 ent->cmax = change->data.tuplecid.cmax;
1415 }
1416 }
1417 }
1418
1419 /*
1420 * Copy a provided snapshot so we can modify it privately. This is needed so
1421 * that catalog modifying transactions can look into intermediate catalog
1422 * states.
1423 */
1424 static Snapshot
ReorderBufferCopySnap(ReorderBuffer * rb,Snapshot orig_snap,ReorderBufferTXN * txn,CommandId cid)1425 ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
1426 ReorderBufferTXN *txn, CommandId cid)
1427 {
1428 Snapshot snap;
1429 dlist_iter iter;
1430 int i = 0;
1431 Size size;
1432
1433 size = sizeof(SnapshotData) +
1434 sizeof(TransactionId) * orig_snap->xcnt +
1435 sizeof(TransactionId) * (txn->nsubtxns + 1);
1436
1437 snap = MemoryContextAllocZero(rb->context, size);
1438 memcpy(snap, orig_snap, sizeof(SnapshotData));
1439
1440 snap->copied = true;
1441 snap->active_count = 1; /* mark as active so nobody frees it */
1442 snap->regd_count = 0;
1443 snap->xip = (TransactionId *) (snap + 1);
1444
1445 memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1446
1447 /*
1448 * snap->subxip contains all txids that belong to our transaction which we
1449 * need to check via cmin/cmax. Thats why we store the toplevel
1450 * transaction in there as well.
1451 */
1452 snap->subxip = snap->xip + snap->xcnt;
1453 snap->subxip[i++] = txn->xid;
1454
1455 /*
1456 * nsubxcnt isn't decreased when subtransactions abort, so count manually.
1457 * Since it's an upper boundary it is safe to use it for the allocation
1458 * above.
1459 */
1460 snap->subxcnt = 1;
1461
1462 dlist_foreach(iter, &txn->subtxns)
1463 {
1464 ReorderBufferTXN *sub_txn;
1465
1466 sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1467 snap->subxip[i++] = sub_txn->xid;
1468 snap->subxcnt++;
1469 }
1470
1471 /* sort so we can bsearch() later */
1472 qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1473
1474 /* store the specified current CommandId */
1475 snap->curcid = cid;
1476
1477 return snap;
1478 }
1479
1480 /*
1481 * Free a previously ReorderBufferCopySnap'ed snapshot
1482 */
1483 static void
ReorderBufferFreeSnap(ReorderBuffer * rb,Snapshot snap)1484 ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
1485 {
1486 if (snap->copied)
1487 pfree(snap);
1488 else
1489 SnapBuildSnapDecRefcount(snap);
1490 }
1491
1492 /*
1493 * Perform the replay of a transaction and its non-aborted subtransactions.
1494 *
1495 * Subtransactions previously have to be processed by
1496 * ReorderBufferCommitChild(), even if previously assigned to the toplevel
1497 * transaction with ReorderBufferAssignChild.
1498 *
1499 * We currently can only decode a transaction's contents when its commit
1500 * record is read because that's the only place where we know about cache
1501 * invalidations. Thus, once a toplevel commit is read, we iterate over the top
1502 * and subtransactions (using a k-way merge) and replay the changes in lsn
1503 * order.
1504 */
1505 void
ReorderBufferCommit(ReorderBuffer * rb,TransactionId xid,XLogRecPtr commit_lsn,XLogRecPtr end_lsn,TimestampTz commit_time,RepOriginId origin_id,XLogRecPtr origin_lsn)1506 ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
1507 XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
1508 TimestampTz commit_time,
1509 RepOriginId origin_id, XLogRecPtr origin_lsn)
1510 {
1511 ReorderBufferTXN *txn;
1512 volatile Snapshot snapshot_now;
1513 volatile CommandId command_id = FirstCommandId;
1514 bool using_subtxn;
1515 ReorderBufferIterTXNState *volatile iterstate = NULL;
1516
1517 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1518 false);
1519
1520 /* unknown transaction, nothing to replay */
1521 if (txn == NULL)
1522 return;
1523
1524 txn->final_lsn = commit_lsn;
1525 txn->end_lsn = end_lsn;
1526 txn->commit_time = commit_time;
1527 txn->origin_id = origin_id;
1528 txn->origin_lsn = origin_lsn;
1529
1530 /*
1531 * If this transaction has no snapshot, it didn't make any changes to the
1532 * database, so there's nothing to decode. Note that
1533 * ReorderBufferCommitChild will have transferred any snapshots from
1534 * subtransactions if there were any.
1535 */
1536 if (txn->base_snapshot == NULL)
1537 {
1538 Assert(txn->ninvalidations == 0);
1539 ReorderBufferCleanupTXN(rb, txn);
1540 return;
1541 }
1542
1543 snapshot_now = txn->base_snapshot;
1544
1545 /* build data to be able to lookup the CommandIds of catalog tuples */
1546 ReorderBufferBuildTupleCidHash(rb, txn);
1547
1548 /* setup the initial snapshot */
1549 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1550
1551 /*
1552 * Decoding needs access to syscaches et al., which in turn use
1553 * heavyweight locks and such. Thus we need to have enough state around to
1554 * keep track of those. The easiest way is to simply use a transaction
1555 * internally. That also allows us to easily enforce that nothing writes
1556 * to the database by checking for xid assignments.
1557 *
1558 * When we're called via the SQL SRF there's already a transaction
1559 * started, so start an explicit subtransaction there.
1560 */
1561 using_subtxn = IsTransactionOrTransactionBlock();
1562
1563 PG_TRY();
1564 {
1565 ReorderBufferChange *change;
1566 ReorderBufferChange *specinsert = NULL;
1567
1568 if (using_subtxn)
1569 BeginInternalSubTransaction("replay");
1570 else
1571 StartTransactionCommand();
1572
1573 rb->begin(rb, txn);
1574
1575 ReorderBufferIterTXNInit(rb, txn, &iterstate);
1576 while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1577 {
1578 Relation relation = NULL;
1579 Oid reloid;
1580
1581 switch (change->action)
1582 {
1583 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
1584
1585 /*
1586 * Confirmation for speculative insertion arrived. Simply
1587 * use as a normal record. It'll be cleaned up at the end
1588 * of INSERT processing.
1589 */
1590 if (specinsert == NULL)
1591 elog(ERROR, "invalid ordering of speculative insertion changes");
1592 Assert(specinsert->data.tp.oldtuple == NULL);
1593 change = specinsert;
1594 change->action = REORDER_BUFFER_CHANGE_INSERT;
1595
1596 /* intentionally fall through */
1597 case REORDER_BUFFER_CHANGE_INSERT:
1598 case REORDER_BUFFER_CHANGE_UPDATE:
1599 case REORDER_BUFFER_CHANGE_DELETE:
1600 Assert(snapshot_now);
1601
1602 reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1603 change->data.tp.relnode.relNode);
1604
1605 /*
1606 * Mapped catalog tuple without data, emitted while
1607 * catalog table was in the process of being rewritten. We
1608 * can fail to look up the relfilenode, because the the
1609 * relmapper has no "historic" view, in contrast to normal
1610 * the normal catalog during decoding. Thus repeated
1611 * rewrites can cause a lookup failure. That's OK because
1612 * we do not decode catalog changes anyway. Normally such
1613 * tuples would be skipped over below, but we can't
1614 * identify whether the table should be logically logged
1615 * without mapping the relfilenode to the oid.
1616 */
1617 if (reloid == InvalidOid &&
1618 change->data.tp.newtuple == NULL &&
1619 change->data.tp.oldtuple == NULL)
1620 goto change_done;
1621 else if (reloid == InvalidOid)
1622 elog(ERROR, "could not map filenode \"%s\" to relation OID",
1623 relpathperm(change->data.tp.relnode,
1624 MAIN_FORKNUM));
1625
1626 relation = RelationIdGetRelation(reloid);
1627
1628 if (!RelationIsValid(relation))
1629 elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1630 reloid,
1631 relpathperm(change->data.tp.relnode,
1632 MAIN_FORKNUM));
1633
1634 if (!RelationIsLogicallyLogged(relation))
1635 goto change_done;
1636
1637 /*
1638 * For now ignore sequence changes entirely. Most of the
1639 * time they don't log changes using records we
1640 * understand, so it doesn't make sense to handle the few
1641 * cases we do.
1642 */
1643 if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1644 goto change_done;
1645
1646 /* user-triggered change */
1647 if (!IsToastRelation(relation))
1648 {
1649 ReorderBufferToastReplace(rb, txn, relation, change);
1650 rb->apply_change(rb, txn, relation, change);
1651
1652 /*
1653 * Only clear reassembled toast chunks if we're sure
1654 * they're not required anymore. The creator of the
1655 * tuple tells us.
1656 */
1657 if (change->data.tp.clear_toast_afterwards)
1658 ReorderBufferToastReset(rb, txn);
1659 }
1660 /* we're not interested in toast deletions */
1661 else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1662 {
1663 /*
1664 * Need to reassemble the full toasted Datum in
1665 * memory, to ensure the chunks don't get reused till
1666 * we're done remove it from the list of this
1667 * transaction's changes. Otherwise it will get
1668 * freed/reused while restoring spooled data from
1669 * disk.
1670 */
1671 Assert(change->data.tp.newtuple != NULL);
1672
1673 dlist_delete(&change->node);
1674 ReorderBufferToastAppendChunk(rb, txn, relation,
1675 change);
1676 }
1677
1678 change_done:
1679
1680 /*
1681 * If speculative insertion was confirmed, the record isn't
1682 * needed anymore.
1683 */
1684 if (specinsert != NULL)
1685 {
1686 ReorderBufferReturnChange(rb, specinsert);
1687 specinsert = NULL;
1688 }
1689
1690 if (relation != NULL)
1691 {
1692 RelationClose(relation);
1693 relation = NULL;
1694 }
1695 break;
1696
1697 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
1698
1699 /*
1700 * Speculative insertions are dealt with by delaying the
1701 * processing of the insert until the confirmation record
1702 * arrives. For that we simply unlink the record from the
1703 * chain, so it does not get freed/reused while restoring
1704 * spooled data from disk.
1705 *
1706 * This is safe in the face of concurrent catalog changes
1707 * because the relevant relation can't be changed between
1708 * speculative insertion and confirmation due to
1709 * CheckTableNotInUse() and locking.
1710 */
1711
1712 /* clear out a pending (and thus failed) speculation */
1713 if (specinsert != NULL)
1714 {
1715 ReorderBufferReturnChange(rb, specinsert);
1716 specinsert = NULL;
1717 }
1718
1719 /* and memorize the pending insertion */
1720 dlist_delete(&change->node);
1721 specinsert = change;
1722 break;
1723
1724 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
1725
1726 /*
1727 * Abort for speculative insertion arrived. So cleanup the
1728 * specinsert tuple and toast hash.
1729 *
1730 * Note that we get the spec abort change for each toast
1731 * entry but we need to perform the cleanup only the first
1732 * time we get it for the main table.
1733 */
1734 if (specinsert != NULL)
1735 {
1736 /*
1737 * We must clean the toast hash before processing a
1738 * completely new tuple to avoid confusion about the
1739 * previous tuple's toast chunks.
1740 */
1741 Assert(change->data.tp.clear_toast_afterwards);
1742 ReorderBufferToastReset(rb, txn);
1743
1744 /* We don't need this record anymore. */
1745 ReorderBufferReturnChange(rb, specinsert);
1746 specinsert = NULL;
1747 }
1748 break;
1749
1750 case REORDER_BUFFER_CHANGE_MESSAGE:
1751 rb->message(rb, txn, change->lsn, true,
1752 change->data.msg.prefix,
1753 change->data.msg.message_size,
1754 change->data.msg.message);
1755 break;
1756
1757 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
1758 /* get rid of the old */
1759 TeardownHistoricSnapshot(false);
1760
1761 if (snapshot_now->copied)
1762 {
1763 ReorderBufferFreeSnap(rb, snapshot_now);
1764 snapshot_now =
1765 ReorderBufferCopySnap(rb, change->data.snapshot,
1766 txn, command_id);
1767 }
1768
1769 /*
1770 * Restored from disk, need to be careful not to double
1771 * free. We could introduce refcounting for that, but for
1772 * now this seems infrequent enough not to care.
1773 */
1774 else if (change->data.snapshot->copied)
1775 {
1776 snapshot_now =
1777 ReorderBufferCopySnap(rb, change->data.snapshot,
1778 txn, command_id);
1779 }
1780 else
1781 {
1782 snapshot_now = change->data.snapshot;
1783 }
1784
1785
1786 /* and continue with the new one */
1787 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1788 break;
1789
1790 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
1791 Assert(change->data.command_id != InvalidCommandId);
1792
1793 if (command_id < change->data.command_id)
1794 {
1795 command_id = change->data.command_id;
1796
1797 if (!snapshot_now->copied)
1798 {
1799 /* we don't use the global one anymore */
1800 snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1801 txn, command_id);
1802 }
1803
1804 snapshot_now->curcid = command_id;
1805
1806 TeardownHistoricSnapshot(false);
1807 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1808
1809 /*
1810 * Every time the CommandId is incremented, we could
1811 * see new catalog contents, so execute all
1812 * invalidations.
1813 */
1814 ReorderBufferExecuteInvalidations(rb, txn);
1815 }
1816
1817 break;
1818
1819 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
1820 elog(ERROR, "tuplecid value in changequeue");
1821 break;
1822 }
1823 }
1824
1825 /* speculative insertion record must be freed by now */
1826 Assert(!specinsert);
1827
1828 /* clean up the iterator */
1829 ReorderBufferIterTXNFinish(rb, iterstate);
1830 iterstate = NULL;
1831
1832 /* call commit callback */
1833 rb->commit(rb, txn, commit_lsn);
1834
1835 /* this is just a sanity check against bad output plugin behaviour */
1836 if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
1837 elog(ERROR, "output plugin used XID %u",
1838 GetCurrentTransactionId());
1839
1840 /* cleanup */
1841 TeardownHistoricSnapshot(false);
1842
1843 /*
1844 * Aborting the current (sub-)transaction as a whole has the right
1845 * semantics. We want all locks acquired in here to be released, not
1846 * reassigned to the parent and we do not want any database access
1847 * have persistent effects.
1848 */
1849 AbortCurrentTransaction();
1850
1851 /* make sure there's no cache pollution */
1852 ReorderBufferExecuteInvalidations(rb, txn);
1853
1854 if (using_subtxn)
1855 RollbackAndReleaseCurrentSubTransaction();
1856
1857 if (snapshot_now->copied)
1858 ReorderBufferFreeSnap(rb, snapshot_now);
1859
1860 /* remove potential on-disk data, and deallocate */
1861 ReorderBufferCleanupTXN(rb, txn);
1862 }
1863 PG_CATCH();
1864 {
1865 /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1866 if (iterstate)
1867 ReorderBufferIterTXNFinish(rb, iterstate);
1868
1869 TeardownHistoricSnapshot(true);
1870
1871 /*
1872 * Force cache invalidation to happen outside of a valid transaction
1873 * to prevent catalog access as we just caught an error.
1874 */
1875 AbortCurrentTransaction();
1876
1877 /* make sure there's no cache pollution */
1878 ReorderBufferExecuteInvalidations(rb, txn);
1879
1880 if (using_subtxn)
1881 RollbackAndReleaseCurrentSubTransaction();
1882
1883 if (snapshot_now->copied)
1884 ReorderBufferFreeSnap(rb, snapshot_now);
1885
1886 /* remove potential on-disk data, and deallocate */
1887 ReorderBufferCleanupTXN(rb, txn);
1888
1889 PG_RE_THROW();
1890 }
1891 PG_END_TRY();
1892 }
1893
1894 /*
1895 * Abort a transaction that possibly has previous changes. Needs to be first
1896 * called for subtransactions and then for the toplevel xid.
1897 *
1898 * NB: Transactions handled here have to have actively aborted (i.e. have
1899 * produced an abort record). Implicitly aborted transactions are handled via
1900 * ReorderBufferAbortOld(); transactions we're just not interested in, but
1901 * which have committed are handled in ReorderBufferForget().
1902 *
1903 * This function purges this transaction and its contents from memory and
1904 * disk.
1905 */
1906 void
ReorderBufferAbort(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn)1907 ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1908 {
1909 ReorderBufferTXN *txn;
1910
1911 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1912 false);
1913
1914 /* unknown, nothing to remove */
1915 if (txn == NULL)
1916 return;
1917
1918 /* cosmetic... */
1919 txn->final_lsn = lsn;
1920
1921 /* remove potential on-disk data, and deallocate */
1922 ReorderBufferCleanupTXN(rb, txn);
1923 }
1924
1925 /*
1926 * Abort all transactions that aren't actually running anymore because the
1927 * server restarted.
1928 *
1929 * NB: These really have to be transactions that have aborted due to a server
1930 * crash/immediate restart, as we don't deal with invalidations here.
1931 */
1932 void
ReorderBufferAbortOld(ReorderBuffer * rb,TransactionId oldestRunningXid)1933 ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
1934 {
1935 dlist_mutable_iter it;
1936
1937 /*
1938 * Iterate through all (potential) toplevel TXNs and abort all that are
1939 * older than what possibly can be running. Once we've found the first
1940 * that is alive we stop, there might be some that acquired an xid earlier
1941 * but started writing later, but it's unlikely and they will be cleaned
1942 * up in a later call to this function.
1943 */
1944 dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1945 {
1946 ReorderBufferTXN *txn;
1947
1948 txn = dlist_container(ReorderBufferTXN, node, it.cur);
1949
1950 if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1951 {
1952 elog(DEBUG2, "aborting old transaction %u", txn->xid);
1953
1954 /* remove potential on-disk data, and deallocate this tx */
1955 ReorderBufferCleanupTXN(rb, txn);
1956 }
1957 else
1958 return;
1959 }
1960 }
1961
/*
 * Forget the contents of a transaction if we aren't interested in its
 * contents. Needs to be first called for subtransactions and then for the
 * toplevel xid.
 *
 * This is significantly different to ReorderBufferAbort() because
 * transactions that have committed need to be treated differently from aborted
 * ones since they may have modified the catalog.
 *
 * Note that this is only allowed to be called in the moment a transaction
 * commit has just been read, not earlier; otherwise later records referring
 * to this xid might re-create the transaction incompletely.
 */
void
ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
{
	ReorderBufferTXN *txn;

	/* look the txn up without creating it if it doesn't exist yet */
	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
								false);

	/* unknown, nothing to forget */
	if (txn == NULL)
		return;

	/* cosmetic... */
	txn->final_lsn = lsn;

	/*
	 * Process cache invalidation messages if there are any. Even if we're not
	 * interested in the transaction's contents, it could have manipulated the
	 * catalog and we need to update the caches according to that.
	 *
	 * Without a base snapshot we cannot have decoded catalog-modifying
	 * changes, hence the Assert in the else branch.
	 */
	if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
		ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
										   txn->invalidations);
	else
		Assert(txn->ninvalidations == 0);

	/* remove potential on-disk data, and deallocate */
	ReorderBufferCleanupTXN(rb, txn);
}
2004
2005 /*
2006 * Execute invalidations happening outside the context of a decoded
2007 * transaction. That currently happens either for xid-less commits
2008 * (c.f. RecordTransactionCommit()) or for invalidations in uninteresting
2009 * transactions (via ReorderBufferForget()).
2010 */
2011 void
ReorderBufferImmediateInvalidation(ReorderBuffer * rb,uint32 ninvalidations,SharedInvalidationMessage * invalidations)2012 ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
2013 SharedInvalidationMessage *invalidations)
2014 {
2015 bool use_subtxn = IsTransactionOrTransactionBlock();
2016 int i;
2017
2018 if (use_subtxn)
2019 BeginInternalSubTransaction("replay");
2020
2021 /*
2022 * Force invalidations to happen outside of a valid transaction - that way
2023 * entries will just be marked as invalid without accessing the catalog.
2024 * That's advantageous because we don't need to setup the full state
2025 * necessary for catalog access.
2026 */
2027 if (use_subtxn)
2028 AbortCurrentTransaction();
2029
2030 for (i = 0; i < ninvalidations; i++)
2031 LocalExecuteInvalidationMessage(&invalidations[i]);
2032
2033 if (use_subtxn)
2034 RollbackAndReleaseCurrentSubTransaction();
2035 }
2036
2037 /*
2038 * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
2039 * least once for every xid in XLogRecord->xl_xid (other places in records
2040 * may, but do not have to be passed through here).
2041 *
2042 * Reorderbuffer keeps some datastructures about transactions in LSN order,
2043 * for efficiency. To do that it has to know about when transactions are seen
2044 * first in the WAL. As many types of records are not actually interesting for
2045 * logical decoding, they do not necessarily pass though here.
2046 */
2047 void
ReorderBufferProcessXid(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn)2048 ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
2049 {
2050 /* many records won't have an xid assigned, centralize check here */
2051 if (xid != InvalidTransactionId)
2052 ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2053 }
2054
2055 /*
2056 * Add a new snapshot to this transaction that may only used after lsn 'lsn'
2057 * because the previous snapshot doesn't describe the catalog correctly for
2058 * following rows.
2059 */
2060 void
ReorderBufferAddSnapshot(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,Snapshot snap)2061 ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
2062 XLogRecPtr lsn, Snapshot snap)
2063 {
2064 ReorderBufferChange *change = ReorderBufferGetChange(rb);
2065
2066 change->data.snapshot = snap;
2067 change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
2068
2069 ReorderBufferQueueChange(rb, xid, lsn, change);
2070 }
2071
/*
 * Set up the transaction's base snapshot.
 *
 * If we know that xid is a subtransaction, set the base snapshot on the
 * top-level transaction instead.
 */
void
ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
							 XLogRecPtr lsn, Snapshot snap)
{
	ReorderBufferTXN *txn;
	bool		is_new;

	AssertArg(snap != NULL);

	/*
	 * Fetch the transaction to operate on. If we know it's a subtransaction,
	 * operate on its top-level transaction instead.
	 */
	txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
	if (txn->is_known_as_subxact)
		txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
									NULL, InvalidXLogRecPtr, false);
	/* a base snapshot may only be set once per transaction */
	Assert(txn->base_snapshot == NULL);

	txn->base_snapshot = snap;
	txn->base_snapshot_lsn = lsn;
	/* keep rb->txns_by_base_snapshot_lsn sorted; verified just below */
	dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);

	AssertTXNLsnOrder(rb);
}
2103
2104 /*
2105 * Access the catalog with this CommandId at this point in the changestream.
2106 *
2107 * May only be called for command ids > 1
2108 */
2109 void
ReorderBufferAddNewCommandId(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,CommandId cid)2110 ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
2111 XLogRecPtr lsn, CommandId cid)
2112 {
2113 ReorderBufferChange *change = ReorderBufferGetChange(rb);
2114
2115 change->data.command_id = cid;
2116 change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
2117
2118 ReorderBufferQueueChange(rb, xid, lsn, change);
2119 }
2120
2121
2122 /*
2123 * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
2124 */
2125 void
ReorderBufferAddNewTupleCids(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,RelFileNode node,ItemPointerData tid,CommandId cmin,CommandId cmax,CommandId combocid)2126 ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
2127 XLogRecPtr lsn, RelFileNode node,
2128 ItemPointerData tid, CommandId cmin,
2129 CommandId cmax, CommandId combocid)
2130 {
2131 ReorderBufferChange *change = ReorderBufferGetChange(rb);
2132 ReorderBufferTXN *txn;
2133
2134 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2135
2136 change->data.tuplecid.node = node;
2137 change->data.tuplecid.tid = tid;
2138 change->data.tuplecid.cmin = cmin;
2139 change->data.tuplecid.cmax = cmax;
2140 change->data.tuplecid.combocid = combocid;
2141 change->lsn = lsn;
2142 change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
2143
2144 dlist_push_tail(&txn->tuplecids, &change->node);
2145 txn->ntuplecids++;
2146 }
2147
2148 /*
2149 * Setup the invalidation of the toplevel transaction.
2150 *
2151 * This needs to be done before ReorderBufferCommit is called!
2152 */
2153 void
ReorderBufferAddInvalidations(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,Size nmsgs,SharedInvalidationMessage * msgs)2154 ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
2155 XLogRecPtr lsn, Size nmsgs,
2156 SharedInvalidationMessage *msgs)
2157 {
2158 ReorderBufferTXN *txn;
2159
2160 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2161
2162 if (txn->ninvalidations != 0)
2163 elog(ERROR, "only ever add one set of invalidations");
2164
2165 Assert(nmsgs > 0);
2166
2167 txn->ninvalidations = nmsgs;
2168 txn->invalidations = (SharedInvalidationMessage *)
2169 MemoryContextAlloc(rb->context,
2170 sizeof(SharedInvalidationMessage) * nmsgs);
2171 memcpy(txn->invalidations, msgs,
2172 sizeof(SharedInvalidationMessage) * nmsgs);
2173 }
2174
2175 /*
2176 * Apply all invalidations we know. Possibly we only need parts at this point
2177 * in the changestream but we don't know which those are.
2178 */
2179 static void
ReorderBufferExecuteInvalidations(ReorderBuffer * rb,ReorderBufferTXN * txn)2180 ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
2181 {
2182 int i;
2183
2184 for (i = 0; i < txn->ninvalidations; i++)
2185 LocalExecuteInvalidationMessage(&txn->invalidations[i]);
2186 }
2187
2188 /*
2189 * Mark a transaction as containing catalog changes
2190 */
2191 void
ReorderBufferXidSetCatalogChanges(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn)2192 ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
2193 XLogRecPtr lsn)
2194 {
2195 ReorderBufferTXN *txn;
2196
2197 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2198
2199 txn->has_catalog_changes = true;
2200 }
2201
2202 /*
2203 * Query whether a transaction is already *known* to contain catalog
2204 * changes. This can be wrong until directly before the commit!
2205 */
2206 bool
ReorderBufferXidHasCatalogChanges(ReorderBuffer * rb,TransactionId xid)2207 ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
2208 {
2209 ReorderBufferTXN *txn;
2210
2211 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2212 false);
2213 if (txn == NULL)
2214 return false;
2215
2216 return txn->has_catalog_changes;
2217 }
2218
2219 /*
2220 * ReorderBufferXidHasBaseSnapshot
2221 * Have we already set the base snapshot for the given txn/subtxn?
2222 */
2223 bool
ReorderBufferXidHasBaseSnapshot(ReorderBuffer * rb,TransactionId xid)2224 ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
2225 {
2226 ReorderBufferTXN *txn;
2227
2228 txn = ReorderBufferTXNByXid(rb, xid, false,
2229 NULL, InvalidXLogRecPtr, false);
2230
2231 /* transaction isn't known yet, ergo no snapshot */
2232 if (txn == NULL)
2233 return false;
2234
2235 /* a known subtxn? operate on top-level txn instead */
2236 if (txn->is_known_as_subxact)
2237 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2238 NULL, InvalidXLogRecPtr, false);
2239
2240 return txn->base_snapshot != NULL;
2241 }
2242
2243
2244 /*
2245 * ---------------------------------------
2246 * Disk serialization support
2247 * ---------------------------------------
2248 */
2249
2250 /*
2251 * Ensure the IO buffer is >= sz.
2252 */
2253 static void
ReorderBufferSerializeReserve(ReorderBuffer * rb,Size sz)2254 ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
2255 {
2256 if (!rb->outbufsize)
2257 {
2258 rb->outbuf = MemoryContextAlloc(rb->context, sz);
2259 rb->outbufsize = sz;
2260 }
2261 else if (rb->outbufsize < sz)
2262 {
2263 rb->outbuf = repalloc(rb->outbuf, sz);
2264 rb->outbufsize = sz;
2265 }
2266 }
2267
2268 /*
2269 * Check whether the transaction tx should spill its data to disk.
2270 */
2271 static void
ReorderBufferCheckSerializeTXN(ReorderBuffer * rb,ReorderBufferTXN * txn)2272 ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2273 {
2274 /*
2275 * TODO: improve accounting so we cheaply can take subtransactions into
2276 * account here.
2277 */
2278 if (txn->nentries_mem >= max_changes_in_memory)
2279 {
2280 ReorderBufferSerializeTXN(rb, txn);
2281 Assert(txn->nentries_mem == 0);
2282 }
2283 }
2284
/*
 * Spill data of a large transaction (and its subtransactions) to disk.
 *
 * Changes are appended to per-WAL-segment spill files chosen by each
 * change's start LSN, and released from memory as they are written.
 * Afterwards txn->nentries_mem is 0 and txn->serialized is set.
 */
static void
ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	dlist_iter	subtxn_i;
	dlist_mutable_iter change_i;
	int			fd = -1;		/* currently open spill segment, if any */
	XLogSegNo	curOpenSegNo = 0;
	Size		spilled = 0;	/* cross-checked against nentries_mem below */

	elog(DEBUG2, "spill %u changes in XID %u to disk",
		 (uint32) txn->nentries_mem, txn->xid);

	/* do the same to all child TXs */
	dlist_foreach(subtxn_i, &txn->subtxns)
	{
		ReorderBufferTXN *subtxn;

		subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
		ReorderBufferSerializeTXN(rb, subtxn);
	}

	/* serialize changestream */
	dlist_foreach_modify(change_i, &txn->changes)
	{
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node, change_i.cur);

		/*
		 * store in segment in which it belongs by start lsn, don't split over
		 * multiple segments tho
		 */
		if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo))
		{
			char		path[MAXPGPATH];

			/* switching segments: close the previous one first */
			if (fd != -1)
				CloseTransientFile(fd);

			XLByteToSeg(change->lsn, curOpenSegNo);

			/*
			 * No need to care about TLIs here, only used during a single run,
			 * so each LSN only maps to a specific WAL record.
			 */
			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
										curOpenSegNo);

			/* open segment, create it if necessary */
			fd = OpenTransientFile(path,
								   O_CREAT | O_WRONLY | O_APPEND | PG_BINARY,
								   S_IRUSR | S_IWUSR);

			if (fd < 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not open file \"%s\": %m", path)));
		}

		/* write the change out, then release its in-memory representation */
		ReorderBufferSerializeChange(rb, txn, fd, change);
		dlist_delete(&change->node);
		ReorderBufferReturnChange(rb, change);

		spilled++;
	}

	Assert(spilled == txn->nentries_mem);
	Assert(dlist_is_empty(&txn->changes));
	txn->nentries_mem = 0;
	txn->serialized = true;

	if (fd != -1)
		CloseTransientFile(fd);
}
2362
/*
 * Serialize individual change to disk.
 *
 * On-disk layout is a ReorderBufferDiskChange header (which embeds a copy of
 * the in-memory ReorderBufferChange) followed by an action-specific payload:
 * tuples, a logical message, or a snapshot. The whole record is assembled in
 * rb->outbuf and written with a single write(); ondisk->size holds the total
 * record length so the reader can find the payload boundary.
 */
static void
ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
							 int fd, ReorderBufferChange *change)
{
	ReorderBufferDiskChange *ondisk;
	Size		sz = sizeof(ReorderBufferDiskChange);

	ReorderBufferSerializeReserve(rb, sz);

	ondisk = (ReorderBufferDiskChange *) rb->outbuf;
	memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));

	switch (change->action)
	{
			/* fall through these, they're all similar enough */
		case REORDER_BUFFER_CHANGE_INSERT:
		case REORDER_BUFFER_CHANGE_UPDATE:
		case REORDER_BUFFER_CHANGE_DELETE:
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
			{
				char	   *data;
				ReorderBufferTupleBuf *oldtup,
						   *newtup;
				Size		oldlen = 0;
				Size		newlen = 0;

				oldtup = change->data.tp.oldtuple;
				newtup = change->data.tp.newtuple;

				/* payload: [HeapTupleData + tuple data] for each present tuple */
				if (oldtup)
				{
					sz += sizeof(HeapTupleData);
					oldlen = oldtup->tuple.t_len;
					sz += oldlen;
				}

				if (newtup)
				{
					sz += sizeof(HeapTupleData);
					newlen = newtup->tuple.t_len;
					sz += newlen;
				}

				/* make sure we have enough space */
				ReorderBufferSerializeReserve(rb, sz);

				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
				/* might have been reallocated above */
				ondisk = (ReorderBufferDiskChange *) rb->outbuf;

				if (oldlen)
				{
					memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
					data += sizeof(HeapTupleData);

					memcpy(data, oldtup->tuple.t_data, oldlen);
					data += oldlen;
				}

				if (newlen)
				{
					memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
					data += sizeof(HeapTupleData);

					memcpy(data, newtup->tuple.t_data, newlen);
					data += newlen;
				}
				break;
			}
		case REORDER_BUFFER_CHANGE_MESSAGE:
			{
				char	   *data;
				/* +1 so the terminating NUL is stored as well */
				Size		prefix_size = strlen(change->data.msg.prefix) + 1;

				/* payload: prefix size + prefix + message size + message */
				sz += prefix_size + change->data.msg.message_size +
					sizeof(Size) + sizeof(Size);
				ReorderBufferSerializeReserve(rb, sz);

				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);

				/* might have been reallocated above */
				ondisk = (ReorderBufferDiskChange *) rb->outbuf;

				/* write the prefix including the size */
				memcpy(data, &prefix_size, sizeof(Size));
				data += sizeof(Size);
				memcpy(data, change->data.msg.prefix,
					   prefix_size);
				data += prefix_size;

				/* write the message including the size */
				memcpy(data, &change->data.msg.message_size, sizeof(Size));
				data += sizeof(Size);
				memcpy(data, change->data.msg.message,
					   change->data.msg.message_size);
				data += change->data.msg.message_size;

				break;
			}
		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
			{
				Snapshot	snap;
				char	   *data;

				snap = change->data.snapshot;

				/* payload: SnapshotData followed by the xip and subxip arrays */
				sz += sizeof(SnapshotData) +
					sizeof(TransactionId) * snap->xcnt +
					sizeof(TransactionId) * snap->subxcnt;

				/* make sure we have enough space */
				ReorderBufferSerializeReserve(rb, sz);
				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
				/* might have been reallocated above */
				ondisk = (ReorderBufferDiskChange *) rb->outbuf;

				memcpy(data, snap, sizeof(SnapshotData));
				data += sizeof(SnapshotData);

				if (snap->xcnt)
				{
					memcpy(data, snap->xip,
						   sizeof(TransactionId) * snap->xcnt);
					data += sizeof(TransactionId) * snap->xcnt;
				}

				if (snap->subxcnt)
				{
					memcpy(data, snap->subxip,
						   sizeof(TransactionId) * snap->subxcnt);
					data += sizeof(TransactionId) * snap->subxcnt;
				}
				break;
			}
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
			/* ReorderBufferChange contains everything important */
			break;
	}

	ondisk->size = sz;

	errno = 0;
	if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
	{
		int			save_errno = errno;

		CloseTransientFile(fd);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to data file for XID %u: %m",
						txn->xid)));
	}

	/*
	 * Keep the transaction's final_lsn up to date with each change we send to
	 * disk, so that ReorderBufferRestoreCleanup works correctly.  (We used to
	 * only do this on commit and abort records, but that doesn't work if a
	 * system crash leaves a transaction without its abort record).
	 *
	 * Make sure not to move it backwards.
	 */
	if (txn->final_lsn < change->lsn)
		txn->final_lsn = change->lsn;

	Assert(ondisk->change.action == change->action);
}
2538
/*
 * Restore a number of changes spilled to disk back into memory.
 *
 * Reads up to max_changes_in_memory changes from the spill files for 'txn',
 * walking segments from *segno to the segment of txn->final_lsn.  '*fd' and
 * '*segno' carry the read position across calls; returns the number of
 * changes restored (0 when all segments are exhausted).
 */
static Size
ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
							File *fd, XLogSegNo *segno)
{
	Size		restored = 0;
	XLogSegNo	last_segno;
	dlist_mutable_iter cleanup_iter;

	Assert(txn->first_lsn != InvalidXLogRecPtr);
	Assert(txn->final_lsn != InvalidXLogRecPtr);

	/* free current entries, so we have memory for more */
	dlist_foreach_modify(cleanup_iter, &txn->changes)
	{
		ReorderBufferChange *cleanup =
		dlist_container(ReorderBufferChange, node, cleanup_iter.cur);

		dlist_delete(&cleanup->node);
		ReorderBufferReturnChange(rb, cleanup);
	}
	txn->nentries_mem = 0;
	Assert(dlist_is_empty(&txn->changes));

	XLByteToSeg(txn->final_lsn, last_segno);

	while (restored < max_changes_in_memory && *segno <= last_segno)
	{
		int			readBytes;
		ReorderBufferDiskChange *ondisk;

		if (*fd == -1)
		{
			char		path[MAXPGPATH];

			/* first time in */
			if (*segno == 0)
				XLByteToSeg(txn->first_lsn, *segno);

			Assert(*segno != 0 || dlist_is_empty(&txn->changes));

			/*
			 * No need to care about TLIs here, only used during a single run,
			 * so each LSN only maps to a specific WAL record.
			 */
			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
										*segno);

			*fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY, 0);
			/* a segment with no spilled changes simply doesn't exist; skip it */
			if (*fd < 0 && errno == ENOENT)
			{
				*fd = -1;
				(*segno)++;
				continue;
			}
			else if (*fd < 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not open file \"%s\": %m",
								path)));

		}

		/*
		 * Read the statically sized part of a change which has information
		 * about the total size. If we couldn't read a record, we're at the
		 * end of this file.
		 */
		ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
		readBytes = FileRead(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));

		/* eof */
		if (readBytes == 0)
		{
			FileClose(*fd);
			*fd = -1;
			(*segno)++;
			continue;
		}
		else if (readBytes < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
				errmsg("could not read from reorderbuffer spill file: %m")));
		else if (readBytes != sizeof(ReorderBufferDiskChange))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
							readBytes,
							(uint32) sizeof(ReorderBufferDiskChange))));

		ondisk = (ReorderBufferDiskChange *) rb->outbuf;

		/* grow the buffer to hold the variable-length remainder of the record */
		ReorderBufferSerializeReserve(rb,
							 sizeof(ReorderBufferDiskChange) + ondisk->size);
		/* the reserve may have repalloc'd outbuf; recompute the pointer */
		ondisk = (ReorderBufferDiskChange *) rb->outbuf;

		readBytes = FileRead(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
							 ondisk->size - sizeof(ReorderBufferDiskChange));

		if (readBytes < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
				errmsg("could not read from reorderbuffer spill file: %m")));
		else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
							readBytes,
					(uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));

		/*
		 * ok, read a full change from disk, now restore it into proper
		 * in-memory format
		 */
		ReorderBufferRestoreChange(rb, txn, rb->outbuf);
		restored++;
	}

	return restored;
}
2661
/*
 * Convert change from its on-disk format to in-memory format and queue it onto
 * the TXN's ->changes list.
 *
 * Note: although "data" is declared char*, at entry it points to a
 * maxalign'd buffer, making it safe in most of this function to assume
 * that the pointed-to data is suitably aligned for direct access.
 */
static void
ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
						   char *data)
{
	ReorderBufferDiskChange *ondisk;
	ReorderBufferChange *change;

	ondisk = (ReorderBufferDiskChange *) data;

	change = ReorderBufferGetChange(rb);

	/* copy static part */
	memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));

	data += sizeof(ReorderBufferDiskChange);

	/* restore individual stuff */
	switch (change->action)
	{
			/* fall through these, they're all similar enough */
		case REORDER_BUFFER_CHANGE_INSERT:
		case REORDER_BUFFER_CHANGE_UPDATE:
		case REORDER_BUFFER_CHANGE_DELETE:
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
			/*
			 * The copied pointers in change->data.tp are dangling at this
			 * point; non-NULL merely signals that a tuple was serialized and
			 * must be reconstructed into freshly allocated tuplebufs.
			 */
			if (change->data.tp.oldtuple)
			{
				/* maxalign'd buffer start, so direct t_len access is safe here */
				uint32		tuplelen = ((HeapTuple) data)->t_len;

				change->data.tp.oldtuple =
					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);

				/* restore ->tuple */
				memcpy(&change->data.tp.oldtuple->tuple, data,
					   sizeof(HeapTupleData));
				data += sizeof(HeapTupleData);

				/* reset t_data pointer into the new tuplebuf */
				change->data.tp.oldtuple->tuple.t_data =
					ReorderBufferTupleBufData(change->data.tp.oldtuple);

				/* restore tuple data itself */
				memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
				data += tuplelen;
			}

			if (change->data.tp.newtuple)
			{
				/* here, data might not be suitably aligned! */
				uint32		tuplelen;

				memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
					   sizeof(uint32));

				change->data.tp.newtuple =
					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);

				/* restore ->tuple */
				memcpy(&change->data.tp.newtuple->tuple, data,
					   sizeof(HeapTupleData));
				data += sizeof(HeapTupleData);

				/* reset t_data pointer into the new tuplebuf */
				change->data.tp.newtuple->tuple.t_data =
					ReorderBufferTupleBufData(change->data.tp.newtuple);

				/* restore tuple data itself */
				memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
				data += tuplelen;
			}

			break;
		case REORDER_BUFFER_CHANGE_MESSAGE:
			{
				Size		prefix_size;

				/* read prefix (size includes the terminating NUL) */
				memcpy(&prefix_size, data, sizeof(Size));
				data += sizeof(Size);
				change->data.msg.prefix = MemoryContextAlloc(rb->context,
															 prefix_size);
				memcpy(change->data.msg.prefix, data, prefix_size);
				Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
				data += prefix_size;

				/* read the message */
				memcpy(&change->data.msg.message_size, data, sizeof(Size));
				data += sizeof(Size);
				change->data.msg.message = MemoryContextAlloc(rb->context,
											  change->data.msg.message_size);
				memcpy(change->data.msg.message, data,
					   change->data.msg.message_size);
				data += change->data.msg.message_size;

				break;
			}
		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
			{
				Snapshot	oldsnap;
				Snapshot	newsnap;
				Size		size;

				oldsnap = (Snapshot) data;

				/* on-disk layout: SnapshotData, then xip array, then subxip array */
				size = sizeof(SnapshotData) +
					sizeof(TransactionId) * oldsnap->xcnt +
					sizeof(TransactionId) * (oldsnap->subxcnt + 0);

				change->data.snapshot = MemoryContextAllocZero(rb->context, size);

				newsnap = change->data.snapshot;

				memcpy(newsnap, data, size);
				/* fix up the array pointers to point into the new allocation */
				newsnap->xip = (TransactionId *)
					(((char *) newsnap) + sizeof(SnapshotData));
				newsnap->subxip = newsnap->xip + newsnap->xcnt;
				newsnap->copied = true;
				break;
			}
			/* the base struct contains all the data, easy peasy */
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
			break;
	}

	dlist_push_tail(&txn->changes, &change->node);
	txn->nentries_mem++;
}
2799
2800 /*
2801 * Remove all on-disk stored for the passed in transaction.
2802 */
2803 static void
ReorderBufferRestoreCleanup(ReorderBuffer * rb,ReorderBufferTXN * txn)2804 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
2805 {
2806 XLogSegNo first;
2807 XLogSegNo cur;
2808 XLogSegNo last;
2809
2810 Assert(txn->first_lsn != InvalidXLogRecPtr);
2811 Assert(txn->final_lsn != InvalidXLogRecPtr);
2812
2813 XLByteToSeg(txn->first_lsn, first);
2814 XLByteToSeg(txn->final_lsn, last);
2815
2816 /* iterate over all possible filenames, and delete them */
2817 for (cur = first; cur <= last; cur++)
2818 {
2819 char path[MAXPGPATH];
2820
2821 ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
2822 if (unlink(path) != 0 && errno != ENOENT)
2823 ereport(ERROR,
2824 (errcode_for_file_access(),
2825 errmsg("could not remove file \"%s\": %m", path)));
2826 }
2827 }
2828
2829 /*
2830 * Remove any leftover serialized reorder buffers from a slot directory after a
2831 * prior crash or decoding session exit.
2832 */
2833 static void
ReorderBufferCleanupSerializedTXNs(const char * slotname)2834 ReorderBufferCleanupSerializedTXNs(const char *slotname)
2835 {
2836 DIR *spill_dir;
2837 struct dirent *spill_de;
2838 struct stat statbuf;
2839 char path[MAXPGPATH * 2 + 12];
2840
2841 sprintf(path, "pg_replslot/%s", slotname);
2842
2843 /* we're only handling directories here, skip if it's not ours */
2844 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2845 return;
2846
2847 spill_dir = AllocateDir(path);
2848 while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
2849 {
2850 /* only look at names that can be ours */
2851 if (strncmp(spill_de->d_name, "xid", 3) == 0)
2852 {
2853 snprintf(path, sizeof(path),
2854 "pg_replslot/%s/%s", slotname,
2855 spill_de->d_name);
2856
2857 if (unlink(path) != 0)
2858 ereport(ERROR,
2859 (errcode_for_file_access(),
2860 errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/*.xid: %m",
2861 path, slotname)));
2862 }
2863 }
2864 FreeDir(spill_dir);
2865 }
2866
2867 /*
2868 * Given a replication slot, transaction ID and segment number, fill in the
2869 * corresponding spill file into 'path', which is a caller-owned buffer of size
2870 * at least MAXPGPATH.
2871 */
2872 static void
ReorderBufferSerializedPath(char * path,ReplicationSlot * slot,TransactionId xid,XLogSegNo segno)2873 ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
2874 XLogSegNo segno)
2875 {
2876 XLogRecPtr recptr;
2877
2878 XLogSegNoOffsetToRecPtr(segno, 0, recptr);
2879
2880 snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2881 NameStr(MyReplicationSlot->data.name),
2882 xid,
2883 (uint32) (recptr >> 32), (uint32) recptr);
2884 }
2885
2886 /*
2887 * Delete all data spilled to disk after we've restarted/crashed. It will be
2888 * recreated when the respective slots are reused.
2889 */
2890 void
StartupReorderBuffer(void)2891 StartupReorderBuffer(void)
2892 {
2893 DIR *logical_dir;
2894 struct dirent *logical_de;
2895
2896 logical_dir = AllocateDir("pg_replslot");
2897 while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2898 {
2899 if (strcmp(logical_de->d_name, ".") == 0 ||
2900 strcmp(logical_de->d_name, "..") == 0)
2901 continue;
2902
2903 /* if it cannot be a slot, skip the directory */
2904 if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2905 continue;
2906
2907 /*
2908 * ok, has to be a surviving logical slot, iterate and delete
2909 * everything starting with xid-*
2910 */
2911 ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
2912 }
2913 FreeDir(logical_dir);
2914 }
2915
2916 /* ---------------------------------------
2917 * toast reassembly support
2918 * ---------------------------------------
2919 */
2920
2921 /*
2922 * Initialize per tuple toast reconstruction support.
2923 */
2924 static void
ReorderBufferToastInitHash(ReorderBuffer * rb,ReorderBufferTXN * txn)2925 ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
2926 {
2927 HASHCTL hash_ctl;
2928
2929 Assert(txn->toast_hash == NULL);
2930
2931 memset(&hash_ctl, 0, sizeof(hash_ctl));
2932 hash_ctl.keysize = sizeof(Oid);
2933 hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2934 hash_ctl.hcxt = rb->context;
2935 txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2936 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
2937 }
2938
/*
 * Per toast-chunk handling for toast reconstruction
 *
 * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
 * toasted Datum comes along.
 *
 * The chunk's (chunk_id, chunk_seq, chunk_data) are read from the first three
 * attributes of the toast-table tuple; chunks for the same value must arrive
 * with consecutive sequence numbers starting at 0.
 */
static void
ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
							  Relation relation, ReorderBufferChange *change)
{
	ReorderBufferToastEnt *ent;
	ReorderBufferTupleBuf *newtup;
	bool		found;
	int32		chunksize;
	bool		isnull;
	Pointer		chunk;
	TupleDesc	desc = RelationGetDescr(relation);
	Oid			chunk_id;
	int32		chunk_seq;

	/* create the per-transaction hash lazily on the first chunk */
	if (txn->toast_hash == NULL)
		ReorderBufferToastInitHash(rb, txn);

	Assert(IsToastRelation(relation));

	newtup = change->data.tp.newtuple;
	chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
	Assert(!isnull);
	chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
	Assert(!isnull);

	ent = (ReorderBufferToastEnt *)
		hash_search(txn->toast_hash,
					(void *) &chunk_id,
					HASH_ENTER,
					&found);

	if (!found)
	{
		/* brand-new entry: dynahash copied the key, now init the rest */
		Assert(ent->chunk_id == chunk_id);
		ent->num_chunks = 0;
		ent->last_chunk_seq = 0;
		ent->size = 0;
		ent->reconstructed = NULL;
		dlist_init(&ent->chunks);

		if (chunk_seq != 0)
			elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
				 chunk_seq, chunk_id);
	}
	/* NB: the 'found &&' here is redundant - we're in the !found else branch */
	else if (found && chunk_seq != ent->last_chunk_seq + 1)
		elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
			 chunk_seq, chunk_id, ent->last_chunk_seq + 1);

	chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
	Assert(!isnull);

	/* calculate size so we can allocate the right size at once later */
	if (!VARATT_IS_EXTENDED(chunk))
		chunksize = VARSIZE(chunk) - VARHDRSZ;
	else if (VARATT_IS_SHORT(chunk))
		/* could happen due to heap_form_tuple doing its thing */
		chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
	else
		elog(ERROR, "unexpected type of toast chunk");

	ent->size += chunksize;
	ent->last_chunk_seq = chunk_seq;
	ent->num_chunks++;
	/* keep the change alive on the entry's chunk list for later reassembly */
	dlist_push_tail(&ent->chunks, &change->node);
}
3010
3011 /*
 * Rejigger change->newtuple to point to in-memory toast tuples instead of
 * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
3014 *
3015 * We cannot replace unchanged toast tuples though, so those will still point
3016 * to on-disk toast data.
3017 */
static void
ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
						  Relation relation, ReorderBufferChange *change)
{
	TupleDesc	desc;
	int			natt;
	Datum	   *attrs;
	bool	   *isnull;
	bool	   *free;
	HeapTuple	tmphtup;
	Relation	toast_rel;
	TupleDesc	toast_desc;
	MemoryContext oldcontext;
	ReorderBufferTupleBuf *newtup;

	/* no toast tuples changed */
	if (txn->toast_hash == NULL)
		return;

	/*
	 * Allocate in the reorderbuffer's context: the reconstructed datums must
	 * outlive this call (they are freed in ReorderBufferToastReset()).
	 */
	oldcontext = MemoryContextSwitchTo(rb->context);

	/* we should only have toast tuples in an INSERT or UPDATE */
	Assert(change->data.tp.newtuple);

	desc = RelationGetDescr(relation);

	toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
	if (!RelationIsValid(toast_rel))
		elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
			 relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));

	toast_desc = RelationGetDescr(toast_rel);

	/* should we allocate from stack instead? */
	attrs = palloc0(sizeof(Datum) * desc->natts);
	isnull = palloc0(sizeof(bool) * desc->natts);
	free = palloc0(sizeof(bool) * desc->natts);

	newtup = change->data.tp.newtuple;

	/* break the new tuple into per-attribute Datum/isnull arrays */
	heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);

	/* walk attributes, replacing changed external toast datums in attrs[] */
	for (natt = 0; natt < desc->natts; natt++)
	{
		Form_pg_attribute attr = desc->attrs[natt];
		ReorderBufferToastEnt *ent;
		struct varlena *varlena;

		/* va_rawsize is the size of the original datum -- including header */
		struct varatt_external toast_pointer;
		struct varatt_indirect redirect_pointer;
		struct varlena *new_datum = NULL;
		struct varlena *reconstructed;
		dlist_iter	it;
		Size		data_done = 0;

		/* system columns aren't toasted */
		if (attr->attnum < 0)
			continue;

		if (attr->attisdropped)
			continue;

		/* not a varlena datatype */
		if (attr->attlen != -1)
			continue;

		/* no data */
		if (isnull[natt])
			continue;

		/* ok, we know we have a toast datum */
		varlena = (struct varlena *) DatumGetPointer(attrs[natt]);

		/* no need to do anything if the tuple isn't external */
		if (!VARATT_IS_EXTERNAL(varlena))
			continue;

		VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);

		/*
		 * Check whether the toast tuple changed, replace if so.
		 */
		ent = (ReorderBufferToastEnt *)
			hash_search(txn->toast_hash,
						(void *) &toast_pointer.va_valueid,
						HASH_FIND,
						NULL);
		if (ent == NULL)
			continue;

		/* indirect pointer datum that will point at the rebuilt value */
		new_datum =
			(struct varlena *) palloc0(INDIRECT_POINTER_SIZE);

		/* remember to pfree this attribute's replacement datum below */
		free[natt] = true;

		reconstructed = palloc0(toast_pointer.va_rawsize);

		/* stash it so ReorderBufferToastReset() can free it later */
		ent->reconstructed = reconstructed;

		/* stitch toast tuple back together from its parts */
		dlist_foreach(it, &ent->chunks)
		{
			bool		isnull;
			ReorderBufferChange *cchange;
			ReorderBufferTupleBuf *ctup;
			Pointer		chunk;

			cchange = dlist_container(ReorderBufferChange, node, it.cur);
			ctup = cchange->data.tp.newtuple;
			/* attribute 3 of a toast chunk tuple is the chunk_data bytea */
			chunk = DatumGetPointer(
						  fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));

			Assert(!isnull);
			Assert(!VARATT_IS_EXTERNAL(chunk));
			Assert(!VARATT_IS_SHORT(chunk));

			memcpy(VARDATA(reconstructed) + data_done,
				   VARDATA(chunk),
				   VARSIZE(chunk) - VARHDRSZ);
			data_done += VARSIZE(chunk) - VARHDRSZ;
		}
		/* all chunks together must add up to the external size */
		Assert(data_done == toast_pointer.va_extsize);

		/* make sure its marked as compressed or not */
		if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
			SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
		else
			SET_VARSIZE(reconstructed, data_done + VARHDRSZ);

		/* point the indirect datum at the in-memory reconstructed value */
		memset(&redirect_pointer, 0, sizeof(redirect_pointer));
		redirect_pointer.pointer = reconstructed;

		SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
		memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
			   sizeof(redirect_pointer));

		attrs[natt] = PointerGetDatum(new_datum);
	}

	/*
	 * Build tuple in separate memory & copy tuple back into the tuplebuf
	 * passed to the output plugin. We can't directly heap_fill_tuple() into
	 * the tuplebuf because attrs[] will point back into the current content.
	 */
	tmphtup = heap_form_tuple(desc, attrs, isnull);
	Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
	Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);

	memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
	newtup->tuple.t_len = tmphtup->t_len;

	/*
	 * free resources we won't further need, more persistent stuff will be
	 * free'd in ReorderBufferToastReset().
	 */
	RelationClose(toast_rel);
	pfree(tmphtup);
	for (natt = 0; natt < desc->natts; natt++)
	{
		if (free[natt])
			pfree(DatumGetPointer(attrs[natt]));
	}
	pfree(attrs);
	pfree(free);
	pfree(isnull);

	MemoryContextSwitchTo(oldcontext);
}
3187
3188 /*
3189 * Free all resources allocated for toast reconstruction.
3190 */
3191 static void
ReorderBufferToastReset(ReorderBuffer * rb,ReorderBufferTXN * txn)3192 ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
3193 {
3194 HASH_SEQ_STATUS hstat;
3195 ReorderBufferToastEnt *ent;
3196
3197 if (txn->toast_hash == NULL)
3198 return;
3199
3200 /* sequentially walk over the hash and free everything */
3201 hash_seq_init(&hstat, txn->toast_hash);
3202 while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
3203 {
3204 dlist_mutable_iter it;
3205
3206 if (ent->reconstructed != NULL)
3207 pfree(ent->reconstructed);
3208
3209 dlist_foreach_modify(it, &ent->chunks)
3210 {
3211 ReorderBufferChange *change =
3212 dlist_container(ReorderBufferChange, node, it.cur);
3213
3214 dlist_delete(&change->node);
3215 ReorderBufferReturnChange(rb, change);
3216 }
3217 }
3218
3219 hash_destroy(txn->toast_hash);
3220 txn->toast_hash = NULL;
3221 }
3222
3223
3224 /* ---------------------------------------
3225 * Visibility support for logical decoding
3226 *
3227 *
3228 * Lookup actual cmin/cmax values when using decoding snapshot. We can't
3229 * always rely on stored cmin/cmax values because of two scenarios:
3230 *
3231 * * A tuple got changed multiple times during a single transaction and thus
 * has got a combocid. Combocids are only valid for the duration of a
3233 * single transaction.
3234 * * A tuple with a cmin but no cmax (and thus no combocid) got
3235 * deleted/updated in another transaction than the one which created it
3236 * which we are looking at right now. As only one of cmin, cmax or combocid
3237 * is actually stored in the heap we don't have access to the value we
3238 * need anymore.
3239 *
3240 * To resolve those problems we have a per-transaction hash of (cmin,
3241 * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
3242 * (cmin, cmax) values. That also takes care of combocids by simply
3243 * not caring about them at all. As we have the real cmin/cmax values
3244 * combocids aren't interesting.
3245 *
3246 * As we only care about catalog tuples here the overhead of this
3247 * hashtable should be acceptable.
3248 *
3249 * Heap rewrites complicate this a bit, check rewriteheap.c for
3250 * details.
3251 * -------------------------------------------------------------------------
3252 */
3253
3254 /* struct for qsort()ing mapping files by lsn somewhat efficiently */
3255 typedef struct RewriteMappingFile
3256 {
3257 XLogRecPtr lsn;
3258 char fname[MAXPGPATH];
3259 } RewriteMappingFile;
3260
#if NOT_USED
/* Debugging aid: dump every (relfilenode, tid) -> (cmin, cmax) mapping. */
static void
DisplayMapping(HTAB *tuplecid_data)
{
	HASH_SEQ_STATUS status;
	ReorderBufferTupleCidEnt *entry;

	hash_seq_init(&status, tuplecid_data);
	for (;;)
	{
		entry = (ReorderBufferTupleCidEnt *) hash_seq_search(&status);
		if (entry == NULL)
			break;

		elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
			 entry->key.relnode.dbNode,
			 entry->key.relnode.spcNode,
			 entry->key.relnode.relNode,
			 BlockIdGetBlockNumber(&entry->key.tid.ip_blkid),
			 entry->key.tid.ip_posid,
			 entry->cmin,
			 entry->cmax
			);
	}
}
#endif
3283
3284 /*
3285 * Apply a single mapping file to tuplecid_data.
3286 *
3287 * The mapping file has to have been verified to be a) committed b) for our
3288 * transaction c) applied in LSN order.
3289 */
3290 static void
ApplyLogicalMappingFile(HTAB * tuplecid_data,Oid relid,const char * fname)3291 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
3292 {
3293 char path[MAXPGPATH];
3294 int fd;
3295 int readBytes;
3296 LogicalRewriteMappingData map;
3297
3298 sprintf(path, "pg_logical/mappings/%s", fname);
3299 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
3300 if (fd < 0)
3301 ereport(ERROR,
3302 (errcode_for_file_access(),
3303 errmsg("could not open file \"%s\": %m", path)));
3304
3305 while (true)
3306 {
3307 ReorderBufferTupleCidKey key;
3308 ReorderBufferTupleCidEnt *ent;
3309 ReorderBufferTupleCidEnt *new_ent;
3310 bool found;
3311
3312 /* be careful about padding */
3313 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3314
3315 /* read all mappings till the end of the file */
3316 readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3317
3318 if (readBytes < 0)
3319 ereport(ERROR,
3320 (errcode_for_file_access(),
3321 errmsg("could not read file \"%s\": %m",
3322 path)));
3323 else if (readBytes == 0) /* EOF */
3324 break;
3325 else if (readBytes != sizeof(LogicalRewriteMappingData))
3326 ereport(ERROR,
3327 (errcode_for_file_access(),
3328 errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3329 path, readBytes,
3330 (int32) sizeof(LogicalRewriteMappingData))));
3331
3332 key.relnode = map.old_node;
3333 ItemPointerCopy(&map.old_tid,
3334 &key.tid);
3335
3336
3337 ent = (ReorderBufferTupleCidEnt *)
3338 hash_search(tuplecid_data,
3339 (void *) &key,
3340 HASH_FIND,
3341 NULL);
3342
3343 /* no existing mapping, no need to update */
3344 if (!ent)
3345 continue;
3346
3347 key.relnode = map.new_node;
3348 ItemPointerCopy(&map.new_tid,
3349 &key.tid);
3350
3351 new_ent = (ReorderBufferTupleCidEnt *)
3352 hash_search(tuplecid_data,
3353 (void *) &key,
3354 HASH_ENTER,
3355 &found);
3356
3357 if (found)
3358 {
3359 /*
3360 * Make sure the existing mapping makes sense. We sometime update
3361 * old records that did not yet have a cmax (e.g. pg_class' own
3362 * entry while rewriting it) during rewrites, so allow that.
3363 */
3364 Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3365 Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3366 }
3367 else
3368 {
3369 /* update mapping */
3370 new_ent->cmin = ent->cmin;
3371 new_ent->cmax = ent->cmax;
3372 new_ent->combocid = ent->combocid;
3373 }
3374 }
3375
3376 CloseTransientFile(fd);
3377 }
3378
3379
3380 /*
3381 * Check whether the TransactionOid 'xid' is in the pre-sorted array 'xip'.
3382 */
3383 static bool
TransactionIdInArray(TransactionId xid,TransactionId * xip,Size num)3384 TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
3385 {
3386 return bsearch(&xid, xip, num,
3387 sizeof(TransactionId), xidComparator) != NULL;
3388 }
3389
3390 /*
3391 * qsort() comparator for sorting RewriteMappingFiles in LSN order.
3392 */
3393 static int
file_sort_by_lsn(const void * a_p,const void * b_p)3394 file_sort_by_lsn(const void *a_p, const void *b_p)
3395 {
3396 RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
3397 RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
3398
3399 if (a->lsn < b->lsn)
3400 return -1;
3401 else if (a->lsn > b->lsn)
3402 return 1;
3403 return 0;
3404 }
3405
3406 /*
3407 * Apply any existing logical remapping files if there are any targeted at our
3408 * transaction for relid.
3409 */
static void
UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
{
	DIR		   *mapping_dir;
	struct dirent *mapping_de;
	List	   *files = NIL;
	ListCell   *file;
	RewriteMappingFile **files_a;
	size_t		off;
	Oid			dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;

	/* scan the mappings directory, collecting files relevant to us */
	mapping_dir = AllocateDir("pg_logical/mappings");
	while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
	{
		Oid			f_dboid;
		Oid			f_relid;
		TransactionId f_mapped_xid;
		TransactionId f_create_xid;
		XLogRecPtr	f_lsn;
		uint32		f_hi,
					f_lo;
		RewriteMappingFile *f;

		if (strcmp(mapping_de->d_name, ".") == 0 ||
			strcmp(mapping_de->d_name, "..") == 0)
			continue;

		/* Ignore files that aren't ours */
		if (strncmp(mapping_de->d_name, "map-", 4) != 0)
			continue;

		/* parse dboid/relid/lsn/xids back out of the mapping filename */
		if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
				   &f_dboid, &f_relid, &f_hi, &f_lo,
				   &f_mapped_xid, &f_create_xid) != 6)
			elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);

		f_lsn = ((uint64) f_hi) << 32 | f_lo;

		/* mapping for another database */
		if (f_dboid != dboid)
			continue;

		/* mapping for another relation */
		if (f_relid != relid)
			continue;

		/* did the creating transaction abort? */
		if (!TransactionIdDidCommit(f_create_xid))
			continue;

		/* not for our transaction */
		if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
			continue;

		/* ok, relevant, queue for apply */
		f = palloc(sizeof(RewriteMappingFile));
		f->lsn = f_lsn;
		strcpy(f->fname, mapping_de->d_name);
		files = lappend(files, f);
	}
	FreeDir(mapping_dir);

	/* build array we can easily sort */
	/* NOTE(review): the list cells themselves are not freed here; presumably
	 * allocated in a short-lived context -- verify against callers. */
	files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
	off = 0;
	foreach(file, files)
	{
		files_a[off++] = lfirst(file);
	}

	/* sort files so we apply them in LSN order */
	qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
		  file_sort_by_lsn);

	/* apply each file in turn, freeing its descriptor once done */
	for (off = 0; off < list_length(files); off++)
	{
		RewriteMappingFile *f = files_a[off];

		elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
			 snapshot->subxip[0]);
		ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
		pfree(f);
	}
}
3494
3495 /*
3496 * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
3497 * combocids.
3498 */
bool
ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
							  Snapshot snapshot,
							  HeapTuple htup, Buffer buffer,
							  CommandId *cmin, CommandId *cmax)
{
	ReorderBufferTupleCidKey key;
	ReorderBufferTupleCidEnt *ent;
	ForkNumber	forkno;
	BlockNumber blockno;
	bool		updated_mapping = false;

	/* be careful about padding */
	memset(&key, 0, sizeof(key));

	Assert(!BufferIsLocal(buffer));

	/*
	 * get relfilenode from the buffer, no convenient way to access it other
	 * than that.
	 */
	BufferGetTag(buffer, &key.relnode, &forkno, &blockno);

	/* tuples can only be in the main fork */
	Assert(forkno == MAIN_FORKNUM);
	Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));

	/* key is (relfilenode, ctid) as described in the header comment above */
	ItemPointerCopy(&htup->t_self,
					&key.tid);

restart:
	ent = (ReorderBufferTupleCidEnt *)
		hash_search(tuplecid_data,
					(void *) &key,
					HASH_FIND,
					NULL);

	/*
	 * failed to find a mapping, check whether the table was rewritten and
	 * apply mapping if so, but only do that once - there can be no new
	 * mappings while we are in here since we have to hold a lock on the
	 * relation.
	 */
	if (ent == NULL && !updated_mapping)
	{
		UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
		/* now check but don't update for a mapping again */
		updated_mapping = true;
		goto restart;
	}
	else if (ent == NULL)
		return false;			/* no mapping even after applying rewrites */

	/* either output pointer may be NULL when the caller doesn't need it */
	if (cmin)
		*cmin = ent->cmin;
	if (cmax)
		*cmax = ent->cmax;
	return true;
}
3558