1 /*
2  * reorderbuffer.h
3  *	  PostgreSQL logical replay/reorder buffer management.
4  *
5  * Copyright (c) 2012-2018, PostgreSQL Global Development Group
6  *
7  * src/include/replication/reorderbuffer.h
8  */
9 #ifndef REORDERBUFFER_H
10 #define REORDERBUFFER_H
11 
12 #include "access/htup_details.h"
13 #include "lib/ilist.h"
14 #include "storage/sinval.h"
15 #include "utils/hsearch.h"
16 #include "utils/relcache.h"
17 #include "utils/snapshot.h"
18 #include "utils/timestamp.h"
19 
/*
 * An individual tuple, stored in one chunk of memory: this header struct is
 * followed in the same allocation by the tuple data itself, starting at the
 * first MAXALIGN'd address after the header (see ReorderBufferTupleBufData).
 * Because that address is computed from sizeof(ReorderBufferTupleBuf),
 * changing the fields below moves where the data is expected to start.
 */
typedef struct ReorderBufferTupleBuf
{
	/* position in preallocated list */
	slist_node	node;

	/* tuple header, the interesting bit for users of logical decoding */
	HeapTupleData tuple;

	/* pre-allocated size of tuple buffer, different from tuple size */
	Size		alloc_tuple_size;

	/* actual tuple data follows */
} ReorderBufferTupleBuf;
34 
/*
 * Pointer to the tuple data stored in a TupleBuf.
 *
 * The data begins at the first MAXALIGN'd address following the
 * ReorderBufferTupleBuf header, i.e. MAXALIGN(p + sizeof(header)) in bytes.
 * The argument is parenthesized so that a pointer expression such as
 * "buf + 1" is converted to char * as a whole; without the parentheses the
 * cast would bind only to "buf" and the "+ 1" would advance by one byte
 * instead of one ReorderBufferTupleBuf.
 */
#define ReorderBufferTupleBufData(p) \
	((HeapTupleHeader) MAXALIGN(((char *) (p)) + sizeof(ReorderBufferTupleBuf)))
38 
39 /*
40  * Types of the change passed to a 'change' callback.
41  *
42  * For efficiency and simplicity reasons we want to keep Snapshots, CommandIds
43  * and ComboCids in the same list with the user visible INSERT/UPDATE/DELETE
44  * changes. Users of the decoding facilities will never see changes with
45  * *_INTERNAL_* actions.
46  *
47  * The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM, and INTERNAL_SPEC_ABORT
48  * changes concern "speculative insertions", their confirmation, and abort
49  * respectively.  They're used by INSERT .. ON CONFLICT .. UPDATE.  Users of
50  * logical decoding don't have to care about these.
51  */
52 enum ReorderBufferChangeType
53 {
54 	REORDER_BUFFER_CHANGE_INSERT,
55 	REORDER_BUFFER_CHANGE_UPDATE,
56 	REORDER_BUFFER_CHANGE_DELETE,
57 	REORDER_BUFFER_CHANGE_MESSAGE,
58 	REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
59 	REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
60 	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
61 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
62 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
63 	REORDER_BUFFER_CHANGE_TRUNCATE,
64 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
65 };
66 
67 /*
68  * a single 'change', can be an insert (with one tuple), an update (old, new),
69  * or a delete (old).
70  *
71  * The same struct is also used internally for other purposes but that should
72  * never be visible outside reorderbuffer.c.
73  */
74 typedef struct ReorderBufferChange
75 {
76 	XLogRecPtr	lsn;
77 
78 	/* The type of change. */
79 	enum ReorderBufferChangeType action;
80 
81 	RepOriginId origin_id;
82 
83 	/*
84 	 * Context data for the change. Which part of the union is valid depends
85 	 * on action.
86 	 */
87 	union
88 	{
89 		/* Old, new tuples when action == *_INSERT|UPDATE|DELETE */
90 		struct
91 		{
92 			/* relation that has been changed */
93 			RelFileNode relnode;
94 
95 			/* no previously reassembled toast chunks are necessary anymore */
96 			bool		clear_toast_afterwards;
97 
98 			/* valid for DELETE || UPDATE */
99 			ReorderBufferTupleBuf *oldtuple;
100 			/* valid for INSERT || UPDATE */
101 			ReorderBufferTupleBuf *newtuple;
102 		}			tp;
103 
104 		/*
105 		 * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing one
106 		 * set of relations to be truncated.
107 		 */
108 		struct
109 		{
110 			Size		nrelids;
111 			bool		cascade;
112 			bool		restart_seqs;
113 			Oid		   *relids;
114 		}			truncate;
115 
116 		/* Message with arbitrary data. */
117 		struct
118 		{
119 			char	   *prefix;
120 			Size		message_size;
121 			char	   *message;
122 		}			msg;
123 
124 		/* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
125 		Snapshot	snapshot;
126 
127 		/*
128 		 * New command id for existing snapshot in a catalog changing tx. Set
129 		 * when action == *_INTERNAL_COMMAND_ID.
130 		 */
131 		CommandId	command_id;
132 
133 		/*
134 		 * New cid mapping for catalog changing transaction, set when action
135 		 * == *_INTERNAL_TUPLECID.
136 		 */
137 		struct
138 		{
139 			RelFileNode node;
140 			ItemPointerData tid;
141 			CommandId	cmin;
142 			CommandId	cmax;
143 			CommandId	combocid;
144 		}			tuplecid;
145 	}			data;
146 
147 	/*
148 	 * While in use this is how a change is linked into a transactions,
149 	 * otherwise it's the preallocated list.
150 	 */
151 	dlist_node	node;
152 } ReorderBufferChange;
153 
/*
 * Per-transaction state kept by the reorder buffer, for both toplevel
 * transactions and subtransactions.
 */
typedef struct ReorderBufferTXN
{
	/*
	 * The transaction's transaction id; can be a toplevel or sub xid.
	 */
	TransactionId xid;

	/* did the TX have catalog changes */
	bool		has_catalog_changes;

	/* Do we know this is a subxact?  Xid of top-level txn if so */
	bool		is_known_as_subxact;
	TransactionId toplevel_xid;

	/*
	 * LSN of the first data-carrying WAL record with knowledge about this
	 * xid. This is allowed to *not* be the first record adorned with this
	 * xid, if the previous records aren't relevant for logical decoding.
	 */
	XLogRecPtr	first_lsn;

	/* ----
	 * LSN of the record that led to this xact being committed or
	 * aborted. This can be a
	 * * plain commit record
	 * * plain commit record, of a parent transaction
	 * * prepared transaction commit
	 * * plain abort record
	 * * prepared transaction abort
	 *
	 * This can also become set to earlier values than transaction end when
	 * a transaction is spilled to disk; specifically it's set to the LSN of
	 * the latest change written to disk so far.
	 * ----
	 */
	XLogRecPtr	final_lsn;

	/*
	 * LSN pointing to the end of the commit record + 1.
	 */
	XLogRecPtr	end_lsn;

	/*
	 * The last LSN at which snapshot information resides, so we can restart
	 * decoding from there and fully recover this transaction from WAL.
	 */
	XLogRecPtr	restart_decoding_lsn;

	/* origin of the change that caused this transaction */
	RepOriginId origin_id;
	XLogRecPtr	origin_lsn;

	/*
	 * Commit time, only known when we read the actual commit record.
	 */
	TimestampTz commit_time;

	/*
	 * The base snapshot is used to decode all changes until either this
	 * transaction modifies the catalog, or another catalog-modifying
	 * transaction commits.
	 */
	Snapshot	base_snapshot;
	XLogRecPtr	base_snapshot_lsn;
	dlist_node	base_snapshot_node; /* link in txns_by_base_snapshot_lsn */

	/*
	 * How many ReorderBufferChanges do we have in this txn.
	 *
	 * Changes in subtransactions are *not* included but tracked separately.
	 */
	uint64		nentries;

	/*
	 * How many of the above entries are stored in memory in contrast to being
	 * spilled to disk.
	 */
	uint64		nentries_mem;

	/*
	 * Has this transaction been spilled to disk?  It's not always possible to
	 * deduce that fact by comparing nentries with nentries_mem, because e.g.
	 * subtransactions of a large transaction might get serialized together
	 * with the parent - if they're restored to memory they'd have
	 * nentries_mem == nentries.
	 */
	bool		serialized;

	/*
	 * List of ReorderBufferChange structs, including new Snapshots and new
	 * CommandIds
	 */
	dlist_head	changes;

	/*
	 * List of (relation, ctid) => (cmin, cmax) mappings for catalog tuples.
	 * Those are always assigned to the toplevel transaction. (Keep track of
	 * #entries to create a hash of the right size)
	 */
	dlist_head	tuplecids;
	uint64		ntuplecids;

	/*
	 * On-demand built hash for looking up the above values.
	 */
	HTAB	   *tuplecid_hash;

	/*
	 * Hash containing (potentially partial) toast entries. NULL if no toast
	 * tuples have been found for the current change.
	 */
	HTAB	   *toast_hash;

	/*
	 * non-hierarchical list of subtransactions that are *not* aborted. Only
	 * used in toplevel transactions.
	 */
	dlist_head	subtxns;
	uint32		nsubtxns;

	/*
	 * Stored cache invalidations. This is not a linked list because we get
	 * all the invalidations at once.
	 */
	uint32		ninvalidations;
	SharedInvalidationMessage *invalidations;

	/* ---
	 * Position in one of three lists:
	 * * list of subtransactions if we are *known* to be subxact
	 * * list of toplevel xacts (can be an as-yet unknown subxact)
	 * * list of preallocated ReorderBufferTXNs (if unused)
	 * ---
	 */
	dlist_node	node;

} ReorderBufferTXN;
292 
293 /* so we can define the callbacks used inside struct ReorderBuffer itself */
294 typedef struct ReorderBuffer ReorderBuffer;
295 
296 /* change callback signature */
297 typedef void (*ReorderBufferApplyChangeCB) (
298 											ReorderBuffer *rb,
299 											ReorderBufferTXN *txn,
300 											Relation relation,
301 											ReorderBufferChange *change);
302 
303 /* truncate callback signature */
304 typedef void (*ReorderBufferApplyTruncateCB) (
305 											  ReorderBuffer *rb,
306 											  ReorderBufferTXN *txn,
307 											  int nrelations,
308 											  Relation relations[],
309 											  ReorderBufferChange *change);
310 
311 /* begin callback signature */
312 typedef void (*ReorderBufferBeginCB) (
313 									  ReorderBuffer *rb,
314 									  ReorderBufferTXN *txn);
315 
316 /* commit callback signature */
317 typedef void (*ReorderBufferCommitCB) (
318 									   ReorderBuffer *rb,
319 									   ReorderBufferTXN *txn,
320 									   XLogRecPtr commit_lsn);
321 
322 /* message callback signature */
323 typedef void (*ReorderBufferMessageCB) (
324 										ReorderBuffer *rb,
325 										ReorderBufferTXN *txn,
326 										XLogRecPtr message_lsn,
327 										bool transactional,
328 										const char *prefix, Size sz,
329 										const char *message);
330 
/*
 * State of one reorder buffer: the per-xid transaction lookup, the ordered
 * transaction lists, the output callbacks and the memory used for decoding.
 */
struct ReorderBuffer
{
	/*
	 * xid => ReorderBufferTXN lookup table
	 */
	HTAB	   *by_txn;

	/*
	 * Transactions that could be a toplevel xact, ordered by LSN of the first
	 * record bearing that xid.
	 */
	dlist_head	toplevel_by_lsn;

	/*
	 * Transactions and subtransactions that have a base snapshot, ordered by
	 * LSN of the record which caused us to first obtain the base snapshot.
	 * This is not the same as toplevel_by_lsn, because we only set the base
	 * snapshot on the first logical-decoding-relevant record (eg. heap
	 * writes), whereas the initial LSN could be set by other operations.
	 */
	dlist_head	txns_by_base_snapshot_lsn;

	/*
	 * one-entry sized cache for by_txn. Very frequently the same txn gets
	 * looked up over and over again.
	 */
	TransactionId by_txn_last_xid;
	ReorderBufferTXN *by_txn_last_txn;

	/*
	 * Callbacks to be called when a transaction commits.
	 *
	 * NOTE(review): the message callback takes a "transactional" flag, so it
	 * is presumably also invoked outside of commit processing for
	 * non-transactional messages; confirm in reorderbuffer.c.
	 */
	ReorderBufferBeginCB begin;
	ReorderBufferApplyChangeCB apply_change;
	ReorderBufferApplyTruncateCB apply_truncate;
	ReorderBufferCommitCB commit;
	ReorderBufferMessageCB message;

	/*
	 * Pointer that will be passed untouched to the callbacks.
	 */
	void	   *private_data;

	/*
	 * Saved output plugin option
	 */
	bool		output_rewrites;

	/*
	 * Private memory context.
	 */
	MemoryContext context;

	/*
	 * Memory contexts for specific types of objects.  NOTE(review): judging
	 * by the names, presumably for ReorderBufferChange, ReorderBufferTXN and
	 * ReorderBufferTupleBuf allocations respectively; confirm.
	 */
	MemoryContext change_context;
	MemoryContext txn_context;
	MemoryContext tup_context;

	/*
	 * NOTE(review): presumably the value last passed to
	 * ReorderBufferSetRestartPoint(); confirm in reorderbuffer.c.
	 */
	XLogRecPtr	current_restart_decoding_lsn;

	/* buffer for disk<->memory conversions */
	char	   *outbuf;
	Size		outbufsize;
};
397 
398 
399 ReorderBuffer *ReorderBufferAllocate(void);
400 void		ReorderBufferFree(ReorderBuffer *);
401 
402 ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len);
403 void		ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
404 ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
405 void		ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
406 
407 Oid * ReorderBufferGetRelids(ReorderBuffer *, int nrelids);
408 void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids);
409 
410 void		ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
411 void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
412 						  bool transactional, const char *prefix,
413 						  Size message_size, const char *message);
414 void ReorderBufferCommit(ReorderBuffer *, TransactionId,
415 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
416 					TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
417 void		ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
418 void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
419 						 XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
420 void		ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
421 void		ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid);
422 void		ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
423 
424 void		ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
425 void		ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
426 void ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
427 							 CommandId cid);
428 void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
429 							 RelFileNode node, ItemPointerData pt,
430 							 CommandId cmin, CommandId cmax, CommandId combocid);
431 void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
432 							  Size nmsgs, SharedInvalidationMessage *msgs);
433 void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
434 								   SharedInvalidationMessage *invalidations);
435 void		ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
436 void		ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
437 bool		ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
438 bool		ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
439 
440 ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
441 TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
442 
443 void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
444 
445 void		StartupReorderBuffer(void);
446 
447 #endif
448