1 /* 2 * reorderbuffer.h 3 * PostgreSQL logical replay/reorder buffer management. 4 * 5 * Copyright (c) 2012-2018, PostgreSQL Global Development Group 6 * 7 * src/include/replication/reorderbuffer.h 8 */ 9 #ifndef REORDERBUFFER_H 10 #define REORDERBUFFER_H 11 12 #include "access/htup_details.h" 13 #include "lib/ilist.h" 14 #include "storage/sinval.h" 15 #include "utils/hsearch.h" 16 #include "utils/relcache.h" 17 #include "utils/snapshot.h" 18 #include "utils/timestamp.h" 19 20 /* an individual tuple, stored in one chunk of memory */ 21 typedef struct ReorderBufferTupleBuf 22 { 23 /* position in preallocated list */ 24 slist_node node; 25 26 /* tuple header, the interesting bit for users of logical decoding */ 27 HeapTupleData tuple; 28 29 /* pre-allocated size of tuple buffer, different from tuple size */ 30 Size alloc_tuple_size; 31 32 /* actual tuple data follows */ 33 } ReorderBufferTupleBuf; 34 35 /* pointer to the data stored in a TupleBuf */ 36 #define ReorderBufferTupleBufData(p) \ 37 ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf))) 38 39 /* 40 * Types of the change passed to a 'change' callback. 41 * 42 * For efficiency and simplicity reasons we want to keep Snapshots, CommandIds 43 * and ComboCids in the same list with the user visible INSERT/UPDATE/DELETE 44 * changes. Users of the decoding facilities will never see changes with 45 * *_INTERNAL_* actions. 46 * 47 * The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM, and INTERNAL_SPEC_ABORT 48 * changes concern "speculative insertions", their confirmation, and abort 49 * respectively. They're used by INSERT .. ON CONFLICT .. UPDATE. Users of 50 * logical decoding don't have to care about these. 51 */ 52 enum ReorderBufferChangeType 53 { 54 REORDER_BUFFER_CHANGE_INSERT, 55 REORDER_BUFFER_CHANGE_UPDATE, 56 REORDER_BUFFER_CHANGE_DELETE, 57 REORDER_BUFFER_CHANGE_MESSAGE, 58 REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, 59 REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, 60 REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, 61 REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, 62 REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, 63 REORDER_BUFFER_CHANGE_TRUNCATE, 64 REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT 65 }; 66 67 /* 68 * a single 'change', can be an insert (with one tuple), an update (old, new), 69 * or a delete (old). 70 * 71 * The same struct is also used internally for other purposes but that should 72 * never be visible outside reorderbuffer.c. 73 */ 74 typedef struct ReorderBufferChange 75 { 76 XLogRecPtr lsn; 77 78 /* The type of change. */ 79 enum ReorderBufferChangeType action; 80 81 RepOriginId origin_id; 82 83 /* 84 * Context data for the change. Which part of the union is valid depends 85 * on action. 86 */ 87 union 88 { 89 /* Old, new tuples when action == *_INSERT|UPDATE|DELETE */ 90 struct 91 { 92 /* relation that has been changed */ 93 RelFileNode relnode; 94 95 /* no previously reassembled toast chunks are necessary anymore */ 96 bool clear_toast_afterwards; 97 98 /* valid for DELETE || UPDATE */ 99 ReorderBufferTupleBuf *oldtuple; 100 /* valid for INSERT || UPDATE */ 101 ReorderBufferTupleBuf *newtuple; 102 } tp; 103 104 /* 105 * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing one 106 * set of relations to be truncated. 107 */ 108 struct 109 { 110 Size nrelids; 111 bool cascade; 112 bool restart_seqs; 113 Oid *relids; 114 } truncate; 115 116 /* Message with arbitrary data. */ 117 struct 118 { 119 char *prefix; 120 Size message_size; 121 char *message; 122 } msg; 123 124 /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */ 125 Snapshot snapshot; 126 127 /* 128 * New command id for existing snapshot in a catalog changing tx. Set 129 * when action == *_INTERNAL_COMMAND_ID. 130 */ 131 CommandId command_id; 132 133 /* 134 * New cid mapping for catalog changing transaction, set when action 135 * == *_INTERNAL_TUPLECID. 136 */ 137 struct 138 { 139 RelFileNode node; 140 ItemPointerData tid; 141 CommandId cmin; 142 CommandId cmax; 143 CommandId combocid; 144 } tuplecid; 145 } data; 146 147 /* 148 * While in use this is how a change is linked into a transactions, 149 * otherwise it's the preallocated list. 150 */ 151 dlist_node node; 152 } ReorderBufferChange; 153 154 typedef struct ReorderBufferTXN 155 { 156 /* 157 * The transactions transaction id, can be a toplevel or sub xid. 158 */ 159 TransactionId xid; 160 161 /* did the TX have catalog changes */ 162 bool has_catalog_changes; 163 164 /* Do we know this is a subxact? Xid of top-level txn if so */ 165 bool is_known_as_subxact; 166 TransactionId toplevel_xid; 167 168 /* 169 * LSN of the first data carrying, WAL record with knowledge about this 170 * xid. This is allowed to *not* be first record adorned with this xid, if 171 * the previous records aren't relevant for logical decoding. 172 */ 173 XLogRecPtr first_lsn; 174 175 /* ---- 176 * LSN of the record that lead to this xact to be committed or 177 * aborted. This can be a 178 * * plain commit record 179 * * plain commit record, of a parent transaction 180 * * prepared transaction commit 181 * * plain abort record 182 * * prepared transaction abort 183 * 184 * This can also become set to earlier values than transaction end when 185 * a transaction is spilled to disk; specifically it's set to the LSN of 186 * the latest change written to disk so far. 187 * ---- 188 */ 189 XLogRecPtr final_lsn; 190 191 /* 192 * LSN pointing to the end of the commit record + 1. 193 */ 194 XLogRecPtr end_lsn; 195 196 /* 197 * LSN of the last lsn at which snapshot information reside, so we can 198 * restart decoding from there and fully recover this transaction from 199 * WAL. 200 */ 201 XLogRecPtr restart_decoding_lsn; 202 203 /* origin of the change that caused this transaction */ 204 RepOriginId origin_id; 205 XLogRecPtr origin_lsn; 206 207 /* 208 * Commit time, only known when we read the actual commit record. 209 */ 210 TimestampTz commit_time; 211 212 /* 213 * The base snapshot is used to decode all changes until either this 214 * transaction modifies the catalog, or another catalog-modifying 215 * transaction commits. 216 */ 217 Snapshot base_snapshot; 218 XLogRecPtr base_snapshot_lsn; 219 dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */ 220 221 /* 222 * How many ReorderBufferChange's do we have in this txn. 223 * 224 * Changes in subtransactions are *not* included but tracked separately. 225 */ 226 uint64 nentries; 227 228 /* 229 * How many of the above entries are stored in memory in contrast to being 230 * spilled to disk. 231 */ 232 uint64 nentries_mem; 233 234 /* 235 * Has this transaction been spilled to disk? It's not always possible to 236 * deduce that fact by comparing nentries with nentries_mem, because e.g. 237 * subtransactions of a large transaction might get serialized together 238 * with the parent - if they're restored to memory they'd have 239 * nentries_mem == nentries. 240 */ 241 bool serialized; 242 243 /* 244 * List of ReorderBufferChange structs, including new Snapshots and new 245 * CommandIds 246 */ 247 dlist_head changes; 248 249 /* 250 * List of (relation, ctid) => (cmin, cmax) mappings for catalog tuples. 251 * Those are always assigned to the toplevel transaction. (Keep track of 252 * #entries to create a hash of the right size) 253 */ 254 dlist_head tuplecids; 255 uint64 ntuplecids; 256 257 /* 258 * On-demand built hash for looking up the above values. 259 */ 260 HTAB *tuplecid_hash; 261 262 /* 263 * Hash containing (potentially partial) toast entries. NULL if no toast 264 * tuples have been found for the current change. 265 */ 266 HTAB *toast_hash; 267 268 /* 269 * non-hierarchical list of subtransactions that are *not* aborted. Only 270 * used in toplevel transactions. 271 */ 272 dlist_head subtxns; 273 uint32 nsubtxns; 274 275 /* 276 * Stored cache invalidations. This is not a linked list because we get 277 * all the invalidations at once. 278 */ 279 uint32 ninvalidations; 280 SharedInvalidationMessage *invalidations; 281 282 /* --- 283 * Position in one of three lists: 284 * * list of subtransactions if we are *known* to be subxact 285 * * list of toplevel xacts (can be an as-yet unknown subxact) 286 * * list of preallocated ReorderBufferTXNs (if unused) 287 * --- 288 */ 289 dlist_node node; 290 291 } ReorderBufferTXN; 292 293 /* so we can define the callbacks used inside struct ReorderBuffer itself */ 294 typedef struct ReorderBuffer ReorderBuffer; 295 296 /* change callback signature */ 297 typedef void (*ReorderBufferApplyChangeCB) ( 298 ReorderBuffer *rb, 299 ReorderBufferTXN *txn, 300 Relation relation, 301 ReorderBufferChange *change); 302 303 /* truncate callback signature */ 304 typedef void (*ReorderBufferApplyTruncateCB) ( 305 ReorderBuffer *rb, 306 ReorderBufferTXN *txn, 307 int nrelations, 308 Relation relations[], 309 ReorderBufferChange *change); 310 311 /* begin callback signature */ 312 typedef void (*ReorderBufferBeginCB) ( 313 ReorderBuffer *rb, 314 ReorderBufferTXN *txn); 315 316 /* commit callback signature */ 317 typedef void (*ReorderBufferCommitCB) ( 318 ReorderBuffer *rb, 319 ReorderBufferTXN *txn, 320 XLogRecPtr commit_lsn); 321 322 /* message callback signature */ 323 typedef void (*ReorderBufferMessageCB) ( 324 ReorderBuffer *rb, 325 ReorderBufferTXN *txn, 326 XLogRecPtr message_lsn, 327 bool transactional, 328 const char *prefix, Size sz, 329 const char *message); 330 331 struct ReorderBuffer 332 { 333 /* 334 * xid => ReorderBufferTXN lookup table 335 */ 336 HTAB *by_txn; 337 338 /* 339 * Transactions that could be a toplevel xact, ordered by LSN of the first 340 * record bearing that xid. 341 */ 342 dlist_head toplevel_by_lsn; 343 344 /* 345 * Transactions and subtransactions that have a base snapshot, ordered by 346 * LSN of the record which caused us to first obtain the base snapshot. 347 * This is not the same as toplevel_by_lsn, because we only set the base 348 * snapshot on the first logical-decoding-relevant record (eg. heap 349 * writes), whereas the initial LSN could be set by other operations. 350 */ 351 dlist_head txns_by_base_snapshot_lsn; 352 353 /* 354 * one-entry sized cache for by_txn. Very frequently the same txn gets 355 * looked up over and over again. 356 */ 357 TransactionId by_txn_last_xid; 358 ReorderBufferTXN *by_txn_last_txn; 359 360 /* 361 * Callbacks to be called when a transactions commits. 362 */ 363 ReorderBufferBeginCB begin; 364 ReorderBufferApplyChangeCB apply_change; 365 ReorderBufferApplyTruncateCB apply_truncate; 366 ReorderBufferCommitCB commit; 367 ReorderBufferMessageCB message; 368 369 /* 370 * Pointer that will be passed untouched to the callbacks. 371 */ 372 void *private_data; 373 374 /* 375 * Saved output plugin option 376 */ 377 bool output_rewrites; 378 379 /* 380 * Private memory context. 381 */ 382 MemoryContext context; 383 384 /* 385 * Memory contexts for specific types objects 386 */ 387 MemoryContext change_context; 388 MemoryContext txn_context; 389 MemoryContext tup_context; 390 391 XLogRecPtr current_restart_decoding_lsn; 392 393 /* buffer for disk<->memory conversions */ 394 char *outbuf; 395 Size outbufsize; 396 }; 397 398 399 ReorderBuffer *ReorderBufferAllocate(void); 400 void ReorderBufferFree(ReorderBuffer *); 401 402 ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len); 403 void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple); 404 ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); 405 void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); 406 407 Oid * ReorderBufferGetRelids(ReorderBuffer *, int nrelids); 408 void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids); 409 410 void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *); 411 void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, 412 bool transactional, const char *prefix, 413 Size message_size, const char *message); 414 void ReorderBufferCommit(ReorderBuffer *, TransactionId, 415 XLogRecPtr commit_lsn, XLogRecPtr end_lsn, 416 TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); 417 void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); 418 void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, 419 XLogRecPtr commit_lsn, XLogRecPtr end_lsn); 420 void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn); 421 void ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid); 422 void ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn); 423 424 void ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap); 425 void ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap); 426 void ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr lsn, 427 CommandId cid); 428 void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn, 429 RelFileNode node, ItemPointerData pt, 430 CommandId cmin, CommandId cmax, CommandId combocid); 431 void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn, 432 Size nmsgs, SharedInvalidationMessage *msgs); 433 void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations, 434 SharedInvalidationMessage *invalidations); 435 void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); 436 void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); 437 bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); 438 bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); 439 440 ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); 441 TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb); 442 443 void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); 444 445 void StartupReorderBuffer(void); 446 447 #endif 448