/*-------------------------------------------------------------------------
 *
 * nbtxlog.c
 *	  WAL replay logic for btrees.
 *
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/nbtree/nbtxlog.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/bufmask.h"
#include "access/heapam_xlog.h"
#include "access/nbtree.h"
#include "access/nbtxlog.h"
#include "access/transam.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/procarray.h"
#include "miscadmin.h"

/*
 * _bt_restore_page -- re-enter all the index tuples on a page
 *
 * The page is freshly init'd, and *from (length len) is a copy of what
 * had been its upper part (pd_upper to pd_special).  We assume that the
 * tuples had been added to the page in item-number order, and therefore
 * the one with highest item number appears first (lowest on the page).
 */
static void
_bt_restore_page(Page page, char *from, int len)
{
	IndexTupleData itupdata;
	Size		itemsz;
	char	   *end = from + len;
	Item		items[MaxIndexTuplesPerPage];
	uint16		itemsizes[MaxIndexTuplesPerPage];
	int			i;
	int			nitems;

	/*
	 * To get the items back in the original order, we add them to the page in
	 * reverse.  To figure out where one tuple ends and another begins, we
	 * have to scan them in forward order first.
	 */
	i = 0;
	while (from < end)
	{
		/*
		 * As we step through the items, 'from' won't always be properly
		 * aligned, so we need to use memcpy().  Further, we use Item (which
		 * is just a char*) here for our items array for the same reason;
		 * wouldn't want the compiler or anyone thinking that an item is
		 * aligned when it isn't.
		 */
		memcpy(&itupdata, from, sizeof(IndexTupleData));
		itemsz = IndexTupleSize(&itupdata);
		itemsz = MAXALIGN(itemsz);

		items[i] = (Item) from;
		itemsizes[i] = itemsz;
		i++;

		from += itemsz;
	}
	nitems = i;

	for (i = nitems - 1; i >= 0; i--)
	{
		if (PageAddItem(page, items[i], itemsizes[i], nitems - i,
						false, false) == InvalidOffsetNumber)
			elog(PANIC, "_bt_restore_page: cannot add item to page");
	}
}

static void
_bt_restore_meta(XLogReaderState *record, uint8 block_id)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	Buffer		metabuf;
	Page		metapg;
	BTMetaPageData *md;
	BTPageOpaque pageop;
	xl_btree_metadata *xlrec;
	char	   *ptr;
	Size		len;

	metabuf = XLogInitBufferForRedo(record, block_id);
	ptr = XLogRecGetBlockData(record, block_id, &len);

	Assert(len == sizeof(xl_btree_metadata));
	Assert(BufferGetBlockNumber(metabuf) == BTREE_METAPAGE);
	xlrec = (xl_btree_metadata *) ptr;
	metapg = BufferGetPage(metabuf);

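	/* Rebuild the metapage from scratch, using the copy logged in the record */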
	_bt_pageinit(metapg, BufferGetPageSize(metabuf));

	md = BTPageGetMeta(metapg);
	md->btm_magic = BTREE_MAGIC;
	md->btm_version = BTREE_VERSION;
	md->btm_root = xlrec->root;
	md->btm_level = xlrec->level;
	md->btm_fastroot = xlrec->fastroot;
	md->btm_fastlevel = xlrec->fastlevel;
	/* Cannot log BTREE_MIN_VERSION index metapage without upgrade */
	Assert(md->btm_version == BTREE_VERSION);
	md->btm_oldest_btpo_xact = xlrec->oldest_btpo_xact;
	md->btm_last_cleanup_num_heap_tuples = xlrec->last_cleanup_num_heap_tuples;

	pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
	pageop->btpo_flags = BTP_META;

	/*
	 * Set pd_lower just past the end of the metadata.  This is essential,
	 * because without doing so, metadata will be lost if xlog.c compresses
	 * the page.
	 */
	((PageHeader) metapg)->pd_lower =
		((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;

	PageSetLSN(metapg, lsn);
	MarkBufferDirty(metabuf);
	UnlockReleaseBuffer(metabuf);
}

/*
 * _bt_clear_incomplete_split -- clear INCOMPLETE_SPLIT flag on a page
 *
 * This is a common subroutine of the redo functions of all the WAL record
 * types that can insert a downlink: insert, split, and newroot.
 */
static void
_bt_clear_incomplete_split(XLogReaderState *record, uint8 block_id)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	Buffer		buf;

	if (XLogReadBufferForRedo(record, block_id, &buf) == BLK_NEEDS_REDO)
	{
		Page		page = (Page) BufferGetPage(buf);
		BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);

		Assert(P_INCOMPLETE_SPLIT(pageop));
		pageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;

		PageSetLSN(page, lsn);
		MarkBufferDirty(buf);
	}
	if (BufferIsValid(buf))
		UnlockReleaseBuffer(buf);
}

static void
btree_xlog_insert(bool isleaf, bool ismeta, XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;

	/*
	 * Insertion to an internal page finishes an incomplete split at the child
	 * level.  Clear the incomplete-split flag in the child.  Note: during
	 * normal operation, the child and parent pages are locked at the same
	 * time, so that clearing the flag and inserting the downlink appear
	 * atomic to other backends.  We don't bother with that during replay,
	 * because readers don't care about the incomplete-split flag and there
	 * cannot be updates happening.
	 */
	if (!isleaf)
		_bt_clear_incomplete_split(record, 1);
	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
	{
		Size		datalen;
		char	   *datapos = XLogRecGetBlockData(record, 0, &datalen);

		page = BufferGetPage(buffer);

		if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
						false, false) == InvalidOffsetNumber)
			elog(PANIC, "btree_insert_redo: failed to add item");

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);

	/*
	 * Note: in normal operation, we'd update the metapage while still holding
	 * lock on the page we inserted into.  But during replay it's not
	 * necessary to hold that lock, since no other index updates can be
	 * happening concurrently, and readers will cope fine with following an
	 * obsolete link from the metapage.
	 */
	if (ismeta)
		_bt_restore_meta(record, 2);
}

static void
btree_xlog_split(bool onleft, bool lhighkey, XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
	bool		isleaf = (xlrec->level == 0);
	Buffer		lbuf;
	Buffer		rbuf;
	Page		rpage;
	BTPageOpaque ropaque;
	char	   *datapos;
	Size		datalen;
	IndexTuple	left_hikey = NULL;
	Size		left_hikeysz = 0;
	BlockNumber leftsib;
	BlockNumber rightsib;
	BlockNumber rnext;

	XLogRecGetBlockTag(record, 0, NULL, NULL, &leftsib);
	XLogRecGetBlockTag(record, 1, NULL, NULL, &rightsib);
	if (!XLogRecGetBlockTag(record, 2, NULL, NULL, &rnext))
		rnext = P_NONE;

	/*
	 * Clear the incomplete split flag on the left sibling of the child page
	 * this is a downlink for.  (Like in btree_xlog_insert, this can be done
	 * before locking the other pages)
	 */
	if (!isleaf)
		_bt_clear_incomplete_split(record, 3);

	/* Reconstruct right (new) sibling page from scratch */
	rbuf = XLogInitBufferForRedo(record, 1);
	datapos = XLogRecGetBlockData(record, 1, &datalen);
	rpage = (Page) BufferGetPage(rbuf);

	_bt_pageinit(rpage, BufferGetPageSize(rbuf));
	ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);

	ropaque->btpo_prev = leftsib;
	ropaque->btpo_next = rnext;
	ropaque->btpo.level = xlrec->level;
	ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
	ropaque->btpo_cycleid = 0;

	_bt_restore_page(rpage, datapos, datalen);

	/*
	 * When the high key isn't present in the WAL record, we assume it to be
	 * equal to the first key on the right page.  It must be from the leaf
	 * level.
	 */
	if (!lhighkey)
	{
		ItemId		hiItemId = PageGetItemId(rpage, P_FIRSTDATAKEY(ropaque));

		Assert(isleaf);
		left_hikey = (IndexTuple) PageGetItem(rpage, hiItemId);
		left_hikeysz = ItemIdGetLength(hiItemId);
	}

	PageSetLSN(rpage, lsn);
	MarkBufferDirty(rbuf);

	/* don't release the buffer yet; we touch right page's first item below */

	/* Now reconstruct left (original) sibling page */
	if (XLogReadBufferForRedo(record, 0, &lbuf) == BLK_NEEDS_REDO)
	{
		/*
		 * To retain the same physical order of the tuples that they had, we
		 * initialize a temporary empty page for the left page and add all the
		 * items to that in item number order.  This mirrors how _bt_split()
		 * works.  It's not strictly required to retain the same physical
		 * order, as long as the items are in the correct item number order,
		 * but it helps debugging.  See also _bt_restore_page(), which does
		 * the same for the right page.
		 */
		Page		lpage = (Page) BufferGetPage(lbuf);
		BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
		OffsetNumber off;
		IndexTuple	newitem = NULL;
		Size		newitemsz = 0;
		Page		newlpage;
		OffsetNumber leftoff;

		datapos = XLogRecGetBlockData(record, 0, &datalen);

		if (onleft)
		{
			newitem = (IndexTuple) datapos;
			newitemsz = MAXALIGN(IndexTupleSize(newitem));
			datapos += newitemsz;
			datalen -= newitemsz;
		}

		/* Extract left hikey and its size (assuming 16-bit alignment) */
		if (lhighkey)
		{
			left_hikey = (IndexTuple) datapos;
			left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
			datapos += left_hikeysz;
			datalen -= left_hikeysz;
		}

		Assert(datalen == 0);

		newlpage = PageGetTempPageCopySpecial(lpage);

		/* Set high key */
		leftoff = P_HIKEY;
		if (PageAddItem(newlpage, (Item) left_hikey, left_hikeysz,
						P_HIKEY, false, false) == InvalidOffsetNumber)
			elog(PANIC, "failed to add high key to left page after split");
		leftoff = OffsetNumberNext(leftoff);

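		/*
		 * Copy the original page's items up to, but not including, the split
		 * point, inserting the new item at its recorded offset if it belongs
		 * on the left page.
		 */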
		for (off = P_FIRSTDATAKEY(lopaque); off < xlrec->firstright; off++)
		{
			ItemId		itemid;
			Size		itemsz;
			IndexTuple	item;

			/* add the new item if it was inserted on left page */
			if (onleft && off == xlrec->newitemoff)
			{
				if (PageAddItem(newlpage, (Item) newitem, newitemsz, leftoff,
								false, false) == InvalidOffsetNumber)
					elog(ERROR, "failed to add new item to left page after split");
				leftoff = OffsetNumberNext(leftoff);
			}

			itemid = PageGetItemId(lpage, off);
			itemsz = ItemIdGetLength(itemid);
			item = (IndexTuple) PageGetItem(lpage, itemid);
			if (PageAddItem(newlpage, (Item) item, itemsz, leftoff,
							false, false) == InvalidOffsetNumber)
				elog(ERROR, "failed to add old item to left page after split");
			leftoff = OffsetNumberNext(leftoff);
		}

		/* cope with possibility that newitem goes at the end */
		if (onleft && off == xlrec->newitemoff)
		{
			if (PageAddItem(newlpage, (Item) newitem, newitemsz, leftoff,
							false, false) == InvalidOffsetNumber)
				elog(ERROR, "failed to add new item to left page after split");
			leftoff = OffsetNumberNext(leftoff);
		}

		PageRestoreTempPage(newlpage, lpage);

		/* Fix opaque fields */
		lopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
		if (isleaf)
			lopaque->btpo_flags |= BTP_LEAF;
		lopaque->btpo_next = rightsib;
		lopaque->btpo_cycleid = 0;

		PageSetLSN(lpage, lsn);
		MarkBufferDirty(lbuf);
	}

	/* We no longer need the buffers */
	if (BufferIsValid(lbuf))
		UnlockReleaseBuffer(lbuf);
	UnlockReleaseBuffer(rbuf);

	/*
	 * Fix left-link of the page to the right of the new right sibling.
	 *
	 * Note: in normal operation, we do this while still holding lock on the
	 * two split pages.  However, that's not necessary for correctness in WAL
	 * replay, because no other index update can be in progress, and readers
	 * will cope properly when following an obsolete left-link.
	 */
	if (rnext != P_NONE)
	{
		Buffer		buffer;

		if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
		{
			Page		page = (Page) BufferGetPage(buffer);
			BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);

			pageop->btpo_prev = rightsib;

			PageSetLSN(page, lsn);
			MarkBufferDirty(buffer);
		}
		if (BufferIsValid(buffer))
			UnlockReleaseBuffer(buffer);
	}
}

static void
btree_xlog_vacuum(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	Buffer		buffer;
	Page		page;
	BTPageOpaque opaque;
#ifdef UNUSED
	xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record);

	/*
	 * This section of code is thought to be no longer needed, after analysis
	 * of the calling paths. It is retained to allow the code to be reinstated
	 * if a flaw is revealed in that thinking.
	 *
	 * If we are running non-MVCC scans using this index we need to do some
	 * additional work to ensure correctness, which is known as a "pin scan"
	 * described in more detail in next paragraphs. We used to do the extra
	 * work in all cases, whereas we now avoid that work in most cases. If
	 * lastBlockVacuumed is set to InvalidBlockNumber then we skip the
	 * additional work required for the pin scan.
	 *
	 * Avoiding this extra work is important since it requires us to touch
	 * every page in the index, so is an O(N) operation. Worse, it is an
	 * operation performed in the foreground during redo, so it delays
	 * replication directly.
	 *
	 * If queries might be active then we need to ensure every leaf page is
	 * unpinned between the lastBlockVacuumed and the current block, if there
	 * are any.  This prevents replay of the VACUUM from reaching the stage of
	 * removing heap tuples while there could still be indexscans "in flight"
	 * to those particular tuples for those scans which could be confused by
	 * finding new tuples at the old TID locations (see nbtree/README).
	 *
	 * It might be worth checking if there are actually any backends running;
	 * if not, we could just skip this.
	 *
	 * Since VACUUM can visit leaf pages out-of-order, it might issue records
	 * with lastBlockVacuumed >= block; that's not an error, it just means
	 * nothing to do now.
	 *
	 * Note: since we touch all pages in the range, we will lock non-leaf
	 * pages, and also any empty (all-zero) pages that may be in the index. It
	 * doesn't seem worth the complexity to avoid that.  But it's important
	 * that HotStandbyActiveInReplay() will not return true if the database
	 * isn't yet consistent; so we need not fear reading still-corrupt blocks
	 * here during crash recovery.
	 */
	if (HotStandbyActiveInReplay() && BlockNumberIsValid(xlrec->lastBlockVacuumed))
	{
		RelFileNode thisrnode;
		BlockNumber thisblkno;
		BlockNumber blkno;

		XLogRecGetBlockTag(record, 0, &thisrnode, NULL, &thisblkno);

		for (blkno = xlrec->lastBlockVacuumed + 1; blkno < thisblkno; blkno++)
		{
			/*
			 * We use RBM_NORMAL_NO_LOG mode because it's not an error
			 * condition to see all-zero pages.  The original btvacuumpage
			 * scan would have skipped over all-zero pages, noting them in FSM
			 * but not bothering to initialize them just yet; so we mustn't
			 * throw an error here.  (We could skip acquiring the cleanup lock
			 * if PageIsNew, but it's probably not worth the cycles to test.)
			 *
			 * XXX we don't actually need to read the block, we just need to
			 * confirm it is unpinned. If we had a special call into the
			 * buffer manager we could optimise this so that if the block is
			 * not in shared_buffers we confirm it as unpinned. Optimizing
			 * this is now moot, since in most cases we avoid the scan.
			 */
			buffer = XLogReadBufferExtended(thisrnode, MAIN_FORKNUM, blkno,
											RBM_NORMAL_NO_LOG);
			if (BufferIsValid(buffer))
			{
				LockBufferForCleanup(buffer);
				UnlockReleaseBuffer(buffer);
			}
		}
	}
#endif

	/*
	 * Like in btvacuumpage(), we need to take a cleanup lock on every leaf
	 * page. See nbtree/README for details.
	 */
	if (XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer)
		== BLK_NEEDS_REDO)
	{
		char	   *ptr;
		Size		len;

		ptr = XLogRecGetBlockData(record, 0, &len);

		page = (Page) BufferGetPage(buffer);

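		/* The block data, if any, is an array of offsets of items to delete */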
		if (len > 0)
		{
			OffsetNumber *unused;
			OffsetNumber *unend;

			unused = (OffsetNumber *) ptr;
			unend = (OffsetNumber *) ((char *) ptr + len);

			if ((unend - unused) > 0)
				PageIndexMultiDelete(page, unused, unend - unused);
		}

		/*
		 * Mark the page as not containing any LP_DEAD items --- see comments
		 * in _bt_delitems_vacuum().
		 */
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
		opaque->btpo_flags &= ~BTP_HAS_GARBAGE;

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);
}

/*
 * Get the latestRemovedXid from the heap pages pointed at by the index
 * tuples being deleted. This puts the work for calculating latestRemovedXid
 * into the recovery path rather than the primary path.
 *
 * It's possible that this generates a fair amount of I/O, since an index
 * block may have hundreds of tuples being deleted. Repeat accesses to the
 * same heap blocks are common, though are not yet optimised.
 *
 * XXX optimise later with something like XLogPrefetchBuffer()
 */
static TransactionId
btree_xlog_delete_get_latestRemovedXid(XLogReaderState *record)
{
	xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
	OffsetNumber *unused;
	Buffer		ibuffer,
				hbuffer;
	Page		ipage,
				hpage;
	RelFileNode rnode;
	BlockNumber blkno;
	ItemId		iitemid,
				hitemid;
	IndexTuple	itup;
	HeapTupleHeader htuphdr;
	BlockNumber hblkno;
	OffsetNumber hoffnum;
	TransactionId latestRemovedXid = InvalidTransactionId;
	int			i;

	/*
	 * If there's nothing running on the standby we don't need to derive a
	 * full latestRemovedXid value, so use a fast path out of here.  This
	 * returns InvalidTransactionId, and so will conflict with all HS
	 * transactions; but since we just worked out that that's zero people,
	 * it's OK.
	 *
	 * XXX There is a race condition here, which is that a new backend might
	 * start just after we look.  If so, it cannot need to conflict, but this
	 * coding will result in throwing a conflict anyway.
	 */
	if (CountDBBackends(InvalidOid) == 0)
		return latestRemovedXid;

	/*
	 * In what follows, we have to examine the previous state of the index
	 * page, as well as the heap page(s) it points to.  This is only valid if
	 * WAL replay has reached a consistent database state; which means that
	 * the preceding check is not just an optimization, but is *necessary*. We
	 * won't have let in any user sessions before we reach consistency.
	 */
	if (!reachedConsistency)
		elog(PANIC, "btree_xlog_delete_get_latestRemovedXid: cannot operate with inconsistent data");

	/*
	 * Get index page.  If the DB is consistent, this should not fail, nor
	 * should any of the heap page fetches below.  If one does, we return
	 * InvalidTransactionId to cancel all HS transactions.  That's probably
	 * overkill, but it's safe, and certainly better than panicking here.
	 */
	XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
	ibuffer = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL);
	if (!BufferIsValid(ibuffer))
		return InvalidTransactionId;
	LockBuffer(ibuffer, BT_READ);
	ipage = (Page) BufferGetPage(ibuffer);

	/*
	 * Loop through the deleted index items to obtain the TransactionId from
	 * the heap items they point to.
	 */
	unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);

	for (i = 0; i < xlrec->nitems; i++)
	{
		/*
		 * Identify the index tuple about to be deleted
		 */
		iitemid = PageGetItemId(ipage, unused[i]);
		itup = (IndexTuple) PageGetItem(ipage, iitemid);

		/*
		 * Locate the heap page that the index tuple points at
		 */
		hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
		hbuffer = XLogReadBufferExtended(xlrec->hnode, MAIN_FORKNUM, hblkno, RBM_NORMAL);
		if (!BufferIsValid(hbuffer))
		{
			UnlockReleaseBuffer(ibuffer);
			return InvalidTransactionId;
		}
		LockBuffer(hbuffer, BT_READ);
		hpage = (Page) BufferGetPage(hbuffer);

		/*
		 * Look up the heap tuple header that the index tuple points at by
		 * using the heap node supplied with the xlrec. We can't use
		 * heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
		 * Note that we are not looking at tuple data here, just headers.
		 */
		hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
		hitemid = PageGetItemId(hpage, hoffnum);

		/*
		 * Follow any redirections until we find something useful.
		 */
		while (ItemIdIsRedirected(hitemid))
		{
			hoffnum = ItemIdGetRedirect(hitemid);
			hitemid = PageGetItemId(hpage, hoffnum);
			CHECK_FOR_INTERRUPTS();
		}

		/*
		 * If the heap item has storage, then read the header and use that to
		 * set latestRemovedXid.
		 *
		 * Some LP_DEAD items may not be accessible, so we ignore them.
		 */
		if (ItemIdHasStorage(hitemid))
		{
			htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);

			HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
		}
		else if (ItemIdIsDead(hitemid))
		{
			/*
			 * Conjecture: if hitemid is dead then it had xids before the xids
			 * marked on LP_NORMAL items. So we just ignore this item and move
			 * onto the next, for the purposes of calculating
			 * latestRemovedxids.
			 */
		}
		else
			Assert(!ItemIdIsUsed(hitemid));

		UnlockReleaseBuffer(hbuffer);
	}

	UnlockReleaseBuffer(ibuffer);

	/*
	 * If all heap tuples were LP_DEAD then we will be returning
	 * InvalidTransactionId here, which avoids conflicts. This matches
	 * existing logic which assumes that LP_DEAD tuples must already be older
	 * than the latestRemovedXid on the cleanup record that set them as
	 * LP_DEAD, hence must already have generated a conflict.
	 */
	return latestRemovedXid;
}

static void
btree_xlog_delete(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;
	BTPageOpaque opaque;

	/*
	 * If we have any conflict processing to do, it must happen before we
	 * update the page.
	 *
	 * Btree delete records can conflict with standby queries.  You might
	 * think that vacuum records would conflict as well, but we've handled
	 * that already.  XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
	 * cleaned by the vacuum of the heap and so we can resolve any conflicts
	 * just once when that arrives.  After that we know that no conflicts
	 * exist from individual btree vacuum records on that index.
	 */
	if (InHotStandby)
	{
		TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(record);
		RelFileNode rnode;

		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);

		ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
	}

	/*
	 * We don't need to take a cleanup lock to apply these changes. See
	 * nbtree/README for details.
	 */
	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
	{
		page = (Page) BufferGetPage(buffer);

		if (XLogRecGetDataLen(record) > SizeOfBtreeDelete)
		{
			OffsetNumber *unused;

			unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);

			PageIndexMultiDelete(page, unused, xlrec->nitems);
		}

		/*
		 * Mark the page as not containing any LP_DEAD items --- see comments
		 * in _bt_delitems_delete().
		 */
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
		opaque->btpo_flags &= ~BTP_HAS_GARBAGE;

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);
}

static void
btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_mark_page_halfdead *xlrec = (xl_btree_mark_page_halfdead *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;
	BTPageOpaque pageop;
	IndexTupleData trunctuple;

	/*
	 * In normal operation, we would lock all the pages this WAL record
	 * touches before changing any of them.  In WAL replay, it should be okay
	 * to lock just one page at a time, since no concurrent index updates can
	 * be happening, and readers should not care whether they arrive at the
	 * target page or not (since it's surely empty).
	 */

	/* parent page */
	if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
	{
		OffsetNumber poffset;
		ItemId		itemid;
		IndexTuple	itup;
		OffsetNumber nextoffset;
		BlockNumber rightsib;

		page = (Page) BufferGetPage(buffer);
		pageop = (BTPageOpaque) PageGetSpecialPointer(page);

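		/*
		 * Re-point the target page's downlink at its right sibling, then
		 * remove the right sibling's own, now-redundant, downlink.
		 */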
		poffset = xlrec->poffset;

		nextoffset = OffsetNumberNext(poffset);
		itemid = PageGetItemId(page, nextoffset);
		itup = (IndexTuple) PageGetItem(page, itemid);
		rightsib = BTreeInnerTupleGetDownLink(itup);

		itemid = PageGetItemId(page, poffset);
		itup = (IndexTuple) PageGetItem(page, itemid);
		BTreeInnerTupleSetDownLink(itup, rightsib);
		nextoffset = OffsetNumberNext(poffset);
		PageIndexTupleDelete(page, nextoffset);

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);

	/* Rewrite the leaf page as a halfdead page */
	buffer = XLogInitBufferForRedo(record, 0);
	page = (Page) BufferGetPage(buffer);

	_bt_pageinit(page, BufferGetPageSize(buffer));
	pageop = (BTPageOpaque) PageGetSpecialPointer(page);

	pageop->btpo_prev = xlrec->leftblk;
	pageop->btpo_next = xlrec->rightblk;
	pageop->btpo.level = 0;
	pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
	pageop->btpo_cycleid = 0;

	/*
	 * Construct a dummy hikey item that points to the next parent to be
	 * deleted (if any).
	 */
	MemSet(&trunctuple, 0, sizeof(IndexTupleData));
	trunctuple.t_info = sizeof(IndexTupleData);
	BTreeTupleSetTopParent(&trunctuple, xlrec->topparent);

	if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
					false, false) == InvalidOffsetNumber)
		elog(ERROR, "could not add dummy high key to half-dead page");

	PageSetLSN(page, lsn);
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
}


static void
btree_xlog_unlink_page(uint8 info, XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_unlink_page *xlrec = (xl_btree_unlink_page *) XLogRecGetData(record);
	BlockNumber leftsib;
	BlockNumber rightsib;
	Buffer		buffer;
	Page		page;
	BTPageOpaque pageop;

	leftsib = xlrec->leftsib;
	rightsib = xlrec->rightsib;

	/*
	 * In normal operation, we would lock all the pages this WAL record
	 * touches before changing any of them.  In WAL replay, it should be okay
	 * to lock just one page at a time, since no concurrent index updates can
	 * be happening, and readers should not care whether they arrive at the
	 * target page or not (since it's surely empty).
	 */

	/* Fix left-link of right sibling */
	if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
	{
		page = (Page) BufferGetPage(buffer);
		pageop = (BTPageOpaque) PageGetSpecialPointer(page);
		pageop->btpo_prev = leftsib;

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);

	/* Fix right-link of left sibling, if any */
	if (leftsib != P_NONE)
	{
		if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
		{
			page = (Page) BufferGetPage(buffer);
			pageop = (BTPageOpaque) PageGetSpecialPointer(page);
			pageop->btpo_next = rightsib;

			PageSetLSN(page, lsn);
			MarkBufferDirty(buffer);
		}
		if (BufferIsValid(buffer))
			UnlockReleaseBuffer(buffer);
	}

	/* Rewrite target page as empty deleted page */
	buffer = XLogInitBufferForRedo(record, 0);
	page = (Page) BufferGetPage(buffer);

	_bt_pageinit(page, BufferGetPageSize(buffer));
	pageop = (BTPageOpaque) PageGetSpecialPointer(page);

	pageop->btpo_prev = leftsib;
	pageop->btpo_next = rightsib;
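	/* Remember the deleting XID, so the page is not recycled prematurely */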
	pageop->btpo.xact = xlrec->btpo_xact;
	pageop->btpo_flags = BTP_DELETED;
	pageop->btpo_cycleid = 0;

	PageSetLSN(page, lsn);
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);

	/*
	 * If we deleted a parent of the targeted leaf page, instead of the leaf
	 * itself, update the leaf to point to the next remaining child in the
	 * branch.
	 */
	if (XLogRecHasBlockRef(record, 3))
	{
		/*
		 * There is no real data on the page, so we just re-create it from
		 * scratch using the information from the WAL record.
		 */
		IndexTupleData trunctuple;

		buffer = XLogInitBufferForRedo(record, 3);
		page = (Page) BufferGetPage(buffer);

		_bt_pageinit(page, BufferGetPageSize(buffer));
		pageop = (BTPageOpaque) PageGetSpecialPointer(page);

		pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
		pageop->btpo_prev = xlrec->leafleftsib;
		pageop->btpo_next = xlrec->leafrightsib;
		pageop->btpo.level = 0;
		pageop->btpo_cycleid = 0;

		/* Add a dummy hikey item */
		MemSet(&trunctuple, 0, sizeof(IndexTupleData));
		trunctuple.t_info = sizeof(IndexTupleData);
		BTreeTupleSetTopParent(&trunctuple, xlrec->topparent);

		if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
						false, false) == InvalidOffsetNumber)
			elog(ERROR, "could not add dummy high key to half-dead page");

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
		UnlockReleaseBuffer(buffer);
	}

	/* Update metapage if needed */
	if (info == XLOG_BTREE_UNLINK_PAGE_META)
		_bt_restore_meta(record, 4);
}

static void
btree_xlog_newroot(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;
	BTPageOpaque pageop;
	char	   *ptr;
	Size		len;

	buffer = XLogInitBufferForRedo(record, 0);
	page = (Page) BufferGetPage(buffer);

	_bt_pageinit(page, BufferGetPageSize(buffer));
	pageop = (BTPageOpaque) PageGetSpecialPointer(page);

	pageop->btpo_flags = BTP_ROOT;
	pageop->btpo_prev = pageop->btpo_next = P_NONE;
	pageop->btpo.level = xlrec->level;
	if (xlrec->level == 0)
		pageop->btpo_flags |= BTP_LEAF;
	pageop->btpo_cycleid = 0;

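	/*
	 * A non-leaf new root is created by a root split; its two downlink
	 * tuples are carried as block data, so restore them here.
	 */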
	if (xlrec->level > 0)
	{
		ptr = XLogRecGetBlockData(record, 0, &len);
		_bt_restore_page(page, ptr, len);

		/* Clear the incomplete-split flag in left child */
		_bt_clear_incomplete_split(record, 1);
	}

	PageSetLSN(page, lsn);
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);

	_bt_restore_meta(record, 2);
}

static void
btree_xlog_reuse_page(XLogReaderState *record)
{
	xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);

	/*
	 * Btree reuse_page records exist to provide a conflict point when we
	 * reuse pages in the index via the FSM.  That's all they do though.
	 *
	 * latestRemovedXid was the page's btpo.xact.  The btpo.xact <
	 * RecentGlobalXmin test in _bt_page_recyclable() conceptually mirrors the
	 * pgxact->xmin > limitXmin test in GetConflictingVirtualXIDs().
	 * Consequently, one XID value achieves the same exclusion effect on
	 * master and standby.
	 */
	if (InHotStandby)
	{
		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
											xlrec->node);
	}
}

void
btree_redo(XLogReaderState *record)
{
	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

	switch (info)
	{
		case XLOG_BTREE_INSERT_LEAF:
			btree_xlog_insert(true, false, record);
			break;
		case XLOG_BTREE_INSERT_UPPER:
			btree_xlog_insert(false, false, record);
			break;
		case XLOG_BTREE_INSERT_META:
			btree_xlog_insert(false, true, record);
			break;
		case XLOG_BTREE_SPLIT_L:
			btree_xlog_split(true, false, record);
			break;
		case XLOG_BTREE_SPLIT_L_HIGHKEY:
			btree_xlog_split(true, true, record);
			break;
		case XLOG_BTREE_SPLIT_R:
			btree_xlog_split(false, false, record);
			break;
		case XLOG_BTREE_SPLIT_R_HIGHKEY:
			btree_xlog_split(false, true, record);
			break;
		case XLOG_BTREE_VACUUM:
			btree_xlog_vacuum(record);
			break;
		case XLOG_BTREE_DELETE:
			btree_xlog_delete(record);
			break;
		case XLOG_BTREE_MARK_PAGE_HALFDEAD:
			btree_xlog_mark_page_halfdead(info, record);
			break;
		case XLOG_BTREE_UNLINK_PAGE:
		case XLOG_BTREE_UNLINK_PAGE_META:
			btree_xlog_unlink_page(info, record);
			break;
		case XLOG_BTREE_NEWROOT:
			btree_xlog_newroot(record);
			break;
		case XLOG_BTREE_REUSE_PAGE:
			btree_xlog_reuse_page(record);
			break;
		case XLOG_BTREE_META_CLEANUP:
			_bt_restore_meta(record, 0);
			break;
		default:
			elog(PANIC, "btree_redo: unknown op code %u", info);
	}
}

/*
 * Mask a btree page before performing consistency checks on it.
 */
void
btree_mask(char *pagedata, BlockNumber blkno)
{
	Page		page = (Page) pagedata;
	BTPageOpaque maskopaq;

	mask_page_lsn_and_checksum(page);

	mask_page_hint_bits(page);
	mask_unused_space(page);

	maskopaq = (BTPageOpaque) PageGetSpecialPointer(page);

	if (P_ISDELETED(maskopaq))
	{
		/*
		 * Mask page content on a DELETED page since it will be re-initialized
		 * during replay. See btree_xlog_unlink_page() for details.
		 */
		mask_page_content(page);
	}
	else if (P_ISLEAF(maskopaq))
	{
		/*
		 * In btree leaf pages, it is possible to modify the LP_FLAGS without
		 * emitting any WAL record. Hence, mask the line pointer flags. See
		 * _bt_killitems(), _bt_check_unique() for details.
		 */
		mask_lp_flags(page);
	}

	/*
	 * BTP_HAS_GARBAGE is just an un-logged hint bit. So, mask it. See
	 * _bt_killitems(), _bt_check_unique() for details.
	 */
	maskopaq->btpo_flags &= ~BTP_HAS_GARBAGE;

	/*
	 * During replay of a btree page split, we don't set the BTP_SPLIT_END
	 * flag of the right sibling and initialize the cycle_id to 0 for the same
	 * page. See btree_xlog_split() for details.
	 */
	maskopaq->btpo_flags &= ~BTP_SPLIT_END;
	maskopaq->btpo_cycleid = 0;
}