/*-------------------------------------------------------------------------
 *
 * hash_xlog.c
 *	  WAL replay logic for hash index.
 *
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/hash/hash_xlog.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/heapam_xlog.h"
#include "access/bufmask.h"
#include "access/hash.h"
#include "access/hash_xlog.h"
#include "access/xlogutils.h"
#include "access/xlog.h"
#include "access/transam.h"
#include "storage/procarray.h"
#include "miscadmin.h"

/*
 * replay a hash index meta page
 */
static void
hash_xlog_init_meta_page(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    Page page;
    Buffer metabuf;
    ForkNumber forknum;

    xl_hash_init_meta_page *xlrec = (xl_hash_init_meta_page *) XLogRecGetData(record);

    /* create the index's metapage */
    metabuf = XLogInitBufferForRedo(record, 0);
    Assert(BufferIsValid(metabuf));
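
    /*
     * Rebuild the metapage from the parameters carried in the WAL record
     * (estimated tuple count, hash procedure OID, and fill factor) rather
     * than restoring it from a full page image.
     */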
    _hash_init_metabuffer(metabuf, xlrec->num_tuples, xlrec->procid,
                          xlrec->ffactor, true);
    page = (Page) BufferGetPage(metabuf);
    PageSetLSN(page, lsn);
    MarkBufferDirty(metabuf);

    /*
     * Force the on-disk state of init forks to always be in sync with the
     * state in shared buffers.  See XLogReadBufferForRedoExtended.  We need
     * special handling for init forks as create index operations don't log a
     * full page image of the metapage.
     */
    XLogRecGetBlockTag(record, 0, NULL, &forknum, NULL);
    if (forknum == INIT_FORKNUM)
        FlushOneBuffer(metabuf);

    /* all done */
    UnlockReleaseBuffer(metabuf);
}

/*
 * replay a hash index bitmap page
 */
static void
hash_xlog_init_bitmap_page(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    Buffer bitmapbuf;
    Buffer metabuf;
    Page page;
    HashMetaPage metap;
    uint32 num_buckets;
    ForkNumber forknum;

    xl_hash_init_bitmap_page *xlrec = (xl_hash_init_bitmap_page *) XLogRecGetData(record);

    /*
     * Initialize bitmap page
     */
    bitmapbuf = XLogInitBufferForRedo(record, 0);
    _hash_initbitmapbuffer(bitmapbuf, xlrec->bmsize, true);
    PageSetLSN(BufferGetPage(bitmapbuf), lsn);
    MarkBufferDirty(bitmapbuf);

    /*
     * Force the on-disk state of init forks to always be in sync with the
     * state in shared buffers.  See XLogReadBufferForRedoExtended.  We need
     * special handling for init forks as create index operations don't log a
     * full page image of these pages.
     */
    XLogRecGetBlockTag(record, 0, NULL, &forknum, NULL);
    if (forknum == INIT_FORKNUM)
        FlushOneBuffer(bitmapbuf);
    UnlockReleaseBuffer(bitmapbuf);

    /* add the new bitmap page to the metapage's list of bitmaps */
    if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO)
    {
        /*
         * Note: in normal operation, we'd update the metapage while still
         * holding lock on the bitmap page.  But during replay it's not
         * necessary to hold that lock, since nobody can see it yet; the
         * creating transaction hasn't yet committed.
         */
        page = BufferGetPage(metabuf);
        metap = HashPageGetMeta(page);

        num_buckets = metap->hashm_maxbucket + 1;
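
        /*
         * The initial layout places the metapage at block 0 and the first
         * num_buckets bucket pages at blocks 1..num_buckets, so the first
         * bitmap page lands at block num_buckets + 1.
         */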
        metap->hashm_mapp[metap->hashm_nmaps] = num_buckets + 1;
        metap->hashm_nmaps++;

        PageSetLSN(page, lsn);
        MarkBufferDirty(metabuf);

        XLogRecGetBlockTag(record, 1, NULL, &forknum, NULL);
        if (forknum == INIT_FORKNUM)
            FlushOneBuffer(metabuf);
    }
    if (BufferIsValid(metabuf))
        UnlockReleaseBuffer(metabuf);
}

/*
 * replay a hash index insert without split
 */
static void
hash_xlog_insert(XLogReaderState *record)
{
    HashMetaPage metap;
    XLogRecPtr lsn = record->EndRecPtr;
    xl_hash_insert *xlrec = (xl_hash_insert *) XLogRecGetData(record);
    Buffer buffer;
    Page page;

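    /*
     * Block 0 is the bucket or overflow page the tuple was inserted into;
     * block 1 is the metapage, whose tuple count must be bumped to match.
     */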
    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
    {
        Size datalen;
        char *datapos = XLogRecGetBlockData(record, 0, &datalen);

        page = BufferGetPage(buffer);

        if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
                        false, false) == InvalidOffsetNumber)
            elog(PANIC, "hash_xlog_insert: failed to add item");

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);

    if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
    {
        /*
         * Note: in normal operation, we'd update the metapage while still
         * holding lock on the page we inserted into.  But during replay it's
         * not necessary to hold that lock, since no other index updates can
         * be happening concurrently.
         */
        page = BufferGetPage(buffer);
        metap = HashPageGetMeta(page);
        metap->hashm_ntuples += 1;

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}

/*
 * replay addition of overflow page for hash index
 */
static void
hash_xlog_add_ovfl_page(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    xl_hash_add_ovfl_page *xlrec = (xl_hash_add_ovfl_page *) XLogRecGetData(record);
    Buffer leftbuf;
    Buffer ovflbuf;
    Buffer metabuf;
    BlockNumber leftblk;
    BlockNumber rightblk;
    BlockNumber newmapblk = InvalidBlockNumber;
    Page ovflpage;
    HashPageOpaque ovflopaque;
    uint32 *num_bucket;
    char *data;
    Size datalen PG_USED_FOR_ASSERTS_ONLY;
    bool new_bmpage = false;

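    /*
     * Block references in this record: 0 is the new overflow page, 1 is the
     * page it is linked after, 2 (if present) is the bitmap page in which
     * the new page's bit is set, 3 (if present) is a newly added bitmap
     * page, and 4 is the metapage.
     */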
    XLogRecGetBlockTag(record, 0, NULL, NULL, &rightblk);
    XLogRecGetBlockTag(record, 1, NULL, NULL, &leftblk);

    ovflbuf = XLogInitBufferForRedo(record, 0);
    Assert(BufferIsValid(ovflbuf));

    data = XLogRecGetBlockData(record, 0, &datalen);
    num_bucket = (uint32 *) data;
    Assert(datalen == sizeof(uint32));
    _hash_initbuf(ovflbuf, InvalidBlockNumber, *num_bucket, LH_OVERFLOW_PAGE,
                  true);
    /* update backlink */
    ovflpage = BufferGetPage(ovflbuf);
    ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
    ovflopaque->hasho_prevblkno = leftblk;

    PageSetLSN(ovflpage, lsn);
    MarkBufferDirty(ovflbuf);

    if (XLogReadBufferForRedo(record, 1, &leftbuf) == BLK_NEEDS_REDO)
    {
        Page leftpage;
        HashPageOpaque leftopaque;

        leftpage = BufferGetPage(leftbuf);
        leftopaque = (HashPageOpaque) PageGetSpecialPointer(leftpage);
        leftopaque->hasho_nextblkno = rightblk;

        PageSetLSN(leftpage, lsn);
        MarkBufferDirty(leftbuf);
    }

    if (BufferIsValid(leftbuf))
        UnlockReleaseBuffer(leftbuf);
    UnlockReleaseBuffer(ovflbuf);

    /*
     * Note: in normal operation, we'd update the bitmap and meta page while
     * still holding lock on the overflow pages.  But during replay it's not
     * necessary to hold those locks, since no other index updates can be
     * happening concurrently.
     */
    if (XLogRecHasBlockRef(record, 2))
    {
        Buffer mapbuffer;

        if (XLogReadBufferForRedo(record, 2, &mapbuffer) == BLK_NEEDS_REDO)
        {
            Page mappage = (Page) BufferGetPage(mapbuffer);
            uint32 *freep = NULL;
            char *data;
            uint32 *bitmap_page_bit;

            freep = HashPageGetBitmap(mappage);

            data = XLogRecGetBlockData(record, 2, &datalen);
            bitmap_page_bit = (uint32 *) data;

            SETBIT(freep, *bitmap_page_bit);

            PageSetLSN(mappage, lsn);
            MarkBufferDirty(mapbuffer);
        }
        if (BufferIsValid(mapbuffer))
            UnlockReleaseBuffer(mapbuffer);
    }

    if (XLogRecHasBlockRef(record, 3))
    {
        Buffer newmapbuf;

        newmapbuf = XLogInitBufferForRedo(record, 3);

        _hash_initbitmapbuffer(newmapbuf, xlrec->bmsize, true);

        new_bmpage = true;
        newmapblk = BufferGetBlockNumber(newmapbuf);

        MarkBufferDirty(newmapbuf);
        PageSetLSN(BufferGetPage(newmapbuf), lsn);

        UnlockReleaseBuffer(newmapbuf);
    }

    if (XLogReadBufferForRedo(record, 4, &metabuf) == BLK_NEEDS_REDO)
    {
        HashMetaPage metap;
        Page page;
        uint32 *firstfree_ovflpage;

        data = XLogRecGetBlockData(record, 4, &datalen);
        firstfree_ovflpage = (uint32 *) data;

        page = BufferGetPage(metabuf);
        metap = HashPageGetMeta(page);
        metap->hashm_firstfree = *firstfree_ovflpage;

        if (!xlrec->bmpage_found)
        {
            metap->hashm_spares[metap->hashm_ovflpoint]++;

            if (new_bmpage)
            {
                Assert(BlockNumberIsValid(newmapblk));

                metap->hashm_mapp[metap->hashm_nmaps] = newmapblk;
                metap->hashm_nmaps++;
                metap->hashm_spares[metap->hashm_ovflpoint]++;
            }
        }

        PageSetLSN(page, lsn);
        MarkBufferDirty(metabuf);
    }
    if (BufferIsValid(metabuf))
        UnlockReleaseBuffer(metabuf);
}

/*
 * replay allocation of page for split operation
 */
static void
hash_xlog_split_allocate_page(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    xl_hash_split_allocate_page *xlrec = (xl_hash_split_allocate_page *) XLogRecGetData(record);
    Buffer oldbuf;
    Buffer newbuf;
    Buffer metabuf;
    Size datalen PG_USED_FOR_ASSERTS_ONLY;
    char *data;
    XLogRedoAction action;

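    /*
     * Block 0 is the bucket being split, block 1 is the newly allocated
     * bucket, and block 2 is the metapage, which records the new maxbucket
     * and, optionally, updated masks and split point.
     */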
    /*
     * To be consistent with normal operation, here we take cleanup locks on
     * both the old and new buckets even though there can't be any concurrent
     * inserts.
     */

    /* replay the record for old bucket */
    action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &oldbuf);

    /*
     * Note that we still update the page even if it was restored from a full
     * page image, because the special space is not included in the image.
     */
    if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
    {
        Page oldpage;
        HashPageOpaque oldopaque;

        oldpage = BufferGetPage(oldbuf);
        oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage);

        oldopaque->hasho_flag = xlrec->old_bucket_flag;
        oldopaque->hasho_prevblkno = xlrec->new_bucket;

        PageSetLSN(oldpage, lsn);
        MarkBufferDirty(oldbuf);
    }

    /* replay the record for new bucket */
    newbuf = XLogInitBufferForRedo(record, 1);
    _hash_initbuf(newbuf, xlrec->new_bucket, xlrec->new_bucket,
                  xlrec->new_bucket_flag, true);
    if (!IsBufferCleanupOK(newbuf))
        elog(PANIC, "hash_xlog_split_allocate_page: failed to acquire cleanup lock");
    MarkBufferDirty(newbuf);
    PageSetLSN(BufferGetPage(newbuf), lsn);

    /*
     * We could release the lock on the old bucket earlier, but we keep it
     * until here to be consistent with normal operation.
     */
    if (BufferIsValid(oldbuf))
        UnlockReleaseBuffer(oldbuf);
    if (BufferIsValid(newbuf))
        UnlockReleaseBuffer(newbuf);

    /*
     * Note: in normal operation, we'd update the meta page while still
     * holding lock on the old and new bucket pages.  But during replay it's
     * not necessary to hold those locks, since no other bucket splits can be
     * happening concurrently.
     */

    /* replay the record for metapage changes */
    if (XLogReadBufferForRedo(record, 2, &metabuf) == BLK_NEEDS_REDO)
    {
        Page page;
        HashMetaPage metap;

        page = BufferGetPage(metabuf);
        metap = HashPageGetMeta(page);
        metap->hashm_maxbucket = xlrec->new_bucket;

        data = XLogRecGetBlockData(record, 2, &datalen);

        if (xlrec->flags & XLH_SPLIT_META_UPDATE_MASKS)
        {
            uint32 lowmask;
            uint32 *highmask;

            /* extract low and high masks. */
            memcpy(&lowmask, data, sizeof(uint32));
            highmask = (uint32 *) ((char *) data + sizeof(uint32));

            /* update metapage */
            metap->hashm_lowmask = lowmask;
            metap->hashm_highmask = *highmask;

            data += sizeof(uint32) * 2;
        }

        if (xlrec->flags & XLH_SPLIT_META_UPDATE_SPLITPOINT)
        {
            uint32 ovflpoint;
            uint32 *ovflpages;

            /* extract information of overflow pages. */
            memcpy(&ovflpoint, data, sizeof(uint32));
            ovflpages = (uint32 *) ((char *) data + sizeof(uint32));

            /* update metapage */
            metap->hashm_spares[ovflpoint] = *ovflpages;
            metap->hashm_ovflpoint = ovflpoint;
        }

        MarkBufferDirty(metabuf);
        PageSetLSN(BufferGetPage(metabuf), lsn);
    }

    if (BufferIsValid(metabuf))
        UnlockReleaseBuffer(metabuf);
}

/*
 * replay of split operation
 */
static void
hash_xlog_split_page(XLogReaderState *record)
{
    Buffer buf;

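    /*
     * A split-page record always carries a full page image of the data being
     * moved, so replay just restores that image; anything else indicates a
     * broken record.
     */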
    if (XLogReadBufferForRedo(record, 0, &buf) != BLK_RESTORED)
        elog(ERROR, "Hash split record did not contain a full-page image");

    UnlockReleaseBuffer(buf);
}

/*
 * replay completion of split operation
 */
static void
hash_xlog_split_complete(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    xl_hash_split_complete *xlrec = (xl_hash_split_complete *) XLogRecGetData(record);
    Buffer oldbuf;
    Buffer newbuf;
    XLogRedoAction action;

    /* replay the record for old bucket */
    action = XLogReadBufferForRedo(record, 0, &oldbuf);

    /*
     * Note that we still update the page even if it was restored from a full
     * page image, because the bucket flag is not included in the image.
     */
    if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
    {
        Page oldpage;
        HashPageOpaque oldopaque;

        oldpage = BufferGetPage(oldbuf);
        oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage);

        oldopaque->hasho_flag = xlrec->old_bucket_flag;

        PageSetLSN(oldpage, lsn);
        MarkBufferDirty(oldbuf);
    }
    if (BufferIsValid(oldbuf))
        UnlockReleaseBuffer(oldbuf);

    /* replay the record for new bucket */
    action = XLogReadBufferForRedo(record, 1, &newbuf);

    /*
     * Note that we still update the page even if it was restored from a full
     * page image, because the bucket flag is not included in the image.
     */
    if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
    {
        Page newpage;
        HashPageOpaque nopaque;

        newpage = BufferGetPage(newbuf);
        nopaque = (HashPageOpaque) PageGetSpecialPointer(newpage);

        nopaque->hasho_flag = xlrec->new_bucket_flag;

        PageSetLSN(newpage, lsn);
        MarkBufferDirty(newbuf);
    }
    if (BufferIsValid(newbuf))
        UnlockReleaseBuffer(newbuf);
}

/*
 * replay move of page contents for squeeze operation of hash index
 */
static void
hash_xlog_move_page_contents(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    xl_hash_move_page_contents *xldata = (xl_hash_move_page_contents *) XLogRecGetData(record);
    Buffer bucketbuf = InvalidBuffer;
    Buffer writebuf = InvalidBuffer;
    Buffer deletebuf = InvalidBuffer;
    XLogRedoAction action;

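    /*
     * Block 0 is the primary bucket page (referenced only when it differs
     * from the write page), block 1 is the page receiving the moved tuples,
     * and block 2 is the page they are deleted from.
     */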
    /*
     * Ensure we have a cleanup lock on the primary bucket page before we
     * start the actual replay operation.  This guarantees that no scan can
     * start, and no scan can already be in progress, while we replay this
     * operation.  If scans were allowed, they could miss some records or see
     * the same record multiple times.
     */
    if (xldata->is_prim_bucket_same_wrt)
        action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf);
    else
    {
        /*
         * We don't care about the return value; reading bucketbuf serves
         * only to take a cleanup lock on the primary bucket page.
         */
        (void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);

        action = XLogReadBufferForRedo(record, 1, &writebuf);
    }

    /* replay the record for adding entries in overflow buffer */
    if (action == BLK_NEEDS_REDO)
    {
        Page writepage;
        char *begin;
        char *data;
        Size datalen;
        uint16 ninserted = 0;

        data = begin = XLogRecGetBlockData(record, 1, &datalen);

        writepage = (Page) BufferGetPage(writebuf);

        if (xldata->ntups > 0)
        {
            OffsetNumber *towrite = (OffsetNumber *) data;

            data += sizeof(OffsetNumber) * xldata->ntups;

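            /*
             * The block data is an array of ntups target offset numbers
             * followed by the MAXALIGNed index tuples themselves; walk the
             * tuples until the data is exhausted.
             */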
            while (data - begin < datalen)
            {
                IndexTuple itup = (IndexTuple) data;
                Size itemsz;
                OffsetNumber l;

                itemsz = IndexTupleDSize(*itup);
                itemsz = MAXALIGN(itemsz);

                data += itemsz;

                l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
                if (l == InvalidOffsetNumber)
                    elog(ERROR, "hash_xlog_move_page_contents: failed to add item to hash index page, size %d bytes",
                         (int) itemsz);

                ninserted++;
            }
        }

        /*
         * The number of tuples inserted must match the count requested in
         * the REDO record.
         */
        Assert(ninserted == xldata->ntups);

        PageSetLSN(writepage, lsn);
        MarkBufferDirty(writebuf);
    }

    /* replay the record for deleting entries from overflow buffer */
    if (XLogReadBufferForRedo(record, 2, &deletebuf) == BLK_NEEDS_REDO)
    {
        Page page;
        char *ptr;
        Size len;

        ptr = XLogRecGetBlockData(record, 2, &len);

        page = (Page) BufferGetPage(deletebuf);

        if (len > 0)
        {
            OffsetNumber *unused;
            OffsetNumber *unend;

            unused = (OffsetNumber *) ptr;
            unend = (OffsetNumber *) ((char *) ptr + len);

            if ((unend - unused) > 0)
                PageIndexMultiDelete(page, unused, unend - unused);
        }

        PageSetLSN(page, lsn);
        MarkBufferDirty(deletebuf);
    }

    /*
     * Replay is complete, so release the buffers.  We release the locks at
     * the end of the replay operation so that we hold the lock on the
     * primary bucket page throughout.  We could release the lock on the
     * write buffer as soon as we're done with it (when it is not the primary
     * bucket page), but that doesn't seem worth complicating the code.
     */
    if (BufferIsValid(deletebuf))
        UnlockReleaseBuffer(deletebuf);

    if (BufferIsValid(writebuf))
        UnlockReleaseBuffer(writebuf);

    if (BufferIsValid(bucketbuf))
        UnlockReleaseBuffer(bucketbuf);
}

/*
 * replay squeeze page operation of hash index
 */
static void
hash_xlog_squeeze_page(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    xl_hash_squeeze_page *xldata = (xl_hash_squeeze_page *) XLogRecGetData(record);
    Buffer bucketbuf = InvalidBuffer;
    Buffer writebuf;
    Buffer ovflbuf;
    Buffer prevbuf = InvalidBuffer;
    Buffer mapbuf;
    XLogRedoAction action;

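    /*
     * Block references in this record: 0 is the primary bucket page (only
     * when distinct from the write page), 1 is the page receiving the moved
     * tuples, 2 is the overflow page being freed, 3 is its predecessor in
     * the bucket chain (when distinct from the write page), 4 (if present)
     * is its successor, 5 is the bitmap page, and 6 (if present) is the
     * metapage.
     */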
    /*
     * Ensure we have a cleanup lock on the primary bucket page before we
     * start the actual replay operation.  This guarantees that no scan can
     * start, and no scan can already be in progress, while we replay this
     * operation.  If scans were allowed, they could miss some records or see
     * the same record multiple times.
     */
    if (xldata->is_prim_bucket_same_wrt)
        action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf);
    else
    {
        /*
         * We don't care about the return value; reading bucketbuf serves
         * only to take a cleanup lock on the primary bucket page.
         */
        (void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);

        action = XLogReadBufferForRedo(record, 1, &writebuf);
    }

    /* replay the record for adding entries in overflow buffer */
    if (action == BLK_NEEDS_REDO)
    {
        Page writepage;
        char *begin;
        char *data;
        Size datalen;
        uint16 ninserted = 0;

        data = begin = XLogRecGetBlockData(record, 1, &datalen);

        writepage = (Page) BufferGetPage(writebuf);

        if (xldata->ntups > 0)
        {
            OffsetNumber *towrite = (OffsetNumber *) data;

            data += sizeof(OffsetNumber) * xldata->ntups;

            while (data - begin < datalen)
            {
                IndexTuple itup = (IndexTuple) data;
                Size itemsz;
                OffsetNumber l;

                itemsz = IndexTupleDSize(*itup);
                itemsz = MAXALIGN(itemsz);

                data += itemsz;

                l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
                if (l == InvalidOffsetNumber)
                    elog(ERROR, "hash_xlog_squeeze_page: failed to add item to hash index page, size %d bytes",
                         (int) itemsz);

                ninserted++;
            }
        }

        /*
         * The number of tuples inserted must match the count requested in
         * the REDO record.
         */
        Assert(ninserted == xldata->ntups);

        /*
         * If the page to which we are adding tuples is the page previous to
         * the freed overflow page, update its hasho_nextblkno as well.
         */
        if (xldata->is_prev_bucket_same_wrt)
        {
            HashPageOpaque writeopaque = (HashPageOpaque) PageGetSpecialPointer(writepage);

            writeopaque->hasho_nextblkno = xldata->nextblkno;
        }

        PageSetLSN(writepage, lsn);
        MarkBufferDirty(writebuf);
    }

    /* replay the record for initializing overflow buffer */
    if (XLogReadBufferForRedo(record, 2, &ovflbuf) == BLK_NEEDS_REDO)
    {
        Page ovflpage;
        HashPageOpaque ovflopaque;

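        /*
         * Rather than restoring a page image, reinitialize the freed
         * overflow page from scratch and mark it unused, mirroring what
         * happens when an overflow page is freed during normal operation
         * (see _hash_freeovflpage()).
         */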
        ovflpage = BufferGetPage(ovflbuf);

        _hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));

        ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);

        ovflopaque->hasho_prevblkno = InvalidBlockNumber;
        ovflopaque->hasho_nextblkno = InvalidBlockNumber;
        ovflopaque->hasho_bucket = -1;
        ovflopaque->hasho_flag = LH_UNUSED_PAGE;
        ovflopaque->hasho_page_id = HASHO_PAGE_ID;

        PageSetLSN(ovflpage, lsn);
        MarkBufferDirty(ovflbuf);
    }
    if (BufferIsValid(ovflbuf))
        UnlockReleaseBuffer(ovflbuf);

    /* replay the record for page previous to the freed overflow page */
    if (!xldata->is_prev_bucket_same_wrt &&
        XLogReadBufferForRedo(record, 3, &prevbuf) == BLK_NEEDS_REDO)
    {
        Page prevpage = BufferGetPage(prevbuf);
        HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);

        prevopaque->hasho_nextblkno = xldata->nextblkno;

        PageSetLSN(prevpage, lsn);
        MarkBufferDirty(prevbuf);
    }
    if (BufferIsValid(prevbuf))
        UnlockReleaseBuffer(prevbuf);

    /* replay the record for page next to the freed overflow page */
    if (XLogRecHasBlockRef(record, 4))
    {
        Buffer nextbuf;

        if (XLogReadBufferForRedo(record, 4, &nextbuf) == BLK_NEEDS_REDO)
        {
            Page nextpage = BufferGetPage(nextbuf);
            HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);

            nextopaque->hasho_prevblkno = xldata->prevblkno;

            PageSetLSN(nextpage, lsn);
            MarkBufferDirty(nextbuf);
        }
        if (BufferIsValid(nextbuf))
            UnlockReleaseBuffer(nextbuf);
    }

    if (BufferIsValid(writebuf))
        UnlockReleaseBuffer(writebuf);

    if (BufferIsValid(bucketbuf))
        UnlockReleaseBuffer(bucketbuf);

    /*
     * Note: in normal operation, we'd update the bitmap and meta page while
     * still holding lock on the primary bucket page and overflow pages.  But
     * during replay it's not necessary to hold those locks, since no other
     * index updates can be happening concurrently.
     */
    /* replay the record for bitmap page */
    if (XLogReadBufferForRedo(record, 5, &mapbuf) == BLK_NEEDS_REDO)
    {
        Page mappage = (Page) BufferGetPage(mapbuf);
        uint32 *freep = NULL;
        char *data;
        uint32 *bitmap_page_bit;
        Size datalen;

        freep = HashPageGetBitmap(mappage);

        data = XLogRecGetBlockData(record, 5, &datalen);
        bitmap_page_bit = (uint32 *) data;

        CLRBIT(freep, *bitmap_page_bit);

        PageSetLSN(mappage, lsn);
        MarkBufferDirty(mapbuf);
    }
    if (BufferIsValid(mapbuf))
        UnlockReleaseBuffer(mapbuf);

    /* replay the record for meta page */
    if (XLogRecHasBlockRef(record, 6))
    {
        Buffer metabuf;

        if (XLogReadBufferForRedo(record, 6, &metabuf) == BLK_NEEDS_REDO)
        {
            HashMetaPage metap;
            Page page;
            char *data;
            uint32 *firstfree_ovflpage;
            Size datalen;

            data = XLogRecGetBlockData(record, 6, &datalen);
            firstfree_ovflpage = (uint32 *) data;

            page = BufferGetPage(metabuf);
            metap = HashPageGetMeta(page);
            metap->hashm_firstfree = *firstfree_ovflpage;

            PageSetLSN(page, lsn);
            MarkBufferDirty(metabuf);
        }
        if (BufferIsValid(metabuf))
            UnlockReleaseBuffer(metabuf);
    }
}

/*
 * replay delete operation of hash index
 */
static void
hash_xlog_delete(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    xl_hash_delete *xldata = (xl_hash_delete *) XLogRecGetData(record);
    Buffer bucketbuf = InvalidBuffer;
    Buffer deletebuf;
    Page page;
    XLogRedoAction action;

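    /*
     * Block 0 is the primary bucket page (referenced only when it differs
     * from the page being cleaned), and block 1 is the bucket or overflow
     * page the tuples are deleted from.
     */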
    /*
     * Ensure we have a cleanup lock on the primary bucket page before we
     * start the actual replay operation.  This guarantees that no scan can
     * start, and no scan can already be in progress, while we replay this
     * operation.  If scans were allowed, they could miss some records or see
     * the same record multiple times.
     */
    if (xldata->is_primary_bucket_page)
        action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &deletebuf);
    else
    {
        /*
         * We don't care about the return value; reading bucketbuf serves
         * only to take a cleanup lock on the primary bucket page.
         */
        (void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);

        action = XLogReadBufferForRedo(record, 1, &deletebuf);
    }

    /* replay the record for deleting entries in bucket page */
    if (action == BLK_NEEDS_REDO)
    {
        char *ptr;
        Size len;

        ptr = XLogRecGetBlockData(record, 1, &len);

        page = (Page) BufferGetPage(deletebuf);

        if (len > 0)
        {
            OffsetNumber *unused;
            OffsetNumber *unend;

            unused = (OffsetNumber *) ptr;
            unend = (OffsetNumber *) ((char *) ptr + len);

            if ((unend - unused) > 0)
                PageIndexMultiDelete(page, unused, unend - unused);
        }

        /*
         * Mark the page as not containing any LP_DEAD items only if
         * clear_dead_marking flag is set to true.  See comments in
         * hashbucketcleanup() for details.
         */
        if (xldata->clear_dead_marking)
        {
            HashPageOpaque pageopaque;

            pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
            pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
        }

        PageSetLSN(page, lsn);
        MarkBufferDirty(deletebuf);
    }
    if (BufferIsValid(deletebuf))
        UnlockReleaseBuffer(deletebuf);

    if (BufferIsValid(bucketbuf))
        UnlockReleaseBuffer(bucketbuf);
}

/*
 * replay split cleanup flag operation for primary bucket page.
 */
static void
hash_xlog_split_cleanup(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    Buffer buffer;
    Page page;

    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
    {
        HashPageOpaque bucket_opaque;

        page = (Page) BufferGetPage(buffer);

        bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);
        bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;
        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}

/*
 * replay for update meta page
 */
static void
hash_xlog_update_meta_page(XLogReaderState *record)
{
    HashMetaPage metap;
    XLogRecPtr lsn = record->EndRecPtr;
    xl_hash_update_meta_page *xldata = (xl_hash_update_meta_page *) XLogRecGetData(record);
    Buffer metabuf;
    Page page;

    if (XLogReadBufferForRedo(record, 0, &metabuf) == BLK_NEEDS_REDO)
    {
        page = BufferGetPage(metabuf);
        metap = HashPageGetMeta(page);

        metap->hashm_ntuples = xldata->ntuples;

        PageSetLSN(page, lsn);
        MarkBufferDirty(metabuf);
    }
    if (BufferIsValid(metabuf))
        UnlockReleaseBuffer(metabuf);
}

/*
 * Get the latestRemovedXid from the heap pages pointed at by the index
 * tuples being deleted.  See also btree_xlog_delete_get_latestRemovedXid,
 * on which this function is based.
 */
static TransactionId
hash_xlog_vacuum_get_latestRemovedXid(XLogReaderState *record)
{
    xl_hash_vacuum_one_page *xlrec;
    OffsetNumber *unused;
    Buffer ibuffer,
           hbuffer;
    Page ipage,
         hpage;
    RelFileNode rnode;
    BlockNumber blkno;
    ItemId iitemid,
           hitemid;
    IndexTuple itup;
    HeapTupleHeader htuphdr;
    BlockNumber hblkno;
    OffsetNumber hoffnum;
    TransactionId latestRemovedXid = InvalidTransactionId;
    int i;

    xlrec = (xl_hash_vacuum_one_page *) XLogRecGetData(record);

    /*
     * If there's nothing running on the standby we don't need to derive a
     * full latestRemovedXid value, so use a fast path out of here.  This
     * returns InvalidTransactionId, and so will conflict with all HS
     * transactions; but since we just worked out that that's zero people,
     * it's OK.
     *
     * XXX There is a race condition here, which is that a new backend might
     * start just after we look.  If so, it cannot need to conflict, but this
     * coding will result in throwing a conflict anyway.
     */
    if (CountDBBackends(InvalidOid) == 0)
        return latestRemovedXid;

    /*
     * Check if WAL replay has reached a consistent database state. If not,
     * we must PANIC.  See the definition of
     * btree_xlog_delete_get_latestRemovedXid for more details.
     */
    if (!reachedConsistency)
        elog(PANIC, "hash_xlog_vacuum_get_latestRemovedXid: cannot operate with inconsistent data");

    /*
     * Get index page.  If the DB is consistent, this should not fail, nor
     * should any of the heap page fetches below.  If one does, we return
     * InvalidTransactionId to cancel all HS transactions.  That's probably
     * overkill, but it's safe, and certainly better than panicking here.
     */
    XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
    ibuffer = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL);

    if (!BufferIsValid(ibuffer))
        return InvalidTransactionId;
    LockBuffer(ibuffer, HASH_READ);
    ipage = (Page) BufferGetPage(ibuffer);

    /*
     * Loop through the deleted index items to obtain the TransactionId from
     * the heap items they point to.
     */
    unused = (OffsetNumber *) ((char *) xlrec + SizeOfHashVacuumOnePage);

    for (i = 0; i < xlrec->ntuples; i++)
    {
        /*
         * Identify the index tuple about to be deleted.
         */
        iitemid = PageGetItemId(ipage, unused[i]);
        itup = (IndexTuple) PageGetItem(ipage, iitemid);

        /*
         * Locate the heap page that the index tuple points at
         */
        hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
        hbuffer = XLogReadBufferExtended(xlrec->hnode, MAIN_FORKNUM,
                                         hblkno, RBM_NORMAL);

        if (!BufferIsValid(hbuffer))
        {
            UnlockReleaseBuffer(ibuffer);
            return InvalidTransactionId;
        }
        LockBuffer(hbuffer, HASH_READ);
        hpage = (Page) BufferGetPage(hbuffer);

        /*
         * Look up the heap tuple header that the index tuple points at by
         * using the heap node supplied with the xlrec.  We can't use
         * heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
         * Note that we are not looking at tuple data here, just headers.
         */
        hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
        hitemid = PageGetItemId(hpage, hoffnum);

        /*
         * Follow any redirections until we find something useful.
         */
        while (ItemIdIsRedirected(hitemid))
        {
            hoffnum = ItemIdGetRedirect(hitemid);
            hitemid = PageGetItemId(hpage, hoffnum);
            CHECK_FOR_INTERRUPTS();
        }

        /*
         * If the heap item has storage, then read the header and use that to
         * set latestRemovedXid.
         *
         * Some LP_DEAD items may not be accessible, so we ignore them.
         */
        if (ItemIdHasStorage(hitemid))
        {
            htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);
            HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
        }
        else if (ItemIdIsDead(hitemid))
        {
            /*
             * Conjecture: if hitemid is dead then it had xids before the
             * xids marked on LP_NORMAL items.  So we just ignore this item
             * and move on to the next, for the purposes of calculating
             * latestRemovedXid.
             */
        }
        else
            Assert(!ItemIdIsUsed(hitemid));

        UnlockReleaseBuffer(hbuffer);
    }

    UnlockReleaseBuffer(ibuffer);

    /*
     * If all heap tuples were LP_DEAD then we will be returning
     * InvalidTransactionId here, which avoids conflicts.  This matches
     * existing logic which assumes that LP_DEAD tuples must already be older
     * than the latestRemovedXid on the cleanup record that set them as
     * LP_DEAD, hence must already have generated a conflict.
     */
    return latestRemovedXid;
}

/*
 * replay delete operation in hash index to remove
 * tuples marked as DEAD during index tuple insertion.
 */
static void
hash_xlog_vacuum_one_page(XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    xl_hash_vacuum_one_page *xldata;
    Buffer buffer;
    Buffer metabuf;
    Page page;
    XLogRedoAction action;
    HashPageOpaque pageopaque;

    xldata = (xl_hash_vacuum_one_page *) XLogRecGetData(record);

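    /*
     * Block 0 is the bucket page being cleaned; block 1 is the metapage,
     * whose tuple count is decremented by the number of removed tuples.
     */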
    /*
     * If we have any conflict processing to do, it must happen before we
     * update the page.
     *
     * Hash index records that are marked as LP_DEAD and being removed during
     * hash index tuple insertion can conflict with standby queries.  You
     * might think that vacuum records would conflict as well, but we've
     * handled that already.  XLOG_HEAP2_CLEANUP_INFO records provide the
     * highest xid cleaned by the vacuum of the heap and so we can resolve
     * any conflicts just once when that arrives.  After that we know that no
     * conflicts exist from individual hash index vacuum records on that
     * index.
     */
    if (InHotStandby)
    {
        TransactionId latestRemovedXid =
            hash_xlog_vacuum_get_latestRemovedXid(record);
        RelFileNode rnode;

        XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
        ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
    }

    action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer);

    if (action == BLK_NEEDS_REDO)
    {
        page = (Page) BufferGetPage(buffer);

        if (XLogRecGetDataLen(record) > SizeOfHashVacuumOnePage)
        {
            OffsetNumber *unused;

            unused = (OffsetNumber *) ((char *) xldata + SizeOfHashVacuumOnePage);

            PageIndexMultiDelete(page, unused, xldata->ntuples);
        }

        /*
         * Mark the page as not containing any LP_DEAD items.  See comments
         * in _hash_vacuum_one_page() for details.
         */
        pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
        pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);

    if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO)
    {
        Page metapage;
        HashMetaPage metap;

        metapage = BufferGetPage(metabuf);
        metap = HashPageGetMeta(metapage);

        metap->hashm_ntuples -= xldata->ntuples;

        PageSetLSN(metapage, lsn);
        MarkBufferDirty(metabuf);
    }
    if (BufferIsValid(metabuf))
        UnlockReleaseBuffer(metabuf);
}

void
hash_redo(XLogReaderState *record)
{
    uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

    switch (info)
    {
        case XLOG_HASH_INIT_META_PAGE:
            hash_xlog_init_meta_page(record);
            break;
        case XLOG_HASH_INIT_BITMAP_PAGE:
            hash_xlog_init_bitmap_page(record);
            break;
        case XLOG_HASH_INSERT:
            hash_xlog_insert(record);
            break;
        case XLOG_HASH_ADD_OVFL_PAGE:
            hash_xlog_add_ovfl_page(record);
            break;
        case XLOG_HASH_SPLIT_ALLOCATE_PAGE:
            hash_xlog_split_allocate_page(record);
            break;
        case XLOG_HASH_SPLIT_PAGE:
            hash_xlog_split_page(record);
            break;
        case XLOG_HASH_SPLIT_COMPLETE:
            hash_xlog_split_complete(record);
            break;
        case XLOG_HASH_MOVE_PAGE_CONTENTS:
            hash_xlog_move_page_contents(record);
            break;
        case XLOG_HASH_SQUEEZE_PAGE:
            hash_xlog_squeeze_page(record);
            break;
        case XLOG_HASH_DELETE:
            hash_xlog_delete(record);
            break;
        case XLOG_HASH_SPLIT_CLEANUP:
            hash_xlog_split_cleanup(record);
            break;
        case XLOG_HASH_UPDATE_META_PAGE:
            hash_xlog_update_meta_page(record);
            break;
        case XLOG_HASH_VACUUM_ONE_PAGE:
            hash_xlog_vacuum_one_page(record);
            break;
        default:
            elog(PANIC, "hash_redo: unknown op code %u", info);
    }
}

/*
 * Mask a hash page before performing consistency checks on it.
 */
void
hash_mask(char *pagedata, BlockNumber blkno)
{
    Page page = (Page) pagedata;
    HashPageOpaque opaque;
    int pagetype;

    mask_page_lsn_and_checksum(page);

    mask_page_hint_bits(page);
    mask_unused_space(page);

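    /*
     * The maskings above apply to every hash page: the LSN and checksum, the
     * hint bits in the page header, and the unused space between pd_lower
     * and pd_upper.  What follows depends on the page type.
     */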
    opaque = (HashPageOpaque) PageGetSpecialPointer(page);

    pagetype = opaque->hasho_flag & LH_PAGE_TYPE;
    if (pagetype == LH_UNUSED_PAGE)
    {
        /*
         * Mask everything on a UNUSED page.
         */
        mask_page_content(page);
    }
    else if (pagetype == LH_BUCKET_PAGE ||
             pagetype == LH_OVERFLOW_PAGE)
    {
        /*
         * In hash bucket and overflow pages, it is possible to modify the
         * LP_FLAGS without emitting any WAL record.  Hence, mask the line
         * pointer flags.  See hashgettuple(), _hash_kill_items() for
         * details.
         */
        mask_lp_flags(page);
    }

    /*
     * It is possible that the hint bit LH_PAGE_HAS_DEAD_TUPLES may remain
     * unlogged.  So, mask it.  See _hash_kill_items() for details.
     */
    opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
}
