1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  *	  PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2018, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/replication/reorderbuffer.c
12  *
13  * NOTES
14  *	  This module gets handed individual pieces of transactions in the order
15  *	  they are written to the WAL and is responsible to reassemble them into
16  *	  toplevel transaction sized pieces. When a transaction is completely
17  *	  reassembled - signalled by reading the transaction commit record - it
18  *	  will then call the output plugin (cf. ReorderBufferCommit()) with the
19  *	  individual changes. The output plugins rely on snapshots built by
20  *	  snapbuild.c which hands them to us.
21  *
22  *	  Transactions and subtransactions/savepoints in postgres are not
23  *	  immediately linked to each other from outside the performing
24  *	  backend. Only at commit/abort (or special xact_assignment records) they
25  *	  are linked together. Which means that we will have to splice together a
26  *	  toplevel transaction from its subtransactions. To do that efficiently we
27  *	  build a binary heap indexed by the smallest current lsn of the individual
28  *	  subtransactions' changestreams. As the individual streams are inherently
29  *	  ordered by LSN - since that is where we build them from - the transaction
30  *	  can easily be reassembled by always using the subtransaction with the
31  *	  smallest current LSN from the heap.
32  *
33  *	  In order to cope with large transactions - which can be several times as
34  *	  big as the available memory - this module supports spooling the contents
35  *	  of a large transactions to disk. When the transaction is replayed the
36  *	  contents of individual (sub-)transactions will be read from disk in
37  *	  chunks.
38  *
39  *	  This module also has to deal with reassembling toast records from the
40  *	  individual chunks stored in WAL. When a new (or initial) version of a
41  *	  tuple is stored in WAL it will always be preceded by the toast chunks
42  *	  emitted for the columns stored out of line. Within a single toplevel
43  *	  transaction there will be no other data carrying records between a row's
44  *	  toast chunks and the row data itself. See ReorderBufferToast* for
45  *	  details.
46  *
47  *	  ReorderBuffer uses two special memory context types - SlabContext for
48  *	  allocations of fixed-length structures (changes and transactions), and
49  *	  GenerationContext for the variable-length transaction data (allocated
50  *	  and freed in groups with similar lifespan).
51  *
52  * -------------------------------------------------------------------------
53  */
54 #include "postgres.h"
55 
56 #include <unistd.h>
57 #include <sys/stat.h>
58 
59 #include "access/rewriteheap.h"
60 #include "access/transam.h"
61 #include "access/tuptoaster.h"
62 #include "access/xact.h"
63 #include "access/xlog_internal.h"
64 #include "catalog/catalog.h"
65 #include "lib/binaryheap.h"
66 #include "miscadmin.h"
67 #include "pgstat.h"
68 #include "replication/logical.h"
69 #include "replication/reorderbuffer.h"
70 #include "replication/slot.h"
71 #include "replication/snapbuild.h"	/* just for SnapBuildSnapDecRefcount */
72 #include "storage/bufmgr.h"
73 #include "storage/fd.h"
74 #include "storage/sinval.h"
75 #include "utils/builtins.h"
76 #include "utils/combocid.h"
77 #include "utils/memdebug.h"
78 #include "utils/memutils.h"
79 #include "utils/rel.h"
80 #include "utils/relfilenodemap.h"
81 #include "utils/tqual.h"
82 
83 
84 /* entry for a hash table we use to map from xid to our transaction state */
85 typedef struct ReorderBufferTXNByIdEnt
86 {
87 	TransactionId xid;
88 	ReorderBufferTXN *txn;
89 } ReorderBufferTXNByIdEnt;
90 
91 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
92 typedef struct ReorderBufferTupleCidKey
93 {
94 	RelFileNode relnode;
95 	ItemPointerData tid;
96 } ReorderBufferTupleCidKey;
97 
98 typedef struct ReorderBufferTupleCidEnt
99 {
100 	ReorderBufferTupleCidKey key;
101 	CommandId	cmin;
102 	CommandId	cmax;
103 	CommandId	combocid;		/* just for debugging */
104 } ReorderBufferTupleCidEnt;
105 
106 /* k-way in-order change iteration support structures */
107 typedef struct ReorderBufferIterTXNEntry
108 {
109 	XLogRecPtr	lsn;
110 	ReorderBufferChange *change;
111 	ReorderBufferTXN *txn;
112 	File		fd;
113 	XLogSegNo	segno;
114 } ReorderBufferIterTXNEntry;
115 
116 typedef struct ReorderBufferIterTXNState
117 {
118 	binaryheap *heap;
119 	Size		nr_txns;
120 	dlist_head	old_change;
121 	ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
122 } ReorderBufferIterTXNState;
123 
124 /* toast datastructures */
125 typedef struct ReorderBufferToastEnt
126 {
127 	Oid			chunk_id;		/* toast_table.chunk_id */
128 	int32		last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
129 								 * have seen */
130 	Size		num_chunks;		/* number of chunks we've already seen */
131 	Size		size;			/* combined size of chunks seen */
132 	dlist_head	chunks;			/* linked list of chunks */
133 	struct varlena *reconstructed;	/* reconstructed varlena now pointed to in
134 									 * main tup */
135 } ReorderBufferToastEnt;
136 
137 /* Disk serialization support datastructures */
138 typedef struct ReorderBufferDiskChange
139 {
140 	Size		size;
141 	ReorderBufferChange change;
142 	/* data follows */
143 } ReorderBufferDiskChange;
144 
145 /*
146  * Maximum number of changes kept in memory, per transaction. After that,
147  * changes are spooled to disk.
148  *
149  * The current value should be sufficient to decode the entire transaction
150  * without hitting disk in OLTP workloads, while starting to spool to disk in
151  * other workloads reasonably fast.
152  *
153  * At some point in the future it probably makes sense to have a more elaborate
154  * resource management here, but it's not entirely clear what that would look
155  * like.
156  */
157 static const Size max_changes_in_memory = 4096;
158 
159 /* ---------------------------------------
160  * primary reorderbuffer support routines
161  * ---------------------------------------
162  */
163 static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
164 static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
165 static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
166 					  TransactionId xid, bool create, bool *is_new,
167 					  XLogRecPtr lsn, bool create_as_top);
168 static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
169 								  ReorderBufferTXN *subtxn);
170 
171 static void AssertTXNLsnOrder(ReorderBuffer *rb);
172 
173 /* ---------------------------------------
174  * support functions for lsn-order iterating over the ->changes of a
175  * transaction and its subtransactions
176  *
177  * used for iteration over the k-way heap merge of a transaction and its
178  * subtransactions
179  * ---------------------------------------
180  */
181 static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
182 									 ReorderBufferIterTXNState *volatile *iter_state);
183 static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
184 static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
185 						   ReorderBufferIterTXNState *state);
186 static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
187 
188 /*
189  * ---------------------------------------
190  * Disk serialization support functions
191  * ---------------------------------------
192  */
193 static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
194 static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
195 static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
196 							 int fd, ReorderBufferChange *change);
197 static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
198 							File *fd, XLogSegNo *segno);
199 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
200 						   char *change);
201 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
202 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
203 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
204 							TransactionId xid, XLogSegNo segno);
205 
206 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
207 static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
208 					  ReorderBufferTXN *txn, CommandId cid);
209 
210 /* ---------------------------------------
211  * toast reassembly support
212  * ---------------------------------------
213  */
214 static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
215 static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
216 static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
217 						  Relation relation, ReorderBufferChange *change);
218 static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
219 							  Relation relation, ReorderBufferChange *change);
220 
221 
222 /*
223  * Allocate a new ReorderBuffer and clean out any old serialized state from
224  * prior ReorderBuffer instances for the same slot.
225  */
226 ReorderBuffer *
ReorderBufferAllocate(void)227 ReorderBufferAllocate(void)
228 {
229 	ReorderBuffer *buffer;
230 	HASHCTL		hash_ctl;
231 	MemoryContext new_ctx;
232 
233 	Assert(MyReplicationSlot != NULL);
234 
235 	/* allocate memory in own context, to have better accountability */
236 	new_ctx = AllocSetContextCreate(CurrentMemoryContext,
237 									"ReorderBuffer",
238 									ALLOCSET_DEFAULT_SIZES);
239 
240 	buffer =
241 		(ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
242 
243 	memset(&hash_ctl, 0, sizeof(hash_ctl));
244 
245 	buffer->context = new_ctx;
246 
247 	buffer->change_context = SlabContextCreate(new_ctx,
248 											   "Change",
249 											   SLAB_DEFAULT_BLOCK_SIZE,
250 											   sizeof(ReorderBufferChange));
251 
252 	buffer->txn_context = SlabContextCreate(new_ctx,
253 											"TXN",
254 											SLAB_DEFAULT_BLOCK_SIZE,
255 											sizeof(ReorderBufferTXN));
256 
257 	buffer->tup_context = GenerationContextCreate(new_ctx,
258 												  "Tuples",
259 												  SLAB_LARGE_BLOCK_SIZE);
260 
261 	hash_ctl.keysize = sizeof(TransactionId);
262 	hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
263 	hash_ctl.hcxt = buffer->context;
264 
265 	buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
266 								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
267 
268 	buffer->by_txn_last_xid = InvalidTransactionId;
269 	buffer->by_txn_last_txn = NULL;
270 
271 	buffer->outbuf = NULL;
272 	buffer->outbufsize = 0;
273 
274 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
275 
276 	dlist_init(&buffer->toplevel_by_lsn);
277 	dlist_init(&buffer->txns_by_base_snapshot_lsn);
278 
279 	/*
280 	 * Ensure there's no stale data from prior uses of this slot, in case some
281 	 * prior exit avoided calling ReorderBufferFree. Failure to do this can
282 	 * produce duplicated txns, and it's very cheap if there's nothing there.
283 	 */
284 	ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
285 
286 	return buffer;
287 }
288 
289 /*
290  * Free a ReorderBuffer
291  */
292 void
ReorderBufferFree(ReorderBuffer * rb)293 ReorderBufferFree(ReorderBuffer *rb)
294 {
295 	MemoryContext context = rb->context;
296 
297 	/*
298 	 * We free separately allocated data by entirely scrapping reorderbuffer's
299 	 * memory context.
300 	 */
301 	MemoryContextDelete(context);
302 
303 	/* Free disk space used by unconsumed reorder buffers */
304 	ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
305 }
306 
307 /*
308  * Get an unused, possibly preallocated, ReorderBufferTXN.
309  */
310 static ReorderBufferTXN *
ReorderBufferGetTXN(ReorderBuffer * rb)311 ReorderBufferGetTXN(ReorderBuffer *rb)
312 {
313 	ReorderBufferTXN *txn;
314 
315 	txn = (ReorderBufferTXN *)
316 		MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
317 
318 	memset(txn, 0, sizeof(ReorderBufferTXN));
319 
320 	dlist_init(&txn->changes);
321 	dlist_init(&txn->tuplecids);
322 	dlist_init(&txn->subtxns);
323 
324 	return txn;
325 }
326 
327 /*
328  * Free a ReorderBufferTXN.
329  */
330 static void
ReorderBufferReturnTXN(ReorderBuffer * rb,ReorderBufferTXN * txn)331 ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
332 {
333 	/* clean the lookup cache if we were cached (quite likely) */
334 	if (rb->by_txn_last_xid == txn->xid)
335 	{
336 		rb->by_txn_last_xid = InvalidTransactionId;
337 		rb->by_txn_last_txn = NULL;
338 	}
339 
340 	/* free data that's contained */
341 
342 	if (txn->tuplecid_hash != NULL)
343 	{
344 		hash_destroy(txn->tuplecid_hash);
345 		txn->tuplecid_hash = NULL;
346 	}
347 
348 	if (txn->invalidations)
349 	{
350 		pfree(txn->invalidations);
351 		txn->invalidations = NULL;
352 	}
353 
354 	/* Reset the toast hash */
355 	ReorderBufferToastReset(rb, txn);
356 
357 	pfree(txn);
358 }
359 
360 /*
361  * Get an fresh ReorderBufferChange.
362  */
363 ReorderBufferChange *
ReorderBufferGetChange(ReorderBuffer * rb)364 ReorderBufferGetChange(ReorderBuffer *rb)
365 {
366 	ReorderBufferChange *change;
367 
368 	change = (ReorderBufferChange *)
369 		MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
370 
371 	memset(change, 0, sizeof(ReorderBufferChange));
372 	return change;
373 }
374 
375 /*
376  * Free an ReorderBufferChange.
377  */
378 void
ReorderBufferReturnChange(ReorderBuffer * rb,ReorderBufferChange * change)379 ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
380 {
381 	/* free contained data */
382 	switch (change->action)
383 	{
384 		case REORDER_BUFFER_CHANGE_INSERT:
385 		case REORDER_BUFFER_CHANGE_UPDATE:
386 		case REORDER_BUFFER_CHANGE_DELETE:
387 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
388 			if (change->data.tp.newtuple)
389 			{
390 				ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
391 				change->data.tp.newtuple = NULL;
392 			}
393 
394 			if (change->data.tp.oldtuple)
395 			{
396 				ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
397 				change->data.tp.oldtuple = NULL;
398 			}
399 			break;
400 		case REORDER_BUFFER_CHANGE_MESSAGE:
401 			if (change->data.msg.prefix != NULL)
402 				pfree(change->data.msg.prefix);
403 			change->data.msg.prefix = NULL;
404 			if (change->data.msg.message != NULL)
405 				pfree(change->data.msg.message);
406 			change->data.msg.message = NULL;
407 			break;
408 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
409 			if (change->data.snapshot)
410 			{
411 				ReorderBufferFreeSnap(rb, change->data.snapshot);
412 				change->data.snapshot = NULL;
413 			}
414 			break;
415 			/* no data in addition to the struct itself */
416 		case REORDER_BUFFER_CHANGE_TRUNCATE:
417 			if (change->data.truncate.relids != NULL)
418 			{
419 				ReorderBufferReturnRelids(rb, change->data.truncate.relids);
420 				change->data.truncate.relids = NULL;
421 			}
422 			break;
423 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
424 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
425 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
426 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
427 			break;
428 	}
429 
430 	pfree(change);
431 }
432 
433 /*
434  * Get a fresh ReorderBufferTupleBuf fitting at least a tuple of size
435  * tuple_len (excluding header overhead).
436  */
437 ReorderBufferTupleBuf *
ReorderBufferGetTupleBuf(ReorderBuffer * rb,Size tuple_len)438 ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
439 {
440 	ReorderBufferTupleBuf *tuple;
441 	Size		alloc_len;
442 
443 	alloc_len = tuple_len + SizeofHeapTupleHeader;
444 
445 	tuple = (ReorderBufferTupleBuf *)
446 		MemoryContextAlloc(rb->tup_context,
447 						   sizeof(ReorderBufferTupleBuf) +
448 						   MAXIMUM_ALIGNOF + alloc_len);
449 	tuple->alloc_tuple_size = alloc_len;
450 	tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
451 
452 	return tuple;
453 }
454 
455 /*
456  * Free an ReorderBufferTupleBuf.
457  */
458 void
ReorderBufferReturnTupleBuf(ReorderBuffer * rb,ReorderBufferTupleBuf * tuple)459 ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
460 {
461 	pfree(tuple);
462 }
463 
464 /*
465  * Get an array for relids of truncated relations.
466  *
467  * We use the global memory context (for the whole reorder buffer), because
468  * none of the existing ones seems like a good match (some are SLAB, so we
469  * can't use those, and tup_context is meant for tuple data, not relids). We
470  * could add yet another context, but it seems like an overkill - TRUNCATE is
471  * not particularly common operation, so it does not seem worth it.
472  */
473 Oid *
ReorderBufferGetRelids(ReorderBuffer * rb,int nrelids)474 ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
475 {
476 	Oid	   *relids;
477 	Size	alloc_len;
478 
479 	alloc_len = sizeof(Oid) * nrelids;
480 
481 	relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
482 
483 	return relids;
484 }
485 
486 /*
487  * Free an array of relids.
488  */
489 void
ReorderBufferReturnRelids(ReorderBuffer * rb,Oid * relids)490 ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
491 {
492 	pfree(relids);
493 }
494 
495 /*
496  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
497  * If create is true, and a transaction doesn't already exist, create it
498  * (with the given LSN, and as top transaction if that's specified);
499  * when this happens, is_new is set to true.
500  */
501 static ReorderBufferTXN *
ReorderBufferTXNByXid(ReorderBuffer * rb,TransactionId xid,bool create,bool * is_new,XLogRecPtr lsn,bool create_as_top)502 ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
503 					  bool *is_new, XLogRecPtr lsn, bool create_as_top)
504 {
505 	ReorderBufferTXN *txn;
506 	ReorderBufferTXNByIdEnt *ent;
507 	bool		found;
508 
509 	Assert(TransactionIdIsValid(xid));
510 
511 	/*
512 	 * Check the one-entry lookup cache first
513 	 */
514 	if (TransactionIdIsValid(rb->by_txn_last_xid) &&
515 		rb->by_txn_last_xid == xid)
516 	{
517 		txn = rb->by_txn_last_txn;
518 
519 		if (txn != NULL)
520 		{
521 			/* found it, and it's valid */
522 			if (is_new)
523 				*is_new = false;
524 			return txn;
525 		}
526 
527 		/*
528 		 * cached as non-existent, and asked not to create? Then nothing else
529 		 * to do.
530 		 */
531 		if (!create)
532 			return NULL;
533 		/* otherwise fall through to create it */
534 	}
535 
536 	/*
537 	 * If the cache wasn't hit or it yielded an "does-not-exist" and we want
538 	 * to create an entry.
539 	 */
540 
541 	/* search the lookup table */
542 	ent = (ReorderBufferTXNByIdEnt *)
543 		hash_search(rb->by_txn,
544 					(void *) &xid,
545 					create ? HASH_ENTER : HASH_FIND,
546 					&found);
547 	if (found)
548 		txn = ent->txn;
549 	else if (create)
550 	{
551 		/* initialize the new entry, if creation was requested */
552 		Assert(ent != NULL);
553 		Assert(lsn != InvalidXLogRecPtr);
554 
555 		ent->txn = ReorderBufferGetTXN(rb);
556 		ent->txn->xid = xid;
557 		txn = ent->txn;
558 		txn->first_lsn = lsn;
559 		txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;
560 
561 		if (create_as_top)
562 		{
563 			dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
564 			AssertTXNLsnOrder(rb);
565 		}
566 	}
567 	else
568 		txn = NULL;				/* not found and not asked to create */
569 
570 	/* update cache */
571 	rb->by_txn_last_xid = xid;
572 	rb->by_txn_last_txn = txn;
573 
574 	if (is_new)
575 		*is_new = !found;
576 
577 	Assert(!create || txn != NULL);
578 	return txn;
579 }
580 
581 /*
582  * Queue a change into a transaction so it can be replayed upon commit.
583  */
584 void
ReorderBufferQueueChange(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,ReorderBufferChange * change)585 ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
586 						 ReorderBufferChange *change)
587 {
588 	ReorderBufferTXN *txn;
589 
590 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
591 
592 	change->lsn = lsn;
593 	Assert(InvalidXLogRecPtr != lsn);
594 	dlist_push_tail(&txn->changes, &change->node);
595 	txn->nentries++;
596 	txn->nentries_mem++;
597 
598 	ReorderBufferCheckSerializeTXN(rb, txn);
599 }
600 
601 /*
602  * Queue message into a transaction so it can be processed upon commit.
603  */
604 void
ReorderBufferQueueMessage(ReorderBuffer * rb,TransactionId xid,Snapshot snapshot,XLogRecPtr lsn,bool transactional,const char * prefix,Size message_size,const char * message)605 ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
606 						  Snapshot snapshot, XLogRecPtr lsn,
607 						  bool transactional, const char *prefix,
608 						  Size message_size, const char *message)
609 {
610 	if (transactional)
611 	{
612 		MemoryContext oldcontext;
613 		ReorderBufferChange *change;
614 
615 		Assert(xid != InvalidTransactionId);
616 
617 		oldcontext = MemoryContextSwitchTo(rb->context);
618 
619 		change = ReorderBufferGetChange(rb);
620 		change->action = REORDER_BUFFER_CHANGE_MESSAGE;
621 		change->data.msg.prefix = pstrdup(prefix);
622 		change->data.msg.message_size = message_size;
623 		change->data.msg.message = palloc(message_size);
624 		memcpy(change->data.msg.message, message, message_size);
625 
626 		ReorderBufferQueueChange(rb, xid, lsn, change);
627 
628 		MemoryContextSwitchTo(oldcontext);
629 	}
630 	else
631 	{
632 		ReorderBufferTXN *txn = NULL;
633 		volatile Snapshot snapshot_now = snapshot;
634 
635 		if (xid != InvalidTransactionId)
636 			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
637 
638 		/* setup snapshot to allow catalog access */
639 		SetupHistoricSnapshot(snapshot_now, NULL);
640 		PG_TRY();
641 		{
642 			rb->message(rb, txn, lsn, false, prefix, message_size, message);
643 
644 			TeardownHistoricSnapshot(false);
645 		}
646 		PG_CATCH();
647 		{
648 			TeardownHistoricSnapshot(true);
649 			PG_RE_THROW();
650 		}
651 		PG_END_TRY();
652 	}
653 }
654 
655 /*
656  * AssertTXNLsnOrder
657  *		Verify LSN ordering of transaction lists in the reorderbuffer
658  *
659  * Other LSN-related invariants are checked too.
660  *
661  * No-op if assertions are not in use.
662  */
663 static void
AssertTXNLsnOrder(ReorderBuffer * rb)664 AssertTXNLsnOrder(ReorderBuffer *rb)
665 {
666 #ifdef USE_ASSERT_CHECKING
667 	dlist_iter	iter;
668 	XLogRecPtr	prev_first_lsn = InvalidXLogRecPtr;
669 	XLogRecPtr	prev_base_snap_lsn = InvalidXLogRecPtr;
670 
671 	dlist_foreach(iter, &rb->toplevel_by_lsn)
672 	{
673 		ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
674 													iter.cur);
675 
676 		/* start LSN must be set */
677 		Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
678 
679 		/* If there is an end LSN, it must be higher than start LSN */
680 		if (cur_txn->end_lsn != InvalidXLogRecPtr)
681 			Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
682 
683 		/* Current initial LSN must be strictly higher than previous */
684 		if (prev_first_lsn != InvalidXLogRecPtr)
685 			Assert(prev_first_lsn < cur_txn->first_lsn);
686 
687 		/* known-as-subtxn txns must not be listed */
688 		Assert(!cur_txn->is_known_as_subxact);
689 
690 		prev_first_lsn = cur_txn->first_lsn;
691 	}
692 
693 	dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
694 	{
695 		ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
696 													base_snapshot_node,
697 													iter.cur);
698 
699 		/* base snapshot (and its LSN) must be set */
700 		Assert(cur_txn->base_snapshot != NULL);
701 		Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);
702 
703 		/* current LSN must be strictly higher than previous */
704 		if (prev_base_snap_lsn != InvalidXLogRecPtr)
705 			Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
706 
707 		/* known-as-subtxn txns must not be listed */
708 		Assert(!cur_txn->is_known_as_subxact);
709 
710 		prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
711 	}
712 #endif
713 }
714 
715 /*
716  * ReorderBufferGetOldestTXN
717  *		Return oldest transaction in reorderbuffer
718  */
719 ReorderBufferTXN *
ReorderBufferGetOldestTXN(ReorderBuffer * rb)720 ReorderBufferGetOldestTXN(ReorderBuffer *rb)
721 {
722 	ReorderBufferTXN *txn;
723 
724 	AssertTXNLsnOrder(rb);
725 
726 	if (dlist_is_empty(&rb->toplevel_by_lsn))
727 		return NULL;
728 
729 	txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
730 
731 	Assert(!txn->is_known_as_subxact);
732 	Assert(txn->first_lsn != InvalidXLogRecPtr);
733 	return txn;
734 }
735 
736 /*
737  * ReorderBufferGetOldestXmin
738  *		Return oldest Xmin in reorderbuffer
739  *
740  * Returns oldest possibly running Xid from the point of view of snapshots
741  * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
742  * there are none.
743  *
744  * Since snapshots are assigned monotonically, this equals the Xmin of the
745  * base snapshot with minimal base_snapshot_lsn.
746  */
747 TransactionId
ReorderBufferGetOldestXmin(ReorderBuffer * rb)748 ReorderBufferGetOldestXmin(ReorderBuffer *rb)
749 {
750 	ReorderBufferTXN *txn;
751 
752 	AssertTXNLsnOrder(rb);
753 
754 	if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
755 		return InvalidTransactionId;
756 
757 	txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
758 							 &rb->txns_by_base_snapshot_lsn);
759 	return txn->base_snapshot->xmin;
760 }
761 
762 void
ReorderBufferSetRestartPoint(ReorderBuffer * rb,XLogRecPtr ptr)763 ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
764 {
765 	rb->current_restart_decoding_lsn = ptr;
766 }
767 
768 /*
769  * ReorderBufferAssignChild
770  *
771  * Make note that we know that subxid is a subtransaction of xid, seen as of
772  * the given lsn.
773  */
774 void
ReorderBufferAssignChild(ReorderBuffer * rb,TransactionId xid,TransactionId subxid,XLogRecPtr lsn)775 ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
776 						 TransactionId subxid, XLogRecPtr lsn)
777 {
778 	ReorderBufferTXN *txn;
779 	ReorderBufferTXN *subtxn;
780 	bool		new_top;
781 	bool		new_sub;
782 
783 	txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
784 	subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
785 
786 	if (!new_sub)
787 	{
788 		if (subtxn->is_known_as_subxact)
789 		{
790 			/* already associated, nothing to do */
791 			return;
792 		}
793 		else
794 		{
795 			/*
796 			 * We already saw this transaction, but initially added it to the
797 			 * list of top-level txns.  Now that we know it's not top-level,
798 			 * remove it from there.
799 			 */
800 			dlist_delete(&subtxn->node);
801 		}
802 	}
803 
804 	subtxn->is_known_as_subxact = true;
805 	subtxn->toplevel_xid = xid;
806 	Assert(subtxn->nsubtxns == 0);
807 
808 	/* add to subtransaction list */
809 	dlist_push_tail(&txn->subtxns, &subtxn->node);
810 	txn->nsubtxns++;
811 
812 	/* Possibly transfer the subtxn's snapshot to its top-level txn. */
813 	ReorderBufferTransferSnapToParent(txn, subtxn);
814 
815 	/* Verify LSN-ordering invariant */
816 	AssertTXNLsnOrder(rb);
817 }
818 
819 /*
820  * ReorderBufferTransferSnapToParent
821  *		Transfer base snapshot from subtxn to top-level txn, if needed
822  *
823  * This is done if the top-level txn doesn't have a base snapshot, or if the
824  * subtxn's base snapshot has an earlier LSN than the top-level txn's base
825  * snapshot's LSN.  This can happen if there are no changes in the toplevel
826  * txn but there are some in the subtxn, or the first change in subtxn has
827  * earlier LSN than first change in the top-level txn and we learned about
828  * their kinship only now.
829  *
830  * The subtransaction's snapshot is cleared regardless of the transfer
831  * happening, since it's not needed anymore in either case.
832  *
833  * We do this as soon as we become aware of their kinship, to avoid queueing
834  * extra snapshots to txns known-as-subtxns -- only top-level txns will
835  * receive further snapshots.
836  */
837 static void
ReorderBufferTransferSnapToParent(ReorderBufferTXN * txn,ReorderBufferTXN * subtxn)838 ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
839 								  ReorderBufferTXN *subtxn)
840 {
841 	Assert(subtxn->toplevel_xid == txn->xid);
842 
843 	if (subtxn->base_snapshot != NULL)
844 	{
845 		if (txn->base_snapshot == NULL ||
846 			subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
847 		{
848 			/*
849 			 * If the toplevel transaction already has a base snapshot but
850 			 * it's newer than the subxact's, purge it.
851 			 */
852 			if (txn->base_snapshot != NULL)
853 			{
854 				SnapBuildSnapDecRefcount(txn->base_snapshot);
855 				dlist_delete(&txn->base_snapshot_node);
856 			}
857 
858 			/*
859 			 * The snapshot is now the top transaction's; transfer it, and
860 			 * adjust the list position of the top transaction in the list by
861 			 * moving it to where the subtransaction is.
862 			 */
863 			txn->base_snapshot = subtxn->base_snapshot;
864 			txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
865 			dlist_insert_before(&subtxn->base_snapshot_node,
866 								&txn->base_snapshot_node);
867 
868 			/*
869 			 * The subtransaction doesn't have a snapshot anymore (so it
870 			 * mustn't be in the list.)
871 			 */
872 			subtxn->base_snapshot = NULL;
873 			subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
874 			dlist_delete(&subtxn->base_snapshot_node);
875 		}
876 		else
877 		{
878 			/* Base snap of toplevel is fine, so subxact's is not needed */
879 			SnapBuildSnapDecRefcount(subtxn->base_snapshot);
880 			dlist_delete(&subtxn->base_snapshot_node);
881 			subtxn->base_snapshot = NULL;
882 			subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
883 		}
884 	}
885 }
886 
887 /*
888  * Associate a subtransaction with its toplevel transaction at commit
889  * time. There may be no further changes added after this.
890  */
891 void
ReorderBufferCommitChild(ReorderBuffer * rb,TransactionId xid,TransactionId subxid,XLogRecPtr commit_lsn,XLogRecPtr end_lsn)892 ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
893 						 TransactionId subxid, XLogRecPtr commit_lsn,
894 						 XLogRecPtr end_lsn)
895 {
896 	ReorderBufferTXN *subtxn;
897 
898 	subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
899 								   InvalidXLogRecPtr, false);
900 
901 	/*
902 	 * No need to do anything if that subtxn didn't contain any changes
903 	 */
904 	if (!subtxn)
905 		return;
906 
907 	subtxn->final_lsn = commit_lsn;
908 	subtxn->end_lsn = end_lsn;
909 
910 	/*
911 	 * Assign this subxact as a child of the toplevel xact (no-op if already
912 	 * done.)
913 	 */
914 	ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
915 }
916 
917 
918 /*
919  * Support for efficiently iterating over a transaction's and its
920  * subtransactions' changes.
921  *
922  * We do by doing a k-way merge between transactions/subtransactions. For that
923  * we model the current heads of the different transactions as a binary heap
924  * so we easily know which (sub-)transaction has the change with the smallest
925  * lsn next.
926  *
927  * We assume the changes in individual transactions are already sorted by LSN.
928  */
929 
930 /*
931  * Binary heap comparison function.
932  */
933 static int
ReorderBufferIterCompare(Datum a,Datum b,void * arg)934 ReorderBufferIterCompare(Datum a, Datum b, void *arg)
935 {
936 	ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
937 	XLogRecPtr	pos_a = state->entries[DatumGetInt32(a)].lsn;
938 	XLogRecPtr	pos_b = state->entries[DatumGetInt32(b)].lsn;
939 
940 	if (pos_a < pos_b)
941 		return 1;
942 	else if (pos_a == pos_b)
943 		return 0;
944 	return -1;
945 }
946 
947 /*
948  * Allocate & initialize an iterator which iterates in lsn order over a
949  * transaction and all its subtransactions.
950  *
951  * Note: The iterator state is returned through iter_state parameter rather
952  * than the function's return value.  This is because the state gets cleaned up
953  * in a PG_CATCH block in the caller, so we want to make sure the caller gets
954  * back the state even if this function throws an exception.
955  */
956 static void
ReorderBufferIterTXNInit(ReorderBuffer * rb,ReorderBufferTXN * txn,ReorderBufferIterTXNState * volatile * iter_state)957 ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
958 						 ReorderBufferIterTXNState *volatile *iter_state)
959 {
960 	Size		nr_txns = 0;
961 	ReorderBufferIterTXNState *state;
962 	dlist_iter	cur_txn_i;
963 	int32		off;
964 
965 	*iter_state = NULL;
966 
967 	/*
968 	 * Calculate the size of our heap: one element for every transaction that
969 	 * contains changes.  (Besides the transactions already in the reorder
970 	 * buffer, we count the one we were directly passed.)
971 	 */
972 	if (txn->nentries > 0)
973 		nr_txns++;
974 
975 	dlist_foreach(cur_txn_i, &txn->subtxns)
976 	{
977 		ReorderBufferTXN *cur_txn;
978 
979 		cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
980 
981 		if (cur_txn->nentries > 0)
982 			nr_txns++;
983 	}
984 
985 	/*
986 	 * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
987 	 * need to allocate/build a heap then.
988 	 */
989 
990 	/* allocate iteration state */
991 	state = (ReorderBufferIterTXNState *)
992 		MemoryContextAllocZero(rb->context,
993 							   sizeof(ReorderBufferIterTXNState) +
994 							   sizeof(ReorderBufferIterTXNEntry) * nr_txns);
995 
996 	state->nr_txns = nr_txns;
997 	dlist_init(&state->old_change);
998 
999 	for (off = 0; off < state->nr_txns; off++)
1000 	{
1001 		state->entries[off].fd = -1;
1002 		state->entries[off].segno = 0;
1003 	}
1004 
1005 	/* allocate heap */
1006 	state->heap = binaryheap_allocate(state->nr_txns,
1007 									  ReorderBufferIterCompare,
1008 									  state);
1009 
1010 	/* Now that the state fields are initialized, it is safe to return it. */
1011 	*iter_state = state;
1012 
1013 	/*
1014 	 * Now insert items into the binary heap, in an unordered fashion.  (We
1015 	 * will run a heap assembly step at the end; this is more efficient.)
1016 	 */
1017 
1018 	off = 0;
1019 
1020 	/* add toplevel transaction if it contains changes */
1021 	if (txn->nentries > 0)
1022 	{
1023 		ReorderBufferChange *cur_change;
1024 
1025 		if (txn->serialized)
1026 		{
1027 			/* serialize remaining changes */
1028 			ReorderBufferSerializeTXN(rb, txn);
1029 			ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
1030 										&state->entries[off].segno);
1031 		}
1032 
1033 		cur_change = dlist_head_element(ReorderBufferChange, node,
1034 										&txn->changes);
1035 
1036 		state->entries[off].lsn = cur_change->lsn;
1037 		state->entries[off].change = cur_change;
1038 		state->entries[off].txn = txn;
1039 
1040 		binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1041 	}
1042 
1043 	/* add subtransactions if they contain changes */
1044 	dlist_foreach(cur_txn_i, &txn->subtxns)
1045 	{
1046 		ReorderBufferTXN *cur_txn;
1047 
1048 		cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1049 
1050 		if (cur_txn->nentries > 0)
1051 		{
1052 			ReorderBufferChange *cur_change;
1053 
1054 			if (cur_txn->serialized)
1055 			{
1056 				/* serialize remaining changes */
1057 				ReorderBufferSerializeTXN(rb, cur_txn);
1058 				ReorderBufferRestoreChanges(rb, cur_txn,
1059 											&state->entries[off].fd,
1060 											&state->entries[off].segno);
1061 			}
1062 			cur_change = dlist_head_element(ReorderBufferChange, node,
1063 											&cur_txn->changes);
1064 
1065 			state->entries[off].lsn = cur_change->lsn;
1066 			state->entries[off].change = cur_change;
1067 			state->entries[off].txn = cur_txn;
1068 
1069 			binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1070 		}
1071 	}
1072 
1073 	/* assemble a valid binary heap */
1074 	binaryheap_build(state->heap);
1075 }
1076 
1077 /*
1078  * Return the next change when iterating over a transaction and its
1079  * subtransactions.
1080  *
1081  * Returns NULL when no further changes exist.
1082  */
1083 static ReorderBufferChange *
ReorderBufferIterTXNNext(ReorderBuffer * rb,ReorderBufferIterTXNState * state)1084 ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
1085 {
1086 	ReorderBufferChange *change;
1087 	ReorderBufferIterTXNEntry *entry;
1088 	int32		off;
1089 
1090 	/* nothing there anymore */
1091 	if (state->heap->bh_size == 0)
1092 		return NULL;
1093 
1094 	off = DatumGetInt32(binaryheap_first(state->heap));
1095 	entry = &state->entries[off];
1096 
1097 	/* free memory we might have "leaked" in the previous *Next call */
1098 	if (!dlist_is_empty(&state->old_change))
1099 	{
1100 		change = dlist_container(ReorderBufferChange, node,
1101 								 dlist_pop_head_node(&state->old_change));
1102 		ReorderBufferReturnChange(rb, change);
1103 		Assert(dlist_is_empty(&state->old_change));
1104 	}
1105 
1106 	change = entry->change;
1107 
1108 	/*
1109 	 * update heap with information about which transaction has the next
1110 	 * relevant change in LSN order
1111 	 */
1112 
1113 	/* there are in-memory changes */
1114 	if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1115 	{
1116 		dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1117 		ReorderBufferChange *next_change =
1118 		dlist_container(ReorderBufferChange, node, next);
1119 
1120 		/* txn stays the same */
1121 		state->entries[off].lsn = next_change->lsn;
1122 		state->entries[off].change = next_change;
1123 
1124 		binaryheap_replace_first(state->heap, Int32GetDatum(off));
1125 		return change;
1126 	}
1127 
1128 	/* try to load changes from disk */
1129 	if (entry->txn->nentries != entry->txn->nentries_mem)
1130 	{
1131 		/*
1132 		 * Ugly: restoring changes will reuse *Change records, thus delete the
1133 		 * current one from the per-tx list and only free in the next call.
1134 		 */
1135 		dlist_delete(&change->node);
1136 		dlist_push_tail(&state->old_change, &change->node);
1137 
1138 		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
1139 										&state->entries[off].segno))
1140 		{
1141 			/* successfully restored changes from disk */
1142 			ReorderBufferChange *next_change =
1143 			dlist_head_element(ReorderBufferChange, node,
1144 							   &entry->txn->changes);
1145 
1146 			elog(DEBUG2, "restored %u/%u changes from disk",
1147 				 (uint32) entry->txn->nentries_mem,
1148 				 (uint32) entry->txn->nentries);
1149 
1150 			Assert(entry->txn->nentries_mem);
1151 			/* txn stays the same */
1152 			state->entries[off].lsn = next_change->lsn;
1153 			state->entries[off].change = next_change;
1154 			binaryheap_replace_first(state->heap, Int32GetDatum(off));
1155 
1156 			return change;
1157 		}
1158 	}
1159 
1160 	/* ok, no changes there anymore, remove */
1161 	binaryheap_remove_first(state->heap);
1162 
1163 	return change;
1164 }
1165 
1166 /*
1167  * Deallocate the iterator
1168  */
1169 static void
ReorderBufferIterTXNFinish(ReorderBuffer * rb,ReorderBufferIterTXNState * state)1170 ReorderBufferIterTXNFinish(ReorderBuffer *rb,
1171 						   ReorderBufferIterTXNState *state)
1172 {
1173 	int32		off;
1174 
1175 	for (off = 0; off < state->nr_txns; off++)
1176 	{
1177 		if (state->entries[off].fd != -1)
1178 			FileClose(state->entries[off].fd);
1179 	}
1180 
1181 	/* free memory we might have "leaked" in the last *Next call */
1182 	if (!dlist_is_empty(&state->old_change))
1183 	{
1184 		ReorderBufferChange *change;
1185 
1186 		change = dlist_container(ReorderBufferChange, node,
1187 								 dlist_pop_head_node(&state->old_change));
1188 		ReorderBufferReturnChange(rb, change);
1189 		Assert(dlist_is_empty(&state->old_change));
1190 	}
1191 
1192 	binaryheap_free(state->heap);
1193 	pfree(state);
1194 }
1195 
1196 /*
1197  * Cleanup the contents of a transaction, usually after the transaction
1198  * committed or aborted.
1199  */
1200 static void
ReorderBufferCleanupTXN(ReorderBuffer * rb,ReorderBufferTXN * txn)1201 ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1202 {
1203 	bool		found;
1204 	dlist_mutable_iter iter;
1205 
1206 	/* cleanup subtransactions & their changes */
1207 	dlist_foreach_modify(iter, &txn->subtxns)
1208 	{
1209 		ReorderBufferTXN *subtxn;
1210 
1211 		subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1212 
1213 		/*
1214 		 * Subtransactions are always associated to the toplevel TXN, even if
1215 		 * they originally were happening inside another subtxn, so we won't
1216 		 * ever recurse more than one level deep here.
1217 		 */
1218 		Assert(subtxn->is_known_as_subxact);
1219 		Assert(subtxn->nsubtxns == 0);
1220 
1221 		ReorderBufferCleanupTXN(rb, subtxn);
1222 	}
1223 
1224 	/* cleanup changes in the toplevel txn */
1225 	dlist_foreach_modify(iter, &txn->changes)
1226 	{
1227 		ReorderBufferChange *change;
1228 
1229 		change = dlist_container(ReorderBufferChange, node, iter.cur);
1230 
1231 		ReorderBufferReturnChange(rb, change);
1232 	}
1233 
1234 	/*
1235 	 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1236 	 * They are always stored in the toplevel transaction.
1237 	 */
1238 	dlist_foreach_modify(iter, &txn->tuplecids)
1239 	{
1240 		ReorderBufferChange *change;
1241 
1242 		change = dlist_container(ReorderBufferChange, node, iter.cur);
1243 		Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1244 		ReorderBufferReturnChange(rb, change);
1245 	}
1246 
1247 	/*
1248 	 * Cleanup the base snapshot, if set.
1249 	 */
1250 	if (txn->base_snapshot != NULL)
1251 	{
1252 		SnapBuildSnapDecRefcount(txn->base_snapshot);
1253 		dlist_delete(&txn->base_snapshot_node);
1254 	}
1255 
1256 	/*
1257 	 * Remove TXN from its containing list.
1258 	 *
1259 	 * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1260 	 * parent's list of known subxacts; this leaves the parent's nsubxacts
1261 	 * count too high, but we don't care.  Otherwise, we are deleting the TXN
1262 	 * from the LSN-ordered list of toplevel TXNs.
1263 	 */
1264 	dlist_delete(&txn->node);
1265 
1266 	/* now remove reference from buffer */
1267 	hash_search(rb->by_txn,
1268 				(void *) &txn->xid,
1269 				HASH_REMOVE,
1270 				&found);
1271 	Assert(found);
1272 
1273 	/* remove entries spilled to disk */
1274 	if (txn->serialized)
1275 		ReorderBufferRestoreCleanup(rb, txn);
1276 
1277 	/* deallocate */
1278 	ReorderBufferReturnTXN(rb, txn);
1279 }
1280 
1281 /*
1282  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1283  * tqual.c's HeapTupleSatisfiesHistoricMVCC.
1284  */
1285 static void
ReorderBufferBuildTupleCidHash(ReorderBuffer * rb,ReorderBufferTXN * txn)1286 ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
1287 {
1288 	dlist_iter	iter;
1289 	HASHCTL		hash_ctl;
1290 
1291 	if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1292 		return;
1293 
1294 	memset(&hash_ctl, 0, sizeof(hash_ctl));
1295 
1296 	hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1297 	hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1298 	hash_ctl.hcxt = rb->context;
1299 
1300 	/*
1301 	 * create the hash with the exact number of to-be-stored tuplecids from
1302 	 * the start
1303 	 */
1304 	txn->tuplecid_hash =
1305 		hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1306 					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1307 
1308 	dlist_foreach(iter, &txn->tuplecids)
1309 	{
1310 		ReorderBufferTupleCidKey key;
1311 		ReorderBufferTupleCidEnt *ent;
1312 		bool		found;
1313 		ReorderBufferChange *change;
1314 
1315 		change = dlist_container(ReorderBufferChange, node, iter.cur);
1316 
1317 		Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1318 
1319 		/* be careful about padding */
1320 		memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1321 
1322 		key.relnode = change->data.tuplecid.node;
1323 
1324 		ItemPointerCopy(&change->data.tuplecid.tid,
1325 						&key.tid);
1326 
1327 		ent = (ReorderBufferTupleCidEnt *)
1328 			hash_search(txn->tuplecid_hash,
1329 						(void *) &key,
1330 						HASH_ENTER,
1331 						&found);
1332 		if (!found)
1333 		{
1334 			ent->cmin = change->data.tuplecid.cmin;
1335 			ent->cmax = change->data.tuplecid.cmax;
1336 			ent->combocid = change->data.tuplecid.combocid;
1337 		}
1338 		else
1339 		{
1340 			/*
1341 			 * Maybe we already saw this tuple before in this transaction,
1342 			 * but if so it must have the same cmin.
1343 			 */
1344 			Assert(ent->cmin == change->data.tuplecid.cmin);
1345 
1346 			/*
1347 			 * cmax may be initially invalid, but once set it can only grow,
1348 			 * and never become invalid again.
1349 			 */
1350 			Assert((ent->cmax == InvalidCommandId) ||
1351 				   ((change->data.tuplecid.cmax != InvalidCommandId) &&
1352 					(change->data.tuplecid.cmax > ent->cmax)));
1353 			ent->cmax = change->data.tuplecid.cmax;
1354 		}
1355 	}
1356 }
1357 
1358 /*
1359  * Copy a provided snapshot so we can modify it privately. This is needed so
1360  * that catalog modifying transactions can look into intermediate catalog
1361  * states.
1362  */
1363 static Snapshot
ReorderBufferCopySnap(ReorderBuffer * rb,Snapshot orig_snap,ReorderBufferTXN * txn,CommandId cid)1364 ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
1365 					  ReorderBufferTXN *txn, CommandId cid)
1366 {
1367 	Snapshot	snap;
1368 	dlist_iter	iter;
1369 	int			i = 0;
1370 	Size		size;
1371 
1372 	size = sizeof(SnapshotData) +
1373 		sizeof(TransactionId) * orig_snap->xcnt +
1374 		sizeof(TransactionId) * (txn->nsubtxns + 1);
1375 
1376 	snap = MemoryContextAllocZero(rb->context, size);
1377 	memcpy(snap, orig_snap, sizeof(SnapshotData));
1378 
1379 	snap->copied = true;
1380 	snap->active_count = 1;		/* mark as active so nobody frees it */
1381 	snap->regd_count = 0;
1382 	snap->xip = (TransactionId *) (snap + 1);
1383 
1384 	memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1385 
1386 	/*
1387 	 * snap->subxip contains all txids that belong to our transaction which we
1388 	 * need to check via cmin/cmax. That's why we store the toplevel
1389 	 * transaction in there as well.
1390 	 */
1391 	snap->subxip = snap->xip + snap->xcnt;
1392 	snap->subxip[i++] = txn->xid;
1393 
1394 	/*
1395 	 * nsubxcnt isn't decreased when subtransactions abort, so count manually.
1396 	 * Since it's an upper boundary it is safe to use it for the allocation
1397 	 * above.
1398 	 */
1399 	snap->subxcnt = 1;
1400 
1401 	dlist_foreach(iter, &txn->subtxns)
1402 	{
1403 		ReorderBufferTXN *sub_txn;
1404 
1405 		sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1406 		snap->subxip[i++] = sub_txn->xid;
1407 		snap->subxcnt++;
1408 	}
1409 
1410 	/* sort so we can bsearch() later */
1411 	qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1412 
1413 	/* store the specified current CommandId */
1414 	snap->curcid = cid;
1415 
1416 	return snap;
1417 }
1418 
1419 /*
1420  * Free a previously ReorderBufferCopySnap'ed snapshot
1421  */
1422 static void
ReorderBufferFreeSnap(ReorderBuffer * rb,Snapshot snap)1423 ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
1424 {
1425 	if (snap->copied)
1426 		pfree(snap);
1427 	else
1428 		SnapBuildSnapDecRefcount(snap);
1429 }
1430 
1431 /*
1432  * Perform the replay of a transaction and its non-aborted subtransactions.
1433  *
1434  * Subtransactions previously have to be processed by
1435  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
1436  * transaction with ReorderBufferAssignChild.
1437  *
1438  * We currently can only decode a transaction's contents when its commit
1439  * record is read because that's the only place where we know about cache
1440  * invalidations. Thus, once a toplevel commit is read, we iterate over the top
1441  * and subtransactions (using a k-way merge) and replay the changes in lsn
1442  * order.
1443  */
1444 void
ReorderBufferCommit(ReorderBuffer * rb,TransactionId xid,XLogRecPtr commit_lsn,XLogRecPtr end_lsn,TimestampTz commit_time,RepOriginId origin_id,XLogRecPtr origin_lsn)1445 ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
1446 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
1447 					TimestampTz commit_time,
1448 					RepOriginId origin_id, XLogRecPtr origin_lsn)
1449 {
1450 	ReorderBufferTXN *txn;
1451 	volatile Snapshot snapshot_now;
1452 	volatile CommandId command_id = FirstCommandId;
1453 	bool		using_subtxn;
1454 	ReorderBufferIterTXNState *volatile iterstate = NULL;
1455 
1456 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1457 								false);
1458 
1459 	/* unknown transaction, nothing to replay */
1460 	if (txn == NULL)
1461 		return;
1462 
1463 	txn->final_lsn = commit_lsn;
1464 	txn->end_lsn = end_lsn;
1465 	txn->commit_time = commit_time;
1466 	txn->origin_id = origin_id;
1467 	txn->origin_lsn = origin_lsn;
1468 
1469 	/*
1470 	 * If this transaction has no snapshot, it didn't make any changes to the
1471 	 * database, so there's nothing to decode.  Note that
1472 	 * ReorderBufferCommitChild will have transferred any snapshots from
1473 	 * subtransactions if there were any.
1474 	 */
1475 	if (txn->base_snapshot == NULL)
1476 	{
1477 		Assert(txn->ninvalidations == 0);
1478 		ReorderBufferCleanupTXN(rb, txn);
1479 		return;
1480 	}
1481 
1482 	snapshot_now = txn->base_snapshot;
1483 
1484 	/* build data to be able to lookup the CommandIds of catalog tuples */
1485 	ReorderBufferBuildTupleCidHash(rb, txn);
1486 
1487 	/* setup the initial snapshot */
1488 	SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1489 
1490 	/*
1491 	 * Decoding needs access to syscaches et al., which in turn use
1492 	 * heavyweight locks and such. Thus we need to have enough state around to
1493 	 * keep track of those.  The easiest way is to simply use a transaction
1494 	 * internally.  That also allows us to easily enforce that nothing writes
1495 	 * to the database by checking for xid assignments.
1496 	 *
1497 	 * When we're called via the SQL SRF there's already a transaction
1498 	 * started, so start an explicit subtransaction there.
1499 	 */
1500 	using_subtxn = IsTransactionOrTransactionBlock();
1501 
1502 	PG_TRY();
1503 	{
1504 		ReorderBufferChange *change;
1505 		ReorderBufferChange *specinsert = NULL;
1506 
1507 		if (using_subtxn)
1508 			BeginInternalSubTransaction("replay");
1509 		else
1510 			StartTransactionCommand();
1511 
1512 		rb->begin(rb, txn);
1513 
1514 		ReorderBufferIterTXNInit(rb, txn, &iterstate);
1515 		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1516 		{
1517 			Relation	relation = NULL;
1518 			Oid			reloid;
1519 
1520 			switch (change->action)
1521 			{
1522 				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
1523 
1524 					/*
1525 					 * Confirmation for speculative insertion arrived. Simply
1526 					 * use as a normal record. It'll be cleaned up at the end
1527 					 * of INSERT processing.
1528 					 */
1529 					if (specinsert == NULL)
1530 						elog(ERROR, "invalid ordering of speculative insertion changes");
1531 					Assert(specinsert->data.tp.oldtuple == NULL);
1532 					change = specinsert;
1533 					change->action = REORDER_BUFFER_CHANGE_INSERT;
1534 
1535 					/* intentionally fall through */
1536 				case REORDER_BUFFER_CHANGE_INSERT:
1537 				case REORDER_BUFFER_CHANGE_UPDATE:
1538 				case REORDER_BUFFER_CHANGE_DELETE:
1539 					Assert(snapshot_now);
1540 
1541 					reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1542 												change->data.tp.relnode.relNode);
1543 
1544 					/*
1545 					 * Mapped catalog tuple without data, emitted while
1546 					 * catalog table was in the process of being rewritten. We
1547 					 * can fail to look up the relfilenode, because the the
1548 					 * relmapper has no "historic" view, in contrast to normal
1549 					 * the normal catalog during decoding. Thus repeated
1550 					 * rewrites can cause a lookup failure. That's OK because
1551 					 * we do not decode catalog changes anyway. Normally such
1552 					 * tuples would be skipped over below, but we can't
1553 					 * identify whether the table should be logically logged
1554 					 * without mapping the relfilenode to the oid.
1555 					 */
1556 					if (reloid == InvalidOid &&
1557 						change->data.tp.newtuple == NULL &&
1558 						change->data.tp.oldtuple == NULL)
1559 						goto change_done;
1560 					else if (reloid == InvalidOid)
1561 						elog(ERROR, "could not map filenode \"%s\" to relation OID",
1562 							 relpathperm(change->data.tp.relnode,
1563 										 MAIN_FORKNUM));
1564 
1565 					relation = RelationIdGetRelation(reloid);
1566 
1567 					if (!RelationIsValid(relation))
1568 						elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1569 							 reloid,
1570 							 relpathperm(change->data.tp.relnode,
1571 										 MAIN_FORKNUM));
1572 
1573 					if (!RelationIsLogicallyLogged(relation))
1574 						goto change_done;
1575 
1576 					/*
1577 					 * Ignore temporary heaps created during DDL unless the
1578 					 * plugin has asked for them.
1579 					 */
1580 					if (relation->rd_rel->relrewrite && !rb->output_rewrites)
1581 						goto change_done;
1582 
1583 					/*
1584 					 * For now ignore sequence changes entirely. Most of the
1585 					 * time they don't log changes using records we
1586 					 * understand, so it doesn't make sense to handle the few
1587 					 * cases we do.
1588 					 */
1589 					if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1590 						goto change_done;
1591 
1592 					/* user-triggered change */
1593 					if (!IsToastRelation(relation))
1594 					{
1595 						ReorderBufferToastReplace(rb, txn, relation, change);
1596 						rb->apply_change(rb, txn, relation, change);
1597 
1598 						/*
1599 						 * Only clear reassembled toast chunks if we're sure
1600 						 * they're not required anymore. The creator of the
1601 						 * tuple tells us.
1602 						 */
1603 						if (change->data.tp.clear_toast_afterwards)
1604 							ReorderBufferToastReset(rb, txn);
1605 					}
1606 					/* we're not interested in toast deletions */
1607 					else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1608 					{
1609 						/*
1610 						 * Need to reassemble the full toasted Datum in
1611 						 * memory, to ensure the chunks don't get reused till
1612 						 * we're done remove it from the list of this
1613 						 * transaction's changes. Otherwise it will get
1614 						 * freed/reused while restoring spooled data from
1615 						 * disk.
1616 						 */
1617 						Assert(change->data.tp.newtuple != NULL);
1618 
1619 						dlist_delete(&change->node);
1620 						ReorderBufferToastAppendChunk(rb, txn, relation,
1621 													  change);
1622 					}
1623 
1624 			change_done:
1625 
1626 					/*
1627 					 * If speculative insertion was confirmed, the record isn't
1628 					 * needed anymore.
1629 					 */
1630 					if (specinsert != NULL)
1631 					{
1632 						ReorderBufferReturnChange(rb, specinsert);
1633 						specinsert = NULL;
1634 					}
1635 
1636 					if (relation != NULL)
1637 					{
1638 						RelationClose(relation);
1639 						relation = NULL;
1640 					}
1641 					break;
1642 
1643 				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
1644 
1645 					/*
1646 					 * Speculative insertions are dealt with by delaying the
1647 					 * processing of the insert until the confirmation record
1648 					 * arrives. For that we simply unlink the record from the
1649 					 * chain, so it does not get freed/reused while restoring
1650 					 * spooled data from disk.
1651 					 *
1652 					 * This is safe in the face of concurrent catalog changes
1653 					 * because the relevant relation can't be changed between
1654 					 * speculative insertion and confirmation due to
1655 					 * CheckTableNotInUse() and locking.
1656 					 */
1657 
1658 					/* clear out a pending (and thus failed) speculation */
1659 					if (specinsert != NULL)
1660 					{
1661 						ReorderBufferReturnChange(rb, specinsert);
1662 						specinsert = NULL;
1663 					}
1664 
1665 					/* and memorize the pending insertion */
1666 					dlist_delete(&change->node);
1667 					specinsert = change;
1668 					break;
1669 
1670 				case REORDER_BUFFER_CHANGE_TRUNCATE:
1671 					{
1672 						int			i;
1673 						int			nrelids = change->data.truncate.nrelids;
1674 						int			nrelations = 0;
1675 						Relation   *relations;
1676 
1677 						relations = palloc0(nrelids * sizeof(Relation));
1678 						for (i = 0; i < nrelids; i++)
1679 						{
1680 							Oid			relid = change->data.truncate.relids[i];
1681 							Relation	relation;
1682 
1683 							relation = RelationIdGetRelation(relid);
1684 
1685 							if (!RelationIsValid(relation))
1686 								elog(ERROR, "could not open relation with OID %u", relid);
1687 
1688 							if (!RelationIsLogicallyLogged(relation))
1689 								continue;
1690 
1691 							relations[nrelations++] = relation;
1692 						}
1693 
1694 						rb->apply_truncate(rb, txn, nrelations, relations, change);
1695 
1696 						for (i = 0; i < nrelations; i++)
1697 							RelationClose(relations[i]);
1698 
1699 						break;
1700 					}
1701 
1702 				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
1703 
1704 					/*
1705 					 * Abort for speculative insertion arrived. So cleanup the
1706 					 * specinsert tuple and toast hash.
1707 					 *
1708 					 * Note that we get the spec abort change for each toast
1709 					 * entry but we need to perform the cleanup only the first
1710 					 * time we get it for the main table.
1711 					 */
1712 					if (specinsert != NULL)
1713 					{
1714 						/*
1715 						 * We must clean the toast hash before processing a
1716 						 * completely new tuple to avoid confusion about the
1717 						 * previous tuple's toast chunks.
1718 						 */
1719 						Assert(change->data.tp.clear_toast_afterwards);
1720 						ReorderBufferToastReset(rb, txn);
1721 
1722 						/* We don't need this record anymore. */
1723 						ReorderBufferReturnChange(rb, specinsert);
1724 						specinsert = NULL;
1725 					}
1726 					break;
1727 
1728 				case REORDER_BUFFER_CHANGE_MESSAGE:
1729 					rb->message(rb, txn, change->lsn, true,
1730 								change->data.msg.prefix,
1731 								change->data.msg.message_size,
1732 								change->data.msg.message);
1733 					break;
1734 
1735 				case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
1736 					/* get rid of the old */
1737 					TeardownHistoricSnapshot(false);
1738 
1739 					if (snapshot_now->copied)
1740 					{
1741 						ReorderBufferFreeSnap(rb, snapshot_now);
1742 						snapshot_now =
1743 							ReorderBufferCopySnap(rb, change->data.snapshot,
1744 												  txn, command_id);
1745 					}
1746 
1747 					/*
1748 					 * Restored from disk, need to be careful not to double
1749 					 * free. We could introduce refcounting for that, but for
1750 					 * now this seems infrequent enough not to care.
1751 					 */
1752 					else if (change->data.snapshot->copied)
1753 					{
1754 						snapshot_now =
1755 							ReorderBufferCopySnap(rb, change->data.snapshot,
1756 												  txn, command_id);
1757 					}
1758 					else
1759 					{
1760 						snapshot_now = change->data.snapshot;
1761 					}
1762 
1763 
1764 					/* and continue with the new one */
1765 					SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1766 					break;
1767 
1768 				case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
1769 					Assert(change->data.command_id != InvalidCommandId);
1770 
1771 					if (command_id < change->data.command_id)
1772 					{
1773 						command_id = change->data.command_id;
1774 
1775 						if (!snapshot_now->copied)
1776 						{
1777 							/* we don't use the global one anymore */
1778 							snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1779 																 txn, command_id);
1780 						}
1781 
1782 						snapshot_now->curcid = command_id;
1783 
1784 						TeardownHistoricSnapshot(false);
1785 						SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1786 
1787 						/*
1788 						 * Every time the CommandId is incremented, we could
1789 						 * see new catalog contents, so execute all
1790 						 * invalidations.
1791 						 */
1792 						ReorderBufferExecuteInvalidations(rb, txn);
1793 					}
1794 
1795 					break;
1796 
1797 				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
1798 					elog(ERROR, "tuplecid value in changequeue");
1799 					break;
1800 			}
1801 		}
1802 
1803 		/* speculative insertion record must be freed by now */
1804 		Assert(!specinsert);
1805 
1806 		/* clean up the iterator */
1807 		ReorderBufferIterTXNFinish(rb, iterstate);
1808 		iterstate = NULL;
1809 
1810 		/* call commit callback */
1811 		rb->commit(rb, txn, commit_lsn);
1812 
1813 		/* this is just a sanity check against bad output plugin behaviour */
1814 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
1815 			elog(ERROR, "output plugin used XID %u",
1816 				 GetCurrentTransactionId());
1817 
1818 		/* cleanup */
1819 		TeardownHistoricSnapshot(false);
1820 
1821 		/*
1822 		 * Aborting the current (sub-)transaction as a whole has the right
1823 		 * semantics. We want all locks acquired in here to be released, not
1824 		 * reassigned to the parent and we do not want any database access
1825 		 * have persistent effects.
1826 		 */
1827 		AbortCurrentTransaction();
1828 
1829 		/* make sure there's no cache pollution */
1830 		ReorderBufferExecuteInvalidations(rb, txn);
1831 
1832 		if (using_subtxn)
1833 			RollbackAndReleaseCurrentSubTransaction();
1834 
1835 		if (snapshot_now->copied)
1836 			ReorderBufferFreeSnap(rb, snapshot_now);
1837 
1838 		/* remove potential on-disk data, and deallocate */
1839 		ReorderBufferCleanupTXN(rb, txn);
1840 	}
1841 	PG_CATCH();
1842 	{
1843 		/* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1844 		if (iterstate)
1845 			ReorderBufferIterTXNFinish(rb, iterstate);
1846 
1847 		TeardownHistoricSnapshot(true);
1848 
1849 		/*
1850 		 * Force cache invalidation to happen outside of a valid transaction
1851 		 * to prevent catalog access as we just caught an error.
1852 		 */
1853 		AbortCurrentTransaction();
1854 
1855 		/* make sure there's no cache pollution */
1856 		ReorderBufferExecuteInvalidations(rb, txn);
1857 
1858 		if (using_subtxn)
1859 			RollbackAndReleaseCurrentSubTransaction();
1860 
1861 		if (snapshot_now->copied)
1862 			ReorderBufferFreeSnap(rb, snapshot_now);
1863 
1864 		/* remove potential on-disk data, and deallocate */
1865 		ReorderBufferCleanupTXN(rb, txn);
1866 
1867 		PG_RE_THROW();
1868 	}
1869 	PG_END_TRY();
1870 }
1871 
1872 /*
1873  * Abort a transaction that possibly has previous changes. Needs to be first
1874  * called for subtransactions and then for the toplevel xid.
1875  *
1876  * NB: Transactions handled here have to have actively aborted (i.e. have
1877  * produced an abort record). Implicitly aborted transactions are handled via
1878  * ReorderBufferAbortOld(); transactions we're just not interested in, but
1879  * which have committed are handled in ReorderBufferForget().
1880  *
1881  * This function purges this transaction and its contents from memory and
1882  * disk.
1883  */
1884 void
ReorderBufferAbort(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn)1885 ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1886 {
1887 	ReorderBufferTXN *txn;
1888 
1889 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1890 								false);
1891 
1892 	/* unknown, nothing to remove */
1893 	if (txn == NULL)
1894 		return;
1895 
1896 	/* cosmetic... */
1897 	txn->final_lsn = lsn;
1898 
1899 	/* remove potential on-disk data, and deallocate */
1900 	ReorderBufferCleanupTXN(rb, txn);
1901 }
1902 
1903 /*
1904  * Abort all transactions that aren't actually running anymore because the
1905  * server restarted.
1906  *
1907  * NB: These really have to be transactions that have aborted due to a server
1908  * crash/immediate restart, as we don't deal with invalidations here.
1909  */
1910 void
ReorderBufferAbortOld(ReorderBuffer * rb,TransactionId oldestRunningXid)1911 ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
1912 {
1913 	dlist_mutable_iter it;
1914 
1915 	/*
1916 	 * Iterate through all (potential) toplevel TXNs and abort all that are
1917 	 * older than what possibly can be running. Once we've found the first
1918 	 * that is alive we stop, there might be some that acquired an xid earlier
1919 	 * but started writing later, but it's unlikely and they will be cleaned
1920 	 * up in a later call to this function.
1921 	 */
1922 	dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1923 	{
1924 		ReorderBufferTXN *txn;
1925 
1926 		txn = dlist_container(ReorderBufferTXN, node, it.cur);
1927 
1928 		if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1929 		{
1930 			elog(DEBUG2, "aborting old transaction %u", txn->xid);
1931 
1932 			/* remove potential on-disk data, and deallocate this tx */
1933 			ReorderBufferCleanupTXN(rb, txn);
1934 		}
1935 		else
1936 			return;
1937 	}
1938 }
1939 
1940 /*
1941  * Forget the contents of a transaction if we aren't interested in it's
1942  * contents. Needs to be first called for subtransactions and then for the
1943  * toplevel xid.
1944  *
1945  * This is significantly different to ReorderBufferAbort() because
1946  * transactions that have committed need to be treated differently from aborted
1947  * ones since they may have modified the catalog.
1948  *
1949  * Note that this is only allowed to be called in the moment a transaction
1950  * commit has just been read, not earlier; otherwise later records referring
1951  * to this xid might re-create the transaction incompletely.
1952  */
1953 void
ReorderBufferForget(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn)1954 ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1955 {
1956 	ReorderBufferTXN *txn;
1957 
1958 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1959 								false);
1960 
1961 	/* unknown, nothing to forget */
1962 	if (txn == NULL)
1963 		return;
1964 
1965 	/* cosmetic... */
1966 	txn->final_lsn = lsn;
1967 
1968 	/*
1969 	 * Process cache invalidation messages if there are any. Even if we're not
1970 	 * interested in the transaction's contents, it could have manipulated the
1971 	 * catalog and we need to update the caches according to that.
1972 	 */
1973 	if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1974 		ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
1975 										   txn->invalidations);
1976 	else
1977 		Assert(txn->ninvalidations == 0);
1978 
1979 	/* remove potential on-disk data, and deallocate */
1980 	ReorderBufferCleanupTXN(rb, txn);
1981 }
1982 
1983 /*
1984  * Execute invalidations happening outside the context of a decoded
1985  * transaction. That currently happens either for xid-less commits
1986  * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
1987  * transactions (via ReorderBufferForget()).
1988  */
1989 void
ReorderBufferImmediateInvalidation(ReorderBuffer * rb,uint32 ninvalidations,SharedInvalidationMessage * invalidations)1990 ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
1991 								   SharedInvalidationMessage *invalidations)
1992 {
1993 	bool		use_subtxn = IsTransactionOrTransactionBlock();
1994 	int			i;
1995 
1996 	if (use_subtxn)
1997 		BeginInternalSubTransaction("replay");
1998 
1999 	/*
2000 	 * Force invalidations to happen outside of a valid transaction - that way
2001 	 * entries will just be marked as invalid without accessing the catalog.
2002 	 * That's advantageous because we don't need to setup the full state
2003 	 * necessary for catalog access.
2004 	 */
2005 	if (use_subtxn)
2006 		AbortCurrentTransaction();
2007 
2008 	for (i = 0; i < ninvalidations; i++)
2009 		LocalExecuteInvalidationMessage(&invalidations[i]);
2010 
2011 	if (use_subtxn)
2012 		RollbackAndReleaseCurrentSubTransaction();
2013 }
2014 
2015 /*
2016  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
2017  * least once for every xid in XLogRecord->xl_xid (other places in records
2018  * may, but do not have to be passed through here).
2019  *
2020  * Reorderbuffer keeps some datastructures about transactions in LSN order,
2021  * for efficiency. To do that it has to know about when transactions are seen
2022  * first in the WAL. As many types of records are not actually interesting for
2023  * logical decoding, they do not necessarily pass though here.
2024  */
2025 void
ReorderBufferProcessXid(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn)2026 ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
2027 {
2028 	/* many records won't have an xid assigned, centralize check here */
2029 	if (xid != InvalidTransactionId)
2030 		ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2031 }
2032 
2033 /*
2034  * Add a new snapshot to this transaction that may only used after lsn 'lsn'
2035  * because the previous snapshot doesn't describe the catalog correctly for
2036  * following rows.
2037  */
2038 void
ReorderBufferAddSnapshot(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,Snapshot snap)2039 ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
2040 						 XLogRecPtr lsn, Snapshot snap)
2041 {
2042 	ReorderBufferChange *change = ReorderBufferGetChange(rb);
2043 
2044 	change->data.snapshot = snap;
2045 	change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
2046 
2047 	ReorderBufferQueueChange(rb, xid, lsn, change);
2048 }
2049 
2050 /*
2051  * Set up the transaction's base snapshot.
2052  *
2053  * If we know that xid is a subtransaction, set the base snapshot on the
2054  * top-level transaction instead.
2055  */
2056 void
ReorderBufferSetBaseSnapshot(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,Snapshot snap)2057 ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
2058 							 XLogRecPtr lsn, Snapshot snap)
2059 {
2060 	ReorderBufferTXN *txn;
2061 	bool		is_new;
2062 
2063 	AssertArg(snap != NULL);
2064 
2065 	/*
2066 	 * Fetch the transaction to operate on.  If we know it's a subtransaction,
2067 	 * operate on its top-level transaction instead.
2068 	 */
2069 	txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
2070 	if (txn->is_known_as_subxact)
2071 		txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2072 									NULL, InvalidXLogRecPtr, false);
2073 	Assert(txn->base_snapshot == NULL);
2074 
2075 	txn->base_snapshot = snap;
2076 	txn->base_snapshot_lsn = lsn;
2077 	dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
2078 
2079 	AssertTXNLsnOrder(rb);
2080 }
2081 
2082 /*
2083  * Access the catalog with this CommandId at this point in the changestream.
2084  *
2085  * May only be called for command ids > 1
2086  */
2087 void
ReorderBufferAddNewCommandId(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,CommandId cid)2088 ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
2089 							 XLogRecPtr lsn, CommandId cid)
2090 {
2091 	ReorderBufferChange *change = ReorderBufferGetChange(rb);
2092 
2093 	change->data.command_id = cid;
2094 	change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
2095 
2096 	ReorderBufferQueueChange(rb, xid, lsn, change);
2097 }
2098 
2099 
2100 /*
2101  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
2102  */
2103 void
ReorderBufferAddNewTupleCids(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,RelFileNode node,ItemPointerData tid,CommandId cmin,CommandId cmax,CommandId combocid)2104 ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
2105 							 XLogRecPtr lsn, RelFileNode node,
2106 							 ItemPointerData tid, CommandId cmin,
2107 							 CommandId cmax, CommandId combocid)
2108 {
2109 	ReorderBufferChange *change = ReorderBufferGetChange(rb);
2110 	ReorderBufferTXN *txn;
2111 
2112 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2113 
2114 	change->data.tuplecid.node = node;
2115 	change->data.tuplecid.tid = tid;
2116 	change->data.tuplecid.cmin = cmin;
2117 	change->data.tuplecid.cmax = cmax;
2118 	change->data.tuplecid.combocid = combocid;
2119 	change->lsn = lsn;
2120 	change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
2121 
2122 	dlist_push_tail(&txn->tuplecids, &change->node);
2123 	txn->ntuplecids++;
2124 }
2125 
2126 /*
2127  * Setup the invalidation of the toplevel transaction.
2128  *
2129  * This needs to be done before ReorderBufferCommit is called!
2130  */
2131 void
ReorderBufferAddInvalidations(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn,Size nmsgs,SharedInvalidationMessage * msgs)2132 ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
2133 							  XLogRecPtr lsn, Size nmsgs,
2134 							  SharedInvalidationMessage *msgs)
2135 {
2136 	ReorderBufferTXN *txn;
2137 
2138 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2139 
2140 	if (txn->ninvalidations != 0)
2141 		elog(ERROR, "only ever add one set of invalidations");
2142 
2143 	Assert(nmsgs > 0);
2144 
2145 	txn->ninvalidations = nmsgs;
2146 	txn->invalidations = (SharedInvalidationMessage *)
2147 		MemoryContextAlloc(rb->context,
2148 						   sizeof(SharedInvalidationMessage) * nmsgs);
2149 	memcpy(txn->invalidations, msgs,
2150 		   sizeof(SharedInvalidationMessage) * nmsgs);
2151 }
2152 
2153 /*
2154  * Apply all invalidations we know. Possibly we only need parts at this point
2155  * in the changestream but we don't know which those are.
2156  */
2157 static void
ReorderBufferExecuteInvalidations(ReorderBuffer * rb,ReorderBufferTXN * txn)2158 ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
2159 {
2160 	int			i;
2161 
2162 	for (i = 0; i < txn->ninvalidations; i++)
2163 		LocalExecuteInvalidationMessage(&txn->invalidations[i]);
2164 }
2165 
2166 /*
2167  * Mark a transaction as containing catalog changes
2168  */
2169 void
ReorderBufferXidSetCatalogChanges(ReorderBuffer * rb,TransactionId xid,XLogRecPtr lsn)2170 ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
2171 								  XLogRecPtr lsn)
2172 {
2173 	ReorderBufferTXN *txn;
2174 
2175 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2176 
2177 	txn->has_catalog_changes = true;
2178 }
2179 
2180 /*
2181  * Query whether a transaction is already *known* to contain catalog
2182  * changes. This can be wrong until directly before the commit!
2183  */
2184 bool
ReorderBufferXidHasCatalogChanges(ReorderBuffer * rb,TransactionId xid)2185 ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
2186 {
2187 	ReorderBufferTXN *txn;
2188 
2189 	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2190 								false);
2191 	if (txn == NULL)
2192 		return false;
2193 
2194 	return txn->has_catalog_changes;
2195 }
2196 
2197 /*
2198  * ReorderBufferXidHasBaseSnapshot
2199  *		Have we already set the base snapshot for the given txn/subtxn?
2200  */
2201 bool
ReorderBufferXidHasBaseSnapshot(ReorderBuffer * rb,TransactionId xid)2202 ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
2203 {
2204 	ReorderBufferTXN *txn;
2205 
2206 	txn = ReorderBufferTXNByXid(rb, xid, false,
2207 								NULL, InvalidXLogRecPtr, false);
2208 
2209 	/* transaction isn't known yet, ergo no snapshot */
2210 	if (txn == NULL)
2211 		return false;
2212 
2213 	/* a known subtxn? operate on top-level txn instead */
2214 	if (txn->is_known_as_subxact)
2215 		txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2216 									NULL, InvalidXLogRecPtr, false);
2217 
2218 	return txn->base_snapshot != NULL;
2219 }
2220 
2221 
2222 /*
2223  * ---------------------------------------
2224  * Disk serialization support
2225  * ---------------------------------------
2226  */
2227 
2228 /*
2229  * Ensure the IO buffer is >= sz.
2230  */
2231 static void
ReorderBufferSerializeReserve(ReorderBuffer * rb,Size sz)2232 ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
2233 {
2234 	if (!rb->outbufsize)
2235 	{
2236 		rb->outbuf = MemoryContextAlloc(rb->context, sz);
2237 		rb->outbufsize = sz;
2238 	}
2239 	else if (rb->outbufsize < sz)
2240 	{
2241 		rb->outbuf = repalloc(rb->outbuf, sz);
2242 		rb->outbufsize = sz;
2243 	}
2244 }
2245 
2246 /*
2247  * Check whether the transaction tx should spill its data to disk.
2248  */
2249 static void
ReorderBufferCheckSerializeTXN(ReorderBuffer * rb,ReorderBufferTXN * txn)2250 ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2251 {
2252 	/*
2253 	 * TODO: improve accounting so we cheaply can take subtransactions into
2254 	 * account here.
2255 	 */
2256 	if (txn->nentries_mem >= max_changes_in_memory)
2257 	{
2258 		ReorderBufferSerializeTXN(rb, txn);
2259 		Assert(txn->nentries_mem == 0);
2260 	}
2261 }
2262 
2263 /*
2264  * Spill data of a large transaction (and its subtransactions) to disk.
2265  */
2266 static void
ReorderBufferSerializeTXN(ReorderBuffer * rb,ReorderBufferTXN * txn)2267 ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2268 {
2269 	dlist_iter	subtxn_i;
2270 	dlist_mutable_iter change_i;
2271 	int			fd = -1;
2272 	XLogSegNo	curOpenSegNo = 0;
2273 	Size		spilled = 0;
2274 
2275 	elog(DEBUG2, "spill %u changes in XID %u to disk",
2276 		 (uint32) txn->nentries_mem, txn->xid);
2277 
2278 	/* do the same to all child TXs */
2279 	dlist_foreach(subtxn_i, &txn->subtxns)
2280 	{
2281 		ReorderBufferTXN *subtxn;
2282 
2283 		subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2284 		ReorderBufferSerializeTXN(rb, subtxn);
2285 	}
2286 
2287 	/* serialize changestream */
2288 	dlist_foreach_modify(change_i, &txn->changes)
2289 	{
2290 		ReorderBufferChange *change;
2291 
2292 		change = dlist_container(ReorderBufferChange, node, change_i.cur);
2293 
2294 		/*
2295 		 * store in segment in which it belongs by start lsn, don't split over
2296 		 * multiple segments tho
2297 		 */
2298 		if (fd == -1 ||
2299 			!XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
2300 		{
2301 			char		path[MAXPGPATH];
2302 
2303 			if (fd != -1)
2304 				CloseTransientFile(fd);
2305 
2306 			XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
2307 
2308 			/*
2309 			 * No need to care about TLIs here, only used during a single run,
2310 			 * so each LSN only maps to a specific WAL record.
2311 			 */
2312 			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2313 										curOpenSegNo);
2314 
2315 			/* open segment, create it if necessary */
2316 			fd = OpenTransientFile(path,
2317 								   O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
2318 
2319 			if (fd < 0)
2320 				ereport(ERROR,
2321 						(errcode_for_file_access(),
2322 						 errmsg("could not open file \"%s\": %m", path)));
2323 		}
2324 
2325 		ReorderBufferSerializeChange(rb, txn, fd, change);
2326 		dlist_delete(&change->node);
2327 		ReorderBufferReturnChange(rb, change);
2328 
2329 		spilled++;
2330 	}
2331 
2332 	Assert(spilled == txn->nentries_mem);
2333 	Assert(dlist_is_empty(&txn->changes));
2334 	txn->nentries_mem = 0;
2335 	txn->serialized = true;
2336 
2337 	if (fd != -1)
2338 		CloseTransientFile(fd);
2339 }
2340 
2341 /*
2342  * Serialize individual change to disk.
2343  */
2344 static void
ReorderBufferSerializeChange(ReorderBuffer * rb,ReorderBufferTXN * txn,int fd,ReorderBufferChange * change)2345 ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2346 							 int fd, ReorderBufferChange *change)
2347 {
2348 	ReorderBufferDiskChange *ondisk;
2349 	Size		sz = sizeof(ReorderBufferDiskChange);
2350 
2351 	ReorderBufferSerializeReserve(rb, sz);
2352 
2353 	ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2354 	memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2355 
2356 	switch (change->action)
2357 	{
2358 			/* fall through these, they're all similar enough */
2359 		case REORDER_BUFFER_CHANGE_INSERT:
2360 		case REORDER_BUFFER_CHANGE_UPDATE:
2361 		case REORDER_BUFFER_CHANGE_DELETE:
2362 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2363 			{
2364 				char	   *data;
2365 				ReorderBufferTupleBuf *oldtup,
2366 						   *newtup;
2367 				Size		oldlen = 0;
2368 				Size		newlen = 0;
2369 
2370 				oldtup = change->data.tp.oldtuple;
2371 				newtup = change->data.tp.newtuple;
2372 
2373 				if (oldtup)
2374 				{
2375 					sz += sizeof(HeapTupleData);
2376 					oldlen = oldtup->tuple.t_len;
2377 					sz += oldlen;
2378 				}
2379 
2380 				if (newtup)
2381 				{
2382 					sz += sizeof(HeapTupleData);
2383 					newlen = newtup->tuple.t_len;
2384 					sz += newlen;
2385 				}
2386 
2387 				/* make sure we have enough space */
2388 				ReorderBufferSerializeReserve(rb, sz);
2389 
2390 				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2391 				/* might have been reallocated above */
2392 				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2393 
2394 				if (oldlen)
2395 				{
2396 					memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2397 					data += sizeof(HeapTupleData);
2398 
2399 					memcpy(data, oldtup->tuple.t_data, oldlen);
2400 					data += oldlen;
2401 				}
2402 
2403 				if (newlen)
2404 				{
2405 					memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2406 					data += sizeof(HeapTupleData);
2407 
2408 					memcpy(data, newtup->tuple.t_data, newlen);
2409 					data += newlen;
2410 				}
2411 				break;
2412 			}
2413 		case REORDER_BUFFER_CHANGE_MESSAGE:
2414 			{
2415 				char	   *data;
2416 				Size		prefix_size = strlen(change->data.msg.prefix) + 1;
2417 
2418 				sz += prefix_size + change->data.msg.message_size +
2419 					sizeof(Size) + sizeof(Size);
2420 				ReorderBufferSerializeReserve(rb, sz);
2421 
2422 				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2423 
2424 				/* might have been reallocated above */
2425 				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2426 
2427 				/* write the prefix including the size */
2428 				memcpy(data, &prefix_size, sizeof(Size));
2429 				data += sizeof(Size);
2430 				memcpy(data, change->data.msg.prefix,
2431 					   prefix_size);
2432 				data += prefix_size;
2433 
2434 				/* write the message including the size */
2435 				memcpy(data, &change->data.msg.message_size, sizeof(Size));
2436 				data += sizeof(Size);
2437 				memcpy(data, change->data.msg.message,
2438 					   change->data.msg.message_size);
2439 				data += change->data.msg.message_size;
2440 
2441 				break;
2442 			}
2443 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2444 			{
2445 				Snapshot	snap;
2446 				char	   *data;
2447 
2448 				snap = change->data.snapshot;
2449 
2450 				sz += sizeof(SnapshotData) +
2451 					sizeof(TransactionId) * snap->xcnt +
2452 					sizeof(TransactionId) * snap->subxcnt;
2453 
2454 				/* make sure we have enough space */
2455 				ReorderBufferSerializeReserve(rb, sz);
2456 				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2457 				/* might have been reallocated above */
2458 				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2459 
2460 				memcpy(data, snap, sizeof(SnapshotData));
2461 				data += sizeof(SnapshotData);
2462 
2463 				if (snap->xcnt)
2464 				{
2465 					memcpy(data, snap->xip,
2466 						   sizeof(TransactionId) * snap->xcnt);
2467 					data += sizeof(TransactionId) * snap->xcnt;
2468 				}
2469 
2470 				if (snap->subxcnt)
2471 				{
2472 					memcpy(data, snap->subxip,
2473 						   sizeof(TransactionId) * snap->subxcnt);
2474 					data += sizeof(TransactionId) * snap->subxcnt;
2475 				}
2476 				break;
2477 			}
2478 		case REORDER_BUFFER_CHANGE_TRUNCATE:
2479 			{
2480 				Size	size;
2481 				char   *data;
2482 
2483 				/* account for the OIDs of truncated relations */
2484 				size = sizeof(Oid) * change->data.truncate.nrelids;
2485 				sz += size;
2486 
2487 				/* make sure we have enough space */
2488 				ReorderBufferSerializeReserve(rb, sz);
2489 
2490 				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2491 				/* might have been reallocated above */
2492 				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2493 
2494 				memcpy(data, change->data.truncate.relids, size);
2495 				data += size;
2496 
2497 				break;
2498 			}
2499 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2500 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
2501 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2502 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2503 			/* ReorderBufferChange contains everything important */
2504 			break;
2505 	}
2506 
2507 	ondisk->size = sz;
2508 
2509 	errno = 0;
2510 	pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
2511 	if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2512 	{
2513 		int			save_errno = errno;
2514 
2515 		CloseTransientFile(fd);
2516 
2517 		/* if write didn't set errno, assume problem is no disk space */
2518 		errno = save_errno ? save_errno : ENOSPC;
2519 		ereport(ERROR,
2520 				(errcode_for_file_access(),
2521 				 errmsg("could not write to data file for XID %u: %m",
2522 						txn->xid)));
2523 	}
2524 	pgstat_report_wait_end();
2525 
2526 	/*
2527 	 * Keep the transaction's final_lsn up to date with each change we send to
2528 	 * disk, so that ReorderBufferRestoreCleanup works correctly.  (We used to
2529 	 * only do this on commit and abort records, but that doesn't work if a
2530 	 * system crash leaves a transaction without its abort record).
2531 	 *
2532 	 * Make sure not to move it backwards.
2533 	 */
2534 	if (txn->final_lsn < change->lsn)
2535 		txn->final_lsn = change->lsn;
2536 
2537 	Assert(ondisk->change.action == change->action);
2538 }
2539 
2540 /*
2541  * Restore a number of changes spilled to disk back into memory.
2542  */
2543 static Size
ReorderBufferRestoreChanges(ReorderBuffer * rb,ReorderBufferTXN * txn,File * fd,XLogSegNo * segno)2544 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2545 							File *fd, XLogSegNo *segno)
2546 {
2547 	Size		restored = 0;
2548 	XLogSegNo	last_segno;
2549 	dlist_mutable_iter cleanup_iter;
2550 
2551 	Assert(txn->first_lsn != InvalidXLogRecPtr);
2552 	Assert(txn->final_lsn != InvalidXLogRecPtr);
2553 
2554 	/* free current entries, so we have memory for more */
2555 	dlist_foreach_modify(cleanup_iter, &txn->changes)
2556 	{
2557 		ReorderBufferChange *cleanup =
2558 		dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2559 
2560 		dlist_delete(&cleanup->node);
2561 		ReorderBufferReturnChange(rb, cleanup);
2562 	}
2563 	txn->nentries_mem = 0;
2564 	Assert(dlist_is_empty(&txn->changes));
2565 
2566 	XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
2567 
2568 	while (restored < max_changes_in_memory && *segno <= last_segno)
2569 	{
2570 		int			readBytes;
2571 		ReorderBufferDiskChange *ondisk;
2572 
2573 		if (*fd == -1)
2574 		{
2575 			char		path[MAXPGPATH];
2576 
2577 			/* first time in */
2578 			if (*segno == 0)
2579 				XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
2580 
2581 			Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2582 
2583 			/*
2584 			 * No need to care about TLIs here, only used during a single run,
2585 			 * so each LSN only maps to a specific WAL record.
2586 			 */
2587 			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2588 										*segno);
2589 
2590 			*fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
2591 			if (*fd < 0 && errno == ENOENT)
2592 			{
2593 				*fd = -1;
2594 				(*segno)++;
2595 				continue;
2596 			}
2597 			else if (*fd < 0)
2598 				ereport(ERROR,
2599 						(errcode_for_file_access(),
2600 						 errmsg("could not open file \"%s\": %m",
2601 								path)));
2602 		}
2603 
2604 		/*
2605 		 * Read the statically sized part of a change which has information
2606 		 * about the total size. If we couldn't read a record, we're at the
2607 		 * end of this file.
2608 		 */
2609 		ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
2610 		readBytes = FileRead(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange),
2611 							 WAIT_EVENT_REORDER_BUFFER_READ);
2612 
2613 		/* eof */
2614 		if (readBytes == 0)
2615 		{
2616 			FileClose(*fd);
2617 			*fd = -1;
2618 			(*segno)++;
2619 			continue;
2620 		}
2621 		else if (readBytes < 0)
2622 			ereport(ERROR,
2623 					(errcode_for_file_access(),
2624 					 errmsg("could not read from reorderbuffer spill file: %m")));
2625 		else if (readBytes != sizeof(ReorderBufferDiskChange))
2626 			ereport(ERROR,
2627 					(errcode_for_file_access(),
2628 					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2629 							readBytes,
2630 							(uint32) sizeof(ReorderBufferDiskChange))));
2631 
2632 		ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2633 
2634 		ReorderBufferSerializeReserve(rb,
2635 									  sizeof(ReorderBufferDiskChange) + ondisk->size);
2636 		ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2637 
2638 		readBytes = FileRead(*fd,
2639 							 rb->outbuf + sizeof(ReorderBufferDiskChange),
2640 							 ondisk->size - sizeof(ReorderBufferDiskChange),
2641 							 WAIT_EVENT_REORDER_BUFFER_READ);
2642 
2643 		if (readBytes < 0)
2644 			ereport(ERROR,
2645 					(errcode_for_file_access(),
2646 					 errmsg("could not read from reorderbuffer spill file: %m")));
2647 		else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2648 			ereport(ERROR,
2649 					(errcode_for_file_access(),
2650 					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2651 							readBytes,
2652 							(uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2653 
2654 		/*
2655 		 * ok, read a full change from disk, now restore it into proper
2656 		 * in-memory format
2657 		 */
2658 		ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2659 		restored++;
2660 	}
2661 
2662 	return restored;
2663 }
2664 
2665 /*
2666  * Convert change from its on-disk format to in-memory format and queue it onto
2667  * the TXN's ->changes list.
2668  *
2669  * Note: although "data" is declared char*, at entry it points to a
2670  * maxalign'd buffer, making it safe in most of this function to assume
2671  * that the pointed-to data is suitably aligned for direct access.
2672  */
2673 static void
ReorderBufferRestoreChange(ReorderBuffer * rb,ReorderBufferTXN * txn,char * data)2674 ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2675 						   char *data)
2676 {
2677 	ReorderBufferDiskChange *ondisk;
2678 	ReorderBufferChange *change;
2679 
2680 	ondisk = (ReorderBufferDiskChange *) data;
2681 
2682 	change = ReorderBufferGetChange(rb);
2683 
2684 	/* copy static part */
2685 	memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2686 
2687 	data += sizeof(ReorderBufferDiskChange);
2688 
2689 	/* restore individual stuff */
2690 	switch (change->action)
2691 	{
2692 			/* fall through these, they're all similar enough */
2693 		case REORDER_BUFFER_CHANGE_INSERT:
2694 		case REORDER_BUFFER_CHANGE_UPDATE:
2695 		case REORDER_BUFFER_CHANGE_DELETE:
2696 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2697 			if (change->data.tp.oldtuple)
2698 			{
2699 				uint32		tuplelen = ((HeapTuple) data)->t_len;
2700 
2701 				change->data.tp.oldtuple =
2702 					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2703 
2704 				/* restore ->tuple */
2705 				memcpy(&change->data.tp.oldtuple->tuple, data,
2706 					   sizeof(HeapTupleData));
2707 				data += sizeof(HeapTupleData);
2708 
2709 				/* reset t_data pointer into the new tuplebuf */
2710 				change->data.tp.oldtuple->tuple.t_data =
2711 					ReorderBufferTupleBufData(change->data.tp.oldtuple);
2712 
2713 				/* restore tuple data itself */
2714 				memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2715 				data += tuplelen;
2716 			}
2717 
2718 			if (change->data.tp.newtuple)
2719 			{
2720 				/* here, data might not be suitably aligned! */
2721 				uint32		tuplelen;
2722 
2723 				memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2724 					   sizeof(uint32));
2725 
2726 				change->data.tp.newtuple =
2727 					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2728 
2729 				/* restore ->tuple */
2730 				memcpy(&change->data.tp.newtuple->tuple, data,
2731 					   sizeof(HeapTupleData));
2732 				data += sizeof(HeapTupleData);
2733 
2734 				/* reset t_data pointer into the new tuplebuf */
2735 				change->data.tp.newtuple->tuple.t_data =
2736 					ReorderBufferTupleBufData(change->data.tp.newtuple);
2737 
2738 				/* restore tuple data itself */
2739 				memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2740 				data += tuplelen;
2741 			}
2742 
2743 			break;
2744 		case REORDER_BUFFER_CHANGE_MESSAGE:
2745 			{
2746 				Size		prefix_size;
2747 
2748 				/* read prefix */
2749 				memcpy(&prefix_size, data, sizeof(Size));
2750 				data += sizeof(Size);
2751 				change->data.msg.prefix = MemoryContextAlloc(rb->context,
2752 															 prefix_size);
2753 				memcpy(change->data.msg.prefix, data, prefix_size);
2754 				Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2755 				data += prefix_size;
2756 
2757 				/* read the message */
2758 				memcpy(&change->data.msg.message_size, data, sizeof(Size));
2759 				data += sizeof(Size);
2760 				change->data.msg.message = MemoryContextAlloc(rb->context,
2761 															  change->data.msg.message_size);
2762 				memcpy(change->data.msg.message, data,
2763 					   change->data.msg.message_size);
2764 				data += change->data.msg.message_size;
2765 
2766 				break;
2767 			}
2768 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2769 			{
2770 				Snapshot	oldsnap;
2771 				Snapshot	newsnap;
2772 				Size		size;
2773 
2774 				oldsnap = (Snapshot) data;
2775 
2776 				size = sizeof(SnapshotData) +
2777 					sizeof(TransactionId) * oldsnap->xcnt +
2778 					sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2779 
2780 				change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2781 
2782 				newsnap = change->data.snapshot;
2783 
2784 				memcpy(newsnap, data, size);
2785 				newsnap->xip = (TransactionId *)
2786 					(((char *) newsnap) + sizeof(SnapshotData));
2787 				newsnap->subxip = newsnap->xip + newsnap->xcnt;
2788 				newsnap->copied = true;
2789 				break;
2790 			}
2791 			/* the base struct contains all the data, easy peasy */
2792 		case REORDER_BUFFER_CHANGE_TRUNCATE:
2793 			{
2794 				Oid	   *relids;
2795 
2796 				relids = ReorderBufferGetRelids(rb,
2797 												change->data.truncate.nrelids);
2798 				memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
2799 				change->data.truncate.relids = relids;
2800 
2801 				break;
2802 			}
2803 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2804 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
2805 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2806 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2807 			break;
2808 	}
2809 
2810 	dlist_push_tail(&txn->changes, &change->node);
2811 	txn->nentries_mem++;
2812 }
2813 
2814 /*
2815  * Remove all on-disk stored for the passed in transaction.
2816  */
2817 static void
ReorderBufferRestoreCleanup(ReorderBuffer * rb,ReorderBufferTXN * txn)2818 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
2819 {
2820 	XLogSegNo	first;
2821 	XLogSegNo	cur;
2822 	XLogSegNo	last;
2823 
2824 	Assert(txn->first_lsn != InvalidXLogRecPtr);
2825 	Assert(txn->final_lsn != InvalidXLogRecPtr);
2826 
2827 	XLByteToSeg(txn->first_lsn, first, wal_segment_size);
2828 	XLByteToSeg(txn->final_lsn, last, wal_segment_size);
2829 
2830 	/* iterate over all possible filenames, and delete them */
2831 	for (cur = first; cur <= last; cur++)
2832 	{
2833 		char		path[MAXPGPATH];
2834 
2835 		ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
2836 		if (unlink(path) != 0 && errno != ENOENT)
2837 			ereport(ERROR,
2838 					(errcode_for_file_access(),
2839 					 errmsg("could not remove file \"%s\": %m", path)));
2840 	}
2841 }
2842 
2843 /*
2844  * Remove any leftover serialized reorder buffers from a slot directory after a
2845  * prior crash or decoding session exit.
2846  */
2847 static void
ReorderBufferCleanupSerializedTXNs(const char * slotname)2848 ReorderBufferCleanupSerializedTXNs(const char *slotname)
2849 {
2850 	DIR		   *spill_dir;
2851 	struct dirent *spill_de;
2852 	struct stat statbuf;
2853 	char		path[MAXPGPATH * 2 + 12];
2854 
2855 	sprintf(path, "pg_replslot/%s", slotname);
2856 
2857 	/* we're only handling directories here, skip if it's not ours */
2858 	if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2859 		return;
2860 
2861 	spill_dir = AllocateDir(path);
2862 	while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
2863 	{
2864 		/* only look at names that can be ours */
2865 		if (strncmp(spill_de->d_name, "xid", 3) == 0)
2866 		{
2867 			snprintf(path, sizeof(path),
2868 					 "pg_replslot/%s/%s", slotname,
2869 					 spill_de->d_name);
2870 
2871 			if (unlink(path) != 0)
2872 				ereport(ERROR,
2873 						(errcode_for_file_access(),
2874 						 errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/*.xid: %m",
2875 								path, slotname)));
2876 		}
2877 	}
2878 	FreeDir(spill_dir);
2879 }
2880 
2881 /*
2882  * Given a replication slot, transaction ID and segment number, fill in the
2883  * corresponding spill file into 'path', which is a caller-owned buffer of size
2884  * at least MAXPGPATH.
2885  */
2886 static void
ReorderBufferSerializedPath(char * path,ReplicationSlot * slot,TransactionId xid,XLogSegNo segno)2887 ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
2888 							XLogSegNo segno)
2889 {
2890 	XLogRecPtr	recptr;
2891 
2892 	XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
2893 
2894 	snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2895 			 NameStr(MyReplicationSlot->data.name),
2896 			 xid,
2897 			 (uint32) (recptr >> 32), (uint32) recptr);
2898 }
2899 
2900 /*
2901  * Delete all data spilled to disk after we've restarted/crashed. It will be
2902  * recreated when the respective slots are reused.
2903  */
2904 void
StartupReorderBuffer(void)2905 StartupReorderBuffer(void)
2906 {
2907 	DIR		   *logical_dir;
2908 	struct dirent *logical_de;
2909 
2910 	logical_dir = AllocateDir("pg_replslot");
2911 	while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2912 	{
2913 		if (strcmp(logical_de->d_name, ".") == 0 ||
2914 			strcmp(logical_de->d_name, "..") == 0)
2915 			continue;
2916 
2917 		/* if it cannot be a slot, skip the directory */
2918 		if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2919 			continue;
2920 
2921 		/*
2922 		 * ok, has to be a surviving logical slot, iterate and delete
2923 		 * everything starting with xid-*
2924 		 */
2925 		ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
2926 	}
2927 	FreeDir(logical_dir);
2928 }
2929 
2930 /* ---------------------------------------
2931  * toast reassembly support
2932  * ---------------------------------------
2933  */
2934 
2935 /*
2936  * Initialize per tuple toast reconstruction support.
2937  */
2938 static void
ReorderBufferToastInitHash(ReorderBuffer * rb,ReorderBufferTXN * txn)2939 ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
2940 {
2941 	HASHCTL		hash_ctl;
2942 
2943 	Assert(txn->toast_hash == NULL);
2944 
2945 	memset(&hash_ctl, 0, sizeof(hash_ctl));
2946 	hash_ctl.keysize = sizeof(Oid);
2947 	hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2948 	hash_ctl.hcxt = rb->context;
2949 	txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2950 								  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
2951 }
2952 
2953 /*
2954  * Per toast-chunk handling for toast reconstruction
2955  *
2956  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
2957  * toasted Datum comes along.
2958  */
2959 static void
ReorderBufferToastAppendChunk(ReorderBuffer * rb,ReorderBufferTXN * txn,Relation relation,ReorderBufferChange * change)2960 ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
2961 							  Relation relation, ReorderBufferChange *change)
2962 {
2963 	ReorderBufferToastEnt *ent;
2964 	ReorderBufferTupleBuf *newtup;
2965 	bool		found;
2966 	int32		chunksize;
2967 	bool		isnull;
2968 	Pointer		chunk;
2969 	TupleDesc	desc = RelationGetDescr(relation);
2970 	Oid			chunk_id;
2971 	int32		chunk_seq;
2972 
2973 	if (txn->toast_hash == NULL)
2974 		ReorderBufferToastInitHash(rb, txn);
2975 
2976 	Assert(IsToastRelation(relation));
2977 
2978 	newtup = change->data.tp.newtuple;
2979 	chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2980 	Assert(!isnull);
2981 	chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2982 	Assert(!isnull);
2983 
2984 	ent = (ReorderBufferToastEnt *)
2985 		hash_search(txn->toast_hash,
2986 					(void *) &chunk_id,
2987 					HASH_ENTER,
2988 					&found);
2989 
2990 	if (!found)
2991 	{
2992 		Assert(ent->chunk_id == chunk_id);
2993 		ent->num_chunks = 0;
2994 		ent->last_chunk_seq = 0;
2995 		ent->size = 0;
2996 		ent->reconstructed = NULL;
2997 		dlist_init(&ent->chunks);
2998 
2999 		if (chunk_seq != 0)
3000 			elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
3001 				 chunk_seq, chunk_id);
3002 	}
3003 	else if (found && chunk_seq != ent->last_chunk_seq + 1)
3004 		elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
3005 			 chunk_seq, chunk_id, ent->last_chunk_seq + 1);
3006 
3007 	chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
3008 	Assert(!isnull);
3009 
3010 	/* calculate size so we can allocate the right size at once later */
3011 	if (!VARATT_IS_EXTENDED(chunk))
3012 		chunksize = VARSIZE(chunk) - VARHDRSZ;
3013 	else if (VARATT_IS_SHORT(chunk))
3014 		/* could happen due to heap_form_tuple doing its thing */
3015 		chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
3016 	else
3017 		elog(ERROR, "unexpected type of toast chunk");
3018 
3019 	ent->size += chunksize;
3020 	ent->last_chunk_seq = chunk_seq;
3021 	ent->num_chunks++;
3022 	dlist_push_tail(&ent->chunks, &change->node);
3023 }
3024 
3025 /*
3026  * Rejigger change->newtuple to point to in-memory toast tuples instead to
3027  * on-disk toast tuples that may not longer exist (think DROP TABLE or VACUUM).
3028  *
3029  * We cannot replace unchanged toast tuples though, so those will still point
3030  * to on-disk toast data.
3031  */
3032 static void
ReorderBufferToastReplace(ReorderBuffer * rb,ReorderBufferTXN * txn,Relation relation,ReorderBufferChange * change)3033 ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
3034 						  Relation relation, ReorderBufferChange *change)
3035 {
3036 	TupleDesc	desc;
3037 	int			natt;
3038 	Datum	   *attrs;
3039 	bool	   *isnull;
3040 	bool	   *free;
3041 	HeapTuple	tmphtup;
3042 	Relation	toast_rel;
3043 	TupleDesc	toast_desc;
3044 	MemoryContext oldcontext;
3045 	ReorderBufferTupleBuf *newtup;
3046 
3047 	/* no toast tuples changed */
3048 	if (txn->toast_hash == NULL)
3049 		return;
3050 
3051 	oldcontext = MemoryContextSwitchTo(rb->context);
3052 
3053 	/* we should only have toast tuples in an INSERT or UPDATE */
3054 	Assert(change->data.tp.newtuple);
3055 
3056 	desc = RelationGetDescr(relation);
3057 
3058 	toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
3059 	if (!RelationIsValid(toast_rel))
3060 		elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
3061 			 relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
3062 
3063 	toast_desc = RelationGetDescr(toast_rel);
3064 
3065 	/* should we allocate from stack instead? */
3066 	attrs = palloc0(sizeof(Datum) * desc->natts);
3067 	isnull = palloc0(sizeof(bool) * desc->natts);
3068 	free = palloc0(sizeof(bool) * desc->natts);
3069 
3070 	newtup = change->data.tp.newtuple;
3071 
3072 	heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
3073 
3074 	for (natt = 0; natt < desc->natts; natt++)
3075 	{
3076 		Form_pg_attribute attr = TupleDescAttr(desc, natt);
3077 		ReorderBufferToastEnt *ent;
3078 		struct varlena *varlena;
3079 
3080 		/* va_rawsize is the size of the original datum -- including header */
3081 		struct varatt_external toast_pointer;
3082 		struct varatt_indirect redirect_pointer;
3083 		struct varlena *new_datum = NULL;
3084 		struct varlena *reconstructed;
3085 		dlist_iter	it;
3086 		Size		data_done = 0;
3087 
3088 		/* system columns aren't toasted */
3089 		if (attr->attnum < 0)
3090 			continue;
3091 
3092 		if (attr->attisdropped)
3093 			continue;
3094 
3095 		/* not a varlena datatype */
3096 		if (attr->attlen != -1)
3097 			continue;
3098 
3099 		/* no data */
3100 		if (isnull[natt])
3101 			continue;
3102 
3103 		/* ok, we know we have a toast datum */
3104 		varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
3105 
3106 		/* no need to do anything if the tuple isn't external */
3107 		if (!VARATT_IS_EXTERNAL(varlena))
3108 			continue;
3109 
3110 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
3111 
3112 		/*
3113 		 * Check whether the toast tuple changed, replace if so.
3114 		 */
3115 		ent = (ReorderBufferToastEnt *)
3116 			hash_search(txn->toast_hash,
3117 						(void *) &toast_pointer.va_valueid,
3118 						HASH_FIND,
3119 						NULL);
3120 		if (ent == NULL)
3121 			continue;
3122 
3123 		new_datum =
3124 			(struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
3125 
3126 		free[natt] = true;
3127 
3128 		reconstructed = palloc0(toast_pointer.va_rawsize);
3129 
3130 		ent->reconstructed = reconstructed;
3131 
3132 		/* stitch toast tuple back together from its parts */
3133 		dlist_foreach(it, &ent->chunks)
3134 		{
3135 			bool		isnull;
3136 			ReorderBufferChange *cchange;
3137 			ReorderBufferTupleBuf *ctup;
3138 			Pointer		chunk;
3139 
3140 			cchange = dlist_container(ReorderBufferChange, node, it.cur);
3141 			ctup = cchange->data.tp.newtuple;
3142 			chunk = DatumGetPointer(
3143 									fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
3144 
3145 			Assert(!isnull);
3146 			Assert(!VARATT_IS_EXTERNAL(chunk));
3147 			Assert(!VARATT_IS_SHORT(chunk));
3148 
3149 			memcpy(VARDATA(reconstructed) + data_done,
3150 				   VARDATA(chunk),
3151 				   VARSIZE(chunk) - VARHDRSZ);
3152 			data_done += VARSIZE(chunk) - VARHDRSZ;
3153 		}
3154 		Assert(data_done == toast_pointer.va_extsize);
3155 
3156 		/* make sure its marked as compressed or not */
3157 		if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
3158 			SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
3159 		else
3160 			SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
3161 
3162 		memset(&redirect_pointer, 0, sizeof(redirect_pointer));
3163 		redirect_pointer.pointer = reconstructed;
3164 
3165 		SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
3166 		memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
3167 			   sizeof(redirect_pointer));
3168 
3169 		attrs[natt] = PointerGetDatum(new_datum);
3170 	}
3171 
3172 	/*
3173 	 * Build tuple in separate memory & copy tuple back into the tuplebuf
3174 	 * passed to the output plugin. We can't directly heap_fill_tuple() into
3175 	 * the tuplebuf because attrs[] will point back into the current content.
3176 	 */
3177 	tmphtup = heap_form_tuple(desc, attrs, isnull);
3178 	Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
3179 	Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
3180 
3181 	memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
3182 	newtup->tuple.t_len = tmphtup->t_len;
3183 
3184 	/*
3185 	 * free resources we won't further need, more persistent stuff will be
3186 	 * free'd in ReorderBufferToastReset().
3187 	 */
3188 	RelationClose(toast_rel);
3189 	pfree(tmphtup);
3190 	for (natt = 0; natt < desc->natts; natt++)
3191 	{
3192 		if (free[natt])
3193 			pfree(DatumGetPointer(attrs[natt]));
3194 	}
3195 	pfree(attrs);
3196 	pfree(free);
3197 	pfree(isnull);
3198 
3199 	MemoryContextSwitchTo(oldcontext);
3200 }
3201 
3202 /*
3203  * Free all resources allocated for toast reconstruction.
3204  */
3205 static void
ReorderBufferToastReset(ReorderBuffer * rb,ReorderBufferTXN * txn)3206 ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
3207 {
3208 	HASH_SEQ_STATUS hstat;
3209 	ReorderBufferToastEnt *ent;
3210 
3211 	if (txn->toast_hash == NULL)
3212 		return;
3213 
3214 	/* sequentially walk over the hash and free everything */
3215 	hash_seq_init(&hstat, txn->toast_hash);
3216 	while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
3217 	{
3218 		dlist_mutable_iter it;
3219 
3220 		if (ent->reconstructed != NULL)
3221 			pfree(ent->reconstructed);
3222 
3223 		dlist_foreach_modify(it, &ent->chunks)
3224 		{
3225 			ReorderBufferChange *change =
3226 			dlist_container(ReorderBufferChange, node, it.cur);
3227 
3228 			dlist_delete(&change->node);
3229 			ReorderBufferReturnChange(rb, change);
3230 		}
3231 	}
3232 
3233 	hash_destroy(txn->toast_hash);
3234 	txn->toast_hash = NULL;
3235 }
3236 
3237 
3238 /* ---------------------------------------
3239  * Visibility support for logical decoding
3240  *
3241  *
3242  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
3243  * always rely on stored cmin/cmax values because of two scenarios:
3244  *
3245  * * A tuple got changed multiple times during a single transaction and thus
3246  *	 has got a combocid. Combocid's are only valid for the duration of a
3247  *	 single transaction.
3248  * * A tuple with a cmin but no cmax (and thus no combocid) got
3249  *	 deleted/updated in another transaction than the one which created it
3250  *	 which we are looking at right now. As only one of cmin, cmax or combocid
3251  *	 is actually stored in the heap we don't have access to the value we
3252  *	 need anymore.
3253  *
3254  * To resolve those problems we have a per-transaction hash of (cmin,
3255  * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
3256  * (cmin, cmax) values. That also takes care of combocids by simply
3257  * not caring about them at all. As we have the real cmin/cmax values
3258  * combocids aren't interesting.
3259  *
3260  * As we only care about catalog tuples here the overhead of this
3261  * hashtable should be acceptable.
3262  *
3263  * Heap rewrites complicate this a bit, check rewriteheap.c for
3264  * details.
3265  * -------------------------------------------------------------------------
3266  */
3267 
3268 /* struct for qsort()ing mapping files by lsn somewhat efficiently */
3269 typedef struct RewriteMappingFile
3270 {
3271 	XLogRecPtr	lsn;
3272 	char		fname[MAXPGPATH];
3273 } RewriteMappingFile;
3274 
3275 #if NOT_USED
3276 static void
DisplayMapping(HTAB * tuplecid_data)3277 DisplayMapping(HTAB *tuplecid_data)
3278 {
3279 	HASH_SEQ_STATUS hstat;
3280 	ReorderBufferTupleCidEnt *ent;
3281 
3282 	hash_seq_init(&hstat, tuplecid_data);
3283 	while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
3284 	{
3285 		elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
3286 			 ent->key.relnode.dbNode,
3287 			 ent->key.relnode.spcNode,
3288 			 ent->key.relnode.relNode,
3289 			 ItemPointerGetBlockNumber(&ent->key.tid),
3290 			 ItemPointerGetOffsetNumber(&ent->key.tid),
3291 			 ent->cmin,
3292 			 ent->cmax
3293 			);
3294 	}
3295 }
3296 #endif
3297 
3298 /*
3299  * Apply a single mapping file to tuplecid_data.
3300  *
3301  * The mapping file has to have been verified to be a) committed b) for our
3302  * transaction c) applied in LSN order.
3303  */
3304 static void
ApplyLogicalMappingFile(HTAB * tuplecid_data,Oid relid,const char * fname)3305 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
3306 {
3307 	char		path[MAXPGPATH];
3308 	int			fd;
3309 	int			readBytes;
3310 	LogicalRewriteMappingData map;
3311 
3312 	sprintf(path, "pg_logical/mappings/%s", fname);
3313 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
3314 	if (fd < 0)
3315 		ereport(ERROR,
3316 				(errcode_for_file_access(),
3317 				 errmsg("could not open file \"%s\": %m", path)));
3318 
3319 	while (true)
3320 	{
3321 		ReorderBufferTupleCidKey key;
3322 		ReorderBufferTupleCidEnt *ent;
3323 		ReorderBufferTupleCidEnt *new_ent;
3324 		bool		found;
3325 
3326 		/* be careful about padding */
3327 		memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3328 
3329 		/* read all mappings till the end of the file */
3330 		pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
3331 		readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3332 		pgstat_report_wait_end();
3333 
3334 		if (readBytes < 0)
3335 			ereport(ERROR,
3336 					(errcode_for_file_access(),
3337 					 errmsg("could not read file \"%s\": %m",
3338 							path)));
3339 		else if (readBytes == 0)	/* EOF */
3340 			break;
3341 		else if (readBytes != sizeof(LogicalRewriteMappingData))
3342 			ereport(ERROR,
3343 					(errcode_for_file_access(),
3344 					 errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3345 							path, readBytes,
3346 							(int32) sizeof(LogicalRewriteMappingData))));
3347 
3348 		key.relnode = map.old_node;
3349 		ItemPointerCopy(&map.old_tid,
3350 						&key.tid);
3351 
3352 
3353 		ent = (ReorderBufferTupleCidEnt *)
3354 			hash_search(tuplecid_data,
3355 						(void *) &key,
3356 						HASH_FIND,
3357 						NULL);
3358 
3359 		/* no existing mapping, no need to update */
3360 		if (!ent)
3361 			continue;
3362 
3363 		key.relnode = map.new_node;
3364 		ItemPointerCopy(&map.new_tid,
3365 						&key.tid);
3366 
3367 		new_ent = (ReorderBufferTupleCidEnt *)
3368 			hash_search(tuplecid_data,
3369 						(void *) &key,
3370 						HASH_ENTER,
3371 						&found);
3372 
3373 		if (found)
3374 		{
3375 			/*
3376 			 * Make sure the existing mapping makes sense. We sometime update
3377 			 * old records that did not yet have a cmax (e.g. pg_class' own
3378 			 * entry while rewriting it) during rewrites, so allow that.
3379 			 */
3380 			Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3381 			Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3382 		}
3383 		else
3384 		{
3385 			/* update mapping */
3386 			new_ent->cmin = ent->cmin;
3387 			new_ent->cmax = ent->cmax;
3388 			new_ent->combocid = ent->combocid;
3389 		}
3390 	}
3391 
3392 	CloseTransientFile(fd);
3393 }
3394 
3395 
3396 /*
3397  * Check whether the TransactionOid 'xid' is in the pre-sorted array 'xip'.
3398  */
3399 static bool
TransactionIdInArray(TransactionId xid,TransactionId * xip,Size num)3400 TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
3401 {
3402 	return bsearch(&xid, xip, num,
3403 				   sizeof(TransactionId), xidComparator) != NULL;
3404 }
3405 
3406 /*
3407  * qsort() comparator for sorting RewriteMappingFiles in LSN order.
3408  */
3409 static int
file_sort_by_lsn(const void * a_p,const void * b_p)3410 file_sort_by_lsn(const void *a_p, const void *b_p)
3411 {
3412 	RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
3413 	RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
3414 
3415 	if (a->lsn < b->lsn)
3416 		return -1;
3417 	else if (a->lsn > b->lsn)
3418 		return 1;
3419 	return 0;
3420 }
3421 
3422 /*
3423  * Apply any existing logical remapping files if there are any targeted at our
3424  * transaction for relid.
3425  */
3426 static void
UpdateLogicalMappings(HTAB * tuplecid_data,Oid relid,Snapshot snapshot)3427 UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
3428 {
3429 	DIR		   *mapping_dir;
3430 	struct dirent *mapping_de;
3431 	List	   *files = NIL;
3432 	ListCell   *file;
3433 	RewriteMappingFile **files_a;
3434 	size_t		off;
3435 	Oid			dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
3436 
3437 	mapping_dir = AllocateDir("pg_logical/mappings");
3438 	while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
3439 	{
3440 		Oid			f_dboid;
3441 		Oid			f_relid;
3442 		TransactionId f_mapped_xid;
3443 		TransactionId f_create_xid;
3444 		XLogRecPtr	f_lsn;
3445 		uint32		f_hi,
3446 					f_lo;
3447 		RewriteMappingFile *f;
3448 
3449 		if (strcmp(mapping_de->d_name, ".") == 0 ||
3450 			strcmp(mapping_de->d_name, "..") == 0)
3451 			continue;
3452 
3453 		/* Ignore files that aren't ours */
3454 		if (strncmp(mapping_de->d_name, "map-", 4) != 0)
3455 			continue;
3456 
3457 		if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
3458 				   &f_dboid, &f_relid, &f_hi, &f_lo,
3459 				   &f_mapped_xid, &f_create_xid) != 6)
3460 			elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
3461 
3462 		f_lsn = ((uint64) f_hi) << 32 | f_lo;
3463 
3464 		/* mapping for another database */
3465 		if (f_dboid != dboid)
3466 			continue;
3467 
3468 		/* mapping for another relation */
3469 		if (f_relid != relid)
3470 			continue;
3471 
3472 		/* did the creating transaction abort? */
3473 		if (!TransactionIdDidCommit(f_create_xid))
3474 			continue;
3475 
3476 		/* not for our transaction */
3477 		if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
3478 			continue;
3479 
3480 		/* ok, relevant, queue for apply */
3481 		f = palloc(sizeof(RewriteMappingFile));
3482 		f->lsn = f_lsn;
3483 		strcpy(f->fname, mapping_de->d_name);
3484 		files = lappend(files, f);
3485 	}
3486 	FreeDir(mapping_dir);
3487 
3488 	/* build array we can easily sort */
3489 	files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
3490 	off = 0;
3491 	foreach(file, files)
3492 	{
3493 		files_a[off++] = lfirst(file);
3494 	}
3495 
3496 	/* sort files so we apply them in LSN order */
3497 	qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
3498 		  file_sort_by_lsn);
3499 
3500 	for (off = 0; off < list_length(files); off++)
3501 	{
3502 		RewriteMappingFile *f = files_a[off];
3503 
3504 		elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
3505 			 snapshot->subxip[0]);
3506 		ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
3507 		pfree(f);
3508 	}
3509 }
3510 
3511 /*
3512  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
3513  * combocids.
3514  */
3515 bool
ResolveCminCmaxDuringDecoding(HTAB * tuplecid_data,Snapshot snapshot,HeapTuple htup,Buffer buffer,CommandId * cmin,CommandId * cmax)3516 ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
3517 							  Snapshot snapshot,
3518 							  HeapTuple htup, Buffer buffer,
3519 							  CommandId *cmin, CommandId *cmax)
3520 {
3521 	ReorderBufferTupleCidKey key;
3522 	ReorderBufferTupleCidEnt *ent;
3523 	ForkNumber	forkno;
3524 	BlockNumber blockno;
3525 	bool		updated_mapping = false;
3526 
3527 	/* be careful about padding */
3528 	memset(&key, 0, sizeof(key));
3529 
3530 	Assert(!BufferIsLocal(buffer));
3531 
3532 	/*
3533 	 * get relfilenode from the buffer, no convenient way to access it other
3534 	 * than that.
3535 	 */
3536 	BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3537 
3538 	/* tuples can only be in the main fork */
3539 	Assert(forkno == MAIN_FORKNUM);
3540 	Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3541 
3542 	ItemPointerCopy(&htup->t_self,
3543 					&key.tid);
3544 
3545 restart:
3546 	ent = (ReorderBufferTupleCidEnt *)
3547 		hash_search(tuplecid_data,
3548 					(void *) &key,
3549 					HASH_FIND,
3550 					NULL);
3551 
3552 	/*
3553 	 * failed to find a mapping, check whether the table was rewritten and
3554 	 * apply mapping if so, but only do that once - there can be no new
3555 	 * mappings while we are in here since we have to hold a lock on the
3556 	 * relation.
3557 	 */
3558 	if (ent == NULL && !updated_mapping)
3559 	{
3560 		UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3561 		/* now check but don't update for a mapping again */
3562 		updated_mapping = true;
3563 		goto restart;
3564 	}
3565 	else if (ent == NULL)
3566 		return false;
3567 
3568 	if (cmin)
3569 		*cmin = ent->cmin;
3570 	if (cmax)
3571 		*cmax = ent->cmax;
3572 	return true;
3573 }
3574