1 /*------------------------------------------------------------------------- 2 * 3 * hash_xlog.c 4 * WAL replay logic for hash index. 5 * 6 * 7 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group 8 * Portions Copyright (c) 1994, Regents of the University of California 9 * 10 * IDENTIFICATION 11 * src/backend/access/hash/hash_xlog.c 12 * 13 *------------------------------------------------------------------------- 14 */ 15 #include "postgres.h" 16 17 #include "access/bufmask.h" 18 #include "access/hash.h" 19 #include "access/hash_xlog.h" 20 #include "access/xlogutils.h" 21 #include "access/xlog.h" 22 #include "access/transam.h" 23 #include "storage/procarray.h" 24 #include "miscadmin.h" 25 26 /* 27 * replay a hash index meta page 28 */ 29 static void 30 hash_xlog_init_meta_page(XLogReaderState *record) 31 { 32 XLogRecPtr lsn = record->EndRecPtr; 33 Page page; 34 Buffer metabuf; 35 ForkNumber forknum; 36 37 xl_hash_init_meta_page *xlrec = (xl_hash_init_meta_page *) XLogRecGetData(record); 38 39 /* create the index' metapage */ 40 metabuf = XLogInitBufferForRedo(record, 0); 41 Assert(BufferIsValid(metabuf)); 42 _hash_init_metabuffer(metabuf, xlrec->num_tuples, xlrec->procid, 43 xlrec->ffactor, true); 44 page = (Page) BufferGetPage(metabuf); 45 PageSetLSN(page, lsn); 46 MarkBufferDirty(metabuf); 47 48 /* 49 * Force the on-disk state of init forks to always be in sync with the 50 * state in shared buffers. See XLogReadBufferForRedoExtended. We need 51 * special handling for init forks as create index operations don't log a 52 * full page image of the metapage. 53 */ 54 XLogRecGetBlockTag(record, 0, NULL, &forknum, NULL); 55 if (forknum == INIT_FORKNUM) 56 FlushOneBuffer(metabuf); 57 58 /* all done */ 59 UnlockReleaseBuffer(metabuf); 60 } 61 62 /* 63 * replay a hash index bitmap page 64 */ 65 static void 66 hash_xlog_init_bitmap_page(XLogReaderState *record) 67 { 68 XLogRecPtr lsn = record->EndRecPtr; 69 Buffer bitmapbuf; 70 Buffer metabuf; 71 Page page; 72 HashMetaPage metap; 73 uint32 num_buckets; 74 ForkNumber forknum; 75 76 xl_hash_init_bitmap_page *xlrec = (xl_hash_init_bitmap_page *) XLogRecGetData(record); 77 78 /* 79 * Initialize bitmap page 80 */ 81 bitmapbuf = XLogInitBufferForRedo(record, 0); 82 _hash_initbitmapbuffer(bitmapbuf, xlrec->bmsize, true); 83 PageSetLSN(BufferGetPage(bitmapbuf), lsn); 84 MarkBufferDirty(bitmapbuf); 85 86 /* 87 * Force the on-disk state of init forks to always be in sync with the 88 * state in shared buffers. See XLogReadBufferForRedoExtended. We need 89 * special handling for init forks as create index operations don't log a 90 * full page image of the metapage. 91 */ 92 XLogRecGetBlockTag(record, 0, NULL, &forknum, NULL); 93 if (forknum == INIT_FORKNUM) 94 FlushOneBuffer(bitmapbuf); 95 UnlockReleaseBuffer(bitmapbuf); 96 97 /* add the new bitmap page to the metapage's list of bitmaps */ 98 if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO) 99 { 100 /* 101 * Note: in normal operation, we'd update the metapage while still 102 * holding lock on the bitmap page. But during replay it's not 103 * necessary to hold that lock, since nobody can see it yet; the 104 * creating transaction hasn't yet committed. 105 */ 106 page = BufferGetPage(metabuf); 107 metap = HashPageGetMeta(page); 108 109 num_buckets = metap->hashm_maxbucket + 1; 110 metap->hashm_mapp[metap->hashm_nmaps] = num_buckets + 1; 111 metap->hashm_nmaps++; 112 113 PageSetLSN(page, lsn); 114 MarkBufferDirty(metabuf); 115 116 XLogRecGetBlockTag(record, 1, NULL, &forknum, NULL); 117 if (forknum == INIT_FORKNUM) 118 FlushOneBuffer(metabuf); 119 } 120 if (BufferIsValid(metabuf)) 121 UnlockReleaseBuffer(metabuf); 122 } 123 124 /* 125 * replay a hash index insert without split 126 */ 127 static void 128 hash_xlog_insert(XLogReaderState *record) 129 { 130 HashMetaPage metap; 131 XLogRecPtr lsn = record->EndRecPtr; 132 xl_hash_insert *xlrec = (xl_hash_insert *) XLogRecGetData(record); 133 Buffer buffer; 134 Page page; 135 136 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) 137 { 138 Size datalen; 139 char *datapos = XLogRecGetBlockData(record, 0, &datalen); 140 141 page = BufferGetPage(buffer); 142 143 if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum, 144 false, false) == InvalidOffsetNumber) 145 elog(PANIC, "hash_xlog_insert: failed to add item"); 146 147 PageSetLSN(page, lsn); 148 MarkBufferDirty(buffer); 149 } 150 if (BufferIsValid(buffer)) 151 UnlockReleaseBuffer(buffer); 152 153 if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO) 154 { 155 /* 156 * Note: in normal operation, we'd update the metapage while still 157 * holding lock on the page we inserted into. But during replay it's 158 * not necessary to hold that lock, since no other index updates can 159 * be happening concurrently. 160 */ 161 page = BufferGetPage(buffer); 162 metap = HashPageGetMeta(page); 163 metap->hashm_ntuples += 1; 164 165 PageSetLSN(page, lsn); 166 MarkBufferDirty(buffer); 167 } 168 if (BufferIsValid(buffer)) 169 UnlockReleaseBuffer(buffer); 170 } 171 172 /* 173 * replay addition of overflow page for hash index 174 */ 175 static void 176 hash_xlog_add_ovfl_page(XLogReaderState *record) 177 { 178 XLogRecPtr lsn = record->EndRecPtr; 179 xl_hash_add_ovfl_page *xlrec = (xl_hash_add_ovfl_page *) XLogRecGetData(record); 180 Buffer leftbuf; 181 Buffer ovflbuf; 182 Buffer metabuf; 183 BlockNumber leftblk; 184 BlockNumber rightblk; 185 BlockNumber newmapblk = InvalidBlockNumber; 186 Page ovflpage; 187 HashPageOpaque ovflopaque; 188 uint32 *num_bucket; 189 char *data; 190 Size datalen PG_USED_FOR_ASSERTS_ONLY; 191 bool new_bmpage = false; 192 193 XLogRecGetBlockTag(record, 0, NULL, NULL, &rightblk); 194 XLogRecGetBlockTag(record, 1, NULL, NULL, &leftblk); 195 196 ovflbuf = XLogInitBufferForRedo(record, 0); 197 Assert(BufferIsValid(ovflbuf)); 198 199 data = XLogRecGetBlockData(record, 0, &datalen); 200 num_bucket = (uint32 *) data; 201 Assert(datalen == sizeof(uint32)); 202 _hash_initbuf(ovflbuf, InvalidBlockNumber, *num_bucket, LH_OVERFLOW_PAGE, 203 true); 204 /* update backlink */ 205 ovflpage = BufferGetPage(ovflbuf); 206 ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage); 207 ovflopaque->hasho_prevblkno = leftblk; 208 209 PageSetLSN(ovflpage, lsn); 210 MarkBufferDirty(ovflbuf); 211 212 if (XLogReadBufferForRedo(record, 1, &leftbuf) == BLK_NEEDS_REDO) 213 { 214 Page leftpage; 215 HashPageOpaque leftopaque; 216 217 leftpage = BufferGetPage(leftbuf); 218 leftopaque = (HashPageOpaque) PageGetSpecialPointer(leftpage); 219 leftopaque->hasho_nextblkno = rightblk; 220 221 PageSetLSN(leftpage, lsn); 222 MarkBufferDirty(leftbuf); 223 } 224 225 if (BufferIsValid(leftbuf)) 226 UnlockReleaseBuffer(leftbuf); 227 UnlockReleaseBuffer(ovflbuf); 228 229 /* 230 * Note: in normal operation, we'd update the bitmap and meta page while 231 * still holding lock on the overflow pages. But during replay it's not 232 * necessary to hold those locks, since no other index updates can be 233 * happening concurrently. 234 */ 235 if (XLogRecHasBlockRef(record, 2)) 236 { 237 Buffer mapbuffer; 238 239 if (XLogReadBufferForRedo(record, 2, &mapbuffer) == BLK_NEEDS_REDO) 240 { 241 Page mappage = (Page) BufferGetPage(mapbuffer); 242 uint32 *freep = NULL; 243 char *data; 244 uint32 *bitmap_page_bit; 245 246 freep = HashPageGetBitmap(mappage); 247 248 data = XLogRecGetBlockData(record, 2, &datalen); 249 bitmap_page_bit = (uint32 *) data; 250 251 SETBIT(freep, *bitmap_page_bit); 252 253 PageSetLSN(mappage, lsn); 254 MarkBufferDirty(mapbuffer); 255 } 256 if (BufferIsValid(mapbuffer)) 257 UnlockReleaseBuffer(mapbuffer); 258 } 259 260 if (XLogRecHasBlockRef(record, 3)) 261 { 262 Buffer newmapbuf; 263 264 newmapbuf = XLogInitBufferForRedo(record, 3); 265 266 _hash_initbitmapbuffer(newmapbuf, xlrec->bmsize, true); 267 268 new_bmpage = true; 269 newmapblk = BufferGetBlockNumber(newmapbuf); 270 271 MarkBufferDirty(newmapbuf); 272 PageSetLSN(BufferGetPage(newmapbuf), lsn); 273 274 UnlockReleaseBuffer(newmapbuf); 275 } 276 277 if (XLogReadBufferForRedo(record, 4, &metabuf) == BLK_NEEDS_REDO) 278 { 279 HashMetaPage metap; 280 Page page; 281 uint32 *firstfree_ovflpage; 282 283 data = XLogRecGetBlockData(record, 4, &datalen); 284 firstfree_ovflpage = (uint32 *) data; 285 286 page = BufferGetPage(metabuf); 287 metap = HashPageGetMeta(page); 288 metap->hashm_firstfree = *firstfree_ovflpage; 289 290 if (!xlrec->bmpage_found) 291 { 292 metap->hashm_spares[metap->hashm_ovflpoint]++; 293 294 if (new_bmpage) 295 { 296 Assert(BlockNumberIsValid(newmapblk)); 297 298 metap->hashm_mapp[metap->hashm_nmaps] = newmapblk; 299 metap->hashm_nmaps++; 300 metap->hashm_spares[metap->hashm_ovflpoint]++; 301 } 302 } 303 304 PageSetLSN(page, lsn); 305 MarkBufferDirty(metabuf); 306 } 307 if (BufferIsValid(metabuf)) 308 UnlockReleaseBuffer(metabuf); 309 } 310 311 /* 312 * replay allocation of page for split operation 313 */ 314 static void 315 hash_xlog_split_allocate_page(XLogReaderState *record) 316 { 317 XLogRecPtr lsn = record->EndRecPtr; 318 xl_hash_split_allocate_page *xlrec = (xl_hash_split_allocate_page *) XLogRecGetData(record); 319 Buffer oldbuf; 320 Buffer newbuf; 321 Buffer metabuf; 322 Size datalen PG_USED_FOR_ASSERTS_ONLY; 323 char *data; 324 XLogRedoAction action; 325 326 /* 327 * To be consistent with normal operation, here we take cleanup locks on 328 * both the old and new buckets even though there can't be any concurrent 329 * inserts. 330 */ 331 332 /* replay the record for old bucket */ 333 action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &oldbuf); 334 335 /* 336 * Note that we still update the page even if it was restored from a full 337 * page image, because the special space is not included in the image. 338 */ 339 if (action == BLK_NEEDS_REDO || action == BLK_RESTORED) 340 { 341 Page oldpage; 342 HashPageOpaque oldopaque; 343 344 oldpage = BufferGetPage(oldbuf); 345 oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage); 346 347 oldopaque->hasho_flag = xlrec->old_bucket_flag; 348 oldopaque->hasho_prevblkno = xlrec->new_bucket; 349 350 PageSetLSN(oldpage, lsn); 351 MarkBufferDirty(oldbuf); 352 } 353 354 /* replay the record for new bucket */ 355 newbuf = XLogInitBufferForRedo(record, 1); 356 _hash_initbuf(newbuf, xlrec->new_bucket, xlrec->new_bucket, 357 xlrec->new_bucket_flag, true); 358 if (!IsBufferCleanupOK(newbuf)) 359 elog(PANIC, "hash_xlog_split_allocate_page: failed to acquire cleanup lock"); 360 MarkBufferDirty(newbuf); 361 PageSetLSN(BufferGetPage(newbuf), lsn); 362 363 /* 364 * We can release the lock on old bucket early as well but doing here to 365 * consistent with normal operation. 366 */ 367 if (BufferIsValid(oldbuf)) 368 UnlockReleaseBuffer(oldbuf); 369 if (BufferIsValid(newbuf)) 370 UnlockReleaseBuffer(newbuf); 371 372 /* 373 * Note: in normal operation, we'd update the meta page while still 374 * holding lock on the old and new bucket pages. But during replay it's 375 * not necessary to hold those locks, since no other bucket splits can be 376 * happening concurrently. 377 */ 378 379 /* replay the record for metapage changes */ 380 if (XLogReadBufferForRedo(record, 2, &metabuf) == BLK_NEEDS_REDO) 381 { 382 Page page; 383 HashMetaPage metap; 384 385 page = BufferGetPage(metabuf); 386 metap = HashPageGetMeta(page); 387 metap->hashm_maxbucket = xlrec->new_bucket; 388 389 data = XLogRecGetBlockData(record, 2, &datalen); 390 391 if (xlrec->flags & XLH_SPLIT_META_UPDATE_MASKS) 392 { 393 uint32 lowmask; 394 uint32 *highmask; 395 396 /* extract low and high masks. */ 397 memcpy(&lowmask, data, sizeof(uint32)); 398 highmask = (uint32 *) ((char *) data + sizeof(uint32)); 399 400 /* update metapage */ 401 metap->hashm_lowmask = lowmask; 402 metap->hashm_highmask = *highmask; 403 404 data += sizeof(uint32) * 2; 405 } 406 407 if (xlrec->flags & XLH_SPLIT_META_UPDATE_SPLITPOINT) 408 { 409 uint32 ovflpoint; 410 uint32 *ovflpages; 411 412 /* extract information of overflow pages. */ 413 memcpy(&ovflpoint, data, sizeof(uint32)); 414 ovflpages = (uint32 *) ((char *) data + sizeof(uint32)); 415 416 /* update metapage */ 417 metap->hashm_spares[ovflpoint] = *ovflpages; 418 metap->hashm_ovflpoint = ovflpoint; 419 } 420 421 MarkBufferDirty(metabuf); 422 PageSetLSN(BufferGetPage(metabuf), lsn); 423 } 424 425 if (BufferIsValid(metabuf)) 426 UnlockReleaseBuffer(metabuf); 427 } 428 429 /* 430 * replay of split operation 431 */ 432 static void 433 hash_xlog_split_page(XLogReaderState *record) 434 { 435 Buffer buf; 436 437 if (XLogReadBufferForRedo(record, 0, &buf) != BLK_RESTORED) 438 elog(ERROR, "Hash split record did not contain a full-page image"); 439 440 UnlockReleaseBuffer(buf); 441 } 442 443 /* 444 * replay completion of split operation 445 */ 446 static void 447 hash_xlog_split_complete(XLogReaderState *record) 448 { 449 XLogRecPtr lsn = record->EndRecPtr; 450 xl_hash_split_complete *xlrec = (xl_hash_split_complete *) XLogRecGetData(record); 451 Buffer oldbuf; 452 Buffer newbuf; 453 XLogRedoAction action; 454 455 /* replay the record for old bucket */ 456 action = XLogReadBufferForRedo(record, 0, &oldbuf); 457 458 /* 459 * Note that we still update the page even if it was restored from a full 460 * page image, because the bucket flag is not included in the image. 461 */ 462 if (action == BLK_NEEDS_REDO || action == BLK_RESTORED) 463 { 464 Page oldpage; 465 HashPageOpaque oldopaque; 466 467 oldpage = BufferGetPage(oldbuf); 468 oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage); 469 470 oldopaque->hasho_flag = xlrec->old_bucket_flag; 471 472 PageSetLSN(oldpage, lsn); 473 MarkBufferDirty(oldbuf); 474 } 475 if (BufferIsValid(oldbuf)) 476 UnlockReleaseBuffer(oldbuf); 477 478 /* replay the record for new bucket */ 479 action = XLogReadBufferForRedo(record, 1, &newbuf); 480 481 /* 482 * Note that we still update the page even if it was restored from a full 483 * page image, because the bucket flag is not included in the image. 484 */ 485 if (action == BLK_NEEDS_REDO || action == BLK_RESTORED) 486 { 487 Page newpage; 488 HashPageOpaque nopaque; 489 490 newpage = BufferGetPage(newbuf); 491 nopaque = (HashPageOpaque) PageGetSpecialPointer(newpage); 492 493 nopaque->hasho_flag = xlrec->new_bucket_flag; 494 495 PageSetLSN(newpage, lsn); 496 MarkBufferDirty(newbuf); 497 } 498 if (BufferIsValid(newbuf)) 499 UnlockReleaseBuffer(newbuf); 500 } 501 502 /* 503 * replay move of page contents for squeeze operation of hash index 504 */ 505 static void 506 hash_xlog_move_page_contents(XLogReaderState *record) 507 { 508 XLogRecPtr lsn = record->EndRecPtr; 509 xl_hash_move_page_contents *xldata = (xl_hash_move_page_contents *) XLogRecGetData(record); 510 Buffer bucketbuf = InvalidBuffer; 511 Buffer writebuf = InvalidBuffer; 512 Buffer deletebuf = InvalidBuffer; 513 XLogRedoAction action; 514 515 /* 516 * Ensure we have a cleanup lock on primary bucket page before we start 517 * with the actual replay operation. This is to ensure that neither a 518 * scan can start nor a scan can be already-in-progress during the replay 519 * of this operation. If we allow scans during this operation, then they 520 * can miss some records or show the same record multiple times. 521 */ 522 if (xldata->is_prim_bucket_same_wrt) 523 action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf); 524 else 525 { 526 /* 527 * we don't care for return value as the purpose of reading bucketbuf 528 * is to ensure a cleanup lock on primary bucket page. 529 */ 530 (void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf); 531 532 action = XLogReadBufferForRedo(record, 1, &writebuf); 533 } 534 535 /* replay the record for adding entries in overflow buffer */ 536 if (action == BLK_NEEDS_REDO) 537 { 538 Page writepage; 539 char *begin; 540 char *data; 541 Size datalen; 542 uint16 ninserted = 0; 543 544 data = begin = XLogRecGetBlockData(record, 1, &datalen); 545 546 writepage = (Page) BufferGetPage(writebuf); 547 548 if (xldata->ntups > 0) 549 { 550 OffsetNumber *towrite = (OffsetNumber *) data; 551 552 data += sizeof(OffsetNumber) * xldata->ntups; 553 554 while (data - begin < datalen) 555 { 556 IndexTuple itup = (IndexTuple) data; 557 Size itemsz; 558 OffsetNumber l; 559 560 itemsz = IndexTupleSize(itup); 561 itemsz = MAXALIGN(itemsz); 562 563 data += itemsz; 564 565 l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false); 566 if (l == InvalidOffsetNumber) 567 elog(ERROR, "hash_xlog_move_page_contents: failed to add item to hash index page, size %d bytes", 568 (int) itemsz); 569 570 ninserted++; 571 } 572 } 573 574 /* 575 * number of tuples inserted must be same as requested in REDO record. 576 */ 577 Assert(ninserted == xldata->ntups); 578 579 PageSetLSN(writepage, lsn); 580 MarkBufferDirty(writebuf); 581 } 582 583 /* replay the record for deleting entries from overflow buffer */ 584 if (XLogReadBufferForRedo(record, 2, &deletebuf) == BLK_NEEDS_REDO) 585 { 586 Page page; 587 char *ptr; 588 Size len; 589 590 ptr = XLogRecGetBlockData(record, 2, &len); 591 592 page = (Page) BufferGetPage(deletebuf); 593 594 if (len > 0) 595 { 596 OffsetNumber *unused; 597 OffsetNumber *unend; 598 599 unused = (OffsetNumber *) ptr; 600 unend = (OffsetNumber *) ((char *) ptr + len); 601 602 if ((unend - unused) > 0) 603 PageIndexMultiDelete(page, unused, unend - unused); 604 } 605 606 PageSetLSN(page, lsn); 607 MarkBufferDirty(deletebuf); 608 } 609 610 /* 611 * Replay is complete, now we can release the buffers. We release locks at 612 * end of replay operation to ensure that we hold lock on primary bucket 613 * page till end of operation. We can optimize by releasing the lock on 614 * write buffer as soon as the operation for same is complete, if it is 615 * not same as primary bucket page, but that doesn't seem to be worth 616 * complicating the code. 617 */ 618 if (BufferIsValid(deletebuf)) 619 UnlockReleaseBuffer(deletebuf); 620 621 if (BufferIsValid(writebuf)) 622 UnlockReleaseBuffer(writebuf); 623 624 if (BufferIsValid(bucketbuf)) 625 UnlockReleaseBuffer(bucketbuf); 626 } 627 628 /* 629 * replay squeeze page operation of hash index 630 */ 631 static void 632 hash_xlog_squeeze_page(XLogReaderState *record) 633 { 634 XLogRecPtr lsn = record->EndRecPtr; 635 xl_hash_squeeze_page *xldata = (xl_hash_squeeze_page *) XLogRecGetData(record); 636 Buffer bucketbuf = InvalidBuffer; 637 Buffer writebuf; 638 Buffer ovflbuf; 639 Buffer prevbuf = InvalidBuffer; 640 Buffer mapbuf; 641 XLogRedoAction action; 642 643 /* 644 * Ensure we have a cleanup lock on primary bucket page before we start 645 * with the actual replay operation. This is to ensure that neither a 646 * scan can start nor a scan can be already-in-progress during the replay 647 * of this operation. If we allow scans during this operation, then they 648 * can miss some records or show the same record multiple times. 649 */ 650 if (xldata->is_prim_bucket_same_wrt) 651 action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf); 652 else 653 { 654 /* 655 * we don't care for return value as the purpose of reading bucketbuf 656 * is to ensure a cleanup lock on primary bucket page. __construct($exceptions = null)657 */ 658 (void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf); 659 660 action = XLogReadBufferForRedo(record, 1, &writebuf); 661 } 662 663 /* replay the record for adding entries in overflow buffer */ 664 if (action == BLK_NEEDS_REDO) 665 { 666 Page writepage; __destruct()667 char *begin; 668 char *data; 669 Size datalen; 670 uint16 ninserted = 0; 671 672 data = begin = XLogRecGetBlockData(record, 1, &datalen); 673 674 writepage = (Page) BufferGetPage(writebuf); 675 676 if (xldata->ntups > 0) 677 { 678 OffsetNumber *towrite = (OffsetNumber *) data; 679 680 data += sizeof(OffsetNumber) * xldata->ntups; 681 682 while (data - begin < datalen) 683 { 684 IndexTuple itup = (IndexTuple) data; 685 Size itemsz; mailPassthru($to, $subject, $body, $header, $params)686 OffsetNumber l; 687 688 itemsz = IndexTupleSize(itup); 689 itemsz = MAXALIGN(itemsz); 690 691 data += itemsz; 692 693 l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false); 694 if (l == InvalidOffsetNumber) 695 elog(ERROR, "hash_xlog_squeeze_page: failed to add item to hash index page, size %d bytes", 696 (int) itemsz); 697 698 ninserted++; 699 } 700 } 701 702 /* 703 * number of tuples inserted must be same as requested in REDO record. 704 */ 705 Assert(ninserted == xldata->ntups); 706 707 /* 708 * if the page on which are adding tuples is a page previous to freed 709 * overflow page, then update its nextblno. 710 */ edebug($str)711 if (xldata->is_prev_bucket_same_wrt) 712 { 713 HashPageOpaque writeopaque = (HashPageOpaque) PageGetSpecialPointer(writepage); 714 715 writeopaque->hasho_nextblkno = xldata->nextblkno; 716 } 717 718 PageSetLSN(writepage, lsn); 719 MarkBufferDirty(writebuf); 720 } 721 722 /* replay the record for initializing overflow buffer */ 723 if (XLogReadBufferForRedo(record, 2, &ovflbuf) == BLK_NEEDS_REDO) 724 { 725 Page ovflpage; 726 HashPageOpaque ovflopaque; 727 728 ovflpage = BufferGetPage(ovflbuf); 729 730 _hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf)); 731 732 ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage); 733 734 ovflopaque->hasho_prevblkno = InvalidBlockNumber; 735 ovflopaque->hasho_nextblkno = InvalidBlockNumber; 736 ovflopaque->hasho_bucket = -1; 737 ovflopaque->hasho_flag = LH_UNUSED_PAGE; 738 ovflopaque->hasho_page_id = HASHO_PAGE_ID; 739 740 PageSetLSN(ovflpage, lsn); 741 MarkBufferDirty(ovflbuf); 742 } 743 if (BufferIsValid(ovflbuf)) 744 UnlockReleaseBuffer(ovflbuf); 745 746 /* replay the record for page previous to the freed overflow page */ 747 if (!xldata->is_prev_bucket_same_wrt && 748 XLogReadBufferForRedo(record, 3, &prevbuf) == BLK_NEEDS_REDO) 749 { 750 Page prevpage = BufferGetPage(prevbuf); 751 HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage); isHTML($isHtml = true)752 753 prevopaque->hasho_nextblkno = xldata->nextblkno; 754 755 PageSetLSN(prevpage, lsn); 756 MarkBufferDirty(prevbuf); 757 } 758 if (BufferIsValid(prevbuf)) 759 UnlockReleaseBuffer(prevbuf); 760 761 /* replay the record for page next to the freed overflow page */ 762 if (XLogRecHasBlockRef(record, 4)) 763 { 764 Buffer nextbuf; isSMTP()765 766 if (XLogReadBufferForRedo(record, 4, &nextbuf) == BLK_NEEDS_REDO) 767 { 768 Page nextpage = BufferGetPage(nextbuf); 769 HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage); 770 771 nextopaque->hasho_prevblkno = xldata->prevblkno; 772 773 PageSetLSN(nextpage, lsn); 774 MarkBufferDirty(nextbuf); 775 } 776 if (BufferIsValid(nextbuf)) 777 UnlockReleaseBuffer(nextbuf); 778 } 779 780 if (BufferIsValid(writebuf)) 781 UnlockReleaseBuffer(writebuf); 782 isSendmail()783 if (BufferIsValid(bucketbuf)) 784 UnlockReleaseBuffer(bucketbuf); 785 786 /* 787 * Note: in normal operation, we'd update the bitmap and meta page while 788 * still holding lock on the primary bucket page and overflow pages. But 789 * during replay it's not necessary to hold those locks, since no other 790 * index updates can be happening concurrently. 791 */ 792 /* replay the record for bitmap page */ 793 if (XLogReadBufferForRedo(record, 5, &mapbuf) == BLK_NEEDS_REDO) 794 { 795 Page mappage = (Page) BufferGetPage(mapbuf); 796 uint32 *freep = NULL; 797 char *data; 798 uint32 *bitmap_page_bit; isQmail()799 Size datalen; 800 801 freep = HashPageGetBitmap(mappage); 802 803 data = XLogRecGetBlockData(record, 5, &datalen); 804 bitmap_page_bit = (uint32 *) data; 805 806 CLRBIT(freep, *bitmap_page_bit); 807 808 PageSetLSN(mappage, lsn); 809 MarkBufferDirty(mapbuf); 810 } 811 if (BufferIsValid(mapbuf)) 812 UnlockReleaseBuffer(mapbuf); 813 814 /* replay the record for meta page */ 815 if (XLogRecHasBlockRef(record, 6)) 816 { addAddress($address, $name = '')817 Buffer metabuf; 818 819 if (XLogReadBufferForRedo(record, 6, &metabuf) == BLK_NEEDS_REDO) 820 { 821 HashMetaPage metap; 822 Page page; 823 char *data; 824 uint32 *firstfree_ovflpage; 825 Size datalen; 826 827 data = XLogRecGetBlockData(record, 6, &datalen); 828 firstfree_ovflpage = (uint32 *) data; addCC($address, $name = '')829 830 page = BufferGetPage(metabuf); 831 metap = HashPageGetMeta(page); 832 metap->hashm_firstfree = *firstfree_ovflpage; 833 834 PageSetLSN(page, lsn); 835 MarkBufferDirty(metabuf); 836 } 837 if (BufferIsValid(metabuf)) 838 UnlockReleaseBuffer(metabuf); 839 } 840 } addBCC($address, $name = '')841 842 /* 843 * replay delete operation of hash index 844 */ 845 static void 846 hash_xlog_delete(XLogReaderState *record) 847 { 848 XLogRecPtr lsn = record->EndRecPtr; 849 xl_hash_delete *xldata = (xl_hash_delete *) XLogRecGetData(record); 850 Buffer bucketbuf = InvalidBuffer; 851 Buffer deletebuf; 852 Page page; 853 XLogRedoAction action; 854 855 /* 856 * Ensure we have a cleanup lock on primary bucket page before we start 857 * with the actual replay operation. This is to ensure that neither a 858 * scan can start nor a scan can be already-in-progress during the replay 859 * of this operation. If we allow scans during this operation, then they 860 * can miss some records or show the same record multiple times. 861 */ 862 if (xldata->is_primary_bucket_page) 863 action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &deletebuf); 864 else 865 { 866 /* 867 * we don't care for return value as the purpose of reading bucketbuf 868 * is to ensure a cleanup lock on primary bucket page. 869 */ 870 (void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf); 871 872 action = XLogReadBufferForRedo(record, 1, &deletebuf); 873 } 874 875 /* replay the record for deleting entries in bucket page */ 876 if (action == BLK_NEEDS_REDO) 877 { 878 char *ptr; 879 Size len; 880 881 ptr = XLogRecGetBlockData(record, 1, &len); 882 883 page = (Page) BufferGetPage(deletebuf); 884 885 if (len > 0) 886 { 887 OffsetNumber *unused; 888 OffsetNumber *unend; 889 890 unused = (OffsetNumber *) ptr; 891 unend = (OffsetNumber *) ((char *) ptr + len); 892 893 if ((unend - unused) > 0) 894 PageIndexMultiDelete(page, unused, unend - unused); 895 } 896 897 /* 898 * Mark the page as not containing any LP_DEAD items only if 899 * clear_dead_marking flag is set to true. See comments in 900 * hashbucketcleanup() for details. 901 */ 902 if (xldata->clear_dead_marking) 903 { 904 HashPageOpaque pageopaque; 905 906 pageopaque = (HashPageOpaque) PageGetSpecialPointer(page); 907 pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES; 908 } 909 910 PageSetLSN(page, lsn); 911 MarkBufferDirty(deletebuf); 912 } 913 if (BufferIsValid(deletebuf)) 914 UnlockReleaseBuffer(deletebuf); 915 916 if (BufferIsValid(bucketbuf)) 917 UnlockReleaseBuffer(bucketbuf); 918 } 919 920 /* 921 * replay split cleanup flag operation for primary bucket page. 922 */ 923 static void 924 hash_xlog_split_cleanup(XLogReaderState *record) 925 { 926 XLogRecPtr lsn = record->EndRecPtr; 927 Buffer buffer; 928 Page page; 929 930 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) 931 { 932 HashPageOpaque bucket_opaque; 933 934 page = (Page) BufferGetPage(buffer); 935 936 bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page); 937 bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP; 938 PageSetLSN(page, lsn); 939 MarkBufferDirty(buffer); 940 } 941 if (BufferIsValid(buffer)) 942 UnlockReleaseBuffer(buffer); 943 } 944 945 /* 946 * replay for update meta page 947 */ 948 static void 949 hash_xlog_update_meta_page(XLogReaderState *record) 950 { 951 HashMetaPage metap; 952 XLogRecPtr lsn = record->EndRecPtr; 953 xl_hash_update_meta_page *xldata = (xl_hash_update_meta_page *) XLogRecGetData(record); 954 Buffer metabuf; 955 Page page; 956 957 if (XLogReadBufferForRedo(record, 0, &metabuf) == BLK_NEEDS_REDO) parseAddresses($addrstr, $useimap = true)958 { 959 page = BufferGetPage(metabuf); 960 metap = HashPageGetMeta(page); 961 962 metap->hashm_ntuples = xldata->ntuples; 963 964 PageSetLSN(page, lsn); 965 MarkBufferDirty(metabuf); 966 } 967 if (BufferIsValid(metabuf)) 968 UnlockReleaseBuffer(metabuf); 969 } 970 971 /* 972 * replay delete operation in hash index to remove 973 * tuples marked as DEAD during index tuple insertion. 974 */ 975 static void 976 hash_xlog_vacuum_one_page(XLogReaderState *record) 977 { 978 XLogRecPtr lsn = record->EndRecPtr; 979 xl_hash_vacuum_one_page *xldata; 980 Buffer buffer; 981 Buffer metabuf; 982 Page page; 983 XLogRedoAction action; 984 HashPageOpaque pageopaque; 985 986 xldata = (xl_hash_vacuum_one_page *) XLogRecGetData(record); 987 988 /* 989 * If we have any conflict processing to do, it must happen before we 990 * update the page. 991 * 992 * Hash index records that are marked as LP_DEAD and being removed during 993 * hash index tuple insertion can conflict with standby queries. You might 994 * think that vacuum records would conflict as well, but we've handled 995 * that already. XLOG_HEAP2_CLEANUP_INFO records provide the highest xid 996 * cleaned by the vacuum of the heap and so we can resolve any conflicts 997 * just once when that arrives. After that we know that no conflicts 998 * exist from individual hash index vacuum records on that index. 999 */ 1000 if (InHotStandby) 1001 { 1002 RelFileNode rnode; 1003 1004 XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); 1005 ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode); 1006 } 1007 1008 action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer); 1009 1010 if (action == BLK_NEEDS_REDO) setFrom($address, $name = '', $auto = true)1011 { 1012 page = (Page) BufferGetPage(buffer); 1013 1014 if (XLogRecGetDataLen(record) > SizeOfHashVacuumOnePage) 1015 { 1016 OffsetNumber *unused; 1017 1018 unused = (OffsetNumber *) ((char *) xldata + SizeOfHashVacuumOnePage); 1019 1020 PageIndexMultiDelete(page, unused, xldata->ntuples); 1021 } 1022 1023 /* 1024 * Mark the page as not containing any LP_DEAD items. See comments in 1025 * _hash_vacuum_one_page() for details. 1026 */ 1027 pageopaque = (HashPageOpaque) PageGetSpecialPointer(page); 1028 pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES; 1029 1030 PageSetLSN(page, lsn); 1031 MarkBufferDirty(buffer); 1032 } 1033 if (BufferIsValid(buffer)) 1034 UnlockReleaseBuffer(buffer); 1035 1036 if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO) 1037 { 1038 Page metapage; 1039 HashMetaPage metap; 1040 1041 metapage = BufferGetPage(metabuf); 1042 metap = HashPageGetMeta(metapage); 1043 getLastMessageID()1044 metap->hashm_ntuples -= xldata->ntuples; 1045 1046 PageSetLSN(metapage, lsn); 1047 MarkBufferDirty(metabuf); 1048 } 1049 if (BufferIsValid(metabuf)) 1050 UnlockReleaseBuffer(metabuf); 1051 } 1052 1053 void 1054 hash_redo(XLogReaderState *record) 1055 { 1056 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; 1057 1058 switch (info) 1059 { 1060 case XLOG_HASH_INIT_META_PAGE: 1061 hash_xlog_init_meta_page(record); 1062 break; 1063 case XLOG_HASH_INIT_BITMAP_PAGE: 1064 hash_xlog_init_bitmap_page(record); 1065 break; 1066 case XLOG_HASH_INSERT: 1067 hash_xlog_insert(record); validateAddress($address, $patternselect = null)1068 break; 1069 case XLOG_HASH_ADD_OVFL_PAGE: 1070 hash_xlog_add_ovfl_page(record); 1071 break; 1072 case XLOG_HASH_SPLIT_ALLOCATE_PAGE: 1073 hash_xlog_split_allocate_page(record); 1074 break; 1075 case XLOG_HASH_SPLIT_PAGE: 1076 hash_xlog_split_page(record); 1077 break; 1078 case XLOG_HASH_SPLIT_COMPLETE: 1079 hash_xlog_split_complete(record); 1080 break; 1081 case XLOG_HASH_MOVE_PAGE_CONTENTS: 1082 hash_xlog_move_page_contents(record); 1083 break; 1084 case XLOG_HASH_SQUEEZE_PAGE: 1085 hash_xlog_squeeze_page(record); 1086 break; 1087 case XLOG_HASH_DELETE: 1088 hash_xlog_delete(record); 1089 break; 1090 case XLOG_HASH_SPLIT_CLEANUP: 1091 hash_xlog_split_cleanup(record); 1092 break; 1093 case XLOG_HASH_UPDATE_META_PAGE: 1094 hash_xlog_update_meta_page(record); 1095 break; 1096 case XLOG_HASH_VACUUM_ONE_PAGE: 1097 hash_xlog_vacuum_one_page(record); 1098 break; 1099 default: 1100 elog(PANIC, "hash_redo: unknown op code %u", info); 1101 } 1102 } 1103 1104 /* 1105 * Mask a hash page before performing consistency checks on it. 1106 */ 1107 void 1108 hash_mask(char *pagedata, BlockNumber blkno) 1109 { 1110 Page page = (Page) pagedata; 1111 HashPageOpaque opaque; 1112 int pagetype; 1113 1114 mask_page_lsn_and_checksum(page); 1115 1116 mask_page_hint_bits(page); 1117 mask_unused_space(page); 1118 1119 opaque = (HashPageOpaque) PageGetSpecialPointer(page); 1120 1121 pagetype = opaque->hasho_flag & LH_PAGE_TYPE; 1122 if (pagetype == LH_UNUSED_PAGE) 1123 { 1124 /* 1125 * Mask everything on a UNUSED page. 1126 */ 1127 mask_page_content(page); 1128 } 1129 else if (pagetype == LH_BUCKET_PAGE || 1130 pagetype == LH_OVERFLOW_PAGE) 1131 { 1132 /* 1133 * In hash bucket and overflow pages, it is possible to modify the 1134 * LP_FLAGS without emitting any WAL record. Hence, mask the line 1135 * pointer flags. See hashgettuple(), _hash_kill_items() for details. 1136 */ 1137 mask_lp_flags(page); 1138 } 1139 1140 /* 1141 * It is possible that the hint bit LH_PAGE_HAS_DEAD_TUPLES may remain 1142 * unlogged. So, mask it. See _hash_kill_items() for details. 1143 */ 1144 opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES; 1145 } 1146