/*-------------------------------------------------------------------------
 *
 * nbtxlog.c
 *	  WAL replay logic for btrees.
 *
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/nbtree/nbtxlog.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/bufmask.h"
#include "access/nbtree.h"
#include "access/nbtxlog.h"
#include "access/transam.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/procarray.h"
#include "miscadmin.h"

/*
 * _bt_restore_page -- re-enter all the index tuples on a page
 *
 * The page is freshly init'd, and *from (length len) is a copy of what
 * had been its upper part (pd_upper to pd_special).  We assume that the
 * tuples had been added to the page in item-number order, and therefore
 * the one with highest item number appears first (lowest on the page).
 */
static void
_bt_restore_page(Page page, char *from, int len)
{
    IndexTupleData itupdata;
    Size        itemsz;
    char       *end = from + len;
    Item        items[MaxIndexTuplesPerPage];
    uint16      itemsizes[MaxIndexTuplesPerPage];
    int         i;
    int         nitems;

    /*
     * To get the items back in the original order, we add them to the page in
     * reverse.  To figure out where one tuple ends and another begins, we
     * have to scan them in forward order first.
     */
    i = 0;
    while (from < end)
    {
        /*
         * As we step through the items, 'from' won't always be properly
         * aligned, so we need to use memcpy().  Further, we use Item (which
         * is just a char*) here for our items array for the same reason;
         * wouldn't want the compiler or anyone thinking that an item is
         * aligned when it isn't.
         */
        memcpy(&itupdata, from, sizeof(IndexTupleData));
        itemsz = IndexTupleSize(&itupdata);
        itemsz = MAXALIGN(itemsz);

        items[i] = (Item) from;
        itemsizes[i] = itemsz;
        i++;

        from += itemsz;
    }
    nitems = i;

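    /*
     * Re-insert in reverse scan order: items[nitems - 1] was scanned last and
     * therefore carries the lowest original item number, so adding the items
     * back-to-front at offset nitems - i restores the original numbering.
     */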
    for (i = nitems - 1; i >= 0; i--)
    {
        if (PageAddItem(page, items[i], itemsizes[i], nitems - i,
                        false, false) == InvalidOffsetNumber)
            elog(PANIC, "_bt_restore_page: cannot add item to page");
    }
}

static void
_bt_restore_meta(XLogReaderState *record, uint8 block_id)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    Buffer      metabuf;
    Page        metapg;
    BTMetaPageData *md;
    BTPageOpaque pageop;
    xl_btree_metadata *xlrec;
    char       *ptr;
    Size        len;

    metabuf = XLogInitBufferForRedo(record, block_id);
    ptr = XLogRecGetBlockData(record, block_id, &len);

    Assert(len == sizeof(xl_btree_metadata));
    Assert(BufferGetBlockNumber(metabuf) == BTREE_METAPAGE);
    xlrec = (xl_btree_metadata *) ptr;
    metapg = BufferGetPage(metabuf);

    _bt_pageinit(metapg, BufferGetPageSize(metabuf));

    md = BTPageGetMeta(metapg);
    md->btm_magic = BTREE_MAGIC;
    md->btm_version = xlrec->version;
    md->btm_root = xlrec->root;
    md->btm_level = xlrec->level;
    md->btm_fastroot = xlrec->fastroot;
    md->btm_fastlevel = xlrec->fastlevel;
    /* Cannot log BTREE_MIN_VERSION index metapage without upgrade */
    Assert(md->btm_version >= BTREE_NOVAC_VERSION);
    md->btm_oldest_btpo_xact = xlrec->oldest_btpo_xact;
    md->btm_last_cleanup_num_heap_tuples = xlrec->last_cleanup_num_heap_tuples;

    pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
    pageop->btpo_flags = BTP_META;

    /*
     * Set pd_lower just past the end of the metadata.  This is essential,
     * because without doing so, metadata will be lost if xlog.c compresses
     * the page.
     */
    ((PageHeader) metapg)->pd_lower =
        ((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;

    PageSetLSN(metapg, lsn);
    MarkBufferDirty(metabuf);
    UnlockReleaseBuffer(metabuf);
}

/*
 * _bt_clear_incomplete_split -- clear INCOMPLETE_SPLIT flag on a page
 *
 * This is a common subroutine of the redo functions of all the WAL record
 * types that can insert a downlink: insert, split, and newroot.
 */
static void
_bt_clear_incomplete_split(XLogReaderState *record, uint8 block_id)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    Buffer      buf;

    if (XLogReadBufferForRedo(record, block_id, &buf) == BLK_NEEDS_REDO)
    {
        Page        page = (Page) BufferGetPage(buf);
        BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);

        Assert(P_INCOMPLETE_SPLIT(pageop));
        pageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;

        PageSetLSN(page, lsn);
        MarkBufferDirty(buf);
    }
    if (BufferIsValid(buf))
        UnlockReleaseBuffer(buf);
}

static void
btree_xlog_insert(bool isleaf, bool ismeta, XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;

    /*
     * Insertion to an internal page finishes an incomplete split at the child
     * level.  Clear the incomplete-split flag in the child.  Note: during
     * normal operation, the child and parent pages are locked at the same
     * time, so that clearing the flag and inserting the downlink appear
     * atomic to other backends.  We don't bother with that during replay,
     * because readers don't care about the incomplete-split flag and there
     * cannot be updates happening.
     */
    if (!isleaf)
        _bt_clear_incomplete_split(record, 1);
    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
    {
        Size        datalen;
        char       *datapos = XLogRecGetBlockData(record, 0, &datalen);

        page = BufferGetPage(buffer);

        if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
                        false, false) == InvalidOffsetNumber)
            elog(PANIC, "btree_xlog_insert: failed to add item");

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);

    /*
     * Note: in normal operation, we'd update the metapage while still holding
     * lock on the page we inserted into.  But during replay it's not
     * necessary to hold that lock, since no other index updates can be
     * happening concurrently, and readers will cope fine with following an
     * obsolete link from the metapage.
     */
    if (ismeta)
        _bt_restore_meta(record, 2);
}

static void
btree_xlog_split(bool onleft, XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
    bool        isleaf = (xlrec->level == 0);
    Buffer      lbuf;
    Buffer      rbuf;
    Page        rpage;
    BTPageOpaque ropaque;
    char       *datapos;
    Size        datalen;
    BlockNumber leftsib;
    BlockNumber rightsib;
    BlockNumber rnext;

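    /*
     * Block references in a split record: 0 is the original (left) page, 1 is
     * the new right page, 2 is the old right sibling (if any), and 3 is the
     * child page whose incomplete-split flag is cleared below (internal-page
     * splits only).
     */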
    XLogRecGetBlockTag(record, 0, NULL, NULL, &leftsib);
    XLogRecGetBlockTag(record, 1, NULL, NULL, &rightsib);
    if (!XLogRecGetBlockTag(record, 2, NULL, NULL, &rnext))
        rnext = P_NONE;

    /*
     * Clear the incomplete split flag on the left sibling of the child page
     * this is a downlink for.  (Like in btree_xlog_insert, this can be done
     * before locking the other pages)
     */
    if (!isleaf)
        _bt_clear_incomplete_split(record, 3);

    /* Reconstruct right (new) sibling page from scratch */
    rbuf = XLogInitBufferForRedo(record, 1);
    datapos = XLogRecGetBlockData(record, 1, &datalen);
    rpage = (Page) BufferGetPage(rbuf);

    _bt_pageinit(rpage, BufferGetPageSize(rbuf));
    ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);

    ropaque->btpo_prev = leftsib;
    ropaque->btpo_next = rnext;
    ropaque->btpo.level = xlrec->level;
    ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
    ropaque->btpo_cycleid = 0;

    _bt_restore_page(rpage, datapos, datalen);

    PageSetLSN(rpage, lsn);
    MarkBufferDirty(rbuf);

    /* Now reconstruct left (original) sibling page */
    if (XLogReadBufferForRedo(record, 0, &lbuf) == BLK_NEEDS_REDO)
    {
        /*
         * To retain the same physical order of the tuples that they had, we
         * initialize a temporary empty page for the left page and add all the
         * items to that in item number order.  This mirrors how _bt_split()
         * works.  Retaining the same physical order makes WAL consistency
         * checking possible.  See also _bt_restore_page(), which does the
         * same for the right page.
         */
        Page        lpage = (Page) BufferGetPage(lbuf);
        BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
        OffsetNumber off;
        IndexTuple  newitem = NULL,
                    left_hikey = NULL;
        Size        newitemsz = 0,
                    left_hikeysz = 0;
        Page        newlpage;
        OffsetNumber leftoff;

        datapos = XLogRecGetBlockData(record, 0, &datalen);

        if (onleft)
        {
            newitem = (IndexTuple) datapos;
            newitemsz = MAXALIGN(IndexTupleSize(newitem));
            datapos += newitemsz;
            datalen -= newitemsz;
        }

        /* Extract left hikey and its size (assuming 16-bit alignment) */
        left_hikey = (IndexTuple) datapos;
        left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
        datapos += left_hikeysz;
        datalen -= left_hikeysz;

        Assert(datalen == 0);

        newlpage = PageGetTempPageCopySpecial(lpage);

        /* Set high key */
        leftoff = P_HIKEY;
        if (PageAddItem(newlpage, (Item) left_hikey, left_hikeysz,
                        P_HIKEY, false, false) == InvalidOffsetNumber)
            elog(PANIC, "failed to add high key to left page after split");
        leftoff = OffsetNumberNext(leftoff);

        for (off = P_FIRSTDATAKEY(lopaque); off < xlrec->firstright; off++)
        {
            ItemId      itemid;
            Size        itemsz;
            IndexTuple  item;

            /* add the new item if it was inserted on left page */
            if (onleft && off == xlrec->newitemoff)
            {
                if (PageAddItem(newlpage, (Item) newitem, newitemsz, leftoff,
                                false, false) == InvalidOffsetNumber)
                    elog(ERROR, "failed to add new item to left page after split");
                leftoff = OffsetNumberNext(leftoff);
            }

            itemid = PageGetItemId(lpage, off);
            itemsz = ItemIdGetLength(itemid);
            item = (IndexTuple) PageGetItem(lpage, itemid);
            if (PageAddItem(newlpage, (Item) item, itemsz, leftoff,
                            false, false) == InvalidOffsetNumber)
                elog(ERROR, "failed to add old item to left page after split");
            leftoff = OffsetNumberNext(leftoff);
        }

        /* cope with possibility that newitem goes at the end */
        if (onleft && off == xlrec->newitemoff)
        {
            if (PageAddItem(newlpage, (Item) newitem, newitemsz, leftoff,
                            false, false) == InvalidOffsetNumber)
                elog(ERROR, "failed to add new item to left page after split");
            leftoff = OffsetNumberNext(leftoff);
        }

        PageRestoreTempPage(newlpage, lpage);

        /* Fix opaque fields */
        lopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
        if (isleaf)
            lopaque->btpo_flags |= BTP_LEAF;
        lopaque->btpo_next = rightsib;
        lopaque->btpo_cycleid = 0;

        PageSetLSN(lpage, lsn);
        MarkBufferDirty(lbuf);
    }

    /*
     * We no longer need the buffers.  They must be released together, so that
     * readers cannot observe two inconsistent halves.
     */
    if (BufferIsValid(lbuf))
        UnlockReleaseBuffer(lbuf);
    UnlockReleaseBuffer(rbuf);

    /*
     * Fix left-link of the page to the right of the new right sibling.
     *
     * Note: in normal operation, we do this while still holding lock on the
     * two split pages.  However, that's not necessary for correctness in WAL
     * replay, because no other index update can be in progress, and readers
     * will cope properly when following an obsolete left-link.
     */
    if (rnext != P_NONE)
    {
        Buffer      buffer;

        if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
        {
            Page        page = (Page) BufferGetPage(buffer);
            BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);

            pageop->btpo_prev = rightsib;

            PageSetLSN(page, lsn);
            MarkBufferDirty(buffer);
        }
        if (BufferIsValid(buffer))
            UnlockReleaseBuffer(buffer);
    }
}

static void
btree_xlog_vacuum(XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    Buffer      buffer;
    Page        page;
    BTPageOpaque opaque;
#ifdef UNUSED
    xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record);

    /*
     * This section of code is thought to be no longer needed, after analysis
     * of the calling paths. It is retained to allow the code to be reinstated
     * if a flaw is revealed in that thinking.
     *
     * If we are running non-MVCC scans using this index we need to do some
     * additional work to ensure correctness, which is known as a "pin scan"
     * described in more detail in next paragraphs. We used to do the extra
     * work in all cases, whereas we now avoid that work in most cases. If
     * lastBlockVacuumed is set to InvalidBlockNumber then we skip the
     * additional work required for the pin scan.
     *
     * Avoiding this extra work is important since it requires us to touch
     * every page in the index, so is an O(N) operation. Worse, it is an
     * operation performed in the foreground during redo, so it delays
     * replication directly.
     *
     * If queries might be active then we need to ensure every leaf page is
     * unpinned between the lastBlockVacuumed and the current block, if there
     * are any.  This prevents replay of the VACUUM from reaching the stage of
     * removing heap tuples while there could still be indexscans "in flight"
     * to those particular tuples for those scans which could be confused by
     * finding new tuples at the old TID locations (see nbtree/README).
     *
     * It might be worth checking if there are actually any backends running;
     * if not, we could just skip this.
     *
     * Since VACUUM can visit leaf pages out-of-order, it might issue records
     * with lastBlockVacuumed >= block; that's not an error, it just means
     * nothing to do now.
     *
     * Note: since we touch all pages in the range, we will lock non-leaf
     * pages, and also any empty (all-zero) pages that may be in the index. It
     * doesn't seem worth the complexity to avoid that.  But it's important
     * that HotStandbyActiveInReplay() will not return true if the database
     * isn't yet consistent; so we need not fear reading still-corrupt blocks
     * here during crash recovery.
     */
    if (HotStandbyActiveInReplay() && BlockNumberIsValid(xlrec->lastBlockVacuumed))
    {
        RelFileNode thisrnode;
        BlockNumber thisblkno;
        BlockNumber blkno;

        XLogRecGetBlockTag(record, 0, &thisrnode, NULL, &thisblkno);

        for (blkno = xlrec->lastBlockVacuumed + 1; blkno < thisblkno; blkno++)
        {
            /*
             * We use RBM_NORMAL_NO_LOG mode because it's not an error
             * condition to see all-zero pages.  The original btvacuumpage
             * scan would have skipped over all-zero pages, noting them in FSM
             * but not bothering to initialize them just yet; so we mustn't
             * throw an error here.  (We could skip acquiring the cleanup lock
             * if PageIsNew, but it's probably not worth the cycles to test.)
             *
             * XXX we don't actually need to read the block, we just need to
             * confirm it is unpinned. If we had a special call into the
             * buffer manager we could optimise this so that if the block is
             * not in shared_buffers we confirm it as unpinned. Optimizing
             * this is now moot, since in most cases we avoid the scan.
             */
            buffer = XLogReadBufferExtended(thisrnode, MAIN_FORKNUM, blkno,
                                            RBM_NORMAL_NO_LOG);
            if (BufferIsValid(buffer))
            {
                LockBufferForCleanup(buffer);
                UnlockReleaseBuffer(buffer);
            }
        }
    }
#endif

    /*
     * Like in btvacuumpage(), we need to take a cleanup lock on every leaf
     * page. See nbtree/README for details.
     */
    if (XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer)
        == BLK_NEEDS_REDO)
    {
        char       *ptr;
        Size        len;

        ptr = XLogRecGetBlockData(record, 0, &len);

        page = (Page) BufferGetPage(buffer);

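        /*
         * The block data, if any, is simply an array of offset numbers of
         * the index tuples to remove from this page.
         */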
        if (len > 0)
        {
            OffsetNumber *unused;
            OffsetNumber *unend;

            unused = (OffsetNumber *) ptr;
            unend = (OffsetNumber *) ((char *) ptr + len);

            if ((unend - unused) > 0)
                PageIndexMultiDelete(page, unused, unend - unused);
        }

        /*
         * Mark the page as not containing any LP_DEAD items --- see comments
         * in _bt_delitems_vacuum().
         */
        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
        opaque->btpo_flags &= ~BTP_HAS_GARBAGE;

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}

static void
btree_xlog_delete(XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    BTPageOpaque opaque;

    /*
     * If we have any conflict processing to do, it must happen before we
     * update the page.
     *
     * Btree delete records can conflict with standby queries.  You might
     * think that vacuum records would conflict as well, but we've handled
     * that already.  XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
     * cleaned by the vacuum of the heap and so we can resolve any conflicts
     * just once when that arrives.  After that we know that no conflicts
     * exist from individual btree vacuum records on that index.
     */
    if (InHotStandby)
    {
        RelFileNode rnode;

        XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);

        ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
    }

    /*
     * We don't need to take a cleanup lock to apply these changes. See
     * nbtree/README for details.
     */
    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
    {
        page = (Page) BufferGetPage(buffer);

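        /*
         * The offsets of the tuples to remove follow the fixed-size
         * xl_btree_delete struct in the main record data.
         */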
        if (XLogRecGetDataLen(record) > SizeOfBtreeDelete)
        {
            OffsetNumber *unused;

            unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);

            PageIndexMultiDelete(page, unused, xlrec->nitems);
        }

        /*
         * Mark the page as not containing any LP_DEAD items --- see comments
         * in _bt_delitems_delete().
         */
        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
        opaque->btpo_flags &= ~BTP_HAS_GARBAGE;

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}

static void
btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_btree_mark_page_halfdead *xlrec = (xl_btree_mark_page_halfdead *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    BTPageOpaque pageop;
    IndexTupleData trunctuple;

    /*
     * In normal operation, we would lock all the pages this WAL record
     * touches before changing any of them.  In WAL replay, it should be okay
     * to lock just one page at a time, since no concurrent index updates can
     * be happening, and readers should not care whether they arrive at the
     * target page or not (since it's surely empty).
     */

    /* parent page */
    if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
    {
        OffsetNumber poffset;
        ItemId      itemid;
        IndexTuple  itup;
        OffsetNumber nextoffset;
        BlockNumber rightsib;

        page = (Page) BufferGetPage(buffer);
        pageop = (BTPageOpaque) PageGetSpecialPointer(page);

        poffset = xlrec->poffset;

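        /*
         * Make the downlink at poffset point to the removed page's right
         * sibling (taken from the following item), then delete that
         * following item, which is now redundant.
         */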
        nextoffset = OffsetNumberNext(poffset);
        itemid = PageGetItemId(page, nextoffset);
        itup = (IndexTuple) PageGetItem(page, itemid);
        rightsib = BTreeInnerTupleGetDownLink(itup);

        itemid = PageGetItemId(page, poffset);
        itup = (IndexTuple) PageGetItem(page, itemid);
        BTreeInnerTupleSetDownLink(itup, rightsib);
        nextoffset = OffsetNumberNext(poffset);
        PageIndexTupleDelete(page, nextoffset);

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);

    /* Rewrite the leaf page as a halfdead page */
    buffer = XLogInitBufferForRedo(record, 0);
    page = (Page) BufferGetPage(buffer);

    _bt_pageinit(page, BufferGetPageSize(buffer));
    pageop = (BTPageOpaque) PageGetSpecialPointer(page);

    pageop->btpo_prev = xlrec->leftblk;
    pageop->btpo_next = xlrec->rightblk;
    pageop->btpo.level = 0;
    pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
    pageop->btpo_cycleid = 0;

    /*
     * Construct a dummy hikey item that points to the next parent to be
     * deleted (if any).
     */
    MemSet(&trunctuple, 0, sizeof(IndexTupleData));
    trunctuple.t_info = sizeof(IndexTupleData);
    BTreeTupleSetTopParent(&trunctuple, xlrec->topparent);

    if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
                    false, false) == InvalidOffsetNumber)
        elog(ERROR, "could not add dummy high key to half-dead page");

    PageSetLSN(page, lsn);
    MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);
}


static void
btree_xlog_unlink_page(uint8 info, XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_btree_unlink_page *xlrec = (xl_btree_unlink_page *) XLogRecGetData(record);
    BlockNumber leftsib;
    BlockNumber rightsib;
    Buffer      buffer;
    Page        page;
    BTPageOpaque pageop;

    leftsib = xlrec->leftsib;
    rightsib = xlrec->rightsib;

    /*
     * In normal operation, we would lock all the pages this WAL record
     * touches before changing any of them.  In WAL replay, it should be okay
     * to lock just one page at a time, since no concurrent index updates can
     * be happening, and readers should not care whether they arrive at the
     * target page or not (since it's surely empty).
     */

    /* Fix left-link of right sibling */
    if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
    {
        page = (Page) BufferGetPage(buffer);
        pageop = (BTPageOpaque) PageGetSpecialPointer(page);
        pageop->btpo_prev = leftsib;

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);

    /* Fix right-link of left sibling, if any */
    if (leftsib != P_NONE)
    {
        if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
        {
            page = (Page) BufferGetPage(buffer);
            pageop = (BTPageOpaque) PageGetSpecialPointer(page);
            pageop->btpo_next = rightsib;

            PageSetLSN(page, lsn);
            MarkBufferDirty(buffer);
        }
        if (BufferIsValid(buffer))
            UnlockReleaseBuffer(buffer);
    }

    /* Rewrite target page as empty deleted page */
    buffer = XLogInitBufferForRedo(record, 0);
    page = (Page) BufferGetPage(buffer);

    _bt_pageinit(page, BufferGetPageSize(buffer));
    pageop = (BTPageOpaque) PageGetSpecialPointer(page);

    pageop->btpo_prev = leftsib;
    pageop->btpo_next = rightsib;
    pageop->btpo.xact = xlrec->btpo_xact;
    pageop->btpo_flags = BTP_DELETED;
    pageop->btpo_cycleid = 0;

    PageSetLSN(page, lsn);
    MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);

    /*
     * If we deleted a parent of the targeted leaf page, instead of the leaf
     * itself, update the leaf to point to the next remaining child in the
     * branch.
     */
    if (XLogRecHasBlockRef(record, 3))
    {
        /*
         * There is no real data on the page, so we just re-create it from
         * scratch using the information from the WAL record.
         */
        IndexTupleData trunctuple;

        buffer = XLogInitBufferForRedo(record, 3);
        page = (Page) BufferGetPage(buffer);

        _bt_pageinit(page, BufferGetPageSize(buffer));
        pageop = (BTPageOpaque) PageGetSpecialPointer(page);

        pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
        pageop->btpo_prev = xlrec->leafleftsib;
        pageop->btpo_next = xlrec->leafrightsib;
        pageop->btpo.level = 0;
        pageop->btpo_cycleid = 0;

        /* Add a dummy hikey item */
        MemSet(&trunctuple, 0, sizeof(IndexTupleData));
        trunctuple.t_info = sizeof(IndexTupleData);
        BTreeTupleSetTopParent(&trunctuple, xlrec->topparent);

        if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
                        false, false) == InvalidOffsetNumber)
            elog(ERROR, "could not add dummy high key to half-dead page");

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
        UnlockReleaseBuffer(buffer);
    }

    /* Update metapage if needed */
    if (info == XLOG_BTREE_UNLINK_PAGE_META)
        _bt_restore_meta(record, 4);
}

static void
btree_xlog_newroot(XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    BTPageOpaque pageop;
    char       *ptr;
    Size        len;

    buffer = XLogInitBufferForRedo(record, 0);
    page = (Page) BufferGetPage(buffer);

    _bt_pageinit(page, BufferGetPageSize(buffer));
    pageop = (BTPageOpaque) PageGetSpecialPointer(page);

    pageop->btpo_flags = BTP_ROOT;
    pageop->btpo_prev = pageop->btpo_next = P_NONE;
    pageop->btpo.level = xlrec->level;
    if (xlrec->level == 0)
        pageop->btpo_flags |= BTP_LEAF;
    pageop->btpo_cycleid = 0;

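    /*
     * A new root created by a root split is non-leaf; in that case the block
     * data carries the items to restore (the downlinks to the two child
     * pages), and the incomplete-split flag on the left child must be
     * cleared.
     */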
    if (xlrec->level > 0)
    {
        ptr = XLogRecGetBlockData(record, 0, &len);
        _bt_restore_page(page, ptr, len);

        /* Clear the incomplete-split flag in left child */
        _bt_clear_incomplete_split(record, 1);
    }

    PageSetLSN(page, lsn);
    MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);

    _bt_restore_meta(record, 2);
}

static void
btree_xlog_reuse_page(XLogReaderState *record)
{
    xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);

    /*
     * Btree reuse_page records exist to provide a conflict point when we
     * reuse pages in the index via the FSM.  That's all they do though.
     *
     * latestRemovedXid was the page's btpo.xact.  The btpo.xact <
     * RecentGlobalXmin test in _bt_page_recyclable() conceptually mirrors the
     * pgxact->xmin > limitXmin test in GetConflictingVirtualXIDs().
     * Consequently, one XID value achieves the same exclusion effect on
     * master and standby.
     */
    if (InHotStandby)
    {
        ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
                                            xlrec->node);
    }
}

void
btree_redo(XLogReaderState *record)
{
    uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

    switch (info)
    {
        case XLOG_BTREE_INSERT_LEAF:
            btree_xlog_insert(true, false, record);
            break;
        case XLOG_BTREE_INSERT_UPPER:
            btree_xlog_insert(false, false, record);
            break;
        case XLOG_BTREE_INSERT_META:
            btree_xlog_insert(false, true, record);
            break;
        case XLOG_BTREE_SPLIT_L:
            btree_xlog_split(true, record);
            break;
        case XLOG_BTREE_SPLIT_R:
            btree_xlog_split(false, record);
            break;
        case XLOG_BTREE_VACUUM:
            btree_xlog_vacuum(record);
            break;
        case XLOG_BTREE_DELETE:
            btree_xlog_delete(record);
            break;
        case XLOG_BTREE_MARK_PAGE_HALFDEAD:
            btree_xlog_mark_page_halfdead(info, record);
            break;
        case XLOG_BTREE_UNLINK_PAGE:
        case XLOG_BTREE_UNLINK_PAGE_META:
            btree_xlog_unlink_page(info, record);
            break;
        case XLOG_BTREE_NEWROOT:
            btree_xlog_newroot(record);
            break;
        case XLOG_BTREE_REUSE_PAGE:
            btree_xlog_reuse_page(record);
            break;
        case XLOG_BTREE_META_CLEANUP:
            _bt_restore_meta(record, 0);
            break;
        default:
            elog(PANIC, "btree_redo: unknown op code %u", info);
    }
}

/*
 * Mask a btree page before performing consistency checks on it.
 */
void
btree_mask(char *pagedata, BlockNumber blkno)
{
    Page        page = (Page) pagedata;
    BTPageOpaque maskopaq;

    mask_page_lsn_and_checksum(page);

    mask_page_hint_bits(page);
    mask_unused_space(page);

    maskopaq = (BTPageOpaque) PageGetSpecialPointer(page);

    if (P_ISDELETED(maskopaq))
    {
        /*
         * Mask page content on a DELETED page since it will be re-initialized
         * during replay. See btree_xlog_unlink_page() for details.
         */
        mask_page_content(page);
    }
    else if (P_ISLEAF(maskopaq))
    {
        /*
         * In btree leaf pages, it is possible to modify the LP_FLAGS without
         * emitting any WAL record. Hence, mask the line pointer flags. See
         * _bt_killitems(), _bt_check_unique() for details.
         */
        mask_lp_flags(page);
    }

    /*
     * BTP_HAS_GARBAGE is just an un-logged hint bit. So, mask it. See
     * _bt_killitems(), _bt_check_unique() for details.
     */
    maskopaq->btpo_flags &= ~BTP_HAS_GARBAGE;

    /*
     * During replay of a btree page split, we don't set the BTP_SPLIT_END
     * flag of the right sibling and initialize the cycle_id to 0 for the same
     * page. See btree_xlog_split() for details.
     */
    maskopaq->btpo_flags &= ~BTP_SPLIT_END;
    maskopaq->btpo_cycleid = 0;
}