/*-------------------------------------------------------------------------
 *
 * reorderbuffer.c
 *	  PostgreSQL logical replay/reorder buffer management
 *
 *
 * Copyright (c) 2012-2021, PostgreSQL Global Development Group
 *
 *
 * IDENTIFICATION
 *	  src/backend/replication/reorderbuffer.c
 *
 * NOTES
 *	  This module gets handed individual pieces of transactions in the order
 *	  they are written to the WAL and is responsible for reassembling them
 *	  into toplevel transaction sized pieces. When a transaction is completely
 *	  reassembled - signaled by reading the transaction commit record - it
 *	  will then call the output plugin (cf. ReorderBufferCommit()) with the
 *	  individual changes. The output plugins rely on snapshots built by
 *	  snapbuild.c which hands them to us.
 *
 *	  Transactions and subtransactions/savepoints in postgres are not
 *	  immediately linked to each other from outside the performing
 *	  backend. Only at commit/abort (or via special xact_assignment records)
 *	  are they linked together. That means we have to splice together a
 *	  toplevel transaction from its subtransactions. To do that efficiently we
 *	  build a binary heap indexed by the smallest current lsn of the individual
 *	  subtransactions' changestreams. As the individual streams are inherently
 *	  ordered by LSN - since that is where we build them from - the transaction
 *	  can easily be reassembled by always using the subtransaction with the
 *	  smallest current LSN from the heap.
 *
 *	  In order to cope with large transactions - which can be several times as
 *	  big as the available memory - this module supports spooling the contents
 *	  of large transactions to disk. When the transaction is replayed the
 *	  contents of individual (sub-)transactions will be read from disk in
 *	  chunks.
 *
 *	  This module also has to deal with reassembling toast records from the
 *	  individual chunks stored in WAL. When a new (or initial) version of a
 *	  tuple is stored in WAL it will always be preceded by the toast chunks
 *	  emitted for the columns stored out of line. Within a single toplevel
 *	  transaction there will be no other data carrying records between a row's
 *	  toast chunks and the row data itself. See ReorderBufferToast* for
 *	  details.
 *
 *	  ReorderBuffer uses two special memory context types - SlabContext for
 *	  allocations of fixed-length structures (changes and transactions), and
 *	  GenerationContext for the variable-length transaction data (allocated
 *	  and freed in groups with similar lifespans).
 *
 *	  To limit the amount of memory used by decoded changes, we track memory
 *	  used at the reorder buffer level (i.e. total amount of memory), and for
 *	  each transaction. When the total amount of used memory exceeds the
 *	  limit, the transaction consuming the most memory is then serialized to
 *	  disk.
 *
 *	  Only decoded changes are evicted from memory (spilled to disk), not the
 *	  transaction records. The number of toplevel transactions is limited,
 *	  but a transaction with many subtransactions may still consume significant
 *	  amounts of memory. However, the transaction records are fairly small and
 *	  are not included in the memory limit.
 *
 *	  The current eviction algorithm is very simple - the transaction is
 *	  picked merely by size, while it might be useful to also consider age
 *	  (LSN) of the changes for example. With the new Generational memory
 *	  allocator, evicting the oldest changes would make it more likely the
 *	  memory gets actually freed.
 *
 *	  We still rely on max_changes_in_memory when loading serialized changes
 *	  back into memory. At that point we can't use the memory limit directly
 *	  as we load the subxacts independently. One option to deal with this
 *	  would be to count the subxacts, and allow each to allocate 1/N of the
 *	  memory limit. That however does not seem very appealing, because with
 *	  many subtransactions it may easily cause thrashing (short cycles of
 *	  deserializing and applying very few changes). We probably should give
 *	  a bit more memory to the oldest subtransactions, because it's likely
 *	  they are the source for the next sequence of changes.
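 *
 *	  As an illustration of the limit (a simplified sketch only - the real
 *	  logic lives in ReorderBufferCheckMemoryLimit and may stream rather
 *	  than spill - and assuming a hypothetical helper PickLargestTXN() that
 *	  returns the transaction using the most memory): with
 *	  logical_decoding_work_mem = 64MB, decoding stays entirely in memory
 *	  until the buffer-wide total crosses 64MB, and then roughly:
 *
 *		while (rb->size >= logical_decoding_work_mem * 1024L)
 *			ReorderBufferSerializeTXN(rb, PickLargestTXN(rb));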
 *
 * -------------------------------------------------------------------------
 */
#include "postgres.h"

#include <unistd.h>
#include <sys/stat.h>

#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"	/* just for SnapBuildSnapDecRefcount */
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"


/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
{
	TransactionId xid;
	ReorderBufferTXN *txn;
} ReorderBufferTXNByIdEnt;

/* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
typedef struct ReorderBufferTupleCidKey
{
	RelFileNode relnode;
	ItemPointerData tid;
} ReorderBufferTupleCidKey;

typedef struct ReorderBufferTupleCidEnt
{
	ReorderBufferTupleCidKey key;
	CommandId	cmin;
	CommandId	cmax;
	CommandId	combocid;		/* just for debugging */
} ReorderBufferTupleCidEnt;

/* Virtual file descriptor with file offset tracking */
typedef struct TXNEntryFile
{
	File		vfd;			/* -1 when the file is closed */
	off_t		curOffset;		/* offset for next write or read. Reset to 0
								 * when vfd is opened. */
} TXNEntryFile;

/* k-way in-order change iteration support structures */
typedef struct ReorderBufferIterTXNEntry
{
	XLogRecPtr	lsn;
	ReorderBufferChange *change;
	ReorderBufferTXN *txn;
	TXNEntryFile file;
	XLogSegNo	segno;
} ReorderBufferIterTXNEntry;

typedef struct ReorderBufferIterTXNState
{
	binaryheap *heap;
	Size		nr_txns;
	dlist_head	old_change;
	ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
} ReorderBufferIterTXNState;

/* toast datastructures */
typedef struct ReorderBufferToastEnt
{
	Oid			chunk_id;		/* toast_table.chunk_id */
	int32		last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
								 * have seen */
	Size		num_chunks;		/* number of chunks we've already seen */
	Size		size;			/* combined size of chunks seen */
	dlist_head	chunks;			/* linked list of chunks */
	struct varlena *reconstructed;	/* reconstructed varlena now pointed to in
									 * main tup */
} ReorderBufferToastEnt;

/* Disk serialization support datastructures */
typedef struct ReorderBufferDiskChange
{
	Size		size;
	ReorderBufferChange change;
	/* data follows */
} ReorderBufferDiskChange;

#define IsSpecInsert(action) \
( \
	((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
)
#define IsSpecConfirmOrAbort(action) \
( \
	(((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
	((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
)
#define IsInsertOrUpdate(action) \
( \
	(((action) == REORDER_BUFFER_CHANGE_INSERT) || \
	((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
	((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
)

/*
 * Maximum number of changes kept in memory, per transaction. After that,
 * changes are spooled to disk.
 *
 * The current value should be sufficient to decode the entire transaction
 * without hitting disk in OLTP workloads, while starting to spool to disk in
 * other workloads reasonably fast.
 *
 * At some point in the future it probably makes sense to have a more elaborate
 * resource management here, but it's not entirely clear what that would look
 * like.
 */
int			logical_decoding_work_mem;
static const Size max_changes_in_memory = 4096; /* XXX for restore only */

/* ---------------------------------------
 * primary reorderbuffer support routines
 * ---------------------------------------
 */
static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
											   TransactionId xid, bool create, bool *is_new,
											   XLogRecPtr lsn, bool create_as_top);
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
											  ReorderBufferTXN *subtxn);

static void AssertTXNLsnOrder(ReorderBuffer *rb);

/* ---------------------------------------
 * support functions for lsn-order iterating over the ->changes of a
 * transaction and its subtransactions
 *
 * used for iteration over the k-way heap merge of a transaction and its
 * subtransactions
 * ---------------------------------------
 */
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
									 ReorderBufferIterTXNState *volatile *iter_state);
static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
									   ReorderBufferIterTXNState *state);
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs);

/*
 * ---------------------------------------
 * Disk serialization support functions
 * ---------------------------------------
 */
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb);
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
										 int fd, ReorderBufferChange *change);
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
										TXNEntryFile *file, XLogSegNo *segno);
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
									   char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
									 bool txn_prepared);
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
										TransactionId xid, XLogSegNo segno);

static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
									  ReorderBufferTXN *txn, CommandId cid);

/*
 * ---------------------------------------
 * Streaming support functions
 * ---------------------------------------
 */
static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn);

/* ---------------------------------------
 * toast reassembly support
 * ---------------------------------------
 */
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
									  Relation relation, ReorderBufferChange *change);
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
										  Relation relation, ReorderBufferChange *change);

/*
 * ---------------------------------------
 * memory accounting
 * ---------------------------------------
 */
static Size ReorderBufferChangeSize(ReorderBufferChange *change);
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
											ReorderBufferChange *change,
											bool addition, Size sz);

/*
 * Allocate a new ReorderBuffer and clean out any old serialized state from
 * prior ReorderBuffer instances for the same slot.
 */
ReorderBuffer *
ReorderBufferAllocate(void)
{
	ReorderBuffer *buffer;
	HASHCTL		hash_ctl;
	MemoryContext new_ctx;

	Assert(MyReplicationSlot != NULL);

	/* allocate memory in own context, to have better accountability */
	new_ctx = AllocSetContextCreate(CurrentMemoryContext,
									"ReorderBuffer",
									ALLOCSET_DEFAULT_SIZES);

	buffer =
		(ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));

	memset(&hash_ctl, 0, sizeof(hash_ctl));

	buffer->context = new_ctx;

	buffer->change_context = SlabContextCreate(new_ctx,
											   "Change",
											   SLAB_DEFAULT_BLOCK_SIZE,
											   sizeof(ReorderBufferChange));

	buffer->txn_context = SlabContextCreate(new_ctx,
											"TXN",
											SLAB_DEFAULT_BLOCK_SIZE,
											sizeof(ReorderBufferTXN));

	buffer->tup_context = GenerationContextCreate(new_ctx,
												  "Tuples",
												  SLAB_LARGE_BLOCK_SIZE);

	hash_ctl.keysize = sizeof(TransactionId);
	hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
	hash_ctl.hcxt = buffer->context;

	buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

	buffer->by_txn_last_xid = InvalidTransactionId;
	buffer->by_txn_last_txn = NULL;

	buffer->outbuf = NULL;
	buffer->outbufsize = 0;
	buffer->size = 0;

	buffer->spillTxns = 0;
	buffer->spillCount = 0;
	buffer->spillBytes = 0;
	buffer->streamTxns = 0;
	buffer->streamCount = 0;
	buffer->streamBytes = 0;
	buffer->totalTxns = 0;
	buffer->totalBytes = 0;

	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;

	dlist_init(&buffer->toplevel_by_lsn);
	dlist_init(&buffer->txns_by_base_snapshot_lsn);

	/*
	 * Ensure there's no stale data from prior uses of this slot, in case some
	 * prior exit avoided calling ReorderBufferFree. Failure to do this can
	 * produce duplicated txns, and it's very cheap if there's nothing there.
	 */
	ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));

	return buffer;
}

/*
 * Free a ReorderBuffer
 */
void
ReorderBufferFree(ReorderBuffer *rb)
{
	MemoryContext context = rb->context;

	/*
	 * We free separately allocated data by entirely scrapping reorderbuffer's
	 * memory context.
	 */
	MemoryContextDelete(context);

	/* Free disk space used by unconsumed reorder buffers */
	ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
}

/*
 * Get an unused, possibly preallocated, ReorderBufferTXN.
 */
static ReorderBufferTXN *
ReorderBufferGetTXN(ReorderBuffer *rb)
{
	ReorderBufferTXN *txn;

	txn = (ReorderBufferTXN *)
		MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));

	memset(txn, 0, sizeof(ReorderBufferTXN));

	dlist_init(&txn->changes);
	dlist_init(&txn->tuplecids);
	dlist_init(&txn->subtxns);

	/* InvalidCommandId is not zero, so set it explicitly */
	txn->command_id = InvalidCommandId;
	txn->output_plugin_private = NULL;

	return txn;
}

/*
 * Free a ReorderBufferTXN.
 */
static void
ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	/* clean the lookup cache if we were cached (quite likely) */
	if (rb->by_txn_last_xid == txn->xid)
	{
		rb->by_txn_last_xid = InvalidTransactionId;
		rb->by_txn_last_txn = NULL;
	}

	/* free data that's contained */

	if (txn->gid != NULL)
	{
		pfree(txn->gid);
		txn->gid = NULL;
	}

	if (txn->tuplecid_hash != NULL)
	{
		hash_destroy(txn->tuplecid_hash);
		txn->tuplecid_hash = NULL;
	}

	if (txn->invalidations)
	{
		pfree(txn->invalidations);
		txn->invalidations = NULL;
	}

	/* Reset the toast hash */
	ReorderBufferToastReset(rb, txn);

	pfree(txn);
}

/*
 * Get a fresh ReorderBufferChange.
 */
ReorderBufferChange *
ReorderBufferGetChange(ReorderBuffer *rb)
{
	ReorderBufferChange *change;

	change = (ReorderBufferChange *)
		MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));

	memset(change, 0, sizeof(ReorderBufferChange));
	return change;
}

/*
 * Free a ReorderBufferChange and update memory accounting, if requested.
 */
void
ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
						  bool upd_mem)
{
	/* update memory accounting info */
	if (upd_mem)
		ReorderBufferChangeMemoryUpdate(rb, change, false,
										ReorderBufferChangeSize(change));

	/* free contained data */
	switch (change->action)
	{
		case REORDER_BUFFER_CHANGE_INSERT:
		case REORDER_BUFFER_CHANGE_UPDATE:
		case REORDER_BUFFER_CHANGE_DELETE:
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
			if (change->data.tp.newtuple)
			{
				ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
				change->data.tp.newtuple = NULL;
			}

			if (change->data.tp.oldtuple)
			{
				ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
				change->data.tp.oldtuple = NULL;
			}
			break;
		case REORDER_BUFFER_CHANGE_MESSAGE:
			if (change->data.msg.prefix != NULL)
				pfree(change->data.msg.prefix);
			change->data.msg.prefix = NULL;
			if (change->data.msg.message != NULL)
				pfree(change->data.msg.message);
			change->data.msg.message = NULL;
			break;
		case REORDER_BUFFER_CHANGE_INVALIDATION:
			if (change->data.inval.invalidations)
				pfree(change->data.inval.invalidations);
			change->data.inval.invalidations = NULL;
			break;
		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
			if (change->data.snapshot)
			{
				ReorderBufferFreeSnap(rb, change->data.snapshot);
				change->data.snapshot = NULL;
			}
			break;
			/* no data in addition to the struct itself */
		case REORDER_BUFFER_CHANGE_TRUNCATE:
			if (change->data.truncate.relids != NULL)
			{
				ReorderBufferReturnRelids(rb, change->data.truncate.relids);
				change->data.truncate.relids = NULL;
			}
			break;
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
			break;
	}

	pfree(change);
}

/*
 * Get a fresh ReorderBufferTupleBuf large enough to hold a tuple of size
 * tuple_len (excluding header overhead).
 */
ReorderBufferTupleBuf *
ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
{
	ReorderBufferTupleBuf *tuple;
	Size		alloc_len;

	alloc_len = tuple_len + SizeofHeapTupleHeader;

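	/*
	 * The extra MAXIMUM_ALIGNOF bytes allow ReorderBufferTupleBufData() to
	 * return a maxaligned pointer for the tuple data that follows the
	 * struct.
	 */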
	tuple = (ReorderBufferTupleBuf *)
		MemoryContextAlloc(rb->tup_context,
						   sizeof(ReorderBufferTupleBuf) +
						   MAXIMUM_ALIGNOF + alloc_len);
	tuple->alloc_tuple_size = alloc_len;
	tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);

	return tuple;
}

/*
 * Free a ReorderBufferTupleBuf.
 */
void
ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
{
	pfree(tuple);
}

/*
 * Get an array for relids of truncated relations.
 *
 * We use the global memory context (for the whole reorder buffer), because
 * none of the existing ones seems like a good match (some are SLAB, so we
 * can't use those, and tup_context is meant for tuple data, not relids). We
 * could add yet another context, but it seems like overkill - TRUNCATE is
 * not a particularly common operation, so it does not seem worth it.
 */
Oid *
ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
{
	Oid		   *relids;
	Size		alloc_len;

	alloc_len = sizeof(Oid) * nrelids;

	relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);

	return relids;
}

/*
 * Free an array of relids.
 */
void
ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
{
	pfree(relids);
}

/*
 * Return the ReorderBufferTXN from the given buffer, specified by Xid.
 * If create is true, and a transaction doesn't already exist, create it
 * (with the given LSN, and as top transaction if that's specified);
 * when this happens, is_new is set to true.
 */
static ReorderBufferTXN *
ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
					  bool *is_new, XLogRecPtr lsn, bool create_as_top)
{
	ReorderBufferTXN *txn;
	ReorderBufferTXNByIdEnt *ent;
	bool		found;

	Assert(TransactionIdIsValid(xid));

	/*
	 * Check the one-entry lookup cache first
	 */
	if (TransactionIdIsValid(rb->by_txn_last_xid) &&
		rb->by_txn_last_xid == xid)
	{
		txn = rb->by_txn_last_txn;

		if (txn != NULL)
		{
			/* found it, and it's valid */
			if (is_new)
				*is_new = false;
			return txn;
		}

		/*
		 * cached as non-existent, and asked not to create? Then nothing else
		 * to do.
		 */
		if (!create)
			return NULL;
		/* otherwise fall through to create it */
	}

	/*
	 * The cache wasn't hit, or it yielded a "does-not-exist" and we want to
	 * create an entry.
	 */

	/* search the lookup table */
	ent = (ReorderBufferTXNByIdEnt *)
		hash_search(rb->by_txn,
					(void *) &xid,
					create ? HASH_ENTER : HASH_FIND,
					&found);
	if (found)
		txn = ent->txn;
	else if (create)
	{
		/* initialize the new entry, if creation was requested */
		Assert(ent != NULL);
		Assert(lsn != InvalidXLogRecPtr);

		ent->txn = ReorderBufferGetTXN(rb);
		ent->txn->xid = xid;
		txn = ent->txn;
		txn->first_lsn = lsn;
		txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;

		if (create_as_top)
		{
			dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
			AssertTXNLsnOrder(rb);
		}
	}
	else
		txn = NULL;				/* not found and not asked to create */

	/* update cache */
	rb->by_txn_last_xid = xid;
	rb->by_txn_last_txn = txn;

	if (is_new)
		*is_new = !found;

	Assert(!create || txn != NULL);
	return txn;
}

/*
 * Record the partial change for the streaming of in-progress transactions.  We
 * can stream only complete changes, so if we have a partial change like a
 * toast table insert or a speculative insert then we mark such a 'txn' so that
 * it can't be streamed.  We also ensure that if the changes in such a 'txn'
 * are above the logical_decoding_work_mem threshold then we stream them as
 * soon as we have a complete change.
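 *
 * For example, a row with a toasted column arrives in WAL as a series of
 * toast table inserts followed by the insert or update on the main table;
 * only once that final change (with clear_toast_afterwards set) arrives is
 * the group complete and streamable.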
 */
static void
ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
								  ReorderBufferChange *change,
								  bool toast_insert)
{
	ReorderBufferTXN *toptxn;

	/*
	 * The partial changes need to be processed only while streaming
	 * in-progress transactions.
	 */
	if (!ReorderBufferCanStream(rb))
		return;

	/* Get the top transaction. */
	if (txn->toptxn != NULL)
		toptxn = txn->toptxn;
	else
		toptxn = txn;

	/*
	 * Indicate a partial change for toast inserts.  The change will be
	 * considered as complete once we get the insert or update on the main
	 * table and we are sure that the pending toast chunks are not required
	 * anymore.
	 *
	 * If we allow streaming when there are pending toast chunks then such
	 * chunks won't be released till the insert (multi_insert) is complete and
	 * we expect the txn to have streamed all changes after streaming.  This
	 * restriction is mainly to ensure the correctness of streamed
	 * transactions and it doesn't seem worth uplifting such a restriction
	 * just to allow this case because anyway we will stream the transaction
	 * once such an insert is complete.
	 */
	if (toast_insert)
		toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
	else if (rbtxn_has_partial_change(toptxn) &&
			 IsInsertOrUpdate(change->action) &&
			 change->data.tp.clear_toast_afterwards)
		toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;

	/*
	 * Indicate a partial change for speculative inserts.  The change will be
	 * considered as complete once we get the speculative confirm or abort
	 * token.
	 */
	if (IsSpecInsert(change->action))
		toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
	else if (rbtxn_has_partial_change(toptxn) &&
			 IsSpecConfirmOrAbort(change->action))
		toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;

	/*
	 * Stream the transaction if it is serialized before and the changes are
	 * now complete in the top-level transaction.
	 *
	 * The reason for doing the streaming of such a transaction as soon as we
	 * get the complete change for it is that previously it would have reached
	 * the memory threshold and wouldn't get streamed because of incomplete
	 * changes.  Delaying such transactions would increase apply lag for them.
	 */
	if (ReorderBufferCanStartStreaming(rb) &&
		!(rbtxn_has_partial_change(toptxn)) &&
		rbtxn_is_serialized(txn))
		ReorderBufferStreamTXN(rb, toptxn);
}

/*
 * Queue a change into a transaction so it can be replayed upon commit, or
 * streamed once we reach the logical_decoding_work_mem threshold.
 */
void
ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
						 ReorderBufferChange *change, bool toast_insert)
{
	ReorderBufferTXN *txn;

	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);

	/*
	 * While streaming the previous changes we have detected that the
	 * transaction is aborted.  So there is no point in collecting further
	 * changes for it.
	 */
	if (txn->concurrent_abort)
	{
		/*
		 * We don't need to update memory accounting for this change as we
		 * have not added it to the queue yet.
		 */
		ReorderBufferReturnChange(rb, change, false);
		return;
	}

	change->lsn = lsn;
	change->txn = txn;

	Assert(InvalidXLogRecPtr != lsn);
	dlist_push_tail(&txn->changes, &change->node);
	txn->nentries++;
	txn->nentries_mem++;

	/* update memory accounting information */
	ReorderBufferChangeMemoryUpdate(rb, change, true,
									ReorderBufferChangeSize(change));

	/* process partial change */
	ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);

	/* check the memory limits and evict something if needed */
	ReorderBufferCheckMemoryLimit(rb);
}

/*
 * A transactional message is queued to be processed upon commit and a
 * non-transactional message gets processed immediately.
 */
void
ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
						  Snapshot snapshot, XLogRecPtr lsn,
						  bool transactional, const char *prefix,
						  Size message_size, const char *message)
{
	if (transactional)
	{
		MemoryContext oldcontext;
		ReorderBufferChange *change;

		Assert(xid != InvalidTransactionId);

		oldcontext = MemoryContextSwitchTo(rb->context);

		change = ReorderBufferGetChange(rb);
		change->action = REORDER_BUFFER_CHANGE_MESSAGE;
		change->data.msg.prefix = pstrdup(prefix);
		change->data.msg.message_size = message_size;
		change->data.msg.message = palloc(message_size);
		memcpy(change->data.msg.message, message, message_size);

		ReorderBufferQueueChange(rb, xid, lsn, change, false);

		MemoryContextSwitchTo(oldcontext);
	}
	else
	{
		ReorderBufferTXN *txn = NULL;
		volatile Snapshot snapshot_now = snapshot;

		if (xid != InvalidTransactionId)
			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);

		/* setup snapshot to allow catalog access */
		SetupHistoricSnapshot(snapshot_now, NULL);
		PG_TRY();
		{
			rb->message(rb, txn, lsn, false, prefix, message_size, message);

			TeardownHistoricSnapshot(false);
		}
		PG_CATCH();
		{
			TeardownHistoricSnapshot(true);
			PG_RE_THROW();
		}
		PG_END_TRY();
	}
}

/*
 * AssertTXNLsnOrder
 *		Verify LSN ordering of transaction lists in the reorderbuffer
 *
 * Other LSN-related invariants are checked too.
 *
 * No-op if assertions are not in use.
 */
static void
AssertTXNLsnOrder(ReorderBuffer *rb)
{
#ifdef USE_ASSERT_CHECKING
	dlist_iter	iter;
	XLogRecPtr	prev_first_lsn = InvalidXLogRecPtr;
	XLogRecPtr	prev_base_snap_lsn = InvalidXLogRecPtr;

	dlist_foreach(iter, &rb->toplevel_by_lsn)
	{
		ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
													iter.cur);

		/* start LSN must be set */
		Assert(cur_txn->first_lsn != InvalidXLogRecPtr);

		/* If there is an end LSN, it must be higher than start LSN */
		if (cur_txn->end_lsn != InvalidXLogRecPtr)
			Assert(cur_txn->first_lsn <= cur_txn->end_lsn);

		/* Current initial LSN must be strictly higher than previous */
		if (prev_first_lsn != InvalidXLogRecPtr)
			Assert(prev_first_lsn < cur_txn->first_lsn);

		/* known-as-subtxn txns must not be listed */
		Assert(!rbtxn_is_known_subxact(cur_txn));

		prev_first_lsn = cur_txn->first_lsn;
	}

	dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
	{
		ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
													base_snapshot_node,
													iter.cur);

		/* base snapshot (and its LSN) must be set */
		Assert(cur_txn->base_snapshot != NULL);
		Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);

		/* current LSN must be strictly higher than previous */
		if (prev_base_snap_lsn != InvalidXLogRecPtr)
			Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);

		/* known-as-subtxn txns must not be listed */
		Assert(!rbtxn_is_known_subxact(cur_txn));

		prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
	}
#endif
}

/*
 * AssertChangeLsnOrder
 *
 * Check ordering of changes in the (sub)transaction.
 */
static void
AssertChangeLsnOrder(ReorderBufferTXN *txn)
{
#ifdef USE_ASSERT_CHECKING
	dlist_iter	iter;
	XLogRecPtr	prev_lsn = txn->first_lsn;

	dlist_foreach(iter, &txn->changes)
	{
		ReorderBufferChange *cur_change;

		cur_change = dlist_container(ReorderBufferChange, node, iter.cur);

		Assert(txn->first_lsn != InvalidXLogRecPtr);
		Assert(cur_change->lsn != InvalidXLogRecPtr);
		Assert(txn->first_lsn <= cur_change->lsn);

		if (txn->end_lsn != InvalidXLogRecPtr)
			Assert(cur_change->lsn <= txn->end_lsn);

		Assert(prev_lsn <= cur_change->lsn);

		prev_lsn = cur_change->lsn;
	}
#endif
}

/*
 * ReorderBufferGetOldestTXN
 *		Return oldest transaction in reorderbuffer
 */
ReorderBufferTXN *
ReorderBufferGetOldestTXN(ReorderBuffer *rb)
{
	ReorderBufferTXN *txn;

	AssertTXNLsnOrder(rb);

	if (dlist_is_empty(&rb->toplevel_by_lsn))
		return NULL;

	txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);

	Assert(!rbtxn_is_known_subxact(txn));
	Assert(txn->first_lsn != InvalidXLogRecPtr);
	return txn;
}

/*
 * ReorderBufferGetOldestXmin
 *		Return oldest Xmin in reorderbuffer
 *
 * Returns oldest possibly running Xid from the point of view of snapshots
 * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
 * there are none.
 *
 * Since snapshots are assigned monotonically, this equals the Xmin of the
 * base snapshot with minimal base_snapshot_lsn.
 */
TransactionId
ReorderBufferGetOldestXmin(ReorderBuffer *rb)
{
	ReorderBufferTXN *txn;

	AssertTXNLsnOrder(rb);

	if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
		return InvalidTransactionId;

	txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
							 &rb->txns_by_base_snapshot_lsn);
	return txn->base_snapshot->xmin;
}

void
ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
{
	rb->current_restart_decoding_lsn = ptr;
}

/*
 * ReorderBufferAssignChild
 *
 * Make note that we know that subxid is a subtransaction of xid, seen as of
 * the given lsn.
 */
void
ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
						 TransactionId subxid, XLogRecPtr lsn)
{
	ReorderBufferTXN *txn;
	ReorderBufferTXN *subtxn;
	bool		new_top;
	bool		new_sub;

	txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
	subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);

	if (!new_sub)
	{
		if (rbtxn_is_known_subxact(subtxn))
		{
			/* already associated, nothing to do */
			return;
		}
		else
		{
			/*
			 * We already saw this transaction, but initially added it to the
			 * list of top-level txns.  Now that we know it's not top-level,
			 * remove it from there.
			 */
			dlist_delete(&subtxn->node);
		}
	}

	subtxn->txn_flags |= RBTXN_IS_SUBXACT;
	subtxn->toplevel_xid = xid;
	Assert(subtxn->nsubtxns == 0);

	/* set the reference to top-level transaction */
	subtxn->toptxn = txn;

	/* add to subtransaction list */
	dlist_push_tail(&txn->subtxns, &subtxn->node);
	txn->nsubtxns++;

	/* Possibly transfer the subtxn's snapshot to its top-level txn. */
	ReorderBufferTransferSnapToParent(txn, subtxn);

	/* Verify LSN-ordering invariant */
	AssertTXNLsnOrder(rb);
}

/*
 * ReorderBufferTransferSnapToParent
 *		Transfer base snapshot from subtxn to top-level txn, if needed
 *
 * This is done if the top-level txn doesn't have a base snapshot, or if the
 * subtxn's base snapshot has an earlier LSN than the top-level txn's base
 * snapshot's LSN.  This can happen if there are no changes in the toplevel
 * txn but there are some in the subtxn, or if the first change in the subtxn
 * has an earlier LSN than the first change in the top-level txn and we
 * learned about their kinship only now.
 *
 * The subtransaction's snapshot is cleared regardless of whether the
 * transfer happens, since it's not needed anymore in either case.
 *
 * We do this as soon as we become aware of their kinship, to avoid queueing
 * extra snapshots to txns known-as-subtxns -- only top-level txns will
 * receive further snapshots.
 */
static void
ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
								  ReorderBufferTXN *subtxn)
{
	Assert(subtxn->toplevel_xid == txn->xid);

	if (subtxn->base_snapshot != NULL)
	{
		if (txn->base_snapshot == NULL ||
			subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
		{
			/*
			 * If the toplevel transaction already has a base snapshot but
			 * it's newer than the subxact's, purge it.
			 */
			if (txn->base_snapshot != NULL)
			{
				SnapBuildSnapDecRefcount(txn->base_snapshot);
				dlist_delete(&txn->base_snapshot_node);
			}

			/*
			 * The snapshot is now the top transaction's; transfer it, and
			 * adjust the list position of the top transaction in the list by
			 * moving it to where the subtransaction is.
			 */
			txn->base_snapshot = subtxn->base_snapshot;
			txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
			dlist_insert_before(&subtxn->base_snapshot_node,
								&txn->base_snapshot_node);

			/*
			 * The subtransaction doesn't have a snapshot anymore (so it
			 * mustn't be in the list.)
			 */
			subtxn->base_snapshot = NULL;
			subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
			dlist_delete(&subtxn->base_snapshot_node);
		}
		else
		{
			/* Base snap of toplevel is fine, so subxact's is not needed */
			SnapBuildSnapDecRefcount(subtxn->base_snapshot);
			dlist_delete(&subtxn->base_snapshot_node);
			subtxn->base_snapshot = NULL;
			subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
		}
	}
}

/*
 * Associate a subtransaction with its toplevel transaction at commit
 * time. There may be no further changes added after this.
 */
void
ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
						 TransactionId subxid, XLogRecPtr commit_lsn,
						 XLogRecPtr end_lsn)
{
	ReorderBufferTXN *subtxn;

	subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
								   InvalidXLogRecPtr, false);

	/*
	 * No need to do anything if that subtxn didn't contain any changes
	 */
	if (!subtxn)
		return;

	subtxn->final_lsn = commit_lsn;
	subtxn->end_lsn = end_lsn;

	/*
	 * Assign this subxact as a child of the toplevel xact (no-op if already
	 * done.)
	 */
	ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
}


/*
 * Support for efficiently iterating over a transaction's and its
 * subtransactions' changes.
 *
 * We do this via a k-way merge between transactions/subtransactions. For that
 * we model the current heads of the different transactions as a binary heap
 * so we easily know which (sub-)transaction has the change with the smallest
 * lsn next.
 *
 * We assume the changes in individual transactions are already sorted by LSN.
 */

/*
 * Binary heap comparison function.
 */
static int
ReorderBufferIterCompare(Datum a, Datum b, void *arg)
{
	ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
	XLogRecPtr	pos_a = state->entries[DatumGetInt32(a)].lsn;
	XLogRecPtr	pos_b = state->entries[DatumGetInt32(b)].lsn;

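	/*
	 * Note: binaryheap is a max-heap, so we invert the comparison here; the
	 * entry with the smallest LSN compares as "greatest" and surfaces first.
	 */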
	if (pos_a < pos_b)
		return 1;
	else if (pos_a == pos_b)
		return 0;
	return -1;
}

/*
 * Allocate & initialize an iterator which iterates in lsn order over a
 * transaction and all its subtransactions.
 *
 * Note: The iterator state is returned through iter_state parameter rather
 * than the function's return value.  This is because the state gets cleaned up
 * in a PG_CATCH block in the caller, so we want to make sure the caller gets
 * back the state even if this function throws an exception.
 */
static void
ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
						 ReorderBufferIterTXNState *volatile *iter_state)
{
	Size		nr_txns = 0;
	ReorderBufferIterTXNState *state;
	dlist_iter	cur_txn_i;
	int32		off;

	*iter_state = NULL;

	/* Check ordering of changes in the toplevel transaction. */
	AssertChangeLsnOrder(txn);

	/*
	 * Calculate the size of our heap: one element for every transaction that
	 * contains changes.  (Besides the transactions already in the reorder
	 * buffer, we count the one we were directly passed.)
	 */
	if (txn->nentries > 0)
		nr_txns++;

	dlist_foreach(cur_txn_i, &txn->subtxns)
	{
		ReorderBufferTXN *cur_txn;

		cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);

		/* Check ordering of changes in this subtransaction. */
		AssertChangeLsnOrder(cur_txn);

		if (cur_txn->nentries > 0)
			nr_txns++;
	}

	/* allocate iteration state */
	state = (ReorderBufferIterTXNState *)
		MemoryContextAllocZero(rb->context,
							   sizeof(ReorderBufferIterTXNState) +
							   sizeof(ReorderBufferIterTXNEntry) * nr_txns);

	state->nr_txns = nr_txns;
	dlist_init(&state->old_change);

	for (off = 0; off < state->nr_txns; off++)
	{
		state->entries[off].file.vfd = -1;
		state->entries[off].segno = 0;
	}

	/* allocate heap */
	state->heap = binaryheap_allocate(state->nr_txns,
									  ReorderBufferIterCompare,
									  state);

	/* Now that the state fields are initialized, it is safe to return it. */
	*iter_state = state;

	/*
	 * Now insert items into the binary heap, in an unordered fashion.  (We
	 * will run a heap assembly step at the end; this is more efficient.)
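	 * (binaryheap_build() assembles the heap in O(n), whereas inserting the
	 * items one by one in heap order would take O(n log n).)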
	 */

	off = 0;

	/* add toplevel transaction if it contains changes */
	if (txn->nentries > 0)
	{
		ReorderBufferChange *cur_change;

		if (rbtxn_is_serialized(txn))
		{
			/* serialize remaining changes */
			ReorderBufferSerializeTXN(rb, txn);
			ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
										&state->entries[off].segno);
		}

		cur_change = dlist_head_element(ReorderBufferChange, node,
										&txn->changes);

		state->entries[off].lsn = cur_change->lsn;
		state->entries[off].change = cur_change;
		state->entries[off].txn = txn;

		binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
	}

	/* add subtransactions if they contain changes */
	dlist_foreach(cur_txn_i, &txn->subtxns)
	{
		ReorderBufferTXN *cur_txn;

		cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);

		if (cur_txn->nentries > 0)
		{
			ReorderBufferChange *cur_change;

			if (rbtxn_is_serialized(cur_txn))
			{
				/* serialize remaining changes */
				ReorderBufferSerializeTXN(rb, cur_txn);
				ReorderBufferRestoreChanges(rb, cur_txn,
											&state->entries[off].file,
											&state->entries[off].segno);
			}
			cur_change = dlist_head_element(ReorderBufferChange, node,
											&cur_txn->changes);

			state->entries[off].lsn = cur_change->lsn;
			state->entries[off].change = cur_change;
			state->entries[off].txn = cur_txn;

			binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
		}
	}

	/* assemble a valid binary heap */
	binaryheap_build(state->heap);
}

/*
 * Return the next change when iterating over a transaction and its
 * subtransactions.
 *
 * Returns NULL when no further changes exist.
 */
static ReorderBufferChange *
ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
{
	ReorderBufferChange *change;
	ReorderBufferIterTXNEntry *entry;
	int32		off;

	/* nothing there anymore */
	if (state->heap->bh_size == 0)
		return NULL;

	off = DatumGetInt32(binaryheap_first(state->heap));
	entry = &state->entries[off];

	/* free memory we might have "leaked" in the previous *Next call */
	if (!dlist_is_empty(&state->old_change))
	{
		change = dlist_container(ReorderBufferChange, node,
								 dlist_pop_head_node(&state->old_change));
		ReorderBufferReturnChange(rb, change, true);
		Assert(dlist_is_empty(&state->old_change));
	}

	change = entry->change;

	/*
	 * update heap with information about which transaction has the next
	 * relevant change in LSN order
	 */

	/* there are in-memory changes */
	if (dlist_has_next(&entry->txn->changes, &entry->change->node))
	{
		dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
		ReorderBufferChange *next_change =
		dlist_container(ReorderBufferChange, node, next);

		/* txn stays the same */
		state->entries[off].lsn = next_change->lsn;
		state->entries[off].change = next_change;

		binaryheap_replace_first(state->heap, Int32GetDatum(off));
		return change;
	}

	/* try to load changes from disk */
	if (entry->txn->nentries != entry->txn->nentries_mem)
	{
		/*
		 * Ugly: restoring changes will reuse *Change records, thus delete the
		 * current one from the per-tx list and only free it in the next call.
		 */
		dlist_delete(&change->node);
		dlist_push_tail(&state->old_change, &change->node);

		/*
		 * Update the total bytes processed by the txn for which we are
		 * releasing the current set of changes and restoring the new set of
		 * changes.
		 */
		rb->totalBytes += entry->txn->size;
		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
										&state->entries[off].segno))
		{
			/* successfully restored changes from disk */
			ReorderBufferChange *next_change =
			dlist_head_element(ReorderBufferChange, node,
							   &entry->txn->changes);

			elog(DEBUG2, "restored %u/%u changes from disk",
				 (uint32) entry->txn->nentries_mem,
				 (uint32) entry->txn->nentries);

			Assert(entry->txn->nentries_mem);
			/* txn stays the same */
			state->entries[off].lsn = next_change->lsn;
			state->entries[off].change = next_change;
			binaryheap_replace_first(state->heap, Int32GetDatum(off));

			return change;
		}
	}

	/* ok, no changes there anymore, remove */
	binaryheap_remove_first(state->heap);

	return change;
}

/*
 * Deallocate the iterator
 */
static void
ReorderBufferIterTXNFinish(ReorderBuffer *rb,
						   ReorderBufferIterTXNState *state)
{
	int32		off;

	for (off = 0; off < state->nr_txns; off++)
	{
		if (state->entries[off].file.vfd != -1)
			FileClose(state->entries[off].file.vfd);
	}

	/* free memory we might have "leaked" in the last *Next call */
	if (!dlist_is_empty(&state->old_change))
	{
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node,
								 dlist_pop_head_node(&state->old_change));
		ReorderBufferReturnChange(rb, change, true);
		Assert(dlist_is_empty(&state->old_change));
	}

	binaryheap_free(state->heap);
	pfree(state);
}

/*
 * Cleanup the contents of a transaction, usually after the transaction
 * committed or aborted.
 */
static void
ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	bool		found;
	dlist_mutable_iter iter;

	/* cleanup subtransactions & their changes */
	dlist_foreach_modify(iter, &txn->subtxns)
	{
		ReorderBufferTXN *subtxn;

		subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);

		/*
		 * Subtransactions are always associated to the toplevel TXN, even if
		 * they originally were happening inside another subtxn, so we won't
		 * ever recurse more than one level deep here.
		 */
		Assert(rbtxn_is_known_subxact(subtxn));
		Assert(subtxn->nsubtxns == 0);

		ReorderBufferCleanupTXN(rb, subtxn);
	}

	/* cleanup changes in the txn */
	dlist_foreach_modify(iter, &txn->changes)
	{
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node, iter.cur);

		/* Check we're not mixing changes from different transactions. */
		Assert(change->txn == txn);

		ReorderBufferReturnChange(rb, change, true);
	}

	/*
	 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
	 * They are always stored in the toplevel transaction.
	 */
	dlist_foreach_modify(iter, &txn->tuplecids)
	{
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node, iter.cur);

		/* Check we're not mixing changes from different transactions. */
		Assert(change->txn == txn);
		Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);

		ReorderBufferReturnChange(rb, change, true);
	}

	/*
	 * Cleanup the base snapshot, if set.
	 */
	if (txn->base_snapshot != NULL)
	{
		SnapBuildSnapDecRefcount(txn->base_snapshot);
		dlist_delete(&txn->base_snapshot_node);
	}

	/*
	 * Cleanup the snapshot for the last streamed run.
	 */
	if (txn->snapshot_now != NULL)
	{
		Assert(rbtxn_is_streamed(txn));
		ReorderBufferFreeSnap(rb, txn->snapshot_now);
	}

	/*
	 * Remove TXN from its containing list.
	 *
	 * Note: if txn is known as subxact, we are deleting the TXN from its
	 * parent's list of known subxacts; this leaves the parent's nsubxacts
	 * count too high, but we don't care.  Otherwise, we are deleting the TXN
	 * from the LSN-ordered list of toplevel TXNs.
	 */
	dlist_delete(&txn->node);

	/* now remove reference from buffer */
	hash_search(rb->by_txn,
				(void *) &txn->xid,
				HASH_REMOVE,
				&found);
	Assert(found);

	/* remove entries spilled to disk */
	if (rbtxn_is_serialized(txn))
		ReorderBufferRestoreCleanup(rb, txn);

	/* deallocate */
	ReorderBufferReturnTXN(rb, txn);
}

/*
 * Discard changes from a transaction (and subtransactions), either after
 * streaming or decoding them at PREPARE. Keep the remaining info -
 * transactions, tuplecids, invalidations and snapshots.
 *
 * We additionally remove tuplecids after decoding the transaction at prepare
 * time, as we only need to perform invalidation at rollback or commit prepared.
 *
 * 'txn_prepared' indicates that we have decoded the transaction at prepare
 * time.
 */
static void
ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
{
	dlist_mutable_iter iter;

	/* cleanup subtransactions & their changes */
	dlist_foreach_modify(iter, &txn->subtxns)
	{
		ReorderBufferTXN *subtxn;

		subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);

		/*
		 * Subtransactions are always associated to the toplevel TXN, even if
		 * they originally were happening inside another subtxn, so we won't
		 * ever recurse more than one level deep here.
		 */
		Assert(rbtxn_is_known_subxact(subtxn));
		Assert(subtxn->nsubtxns == 0);

		ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
	}

	/* cleanup changes in the txn */
	dlist_foreach_modify(iter, &txn->changes)
	{
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node, iter.cur);

		/* Check we're not mixing changes from different transactions. */
		Assert(change->txn == txn);

		/* remove the change from its containing list */
		dlist_delete(&change->node);

		ReorderBufferReturnChange(rb, change, true);
	}

	/*
	 * Mark the transaction as streamed.
	 *
	 * The toplevel transaction, identified by (toptxn==NULL), is always
	 * marked as streamed, even if it does not contain any changes (that is,
	 * when all the changes are in subtransactions).
1602 	 *
1603 	 * For subtransactions, we only mark them as streamed when there are
1604 	 * changes in them.
1605 	 *
1606 	 * We do it this way because of aborts - we don't want to send aborts for
1607 	 * XIDs the downstream is not aware of. And of course, it always knows
1608 	 * about the toplevel xact (we send the XID in all messages), but we never
1609 	 * stream XIDs of empty subxacts.
1610 	 */
1611 	if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
1612 		txn->txn_flags |= RBTXN_IS_STREAMED;
1613 
1614 	if (txn_prepared)
1615 	{
1616 		/*
1617 		 * If this is a prepared txn, cleanup the tuplecids we stored for
1618 		 * decoding catalog snapshot access. They are always stored in the
1619 		 * toplevel transaction.
1620 		 */
1621 		dlist_foreach_modify(iter, &txn->tuplecids)
1622 		{
1623 			ReorderBufferChange *change;
1624 
1625 			change = dlist_container(ReorderBufferChange, node, iter.cur);
1626 
1627 			/* Check we're not mixing changes from different transactions. */
1628 			Assert(change->txn == txn);
1629 			Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1630 
1631 			/* Remove the change from its containing list. */
1632 			dlist_delete(&change->node);
1633 
1634 			ReorderBufferReturnChange(rb, change, true);
1635 		}
1636 	}
1637 
1638 	/*
1639 	 * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any
1640 	 * memory. We could also keep the hash table and update it with new ctid
1641 	 * values, but this seems simpler and good enough for now.
1642 	 */
1643 	if (txn->tuplecid_hash != NULL)
1644 	{
1645 		hash_destroy(txn->tuplecid_hash);
1646 		txn->tuplecid_hash = NULL;
1647 	}
1648 
1649 	/* If this txn is serialized then clean the disk space. */
1650 	if (rbtxn_is_serialized(txn))
1651 	{
1652 		ReorderBufferRestoreCleanup(rb, txn);
1653 		txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
1654 
1655 		/*
1656 		 * Set this flag to remember that the transaction was serialized at
1657 		 * some point. We need it to update the stats accurately, as otherwise
1658 		 * the same transaction could be counted as serialized multiple times.
1659 		 */
1660 		txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR;
1661 	}
1662 
1663 	/* also reset the number of entries in the transaction */
1664 	txn->nentries_mem = 0;
1665 	txn->nentries = 0;
1666 }
1667 
1668 /*
1669  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1670  * HeapTupleSatisfiesHistoricMVCC.
1671  */
1672 static void
1673 ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
1674 {
1675 	dlist_iter	iter;
1676 	HASHCTL		hash_ctl;
1677 
1678 	if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
1679 		return;
1680 
1681 	hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1682 	hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1683 	hash_ctl.hcxt = rb->context;
1684 
1685 	/*
1686 	 * create the hash with the exact number of to-be-stored tuplecids from
1687 	 * the start
1688 	 */
1689 	txn->tuplecid_hash =
1690 		hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1691 					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1692 
1693 	dlist_foreach(iter, &txn->tuplecids)
1694 	{
1695 		ReorderBufferTupleCidKey key;
1696 		ReorderBufferTupleCidEnt *ent;
1697 		bool		found;
1698 		ReorderBufferChange *change;
1699 
1700 		change = dlist_container(ReorderBufferChange, node, iter.cur);
1701 
1702 		Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1703 
1704 		/* be careful about padding */
1705 		memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1706 
1707 		key.relnode = change->data.tuplecid.node;
1708 
1709 		ItemPointerCopy(&change->data.tuplecid.tid,
1710 						&key.tid);
1711 
1712 		ent = (ReorderBufferTupleCidEnt *)
1713 			hash_search(txn->tuplecid_hash,
1714 						(void *) &key,
1715 						HASH_ENTER,
1716 						&found);
1717 		if (!found)
1718 		{
1719 			ent->cmin = change->data.tuplecid.cmin;
1720 			ent->cmax = change->data.tuplecid.cmax;
1721 			ent->combocid = change->data.tuplecid.combocid;
1722 		}
1723 		else
1724 		{
1725 			/*
1726 			 * Maybe we already saw this tuple before in this transaction, but
1727 			 * if so it must have the same cmin.
1728 			 */
1729 			Assert(ent->cmin == change->data.tuplecid.cmin);
1730 
1731 			/*
1732 			 * cmax may be initially invalid, but once set it can only grow,
1733 			 * and never become invalid again.
1734 			 */
1735 			Assert((ent->cmax == InvalidCommandId) ||
1736 				   ((change->data.tuplecid.cmax != InvalidCommandId) &&
1737 					(change->data.tuplecid.cmax > ent->cmax)));
1738 			ent->cmax = change->data.tuplecid.cmax;
1739 		}
1740 	}
1741 }
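
/*
 * Illustrative sketch of how the hash built above is consulted during
 * historic MVCC checks ('relfilenode' and 'tid' stand in for the caller's
 * values):
 *
 *	ReorderBufferTupleCidKey key;
 *	ReorderBufferTupleCidEnt *ent;
 *
 *	memset(&key, 0, sizeof(key));
 *	key.relnode = relfilenode;
 *	ItemPointerCopy(tid, &key.tid);
 *	ent = hash_search(txn->tuplecid_hash, &key, HASH_FIND, NULL);
 *	if (ent != NULL)
 *		... use ent->cmin / ent->cmax instead of the tuple's raw values ...
 *
 * cf. ResolveCminCmaxDuringDecoding() and HeapTupleSatisfiesHistoricMVCC().
 */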
1742 
1743 /*
1744  * Copy a provided snapshot so we can modify it privately. This is needed so
1745  * that catalog modifying transactions can look into intermediate catalog
1746  * states.
1747  */
1748 static Snapshot
1749 ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
1750 					  ReorderBufferTXN *txn, CommandId cid)
1751 {
1752 	Snapshot	snap;
1753 	dlist_iter	iter;
1754 	int			i = 0;
1755 	Size		size;
1756 
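	/* Allocate the snapshot plus its xip and subxip arrays as one chunk. */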
1757 	size = sizeof(SnapshotData) +
1758 		sizeof(TransactionId) * orig_snap->xcnt +
1759 		sizeof(TransactionId) * (txn->nsubtxns + 1);
1760 
1761 	snap = MemoryContextAllocZero(rb->context, size);
1762 	memcpy(snap, orig_snap, sizeof(SnapshotData));
1763 
1764 	snap->copied = true;
1765 	snap->active_count = 1;		/* mark as active so nobody frees it */
1766 	snap->regd_count = 0;
1767 	snap->xip = (TransactionId *) (snap + 1);
1768 
1769 	memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1770 
1771 	/*
1772 	 * snap->subxip contains all txids that belong to our transaction and that
1773 	 * we need to check via cmin/cmax. That's why we store the toplevel
1774 	 * transaction in there as well.
1775 	 */
1776 	snap->subxip = snap->xip + snap->xcnt;
1777 	snap->subxip[i++] = txn->xid;
1778 
1779 	/*
1780 	 * subxcnt isn't decreased when subtransactions abort, so count manually.
1781 	 * Since it's an upper bound it is safe to use it for the allocation
1782 	 * above.
1783 	 */
1784 	snap->subxcnt = 1;
1785 
1786 	dlist_foreach(iter, &txn->subtxns)
1787 	{
1788 		ReorderBufferTXN *sub_txn;
1789 
1790 		sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1791 		snap->subxip[i++] = sub_txn->xid;
1792 		snap->subxcnt++;
1793 	}
1794 
1795 	/* sort so we can bsearch() later */
1796 	qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1797 
1798 	/* store the specified current CommandId */
1799 	snap->curcid = cid;
1800 
1801 	return snap;
1802 }
1803 
1804 /*
1805  * Free a previously ReorderBufferCopySnap'ed snapshot
1806  */
1807 static void
1808 ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
1809 {
1810 	if (snap->copied)
1811 		pfree(snap);
1812 	else
1813 		SnapBuildSnapDecRefcount(snap);
1814 }
1815 
1816 /*
1817  * If the transaction was (partially) streamed, we need to prepare or commit
1818  * it in a 'streamed' way.  That is, we first stream the remaining part of the
1819  * transaction, and then invoke the stream_prepare or stream_commit
1820  * callback, as appropriate.
1821  */
1822 static void
1823 ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
1824 {
1825 	/* we should only call this for previously streamed transactions */
1826 	Assert(rbtxn_is_streamed(txn));
1827 
1828 	ReorderBufferStreamTXN(rb, txn);
1829 
1830 	if (rbtxn_prepared(txn))
1831 	{
1832 		/*
1833 		 * Note, we send stream prepare even if a concurrent abort is
1834 		 * detected. See DecodePrepare for more information.
1835 		 */
1836 		rb->stream_prepare(rb, txn, txn->final_lsn);
1837 
1838 		/*
1839 		 * This is a PREPARED transaction, part of a two-phase commit. The
1840 		 * full cleanup will happen as part of the COMMIT PREPARED processing,
1841 		 * so for now just truncate txn by removing changes and tuple_cids.
1842 		 */
1843 		ReorderBufferTruncateTXN(rb, txn, true);
1844 		/* Reset the CheckXidAlive */
1845 		CheckXidAlive = InvalidTransactionId;
1846 	}
1847 	else
1848 	{
1849 		rb->stream_commit(rb, txn, txn->final_lsn);
1850 		ReorderBufferCleanupTXN(rb, txn);
1851 	}
1852 }
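
/*
 * For illustration (a sketch, not an exhaustive list), a streamed
 * transaction thus produces a callback sequence roughly like
 *
 *	stream_start, stream_change ..., stream_stop	(repeated per stream)
 * followed by either
 *	stream_prepare		(two-phase case)
 * or
 *	stream_commit		(regular commit)
 */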
1853 
1854 /*
1855  * Set xid to detect concurrent aborts.
1856  *
1857  * While streaming an in-progress transaction or decoding a prepared
1858  * transaction there is a possibility that the (sub)transaction might get
1859  * aborted concurrently.  In such a case, if the (sub)transaction has a
1860  * catalog update, we might decode the tuple using the wrong catalog version.  For
1861  * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0).  Now,
1862  * the transaction 501 updates the catalog tuple and after that we will have
1863  * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0).  Now, if 501 is
1864  * aborted and some other transaction say 502 updates the same catalog tuple
1865  * then the first tuple will be changed to (xmin: 500, xmax: 502).  So, the
1866  * problem is that when we try to decode the tuple inserted/updated in 501
1867  * after the catalog update, we will see the catalog tuple with (xmin: 500,
1868  * xmax: 502) as visible because it will consider that the tuple is deleted by
1869  * xid 502, which is not visible to our snapshot.  And when we try to
1870  * decode with that catalog tuple, it can lead to a wrong result or a crash.
1871  * So, it is necessary to detect concurrent aborts to allow streaming of
1872  * in-progress transactions or decoding of prepared transactions.
1873  *
1874  * To detect a concurrent abort we set CheckXidAlive to the xid of the
1875  * (sub)transaction to which the current change belongs.  Then, during
1876  * catalog scans we check the status of that xid, and if it has aborted we
1877  * report a specific error so that we can stop streaming the current transaction
1878  * and discard the already streamed changes on such an error.  We might have
1879  * already streamed some of the changes for the aborted (sub)transaction, but
1880  * that is fine because when we decode the abort we will stream an abort message
1881  * to truncate the changes in the subscriber. Similarly, for prepared
1882  * transactions, we stop decoding if concurrent abort is detected and then
1883  * rollback the changes when rollback prepared is encountered. See
1884  * DecodePrepare.
1885  */
1886 static inline void
1887 SetupCheckXidLive(TransactionId xid)
1888 {
1889 	/*
1890 	 * If the input transaction id is already set as CheckXidAlive then
1891 	 * there is nothing to do.
1892 	 */
1893 	if (TransactionIdEquals(CheckXidAlive, xid))
1894 		return;
1895 
1896 	/*
1897 	 * Set up CheckXidAlive if the xid is not committed yet.  We don't check if the
1898 	 * xid is aborted.  That will happen during catalog access.
1899 	 */
1900 	if (!TransactionIdDidCommit(xid))
1901 		CheckXidAlive = xid;
1902 	else
1903 		CheckXidAlive = InvalidTransactionId;
1904 }
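
/*
 * Note: the liveness check itself happens during catalog access. System
 * table scans check CheckXidAlive and report ERRCODE_TRANSACTION_ROLLBACK
 * if that xid turns out to have aborted (cf. the systable scan routines in
 * genam.c); ReorderBufferProcessTXN() catches that error.
 */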
1905 
1906 /*
1907  * Helper function for ReorderBufferProcessTXN for applying change.
1908  */
1909 static inline void
1910 ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
1911 						 Relation relation, ReorderBufferChange *change,
1912 						 bool streaming)
1913 {
1914 	if (streaming)
1915 		rb->stream_change(rb, txn, relation, change);
1916 	else
1917 		rb->apply_change(rb, txn, relation, change);
1918 }
1919 
1920 /*
1921  * Helper function for ReorderBufferProcessTXN for applying the truncate.
1922  */
1923 static inline void
1924 ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn,
1925 						   int nrelations, Relation *relations,
1926 						   ReorderBufferChange *change, bool streaming)
1927 {
1928 	if (streaming)
1929 		rb->stream_truncate(rb, txn, nrelations, relations, change);
1930 	else
1931 		rb->apply_truncate(rb, txn, nrelations, relations, change);
1932 }
1933 
1934 /*
1935  * Helper function for ReorderBufferProcessTXN for applying the message.
1936  */
1937 static inline void
1938 ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
1939 						  ReorderBufferChange *change, bool streaming)
1940 {
1941 	if (streaming)
1942 		rb->stream_message(rb, txn, change->lsn, true,
1943 						   change->data.msg.prefix,
1944 						   change->data.msg.message_size,
1945 						   change->data.msg.message);
1946 	else
1947 		rb->message(rb, txn, change->lsn, true,
1948 					change->data.msg.prefix,
1949 					change->data.msg.message_size,
1950 					change->data.msg.message);
1951 }
1952 
1953 /*
1954  * Store the command id and snapshot at the end of the current stream so
1955  * that we can reuse them while sending the next stream.
1956  */
1957 static inline void
1958 ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn,
1959 							 Snapshot snapshot_now, CommandId command_id)
1960 {
1961 	txn->command_id = command_id;
1962 
1963 	/* Avoid copying if it's already copied. */
1964 	if (snapshot_now->copied)
1965 		txn->snapshot_now = snapshot_now;
1966 	else
1967 		txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1968 												  txn, command_id);
1969 }
1970 
1971 /*
1972  * Helper function for ReorderBufferProcessTXN to handle the concurrent
1973  * abort of the streaming transaction.  This resets the TXN such that it
1974  * can be used to stream the remaining data of the transaction being
1975  * processed.  This can happen when a subtransaction is aborted and we still
1976  * want to continue processing data of the main or other subtransactions.
1977  */
1978 static void
1979 ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
1980 					  Snapshot snapshot_now,
1981 					  CommandId command_id,
1982 					  XLogRecPtr last_lsn,
1983 					  ReorderBufferChange *specinsert)
1984 {
1985 	/* Discard the changes that we just streamed */
1986 	ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
1987 
1988 	/* Free all resources allocated for toast reconstruction */
1989 	ReorderBufferToastReset(rb, txn);
1990 
1991 	/* Return the spec insert change if it is not NULL */
1992 	if (specinsert != NULL)
1993 	{
1994 		ReorderBufferReturnChange(rb, specinsert, true);
1995 		specinsert = NULL;
1996 	}
1997 
1998 	/*
1999 	 * For the streaming case, stop the stream and remember the command ID and
2000 	 * snapshot for the streaming run.
2001 	 */
2002 	if (rbtxn_is_streamed(txn))
2003 	{
2004 		rb->stream_stop(rb, txn, last_lsn);
2005 		ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2006 	}
2007 }
2008 
2009 /*
2010  * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
2011  *
2012  * Send data of a transaction (and its subtransactions) to the
2013  * output plugin. We iterate over the top and subtransactions (using a k-way
2014  * merge) and replay the changes in lsn order.
2015  *
2016  * If streaming is true then data will be sent using stream API.
2017  *
2018  * Note: "volatile" markers on some parameters are to avoid trouble with
2019  * PG_TRY inside the function.
2020  */
2021 static void
2022 ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
2023 						XLogRecPtr commit_lsn,
2024 						volatile Snapshot snapshot_now,
2025 						volatile CommandId command_id,
2026 						bool streaming)
2027 {
2028 	bool		using_subtxn;
2029 	MemoryContext ccxt = CurrentMemoryContext;
2030 	ReorderBufferIterTXNState *volatile iterstate = NULL;
2031 	volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2032 	ReorderBufferChange *volatile specinsert = NULL;
2033 	volatile bool stream_started = false;
2034 	ReorderBufferTXN *volatile curtxn = NULL;
2035 
2036 	/* build data to be able to look up the CommandIds of catalog tuples */
2037 	ReorderBufferBuildTupleCidHash(rb, txn);
2038 
2039 	/* setup the initial snapshot */
2040 	SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2041 
2042 	/*
2043 	 * Decoding needs access to syscaches et al., which in turn use
2044 	 * heavyweight locks and such. Thus we need to have enough state around to
2045 	 * keep track of those.  The easiest way is to simply use a transaction
2046 	 * internally.  That also allows us to easily enforce that nothing writes
2047 	 * to the database by checking for xid assignments.
2048 	 *
2049 	 * When we're called via the SQL SRF there's already a transaction
2050 	 * started, so start an explicit subtransaction there.
2051 	 */
2052 	using_subtxn = IsTransactionOrTransactionBlock();
2053 
2054 	PG_TRY();
2055 	{
2056 		ReorderBufferChange *change;
2057 
2058 		if (using_subtxn)
2059 			BeginInternalSubTransaction(streaming ? "stream" : "replay");
2060 		else
2061 			StartTransactionCommand();
2062 
2063 		/*
2064 		 * We only need to send begin/begin-prepare for non-streamed
2065 		 * transactions.
2066 		 */
2067 		if (!streaming)
2068 		{
2069 			if (rbtxn_prepared(txn))
2070 				rb->begin_prepare(rb, txn);
2071 			else
2072 				rb->begin(rb, txn);
2073 		}
2074 
2075 		ReorderBufferIterTXNInit(rb, txn, &iterstate);
2076 		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2077 		{
2078 			Relation	relation = NULL;
2079 			Oid			reloid;
2080 
2081 			/*
2082 			 * We can't call the stream_start callback before processing the
2083 			 * first change.
2084 			 */
2085 			if (prev_lsn == InvalidXLogRecPtr)
2086 			{
2087 				if (streaming)
2088 				{
2089 					txn->origin_id = change->origin_id;
2090 					rb->stream_start(rb, txn, change->lsn);
2091 					stream_started = true;
2092 				}
2093 			}
2094 
2095 			/*
2096 			 * Enforce correct ordering of changes, merged from multiple
2097 			 * subtransactions. The changes may have the same LSN due to
2098 			 * MULTI_INSERT xlog records.
2099 			 */
2100 			Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2101 
2102 			prev_lsn = change->lsn;
2103 
2104 			/*
2105 			 * Set the current xid to detect concurrent aborts. This is
2106 			 * required for the cases when we decode the changes before the
2107 			 * COMMIT record is processed.
2108 			 */
2109 			if (streaming || rbtxn_prepared(change->txn))
2110 			{
2111 				curtxn = change->txn;
2112 				SetupCheckXidLive(curtxn->xid);
2113 			}
2114 
2115 			switch (change->action)
2116 			{
2117 				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2118 
2119 					/*
2120 					 * Confirmation for speculative insertion arrived. Simply
2121 					 * use it as a normal record. It'll be cleaned up at the end
2122 					 * of INSERT processing.
2123 					 */
2124 					if (specinsert == NULL)
2125 						elog(ERROR, "invalid ordering of speculative insertion changes");
2126 					Assert(specinsert->data.tp.oldtuple == NULL);
2127 					change = specinsert;
2128 					change->action = REORDER_BUFFER_CHANGE_INSERT;
2129 
2130 					/* intentionally fall through */
2131 				case REORDER_BUFFER_CHANGE_INSERT:
2132 				case REORDER_BUFFER_CHANGE_UPDATE:
2133 				case REORDER_BUFFER_CHANGE_DELETE:
2134 					Assert(snapshot_now);
2135 
2136 					reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
2137 												change->data.tp.relnode.relNode);
2138 
2139 					/*
2140 					 * Mapped catalog tuple without data, emitted while
2141 					 * catalog table was in the process of being rewritten. We
2142 					 * can fail to look up the relfilenode, because the
2143 					 * relmapper has no "historic" view, in contrast to the
2144 					 * normal catalog during decoding. Thus repeated rewrites
2145 					 * can cause a lookup failure. That's OK because we do not
2146 					 * decode catalog changes anyway. Normally such tuples
2147 					 * would be skipped over below, but we can't identify
2148 					 * whether the table should be logically logged without
2149 					 * mapping the relfilenode to the oid.
2150 					 */
2151 					if (reloid == InvalidOid &&
2152 						change->data.tp.newtuple == NULL &&
2153 						change->data.tp.oldtuple == NULL)
2154 						goto change_done;
2155 					else if (reloid == InvalidOid)
2156 						elog(ERROR, "could not map filenode \"%s\" to relation OID",
2157 							 relpathperm(change->data.tp.relnode,
2158 										 MAIN_FORKNUM));
2159 
2160 					relation = RelationIdGetRelation(reloid);
2161 
2162 					if (!RelationIsValid(relation))
2163 						elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
2164 							 reloid,
2165 							 relpathperm(change->data.tp.relnode,
2166 										 MAIN_FORKNUM));
2167 
2168 					if (!RelationIsLogicallyLogged(relation))
2169 						goto change_done;
2170 
2171 					/*
2172 					 * Ignore temporary heaps created during DDL unless the
2173 					 * plugin has asked for them.
2174 					 */
2175 					if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2176 						goto change_done;
2177 
2178 					/*
2179 					 * For now ignore sequence changes entirely. Most of the
2180 					 * time they don't log changes using records we
2181 					 * understand, so it doesn't make sense to handle the few
2182 					 * cases we do.
2183 					 */
2184 					if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2185 						goto change_done;
2186 
2187 					/* user-triggered change */
2188 					if (!IsToastRelation(relation))
2189 					{
2190 						ReorderBufferToastReplace(rb, txn, relation, change);
2191 						ReorderBufferApplyChange(rb, txn, relation, change,
2192 												 streaming);
2193 
2194 						/*
2195 						 * Only clear reassembled toast chunks if we're sure
2196 						 * they're not required anymore. The creator of the
2197 						 * tuple tells us.
2198 						 */
2199 						if (change->data.tp.clear_toast_afterwards)
2200 							ReorderBufferToastReset(rb, txn);
2201 					}
2202 					/* we're not interested in toast deletions */
2203 					else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2204 					{
2205 						/*
2206 						 * Need to reassemble the full toasted Datum in
2207 						 * memory; to ensure the chunks don't get reused till
2208 						 * we're done, remove it from the list of this
2209 						 * transaction's changes. Otherwise it will get
2210 						 * freed/reused while restoring spooled data from
2211 						 * disk.
2212 						 */
2213 						Assert(change->data.tp.newtuple != NULL);
2214 
2215 						dlist_delete(&change->node);
2216 						ReorderBufferToastAppendChunk(rb, txn, relation,
2217 													  change);
2218 					}
2219 
2220 			change_done:
2221 
2222 					/*
2223 					 * If speculative insertion was confirmed, the record
2224 					 * isn't needed anymore.
2225 					 */
2226 					if (specinsert != NULL)
2227 					{
2228 						ReorderBufferReturnChange(rb, specinsert, true);
2229 						specinsert = NULL;
2230 					}
2231 
2232 					if (RelationIsValid(relation))
2233 					{
2234 						RelationClose(relation);
2235 						relation = NULL;
2236 					}
2237 					break;
2238 
2239 				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2240 
2241 					/*
2242 					 * Speculative insertions are dealt with by delaying the
2243 					 * processing of the insert until the confirmation record
2244 					 * arrives. For that we simply unlink the record from the
2245 					 * chain, so it does not get freed/reused while restoring
2246 					 * spooled data from disk.
2247 					 *
2248 					 * This is safe in the face of concurrent catalog changes
2249 					 * because the relevant relation can't be changed between
2250 					 * speculative insertion and confirmation due to
2251 					 * CheckTableNotInUse() and locking.
2252 					 */
2253 
2254 					/* clear out a pending (and thus failed) speculation */
2255 					if (specinsert != NULL)
2256 					{
2257 						ReorderBufferReturnChange(rb, specinsert, true);
2258 						specinsert = NULL;
2259 					}
2260 
2261 					/* and memorize the pending insertion */
2262 					dlist_delete(&change->node);
2263 					specinsert = change;
2264 					break;
2265 
2266 				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
2267 
2268 					/*
2269 					 * Abort for speculative insertion arrived. So clean up
2270 					 * the specinsert tuple and toast hash.
2271 					 *
2272 					 * Note that we get the spec abort change for each toast
2273 					 * entry but we need to perform the cleanup only the first
2274 					 * time we get it for the main table.
2275 					 */
2276 					if (specinsert != NULL)
2277 					{
2278 						/*
2279 						 * We must clean the toast hash before processing a
2280 						 * completely new tuple to avoid confusion about the
2281 						 * previous tuple's toast chunks.
2282 						 */
2283 						Assert(change->data.tp.clear_toast_afterwards);
2284 						ReorderBufferToastReset(rb, txn);
2285 
2286 						/* We don't need this record anymore. */
2287 						ReorderBufferReturnChange(rb, specinsert, true);
2288 						specinsert = NULL;
2289 					}
2290 					break;
2291 
2292 				case REORDER_BUFFER_CHANGE_TRUNCATE:
2293 					{
2294 						int			i;
2295 						int			nrelids = change->data.truncate.nrelids;
2296 						int			nrelations = 0;
2297 						Relation   *relations;
2298 
2299 						relations = palloc0(nrelids * sizeof(Relation));
2300 						for (i = 0; i < nrelids; i++)
2301 						{
2302 							Oid			relid = change->data.truncate.relids[i];
2303 							Relation	relation;
2304 
2305 							relation = RelationIdGetRelation(relid);
2306 
2307 							if (!RelationIsValid(relation))
2308 								elog(ERROR, "could not open relation with OID %u", relid);
2309 
2310 							if (!RelationIsLogicallyLogged(relation))
2311 								continue;
2312 
2313 							relations[nrelations++] = relation;
2314 						}
2315 
2316 						/* Apply the truncate. */
2317 						ReorderBufferApplyTruncate(rb, txn, nrelations,
2318 												   relations, change,
2319 												   streaming);
2320 
2321 						for (i = 0; i < nrelations; i++)
2322 							RelationClose(relations[i]);
2323 
2324 						break;
2325 					}
2326 
2327 				case REORDER_BUFFER_CHANGE_MESSAGE:
2328 					ReorderBufferApplyMessage(rb, txn, change, streaming);
2329 					break;
2330 
2331 				case REORDER_BUFFER_CHANGE_INVALIDATION:
2332 					/* Execute the invalidation messages locally */
2333 					ReorderBufferExecuteInvalidations(
2334 													  change->data.inval.ninvalidations,
2335 													  change->data.inval.invalidations);
2336 					break;
2337 
2338 				case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2339 					/* get rid of the old */
2340 					TeardownHistoricSnapshot(false);
2341 
2342 					if (snapshot_now->copied)
2343 					{
2344 						ReorderBufferFreeSnap(rb, snapshot_now);
2345 						snapshot_now =
2346 							ReorderBufferCopySnap(rb, change->data.snapshot,
2347 												  txn, command_id);
2348 					}
2349 
2350 					/*
2351 					 * Restored from disk, so be careful not to double-free
2352 					 * it. We could introduce refcounting for that, but for
2353 					 * now this seems infrequent enough not to care.
2354 					 */
2355 					else if (change->data.snapshot->copied)
2356 					{
2357 						snapshot_now =
2358 							ReorderBufferCopySnap(rb, change->data.snapshot,
2359 												  txn, command_id);
2360 					}
2361 					else
2362 					{
2363 						snapshot_now = change->data.snapshot;
2364 					}
2365 
2366 					/* and continue with the new one */
2367 					SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2368 					break;
2369 
2370 				case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2371 					Assert(change->data.command_id != InvalidCommandId);
2372 
2373 					if (command_id < change->data.command_id)
2374 					{
2375 						command_id = change->data.command_id;
2376 
2377 						if (!snapshot_now->copied)
2378 						{
2379 							/* we don't use the global one anymore */
2380 							snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2381 																 txn, command_id);
2382 						}
2383 
2384 						snapshot_now->curcid = command_id;
2385 
2386 						TeardownHistoricSnapshot(false);
2387 						SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2388 					}
2389 
2390 					break;
2391 
2392 				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2393 					elog(ERROR, "tuplecid value in changequeue");
2394 					break;
2395 			}
2396 		}
2397 
2398 		/* speculative insertion record must be freed by now */
2399 		Assert(!specinsert);
2400 
2401 		/* clean up the iterator */
2402 		ReorderBufferIterTXNFinish(rb, iterstate);
2403 		iterstate = NULL;
2404 
2405 		/*
2406 		 * Update total transaction count and total bytes processed by the
2407 		 * transaction and its subtransactions. Take care not to count a
2408 		 * streamed transaction multiple times.
2409 		 *
2410 		 * Note that the statistics computation has to be done after
2411 		 * ReorderBufferIterTXNFinish as it releases the serialized change
2412 		 * which we have already accounted for in ReorderBufferIterTXNNext.
2413 		 */
2414 		if (!rbtxn_is_streamed(txn))
2415 			rb->totalTxns++;
2416 
2417 		rb->totalBytes += txn->total_size;
2418 
2419 		/*
2420 		 * Done with current changes, send the last message for this set of
2421 		 * changes depending upon streaming mode.
2422 		 */
2423 		if (streaming)
2424 		{
2425 			if (stream_started)
2426 			{
2427 				rb->stream_stop(rb, txn, prev_lsn);
2428 				stream_started = false;
2429 			}
2430 		}
2431 		else
2432 		{
2433 			/*
2434 			 * Call either PREPARE (for two-phase transactions) or COMMIT (for
2435 			 * regular ones).
2436 			 */
2437 			if (rbtxn_prepared(txn))
2438 				rb->prepare(rb, txn, commit_lsn);
2439 			else
2440 				rb->commit(rb, txn, commit_lsn);
2441 		}
2442 
2443 		/* this is just a sanity check against bad output plugin behaviour */
2444 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
2445 			elog(ERROR, "output plugin used XID %u",
2446 				 GetCurrentTransactionId());
2447 
2448 		/*
2449 		 * Remember the command ID and snapshot for the next set of changes in
2450 		 * streaming mode.
2451 		 */
2452 		if (streaming)
2453 			ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2454 		else if (snapshot_now->copied)
2455 			ReorderBufferFreeSnap(rb, snapshot_now);
2456 
2457 		/* cleanup */
2458 		TeardownHistoricSnapshot(false);
2459 
2460 		/*
2461 		 * Aborting the current (sub-)transaction as a whole has the right
2462 		 * semantics. We want all locks acquired in here to be released, not
2463 		 * reassigned to the parent, and we do not want any database access
2464 		 * to have persistent effects.
2465 		 */
2466 		AbortCurrentTransaction();
2467 
2468 		/* make sure there's no cache pollution */
2469 		ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
2470 
2471 		if (using_subtxn)
2472 			RollbackAndReleaseCurrentSubTransaction();
2473 
2474 		/*
2475 		 * We are here due to one of four reasons: 1. Decoding an
2476 		 * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2477 		 * prepared txn that was (partially) streamed. 4. Decoding a committed
2478 		 * txn.
2479 		 *
2480 		 * For 1, we allow truncation of txn data by removing the changes
2481 		 * already streamed but still keeping other things like invalidations,
2482 		 * snapshot, and tuplecids. For 2 and 3, we instruct
2483 		 * ReorderBufferTruncateTXN to do a more elaborate truncation of txn
2484 		 * data, as the entire transaction has been decoded except for commit.
2485 		 * For 4, as the entire txn has been decoded, we can fully clean up
2486 		 * the TXN reorder buffer.
2487 		 */
2488 		if (streaming || rbtxn_prepared(txn))
2489 		{
2490 			ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
2491 			/* Reset the CheckXidAlive */
2492 			CheckXidAlive = InvalidTransactionId;
2493 		}
2494 		else
2495 			ReorderBufferCleanupTXN(rb, txn);
2496 	}
2497 	PG_CATCH();
2498 	{
2499 		MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2500 		ErrorData  *errdata = CopyErrorData();
2501 
2502 		/* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2503 		if (iterstate)
2504 			ReorderBufferIterTXNFinish(rb, iterstate);
2505 
2506 		TeardownHistoricSnapshot(true);
2507 
2508 		/*
2509 		 * Force cache invalidation to happen outside of a valid transaction
2510 		 * to prevent catalog access as we just caught an error.
2511 		 */
2512 		AbortCurrentTransaction();
2513 
2514 		/* make sure there's no cache pollution */
2515 		ReorderBufferExecuteInvalidations(txn->ninvalidations,
2516 										  txn->invalidations);
2517 
2518 		if (using_subtxn)
2519 			RollbackAndReleaseCurrentSubTransaction();
2520 
2521 		/*
2522 		 * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2523 		 * abort of the (sub)transaction we are streaming or preparing. We
2524 		 * need to do the cleanup and return gracefully on this error, see
2525 		 * SetupCheckXidLive.
2526 		 *
2527 		 * This error code can be thrown by one of the callbacks we call
2528 		 * during decoding, so we need to ensure that we return gracefully only
2529 		 * when we are sending the data in streaming mode and the streaming is
2530 		 * not finished yet or when we are sending the data out on a PREPARE
2531 		 * during a two-phase commit.
2532 		 */
2533 		if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2534 			(stream_started || rbtxn_prepared(txn)))
2535 		{
2536 			/* curtxn must be set for streaming or prepared transactions */
2537 			Assert(curtxn);
2538 
2539 			/* Cleanup the temporary error state. */
2540 			FlushErrorState();
2541 			FreeErrorData(errdata);
2542 			errdata = NULL;
2543 			curtxn->concurrent_abort = true;
2544 
2545 			/* Reset the TXN so that it is allowed to stream remaining data. */
2546 			ReorderBufferResetTXN(rb, txn, snapshot_now,
2547 								  command_id, prev_lsn,
2548 								  specinsert);
2549 		}
2550 		else
2551 		{
2552 			ReorderBufferCleanupTXN(rb, txn);
2553 			MemoryContextSwitchTo(ecxt);
2554 			PG_RE_THROW();
2555 		}
2556 	}
2557 	PG_END_TRY();
2558 }
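
/*
 * For reference, a sketch of the callback sequences this produces:
 *
 *	non-streamed commit:	begin, apply_change ..., commit
 *	non-streamed prepare:	begin_prepare, apply_change ..., prepare
 *	streaming:		stream_start, stream_change ..., stream_stop
 */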
2559 
2560 /*
2561  * Perform the replay of a transaction and its non-aborted subtransactions.
2562  *
2563  * Subtransactions have to be processed by ReorderBufferCommitChild()
2564  * beforehand, even if they were previously assigned to the toplevel
2565  * transaction with ReorderBufferAssignChild.
2566  *
2567  * This interface is called once a prepare or toplevel commit is read, for
2568  * both streamed and non-streamed transactions.
2569  */
2570 static void
2571 ReorderBufferReplay(ReorderBufferTXN *txn,
2572 					ReorderBuffer *rb, TransactionId xid,
2573 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2574 					TimestampTz commit_time,
2575 					RepOriginId origin_id, XLogRecPtr origin_lsn)
2576 {
2577 	Snapshot	snapshot_now;
2578 	CommandId	command_id = FirstCommandId;
2579 
2580 	txn->final_lsn = commit_lsn;
2581 	txn->end_lsn = end_lsn;
2582 	txn->commit_time = commit_time;
2583 	txn->origin_id = origin_id;
2584 	txn->origin_lsn = origin_lsn;
2585 
2586 	/*
2587 	 * If the transaction was (partially) streamed, we need to commit it in a
2588 	 * 'streamed' way. That is, we first stream the remaining part of the
2589 	 * transaction, and then invoke stream_commit message.
2590 	 *
2591 	 * Called after everything (origin ID, LSN, ...) is stored in the
2592 	 * transaction to avoid passing that information directly.
2593 	 */
2594 	if (rbtxn_is_streamed(txn))
2595 	{
2596 		ReorderBufferStreamCommit(rb, txn);
2597 		return;
2598 	}
2599 
2600 	/*
2601 	 * If this transaction has no snapshot, it didn't make any changes to the
2602 	 * database, so there's nothing to decode.  Note that
2603 	 * ReorderBufferCommitChild will have transferred any snapshots from
2604 	 * subtransactions if there were any.
2605 	 */
2606 	if (txn->base_snapshot == NULL)
2607 	{
2608 		Assert(txn->ninvalidations == 0);
2609 
2610 		/*
2611 		 * Removing this txn before a commit might result in the computation
2612 		 * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2613 		 */
2614 		if (!rbtxn_prepared(txn))
2615 			ReorderBufferCleanupTXN(rb, txn);
2616 		return;
2617 	}
2618 
2619 	snapshot_now = txn->base_snapshot;
2620 
2621 	/* Process and send the changes to output plugin. */
2622 	ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2623 							command_id, false);
2624 }
2625 
2626 /*
2627  * Commit a transaction.
2628  *
2629  * See comments for ReorderBufferReplay().
2630  */
2631 void
2632 ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
2633 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2634 					TimestampTz commit_time,
2635 					RepOriginId origin_id, XLogRecPtr origin_lsn)
2636 {
2637 	ReorderBufferTXN *txn;
2638 
2639 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2640 								false);
2641 
2642 	/* unknown transaction, nothing to replay */
2643 	if (txn == NULL)
2644 		return;
2645 
2646 	ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2647 						origin_id, origin_lsn);
2648 }
2649 
2650 /*
2651  * Record the prepare information for a transaction.
2652  */
2653 bool
2654 ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
2655 								 XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
2656 								 TimestampTz prepare_time,
2657 								 RepOriginId origin_id, XLogRecPtr origin_lsn)
2658 {
2659 	ReorderBufferTXN *txn;
2660 
2661 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2662 
2663 	/* unknown transaction, nothing to do */
2664 	if (txn == NULL)
2665 		return false;
2666 
2667 	/*
2668 	 * Remember the prepare information to be used later by commit prepared,
2669 	 * in case we skip doing the prepare.
2670 	 */
2671 	txn->final_lsn = prepare_lsn;
2672 	txn->end_lsn = end_lsn;
2673 	txn->commit_time = prepare_time;
2674 	txn->origin_id = origin_id;
2675 	txn->origin_lsn = origin_lsn;
2676 
2677 	return true;
2678 }
2679 
2680 /* Remember that we have skipped prepare */
2681 void
2682 ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
2683 {
2684 	ReorderBufferTXN *txn;
2685 
2686 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2687 
2688 	/* unknown transaction, nothing to do */
2689 	if (txn == NULL)
2690 		return;
2691 
2692 	txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
2693 }
2694 
2695 /*
2696  * Prepare a two-phase transaction.
2697  *
2698  * See comments for ReorderBufferReplay().
2699  */
2700 void
2701 ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
2702 					 char *gid)
2703 {
2704 	ReorderBufferTXN *txn;
2705 
2706 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2707 								false);
2708 
2709 	/* unknown transaction, nothing to replay */
2710 	if (txn == NULL)
2711 		return;
2712 
2713 	txn->txn_flags |= RBTXN_PREPARE;
2714 	txn->gid = pstrdup(gid);
2715 
2716 	/* The prepare info must have been updated in txn by now. */
2717 	Assert(txn->final_lsn != InvalidXLogRecPtr);
2718 
2719 	ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2720 						txn->commit_time, txn->origin_id, txn->origin_lsn);
2721 
2722 	/*
2723 	 * We send the prepare for concurrently aborted xacts so that later,
2724 	 * when rollback prepared is decoded and sent, the downstream is
2725 	 * able to roll back such a xact. See comments atop DecodePrepare.
2726 	 *
2727 	 * Note, for the concurrent_abort + streaming case a stream_prepare was
2728 	 * already sent within the ReorderBufferReplay call above.
2729 	 */
2730 	if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2731 		rb->prepare(rb, txn, txn->final_lsn);
2732 }
2733 
2734 /*
2735  * This is used to handle COMMIT/ROLLBACK PREPARED.
2736  */
2737 void
2738 ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
2739 							XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2740 							XLogRecPtr initial_consistent_point,
2741 							TimestampTz commit_time, RepOriginId origin_id,
2742 							XLogRecPtr origin_lsn, char *gid, bool is_commit)
2743 {
2744 	ReorderBufferTXN *txn;
2745 	XLogRecPtr	prepare_end_lsn;
2746 	TimestampTz prepare_time;
2747 
2748 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2749 
2750 	/* unknown transaction, nothing to do */
2751 	if (txn == NULL)
2752 		return;
2753 
2754 	/*
2755 	 * By this time the txn has the prepare record information; remember it
2756 	 * for later use in case of a rollback.
2757 	 */
2758 	prepare_end_lsn = txn->end_lsn;
2759 	prepare_time = txn->commit_time;
2760 
2761 	/* add the gid in the txn */
2762 	txn->gid = pstrdup(gid);
2763 
2764 	/*
2765 	 * It is possible that this transaction was not decoded at prepare time,
2766 	 * either because by that time we didn't have a consistent snapshot or
2767 	 * because it was decoded earlier but we have restarted. We only need to
2768 	 * send the prepare if it was not decoded earlier. We don't need to decode
2769 	 * the xact for aborts if that was not done already.
2770 	 */
2771 	if ((txn->final_lsn < initial_consistent_point) && is_commit)
2772 	{
2773 		txn->txn_flags |= RBTXN_PREPARE;
2774 
2775 		/*
2776 		 * The prepare info must have been updated in txn even if we skip
2777 		 * prepare.
2778 		 */
2779 		Assert(txn->final_lsn != InvalidXLogRecPtr);
2780 
2781 		/*
2782 		 * By this time the txn has the prepare record information and it is
2783 		 * important to use that so that downstream gets the accurate
2784 		 * information. If we passed commit information here instead, the
2785 		 * downstream could behave as if it had already replayed the commit
2786 		 * prepared after the restart.
2787 		 */
2788 		ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2789 							txn->commit_time, txn->origin_id, txn->origin_lsn);
2790 	}
2791 
2792 	txn->final_lsn = commit_lsn;
2793 	txn->end_lsn = end_lsn;
2794 	txn->commit_time = commit_time;
2795 	txn->origin_id = origin_id;
2796 	txn->origin_lsn = origin_lsn;
2797 
2798 	if (is_commit)
2799 		rb->commit_prepared(rb, txn, commit_lsn);
2800 	else
2801 		rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2802 
2803 	/* cleanup: make sure there's no cache pollution */
2804 	ReorderBufferExecuteInvalidations(txn->ninvalidations,
2805 									  txn->invalidations);
2806 	ReorderBufferCleanupTXN(rb, txn);
2807 }
2808 
2809 /*
2810  * Abort a transaction that possibly has previous changes. Needs to be first
2811  * called for subtransactions and then for the toplevel xid.
2812  *
2813  * NB: Transactions handled here have to have actively aborted (i.e. have
2814  * produced an abort record). Implicitly aborted transactions are handled via
2815  * ReorderBufferAbortOld(); transactions we're just not interested in, but
2816  * which have committed are handled in ReorderBufferForget().
2817  *
2818  * This function purges this transaction and its contents from memory and
2819  * disk.
2820  */
2821 void
2822 ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
2823 {
2824 	ReorderBufferTXN *txn;
2825 
2826 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2827 								false);
2828 
2829 	/* unknown, nothing to remove */
2830 	if (txn == NULL)
2831 		return;
2832 
2833 	/* For streamed transactions notify the remote node about the abort. */
2834 	if (rbtxn_is_streamed(txn))
2835 	{
2836 		rb->stream_abort(rb, txn, lsn);
2837 
2838 		/*
2839 		 * We might have decoded changes for this transaction that could load
2840 		 * the cache as per the current transaction's view (consider DDLs that
2841 		 * happened in this transaction). We don't want the decoding of future
2842 		 * transactions to use those cache entries so execute invalidations.
2843 		 */
2844 		if (txn->ninvalidations > 0)
2845 			ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2846 											   txn->invalidations);
2847 	}
2848 
2849 	/* cosmetic... */
2850 	txn->final_lsn = lsn;
2851 
2852 	/* remove potential on-disk data, and deallocate */
2853 	ReorderBufferCleanupTXN(rb, txn);
2854 }
2855 
2856 /*
2857  * Abort all transactions that aren't actually running anymore because the
2858  * server restarted.
2859  *
2860  * NB: These really have to be transactions that have aborted due to a server
2861  * crash/immediate restart, as we don't deal with invalidations here.
2862  */
2863 void
2864 ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
2865 {
2866 	dlist_mutable_iter it;
2867 
2868 	/*
2869 	 * Iterate through all (potential) toplevel TXNs and abort all that are
2870 	 * older than what can possibly still be running. Once we've found the
2871 	 * first one that is alive, we stop; there might be some that acquired an
2872 	 * xid earlier but started writing later, but that's unlikely and they
2873 	 * will be cleaned up in a later call to this function.
2874 	 */
2875 	dlist_foreach_modify(it, &rb->toplevel_by_lsn)
2876 	{
2877 		ReorderBufferTXN *txn;
2878 
2879 		txn = dlist_container(ReorderBufferTXN, node, it.cur);
2880 
2881 		if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2882 		{
2883 			elog(DEBUG2, "aborting old transaction %u", txn->xid);
2884 
2885 			/* remove potential on-disk data, and deallocate this tx */
2886 			ReorderBufferCleanupTXN(rb, txn);
2887 		}
2888 		else
2889 			return;
2890 	}
2891 }
2892 
2893 /*
2894  * Forget the contents of a transaction if we aren't interested in it.
2895  * Needs to be called first for subtransactions and then for the
2896  * toplevel xid.
2897  *
2898  * This is significantly different from ReorderBufferAbort() because
2899  * transactions that have committed need to be treated differently from aborted
2900  * ones since they may have modified the catalog.
2901  *
2902  * Note that this is only allowed to be called in the moment a transaction
2903  * commit has just been read, not earlier; otherwise later records referring
2904  * to this xid might re-create the transaction incompletely.
2905  */
2906 void
2907 ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
2908 {
2909 	ReorderBufferTXN *txn;
2910 
2911 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2912 								false);
2913 
2914 	/* unknown, nothing to forget */
2915 	if (txn == NULL)
2916 		return;
2917 
2918 	/* For streamed transactions notify the remote node about the abort. */
2919 	if (rbtxn_is_streamed(txn))
2920 		rb->stream_abort(rb, txn, lsn);
2921 
2922 	/* cosmetic... */
2923 	txn->final_lsn = lsn;
2924 
2925 	/*
2926 	 * Process cache invalidation messages if there are any. Even if we're not
2927 	 * interested in the transaction's contents, it could have manipulated the
2928 	 * catalog and we need to update the caches according to that.
2929 	 */
2930 	if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2931 		ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2932 										   txn->invalidations);
2933 	else
2934 		Assert(txn->ninvalidations == 0);
2935 
2936 	/* remove potential on-disk data, and deallocate */
2937 	ReorderBufferCleanupTXN(rb, txn);
2938 }
2939 
2940 /*
2941  * Invalidate cache for those transactions that need to be skipped just in case
2942  * catalogs were manipulated as part of the transaction.
2943  *
2944  * Note that this is a special-purpose function for prepared transactions where
2945  * we don't want to clean up the TXN even when we decide to skip it. See
2946  * DecodePrepare.
2947  */
2948 void
2949 ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
2950 {
2951 	ReorderBufferTXN *txn;
2952 
2953 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2954 								false);
2955 
2956 	/* unknown, nothing to do */
2957 	if (txn == NULL)
2958 		return;
2959 
2960 	/*
2961 	 * Process cache invalidation messages if there are any. Even if we're not
2962 	 * interested in the transaction's contents, it could have manipulated the
2963 	 * catalog and we need to update the caches according to that.
2964 	 */
2965 	if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2966 		ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2967 										   txn->invalidations);
2968 	else
2969 		Assert(txn->ninvalidations == 0);
2970 }
2971 
2972 
2973 /*
2974  * Execute invalidations happening outside the context of a decoded
2975  * transaction. That currently happens either for xid-less commits
2976  * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
2977  * transactions (via ReorderBufferForget()).
2978  */
2979 void
2980 ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
2981 								   SharedInvalidationMessage *invalidations)
2982 {
2983 	bool		use_subtxn = IsTransactionOrTransactionBlock();
2984 	int			i;
2985 
2986 	if (use_subtxn)
2987 		BeginInternalSubTransaction("replay");
2988 
2989 	/*
2990 	 * Force invalidations to happen outside of a valid transaction - that way
2991 	 * entries will just be marked as invalid without accessing the catalog.
2992 	 * That's advantageous because we don't need to setup the full state
2993 	 * necessary for catalog access.
2994 	 */
2995 	if (use_subtxn)
2996 		AbortCurrentTransaction();
2997 
2998 	for (i = 0; i < ninvalidations; i++)
2999 		LocalExecuteInvalidationMessage(&invalidations[i]);
3000 
3001 	if (use_subtxn)
3002 		RollbackAndReleaseCurrentSubTransaction();
3003 }
3004 
3005 /*
3006  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
3007  * least once for every xid in XLogRecord->xl_xid (other places in records
3008  * may, but do not have to be passed through here).
3009  *
3010  * Reorderbuffer keeps some datastructures about transactions in LSN order,
3011  * for efficiency. To do that it has to know about when transactions are seen
3012  * first in the WAL. As many types of records are not actually interesting for
3013  * logical decoding, they do not necessarily pass through here.
3014  */
3015 void
3016 ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
3017 {
3018 	/* many records won't have an xid assigned, centralize check here */
3019 	if (xid != InvalidTransactionId)
3020 		ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3021 }
3022 
3023 /*
3024  * Add a new snapshot to this transaction that may only be used after lsn 'lsn'
3025  * because the previous snapshot doesn't describe the catalog correctly for
3026  * following rows.
3027  */
3028 void
3029 ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
3030 						 XLogRecPtr lsn, Snapshot snap)
3031 {
3032 	ReorderBufferChange *change = ReorderBufferGetChange(rb);
3033 
3034 	change->data.snapshot = snap;
3035 	change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
3036 
3037 	ReorderBufferQueueChange(rb, xid, lsn, change, false);
3038 }
3039 
3040 /*
3041  * Set up the transaction's base snapshot.
3042  *
3043  * If we know that xid is a subtransaction, set the base snapshot on the
3044  * top-level transaction instead.
3045  */
3046 void
3047 ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
3048 							 XLogRecPtr lsn, Snapshot snap)
3049 {
3050 	ReorderBufferTXN *txn;
3051 	bool		is_new;
3052 
3053 	AssertArg(snap != NULL);
3054 
3055 	/*
3056 	 * Fetch the transaction to operate on.  If we know it's a subtransaction,
3057 	 * operate on its top-level transaction instead.
3058 	 */
3059 	txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3060 	if (rbtxn_is_known_subxact(txn))
3061 		txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3062 									NULL, InvalidXLogRecPtr, false);
3063 	Assert(txn->base_snapshot == NULL);
3064 
3065 	txn->base_snapshot = snap;
3066 	txn->base_snapshot_lsn = lsn;
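	/* This list is kept in base_snapshot_lsn order; it is consulted by
	 * ReorderBufferGetOldestXmin(). */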
3067 	dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
3068 
3069 	AssertTXNLsnOrder(rb);
3070 }
3071 
3072 /*
3073  * Access the catalog with this CommandId at this point in the changestream.
3074  *
3075  * May only be called for command ids > 1
3076  */
3077 void
3078 ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
3079 							 XLogRecPtr lsn, CommandId cid)
3080 {
3081 	ReorderBufferChange *change = ReorderBufferGetChange(rb);
3082 
3083 	change->data.command_id = cid;
3084 	change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
3085 
3086 	ReorderBufferQueueChange(rb, xid, lsn, change, false);
3087 }
3088 
3089 /*
3090  * Update memory counters to account for the new or removed change.
3091  *
3092  * We update two counters - in the reorder buffer, and in the transaction
3093  * containing the change. The reorder buffer counter allows us to quickly
3094  * decide if we reached the memory limit, while the transaction counter
3095  * allows us to quickly pick the largest transaction for eviction.
3096  *
3097  * When streaming is enabled, we need to update the toplevel transaction
3098  * counters instead - we don't really care about subtransactions as we
3099  * can't stream them individually anyway, and we only pick toplevel
3100  * transactions for eviction. So only toplevel transactions matter.
3101  */
3102 static void
3103 ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
3104 								ReorderBufferChange *change,
3105 								bool addition, Size sz)
3106 {
3107 	ReorderBufferTXN *txn;
3108 	ReorderBufferTXN *toptxn;
3109 
3110 	Assert(change->txn);
3111 
3112 	/*
3113 	 * Ignore tuple CID changes, because those are not evicted when reaching
3114 	 * the memory limit. So we just don't count them, because that might
3115 	 * easily trigger a pointless attempt to spill.
3116 	 */
3117 	if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
3118 		return;
3119 
3120 	txn = change->txn;
3121 
3122 	/*
3123 	 * Update the total size in top level as well. This is later used to
3124 	 * compute the decoding stats.
3125 	 */
3126 	if (txn->toptxn != NULL)
3127 		toptxn = txn->toptxn;
3128 	else
3129 		toptxn = txn;
3130 
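	/*
	 * For example (sketch): adding a 100-byte change to a subxact increases
	 * the subxact's txn->size, the buffer-wide rb->size, and the toplevel
	 * transaction's total_size by 100 bytes each.
	 */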
3131 	if (addition)
3132 	{
3133 		txn->size += sz;
3134 		rb->size += sz;
3135 
3136 		/* Update the total size in the top transaction. */
3137 		toptxn->total_size += sz;
3138 	}
3139 	else
3140 	{
3141 		Assert((rb->size >= sz) && (txn->size >= sz));
3142 		txn->size -= sz;
3143 		rb->size -= sz;
3144 
3145 		/* Update the total size in the top transaction. */
3146 		toptxn->total_size -= sz;
3147 	}
3148 
3149 	Assert(txn->size <= rb->size);
3150 }
3151 
3152 /*
3153  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
3154  *
3155  * We do not include this change type in memory accounting, because we
3156  * keep CIDs in a separate list and do not evict them when reaching
3157  * the memory limit.
3158  */
3159 void
3160 ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
3161 							 XLogRecPtr lsn, RelFileNode node,
3162 							 ItemPointerData tid, CommandId cmin,
3163 							 CommandId cmax, CommandId combocid)
3164 {
3165 	ReorderBufferChange *change = ReorderBufferGetChange(rb);
3166 	ReorderBufferTXN *txn;
3167 
3168 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3169 
3170 	change->data.tuplecid.node = node;
3171 	change->data.tuplecid.tid = tid;
3172 	change->data.tuplecid.cmin = cmin;
3173 	change->data.tuplecid.cmax = cmax;
3174 	change->data.tuplecid.combocid = combocid;
3175 	change->lsn = lsn;
3176 	change->txn = txn;
3177 	change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
3178 
3179 	dlist_push_tail(&txn->tuplecids, &change->node);
3180 	txn->ntuplecids++;
3181 }
3182 
3183 /*
3184  * Setup the invalidation of the toplevel transaction.
3185  *
3186  * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
3187  * accumulates all the invalidation messages in the toplevel transaction, as
3188  * well as in the form of a change in the reorder buffer.  We record it as a
3189  * change so that we can execute only the required invalidations instead of
3190  * executing all the invalidations on each CommandId increment.  We also
3191  * need to accumulate these in the toplevel transaction because in some
3192  * cases we skip processing the transaction (see ReorderBufferForget), in
3193  * which case we must still execute all the invalidations together.
3194  */
3195 void
3196 ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
3197 							  XLogRecPtr lsn, Size nmsgs,
3198 							  SharedInvalidationMessage *msgs)
3199 {
3200 	ReorderBufferTXN *txn;
3201 	MemoryContext oldcontext;
3202 	ReorderBufferChange *change;
3203 
3204 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3205 
3206 	oldcontext = MemoryContextSwitchTo(rb->context);
3207 
3208 	/*
3209 	 * Collect all the invalidations under the top transaction so that we can
3210 	 * execute them all together.  See the comment atop this function.
3211 	 */
3212 	if (txn->toptxn)
3213 		txn = txn->toptxn;
3214 
3215 	Assert(nmsgs > 0);
3216 
3217 	/* Accumulate invalidations. */
3218 	if (txn->ninvalidations == 0)
3219 	{
3220 		txn->ninvalidations = nmsgs;
3221 		txn->invalidations = (SharedInvalidationMessage *)
3222 			palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3223 		memcpy(txn->invalidations, msgs,
3224 			   sizeof(SharedInvalidationMessage) * nmsgs);
3225 	}
3226 	else
3227 	{
3228 		txn->invalidations = (SharedInvalidationMessage *)
3229 			repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
3230 					 (txn->ninvalidations + nmsgs));
3231 
3232 		memcpy(txn->invalidations + txn->ninvalidations, msgs,
3233 			   nmsgs * sizeof(SharedInvalidationMessage));
3234 		txn->ninvalidations += nmsgs;
3235 	}
3236 
3237 	change = ReorderBufferGetChange(rb);
3238 	change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
3239 	change->data.inval.ninvalidations = nmsgs;
3240 	change->data.inval.invalidations = (SharedInvalidationMessage *)
3241 		palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3242 	memcpy(change->data.inval.invalidations, msgs,
3243 		   sizeof(SharedInvalidationMessage) * nmsgs);
3244 
3245 	ReorderBufferQueueChange(rb, xid, lsn, change, false);
3246 
3247 	MemoryContextSwitchTo(oldcontext);
3248 }
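
/*
 * For example (illustrative only): two successive calls for the same
 * toplevel transaction with 2 and then 3 messages leave
 * txn->ninvalidations == 5, with the messages concatenated in arrival
 * order, while each call also queues its own
 * REORDER_BUFFER_CHANGE_INVALIDATION change carrying just that call's
 * messages.
 */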
3249 
3250 /*
3251  * Apply all invalidations we know. Possibly we only need parts at this point
3252  * in the changestream but we don't know which those are.
3253  */
3254 static void
3255 ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
3256 {
3257 	int			i;
3258 
3259 	for (i = 0; i < nmsgs; i++)
3260 		LocalExecuteInvalidationMessage(&msgs[i]);
3261 }
3262 
3263 /*
3264  * Mark a transaction as containing catalog changes
3265  */
3266 void
3267 ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
3268 								  XLogRecPtr lsn)
3269 {
3270 	ReorderBufferTXN *txn;
3271 
3272 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3273 
3274 	txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3275 
3276 	/*
3277 	 * Mark top-level transaction as having catalog changes too if one of its
3278 	 * children has so that the ReorderBufferBuildTupleCidHash can
3279 	 * conveniently check just top-level transaction and decide whether to
3280 	 * build the hash table or not.
3281 	 */
3282 	if (txn->toptxn != NULL)
3283 		txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3284 }
3285 
3286 /*
3287  * Query whether a transaction is already *known* to contain catalog
3288  * changes. This can be wrong until directly before the commit!
3289  */
3290 bool
3291 ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
3292 {
3293 	ReorderBufferTXN *txn;
3294 
3295 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3296 								false);
3297 	if (txn == NULL)
3298 		return false;
3299 
3300 	return rbtxn_has_catalog_changes(txn);
3301 }
3302 
3303 /*
3304  * ReorderBufferXidHasBaseSnapshot
3305  *		Have we already set the base snapshot for the given txn/subtxn?
3306  */
3307 bool
3308 ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
3309 {
3310 	ReorderBufferTXN *txn;
3311 
3312 	txn = ReorderBufferTXNByXid(rb, xid, false,
3313 								NULL, InvalidXLogRecPtr, false);
3314 
3315 	/* transaction isn't known yet, ergo no snapshot */
3316 	if (txn == NULL)
3317 		return false;
3318 
3319 	/* a known subtxn? operate on top-level txn instead */
3320 	if (rbtxn_is_known_subxact(txn))
3321 		txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3322 									NULL, InvalidXLogRecPtr, false);
3323 
3324 	return txn->base_snapshot != NULL;
3325 }
3326 
3327 
3328 /*
3329  * ---------------------------------------
3330  * Disk serialization support
3331  * ---------------------------------------
3332  */
3333 
3334 /*
3335  * Ensure the IO buffer is >= sz.
3336  */
3337 static void
3338 ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
3339 {
3340 	if (!rb->outbufsize)
3341 	{
3342 		rb->outbuf = MemoryContextAlloc(rb->context, sz);
3343 		rb->outbufsize = sz;
3344 	}
3345 	else if (rb->outbufsize < sz)
3346 	{
3347 		rb->outbuf = repalloc(rb->outbuf, sz);
3348 		rb->outbufsize = sz;
3349 	}
3350 }
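
/*
 * Note for callers: growing the buffer may repalloc() it to a new address,
 * so any pointers previously taken into rb->outbuf must be re-fetched after
 * calling ReorderBufferSerializeReserve() - see the "might have been
 * reallocated above" spots in ReorderBufferSerializeChange() below.
 */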
3351 
3352 /*
3353  * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
3354  *
3355  * XXX With many subtransactions this might be quite slow, because we'll have
3356  * to walk through all of them. There are some options for improving this:
3357  * (a) maintain a secondary structure with transactions sorted by the
3358  * amount of changes, (b) don't look for the single largest transaction,
3359  * but e.g. for any transaction using at least some fraction of the memory
3360  * limit, and (c) evict multiple transactions at once, e.g. to free a given
3361  * portion of the memory limit (e.g. 50%).
3362  */
3363 static ReorderBufferTXN *
3364 ReorderBufferLargestTXN(ReorderBuffer *rb)
3365 {
3366 	HASH_SEQ_STATUS hash_seq;
3367 	ReorderBufferTXNByIdEnt *ent;
3368 	ReorderBufferTXN *largest = NULL;
3369 
3370 	hash_seq_init(&hash_seq, rb->by_txn);
3371 	while ((ent = hash_seq_search(&hash_seq)) != NULL)
3372 	{
3373 		ReorderBufferTXN *txn = ent->txn;
3374 
3375 		/* if the current transaction is larger, remember it */
3376 		if ((!largest) || (txn->size > largest->size))
3377 			largest = txn;
3378 	}
3379 
3380 	Assert(largest);
3381 	Assert(largest->size > 0);
3382 	Assert(largest->size <= rb->size);
3383 
3384 	return largest;
3385 }
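
#ifdef NOT_USED
/*
 * A sketch of option (b) from the XXX comment above: rather than always
 * scanning for the single largest transaction, settle for the first one
 * using at least some fraction of the memory limit. Hypothetical and not
 * part of the build; the 50% threshold is an arbitrary choice.
 */
static ReorderBufferTXN *
ReorderBufferLargeEnoughTXN(ReorderBuffer *rb)
{
	HASH_SEQ_STATUS hash_seq;
	ReorderBufferTXNByIdEnt *ent;
	ReorderBufferTXN *largest = NULL;
	Size		threshold = logical_decoding_work_mem * 1024L / 2;

	hash_seq_init(&hash_seq, rb->by_txn);
	while ((ent = hash_seq_search(&hash_seq)) != NULL)
	{
		ReorderBufferTXN *txn = ent->txn;

		if (largest == NULL || txn->size > largest->size)
			largest = txn;

		/* good enough - stop scanning early */
		if (largest->size >= threshold)
		{
			hash_seq_term(&hash_seq);
			break;
		}
	}

	return largest;
}
#endif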
3386 
3387 /*
3388  * Find the largest toplevel transaction to evict (by streaming).
3389  *
3390  * This can be seen as an optimized version of ReorderBufferLargestTXN, which
3391  * should give us the same transaction (because we don't update the memory
3392  * accounting for subtransactions when streaming, so it's always 0). But we
3393  * can simply iterate over the limited number of toplevel transactions that
3394  * have a base snapshot. There is no point in selecting a transaction that
3395  * doesn't have a base snapshot, because we don't decode such transactions.
3396  *
3397  * Note that we skip transactions that contain incomplete changes. There
3398  * is scope for optimization here, in that we could select the largest
3399  * transaction that has incomplete changes.  But that would make the code and
3400  * design quite complex, and it might not be worth the benefit.  If we plan to
3401  * stream transactions that contain incomplete changes, then we need to
3402  * find a way to partially stream/truncate the transaction changes in memory
3403  * and build a mechanism to partially truncate the spilled files.
3404  * Additionally, whenever we partially stream a transaction, we need to
3405  * maintain the last streamed lsn so that next time we can restore from that
3406  * segment and offset in WAL.  As we stream the changes from the top
3407  * transaction and restore them subtransaction-wise, we even need to remember
3408  * the subxact from which we streamed the last change.
3409  */
3410 static ReorderBufferTXN *
3411 ReorderBufferLargestTopTXN(ReorderBuffer *rb)
3412 {
3413 	dlist_iter	iter;
3414 	Size		largest_size = 0;
3415 	ReorderBufferTXN *largest = NULL;
3416 
3417 	/* Find the largest top-level transaction having a base snapshot. */
3418 	dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
3419 	{
3420 		ReorderBufferTXN *txn;
3421 
3422 		txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3423 
3424 		/* must not be a subtxn */
3425 		Assert(!rbtxn_is_known_subxact(txn));
3426 		/* base_snapshot must be set */
3427 		Assert(txn->base_snapshot != NULL);
3428 
3429 		if ((largest == NULL || txn->total_size > largest_size) &&
3430 			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
3431 		{
3432 			largest = txn;
3433 			largest_size = txn->total_size;
3434 		}
3435 	}
3436 
3437 	return largest;
3438 }
3439 
3440 /*
3441  * Check whether the logical_decoding_work_mem limit was reached, and if so
3442  * pick the largest (sub)transaction, one at a time, to evict and spill its
3443  * changes to disk until we get back under the memory limit.
3444  *
3445  * XXX At this point we select transactions until we get back under the memory
3446  * limit, but we might also adopt a more elaborate eviction strategy - for
3447  * example evicting enough transactions to free a certain fraction (e.g. 50%)
3448  * of the memory limit.
3449  */
3450 static void
3451 ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
3452 {
3453 	ReorderBufferTXN *txn;
3454 
3455 	/* bail out if we haven't exceeded the memory limit */
3456 	if (rb->size < logical_decoding_work_mem * 1024L)
3457 		return;
3458 
3459 	/*
3460 	 * Loop until we get back under the memory limit.  One might think that
3461 	 * just evicting the largest (sub)transaction would bring us under the
3462 	 * limit, on the assumption that the selected transaction is at least as
3463 	 * large as the most recent change (which caused us to go over the
3464 	 * limit). However, that is not true, because a user can reduce
3465 	 * logical_decoding_work_mem to a smaller value before the most recent
3466 	 * change.
3467 	 */
3468 	while (rb->size >= logical_decoding_work_mem * 1024L)
3469 	{
3470 		/*
3471 		 * Pick the largest transaction (or subtransaction) and evict it from
3472 		 * memory by streaming, if possible.  Otherwise, spill to disk.
3473 		 */
3474 		if (ReorderBufferCanStartStreaming(rb) &&
3475 			(txn = ReorderBufferLargestTopTXN(rb)) != NULL)
3476 		{
3477 			/* we know there has to be one, because the size is not zero */
3478 			Assert(txn && !txn->toptxn);
3479 			Assert(txn->total_size > 0);
3480 			Assert(rb->size >= txn->total_size);
3481 
3482 			ReorderBufferStreamTXN(rb, txn);
3483 		}
3484 		else
3485 		{
3486 			/*
3487 			 * Pick the largest transaction (or subtransaction) and evict it
3488 			 * from memory by serializing it to disk.
3489 			 */
3490 			txn = ReorderBufferLargestTXN(rb);
3491 
3492 			/* we know there has to be one, because the size is not zero */
3493 			Assert(txn);
3494 			Assert(txn->size > 0);
3495 			Assert(rb->size >= txn->size);
3496 
3497 			ReorderBufferSerializeTXN(rb, txn);
3498 		}
3499 
3500 		/*
3501 		 * After eviction, the transaction should have no entries in memory,
3502 		 * and should use 0 bytes for changes.
3503 		 */
3504 		Assert(txn->size == 0);
3505 		Assert(txn->nentries_mem == 0);
3506 	}
3507 
3508 	/* We must be under the memory limit now. */
3509 	Assert(rb->size < logical_decoding_work_mem * 1024L);
3510 }
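
#ifdef NOT_USED
/*
 * A sketch of the more aggressive strategy from the XXX comment above
 * (hypothetical, not part of the build): keep evicting until at least half
 * of logical_decoding_work_mem is free, instead of stopping as soon as we
 * dip below the limit, so the very next change doesn't trigger eviction
 * again. For simplicity this ignores the streaming path.
 */
static void
ReorderBufferCheckMemoryLimitAggressive(ReorderBuffer *rb)
{
	Size		limit = logical_decoding_work_mem * 1024L;

	if (rb->size < limit)
		return;

	/* relies on ReorderBufferLargestTXN() always finding a nonempty txn */
	while (rb->size >= limit / 2)
		ReorderBufferSerializeTXN(rb, ReorderBufferLargestTXN(rb));
}
#endif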
3511 
3512 /*
3513  * Spill data of a large transaction (and its subtransactions) to disk.
3514  */
3515 static void
3516 ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
3517 {
3518 	dlist_iter	subtxn_i;
3519 	dlist_mutable_iter change_i;
3520 	int			fd = -1;
3521 	XLogSegNo	curOpenSegNo = 0;
3522 	Size		spilled = 0;
3523 	Size		size = txn->size;
3524 
3525 	elog(DEBUG2, "spill %u changes in XID %u to disk",
3526 		 (uint32) txn->nentries_mem, txn->xid);
3527 
3528 	/* do the same to all child TXs */
3529 	dlist_foreach(subtxn_i, &txn->subtxns)
3530 	{
3531 		ReorderBufferTXN *subtxn;
3532 
3533 		subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3534 		ReorderBufferSerializeTXN(rb, subtxn);
3535 	}
3536 
3537 	/* serialize changestream */
3538 	dlist_foreach_modify(change_i, &txn->changes)
3539 	{
3540 		ReorderBufferChange *change;
3541 
3542 		change = dlist_container(ReorderBufferChange, node, change_i.cur);
3543 
3544 		/*
3545 		 * Store the change in the segment to which it belongs by start LSN;
3546 		 * don't split a change over multiple segments, though.
3547 		 */
3548 		if (fd == -1 ||
3549 			!XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3550 		{
3551 			char		path[MAXPGPATH];
3552 
3553 			if (fd != -1)
3554 				CloseTransientFile(fd);
3555 
3556 			XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3557 
3558 			/*
3559 			 * No need to care about TLIs here, only used during a single run,
3560 			 * so each LSN only maps to a specific WAL record.
3561 			 */
3562 			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
3563 										curOpenSegNo);
3564 
3565 			/* open segment, create it if necessary */
3566 			fd = OpenTransientFile(path,
3567 								   O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3568 
3569 			if (fd < 0)
3570 				ereport(ERROR,
3571 						(errcode_for_file_access(),
3572 						 errmsg("could not open file \"%s\": %m", path)));
3573 		}
3574 
3575 		ReorderBufferSerializeChange(rb, txn, fd, change);
3576 		dlist_delete(&change->node);
3577 		ReorderBufferReturnChange(rb, change, true);
3578 
3579 		spilled++;
3580 	}
3581 
3582 	/* update the statistics iff we have spilled anything */
3583 	if (spilled)
3584 	{
3585 		rb->spillCount += 1;
3586 		rb->spillBytes += size;
3587 
3588 		/* don't consider already serialized transactions */
3589 		rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3590 
3591 		/* update the decoding stats */
3592 		UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
3593 	}
3594 
3595 	Assert(spilled == txn->nentries_mem);
3596 	Assert(dlist_is_empty(&txn->changes));
3597 	txn->nentries_mem = 0;
3598 	txn->txn_flags |= RBTXN_IS_SERIALIZED;
3599 
3600 	if (fd != -1)
3601 		CloseTransientFile(fd);
3602 }
3603 
3604 /*
3605  * Serialize individual change to disk.
3606  */
3607 static void
3608 ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
3609 							 int fd, ReorderBufferChange *change)
3610 {
3611 	ReorderBufferDiskChange *ondisk;
3612 	Size		sz = sizeof(ReorderBufferDiskChange);
3613 
3614 	ReorderBufferSerializeReserve(rb, sz);
3615 
3616 	ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3617 	memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3618 
3619 	switch (change->action)
3620 	{
3621 			/* fall through these, they're all similar enough */
3622 		case REORDER_BUFFER_CHANGE_INSERT:
3623 		case REORDER_BUFFER_CHANGE_UPDATE:
3624 		case REORDER_BUFFER_CHANGE_DELETE:
3625 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
3626 			{
3627 				char	   *data;
3628 				ReorderBufferTupleBuf *oldtup,
3629 						   *newtup;
3630 				Size		oldlen = 0;
3631 				Size		newlen = 0;
3632 
3633 				oldtup = change->data.tp.oldtuple;
3634 				newtup = change->data.tp.newtuple;
3635 
3636 				if (oldtup)
3637 				{
3638 					sz += sizeof(HeapTupleData);
3639 					oldlen = oldtup->tuple.t_len;
3640 					sz += oldlen;
3641 				}
3642 
3643 				if (newtup)
3644 				{
3645 					sz += sizeof(HeapTupleData);
3646 					newlen = newtup->tuple.t_len;
3647 					sz += newlen;
3648 				}
3649 
3650 				/* make sure we have enough space */
3651 				ReorderBufferSerializeReserve(rb, sz);
3652 
3653 				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3654 				/* might have been reallocated above */
3655 				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3656 
3657 				if (oldlen)
3658 				{
3659 					memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
3660 					data += sizeof(HeapTupleData);
3661 
3662 					memcpy(data, oldtup->tuple.t_data, oldlen);
3663 					data += oldlen;
3664 				}
3665 
3666 				if (newlen)
3667 				{
3668 					memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
3669 					data += sizeof(HeapTupleData);
3670 
3671 					memcpy(data, newtup->tuple.t_data, newlen);
3672 					data += newlen;
3673 				}
3674 				break;
3675 			}
3676 		case REORDER_BUFFER_CHANGE_MESSAGE:
3677 			{
3678 				char	   *data;
3679 				Size		prefix_size = strlen(change->data.msg.prefix) + 1;
3680 
3681 				sz += prefix_size + change->data.msg.message_size +
3682 					sizeof(Size) + sizeof(Size);
3683 				ReorderBufferSerializeReserve(rb, sz);
3684 
3685 				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3686 
3687 				/* might have been reallocated above */
3688 				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3689 
3690 				/* write the prefix including the size */
3691 				memcpy(data, &prefix_size, sizeof(Size));
3692 				data += sizeof(Size);
3693 				memcpy(data, change->data.msg.prefix,
3694 					   prefix_size);
3695 				data += prefix_size;
3696 
3697 				/* write the message including the size */
3698 				memcpy(data, &change->data.msg.message_size, sizeof(Size));
3699 				data += sizeof(Size);
3700 				memcpy(data, change->data.msg.message,
3701 					   change->data.msg.message_size);
3702 				data += change->data.msg.message_size;
3703 
3704 				break;
3705 			}
3706 		case REORDER_BUFFER_CHANGE_INVALIDATION:
3707 			{
3708 				char	   *data;
3709 				Size		inval_size = sizeof(SharedInvalidationMessage) *
3710 				change->data.inval.ninvalidations;
3711 
3712 				sz += inval_size;
3713 
3714 				ReorderBufferSerializeReserve(rb, sz);
3715 				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3716 
3717 				/* might have been reallocated above */
3718 				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3719 				memcpy(data, change->data.inval.invalidations, inval_size);
3720 				data += inval_size;
3721 
3722 				break;
3723 			}
3724 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
3725 			{
3726 				Snapshot	snap;
3727 				char	   *data;
3728 
3729 				snap = change->data.snapshot;
3730 
3731 				sz += sizeof(SnapshotData) +
3732 					sizeof(TransactionId) * snap->xcnt +
3733 					sizeof(TransactionId) * snap->subxcnt;
3734 
3735 				/* make sure we have enough space */
3736 				ReorderBufferSerializeReserve(rb, sz);
3737 				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3738 				/* might have been reallocated above */
3739 				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3740 
3741 				memcpy(data, snap, sizeof(SnapshotData));
3742 				data += sizeof(SnapshotData);
3743 
3744 				if (snap->xcnt)
3745 				{
3746 					memcpy(data, snap->xip,
3747 						   sizeof(TransactionId) * snap->xcnt);
3748 					data += sizeof(TransactionId) * snap->xcnt;
3749 				}
3750 
3751 				if (snap->subxcnt)
3752 				{
3753 					memcpy(data, snap->subxip,
3754 						   sizeof(TransactionId) * snap->subxcnt);
3755 					data += sizeof(TransactionId) * snap->subxcnt;
3756 				}
3757 				break;
3758 			}
3759 		case REORDER_BUFFER_CHANGE_TRUNCATE:
3760 			{
3761 				Size		size;
3762 				char	   *data;
3763 
3764 				/* account for the OIDs of truncated relations */
3765 				size = sizeof(Oid) * change->data.truncate.nrelids;
3766 				sz += size;
3767 
3768 				/* make sure we have enough space */
3769 				ReorderBufferSerializeReserve(rb, sz);
3770 
3771 				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3772 				/* might have been reallocated above */
3773 				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3774 
3775 				memcpy(data, change->data.truncate.relids, size);
3776 				data += size;
3777 
3778 				break;
3779 			}
3780 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
3781 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
3782 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
3783 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
3784 			/* ReorderBufferChange contains everything important */
3785 			break;
3786 	}
3787 
3788 	ondisk->size = sz;
3789 
3790 	errno = 0;
3791 	pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
3792 	if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
3793 	{
3794 		int			save_errno = errno;
3795 
3796 		CloseTransientFile(fd);
3797 
3798 		/* if write didn't set errno, assume problem is no disk space */
3799 		errno = save_errno ? save_errno : ENOSPC;
3800 		ereport(ERROR,
3801 				(errcode_for_file_access(),
3802 				 errmsg("could not write to data file for XID %u: %m",
3803 						txn->xid)));
3804 	}
3805 	pgstat_report_wait_end();
3806 
3807 	/*
3808 	 * Keep the transaction's final_lsn up to date with each change we send to
3809 	 * disk, so that ReorderBufferRestoreCleanup works correctly.  (We used to
3810 	 * only do this on commit and abort records, but that doesn't work if a
3811 	 * system crash leaves a transaction without its abort record).
3812 	 *
3813 	 * Make sure not to move it backwards.
3814 	 */
3815 	if (txn->final_lsn < change->lsn)
3816 		txn->final_lsn = change->lsn;
3817 
3818 	Assert(ondisk->change.action == change->action);
3819 }
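
/*
 * For reference, the record written above for INSERT/UPDATE/DELETE changes
 * has this on-disk layout (other change types append their own payloads in
 * the same manner, with no extra alignment padding):
 *
 *	ReorderBufferDiskChange		header: total size + copy of the change
 *	HeapTupleData (old tuple)	only if change->data.tp.oldtuple is set
 *	old tuple data				oldtup->tuple.t_len bytes
 *	HeapTupleData (new tuple)	only if change->data.tp.newtuple is set
 *	new tuple data				newtup->tuple.t_len bytes
 *
 * ReorderBufferRestoreChange() reads this back in exactly the same order.
 */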
3820 
3821 /* Returns true if the output plugin supports streaming, false otherwise. */
3822 static inline bool
3823 ReorderBufferCanStream(ReorderBuffer *rb)
3824 {
3825 	LogicalDecodingContext *ctx = rb->private_data;
3826 
3827 	return ctx->streaming;
3828 }
3829 
3830 /* Returns true if streaming can be started now, false otherwise. */
3831 static inline bool
3832 ReorderBufferCanStartStreaming(ReorderBuffer *rb)
3833 {
3834 	LogicalDecodingContext *ctx = rb->private_data;
3835 	SnapBuild  *builder = ctx->snapshot_builder;
3836 
3837 	/* We can't start streaming unless a consistent state is reached. */
3838 	if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
3839 		return false;
3840 
3841 	/*
3842 	 * We can't start streaming immediately even if streaming is enabled,
3843 	 * because we may previously have decoded this transaction and now just
3844 	 * be restarting.
3845 	 */
3846 	if (ReorderBufferCanStream(rb) &&
3847 		!SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
3848 		return true;
3849 
3850 	return false;
3851 }
3852 
3853 /*
3854  * Send data of a large transaction (and its subtransactions) to the
3855  * output plugin, using the stream API.
3856  */
3857 static void
3858 ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
3859 {
3860 	Snapshot	snapshot_now;
3861 	CommandId	command_id;
3862 	Size		stream_bytes;
3863 	bool		txn_is_streamed;
3864 
3865 	/* We can never reach here for a subtransaction. */
3866 	Assert(txn->toptxn == NULL);
3867 
3868 	/*
3869 	 * We can't make any assumptions about base snapshot here, similar to what
3870 	 * ReorderBufferCommit() does. That relies on base_snapshot getting
3871 	 * transferred from the subxact in ReorderBufferCommitChild(), but that
3872 	 * has not yet been called, as the transaction is still in progress.
3873 	 *
3874 	 * So just walk the subxacts and use the same logic here. But we only need
3875 	 * to do that once, when the transaction is streamed for the first time.
3876 	 * After that we need to reuse the snapshot from the previous run.
3877 	 *
3878 	 * Unlike DecodeCommit which adds xids of all the subtransactions in
3879 	 * snapshot's xip array via SnapBuildCommittedTxn, we can't do that here
3880 	 * but we do add them to subxip array instead via ReorderBufferCopySnap.
3881 	 * This allows the catalog changes made in subtransactions decoded so
3882 	 * far to be visible.
3883 	 */
3884 	if (txn->snapshot_now == NULL)
3885 	{
3886 		dlist_iter	subxact_i;
3887 
3888 		/* make sure this transaction is streamed for the first time */
3889 		Assert(!rbtxn_is_streamed(txn));
3890 
3891 		/* at the beginning we should have invalid command ID */
3892 		Assert(txn->command_id == InvalidCommandId);
3893 
3894 		dlist_foreach(subxact_i, &txn->subtxns)
3895 		{
3896 			ReorderBufferTXN *subtxn;
3897 
3898 			subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
3899 			ReorderBufferTransferSnapToParent(txn, subtxn);
3900 		}
3901 
3902 		/*
3903 		 * If this transaction has no snapshot, it didn't make any changes to
3904 		 * the database till now, so there's nothing to decode.
3905 		 */
3906 		if (txn->base_snapshot == NULL)
3907 		{
3908 			Assert(txn->ninvalidations == 0);
3909 			return;
3910 		}
3911 
3912 		command_id = FirstCommandId;
3913 		snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
3914 											 txn, command_id);
3915 	}
3916 	else
3917 	{
3918 		/* the transaction must have been already streamed */
3919 		Assert(rbtxn_is_streamed(txn));
3920 
3921 		/*
3922 		 * Nah, we already have a snapshot from the previous streaming run. We
3923 		 * assume new subxacts can't move the LSN backwards, and so can't beat
3924 		 * the LSN condition in the previous branch (so no need to walk
3925 		 * through subxacts again). In fact, we must not do that as we may be
3926 		 * using the snapshot half-way through the subxact.
3927 		 */
3928 		command_id = txn->command_id;
3929 
3930 		/*
3931 		 * We can't use txn->snapshot_now directly because after the last
3932 		 * streaming run, we might have got some new sub-transactions. So we
3933 		 * need to add them to the snapshot.
3934 		 */
3935 		snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
3936 											 txn, command_id);
3937 
3938 		/* Free the previously copied snapshot. */
3939 		Assert(txn->snapshot_now->copied);
3940 		ReorderBufferFreeSnap(rb, txn->snapshot_now);
3941 		txn->snapshot_now = NULL;
3942 	}
3943 
3944 	/*
3945 	 * Remember this information to be used later to update stats. We can't
3946 	 * update the stats here as an error while processing the changes would
3947 	 * lead to the accumulation of stats even though we haven't streamed all
3948 	 * the changes.
3949 	 */
3950 	txn_is_streamed = rbtxn_is_streamed(txn);
3951 	stream_bytes = txn->total_size;
3952 
3953 	/* Process and send the changes to output plugin. */
3954 	ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
3955 							command_id, true);
3956 
3957 	rb->streamCount += 1;
3958 	rb->streamBytes += stream_bytes;
3959 
3960 	/* Don't consider an already streamed transaction. */
3961 	rb->streamTxns += (txn_is_streamed) ? 0 : 1;
3962 
3963 	/* update the decoding stats */
3964 	UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
3965 
3966 	Assert(dlist_is_empty(&txn->changes));
3967 	Assert(txn->nentries == 0);
3968 	Assert(txn->nentries_mem == 0);
3969 }
3970 
3971 /*
3972  * Size of a change in memory.
3973  */
3974 static Size
3975 ReorderBufferChangeSize(ReorderBufferChange *change)
3976 {
3977 	Size		sz = sizeof(ReorderBufferChange);
3978 
3979 	switch (change->action)
3980 	{
3981 			/* fall through these, they're all similar enough */
3982 		case REORDER_BUFFER_CHANGE_INSERT:
3983 		case REORDER_BUFFER_CHANGE_UPDATE:
3984 		case REORDER_BUFFER_CHANGE_DELETE:
3985 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
3986 			{
3987 				ReorderBufferTupleBuf *oldtup,
3988 						   *newtup;
3989 				Size		oldlen = 0;
3990 				Size		newlen = 0;
3991 
3992 				oldtup = change->data.tp.oldtuple;
3993 				newtup = change->data.tp.newtuple;
3994 
3995 				if (oldtup)
3996 				{
3997 					sz += sizeof(HeapTupleData);
3998 					oldlen = oldtup->tuple.t_len;
3999 					sz += oldlen;
4000 				}
4001 
4002 				if (newtup)
4003 				{
4004 					sz += sizeof(HeapTupleData);
4005 					newlen = newtup->tuple.t_len;
4006 					sz += newlen;
4007 				}
4008 
4009 				break;
4010 			}
4011 		case REORDER_BUFFER_CHANGE_MESSAGE:
4012 			{
4013 				Size		prefix_size = strlen(change->data.msg.prefix) + 1;
4014 
4015 				sz += prefix_size + change->data.msg.message_size +
4016 					sizeof(Size) + sizeof(Size);
4017 
4018 				break;
4019 			}
4020 		case REORDER_BUFFER_CHANGE_INVALIDATION:
4021 			{
4022 				sz += sizeof(SharedInvalidationMessage) *
4023 					change->data.inval.ninvalidations;
4024 				break;
4025 			}
4026 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4027 			{
4028 				Snapshot	snap;
4029 
4030 				snap = change->data.snapshot;
4031 
4032 				sz += sizeof(SnapshotData) +
4033 					sizeof(TransactionId) * snap->xcnt +
4034 					sizeof(TransactionId) * snap->subxcnt;
4035 
4036 				break;
4037 			}
4038 		case REORDER_BUFFER_CHANGE_TRUNCATE:
4039 			{
4040 				sz += sizeof(Oid) * change->data.truncate.nrelids;
4041 
4042 				break;
4043 			}
4044 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4045 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4046 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4047 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
4048 			/* ReorderBufferChange contains everything important */
4049 			break;
4050 	}
4051 
4052 	return sz;
4053 }
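
/*
 * Note: the variable-length parts accounted for here deliberately mirror
 * what ReorderBufferSerializeChange() writes to disk; only the fixed header
 * differs (the in-memory ReorderBufferChange vs. the on-disk
 * ReorderBufferDiskChange). The two switches need to be kept in sync when
 * adding new change types.
 */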
4054 
4055 
4056 /*
4057  * Restore a number of changes spilled to disk back into memory.
4058  */
4059 static Size
4060 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
4061 							TXNEntryFile *file, XLogSegNo *segno)
4062 {
4063 	Size		restored = 0;
4064 	XLogSegNo	last_segno;
4065 	dlist_mutable_iter cleanup_iter;
4066 	File	   *fd = &file->vfd;
4067 
4068 	Assert(txn->first_lsn != InvalidXLogRecPtr);
4069 	Assert(txn->final_lsn != InvalidXLogRecPtr);
4070 
4071 	/* free current entries, so we have memory for more */
4072 	dlist_foreach_modify(cleanup_iter, &txn->changes)
4073 	{
4074 		ReorderBufferChange *cleanup =
4075 		dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4076 
4077 		dlist_delete(&cleanup->node);
4078 		ReorderBufferReturnChange(rb, cleanup, true);
4079 	}
4080 	txn->nentries_mem = 0;
4081 	Assert(dlist_is_empty(&txn->changes));
4082 
4083 	XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4084 
4085 	while (restored < max_changes_in_memory && *segno <= last_segno)
4086 	{
4087 		int			readBytes;
4088 		ReorderBufferDiskChange *ondisk;
4089 
4090 		if (*fd == -1)
4091 		{
4092 			char		path[MAXPGPATH];
4093 
4094 			/* first time in */
4095 			if (*segno == 0)
4096 				XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4097 
4098 			Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4099 
4100 			/*
4101 			 * No need to care about TLIs here, only used during a single run,
4102 			 * so each LSN only maps to a specific WAL record.
4103 			 */
4104 			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
4105 										*segno);
4106 
4107 			*fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4108 
4109 			/* No harm in resetting the offset even in case of failure */
4110 			file->curOffset = 0;
4111 
4112 			if (*fd < 0 && errno == ENOENT)
4113 			{
4114 				*fd = -1;
4115 				(*segno)++;
4116 				continue;
4117 			}
4118 			else if (*fd < 0)
4119 				ereport(ERROR,
4120 						(errcode_for_file_access(),
4121 						 errmsg("could not open file \"%s\": %m",
4122 								path)));
4123 		}
4124 
4125 		/*
4126 		 * Read the statically sized part of a change which has information
4127 		 * about the total size. If we couldn't read a record, we're at the
4128 		 * end of this file.
4129 		 */
4130 		ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
4131 		readBytes = FileRead(file->vfd, rb->outbuf,
4132 							 sizeof(ReorderBufferDiskChange),
4133 							 file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4134 
4135 		/* eof */
4136 		if (readBytes == 0)
4137 		{
4138 			FileClose(*fd);
4139 			*fd = -1;
4140 			(*segno)++;
4141 			continue;
4142 		}
4143 		else if (readBytes < 0)
4144 			ereport(ERROR,
4145 					(errcode_for_file_access(),
4146 					 errmsg("could not read from reorderbuffer spill file: %m")));
4147 		else if (readBytes != sizeof(ReorderBufferDiskChange))
4148 			ereport(ERROR,
4149 					(errcode_for_file_access(),
4150 					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4151 							readBytes,
4152 							(uint32) sizeof(ReorderBufferDiskChange))));
4153 
4154 		file->curOffset += readBytes;
4155 
4156 		ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4157 
4158 		ReorderBufferSerializeReserve(rb,
4159 									  sizeof(ReorderBufferDiskChange) + ondisk->size);
4160 		ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4161 
4162 		readBytes = FileRead(file->vfd,
4163 							 rb->outbuf + sizeof(ReorderBufferDiskChange),
4164 							 ondisk->size - sizeof(ReorderBufferDiskChange),
4165 							 file->curOffset,
4166 							 WAIT_EVENT_REORDER_BUFFER_READ);
4167 
4168 		if (readBytes < 0)
4169 			ereport(ERROR,
4170 					(errcode_for_file_access(),
4171 					 errmsg("could not read from reorderbuffer spill file: %m")));
4172 		else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4173 			ereport(ERROR,
4174 					(errcode_for_file_access(),
4175 					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4176 							readBytes,
4177 							(uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4178 
4179 		file->curOffset += readBytes;
4180 
4181 		/*
4182 		 * ok, read a full change from disk, now restore it into proper
4183 		 * in-memory format
4184 		 */
4185 		ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4186 		restored++;
4187 	}
4188 
4189 	return restored;
4190 }
4191 
4192 /*
4193  * Convert change from its on-disk format to in-memory format and queue it onto
4194  * the TXN's ->changes list.
4195  *
4196  * Note: although "data" is declared char*, at entry it points to a
4197  * maxalign'd buffer, making it safe in most of this function to assume
4198  * that the pointed-to data is suitably aligned for direct access.
4199  */
4200 static void
4201 ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
4202 						   char *data)
4203 {
4204 	ReorderBufferDiskChange *ondisk;
4205 	ReorderBufferChange *change;
4206 
4207 	ondisk = (ReorderBufferDiskChange *) data;
4208 
4209 	change = ReorderBufferGetChange(rb);
4210 
4211 	/* copy static part */
4212 	memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4213 
4214 	data += sizeof(ReorderBufferDiskChange);
4215 
4216 	/* restore individual stuff */
4217 	switch (change->action)
4218 	{
4219 			/* fall through these, they're all similar enough */
4220 		case REORDER_BUFFER_CHANGE_INSERT:
4221 		case REORDER_BUFFER_CHANGE_UPDATE:
4222 		case REORDER_BUFFER_CHANGE_DELETE:
4223 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
4224 			if (change->data.tp.oldtuple)
4225 			{
4226 				uint32		tuplelen = ((HeapTuple) data)->t_len;
4227 
4228 				change->data.tp.oldtuple =
4229 					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4230 
4231 				/* restore ->tuple */
4232 				memcpy(&change->data.tp.oldtuple->tuple, data,
4233 					   sizeof(HeapTupleData));
4234 				data += sizeof(HeapTupleData);
4235 
4236 				/* reset t_data pointer into the new tuplebuf */
4237 				change->data.tp.oldtuple->tuple.t_data =
4238 					ReorderBufferTupleBufData(change->data.tp.oldtuple);
4239 
4240 				/* restore tuple data itself */
4241 				memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
4242 				data += tuplelen;
4243 			}
4244 
4245 			if (change->data.tp.newtuple)
4246 			{
4247 				/* here, data might not be suitably aligned! */
4248 				uint32		tuplelen;
4249 
4250 				memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4251 					   sizeof(uint32));
4252 
4253 				change->data.tp.newtuple =
4254 					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4255 
4256 				/* restore ->tuple */
4257 				memcpy(&change->data.tp.newtuple->tuple, data,
4258 					   sizeof(HeapTupleData));
4259 				data += sizeof(HeapTupleData);
4260 
4261 				/* reset t_data pointer into the new tuplebuf */
4262 				change->data.tp.newtuple->tuple.t_data =
4263 					ReorderBufferTupleBufData(change->data.tp.newtuple);
4264 
4265 				/* restore tuple data itself */
4266 				memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
4267 				data += tuplelen;
4268 			}
4269 
4270 			break;
4271 		case REORDER_BUFFER_CHANGE_MESSAGE:
4272 			{
4273 				Size		prefix_size;
4274 
4275 				/* read prefix */
4276 				memcpy(&prefix_size, data, sizeof(Size));
4277 				data += sizeof(Size);
4278 				change->data.msg.prefix = MemoryContextAlloc(rb->context,
4279 															 prefix_size);
4280 				memcpy(change->data.msg.prefix, data, prefix_size);
4281 				Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4282 				data += prefix_size;
4283 
4284 				/* read the message */
4285 				memcpy(&change->data.msg.message_size, data, sizeof(Size));
4286 				data += sizeof(Size);
4287 				change->data.msg.message = MemoryContextAlloc(rb->context,
4288 															  change->data.msg.message_size);
4289 				memcpy(change->data.msg.message, data,
4290 					   change->data.msg.message_size);
4291 				data += change->data.msg.message_size;
4292 
4293 				break;
4294 			}
4295 		case REORDER_BUFFER_CHANGE_INVALIDATION:
4296 			{
4297 				Size		inval_size = sizeof(SharedInvalidationMessage) *
4298 				change->data.inval.ninvalidations;
4299 
4300 				change->data.inval.invalidations =
4301 					MemoryContextAlloc(rb->context, inval_size);
4302 
4303 				/* read the message */
4304 				memcpy(change->data.inval.invalidations, data, inval_size);
4305 
4306 				break;
4307 			}
4308 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4309 			{
4310 				Snapshot	oldsnap;
4311 				Snapshot	newsnap;
4312 				Size		size;
4313 
4314 				oldsnap = (Snapshot) data;
4315 
4316 				size = sizeof(SnapshotData) +
4317 					sizeof(TransactionId) * oldsnap->xcnt +
4318 					sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4319 
4320 				change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4321 
4322 				newsnap = change->data.snapshot;
4323 
4324 				memcpy(newsnap, data, size);
4325 				newsnap->xip = (TransactionId *)
4326 					(((char *) newsnap) + sizeof(SnapshotData));
4327 				newsnap->subxip = newsnap->xip + newsnap->xcnt;
4328 				newsnap->copied = true;
4329 				break;
4330 			}
4331 			/* the base struct contains all the data, easy peasy */
4332 		case REORDER_BUFFER_CHANGE_TRUNCATE:
4333 			{
4334 				Oid		   *relids;
4335 
4336 				relids = ReorderBufferGetRelids(rb,
4337 												change->data.truncate.nrelids);
4338 				memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4339 				change->data.truncate.relids = relids;
4340 
4341 				break;
4342 			}
4343 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4344 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4345 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4346 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
4347 			break;
4348 	}
4349 
4350 	dlist_push_tail(&txn->changes, &change->node);
4351 	txn->nentries_mem++;
4352 
4353 	/*
4354 	 * Update memory accounting for the restored change.  We need to do this
4355 	 * although we don't check the memory limit when restoring the changes in
4356 	 * this branch (we only do that when initially queueing the changes after
4357 	 * decoding), because we will release the changes later, and that will
4358 	 * update the accounting too (subtracting the size from the counters). And
4359 	 * we don't want to underflow there.
4360 	 */
4361 	ReorderBufferChangeMemoryUpdate(rb, change, true,
4362 									ReorderBufferChangeSize(change));
4363 }
4364 
4365 /*
4366  * Remove all on-disk data stored for the passed-in transaction.
4367  */
4368 static void
4369 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
4370 {
4371 	XLogSegNo	first;
4372 	XLogSegNo	cur;
4373 	XLogSegNo	last;
4374 
4375 	Assert(txn->first_lsn != InvalidXLogRecPtr);
4376 	Assert(txn->final_lsn != InvalidXLogRecPtr);
4377 
4378 	XLByteToSeg(txn->first_lsn, first, wal_segment_size);
4379 	XLByteToSeg(txn->final_lsn, last, wal_segment_size);
4380 
4381 	/* iterate over all possible filenames, and delete them */
4382 	for (cur = first; cur <= last; cur++)
4383 	{
4384 		char		path[MAXPGPATH];
4385 
4386 		ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
4387 		if (unlink(path) != 0 && errno != ENOENT)
4388 			ereport(ERROR,
4389 					(errcode_for_file_access(),
4390 					 errmsg("could not remove file \"%s\": %m", path)));
4391 	}
4392 }
4393 
4394 /*
4395  * Remove any leftover serialized reorder buffers from a slot directory after a
4396  * prior crash or decoding session exit.
4397  */
4398 static void
4399 ReorderBufferCleanupSerializedTXNs(const char *slotname)
4400 {
4401 	DIR		   *spill_dir;
4402 	struct dirent *spill_de;
4403 	struct stat statbuf;
4404 	char		path[MAXPGPATH * 2 + 12];
4405 
4406 	sprintf(path, "pg_replslot/%s", slotname);
4407 
4408 	/* we're only handling directories here, skip if it's not ours */
4409 	if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4410 		return;
4411 
4412 	spill_dir = AllocateDir(path);
4413 	while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4414 	{
4415 		/* only look at names that can be ours */
4416 		if (strncmp(spill_de->d_name, "xid", 3) == 0)
4417 		{
4418 			snprintf(path, sizeof(path),
4419 					 "pg_replslot/%s/%s", slotname,
4420 					 spill_de->d_name);
4421 
4422 			if (unlink(path) != 0)
4423 				ereport(ERROR,
4424 						(errcode_for_file_access(),
4425 						 errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4426 								path, slotname)));
4427 		}
4428 	}
4429 	FreeDir(spill_dir);
4430 }
4431 
4432 /*
4433  * Given a replication slot, transaction ID and segment number, fill 'path'
4434  * with the name of the corresponding spill file; 'path' is a caller-owned
4435  * buffer of size at least MAXPGPATH.
4436  */
4437 static void
4438 ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
4439 							XLogSegNo segno)
4440 {
4441 	XLogRecPtr	recptr;
4442 
4443 	XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4444 
4445 	snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
4446 			 NameStr(MyReplicationSlot->data.name),
4447 			 xid, LSN_FORMAT_ARGS(recptr));
4448 }
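
/*
 * For example, for a slot named "myslot", xid 1234 and the segment starting
 * at LSN 0/16000000, the constructed path is
 * "pg_replslot/myslot/xid-1234-lsn-0-16000000.spill".
 */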
4449 
4450 /*
4451  * Delete all data spilled to disk after we've restarted/crashed. It will be
4452  * recreated when the respective slots are reused.
4453  */
4454 void
4455 StartupReorderBuffer(void)
4456 {
4457 	DIR		   *logical_dir;
4458 	struct dirent *logical_de;
4459 
4460 	logical_dir = AllocateDir("pg_replslot");
4461 	while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
4462 	{
4463 		if (strcmp(logical_de->d_name, ".") == 0 ||
4464 			strcmp(logical_de->d_name, "..") == 0)
4465 			continue;
4466 
4467 		/* if it cannot be a slot, skip the directory */
4468 		if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4469 			continue;
4470 
4471 		/*
4472 		 * ok, has to be a surviving logical slot, iterate and delete
4473 		 * everything starting with xid-*
4474 		 */
4475 		ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
4476 	}
4477 	FreeDir(logical_dir);
4478 }
4479 
4480 /* ---------------------------------------
4481  * toast reassembly support
4482  * ---------------------------------------
4483  */
4484 
4485 /*
4486  * Initialize per-tuple toast reconstruction support.
4487  */
4488 static void
4489 ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
4490 {
4491 	HASHCTL		hash_ctl;
4492 
4493 	Assert(txn->toast_hash == NULL);
4494 
4495 	hash_ctl.keysize = sizeof(Oid);
4496 	hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
4497 	hash_ctl.hcxt = rb->context;
4498 	txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
4499 								  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4500 }
4501 
4502 /*
4503  * Per toast-chunk handling for toast reconstruction
4504  *
4505  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
4506  * toasted Datum comes along.
4507  */
4508 static void
4509 ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
4510 							  Relation relation, ReorderBufferChange *change)
4511 {
4512 	ReorderBufferToastEnt *ent;
4513 	ReorderBufferTupleBuf *newtup;
4514 	bool		found;
4515 	int32		chunksize;
4516 	bool		isnull;
4517 	Pointer		chunk;
4518 	TupleDesc	desc = RelationGetDescr(relation);
4519 	Oid			chunk_id;
4520 	int32		chunk_seq;
4521 
4522 	if (txn->toast_hash == NULL)
4523 		ReorderBufferToastInitHash(rb, txn);
4524 
4525 	Assert(IsToastRelation(relation));
4526 
4527 	newtup = change->data.tp.newtuple;
4528 	chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
4529 	Assert(!isnull);
4530 	chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
4531 	Assert(!isnull);
4532 
4533 	ent = (ReorderBufferToastEnt *)
4534 		hash_search(txn->toast_hash,
4535 					(void *) &chunk_id,
4536 					HASH_ENTER,
4537 					&found);
4538 
4539 	if (!found)
4540 	{
4541 		Assert(ent->chunk_id == chunk_id);
4542 		ent->num_chunks = 0;
4543 		ent->last_chunk_seq = 0;
4544 		ent->size = 0;
4545 		ent->reconstructed = NULL;
4546 		dlist_init(&ent->chunks);
4547 
4548 		if (chunk_seq != 0)
4549 			elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
4550 				 chunk_seq, chunk_id);
4551 	}
4552 	else if (found && chunk_seq != ent->last_chunk_seq + 1)
4553 		elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
4554 			 chunk_seq, chunk_id, ent->last_chunk_seq + 1);
4555 
4556 	chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
4557 	Assert(!isnull);
4558 
4559 	/* calculate size so we can allocate the right size at once later */
4560 	if (!VARATT_IS_EXTENDED(chunk))
4561 		chunksize = VARSIZE(chunk) - VARHDRSZ;
4562 	else if (VARATT_IS_SHORT(chunk))
4563 		/* could happen due to heap_form_tuple doing its thing */
4564 		chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
4565 	else
4566 		elog(ERROR, "unexpected type of toast chunk");
4567 
4568 	ent->size += chunksize;
4569 	ent->last_chunk_seq = chunk_seq;
4570 	ent->num_chunks++;
4571 	dlist_push_tail(&ent->chunks, &change->node);
4572 }
4573 
4574 /*
4575  * Rejigger change->newtuple to point to in-memory toast tuples instead of
4576  * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
4577  *
4578  * We cannot replace unchanged toast tuples though, so those will still point
4579  * to on-disk toast data.
4580  *
4581  * While updating the existing change with detoasted tuple data, we need to
4582  * update the memory accounting info, because the change size will differ.
4583  * Otherwise the accounting may get out of sync, triggering serialization
4584  * at unexpected times.
4585  *
4586  * We simply subtract the size of the change before rejiggering the tuple,
4587  * and then add the new size afterwards. This makes it look like the change
4588  * was removed and then added back, except it only tweaks the accounting info.
4589  *
4590  * In particular it can't trigger serialization, which would be pointless
4591  * anyway as it happens during commit processing right before handing
4592  * the change to the output plugin.
4593  */
4594 static void
4595 ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
4596 						  Relation relation, ReorderBufferChange *change)
4597 {
4598 	TupleDesc	desc;
4599 	int			natt;
4600 	Datum	   *attrs;
4601 	bool	   *isnull;
4602 	bool	   *free;
4603 	HeapTuple	tmphtup;
4604 	Relation	toast_rel;
4605 	TupleDesc	toast_desc;
4606 	MemoryContext oldcontext;
4607 	ReorderBufferTupleBuf *newtup;
4608 	Size		old_size;
4609 
4610 	/* no toast tuples changed */
4611 	if (txn->toast_hash == NULL)
4612 		return;
4613 
4614 	/*
4615 	 * We're going to modify the size of the change. So, to make sure the
4616 	 * accounting is correct we record the current change size and then after
4617 	 * re-computing the change we'll subtract the recorded size and then
4618 	 * re-add the new change size at the end. We don't immediately subtract
4619 	 * the old size because if there is any error before we add the new size,
4620 	 * we will release the changes and that will update the accounting info
4621 	 * (subtracting the size from the counters). And we don't want to
4622 	 * underflow there.
4623 	 */
4624 	old_size = ReorderBufferChangeSize(change);
4625 
4626 	oldcontext = MemoryContextSwitchTo(rb->context);
4627 
4628 	/* we should only have toast tuples in an INSERT or UPDATE */
4629 	Assert(change->data.tp.newtuple);
4630 
4631 	desc = RelationGetDescr(relation);
4632 
4633 	toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
4634 	if (!RelationIsValid(toast_rel))
4635 		elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
4636 			 relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
4637 
4638 	toast_desc = RelationGetDescr(toast_rel);
4639 
4640 	/* should we allocate from stack instead? */
4641 	attrs = palloc0(sizeof(Datum) * desc->natts);
4642 	isnull = palloc0(sizeof(bool) * desc->natts);
4643 	free = palloc0(sizeof(bool) * desc->natts);
4644 
4645 	newtup = change->data.tp.newtuple;
4646 
4647 	heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
4648 
4649 	for (natt = 0; natt < desc->natts; natt++)
4650 	{
4651 		Form_pg_attribute attr = TupleDescAttr(desc, natt);
4652 		ReorderBufferToastEnt *ent;
4653 		struct varlena *varlena;
4654 
4655 		/* va_rawsize is the size of the original datum -- including header */
4656 		struct varatt_external toast_pointer;
4657 		struct varatt_indirect redirect_pointer;
4658 		struct varlena *new_datum = NULL;
4659 		struct varlena *reconstructed;
4660 		dlist_iter	it;
4661 		Size		data_done = 0;
4662 
4663 		/* system columns aren't toasted */
4664 		if (attr->attnum < 0)
4665 			continue;
4666 
4667 		if (attr->attisdropped)
4668 			continue;
4669 
4670 		/* not a varlena datatype */
4671 		if (attr->attlen != -1)
4672 			continue;
4673 
4674 		/* no data */
4675 		if (isnull[natt])
4676 			continue;
4677 
4678 		/* ok, we know we have a toast datum */
4679 		varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
4680 
4681 		/* no need to do anything if the tuple isn't external */
4682 		if (!VARATT_IS_EXTERNAL(varlena))
4683 			continue;
4684 
4685 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
4686 
4687 		/*
4688 		 * Check whether the toast tuple changed, replace if so.
4689 		 */
4690 		ent = (ReorderBufferToastEnt *)
4691 			hash_search(txn->toast_hash,
4692 						(void *) &toast_pointer.va_valueid,
4693 						HASH_FIND,
4694 						NULL);
4695 		if (ent == NULL)
4696 			continue;
4697 
4698 		new_datum =
4699 			(struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
4700 
4701 		free[natt] = true;
4702 
4703 		reconstructed = palloc0(toast_pointer.va_rawsize);
4704 
4705 		ent->reconstructed = reconstructed;
4706 
4707 		/* stitch toast tuple back together from its parts */
4708 		dlist_foreach(it, &ent->chunks)
4709 		{
4710 			bool		isnull;
4711 			ReorderBufferChange *cchange;
4712 			ReorderBufferTupleBuf *ctup;
4713 			Pointer		chunk;
4714 
4715 			cchange = dlist_container(ReorderBufferChange, node, it.cur);
4716 			ctup = cchange->data.tp.newtuple;
4717 			chunk = DatumGetPointer(fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
4718 
4719 			Assert(!isnull);
4720 			Assert(!VARATT_IS_EXTERNAL(chunk));
4721 			Assert(!VARATT_IS_SHORT(chunk));
4722 
4723 			memcpy(VARDATA(reconstructed) + data_done,
4724 				   VARDATA(chunk),
4725 				   VARSIZE(chunk) - VARHDRSZ);
4726 			data_done += VARSIZE(chunk) - VARHDRSZ;
4727 		}
4728 		Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));
4729 
4730 		/* make sure it's marked as compressed or not */
4731 		if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
4732 			SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
4733 		else
4734 			SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
4735 
4736 		memset(&redirect_pointer, 0, sizeof(redirect_pointer));
4737 		redirect_pointer.pointer = reconstructed;
4738 
4739 		SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
4740 		memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
4741 			   sizeof(redirect_pointer));
4742 
4743 		attrs[natt] = PointerGetDatum(new_datum);
4744 	}
4745 
4746 	/*
4747 	 * Build tuple in separate memory & copy tuple back into the tuplebuf
4748 	 * passed to the output plugin. We can't directly heap_fill_tuple() into
4749 	 * the tuplebuf because attrs[] will point back into the current content.
4750 	 */
4751 	tmphtup = heap_form_tuple(desc, attrs, isnull);
4752 	Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
4753 	Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
4754 
4755 	memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
4756 	newtup->tuple.t_len = tmphtup->t_len;
4757 
4758 	/*
4759 	 * free resources we no longer need; more persistent stuff will be
4760 	 * freed in ReorderBufferToastReset().
4761 	 */
4762 	RelationClose(toast_rel);
4763 	pfree(tmphtup);
4764 	for (natt = 0; natt < desc->natts; natt++)
4765 	{
4766 		if (free[natt])
4767 			pfree(DatumGetPointer(attrs[natt]));
4768 	}
4769 	pfree(attrs);
4770 	pfree(free);
4771 	pfree(isnull);
4772 
4773 	MemoryContextSwitchTo(oldcontext);
4774 
4775 	/* subtract the old change size */
4776 	ReorderBufferChangeMemoryUpdate(rb, change, false, old_size);
4777 	/* now add the change back, with the correct size */
4778 	ReorderBufferChangeMemoryUpdate(rb, change, true,
4779 									ReorderBufferChangeSize(change));
4780 }
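
/*
 * Note on the replacement datum built above: it is an "indirect" external
 * varlena - a VARTAG_INDIRECT header followed by a varatt_indirect whose
 * 'pointer' field addresses the reconstructed in-memory toast value.
 * Detoasting then follows that pointer instead of fetching chunks from the
 * (possibly already vacuumed or dropped) on-disk toast relation.
 */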
4781 
4782 /*
4783  * Free all resources allocated for toast reconstruction.
4784  */
4785 static void
4786 ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
4787 {
4788 	HASH_SEQ_STATUS hstat;
4789 	ReorderBufferToastEnt *ent;
4790 
4791 	if (txn->toast_hash == NULL)
4792 		return;
4793 
4794 	/* sequentially walk over the hash and free everything */
4795 	hash_seq_init(&hstat, txn->toast_hash);
4796 	while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
4797 	{
4798 		dlist_mutable_iter it;
4799 
4800 		if (ent->reconstructed != NULL)
4801 			pfree(ent->reconstructed);
4802 
4803 		dlist_foreach_modify(it, &ent->chunks)
4804 		{
4805 			ReorderBufferChange *change =
4806 			dlist_container(ReorderBufferChange, node, it.cur);
4807 
4808 			dlist_delete(&change->node);
4809 			ReorderBufferReturnChange(rb, change, true);
4810 		}
4811 	}
4812 
4813 	hash_destroy(txn->toast_hash);
4814 	txn->toast_hash = NULL;
4815 }
4816 
4817 
/* ---------------------------------------
 * Visibility support for logical decoding
 *
 *
 * Look up actual cmin/cmax values when using a decoding snapshot. We can't
 * always rely on stored cmin/cmax values because of two scenarios:
 *
 * * A tuple got changed multiple times during a single transaction and thus
 *	 has got a combo CID. Combo CIDs are only valid for the duration of a
 *	 single transaction.
 * * A tuple with a cmin but no cmax (and thus no combo CID) got
 *	 deleted/updated in a transaction other than the one that created it
 *	 (the creating one being the transaction we are looking at right now).
 *	 As only one of cmin, cmax or combo CID is actually stored in the heap,
 *	 we no longer have access to the value we need.
 *
 * To resolve those problems we have a per-transaction hash of (cmin,
 * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
 * (cmin, cmax) values. That also takes care of combo CIDs by simply
 * not caring about them at all. As we have the real cmin/cmax values,
 * combo CIDs aren't interesting.
 *
 * As we only care about catalog tuples here, the overhead of this
 * hashtable should be acceptable.
 *
 * Heap rewrites complicate this a bit, check rewriteheap.c for
 * details.
 * -------------------------------------------------------------------------
 */

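/*
 * Illustrative sketch (guarded out, like DisplayMapping() below, and not
 * part of the real code): how the tuplecid hash is probed.  This is a
 * simplified, hypothetical version of what ResolveCminCmaxDuringDecoding()
 * does further down; note that the key must be zeroed first, since it is
 * hashed as raw bytes and may contain padding.
 */
#ifdef NOT_USED
static bool
LookupTupleCidSketch(HTAB *tuplecid_data, RelFileNode node, ItemPointer tid,
					 CommandId *cmin, CommandId *cmax)
{
	ReorderBufferTupleCidKey key;
	ReorderBufferTupleCidEnt *ent;

	/* zero the key to avoid garbage in padding bytes */
	memset(&key, 0, sizeof(key));
	key.relnode = node;
	ItemPointerCopy(tid, &key.tid);

	ent = (ReorderBufferTupleCidEnt *)
		hash_search(tuplecid_data, (void *) &key, HASH_FIND, NULL);
	if (ent == NULL)
		return false;			/* no mapping for this tuple */

	*cmin = ent->cmin;
	*cmax = ent->cmax;
	return true;
}
#endif
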
/* struct for sorting mapping files by LSN efficiently */
typedef struct RewriteMappingFile
{
	XLogRecPtr	lsn;
	char		fname[MAXPGPATH];
} RewriteMappingFile;

#ifdef NOT_USED
static void
DisplayMapping(HTAB *tuplecid_data)
{
	HASH_SEQ_STATUS hstat;
	ReorderBufferTupleCidEnt *ent;

	hash_seq_init(&hstat, tuplecid_data);
	while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
	{
		elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
			 ent->key.relnode.dbNode,
			 ent->key.relnode.spcNode,
			 ent->key.relnode.relNode,
			 ItemPointerGetBlockNumber(&ent->key.tid),
			 ItemPointerGetOffsetNumber(&ent->key.tid),
			 ent->cmin,
			 ent->cmax
			);
	}
}
#endif

/*
 * Apply a single mapping file to tuplecid_data.
 *
 * The mapping file must already have been verified to be a) committed,
 * b) for our transaction, and c) applied in LSN order.
 */
static void
ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
{
	char		path[MAXPGPATH];
	int			fd;
	int			readBytes;
	LogicalRewriteMappingData map;

	sprintf(path, "pg_logical/mappings/%s", fname);
	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
	if (fd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	while (true)
	{
		ReorderBufferTupleCidKey key;
		ReorderBufferTupleCidEnt *ent;
		ReorderBufferTupleCidEnt *new_ent;
		bool		found;

		/* be careful about padding */
		memset(&key, 0, sizeof(ReorderBufferTupleCidKey));

		/* read all mappings until the end of the file */
		pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
		readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
		pgstat_report_wait_end();

		if (readBytes < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m",
							path)));
		else if (readBytes == 0)	/* EOF */
			break;
		else if (readBytes != sizeof(LogicalRewriteMappingData))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from file \"%s\": read %d instead of %d bytes",
							path, readBytes,
							(int32) sizeof(LogicalRewriteMappingData))));

		key.relnode = map.old_node;
		ItemPointerCopy(&map.old_tid,
						&key.tid);

		ent = (ReorderBufferTupleCidEnt *)
			hash_search(tuplecid_data,
						(void *) &key,
						HASH_FIND,
						NULL);

		/* no existing mapping, no need to update */
		if (!ent)
			continue;

		key.relnode = map.new_node;
		ItemPointerCopy(&map.new_tid,
						&key.tid);

		new_ent = (ReorderBufferTupleCidEnt *)
			hash_search(tuplecid_data,
						(void *) &key,
						HASH_ENTER,
						&found);

		if (found)
		{
			/*
			 * Make sure the existing mapping makes sense. We sometimes update
			 * old records that did not yet have a cmax (e.g. pg_class' own
			 * entry while rewriting it) during rewrites, so allow that.
			 */
			Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
			Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
		}
		else
		{
			/* update mapping */
			new_ent->cmin = ent->cmin;
			new_ent->cmax = ent->cmax;
			new_ent->combocid = ent->combocid;
		}
	}

	if (CloseTransientFile(fd) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", path)));
}


/*
 * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
 */
static bool
TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
{
	return bsearch(&xid, xip, num,
				   sizeof(TransactionId), xidComparator) != NULL;
}
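
/*
 * Note that xidComparator compares plain uint32 values, not the
 * wraparound-aware TransactionIdPrecedes() order, so 'xip' must have been
 * sorted the same way, as the "pre-sorted" requirement above implies.
 */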

/*
 * list_sort() comparator for sorting RewriteMappingFiles in LSN order.
 */
static int
file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
{
	RewriteMappingFile *a = (RewriteMappingFile *) lfirst(a_p);
	RewriteMappingFile *b = (RewriteMappingFile *) lfirst(b_p);

	if (a->lsn < b->lsn)
		return -1;
	else if (a->lsn > b->lsn)
		return 1;
	return 0;
}

/*
 * Apply any existing logical remapping files that are targeted at our
 * transaction for relid.
 */
static void
UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
{
	DIR		   *mapping_dir;
	struct dirent *mapping_de;
	List	   *files = NIL;
	ListCell   *file;
	Oid			dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;

	mapping_dir = AllocateDir("pg_logical/mappings");
	while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
	{
		Oid			f_dboid;
		Oid			f_relid;
		TransactionId f_mapped_xid;
		TransactionId f_create_xid;
		XLogRecPtr	f_lsn;
		uint32		f_hi,
					f_lo;
		RewriteMappingFile *f;

		if (strcmp(mapping_de->d_name, ".") == 0 ||
			strcmp(mapping_de->d_name, "..") == 0)
			continue;

		/* Ignore files that aren't ours */
		if (strncmp(mapping_de->d_name, "map-", 4) != 0)
			continue;

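		/*
		 * File names follow LOGICAL_REWRITE_FORMAT (see rewriteheap.h):
		 * database oid, relation oid, the LSN at which the mapping was
		 * written (split into high/low halves), the xid the mapping is for,
		 * and the xid of the rewriting transaction.
		 */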
		if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
				   &f_dboid, &f_relid, &f_hi, &f_lo,
				   &f_mapped_xid, &f_create_xid) != 6)
			elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);

		f_lsn = ((uint64) f_hi) << 32 | f_lo;

		/* mapping for another database */
		if (f_dboid != dboid)
			continue;

		/* mapping for another relation */
		if (f_relid != relid)
			continue;

		/* did the creating transaction abort? */
		if (!TransactionIdDidCommit(f_create_xid))
			continue;

		/* not for our transaction */
		if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
			continue;

		/* ok, relevant, queue for apply */
		f = palloc(sizeof(RewriteMappingFile));
		f->lsn = f_lsn;
		strcpy(f->fname, mapping_de->d_name);
		files = lappend(files, f);
	}
	FreeDir(mapping_dir);

	/* sort files so we apply them in LSN order */
	list_sort(files, file_sort_by_lsn);

	foreach(file, files)
	{
		RewriteMappingFile *f = (RewriteMappingFile *) lfirst(file);

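		/* in the snapshots used here, subxip[0] is the toplevel xid */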
		elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
			 snapshot->subxip[0]);
		ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
		pfree(f);
	}
}

/*
 * Look up cmin/cmax of a tuple, during logical decoding where we can't rely
 * on combo CIDs.
 */
bool
ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
							  Snapshot snapshot,
							  HeapTuple htup, Buffer buffer,
							  CommandId *cmin, CommandId *cmax)
{
	ReorderBufferTupleCidKey key;
	ReorderBufferTupleCidEnt *ent;
	ForkNumber	forkno;
	BlockNumber blockno;
	bool		updated_mapping = false;

	/*
	 * Return unresolved if tuplecid_data is not valid.  That's because when
	 * streaming in-progress transactions we may run into tuples with the CID
	 * before actually decoding them.  Think e.g. about INSERT followed by
	 * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
	 * INSERT.  So in such cases, we assume the CID is from the future
	 * command.
	 */
	if (tuplecid_data == NULL)
		return false;

	/* be careful about padding */
	memset(&key, 0, sizeof(key));

	Assert(!BufferIsLocal(buffer));

	/*
	 * Get the relfilenode from the buffer; there is no more convenient way
	 * to access it.
	 */
	BufferGetTag(buffer, &key.relnode, &forkno, &blockno);

	/* tuples can only be in the main fork */
	Assert(forkno == MAIN_FORKNUM);
	Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));

	ItemPointerCopy(&htup->t_self,
					&key.tid);

restart:
	ent = (ReorderBufferTupleCidEnt *)
		hash_search(tuplecid_data,
					(void *) &key,
					HASH_FIND,
					NULL);

	/*
	 * Failed to find a mapping; check whether the table was rewritten and
	 * apply the mapping if so, but only do that once - there can be no new
	 * mappings while we are in here since we have to hold a lock on the
	 * relation.
	 */
	if (ent == NULL && !updated_mapping)
	{
		UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
		/* check again, but don't apply the mappings a second time */
		updated_mapping = true;
		goto restart;
	}
	else if (ent == NULL)
		return false;

	if (cmin)
		*cmin = ent->cmin;
	if (cmax)
		*cmax = ent->cmax;
	return true;
}
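
/*
 * Note: the expected consumer of the above is the historic-snapshot
 * visibility code (cf. HeapTupleSatisfiesHistoricMVCC()), which substitutes
 * the resolved cmin/cmax for the values stored in the tuple header.
 */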