/*-------------------------------------------------------------------------
 *
 * nbtxlog.c
 *	  WAL replay logic for btrees.
 *
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/nbtree/nbtxlog.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/bufmask.h"
#include "access/heapam_xlog.h"
#include "access/nbtree.h"
#include "access/nbtxlog.h"
#include "access/transam.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/procarray.h"
#include "miscadmin.h"

/*
 * _bt_restore_page -- re-enter all the index tuples on a page
 *
 * The page is freshly init'd, and *from (length len) is a copy of what
 * had been its upper part (pd_upper to pd_special).  We assume that the
 * tuples had been added to the page in item-number order, and therefore
 * the one with highest item number appears first (lowest on the page).
 */
static void
_bt_restore_page(Page page, char *from, int len)
{
	IndexTupleData itupdata;
	Size		itemsz;
	char	   *end = from + len;
	Item		items[MaxIndexTuplesPerPage];
	uint16		itemsizes[MaxIndexTuplesPerPage];
	int			i;
	int			nitems;

	/*
	 * To get the items back in the original order, we add them to the page
	 * in reverse.  To figure out where one tuple ends and another begins, we
	 * have to scan them in forward order first.
	 */
	i = 0;
	while (from < end)
	{
		/*
		 * As we step through the items, 'from' won't always be properly
		 * aligned, so we need to use memcpy().  Further, we use Item (which
		 * is just a char*) here for our items array for the same reason;
		 * wouldn't want the compiler or anyone thinking that an item is
		 * aligned when it isn't.
		 */
		memcpy(&itupdata, from, sizeof(IndexTupleData));
		itemsz = IndexTupleSize(&itupdata);
		itemsz = MAXALIGN(itemsz);

		items[i] = (Item) from;
		itemsizes[i] = itemsz;
		i++;

		from += itemsz;
	}
	nitems = i;

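	/*
	 * For example (a sketch of the arithmetic, not anything read from the
	 * record): with nitems = 3, the loop below adds items[2] at offset 1,
	 * items[1] at offset 2, and items[0] at offset 3, so items[0] -- the
	 * tuple that appeared first in 'from', i.e. the one with the highest
	 * item number -- regains the highest offset.
	 */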
	for (i = nitems - 1; i >= 0; i--)
	{
		if (PageAddItem(page, items[i], itemsizes[i], nitems - i,
						false, false) == InvalidOffsetNumber)
			elog(PANIC, "_bt_restore_page: cannot add item to page");
	}
}

static void
_bt_restore_meta(XLogReaderState *record, uint8 block_id)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	Buffer		metabuf;
	Page		metapg;
	BTMetaPageData *md;
	BTPageOpaque pageop;
	xl_btree_metadata *xlrec;
	char	   *ptr;
	Size		len;

	metabuf = XLogInitBufferForRedo(record, block_id);
	ptr = XLogRecGetBlockData(record, block_id, &len);

	Assert(len == sizeof(xl_btree_metadata));
	Assert(BufferGetBlockNumber(metabuf) == BTREE_METAPAGE);
	xlrec = (xl_btree_metadata *) ptr;
	metapg = BufferGetPage(metabuf);

	_bt_pageinit(metapg, BufferGetPageSize(metabuf));

	md = BTPageGetMeta(metapg);
	md->btm_magic = BTREE_MAGIC;
	md->btm_version = BTREE_VERSION;
	md->btm_root = xlrec->root;
	md->btm_level = xlrec->level;
	md->btm_fastroot = xlrec->fastroot;
	md->btm_fastlevel = xlrec->fastlevel;
	/* Cannot log BTREE_MIN_VERSION index metapage without upgrade */
	Assert(md->btm_version == BTREE_VERSION);
	md->btm_oldest_btpo_xact = xlrec->oldest_btpo_xact;
	md->btm_last_cleanup_num_heap_tuples = xlrec->last_cleanup_num_heap_tuples;

	pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
	pageop->btpo_flags = BTP_META;

	/*
	 * Set pd_lower just past the end of the metadata.  This is essential,
	 * because without doing so, metadata will be lost if xlog.c compresses
	 * the page.
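	 * (A full-page image stores only the data outside the pd_lower..pd_upper
	 * "hole"; if pd_lower still pointed just past the page header, the
	 * metadata would fall inside the hole and be discarded.)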
	 */
	((PageHeader) metapg)->pd_lower =
		((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;

	PageSetLSN(metapg, lsn);
	MarkBufferDirty(metabuf);
	UnlockReleaseBuffer(metabuf);
}

/*
 * _bt_clear_incomplete_split -- clear INCOMPLETE_SPLIT flag on a page
 *
 * This is a common subroutine of the redo functions of all the WAL record
 * types that can insert a downlink: insert, split, and newroot.
 */
static void
_bt_clear_incomplete_split(XLogReaderState *record, uint8 block_id)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	Buffer		buf;

	if (XLogReadBufferForRedo(record, block_id, &buf) == BLK_NEEDS_REDO)
	{
		Page		page = (Page) BufferGetPage(buf);
		BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);

		Assert(P_INCOMPLETE_SPLIT(pageop));
		pageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;

		PageSetLSN(page, lsn);
		MarkBufferDirty(buf);
	}
	if (BufferIsValid(buf))
		UnlockReleaseBuffer(buf);
}

static void
btree_xlog_insert(bool isleaf, bool ismeta, XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;

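	/*
	 * Blocks registered with this record (by the insert path in
	 * nbtinsert.c): 0 is the page receiving the new tuple; 1, present only
	 * when !isleaf, is the child whose incomplete-split flag this insert
	 * finishes; 2, present only when ismeta, is the metapage.
	 */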
	/*
	 * Insertion to an internal page finishes an incomplete split at the
	 * child level.  Clear the incomplete-split flag in the child.  Note:
	 * during normal operation, the child and parent pages are locked at the
	 * same time, so that clearing the flag and inserting the downlink appear
	 * atomic to other backends.  We don't bother with that during replay,
	 * because readers don't care about the incomplete-split flag and there
	 * cannot be updates happening.
	 */
	if (!isleaf)
		_bt_clear_incomplete_split(record, 1);
	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
	{
		Size		datalen;
		char	   *datapos = XLogRecGetBlockData(record, 0, &datalen);

		page = BufferGetPage(buffer);

		if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
						false, false) == InvalidOffsetNumber)
			elog(PANIC, "btree_insert_redo: failed to add item");

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);

	/*
	 * Note: in normal operation, we'd update the metapage while still
	 * holding lock on the page we inserted into.  But during replay it's not
	 * necessary to hold that lock, since no other index updates can be
	 * happening concurrently, and readers will cope fine with following an
	 * obsolete link from the metapage.
	 */
	if (ismeta)
		_bt_restore_meta(record, 2);
}

static void
btree_xlog_split(bool onleft, bool lhighkey, XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
	bool		isleaf = (xlrec->level == 0);
	Buffer		lbuf;
	Buffer		rbuf;
	Page		rpage;
	BTPageOpaque ropaque;
	char	   *datapos;
	Size		datalen;
	IndexTuple	left_hikey = NULL;
	Size		left_hikeysz = 0;
	BlockNumber leftsib;
	BlockNumber rightsib;
	BlockNumber rnext;

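	/*
	 * Blocks registered with a split record: 0 is the original (left) page,
	 * 1 is the new right page, 2 is the old right sibling (absent when the
	 * split page was rightmost), and 3, present only for non-leaf splits, is
	 * the left sibling of the child page that the new downlink points to.
	 */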
	XLogRecGetBlockTag(record, 0, NULL, NULL, &leftsib);
	XLogRecGetBlockTag(record, 1, NULL, NULL, &rightsib);
	if (!XLogRecGetBlockTag(record, 2, NULL, NULL, &rnext))
		rnext = P_NONE;

	/*
	 * Clear the incomplete split flag on the left sibling of the child page
	 * this is a downlink for.  (Like in btree_xlog_insert, this can be done
	 * before locking the other pages)
	 */
	if (!isleaf)
		_bt_clear_incomplete_split(record, 3);

	/* Reconstruct right (new) sibling page from scratch */
	rbuf = XLogInitBufferForRedo(record, 1);
	datapos = XLogRecGetBlockData(record, 1, &datalen);
	rpage = (Page) BufferGetPage(rbuf);

	_bt_pageinit(rpage, BufferGetPageSize(rbuf));
	ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);

	ropaque->btpo_prev = leftsib;
	ropaque->btpo_next = rnext;
	ropaque->btpo.level = xlrec->level;
	ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
	ropaque->btpo_cycleid = 0;

	_bt_restore_page(rpage, datapos, datalen);

	/*
	 * When the high key isn't present in the WAL record, we assume it to be
	 * equal to the first key on the right page.  That can only happen on the
	 * leaf level.
	 */
	if (!lhighkey)
	{
		ItemId		hiItemId = PageGetItemId(rpage, P_FIRSTDATAKEY(ropaque));

		Assert(isleaf);
		left_hikey = (IndexTuple) PageGetItem(rpage, hiItemId);
		left_hikeysz = ItemIdGetLength(hiItemId);
	}

	PageSetLSN(rpage, lsn);
	MarkBufferDirty(rbuf);

	/* don't release the buffer yet; we touch right page's first item below */

	/* Now reconstruct left (original) sibling page */
	if (XLogReadBufferForRedo(record, 0, &lbuf) == BLK_NEEDS_REDO)
	{
		/*
		 * To retain the same physical order that the tuples had, we
		 * initialize a temporary empty page for the left page and add all
		 * the items to it in item number order.  This mirrors how
		 * _bt_split() works.  It's not strictly required to retain the same
		 * physical order, as long as the items are in the correct item
		 * number order, but it helps debugging.  See also
		 * _bt_restore_page(), which does the same for the right page.
		 */
		Page		lpage = (Page) BufferGetPage(lbuf);
		BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
		OffsetNumber off;
		IndexTuple	newitem = NULL;
		Size		newitemsz = 0;
		Page		newlpage;
		OffsetNumber leftoff;

		datapos = XLogRecGetBlockData(record, 0, &datalen);

		if (onleft)
		{
			newitem = (IndexTuple) datapos;
			newitemsz = MAXALIGN(IndexTupleSize(newitem));
			datapos += newitemsz;
			datalen -= newitemsz;
		}

		/* Extract left hikey and its size (the tuple is MAXALIGNed in the record) */
		if (lhighkey)
		{
			left_hikey = (IndexTuple) datapos;
			left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
			datapos += left_hikeysz;
			datalen -= left_hikeysz;
		}

		Assert(datalen == 0);

		newlpage = PageGetTempPageCopySpecial(lpage);

		/* Set high key */
		leftoff = P_HIKEY;
		if (PageAddItem(newlpage, (Item) left_hikey, left_hikeysz,
						P_HIKEY, false, false) == InvalidOffsetNumber)
			elog(PANIC, "failed to add high key to left page after split");
		leftoff = OffsetNumberNext(leftoff);

		for (off = P_FIRSTDATAKEY(lopaque); off < xlrec->firstright; off++)
		{
			ItemId		itemid;
			Size		itemsz;
			IndexTuple	item;

			/* add the new item if it was inserted on left page */
			if (onleft && off == xlrec->newitemoff)
			{
				if (PageAddItem(newlpage, (Item) newitem, newitemsz, leftoff,
								false, false) == InvalidOffsetNumber)
					elog(ERROR, "failed to add new item to left page after split");
				leftoff = OffsetNumberNext(leftoff);
			}

			itemid = PageGetItemId(lpage, off);
			itemsz = ItemIdGetLength(itemid);
			item = (IndexTuple) PageGetItem(lpage, itemid);
			if (PageAddItem(newlpage, (Item) item, itemsz, leftoff,
							false, false) == InvalidOffsetNumber)
				elog(ERROR, "failed to add old item to left page after split");
			leftoff = OffsetNumberNext(leftoff);
		}

		/* cope with possibility that newitem goes at the end */
		if (onleft && off == xlrec->newitemoff)
		{
			if (PageAddItem(newlpage, (Item) newitem, newitemsz, leftoff,
							false, false) == InvalidOffsetNumber)
				elog(ERROR, "failed to add new item to left page after split");
			leftoff = OffsetNumberNext(leftoff);
		}

		PageRestoreTempPage(newlpage, lpage);

		/* Fix opaque fields */
		lopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
		if (isleaf)
			lopaque->btpo_flags |= BTP_LEAF;
		lopaque->btpo_next = rightsib;
		lopaque->btpo_cycleid = 0;

		PageSetLSN(lpage, lsn);
		MarkBufferDirty(lbuf);
	}

	/* We no longer need the buffers */
	if (BufferIsValid(lbuf))
		UnlockReleaseBuffer(lbuf);
	UnlockReleaseBuffer(rbuf);

	/*
	 * Fix left-link of the page to the right of the new right sibling.
	 *
	 * Note: in normal operation, we do this while still holding lock on the
	 * two split pages.  However, that's not necessary for correctness in WAL
	 * replay, because no other index update can be in progress, and readers
	 * will cope properly when following an obsolete left-link.
	 */
	if (rnext != P_NONE)
	{
		Buffer		buffer;

		if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
		{
			Page		page = (Page) BufferGetPage(buffer);
			BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);

			pageop->btpo_prev = rightsib;

			PageSetLSN(page, lsn);
			MarkBufferDirty(buffer);
		}
		if (BufferIsValid(buffer))
			UnlockReleaseBuffer(buffer);
	}
}

static void
btree_xlog_vacuum(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	Buffer		buffer;
	Page		page;
	BTPageOpaque opaque;
#ifdef UNUSED
	xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record);

	/*
	 * This section of code is thought to be no longer needed, after analysis
	 * of the calling paths.  It is retained to allow the code to be
	 * reinstated if a flaw is revealed in that thinking.
	 *
	 * If we are running non-MVCC scans using this index, we need to do some
	 * additional work to ensure correctness: the "pin scan" described in
	 * more detail in the next paragraphs.  We used to do that extra work in
	 * all cases, whereas we now avoid it in most cases.  If
	 * lastBlockVacuumed is set to InvalidBlockNumber then we skip the
	 * additional work required for the pin scan.
	 *
	 * Avoiding this extra work is important since it requires us to touch
	 * every page in the index, so is an O(N) operation.  Worse, it is an
	 * operation performed in the foreground during redo, so it delays
	 * replication directly.
	 *
	 * If queries might be active then we need to ensure every leaf page is
	 * unpinned between the lastBlockVacuumed and the current block, if there
	 * are any.  This prevents replay of the VACUUM from reaching the stage
	 * of removing heap tuples while there could still be indexscans "in
	 * flight" to those particular tuples for those scans which could be
	 * confused by finding new tuples at the old TID locations (see
	 * nbtree/README).
	 *
	 * It might be worth checking if there are actually any backends running;
	 * if not, we could just skip this.
	 *
	 * Since VACUUM can visit leaf pages out-of-order, it might issue records
	 * with lastBlockVacuumed >= block; that's not an error, it just means
	 * nothing to do now.
	 *
	 * Note: since we touch all pages in the range, we will lock non-leaf
	 * pages, and also any empty (all-zero) pages that may be in the index.
	 * It doesn't seem worth the complexity to avoid that.  But it's
	 * important that HotStandbyActiveInReplay() will not return true if the
	 * database isn't yet consistent; so we need not fear reading
	 * still-corrupt blocks here during crash recovery.
	 */
	if (HotStandbyActiveInReplay() && BlockNumberIsValid(xlrec->lastBlockVacuumed))
	{
		RelFileNode thisrnode;
		BlockNumber thisblkno;
		BlockNumber blkno;

		XLogRecGetBlockTag(record, 0, &thisrnode, NULL, &thisblkno);

		for (blkno = xlrec->lastBlockVacuumed + 1; blkno < thisblkno; blkno++)
		{
			/*
			 * We use RBM_NORMAL_NO_LOG mode because it's not an error
			 * condition to see all-zero pages.  The original btvacuumpage
			 * scan would have skipped over all-zero pages, noting them in
			 * FSM but not bothering to initialize them just yet; so we
			 * mustn't throw an error here.  (We could skip acquiring the
			 * cleanup lock if PageIsNew, but it's probably not worth the
			 * cycles to test.)
			 *
			 * XXX we don't actually need to read the block, we just need to
			 * confirm it is unpinned.  If we had a special call into the
			 * buffer manager we could optimise this so that if the block is
			 * not in shared_buffers we confirm it as unpinned.  Optimizing
			 * this is now moot, since in most cases we avoid the scan.
			 */
			buffer = XLogReadBufferExtended(thisrnode, MAIN_FORKNUM, blkno,
											RBM_NORMAL_NO_LOG);
			if (BufferIsValid(buffer))
			{
				LockBufferForCleanup(buffer);
				UnlockReleaseBuffer(buffer);
			}
		}
	}
#endif

	/*
	 * Like in btvacuumpage(), we need to take a cleanup lock on every leaf
	 * page.  See nbtree/README for details.
	 */
	if (XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer)
		== BLK_NEEDS_REDO)
	{
		char	   *ptr;
		Size		len;

		ptr = XLogRecGetBlockData(record, 0, &len);

		page = (Page) BufferGetPage(buffer);

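		/*
		 * The block data, if present, is just an array of OffsetNumbers of
		 * the index tuples to remove; that is how _bt_delitems_vacuum()
		 * registers it when writing the record.
		 */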
		if (len > 0)
		{
			OffsetNumber *unused;
			OffsetNumber *unend;

			unused = (OffsetNumber *) ptr;
			unend = (OffsetNumber *) ((char *) ptr + len);

			if ((unend - unused) > 0)
				PageIndexMultiDelete(page, unused, unend - unused);
		}

		/*
		 * Mark the page as not containing any LP_DEAD items --- see comments
		 * in _bt_delitems_vacuum().
		 */
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
		opaque->btpo_flags &= ~BTP_HAS_GARBAGE;

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);
}

/*
 * Get the latestRemovedXid from the heap pages pointed at by the index
 * tuples being deleted.  This puts the work for calculating latestRemovedXid
 * into the recovery path rather than the primary path.
 *
 * It's possible that this generates a fair amount of I/O, since an index
 * block may have hundreds of tuples being deleted.  Repeat accesses to the
 * same heap blocks are common, though are not yet optimised.
 *
 * XXX optimise later with something like XLogPrefetchBuffer()
 */
static TransactionId
btree_xlog_delete_get_latestRemovedXid(XLogReaderState *record)
{
	xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
	OffsetNumber *unused;
	Buffer		ibuffer,
				hbuffer;
	Page		ipage,
				hpage;
	RelFileNode rnode;
	BlockNumber blkno;
	ItemId		iitemid,
				hitemid;
	IndexTuple	itup;
	HeapTupleHeader htuphdr;
	BlockNumber hblkno;
	OffsetNumber hoffnum;
	TransactionId latestRemovedXid = InvalidTransactionId;
	int			i;

	/*
	 * If there's nothing running on the standby we don't need to derive a
	 * full latestRemovedXid value, so use a fast path out of here.  This
	 * returns InvalidTransactionId, and so will conflict with all HS
	 * transactions; but since we just worked out that that's zero people,
	 * it's OK.
	 *
	 * XXX There is a race condition here, which is that a new backend might
	 * start just after we look.  If so, it cannot need to conflict, but this
	 * coding will result in throwing a conflict anyway.
	 */
	if (CountDBBackends(InvalidOid) == 0)
		return latestRemovedXid;

	/*
	 * In what follows, we have to examine the previous state of the index
	 * page, as well as the heap page(s) it points to.  This is only valid if
	 * WAL replay has reached a consistent database state; which means that
	 * the preceding check is not just an optimization, but is *necessary*.
	 * We won't have let in any user sessions before we reach consistency.
	 */
	if (!reachedConsistency)
		elog(PANIC, "btree_xlog_delete_get_latestRemovedXid: cannot operate with inconsistent data");

	/*
	 * Get index page.  If the DB is consistent, this should not fail, nor
	 * should any of the heap page fetches below.  If one does, we return
	 * InvalidTransactionId to cancel all HS transactions.  That's probably
	 * overkill, but it's safe, and certainly better than panicking here.
	 */
	XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
	ibuffer = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL);
	if (!BufferIsValid(ibuffer))
		return InvalidTransactionId;
	LockBuffer(ibuffer, BT_READ);
	ipage = (Page) BufferGetPage(ibuffer);

	/*
	 * Loop through the deleted index items to obtain the TransactionId from
	 * the heap items they point to.
	 */
	unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);

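	/*
	 * (The offsets in unused[] are page offset numbers on the index page,
	 * taken from the array that follows the fixed-size part of the WAL
	 * record.)
	 */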
	for (i = 0; i < xlrec->nitems; i++)
	{
		/*
		 * Identify the index tuple about to be deleted
		 */
		iitemid = PageGetItemId(ipage, unused[i]);
		itup = (IndexTuple) PageGetItem(ipage, iitemid);

		/*
		 * Locate the heap page that the index tuple points at
		 */
		hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
		hbuffer = XLogReadBufferExtended(xlrec->hnode, MAIN_FORKNUM, hblkno, RBM_NORMAL);
		if (!BufferIsValid(hbuffer))
		{
			UnlockReleaseBuffer(ibuffer);
			return InvalidTransactionId;
		}
		LockBuffer(hbuffer, BT_READ);
		hpage = (Page) BufferGetPage(hbuffer);

		/*
		 * Look up the heap tuple header that the index tuple points at by
		 * using the heap node supplied with the xlrec.  We can't use
		 * heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
		 * Note that we are not looking at tuple data here, just headers.
		 */
		hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
		hitemid = PageGetItemId(hpage, hoffnum);

		/*
		 * Follow any redirections until we find something useful.
		 */
		while (ItemIdIsRedirected(hitemid))
		{
			hoffnum = ItemIdGetRedirect(hitemid);
			hitemid = PageGetItemId(hpage, hoffnum);
			CHECK_FOR_INTERRUPTS();
		}

		/*
		 * If the heap item has storage, then read the header and use that to
		 * set latestRemovedXid.
		 *
		 * Some LP_DEAD items may not be accessible, so we ignore them.
		 */
		if (ItemIdHasStorage(hitemid))
		{
			htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);

			HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
		}
		else if (ItemIdIsDead(hitemid))
		{
			/*
			 * Conjecture: if hitemid is dead then it had xids before the
			 * xids marked on LP_NORMAL items.  So we just ignore this item
			 * and move on to the next, for the purposes of calculating
			 * latestRemovedXid.
			 */
		}
		else
			Assert(!ItemIdIsUsed(hitemid));

		UnlockReleaseBuffer(hbuffer);
	}

	UnlockReleaseBuffer(ibuffer);

	/*
	 * If all heap tuples were LP_DEAD then we will be returning
	 * InvalidTransactionId here, which avoids conflicts.  This matches
	 * existing logic which assumes that LP_DEAD tuples must already be older
	 * than the latestRemovedXid on the cleanup record that set them as
	 * LP_DEAD, hence must already have generated a conflict.
	 */
	return latestRemovedXid;
}

static void
btree_xlog_delete(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;
	BTPageOpaque opaque;

	/*
	 * If we have any conflict processing to do, it must happen before we
	 * update the page.
	 *
	 * Btree delete records can conflict with standby queries.  You might
	 * think that vacuum records would conflict as well, but we've handled
	 * that already.  XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
	 * cleaned by the vacuum of the heap and so we can resolve any conflicts
	 * just once when that arrives.  After that we know that no conflicts
	 * exist from individual btree vacuum records on that index.
	 */
	if (InHotStandby)
	{
		TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(record);
		RelFileNode rnode;

		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);

		ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
	}

	/*
	 * We don't need to take a cleanup lock to apply these changes.  See
	 * nbtree/README for details.
	 */
	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
	{
		page = (Page) BufferGetPage(buffer);

		if (XLogRecGetDataLen(record) > SizeOfBtreeDelete)
		{
			OffsetNumber *unused;

			unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);

			PageIndexMultiDelete(page, unused, xlrec->nitems);
		}

		/*
		 * Mark the page as not containing any LP_DEAD items --- see comments
		 * in _bt_delitems_delete().
		 */
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
		opaque->btpo_flags &= ~BTP_HAS_GARBAGE;

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);
}

static void
btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_mark_page_halfdead *xlrec = (xl_btree_mark_page_halfdead *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;
	BTPageOpaque pageop;
	IndexTupleData trunctuple;

	/*
	 * In normal operation, we would lock all the pages this WAL record
	 * touches before changing any of them.  In WAL replay, it should be okay
	 * to lock just one page at a time, since no concurrent index updates can
	 * be happening, and readers should not care whether they arrive at the
	 * target page or not (since it's surely empty).
	 */

	/* parent page */
	if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
	{
		OffsetNumber poffset;
		ItemId		itemid;
		IndexTuple	itup;
		OffsetNumber nextoffset;
		BlockNumber rightsib;

		page = (Page) BufferGetPage(buffer);
		pageop = (BTPageOpaque) PageGetSpecialPointer(page);

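		/*
		 * Redo the parent-page change: the downlink at poffset is made to
		 * point to the target's right sibling, and the right sibling's old
		 * downlink (the next item, at poffset + 1) is then deleted, so the
		 * target page is no longer reachable from the parent.
		 */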
		poffset = xlrec->poffset;

		nextoffset = OffsetNumberNext(poffset);
		itemid = PageGetItemId(page, nextoffset);
		itup = (IndexTuple) PageGetItem(page, itemid);
		rightsib = BTreeInnerTupleGetDownLink(itup);

		itemid = PageGetItemId(page, poffset);
		itup = (IndexTuple) PageGetItem(page, itemid);
		BTreeInnerTupleSetDownLink(itup, rightsib);
		nextoffset = OffsetNumberNext(poffset);
		PageIndexTupleDelete(page, nextoffset);

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);

	/* Rewrite the leaf page as a halfdead page */
	buffer = XLogInitBufferForRedo(record, 0);
	page = (Page) BufferGetPage(buffer);

	_bt_pageinit(page, BufferGetPageSize(buffer));
	pageop = (BTPageOpaque) PageGetSpecialPointer(page);

	pageop->btpo_prev = xlrec->leftblk;
	pageop->btpo_next = xlrec->rightblk;
	pageop->btpo.level = 0;
	pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
	pageop->btpo_cycleid = 0;

	/*
	 * Construct a dummy hikey item that points to the next parent to be
	 * deleted (if any).
	 */
	MemSet(&trunctuple, 0, sizeof(IndexTupleData));
	trunctuple.t_info = sizeof(IndexTupleData);
	BTreeTupleSetTopParent(&trunctuple, xlrec->topparent);

	if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
					false, false) == InvalidOffsetNumber)
		elog(ERROR, "could not add dummy high key to half-dead page");

	PageSetLSN(page, lsn);
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
}


static void
btree_xlog_unlink_page(uint8 info, XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_unlink_page *xlrec = (xl_btree_unlink_page *) XLogRecGetData(record);
	BlockNumber leftsib;
	BlockNumber rightsib;
	Buffer		buffer;
	Page		page;
	BTPageOpaque pageop;

	leftsib = xlrec->leftsib;
	rightsib = xlrec->rightsib;

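	/*
	 * Blocks registered with an unlink record: 0 is the page being taken out
	 * of the tree, 1 is its left sibling (if any), 2 is its right sibling,
	 * 3 is the leaf page left half-dead when a parent in the branch is
	 * deleted instead, and 4 is the metapage (UNLINK_PAGE_META only).
	 */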
	/*
	 * In normal operation, we would lock all the pages this WAL record
	 * touches before changing any of them.  In WAL replay, it should be okay
	 * to lock just one page at a time, since no concurrent index updates can
	 * be happening, and readers should not care whether they arrive at the
	 * target page or not (since it's surely empty).
	 */

	/* Fix left-link of right sibling */
	if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
	{
		page = (Page) BufferGetPage(buffer);
		pageop = (BTPageOpaque) PageGetSpecialPointer(page);
		pageop->btpo_prev = leftsib;

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);

	/* Fix right-link of left sibling, if any */
	if (leftsib != P_NONE)
	{
		if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
		{
			page = (Page) BufferGetPage(buffer);
			pageop = (BTPageOpaque) PageGetSpecialPointer(page);
			pageop->btpo_next = rightsib;

			PageSetLSN(page, lsn);
			MarkBufferDirty(buffer);
		}
		if (BufferIsValid(buffer))
			UnlockReleaseBuffer(buffer);
	}

	/* Rewrite target page as empty deleted page */
	buffer = XLogInitBufferForRedo(record, 0);
	page = (Page) BufferGetPage(buffer);

	_bt_pageinit(page, BufferGetPageSize(buffer));
	pageop = (BTPageOpaque) PageGetSpecialPointer(page);

	pageop->btpo_prev = leftsib;
	pageop->btpo_next = rightsib;
	pageop->btpo.xact = xlrec->btpo_xact;
	pageop->btpo_flags = BTP_DELETED;
	pageop->btpo_cycleid = 0;

	PageSetLSN(page, lsn);
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);

	/*
	 * If we deleted a parent of the targeted leaf page, instead of the leaf
	 * itself, update the leaf to point to the next remaining child in the
	 * branch.
	 */
	if (XLogRecHasBlockRef(record, 3))
	{
		/*
		 * There is no real data on the page, so we just re-create it from
		 * scratch using the information from the WAL record.
		 */
		IndexTupleData trunctuple;

		buffer = XLogInitBufferForRedo(record, 3);
		page = (Page) BufferGetPage(buffer);

		_bt_pageinit(page, BufferGetPageSize(buffer));
		pageop = (BTPageOpaque) PageGetSpecialPointer(page);

		pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
		pageop->btpo_prev = xlrec->leafleftsib;
		pageop->btpo_next = xlrec->leafrightsib;
		pageop->btpo.level = 0;
		pageop->btpo_cycleid = 0;

		/* Add a dummy hikey item */
		MemSet(&trunctuple, 0, sizeof(IndexTupleData));
		trunctuple.t_info = sizeof(IndexTupleData);
		BTreeTupleSetTopParent(&trunctuple, xlrec->topparent);

		if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
						false, false) == InvalidOffsetNumber)
			elog(ERROR, "could not add dummy high key to half-dead page");

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
		UnlockReleaseBuffer(buffer);
	}

	/* Update metapage if needed */
	if (info == XLOG_BTREE_UNLINK_PAGE_META)
		_bt_restore_meta(record, 4);
}

static void
btree_xlog_newroot(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;
	BTPageOpaque pageop;
	char	   *ptr;
	Size		len;

	buffer = XLogInitBufferForRedo(record, 0);
	page = (Page) BufferGetPage(buffer);

	_bt_pageinit(page, BufferGetPageSize(buffer));
	pageop = (BTPageOpaque) PageGetSpecialPointer(page);

	pageop->btpo_flags = BTP_ROOT;
	pageop->btpo_prev = pageop->btpo_next = P_NONE;
	pageop->btpo.level = xlrec->level;
	if (xlrec->level == 0)
		pageop->btpo_flags |= BTP_LEAF;
	pageop->btpo_cycleid = 0;

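	/*
	 * A new root at level > 0 is created by a root split, and the block data
	 * then holds the complete contents of the new root (in practice, the
	 * downlinks to the two halves of the old root); restore them.  A level-0
	 * root is the first leaf of a new index and starts out empty.
	 */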
	if (xlrec->level > 0)
	{
		ptr = XLogRecGetBlockData(record, 0, &len);
		_bt_restore_page(page, ptr, len);

		/* Clear the incomplete-split flag in left child */
		_bt_clear_incomplete_split(record, 1);
	}

	PageSetLSN(page, lsn);
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);

	_bt_restore_meta(record, 2);
}

static void
btree_xlog_reuse_page(XLogReaderState *record)
{
	xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);

	/*
	 * Btree reuse_page records exist to provide a conflict point when we
	 * reuse pages in the index via the FSM.  That's all they do though.
	 *
	 * latestRemovedXid was the page's btpo.xact.  The btpo.xact <
	 * RecentGlobalXmin test in _bt_page_recyclable() conceptually mirrors
	 * the pgxact->xmin > limitXmin test in GetConflictingVirtualXIDs().
	 * Consequently, one XID value achieves the same exclusion effect on
	 * master and standby.
	 */
	if (InHotStandby)
	{
		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
											xlrec->node);
	}
}

void
btree_redo(XLogReaderState *record)
{
	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

	switch (info)
	{
		case XLOG_BTREE_INSERT_LEAF:
			btree_xlog_insert(true, false, record);
			break;
		case XLOG_BTREE_INSERT_UPPER:
			btree_xlog_insert(false, false, record);
			break;
		case XLOG_BTREE_INSERT_META:
			btree_xlog_insert(false, true, record);
			break;
		case XLOG_BTREE_SPLIT_L:
			btree_xlog_split(true, false, record);
			break;
		case XLOG_BTREE_SPLIT_L_HIGHKEY:
			btree_xlog_split(true, true, record);
			break;
		case XLOG_BTREE_SPLIT_R:
			btree_xlog_split(false, false, record);
			break;
		case XLOG_BTREE_SPLIT_R_HIGHKEY:
			btree_xlog_split(false, true, record);
			break;
		case XLOG_BTREE_VACUUM:
			btree_xlog_vacuum(record);
			break;
		case XLOG_BTREE_DELETE:
			btree_xlog_delete(record);
			break;
		case XLOG_BTREE_MARK_PAGE_HALFDEAD:
			btree_xlog_mark_page_halfdead(info, record);
			break;
		case XLOG_BTREE_UNLINK_PAGE:
		case XLOG_BTREE_UNLINK_PAGE_META:
			btree_xlog_unlink_page(info, record);
			break;
		case XLOG_BTREE_NEWROOT:
			btree_xlog_newroot(record);
			break;
		case XLOG_BTREE_REUSE_PAGE:
			btree_xlog_reuse_page(record);
			break;
		case XLOG_BTREE_META_CLEANUP:
			_bt_restore_meta(record, 0);
			break;
		default:
			elog(PANIC, "btree_redo: unknown op code %u", info);
	}
}

/*
 * Mask a btree page before performing consistency checks on it.
 */
void
btree_mask(char *pagedata, BlockNumber blkno)
{
	Page		page = (Page) pagedata;
	BTPageOpaque maskopaq;

	mask_page_lsn_and_checksum(page);

	mask_page_hint_bits(page);
	mask_unused_space(page);

	maskopaq = (BTPageOpaque) PageGetSpecialPointer(page);

	if (P_ISDELETED(maskopaq))
	{
		/*
		 * Mask page content on a DELETED page since it will be
		 * re-initialized during replay.  See btree_xlog_unlink_page() for
		 * details.
		 */
		mask_page_content(page);
	}
	else if (P_ISLEAF(maskopaq))
	{
		/*
		 * In btree leaf pages, it is possible to modify the LP_FLAGS without
		 * emitting any WAL record.  Hence, mask the line pointer flags.  See
		 * _bt_killitems(), _bt_check_unique() for details.
		 */
		mask_lp_flags(page);
	}

	/*
	 * BTP_HAS_GARBAGE is just an un-logged hint bit.  So, mask it.  See
	 * _bt_killitems(), _bt_check_unique() for details.
	 */
	maskopaq->btpo_flags &= ~BTP_HAS_GARBAGE;

	/*
	 * During replay of a btree page split, we don't set the BTP_SPLIT_END
	 * flag on the right sibling, and we initialize its cycle ID to 0,
	 * whereas normal operation may leave a nonzero cycle ID.  So, mask both.
	 * See btree_xlog_split() for details.
	 */
	maskopaq->btpo_flags &= ~BTP_SPLIT_END;
	maskopaq->btpo_cycleid = 0;
}