1 /*
2  * reorderbuffer.h
3  *	  PostgreSQL logical replay/reorder buffer management.
4  *
5  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
6  *
7  * src/include/replication/reorderbuffer.h
8  */
9 #ifndef REORDERBUFFER_H
10 #define REORDERBUFFER_H
11 
12 #include "access/htup_details.h"
13 #include "lib/ilist.h"
14 #include "storage/sinval.h"
15 #include "utils/hsearch.h"
16 #include "utils/relcache.h"
17 #include "utils/snapshot.h"
18 #include "utils/timestamp.h"
19 
/*
 * Defined outside this header.
 * NOTE(review): presumably the limit on memory used for buffering decoded
 * changes; confirm meaning and units at the definition.
 */
extern PGDLLIMPORT int logical_decoding_work_mem;
21 
/*
 * An individual tuple, stored in one chunk of memory.
 *
 * This struct is only the header of that chunk: the tuple data itself is
 * placed behind it in the same allocation.  Use ReorderBufferTupleBufData()
 * to obtain a pointer to the data.
 */
typedef struct ReorderBufferTupleBuf
{
	/* position in preallocated list */
	slist_node	node;

	/* tuple header, the interesting bit for users of logical decoding */
	HeapTupleData tuple;

	/*
	 * pre-allocated size of the tuple buffer; this is not the same thing as
	 * the size of the tuple stored in it
	 */
	Size		alloc_tuple_size;

	/* actual tuple data follows */
} ReorderBufferTupleBuf;
36 
/*
 * Pointer to the data stored in a TupleBuf.
 *
 * The data starts at the first MAXALIGN'd address at or after the end of the
 * ReorderBufferTupleBuf header that "p" points to.
 *
 * The argument is parenthesized before being cast to char *, so that pointer
 * expressions such as "bufs + 1" are evaluated as a whole instead of having
 * the cast bind to their first operand only.
 */
#define ReorderBufferTupleBufData(p) \
	((HeapTupleHeader) MAXALIGN(((char *) (p)) + sizeof(ReorderBufferTupleBuf)))
40 
/*
 * Kinds of change handed to a 'change' callback.
 *
 * Snapshots, CommandIds and ComboCids are kept in the same list as the
 * user-visible INSERT/UPDATE/DELETE changes, for reasons of efficiency and
 * simplicity.  Changes carrying one of the *_INTERNAL_* actions are never
 * shown to users of the decoding facilities.
 *
 * INTERNAL_SPEC_INSERT, INTERNAL_SPEC_CONFIRM and INTERNAL_SPEC_ABORT deal
 * with "speculative insertions", their confirmation, and their abort,
 * respectively.  They're used by INSERT .. ON CONFLICT .. UPDATE; users of
 * logical decoding don't have to care about them.
 *
 * The numeric values are written out explicitly; they are exactly the values
 * implicit numbering would assign for this declaration order.
 */
enum ReorderBufferChangeType
{
	REORDER_BUFFER_CHANGE_INSERT = 0,
	REORDER_BUFFER_CHANGE_UPDATE = 1,
	REORDER_BUFFER_CHANGE_DELETE = 2,
	REORDER_BUFFER_CHANGE_MESSAGE = 3,
	REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT = 4,
	REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID = 5,
	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID = 6,
	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT = 7,
	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM = 8,
	REORDER_BUFFER_CHANGE_TRUNCATE = 9,
	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT = 10
};
68 
/* forward declaration, needed for the back-pointer in ReorderBufferChange */
struct ReorderBufferTXN;

/*
 * A single 'change': can be an insert (with one tuple), an update (old, new),
 * or a delete (old).
 *
 * The same struct is also used internally for other purposes (see the
 * remaining members of the "data" union), but that should never be visible
 * outside reorderbuffer.c.
 */
typedef struct ReorderBufferChange
{
	/*
	 * WAL position associated with this change.
	 * NOTE(review): presumably the LSN of the record the change was decoded
	 * from; confirm against reorderbuffer.c.
	 */
	XLogRecPtr	lsn;

	/* The type of change; selects the valid member of "data" below. */
	enum ReorderBufferChangeType action;

	/* Transaction this change belongs to. */
	struct ReorderBufferTXN *txn;

	/* Replication origin id associated with this change. */
	RepOriginId origin_id;

	/*
	 * Context data for the change. Which part of the union is valid depends
	 * on action.
	 */
	union
	{
		/* Old, new tuples when action == *_INSERT|UPDATE|DELETE */
		struct
		{
			/* relation that has been changed */
			RelFileNode relnode;

			/* no previously reassembled toast chunks are necessary anymore */
			bool		clear_toast_afterwards;

			/* valid for DELETE || UPDATE */
			ReorderBufferTupleBuf *oldtuple;
			/* valid for INSERT || UPDATE */
			ReorderBufferTupleBuf *newtuple;
		}			tp;

		/*
		 * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing one
		 * set of relations to be truncated.
		 *
		 * NOTE(review): relids presumably points to nrelids relation OIDs;
		 * confirm against the code that fills it in.
		 */
		struct
		{
			Size		nrelids;
			bool		cascade;
			bool		restart_seqs;
			Oid		   *relids;
		}			truncate;

		/* Message with arbitrary data. */
		struct
		{
			char	   *prefix;
			Size		message_size;
			char	   *message;
		}			msg;

		/* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
		Snapshot	snapshot;

		/*
		 * New command id for existing snapshot in a catalog changing tx. Set
		 * when action == *_INTERNAL_COMMAND_ID.
		 */
		CommandId	command_id;

		/*
		 * New cid mapping for catalog changing transaction, set when action
		 * == *_INTERNAL_TUPLECID.
		 */
		struct
		{
			RelFileNode node;
			ItemPointerData tid;
			CommandId	cmin;
			CommandId	cmax;
			CommandId	combocid;
		}			tuplecid;
	}			data;

	/*
	 * While in use this is how a change is linked into a transaction,
	 * otherwise it's the preallocated list.
	 */
	dlist_node	node;
} ReorderBufferChange;
161 
/*
 * Bits that can be set in ReorderBufferTXN.txn_flags.
 */
#define RBTXN_HAS_CATALOG_CHANGES	(1 << 0)	/* 0x0001 */
#define RBTXN_IS_SUBXACT			(1 << 1)	/* 0x0002 */
#define RBTXN_IS_SERIALIZED			(1 << 2)	/* 0x0004 */

/*
 * Predicates over txn_flags.  Each takes a pointer to a transaction and
 * evaluates to true (1) if the corresponding bit is set, false (0) otherwise.
 */

/* Does the transaction have catalog changes? */
#define rbtxn_has_catalog_changes(txn) \
	(((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0)

/* Is the transaction known as a subxact? */
#define rbtxn_is_known_subxact(txn) \
	(((txn)->txn_flags & RBTXN_IS_SUBXACT) != 0)

/* Has this transaction been spilled to disk? */
#define rbtxn_is_serialized(txn) \
	(((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0)
184 
/*
 * State kept for one transaction (toplevel or sub) whose changes are being
 * collected.
 */
typedef struct ReorderBufferTXN
{
	/* Bitmask of RBTXN_* flags; query it with the rbtxn_*() macros. */
	bits32		txn_flags;

	/* The transaction's transaction id, can be a toplevel or sub xid. */
	TransactionId xid;

	/* Xid of top-level transaction, if known */
	TransactionId toplevel_xid;

	/*
	 * LSN of the first data-carrying WAL record with knowledge about this
	 * xid. This is allowed to *not* be the first record adorned with this
	 * xid, if the previous records aren't relevant for logical decoding.
	 */
	XLogRecPtr	first_lsn;

	/* ----
	 * LSN of the record that led to this xact being committed or
	 * aborted. This can be a
	 * * plain commit record
	 * * plain commit record, of a parent transaction
	 * * prepared transaction commit
	 * * plain abort record
	 * * prepared transaction abort
	 *
	 * This can also become set to earlier values than transaction end when
	 * a transaction is spilled to disk; specifically it's set to the LSN of
	 * the latest change written to disk so far.
	 * ----
	 */
	XLogRecPtr	final_lsn;

	/*
	 * LSN pointing to the end of the commit record + 1.
	 */
	XLogRecPtr	end_lsn;

	/*
	 * Last LSN at which snapshot information resides, so we can restart
	 * decoding from there and fully recover this transaction from WAL.
	 */
	XLogRecPtr	restart_decoding_lsn;

	/* origin of the change that caused this transaction */
	RepOriginId origin_id;
	XLogRecPtr	origin_lsn;

	/*
	 * Commit time, only known when we read the actual commit record.
	 */
	TimestampTz commit_time;

	/*
	 * The base snapshot is used to decode all changes until either this
	 * transaction modifies the catalog, or another catalog-modifying
	 * transaction commits.
	 */
	Snapshot	base_snapshot;
	XLogRecPtr	base_snapshot_lsn;
	dlist_node	base_snapshot_node; /* link in txns_by_base_snapshot_lsn */

	/*
	 * How many ReorderBufferChange's do we have in this txn.
	 *
	 * Changes in subtransactions are *not* included but tracked separately.
	 */
	uint64		nentries;

	/*
	 * How many of the above entries are stored in memory in contrast to being
	 * spilled to disk.
	 */
	uint64		nentries_mem;

	/*
	 * List of ReorderBufferChange structs, including new Snapshots and new
	 * CommandIds
	 */
	dlist_head	changes;

	/*
	 * List of (relation, ctid) => (cmin, cmax) mappings for catalog tuples.
	 * Those are always assigned to the toplevel transaction. (Keep track of
	 * #entries to create a hash of the right size)
	 */
	dlist_head	tuplecids;
	uint64		ntuplecids;

	/*
	 * On-demand built hash for looking up the above values.
	 */
	HTAB	   *tuplecid_hash;

	/*
	 * Hash containing (potentially partial) toast entries. NULL if no toast
	 * tuples have been found for the current change.
	 */
	HTAB	   *toast_hash;

	/*
	 * non-hierarchical list of subtransactions that are *not* aborted. Only
	 * used in toplevel transactions.
	 */
	dlist_head	subtxns;
	uint32		nsubtxns;

	/*
	 * Stored cache invalidations. This is not a linked list because we get
	 * all the invalidations at once.
	 */
	uint32		ninvalidations;
	SharedInvalidationMessage *invalidations;

	/* ---
	 * Position in one of three lists:
	 * * list of subtransactions if we are *known* to be subxact
	 * * list of toplevel xacts (can be an as-yet unknown subxact)
	 * * list of preallocated ReorderBufferTXNs (if unused)
	 * ---
	 */
	dlist_node	node;

	/*
	 * Size of this transaction (changes currently in memory, in bytes).
	 */
	Size		size;
} ReorderBufferTXN;
315 
/* so we can define the callbacks used inside struct ReorderBuffer itself */
typedef struct ReorderBuffer ReorderBuffer;

/*
 * The callback types below are the types of the function pointers stored in
 * struct ReorderBuffer.  Every callback receives the reorder buffer and the
 * transaction as its first two arguments.
 */

/* change callback signature: one change, together with a relation */
typedef void (*ReorderBufferApplyChangeCB) (ReorderBuffer *rb,
											ReorderBufferTXN *txn,
											Relation relation,
											ReorderBufferChange *change);

/*
 * truncate callback signature
 *
 * NOTE(review): relations[] presumably holds nrelations entries; confirm
 * against the caller.
 */
typedef void (*ReorderBufferApplyTruncateCB) (ReorderBuffer *rb,
											  ReorderBufferTXN *txn,
											  int nrelations,
											  Relation relations[],
											  ReorderBufferChange *change);

/* begin callback signature */
typedef void (*ReorderBufferBeginCB) (ReorderBuffer *rb,
									  ReorderBufferTXN *txn);

/* commit callback signature; additionally receives the commit LSN */
typedef void (*ReorderBufferCommitCB) (ReorderBuffer *rb,
									   ReorderBufferTXN *txn,
									   XLogRecPtr commit_lsn);

/*
 * message callback signature: a message given as prefix, size and payload,
 * plus its LSN and whether it is transactional
 */
typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
										ReorderBufferTXN *txn,
										XLogRecPtr message_lsn,
										bool transactional,
										const char *prefix, Size sz,
										const char *message);
348 
/*
 * State of one reorder buffer: transaction lookup structures, the callbacks
 * to invoke, memory contexts, and bookkeeping.
 */
struct ReorderBuffer
{
	/*
	 * xid => ReorderBufferTXN lookup table
	 */
	HTAB	   *by_txn;

	/*
	 * Transactions that could be a toplevel xact, ordered by LSN of the first
	 * record bearing that xid.
	 */
	dlist_head	toplevel_by_lsn;

	/*
	 * Transactions and subtransactions that have a base snapshot, ordered by
	 * LSN of the record which caused us to first obtain the base snapshot.
	 * This is not the same as toplevel_by_lsn, because we only set the base
	 * snapshot on the first logical-decoding-relevant record (eg. heap
	 * writes), whereas the initial LSN could be set by other operations.
	 */
	dlist_head	txns_by_base_snapshot_lsn;

	/*
	 * one-entry sized cache for by_txn. Very frequently the same txn gets
	 * looked up over and over again.
	 */
	TransactionId by_txn_last_xid;
	ReorderBufferTXN *by_txn_last_txn;

	/*
	 * Callbacks to be called when a transaction commits.
	 */
	ReorderBufferBeginCB begin;
	ReorderBufferApplyChangeCB apply_change;
	ReorderBufferApplyTruncateCB apply_truncate;
	ReorderBufferCommitCB commit;
	ReorderBufferMessageCB message;

	/*
	 * Pointer that will be passed untouched to the callbacks.
	 */
	void	   *private_data;

	/*
	 * Saved output plugin option
	 */
	bool		output_rewrites;

	/*
	 * Private memory context.
	 */
	MemoryContext context;

	/*
	 * Memory contexts for specific types of objects
	 */
	MemoryContext change_context;
	MemoryContext txn_context;
	MemoryContext tup_context;

	XLogRecPtr	current_restart_decoding_lsn;

	/* buffer for disk<->memory conversions */
	char	   *outbuf;
	Size		outbufsize;

	/*
	 * memory accounting
	 * NOTE(review): presumably the total, in bytes, of the per-transaction
	 * sizes; confirm in reorderbuffer.c.
	 */
	Size		size;
};
418 
419 
420 ReorderBuffer *ReorderBufferAllocate(void);
421 void		ReorderBufferFree(ReorderBuffer *);
422 
423 ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len);
424 void		ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
425 ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
426 void		ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
427 
428 Oid		   *ReorderBufferGetRelids(ReorderBuffer *, int nrelids);
429 void		ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids);
430 
431 void		ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
432 void		ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
433 									  bool transactional, const char *prefix,
434 									  Size message_size, const char *message);
435 void		ReorderBufferCommit(ReorderBuffer *, TransactionId,
436 								XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
437 								TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
438 void		ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
439 void		ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
440 									 XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
441 void		ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
442 void		ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid);
443 void		ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
444 
445 void		ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
446 void		ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
447 void		ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
448 										 CommandId cid);
449 void		ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
450 										 RelFileNode node, ItemPointerData pt,
451 										 CommandId cmin, CommandId cmax, CommandId combocid);
452 void		ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
453 										  Size nmsgs, SharedInvalidationMessage *msgs);
454 void		ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
455 											   SharedInvalidationMessage *invalidations);
456 void		ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
457 void		ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
458 bool		ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
459 bool		ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
460 
461 ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
462 TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
463 
464 void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
465 
466 void		StartupReorderBuffer(void);
467 
468 #endif
469