1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  *	  PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2019, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/replication/reorderbuffer.c
12  *
13  * NOTES
14  *	  This module gets handed individual pieces of transactions in the order
15  *	  they are written to the WAL and is responsible to reassemble them into
16  *	  toplevel transaction sized pieces. When a transaction is completely
17  *	  reassembled - signalled by reading the transaction commit record - it
18  *	  will then call the output plugin (cf. ReorderBufferCommit()) with the
19  *	  individual changes. The output plugins rely on snapshots built by
20  *	  snapbuild.c which hands them to us.
21  *
22  *	  Transactions and subtransactions/savepoints in postgres are not
23  *	  immediately linked to each other from outside the performing
24  *	  backend. Only at commit/abort (or special xact_assignment records) they
25  *	  are linked together. Which means that we will have to splice together a
26  *	  toplevel transaction from its subtransactions. To do that efficiently we
27  *	  build a binary heap indexed by the smallest current lsn of the individual
28  *	  subtransactions' changestreams. As the individual streams are inherently
29  *	  ordered by LSN - since that is where we build them from - the transaction
30  *	  can easily be reassembled by always using the subtransaction with the
31  *	  smallest current LSN from the heap.
32  *
33  *	  In order to cope with large transactions - which can be several times as
34  *	  big as the available memory - this module supports spooling the contents
 *	  of large transactions to disk. When the transaction is replayed the
36  *	  contents of individual (sub-)transactions will be read from disk in
37  *	  chunks.
38  *
39  *	  This module also has to deal with reassembling toast records from the
40  *	  individual chunks stored in WAL. When a new (or initial) version of a
41  *	  tuple is stored in WAL it will always be preceded by the toast chunks
42  *	  emitted for the columns stored out of line. Within a single toplevel
43  *	  transaction there will be no other data carrying records between a row's
44  *	  toast chunks and the row data itself. See ReorderBufferToast* for
45  *	  details.
46  *
47  *	  ReorderBuffer uses two special memory context types - SlabContext for
48  *	  allocations of fixed-length structures (changes and transactions), and
49  *	  GenerationContext for the variable-length transaction data (allocated
50  *	  and freed in groups with similar lifespan).
51  *
52  * -------------------------------------------------------------------------
53  */
54 #include "postgres.h"
55 
56 #include <unistd.h>
57 #include <sys/stat.h>
58 
59 #include "access/heapam.h"
60 #include "access/rewriteheap.h"
61 #include "access/transam.h"
62 #include "access/tuptoaster.h"
63 #include "access/xact.h"
64 #include "access/xlog_internal.h"
65 #include "catalog/catalog.h"
66 #include "lib/binaryheap.h"
67 #include "miscadmin.h"
68 #include "pgstat.h"
69 #include "replication/logical.h"
70 #include "replication/reorderbuffer.h"
71 #include "replication/slot.h"
72 #include "replication/snapbuild.h"	/* just for SnapBuildSnapDecRefcount */
73 #include "storage/bufmgr.h"
74 #include "storage/fd.h"
75 #include "storage/sinval.h"
76 #include "utils/builtins.h"
77 #include "utils/combocid.h"
78 #include "utils/memdebug.h"
79 #include "utils/memutils.h"
80 #include "utils/rel.h"
81 #include "utils/relfilenodemap.h"
82 
83 
84 /* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
{
	/* lookup key; the by_txn table is created with keysize sizeof(TransactionId) */
	TransactionId xid;
	/* transaction state registered under xid by ReorderBufferTXNByXid() */
	ReorderBufferTXN *txn;
} ReorderBufferTXNByIdEnt;
90 
91 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
typedef struct ReorderBufferTupleCidKey
{
	RelFileNode relnode;		/* relfilenode part of the lookup key */
	ItemPointerData tid;		/* ctid part of the lookup key */
} ReorderBufferTupleCidKey;
97 
typedef struct ReorderBufferTupleCidEnt
{
	ReorderBufferTupleCidKey key;	/* (relfilenode, ctid) this entry maps */
	CommandId	cmin;			/* cmin the key maps to */
	CommandId	cmax;			/* cmax the key maps to */
	CommandId	combocid;		/* just for debugging */
} ReorderBufferTupleCidEnt;
105 
106 /* Virtual file descriptor with file offset tracking */
typedef struct TXNEntryFile
{
	/*
	 * One of these is embedded in every ReorderBufferIterTXNEntry, and a
	 * pointer to one is what ReorderBufferRestoreChanges() takes.
	 */
	File		vfd;			/* -1 when the file is closed */
	off_t		curOffset;		/* offset for next write or read. Reset to 0
								 * when vfd is opened. */
} TXNEntryFile;
113 
114 /* k-way in-order change iteration support structures */
typedef struct ReorderBufferIterTXNEntry
{
	/* ordering key: ReorderBufferIterCompare() compares entries by this */
	XLogRecPtr	lsn;
	/* NOTE(review): presumably the change currently at the head of this
	 * entry's stream - confirm against ReorderBufferIterTXNNext() */
	ReorderBufferChange *change;
	/* NOTE(review): presumably the (sub)transaction this entry walks - confirm */
	ReorderBufferTXN *txn;
	/* spill file state; see TXNEntryFile */
	TXNEntryFile file;
	/* NOTE(review): presumably the WAL segment of the spill file being read;
	 * ReorderBufferRestoreChanges() takes an XLogSegNo pointer - confirm */
	XLogSegNo	segno;
} ReorderBufferIterTXNEntry;
123 
typedef struct ReorderBufferIterTXNState
{
	/* heap of int32 indexes into entries[], ordered by ReorderBufferIterCompare() */
	binaryheap *heap;
	/* NOTE(review): presumably the number of usable slots in entries[] - confirm */
	Size		nr_txns;
	/* NOTE(review): purpose not visible in this part of the file - confirm
	 * against ReorderBufferIterTXNNext()/ReorderBufferIterTXNFinish() */
	dlist_head	old_change;
	/* per-(sub)transaction iteration slots; sized when the state is allocated */
	ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
} ReorderBufferIterTXNState;
131 
132 /* toast datastructures */
typedef struct ReorderBufferToastEnt
{
	/*
	 * Bookkeeping for one toasted value while its chunks are collected; the
	 * file header notes describe why chunks precede the row they belong to.
	 */
	Oid			chunk_id;		/* toast_table.chunk_id */
	int32		last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
								 * have seen */
	Size		num_chunks;		/* number of chunks we've already seen */
	Size		size;			/* combined size of chunks seen */
	dlist_head	chunks;			/* linked list of chunks */
	struct varlena *reconstructed;	/* reconstructed varlena now pointed to in
									 * main tup */
} ReorderBufferToastEnt;
144 
145 /* Disk serialization support datastructures */
typedef struct ReorderBufferDiskChange
{
	/* NOTE(review): presumably the full on-disk length of this record,
	 * header plus trailing data - confirm in ReorderBufferSerializeChange() */
	Size		size;
	/* the change struct itself, stored by value */
	ReorderBufferChange change;
	/* data follows */
} ReorderBufferDiskChange;
152 
153 /*
154  * Maximum number of changes kept in memory, per transaction. After that,
155  * changes are spooled to disk.
156  *
157  * The current value should be sufficient to decode the entire transaction
158  * without hitting disk in OLTP workloads, while starting to spool to disk in
159  * other workloads reasonably fast.
160  *
161  * At some point in the future it probably makes sense to have a more elaborate
162  * resource management here, but it's not entirely clear what that would look
163  * like.
164  */
static const Size max_changes_in_memory = 4096; /* a count of changes, not bytes */
166 
167 /* ---------------------------------------
168  * primary reorderbuffer support routines
169  * ---------------------------------------
170  */
/* allocate a zeroed ReorderBufferTXN from rb->txn_context */
static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
/* release a ReorderBufferTXN together with the data it owns */
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
/* look up (and optionally create) the transaction state for an xid */
static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
											   TransactionId xid, bool create, bool *is_new,
											   XLogRecPtr lsn, bool create_as_top);
/* hand a subtransaction's base snapshot to its top-level txn, or drop it */
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
											  ReorderBufferTXN *subtxn);

/* assertion-only consistency check of the LSN-ordered transaction lists */
static void AssertTXNLsnOrder(ReorderBuffer *rb);
180 
181 /* ---------------------------------------
182  * support functions for lsn-order iterating over the ->changes of a
183  * transaction and its subtransactions
184  *
185  * used for iteration over the k-way heap merge of a transaction and its
186  * subtransactions
187  * ---------------------------------------
188  */
/* the iterator state is handed back through iter_state, not the return value */
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
									 ReorderBufferIterTXNState *volatile *iter_state);
/* NOTE(review): presumably yields the next change in LSN order - confirm */
static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
/* NOTE(review): presumably tears down a state built by ReorderBufferIterTXNInit() - confirm */
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
									   ReorderBufferIterTXNState *state);
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
195 
196 /*
197  * ---------------------------------------
198  * Disk serialization support functions
199  * ---------------------------------------
200  */
/*
 * Per the file header, large transactions are spooled to disk and read back
 * in chunks at replay time; these are the routines declared for that.
 * ReorderBufferCheckSerializeTXN() is called after every queued change.
 */
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
										 int fd, ReorderBufferChange *change);
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
										TXNEntryFile *file, XLogSegNo *segno);
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
									   char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
/* called with the slot name from both ReorderBufferAllocate() and ReorderBufferFree() */
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
										TransactionId xid, XLogSegNo segno);

/* snapshot helpers; ReorderBufferFreeSnap() is used when a snapshot change is released */
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
									  ReorderBufferTXN *txn, CommandId cid);
217 
218 /* ---------------------------------------
219  * toast reassembly support
220  * ---------------------------------------
221  */
/*
 * Toast chunks arrive in WAL ahead of the row that references them (see the
 * file header); these routines are declared for collecting and reattaching
 * them.  ReorderBufferToastReset() is also run when a transaction is freed.
 */
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
									  Relation relation, ReorderBufferChange *change);
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
										  Relation relation, ReorderBufferChange *change);
228 
229 
230 /*
231  * Allocate a new ReorderBuffer and clean out any old serialized state from
232  * prior ReorderBuffer instances for the same slot.
233  */
234 ReorderBuffer *
ReorderBufferAllocate(void)235 ReorderBufferAllocate(void)
236 {
237 	ReorderBuffer *buffer;
238 	HASHCTL		hash_ctl;
239 	MemoryContext new_ctx;
240 
241 	Assert(MyReplicationSlot != NULL);
242 
243 	/* allocate memory in own context, to have better accountability */
244 	new_ctx = AllocSetContextCreate(CurrentMemoryContext,
245 									"ReorderBuffer",
246 									ALLOCSET_DEFAULT_SIZES);
247 
248 	buffer =
249 		(ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
250 
251 	memset(&hash_ctl, 0, sizeof(hash_ctl));
252 
253 	buffer->context = new_ctx;
254 
255 	buffer->change_context = SlabContextCreate(new_ctx,
256 											   "Change",
257 											   SLAB_DEFAULT_BLOCK_SIZE,
258 											   sizeof(ReorderBufferChange));
259 
260 	buffer->txn_context = SlabContextCreate(new_ctx,
261 											"TXN",
262 											SLAB_DEFAULT_BLOCK_SIZE,
263 											sizeof(ReorderBufferTXN));
264 
265 	buffer->tup_context = GenerationContextCreate(new_ctx,
266 												  "Tuples",
267 												  SLAB_LARGE_BLOCK_SIZE);
268 
269 	hash_ctl.keysize = sizeof(TransactionId);
270 	hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
271 	hash_ctl.hcxt = buffer->context;
272 
273 	buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
274 								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
275 
276 	buffer->by_txn_last_xid = InvalidTransactionId;
277 	buffer->by_txn_last_txn = NULL;
278 
279 	buffer->outbuf = NULL;
280 	buffer->outbufsize = 0;
281 
282 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
283 
284 	dlist_init(&buffer->toplevel_by_lsn);
285 	dlist_init(&buffer->txns_by_base_snapshot_lsn);
286 
287 	/*
288 	 * Ensure there's no stale data from prior uses of this slot, in case some
289 	 * prior exit avoided calling ReorderBufferFree. Failure to do this can
290 	 * produce duplicated txns, and it's very cheap if there's nothing there.
291 	 */
292 	ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
293 
294 	return buffer;
295 }
296 
297 /*
298  * Free a ReorderBuffer
299  */
300 void
ReorderBufferFree(ReorderBuffer * rb)301 ReorderBufferFree(ReorderBuffer *rb)
302 {
303 	MemoryContext context = rb->context;
304 
305 	/*
306 	 * We free separately allocated data by entirely scrapping reorderbuffer's
307 	 * memory context.
308 	 */
309 	MemoryContextDelete(context);
310 
311 	/* Free disk space used by unconsumed reorder buffers */
312 	ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
313 }
314 
315 /*
316  * Get an unused, possibly preallocated, ReorderBufferTXN.
317  */
318 static ReorderBufferTXN *
ReorderBufferGetTXN(ReorderBuffer * rb)319 ReorderBufferGetTXN(ReorderBuffer *rb)
320 {
321 	ReorderBufferTXN *txn;
322 
323 	txn = (ReorderBufferTXN *)
324 		MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
325 
326 	memset(txn, 0, sizeof(ReorderBufferTXN));
327 
328 	dlist_init(&txn->changes);
329 	dlist_init(&txn->tuplecids);
330 	dlist_init(&txn->subtxns);
331 
332 	return txn;
333 }
334 
335 /*
336  * Free a ReorderBufferTXN.
337  */
338 static void
ReorderBufferReturnTXN(ReorderBuffer * rb,ReorderBufferTXN * txn)339 ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
340 {
341 	/* clean the lookup cache if we were cached (quite likely) */
342 	if (rb->by_txn_last_xid == txn->xid)
343 	{
344 		rb->by_txn_last_xid = InvalidTransactionId;
345 		rb->by_txn_last_txn = NULL;
346 	}
347 
348 	/* free data that's contained */
349 
350 	if (txn->tuplecid_hash != NULL)
351 	{
352 		hash_destroy(txn->tuplecid_hash);
353 		txn->tuplecid_hash = NULL;
354 	}
355 
356 	if (txn->invalidations)
357 	{
358 		pfree(txn->invalidations);
359 		txn->invalidations = NULL;
360 	}
361 
362 	/* Reset the toast hash */
363 	ReorderBufferToastReset(rb, txn);
364 
365 	pfree(txn);
366 }
367 
368 /*
 * Get a fresh ReorderBufferChange.
370  */
371 ReorderBufferChange *
ReorderBufferGetChange(ReorderBuffer * rb)372 ReorderBufferGetChange(ReorderBuffer *rb)
373 {
374 	ReorderBufferChange *change;
375 
376 	change = (ReorderBufferChange *)
377 		MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
378 
379 	memset(change, 0, sizeof(ReorderBufferChange));
380 	return change;
381 }
382 
383 /*
 * Free a ReorderBufferChange.
385  */
386 void
ReorderBufferReturnChange(ReorderBuffer * rb,ReorderBufferChange * change)387 ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
388 {
389 	/* free contained data */
390 	switch (change->action)
391 	{
392 		case REORDER_BUFFER_CHANGE_INSERT:
393 		case REORDER_BUFFER_CHANGE_UPDATE:
394 		case REORDER_BUFFER_CHANGE_DELETE:
395 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
396 			if (change->data.tp.newtuple)
397 			{
398 				ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
399 				change->data.tp.newtuple = NULL;
400 			}
401 
402 			if (change->data.tp.oldtuple)
403 			{
404 				ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
405 				change->data.tp.oldtuple = NULL;
406 			}
407 			break;
408 		case REORDER_BUFFER_CHANGE_MESSAGE:
409 			if (change->data.msg.prefix != NULL)
410 				pfree(change->data.msg.prefix);
411 			change->data.msg.prefix = NULL;
412 			if (change->data.msg.message != NULL)
413 				pfree(change->data.msg.message);
414 			change->data.msg.message = NULL;
415 			break;
416 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
417 			if (change->data.snapshot)
418 			{
419 				ReorderBufferFreeSnap(rb, change->data.snapshot);
420 				change->data.snapshot = NULL;
421 			}
422 			break;
423 			/* no data in addition to the struct itself */
424 		case REORDER_BUFFER_CHANGE_TRUNCATE:
425 			if (change->data.truncate.relids != NULL)
426 			{
427 				ReorderBufferReturnRelids(rb, change->data.truncate.relids);
428 				change->data.truncate.relids = NULL;
429 			}
430 			break;
431 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
432 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
433 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
434 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
435 			break;
436 	}
437 
438 	pfree(change);
439 }
440 
441 /*
442  * Get a fresh ReorderBufferTupleBuf fitting at least a tuple of size
443  * tuple_len (excluding header overhead).
444  */
445 ReorderBufferTupleBuf *
ReorderBufferGetTupleBuf(ReorderBuffer * rb,Size tuple_len)446 ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
447 {
448 	ReorderBufferTupleBuf *tuple;
449 	Size		alloc_len;
450 
451 	alloc_len = tuple_len + SizeofHeapTupleHeader;
452 
453 	tuple = (ReorderBufferTupleBuf *)
454 		MemoryContextAlloc(rb->tup_context,
455 						   sizeof(ReorderBufferTupleBuf) +
456 						   MAXIMUM_ALIGNOF + alloc_len);
457 	tuple->alloc_tuple_size = alloc_len;
458 	tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
459 
460 	return tuple;
461 }
462 
463 /*
 * Free a ReorderBufferTupleBuf.
465  */
466 void
ReorderBufferReturnTupleBuf(ReorderBuffer * rb,ReorderBufferTupleBuf * tuple)467 ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
468 {
469 	pfree(tuple);
470 }
471 
472 /*
473  * Get an array for relids of truncated relations.
474  *
475  * We use the global memory context (for the whole reorder buffer), because
476  * none of the existing ones seems like a good match (some are SLAB, so we
477  * can't use those, and tup_context is meant for tuple data, not relids). We
 * could add yet another context, but it seems like overkill - TRUNCATE is
 * not a particularly common operation, so it does not seem worth it.
480  */
481 Oid *
ReorderBufferGetRelids(ReorderBuffer * rb,int nrelids)482 ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
483 {
484 	Oid		   *relids;
485 	Size		alloc_len;
486 
487 	alloc_len = sizeof(Oid) * nrelids;
488 
489 	relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
490 
491 	return relids;
492 }
493 
494 /*
495  * Free an array of relids.
496  */
497 void
ReorderBufferReturnRelids(ReorderBuffer * rb,Oid * relids)498 ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
499 {
500 	pfree(relids);
501 }
502 
503 /*
504  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
505  * If create is true, and a transaction doesn't already exist, create it
506  * (with the given LSN, and as top transaction if that's specified);
507  * when this happens, is_new is set to true.
508  */
509 static ReorderBufferTXN *
ReorderBufferTXNByXid(ReorderBuffer * rb,TransactionId xid,bool create,bool * is_new,XLogRecPtr lsn,bool create_as_top)510 ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
511 					  bool *is_new, XLogRecPtr lsn, bool create_as_top)
512 {
513 	ReorderBufferTXN *txn;
514 	ReorderBufferTXNByIdEnt *ent;
515 	bool		found;
516 
517 	Assert(TransactionIdIsValid(xid));
518 
519 	/*
520 	 * Check the one-entry lookup cache first
521 	 */
522 	if (TransactionIdIsValid(rb->by_txn_last_xid) &&
523 		rb->by_txn_last_xid == xid)
524 	{
525 		txn = rb->by_txn_last_txn;
526 
527 		if (txn != NULL)
528 		{
529 			/* found it, and it's valid */
530 			if (is_new)
531 				*is_new = false;
532 			return txn;
533 		}
534 
535 		/*
536 		 * cached as non-existent, and asked not to create? Then nothing else
537 		 * to do.
538 		 */
539 		if (!create)
540 			return NULL;
541 		/* otherwise fall through to create it */
542 	}
543 
544 	/*
545 	 * If the cache wasn't hit or it yielded an "does-not-exist" and we want
546 	 * to create an entry.
547 	 */
548 
549 	/* search the lookup table */
550 	ent = (ReorderBufferTXNByIdEnt *)
551 		hash_search(rb->by_txn,
552 					(void *) &xid,
553 					create ? HASH_ENTER : HASH_FIND,
554 					&found);
555 	if (found)
556 		txn = ent->txn;
557 	else if (create)
558 	{
559 		/* initialize the new entry, if creation was requested */
560 		Assert(ent != NULL);
561 		Assert(lsn != InvalidXLogRecPtr);
562 
563 		ent->txn = ReorderBufferGetTXN(rb);
564 		ent->txn->xid = xid;
565 		txn = ent->txn;
566 		txn->first_lsn = lsn;
567 		txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;
568 
569 		if (create_as_top)
570 		{
571 			dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
572 			AssertTXNLsnOrder(rb);
573 		}
574 	}
575 	else
576 		txn = NULL;				/* not found and not asked to create */
577 
578 	/* update cache */
579 	rb->by_txn_last_xid = xid;
580 	rb->by_txn_last_txn = txn;
581 
582 	if (is_new)
583 		*is_new = !found;
584 
585 	Assert(!create || txn != NULL);
586 	return txn;
587 }
588 
589 /*
590  * Queue a change into a transaction so it can be replayed upon commit.
591  */
592 void
ReorderBufferQueueChange(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,ReorderBufferChange * change)593 ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
594 						 ReorderBufferChange *change)
595 {
596 	ReorderBufferTXN *txn;
597 
598 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
599 
600 	change->lsn = lsn;
601 	Assert(InvalidXLogRecPtr != lsn);
602 	dlist_push_tail(&txn->changes, &change->node);
603 	txn->nentries++;
604 	txn->nentries_mem++;
605 
606 	ReorderBufferCheckSerializeTXN(rb, txn);
607 }
608 
609 /*
610  * Queue message into a transaction so it can be processed upon commit.
611  */
612 void
ReorderBufferQueueMessage(ReorderBuffer * rb,TransactionId xid,Snapshot snapshot,XLogRecPtr lsn,bool transactional,const char * prefix,Size message_size,const char * message)613 ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
614 						  Snapshot snapshot, XLogRecPtr lsn,
615 						  bool transactional, const char *prefix,
616 						  Size message_size, const char *message)
617 {
618 	if (transactional)
619 	{
620 		MemoryContext oldcontext;
621 		ReorderBufferChange *change;
622 
623 		Assert(xid != InvalidTransactionId);
624 
625 		oldcontext = MemoryContextSwitchTo(rb->context);
626 
627 		change = ReorderBufferGetChange(rb);
628 		change->action = REORDER_BUFFER_CHANGE_MESSAGE;
629 		change->data.msg.prefix = pstrdup(prefix);
630 		change->data.msg.message_size = message_size;
631 		change->data.msg.message = palloc(message_size);
632 		memcpy(change->data.msg.message, message, message_size);
633 
634 		ReorderBufferQueueChange(rb, xid, lsn, change);
635 
636 		MemoryContextSwitchTo(oldcontext);
637 	}
638 	else
639 	{
640 		ReorderBufferTXN *txn = NULL;
641 		volatile Snapshot snapshot_now = snapshot;
642 
643 		if (xid != InvalidTransactionId)
644 			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
645 
646 		/* setup snapshot to allow catalog access */
647 		SetupHistoricSnapshot(snapshot_now, NULL);
648 		PG_TRY();
649 		{
650 			rb->message(rb, txn, lsn, false, prefix, message_size, message);
651 
652 			TeardownHistoricSnapshot(false);
653 		}
654 		PG_CATCH();
655 		{
656 			TeardownHistoricSnapshot(true);
657 			PG_RE_THROW();
658 		}
659 		PG_END_TRY();
660 	}
661 }
662 
663 /*
664  * AssertTXNLsnOrder
665  *		Verify LSN ordering of transaction lists in the reorderbuffer
666  *
667  * Other LSN-related invariants are checked too.
668  *
669  * No-op if assertions are not in use.
670  */
671 static void
AssertTXNLsnOrder(ReorderBuffer * rb)672 AssertTXNLsnOrder(ReorderBuffer *rb)
673 {
674 #ifdef USE_ASSERT_CHECKING
675 	dlist_iter	iter;
676 	XLogRecPtr	prev_first_lsn = InvalidXLogRecPtr;
677 	XLogRecPtr	prev_base_snap_lsn = InvalidXLogRecPtr;
678 
679 	dlist_foreach(iter, &rb->toplevel_by_lsn)
680 	{
681 		ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
682 													iter.cur);
683 
684 		/* start LSN must be set */
685 		Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
686 
687 		/* If there is an end LSN, it must be higher than start LSN */
688 		if (cur_txn->end_lsn != InvalidXLogRecPtr)
689 			Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
690 
691 		/* Current initial LSN must be strictly higher than previous */
692 		if (prev_first_lsn != InvalidXLogRecPtr)
693 			Assert(prev_first_lsn < cur_txn->first_lsn);
694 
695 		/* known-as-subtxn txns must not be listed */
696 		Assert(!cur_txn->is_known_as_subxact);
697 
698 		prev_first_lsn = cur_txn->first_lsn;
699 	}
700 
701 	dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
702 	{
703 		ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
704 													base_snapshot_node,
705 													iter.cur);
706 
707 		/* base snapshot (and its LSN) must be set */
708 		Assert(cur_txn->base_snapshot != NULL);
709 		Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);
710 
711 		/* current LSN must be strictly higher than previous */
712 		if (prev_base_snap_lsn != InvalidXLogRecPtr)
713 			Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
714 
715 		/* known-as-subtxn txns must not be listed */
716 		Assert(!cur_txn->is_known_as_subxact);
717 
718 		prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
719 	}
720 #endif
721 }
722 
723 /*
724  * ReorderBufferGetOldestTXN
725  *		Return oldest transaction in reorderbuffer
726  */
727 ReorderBufferTXN *
ReorderBufferGetOldestTXN(ReorderBuffer * rb)728 ReorderBufferGetOldestTXN(ReorderBuffer *rb)
729 {
730 	ReorderBufferTXN *txn;
731 
732 	AssertTXNLsnOrder(rb);
733 
734 	if (dlist_is_empty(&rb->toplevel_by_lsn))
735 		return NULL;
736 
737 	txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
738 
739 	Assert(!txn->is_known_as_subxact);
740 	Assert(txn->first_lsn != InvalidXLogRecPtr);
741 	return txn;
742 }
743 
744 /*
745  * ReorderBufferGetOldestXmin
746  *		Return oldest Xmin in reorderbuffer
747  *
748  * Returns oldest possibly running Xid from the point of view of snapshots
749  * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
750  * there are none.
751  *
752  * Since snapshots are assigned monotonically, this equals the Xmin of the
753  * base snapshot with minimal base_snapshot_lsn.
754  */
755 TransactionId
ReorderBufferGetOldestXmin(ReorderBuffer * rb)756 ReorderBufferGetOldestXmin(ReorderBuffer *rb)
757 {
758 	ReorderBufferTXN *txn;
759 
760 	AssertTXNLsnOrder(rb);
761 
762 	if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
763 		return InvalidTransactionId;
764 
765 	txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
766 							 &rb->txns_by_base_snapshot_lsn);
767 	return txn->base_snapshot->xmin;
768 }
769 
770 void
ReorderBufferSetRestartPoint(ReorderBuffer * rb,XLogRecPtr ptr)771 ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
772 {
773 	rb->current_restart_decoding_lsn = ptr;
774 }
775 
776 /*
777  * ReorderBufferAssignChild
778  *
779  * Make note that we know that subxid is a subtransaction of xid, seen as of
780  * the given lsn.
781  */
782 void
ReorderBufferAssignChild(ReorderBuffer * rb,TransactionId xid,TransactionId subxid,XLogRecPtr lsn)783 ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
784 						 TransactionId subxid, XLogRecPtr lsn)
785 {
786 	ReorderBufferTXN *txn;
787 	ReorderBufferTXN *subtxn;
788 	bool		new_top;
789 	bool		new_sub;
790 
791 	txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
792 	subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
793 
794 	if (!new_sub)
795 	{
796 		if (subtxn->is_known_as_subxact)
797 		{
798 			/* already associated, nothing to do */
799 			return;
800 		}
801 		else
802 		{
803 			/*
804 			 * We already saw this transaction, but initially added it to the
805 			 * list of top-level txns.  Now that we know it's not top-level,
806 			 * remove it from there.
807 			 */
808 			dlist_delete(&subtxn->node);
809 		}
810 	}
811 
812 	subtxn->is_known_as_subxact = true;
813 	subtxn->toplevel_xid = xid;
814 	Assert(subtxn->nsubtxns == 0);
815 
816 	/* add to subtransaction list */
817 	dlist_push_tail(&txn->subtxns, &subtxn->node);
818 	txn->nsubtxns++;
819 
820 	/* Possibly transfer the subtxn's snapshot to its top-level txn. */
821 	ReorderBufferTransferSnapToParent(txn, subtxn);
822 
823 	/* Verify LSN-ordering invariant */
824 	AssertTXNLsnOrder(rb);
825 }
826 
827 /*
828  * ReorderBufferTransferSnapToParent
829  *		Transfer base snapshot from subtxn to top-level txn, if needed
830  *
831  * This is done if the top-level txn doesn't have a base snapshot, or if the
832  * subtxn's base snapshot has an earlier LSN than the top-level txn's base
833  * snapshot's LSN.  This can happen if there are no changes in the toplevel
834  * txn but there are some in the subtxn, or the first change in subtxn has
835  * earlier LSN than first change in the top-level txn and we learned about
836  * their kinship only now.
837  *
838  * The subtransaction's snapshot is cleared regardless of the transfer
839  * happening, since it's not needed anymore in either case.
840  *
841  * We do this as soon as we become aware of their kinship, to avoid queueing
842  * extra snapshots to txns known-as-subtxns -- only top-level txns will
843  * receive further snapshots.
844  */
845 static void
ReorderBufferTransferSnapToParent(ReorderBufferTXN * txn,ReorderBufferTXN * subtxn)846 ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
847 								  ReorderBufferTXN *subtxn)
848 {
849 	Assert(subtxn->toplevel_xid == txn->xid);
850 
851 	if (subtxn->base_snapshot != NULL)
852 	{
853 		if (txn->base_snapshot == NULL ||
854 			subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
855 		{
856 			/*
857 			 * If the toplevel transaction already has a base snapshot but
858 			 * it's newer than the subxact's, purge it.
859 			 */
860 			if (txn->base_snapshot != NULL)
861 			{
862 				SnapBuildSnapDecRefcount(txn->base_snapshot);
863 				dlist_delete(&txn->base_snapshot_node);
864 			}
865 
866 			/*
867 			 * The snapshot is now the top transaction's; transfer it, and
868 			 * adjust the list position of the top transaction in the list by
869 			 * moving it to where the subtransaction is.
870 			 */
871 			txn->base_snapshot = subtxn->base_snapshot;
872 			txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
873 			dlist_insert_before(&subtxn->base_snapshot_node,
874 								&txn->base_snapshot_node);
875 
876 			/*
877 			 * The subtransaction doesn't have a snapshot anymore (so it
878 			 * mustn't be in the list.)
879 			 */
880 			subtxn->base_snapshot = NULL;
881 			subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
882 			dlist_delete(&subtxn->base_snapshot_node);
883 		}
884 		else
885 		{
886 			/* Base snap of toplevel is fine, so subxact's is not needed */
887 			SnapBuildSnapDecRefcount(subtxn->base_snapshot);
888 			dlist_delete(&subtxn->base_snapshot_node);
889 			subtxn->base_snapshot = NULL;
890 			subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
891 		}
892 	}
893 }
894 
895 /*
896  * Associate a subtransaction with its toplevel transaction at commit
897  * time. There may be no further changes added after this.
898  */
899 void
ReorderBufferCommitChild(ReorderBuffer * rb,TransactionId xid,TransactionId subxid,XLogRecPtr commit_lsn,XLogRecPtr end_lsn)900 ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
901 						 TransactionId subxid, XLogRecPtr commit_lsn,
902 						 XLogRecPtr end_lsn)
903 {
904 	ReorderBufferTXN *subtxn;
905 
906 	subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
907 								   InvalidXLogRecPtr, false);
908 
909 	/*
910 	 * No need to do anything if that subtxn didn't contain any changes
911 	 */
912 	if (!subtxn)
913 		return;
914 
915 	subtxn->final_lsn = commit_lsn;
916 	subtxn->end_lsn = end_lsn;
917 
918 	/*
919 	 * Assign this subxact as a child of the toplevel xact (no-op if already
920 	 * done.)
921 	 */
922 	ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
923 }
924 
925 
926 /*
927  * Support for efficiently iterating over a transaction's and its
928  * subtransactions' changes.
929  *
 * We do this by doing a k-way merge between transactions/subtransactions. For that
931  * we model the current heads of the different transactions as a binary heap
932  * so we easily know which (sub-)transaction has the change with the smallest
933  * lsn next.
934  *
935  * We assume the changes in individual transactions are already sorted by LSN.
936  */
937 
938 /*
939  * Binary heap comparison function.
940  */
941 static int
ReorderBufferIterCompare(Datum a,Datum b,void * arg)942 ReorderBufferIterCompare(Datum a, Datum b, void *arg)
943 {
944 	ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
945 	XLogRecPtr	pos_a = state->entries[DatumGetInt32(a)].lsn;
946 	XLogRecPtr	pos_b = state->entries[DatumGetInt32(b)].lsn;
947 
948 	if (pos_a < pos_b)
949 		return 1;
950 	else if (pos_a == pos_b)
951 		return 0;
952 	return -1;
953 }
954 
955 /*
956  * Allocate & initialize an iterator which iterates in lsn order over a
957  * transaction and all its subtransactions.
958  *
959  * Note: The iterator state is returned through iter_state parameter rather
960  * than the function's return value.  This is because the state gets cleaned up
961  * in a PG_CATCH block in the caller, so we want to make sure the caller gets
962  * back the state even if this function throws an exception.
963  */
964 static void
ReorderBufferIterTXNInit(ReorderBuffer * rb,ReorderBufferTXN * txn,ReorderBufferIterTXNState * volatile * iter_state)965 ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
966 						 ReorderBufferIterTXNState *volatile *iter_state)
967 {
968 	Size		nr_txns = 0;
969 	ReorderBufferIterTXNState *state;
970 	dlist_iter	cur_txn_i;
971 	int32		off;
972 
973 	*iter_state = NULL;
974 
975 	/*
976 	 * Calculate the size of our heap: one element for every transaction that
977 	 * contains changes.  (Besides the transactions already in the reorder
978 	 * buffer, we count the one we were directly passed.)
979 	 */
980 	if (txn->nentries > 0)
981 		nr_txns++;
982 
983 	dlist_foreach(cur_txn_i, &txn->subtxns)
984 	{
985 		ReorderBufferTXN *cur_txn;
986 
987 		cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
988 
989 		if (cur_txn->nentries > 0)
990 			nr_txns++;
991 	}
992 
993 	/*
994 	 * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
995 	 * need to allocate/build a heap then.
996 	 */
997 
998 	/* allocate iteration state */
999 	state = (ReorderBufferIterTXNState *)
1000 		MemoryContextAllocZero(rb->context,
1001 							   sizeof(ReorderBufferIterTXNState) +
1002 							   sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1003 
1004 	state->nr_txns = nr_txns;
1005 	dlist_init(&state->old_change);
1006 
1007 	for (off = 0; off < state->nr_txns; off++)
1008 	{
1009 		state->entries[off].file.vfd = -1;
1010 		state->entries[off].segno = 0;
1011 	}
1012 
1013 	/* allocate heap */
1014 	state->heap = binaryheap_allocate(state->nr_txns,
1015 									  ReorderBufferIterCompare,
1016 									  state);
1017 
1018 	/* Now that the state fields are initialized, it is safe to return it. */
1019 	*iter_state = state;
1020 
1021 	/*
1022 	 * Now insert items into the binary heap, in an unordered fashion.  (We
1023 	 * will run a heap assembly step at the end; this is more efficient.)
1024 	 */
1025 
1026 	off = 0;
1027 
1028 	/* add toplevel transaction if it contains changes */
1029 	if (txn->nentries > 0)
1030 	{
1031 		ReorderBufferChange *cur_change;
1032 
1033 		if (txn->serialized)
1034 		{
1035 			/* serialize remaining changes */
1036 			ReorderBufferSerializeTXN(rb, txn);
1037 			ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1038 										&state->entries[off].segno);
1039 		}
1040 
1041 		cur_change = dlist_head_element(ReorderBufferChange, node,
1042 										&txn->changes);
1043 
1044 		state->entries[off].lsn = cur_change->lsn;
1045 		state->entries[off].change = cur_change;
1046 		state->entries[off].txn = txn;
1047 
1048 		binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1049 	}
1050 
1051 	/* add subtransactions if they contain changes */
1052 	dlist_foreach(cur_txn_i, &txn->subtxns)
1053 	{
1054 		ReorderBufferTXN *cur_txn;
1055 
1056 		cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1057 
1058 		if (cur_txn->nentries > 0)
1059 		{
1060 			ReorderBufferChange *cur_change;
1061 
1062 			if (cur_txn->serialized)
1063 			{
1064 				/* serialize remaining changes */
1065 				ReorderBufferSerializeTXN(rb, cur_txn);
1066 				ReorderBufferRestoreChanges(rb, cur_txn,
1067 											&state->entries[off].file,
1068 											&state->entries[off].segno);
1069 			}
1070 			cur_change = dlist_head_element(ReorderBufferChange, node,
1071 											&cur_txn->changes);
1072 
1073 			state->entries[off].lsn = cur_change->lsn;
1074 			state->entries[off].change = cur_change;
1075 			state->entries[off].txn = cur_txn;
1076 
1077 			binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1078 		}
1079 	}
1080 
1081 	/* assemble a valid binary heap */
1082 	binaryheap_build(state->heap);
1083 }
1084 
1085 /*
1086  * Return the next change when iterating over a transaction and its
1087  * subtransactions.
1088  *
1089  * Returns NULL when no further changes exist.
1090  */
1091 static ReorderBufferChange *
ReorderBufferIterTXNNext(ReorderBuffer * rb,ReorderBufferIterTXNState * state)1092 ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
1093 {
1094 	ReorderBufferChange *change;
1095 	ReorderBufferIterTXNEntry *entry;
1096 	int32		off;
1097 
1098 	/* nothing there anymore */
1099 	if (state->heap->bh_size == 0)
1100 		return NULL;
1101 
1102 	off = DatumGetInt32(binaryheap_first(state->heap));
1103 	entry = &state->entries[off];
1104 
1105 	/* free memory we might have "leaked" in the previous *Next call */
1106 	if (!dlist_is_empty(&state->old_change))
1107 	{
1108 		change = dlist_container(ReorderBufferChange, node,
1109 								 dlist_pop_head_node(&state->old_change));
1110 		ReorderBufferReturnChange(rb, change);
1111 		Assert(dlist_is_empty(&state->old_change));
1112 	}
1113 
1114 	change = entry->change;
1115 
1116 	/*
1117 	 * update heap with information about which transaction has the next
1118 	 * relevant change in LSN order
1119 	 */
1120 
1121 	/* there are in-memory changes */
1122 	if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1123 	{
1124 		dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1125 		ReorderBufferChange *next_change =
1126 		dlist_container(ReorderBufferChange, node, next);
1127 
1128 		/* txn stays the same */
1129 		state->entries[off].lsn = next_change->lsn;
1130 		state->entries[off].change = next_change;
1131 
1132 		binaryheap_replace_first(state->heap, Int32GetDatum(off));
1133 		return change;
1134 	}
1135 
1136 	/* try to load changes from disk */
1137 	if (entry->txn->nentries != entry->txn->nentries_mem)
1138 	{
1139 		/*
1140 		 * Ugly: restoring changes will reuse *Change records, thus delete the
1141 		 * current one from the per-tx list and only free in the next call.
1142 		 */
1143 		dlist_delete(&change->node);
1144 		dlist_push_tail(&state->old_change, &change->node);
1145 
1146 		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1147 										&state->entries[off].segno))
1148 		{
1149 			/* successfully restored changes from disk */
1150 			ReorderBufferChange *next_change =
1151 			dlist_head_element(ReorderBufferChange, node,
1152 							   &entry->txn->changes);
1153 
1154 			elog(DEBUG2, "restored %u/%u changes from disk",
1155 				 (uint32) entry->txn->nentries_mem,
1156 				 (uint32) entry->txn->nentries);
1157 
1158 			Assert(entry->txn->nentries_mem);
1159 			/* txn stays the same */
1160 			state->entries[off].lsn = next_change->lsn;
1161 			state->entries[off].change = next_change;
1162 			binaryheap_replace_first(state->heap, Int32GetDatum(off));
1163 
1164 			return change;
1165 		}
1166 	}
1167 
1168 	/* ok, no changes there anymore, remove */
1169 	binaryheap_remove_first(state->heap);
1170 
1171 	return change;
1172 }
1173 
1174 /*
1175  * Deallocate the iterator
1176  */
1177 static void
ReorderBufferIterTXNFinish(ReorderBuffer * rb,ReorderBufferIterTXNState * state)1178 ReorderBufferIterTXNFinish(ReorderBuffer *rb,
1179 						   ReorderBufferIterTXNState *state)
1180 {
1181 	int32		off;
1182 
1183 	for (off = 0; off < state->nr_txns; off++)
1184 	{
1185 		if (state->entries[off].file.vfd != -1)
1186 			FileClose(state->entries[off].file.vfd);
1187 	}
1188 
1189 	/* free memory we might have "leaked" in the last *Next call */
1190 	if (!dlist_is_empty(&state->old_change))
1191 	{
1192 		ReorderBufferChange *change;
1193 
1194 		change = dlist_container(ReorderBufferChange, node,
1195 								 dlist_pop_head_node(&state->old_change));
1196 		ReorderBufferReturnChange(rb, change);
1197 		Assert(dlist_is_empty(&state->old_change));
1198 	}
1199 
1200 	binaryheap_free(state->heap);
1201 	pfree(state);
1202 }
1203 
1204 /*
1205  * Cleanup the contents of a transaction, usually after the transaction
1206  * committed or aborted.
1207  */
1208 static void
ReorderBufferCleanupTXN(ReorderBuffer * rb,ReorderBufferTXN * txn)1209 ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1210 {
1211 	bool		found;
1212 	dlist_mutable_iter iter;
1213 
1214 	/* cleanup subtransactions & their changes */
1215 	dlist_foreach_modify(iter, &txn->subtxns)
1216 	{
1217 		ReorderBufferTXN *subtxn;
1218 
1219 		subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1220 
1221 		/*
1222 		 * Subtransactions are always associated to the toplevel TXN, even if
1223 		 * they originally were happening inside another subtxn, so we won't
1224 		 * ever recurse more than one level deep here.
1225 		 */
1226 		Assert(subtxn->is_known_as_subxact);
1227 		Assert(subtxn->nsubtxns == 0);
1228 
1229 		ReorderBufferCleanupTXN(rb, subtxn);
1230 	}
1231 
1232 	/* cleanup changes in the toplevel txn */
1233 	dlist_foreach_modify(iter, &txn->changes)
1234 	{
1235 		ReorderBufferChange *change;
1236 
1237 		change = dlist_container(ReorderBufferChange, node, iter.cur);
1238 
1239 		ReorderBufferReturnChange(rb, change);
1240 	}
1241 
1242 	/*
1243 	 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1244 	 * They are always stored in the toplevel transaction.
1245 	 */
1246 	dlist_foreach_modify(iter, &txn->tuplecids)
1247 	{
1248 		ReorderBufferChange *change;
1249 
1250 		change = dlist_container(ReorderBufferChange, node, iter.cur);
1251 		Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1252 		ReorderBufferReturnChange(rb, change);
1253 	}
1254 
1255 	/*
1256 	 * Cleanup the base snapshot, if set.
1257 	 */
1258 	if (txn->base_snapshot != NULL)
1259 	{
1260 		SnapBuildSnapDecRefcount(txn->base_snapshot);
1261 		dlist_delete(&txn->base_snapshot_node);
1262 	}
1263 
1264 	/*
1265 	 * Remove TXN from its containing list.
1266 	 *
1267 	 * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1268 	 * parent's list of known subxacts; this leaves the parent's nsubxacts
1269 	 * count too high, but we don't care.  Otherwise, we are deleting the TXN
1270 	 * from the LSN-ordered list of toplevel TXNs.
1271 	 */
1272 	dlist_delete(&txn->node);
1273 
1274 	/* now remove reference from buffer */
1275 	hash_search(rb->by_txn,
1276 				(void *) &txn->xid,
1277 				HASH_REMOVE,
1278 				&found);
1279 	Assert(found);
1280 
1281 	/* remove entries spilled to disk */
1282 	if (txn->serialized)
1283 		ReorderBufferRestoreCleanup(rb, txn);
1284 
1285 	/* deallocate */
1286 	ReorderBufferReturnTXN(rb, txn);
1287 }
1288 
1289 /*
1290  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1291  * HeapTupleSatisfiesHistoricMVCC.
1292  */
1293 static void
ReorderBufferBuildTupleCidHash(ReorderBuffer * rb,ReorderBufferTXN * txn)1294 ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
1295 {
1296 	dlist_iter	iter;
1297 	HASHCTL		hash_ctl;
1298 
1299 	if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1300 		return;
1301 
1302 	memset(&hash_ctl, 0, sizeof(hash_ctl));
1303 
1304 	hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1305 	hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1306 	hash_ctl.hcxt = rb->context;
1307 
1308 	/*
1309 	 * create the hash with the exact number of to-be-stored tuplecids from
1310 	 * the start
1311 	 */
1312 	txn->tuplecid_hash =
1313 		hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1314 					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1315 
1316 	dlist_foreach(iter, &txn->tuplecids)
1317 	{
1318 		ReorderBufferTupleCidKey key;
1319 		ReorderBufferTupleCidEnt *ent;
1320 		bool		found;
1321 		ReorderBufferChange *change;
1322 
1323 		change = dlist_container(ReorderBufferChange, node, iter.cur);
1324 
1325 		Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1326 
1327 		/* be careful about padding */
1328 		memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1329 
1330 		key.relnode = change->data.tuplecid.node;
1331 
1332 		ItemPointerCopy(&change->data.tuplecid.tid,
1333 						&key.tid);
1334 
1335 		ent = (ReorderBufferTupleCidEnt *)
1336 			hash_search(txn->tuplecid_hash,
1337 						(void *) &key,
1338 						HASH_ENTER,
1339 						&found);
1340 		if (!found)
1341 		{
1342 			ent->cmin = change->data.tuplecid.cmin;
1343 			ent->cmax = change->data.tuplecid.cmax;
1344 			ent->combocid = change->data.tuplecid.combocid;
1345 		}
1346 		else
1347 		{
1348 			/*
1349 			 * Maybe we already saw this tuple before in this transaction, but
1350 			 * if so it must have the same cmin.
1351 			 */
1352 			Assert(ent->cmin == change->data.tuplecid.cmin);
1353 
1354 			/*
1355 			 * cmax may be initially invalid, but once set it can only grow,
1356 			 * and never become invalid again.
1357 			 */
1358 			Assert((ent->cmax == InvalidCommandId) ||
1359 				   ((change->data.tuplecid.cmax != InvalidCommandId) &&
1360 					(change->data.tuplecid.cmax > ent->cmax)));
1361 			ent->cmax = change->data.tuplecid.cmax;
1362 		}
1363 	}
1364 }
1365 
1366 /*
1367  * Copy a provided snapshot so we can modify it privately. This is needed so
1368  * that catalog modifying transactions can look into intermediate catalog
1369  * states.
1370  */
1371 static Snapshot
ReorderBufferCopySnap(ReorderBuffer * rb,Snapshot orig_snap,ReorderBufferTXN * txn,CommandId cid)1372 ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
1373 					  ReorderBufferTXN *txn, CommandId cid)
1374 {
1375 	Snapshot	snap;
1376 	dlist_iter	iter;
1377 	int			i = 0;
1378 	Size		size;
1379 
1380 	size = sizeof(SnapshotData) +
1381 		sizeof(TransactionId) * orig_snap->xcnt +
1382 		sizeof(TransactionId) * (txn->nsubtxns + 1);
1383 
1384 	snap = MemoryContextAllocZero(rb->context, size);
1385 	memcpy(snap, orig_snap, sizeof(SnapshotData));
1386 
1387 	snap->copied = true;
1388 	snap->active_count = 1;		/* mark as active so nobody frees it */
1389 	snap->regd_count = 0;
1390 	snap->xip = (TransactionId *) (snap + 1);
1391 
1392 	memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1393 
1394 	/*
1395 	 * snap->subxip contains all txids that belong to our transaction which we
1396 	 * need to check via cmin/cmax. That's why we store the toplevel
1397 	 * transaction in there as well.
1398 	 */
1399 	snap->subxip = snap->xip + snap->xcnt;
1400 	snap->subxip[i++] = txn->xid;
1401 
1402 	/*
1403 	 * nsubxcnt isn't decreased when subtransactions abort, so count manually.
1404 	 * Since it's an upper boundary it is safe to use it for the allocation
1405 	 * above.
1406 	 */
1407 	snap->subxcnt = 1;
1408 
1409 	dlist_foreach(iter, &txn->subtxns)
1410 	{
1411 		ReorderBufferTXN *sub_txn;
1412 
1413 		sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1414 		snap->subxip[i++] = sub_txn->xid;
1415 		snap->subxcnt++;
1416 	}
1417 
1418 	/* sort so we can bsearch() later */
1419 	qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1420 
1421 	/* store the specified current CommandId */
1422 	snap->curcid = cid;
1423 
1424 	return snap;
1425 }
1426 
1427 /*
1428  * Free a previously ReorderBufferCopySnap'ed snapshot
1429  */
1430 static void
ReorderBufferFreeSnap(ReorderBuffer * rb,Snapshot snap)1431 ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
1432 {
1433 	if (snap->copied)
1434 		pfree(snap);
1435 	else
1436 		SnapBuildSnapDecRefcount(snap);
1437 }
1438 
/*
 * Perform the replay of a transaction and its non-aborted subtransactions.
 *
 * Subtransactions previously have to be processed by
 * ReorderBufferCommitChild(), even if previously assigned to the toplevel
 * transaction with ReorderBufferAssignChild.
 *
 * We currently can only decode a transaction's contents when its commit
 * record is read because that's the only place where we know about cache
 * invalidations. Thus, once a toplevel commit is read, we iterate over the top
 * and subtransactions (using a k-way merge) and replay the changes in lsn
 * order.
 *
 * Parameters:
 *	rb			- reorder buffer holding the transaction
 *	xid			- xid of the toplevel transaction to replay; if no transaction
 *				  is found for it, the call does nothing
 *	commit_lsn	- stored as txn->final_lsn and passed to the commit callback
 *	end_lsn		- stored as txn->end_lsn
 *	commit_time, origin_id, origin_lsn
 *				- copied into the corresponding fields of the transaction
 *
 * The transaction is removed with ReorderBufferCleanupTXN() on the success
 * path as well as on the error path; an error raised during replay is
 * re-thrown after that cleanup.
 */
void
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
					TimestampTz commit_time,
					RepOriginId origin_id, XLogRecPtr origin_lsn)
{
	ReorderBufferTXN *txn;

	/*
	 * These are modified inside the PG_TRY block below; snapshot_now and
	 * iterstate are additionally read in PG_CATCH, hence volatile.
	 */
	volatile Snapshot snapshot_now;
	volatile CommandId command_id = FirstCommandId;
	bool		using_subtxn;
	ReorderBufferIterTXNState *volatile iterstate = NULL;

	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
								false);

	/* unknown transaction, nothing to replay */
	if (txn == NULL)
		return;

	/* remember the commit's details in the transaction */
	txn->final_lsn = commit_lsn;
	txn->end_lsn = end_lsn;
	txn->commit_time = commit_time;
	txn->origin_id = origin_id;
	txn->origin_lsn = origin_lsn;

	/*
	 * If this transaction has no snapshot, it didn't make any changes to the
	 * database, so there's nothing to decode.  Note that
	 * ReorderBufferCommitChild will have transferred any snapshots from
	 * subtransactions if there were any.
	 */
	if (txn->base_snapshot == NULL)
	{
		Assert(txn->ninvalidations == 0);
		ReorderBufferCleanupTXN(rb, txn);
		return;
	}

	snapshot_now = txn->base_snapshot;

	/* build data to be able to lookup the CommandIds of catalog tuples */
	ReorderBufferBuildTupleCidHash(rb, txn);

	/* setup the initial snapshot */
	SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);

	/*
	 * Decoding needs access to syscaches et al., which in turn use
	 * heavyweight locks and such. Thus we need to have enough state around to
	 * keep track of those.  The easiest way is to simply use a transaction
	 * internally.  That also allows us to easily enforce that nothing writes
	 * to the database by checking for xid assignments.
	 *
	 * When we're called via the SQL SRF there's already a transaction
	 * started, so start an explicit subtransaction there.
	 */
	using_subtxn = IsTransactionOrTransactionBlock();

	PG_TRY();
	{
		ReorderBufferChange *change;

		/* speculative insert waiting for its confirm/abort record, if any */
		ReorderBufferChange *specinsert = NULL;

		if (using_subtxn)
			BeginInternalSubTransaction("replay");
		else
			StartTransactionCommand();

		rb->begin(rb, txn);

		ReorderBufferIterTXNInit(rb, txn, &iterstate);
		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
		{
			Relation	relation = NULL;
			Oid			reloid;

			switch (change->action)
			{
				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:

					/*
					 * Confirmation for speculative insertion arrived. Simply
					 * use as a normal record. It'll be cleaned up at the end
					 * of INSERT processing.
					 */
					if (specinsert == NULL)
						elog(ERROR, "invalid ordering of speculative insertion changes");
					Assert(specinsert->data.tp.oldtuple == NULL);
					change = specinsert;
					change->action = REORDER_BUFFER_CHANGE_INSERT;

					/* intentionally fall through */
				case REORDER_BUFFER_CHANGE_INSERT:
				case REORDER_BUFFER_CHANGE_UPDATE:
				case REORDER_BUFFER_CHANGE_DELETE:
					Assert(snapshot_now);

					reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
												change->data.tp.relnode.relNode);

					/*
					 * Mapped catalog tuple without data, emitted while
					 * catalog table was in the process of being rewritten. We
					 * can fail to look up the relfilenode, because the
					 * relmapper has no "historic" view, in contrast to the
					 * normal catalog during decoding. Thus repeated
					 * rewrites can cause a lookup failure. That's OK because
					 * we do not decode catalog changes anyway. Normally such
					 * tuples would be skipped over below, but we can't
					 * identify whether the table should be logically logged
					 * without mapping the relfilenode to the oid.
					 */
					if (reloid == InvalidOid &&
						change->data.tp.newtuple == NULL &&
						change->data.tp.oldtuple == NULL)
						goto change_done;
					else if (reloid == InvalidOid)
						elog(ERROR, "could not map filenode \"%s\" to relation OID",
							 relpathperm(change->data.tp.relnode,
										 MAIN_FORKNUM));

					relation = RelationIdGetRelation(reloid);

					if (!RelationIsValid(relation))
						elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
							 reloid,
							 relpathperm(change->data.tp.relnode,
										 MAIN_FORKNUM));

					/* changes to relations that aren't logically logged are skipped */
					if (!RelationIsLogicallyLogged(relation))
						goto change_done;

					/*
					 * Ignore temporary heaps created during DDL unless the
					 * plugin has asked for them.
					 */
					if (relation->rd_rel->relrewrite && !rb->output_rewrites)
						goto change_done;

					/*
					 * For now ignore sequence changes entirely. Most of the
					 * time they don't log changes using records we
					 * understand, so it doesn't make sense to handle the few
					 * cases we do.
					 */
					if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
						goto change_done;

					/* user-triggered change */
					if (!IsToastRelation(relation))
					{
						ReorderBufferToastReplace(rb, txn, relation, change);
						rb->apply_change(rb, txn, relation, change);

						/*
						 * Only clear reassembled toast chunks if we're sure
						 * they're not required anymore. The creator of the
						 * tuple tells us.
						 */
						if (change->data.tp.clear_toast_afterwards)
							ReorderBufferToastReset(rb, txn);
					}
					/* we're not interested in toast deletions */
					else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
					{
						/*
						 * Need to reassemble the full toasted Datum in
						 * memory. To ensure the chunks don't get reused till
						 * we're done, remove it from the list of this
						 * transaction's changes. Otherwise it will get
						 * freed/reused while restoring spooled data from
						 * disk.
						 */
						Assert(change->data.tp.newtuple != NULL);

						dlist_delete(&change->node);
						ReorderBufferToastAppendChunk(rb, txn, relation,
													  change);
					}

			change_done:

					/*
					 * If speculative insertion was confirmed, the record isn't
					 * needed anymore.
					 */
					if (specinsert != NULL)
					{
						ReorderBufferReturnChange(rb, specinsert);
						specinsert = NULL;
					}

					/* release the relation, if one was opened above */
					if (relation != NULL)
					{
						RelationClose(relation);
						relation = NULL;
					}
					break;

				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:

					/*
					 * Speculative insertions are dealt with by delaying the
					 * processing of the insert until the confirmation record
					 * arrives. For that we simply unlink the record from the
					 * chain, so it does not get freed/reused while restoring
					 * spooled data from disk.
					 *
					 * This is safe in the face of concurrent catalog changes
					 * because the relevant relation can't be changed between
					 * speculative insertion and confirmation due to
					 * CheckTableNotInUse() and locking.
					 */

					/* clear out a pending (and thus failed) speculation */
					if (specinsert != NULL)
					{
						ReorderBufferReturnChange(rb, specinsert);
						specinsert = NULL;
					}

					/* and memorize the pending insertion */
					dlist_delete(&change->node);
					specinsert = change;
					break;

				case REORDER_BUFFER_CHANGE_TRUNCATE:
					{
						int			i;
						int			nrelids = change->data.truncate.nrelids;
						int			nrelations = 0;
						Relation   *relations;

						/*
						 * Open every truncated relation, but pass only the
						 * logically logged ones on to the output plugin.
						 *
						 * NOTE(review): relations skipped by the "continue"
						 * below are not closed explicitly, and the
						 * "relations" array is never pfree'd here.
						 * Presumably both are released when the surrounding
						 * (sub)transaction is aborted further down --
						 * confirm.
						 */
						relations = palloc0(nrelids * sizeof(Relation));
						for (i = 0; i < nrelids; i++)
						{
							Oid			relid = change->data.truncate.relids[i];
							Relation	relation;

							relation = RelationIdGetRelation(relid);

							if (!RelationIsValid(relation))
								elog(ERROR, "could not open relation with OID %u", relid);

							if (!RelationIsLogicallyLogged(relation))
								continue;

							relations[nrelations++] = relation;
						}

						rb->apply_truncate(rb, txn, nrelations, relations, change);

						for (i = 0; i < nrelations; i++)
							RelationClose(relations[i]);

						break;
					}

				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:

					/*
					 * Abort for speculative insertion arrived. So cleanup the
					 * specinsert tuple and toast hash.
					 *
					 * Note that we get the spec abort change for each toast
					 * entry but we need to perform the cleanup only the first
					 * time we get it for the main table.
					 */
					if (specinsert != NULL)
					{
						/*
						 * We must clean the toast hash before processing a
						 * completely new tuple to avoid confusion about the
						 * previous tuple's toast chunks.
						 */
						Assert(change->data.tp.clear_toast_afterwards);
						ReorderBufferToastReset(rb, txn);

						/* We don't need this record anymore. */
						ReorderBufferReturnChange(rb, specinsert);
						specinsert = NULL;
					}
					break;

				case REORDER_BUFFER_CHANGE_MESSAGE:
					/* hand the message to the output plugin */
					rb->message(rb, txn, change->lsn, true,
								change->data.msg.prefix,
								change->data.msg.message_size,
								change->data.msg.message);
					break;

				case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
					/* get rid of the old */
					TeardownHistoricSnapshot(false);

					/*
					 * The current snapshot is a private copy: free it and
					 * continue with a fresh private copy of the new one.
					 */
					if (snapshot_now->copied)
					{
						ReorderBufferFreeSnap(rb, snapshot_now);
						snapshot_now =
							ReorderBufferCopySnap(rb, change->data.snapshot,
												  txn, command_id);
					}

					/*
					 * Restored from disk, need to be careful not to double
					 * free. We could introduce refcounting for that, but for
					 * now this seems infrequent enough not to care.
					 */
					else if (change->data.snapshot->copied)
					{
						snapshot_now =
							ReorderBufferCopySnap(rb, change->data.snapshot,
												  txn, command_id);
					}
					else
					{
						/* neither is a copy: use the change's snapshot as is */
						snapshot_now = change->data.snapshot;
					}


					/* and continue with the new one */
					SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
					break;

				case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
					Assert(change->data.command_id != InvalidCommandId);

					/* only act when the command id actually advances */
					if (command_id < change->data.command_id)
					{
						command_id = change->data.command_id;

						if (!snapshot_now->copied)
						{
							/* we don't use the global one anymore */
							snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
																 txn, command_id);
						}

						snapshot_now->curcid = command_id;

						/* re-install the snapshot with the new curcid */
						TeardownHistoricSnapshot(false);
						SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);

						/*
						 * Every time the CommandId is incremented, we could
						 * see new catalog contents, so execute all
						 * invalidations.
						 */
						ReorderBufferExecuteInvalidations(rb, txn);
					}

					break;

				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
					/* tuplecids are kept in txn->tuplecids, never in the change queue */
					elog(ERROR, "tuplecid value in changequeue");
					break;
			}
		}

		/* speculative insertion record must be freed by now */
		Assert(!specinsert);

		/* clean up the iterator */
		ReorderBufferIterTXNFinish(rb, iterstate);
		/* reset so that PG_CATCH doesn't finish the iterator a second time */
		iterstate = NULL;

		/* call commit callback */
		rb->commit(rb, txn, commit_lsn);

		/* this is just a sanity check against bad output plugin behaviour */
		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
			elog(ERROR, "output plugin used XID %u",
				 GetCurrentTransactionId());

		/* cleanup */
		TeardownHistoricSnapshot(false);

		/*
		 * Aborting the current (sub-)transaction as a whole has the right
		 * semantics. We want all locks acquired in here to be released, not
		 * reassigned to the parent and we do not want any database access
		 * have persistent effects.
		 */
		AbortCurrentTransaction();

		/* make sure there's no cache pollution */
		ReorderBufferExecuteInvalidations(rb, txn);

		if (using_subtxn)
			RollbackAndReleaseCurrentSubTransaction();

		/* only a private copy is ours to free */
		if (snapshot_now->copied)
			ReorderBufferFreeSnap(rb, snapshot_now);

		/* remove potential on-disk data, and deallocate */
		ReorderBufferCleanupTXN(rb, txn);
	}
	PG_CATCH();
	{
		/* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */

		/* non-NULL only if the iterator was set up and not yet finished */
		if (iterstate)
			ReorderBufferIterTXNFinish(rb, iterstate);

		TeardownHistoricSnapshot(true);

		/*
		 * Force cache invalidation to happen outside of a valid transaction
		 * to prevent catalog access as we just caught an error.
		 */
		AbortCurrentTransaction();

		/* make sure there's no cache pollution */
		ReorderBufferExecuteInvalidations(rb, txn);

		if (using_subtxn)
			RollbackAndReleaseCurrentSubTransaction();

		/* only a private copy is ours to free */
		if (snapshot_now->copied)
			ReorderBufferFreeSnap(rb, snapshot_now);

		/* remove potential on-disk data, and deallocate */
		ReorderBufferCleanupTXN(rb, txn);

		PG_RE_THROW();
	}
	PG_END_TRY();
}
1879 
1880 /*
1881  * Abort a transaction that possibly has previous changes. Needs to be first
1882  * called for subtransactions and then for the toplevel xid.
1883  *
1884  * NB: Transactions handled here have to have actively aborted (i.e. have
1885  * produced an abort record). Implicitly aborted transactions are handled via
1886  * ReorderBufferAbortOld(); transactions we're just not interested in, but
1887  * which have committed are handled in ReorderBufferForget().
1888  *
1889  * This function purges this transaction and its contents from memory and
1890  * disk.
1891  */
1892 void
ReorderBufferAbort(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn)1893 ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1894 {
1895 	ReorderBufferTXN *txn;
1896 
1897 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1898 								false);
1899 
1900 	/* unknown, nothing to remove */
1901 	if (txn == NULL)
1902 		return;
1903 
1904 	/* cosmetic... */
1905 	txn->final_lsn = lsn;
1906 
1907 	/* remove potential on-disk data, and deallocate */
1908 	ReorderBufferCleanupTXN(rb, txn);
1909 }
1910 
1911 /*
1912  * Abort all transactions that aren't actually running anymore because the
1913  * server restarted.
1914  *
1915  * NB: These really have to be transactions that have aborted due to a server
1916  * crash/immediate restart, as we don't deal with invalidations here.
1917  */
1918 void
ReorderBufferAbortOld(ReorderBuffer * rb,TransactionId oldestRunningXid)1919 ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
1920 {
1921 	dlist_mutable_iter it;
1922 
1923 	/*
1924 	 * Iterate through all (potential) toplevel TXNs and abort all that are
1925 	 * older than what possibly can be running. Once we've found the first
1926 	 * that is alive we stop, there might be some that acquired an xid earlier
1927 	 * but started writing later, but it's unlikely and they will be cleaned
1928 	 * up in a later call to this function.
1929 	 */
1930 	dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1931 	{
1932 		ReorderBufferTXN *txn;
1933 
1934 		txn = dlist_container(ReorderBufferTXN, node, it.cur);
1935 
1936 		if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1937 		{
1938 			elog(DEBUG2, "aborting old transaction %u", txn->xid);
1939 
1940 			/* remove potential on-disk data, and deallocate this tx */
1941 			ReorderBufferCleanupTXN(rb, txn);
1942 		}
1943 		else
1944 			return;
1945 	}
1946 }
1947 
1948 /*
1949  * Forget the contents of a transaction if we aren't interested in its
1950  * contents. Needs to be first called for subtransactions and then for the
1951  * toplevel xid.
1952  *
1953  * This is significantly different to ReorderBufferAbort() because
1954  * transactions that have committed need to be treated differently from aborted
1955  * ones since they may have modified the catalog.
1956  *
1957  * Note that this is only allowed to be called in the moment a transaction
1958  * commit has just been read, not earlier; otherwise later records referring
1959  * to this xid might re-create the transaction incompletely.
1960  */
1961 void
ReorderBufferForget(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn)1962 ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1963 {
1964 	ReorderBufferTXN *txn;
1965 
1966 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1967 								false);
1968 
1969 	/* unknown, nothing to forget */
1970 	if (txn == NULL)
1971 		return;
1972 
1973 	/* cosmetic... */
1974 	txn->final_lsn = lsn;
1975 
1976 	/*
1977 	 * Process cache invalidation messages if there are any. Even if we're not
1978 	 * interested in the transaction's contents, it could have manipulated the
1979 	 * catalog and we need to update the caches according to that.
1980 	 */
1981 	if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1982 		ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
1983 										   txn->invalidations);
1984 	else
1985 		Assert(txn->ninvalidations == 0);
1986 
1987 	/* remove potential on-disk data, and deallocate */
1988 	ReorderBufferCleanupTXN(rb, txn);
1989 }
1990 
1991 /*
1992  * Execute invalidations happening outside the context of a decoded
1993  * transaction. That currently happens either for xid-less commits
1994  * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
1995  * transactions (via ReorderBufferForget()).
1996  */
1997 void
ReorderBufferImmediateInvalidation(ReorderBuffer * rb,uint32 ninvalidations,SharedInvalidationMessage * invalidations)1998 ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
1999 								   SharedInvalidationMessage *invalidations)
2000 {
2001 	bool		use_subtxn = IsTransactionOrTransactionBlock();
2002 	int			i;
2003 
2004 	if (use_subtxn)
2005 		BeginInternalSubTransaction("replay");
2006 
2007 	/*
2008 	 * Force invalidations to happen outside of a valid transaction - that way
2009 	 * entries will just be marked as invalid without accessing the catalog.
2010 	 * That's advantageous because we don't need to setup the full state
2011 	 * necessary for catalog access.
2012 	 */
2013 	if (use_subtxn)
2014 		AbortCurrentTransaction();
2015 
2016 	for (i = 0; i < ninvalidations; i++)
2017 		LocalExecuteInvalidationMessage(&invalidations[i]);
2018 
2019 	if (use_subtxn)
2020 		RollbackAndReleaseCurrentSubTransaction();
2021 }
2022 
2023 /*
2024  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
2025  * least once for every xid in XLogRecord->xl_xid (other places in records
2026  * may, but do not have to be passed through here).
2027  *
2028  * Reorderbuffer keeps some datastructures about transactions in LSN order,
2029  * for efficiency. To do that it has to know about when transactions are seen
2030  * first in the WAL. As many types of records are not actually interesting for
2031  * logical decoding, they do not necessarily pass though here.
2032  */
2033 void
ReorderBufferProcessXid(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn)2034 ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
2035 {
2036 	/* many records won't have an xid assigned, centralize check here */
2037 	if (xid != InvalidTransactionId)
2038 		ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2039 }
2040 
2041 /*
2042  * Add a new snapshot to this transaction that may only used after lsn 'lsn'
2043  * because the previous snapshot doesn't describe the catalog correctly for
2044  * following rows.
2045  */
2046 void
ReorderBufferAddSnapshot(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,Snapshot snap)2047 ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
2048 						 XLogRecPtr lsn, Snapshot snap)
2049 {
2050 	ReorderBufferChange *change = ReorderBufferGetChange(rb);
2051 
2052 	change->data.snapshot = snap;
2053 	change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
2054 
2055 	ReorderBufferQueueChange(rb, xid, lsn, change);
2056 }
2057 
2058 /*
2059  * Set up the transaction's base snapshot.
2060  *
2061  * If we know that xid is a subtransaction, set the base snapshot on the
2062  * top-level transaction instead.
2063  */
2064 void
ReorderBufferSetBaseSnapshot(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,Snapshot snap)2065 ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
2066 							 XLogRecPtr lsn, Snapshot snap)
2067 {
2068 	ReorderBufferTXN *txn;
2069 	bool		is_new;
2070 
2071 	AssertArg(snap != NULL);
2072 
2073 	/*
2074 	 * Fetch the transaction to operate on.  If we know it's a subtransaction,
2075 	 * operate on its top-level transaction instead.
2076 	 */
2077 	txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
2078 	if (txn->is_known_as_subxact)
2079 		txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2080 									NULL, InvalidXLogRecPtr, false);
2081 	Assert(txn->base_snapshot == NULL);
2082 
2083 	txn->base_snapshot = snap;
2084 	txn->base_snapshot_lsn = lsn;
2085 	dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
2086 
2087 	AssertTXNLsnOrder(rb);
2088 }
2089 
2090 /*
2091  * Access the catalog with this CommandId at this point in the changestream.
2092  *
2093  * May only be called for command ids > 1
2094  */
2095 void
ReorderBufferAddNewCommandId(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,CommandId cid)2096 ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
2097 							 XLogRecPtr lsn, CommandId cid)
2098 {
2099 	ReorderBufferChange *change = ReorderBufferGetChange(rb);
2100 
2101 	change->data.command_id = cid;
2102 	change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
2103 
2104 	ReorderBufferQueueChange(rb, xid, lsn, change);
2105 }
2106 
2107 
2108 /*
2109  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
2110  */
2111 void
ReorderBufferAddNewTupleCids(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,RelFileNode node,ItemPointerData tid,CommandId cmin,CommandId cmax,CommandId combocid)2112 ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
2113 							 XLogRecPtr lsn, RelFileNode node,
2114 							 ItemPointerData tid, CommandId cmin,
2115 							 CommandId cmax, CommandId combocid)
2116 {
2117 	ReorderBufferChange *change = ReorderBufferGetChange(rb);
2118 	ReorderBufferTXN *txn;
2119 
2120 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2121 
2122 	change->data.tuplecid.node = node;
2123 	change->data.tuplecid.tid = tid;
2124 	change->data.tuplecid.cmin = cmin;
2125 	change->data.tuplecid.cmax = cmax;
2126 	change->data.tuplecid.combocid = combocid;
2127 	change->lsn = lsn;
2128 	change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
2129 
2130 	dlist_push_tail(&txn->tuplecids, &change->node);
2131 	txn->ntuplecids++;
2132 }
2133 
2134 /*
2135  * Setup the invalidation of the toplevel transaction.
2136  *
2137  * This needs to be done before ReorderBufferCommit is called!
2138  */
2139 void
ReorderBufferAddInvalidations(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,Size nmsgs,SharedInvalidationMessage * msgs)2140 ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
2141 							  XLogRecPtr lsn, Size nmsgs,
2142 							  SharedInvalidationMessage *msgs)
2143 {
2144 	ReorderBufferTXN *txn;
2145 
2146 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2147 
2148 	if (txn->ninvalidations != 0)
2149 		elog(ERROR, "only ever add one set of invalidations");
2150 
2151 	Assert(nmsgs > 0);
2152 
2153 	txn->ninvalidations = nmsgs;
2154 	txn->invalidations = (SharedInvalidationMessage *)
2155 		MemoryContextAlloc(rb->context,
2156 						   sizeof(SharedInvalidationMessage) * nmsgs);
2157 	memcpy(txn->invalidations, msgs,
2158 		   sizeof(SharedInvalidationMessage) * nmsgs);
2159 }
2160 
2161 /*
2162  * Apply all invalidations we know. Possibly we only need parts at this point
2163  * in the changestream but we don't know which those are.
2164  */
2165 static void
ReorderBufferExecuteInvalidations(ReorderBuffer * rb,ReorderBufferTXN * txn)2166 ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
2167 {
2168 	int			i;
2169 
2170 	for (i = 0; i < txn->ninvalidations; i++)
2171 		LocalExecuteInvalidationMessage(&txn->invalidations[i]);
2172 }
2173 
2174 /*
2175  * Mark a transaction as containing catalog changes
2176  */
2177 void
ReorderBufferXidSetCatalogChanges(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn)2178 ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
2179 								  XLogRecPtr lsn)
2180 {
2181 	ReorderBufferTXN *txn;
2182 
2183 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2184 
2185 	txn->has_catalog_changes = true;
2186 }
2187 
2188 /*
2189  * Query whether a transaction is already *known* to contain catalog
2190  * changes. This can be wrong until directly before the commit!
2191  */
2192 bool
ReorderBufferXidHasCatalogChanges(ReorderBuffer * rb,TransactionId xid)2193 ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
2194 {
2195 	ReorderBufferTXN *txn;
2196 
2197 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2198 								false);
2199 	if (txn == NULL)
2200 		return false;
2201 
2202 	return txn->has_catalog_changes;
2203 }
2204 
2205 /*
2206  * ReorderBufferXidHasBaseSnapshot
2207  *		Have we already set the base snapshot for the given txn/subtxn?
2208  */
2209 bool
ReorderBufferXidHasBaseSnapshot(ReorderBuffer * rb,TransactionId xid)2210 ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
2211 {
2212 	ReorderBufferTXN *txn;
2213 
2214 	txn = ReorderBufferTXNByXid(rb, xid, false,
2215 								NULL, InvalidXLogRecPtr, false);
2216 
2217 	/* transaction isn't known yet, ergo no snapshot */
2218 	if (txn == NULL)
2219 		return false;
2220 
2221 	/* a known subtxn? operate on top-level txn instead */
2222 	if (txn->is_known_as_subxact)
2223 		txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2224 									NULL, InvalidXLogRecPtr, false);
2225 
2226 	return txn->base_snapshot != NULL;
2227 }
2228 
2229 
2230 /*
2231  * ---------------------------------------
2232  * Disk serialization support
2233  * ---------------------------------------
2234  */
2235 
2236 /*
2237  * Ensure the IO buffer is >= sz.
2238  */
2239 static void
ReorderBufferSerializeReserve(ReorderBuffer * rb,Size sz)2240 ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
2241 {
2242 	if (!rb->outbufsize)
2243 	{
2244 		rb->outbuf = MemoryContextAlloc(rb->context, sz);
2245 		rb->outbufsize = sz;
2246 	}
2247 	else if (rb->outbufsize < sz)
2248 	{
2249 		rb->outbuf = repalloc(rb->outbuf, sz);
2250 		rb->outbufsize = sz;
2251 	}
2252 }
2253 
2254 /*
2255  * Check whether the transaction tx should spill its data to disk.
2256  */
2257 static void
ReorderBufferCheckSerializeTXN(ReorderBuffer * rb,ReorderBufferTXN * txn)2258 ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2259 {
2260 	/*
2261 	 * TODO: improve accounting so we cheaply can take subtransactions into
2262 	 * account here.
2263 	 */
2264 	if (txn->nentries_mem >= max_changes_in_memory)
2265 	{
2266 		ReorderBufferSerializeTXN(rb, txn);
2267 		Assert(txn->nentries_mem == 0);
2268 	}
2269 }
2270 
2271 /*
2272  * Spill data of a large transaction (and its subtransactions) to disk.
2273  */
2274 static void
ReorderBufferSerializeTXN(ReorderBuffer * rb,ReorderBufferTXN * txn)2275 ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2276 {
2277 	dlist_iter	subtxn_i;
2278 	dlist_mutable_iter change_i;
2279 	int			fd = -1;
2280 	XLogSegNo	curOpenSegNo = 0;
2281 	Size		spilled = 0;
2282 
2283 	elog(DEBUG2, "spill %u changes in XID %u to disk",
2284 		 (uint32) txn->nentries_mem, txn->xid);
2285 
2286 	/* do the same to all child TXs */
2287 	dlist_foreach(subtxn_i, &txn->subtxns)
2288 	{
2289 		ReorderBufferTXN *subtxn;
2290 
2291 		subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2292 		ReorderBufferSerializeTXN(rb, subtxn);
2293 	}
2294 
2295 	/* serialize changestream */
2296 	dlist_foreach_modify(change_i, &txn->changes)
2297 	{
2298 		ReorderBufferChange *change;
2299 
2300 		change = dlist_container(ReorderBufferChange, node, change_i.cur);
2301 
2302 		/*
2303 		 * store in segment in which it belongs by start lsn, don't split over
2304 		 * multiple segments tho
2305 		 */
2306 		if (fd == -1 ||
2307 			!XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
2308 		{
2309 			char		path[MAXPGPATH];
2310 
2311 			if (fd != -1)
2312 				CloseTransientFile(fd);
2313 
2314 			XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
2315 
2316 			/*
2317 			 * No need to care about TLIs here, only used during a single run,
2318 			 * so each LSN only maps to a specific WAL record.
2319 			 */
2320 			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2321 										curOpenSegNo);
2322 
2323 			/* open segment, create it if necessary */
2324 			fd = OpenTransientFile(path,
2325 								   O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
2326 
2327 			if (fd < 0)
2328 				ereport(ERROR,
2329 						(errcode_for_file_access(),
2330 						 errmsg("could not open file \"%s\": %m", path)));
2331 		}
2332 
2333 		ReorderBufferSerializeChange(rb, txn, fd, change);
2334 		dlist_delete(&change->node);
2335 		ReorderBufferReturnChange(rb, change);
2336 
2337 		spilled++;
2338 	}
2339 
2340 	Assert(spilled == txn->nentries_mem);
2341 	Assert(dlist_is_empty(&txn->changes));
2342 	txn->nentries_mem = 0;
2343 	txn->serialized = true;
2344 
2345 	if (fd != -1)
2346 		CloseTransientFile(fd);
2347 }
2348 
2349 /*
2350  * Serialize individual change to disk.
2351  */
2352 static void
ReorderBufferSerializeChange(ReorderBuffer * rb,ReorderBufferTXN * txn,int fd,ReorderBufferChange * change)2353 ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2354 							 int fd, ReorderBufferChange *change)
2355 {
2356 	ReorderBufferDiskChange *ondisk;
2357 	Size		sz = sizeof(ReorderBufferDiskChange);
2358 
2359 	ReorderBufferSerializeReserve(rb, sz);
2360 
2361 	ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2362 	memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2363 
2364 	switch (change->action)
2365 	{
2366 			/* fall through these, they're all similar enough */
2367 		case REORDER_BUFFER_CHANGE_INSERT:
2368 		case REORDER_BUFFER_CHANGE_UPDATE:
2369 		case REORDER_BUFFER_CHANGE_DELETE:
2370 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2371 			{
2372 				char	   *data;
2373 				ReorderBufferTupleBuf *oldtup,
2374 						   *newtup;
2375 				Size		oldlen = 0;
2376 				Size		newlen = 0;
2377 
2378 				oldtup = change->data.tp.oldtuple;
2379 				newtup = change->data.tp.newtuple;
2380 
2381 				if (oldtup)
2382 				{
2383 					sz += sizeof(HeapTupleData);
2384 					oldlen = oldtup->tuple.t_len;
2385 					sz += oldlen;
2386 				}
2387 
2388 				if (newtup)
2389 				{
2390 					sz += sizeof(HeapTupleData);
2391 					newlen = newtup->tuple.t_len;
2392 					sz += newlen;
2393 				}
2394 
2395 				/* make sure we have enough space */
2396 				ReorderBufferSerializeReserve(rb, sz);
2397 
2398 				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2399 				/* might have been reallocated above */
2400 				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2401 
2402 				if (oldlen)
2403 				{
2404 					memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2405 					data += sizeof(HeapTupleData);
2406 
2407 					memcpy(data, oldtup->tuple.t_data, oldlen);
2408 					data += oldlen;
2409 				}
2410 
2411 				if (newlen)
2412 				{
2413 					memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2414 					data += sizeof(HeapTupleData);
2415 
2416 					memcpy(data, newtup->tuple.t_data, newlen);
2417 					data += newlen;
2418 				}
2419 				break;
2420 			}
2421 		case REORDER_BUFFER_CHANGE_MESSAGE:
2422 			{
2423 				char	   *data;
2424 				Size		prefix_size = strlen(change->data.msg.prefix) + 1;
2425 
2426 				sz += prefix_size + change->data.msg.message_size +
2427 					sizeof(Size) + sizeof(Size);
2428 				ReorderBufferSerializeReserve(rb, sz);
2429 
2430 				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2431 
2432 				/* might have been reallocated above */
2433 				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2434 
2435 				/* write the prefix including the size */
2436 				memcpy(data, &prefix_size, sizeof(Size));
2437 				data += sizeof(Size);
2438 				memcpy(data, change->data.msg.prefix,
2439 					   prefix_size);
2440 				data += prefix_size;
2441 
2442 				/* write the message including the size */
2443 				memcpy(data, &change->data.msg.message_size, sizeof(Size));
2444 				data += sizeof(Size);
2445 				memcpy(data, change->data.msg.message,
2446 					   change->data.msg.message_size);
2447 				data += change->data.msg.message_size;
2448 
2449 				break;
2450 			}
2451 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2452 			{
2453 				Snapshot	snap;
2454 				char	   *data;
2455 
2456 				snap = change->data.snapshot;
2457 
2458 				sz += sizeof(SnapshotData) +
2459 					sizeof(TransactionId) * snap->xcnt +
2460 					sizeof(TransactionId) * snap->subxcnt;
2461 
2462 				/* make sure we have enough space */
2463 				ReorderBufferSerializeReserve(rb, sz);
2464 				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2465 				/* might have been reallocated above */
2466 				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2467 
2468 				memcpy(data, snap, sizeof(SnapshotData));
2469 				data += sizeof(SnapshotData);
2470 
2471 				if (snap->xcnt)
2472 				{
2473 					memcpy(data, snap->xip,
2474 						   sizeof(TransactionId) * snap->xcnt);
2475 					data += sizeof(TransactionId) * snap->xcnt;
2476 				}
2477 
2478 				if (snap->subxcnt)
2479 				{
2480 					memcpy(data, snap->subxip,
2481 						   sizeof(TransactionId) * snap->subxcnt);
2482 					data += sizeof(TransactionId) * snap->subxcnt;
2483 				}
2484 				break;
2485 			}
2486 		case REORDER_BUFFER_CHANGE_TRUNCATE:
2487 			{
2488 				Size		size;
2489 				char	   *data;
2490 
2491 				/* account for the OIDs of truncated relations */
2492 				size = sizeof(Oid) * change->data.truncate.nrelids;
2493 				sz += size;
2494 
2495 				/* make sure we have enough space */
2496 				ReorderBufferSerializeReserve(rb, sz);
2497 
2498 				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2499 				/* might have been reallocated above */
2500 				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2501 
2502 				memcpy(data, change->data.truncate.relids, size);
2503 				data += size;
2504 
2505 				break;
2506 			}
2507 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2508 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
2509 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2510 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2511 			/* ReorderBufferChange contains everything important */
2512 			break;
2513 	}
2514 
2515 	ondisk->size = sz;
2516 
2517 	errno = 0;
2518 	pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
2519 	if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2520 	{
2521 		int			save_errno = errno;
2522 
2523 		CloseTransientFile(fd);
2524 
2525 		/* if write didn't set errno, assume problem is no disk space */
2526 		errno = save_errno ? save_errno : ENOSPC;
2527 		ereport(ERROR,
2528 				(errcode_for_file_access(),
2529 				 errmsg("could not write to data file for XID %u: %m",
2530 						txn->xid)));
2531 	}
2532 	pgstat_report_wait_end();
2533 
2534 	/*
2535 	 * Keep the transaction's final_lsn up to date with each change we send to
2536 	 * disk, so that ReorderBufferRestoreCleanup works correctly.  (We used to
2537 	 * only do this on commit and abort records, but that doesn't work if a
2538 	 * system crash leaves a transaction without its abort record).
2539 	 *
2540 	 * Make sure not to move it backwards.
2541 	 */
2542 	if (txn->final_lsn < change->lsn)
2543 		txn->final_lsn = change->lsn;
2544 
2545 	Assert(ondisk->change.action == change->action);
2546 }
2547 
2548 /*
2549  * Restore a number of changes spilled to disk back into memory.
2550  */
2551 static Size
ReorderBufferRestoreChanges(ReorderBuffer * rb,ReorderBufferTXN * txn,TXNEntryFile * file,XLogSegNo * segno)2552 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2553 							TXNEntryFile *file, XLogSegNo *segno)
2554 {
2555 	Size		restored = 0;
2556 	XLogSegNo	last_segno;
2557 	dlist_mutable_iter cleanup_iter;
2558 	File	   *fd = &file->vfd;
2559 
2560 	Assert(txn->first_lsn != InvalidXLogRecPtr);
2561 	Assert(txn->final_lsn != InvalidXLogRecPtr);
2562 
2563 	/* free current entries, so we have memory for more */
2564 	dlist_foreach_modify(cleanup_iter, &txn->changes)
2565 	{
2566 		ReorderBufferChange *cleanup =
2567 		dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2568 
2569 		dlist_delete(&cleanup->node);
2570 		ReorderBufferReturnChange(rb, cleanup);
2571 	}
2572 	txn->nentries_mem = 0;
2573 	Assert(dlist_is_empty(&txn->changes));
2574 
2575 	XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
2576 
2577 	while (restored < max_changes_in_memory && *segno <= last_segno)
2578 	{
2579 		int			readBytes;
2580 		ReorderBufferDiskChange *ondisk;
2581 
2582 		if (*fd == -1)
2583 		{
2584 			char		path[MAXPGPATH];
2585 
2586 			/* first time in */
2587 			if (*segno == 0)
2588 				XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
2589 
2590 			Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2591 
2592 			/*
2593 			 * No need to care about TLIs here, only used during a single run,
2594 			 * so each LSN only maps to a specific WAL record.
2595 			 */
2596 			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2597 										*segno);
2598 
2599 			*fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
2600 
2601 			/* No harm in resetting the offset even in case of failure */
2602 			file->curOffset = 0;
2603 
2604 			if (*fd < 0 && errno == ENOENT)
2605 			{
2606 				*fd = -1;
2607 				(*segno)++;
2608 				continue;
2609 			}
2610 			else if (*fd < 0)
2611 				ereport(ERROR,
2612 						(errcode_for_file_access(),
2613 						 errmsg("could not open file \"%s\": %m",
2614 								path)));
2615 		}
2616 
2617 		/*
2618 		 * Read the statically sized part of a change which has information
2619 		 * about the total size. If we couldn't read a record, we're at the
2620 		 * end of this file.
2621 		 */
2622 		ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
2623 		readBytes = FileRead(file->vfd, rb->outbuf,
2624 							 sizeof(ReorderBufferDiskChange),
2625 							 file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
2626 
2627 		/* eof */
2628 		if (readBytes == 0)
2629 		{
2630 			FileClose(*fd);
2631 			*fd = -1;
2632 			(*segno)++;
2633 			continue;
2634 		}
2635 		else if (readBytes < 0)
2636 			ereport(ERROR,
2637 					(errcode_for_file_access(),
2638 					 errmsg("could not read from reorderbuffer spill file: %m")));
2639 		else if (readBytes != sizeof(ReorderBufferDiskChange))
2640 			ereport(ERROR,
2641 					(errcode_for_file_access(),
2642 					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2643 							readBytes,
2644 							(uint32) sizeof(ReorderBufferDiskChange))));
2645 
2646 		file->curOffset += readBytes;
2647 
2648 		ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2649 
2650 		ReorderBufferSerializeReserve(rb,
2651 									  sizeof(ReorderBufferDiskChange) + ondisk->size);
2652 		ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2653 
2654 		readBytes = FileRead(file->vfd,
2655 							 rb->outbuf + sizeof(ReorderBufferDiskChange),
2656 							 ondisk->size - sizeof(ReorderBufferDiskChange),
2657 							 file->curOffset,
2658 							 WAIT_EVENT_REORDER_BUFFER_READ);
2659 
2660 		if (readBytes < 0)
2661 			ereport(ERROR,
2662 					(errcode_for_file_access(),
2663 					 errmsg("could not read from reorderbuffer spill file: %m")));
2664 		else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2665 			ereport(ERROR,
2666 					(errcode_for_file_access(),
2667 					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2668 							readBytes,
2669 							(uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2670 
2671 		file->curOffset += readBytes;
2672 
2673 		/*
2674 		 * ok, read a full change from disk, now restore it into proper
2675 		 * in-memory format
2676 		 */
2677 		ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2678 		restored++;
2679 	}
2680 
2681 	return restored;
2682 }
2683 
2684 /*
2685  * Convert change from its on-disk format to in-memory format and queue it onto
2686  * the TXN's ->changes list.
2687  *
2688  * Note: although "data" is declared char*, at entry it points to a
2689  * maxalign'd buffer, making it safe in most of this function to assume
2690  * that the pointed-to data is suitably aligned for direct access.
2691  */
2692 static void
ReorderBufferRestoreChange(ReorderBuffer * rb,ReorderBufferTXN * txn,char * data)2693 ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2694 						   char *data)
2695 {
2696 	ReorderBufferDiskChange *ondisk;
2697 	ReorderBufferChange *change;
2698 
2699 	ondisk = (ReorderBufferDiskChange *) data;
2700 
2701 	change = ReorderBufferGetChange(rb);
2702 
2703 	/* copy static part */
2704 	memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2705 
2706 	data += sizeof(ReorderBufferDiskChange);
2707 
2708 	/* restore individual stuff */
2709 	switch (change->action)
2710 	{
2711 			/* fall through these, they're all similar enough */
2712 		case REORDER_BUFFER_CHANGE_INSERT:
2713 		case REORDER_BUFFER_CHANGE_UPDATE:
2714 		case REORDER_BUFFER_CHANGE_DELETE:
2715 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2716 			if (change->data.tp.oldtuple)
2717 			{
2718 				uint32		tuplelen = ((HeapTuple) data)->t_len;
2719 
2720 				change->data.tp.oldtuple =
2721 					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2722 
2723 				/* restore ->tuple */
2724 				memcpy(&change->data.tp.oldtuple->tuple, data,
2725 					   sizeof(HeapTupleData));
2726 				data += sizeof(HeapTupleData);
2727 
2728 				/* reset t_data pointer into the new tuplebuf */
2729 				change->data.tp.oldtuple->tuple.t_data =
2730 					ReorderBufferTupleBufData(change->data.tp.oldtuple);
2731 
2732 				/* restore tuple data itself */
2733 				memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2734 				data += tuplelen;
2735 			}
2736 
2737 			if (change->data.tp.newtuple)
2738 			{
2739 				/* here, data might not be suitably aligned! */
2740 				uint32		tuplelen;
2741 
2742 				memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2743 					   sizeof(uint32));
2744 
2745 				change->data.tp.newtuple =
2746 					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2747 
2748 				/* restore ->tuple */
2749 				memcpy(&change->data.tp.newtuple->tuple, data,
2750 					   sizeof(HeapTupleData));
2751 				data += sizeof(HeapTupleData);
2752 
2753 				/* reset t_data pointer into the new tuplebuf */
2754 				change->data.tp.newtuple->tuple.t_data =
2755 					ReorderBufferTupleBufData(change->data.tp.newtuple);
2756 
2757 				/* restore tuple data itself */
2758 				memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2759 				data += tuplelen;
2760 			}
2761 
2762 			break;
2763 		case REORDER_BUFFER_CHANGE_MESSAGE:
2764 			{
2765 				Size		prefix_size;
2766 
2767 				/* read prefix */
2768 				memcpy(&prefix_size, data, sizeof(Size));
2769 				data += sizeof(Size);
2770 				change->data.msg.prefix = MemoryContextAlloc(rb->context,
2771 															 prefix_size);
2772 				memcpy(change->data.msg.prefix, data, prefix_size);
2773 				Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2774 				data += prefix_size;
2775 
2776 				/* read the message */
2777 				memcpy(&change->data.msg.message_size, data, sizeof(Size));
2778 				data += sizeof(Size);
2779 				change->data.msg.message = MemoryContextAlloc(rb->context,
2780 															  change->data.msg.message_size);
2781 				memcpy(change->data.msg.message, data,
2782 					   change->data.msg.message_size);
2783 				data += change->data.msg.message_size;
2784 
2785 				break;
2786 			}
2787 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2788 			{
2789 				Snapshot	oldsnap;
2790 				Snapshot	newsnap;
2791 				Size		size;
2792 
2793 				oldsnap = (Snapshot) data;
2794 
2795 				size = sizeof(SnapshotData) +
2796 					sizeof(TransactionId) * oldsnap->xcnt +
2797 					sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2798 
2799 				change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2800 
2801 				newsnap = change->data.snapshot;
2802 
2803 				memcpy(newsnap, data, size);
2804 				newsnap->xip = (TransactionId *)
2805 					(((char *) newsnap) + sizeof(SnapshotData));
2806 				newsnap->subxip = newsnap->xip + newsnap->xcnt;
2807 				newsnap->copied = true;
2808 				break;
2809 			}
2810 			/* the base struct contains all the data, easy peasy */
2811 		case REORDER_BUFFER_CHANGE_TRUNCATE:
2812 			{
2813 				Oid		   *relids;
2814 
2815 				relids = ReorderBufferGetRelids(rb,
2816 												change->data.truncate.nrelids);
2817 				memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
2818 				change->data.truncate.relids = relids;
2819 
2820 				break;
2821 			}
2822 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2823 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
2824 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2825 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2826 			break;
2827 	}
2828 
2829 	dlist_push_tail(&txn->changes, &change->node);
2830 	txn->nentries_mem++;
2831 }
2832 
2833 /*
2834  * Remove all on-disk stored for the passed in transaction.
2835  */
2836 static void
ReorderBufferRestoreCleanup(ReorderBuffer * rb,ReorderBufferTXN * txn)2837 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
2838 {
2839 	XLogSegNo	first;
2840 	XLogSegNo	cur;
2841 	XLogSegNo	last;
2842 
2843 	Assert(txn->first_lsn != InvalidXLogRecPtr);
2844 	Assert(txn->final_lsn != InvalidXLogRecPtr);
2845 
2846 	XLByteToSeg(txn->first_lsn, first, wal_segment_size);
2847 	XLByteToSeg(txn->final_lsn, last, wal_segment_size);
2848 
2849 	/* iterate over all possible filenames, and delete them */
2850 	for (cur = first; cur <= last; cur++)
2851 	{
2852 		char		path[MAXPGPATH];
2853 
2854 		ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
2855 		if (unlink(path) != 0 && errno != ENOENT)
2856 			ereport(ERROR,
2857 					(errcode_for_file_access(),
2858 					 errmsg("could not remove file \"%s\": %m", path)));
2859 	}
2860 }
2861 
2862 /*
2863  * Remove any leftover serialized reorder buffers from a slot directory after a
2864  * prior crash or decoding session exit.
2865  */
2866 static void
ReorderBufferCleanupSerializedTXNs(const char * slotname)2867 ReorderBufferCleanupSerializedTXNs(const char *slotname)
2868 {
2869 	DIR		   *spill_dir;
2870 	struct dirent *spill_de;
2871 	struct stat statbuf;
2872 	char		path[MAXPGPATH * 2 + 12];
2873 
2874 	sprintf(path, "pg_replslot/%s", slotname);
2875 
2876 	/* we're only handling directories here, skip if it's not ours */
2877 	if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2878 		return;
2879 
2880 	spill_dir = AllocateDir(path);
2881 	while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
2882 	{
2883 		/* only look at names that can be ours */
2884 		if (strncmp(spill_de->d_name, "xid", 3) == 0)
2885 		{
2886 			snprintf(path, sizeof(path),
2887 					 "pg_replslot/%s/%s", slotname,
2888 					 spill_de->d_name);
2889 
2890 			if (unlink(path) != 0)
2891 				ereport(ERROR,
2892 						(errcode_for_file_access(),
2893 						 errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
2894 								path, slotname)));
2895 		}
2896 	}
2897 	FreeDir(spill_dir);
2898 }
2899 
2900 /*
2901  * Given a replication slot, transaction ID and segment number, fill in the
2902  * corresponding spill file into 'path', which is a caller-owned buffer of size
2903  * at least MAXPGPATH.
2904  */
2905 static void
ReorderBufferSerializedPath(char * path,ReplicationSlot * slot,TransactionId xid,XLogSegNo segno)2906 ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
2907 							XLogSegNo segno)
2908 {
2909 	XLogRecPtr	recptr;
2910 
2911 	XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
2912 
2913 	snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
2914 			 NameStr(MyReplicationSlot->data.name),
2915 			 xid,
2916 			 (uint32) (recptr >> 32), (uint32) recptr);
2917 }
2918 
2919 /*
2920  * Delete all data spilled to disk after we've restarted/crashed. It will be
2921  * recreated when the respective slots are reused.
2922  */
2923 void
StartupReorderBuffer(void)2924 StartupReorderBuffer(void)
2925 {
2926 	DIR		   *logical_dir;
2927 	struct dirent *logical_de;
2928 
2929 	logical_dir = AllocateDir("pg_replslot");
2930 	while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2931 	{
2932 		if (strcmp(logical_de->d_name, ".") == 0 ||
2933 			strcmp(logical_de->d_name, "..") == 0)
2934 			continue;
2935 
2936 		/* if it cannot be a slot, skip the directory */
2937 		if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2938 			continue;
2939 
2940 		/*
2941 		 * ok, has to be a surviving logical slot, iterate and delete
2942 		 * everything starting with xid-*
2943 		 */
2944 		ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
2945 	}
2946 	FreeDir(logical_dir);
2947 }
2948 
2949 /* ---------------------------------------
2950  * toast reassembly support
2951  * ---------------------------------------
2952  */
2953 
2954 /*
2955  * Initialize per tuple toast reconstruction support.
2956  */
2957 static void
ReorderBufferToastInitHash(ReorderBuffer * rb,ReorderBufferTXN * txn)2958 ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
2959 {
2960 	HASHCTL		hash_ctl;
2961 
2962 	Assert(txn->toast_hash == NULL);
2963 
2964 	memset(&hash_ctl, 0, sizeof(hash_ctl));
2965 	hash_ctl.keysize = sizeof(Oid);
2966 	hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2967 	hash_ctl.hcxt = rb->context;
2968 	txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2969 								  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
2970 }
2971 
2972 /*
2973  * Per toast-chunk handling for toast reconstruction
2974  *
2975  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
2976  * toasted Datum comes along.
2977  */
2978 static void
ReorderBufferToastAppendChunk(ReorderBuffer * rb,ReorderBufferTXN * txn,Relation relation,ReorderBufferChange * change)2979 ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
2980 							  Relation relation, ReorderBufferChange *change)
2981 {
2982 	ReorderBufferToastEnt *ent;
2983 	ReorderBufferTupleBuf *newtup;
2984 	bool		found;
2985 	int32		chunksize;
2986 	bool		isnull;
2987 	Pointer		chunk;
2988 	TupleDesc	desc = RelationGetDescr(relation);
2989 	Oid			chunk_id;
2990 	int32		chunk_seq;
2991 
2992 	if (txn->toast_hash == NULL)
2993 		ReorderBufferToastInitHash(rb, txn);
2994 
2995 	Assert(IsToastRelation(relation));
2996 
2997 	newtup = change->data.tp.newtuple;
2998 	chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2999 	Assert(!isnull);
3000 	chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
3001 	Assert(!isnull);
3002 
3003 	ent = (ReorderBufferToastEnt *)
3004 		hash_search(txn->toast_hash,
3005 					(void *) &chunk_id,
3006 					HASH_ENTER,
3007 					&found);
3008 
3009 	if (!found)
3010 	{
3011 		Assert(ent->chunk_id == chunk_id);
3012 		ent->num_chunks = 0;
3013 		ent->last_chunk_seq = 0;
3014 		ent->size = 0;
3015 		ent->reconstructed = NULL;
3016 		dlist_init(&ent->chunks);
3017 
3018 		if (chunk_seq != 0)
3019 			elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
3020 				 chunk_seq, chunk_id);
3021 	}
3022 	else if (found && chunk_seq != ent->last_chunk_seq + 1)
3023 		elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
3024 			 chunk_seq, chunk_id, ent->last_chunk_seq + 1);
3025 
3026 	chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
3027 	Assert(!isnull);
3028 
3029 	/* calculate size so we can allocate the right size at once later */
3030 	if (!VARATT_IS_EXTENDED(chunk))
3031 		chunksize = VARSIZE(chunk) - VARHDRSZ;
3032 	else if (VARATT_IS_SHORT(chunk))
3033 		/* could happen due to heap_form_tuple doing its thing */
3034 		chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
3035 	else
3036 		elog(ERROR, "unexpected type of toast chunk");
3037 
3038 	ent->size += chunksize;
3039 	ent->last_chunk_seq = chunk_seq;
3040 	ent->num_chunks++;
3041 	dlist_push_tail(&ent->chunks, &change->node);
3042 }
3043 
3044 /*
3045  * Rejigger change->newtuple to point to in-memory toast tuples instead to
3046  * on-disk toast tuples that may not longer exist (think DROP TABLE or VACUUM).
3047  *
3048  * We cannot replace unchanged toast tuples though, so those will still point
3049  * to on-disk toast data.
3050  */
3051 static void
ReorderBufferToastReplace(ReorderBuffer * rb,ReorderBufferTXN * txn,Relation relation,ReorderBufferChange * change)3052 ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
3053 						  Relation relation, ReorderBufferChange *change)
3054 {
3055 	TupleDesc	desc;
3056 	int			natt;
3057 	Datum	   *attrs;
3058 	bool	   *isnull;
3059 	bool	   *free;
3060 	HeapTuple	tmphtup;
3061 	Relation	toast_rel;
3062 	TupleDesc	toast_desc;
3063 	MemoryContext oldcontext;
3064 	ReorderBufferTupleBuf *newtup;
3065 
3066 	/* no toast tuples changed */
3067 	if (txn->toast_hash == NULL)
3068 		return;
3069 
3070 	oldcontext = MemoryContextSwitchTo(rb->context);
3071 
3072 	/* we should only have toast tuples in an INSERT or UPDATE */
3073 	Assert(change->data.tp.newtuple);
3074 
3075 	desc = RelationGetDescr(relation);
3076 
3077 	toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
3078 	if (!RelationIsValid(toast_rel))
3079 		elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
3080 			 relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
3081 
3082 	toast_desc = RelationGetDescr(toast_rel);
3083 
3084 	/* should we allocate from stack instead? */
3085 	attrs = palloc0(sizeof(Datum) * desc->natts);
3086 	isnull = palloc0(sizeof(bool) * desc->natts);
3087 	free = palloc0(sizeof(bool) * desc->natts);
3088 
3089 	newtup = change->data.tp.newtuple;
3090 
3091 	heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
3092 
3093 	for (natt = 0; natt < desc->natts; natt++)
3094 	{
3095 		Form_pg_attribute attr = TupleDescAttr(desc, natt);
3096 		ReorderBufferToastEnt *ent;
3097 		struct varlena *varlena;
3098 
3099 		/* va_rawsize is the size of the original datum -- including header */
3100 		struct varatt_external toast_pointer;
3101 		struct varatt_indirect redirect_pointer;
3102 		struct varlena *new_datum = NULL;
3103 		struct varlena *reconstructed;
3104 		dlist_iter	it;
3105 		Size		data_done = 0;
3106 
3107 		/* system columns aren't toasted */
3108 		if (attr->attnum < 0)
3109 			continue;
3110 
3111 		if (attr->attisdropped)
3112 			continue;
3113 
3114 		/* not a varlena datatype */
3115 		if (attr->attlen != -1)
3116 			continue;
3117 
3118 		/* no data */
3119 		if (isnull[natt])
3120 			continue;
3121 
3122 		/* ok, we know we have a toast datum */
3123 		varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
3124 
3125 		/* no need to do anything if the tuple isn't external */
3126 		if (!VARATT_IS_EXTERNAL(varlena))
3127 			continue;
3128 
3129 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
3130 
3131 		/*
3132 		 * Check whether the toast tuple changed, replace if so.
3133 		 */
3134 		ent = (ReorderBufferToastEnt *)
3135 			hash_search(txn->toast_hash,
3136 						(void *) &toast_pointer.va_valueid,
3137 						HASH_FIND,
3138 						NULL);
3139 		if (ent == NULL)
3140 			continue;
3141 
3142 		new_datum =
3143 			(struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
3144 
3145 		free[natt] = true;
3146 
3147 		reconstructed = palloc0(toast_pointer.va_rawsize);
3148 
3149 		ent->reconstructed = reconstructed;
3150 
3151 		/* stitch toast tuple back together from its parts */
3152 		dlist_foreach(it, &ent->chunks)
3153 		{
3154 			bool		isnull;
3155 			ReorderBufferChange *cchange;
3156 			ReorderBufferTupleBuf *ctup;
3157 			Pointer		chunk;
3158 
3159 			cchange = dlist_container(ReorderBufferChange, node, it.cur);
3160 			ctup = cchange->data.tp.newtuple;
3161 			chunk = DatumGetPointer(
3162 									fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
3163 
3164 			Assert(!isnull);
3165 			Assert(!VARATT_IS_EXTERNAL(chunk));
3166 			Assert(!VARATT_IS_SHORT(chunk));
3167 
3168 			memcpy(VARDATA(reconstructed) + data_done,
3169 				   VARDATA(chunk),
3170 				   VARSIZE(chunk) - VARHDRSZ);
3171 			data_done += VARSIZE(chunk) - VARHDRSZ;
3172 		}
3173 		Assert(data_done == toast_pointer.va_extsize);
3174 
3175 		/* make sure its marked as compressed or not */
3176 		if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
3177 			SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
3178 		else
3179 			SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
3180 
3181 		memset(&redirect_pointer, 0, sizeof(redirect_pointer));
3182 		redirect_pointer.pointer = reconstructed;
3183 
3184 		SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
3185 		memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
3186 			   sizeof(redirect_pointer));
3187 
3188 		attrs[natt] = PointerGetDatum(new_datum);
3189 	}
3190 
3191 	/*
3192 	 * Build tuple in separate memory & copy tuple back into the tuplebuf
3193 	 * passed to the output plugin. We can't directly heap_fill_tuple() into
3194 	 * the tuplebuf because attrs[] will point back into the current content.
3195 	 */
3196 	tmphtup = heap_form_tuple(desc, attrs, isnull);
3197 	Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
3198 	Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
3199 
3200 	memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
3201 	newtup->tuple.t_len = tmphtup->t_len;
3202 
3203 	/*
3204 	 * free resources we won't further need, more persistent stuff will be
3205 	 * free'd in ReorderBufferToastReset().
3206 	 */
3207 	RelationClose(toast_rel);
3208 	pfree(tmphtup);
3209 	for (natt = 0; natt < desc->natts; natt++)
3210 	{
3211 		if (free[natt])
3212 			pfree(DatumGetPointer(attrs[natt]));
3213 	}
3214 	pfree(attrs);
3215 	pfree(free);
3216 	pfree(isnull);
3217 
3218 	MemoryContextSwitchTo(oldcontext);
3219 }
3220 
3221 /*
3222  * Free all resources allocated for toast reconstruction.
3223  */
3224 static void
ReorderBufferToastReset(ReorderBuffer * rb,ReorderBufferTXN * txn)3225 ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
3226 {
3227 	HASH_SEQ_STATUS hstat;
3228 	ReorderBufferToastEnt *ent;
3229 
3230 	if (txn->toast_hash == NULL)
3231 		return;
3232 
3233 	/* sequentially walk over the hash and free everything */
3234 	hash_seq_init(&hstat, txn->toast_hash);
3235 	while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
3236 	{
3237 		dlist_mutable_iter it;
3238 
3239 		if (ent->reconstructed != NULL)
3240 			pfree(ent->reconstructed);
3241 
3242 		dlist_foreach_modify(it, &ent->chunks)
3243 		{
3244 			ReorderBufferChange *change =
3245 			dlist_container(ReorderBufferChange, node, it.cur);
3246 
3247 			dlist_delete(&change->node);
3248 			ReorderBufferReturnChange(rb, change);
3249 		}
3250 	}
3251 
3252 	hash_destroy(txn->toast_hash);
3253 	txn->toast_hash = NULL;
3254 }
3255 
3256 
3257 /* ---------------------------------------
3258  * Visibility support for logical decoding
3259  *
3260  *
3261  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
3262  * always rely on stored cmin/cmax values because of two scenarios:
3263  *
 * * A tuple got changed multiple times during a single transaction and thus
 *	 has a combocid. Combocids are only valid for the duration of a single
 *	 transaction.
 * * A tuple with a cmin but no cmax (and thus no combocid) got
 *	 deleted/updated in a transaction other than the one that created it,
 *	 and that other transaction is the one we are looking at right now. As
 *	 only one of cmin, cmax or combocid is actually stored in the heap, we
 *	 no longer have access to the value we need.
3272  *
3273  * To resolve those problems we have a per-transaction hash of (cmin,
3274  * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
3275  * (cmin, cmax) values. That also takes care of combocids by simply
3276  * not caring about them at all. As we have the real cmin/cmax values
3277  * combocids aren't interesting.
3278  *
3279  * As we only care about catalog tuples here the overhead of this
3280  * hashtable should be acceptable.
3281  *
3282  * Heap rewrites complicate this a bit, check rewriteheap.c for
3283  * details.
3284  * -------------------------------------------------------------------------
3285  */
3286 
3287 /* struct for qsort()ing mapping files by lsn somewhat efficiently */
3288 typedef struct RewriteMappingFile
3289 {
3290 	XLogRecPtr	lsn;
3291 	char		fname[MAXPGPATH];
3292 } RewriteMappingFile;
3293 
#if NOT_USED
/*
 * Debugging aid: emit one DEBUG3 message per entry of tuplecid_data, showing
 * the entry's relfilenode, tid, cmin and cmax.
 */
static void
DisplayMapping(HTAB *tuplecid_data)
{
	HASH_SEQ_STATUS scan;
	ReorderBufferTupleCidEnt *entry;

	hash_seq_init(&scan, tuplecid_data);

	for (entry = (ReorderBufferTupleCidEnt *) hash_seq_search(&scan);
		 entry != NULL;
		 entry = (ReorderBufferTupleCidEnt *) hash_seq_search(&scan))
	{
		elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
			 entry->key.relnode.dbNode,
			 entry->key.relnode.spcNode,
			 entry->key.relnode.relNode,
			 ItemPointerGetBlockNumber(&entry->key.tid),
			 ItemPointerGetOffsetNumber(&entry->key.tid),
			 entry->cmin,
			 entry->cmax);
	}
}
#endif
3316 
3317 /*
3318  * Apply a single mapping file to tuplecid_data.
3319  *
3320  * The mapping file has to have been verified to be a) committed b) for our
3321  * transaction c) applied in LSN order.
3322  */
3323 static void
ApplyLogicalMappingFile(HTAB * tuplecid_data,Oid relid,const char * fname)3324 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
3325 {
3326 	char		path[MAXPGPATH];
3327 	int			fd;
3328 	int			readBytes;
3329 	LogicalRewriteMappingData map;
3330 
3331 	sprintf(path, "pg_logical/mappings/%s", fname);
3332 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
3333 	if (fd < 0)
3334 		ereport(ERROR,
3335 				(errcode_for_file_access(),
3336 				 errmsg("could not open file \"%s\": %m", path)));
3337 
3338 	while (true)
3339 	{
3340 		ReorderBufferTupleCidKey key;
3341 		ReorderBufferTupleCidEnt *ent;
3342 		ReorderBufferTupleCidEnt *new_ent;
3343 		bool		found;
3344 
3345 		/* be careful about padding */
3346 		memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3347 
3348 		/* read all mappings till the end of the file */
3349 		pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
3350 		readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3351 		pgstat_report_wait_end();
3352 
3353 		if (readBytes < 0)
3354 			ereport(ERROR,
3355 					(errcode_for_file_access(),
3356 					 errmsg("could not read file \"%s\": %m",
3357 							path)));
3358 		else if (readBytes == 0)	/* EOF */
3359 			break;
3360 		else if (readBytes != sizeof(LogicalRewriteMappingData))
3361 			ereport(ERROR,
3362 					(errcode_for_file_access(),
3363 					 errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3364 							path, readBytes,
3365 							(int32) sizeof(LogicalRewriteMappingData))));
3366 
3367 		key.relnode = map.old_node;
3368 		ItemPointerCopy(&map.old_tid,
3369 						&key.tid);
3370 
3371 
3372 		ent = (ReorderBufferTupleCidEnt *)
3373 			hash_search(tuplecid_data,
3374 						(void *) &key,
3375 						HASH_FIND,
3376 						NULL);
3377 
3378 		/* no existing mapping, no need to update */
3379 		if (!ent)
3380 			continue;
3381 
3382 		key.relnode = map.new_node;
3383 		ItemPointerCopy(&map.new_tid,
3384 						&key.tid);
3385 
3386 		new_ent = (ReorderBufferTupleCidEnt *)
3387 			hash_search(tuplecid_data,
3388 						(void *) &key,
3389 						HASH_ENTER,
3390 						&found);
3391 
3392 		if (found)
3393 		{
3394 			/*
3395 			 * Make sure the existing mapping makes sense. We sometime update
3396 			 * old records that did not yet have a cmax (e.g. pg_class' own
3397 			 * entry while rewriting it) during rewrites, so allow that.
3398 			 */
3399 			Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3400 			Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3401 		}
3402 		else
3403 		{
3404 			/* update mapping */
3405 			new_ent->cmin = ent->cmin;
3406 			new_ent->cmax = ent->cmax;
3407 			new_ent->combocid = ent->combocid;
3408 		}
3409 	}
3410 
3411 	if (CloseTransientFile(fd))
3412 		ereport(ERROR,
3413 				(errcode_for_file_access(),
3414 				 errmsg("could not close file \"%s\": %m", path)));
3415 }
3416 
3417 
3418 /*
3419  * Check whether the TransactionOid 'xid' is in the pre-sorted array 'xip'.
3420  */
3421 static bool
TransactionIdInArray(TransactionId xid,TransactionId * xip,Size num)3422 TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
3423 {
3424 	return bsearch(&xid, xip, num,
3425 				   sizeof(TransactionId), xidComparator) != NULL;
3426 }
3427 
3428 /*
3429  * qsort() comparator for sorting RewriteMappingFiles in LSN order.
3430  */
3431 static int
file_sort_by_lsn(const void * a_p,const void * b_p)3432 file_sort_by_lsn(const void *a_p, const void *b_p)
3433 {
3434 	RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
3435 	RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
3436 
3437 	if (a->lsn < b->lsn)
3438 		return -1;
3439 	else if (a->lsn > b->lsn)
3440 		return 1;
3441 	return 0;
3442 }
3443 
3444 /*
3445  * Apply any existing logical remapping files if there are any targeted at our
3446  * transaction for relid.
3447  */
3448 static void
UpdateLogicalMappings(HTAB * tuplecid_data,Oid relid,Snapshot snapshot)3449 UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
3450 {
3451 	DIR		   *mapping_dir;
3452 	struct dirent *mapping_de;
3453 	List	   *files = NIL;
3454 	ListCell   *file;
3455 	RewriteMappingFile **files_a;
3456 	size_t		off;
3457 	Oid			dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
3458 
3459 	mapping_dir = AllocateDir("pg_logical/mappings");
3460 	while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
3461 	{
3462 		Oid			f_dboid;
3463 		Oid			f_relid;
3464 		TransactionId f_mapped_xid;
3465 		TransactionId f_create_xid;
3466 		XLogRecPtr	f_lsn;
3467 		uint32		f_hi,
3468 					f_lo;
3469 		RewriteMappingFile *f;
3470 
3471 		if (strcmp(mapping_de->d_name, ".") == 0 ||
3472 			strcmp(mapping_de->d_name, "..") == 0)
3473 			continue;
3474 
3475 		/* Ignore files that aren't ours */
3476 		if (strncmp(mapping_de->d_name, "map-", 4) != 0)
3477 			continue;
3478 
3479 		if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
3480 				   &f_dboid, &f_relid, &f_hi, &f_lo,
3481 				   &f_mapped_xid, &f_create_xid) != 6)
3482 			elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
3483 
3484 		f_lsn = ((uint64) f_hi) << 32 | f_lo;
3485 
3486 		/* mapping for another database */
3487 		if (f_dboid != dboid)
3488 			continue;
3489 
3490 		/* mapping for another relation */
3491 		if (f_relid != relid)
3492 			continue;
3493 
3494 		/* did the creating transaction abort? */
3495 		if (!TransactionIdDidCommit(f_create_xid))
3496 			continue;
3497 
3498 		/* not for our transaction */
3499 		if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
3500 			continue;
3501 
3502 		/* ok, relevant, queue for apply */
3503 		f = palloc(sizeof(RewriteMappingFile));
3504 		f->lsn = f_lsn;
3505 		strcpy(f->fname, mapping_de->d_name);
3506 		files = lappend(files, f);
3507 	}
3508 	FreeDir(mapping_dir);
3509 
3510 	/* build array we can easily sort */
3511 	files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
3512 	off = 0;
3513 	foreach(file, files)
3514 	{
3515 		files_a[off++] = lfirst(file);
3516 	}
3517 
3518 	/* sort files so we apply them in LSN order */
3519 	qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
3520 		  file_sort_by_lsn);
3521 
3522 	for (off = 0; off < list_length(files); off++)
3523 	{
3524 		RewriteMappingFile *f = files_a[off];
3525 
3526 		elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
3527 			 snapshot->subxip[0]);
3528 		ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
3529 		pfree(f);
3530 	}
3531 }
3532 
3533 /*
3534  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
3535  * combocids.
3536  */
3537 bool
ResolveCminCmaxDuringDecoding(HTAB * tuplecid_data,Snapshot snapshot,HeapTuple htup,Buffer buffer,CommandId * cmin,CommandId * cmax)3538 ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
3539 							  Snapshot snapshot,
3540 							  HeapTuple htup, Buffer buffer,
3541 							  CommandId *cmin, CommandId *cmax)
3542 {
3543 	ReorderBufferTupleCidKey key;
3544 	ReorderBufferTupleCidEnt *ent;
3545 	ForkNumber	forkno;
3546 	BlockNumber blockno;
3547 	bool		updated_mapping = false;
3548 
3549 	/* be careful about padding */
3550 	memset(&key, 0, sizeof(key));
3551 
3552 	Assert(!BufferIsLocal(buffer));
3553 
3554 	/*
3555 	 * get relfilenode from the buffer, no convenient way to access it other
3556 	 * than that.
3557 	 */
3558 	BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3559 
3560 	/* tuples can only be in the main fork */
3561 	Assert(forkno == MAIN_FORKNUM);
3562 	Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3563 
3564 	ItemPointerCopy(&htup->t_self,
3565 					&key.tid);
3566 
3567 restart:
3568 	ent = (ReorderBufferTupleCidEnt *)
3569 		hash_search(tuplecid_data,
3570 					(void *) &key,
3571 					HASH_FIND,
3572 					NULL);
3573 
3574 	/*
3575 	 * failed to find a mapping, check whether the table was rewritten and
3576 	 * apply mapping if so, but only do that once - there can be no new
3577 	 * mappings while we are in here since we have to hold a lock on the
3578 	 * relation.
3579 	 */
3580 	if (ent == NULL && !updated_mapping)
3581 	{
3582 		UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3583 		/* now check but don't update for a mapping again */
3584 		updated_mapping = true;
3585 		goto restart;
3586 	}
3587 	else if (ent == NULL)
3588 		return false;
3589 
3590 	if (cmin)
3591 		*cmin = ent->cmin;
3592 	if (cmax)
3593 		*cmax = ent->cmax;
3594 	return true;
3595 }
3596