/*-------------------------------------------------------------------------
 *
 * reorderbuffer.c
 *	  PostgreSQL logical replay/reorder buffer management
 *
 *
 * Copyright (c) 2012-2016, PostgreSQL Global Development Group
 *
 *
 * IDENTIFICATION
 *	  src/backend/replication/reorderbuffer.c
 *
 * NOTES
 *	  This module gets handed individual pieces of transactions in the order
 *	  they are written to the WAL and is responsible for reassembling them
 *	  into toplevel transaction sized pieces. When a transaction is completely
 *	  reassembled - signalled by reading the transaction commit record - it
 *	  will then call the output plugin (cf. ReorderBufferCommit()) with the
 *	  individual changes. The output plugins rely on snapshots built by
 *	  snapbuild.c which hands them to us.
 *
 *	  Transactions and subtransactions/savepoints in postgres are not
 *	  immediately linked to each other from outside the performing
 *	  backend. Only at commit/abort (or special xact_assignment records) are
 *	  they linked together. This means that we have to splice together a
 *	  toplevel transaction from its subtransactions. To do that efficiently we
 *	  build a binary heap indexed by the smallest current lsn of the individual
 *	  subtransactions' changestreams. As the individual streams are inherently
 *	  ordered by LSN - since that is where we build them from - the transaction
 *	  can easily be reassembled by always using the subtransaction with the
 *	  smallest current LSN from the heap.
 *
 *	  In order to cope with large transactions - which can be several times as
 *	  big as the available memory - this module supports spooling the contents
 *	  of large transactions to disk. When the transaction is replayed the
 *	  contents of individual (sub-)transactions will be read from disk in
 *	  chunks.
 *
 *	  This module also has to deal with reassembling toast records from the
 *	  individual chunks stored in WAL. When a new (or initial) version of a
 *	  tuple is stored in WAL it will always be preceded by the toast chunks
 *	  emitted for the columns stored out of line. Within a single toplevel
 *	  transaction there will be no other data carrying records between a row's
 *	  toast chunks and the row data itself. See ReorderBufferToast* for
 *	  details.
 * -------------------------------------------------------------------------
 */
#include "postgres.h"

#include <unistd.h>
#include <sys/stat.h>

#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"		/* just for SnapBuildSnapDecRefcount */
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
#include "utils/tqual.h"


/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
{
	TransactionId xid;
	ReorderBufferTXN *txn;
} ReorderBufferTXNByIdEnt;

/* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
typedef struct ReorderBufferTupleCidKey
{
	RelFileNode relnode;
	ItemPointerData tid;
} ReorderBufferTupleCidKey;

typedef struct ReorderBufferTupleCidEnt
{
	ReorderBufferTupleCidKey key;
	CommandId	cmin;
	CommandId	cmax;
	CommandId	combocid;		/* just for debugging */
} ReorderBufferTupleCidEnt;

/* k-way in-order change iteration support structures */
typedef struct ReorderBufferIterTXNEntry
{
	XLogRecPtr	lsn;
	ReorderBufferChange *change;
	ReorderBufferTXN *txn;
	File		fd;
	XLogSegNo	segno;
} ReorderBufferIterTXNEntry;

typedef struct ReorderBufferIterTXNState
{
	binaryheap *heap;
	Size		nr_txns;
	dlist_head	old_change;
	ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
} ReorderBufferIterTXNState;

/* toast datastructures */
typedef struct ReorderBufferToastEnt
{
	Oid			chunk_id;		/* toast_table.chunk_id */
	int32		last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
								 * have seen */
	Size		num_chunks;		/* number of chunks we've already seen */
	Size		size;			/* combined size of chunks seen */
	dlist_head	chunks;			/* linked list of chunks */
	struct varlena *reconstructed;		/* reconstructed varlena now pointed
										 * to in main tup */
} ReorderBufferToastEnt;

/* Disk serialization support datastructures */
typedef struct ReorderBufferDiskChange
{
	Size		size;
	ReorderBufferChange change;
	/* data follows */
} ReorderBufferDiskChange;

/*
 * Maximum number of changes kept in memory, per transaction. After that,
 * changes are spooled to disk.
 *
 * The current value should be sufficient to decode the entire transaction
 * without hitting disk in OLTP workloads, while starting to spool to disk in
 * other workloads reasonably fast.
 *
 * At some point in the future it probably makes sense to have a more elaborate
 * resource management here, but it's not entirely clear what that would look
 * like.
 */
static const Size max_changes_in_memory = 4096;

/*
 * We use a very simple form of a slab allocator for frequently allocated
 * objects, simply keeping a fixed number in a linked list when unused,
 * instead of pfree()ing them. Without that, aset.c becomes a major
 * bottleneck in many workloads, especially when spilling to disk while
 * decoding batch workloads.
 */
static const Size max_cached_changes = 4096 * 2;
static const Size max_cached_tuplebufs = 4096 * 2;		/* ~64MB */
static const Size max_cached_transactions = 512;
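
/*
 * Illustrative sketch (compiled out; not part of this module): the freelist
 * pattern the caches above implement.  "Getting" an object pops a spare off
 * a linked list if one is available; "returning" it pushes it back until the
 * cache cap is reached, so only cache misses and overflow touch
 * palloc/pfree.  The ExampleObject type and example_* names are
 * hypothetical.
 */
#ifdef REORDERBUFFER_EXAMPLES
typedef struct ExampleObject
{
	dlist_node	node;			/* freelist link */
	int			payload;
} ExampleObject;

static ExampleObject *
example_get(MemoryContext ctx, dlist_head *freelist, Size *nr_cached)
{
	ExampleObject *obj;

	if (*nr_cached > 0)
	{
		/* cache hit: reuse a previously returned object */
		(*nr_cached)--;
		obj = dlist_container(ExampleObject, node,
							  dlist_pop_head_node(freelist));
	}
	else
		obj = (ExampleObject *) MemoryContextAlloc(ctx, sizeof(ExampleObject));

	memset(obj, 0, sizeof(ExampleObject));
	return obj;
}

static void
example_return(dlist_head *freelist, Size *nr_cached, Size cap,
			   ExampleObject *obj)
{
	if (*nr_cached < cap)
	{
		/* keep it around for reuse instead of freeing */
		(*nr_cached)++;
		dlist_push_head(freelist, &obj->node);
	}
	else
		pfree(obj);
}
#endif   /* REORDERBUFFER_EXAMPLES */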


/* ---------------------------------------
 * primary reorderbuffer support routines
 * ---------------------------------------
 */
static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
					  TransactionId xid, bool create, bool *is_new,
					  XLogRecPtr lsn, bool create_as_top);
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
								  ReorderBufferTXN *subtxn);

static void AssertTXNLsnOrder(ReorderBuffer *rb);

/* ---------------------------------------
 * support functions for lsn-order iterating over the ->changes of a
 * transaction and its subtransactions
 *
 * used for iteration over the k-way heap merge of a transaction and its
 * subtransactions
 * ---------------------------------------
 */
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
									 ReorderBufferIterTXNState *volatile *iter_state);
static ReorderBufferChange *
			ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
						   ReorderBufferIterTXNState *state);
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);

/*
 * ---------------------------------------
 * Disk serialization support functions
 * ---------------------------------------
 */
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
							 int fd, ReorderBufferChange *change);
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
							File *fd, XLogSegNo *segno);
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
						   char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
							TransactionId xid, XLogSegNo segno);

static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
					  ReorderBufferTXN *txn, CommandId cid);

/* ---------------------------------------
 * toast reassembly support
 * ---------------------------------------
 */
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
						  Relation relation, ReorderBufferChange *change);
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
							  Relation relation, ReorderBufferChange *change);


/*
 * Allocate a new ReorderBuffer and clean out any old serialized state from
 * prior ReorderBuffer instances for the same slot.
 */
ReorderBuffer *
ReorderBufferAllocate(void)
{
	ReorderBuffer *buffer;
	HASHCTL		hash_ctl;
	MemoryContext new_ctx;

	Assert(MyReplicationSlot != NULL);

	/* allocate memory in own context, to have better accountability */
	new_ctx = AllocSetContextCreate(CurrentMemoryContext,
									"ReorderBuffer",
									ALLOCSET_DEFAULT_SIZES);

	buffer =
		(ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));

	memset(&hash_ctl, 0, sizeof(hash_ctl));

	buffer->context = new_ctx;

	hash_ctl.keysize = sizeof(TransactionId);
	hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
	hash_ctl.hcxt = buffer->context;

	buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

	buffer->by_txn_last_xid = InvalidTransactionId;
	buffer->by_txn_last_txn = NULL;

	buffer->nr_cached_transactions = 0;
	buffer->nr_cached_changes = 0;
	buffer->nr_cached_tuplebufs = 0;

	buffer->outbuf = NULL;
	buffer->outbufsize = 0;

	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;

	dlist_init(&buffer->toplevel_by_lsn);
	dlist_init(&buffer->txns_by_base_snapshot_lsn);
	dlist_init(&buffer->cached_transactions);
	dlist_init(&buffer->cached_changes);
	slist_init(&buffer->cached_tuplebufs);

	/*
	 * Ensure there's no stale data from prior uses of this slot, in case some
	 * prior exit avoided calling ReorderBufferFree. Failure to do this can
	 * produce duplicated txns, and it's very cheap if there's nothing there.
	 */
	ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));

	return buffer;
}

/*
 * Free a ReorderBuffer
 */
void
ReorderBufferFree(ReorderBuffer *rb)
{
	MemoryContext context = rb->context;

	/*
	 * We free separately allocated data by entirely scrapping reorderbuffer's
	 * memory context.
	 */
	MemoryContextDelete(context);

	/* Free disk space used by unconsumed reorder buffers */
	ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
}
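
/*
 * Illustrative sketch (compiled out): the intended lifecycle of a
 * ReorderBuffer.  In the real system logical.c allocates one per decoding
 * context while a replication slot is acquired and frees it when decoding
 * ends; this hypothetical wrapper just shows the pairing.
 */
#ifdef REORDERBUFFER_EXAMPLES
static void
example_reorderbuffer_lifecycle(void)
{
	ReorderBuffer *rb;

	/* requires MyReplicationSlot to be set; also purges stale spill files */
	rb = ReorderBufferAllocate();

	/* ... feed changes and commits in WAL order, e.g. from decode.c ... */

	/* drops all memory at once and removes serialized state on disk */
	ReorderBufferFree(rb);
}
#endif   /* REORDERBUFFER_EXAMPLES */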

/*
 * Get an unused, possibly preallocated, ReorderBufferTXN.
 */
static ReorderBufferTXN *
ReorderBufferGetTXN(ReorderBuffer *rb)
{
	ReorderBufferTXN *txn;

	/* check the slab cache */
	if (rb->nr_cached_transactions > 0)
	{
		rb->nr_cached_transactions--;
		txn = (ReorderBufferTXN *)
			dlist_container(ReorderBufferTXN, node,
							dlist_pop_head_node(&rb->cached_transactions));
	}
	else
	{
		txn = (ReorderBufferTXN *)
			MemoryContextAlloc(rb->context, sizeof(ReorderBufferTXN));
	}

	memset(txn, 0, sizeof(ReorderBufferTXN));

	dlist_init(&txn->changes);
	dlist_init(&txn->tuplecids);
	dlist_init(&txn->subtxns);

	return txn;
}

/*
 * Free a ReorderBufferTXN.
 *
 * Deallocation might be delayed for efficiency purposes, for details check
 * the comments above max_cached_changes's definition.
 */
static void
ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	/* clean the lookup cache if we were cached (quite likely) */
	if (rb->by_txn_last_xid == txn->xid)
	{
		rb->by_txn_last_xid = InvalidTransactionId;
		rb->by_txn_last_txn = NULL;
	}

	/* free data that's contained */

	if (txn->tuplecid_hash != NULL)
	{
		hash_destroy(txn->tuplecid_hash);
		txn->tuplecid_hash = NULL;
	}

	if (txn->invalidations)
	{
		pfree(txn->invalidations);
		txn->invalidations = NULL;
	}

	/* Reset the toast hash */
	ReorderBufferToastReset(rb, txn);

	/* check whether to put into the slab cache */
	if (rb->nr_cached_transactions < max_cached_transactions)
	{
		rb->nr_cached_transactions++;
		dlist_push_head(&rb->cached_transactions, &txn->node);
		VALGRIND_MAKE_MEM_UNDEFINED(txn, sizeof(ReorderBufferTXN));
		VALGRIND_MAKE_MEM_DEFINED(&txn->node, sizeof(txn->node));
	}
	else
	{
		pfree(txn);
	}
}

/*
 * Get an unused, possibly preallocated, ReorderBufferChange.
 */
ReorderBufferChange *
ReorderBufferGetChange(ReorderBuffer *rb)
{
	ReorderBufferChange *change;

	/* check the slab cache */
	if (rb->nr_cached_changes)
	{
		rb->nr_cached_changes--;
		change = (ReorderBufferChange *)
			dlist_container(ReorderBufferChange, node,
							dlist_pop_head_node(&rb->cached_changes));
	}
	else
	{
		change = (ReorderBufferChange *)
			MemoryContextAlloc(rb->context, sizeof(ReorderBufferChange));
	}

	memset(change, 0, sizeof(ReorderBufferChange));
	return change;
}

/*
 * Free a ReorderBufferChange.
 *
 * Deallocation might be delayed for efficiency purposes, for details check
 * the comments above max_cached_changes's definition.
 */
void
ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
{
	/* free contained data */
	switch (change->action)
	{
		case REORDER_BUFFER_CHANGE_INSERT:
		case REORDER_BUFFER_CHANGE_UPDATE:
		case REORDER_BUFFER_CHANGE_DELETE:
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
			if (change->data.tp.newtuple)
			{
				ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
				change->data.tp.newtuple = NULL;
			}

			if (change->data.tp.oldtuple)
			{
				ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
				change->data.tp.oldtuple = NULL;
			}
			break;
		case REORDER_BUFFER_CHANGE_MESSAGE:
			if (change->data.msg.prefix != NULL)
				pfree(change->data.msg.prefix);
			change->data.msg.prefix = NULL;
			if (change->data.msg.message != NULL)
				pfree(change->data.msg.message);
			change->data.msg.message = NULL;
			break;
		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
			if (change->data.snapshot)
			{
				ReorderBufferFreeSnap(rb, change->data.snapshot);
				change->data.snapshot = NULL;
			}
			break;
			/* no data in addition to the struct itself */
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
			break;
	}

	/* check whether to put into the slab cache */
	if (rb->nr_cached_changes < max_cached_changes)
	{
		rb->nr_cached_changes++;
		dlist_push_head(&rb->cached_changes, &change->node);
		VALGRIND_MAKE_MEM_UNDEFINED(change, sizeof(ReorderBufferChange));
		VALGRIND_MAKE_MEM_DEFINED(&change->node, sizeof(change->node));
	}
	else
	{
		pfree(change);
	}
}


/*
 * Get an unused, possibly preallocated, ReorderBufferTupleBuf fitting at
 * least a tuple of size tuple_len (excluding header overhead).
 */
ReorderBufferTupleBuf *
ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
{
	ReorderBufferTupleBuf *tuple;
	Size		alloc_len;

	alloc_len = tuple_len + SizeofHeapTupleHeader;

	/*
	 * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
	 * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
	 * generated for oldtuples can be bigger, as they don't have out-of-line
	 * toast columns.
	 */
	if (alloc_len < MaxHeapTupleSize)
		alloc_len = MaxHeapTupleSize;


	/* if small enough, check the slab cache */
	if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
	{
		rb->nr_cached_tuplebufs--;
		tuple = slist_container(ReorderBufferTupleBuf, node,
								slist_pop_head_node(&rb->cached_tuplebufs));
		Assert(tuple->alloc_tuple_size == MaxHeapTupleSize);
#ifdef USE_ASSERT_CHECKING
		memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
		VALGRIND_MAKE_MEM_UNDEFINED(&tuple->tuple, sizeof(HeapTupleData));
#endif
		tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
#ifdef USE_ASSERT_CHECKING
		memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
		VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
#endif
	}
	else
	{
		tuple = (ReorderBufferTupleBuf *)
			MemoryContextAlloc(rb->context,
							   sizeof(ReorderBufferTupleBuf) +
							   MAXIMUM_ALIGNOF + alloc_len);
		tuple->alloc_tuple_size = alloc_len;
		tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
	}

	return tuple;
}

/*
 * Free a ReorderBufferTupleBuf.
 *
 * Deallocation might be delayed for efficiency purposes, for details check
 * the comments above max_cached_changes's definition.
 */
void
ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
{
	/* check whether to put into the slab cache, oversized tuples never are */
	if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
		rb->nr_cached_tuplebufs < max_cached_tuplebufs)
	{
		rb->nr_cached_tuplebufs++;
		slist_push_head(&rb->cached_tuplebufs, &tuple->node);
		VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
		VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf));
		VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
		VALGRIND_MAKE_MEM_DEFINED(&tuple->alloc_tuple_size, sizeof(tuple->alloc_tuple_size));
	}
	else
	{
		pfree(tuple);
	}
}

/*
 * Return the ReorderBufferTXN from the given buffer, specified by Xid.
 * If create is true, and a transaction doesn't already exist, create it
 * (with the given LSN, and as top transaction if that's specified);
 * when this happens, is_new is set to true.
 */
static ReorderBufferTXN *
ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
					  bool *is_new, XLogRecPtr lsn, bool create_as_top)
{
	ReorderBufferTXN *txn;
	ReorderBufferTXNByIdEnt *ent;
	bool		found;

	Assert(TransactionIdIsValid(xid));

	/*
	 * Check the one-entry lookup cache first
	 */
	if (TransactionIdIsValid(rb->by_txn_last_xid) &&
		rb->by_txn_last_xid == xid)
	{
		txn = rb->by_txn_last_txn;

		if (txn != NULL)
		{
			/* found it, and it's valid */
			if (is_new)
				*is_new = false;
			return txn;
		}

		/*
		 * cached as non-existent, and asked not to create? Then nothing else
		 * to do.
		 */
		if (!create)
			return NULL;
		/* otherwise fall through to create it */
	}

	/*
	 * If the cache wasn't hit or it yielded a "does-not-exist" entry and we
	 * want to create one.
	 */

	/* search the lookup table */
	ent = (ReorderBufferTXNByIdEnt *)
		hash_search(rb->by_txn,
					(void *) &xid,
					create ? HASH_ENTER : HASH_FIND,
					&found);
	if (found)
		txn = ent->txn;
	else if (create)
	{
		/* initialize the new entry, if creation was requested */
		Assert(ent != NULL);
		Assert(lsn != InvalidXLogRecPtr);

		ent->txn = ReorderBufferGetTXN(rb);
		ent->txn->xid = xid;
		txn = ent->txn;
		txn->first_lsn = lsn;
		txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;

		if (create_as_top)
		{
			dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
			AssertTXNLsnOrder(rb);
		}
	}
	else
		txn = NULL;				/* not found and not asked to create */

	/* update cache */
	rb->by_txn_last_xid = xid;
	rb->by_txn_last_txn = txn;

	if (is_new)
		*is_new = !found;

	Assert(!create || txn != NULL);
	return txn;
}

/*
 * Queue a change into a transaction so it can be replayed upon commit.
 */
void
ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
						 ReorderBufferChange *change)
{
	ReorderBufferTXN *txn;

	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);

	change->lsn = lsn;
	Assert(InvalidXLogRecPtr != lsn);
	dlist_push_tail(&txn->changes, &change->node);
	txn->nentries++;
	txn->nentries_mem++;

	ReorderBufferCheckSerializeTXN(rb, txn);
}
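
/*
 * Illustrative sketch (compiled out): how a decoding routine typically
 * builds and queues a change.  decode.c is the real producer; this
 * example_queue_insert() and its arguments are hypothetical.  Ownership of
 * the change (and its tuple buffer) passes to the reorderbuffer, which
 * recycles both through the slab caches once the change has been replayed.
 */
#ifdef REORDERBUFFER_EXAMPLES
static void
example_queue_insert(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
					 RelFileNode relnode, ReorderBufferTupleBuf *newtuple)
{
	ReorderBufferChange *change = ReorderBufferGetChange(rb);

	change->action = REORDER_BUFFER_CHANGE_INSERT;
	change->data.tp.relnode = relnode;
	change->data.tp.newtuple = newtuple;
	change->data.tp.clear_toast_afterwards = true;

	ReorderBufferQueueChange(rb, xid, lsn, change);
}
#endif   /* REORDERBUFFER_EXAMPLES */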

/*
 * Queue message into a transaction so it can be processed upon commit.
 */
void
ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
						  Snapshot snapshot, XLogRecPtr lsn,
						  bool transactional, const char *prefix,
						  Size message_size, const char *message)
{
	if (transactional)
	{
		MemoryContext oldcontext;
		ReorderBufferChange *change;

		Assert(xid != InvalidTransactionId);

		oldcontext = MemoryContextSwitchTo(rb->context);

		change = ReorderBufferGetChange(rb);
		change->action = REORDER_BUFFER_CHANGE_MESSAGE;
		change->data.msg.prefix = pstrdup(prefix);
		change->data.msg.message_size = message_size;
		change->data.msg.message = palloc(message_size);
		memcpy(change->data.msg.message, message, message_size);

		ReorderBufferQueueChange(rb, xid, lsn, change);

		MemoryContextSwitchTo(oldcontext);
	}
	else
	{
		ReorderBufferTXN *txn = NULL;
		volatile Snapshot snapshot_now = snapshot;

		if (xid != InvalidTransactionId)
			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);

		/* setup snapshot to allow catalog access */
		SetupHistoricSnapshot(snapshot_now, NULL);
		PG_TRY();
		{
			rb->message(rb, txn, lsn, false, prefix, message_size, message);

			TeardownHistoricSnapshot(false);
		}
		PG_CATCH();
		{
			TeardownHistoricSnapshot(true);
			PG_RE_THROW();
		}
		PG_END_TRY();
	}
}

/*
 * AssertTXNLsnOrder
 *		Verify LSN ordering of transaction lists in the reorderbuffer
 *
 * Other LSN-related invariants are checked too.
 *
 * No-op if assertions are not in use.
 */
static void
AssertTXNLsnOrder(ReorderBuffer *rb)
{
#ifdef USE_ASSERT_CHECKING
	dlist_iter	iter;
	XLogRecPtr	prev_first_lsn = InvalidXLogRecPtr;
	XLogRecPtr	prev_base_snap_lsn = InvalidXLogRecPtr;

	dlist_foreach(iter, &rb->toplevel_by_lsn)
	{
		ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
													iter.cur);

		/* start LSN must be set */
		Assert(cur_txn->first_lsn != InvalidXLogRecPtr);

		/* If there is an end LSN, it must be higher than start LSN */
		if (cur_txn->end_lsn != InvalidXLogRecPtr)
			Assert(cur_txn->first_lsn <= cur_txn->end_lsn);

		/* Current initial LSN must be strictly higher than previous */
		if (prev_first_lsn != InvalidXLogRecPtr)
			Assert(prev_first_lsn < cur_txn->first_lsn);

		/* known-as-subtxn txns must not be listed */
		Assert(!cur_txn->is_known_as_subxact);

		prev_first_lsn = cur_txn->first_lsn;
	}

	dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
	{
		ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
													base_snapshot_node,
													iter.cur);

		/* base snapshot (and its LSN) must be set */
		Assert(cur_txn->base_snapshot != NULL);
		Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);

		/* current LSN must be strictly higher than previous */
		if (prev_base_snap_lsn != InvalidXLogRecPtr)
			Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);

		/* known-as-subtxn txns must not be listed */
		Assert(!cur_txn->is_known_as_subxact);

		prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
	}
#endif
}

/*
 * ReorderBufferGetOldestTXN
 *		Return oldest transaction in reorderbuffer
 */
ReorderBufferTXN *
ReorderBufferGetOldestTXN(ReorderBuffer *rb)
{
	ReorderBufferTXN *txn;

	AssertTXNLsnOrder(rb);

	if (dlist_is_empty(&rb->toplevel_by_lsn))
		return NULL;

	txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);

	Assert(!txn->is_known_as_subxact);
	Assert(txn->first_lsn != InvalidXLogRecPtr);
	return txn;
}

/*
 * ReorderBufferGetOldestXmin
 *		Return oldest Xmin in reorderbuffer
 *
 * Returns oldest possibly running Xid from the point of view of snapshots
 * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
 * there are none.
 *
 * Since snapshots are assigned monotonically, this equals the Xmin of the
 * base snapshot with minimal base_snapshot_lsn.
 */
TransactionId
ReorderBufferGetOldestXmin(ReorderBuffer *rb)
{
	ReorderBufferTXN *txn;

	AssertTXNLsnOrder(rb);

	if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
		return InvalidTransactionId;

	txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
							 &rb->txns_by_base_snapshot_lsn);
	return txn->base_snapshot->xmin;
}

void
ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
{
	rb->current_restart_decoding_lsn = ptr;
}

/*
 * ReorderBufferAssignChild
 *
 * Make note that we know that subxid is a subtransaction of xid, seen as of
 * the given lsn.
 */
void
ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
						 TransactionId subxid, XLogRecPtr lsn)
{
	ReorderBufferTXN *txn;
	ReorderBufferTXN *subtxn;
	bool		new_top;
	bool		new_sub;

	txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
	subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);

	if (!new_sub)
	{
		if (subtxn->is_known_as_subxact)
		{
			/* already associated, nothing to do */
			return;
		}
		else
		{
			/*
			 * We already saw this transaction, but initially added it to the
			 * list of top-level txns.  Now that we know it's not top-level,
			 * remove it from there.
			 */
			dlist_delete(&subtxn->node);
		}
	}

	subtxn->is_known_as_subxact = true;
	subtxn->toplevel_xid = xid;
	Assert(subtxn->nsubtxns == 0);

	/* add to subtransaction list */
	dlist_push_tail(&txn->subtxns, &subtxn->node);
	txn->nsubtxns++;

	/* Possibly transfer the subtxn's snapshot to its top-level txn. */
	ReorderBufferTransferSnapToParent(txn, subtxn);

	/* Verify LSN-ordering invariant */
	AssertTXNLsnOrder(rb);
}

/*
 * ReorderBufferTransferSnapToParent
 *		Transfer base snapshot from subtxn to top-level txn, if needed
 *
 * This is done if the top-level txn doesn't have a base snapshot, or if the
 * subtxn's base snapshot has an earlier LSN than the top-level txn's base
 * snapshot's LSN.  This can happen if there are no changes in the toplevel
 * txn but there are some in the subtxn, or the first change in subtxn has
 * earlier LSN than first change in the top-level txn and we learned about
 * their kinship only now.
 *
 * The subtransaction's snapshot is cleared regardless of the transfer
 * happening, since it's not needed anymore in either case.
 *
 * We do this as soon as we become aware of their kinship, to avoid queueing
 * extra snapshots to txns known-as-subtxns -- only top-level txns will
 * receive further snapshots.
 */
static void
ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
								  ReorderBufferTXN *subtxn)
{
	Assert(subtxn->toplevel_xid == txn->xid);

	if (subtxn->base_snapshot != NULL)
	{
		if (txn->base_snapshot == NULL ||
			subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
		{
			/*
			 * If the toplevel transaction already has a base snapshot but
			 * it's newer than the subxact's, purge it.
			 */
			if (txn->base_snapshot != NULL)
			{
				SnapBuildSnapDecRefcount(txn->base_snapshot);
				dlist_delete(&txn->base_snapshot_node);
			}

			/*
			 * The snapshot is now the top transaction's; transfer it, and
			 * adjust the list position of the top transaction in the list by
			 * moving it to where the subtransaction is.
			 */
			txn->base_snapshot = subtxn->base_snapshot;
			txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
			dlist_insert_before(&subtxn->base_snapshot_node,
								&txn->base_snapshot_node);

			/*
			 * The subtransaction doesn't have a snapshot anymore (so it
			 * mustn't be in the list.)
			 */
			subtxn->base_snapshot = NULL;
			subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
			dlist_delete(&subtxn->base_snapshot_node);
		}
		else
		{
			/* Base snap of toplevel is fine, so subxact's is not needed */
			SnapBuildSnapDecRefcount(subtxn->base_snapshot);
			dlist_delete(&subtxn->base_snapshot_node);
			subtxn->base_snapshot = NULL;
			subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
		}
	}
}

/*
 * Associate a subtransaction with its toplevel transaction at commit
 * time. There may be no further changes added after this.
 */
void
ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
						 TransactionId subxid, XLogRecPtr commit_lsn,
						 XLogRecPtr end_lsn)
{
	ReorderBufferTXN *subtxn;

	subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
								   InvalidXLogRecPtr, false);

	/*
	 * No need to do anything if that subtxn didn't contain any changes
	 */
	if (!subtxn)
		return;

	subtxn->final_lsn = commit_lsn;
	subtxn->end_lsn = end_lsn;

	/*
	 * Assign this subxact as a child of the toplevel xact (no-op if already
	 * done.)
	 */
	ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
}


/*
 * Support for efficiently iterating over a transaction's and its
 * subtransactions' changes.
 *
 * We do this by performing a k-way merge between transactions/subtransactions.
 * For that we model the current heads of the different transactions as a
 * binary heap so that we easily know which (sub-)transaction has the change
 * with the smallest lsn next.
 *
 * We assume the changes in individual transactions are already sorted by LSN.
 */

/*
 * Binary heap comparison function.
 */
static int
ReorderBufferIterCompare(Datum a, Datum b, void *arg)
{
	ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
	XLogRecPtr	pos_a = state->entries[DatumGetInt32(a)].lsn;
	XLogRecPtr	pos_b = state->entries[DatumGetInt32(b)].lsn;

	if (pos_a < pos_b)
		return 1;
	else if (pos_a == pos_b)
		return 0;
	return -1;
}
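
/*
 * Illustrative sketch (compiled out): lib/binaryheap.c builds max-heaps, so
 * the comparator above returns 1 for the *smaller* LSN, turning the heap
 * into a min-heap.  A hypothetical drain loop over such a heap of entry
 * indexes would look like this; example_next_lsn() stands in for advancing
 * one (sub-)transaction's change stream.
 */
#ifdef REORDERBUFFER_EXAMPLES
static bool example_next_lsn(ReorderBufferIterTXNEntry *entry);	/* hypothetical */

static void
example_drain_heap(ReorderBufferIterTXNState *state)
{
	while (!binaryheap_empty(state->heap))
	{
		int32		off = DatumGetInt32(binaryheap_first(state->heap));
		ReorderBufferIterTXNEntry *entry = &state->entries[off];

		/* ... consume entry->change, the globally smallest LSN ... */

		if (example_next_lsn(entry))	/* stream has another change? */
			binaryheap_replace_first(state->heap, Int32GetDatum(off));
		else
			binaryheap_remove_first(state->heap);
	}
}
#endif   /* REORDERBUFFER_EXAMPLES */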

/*
 * Allocate & initialize an iterator which iterates in lsn order over a
 * transaction and all its subtransactions.
 *
 * Note: The iterator state is returned through iter_state parameter rather
 * than the function's return value.  This is because the state gets cleaned up
 * in a PG_CATCH block in the caller, so we want to make sure the caller gets
 * back the state even if this function throws an exception.
 */
static void
ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
						 ReorderBufferIterTXNState *volatile *iter_state)
{
	Size		nr_txns = 0;
	ReorderBufferIterTXNState *state;
	dlist_iter	cur_txn_i;
	int32		off;

	*iter_state = NULL;

	/*
	 * Calculate the size of our heap: one element for every transaction that
	 * contains changes.  (Besides the transactions already in the reorder
	 * buffer, we count the one we were directly passed.)
	 */
	if (txn->nentries > 0)
		nr_txns++;

	dlist_foreach(cur_txn_i, &txn->subtxns)
	{
		ReorderBufferTXN *cur_txn;

		cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);

		if (cur_txn->nentries > 0)
			nr_txns++;
	}

	/*
	 * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
	 * need to allocate/build a heap then.
	 */

	/* allocate iteration state */
	state = (ReorderBufferIterTXNState *)
		MemoryContextAllocZero(rb->context,
							   sizeof(ReorderBufferIterTXNState) +
							   sizeof(ReorderBufferIterTXNEntry) * nr_txns);

	state->nr_txns = nr_txns;
	dlist_init(&state->old_change);

	for (off = 0; off < state->nr_txns; off++)
	{
		state->entries[off].fd = -1;
		state->entries[off].segno = 0;
	}

	/* allocate heap */
	state->heap = binaryheap_allocate(state->nr_txns,
									  ReorderBufferIterCompare,
									  state);

	/* Now that the state fields are initialized, it is safe to return it. */
	*iter_state = state;

	/*
	 * Now insert items into the binary heap, in an unordered fashion.  (We
	 * will run a heap assembly step at the end; this is more efficient.)
	 */

	off = 0;

	/* add toplevel transaction if it contains changes */
	if (txn->nentries > 0)
	{
		ReorderBufferChange *cur_change;

		if (txn->serialized)
		{
			/* serialize remaining changes */
			ReorderBufferSerializeTXN(rb, txn);
			ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
										&state->entries[off].segno);
		}

		cur_change = dlist_head_element(ReorderBufferChange, node,
										&txn->changes);

		state->entries[off].lsn = cur_change->lsn;
		state->entries[off].change = cur_change;
		state->entries[off].txn = txn;

		binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
	}

	/* add subtransactions if they contain changes */
	dlist_foreach(cur_txn_i, &txn->subtxns)
	{
		ReorderBufferTXN *cur_txn;

		cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);

		if (cur_txn->nentries > 0)
		{
			ReorderBufferChange *cur_change;

			if (cur_txn->serialized)
			{
				/* serialize remaining changes */
				ReorderBufferSerializeTXN(rb, cur_txn);
				ReorderBufferRestoreChanges(rb, cur_txn,
											&state->entries[off].fd,
											&state->entries[off].segno);
			}
			cur_change = dlist_head_element(ReorderBufferChange, node,
											&cur_txn->changes);

			state->entries[off].lsn = cur_change->lsn;
			state->entries[off].change = cur_change;
			state->entries[off].txn = cur_txn;

			binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
		}
	}

	/* assemble a valid binary heap */
	binaryheap_build(state->heap);
}

/*
 * Return the next change when iterating over a transaction and its
 * subtransactions.
 *
 * Returns NULL when no further changes exist.
 */
static ReorderBufferChange *
ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
{
	ReorderBufferChange *change;
	ReorderBufferIterTXNEntry *entry;
	int32		off;

	/* nothing there anymore */
	if (state->heap->bh_size == 0)
		return NULL;

	off = DatumGetInt32(binaryheap_first(state->heap));
	entry = &state->entries[off];

	/* free memory we might have "leaked" in the previous *Next call */
	if (!dlist_is_empty(&state->old_change))
	{
		change = dlist_container(ReorderBufferChange, node,
								 dlist_pop_head_node(&state->old_change));
		ReorderBufferReturnChange(rb, change);
		Assert(dlist_is_empty(&state->old_change));
	}

	change = entry->change;

	/*
	 * update heap with information about which transaction has the next
	 * relevant change in LSN order
	 */

	/* there are in-memory changes */
	if (dlist_has_next(&entry->txn->changes, &entry->change->node))
	{
		dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
		ReorderBufferChange *next_change =
		dlist_container(ReorderBufferChange, node, next);

		/* txn stays the same */
		state->entries[off].lsn = next_change->lsn;
		state->entries[off].change = next_change;

		binaryheap_replace_first(state->heap, Int32GetDatum(off));
		return change;
	}

	/* try to load changes from disk */
	if (entry->txn->nentries != entry->txn->nentries_mem)
	{
		/*
		 * Ugly: restoring changes will reuse *Change records, thus delete the
		 * current one from the per-tx list and only free in the next call.
		 */
		dlist_delete(&change->node);
		dlist_push_tail(&state->old_change, &change->node);

		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
										&state->entries[off].segno))
		{
			/* successfully restored changes from disk */
			ReorderBufferChange *next_change =
			dlist_head_element(ReorderBufferChange, node,
							   &entry->txn->changes);

			elog(DEBUG2, "restored %u/%u changes from disk",
				 (uint32) entry->txn->nentries_mem,
				 (uint32) entry->txn->nentries);

			Assert(entry->txn->nentries_mem);
			/* txn stays the same */
			state->entries[off].lsn = next_change->lsn;
			state->entries[off].change = next_change;
			binaryheap_replace_first(state->heap, Int32GetDatum(off));

			return change;
		}
	}

	/* ok, no changes there anymore, remove */
	binaryheap_remove_first(state->heap);

	return change;
}

/*
 * Deallocate the iterator
 */
static void
ReorderBufferIterTXNFinish(ReorderBuffer *rb,
						   ReorderBufferIterTXNState *state)
{
	int32		off;

	for (off = 0; off < state->nr_txns; off++)
	{
		if (state->entries[off].fd != -1)
			FileClose(state->entries[off].fd);
	}

	/* free memory we might have "leaked" in the last *Next call */
	if (!dlist_is_empty(&state->old_change))
	{
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node,
								 dlist_pop_head_node(&state->old_change));
		ReorderBufferReturnChange(rb, change);
		Assert(dlist_is_empty(&state->old_change));
	}

	binaryheap_free(state->heap);
	pfree(state);
}
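
/*
 * Illustrative sketch (compiled out): the intended Init/Next/Finish usage of
 * the iterator, in the shape ReorderBufferCommit() below employs it.
 * iter_state is handed back through a parameter precisely so that a caller's
 * PG_CATCH block can still clean up if initialization or iteration throws.
 */
#ifdef REORDERBUFFER_EXAMPLES
static void
example_iterate_txn(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	ReorderBufferIterTXNState *volatile iterstate = NULL;

	PG_TRY();
	{
		ReorderBufferChange *change;

		ReorderBufferIterTXNInit(rb, txn, &iterstate);
		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
		{
			/* ... process one change, delivered in global LSN order ... */
		}
		ReorderBufferIterTXNFinish(rb, iterstate);
		iterstate = NULL;
	}
	PG_CATCH();
	{
		/* iterstate is valid even if Init threw partway through */
		if (iterstate)
			ReorderBufferIterTXNFinish(rb, iterstate);
		PG_RE_THROW();
	}
	PG_END_TRY();
}
#endif   /* REORDERBUFFER_EXAMPLES */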

/*
 * Cleanup the contents of a transaction, usually after the transaction
 * committed or aborted.
 */
static void
ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	bool		found;
	dlist_mutable_iter iter;

	/* cleanup subtransactions & their changes */
	dlist_foreach_modify(iter, &txn->subtxns)
	{
		ReorderBufferTXN *subtxn;

		subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);

		/*
		 * Subtransactions are always associated with the toplevel TXN, even
		 * if they originally were happening inside another subtxn, so we
		 * won't ever recurse more than one level deep here.
		 */
		Assert(subtxn->is_known_as_subxact);
		Assert(subtxn->nsubtxns == 0);

		ReorderBufferCleanupTXN(rb, subtxn);
	}

	/* cleanup changes in the toplevel txn */
	dlist_foreach_modify(iter, &txn->changes)
	{
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node, iter.cur);

		ReorderBufferReturnChange(rb, change);
	}

	/*
	 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
	 * They are always stored in the toplevel transaction.
	 */
	dlist_foreach_modify(iter, &txn->tuplecids)
	{
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node, iter.cur);
		Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
		ReorderBufferReturnChange(rb, change);
	}

	/*
	 * Cleanup the base snapshot, if set.
	 */
	if (txn->base_snapshot != NULL)
	{
		SnapBuildSnapDecRefcount(txn->base_snapshot);
		dlist_delete(&txn->base_snapshot_node);
	}

	/* delete from list of known subxacts */
	if (txn->is_known_as_subxact)
	{
		/* NB: nsubtxns count of parent will be too high now */
		dlist_delete(&txn->node);
	}
	/* delete from LSN ordered list of toplevel TXNs */
	else
	{
		dlist_delete(&txn->node);
	}

	/* now remove reference from buffer */
	hash_search(rb->by_txn,
				(void *) &txn->xid,
				HASH_REMOVE,
				&found);
	Assert(found);

	/* remove entries spilled to disk */
	if (txn->serialized)
		ReorderBufferRestoreCleanup(rb, txn);

	/* deallocate */
	ReorderBufferReturnTXN(rb, txn);
}

/*
 * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
 * tqual.c's HeapTupleSatisfiesHistoricMVCC.
 */
static void
ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	dlist_iter	iter;
	HASHCTL		hash_ctl;

	if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
		return;

	memset(&hash_ctl, 0, sizeof(hash_ctl));

	hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
	hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
	hash_ctl.hcxt = rb->context;

	/*
	 * create the hash with the exact number of to-be-stored tuplecids from
	 * the start
	 */
	txn->tuplecid_hash =
		hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

	dlist_foreach(iter, &txn->tuplecids)
	{
		ReorderBufferTupleCidKey key;
		ReorderBufferTupleCidEnt *ent;
		bool		found;
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node, iter.cur);

		Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);

		/* be careful about padding */
		memset(&key, 0, sizeof(ReorderBufferTupleCidKey));

		key.relnode = change->data.tuplecid.node;

		ItemPointerCopy(&change->data.tuplecid.tid,
						&key.tid);

		ent = (ReorderBufferTupleCidEnt *)
			hash_search(txn->tuplecid_hash,
						(void *) &key,
						HASH_ENTER | HASH_FIND,
						&found);
		if (!found)
		{
			ent->cmin = change->data.tuplecid.cmin;
			ent->cmax = change->data.tuplecid.cmax;
			ent->combocid = change->data.tuplecid.combocid;
		}
		else
		{
			/*
			 * Maybe we already saw this tuple before in this transaction,
			 * but if so it must have the same cmin.
			 */
			Assert(ent->cmin == change->data.tuplecid.cmin);

			/*
			 * cmax may be initially invalid, but once set it can only grow,
			 * and never become invalid again.
			 */
			Assert((ent->cmax == InvalidCommandId) ||
				   ((change->data.tuplecid.cmax != InvalidCommandId) &&
					(change->data.tuplecid.cmax > ent->cmax)));
			ent->cmax = change->data.tuplecid.cmax;
		}
	}
}
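
/*
 * Illustrative sketch (compiled out): how the hash built above is meant to
 * be probed.  tqual.c's HeapTupleSatisfiesHistoricMVCC performs a lookup of
 * this shape to resolve cmin/cmax for catalog tuples; this simplified
 * example_lookup_cids() is hypothetical.
 */
#ifdef REORDERBUFFER_EXAMPLES
static bool
example_lookup_cids(HTAB *tuplecid_hash, RelFileNode relnode,
					ItemPointer tid, CommandId *cmin, CommandId *cmax)
{
	ReorderBufferTupleCidKey key;
	ReorderBufferTupleCidEnt *ent;

	/* zero first: the key is hashed as raw bytes, so padding matters */
	memset(&key, 0, sizeof(key));
	key.relnode = relnode;
	ItemPointerCopy(tid, &key.tid);

	ent = (ReorderBufferTupleCidEnt *)
		hash_search(tuplecid_hash, (void *) &key, HASH_FIND, NULL);

	if (ent == NULL)
		return false;			/* tuple not touched by this transaction */

	*cmin = ent->cmin;
	*cmax = ent->cmax;
	return true;
}
#endif   /* REORDERBUFFER_EXAMPLES */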

/*
 * Copy a provided snapshot so we can modify it privately. This is needed so
 * that catalog modifying transactions can look into intermediate catalog
 * states.
 */
static Snapshot
ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
					  ReorderBufferTXN *txn, CommandId cid)
{
	Snapshot	snap;
	dlist_iter	iter;
	int			i = 0;
	Size		size;

	size = sizeof(SnapshotData) +
		sizeof(TransactionId) * orig_snap->xcnt +
		sizeof(TransactionId) * (txn->nsubtxns + 1);

	snap = MemoryContextAllocZero(rb->context, size);
	memcpy(snap, orig_snap, sizeof(SnapshotData));

	snap->copied = true;
	snap->active_count = 1;		/* mark as active so nobody frees it */
	snap->regd_count = 0;
	snap->xip = (TransactionId *) (snap + 1);

	memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);

	/*
	 * snap->subxip contains all txids that belong to our transaction which we
	 * need to check via cmin/cmax. That's why we store the toplevel
	 * transaction in there as well.
	 */
	snap->subxip = snap->xip + snap->xcnt;
	snap->subxip[i++] = txn->xid;

	/*
	 * nsubxcnt isn't decreased when subtransactions abort, so count manually.
	 * Since it's an upper boundary it is safe to use it for the allocation
	 * above.
	 */
	snap->subxcnt = 1;

	dlist_foreach(iter, &txn->subtxns)
	{
		ReorderBufferTXN *sub_txn;

		sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
		snap->subxip[i++] = sub_txn->xid;
		snap->subxcnt++;
	}

	/* sort so we can bsearch() later */
	qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);

	/* store the specified current CommandId */
	snap->curcid = cid;

	return snap;
}

/*
 * Free a previously ReorderBufferCopySnap'ed snapshot
 */
static void
ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
{
	if (snap->copied)
		pfree(snap);
	else
		SnapBuildSnapDecRefcount(snap);
}

/*
 * Perform the replay of a transaction and its non-aborted subtransactions.
 *
 * Subtransactions have to be processed by ReorderBufferCommitChild() before
 * this is called, even if they were previously assigned to the toplevel
 * transaction with ReorderBufferAssignChild.
 *
 * We currently can only decode a transaction's contents when its commit
 * record is read because that's the only place where we know about cache
 * invalidations. Thus, once a toplevel commit is read, we iterate over the top
 * and subtransactions (using a k-way merge) and replay the changes in lsn
 * order.
 */
void
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
					TimestampTz commit_time,
					RepOriginId origin_id, XLogRecPtr origin_lsn)
{
	ReorderBufferTXN *txn;
	volatile Snapshot snapshot_now;
	volatile CommandId command_id = FirstCommandId;
	bool		using_subtxn;
	ReorderBufferIterTXNState *volatile iterstate = NULL;

	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
								false);

	/* unknown transaction, nothing to replay */
	if (txn == NULL)
		return;

	txn->final_lsn = commit_lsn;
	txn->end_lsn = end_lsn;
	txn->commit_time = commit_time;
	txn->origin_id = origin_id;
	txn->origin_lsn = origin_lsn;

	/*
	 * If this transaction has no snapshot, it didn't make any changes to the
	 * database, so there's nothing to decode.  Note that
	 * ReorderBufferCommitChild will have transferred any snapshots from
	 * subtransactions if there were any.
	 */
	if (txn->base_snapshot == NULL)
	{
		Assert(txn->ninvalidations == 0);
		ReorderBufferCleanupTXN(rb, txn);
		return;
	}

	snapshot_now = txn->base_snapshot;

	/* build data to be able to lookup the CommandIds of catalog tuples */
	ReorderBufferBuildTupleCidHash(rb, txn);

	/* setup the initial snapshot */
	SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);

	/*
	 * Decoding needs access to syscaches et al., which in turn use
	 * heavyweight locks and such. Thus we need to have enough state around to
	 * keep track of those.  The easiest way is to simply use a transaction
	 * internally.  That also allows us to easily enforce that nothing writes
	 * to the database by checking for xid assignments.
	 *
	 * When we're called via the SQL SRF there's already a transaction
	 * started, so start an explicit subtransaction there.
	 */
	using_subtxn = IsTransactionOrTransactionBlock();

	PG_TRY();
	{
		ReorderBufferChange *change;
		ReorderBufferChange *specinsert = NULL;

		if (using_subtxn)
			BeginInternalSubTransaction("replay");
		else
			StartTransactionCommand();

		rb->begin(rb, txn);

		ReorderBufferIterTXNInit(rb, txn, &iterstate);
		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
		{
			Relation	relation = NULL;
			Oid			reloid;

			switch (change->action)
			{
				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:

					/*
					 * Confirmation for speculative insertion arrived. Simply
					 * use it as a normal record. It'll be cleaned up at the
					 * end of INSERT processing.
					 */
					if (specinsert == NULL)
						elog(ERROR, "invalid ordering of speculative insertion changes");
					Assert(specinsert->data.tp.oldtuple == NULL);
					change = specinsert;
					change->action = REORDER_BUFFER_CHANGE_INSERT;

					/* intentionally fall through */
				case REORDER_BUFFER_CHANGE_INSERT:
				case REORDER_BUFFER_CHANGE_UPDATE:
				case REORDER_BUFFER_CHANGE_DELETE:
					Assert(snapshot_now);

					reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
											change->data.tp.relnode.relNode);

					/*
					 * Mapped catalog tuple without data, emitted while
					 * catalog table was in the process of being rewritten. We
					 * can fail to look up the relfilenode, because the
					 * relmapper has no "historic" view, in contrast to the
					 * normal catalog during decoding. Thus repeated rewrites
					 * can cause a lookup failure. That's OK because we do not
					 * decode catalog changes anyway. Normally such tuples
					 * would be skipped over below, but we can't identify
					 * whether the table should be logically logged without
					 * mapping the relfilenode to the oid.
					 */
					if (reloid == InvalidOid &&
						change->data.tp.newtuple == NULL &&
						change->data.tp.oldtuple == NULL)
						goto change_done;
					else if (reloid == InvalidOid)
						elog(ERROR, "could not map filenode \"%s\" to relation OID",
							 relpathperm(change->data.tp.relnode,
										 MAIN_FORKNUM));

					relation = RelationIdGetRelation(reloid);

					if (!RelationIsValid(relation))
						elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
							 reloid,
							 relpathperm(change->data.tp.relnode,
										 MAIN_FORKNUM));

					if (!RelationIsLogicallyLogged(relation))
						goto change_done;

					/*
					 * For now ignore sequence changes entirely. Most of the
					 * time they don't log changes using records we
					 * understand, so it doesn't make sense to handle the few
					 * cases we do.
					 */
					if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
						goto change_done;

					/* user-triggered change */
					if (!IsToastRelation(relation))
					{
						ReorderBufferToastReplace(rb, txn, relation, change);
						rb->apply_change(rb, txn, relation, change);

						/*
						 * Only clear reassembled toast chunks if we're sure
						 * they're not required anymore. The creator of the
						 * tuple tells us.
						 */
						if (change->data.tp.clear_toast_afterwards)
							ReorderBufferToastReset(rb, txn);
					}
					/* we're not interested in toast deletions */
					else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
					{
1663 						/*
1664 						 * Need to reassemble the full toasted Datum in
1665 						 * memory, to ensure the chunks don't get reused till
1666 						 * we're done, remove it from the list of this
1667 						 * transaction's changes. Otherwise it will get
1668 						 * freed/reused while restoring spooled data from
1669 						 * disk.
1670 						 */
1671 						Assert(change->data.tp.newtuple != NULL);
1672 
1673 						dlist_delete(&change->node);
1674 						ReorderBufferToastAppendChunk(rb, txn, relation,
1675 													  change);
1676 					}
1677 
1678 			change_done:
1679 
1680 					/*
1681 					 * If speculative insertion was confirmed, the record isn't
1682 					 * needed anymore.
1683 					 */
1684 					if (specinsert != NULL)
1685 					{
1686 						ReorderBufferReturnChange(rb, specinsert);
1687 						specinsert = NULL;
1688 					}
1689 
1690 					if (relation != NULL)
1691 					{
1692 						RelationClose(relation);
1693 						relation = NULL;
1694 					}
1695 					break;
1696 
1697 				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
1698 
1699 					/*
1700 					 * Speculative insertions are dealt with by delaying the
1701 					 * processing of the insert until the confirmation record
1702 					 * arrives. For that we simply unlink the record from the
1703 					 * chain, so it does not get freed/reused while restoring
1704 					 * spooled data from disk.
1705 					 *
1706 					 * This is safe in the face of concurrent catalog changes
1707 					 * because the relevant relation can't be changed between
1708 					 * speculative insertion and confirmation due to
1709 					 * CheckTableNotInUse() and locking.
1710 					 */
1711 
1712 					/* clear out a pending (and thus failed) speculation */
1713 					if (specinsert != NULL)
1714 					{
1715 						ReorderBufferReturnChange(rb, specinsert);
1716 						specinsert = NULL;
1717 					}
1718 
1719 					/* and memorize the pending insertion */
1720 					dlist_delete(&change->node);
1721 					specinsert = change;
1722 					break;
1723 
1724 				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
1725 
1726 					/*
1727 					 * Abort for speculative insertion arrived. So clean up
1728 					 * the specinsert tuple and toast hash.
1729 					 *
1730 					 * Note that we get the spec abort change for each toast
1731 					 * entry but we need to perform the cleanup only the first
1732 					 * time we get it for the main table.
1733 					 */
1734 					if (specinsert != NULL)
1735 					{
1736 						/*
1737 						 * We must clean the toast hash before processing a
1738 						 * completely new tuple to avoid confusion about the
1739 						 * previous tuple's toast chunks.
1740 						 */
1741 						Assert(change->data.tp.clear_toast_afterwards);
1742 						ReorderBufferToastReset(rb, txn);
1743 
1744 						/* We don't need this record anymore. */
1745 						ReorderBufferReturnChange(rb, specinsert);
1746 						specinsert = NULL;
1747 					}
1748 					break;
1749 
1750 				case REORDER_BUFFER_CHANGE_MESSAGE:
1751 					rb->message(rb, txn, change->lsn, true,
1752 								change->data.msg.prefix,
1753 								change->data.msg.message_size,
1754 								change->data.msg.message);
1755 					break;
1756 
1757 				case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
1758 					/* get rid of the old */
1759 					TeardownHistoricSnapshot(false);
1760 
1761 					if (snapshot_now->copied)
1762 					{
1763 						ReorderBufferFreeSnap(rb, snapshot_now);
1764 						snapshot_now =
1765 							ReorderBufferCopySnap(rb, change->data.snapshot,
1766 												  txn, command_id);
1767 					}
1768 
1769 					/*
1770 					 * Restored from disk, need to be careful not to double
1771 					 * free. We could introduce refcounting for that, but for
1772 					 * now this seems infrequent enough not to care.
1773 					 */
1774 					else if (change->data.snapshot->copied)
1775 					{
1776 						snapshot_now =
1777 							ReorderBufferCopySnap(rb, change->data.snapshot,
1778 												  txn, command_id);
1779 					}
1780 					else
1781 					{
1782 						snapshot_now = change->data.snapshot;
1783 					}
1784 
1785 
1786 					/* and continue with the new one */
1787 					SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1788 					break;
1789 
1790 				case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
1791 					Assert(change->data.command_id != InvalidCommandId);
1792 
1793 					if (command_id < change->data.command_id)
1794 					{
1795 						command_id = change->data.command_id;
1796 
1797 						if (!snapshot_now->copied)
1798 						{
1799 							/* we don't use the global one anymore */
1800 							snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1801 															txn, command_id);
1802 						}
1803 
1804 						snapshot_now->curcid = command_id;
1805 
1806 						TeardownHistoricSnapshot(false);
1807 						SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1808 
1809 						/*
1810 						 * Every time the CommandId is incremented, we could
1811 						 * see new catalog contents, so execute all
1812 						 * invalidations.
1813 						 */
1814 						ReorderBufferExecuteInvalidations(rb, txn);
1815 					}
1816 
1817 					break;
1818 
1819 				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
1820 					elog(ERROR, "tuplecid value in changequeue");
1821 					break;
1822 			}
1823 		}
1824 
1825 		/* speculative insertion record must be freed by now */
1826 		Assert(!specinsert);
1827 
1828 		/* clean up the iterator */
1829 		ReorderBufferIterTXNFinish(rb, iterstate);
1830 		iterstate = NULL;
1831 
1832 		/* call commit callback */
1833 		rb->commit(rb, txn, commit_lsn);
1834 
1835 		/* this is just a sanity check against bad output plugin behaviour */
1836 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
1837 			elog(ERROR, "output plugin used XID %u",
1838 				 GetCurrentTransactionId());
1839 
1840 		/* cleanup */
1841 		TeardownHistoricSnapshot(false);
1842 
1843 		/*
1844 		 * Aborting the current (sub-)transaction as a whole has the right
1845 		 * semantics. We want all locks acquired in here to be released, not
1846 		 * reassigned to the parent, and we do not want any database access
1847 		 * to have persistent effects.
1848 		 */
1849 		AbortCurrentTransaction();
1850 
1851 		/* make sure there's no cache pollution */
1852 		ReorderBufferExecuteInvalidations(rb, txn);
1853 
1854 		if (using_subtxn)
1855 			RollbackAndReleaseCurrentSubTransaction();
1856 
1857 		if (snapshot_now->copied)
1858 			ReorderBufferFreeSnap(rb, snapshot_now);
1859 
1860 		/* remove potential on-disk data, and deallocate */
1861 		ReorderBufferCleanupTXN(rb, txn);
1862 	}
1863 	PG_CATCH();
1864 	{
1865 		/* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1866 		if (iterstate)
1867 			ReorderBufferIterTXNFinish(rb, iterstate);
1868 
1869 		TeardownHistoricSnapshot(true);
1870 
1871 		/*
1872 		 * Force cache invalidation to happen outside of a valid transaction
1873 		 * to prevent catalog access as we just caught an error.
1874 		 */
1875 		AbortCurrentTransaction();
1876 
1877 		/* make sure there's no cache pollution */
1878 		ReorderBufferExecuteInvalidations(rb, txn);
1879 
1880 		if (using_subtxn)
1881 			RollbackAndReleaseCurrentSubTransaction();
1882 
1883 		if (snapshot_now->copied)
1884 			ReorderBufferFreeSnap(rb, snapshot_now);
1885 
1886 		/* remove potential on-disk data, and deallocate */
1887 		ReorderBufferCleanupTXN(rb, txn);
1888 
1889 		PG_RE_THROW();
1890 	}
1891 	PG_END_TRY();
1892 }
1893 
1894 /*
1895  * Abort a transaction that possibly has previous changes. Needs to be called
1896  * first for subtransactions and then for the toplevel xid.
1897  *
1898  * NB: Transactions handled here have to have actively aborted (i.e. have
1899  * produced an abort record). Implicitly aborted transactions are handled via
1900  * ReorderBufferAbortOld(); transactions we're just not interested in, but
1901  * which have committed, are handled in ReorderBufferForget().
1902  *
1903  * This function purges this transaction and its contents from memory and
1904  * disk.
1905  */
1906 void
1907 ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1908 {
1909 	ReorderBufferTXN *txn;
1910 
1911 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1912 								false);
1913 
1914 	/* unknown, nothing to remove */
1915 	if (txn == NULL)
1916 		return;
1917 
1918 	/* cosmetic... */
1919 	txn->final_lsn = lsn;
1920 
1921 	/* remove potential on-disk data, and deallocate */
1922 	ReorderBufferCleanupTXN(rb, txn);
1923 }
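
/*
 * Illustration only, not part of the original file: a minimal sketch of how
 * the WAL decoding layer might drive this function when it sees an abort
 * record, assuming a DecodeAbort()-style caller with access to the parsed
 * subxact array. Subtransactions are aborted before the toplevel xid, as
 * required above.
 *
 *		for (i = 0; i < parsed->nsubxacts; i++)
 *			ReorderBufferAbort(ctx->reorder, parsed->subxacts[i], lsn);
 *		ReorderBufferAbort(ctx->reorder, xid, lsn);
 */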
1924 
1925 /*
1926  * Abort all transactions that aren't actually running anymore because the
1927  * server restarted.
1928  *
1929  * NB: These really have to be transactions that have aborted due to a server
1930  * crash/immediate restart, as we don't deal with invalidations here.
1931  */
1932 void
1933 ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
1934 {
1935 	dlist_mutable_iter it;
1936 
1937 	/*
1938 	 * Iterate through all (potential) toplevel TXNs and abort all that are
1939 	 * older than what possibly can be running. Once we've found the first
1940 	 * that is alive we stop, there might be some that acquired an xid earlier
1941 	 * but started writing later, but it's unlikely and they will be cleaned
1942 	 * up in a later call to this function.
1943 	 */
1944 	dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1945 	{
1946 		ReorderBufferTXN *txn;
1947 
1948 		txn = dlist_container(ReorderBufferTXN, node, it.cur);
1949 
1950 		if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1951 		{
1952 			elog(DEBUG2, "aborting old transaction %u", txn->xid);
1953 
1954 			/* remove potential on-disk data, and deallocate this tx */
1955 			ReorderBufferCleanupTXN(rb, txn);
1956 		}
1957 		else
1958 			return;
1959 	}
1960 }
1961 
1962 /*
1963  * Forget the contents of a transaction if we aren't interested in its
1964  * contents. Needs to be called first for subtransactions and then for the
1965  * toplevel xid.
1966  *
1967  * This is significantly different to ReorderBufferAbort() because
1968  * transactions that have committed need to be treated differently from aborted
1969  * ones since they may have modified the catalog.
1970  *
1971  * Note that this is only allowed to be called in the moment a transaction
1972  * commit has just been read, not earlier; otherwise later records referring
1973  * to this xid might re-create the transaction incompletely.
1974  */
1975 void
1976 ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1977 {
1978 	ReorderBufferTXN *txn;
1979 
1980 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1981 								false);
1982 
1983 	/* unknown, nothing to forget */
1984 	if (txn == NULL)
1985 		return;
1986 
1987 	/* cosmetic... */
1988 	txn->final_lsn = lsn;
1989 
1990 	/*
1991 	 * Process cache invalidation messages if there are any. Even if we're not
1992 	 * interested in the transaction's contents, it could have manipulated the
1993 	 * catalog and we need to update the caches according to that.
1994 	 */
1995 	if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1996 		ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
1997 										   txn->invalidations);
1998 	else
1999 		Assert(txn->ninvalidations == 0);
2000 
2001 	/* remove potential on-disk data, and deallocate */
2002 	ReorderBufferCleanupTXN(rb, txn);
2003 }
2004 
2005 /*
2006  * Execute invalidations happening outside the context of a decoded
2007  * transaction. That currently happens either for xid-less commits
2008  * (c.f. RecordTransactionCommit()) or for invalidations in uninteresting
2009  * transactions (via ReorderBufferForget()).
2010  */
2011 void
2012 ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
2013 								   SharedInvalidationMessage *invalidations)
2014 {
2015 	bool		use_subtxn = IsTransactionOrTransactionBlock();
2016 	int			i;
2017 
2018 	if (use_subtxn)
2019 		BeginInternalSubTransaction("replay");
2020 
2021 	/*
2022 	 * Force invalidations to happen outside of a valid transaction - that way
2023 	 * entries will just be marked as invalid without accessing the catalog.
2024 	 * That's advantageous because we don't need to setup the full state
2025 	 * necessary for catalog access.
2026 	 */
2027 	if (use_subtxn)
2028 		AbortCurrentTransaction();
2029 
2030 	for (i = 0; i < ninvalidations; i++)
2031 		LocalExecuteInvalidationMessage(&invalidations[i]);
2032 
2033 	if (use_subtxn)
2034 		RollbackAndReleaseCurrentSubTransaction();
2035 }
2036 
2037 /*
2038  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
2039  * least once for every xid in XLogRecord->xl_xid (other places in records
2040  * may, but do not have to, be passed through here).
2041  *
2042  * Reorderbuffer keeps some data structures about transactions in LSN order,
2043  * for efficiency. To do that it has to know when transactions are first
2044  * seen in the WAL. As many types of records are not actually interesting for
2045  * logical decoding, they do not necessarily pass through here.
2046  */
2047 void
2048 ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
2049 {
2050 	/* many records won't have an xid assigned, centralize check here */
2051 	if (xid != InvalidTransactionId)
2052 		ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2053 }
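
/*
 * Illustration only: a sketch of the assumed call pattern. A decode loop
 * would funnel every record's xl_xid through here before acting on the
 * record itself, e.g. (names assumed from the xlogreader API):
 *
 *		ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
 *								buf->origptr);
 */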
2054 
2055 /*
2056  * Add a new snapshot to this transaction that may only be used after lsn 'lsn'
2057  * because the previous snapshot doesn't describe the catalog correctly for
2058  * following rows.
2059  */
2060 void
2061 ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
2062 						 XLogRecPtr lsn, Snapshot snap)
2063 {
2064 	ReorderBufferChange *change = ReorderBufferGetChange(rb);
2065 
2066 	change->data.snapshot = snap;
2067 	change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
2068 
2069 	ReorderBufferQueueChange(rb, xid, lsn, change);
2070 }
2071 
2072 /*
2073  * Set up the transaction's base snapshot.
2074  *
2075  * If we know that xid is a subtransaction, set the base snapshot on the
2076  * top-level transaction instead.
2077  */
2078 void
2079 ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
2080 							 XLogRecPtr lsn, Snapshot snap)
2081 {
2082 	ReorderBufferTXN *txn;
2083 	bool		is_new;
2084 
2085 	AssertArg(snap != NULL);
2086 
2087 	/*
2088 	 * Fetch the transaction to operate on.  If we know it's a subtransaction,
2089 	 * operate on its top-level transaction instead.
2090 	 */
2091 	txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
2092 	if (txn->is_known_as_subxact)
2093 		txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2094 									NULL, InvalidXLogRecPtr, false);
2095 	Assert(txn->base_snapshot == NULL);
2096 
2097 	txn->base_snapshot = snap;
2098 	txn->base_snapshot_lsn = lsn;
2099 	dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
2100 
2101 	AssertTXNLsnOrder(rb);
2102 }
2103 
2104 /*
2105  * Access the catalog with this CommandId at this point in the changestream.
2106  *
2107  * May only be called for command ids > 1
2108  */
2109 void
2110 ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
2111 							 XLogRecPtr lsn, CommandId cid)
2112 {
2113 	ReorderBufferChange *change = ReorderBufferGetChange(rb);
2114 
2115 	change->data.command_id = cid;
2116 	change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
2117 
2118 	ReorderBufferQueueChange(rb, xid, lsn, change);
2119 }
2120 
2121 
2122 /*
2123  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
2124  */
2125 void
2126 ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
2127 							 XLogRecPtr lsn, RelFileNode node,
2128 							 ItemPointerData tid, CommandId cmin,
2129 							 CommandId cmax, CommandId combocid)
2130 {
2131 	ReorderBufferChange *change = ReorderBufferGetChange(rb);
2132 	ReorderBufferTXN *txn;
2133 
2134 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2135 
2136 	change->data.tuplecid.node = node;
2137 	change->data.tuplecid.tid = tid;
2138 	change->data.tuplecid.cmin = cmin;
2139 	change->data.tuplecid.cmax = cmax;
2140 	change->data.tuplecid.combocid = combocid;
2141 	change->lsn = lsn;
2142 	change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
2143 
2144 	dlist_push_tail(&txn->tuplecids, &change->node);
2145 	txn->ntuplecids++;
2146 }
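
/*
 * Note: these entries are kept out of the regular change stream; at commit
 * they are used to build txn->tuplecid_hash, which SetupHistoricSnapshot()
 * (see ReorderBufferCommit() above) consults to resolve cmin/cmax of catalog
 * tuples.
 */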
2147 
2148 /*
2149  * Setup the invalidation of the toplevel transaction.
2150  *
2151  * This needs to be done before ReorderBufferCommit is called!
2152  */
2153 void
2154 ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
2155 							  XLogRecPtr lsn, Size nmsgs,
2156 							  SharedInvalidationMessage *msgs)
2157 {
2158 	ReorderBufferTXN *txn;
2159 
2160 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2161 
2162 	if (txn->ninvalidations != 0)
2163 		elog(ERROR, "only ever add one set of invalidations");
2164 
2165 	Assert(nmsgs > 0);
2166 
2167 	txn->ninvalidations = nmsgs;
2168 	txn->invalidations = (SharedInvalidationMessage *)
2169 		MemoryContextAlloc(rb->context,
2170 						   sizeof(SharedInvalidationMessage) * nmsgs);
2171 	memcpy(txn->invalidations, msgs,
2172 		   sizeof(SharedInvalidationMessage) * nmsgs);
2173 }
2174 
2175 /*
2176  * Apply all invalidations we know. Possibly we only need parts at this point
2177  * in the changestream but we don't know which those are.
2178  */
2179 static void
2180 ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
2181 {
2182 	int			i;
2183 
2184 	for (i = 0; i < txn->ninvalidations; i++)
2185 		LocalExecuteInvalidationMessage(&txn->invalidations[i]);
2186 }
2187 
2188 /*
2189  * Mark a transaction as containing catalog changes
2190  */
2191 void
2192 ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
2193 								  XLogRecPtr lsn)
2194 {
2195 	ReorderBufferTXN *txn;
2196 
2197 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2198 
2199 	txn->has_catalog_changes = true;
2200 }
2201 
2202 /*
2203  * Query whether a transaction is already *known* to contain catalog
2204  * changes. This can be wrong until directly before the commit!
2205  */
2206 bool
2207 ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
2208 {
2209 	ReorderBufferTXN *txn;
2210 
2211 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2212 								false);
2213 	if (txn == NULL)
2214 		return false;
2215 
2216 	return txn->has_catalog_changes;
2217 }
2218 
2219 /*
2220  * ReorderBufferXidHasBaseSnapshot
2221  *		Have we already set the base snapshot for the given txn/subtxn?
2222  */
2223 bool
2224 ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
2225 {
2226 	ReorderBufferTXN *txn;
2227 
2228 	txn = ReorderBufferTXNByXid(rb, xid, false,
2229 								NULL, InvalidXLogRecPtr, false);
2230 
2231 	/* transaction isn't known yet, ergo no snapshot */
2232 	if (txn == NULL)
2233 		return false;
2234 
2235 	/* a known subtxn? operate on top-level txn instead */
2236 	if (txn->is_known_as_subxact)
2237 		txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2238 									NULL, InvalidXLogRecPtr, false);
2239 
2240 	return txn->base_snapshot != NULL;
2241 }
2242 
2243 
2244 /*
2245  * ---------------------------------------
2246  * Disk serialization support
2247  * ---------------------------------------
2248  */
2249 
2250 /*
2251  * Ensure the IO buffer is >= sz.
2252  */
2253 static void
2254 ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
2255 {
2256 	if (!rb->outbufsize)
2257 	{
2258 		rb->outbuf = MemoryContextAlloc(rb->context, sz);
2259 		rb->outbufsize = sz;
2260 	}
2261 	else if (rb->outbufsize < sz)
2262 	{
2263 		rb->outbuf = repalloc(rb->outbuf, sz);
2264 		rb->outbufsize = sz;
2265 	}
2266 }
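
/*
 * Illustration only: the usage pattern in this file is to reserve space and
 * then (re-)fetch any pointers into rb->outbuf, since repalloc() may have
 * moved the buffer:
 *
 *		ReorderBufferSerializeReserve(rb, sz);
 *		ondisk = (ReorderBufferDiskChange *) rb->outbuf;
 */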
2267 
2268 /*
2269  * Check whether the transaction txn should spill its data to disk.
2270  */
2271 static void
2272 ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2273 {
2274 	/*
2275 	 * TODO: improve accounting so we cheaply can take subtransactions into
2276 	 * account here.
2277 	 */
2278 	if (txn->nentries_mem >= max_changes_in_memory)
2279 	{
2280 		ReorderBufferSerializeTXN(rb, txn);
2281 		Assert(txn->nentries_mem == 0);
2282 	}
2283 }
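
/*
 * Note: max_changes_in_memory is a compile-time constant defined earlier in
 * this file (4096 at the time of writing, though that may change), so
 * spilling starts once a single transaction has queued that many changes in
 * memory.
 */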
2284 
2285 /*
2286  * Spill data of a large transaction (and its subtransactions) to disk.
2287  */
2288 static void
2289 ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2290 {
2291 	dlist_iter	subtxn_i;
2292 	dlist_mutable_iter change_i;
2293 	int			fd = -1;
2294 	XLogSegNo	curOpenSegNo = 0;
2295 	Size		spilled = 0;
2296 
2297 	elog(DEBUG2, "spill %u changes in XID %u to disk",
2298 		 (uint32) txn->nentries_mem, txn->xid);
2299 
2300 	/* do the same to all child TXs */
2301 	dlist_foreach(subtxn_i, &txn->subtxns)
2302 	{
2303 		ReorderBufferTXN *subtxn;
2304 
2305 		subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2306 		ReorderBufferSerializeTXN(rb, subtxn);
2307 	}
2308 
2309 	/* serialize changestream */
2310 	dlist_foreach_modify(change_i, &txn->changes)
2311 	{
2312 		ReorderBufferChange *change;
2313 
2314 		change = dlist_container(ReorderBufferChange, node, change_i.cur);
2315 
2316 		/*
2317 		 * store in the segment in which it belongs by start lsn; don't split
2318 		 * over multiple segments, though
2319 		 */
2320 		if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo))
2321 		{
2322 			char		path[MAXPGPATH];
2323 
2324 			if (fd != -1)
2325 				CloseTransientFile(fd);
2326 
2327 			XLByteToSeg(change->lsn, curOpenSegNo);
2328 
2329 			/*
2330 			 * No need to care about TLIs here, only used during a single run,
2331 			 * so each LSN only maps to a specific WAL record.
2332 			 */
2333 			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2334 										curOpenSegNo);
2335 
2336 			/* open segment, create it if necessary */
2337 			fd = OpenTransientFile(path,
2338 								   O_CREAT | O_WRONLY | O_APPEND | PG_BINARY,
2339 								   S_IRUSR | S_IWUSR);
2340 
2341 			if (fd < 0)
2342 				ereport(ERROR,
2343 						(errcode_for_file_access(),
2344 						 errmsg("could not open file \"%s\": %m", path)));
2345 		}
2346 
2347 		ReorderBufferSerializeChange(rb, txn, fd, change);
2348 		dlist_delete(&change->node);
2349 		ReorderBufferReturnChange(rb, change);
2350 
2351 		spilled++;
2352 	}
2353 
2354 	Assert(spilled == txn->nentries_mem);
2355 	Assert(dlist_is_empty(&txn->changes));
2356 	txn->nentries_mem = 0;
2357 	txn->serialized = true;
2358 
2359 	if (fd != -1)
2360 		CloseTransientFile(fd);
2361 }
2362 
2363 /*
2364  * Serialize individual change to disk.
2365  */
2366 static void
2367 ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2368 							 int fd, ReorderBufferChange *change)
2369 {
2370 	ReorderBufferDiskChange *ondisk;
2371 	Size		sz = sizeof(ReorderBufferDiskChange);
2372 
2373 	ReorderBufferSerializeReserve(rb, sz);
2374 
2375 	ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2376 	memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2377 
2378 	switch (change->action)
2379 	{
2380 			/* fall through these, they're all similar enough */
2381 		case REORDER_BUFFER_CHANGE_INSERT:
2382 		case REORDER_BUFFER_CHANGE_UPDATE:
2383 		case REORDER_BUFFER_CHANGE_DELETE:
2384 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2385 			{
2386 				char	   *data;
2387 				ReorderBufferTupleBuf *oldtup,
2388 						   *newtup;
2389 				Size		oldlen = 0;
2390 				Size		newlen = 0;
2391 
2392 				oldtup = change->data.tp.oldtuple;
2393 				newtup = change->data.tp.newtuple;
2394 
2395 				if (oldtup)
2396 				{
2397 					sz += sizeof(HeapTupleData);
2398 					oldlen = oldtup->tuple.t_len;
2399 					sz += oldlen;
2400 				}
2401 
2402 				if (newtup)
2403 				{
2404 					sz += sizeof(HeapTupleData);
2405 					newlen = newtup->tuple.t_len;
2406 					sz += newlen;
2407 				}
2408 
2409 				/* make sure we have enough space */
2410 				ReorderBufferSerializeReserve(rb, sz);
2411 
2412 				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2413 				/* might have been reallocated above */
2414 				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2415 
2416 				if (oldlen)
2417 				{
2418 					memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2419 					data += sizeof(HeapTupleData);
2420 
2421 					memcpy(data, oldtup->tuple.t_data, oldlen);
2422 					data += oldlen;
2423 				}
2424 
2425 				if (newlen)
2426 				{
2427 					memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2428 					data += sizeof(HeapTupleData);
2429 
2430 					memcpy(data, newtup->tuple.t_data, newlen);
2431 					data += newlen;
2432 				}
2433 				break;
2434 			}
2435 		case REORDER_BUFFER_CHANGE_MESSAGE:
2436 			{
2437 				char	   *data;
2438 				Size		prefix_size = strlen(change->data.msg.prefix) + 1;
2439 
2440 				sz += prefix_size + change->data.msg.message_size +
2441 					sizeof(Size) + sizeof(Size);
2442 				ReorderBufferSerializeReserve(rb, sz);
2443 
2444 				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2445 
2446 				/* might have been reallocated above */
2447 				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2448 
2449 				/* write the prefix including the size */
2450 				memcpy(data, &prefix_size, sizeof(Size));
2451 				data += sizeof(Size);
2452 				memcpy(data, change->data.msg.prefix,
2453 					   prefix_size);
2454 				data += prefix_size;
2455 
2456 				/* write the message including the size */
2457 				memcpy(data, &change->data.msg.message_size, sizeof(Size));
2458 				data += sizeof(Size);
2459 				memcpy(data, change->data.msg.message,
2460 					   change->data.msg.message_size);
2461 				data += change->data.msg.message_size;
2462 
2463 				break;
2464 			}
2465 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2466 			{
2467 				Snapshot	snap;
2468 				char	   *data;
2469 
2470 				snap = change->data.snapshot;
2471 
2472 				sz += sizeof(SnapshotData) +
2473 					sizeof(TransactionId) * snap->xcnt +
2474 					sizeof(TransactionId) * snap->subxcnt;
2475 
2476 				/* make sure we have enough space */
2477 				ReorderBufferSerializeReserve(rb, sz);
2478 				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2479 				/* might have been reallocated above */
2480 				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2481 
2482 				memcpy(data, snap, sizeof(SnapshotData));
2483 				data += sizeof(SnapshotData);
2484 
2485 				if (snap->xcnt)
2486 				{
2487 					memcpy(data, snap->xip,
2488 						   sizeof(TransactionId) * snap->xcnt);
2489 					data += sizeof(TransactionId) * snap->xcnt;
2490 				}
2491 
2492 				if (snap->subxcnt)
2493 				{
2494 					memcpy(data, snap->subxip,
2495 						   sizeof(TransactionId) * snap->subxcnt);
2496 					data += sizeof(TransactionId) * snap->subxcnt;
2497 				}
2498 				break;
2499 			}
2500 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2501 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
2502 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2503 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2504 			/* ReorderBufferChange contains everything important */
2505 			break;
2506 	}
2507 
2508 	ondisk->size = sz;
2509 
2510 	errno = 0;
2511 	if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2512 	{
2513 		int			save_errno = errno;
2514 
2515 		CloseTransientFile(fd);
2516 
2517 		/* if write didn't set errno, assume problem is no disk space */
2518 		errno = save_errno ? save_errno : ENOSPC;
2519 		ereport(ERROR,
2520 				(errcode_for_file_access(),
2521 				 errmsg("could not write to data file for XID %u: %m",
2522 						txn->xid)));
2523 	}
2524 
2525 	/*
2526 	 * Keep the transaction's final_lsn up to date with each change we send to
2527 	 * disk, so that ReorderBufferRestoreCleanup works correctly.  (We used to
2528 	 * only do this on commit and abort records, but that doesn't work if a
2529 	 * system crash leaves a transaction without its abort record).
2530 	 *
2531 	 * Make sure not to move it backwards.
2532 	 */
2533 	if (txn->final_lsn < change->lsn)
2534 		txn->final_lsn = change->lsn;
2535 
2536 	Assert(ondisk->change.action == change->action);
2537 }
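
/*
 * For orientation, a serialized tuple change as written above looks roughly
 * like this on disk (a sketch derived from the code, not a formal format
 * specification):
 *
 *		+---------------------------------+
 *		| ReorderBufferDiskChange         | size + copy of the change struct
 *		+---------------------------------+
 *		| HeapTupleData + old tuple data  | only if oldtuple is present
 *		+---------------------------------+
 *		| HeapTupleData + new tuple data  | only if newtuple is present
 *		+---------------------------------+
 *
 * Message and snapshot changes store their own variable-length payloads in
 * place of the tuple parts.
 */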
2538 
2539 /*
2540  * Restore a number of changes spilled to disk back into memory.
2541  */
2542 static Size
2543 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2544 							File *fd, XLogSegNo *segno)
2545 {
2546 	Size		restored = 0;
2547 	XLogSegNo	last_segno;
2548 	dlist_mutable_iter cleanup_iter;
2549 
2550 	Assert(txn->first_lsn != InvalidXLogRecPtr);
2551 	Assert(txn->final_lsn != InvalidXLogRecPtr);
2552 
2553 	/* free current entries, so we have memory for more */
2554 	dlist_foreach_modify(cleanup_iter, &txn->changes)
2555 	{
2556 		ReorderBufferChange *cleanup =
2557 		dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2558 
2559 		dlist_delete(&cleanup->node);
2560 		ReorderBufferReturnChange(rb, cleanup);
2561 	}
2562 	txn->nentries_mem = 0;
2563 	Assert(dlist_is_empty(&txn->changes));
2564 
2565 	XLByteToSeg(txn->final_lsn, last_segno);
2566 
2567 	while (restored < max_changes_in_memory && *segno <= last_segno)
2568 	{
2569 		int			readBytes;
2570 		ReorderBufferDiskChange *ondisk;
2571 
2572 		if (*fd == -1)
2573 		{
2574 			char		path[MAXPGPATH];
2575 
2576 			/* first time in */
2577 			if (*segno == 0)
2578 				XLByteToSeg(txn->first_lsn, *segno);
2579 
2580 			Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2581 
2582 			/*
2583 			 * No need to care about TLIs here, only used during a single run,
2584 			 * so each LSN only maps to a specific WAL record.
2585 			 */
2586 			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2587 										*segno);
2588 
2589 			*fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY, 0);
2590 			if (*fd < 0 && errno == ENOENT)
2591 			{
2592 				*fd = -1;
2593 				(*segno)++;
2594 				continue;
2595 			}
2596 			else if (*fd < 0)
2597 				ereport(ERROR,
2598 						(errcode_for_file_access(),
2599 						 errmsg("could not open file \"%s\": %m",
2600 								path)));
2601 
2602 		}
2603 
2604 		/*
2605 		 * Read the statically sized part of a change which has information
2606 		 * about the total size. If we couldn't read a record, we're at the
2607 		 * end of this file.
2608 		 */
2609 		ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
2610 		readBytes = FileRead(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2611 
2612 		/* eof */
2613 		if (readBytes == 0)
2614 		{
2615 			FileClose(*fd);
2616 			*fd = -1;
2617 			(*segno)++;
2618 			continue;
2619 		}
2620 		else if (readBytes < 0)
2621 			ereport(ERROR,
2622 					(errcode_for_file_access(),
2623 				errmsg("could not read from reorderbuffer spill file: %m")));
2624 		else if (readBytes != sizeof(ReorderBufferDiskChange))
2625 			ereport(ERROR,
2626 					(errcode_for_file_access(),
2627 					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2628 							readBytes,
2629 							(uint32) sizeof(ReorderBufferDiskChange))));
2630 
2631 		ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2632 
2633 		ReorderBufferSerializeReserve(rb,
2634 							 sizeof(ReorderBufferDiskChange) + ondisk->size);
2635 		ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2636 
2637 		readBytes = FileRead(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2638 							 ondisk->size - sizeof(ReorderBufferDiskChange));
2639 
2640 		if (readBytes < 0)
2641 			ereport(ERROR,
2642 					(errcode_for_file_access(),
2643 				errmsg("could not read from reorderbuffer spill file: %m")));
2644 		else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2645 			ereport(ERROR,
2646 					(errcode_for_file_access(),
2647 					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2648 							readBytes,
2649 				(uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2650 
2651 		/*
2652 		 * ok, read a full change from disk, now restore it into proper
2653 		 * in-memory format
2654 		 */
2655 		ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2656 		restored++;
2657 	}
2658 
2659 	return restored;
2660 }
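
/*
 * Illustration only: the iterator machinery earlier in this file is the
 * expected caller. Once a serialized transaction's in-memory change queue
 * runs dry, it is refilled along these (assumed, simplified) lines:
 *
 *		if (txn->serialized && dlist_is_empty(&txn->changes))
 *			ReorderBufferRestoreChanges(rb, txn, &fd, &segno);
 */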
2661 
2662 /*
2663  * Convert change from its on-disk format to in-memory format and queue it onto
2664  * the TXN's ->changes list.
2665  *
2666  * Note: although "data" is declared char*, at entry it points to a
2667  * maxalign'd buffer, making it safe in most of this function to assume
2668  * that the pointed-to data is suitably aligned for direct access.
2669  */
2670 static void
2671 ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2672 						   char *data)
2673 {
2674 	ReorderBufferDiskChange *ondisk;
2675 	ReorderBufferChange *change;
2676 
2677 	ondisk = (ReorderBufferDiskChange *) data;
2678 
2679 	change = ReorderBufferGetChange(rb);
2680 
2681 	/* copy static part */
2682 	memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2683 
2684 	data += sizeof(ReorderBufferDiskChange);
2685 
2686 	/* restore individual stuff */
2687 	switch (change->action)
2688 	{
2689 			/* fall through these, they're all similar enough */
2690 		case REORDER_BUFFER_CHANGE_INSERT:
2691 		case REORDER_BUFFER_CHANGE_UPDATE:
2692 		case REORDER_BUFFER_CHANGE_DELETE:
2693 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2694 			if (change->data.tp.oldtuple)
2695 			{
2696 				uint32		tuplelen = ((HeapTuple) data)->t_len;
2697 
2698 				change->data.tp.oldtuple =
2699 					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2700 
2701 				/* restore ->tuple */
2702 				memcpy(&change->data.tp.oldtuple->tuple, data,
2703 					   sizeof(HeapTupleData));
2704 				data += sizeof(HeapTupleData);
2705 
2706 				/* reset t_data pointer into the new tuplebuf */
2707 				change->data.tp.oldtuple->tuple.t_data =
2708 					ReorderBufferTupleBufData(change->data.tp.oldtuple);
2709 
2710 				/* restore tuple data itself */
2711 				memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2712 				data += tuplelen;
2713 			}
2714 
2715 			if (change->data.tp.newtuple)
2716 			{
2717 				/* here, data might not be suitably aligned! */
2718 				uint32		tuplelen;
2719 
2720 				memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2721 					   sizeof(uint32));
2722 
2723 				change->data.tp.newtuple =
2724 					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2725 
2726 				/* restore ->tuple */
2727 				memcpy(&change->data.tp.newtuple->tuple, data,
2728 					   sizeof(HeapTupleData));
2729 				data += sizeof(HeapTupleData);
2730 
2731 				/* reset t_data pointer into the new tuplebuf */
2732 				change->data.tp.newtuple->tuple.t_data =
2733 					ReorderBufferTupleBufData(change->data.tp.newtuple);
2734 
2735 				/* restore tuple data itself */
2736 				memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2737 				data += tuplelen;
2738 			}
2739 
2740 			break;
2741 		case REORDER_BUFFER_CHANGE_MESSAGE:
2742 			{
2743 				Size		prefix_size;
2744 
2745 				/* read prefix */
2746 				memcpy(&prefix_size, data, sizeof(Size));
2747 				data += sizeof(Size);
2748 				change->data.msg.prefix = MemoryContextAlloc(rb->context,
2749 															 prefix_size);
2750 				memcpy(change->data.msg.prefix, data, prefix_size);
2751 				Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2752 				data += prefix_size;
2753 
2754 				/* read the message */
2755 				memcpy(&change->data.msg.message_size, data, sizeof(Size));
2756 				data += sizeof(Size);
2757 				change->data.msg.message = MemoryContextAlloc(rb->context,
2758 											  change->data.msg.message_size);
2759 				memcpy(change->data.msg.message, data,
2760 					   change->data.msg.message_size);
2761 				data += change->data.msg.message_size;
2762 
2763 				break;
2764 			}
2765 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2766 			{
2767 				Snapshot	oldsnap;
2768 				Snapshot	newsnap;
2769 				Size		size;
2770 
2771 				oldsnap = (Snapshot) data;
2772 
2773 				size = sizeof(SnapshotData) +
2774 					sizeof(TransactionId) * oldsnap->xcnt +
2775 					sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2776 
2777 				change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2778 
2779 				newsnap = change->data.snapshot;
2780 
2781 				memcpy(newsnap, data, size);
2782 				newsnap->xip = (TransactionId *)
2783 					(((char *) newsnap) + sizeof(SnapshotData));
2784 				newsnap->subxip = newsnap->xip + newsnap->xcnt;
2785 				newsnap->copied = true;
2786 				break;
2787 			}
2788 			/* the base struct contains all the data, easy peasy */
2789 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2790 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
2791 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2792 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2793 			break;
2794 	}
2795 
2796 	dlist_push_tail(&txn->changes, &change->node);
2797 	txn->nentries_mem++;
2798 }
2799 
2800 /*
2801  * Remove all on-disk data stored for the passed-in transaction.
2802  */
2803 static void
2804 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
2805 {
2806 	XLogSegNo	first;
2807 	XLogSegNo	cur;
2808 	XLogSegNo	last;
2809 
2810 	Assert(txn->first_lsn != InvalidXLogRecPtr);
2811 	Assert(txn->final_lsn != InvalidXLogRecPtr);
2812 
2813 	XLByteToSeg(txn->first_lsn, first);
2814 	XLByteToSeg(txn->final_lsn, last);
2815 
2816 	/* iterate over all possible filenames, and delete them */
2817 	for (cur = first; cur <= last; cur++)
2818 	{
2819 		char		path[MAXPGPATH];
2820 
2821 		ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
2822 		if (unlink(path) != 0 && errno != ENOENT)
2823 			ereport(ERROR,
2824 					(errcode_for_file_access(),
2825 					 errmsg("could not remove file \"%s\": %m", path)));
2826 	}
2827 }
2828 
2829 /*
2830  * Remove any leftover serialized reorder buffers from a slot directory after a
2831  * prior crash or decoding session exit.
2832  */
2833 static void
2834 ReorderBufferCleanupSerializedTXNs(const char *slotname)
2835 {
2836 	DIR		   *spill_dir;
2837 	struct dirent *spill_de;
2838 	struct stat statbuf;
2839 	char		path[MAXPGPATH * 2 + 12];
2840 
2841 	sprintf(path, "pg_replslot/%s", slotname);
2842 
2843 	/* we're only handling directories here, skip if it's not ours */
2844 	if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2845 		return;
2846 
2847 	spill_dir = AllocateDir(path);
2848 	while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
2849 	{
2850 		/* only look at names that can be ours */
2851 		if (strncmp(spill_de->d_name, "xid", 3) == 0)
2852 		{
2853 			snprintf(path, sizeof(path),
2854 					 "pg_replslot/%s/%s", slotname,
2855 					 spill_de->d_name);
2856 
2857 			if (unlink(path) != 0)
2858 				ereport(ERROR,
2859 						(errcode_for_file_access(),
2860 						 errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/*.xid: %m",
2861 								path, slotname)));
2862 		}
2863 	}
2864 	FreeDir(spill_dir);
2865 }
2866 
2867 /*
2868  * Given a replication slot, transaction ID and segment number, fill 'path'
2869  * with the name of the corresponding spill file. 'path' is a caller-owned
2870  * buffer of size at least MAXPGPATH.
2871  */
2872 static void
2873 ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
2874 							XLogSegNo segno)
2875 {
2876 	XLogRecPtr	recptr;
2877 
2878 	XLogSegNoOffsetToRecPtr(segno, 0, recptr);
2879 
2880 	snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2881 			NameStr(MyReplicationSlot->data.name),
2882 			xid,
2883 			(uint32) (recptr >> 32), (uint32) recptr);
2884 }
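
/*
 * Example: assuming the default 16MB WAL segment size, slot "myslot", xid
 * 1234 and segno 1 yield "pg_replslot/myslot/xid-1234-lsn-0-1000000.snap".
 * Note that the 'slot' parameter is currently unused; the name is taken from
 * MyReplicationSlot instead.
 */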
2885 
2886 /*
2887  * Delete all data spilled to disk after we've restarted/crashed. It will be
2888  * recreated when the respective slots are reused.
2889  */
2890 void
2891 StartupReorderBuffer(void)
2892 {
2893 	DIR		   *logical_dir;
2894 	struct dirent *logical_de;
2895 
2896 	logical_dir = AllocateDir("pg_replslot");
2897 	while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2898 	{
2899 		if (strcmp(logical_de->d_name, ".") == 0 ||
2900 			strcmp(logical_de->d_name, "..") == 0)
2901 			continue;
2902 
2903 		/* if it cannot be a slot, skip the directory */
2904 		if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2905 			continue;
2906 
2907 		/*
2908 		 * ok, has to be a surviving logical slot, iterate and delete
2909 		 * everything starting with xid-*
2910 		 */
2911 		ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
2912 	}
2913 	FreeDir(logical_dir);
2914 }
2915 
2916 /* ---------------------------------------
2917  * toast reassembly support
2918  * ---------------------------------------
2919  */
2920 
2921 /*
2922  * Initialize per-tuple toast reconstruction support.
2923  */
2924 static void
2925 ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
2926 {
2927 	HASHCTL		hash_ctl;
2928 
2929 	Assert(txn->toast_hash == NULL);
2930 
2931 	memset(&hash_ctl, 0, sizeof(hash_ctl));
2932 	hash_ctl.keysize = sizeof(Oid);
2933 	hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2934 	hash_ctl.hcxt = rb->context;
2935 	txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2936 								  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
2937 }
2938 
2939 /*
2940  * Per toast-chunk handling for toast reconstruction
2941  *
2942  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
2943  * toasted Datum comes along.
2944  */
2945 static void
2946 ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
2947 							  Relation relation, ReorderBufferChange *change)
2948 {
2949 	ReorderBufferToastEnt *ent;
2950 	ReorderBufferTupleBuf *newtup;
2951 	bool		found;
2952 	int32		chunksize;
2953 	bool		isnull;
2954 	Pointer		chunk;
2955 	TupleDesc	desc = RelationGetDescr(relation);
2956 	Oid			chunk_id;
2957 	int32		chunk_seq;
2958 
2959 	if (txn->toast_hash == NULL)
2960 		ReorderBufferToastInitHash(rb, txn);
2961 
2962 	Assert(IsToastRelation(relation));
2963 
2964 	newtup = change->data.tp.newtuple;
2965 	chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2966 	Assert(!isnull);
2967 	chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2968 	Assert(!isnull);
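
	/*
	 * Attribute numbers 1..3 used here and below correspond to the columns
	 * of a toast relation: chunk_id, chunk_seq and chunk_data.
	 */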
2969 
2970 	ent = (ReorderBufferToastEnt *)
2971 		hash_search(txn->toast_hash,
2972 					(void *) &chunk_id,
2973 					HASH_ENTER,
2974 					&found);
2975 
2976 	if (!found)
2977 	{
2978 		Assert(ent->chunk_id == chunk_id);
2979 		ent->num_chunks = 0;
2980 		ent->last_chunk_seq = 0;
2981 		ent->size = 0;
2982 		ent->reconstructed = NULL;
2983 		dlist_init(&ent->chunks);
2984 
2985 		if (chunk_seq != 0)
2986 			elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2987 				 chunk_seq, chunk_id);
2988 	}
2989 	else if (found && chunk_seq != ent->last_chunk_seq + 1)
2990 		elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2991 			 chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2992 
2993 	chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
2994 	Assert(!isnull);
2995 
2996 	/* calculate size so we can allocate the right size at once later */
2997 	if (!VARATT_IS_EXTENDED(chunk))
2998 		chunksize = VARSIZE(chunk) - VARHDRSZ;
2999 	else if (VARATT_IS_SHORT(chunk))
3000 		/* could happen due to heap_form_tuple doing its thing */
3001 		chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
3002 	else
3003 		elog(ERROR, "unexpected type of toast chunk");
3004 
3005 	ent->size += chunksize;
3006 	ent->last_chunk_seq = chunk_seq;
3007 	ent->num_chunks++;
3008 	dlist_push_tail(&ent->chunks, &change->node);
3009 }
3010 
3011 /*
3012  * Rejigger change->newtuple to point to in-memory toast tuples instead of
3013  * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
3014  *
3015  * We cannot replace unchanged toast tuples though, so those will still point
3016  * to on-disk toast data.
3017  */
3018 static void
3019 ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
3020 						  Relation relation, ReorderBufferChange *change)
3021 {
3022 	TupleDesc	desc;
3023 	int			natt;
3024 	Datum	   *attrs;
3025 	bool	   *isnull;
3026 	bool	   *free;
3027 	HeapTuple	tmphtup;
3028 	Relation	toast_rel;
3029 	TupleDesc	toast_desc;
3030 	MemoryContext oldcontext;
3031 	ReorderBufferTupleBuf *newtup;
3032 
3033 	/* no toast tuples changed */
3034 	if (txn->toast_hash == NULL)
3035 		return;
3036 
3037 	oldcontext = MemoryContextSwitchTo(rb->context);
3038 
3039 	/* we should only have toast tuples in an INSERT or UPDATE */
3040 	Assert(change->data.tp.newtuple);
3041 
3042 	desc = RelationGetDescr(relation);
3043 
3044 	toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
3045 	if (!RelationIsValid(toast_rel))
3046 		elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
3047 			 relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
3048 
3049 	toast_desc = RelationGetDescr(toast_rel);
3050 
3051 	/* should we allocate from stack instead? */
3052 	attrs = palloc0(sizeof(Datum) * desc->natts);
3053 	isnull = palloc0(sizeof(bool) * desc->natts);
3054 	free = palloc0(sizeof(bool) * desc->natts);
3055 
3056 	newtup = change->data.tp.newtuple;
3057 
3058 	heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
3059 
3060 	for (natt = 0; natt < desc->natts; natt++)
3061 	{
3062 		Form_pg_attribute attr = desc->attrs[natt];
3063 		ReorderBufferToastEnt *ent;
3064 		struct varlena *varlena;
3065 
3066 		/* va_rawsize is the size of the original datum -- including header */
3067 		struct varatt_external toast_pointer;
3068 		struct varatt_indirect redirect_pointer;
3069 		struct varlena *new_datum = NULL;
3070 		struct varlena *reconstructed;
3071 		dlist_iter	it;
3072 		Size		data_done = 0;
3073 
3074 		/* system columns aren't toasted */
3075 		if (attr->attnum < 0)
3076 			continue;
3077 
3078 		if (attr->attisdropped)
3079 			continue;
3080 
3081 		/* not a varlena datatype */
3082 		if (attr->attlen != -1)
3083 			continue;
3084 
3085 		/* no data */
3086 		if (isnull[natt])
3087 			continue;
3088 
3089 		/* ok, we know we have a toast datum */
3090 		varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
3091 
3092 		/* no need to do anything if the tuple isn't external */
3093 		if (!VARATT_IS_EXTERNAL(varlena))
3094 			continue;
3095 
3096 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
3097 
3098 		/*
3099 		 * Check whether the toast tuple changed, replace if so.
3100 		 */
3101 		ent = (ReorderBufferToastEnt *)
3102 			hash_search(txn->toast_hash,
3103 						(void *) &toast_pointer.va_valueid,
3104 						HASH_FIND,
3105 						NULL);
3106 		if (ent == NULL)
3107 			continue;
3108 
3109 		new_datum =
3110 			(struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
3111 
3112 		free[natt] = true;
3113 
3114 		reconstructed = palloc0(toast_pointer.va_rawsize);
3115 
3116 		ent->reconstructed = reconstructed;
3117 
3118 		/* stitch toast tuple back together from its parts */
3119 		dlist_foreach(it, &ent->chunks)
3120 		{
3121 			bool		isnull;
3122 			ReorderBufferChange *cchange;
3123 			ReorderBufferTupleBuf *ctup;
3124 			Pointer		chunk;
3125 
3126 			cchange = dlist_container(ReorderBufferChange, node, it.cur);
3127 			ctup = cchange->data.tp.newtuple;
3128 			chunk = DatumGetPointer(
3129 						  fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
3130 
3131 			Assert(!isnull);
3132 			Assert(!VARATT_IS_EXTERNAL(chunk));
3133 			Assert(!VARATT_IS_SHORT(chunk));
3134 
3135 			memcpy(VARDATA(reconstructed) + data_done,
3136 				   VARDATA(chunk),
3137 				   VARSIZE(chunk) - VARHDRSZ);
3138 			data_done += VARSIZE(chunk) - VARHDRSZ;
3139 		}
3140 		Assert(data_done == toast_pointer.va_extsize);
3141 
3142 		/* make sure it's marked as compressed or not */
3143 		if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
3144 			SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
3145 		else
3146 			SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
3147 
3148 		memset(&redirect_pointer, 0, sizeof(redirect_pointer));
3149 		redirect_pointer.pointer = reconstructed;
3150 
3151 		SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
3152 		memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
3153 			   sizeof(redirect_pointer));
3154 
3155 		attrs[natt] = PointerGetDatum(new_datum);
3156 	}
3157 
3158 	/*
3159 	 * Build tuple in separate memory & copy tuple back into the tuplebuf
3160 	 * passed to the output plugin. We can't directly heap_fill_tuple() into
3161 	 * the tuplebuf because attrs[] will point back into the current content.
3162 	 */
3163 	tmphtup = heap_form_tuple(desc, attrs, isnull);
3164 	Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
3165 	Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
3166 
3167 	memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
3168 	newtup->tuple.t_len = tmphtup->t_len;
3169 
3170 	/*
3171 	 * free resources we won't further need, more persistent stuff will be
3172 	 * free'd in ReorderBufferToastReset().
3173 	 */
3174 	RelationClose(toast_rel);
3175 	pfree(tmphtup);
3176 	for (natt = 0; natt < desc->natts; natt++)
3177 	{
3178 		if (free[natt])
3179 			pfree(DatumGetPointer(attrs[natt]));
3180 	}
3181 	pfree(attrs);
3182 	pfree(free);
3183 	pfree(isnull);
3184 
3185 	MemoryContextSwitchTo(oldcontext);
3186 }
3187 
3188 /*
3189  * Free all resources allocated for toast reconstruction.
3190  */
3191 static void
3192 ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
3193 {
3194 	HASH_SEQ_STATUS hstat;
3195 	ReorderBufferToastEnt *ent;
3196 
3197 	if (txn->toast_hash == NULL)
3198 		return;
3199 
3200 	/* sequentially walk over the hash and free everything */
3201 	hash_seq_init(&hstat, txn->toast_hash);
3202 	while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
3203 	{
3204 		dlist_mutable_iter it;
3205 
3206 		if (ent->reconstructed != NULL)
3207 			pfree(ent->reconstructed);
3208 
3209 		dlist_foreach_modify(it, &ent->chunks)
3210 		{
3211 			ReorderBufferChange *change =
3212 			dlist_container(ReorderBufferChange, node, it.cur);
3213 
3214 			dlist_delete(&change->node);
3215 			ReorderBufferReturnChange(rb, change);
3216 		}
3217 	}
3218 
3219 	hash_destroy(txn->toast_hash);
3220 	txn->toast_hash = NULL;
3221 }
3222 
3223 
3224 /* ---------------------------------------
3225  * Visibility support for logical decoding
3226  *
3227  *
3228  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
3229  * always rely on stored cmin/cmax values because of two scenarios:
3230  *
3231  * * A tuple got changed multiple times during a single transaction and thus
3232  *	 has got a combocid. Combocids are only valid for the duration of a
3233  *	 single transaction.
3234  * * A tuple with a cmin but no cmax (and thus no combocid) got
3235  *	 deleted/updated in a transaction other than the one which created
3236  *	 it, namely the one we are looking at right now. As only one of cmin,
3237  *	 cmax or combocid is actually stored in the heap we don't have access
3238  *	 to the value we need anymore.
3239  *
3240  * To resolve those problems we have a per-transaction hash of (cmin,
3241  * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
3242  * (cmin, cmax) values. That also takes care of combocids by simply
3243  * not caring about them at all. As we have the real cmin/cmax values
3244  * combocids aren't interesting.
3245  *
3246  * As we only care about catalog tuples here, the overhead of this
3247  * hashtable should be acceptable.
3248  *
3249  * Heap rewrites complicate this a bit, check rewriteheap.c for
3250  * details.
3251  * -------------------------------------------------------------------------
3252  */
3253 
3254 /* struct for qsort()ing mapping files by lsn somewhat efficiently */
3255 typedef struct RewriteMappingFile
3256 {
3257 	XLogRecPtr	lsn;
3258 	char		fname[MAXPGPATH];
3259 } RewriteMappingFile;
3260 
3261 #if NOT_USED
3262 static void
3263 DisplayMapping(HTAB *tuplecid_data)
3264 {
3265 	HASH_SEQ_STATUS hstat;
3266 	ReorderBufferTupleCidEnt *ent;
3267 
3268 	hash_seq_init(&hstat, tuplecid_data);
3269 	while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
3270 	{
3271 		elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
3272 			 ent->key.relnode.dbNode,
3273 			 ent->key.relnode.spcNode,
3274 			 ent->key.relnode.relNode,
3275 			 BlockIdGetBlockNumber(&ent->key.tid.ip_blkid),
3276 			 ent->key.tid.ip_posid,
3277 			 ent->cmin,
3278 			 ent->cmax
3279 			);
3280 	}
3281 }
3282 #endif
3283 
3284 /*
3285  * Apply a single mapping file to tuplecid_data.
3286  *
3287  * The mapping file has to have been verified to be a) committed b) for our
3288  * transaction c) applied in LSN order.
3289  */
3290 static void
3291 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
3292 {
3293 	char		path[MAXPGPATH];
3294 	int			fd;
3295 	int			readBytes;
3296 	LogicalRewriteMappingData map;
3297 
3298 	sprintf(path, "pg_logical/mappings/%s", fname);
3299 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
3300 	if (fd < 0)
3301 		ereport(ERROR,
3302 				(errcode_for_file_access(),
3303 				 errmsg("could not open file \"%s\": %m", path)));
3304 
3305 	while (true)
3306 	{
3307 		ReorderBufferTupleCidKey key;
3308 		ReorderBufferTupleCidEnt *ent;
3309 		ReorderBufferTupleCidEnt *new_ent;
3310 		bool		found;
3311 
3312 		/* be careful about padding */
3313 		memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3314 
3315 		/* read all mappings till the end of the file */
3316 		readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3317 
3318 		if (readBytes < 0)
3319 			ereport(ERROR,
3320 					(errcode_for_file_access(),
3321 					 errmsg("could not read file \"%s\": %m",
3322 							path)));
3323 		else if (readBytes == 0)	/* EOF */
3324 			break;
3325 		else if (readBytes != sizeof(LogicalRewriteMappingData))
3326 			ereport(ERROR,
3327 					(errcode_for_file_access(),
3328 					 errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3329 							path, readBytes,
3330 							(int32) sizeof(LogicalRewriteMappingData))));
3331 
		key.relnode = map.old_node;
		ItemPointerCopy(&map.old_tid,
						&key.tid);

		ent = (ReorderBufferTupleCidEnt *)
			hash_search(tuplecid_data,
						(void *) &key,
						HASH_FIND,
						NULL);

		/* no existing mapping, no need to update */
		if (!ent)
			continue;

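		/*
		 * Re-enter the mapping under the tuple's post-rewrite identity so
		 * that later lookups by the new relfilenode and ctid find the same
		 * cmin/cmax.
		 */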
		key.relnode = map.new_node;
		ItemPointerCopy(&map.new_tid,
						&key.tid);

		new_ent = (ReorderBufferTupleCidEnt *)
			hash_search(tuplecid_data,
						(void *) &key,
						HASH_ENTER,
						&found);

		if (found)
		{
			/*
			 * Make sure the existing mapping makes sense. We sometimes update
			 * old records that did not yet have a cmax (e.g. pg_class' own
			 * entry while rewriting it) during rewrites, so allow that.
			 */
			Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
			Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
		}
		else
		{
			/* copy the old entry's values over to the new mapping entry */
			new_ent->cmin = ent->cmin;
			new_ent->cmax = ent->cmax;
			new_ent->combocid = ent->combocid;
		}
	}

	CloseTransientFile(fd);
}


/*
 * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
 */
static bool
TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
{
	return bsearch(&xid, xip, num,
				   sizeof(TransactionId), xidComparator) != NULL;
}

/*
 * qsort() comparator for sorting RewriteMappingFiles in LSN order.
 */
static int
file_sort_by_lsn(const void *a_p, const void *b_p)
{
	RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
	RewriteMappingFile *b = *(RewriteMappingFile **) b_p;

	if (a->lsn < b->lsn)
		return -1;
	else if (a->lsn > b->lsn)
		return 1;
	return 0;
}

/*
 * Apply any existing logical remapping files if there are any targeted at our
 * transaction for relid.
 */
static void
UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
{
	DIR		   *mapping_dir;
	struct dirent *mapping_de;
	List	   *files = NIL;
	ListCell   *file;
	RewriteMappingFile **files_a;
	size_t		off;
	Oid			dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;

	mapping_dir = AllocateDir("pg_logical/mappings");
	while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
	{
		Oid			f_dboid;
		Oid			f_relid;
		TransactionId f_mapped_xid;
		TransactionId f_create_xid;
		XLogRecPtr	f_lsn;
		uint32		f_hi,
					f_lo;
		RewriteMappingFile *f;

		if (strcmp(mapping_de->d_name, ".") == 0 ||
			strcmp(mapping_de->d_name, "..") == 0)
			continue;

		/* Ignore files that aren't ours */
		if (strncmp(mapping_de->d_name, "map-", 4) != 0)
			continue;

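		/*
		 * Mapping file names encode, in this order, the database oid, the
		 * relation oid, the LSN of the rewrite (as two 32 bit halves) and
		 * the mapped and the creating transaction's xids; c.f.
		 * LOGICAL_REWRITE_FORMAT in rewriteheap.h.
		 */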
		if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
				   &f_dboid, &f_relid, &f_hi, &f_lo,
				   &f_mapped_xid, &f_create_xid) != 6)
			elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);

		f_lsn = ((uint64) f_hi) << 32 | f_lo;

		/* mapping for another database */
		if (f_dboid != dboid)
			continue;

		/* mapping for another relation */
		if (f_relid != relid)
			continue;

		/* did the creating transaction abort? */
		if (!TransactionIdDidCommit(f_create_xid))
			continue;

		/* not for our transaction */
		if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
			continue;

		/* ok, relevant, queue for apply */
		f = palloc(sizeof(RewriteMappingFile));
		f->lsn = f_lsn;
		strcpy(f->fname, mapping_de->d_name);
		files = lappend(files, f);
	}
	FreeDir(mapping_dir);

	/* build array we can easily sort */
	files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
	off = 0;
	foreach(file, files)
	{
		files_a[off++] = lfirst(file);
	}

	/* sort files so we apply them in LSN order */
	qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
		  file_sort_by_lsn);

	for (off = 0; off < list_length(files); off++)
	{
		RewriteMappingFile *f = files_a[off];

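		/*
		 * subxip[0] is expected to be the transaction's toplevel xid here:
		 * for decoding snapshots subxip contains all of the transaction's
		 * xids, sorted (c.f. ReorderBufferCopySnap()).
		 */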
		elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
			 snapshot->subxip[0]);
		ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
		pfree(f);
	}
}

/*
 * Look up the cmin/cmax of a tuple, during logical decoding where we can't
 * rely on combocids.
 */
bool
ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
							  Snapshot snapshot,
							  HeapTuple htup, Buffer buffer,
							  CommandId *cmin, CommandId *cmax)
{
	ReorderBufferTupleCidKey key;
	ReorderBufferTupleCidEnt *ent;
	ForkNumber	forkno;
	BlockNumber blockno;
	bool		updated_mapping = false;

	/* be careful about padding */
	memset(&key, 0, sizeof(key));

	Assert(!BufferIsLocal(buffer));

	/*
	 * get relfilenode from the buffer, no convenient way to access it other
	 * than that.
	 */
	BufferGetTag(buffer, &key.relnode, &forkno, &blockno);

	/* tuples can only be in the main fork */
	Assert(forkno == MAIN_FORKNUM);
	Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));

	ItemPointerCopy(&htup->t_self,
					&key.tid);

restart:
	ent = (ReorderBufferTupleCidEnt *)
		hash_search(tuplecid_data,
					(void *) &key,
					HASH_FIND,
					NULL);

	/*
	 * failed to find a mapping, check whether the table was rewritten and
	 * apply mapping if so, but only do that once - there can be no new
	 * mappings while we are in here since we have to hold a lock on the
	 * relation.
	 */
	if (ent == NULL && !updated_mapping)
	{
		UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
		/* now check but don't update for a mapping again */
		updated_mapping = true;
		goto restart;
	}
	else if (ent == NULL)
		return false;

	if (cmin)
		*cmin = ent->cmin;
	if (cmax)
		*cmax = ent->cmax;
	return true;
}
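
#ifdef NOT_USED
/*
 * Illustrative sketch of how a caller might use the above, loosely modeled
 * on the historic-snapshot visibility checks in tqual.c. The function name
 * and the reduced control flow are assumptions for illustration only; the
 * real checks also have to consider xmin/xmax and cmax.
 */
static bool
ExampleTupleCreatedBeforeSnapshot(HTAB *tuplecid_data, Snapshot snapshot,
								  HeapTuple htup, Buffer buffer)
{
	CommandId	cmin;

	/* no entry: we cannot resolve this tuple's cmin */
	if (!ResolveCminCmaxDuringDecoding(tuplecid_data, snapshot, htup, buffer,
									   &cmin, NULL))
		return false;

	/* created by a command the snapshot can already see? */
	return cmin < snapshot->curcid;
}
#endif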