/*-------------------------------------------------------------------------
 *
 * rewriteheap.c
 *	  Support functions to rewrite tables.
 *
 * These functions provide a facility to completely rewrite a heap, while
 * preserving visibility information and update chains.
 *
 * INTERFACE
 *
 * The caller is responsible for creating the new heap, all catalog
 * changes, supplying the tuples to be written to the new heap, and
 * rebuilding indexes.  The caller must hold AccessExclusiveLock on the
 * target table, because we assume no one else is writing into it.
 *
 * To use the facility:
 *
 * begin_heap_rewrite
 * while (fetch next tuple)
 * {
 *	   if (tuple is dead)
 *		   rewrite_heap_dead_tuple
 *	   else
 *	   {
 *		   // do any transformations here if required
 *		   rewrite_heap_tuple
 *	   }
 * }
 * end_heap_rewrite
 *
 * The contents of the new relation shouldn't be relied on until after
 * end_heap_rewrite is called.
 *
 *
 * IMPLEMENTATION
 *
 * This would be a fairly trivial affair, except that we need to maintain
 * the ctid chains that link versions of an updated tuple together.
 * Since the newly stored tuples will have tids different from the original
 * ones, if we just copied t_ctid fields to the new table the links would
 * be wrong.  When we are required to copy a (presumably recently-dead or
 * delete-in-progress) tuple whose ctid doesn't point to itself, we have
 * to substitute the correct ctid instead.
 *
 * For each ctid reference from A -> B, we might encounter either A first
 * or B first.  (Note that a tuple in the middle of a chain is both A and B
 * of different pairs.)
 *
 * If we encounter A first, we'll store the tuple in the unresolved_tups
 * hash table.  When we later encounter B, we remove A from the hash table,
 * fix the ctid to point to the new location of B, and insert both A and B
 * to the new heap.
 *
 * If we encounter B first, we can insert B to the new heap right away.
 * We then add an entry to the old_new_tid_map hash table showing B's
 * original tid (in the old heap) and new tid (in the new heap).
 * When we later encounter A, we get the new location of B from the table,
 * and can write A immediately with the correct ctid.
 *
 * Entries in the hash tables can be removed as soon as the later tuple
 * is encountered.  That helps to keep the memory usage down.  At the end,
 * both tables are usually empty; we should have encountered both A and B
 * of each pair.  However, it's possible for A to be RECENTLY_DEAD and B
 * entirely DEAD according to HeapTupleSatisfiesVacuum, because the test
 * for deadness using OldestXmin is not exact.  In such a case we might
 * encounter B first, and skip it, and find A later.  Then A would be added
 * to unresolved_tups, and stay there until end of the rewrite.  Since
 * this case is very unusual, we don't worry about the memory usage.
 *
 * Using in-memory hash tables means that we use some memory for each live
 * update chain in the table, from the time we find one end of the
 * reference until we find the other end.  That shouldn't be a problem in
 * practice, but if you do something like an UPDATE without a where-clause
 * on a large table, and then run CLUSTER in the same transaction, you
 * could run out of memory.  It doesn't seem worthwhile to add support for
 * spill-to-disk, as there shouldn't be that many RECENTLY_DEAD tuples in a
 * table under normal circumstances.  Furthermore, in the typical scenario
 * of CLUSTERing on an unchanging key column, we'll see all the versions
 * of a given tuple together anyway, and so the peak memory usage is only
 * proportional to the number of RECENTLY_DEAD versions of a single row,
 * not to the number in the whole table.  Note that if we do fail halfway
 * through a CLUSTER, the old table is still valid, so failure is not
 * catastrophic.
 *
 * We can't use the normal heap_insert function to insert into the new
 * heap, because heap_insert overwrites the visibility information.
 * We use a special-purpose raw_heap_insert function instead, which
 * is optimized for bulk inserting a lot of tuples, knowing that we have
 * exclusive access to the heap.  raw_heap_insert builds new pages in
 * local storage.  When a page is full, or at the end of the process,
 * we insert it to WAL as a single record and then write it to disk
 * directly through smgr.  Note, however, that any data sent to the new
 * heap's TOAST table will go through the normal bufmgr.
 *
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994-5, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/heap/rewriteheap.c
 *
 *-------------------------------------------------------------------------
 */
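/*
 * Purely illustrative: a minimal sketch of a caller driving this facility,
 * loosely modeled on what CLUSTER / VACUUM FULL do in cluster.c.  The scan
 * setup, the "tuple is dead" test, and the cutoff values are simplified
 * assumptions here, not the actual copy_heap_data() logic.
 *
 *	RewriteState rwstate;
 *	HeapScanDesc scan;
 *	HeapTuple	tuple;
 *
 *	rwstate = begin_heap_rewrite(old_heap, new_heap, OldestXmin,
 *								 FreezeXid, MultiXactCutoff, use_wal);
 *	scan = heap_beginscan(old_heap, SnapshotAny, 0, NULL);
 *	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
 *	{
 *		if (tuple is dead)		// e.g. via HeapTupleSatisfiesVacuum()
 *			rewrite_heap_dead_tuple(rwstate, tuple);
 *		else
 *		{
 *			// rewrite_heap_tuple scribbles on its second argument, so
 *			// pass a private copy, never the scan's tuple itself
 *			HeapTuple	copied = heap_copytuple(tuple);
 *
 *			rewrite_heap_tuple(rwstate, tuple, copied);
 *			heap_freetuple(copied);
 *		}
 *	}
 *	heap_endscan(scan);
 *	end_heap_rewrite(rwstate);
 */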
#include "postgres.h"

#include <sys/stat.h>
#include <unistd.h>

#include "miscadmin.h"

#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "access/xloginsert.h"

#include "catalog/catalog.h"

#include "lib/ilist.h"

#include "pgstat.h"

#include "replication/logical.h"
#include "replication/slot.h"

#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/smgr.h"

#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/tqual.h"

#include "storage/procarray.h"

/*
 * State associated with a rewrite operation.  This is opaque to the user
 * of the rewrite facility.
 */
typedef struct RewriteStateData
{
	Relation	rs_old_rel;		/* source heap */
	Relation	rs_new_rel;		/* destination heap */
	Page		rs_buffer;		/* page currently being built */
	BlockNumber rs_blockno;		/* block where page will go */
	bool		rs_buffer_valid;	/* T if any tuples in buffer */
	bool		rs_use_wal;		/* must we WAL-log inserts? */
	bool		rs_logical_rewrite; /* do we need to do logical rewriting */
	TransactionId rs_oldest_xmin;	/* oldest xmin used by caller to determine
									 * tuple visibility */
	TransactionId rs_freeze_xid;	/* Xid that will be used as freeze cutoff
									 * point */
	TransactionId rs_logical_xmin;	/* Xid that will be used as cutoff point
									 * for logical rewrites */
	MultiXactId rs_cutoff_multi;	/* MultiXactId that will be used as cutoff
									 * point for multixacts */
	MemoryContext rs_cxt;		/* for hash tables and entries and tuples in
								 * them */
	XLogRecPtr	rs_begin_lsn;	/* XLogInsertLsn when starting the rewrite */
	HTAB	   *rs_unresolved_tups; /* unmatched A tuples */
	HTAB	   *rs_old_new_tid_map; /* unmatched B tuples */
	HTAB	   *rs_logical_mappings;	/* logical remapping files */
	uint32		rs_num_rewrite_mappings;	/* # in memory mappings */
} RewriteStateData;
/*
 * The lookup keys for the hash tables are tuple TID and xmin (we must check
 * both to avoid false matches from dead tuples).  Beware that there is
 * probably some padding space in this struct; it must be zeroed out for
 * correct hashtable operation.
 */
typedef struct
{
	TransactionId xmin;			/* tuple xmin */
	ItemPointerData tid;		/* tuple location in old heap */
} TidHashKey;

/*
 * Entry structures for the hash tables
 */
typedef struct
{
	TidHashKey	key;			/* expected xmin/old location of B tuple */
	ItemPointerData old_tid;	/* A's location in the old heap */
	HeapTuple	tuple;			/* A's tuple contents */
} UnresolvedTupData;

typedef UnresolvedTupData *UnresolvedTup;

typedef struct
{
	TidHashKey	key;			/* actual xmin/old location of B tuple */
	ItemPointerData new_tid;	/* where we put it in the new heap */
} OldToNewMappingData;

typedef OldToNewMappingData *OldToNewMapping;

/*
 * In-memory data for an xid that might need logical remapping entries
 * to be logged.
 */
typedef struct RewriteMappingFile
{
	TransactionId xid;			/* xid that might need to see the row */
	int			vfd;			/* fd of mappings file */
	off_t		off;			/* how far have we written yet */
	uint32		num_mappings;	/* number of in-memory mappings */
	dlist_head	mappings;		/* list of in-memory mappings */
	char		path[MAXPGPATH];	/* path, for error messages */
} RewriteMappingFile;

/*
 * A single in-memory logical rewrite mapping, hanging off
 * RewriteMappingFile->mappings.
 */
typedef struct RewriteMappingDataEntry
{
	LogicalRewriteMappingData map;	/* map between old and new location of the
									 * tuple */
	dlist_node	node;
} RewriteMappingDataEntry;


/* prototypes for internal functions */
static void raw_heap_insert(RewriteState state, HeapTuple tup);

/* internal logical remapping prototypes */
static void logical_begin_heap_rewrite(RewriteState state);
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid,
									   HeapTuple new_tuple);
static void logical_end_heap_rewrite(RewriteState state);
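/*
 * A sketch of how a TidHashKey must be built before any hash_search() call
 * (this mirrors what rewrite_heap_tuple() below actually does; the variable
 * names are illustrative).  Because HASH_BLOBS hashes the raw key bytes,
 * the padding between the fields has to be zeroed too - the memset is not
 * optional:
 *
 *	TidHashKey	hashkey;
 *	OldToNewMapping mapping;
 *
 *	memset(&hashkey, 0, sizeof(hashkey));	// zeroes padding bytes as well
 *	hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
 *	hashkey.tid = old_tuple->t_data->t_ctid;
 *	mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
 *						  HASH_FIND, NULL);
 */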
/*
 * Begin a rewrite of a table
 *
 * old_heap		old, locked heap relation tuples will be read from
 * new_heap		new, locked heap relation to insert tuples to
 * oldest_xmin	xid used by the caller to determine which tuples are dead
 * freeze_xid	xid before which tuples will be frozen
 * cutoff_multi	multixact before which multis will be removed
 * use_wal		should the inserts to the new heap be WAL-logged?
 *
 * Returns an opaque RewriteState, allocated in current memory context,
 * to be used in subsequent calls to the other functions.
 */
RewriteState
begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin,
				   TransactionId freeze_xid, MultiXactId cutoff_multi,
				   bool use_wal)
{
	RewriteState state;
	MemoryContext rw_cxt;
	MemoryContext old_cxt;
	HASHCTL		hash_ctl;

	/*
	 * To ease cleanup, make a separate context that will contain the
	 * RewriteState struct itself plus all subsidiary data.
	 */
	rw_cxt = AllocSetContextCreate(CurrentMemoryContext,
								   "Table rewrite",
								   ALLOCSET_DEFAULT_SIZES);
	old_cxt = MemoryContextSwitchTo(rw_cxt);

	/* Create and fill in the state struct */
	state = palloc0(sizeof(RewriteStateData));

	state->rs_old_rel = old_heap;
	state->rs_new_rel = new_heap;
	state->rs_buffer = (Page) palloc(BLCKSZ);
	/* new_heap needn't be empty, just locked */
	state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
	state->rs_buffer_valid = false;
	state->rs_use_wal = use_wal;
	state->rs_oldest_xmin = oldest_xmin;
	state->rs_freeze_xid = freeze_xid;
	state->rs_cutoff_multi = cutoff_multi;
	state->rs_cxt = rw_cxt;

	/* Initialize hash tables used to track update chains */
	memset(&hash_ctl, 0, sizeof(hash_ctl));
	hash_ctl.keysize = sizeof(TidHashKey);
	hash_ctl.entrysize = sizeof(UnresolvedTupData);
	hash_ctl.hcxt = state->rs_cxt;

	state->rs_unresolved_tups =
		hash_create("Rewrite / Unresolved ctids",
					128,		/* arbitrary initial size */
					&hash_ctl,
					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

	hash_ctl.entrysize = sizeof(OldToNewMappingData);

	state->rs_old_new_tid_map =
		hash_create("Rewrite / Old to new tid map",
					128,		/* arbitrary initial size */
					&hash_ctl,
					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

	MemoryContextSwitchTo(old_cxt);

	logical_begin_heap_rewrite(state);

	return state;
}
/*
 * End a rewrite.
 *
 * state and any other resources are freed.
 */
void
end_heap_rewrite(RewriteState state)
{
	HASH_SEQ_STATUS seq_status;
	UnresolvedTup unresolved;

	/*
	 * Write any remaining tuples in the UnresolvedTups table.  If we have
	 * any left, they should in fact be dead, but let's err on the safe
	 * side.
	 */
	hash_seq_init(&seq_status, state->rs_unresolved_tups);

	while ((unresolved = hash_seq_search(&seq_status)) != NULL)
	{
		ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
		raw_heap_insert(state, unresolved->tuple);
	}

	/* Write the last page, if any */
	if (state->rs_buffer_valid)
	{
		if (state->rs_use_wal)
			log_newpage(&state->rs_new_rel->rd_node,
						MAIN_FORKNUM,
						state->rs_blockno,
						state->rs_buffer,
						true);
		RelationOpenSmgr(state->rs_new_rel);

		PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);

		smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM, state->rs_blockno,
				   (char *) state->rs_buffer, true);
	}

	/*
	 * If the rel is WAL-logged, must fsync before commit.  We use heap_sync
	 * to ensure that the toast table gets fsync'd too.
	 *
	 * It's obvious that we must do this when not WAL-logging.  It's less
	 * obvious that we have to do it even if we did WAL-log the pages.  The
	 * reason is the same as in tablecmds.c's copy_relation_data(): we're
	 * writing data that's not in shared buffers, and so a CHECKPOINT
	 * occurring during the rewriteheap operation won't have fsync'd data we
	 * wrote before the checkpoint.
	 */
	if (RelationNeedsWAL(state->rs_new_rel))
		heap_sync(state->rs_new_rel);

	logical_end_heap_rewrite(state);

	/* Deleting the context frees everything */
	MemoryContextDelete(state->rs_cxt);
}
/*
 * Add a tuple to the new heap.
 *
 * Visibility information is copied from the original tuple, except that
 * we "freeze" very-old tuples.  Note that since we scribble on new_tuple,
 * it had better be temp storage, not a pointer to the original tuple.
 *
 * state		opaque state as returned by begin_heap_rewrite
 * old_tuple	original tuple in the old heap
 * new_tuple	new, rewritten tuple to be inserted to new heap
 */
void
rewrite_heap_tuple(RewriteState state,
				   HeapTuple old_tuple, HeapTuple new_tuple)
{
	MemoryContext old_cxt;
	ItemPointerData old_tid;
	TidHashKey	hashkey;
	bool		found;
	bool		free_new;

	old_cxt = MemoryContextSwitchTo(state->rs_cxt);

	/*
	 * Copy the original tuple's visibility information into new_tuple.
	 *
	 * XXX we might later need to copy some t_infomask2 bits, too?  Right
	 * now, we intentionally clear the HOT status bits.
	 */
	memcpy(&new_tuple->t_data->t_choice.t_heap,
		   &old_tuple->t_data->t_choice.t_heap,
		   sizeof(HeapTupleFields));

	new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
	new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
	new_tuple->t_data->t_infomask |=
		old_tuple->t_data->t_infomask & HEAP_XACT_MASK;

	/*
	 * While we have our hands on the tuple, we may as well freeze any
	 * eligible xmin or xmax, so that future VACUUM effort can be saved.
	 */
	heap_freeze_tuple(new_tuple->t_data,
					  state->rs_old_rel->rd_rel->relfrozenxid,
					  state->rs_old_rel->rd_rel->relminmxid,
					  state->rs_freeze_xid,
					  state->rs_cutoff_multi);

	/*
	 * Invalid ctid means that ctid should point to the tuple itself.  We'll
	 * override it later if the tuple is part of an update chain.
	 */
	ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);

	/*
	 * If the tuple has been updated, check the old-to-new mapping hash
	 * table.
	 */
	if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
		  HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
		!HeapTupleHeaderIndicatesMovedPartitions(old_tuple->t_data) &&
		!(ItemPointerEquals(&(old_tuple->t_self),
							&(old_tuple->t_data->t_ctid))))
	{
		OldToNewMapping mapping;

		memset(&hashkey, 0, sizeof(hashkey));
		hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
		hashkey.tid = old_tuple->t_data->t_ctid;

		mapping = (OldToNewMapping)
			hash_search(state->rs_old_new_tid_map, &hashkey,
						HASH_FIND, NULL);

		if (mapping != NULL)
		{
			/*
			 * We've already copied the tuple that t_ctid points to, so we
			 * can set the ctid of this tuple to point to the new location,
			 * and insert it right away.
			 */
			new_tuple->t_data->t_ctid = mapping->new_tid;

			/* We don't need the mapping entry anymore */
			hash_search(state->rs_old_new_tid_map, &hashkey,
						HASH_REMOVE, &found);
			Assert(found);
		}
		else
		{
			/*
			 * We haven't seen the tuple t_ctid points to yet.  Stash this
			 * tuple into unresolved_tups to be written later.
			 */
			UnresolvedTup unresolved;

			unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
									 HASH_ENTER, &found);
			Assert(!found);

			unresolved->old_tid = old_tuple->t_self;
			unresolved->tuple = heap_copytuple(new_tuple);

			/*
			 * We can't do anything more now, since we don't know where the
			 * tuple will be written.
			 */
			MemoryContextSwitchTo(old_cxt);
			return;
		}
	}
	/*
	 * Now we will write the tuple, and then check to see if it is the B
	 * tuple in any new or known pair.  When we resolve a known pair, we
	 * will be able to write that pair's A tuple, and then we have to check
	 * if it resolves some other pair.  Hence, we need a loop here.
	 */
	old_tid = old_tuple->t_self;
	free_new = false;

	for (;;)
	{
		ItemPointerData new_tid;

		/* Insert the tuple and find out where it's put in new_heap */
		raw_heap_insert(state, new_tuple);
		new_tid = new_tuple->t_self;

		logical_rewrite_heap_tuple(state, old_tid, new_tuple);

		/*
		 * If the tuple is the updated version of a row, and the prior
		 * version wouldn't be DEAD yet, then we need to either resolve the
		 * prior version (if it's waiting in rs_unresolved_tups), or make an
		 * entry in rs_old_new_tid_map (so we can resolve it when we do see
		 * it).  The previous tuple's xmax would equal this one's xmin, so
		 * it's RECENTLY_DEAD if and only if the xmin is not before
		 * OldestXmin.
		 */
		if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
			!TransactionIdPrecedes(HeapTupleHeaderGetXmin(new_tuple->t_data),
								   state->rs_oldest_xmin))
		{
			/*
			 * Okay, this is B in an update pair.  See if we've seen A.
			 */
			UnresolvedTup unresolved;

			memset(&hashkey, 0, sizeof(hashkey));
			hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
			hashkey.tid = old_tid;

			unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
									 HASH_FIND, NULL);

			if (unresolved != NULL)
			{
				/*
				 * We have seen and memorized the previous tuple already.
				 * Now that we know where we inserted the tuple its t_ctid
				 * points to, fix its t_ctid and insert it to the new heap.
				 */
				if (free_new)
					heap_freetuple(new_tuple);
				new_tuple = unresolved->tuple;
				free_new = true;
				old_tid = unresolved->old_tid;
				new_tuple->t_data->t_ctid = new_tid;

				/*
				 * We don't need the hash entry anymore, but don't free its
				 * tuple just yet.
				 */
				hash_search(state->rs_unresolved_tups, &hashkey,
							HASH_REMOVE, &found);
				Assert(found);

				/* loop back to insert the previous tuple in the chain */
				continue;
			}
			else
			{
				/*
				 * Remember the new tid of this tuple.  We'll use it to set
				 * the ctid when we find the previous tuple in the chain.
				 */
				OldToNewMapping mapping;

				mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
									  HASH_ENTER, &found);
				Assert(!found);

				mapping->new_tid = new_tid;
			}
		}

		/* Done with this (chain of) tuples, for now */
		if (free_new)
			heap_freetuple(new_tuple);
		break;
	}

	MemoryContextSwitchTo(old_cxt);
}

/*
 * Register a dead tuple with an ongoing rewrite.  Dead tuples are not
 * copied to the new table, but we still make note of them so that we
 * can release some resources earlier.
 *
 * Returns true if a tuple was removed from the unresolved_tups table.
 * This indicates that the tuple, previously thought to be "recently dead",
 * is now known really dead and won't be written to the output.
 */
bool
rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
{
	/*
	 * If we have already seen an earlier tuple in the update chain that
	 * points to this tuple, let's forget about that earlier tuple.  It's in
	 * fact dead as well, our simple xmax < OldestXmin test in
	 * HeapTupleSatisfiesVacuum just wasn't enough to detect it.  It happens
	 * when xmin of a tuple is greater than xmax, which sounds
	 * counter-intuitive but is perfectly valid.
	 *
	 * We don't bother to try to detect the situation the other way round,
	 * when we encounter the dead tuple first and then the recently dead one
	 * that points to it.  If that happens, we'll have some unmatched
	 * entries in the UnresolvedTups hash table at the end.  That can happen
	 * anyway, because a vacuum might have removed the dead tuple in the
	 * chain before us.
	 */
	UnresolvedTup unresolved;
	TidHashKey	hashkey;
	bool		found;

	memset(&hashkey, 0, sizeof(hashkey));
	hashkey.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);
	hashkey.tid = old_tuple->t_self;

	unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
							 HASH_FIND, NULL);

	if (unresolved != NULL)
	{
		/* Need to free the contained tuple as well as the hashtable entry */
		heap_freetuple(unresolved->tuple);
		hash_search(state->rs_unresolved_tups, &hashkey,
					HASH_REMOVE, &found);
		Assert(found);
		return true;
	}

	return false;
}
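/*
 * An illustrative walkthrough of the chain handling above, with made-up
 * TIDs.  Suppose row version A at (0,1) was updated to version B at (0,2);
 * A is RECENTLY_DEAD, B is its live successor, and the scan happens to
 * deliver B first:
 *
 * 1. rewrite_heap_tuple(B): B carries HEAP_UPDATED and its xmin is not
 *	  before OldestXmin, and no matching entry waits in rs_unresolved_tups,
 *	  so B is inserted (say at (5,1)) and {xmin(B), (0,2)} -> (5,1) is
 *	  recorded in rs_old_new_tid_map.
 * 2. rewrite_heap_tuple(A): A's t_ctid (0,2) differs from its t_self
 *	  (0,1), the map lookup finds (5,1), so A is written with
 *	  t_ctid = (5,1) and the map entry is removed.
 *
 * Had A arrived first, it would have been stashed in rs_unresolved_tups
 * instead, and the for(;;) loop would have emitted it immediately after B.
 */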
/*
 * Insert a tuple to the new relation.  This has to track heap_insert
 * and its subsidiary functions!
 *
 * t_self of the tuple is set to the new TID of the tuple.  If t_ctid of the
 * tuple is invalid on entry, it's replaced with the new TID as well (in
 * the inserted data only, not in the caller's copy).
 */
static void
raw_heap_insert(RewriteState state, HeapTuple tup)
{
	Page		page = state->rs_buffer;
	Size		pageFreeSpace,
				saveFreeSpace;
	Size		len;
	OffsetNumber newoff;
	HeapTuple	heaptup;

	/*
	 * If the new tuple is too big for storage or contains already toasted
	 * out-of-line attributes from some other relation, invoke the toaster.
	 *
	 * Note: below this point, heaptup is the data we actually intend to
	 * store into the relation; tup is the caller's original untoasted data.
	 */
	if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
	{
		/* toast table entries should never be recursively toasted */
		Assert(!HeapTupleHasExternal(tup));
		heaptup = tup;
	}
	else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
	{
		int			options = HEAP_INSERT_SKIP_FSM;

		if (!state->rs_use_wal)
			options |= HEAP_INSERT_SKIP_WAL;

		/*
		 * While rewriting the heap for VACUUM FULL / CLUSTER, make sure data
		 * for the TOAST table are not logically decoded.  The main heap is
		 * WAL-logged as XLOG FPI records, which are not logically decoded.
		 */
		options |= HEAP_INSERT_NO_LOGICAL;

		heaptup = toast_insert_or_update(state->rs_new_rel, tup, NULL,
										 options);
	}
	else
		heaptup = tup;

	len = MAXALIGN(heaptup->t_len); /* be conservative */

	/*
	 * If we're gonna fail for oversize tuple, do it right away
	 */
	if (len > MaxHeapTupleSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("row is too big: size %zu, maximum size %zu",
						len, MaxHeapTupleSize)));

	/* Compute desired extra freespace due to fillfactor option */
	saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
												   HEAP_DEFAULT_FILLFACTOR);

	/* Now we can check to see if there's enough free space already. */
	if (state->rs_buffer_valid)
	{
		pageFreeSpace = PageGetHeapFreeSpace(page);

		if (len + saveFreeSpace > pageFreeSpace)
		{
			/* Doesn't fit, so write out the existing page */

			/* XLOG stuff */
			if (state->rs_use_wal)
				log_newpage(&state->rs_new_rel->rd_node,
							MAIN_FORKNUM,
							state->rs_blockno,
							page,
							true);

			/*
			 * Now write the page.  We say isTemp = true even if it's not a
			 * temp table, because there's no need for smgr to schedule an
			 * fsync for this write; we'll do it ourselves in
			 * end_heap_rewrite.
			 */
			RelationOpenSmgr(state->rs_new_rel);

			PageSetChecksumInplace(page, state->rs_blockno);

			smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM,
					   state->rs_blockno, (char *) page, true);

			state->rs_blockno++;
			state->rs_buffer_valid = false;
		}
	}

	if (!state->rs_buffer_valid)
	{
		/* Initialize a new empty page */
		PageInit(page, BLCKSZ, 0);
		state->rs_buffer_valid = true;
	}

	/* And now we can insert the tuple into the page */
	newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
						 InvalidOffsetNumber, false, true);
	if (newoff == InvalidOffsetNumber)
		elog(ERROR, "failed to add tuple");

	/* Update caller's t_self to the actual position where it was stored */
	ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);

	/*
	 * Insert the correct position into CTID of the stored tuple, too, if
	 * the caller didn't supply a valid CTID.
	 */
	if (!ItemPointerIsValid(&tup->t_data->t_ctid))
	{
		ItemId		newitemid;
		HeapTupleHeader onpage_tup;

		newitemid = PageGetItemId(page, newoff);
		onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);

		onpage_tup->t_ctid = tup->t_self;
	}

	/* If heaptup is a private copy, release it. */
	if (heaptup != tup)
		heap_freetuple(heaptup);
}
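/*
 * A worked example of the free-space check above, under assumed numbers
 * (BLCKSZ = 8192, fillfactor = 90): RelationGetTargetPageFreeSpace()
 * reserves BLCKSZ * (100 - 90) / 100 = 819 bytes, so a MAXALIGN'd tuple of
 * len 1000 goes onto the current page only if PageGetHeapFreeSpace(page)
 * exceeds 1000 + 819 = 1819 bytes; otherwise the page is flushed through
 * smgrextend() and a fresh page is started for the tuple.
 */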
/* ------------------------------------------------------------------------
 * Logical rewrite support
 *
 * When doing logical decoding - which relies on using cmin/cmax of catalog
 * tuples, via xl_heap_new_cid records - heap rewrites have to log enough
 * information to allow the decoding backend to update its internal mapping
 * of (relfilenode,ctid) => (cmin, cmax) to be correct for the rewritten heap.
 *
 * For that, every time we find a tuple that's been modified in a catalog
 * relation within the xmin horizon of any decoding slot, we log a mapping
 * from the old to the new location.
 *
 * To deal with rewrites that abort, the filename of a mapping file contains
 * the xid of the transaction performing the rewrite, which then can be
 * checked before being read in.
 *
 * For efficiency we don't immediately spill every single mapping for a
 * row to disk but only do so in batches, when we've collected several of
 * them in memory or when end_heap_rewrite() has been called.
 *
 * Crash-Safety: This module diverts from the usual patterns of doing WAL
 * since it cannot rely on checkpoint flushing out all buffers and thus
 * waiting for exclusive locks on buffers.  Usually the XLogInsert() covering
 * buffer modifications is performed while the buffer(s) that are being
 * modified are exclusively locked, guaranteeing that both the WAL record and
 * the modified heap are on either side of the checkpoint.  But since the
 * mapping files we log aren't in shared_buffers, that interlock doesn't work.
 *
 * Instead we simply write the mapping files out to disk, *before* the
 * XLogInsert() is performed.  That guarantees that either the XLogInsert()
 * is inserted after the checkpoint's redo pointer or that the checkpoint
 * (via CheckPointLogicalRewriteHeap()) has flushed the (partial) mapping
 * file to disk.  That leaves the tail end that has not yet been flushed
 * open to corruption, which is solved by including the current offset in
 * the xl_heap_rewrite_mapping records and truncating the mapping file to it
 * during replay.  Every time a rewrite is finished all generated mapping
 * files are synced to disk.
 *
 * Note that if we were only concerned about crash safety we wouldn't have
 * to deal with WAL logging at all - an fsync() at the end of a rewrite
 * would be sufficient for crash safety.  Any mapping that hasn't been
 * safely flushed to disk has to belong to an aborted (explicitly or via a
 * crash) transaction and is ignored by virtue of the xid in its name being
 * subject to a TransactionIdDidCommit() check.  But we want to support
 * having standbys via physical replication, both for availability and to
 * do logical decoding there.
 * ------------------------------------------------------------------------
 */
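/*
 * The crash-safety protocol above, restated as an illustrative sequence.
 * This is a sketch of the ordering enforced by
 * logical_heap_rewrite_flush_mappings() below, not additional code:
 *
 *	FileWrite(src->vfd, waldata, len, ...);		// 1. file data first
 *	XLogBeginInsert();
 *	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
 *	XLogRegisterData(waldata, len);
 *	XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);	// 2. then the WAL record
 *	...
 *	FileSync(src->vfd, ...);			// 3. fsync in end_heap_rewrite()
 *
 * If a checkpoint intervenes between steps 1 and 2,
 * CheckPointLogicalRewriteHeap() has fsynced the file; if it happens after
 * step 2, replay of the record rewrites the unflushed tail, truncating the
 * file to xlrec.offset first.
 */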
/*
 * Do preparations for logging logical mappings during a rewrite if
 * necessary.  If we detect that we don't need to log anything we'll prevent
 * any further action by the various logical rewrite functions.
 */
static void
logical_begin_heap_rewrite(RewriteState state)
{
	HASHCTL		hash_ctl;
	TransactionId logical_xmin;

	/*
	 * We only need to persist these mappings if the rewritten table can be
	 * accessed during logical decoding; if not, we can skip doing any
	 * additional work.
	 */
	state->rs_logical_rewrite =
		RelationIsAccessibleInLogicalDecoding(state->rs_old_rel);

	if (!state->rs_logical_rewrite)
		return;

	ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);

	/*
	 * If there are no logical slots in progress we don't need to do
	 * anything; there cannot be any remappings for relevant rows yet.  The
	 * relation's lock protects us against races.
	 */
	if (logical_xmin == InvalidTransactionId)
	{
		state->rs_logical_rewrite = false;
		return;
	}

	state->rs_logical_xmin = logical_xmin;
	state->rs_begin_lsn = GetXLogInsertRecPtr();
	state->rs_num_rewrite_mappings = 0;

	memset(&hash_ctl, 0, sizeof(hash_ctl));
	hash_ctl.keysize = sizeof(TransactionId);
	hash_ctl.entrysize = sizeof(RewriteMappingFile);
	hash_ctl.hcxt = state->rs_cxt;

	state->rs_logical_mappings =
		hash_create("Logical rewrite mapping",
					128,		/* arbitrary initial size */
					&hash_ctl,
					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
}

/*
 * Flush all logical in-memory mappings to disk, but don't fsync them yet.
 */
static void
logical_heap_rewrite_flush_mappings(RewriteState state)
{
	HASH_SEQ_STATUS seq_status;
	RewriteMappingFile *src;
	dlist_mutable_iter iter;

	Assert(state->rs_logical_rewrite);

	/* no logical rewrite in progress, no need to iterate over mappings */
	if (state->rs_num_rewrite_mappings == 0)
		return;

	elog(DEBUG1, "flushing %u logical rewrite mapping entries",
		 state->rs_num_rewrite_mappings);

	hash_seq_init(&seq_status, state->rs_logical_mappings);
	while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
	{
		char	   *waldata;
		char	   *waldata_start;
		xl_heap_rewrite_mapping xlrec;
		Oid			dboid;
		uint32		len;
		int			written;

		/* this file hasn't got any new mappings */
		if (src->num_mappings == 0)
			continue;

		if (state->rs_old_rel->rd_rel->relisshared)
			dboid = InvalidOid;
		else
			dboid = MyDatabaseId;

		xlrec.num_mappings = src->num_mappings;
		xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
		xlrec.mapped_xid = src->xid;
		xlrec.mapped_db = dboid;
		xlrec.offset = src->off;
		xlrec.start_lsn = state->rs_begin_lsn;

		/* write all mappings consecutively */
		len = src->num_mappings * sizeof(LogicalRewriteMappingData);
		waldata_start = waldata = palloc(len);

		/*
		 * collect data we need to write out, but don't modify ondisk data
		 * yet
		 */
		dlist_foreach_modify(iter, &src->mappings)
		{
			RewriteMappingDataEntry *pmap;

			pmap = dlist_container(RewriteMappingDataEntry, node, iter.cur);

			memcpy(waldata, &pmap->map, sizeof(pmap->map));
			waldata += sizeof(pmap->map);

			/* remove from the list and free */
			dlist_delete(&pmap->node);
			pfree(pmap);

			/* update bookkeeping */
			state->rs_num_rewrite_mappings--;
			src->num_mappings--;
		}

		Assert(src->num_mappings == 0);
		Assert(waldata == waldata_start + len);
		/*
		 * Note that we deviate from the usual WAL coding practices here;
		 * check the above "Logical rewrite support" comment for the
		 * reasoning.
		 */
		written = FileWrite(src->vfd, waldata_start, len,
							WAIT_EVENT_LOGICAL_REWRITE_WRITE);
		if (written != len)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not write to file \"%s\", wrote %d of %d: %m",
							src->path, written, len)));
		src->off += len;

		XLogBeginInsert();
		XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
		XLogRegisterData(waldata_start, len);

		/* write xlog record */
		XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);

		pfree(waldata_start);
	}
	Assert(state->rs_num_rewrite_mappings == 0);
}

/*
 * Logical remapping part of end_heap_rewrite().
 */
static void
logical_end_heap_rewrite(RewriteState state)
{
	HASH_SEQ_STATUS seq_status;
	RewriteMappingFile *src;

	/* done, no logical rewrite in progress */
	if (!state->rs_logical_rewrite)
		return;

	/* writeout remaining in-memory entries */
	if (state->rs_num_rewrite_mappings > 0)
		logical_heap_rewrite_flush_mappings(state);

	/* Iterate over all mappings we have written and fsync the files. */
	hash_seq_init(&seq_status, state->rs_logical_mappings);
	while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
	{
		if (FileSync(src->vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)
			ereport(data_sync_elevel(ERROR),
					(errcode_for_file_access(),
					 errmsg("could not fsync file \"%s\": %m", src->path)));
		FileClose(src->vfd);
	}
	/* memory context cleanup will deal with the rest */
}

/*
 * Log a single (old->new) mapping for 'xid'.
 */
static void
logical_rewrite_log_mapping(RewriteState state, TransactionId xid,
							LogicalRewriteMappingData *map)
{
	RewriteMappingFile *src;
	RewriteMappingDataEntry *pmap;
	Oid			relid;
	bool		found;

	relid = RelationGetRelid(state->rs_old_rel);

	/* look for existing mappings for this 'mapped' xid */
	src = hash_search(state->rs_logical_mappings, &xid,
					  HASH_ENTER, &found);

	/*
	 * We haven't yet had the need to map anything for this xid; create
	 * per-xid data structures.
	 */
	if (!found)
	{
		char		path[MAXPGPATH];
		Oid			dboid;

		if (state->rs_old_rel->rd_rel->relisshared)
			dboid = InvalidOid;
		else
			dboid = MyDatabaseId;

		snprintf(path, MAXPGPATH,
				 "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
				 dboid, relid,
				 (uint32) (state->rs_begin_lsn >> 32),
				 (uint32) state->rs_begin_lsn,
				 xid, GetCurrentTransactionId());

		dlist_init(&src->mappings);
		src->num_mappings = 0;
		src->off = 0;
		memcpy(src->path, path, sizeof(path));
		src->vfd = PathNameOpenFile(path,
									O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
		if (src->vfd < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not create file \"%s\": %m", path)));
	}

	pmap = MemoryContextAlloc(state->rs_cxt,
							  sizeof(RewriteMappingDataEntry));
	memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
	dlist_push_tail(&src->mappings, &pmap->node);
	src->num_mappings++;
	state->rs_num_rewrite_mappings++;

	/*
	 * Write out the buffer every time we've accumulated too many in-memory
	 * entries across all mapping files.
	 */
	if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
		logical_heap_rewrite_flush_mappings(state);
}
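/*
 * For illustration, with assumed values (dboid 16384, relid 16385,
 * rs_begin_lsn 0/16B3748, mapped xid 541, rewriting xid 542),
 * LOGICAL_REWRITE_FORMAT ("map-%x-%x-%X_%X-%x-%x", from rewriteheap.h)
 * yields a path like:
 *
 *	pg_logical/mappings/map-4000-4001-0_16B3748-21d-21e
 *
 * i.e. database oid, relation oid, the LSN at the start of the rewrite, the
 * xid the mappings are for, and the xid performing the rewrite.  The last
 * component is what lets readers and CheckPointLogicalRewriteHeap() detect
 * files left behind by aborted rewrites.
 */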
/*
 * Perform logical remapping for a tuple that's mapped from old_tid to
 * new_tuple->t_self by rewrite_heap_tuple() if necessary for the tuple.
 */
static void
logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid,
						   HeapTuple new_tuple)
{
	ItemPointerData new_tid = new_tuple->t_self;
	TransactionId cutoff = state->rs_logical_xmin;
	TransactionId xmin;
	TransactionId xmax;
	bool		do_log_xmin = false;
	bool		do_log_xmax = false;
	LogicalRewriteMappingData map;

	/* no logical rewrite in progress, we don't need to log anything */
	if (!state->rs_logical_rewrite)
		return;

	xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
	/* use *GetUpdateXid to correctly deal with multixacts */
	xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);

	/*
	 * Log the mapping iff the tuple has been created recently.
	 */
	if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
		do_log_xmin = true;

	if (!TransactionIdIsNormal(xmax))
	{
		/*
		 * no xmax is set, can't have any permanent ones, so this check is
		 * sufficient
		 */
	}
	else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
	{
		/* only locked, we don't care */
	}
	else if (!TransactionIdPrecedes(xmax, cutoff))
	{
		/* tuple has been deleted recently, log */
		do_log_xmax = true;
	}

	/* if neither needs to be logged, we're done */
	if (!do_log_xmin && !do_log_xmax)
		return;

	/* fill out mapping information */
	map.old_node = state->rs_old_rel->rd_node;
	map.old_tid = old_tid;
	map.new_node = state->rs_new_rel->rd_node;
	map.new_tid = new_tid;

	/* ---
	 * Now persist the mapping for the individual xids that are affected.
	 * We need to log for both xmin and xmax if they aren't the same
	 * transaction, since the mapping files are per "affected" xid.
	 * We don't muster all that much effort detecting whether xmin and xmax
	 * are actually the same transaction; we just check whether the xid is
	 * the same, disregarding subtransactions.  Logging too much is
	 * relatively harmless, and we could never do the check fully since
	 * subtransaction data is thrown away during restarts.
	 * ---
	 */
	if (do_log_xmin)
		logical_rewrite_log_mapping(state, xmin, &map);
	/* separately log mapping for xmax unless it'd be redundant */
	if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
		logical_rewrite_log_mapping(state, xmax, &map);
}
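/*
 * Decision examples for the logic above, with an assumed cutoff of 600:
 *
 * - xmin 650, xmax invalid: log a mapping for 650 only.
 * - xmin 550, xmax 650 (recently deleted/updated): log for 650 only.
 * - xmin 650, xmax 650 (inserted and deleted by the same xid): a single
 *	 mapping is logged, since the second logical_rewrite_log_mapping() call
 *	 is skipped as redundant.
 * - xmin 550, xmax 580: both precede the cutoff, nothing is logged.
 */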
/*
 * Replay XLOG_HEAP2_REWRITE records
 */
void
heap_xlog_logical_rewrite(XLogReaderState *r)
{
	char		path[MAXPGPATH];
	int			fd;
	xl_heap_rewrite_mapping *xlrec;
	uint32		len;
	char	   *data;

	xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);

	snprintf(path, MAXPGPATH,
			 "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
			 xlrec->mapped_db, xlrec->mapped_rel,
			 (uint32) (xlrec->start_lsn >> 32),
			 (uint32) xlrec->start_lsn,
			 xlrec->mapped_xid, XLogRecGetXid(r));

	fd = OpenTransientFile(path,
						   O_CREAT | O_WRONLY | PG_BINARY);
	if (fd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m", path)));

	/*
	 * Truncate all data that's not guaranteed to have been safely fsynced
	 * (by previous record or by the last checkpoint).
	 */
	pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE);
	if (ftruncate(fd, xlrec->offset) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not truncate file \"%s\" to %u: %m",
						path, (uint32) xlrec->offset)));
	pgstat_report_wait_end();

	/* now seek to the position we want to write our data to */
	if (lseek(fd, xlrec->offset, SEEK_SET) != xlrec->offset)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not seek to end of file \"%s\": %m",
						path)));

	data = XLogRecGetData(r) + sizeof(*xlrec);

	len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);

	/* write out tail end of mapping file (again) */
	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE);
	if (write(fd, data, len) != len)
	{
		/* if write didn't set errno, assume problem is no disk space */
		if (errno == 0)
			errno = ENOSPC;
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m", path)));
	}
	pgstat_report_wait_end();

	/*
	 * Now fsync all previously written data.  We could improve things and
	 * only do this for the last write to a file, but the required
	 * bookkeeping doesn't seem worth the trouble.
	 */
	pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC);
	if (pg_fsync(fd) != 0)
		ereport(data_sync_elevel(ERROR),
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m", path)));
	pgstat_report_wait_end();

	CloseTransientFile(fd);
}
/* ---
 * Perform a checkpoint for logical rewrite mappings
 *
 * This serves two tasks:
 * 1) Remove all mappings not needed anymore based on the logical restart LSN
 * 2) Flush all remaining mappings to disk, so that replay after a checkpoint
 *	  only has to deal with the parts of a mapping that have been written out
 *	  after the checkpoint started.
 * ---
 */
void
CheckPointLogicalRewriteHeap(void)
{
	XLogRecPtr	cutoff;
	XLogRecPtr	redo;
	DIR		   *mappings_dir;
	struct dirent *mapping_de;
	char		path[MAXPGPATH + 20];

	/*
	 * We start off with the last redo pointer as a minimum; no new decoding
	 * slot will start before that, so it is a safe upper bound for removal.
	 */
	redo = GetRedoRecPtr();

	/* now check for the restart ptrs from existing slots */
	cutoff = ReplicationSlotsComputeLogicalRestartLSN();

	/* don't start earlier than the restart lsn */
	if (cutoff != InvalidXLogRecPtr && redo < cutoff)
		cutoff = redo;

	mappings_dir = AllocateDir("pg_logical/mappings");
	while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
	{
		struct stat statbuf;
		Oid			dboid;
		Oid			relid;
		XLogRecPtr	lsn;
		TransactionId rewrite_xid;
		TransactionId create_xid;
		uint32		hi,
					lo;

		if (strcmp(mapping_de->d_name, ".") == 0 ||
			strcmp(mapping_de->d_name, "..") == 0)
			continue;

		snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
		if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
			continue;

		/* Skip over files that cannot be ours. */
		if (strncmp(mapping_de->d_name, "map-", 4) != 0)
			continue;

		if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
				   &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
			elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);

		lsn = ((uint64) hi) << 32 | lo;

		if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
		{
			elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
			if (unlink(path) < 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not remove file \"%s\": %m", path)));
		}
		else
		{
			/* on some operating systems fsyncing a file requires O_RDWR */
			int			fd = OpenTransientFile(path, O_RDWR | PG_BINARY);

			/*
			 * The file cannot vanish due to concurrency since this function
			 * is the only one removing logical mappings and it's run while
			 * CheckpointLock is held exclusively.
			 */
			if (fd < 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not open file \"%s\": %m", path)));

			/*
			 * We could try to avoid fsyncing files that either haven't
			 * changed or have only been created since the checkpoint's
			 * start, but it's currently not deemed worth the effort.
			 */
			pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC);
			if (pg_fsync(fd) != 0)
				ereport(data_sync_elevel(ERROR),
						(errcode_for_file_access(),
						 errmsg("could not fsync file \"%s\": %m", path)));
			pgstat_report_wait_end();
			CloseTransientFile(fd);
		}
	}
	FreeDir(mappings_dir);
}
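/*
 * A small worked example of the cutoff logic above, with assumed LSNs: if
 * GetRedoRecPtr() returns 0/5000000 and the oldest slot restart LSN is
 * 0/3000000, cutoff stays at 0/3000000 (the smaller value), so a mapping
 * file whose name carries start LSN 0/2000000 is unlinked while one at
 * 0/4000000 is kept and fsynced.  If no logical slot exists at all, the
 * restart LSN is invalid and every mapping file is removed.
 */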