1 /*-------------------------------------------------------------------------
2 *
3 * nbtxlog.c
4 * WAL replay logic for btrees.
5 *
6 *
7 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/nbtree/nbtxlog.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 #include "postgres.h"
16
17 #include "access/bufmask.h"
18 #include "access/nbtree.h"
19 #include "access/nbtxlog.h"
20 #include "access/transam.h"
21 #include "access/xlog.h"
22 #include "access/xlogutils.h"
23 #include "storage/procarray.h"
24 #include "miscadmin.h"
25
26 /*
27 * _bt_restore_page -- re-enter all the index tuples on a page
28 *
29 * The page is freshly init'd, and *from (length len) is a copy of what
30 * had been its upper part (pd_upper to pd_special). We assume that the
31 * tuples had been added to the page in item-number order, and therefore
32 * the one with highest item number appears first (lowest on the page).
33 */
34 static void
_bt_restore_page(Page page,char * from,int len)35 _bt_restore_page(Page page, char *from, int len)
36 {
37 IndexTupleData itupdata;
38 Size itemsz;
39 char *end = from + len;
40 Item items[MaxIndexTuplesPerPage];
41 uint16 itemsizes[MaxIndexTuplesPerPage];
42 int i;
43 int nitems;
44
45 /*
46 * To get the items back in the original order, we add them to the page in
47 * reverse. To figure out where one tuple ends and another begins, we
48 * have to scan them in forward order first.
49 */
50 i = 0;
51 while (from < end)
52 {
53 /*
54 * As we step through the items, 'from' won't always be properly
55 * aligned, so we need to use memcpy(). Further, we use Item (which
56 * is just a char*) here for our items array for the same reason;
57 * wouldn't want the compiler or anyone thinking that an item is
58 * aligned when it isn't.
59 */
60 memcpy(&itupdata, from, sizeof(IndexTupleData));
61 itemsz = IndexTupleSize(&itupdata);
62 itemsz = MAXALIGN(itemsz);
63
64 items[i] = (Item) from;
65 itemsizes[i] = itemsz;
66 i++;
67
68 from += itemsz;
69 }
70 nitems = i;
71
72 for (i = nitems - 1; i >= 0; i--)
73 {
74 if (PageAddItem(page, items[i], itemsizes[i], nitems - i,
75 false, false) == InvalidOffsetNumber)
76 elog(PANIC, "_bt_restore_page: cannot add item to page");
77 }
78 }
79
80 static void
_bt_restore_meta(XLogReaderState * record,uint8 block_id)81 _bt_restore_meta(XLogReaderState *record, uint8 block_id)
82 {
83 XLogRecPtr lsn = record->EndRecPtr;
84 Buffer metabuf;
85 Page metapg;
86 BTMetaPageData *md;
87 BTPageOpaque pageop;
88 xl_btree_metadata *xlrec;
89 char *ptr;
90 Size len;
91
92 metabuf = XLogInitBufferForRedo(record, block_id);
93 ptr = XLogRecGetBlockData(record, block_id, &len);
94
95 Assert(len == sizeof(xl_btree_metadata));
96 Assert(BufferGetBlockNumber(metabuf) == BTREE_METAPAGE);
97 xlrec = (xl_btree_metadata *) ptr;
98 metapg = BufferGetPage(metabuf);
99
100 _bt_pageinit(metapg, BufferGetPageSize(metabuf));
101
102 md = BTPageGetMeta(metapg);
103 md->btm_magic = BTREE_MAGIC;
104 md->btm_version = xlrec->version;
105 md->btm_root = xlrec->root;
106 md->btm_level = xlrec->level;
107 md->btm_fastroot = xlrec->fastroot;
108 md->btm_fastlevel = xlrec->fastlevel;
109 /* Cannot log BTREE_MIN_VERSION index metapage without upgrade */
110 Assert(md->btm_version >= BTREE_NOVAC_VERSION);
111 md->btm_oldest_btpo_xact = xlrec->oldest_btpo_xact;
112 md->btm_last_cleanup_num_heap_tuples = xlrec->last_cleanup_num_heap_tuples;
113
114 pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
115 pageop->btpo_flags = BTP_META;
116
117 /*
118 * Set pd_lower just past the end of the metadata. This is essential,
119 * because without doing so, metadata will be lost if xlog.c compresses
120 * the page.
121 */
122 ((PageHeader) metapg)->pd_lower =
123 ((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;
124
125 PageSetLSN(metapg, lsn);
126 MarkBufferDirty(metabuf);
127 UnlockReleaseBuffer(metabuf);
128 }
129
130 /*
131 * _bt_clear_incomplete_split -- clear INCOMPLETE_SPLIT flag on a page
132 *
133 * This is a common subroutine of the redo functions of all the WAL record
134 * types that can insert a downlink: insert, split, and newroot.
135 */
136 static void
_bt_clear_incomplete_split(XLogReaderState * record,uint8 block_id)137 _bt_clear_incomplete_split(XLogReaderState *record, uint8 block_id)
138 {
139 XLogRecPtr lsn = record->EndRecPtr;
140 Buffer buf;
141
142 if (XLogReadBufferForRedo(record, block_id, &buf) == BLK_NEEDS_REDO)
143 {
144 Page page = (Page) BufferGetPage(buf);
145 BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
146
147 Assert(P_INCOMPLETE_SPLIT(pageop));
148 pageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
149
150 PageSetLSN(page, lsn);
151 MarkBufferDirty(buf);
152 }
153 if (BufferIsValid(buf))
154 UnlockReleaseBuffer(buf);
155 }
156
157 static void
btree_xlog_insert(bool isleaf,bool ismeta,XLogReaderState * record)158 btree_xlog_insert(bool isleaf, bool ismeta, XLogReaderState *record)
159 {
160 XLogRecPtr lsn = record->EndRecPtr;
161 xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
162 Buffer buffer;
163 Page page;
164
165 /*
166 * Insertion to an internal page finishes an incomplete split at the child
167 * level. Clear the incomplete-split flag in the child. Note: during
168 * normal operation, the child and parent pages are locked at the same
169 * time, so that clearing the flag and inserting the downlink appear
170 * atomic to other backends. We don't bother with that during replay,
171 * because readers don't care about the incomplete-split flag and there
172 * cannot be updates happening.
173 */
174 if (!isleaf)
175 _bt_clear_incomplete_split(record, 1);
176 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
177 {
178 Size datalen;
179 char *datapos = XLogRecGetBlockData(record, 0, &datalen);
180
181 page = BufferGetPage(buffer);
182
183 if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
184 false, false) == InvalidOffsetNumber)
185 elog(PANIC, "btree_xlog_insert: failed to add item");
186
187 PageSetLSN(page, lsn);
188 MarkBufferDirty(buffer);
189 }
190 if (BufferIsValid(buffer))
191 UnlockReleaseBuffer(buffer);
192
193 /*
194 * Note: in normal operation, we'd update the metapage while still holding
195 * lock on the page we inserted into. But during replay it's not
196 * necessary to hold that lock, since no other index updates can be
197 * happening concurrently, and readers will cope fine with following an
198 * obsolete link from the metapage.
199 */
200 if (ismeta)
201 _bt_restore_meta(record, 2);
202 }
203
/*
 * btree_xlog_split -- replay a page split (XLOG_BTREE_SPLIT_L / _R)
 *
 * Block references: 0 is the original (left) page, 1 the new right sibling,
 * 2 the old "next" page whose left-link must be fixed (absent when the split
 * page was rightmost), and 3 the child whose incomplete-split flag this
 * record completes (internal-page splits only).  'onleft' says whether the
 * tuple whose insertion caused the split went to the left page.
 */
static void
btree_xlog_split(bool onleft, XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
	bool		isleaf = (xlrec->level == 0);
	Buffer		lbuf;
	Buffer		rbuf;
	Page		rpage;
	BTPageOpaque ropaque;
	char	   *datapos;
	Size		datalen;
	BlockNumber leftsib;
	BlockNumber rightsib;
	BlockNumber rnext;

	XLogRecGetBlockTag(record, 0, NULL, NULL, &leftsib);
	XLogRecGetBlockTag(record, 1, NULL, NULL, &rightsib);
	if (!XLogRecGetBlockTag(record, 2, NULL, NULL, &rnext))
		rnext = P_NONE;			/* split page was the rightmost page */

	/*
	 * Clear the incomplete split flag on the left sibling of the child page
	 * this is a downlink for.  (Like in btree_xlog_insert, this can be done
	 * before locking the other pages)
	 */
	if (!isleaf)
		_bt_clear_incomplete_split(record, 3);

	/* Reconstruct right (new) sibling page from scratch */
	rbuf = XLogInitBufferForRedo(record, 1);
	datapos = XLogRecGetBlockData(record, 1, &datalen);
	rpage = (Page) BufferGetPage(rbuf);

	_bt_pageinit(rpage, BufferGetPageSize(rbuf));
	ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);

	ropaque->btpo_prev = leftsib;
	ropaque->btpo_next = rnext;
	ropaque->btpo.level = xlrec->level;
	ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
	ropaque->btpo_cycleid = 0;

	_bt_restore_page(rpage, datapos, datalen);

	PageSetLSN(rpage, lsn);
	MarkBufferDirty(rbuf);

	/* Now reconstruct left (original) sibling page */
	if (XLogReadBufferForRedo(record, 0, &lbuf) == BLK_NEEDS_REDO)
	{
		/*
		 * To retain the same physical order of the tuples that they had, we
		 * initialize a temporary empty page for the left page and add all the
		 * items to that in item number order.  This mirrors how _bt_split()
		 * works.  Retaining the same physical order makes WAL consistency
		 * checking possible.  See also _bt_restore_page(), which does the
		 * same for the right page.
		 */
		Page		lpage = (Page) BufferGetPage(lbuf);
		BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
		OffsetNumber off;
		IndexTuple	newitem = NULL,
					left_hikey = NULL;
		Size		newitemsz = 0,
					left_hikeysz = 0;
		Page		newlpage;
		OffsetNumber leftoff;

		datapos = XLogRecGetBlockData(record, 0, &datalen);

		/* If logged, the new tuple precedes the high key in the payload */
		if (onleft)
		{
			newitem = (IndexTuple) datapos;
			newitemsz = MAXALIGN(IndexTupleSize(newitem));
			datapos += newitemsz;
			datalen -= newitemsz;
		}

		/* Extract left hikey and its size (assuming 16-bit alignment) */
		left_hikey = (IndexTuple) datapos;
		left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
		datapos += left_hikeysz;
		datalen -= left_hikeysz;

		/* the block-0 payload should now be fully consumed */
		Assert(datalen == 0);

		newlpage = PageGetTempPageCopySpecial(lpage);

		/* Set high key */
		leftoff = P_HIKEY;
		if (PageAddItem(newlpage, (Item) left_hikey, left_hikeysz,
						P_HIKEY, false, false) == InvalidOffsetNumber)
			elog(PANIC, "failed to add high key to left page after split");
		leftoff = OffsetNumberNext(leftoff);

		for (off = P_FIRSTDATAKEY(lopaque); off < xlrec->firstright; off++)
		{
			ItemId		itemid;
			Size		itemsz;
			IndexTuple	item;

			/* add the new item if it was inserted on left page */
			if (onleft && off == xlrec->newitemoff)
			{
				if (PageAddItem(newlpage, (Item) newitem, newitemsz, leftoff,
								false, false) == InvalidOffsetNumber)
					elog(ERROR, "failed to add new item to left page after split");
				leftoff = OffsetNumberNext(leftoff);
			}

			itemid = PageGetItemId(lpage, off);
			itemsz = ItemIdGetLength(itemid);
			item = (IndexTuple) PageGetItem(lpage, itemid);
			if (PageAddItem(newlpage, (Item) item, itemsz, leftoff,
							false, false) == InvalidOffsetNumber)
				elog(ERROR, "failed to add old item to left page after split");
			leftoff = OffsetNumberNext(leftoff);
		}

		/* cope with possibility that newitem goes at the end */
		if (onleft && off == xlrec->newitemoff)
		{
			if (PageAddItem(newlpage, (Item) newitem, newitemsz, leftoff,
							false, false) == InvalidOffsetNumber)
				elog(ERROR, "failed to add new item to left page after split");
			leftoff = OffsetNumberNext(leftoff);
		}

		PageRestoreTempPage(newlpage, lpage);

		/* Fix opaque fields */
		lopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
		if (isleaf)
			lopaque->btpo_flags |= BTP_LEAF;
		lopaque->btpo_next = rightsib;
		lopaque->btpo_cycleid = 0;

		PageSetLSN(lpage, lsn);
		MarkBufferDirty(lbuf);
	}

	/*
	 * We no longer need the buffers.  They must be released together, so that
	 * readers cannot observe two inconsistent halves.
	 */
	if (BufferIsValid(lbuf))
		UnlockReleaseBuffer(lbuf);
	UnlockReleaseBuffer(rbuf);

	/*
	 * Fix left-link of the page to the right of the new right sibling.
	 *
	 * Note: in normal operation, we do this while still holding lock on the
	 * two split pages.  However, that's not necessary for correctness in WAL
	 * replay, because no other index update can be in progress, and readers
	 * will cope properly when following an obsolete left-link.
	 */
	if (rnext != P_NONE)
	{
		Buffer		buffer;

		if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
		{
			Page		page = (Page) BufferGetPage(buffer);
			BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);

			pageop->btpo_prev = rightsib;

			PageSetLSN(page, lsn);
			MarkBufferDirty(buffer);
		}
		if (BufferIsValid(buffer))
			UnlockReleaseBuffer(buffer);
	}
}
380
/*
 * btree_xlog_vacuum -- replay removal of index tuples by VACUUM
 *
 * Unlike btree_xlog_delete(), this takes a cleanup lock on the leaf page,
 * mirroring btvacuumpage(); see nbtree/README for why that matters.  The
 * block data, when present, is an array of deletable offset numbers.
 */
static void
btree_xlog_vacuum(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	Buffer		buffer;
	Page		page;
	BTPageOpaque opaque;
#ifdef UNUSED
	xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record);

	/*
	 * This section of code is thought to be no longer needed, after analysis
	 * of the calling paths. It is retained to allow the code to be reinstated
	 * if a flaw is revealed in that thinking.
	 *
	 * If we are running non-MVCC scans using this index we need to do some
	 * additional work to ensure correctness, which is known as a "pin scan"
	 * described in more detail in next paragraphs. We used to do the extra
	 * work in all cases, whereas we now avoid that work in most cases. If
	 * lastBlockVacuumed is set to InvalidBlockNumber then we skip the
	 * additional work required for the pin scan.
	 *
	 * Avoiding this extra work is important since it requires us to touch
	 * every page in the index, so is an O(N) operation. Worse, it is an
	 * operation performed in the foreground during redo, so it delays
	 * replication directly.
	 *
	 * If queries might be active then we need to ensure every leaf page is
	 * unpinned between the lastBlockVacuumed and the current block, if there
	 * are any. This prevents replay of the VACUUM from reaching the stage of
	 * removing heap tuples while there could still be indexscans "in flight"
	 * to those particular tuples for those scans which could be confused by
	 * finding new tuples at the old TID locations (see nbtree/README).
	 *
	 * It might be worth checking if there are actually any backends running;
	 * if not, we could just skip this.
	 *
	 * Since VACUUM can visit leaf pages out-of-order, it might issue records
	 * with lastBlockVacuumed >= block; that's not an error, it just means
	 * nothing to do now.
	 *
	 * Note: since we touch all pages in the range, we will lock non-leaf
	 * pages, and also any empty (all-zero) pages that may be in the index. It
	 * doesn't seem worth the complexity to avoid that. But it's important
	 * that HotStandbyActiveInReplay() will not return true if the database
	 * isn't yet consistent; so we need not fear reading still-corrupt blocks
	 * here during crash recovery.
	 */
	if (HotStandbyActiveInReplay() && BlockNumberIsValid(xlrec->lastBlockVacuumed))
	{
		RelFileNode thisrnode;
		BlockNumber thisblkno;
		BlockNumber blkno;

		XLogRecGetBlockTag(record, 0, &thisrnode, NULL, &thisblkno);

		for (blkno = xlrec->lastBlockVacuumed + 1; blkno < thisblkno; blkno++)
		{
			/*
			 * We use RBM_NORMAL_NO_LOG mode because it's not an error
			 * condition to see all-zero pages.  The original btvacuumpage
			 * scan would have skipped over all-zero pages, noting them in FSM
			 * but not bothering to initialize them just yet; so we mustn't
			 * throw an error here.  (We could skip acquiring the cleanup lock
			 * if PageIsNew, but it's probably not worth the cycles to test.)
			 *
			 * XXX we don't actually need to read the block, we just need to
			 * confirm it is unpinned. If we had a special call into the
			 * buffer manager we could optimise this so that if the block is
			 * not in shared_buffers we confirm it as unpinned. Optimizing
			 * this is now moot, since in most cases we avoid the scan.
			 */
			buffer = XLogReadBufferExtended(thisrnode, MAIN_FORKNUM, blkno,
											RBM_NORMAL_NO_LOG);
			if (BufferIsValid(buffer))
			{
				LockBufferForCleanup(buffer);
				UnlockReleaseBuffer(buffer);
			}
		}
	}
#endif

	/*
	 * Like in btvacuumpage(), we need to take a cleanup lock on every leaf
	 * page.  See nbtree/README for details.
	 */
	if (XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer)
		== BLK_NEEDS_REDO)
	{
		char	   *ptr;
		Size		len;

		ptr = XLogRecGetBlockData(record, 0, &len);

		page = (Page) BufferGetPage(buffer);

		/* the block data is an array of OffsetNumbers to delete, if any */
		if (len > 0)
		{
			OffsetNumber *unused;
			OffsetNumber *unend;

			unused = (OffsetNumber *) ptr;
			unend = (OffsetNumber *) ((char *) ptr + len);

			if ((unend - unused) > 0)
				PageIndexMultiDelete(page, unused, unend - unused);
		}

		/*
		 * Mark the page as not containing any LP_DEAD items --- see comments
		 * in _bt_delitems_vacuum().
		 */
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
		opaque->btpo_flags &= ~BTP_HAS_GARBAGE;

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);
}
503
504 static void
btree_xlog_delete(XLogReaderState * record)505 btree_xlog_delete(XLogReaderState *record)
506 {
507 XLogRecPtr lsn = record->EndRecPtr;
508 xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
509 Buffer buffer;
510 Page page;
511 BTPageOpaque opaque;
512
513 /*
514 * If we have any conflict processing to do, it must happen before we
515 * update the page.
516 *
517 * Btree delete records can conflict with standby queries. You might
518 * think that vacuum records would conflict as well, but we've handled
519 * that already. XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
520 * cleaned by the vacuum of the heap and so we can resolve any conflicts
521 * just once when that arrives. After that we know that no conflicts
522 * exist from individual btree vacuum records on that index.
523 */
524 if (InHotStandby)
525 {
526 RelFileNode rnode;
527
528 XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
529
530 ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
531 }
532
533 /*
534 * We don't need to take a cleanup lock to apply these changes. See
535 * nbtree/README for details.
536 */
537 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
538 {
539 page = (Page) BufferGetPage(buffer);
540
541 if (XLogRecGetDataLen(record) > SizeOfBtreeDelete)
542 {
543 OffsetNumber *unused;
544
545 unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);
546
547 PageIndexMultiDelete(page, unused, xlrec->nitems);
548 }
549
550 /*
551 * Mark the page as not containing any LP_DEAD items --- see comments
552 * in _bt_delitems_delete().
553 */
554 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
555 opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
556
557 PageSetLSN(page, lsn);
558 MarkBufferDirty(buffer);
559 }
560 if (BufferIsValid(buffer))
561 UnlockReleaseBuffer(buffer);
562 }
563
564 static void
btree_xlog_mark_page_halfdead(uint8 info,XLogReaderState * record)565 btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
566 {
567 XLogRecPtr lsn = record->EndRecPtr;
568 xl_btree_mark_page_halfdead *xlrec = (xl_btree_mark_page_halfdead *) XLogRecGetData(record);
569 Buffer buffer;
570 Page page;
571 BTPageOpaque pageop;
572 IndexTupleData trunctuple;
573
574 /*
575 * In normal operation, we would lock all the pages this WAL record
576 * touches before changing any of them. In WAL replay, it should be okay
577 * to lock just one page at a time, since no concurrent index updates can
578 * be happening, and readers should not care whether they arrive at the
579 * target page or not (since it's surely empty).
580 */
581
582 /* parent page */
583 if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
584 {
585 OffsetNumber poffset;
586 ItemId itemid;
587 IndexTuple itup;
588 OffsetNumber nextoffset;
589 BlockNumber rightsib;
590
591 page = (Page) BufferGetPage(buffer);
592 pageop = (BTPageOpaque) PageGetSpecialPointer(page);
593
594 poffset = xlrec->poffset;
595
596 nextoffset = OffsetNumberNext(poffset);
597 itemid = PageGetItemId(page, nextoffset);
598 itup = (IndexTuple) PageGetItem(page, itemid);
599 rightsib = BTreeInnerTupleGetDownLink(itup);
600
601 itemid = PageGetItemId(page, poffset);
602 itup = (IndexTuple) PageGetItem(page, itemid);
603 BTreeInnerTupleSetDownLink(itup, rightsib);
604 nextoffset = OffsetNumberNext(poffset);
605 PageIndexTupleDelete(page, nextoffset);
606
607 PageSetLSN(page, lsn);
608 MarkBufferDirty(buffer);
609 }
610 if (BufferIsValid(buffer))
611 UnlockReleaseBuffer(buffer);
612
613 /* Rewrite the leaf page as a halfdead page */
614 buffer = XLogInitBufferForRedo(record, 0);
615 page = (Page) BufferGetPage(buffer);
616
617 _bt_pageinit(page, BufferGetPageSize(buffer));
618 pageop = (BTPageOpaque) PageGetSpecialPointer(page);
619
620 pageop->btpo_prev = xlrec->leftblk;
621 pageop->btpo_next = xlrec->rightblk;
622 pageop->btpo.level = 0;
623 pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
624 pageop->btpo_cycleid = 0;
625
626 /*
627 * Construct a dummy hikey item that points to the next parent to be
628 * deleted (if any).
629 */
630 MemSet(&trunctuple, 0, sizeof(IndexTupleData));
631 trunctuple.t_info = sizeof(IndexTupleData);
632 BTreeTupleSetTopParent(&trunctuple, xlrec->topparent);
633
634 if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
635 false, false) == InvalidOffsetNumber)
636 elog(ERROR, "could not add dummy high key to half-dead page");
637
638 PageSetLSN(page, lsn);
639 MarkBufferDirty(buffer);
640 UnlockReleaseBuffer(buffer);
641 }
642
643
/*
 * btree_xlog_unlink_page -- replay removal of a half-dead page from the tree
 *
 * Block references: 0 is the page being unlinked, 1 its left sibling (if
 * any), 2 its right sibling, 3 the leaf page to rewrite as half-dead when a
 * parent in the branch (rather than the leaf itself) was unlinked, and 4
 * the metapage for XLOG_BTREE_UNLINK_PAGE_META.
 */
static void
btree_xlog_unlink_page(uint8 info, XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_unlink_page *xlrec = (xl_btree_unlink_page *) XLogRecGetData(record);
	BlockNumber leftsib;
	BlockNumber rightsib;
	Buffer		buffer;
	Page		page;
	BTPageOpaque pageop;

	leftsib = xlrec->leftsib;
	rightsib = xlrec->rightsib;

	/*
	 * In normal operation, we would lock all the pages this WAL record
	 * touches before changing any of them.  In WAL replay, it should be okay
	 * to lock just one page at a time, since no concurrent index updates can
	 * be happening, and readers should not care whether they arrive at the
	 * target page or not (since it's surely empty).
	 */

	/* Fix left-link of right sibling */
	if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
	{
		page = (Page) BufferGetPage(buffer);
		pageop = (BTPageOpaque) PageGetSpecialPointer(page);
		pageop->btpo_prev = leftsib;

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);

	/* Fix right-link of left sibling, if any */
	if (leftsib != P_NONE)
	{
		if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
		{
			page = (Page) BufferGetPage(buffer);
			pageop = (BTPageOpaque) PageGetSpecialPointer(page);
			pageop->btpo_next = rightsib;

			PageSetLSN(page, lsn);
			MarkBufferDirty(buffer);
		}
		if (BufferIsValid(buffer))
			UnlockReleaseBuffer(buffer);
	}

	/* Rewrite target page as empty deleted page */
	buffer = XLogInitBufferForRedo(record, 0);
	page = (Page) BufferGetPage(buffer);

	_bt_pageinit(page, BufferGetPageSize(buffer));
	pageop = (BTPageOpaque) PageGetSpecialPointer(page);

	pageop->btpo_prev = leftsib;
	pageop->btpo_next = rightsib;
	/* btpo.xact records when the deletion becomes recyclable */
	pageop->btpo.xact = xlrec->btpo_xact;
	pageop->btpo_flags = BTP_DELETED;
	pageop->btpo_cycleid = 0;

	PageSetLSN(page, lsn);
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);

	/*
	 * If we deleted a parent of the targeted leaf page, instead of the leaf
	 * itself, update the leaf to point to the next remaining child in the
	 * branch.
	 */
	if (XLogRecHasBlockRef(record, 3))
	{
		/*
		 * There is no real data on the page, so we just re-create it from
		 * scratch using the information from the WAL record.
		 */
		IndexTupleData trunctuple;

		buffer = XLogInitBufferForRedo(record, 3);
		page = (Page) BufferGetPage(buffer);

		_bt_pageinit(page, BufferGetPageSize(buffer));
		pageop = (BTPageOpaque) PageGetSpecialPointer(page);

		pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
		pageop->btpo_prev = xlrec->leafleftsib;
		pageop->btpo_next = xlrec->leafrightsib;
		pageop->btpo.level = 0;
		pageop->btpo_cycleid = 0;

		/* Add a dummy hikey item */
		MemSet(&trunctuple, 0, sizeof(IndexTupleData));
		trunctuple.t_info = sizeof(IndexTupleData);
		BTreeTupleSetTopParent(&trunctuple, xlrec->topparent);

		if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
						false, false) == InvalidOffsetNumber)
			elog(ERROR, "could not add dummy high key to half-dead page");

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
		UnlockReleaseBuffer(buffer);
	}

	/* Update metapage if needed */
	if (info == XLOG_BTREE_UNLINK_PAGE_META)
		_bt_restore_meta(record, 4);
}
755
756 static void
btree_xlog_newroot(XLogReaderState * record)757 btree_xlog_newroot(XLogReaderState *record)
758 {
759 XLogRecPtr lsn = record->EndRecPtr;
760 xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
761 Buffer buffer;
762 Page page;
763 BTPageOpaque pageop;
764 char *ptr;
765 Size len;
766
767 buffer = XLogInitBufferForRedo(record, 0);
768 page = (Page) BufferGetPage(buffer);
769
770 _bt_pageinit(page, BufferGetPageSize(buffer));
771 pageop = (BTPageOpaque) PageGetSpecialPointer(page);
772
773 pageop->btpo_flags = BTP_ROOT;
774 pageop->btpo_prev = pageop->btpo_next = P_NONE;
775 pageop->btpo.level = xlrec->level;
776 if (xlrec->level == 0)
777 pageop->btpo_flags |= BTP_LEAF;
778 pageop->btpo_cycleid = 0;
779
780 if (xlrec->level > 0)
781 {
782 ptr = XLogRecGetBlockData(record, 0, &len);
783 _bt_restore_page(page, ptr, len);
784
785 /* Clear the incomplete-split flag in left child */
786 _bt_clear_incomplete_split(record, 1);
787 }
788
789 PageSetLSN(page, lsn);
790 MarkBufferDirty(buffer);
791 UnlockReleaseBuffer(buffer);
792
793 _bt_restore_meta(record, 2);
794 }
795
796 static void
btree_xlog_reuse_page(XLogReaderState * record)797 btree_xlog_reuse_page(XLogReaderState *record)
798 {
799 xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);
800
801 /*
802 * Btree reuse_page records exist to provide a conflict point when we
803 * reuse pages in the index via the FSM. That's all they do though.
804 *
805 * latestRemovedXid was the page's btpo.xact. The btpo.xact <
806 * RecentGlobalXmin test in _bt_page_recyclable() conceptually mirrors the
807 * pgxact->xmin > limitXmin test in GetConflictingVirtualXIDs().
808 * Consequently, one XID value achieves the same exclusion effect on
809 * master and standby.
810 */
811 if (InHotStandby)
812 {
813 ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
814 xlrec->node);
815 }
816 }
817
/*
 * btree_redo -- dispatch a btree WAL record to the matching replay routine
 */
void
btree_redo(XLogReaderState *record)
{
	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

	switch (info)
	{
		case XLOG_BTREE_INSERT_LEAF:
			btree_xlog_insert(true, false, record);
			break;
		case XLOG_BTREE_INSERT_UPPER:
			btree_xlog_insert(false, false, record);
			break;
		case XLOG_BTREE_INSERT_META:
			btree_xlog_insert(false, true, record);
			break;
		case XLOG_BTREE_SPLIT_L:
			btree_xlog_split(true, record);
			break;
		case XLOG_BTREE_SPLIT_R:
			btree_xlog_split(false, record);
			break;
		case XLOG_BTREE_VACUUM:
			btree_xlog_vacuum(record);
			break;
		case XLOG_BTREE_DELETE:
			btree_xlog_delete(record);
			break;
		case XLOG_BTREE_MARK_PAGE_HALFDEAD:
			btree_xlog_mark_page_halfdead(info, record);
			break;
		case XLOG_BTREE_UNLINK_PAGE:
		case XLOG_BTREE_UNLINK_PAGE_META:
			btree_xlog_unlink_page(info, record);
			break;
		case XLOG_BTREE_NEWROOT:
			btree_xlog_newroot(record);
			break;
		case XLOG_BTREE_REUSE_PAGE:
			btree_xlog_reuse_page(record);
			break;
		case XLOG_BTREE_META_CLEANUP:
			_bt_restore_meta(record, 0);
			break;
		default:
			elog(PANIC, "btree_redo: unknown op code %u", info);
	}
}
866
867 /*
868 * Mask a btree page before performing consistency checks on it.
869 */
870 void
btree_mask(char * pagedata,BlockNumber blkno)871 btree_mask(char *pagedata, BlockNumber blkno)
872 {
873 Page page = (Page) pagedata;
874 BTPageOpaque maskopaq;
875
876 mask_page_lsn_and_checksum(page);
877
878 mask_page_hint_bits(page);
879 mask_unused_space(page);
880
881 maskopaq = (BTPageOpaque) PageGetSpecialPointer(page);
882
883 if (P_ISDELETED(maskopaq))
884 {
885 /*
886 * Mask page content on a DELETED page since it will be re-initialized
887 * during replay. See btree_xlog_unlink_page() for details.
888 */
889 mask_page_content(page);
890 }
891 else if (P_ISLEAF(maskopaq))
892 {
893 /*
894 * In btree leaf pages, it is possible to modify the LP_FLAGS without
895 * emitting any WAL record. Hence, mask the line pointer flags. See
896 * _bt_killitems(), _bt_check_unique() for details.
897 */
898 mask_lp_flags(page);
899 }
900
901 /*
902 * BTP_HAS_GARBAGE is just an un-logged hint bit. So, mask it. See
903 * _bt_killitems(), _bt_check_unique() for details.
904 */
905 maskopaq->btpo_flags &= ~BTP_HAS_GARBAGE;
906
907 /*
908 * During replay of a btree page split, we don't set the BTP_SPLIT_END
909 * flag of the right sibling and initialize the cycle_id to 0 for the same
910 * page. See btree_xlog_split() for details.
911 */
912 maskopaq->btpo_flags &= ~BTP_SPLIT_END;
913 maskopaq->btpo_cycleid = 0;
914 }
915