1 /* 2 * reorderbuffer.h 3 * PostgreSQL logical replay/reorder buffer management. 4 * 5 * Copyright (c) 2012-2020, PostgreSQL Global Development Group 6 * 7 * src/include/replication/reorderbuffer.h 8 */ 9 #ifndef REORDERBUFFER_H 10 #define REORDERBUFFER_H 11 12 #include "access/htup_details.h" 13 #include "lib/ilist.h" 14 #include "storage/sinval.h" 15 #include "utils/hsearch.h" 16 #include "utils/relcache.h" 17 #include "utils/snapshot.h" 18 #include "utils/timestamp.h" 19 20 extern PGDLLIMPORT int logical_decoding_work_mem; 21 22 /* an individual tuple, stored in one chunk of memory */ 23 typedef struct ReorderBufferTupleBuf 24 { 25 /* position in preallocated list */ 26 slist_node node; 27 28 /* tuple header, the interesting bit for users of logical decoding */ 29 HeapTupleData tuple; 30 31 /* pre-allocated size of tuple buffer, different from tuple size */ 32 Size alloc_tuple_size; 33 34 /* actual tuple data follows */ 35 } ReorderBufferTupleBuf; 36 37 /* pointer to the data stored in a TupleBuf */ 38 #define ReorderBufferTupleBufData(p) \ 39 ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf))) 40 41 /* 42 * Types of the change passed to a 'change' callback. 43 * 44 * For efficiency and simplicity reasons we want to keep Snapshots, CommandIds 45 * and ComboCids in the same list with the user visible INSERT/UPDATE/DELETE 46 * changes. Users of the decoding facilities will never see changes with 47 * *_INTERNAL_* actions. 48 * 49 * The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM, and INTERNAL_SPEC_ABORT 50 * changes concern "speculative insertions", their confirmation, and abort 51 * respectively. They're used by INSERT .. ON CONFLICT .. UPDATE. Users of 52 * logical decoding don't have to care about these. 53 */ 54 enum ReorderBufferChangeType 55 { 56 REORDER_BUFFER_CHANGE_INSERT, 57 REORDER_BUFFER_CHANGE_UPDATE, 58 REORDER_BUFFER_CHANGE_DELETE, 59 REORDER_BUFFER_CHANGE_MESSAGE, 60 REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, 61 REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, 62 REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, 63 REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, 64 REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, 65 REORDER_BUFFER_CHANGE_TRUNCATE, 66 REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT 67 }; 68 69 /* forward declaration */ 70 struct ReorderBufferTXN; 71 72 /* 73 * a single 'change', can be an insert (with one tuple), an update (old, new), 74 * or a delete (old). 75 * 76 * The same struct is also used internally for other purposes but that should 77 * never be visible outside reorderbuffer.c. 78 */ 79 typedef struct ReorderBufferChange 80 { 81 XLogRecPtr lsn; 82 83 /* The type of change. */ 84 enum ReorderBufferChangeType action; 85 86 /* Transaction this change belongs to. */ 87 struct ReorderBufferTXN *txn; 88 89 RepOriginId origin_id; 90 91 /* 92 * Context data for the change. Which part of the union is valid depends 93 * on action. 94 */ 95 union 96 { 97 /* Old, new tuples when action == *_INSERT|UPDATE|DELETE */ 98 struct 99 { 100 /* relation that has been changed */ 101 RelFileNode relnode; 102 103 /* no previously reassembled toast chunks are necessary anymore */ 104 bool clear_toast_afterwards; 105 106 /* valid for DELETE || UPDATE */ 107 ReorderBufferTupleBuf *oldtuple; 108 /* valid for INSERT || UPDATE */ 109 ReorderBufferTupleBuf *newtuple; 110 } tp; 111 112 /* 113 * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing one 114 * set of relations to be truncated. 115 */ 116 struct 117 { 118 Size nrelids; 119 bool cascade; 120 bool restart_seqs; 121 Oid *relids; 122 } truncate; 123 124 /* Message with arbitrary data. */ 125 struct 126 { 127 char *prefix; 128 Size message_size; 129 char *message; 130 } msg; 131 132 /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */ 133 Snapshot snapshot; 134 135 /* 136 * New command id for existing snapshot in a catalog changing tx. Set 137 * when action == *_INTERNAL_COMMAND_ID. 138 */ 139 CommandId command_id; 140 141 /* 142 * New cid mapping for catalog changing transaction, set when action 143 * == *_INTERNAL_TUPLECID. 144 */ 145 struct 146 { 147 RelFileNode node; 148 ItemPointerData tid; 149 CommandId cmin; 150 CommandId cmax; 151 CommandId combocid; 152 } tuplecid; 153 } data; 154 155 /* 156 * While in use this is how a change is linked into a transactions, 157 * otherwise it's the preallocated list. 158 */ 159 dlist_node node; 160 } ReorderBufferChange; 161 162 /* ReorderBufferTXN txn_flags */ 163 #define RBTXN_HAS_CATALOG_CHANGES 0x0001 164 #define RBTXN_IS_SUBXACT 0x0002 165 #define RBTXN_IS_SERIALIZED 0x0004 166 167 /* Does the transaction have catalog changes? */ 168 #define rbtxn_has_catalog_changes(txn) \ 169 ( \ 170 ((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0 \ 171 ) 172 173 /* Is the transaction known as a subxact? */ 174 #define rbtxn_is_known_subxact(txn) \ 175 ( \ 176 ((txn)->txn_flags & RBTXN_IS_SUBXACT) != 0 \ 177 ) 178 179 /* Has this transaction been spilled to disk? */ 180 #define rbtxn_is_serialized(txn) \ 181 ( \ 182 ((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \ 183 ) 184 185 typedef struct ReorderBufferTXN 186 { 187 /* See above */ 188 bits32 txn_flags; 189 190 /* The transaction's transaction id, can be a toplevel or sub xid. */ 191 TransactionId xid; 192 193 /* Xid of top-level transaction, if known */ 194 TransactionId toplevel_xid; 195 196 /* 197 * LSN of the first data carrying, WAL record with knowledge about this 198 * xid. This is allowed to *not* be first record adorned with this xid, if 199 * the previous records aren't relevant for logical decoding. 200 */ 201 XLogRecPtr first_lsn; 202 203 /* ---- 204 * LSN of the record that lead to this xact to be committed or 205 * aborted. This can be a 206 * * plain commit record 207 * * plain commit record, of a parent transaction 208 * * prepared transaction commit 209 * * plain abort record 210 * * prepared transaction abort 211 * 212 * This can also become set to earlier values than transaction end when 213 * a transaction is spilled to disk; specifically it's set to the LSN of 214 * the latest change written to disk so far. 215 * ---- 216 */ 217 XLogRecPtr final_lsn; 218 219 /* 220 * LSN pointing to the end of the commit record + 1. 221 */ 222 XLogRecPtr end_lsn; 223 224 /* 225 * LSN of the last lsn at which snapshot information reside, so we can 226 * restart decoding from there and fully recover this transaction from 227 * WAL. 228 */ 229 XLogRecPtr restart_decoding_lsn; 230 231 /* origin of the change that caused this transaction */ 232 RepOriginId origin_id; 233 XLogRecPtr origin_lsn; 234 235 /* 236 * Commit time, only known when we read the actual commit record. 237 */ 238 TimestampTz commit_time; 239 240 /* 241 * The base snapshot is used to decode all changes until either this 242 * transaction modifies the catalog, or another catalog-modifying 243 * transaction commits. 244 */ 245 Snapshot base_snapshot; 246 XLogRecPtr base_snapshot_lsn; 247 dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */ 248 249 /* 250 * How many ReorderBufferChange's do we have in this txn. 251 * 252 * Changes in subtransactions are *not* included but tracked separately. 253 */ 254 uint64 nentries; 255 256 /* 257 * How many of the above entries are stored in memory in contrast to being 258 * spilled to disk. 259 */ 260 uint64 nentries_mem; 261 262 /* 263 * List of ReorderBufferChange structs, including new Snapshots and new 264 * CommandIds 265 */ 266 dlist_head changes; 267 268 /* 269 * List of (relation, ctid) => (cmin, cmax) mappings for catalog tuples. 270 * Those are always assigned to the toplevel transaction. (Keep track of 271 * #entries to create a hash of the right size) 272 */ 273 dlist_head tuplecids; 274 uint64 ntuplecids; 275 276 /* 277 * On-demand built hash for looking up the above values. 278 */ 279 HTAB *tuplecid_hash; 280 281 /* 282 * Hash containing (potentially partial) toast entries. NULL if no toast 283 * tuples have been found for the current change. 284 */ 285 HTAB *toast_hash; 286 287 /* 288 * non-hierarchical list of subtransactions that are *not* aborted. Only 289 * used in toplevel transactions. 290 */ 291 dlist_head subtxns; 292 uint32 nsubtxns; 293 294 /* 295 * Stored cache invalidations. This is not a linked list because we get 296 * all the invalidations at once. 297 */ 298 uint32 ninvalidations; 299 SharedInvalidationMessage *invalidations; 300 301 /* --- 302 * Position in one of three lists: 303 * * list of subtransactions if we are *known* to be subxact 304 * * list of toplevel xacts (can be an as-yet unknown subxact) 305 * * list of preallocated ReorderBufferTXNs (if unused) 306 * --- 307 */ 308 dlist_node node; 309 310 /* 311 * Size of this transaction (changes currently in memory, in bytes). 312 */ 313 Size size; 314 } ReorderBufferTXN; 315 316 /* so we can define the callbacks used inside struct ReorderBuffer itself */ 317 typedef struct ReorderBuffer ReorderBuffer; 318 319 /* change callback signature */ 320 typedef void (*ReorderBufferApplyChangeCB) (ReorderBuffer *rb, 321 ReorderBufferTXN *txn, 322 Relation relation, 323 ReorderBufferChange *change); 324 325 /* truncate callback signature */ 326 typedef void (*ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, 327 ReorderBufferTXN *txn, 328 int nrelations, 329 Relation relations[], 330 ReorderBufferChange *change); 331 332 /* begin callback signature */ 333 typedef void (*ReorderBufferBeginCB) (ReorderBuffer *rb, 334 ReorderBufferTXN *txn); 335 336 /* commit callback signature */ 337 typedef void (*ReorderBufferCommitCB) (ReorderBuffer *rb, 338 ReorderBufferTXN *txn, 339 XLogRecPtr commit_lsn); 340 341 /* message callback signature */ 342 typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, 343 ReorderBufferTXN *txn, 344 XLogRecPtr message_lsn, 345 bool transactional, 346 const char *prefix, Size sz, 347 const char *message); 348 349 struct ReorderBuffer 350 { 351 /* 352 * xid => ReorderBufferTXN lookup table 353 */ 354 HTAB *by_txn; 355 356 /* 357 * Transactions that could be a toplevel xact, ordered by LSN of the first 358 * record bearing that xid. 359 */ 360 dlist_head toplevel_by_lsn; 361 362 /* 363 * Transactions and subtransactions that have a base snapshot, ordered by 364 * LSN of the record which caused us to first obtain the base snapshot. 365 * This is not the same as toplevel_by_lsn, because we only set the base 366 * snapshot on the first logical-decoding-relevant record (eg. heap 367 * writes), whereas the initial LSN could be set by other operations. 368 */ 369 dlist_head txns_by_base_snapshot_lsn; 370 371 /* 372 * one-entry sized cache for by_txn. Very frequently the same txn gets 373 * looked up over and over again. 374 */ 375 TransactionId by_txn_last_xid; 376 ReorderBufferTXN *by_txn_last_txn; 377 378 /* 379 * Callbacks to be called when a transactions commits. 380 */ 381 ReorderBufferBeginCB begin; 382 ReorderBufferApplyChangeCB apply_change; 383 ReorderBufferApplyTruncateCB apply_truncate; 384 ReorderBufferCommitCB commit; 385 ReorderBufferMessageCB message; 386 387 /* 388 * Pointer that will be passed untouched to the callbacks. 389 */ 390 void *private_data; 391 392 /* 393 * Saved output plugin option 394 */ 395 bool output_rewrites; 396 397 /* 398 * Private memory context. 399 */ 400 MemoryContext context; 401 402 /* 403 * Memory contexts for specific types objects 404 */ 405 MemoryContext change_context; 406 MemoryContext txn_context; 407 MemoryContext tup_context; 408 409 XLogRecPtr current_restart_decoding_lsn; 410 411 /* buffer for disk<->memory conversions */ 412 char *outbuf; 413 Size outbufsize; 414 415 /* memory accounting */ 416 Size size; 417 }; 418 419 420 ReorderBuffer *ReorderBufferAllocate(void); 421 void ReorderBufferFree(ReorderBuffer *); 422 423 ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len); 424 void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple); 425 ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); 426 void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); 427 428 Oid *ReorderBufferGetRelids(ReorderBuffer *, int nrelids); 429 void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids); 430 431 void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *); 432 void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, 433 bool transactional, const char *prefix, 434 Size message_size, const char *message); 435 void ReorderBufferCommit(ReorderBuffer *, TransactionId, 436 XLogRecPtr commit_lsn, XLogRecPtr end_lsn, 437 TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); 438 void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); 439 void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, 440 XLogRecPtr commit_lsn, XLogRecPtr end_lsn); 441 void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn); 442 void ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid); 443 void ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn); 444 445 void ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap); 446 void ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap); 447 void ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr lsn, 448 CommandId cid); 449 void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn, 450 RelFileNode node, ItemPointerData pt, 451 CommandId cmin, CommandId cmax, CommandId combocid); 452 void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn, 453 Size nmsgs, SharedInvalidationMessage *msgs); 454 void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations, 455 SharedInvalidationMessage *invalidations); 456 void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); 457 void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); 458 bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); 459 bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); 460 461 ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); 462 TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb); 463 464 void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); 465 466 void StartupReorderBuffer(void); 467 468 #endif 469