1 /*-------------------------------------------------------------------------
2  *
3  * nbtinsert.c
4  *	  Item insertion in Lehman and Yao btrees for Postgres.
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/access/nbtree/nbtinsert.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/nbtree.h"
19 #include "access/nbtxlog.h"
20 #include "access/tableam.h"
21 #include "access/transam.h"
22 #include "access/xloginsert.h"
23 #include "miscadmin.h"
24 #include "storage/lmgr.h"
25 #include "storage/predicate.h"
26 #include "storage/smgr.h"
27 
28 /* Minimum tree height for application of fastpath optimization */
29 #define BTREE_FASTPATH_MIN_LEVEL	2
30 
31 
32 static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
33 
34 static TransactionId _bt_check_unique(Relation rel, BTInsertState insertstate,
35 									  Relation heapRel,
36 									  IndexUniqueCheck checkUnique, bool *is_unique,
37 									  uint32 *speculativeToken);
38 static OffsetNumber _bt_findinsertloc(Relation rel,
39 									  BTInsertState insertstate,
40 									  bool checkingunique,
41 									  BTStack stack,
42 									  Relation heapRel);
43 static void _bt_stepright(Relation rel, BTInsertState insertstate, BTStack stack);
44 static void _bt_insertonpg(Relation rel, BTScanInsert itup_key,
45 						   Buffer buf,
46 						   Buffer cbuf,
47 						   BTStack stack,
48 						   IndexTuple itup,
49 						   OffsetNumber newitemoff,
50 						   bool split_only_page);
51 static Buffer _bt_split(Relation rel, BTScanInsert itup_key, Buffer buf,
52 						Buffer cbuf, OffsetNumber newitemoff, Size newitemsz,
53 						IndexTuple newitem);
54 static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
55 							  BTStack stack, bool is_root, bool is_only);
56 static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
57 						 OffsetNumber itup_off);
58 static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
59 
60 /*
61  *	_bt_doinsert() -- Handle insertion of a single index tuple in the tree.
62  *
63  *		This routine is called by the public interface routine, btinsert.
64  *		By here, itup is filled in, including the TID.
65  *
66  *		If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
67  *		will allow duplicates.  Otherwise (UNIQUE_CHECK_YES or
68  *		UNIQUE_CHECK_EXISTING) it will throw error for a duplicate.
69  *		For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and
70  *		don't actually insert.
71  *
72  *		The result value is only significant for UNIQUE_CHECK_PARTIAL:
73  *		it must be true if the entry is known unique, else false.
74  *		(In the current implementation we'll also return true after a
75  *		successful UNIQUE_CHECK_YES or UNIQUE_CHECK_EXISTING call, but
76  *		that's just a coding artifact.)
77  */
78 bool
79 _bt_doinsert(Relation rel, IndexTuple itup,
80 			 IndexUniqueCheck checkUnique, Relation heapRel)
81 {
82 	bool		is_unique = false;
83 	BTInsertStateData insertstate;
84 	BTScanInsert itup_key;
85 	BTStack		stack = NULL;
86 	Buffer		buf;
87 	bool		fastpath;
88 	bool		checkingunique = (checkUnique != UNIQUE_CHECK_NO);
89 
90 	/* we need an insertion scan key to do our search, so build one */
91 	itup_key = _bt_mkscankey(rel, itup);
92 
93 	if (checkingunique)
94 	{
95 		if (!itup_key->anynullkeys)
96 		{
97 			/* No (heapkeyspace) scantid until uniqueness established */
98 			itup_key->scantid = NULL;
99 		}
100 		else
101 		{
102 			/*
103 			 * Scan key for new tuple contains NULL key values.  Bypass
104 			 * checkingunique steps.  They are unnecessary because core code
105 			 * considers NULL unequal to every value, including NULL.
106 			 *
107 			 * This optimization avoids O(N^2) behavior within the
108 			 * _bt_findinsertloc() heapkeyspace path when a unique index has a
109 			 * large number of "duplicates" with NULL key values.
110 			 */
111 			checkingunique = false;
112 			/* Tuple is unique in the sense that core code cares about */
113 			Assert(checkUnique != UNIQUE_CHECK_EXISTING);
114 			is_unique = true;
115 		}
116 	}
117 
118 	/*
119 	 * Fill in the BTInsertState working area, to track the current page and
120 	 * position within the page to insert on
121 	 */
122 	insertstate.itup = itup;
123 	/* PageAddItem will MAXALIGN(), but be consistent */
124 	insertstate.itemsz = MAXALIGN(IndexTupleSize(itup));
125 	insertstate.itup_key = itup_key;
126 	insertstate.bounds_valid = false;
127 	insertstate.buf = InvalidBuffer;
128 
129 	/*
130 	 * It's very common to have an index on an auto-incremented or
131 	 * monotonically increasing value. In such cases, every insertion happens
132 	 * towards the end of the index. We try to optimize that case by caching
133 	 * the right-most leaf of the index. If our cached block is still the
134 	 * rightmost leaf, has enough free space to accommodate a new entry and
135 	 * the insertion key is strictly greater than the first key in this page,
136 	 * then we can safely conclude that the new key will be inserted in the
137 	 * cached block. So we simply search within the cached block and insert
138 	 * the key at the appropriate location. We call it a fastpath.
139 	 *
140 	 * Testing has revealed, though, that the fastpath can result in increased
141 	 * contention on the exclusive-lock on the rightmost leaf page. So we
142 	 * conditionally check if the lock is available. If it's not available
143 	 * then we simply abandon the fastpath and take the regular path. This
144 	 * makes sense because unavailability of the lock also signals that some
145 	 * other backend might be concurrently inserting into the page, thus
146 	 * reducing our chances of finding an insertion place on this page.
147 	 */
148 top:
149 	fastpath = false;
150 	if (RelationGetTargetBlock(rel) != InvalidBlockNumber)
151 	{
152 		Page		page;
153 		BTPageOpaque lpageop;
154 
155 		/*
156 		 * Conditionally acquire exclusive lock on the buffer before doing any
157 		 * checks. If we don't get the lock, we simply follow slowpath. If we
158 		 * do get the lock, this ensures that the index state cannot change,
159 		 * as far as the rightmost part of the index is concerned.
160 		 */
161 		buf = ReadBuffer(rel, RelationGetTargetBlock(rel));
162 
163 		if (ConditionalLockBuffer(buf))
164 		{
165 			_bt_checkpage(rel, buf);
166 
167 			page = BufferGetPage(buf);
168 
169 			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
170 
171 			/*
172 			 * Check if the page is still the rightmost leaf page, has enough
173 			 * free space to accommodate the new tuple, and the insertion scan
174 			 * key is strictly greater than the first key on the page.
175 			 */
176 			if (P_ISLEAF(lpageop) && P_RIGHTMOST(lpageop) &&
177 				!P_IGNORE(lpageop) &&
178 				(PageGetFreeSpace(page) > insertstate.itemsz) &&
179 				PageGetMaxOffsetNumber(page) >= P_FIRSTDATAKEY(lpageop) &&
180 				_bt_compare(rel, itup_key, page, P_FIRSTDATAKEY(lpageop)) > 0)
181 			{
182 				/*
183 				 * The right-most block should never have an incomplete split.
184 				 * But be paranoid and check for it anyway.
185 				 */
186 				Assert(!P_INCOMPLETE_SPLIT(lpageop));
187 				fastpath = true;
188 			}
189 			else
190 			{
191 				_bt_relbuf(rel, buf);
192 
193 				/*
194 				 * Something did not work out. Just forget about the cached
195 				 * block and follow the normal path. It might be set again if
196 				 * the conditions are favourable.
197 				 */
198 				RelationSetTargetBlock(rel, InvalidBlockNumber);
199 			}
200 		}
201 		else
202 		{
203 			ReleaseBuffer(buf);
204 
205 			/*
206 			 * If someone's holding a lock, it's likely to change anyway, so
207 			 * don't try again until we get an updated rightmost leaf.
208 			 */
209 			RelationSetTargetBlock(rel, InvalidBlockNumber);
210 		}
211 	}
212 
213 	if (!fastpath)
214 	{
215 		/*
216 		 * Find the first page containing this key.  Buffer returned by
217 		 * _bt_search() is locked in exclusive mode.
218 		 */
219 		stack = _bt_search(rel, itup_key, &buf, BT_WRITE, NULL);
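		/*
		 * The stack returned by _bt_search records the internal pages
		 * visited on the way down; _bt_insertonpg uses it to locate the
		 * parent page if the leaf we insert on has to be split.
		 */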
220 	}
221 
222 	insertstate.buf = buf;
223 	buf = InvalidBuffer;		/* insertstate.buf now owns the buffer */
224 
225 	/*
226 	 * If we're not allowing duplicates, make sure the key isn't already in
227 	 * the index.
228 	 *
229 	 * NOTE: obviously, _bt_check_unique can only detect keys that are already
230 	 * in the index; so it cannot defend against concurrent insertions of the
231 	 * same key.  We protect against that by means of holding a write lock on
232 	 * the first page the value could be on, with omitted/-inf value for the
233 	 * implicit heap TID tiebreaker attribute.  Any other would-be inserter of
234 	 * the same key must acquire a write lock on the same page, so only one
235 	 * would-be inserter can be making the check at one time.  Furthermore,
236 	 * once we are past the check we hold write locks continuously until we
237 	 * have performed our insertion, so no later inserter can fail to see our
238 	 * insertion.  (This requires some care in _bt_findinsertloc.)
239 	 *
240 	 * If we must wait for another xact, we release the lock while waiting,
241 	 * and then must start over completely.
242 	 *
243 	 * For a partial uniqueness check, we don't wait for the other xact. Just
244 	 * let the tuple in and return false for possibly non-unique, or true for
245 	 * definitely unique.
246 	 */
247 	if (checkingunique)
248 	{
249 		TransactionId xwait;
250 		uint32		speculativeToken;
251 
252 		xwait = _bt_check_unique(rel, &insertstate, heapRel, checkUnique,
253 								 &is_unique, &speculativeToken);
254 
255 		if (TransactionIdIsValid(xwait))
256 		{
257 			/* Have to wait for the other guy ... */
258 			_bt_relbuf(rel, insertstate.buf);
259 			insertstate.buf = InvalidBuffer;
260 
261 			/*
262 			 * If it's a speculative insertion, wait for it to finish (ie. to
263 			 * go ahead with the insertion, or kill the tuple).  Otherwise
264 			 * wait for the transaction to finish as usual.
265 			 */
266 			if (speculativeToken)
267 				SpeculativeInsertionWait(xwait, speculativeToken);
268 			else
269 				XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
270 
271 			/* start over... */
272 			if (stack)
273 				_bt_freestack(stack);
274 			goto top;
275 		}
276 
277 		/* Uniqueness is established -- restore heap tid as scantid */
278 		if (itup_key->heapkeyspace)
279 			itup_key->scantid = &itup->t_tid;
280 	}
281 
282 	if (checkUnique != UNIQUE_CHECK_EXISTING)
283 	{
284 		OffsetNumber newitemoff;
285 
286 		/*
287 		 * The only conflict predicate locking cares about for indexes is when
288 		 * an index tuple insert conflicts with an existing lock.  We don't
289 		 * know the actual page we're going to insert on for sure just yet in
290 		 * checkingunique and !heapkeyspace cases, but it's okay to use the
291 		 * first page the value could be on (with scantid omitted) instead.
292 		 */
293 		CheckForSerializableConflictIn(rel, NULL, insertstate.buf);
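		/*
		 * This checks for predicate locks held by concurrent serializable
		 * transactions on the target page, and records a read-write conflict
		 * (a possible serialization failure) if one is found.
		 */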
294 
295 		/*
296 		 * Do the insertion.  Note that insertstate contains cached binary
297 		 * search bounds established within _bt_check_unique when insertion is
298 		 * checkingunique.
299 		 */
300 		newitemoff = _bt_findinsertloc(rel, &insertstate, checkingunique,
301 									   stack, heapRel);
302 		_bt_insertonpg(rel, itup_key, insertstate.buf, InvalidBuffer, stack,
303 					   itup, newitemoff, false);
304 	}
305 	else
306 	{
307 		/* just release the buffer */
308 		_bt_relbuf(rel, insertstate.buf);
309 	}
310 
311 	/* be tidy */
312 	if (stack)
313 		_bt_freestack(stack);
314 	pfree(itup_key);
315 
316 	return is_unique;
317 }
318 
319 /*
320  *	_bt_check_unique() -- Check for violation of unique index constraint
321  *
322  * Returns InvalidTransactionId if there is no conflict, else an xact ID
323  * we must wait for to see if it commits a conflicting tuple.   If an actual
324  * conflict is detected, no return --- just ereport().  If an xact ID is
325  * returned, and the conflicting tuple still has a speculative insertion in
326  * progress, *speculativeToken is set to non-zero, and the caller can wait for
327  * the verdict on the insertion using SpeculativeInsertionWait().
328  *
329  * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
330  * InvalidTransactionId because we don't want to wait.  In this case we
331  * set *is_unique to false if there is a potential conflict, and the
332  * core code must redo the uniqueness check later.
333  *
334  * As a side-effect, sets state in insertstate that can later be used by
335  * _bt_findinsertloc() to reuse most of the binary search work we do
336  * here.
337  *
338  * Do not call here when there are NULL values in scan key.  NULL should be
339  * considered unequal to NULL when checking for duplicates, but we are not
340  * prepared to handle that correctly.
341  */
342 static TransactionId
343 _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
344 				 IndexUniqueCheck checkUnique, bool *is_unique,
345 				 uint32 *speculativeToken)
346 {
347 	IndexTuple	itup = insertstate->itup;
348 	BTScanInsert itup_key = insertstate->itup_key;
349 	SnapshotData SnapshotDirty;
350 	OffsetNumber offset;
351 	OffsetNumber maxoff;
352 	Page		page;
353 	BTPageOpaque opaque;
354 	Buffer		nbuf = InvalidBuffer;
355 	bool		found = false;
356 
357 	/* Assume unique until we find a duplicate */
358 	*is_unique = true;
359 
360 	InitDirtySnapshot(SnapshotDirty);
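	/*
	 * A dirty snapshot sees uncommitted tuples as well as committed ones,
	 * which is what lets us notice conflicting insertions whose transactions
	 * are still in progress.
	 */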
361 
362 	page = BufferGetPage(insertstate->buf);
363 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
364 	maxoff = PageGetMaxOffsetNumber(page);
365 
366 	/*
367 	 * Find the first tuple with the same key.
368 	 *
369 	 * This also saves the binary search bounds in insertstate.  We use them
370 	 * in the fastpath below, but also in the _bt_findinsertloc() call later.
371 	 */
372 	Assert(!insertstate->bounds_valid);
373 	offset = _bt_binsrch_insert(rel, insertstate);
374 
375 	/*
376 	 * Scan over all equal tuples, looking for live conflicts.
377 	 */
378 	Assert(!insertstate->bounds_valid || insertstate->low == offset);
379 	Assert(!itup_key->anynullkeys);
380 	Assert(itup_key->scantid == NULL);
381 	for (;;)
382 	{
383 		ItemId		curitemid;
384 		IndexTuple	curitup;
385 		BlockNumber nblkno;
386 
387 		/*
388 		 * make sure the offset points to an actual item before trying to
389 		 * examine it...
390 		 */
391 		if (offset <= maxoff)
392 		{
393 			/*
394 			 * Fastpath: In most cases, we can use cached search bounds to
395 			 * limit our consideration to items that are definitely
396 			 * duplicates.  This fastpath doesn't apply when the original page
397 			 * is empty, or when the initial offset is past the end of the
398 			 * original page, which may indicate that we need to examine a
399 			 * second or subsequent page.
400 			 *
401 			 * Note that this optimization allows us to avoid calling
402 			 * _bt_compare() directly when there are no duplicates, as long as
403 			 * the offset where the key will go is not at the end of the page.
404 			 */
405 			if (nbuf == InvalidBuffer && offset == insertstate->stricthigh)
406 			{
407 				Assert(insertstate->bounds_valid);
408 				Assert(insertstate->low >= P_FIRSTDATAKEY(opaque));
409 				Assert(insertstate->low <= insertstate->stricthigh);
410 				Assert(_bt_compare(rel, itup_key, page, offset) < 0);
411 				break;
412 			}
413 
414 			curitemid = PageGetItemId(page, offset);
415 
416 			/*
417 			 * We can skip items that are marked killed.
418 			 *
419 			 * In the presence of heavy update activity an index may contain
420 			 * many killed items with the same key; running _bt_compare() on
421 			 * each killed item gets expensive.  Just advance over killed
422 			 * items as quickly as we can.  We only apply _bt_compare() when
423 			 * we get to a non-killed item.  Even those comparisons could be
424 			 * avoided (in the common case where there is only one page to
425 			 * visit) by reusing bounds, but just skipping dead items is fast
426 			 * enough.
427 			 */
428 			if (!ItemIdIsDead(curitemid))
429 			{
430 				ItemPointerData htid;
431 				bool		all_dead;
432 
433 				if (_bt_compare(rel, itup_key, page, offset) != 0)
434 					break;		/* we're past all the equal tuples */
435 
436 				/* okay, we gotta fetch the heap tuple ... */
437 				curitup = (IndexTuple) PageGetItem(page, curitemid);
438 				htid = curitup->t_tid;
439 
440 				/*
441 				 * If we are doing a recheck, we expect to find the tuple we
442 				 * are rechecking.  It's not a duplicate, but we have to keep
443 				 * scanning.
444 				 */
445 				if (checkUnique == UNIQUE_CHECK_EXISTING &&
446 					ItemPointerCompare(&htid, &itup->t_tid) == 0)
447 				{
448 					found = true;
449 				}
450 
451 				/*
452 				 * Check if there are any table tuples for this index entry
453 				 * satisfying SnapshotDirty. This is necessary because for AMs
454 				 * with optimizations like heap's HOT, we have just a single
455 				 * index entry for the entire chain.
456 				 */
457 				else if (table_index_fetch_tuple_check(heapRel, &htid,
458 													   &SnapshotDirty,
459 													   &all_dead))
460 				{
461 					TransactionId xwait;
462 
463 					/*
464 					 * It is a duplicate. If we are only doing a partial
465 					 * check, then don't bother checking if the tuple is being
466 					 * updated in another transaction. Just return the fact
467 					 * that it is a potential conflict and leave the full
468 					 * check till later. Don't invalidate binary search
469 					 * bounds.
470 					 */
471 					if (checkUnique == UNIQUE_CHECK_PARTIAL)
472 					{
473 						if (nbuf != InvalidBuffer)
474 							_bt_relbuf(rel, nbuf);
475 						*is_unique = false;
476 						return InvalidTransactionId;
477 					}
478 
479 					/*
480 					 * If this tuple is being updated by another transaction,
481 					 * then we have to wait for its commit/abort.
482 					 */
483 					xwait = (TransactionIdIsValid(SnapshotDirty.xmin)) ?
484 						SnapshotDirty.xmin : SnapshotDirty.xmax;
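					/*
					 * SnapshotDirty reports a still-in-progress inserting
					 * transaction in xmin and a still-in-progress deleting
					 * transaction in xmax; we wait on the inserter in
					 * preference to the deleter.
					 */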
485 
486 					if (TransactionIdIsValid(xwait))
487 					{
488 						if (nbuf != InvalidBuffer)
489 							_bt_relbuf(rel, nbuf);
490 						/* Tell _bt_doinsert to wait... */
491 						*speculativeToken = SnapshotDirty.speculativeToken;
492 						/* Caller releases lock on buf immediately */
493 						insertstate->bounds_valid = false;
494 						return xwait;
495 					}
496 
497 					/*
498 					 * Otherwise we have a definite conflict.  But before
499 					 * complaining, look to see if the tuple we want to insert
500 					 * is itself now committed dead --- if so, don't complain.
501 					 * This is a waste of time in normal scenarios but we must
502 					 * do it to support CREATE INDEX CONCURRENTLY.
503 					 *
504 					 * We must follow HOT-chains here because during
505 					 * concurrent index build, we insert the root TID though
506 					 * the actual tuple may be somewhere in the HOT-chain.
507 					 * While following the chain we might not stop at the
508 					 * exact tuple which triggered the insert, but that's OK
509 					 * because if we find a live tuple anywhere in this chain,
510 					 * we have a unique key conflict.  The other live tuple is
511 					 * not part of this chain because it had a different index
512 					 * entry.
513 					 */
514 					htid = itup->t_tid;
515 					if (table_index_fetch_tuple_check(heapRel, &htid,
516 													  SnapshotSelf, NULL))
517 					{
518 						/* Normal case --- it's still live */
519 					}
520 					else
521 					{
522 						/*
523 						 * It's been deleted, so no error, and no need to
524 						 * continue searching
525 						 */
526 						break;
527 					}
528 
529 					/*
530 					 * Check for a conflict-in as we would if we were going to
531 					 * write to this page.  We aren't actually going to write,
532 					 * but we want a chance to report SSI conflicts that would
533 					 * otherwise be masked by this unique constraint
534 					 * violation.
535 					 */
536 					CheckForSerializableConflictIn(rel, NULL, insertstate->buf);
537 
538 					/*
539 					 * This is a definite conflict.  Break the tuple down into
540 					 * datums and report the error.  But first, make sure we
541 					 * release the buffer locks we're holding ---
542 					 * BuildIndexValueDescription could make catalog accesses,
543 					 * which in the worst case might touch this same index and
544 					 * cause deadlocks.
545 					 */
546 					if (nbuf != InvalidBuffer)
547 						_bt_relbuf(rel, nbuf);
548 					_bt_relbuf(rel, insertstate->buf);
549 					insertstate->buf = InvalidBuffer;
550 					insertstate->bounds_valid = false;
551 
552 					{
553 						Datum		values[INDEX_MAX_KEYS];
554 						bool		isnull[INDEX_MAX_KEYS];
555 						char	   *key_desc;
556 
557 						index_deform_tuple(itup, RelationGetDescr(rel),
558 										   values, isnull);
559 
560 						key_desc = BuildIndexValueDescription(rel, values,
561 															  isnull);
562 
563 						ereport(ERROR,
564 								(errcode(ERRCODE_UNIQUE_VIOLATION),
565 								 errmsg("duplicate key value violates unique constraint \"%s\"",
566 										RelationGetRelationName(rel)),
567 								 key_desc ? errdetail("Key %s already exists.",
568 													  key_desc) : 0,
569 								 errtableconstraint(heapRel,
570 													RelationGetRelationName(rel))));
571 					}
572 				}
573 				else if (all_dead)
574 				{
575 					/*
576 					 * The conflicting tuple (or whole HOT chain) is dead to
577 					 * everyone, so we may as well mark the index entry
578 					 * killed.
579 					 */
580 					ItemIdMarkDead(curitemid);
581 					opaque->btpo_flags |= BTP_HAS_GARBAGE;
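					/*
					 * The HAS_GARBAGE hint lets a later _bt_findinsertloc
					 * call on this page try _bt_vacuum_one_page to reclaim
					 * space before resorting to a page split.
					 */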
582 
583 					/*
584 					 * Mark buffer with a dirty hint, since state is not
585 					 * crucial. Be sure to mark the proper buffer dirty.
586 					 */
587 					if (nbuf != InvalidBuffer)
588 						MarkBufferDirtyHint(nbuf, true);
589 					else
590 						MarkBufferDirtyHint(insertstate->buf, true);
591 				}
592 			}
593 		}
594 
595 		/*
596 		 * Advance to next tuple to continue checking.
597 		 */
598 		if (offset < maxoff)
599 			offset = OffsetNumberNext(offset);
600 		else
601 		{
602 			int			highkeycmp;
603 
604 			/* If scankey == hikey we gotta check the next page too */
605 			if (P_RIGHTMOST(opaque))
606 				break;
607 			highkeycmp = _bt_compare(rel, itup_key, page, P_HIKEY);
608 			Assert(highkeycmp <= 0);
609 			if (highkeycmp != 0)
610 				break;
611 			/* Advance to next non-dead page --- there must be one */
612 			for (;;)
613 			{
614 				nblkno = opaque->btpo_next;
615 				nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
616 				page = BufferGetPage(nbuf);
617 				opaque = (BTPageOpaque) PageGetSpecialPointer(page);
618 				if (!P_IGNORE(opaque))
619 					break;
620 				if (P_RIGHTMOST(opaque))
621 					elog(ERROR, "fell off the end of index \"%s\"",
622 						 RelationGetRelationName(rel));
623 			}
624 			maxoff = PageGetMaxOffsetNumber(page);
625 			offset = P_FIRSTDATAKEY(opaque);
626 			/* Don't invalidate binary search bounds */
627 		}
628 	}
629 
630 	/*
631 	 * If we are doing a recheck then we should have found the tuple we are
632 	 * checking.  Otherwise there's something very wrong --- probably, the
633 	 * index is on a non-immutable expression.
634 	 */
635 	if (checkUnique == UNIQUE_CHECK_EXISTING && !found)
636 		ereport(ERROR,
637 				(errcode(ERRCODE_INTERNAL_ERROR),
638 				 errmsg("failed to re-find tuple within index \"%s\"",
639 						RelationGetRelationName(rel)),
640 				 errhint("This may be because of a non-immutable index expression."),
641 				 errtableconstraint(heapRel,
642 									RelationGetRelationName(rel))));
643 
644 	if (nbuf != InvalidBuffer)
645 		_bt_relbuf(rel, nbuf);
646 
647 	return InvalidTransactionId;
648 }
649 
650 
651 /*
652  *	_bt_findinsertloc() -- Finds an insert location for a tuple
653  *
654  *		On entry, insertstate buffer contains the page the new tuple belongs
655  *		on.  It is exclusive-locked and pinned by the caller.
656  *
657  *		If 'checkingunique' is true, the buffer on entry is the first page
658  *		that contains duplicates of the new key.  If there are duplicates on
659  *		multiple pages, the correct insertion position might be some page to
660  *		the right, rather than the first page.  In that case, this function
661  *		moves right to the correct target page.
662  *
663  *		(In a !heapkeyspace index, there can be multiple pages with the same
664  *		high key, on which the new tuple could legitimately be placed.  In
665  *		that case, the caller passes the first page containing duplicates,
666  *		just like when checkingunique=true.  If that page doesn't have enough
667  *		room for the new tuple, this function moves right, trying to find a
668  *		legal page that does.)
669  *
670  *		On exit, insertstate buffer contains the chosen insertion page, and
671  *		the offset within that page is returned.  If _bt_findinsertloc needed
672  *		to move right, the lock and pin on the original page are released, and
673  *		the new buffer is exclusively locked and pinned instead.
674  *
675  *		If insertstate contains cached binary search bounds, we will take
676  *		advantage of them.  This avoids repeating comparisons that we made in
677  *		_bt_check_unique() already.
678  *
679  *		If there is not enough room on the page for the new tuple, we try to
680  *		make room by removing any LP_DEAD tuples.
681  */
682 static OffsetNumber
683 _bt_findinsertloc(Relation rel,
684 				  BTInsertState insertstate,
685 				  bool checkingunique,
686 				  BTStack stack,
687 				  Relation heapRel)
688 {
689 	BTScanInsert itup_key = insertstate->itup_key;
690 	Page		page = BufferGetPage(insertstate->buf);
691 	BTPageOpaque lpageop;
692 
693 	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
694 
695 	/* Check 1/3 of a page restriction */
696 	if (unlikely(insertstate->itemsz > BTMaxItemSize(page)))
697 		_bt_check_third_page(rel, heapRel, itup_key->heapkeyspace, page,
698 							 insertstate->itup);
699 
700 	Assert(P_ISLEAF(lpageop) && !P_INCOMPLETE_SPLIT(lpageop));
701 	Assert(!insertstate->bounds_valid || checkingunique);
702 	Assert(!itup_key->heapkeyspace || itup_key->scantid != NULL);
703 	Assert(itup_key->heapkeyspace || itup_key->scantid == NULL);
704 
705 	if (itup_key->heapkeyspace)
706 	{
707 		/*
708 		 * If we're inserting into a unique index, we may have to walk right
709 		 * through leaf pages to find the one leaf page that we must insert
710 		 * onto.
711 		 *
712 		 * This is needed for checkingunique callers because a scantid was not
713 		 * used when we called _bt_search().  scantid can only be set after
714 		 * _bt_check_unique() has checked for duplicates.  The buffer
715 		 * initially stored in insertstate->buf has the page where the first
716 		 * duplicate key might be found, which isn't always the page that new
717 		 * duplicate key might be found, which isn't always the page that the
718 		 * new tuple belongs on.  The heap TID attribute for the new tuple (scantid)
719 		 * very rare in practice.
720 		 */
721 		if (checkingunique)
722 		{
723 			for (;;)
724 			{
725 				/*
726 				 * Does the new tuple belong on this page?
727 				 *
728 				 * The earlier _bt_check_unique() call may well have
729 				 * established a strict upper bound on the offset for the new
730 				 * item.  If it's not the last item of the page (i.e. if there
731 				 * is at least one tuple on the page that goes after the tuple
732 				 * we're inserting) then we know that the tuple belongs on
733 				 * this page.  We can skip the high key check.
734 				 */
735 				if (insertstate->bounds_valid &&
736 					insertstate->low <= insertstate->stricthigh &&
737 					insertstate->stricthigh <= PageGetMaxOffsetNumber(page))
738 					break;
739 
740 				/* Test '<=', not '!=', since scantid is set now */
741 				if (P_RIGHTMOST(lpageop) ||
742 					_bt_compare(rel, itup_key, page, P_HIKEY) <= 0)
743 					break;
744 
745 				_bt_stepright(rel, insertstate, stack);
746 				/* Update local state after stepping right */
747 				page = BufferGetPage(insertstate->buf);
748 				lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
749 			}
750 		}
751 
752 		/*
753 		 * If the target page is full, see if we can obtain enough space by
754 		 * erasing LP_DEAD items
755 		 */
756 		if (PageGetFreeSpace(page) < insertstate->itemsz &&
757 			P_HAS_GARBAGE(lpageop))
758 		{
759 			_bt_vacuum_one_page(rel, insertstate->buf, heapRel);
760 			insertstate->bounds_valid = false;
761 		}
762 	}
763 	else
764 	{
765 		/*----------
766 		 * This is a !heapkeyspace (version 2 or 3) index.  The current page
767 		 * is the first page that we could insert the new tuple to, but there
768 		 * may be other pages to the right that we could opt to use instead.
769 		 *
770 		 * If the new key is equal to one or more existing keys, we can
771 		 * legitimately place it anywhere in the series of equal keys.  In
772 		 * fact, if the new key is equal to the page's "high key" we can place
773 		 * it on the next page.  If it is equal to the high key, and there's
774 		 * not room to insert the new tuple on the current page without
775 		 * splitting, then we move right hoping to find more free space and
776 		 * avoid a split.
777 		 *
778 		 * Keep scanning right until we
779 		 *		(a) find a page with enough free space,
780 		 *		(b) reach the last page where the tuple can legally go, or
781 		 *		(c) get tired of searching.
782 		 * (c) is not flippant; it is important because if there are many
783 		 * pages' worth of equal keys, it's better to split one of the early
784 		 * pages than to scan all the way to the end of the run of equal keys
785 		 * on every insert.  We implement "get tired" as a random choice,
786 		 * since stopping after scanning a fixed number of pages wouldn't work
787 		 * well (we'd never reach the right-hand side of previously split
788 		 * pages).  The probability of moving right is set at 0.99, which may
789 		 * seem too high to change the behavior much, but it does an excellent
790 		 * job of preventing O(N^2) behavior with many equal keys.
791 		 *----------
792 		 */
793 		while (PageGetFreeSpace(page) < insertstate->itemsz)
794 		{
795 			/*
796 			 * Before considering moving right, see if we can obtain enough
797 			 * space by erasing LP_DEAD items
798 			 */
799 			if (P_HAS_GARBAGE(lpageop))
800 			{
801 				_bt_vacuum_one_page(rel, insertstate->buf, heapRel);
802 				insertstate->bounds_valid = false;
803 
804 				if (PageGetFreeSpace(page) >= insertstate->itemsz)
805 					break;		/* OK, now we have enough space */
806 			}
807 
808 			/*
809 			 * Nope, so check conditions (b) and (c) enumerated above
810 			 *
811 			 * The earlier _bt_check_unique() call may well have established a
812 			 * strict upper bound on the offset for the new item.  If it's not
813 			 * the last item of the page (i.e. if there is at least one tuple
814 			 * on the page that's greater than the tuple we're inserting)
815 			 * then we know that the tuple belongs on this page.  We can skip
816 			 * the high key check.
817 			 */
818 			if (insertstate->bounds_valid &&
819 				insertstate->low <= insertstate->stricthigh &&
820 				insertstate->stricthigh <= PageGetMaxOffsetNumber(page))
821 				break;
822 
823 			if (P_RIGHTMOST(lpageop) ||
824 				_bt_compare(rel, itup_key, page, P_HIKEY) != 0 ||
825 				random() <= (MAX_RANDOM_VALUE / 100))
826 				break;
827 
828 			_bt_stepright(rel, insertstate, stack);
829 			/* Update local state after stepping right */
830 			page = BufferGetPage(insertstate->buf);
831 			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
832 		}
833 	}
834 
835 	/*
836 	 * We should now be on the correct page.  Find the offset within the page
837 	 * for the new tuple. (Possibly reusing earlier search bounds.)
838 	 */
839 	Assert(P_RIGHTMOST(lpageop) ||
840 		   _bt_compare(rel, itup_key, page, P_HIKEY) <= 0);
841 
842 	return _bt_binsrch_insert(rel, insertstate);
843 }
844 
845 /*
846  * Step right to next non-dead page, during insertion.
847  *
848  * This is a bit more complicated than moving right in a search.  We must
849  * write-lock the target page before releasing write lock on current page;
850  * else someone else's _bt_check_unique scan could fail to see our insertion.
851  * Write locks on intermediate dead pages won't do because we don't know when
852  * they will get de-linked from the tree.
853  *
854  * This is more aggressive than it needs to be for non-unique !heapkeyspace
855  * indexes.
856  */
857 static void
858 _bt_stepright(Relation rel, BTInsertState insertstate, BTStack stack)
859 {
860 	Page		page;
861 	BTPageOpaque lpageop;
862 	Buffer		rbuf;
863 	BlockNumber rblkno;
864 
865 	page = BufferGetPage(insertstate->buf);
866 	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
867 
868 	rbuf = InvalidBuffer;
869 	rblkno = lpageop->btpo_next;
870 	for (;;)
871 	{
872 		rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
873 		page = BufferGetPage(rbuf);
874 		lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
875 
876 		/*
877 		 * If this page was incompletely split, finish the split now.  We do
878 		 * this while holding a lock on the left sibling, which is not good
879 		 * because finishing the split could be a fairly lengthy operation.
880 		 * But this should happen very seldom.
881 		 */
882 		if (P_INCOMPLETE_SPLIT(lpageop))
883 		{
884 			_bt_finish_split(rel, rbuf, stack);
885 			rbuf = InvalidBuffer;
886 			continue;
887 		}
888 
889 		if (!P_IGNORE(lpageop))
890 			break;
891 		if (P_RIGHTMOST(lpageop))
892 			elog(ERROR, "fell off the end of index \"%s\"",
893 				 RelationGetRelationName(rel));
894 
895 		rblkno = lpageop->btpo_next;
896 	}
897 	/* rbuf locked; unlock buf, update state for caller */
898 	_bt_relbuf(rel, insertstate->buf);
899 	insertstate->buf = rbuf;
900 	insertstate->bounds_valid = false;
901 }
902 
903 /*----------
904  *	_bt_insertonpg() -- Insert a tuple on a particular page in the index.
905  *
906  *		This recursive procedure does the following things:
907  *
908  *			+  if necessary, splits the target page, using 'itup_key' for
909  *			   suffix truncation on leaf pages (caller passes NULL for
910  *			   non-leaf pages).
911  *			+  inserts the tuple.
912  *			+  if the page was split, pops the parent stack, and finds the
913  *			   right place to insert the new child pointer (by walking
914  *			   right using information stored in the parent stack).
915  *			+  invokes itself with the appropriate tuple for the right
916  *			   child page on the parent.
917  *			+  updates the metapage if a true root or fast root is split.
918  *
919  *		On entry, we must have the correct buffer in which to do the
920  *		insertion, and the buffer must be pinned and write-locked.  On return,
921  *		we will have dropped both the pin and the lock on the buffer.
922  *
923  *		This routine only performs retail tuple insertions.  'itup' should
924  *		always be either a non-highkey leaf item, or a downlink (new high
925  *		key items are created indirectly, when a page is split).  When
926  *		inserting to a non-leaf page, 'cbuf' is the left-sibling of the page
927  *		we're inserting the downlink for.  This function will clear the
928  *		INCOMPLETE_SPLIT flag on it, and release the buffer.
929  *----------
930  */
931 static void
932 _bt_insertonpg(Relation rel,
933 			   BTScanInsert itup_key,
934 			   Buffer buf,
935 			   Buffer cbuf,
936 			   BTStack stack,
937 			   IndexTuple itup,
938 			   OffsetNumber newitemoff,
939 			   bool split_only_page)
940 {
941 	Page		page;
942 	BTPageOpaque lpageop;
943 	Size		itemsz;
944 
945 	page = BufferGetPage(buf);
946 	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
947 
948 	/* child buffer must be given iff inserting on an internal page */
949 	Assert(P_ISLEAF(lpageop) == !BufferIsValid(cbuf));
950 	/* tuple must have appropriate number of attributes */
951 	Assert(!P_ISLEAF(lpageop) ||
952 		   BTreeTupleGetNAtts(itup, rel) ==
953 		   IndexRelationGetNumberOfAttributes(rel));
954 	Assert(P_ISLEAF(lpageop) ||
955 		   BTreeTupleGetNAtts(itup, rel) <=
956 		   IndexRelationGetNumberOfKeyAttributes(rel));
957 
958 	/* The caller should've finished any incomplete splits already. */
959 	if (P_INCOMPLETE_SPLIT(lpageop))
960 		elog(ERROR, "cannot insert to incompletely split page %u",
961 			 BufferGetBlockNumber(buf));
962 
963 	itemsz = IndexTupleSize(itup);
964 	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
965 								 * need to be consistent */
966 
967 	/*
968 	 * Do we need to split the page to fit the item on it?
969 	 *
970 	 * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
971 	 * so this comparison is correct even though we appear to be accounting
972 	 * only for the item and not for its line pointer.
973 	 */
974 	if (PageGetFreeSpace(page) < itemsz)
975 	{
976 		bool		is_root = P_ISROOT(lpageop);
977 		bool		is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
978 		Buffer		rbuf;
979 
980 		/*
981 		 * If we're here then a pagesplit is needed. We should never reach
982 		 * here if we're using the fastpath since we should have checked for
983 		 * all the required conditions, including the fact that this page has
984 		 * enough freespace. Note that this routine can in theory deal with
985 		 * the situation where a NULL stack pointer is passed (that's what
986 		 * would happen if the fastpath is taken). But that path is much
987 		 * slower, defeating the very purpose of the optimization.  The
988 		 * following assertion should protect us from any future code changes
989 		 * that invalidate those assumptions.
990 		 *
991 		 * Note that whenever we fail to take the fastpath, we clear the
992 		 * cached block. Checking for a valid cached block at this point is
993 		 * enough to decide whether we're in a fastpath or not.
994 		 */
995 		Assert(!(P_ISLEAF(lpageop) &&
996 				 BlockNumberIsValid(RelationGetTargetBlock(rel))));
997 
998 		/* split the buffer into left and right halves */
999 		rbuf = _bt_split(rel, itup_key, buf, cbuf, newitemoff, itemsz, itup);
1000 		PredicateLockPageSplit(rel,
1001 							   BufferGetBlockNumber(buf),
1002 							   BufferGetBlockNumber(rbuf));
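		/*
		 * Predicate locks held on the original page must now also cover the
		 * new right sibling, so that SSI conflict detection keeps covering
		 * tuples that moved right during the split.
		 */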
1003 
1004 		/*----------
1005 		 * By here,
1006 		 *
1007 		 *		+  our target page has been split;
1008 		 *		+  the original tuple has been inserted;
1009 		 *		+  we have write locks on both the old (left half)
1010 		 *		   and new (right half) buffers, after the split; and
1011 		 *		+  we know the key we want to insert into the parent
1012 		 *		   (it's the "high key" on the left child page).
1013 		 *
1014 		 * We're ready to do the parent insertion.  We need to hold onto the
1015 		 * locks for the child pages until we locate the parent, but we can
1016 		 * at least release the lock on the right child before doing the
1017 		 * actual insertion.  The lock on the left child will be released
1018 		 * last of all by parent insertion, where it is the 'cbuf' of parent
1019 		 * page.
1020 		 *----------
1021 		 */
1022 		_bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
1023 	}
1024 	else
1025 	{
1026 		Buffer		metabuf = InvalidBuffer;
1027 		Page		metapg = NULL;
1028 		BTMetaPageData *metad = NULL;
1029 		OffsetNumber itup_off;
1030 		BlockNumber itup_blkno;
1031 		BlockNumber cachedBlock = InvalidBlockNumber;
1032 
1033 		itup_off = newitemoff;
1034 		itup_blkno = BufferGetBlockNumber(buf);
1035 
1036 		/*
1037 		 * If we are doing this insert because we split a page that was the
1038 		 * only one on its tree level, but was not the root, it may have been
1039 		 * the "fast root".  We need to ensure that the fast root link points
1040 		 * at or above the current page.  We can safely acquire a lock on the
1041 		 * metapage here --- see comments for _bt_newroot().
1042 		 */
1043 		if (split_only_page)
1044 		{
1045 			Assert(!P_ISLEAF(lpageop));
1046 
1047 			metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1048 			metapg = BufferGetPage(metabuf);
1049 			metad = BTPageGetMeta(metapg);
1050 
1051 			if (metad->btm_fastlevel >= lpageop->btpo.level)
1052 			{
1053 				/* no update wanted */
1054 				_bt_relbuf(rel, metabuf);
1055 				metabuf = InvalidBuffer;
1056 			}
1057 		}
1058 
1059 		/*
1060 		 * Every internal page should have exactly one negative infinity item
1061 		 * at all times.  Only _bt_split() and _bt_newroot() should add items
1062 		 * that become negative infinity items through truncation, since
1063 		 * they're the only routines that allocate new internal pages.  Do not
1064 		 * allow a retail insertion of a new item at the negative infinity
1065 		 * offset.
1066 		 */
1067 		if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
1068 			elog(ERROR, "cannot insert second negative infinity item in block %u of index \"%s\"",
1069 				 itup_blkno, RelationGetRelationName(rel));
1070 
1071 		/* Do the update.  No ereport(ERROR) until changes are logged */
1072 		START_CRIT_SECTION();
1073 
1074 		if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
1075 			elog(PANIC, "failed to add new item to block %u in index \"%s\"",
1076 				 itup_blkno, RelationGetRelationName(rel));
1077 
1078 		MarkBufferDirty(buf);
1079 
1080 		if (BufferIsValid(metabuf))
1081 		{
1082 			/* upgrade meta-page if needed */
1083 			if (metad->btm_version < BTREE_NOVAC_VERSION)
1084 				_bt_upgrademetapage(metapg);
1085 			metad->btm_fastroot = itup_blkno;
1086 			metad->btm_fastlevel = lpageop->btpo.level;
1087 			MarkBufferDirty(metabuf);
1088 		}
1089 
1090 		/* clear INCOMPLETE_SPLIT flag on child if inserting a downlink */
1091 		if (BufferIsValid(cbuf))
1092 		{
1093 			Page		cpage = BufferGetPage(cbuf);
1094 			BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
1095 
1096 			Assert(P_INCOMPLETE_SPLIT(cpageop));
1097 			cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
1098 			MarkBufferDirty(cbuf);
1099 		}
1100 
1101 		/*
1102 		 * Cache the block information if we just inserted into the rightmost
1103 		 * leaf page of the index and it's not the root page.  For a very small
1104 		 * index where the root is also a leaf, there is no point in trying the
1105 		 * optimization.
1106 		 */
1107 		if (P_RIGHTMOST(lpageop) && P_ISLEAF(lpageop) && !P_ISROOT(lpageop))
1108 			cachedBlock = BufferGetBlockNumber(buf);
1109 
1110 		/* XLOG stuff */
1111 		if (RelationNeedsWAL(rel))
1112 		{
1113 			xl_btree_insert xlrec;
1114 			xl_btree_metadata xlmeta;
1115 			uint8		xlinfo;
1116 			XLogRecPtr	recptr;
1117 
1118 			xlrec.offnum = itup_off;
1119 
1120 			XLogBeginInsert();
1121 			XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);
1122 
1123 			if (P_ISLEAF(lpageop))
1124 				xlinfo = XLOG_BTREE_INSERT_LEAF;
1125 			else
1126 			{
1127 				/*
1128 				 * Register the left child whose INCOMPLETE_SPLIT flag was
1129 				 * cleared.
1130 				 */
1131 				XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD);
1132 
1133 				xlinfo = XLOG_BTREE_INSERT_UPPER;
1134 			}
1135 
1136 			if (BufferIsValid(metabuf))
1137 			{
1138 				Assert(metad->btm_version >= BTREE_NOVAC_VERSION);
1139 				xlmeta.version = metad->btm_version;
1140 				xlmeta.root = metad->btm_root;
1141 				xlmeta.level = metad->btm_level;
1142 				xlmeta.fastroot = metad->btm_fastroot;
1143 				xlmeta.fastlevel = metad->btm_fastlevel;
1144 				xlmeta.oldest_btpo_xact = metad->btm_oldest_btpo_xact;
1145 				xlmeta.last_cleanup_num_heap_tuples =
1146 					metad->btm_last_cleanup_num_heap_tuples;
1147 
1148 				XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT | REGBUF_STANDARD);
1149 				XLogRegisterBufData(2, (char *) &xlmeta, sizeof(xl_btree_metadata));
1150 
1151 				xlinfo = XLOG_BTREE_INSERT_META;
1152 			}
1153 
1154 			XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
1155 			XLogRegisterBufData(0, (char *) itup, IndexTupleSize(itup));
1156 
1157 			recptr = XLogInsert(RM_BTREE_ID, xlinfo);
1158 
1159 			if (BufferIsValid(metabuf))
1160 			{
1161 				PageSetLSN(metapg, recptr);
1162 			}
1163 			if (BufferIsValid(cbuf))
1164 			{
1165 				PageSetLSN(BufferGetPage(cbuf), recptr);
1166 			}
1167 
1168 			PageSetLSN(page, recptr);
1169 		}
1170 
1171 		END_CRIT_SECTION();
1172 
1173 		/* release buffers */
1174 		if (BufferIsValid(metabuf))
1175 			_bt_relbuf(rel, metabuf);
1176 		if (BufferIsValid(cbuf))
1177 			_bt_relbuf(rel, cbuf);
1178 		_bt_relbuf(rel, buf);
1179 
1180 		/*
1181 		 * If we decided to cache the insertion target block, then set it now.
1182 		 * But before that, check the height of the tree and skip the
1183 		 * optimization for small indexes.  We defer that check to this
1184 		 * point to ensure that we don't call _bt_getrootheight while holding
1185 		 * lock on any other block.
1186 		 *
1187 		 * We do this after dropping locks on all buffers. So the information
1188 		 * about whether the insertion block is still the rightmost block or
1189 		 * not may have changed in between. But we will deal with that during
1190 		 * not may have changed in between.  But we will deal with that during
1191 		 * the next insert operation.  No special care is required while setting
1192 		 */
1193 		if (BlockNumberIsValid(cachedBlock) &&
1194 			_bt_getrootheight(rel) >= BTREE_FASTPATH_MIN_LEVEL)
1195 			RelationSetTargetBlock(rel, cachedBlock);
1196 	}
1197 }
1198 
1199 /*
1200  *	_bt_split() -- split a page in the btree.
1201  *
1202  *		On entry, buf is the page to split, and is pinned and write-locked.
1203  *		newitemoff etc. tell us about the new item that must be inserted
1204  *		along with the data from the original page.
1205  *
1206  *		itup_key is used for suffix truncation on leaf pages (internal
1207  *		page callers pass NULL).  When splitting a non-leaf page, 'cbuf'
1208  *		is the left-sibling of the page we're inserting the downlink for.
1209  *		This function will clear the INCOMPLETE_SPLIT flag on it, and
1210  *		release the buffer.
1211  *
1212  *		Returns the new right sibling of buf, pinned and write-locked.
1213  *		The pin and lock on buf are maintained.
1214  */
1215 static Buffer
1216 _bt_split(Relation rel, BTScanInsert itup_key, Buffer buf, Buffer cbuf,
1217 		  OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem)
1218 {
1219 	Buffer		rbuf;
1220 	Page		origpage;
1221 	Page		leftpage,
1222 				rightpage;
1223 	BlockNumber origpagenumber,
1224 				rightpagenumber;
1225 	BTPageOpaque ropaque,
1226 				lopaque,
1227 				oopaque;
1228 	Buffer		sbuf = InvalidBuffer;
1229 	Page		spage = NULL;
1230 	BTPageOpaque sopaque = NULL;
1231 	Size		itemsz;
1232 	ItemId		itemid;
1233 	IndexTuple	item;
1234 	OffsetNumber leftoff,
1235 				rightoff;
1236 	OffsetNumber firstright;
1237 	OffsetNumber maxoff;
1238 	OffsetNumber i;
1239 	bool		newitemonleft,
1240 				isleaf;
1241 	IndexTuple	lefthikey;
1242 	int			indnatts = IndexRelationGetNumberOfAttributes(rel);
1243 	int			indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
1244 
1245 	/*
1246 	 * origpage is the original page to be split.  leftpage is a temporary
1247 	 * buffer that receives the left-sibling data, which will be copied back
1248 	 * into origpage on success.  rightpage is the new page that will receive
1249 	 * the right-sibling data.
1250 	 *
1251 	 * leftpage is allocated after choosing a split point.  rightpage's new
1252 	 * buffer isn't acquired until after leftpage is initialized and has its
1253 	 * new high key, the last point where splitting the page may fail (barring
1254 	 * corruption).  Failing before acquiring the new buffer won't have lasting
1255 	 * consequences, since origpage won't have been modified and leftpage is
1256 	 * only workspace.
1257 	 */
1258 	origpage = BufferGetPage(buf);
1259 	oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
1260 	origpagenumber = BufferGetBlockNumber(buf);
1261 
1262 	/*
1263 	 * Choose a point to split origpage at.
1264 	 *
1265 	 * A split point can be thought of as a point _between_ two existing
1266 	 * tuples on origpage (lastleft and firstright tuples), provided you
1267 	 * pretend that the new item that didn't fit is already on origpage.
1268 	 *
1269 	 * Since origpage does not actually contain newitem, the representation of
1270 	 * split points needs to work with two boundary cases: splits where
1271 	 * newitem is lastleft, and splits where newitem is firstright.
1272 	 * newitemonleft resolves the ambiguity that would otherwise exist when
1273 	 * newitemoff == firstright.  In all other cases it's clear which side of
1274 	 * the split every tuple goes on from context.  newitemonleft is usually
1275 	 * (but not always) redundant information.
1276 	 */
1277 	firstright = _bt_findsplitloc(rel, origpage, newitemoff, newitemsz,
1278 								  newitem, &newitemonleft);
1279 
1280 	/* Allocate temp buffer for leftpage */
1281 	leftpage = PageGetTempPage(origpage);
1282 	_bt_pageinit(leftpage, BufferGetPageSize(buf));
1283 	lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
1284 
1285 	/*
1286 	 * leftpage won't be the root when we're done.  Also, clear the SPLIT_END
1287 	 * and HAS_GARBAGE flags.
1288 	 */
1289 	lopaque->btpo_flags = oopaque->btpo_flags;
1290 	lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
1291 	/* set flag in leftpage indicating that rightpage has no downlink yet */
1292 	lopaque->btpo_flags |= BTP_INCOMPLETE_SPLIT;
1293 	lopaque->btpo_prev = oopaque->btpo_prev;
1294 	/* handle btpo_next after rightpage buffer acquired */
1295 	lopaque->btpo.level = oopaque->btpo.level;
1296 	/* handle btpo_cycleid after rightpage buffer acquired */
1297 
1298 	/*
1299 	 * Copy the original page's LSN into leftpage, which will become the
1300 	 * updated version of the page.  We need this because XLogInsert will
1301 	 * examine the LSN and possibly dump it in a page image.
1302 	 */
1303 	PageSetLSN(leftpage, PageGetLSN(origpage));
1304 	isleaf = P_ISLEAF(oopaque);
1305 
1306 	/*
1307 	 * The "high key" for the new left page will be the first key that's going
1308 	 * to go into the new right page, or a truncated version if this is a leaf
1309 	 * page split.
1310 	 *
1311 	 * The high key for the left page is formed using the first item on the
1312 	 * right page, which may seem to be contrary to Lehman & Yao's approach of
1313 	 * using the left page's last item as its new high key when splitting on
1314 	 * the leaf level.  It isn't, though: suffix truncation will leave the
1315 	 * left page's high key fully equal to the last item on the left page when
1316 	 * two tuples with equal key values (excluding heap TID) enclose the split
1317 	 * point.  It isn't actually necessary for a new leaf high key to be equal
1318 	 * to the last item on the left for the L&Y "subtree" invariant to hold.
1319 	 * It's sufficient to make sure that the new leaf high key is strictly
1320 	 * less than the first item on the right leaf page, and greater than or
1321 	 * equal to (not necessarily equal to) the last item on the left leaf
1322 	 * page.
1323 	 *
1324 	 * In other words, when suffix truncation isn't possible, L&Y's exact
1325 	 * approach to leaf splits is taken.  (Actually, even that is slightly
1326 	 * inaccurate.  A tuple with all the keys from firstright but the heap TID
1327 	 * from lastleft will be used as the new high key, since the last left
1328 	 * tuple could be physically larger despite being opclass-equal in respect
1329 	 * of all attributes prior to the heap TID attribute.)
1330 	 */
1331 	if (!newitemonleft && newitemoff == firstright)
1332 	{
1333 		/* incoming tuple will become first on right page */
1334 		itemsz = newitemsz;
1335 		item = newitem;
1336 	}
1337 	else
1338 	{
1339 		/* existing item at firstright will become first on right page */
1340 		itemid = PageGetItemId(origpage, firstright);
1341 		itemsz = ItemIdGetLength(itemid);
1342 		item = (IndexTuple) PageGetItem(origpage, itemid);
1343 	}
1344 
1345 	/*
1346 	 * Truncate unneeded key and non-key attributes of the high key item
1347 	 * before inserting it on the left page.  This can only happen at the leaf
1348 	 * level, since in general all pivot tuple values originate from leaf
1349 	 * level high keys.  A pivot tuple in a grandparent page must guide a
1350 	 * search not only to the correct parent page, but also to the correct
1351 	 * leaf page.
1352 	 */
1353 	if (isleaf && (itup_key->heapkeyspace || indnatts != indnkeyatts))
1354 	{
1355 		IndexTuple	lastleft;
1356 
1357 		/*
1358 		 * Determine which tuple will become the last on the left page.  This
1359 		 * is needed to decide how many attributes from the first item on the
1360 		 * right page must remain in new high key for left page.
1361 		 */
1362 		if (newitemonleft && newitemoff == firstright)
1363 		{
1364 			/* incoming tuple will become last on left page */
1365 			lastleft = newitem;
1366 		}
1367 		else
1368 		{
1369 			OffsetNumber lastleftoff;
1370 
1371 			/* item just before firstright will become last on left page */
1372 			lastleftoff = OffsetNumberPrev(firstright);
1373 			Assert(lastleftoff >= P_FIRSTDATAKEY(oopaque));
1374 			itemid = PageGetItemId(origpage, lastleftoff);
1375 			lastleft = (IndexTuple) PageGetItem(origpage, itemid);
1376 		}
1377 
1378 		Assert(lastleft != item);
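		/*
		 * _bt_truncate builds the new pivot tuple: it keeps only as many key
		 * attributes (plus, if needed, a heap TID) as are required to
		 * distinguish lastleft from the first item on the right page.
		 */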
1379 		lefthikey = _bt_truncate(rel, lastleft, item, itup_key);
1380 		itemsz = IndexTupleSize(lefthikey);
1381 		itemsz = MAXALIGN(itemsz);
1382 	}
1383 	else
1384 		lefthikey = item;
1385 
1386 	/*
1387 	 * Add new high key to leftpage
1388 	 */
1389 	leftoff = P_HIKEY;
1390 
1391 	Assert(BTreeTupleGetNAtts(lefthikey, rel) > 0);
1392 	Assert(BTreeTupleGetNAtts(lefthikey, rel) <= indnkeyatts);
1393 	if (PageAddItem(leftpage, (Item) lefthikey, itemsz, leftoff,
1394 					false, false) == InvalidOffsetNumber)
1395 		elog(ERROR, "failed to add hikey to the left sibling"
1396 			 " while splitting block %u of index \"%s\"",
1397 			 origpagenumber, RelationGetRelationName(rel));
1398 	leftoff = OffsetNumberNext(leftoff);
1399 	/* be tidy */
1400 	if (lefthikey != item)
1401 		pfree(lefthikey);
1402 
1403 	/*
1404 	 * Acquire a new right page to split into, now that left page has a new
1405 	 * high key.  From here on, it's not okay to throw an error without
1406 	 * zeroing rightpage first.  This coding rule ensures that we won't
1407 	 * confuse future VACUUM operations, which might otherwise try to re-find
1408 	 * a downlink to a leftover junk page as the page undergoes deletion.
1409 	 *
1410 	 * It would be reasonable to start the critical section just after the new
1411 	 * rightpage buffer is acquired instead; that would allow us to avoid
1412 	 * leftover junk pages without bothering to zero rightpage.  We do it this
1413 	 * way because it avoids an unnecessary PANIC when either origpage or its
1414 	 * existing sibling page is corrupt.
1415 	 */
1416 	rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
1417 	rightpage = BufferGetPage(rbuf);
1418 	rightpagenumber = BufferGetBlockNumber(rbuf);
1419 	/* rightpage was initialized by _bt_getbuf */
1420 	ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
1421 
1422 	/*
1423 	 * Finish off remaining leftpage special area fields.  They cannot be set
1424 	 * before both origpage (leftpage) and rightpage buffers are acquired and
1425 	 * locked.
1426 	 */
1427 	lopaque->btpo_next = rightpagenumber;
1428 	lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
1429 
1430 	/*
1431 	 * rightpage won't be the root when we're done.  Also, clear the SPLIT_END
1432 	 * and HAS_GARBAGE flags.
1433 	 */
1434 	ropaque->btpo_flags = oopaque->btpo_flags;
1435 	ropaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
1436 	ropaque->btpo_prev = origpagenumber;
1437 	ropaque->btpo_next = oopaque->btpo_next;
1438 	ropaque->btpo.level = oopaque->btpo.level;
1439 	ropaque->btpo_cycleid = lopaque->btpo_cycleid;
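	/*
	 * Both halves carry the cycle ID of any VACUUM that is in progress, so
	 * that btvacuumscan can notice tuples this split moved onto a right
	 * sibling it has already passed over, and revisit that page.
	 */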
1440 
1441 	/*
1442 	 * Add new high key to rightpage where necessary.
1443 	 *
1444 	 * If the page we're splitting is not the rightmost page at its level in
1445 	 * the tree, then the first entry on the page is the high key from
1446 	 * origpage.
1447 	 */
1448 	rightoff = P_HIKEY;
1449 
1450 	if (!P_RIGHTMOST(oopaque))
1451 	{
1452 		itemid = PageGetItemId(origpage, P_HIKEY);
1453 		itemsz = ItemIdGetLength(itemid);
1454 		item = (IndexTuple) PageGetItem(origpage, itemid);
1455 		Assert(BTreeTupleGetNAtts(item, rel) > 0);
1456 		Assert(BTreeTupleGetNAtts(item, rel) <= indnkeyatts);
1457 		if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
1458 						false, false) == InvalidOffsetNumber)
1459 		{
1460 			memset(rightpage, 0, BufferGetPageSize(rbuf));
1461 			elog(ERROR, "failed to add hikey to the right sibling"
1462 				 " while splitting block %u of index \"%s\"",
1463 				 origpagenumber, RelationGetRelationName(rel));
1464 		}
1465 		rightoff = OffsetNumberNext(rightoff);
1466 	}
1467 
1468 	/*
1469 	 * Now transfer all the data items (non-pivot tuples in isleaf case, or
1470 	 * additional pivot tuples in !isleaf case) to the appropriate page.
1471 	 *
1472 	 * Note: we *must* insert at least the right page's items in item-number
1473 	 * order, for the benefit of _bt_restore_page().
1474 	 */
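	/*
	 * Worked example with hypothetical offsets: given firstright = 5,
	 * newitemoff = 7 and newitemonleft = false, the loop sends data items
	 * at offsets below 5 to leftpage and the rest to rightpage, and adds
	 * the incoming tuple to rightpage when it reaches offset 7, just before
	 * the existing item at that offset.  With newitemonleft true,
	 * newitemoff would be <= firstright and the incoming tuple would go to
	 * leftpage instead.
	 */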
1475 	maxoff = PageGetMaxOffsetNumber(origpage);
1476 
1477 	for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
1478 	{
1479 		itemid = PageGetItemId(origpage, i);
1480 		itemsz = ItemIdGetLength(itemid);
1481 		item = (IndexTuple) PageGetItem(origpage, itemid);
1482 
1483 		/* does new item belong before this one? */
1484 		if (i == newitemoff)
1485 		{
1486 			if (newitemonleft)
1487 			{
1488 				Assert(newitemoff <= firstright);
1489 				if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
1490 				{
1491 					memset(rightpage, 0, BufferGetPageSize(rbuf));
1492 					elog(ERROR, "failed to add new item to the left sibling"
1493 						 " while splitting block %u of index \"%s\"",
1494 						 origpagenumber, RelationGetRelationName(rel));
1495 				}
1496 				leftoff = OffsetNumberNext(leftoff);
1497 			}
1498 			else
1499 			{
1500 				Assert(newitemoff >= firstright);
1501 				if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
1502 				{
1503 					memset(rightpage, 0, BufferGetPageSize(rbuf));
1504 					elog(ERROR, "failed to add new item to the right sibling"
1505 						 " while splitting block %u of index \"%s\"",
1506 						 origpagenumber, RelationGetRelationName(rel));
1507 				}
1508 				rightoff = OffsetNumberNext(rightoff);
1509 			}
1510 		}
1511 
1512 		/* decide which page to put it on */
1513 		if (i < firstright)
1514 		{
1515 			if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
1516 			{
1517 				memset(rightpage, 0, BufferGetPageSize(rbuf));
1518 				elog(ERROR, "failed to add old item to the left sibling"
1519 					 " while splitting block %u of index \"%s\"",
1520 					 origpagenumber, RelationGetRelationName(rel));
1521 			}
1522 			leftoff = OffsetNumberNext(leftoff);
1523 		}
1524 		else
1525 		{
1526 			if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
1527 			{
1528 				memset(rightpage, 0, BufferGetPageSize(rbuf));
1529 				elog(ERROR, "failed to add old item to the right sibling"
1530 					 " while splitting block %u of index \"%s\"",
1531 					 origpagenumber, RelationGetRelationName(rel));
1532 			}
1533 			rightoff = OffsetNumberNext(rightoff);
1534 		}
1535 	}
1536 
1537 	/* cope with possibility that newitem goes at the end */
1538 	if (i <= newitemoff)
1539 	{
1540 		/*
1541 		 * Can't have newitemonleft here; that would imply we were told to put
1542 		 * *everything* on the left page, which cannot fit (if it could, we'd
1543 		 * not be splitting the page).
1544 		 */
1545 		Assert(!newitemonleft);
1546 		if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
1547 		{
1548 			memset(rightpage, 0, BufferGetPageSize(rbuf));
1549 			elog(ERROR, "failed to add new item to the right sibling"
1550 				 " while splitting block %u of index \"%s\"",
1551 				 origpagenumber, RelationGetRelationName(rel));
1552 		}
1553 		rightoff = OffsetNumberNext(rightoff);
1554 	}
1555 
1556 	/*
1557 	 * We have to grab the right sibling (if any) and fix the prev pointer
1558 	 * there. We are guaranteed that this is deadlock-free since no other
1559 	 * writer will be holding a lock on that page and trying to move left, and
1560 	 * all readers release locks on a page before trying to fetch its
1561 	 * neighbors.
1562 	 */
1563 	if (!P_RIGHTMOST(oopaque))
1564 	{
1565 		sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
1566 		spage = BufferGetPage(sbuf);
1567 		sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
1568 		if (sopaque->btpo_prev != origpagenumber)
1569 		{
1570 			memset(rightpage, 0, BufferGetPageSize(rbuf));
1571 			elog(ERROR, "right sibling's left-link doesn't match: "
1572 				 "block %u links to %u instead of expected %u in index \"%s\"",
1573 				 oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
1574 				 RelationGetRelationName(rel));
1575 		}
1576 
1577 		/*
1578 		 * Check to see if we can set the SPLIT_END flag in the right-hand
1579 		 * split page; this can save some I/O for vacuum since it need not
1580 		 * proceed to the right sibling.  We can set the flag if the right
1581 		 * sibling has a different cycleid: that means it could not be part of
1582 		 * a group of pages that were all split off from the same ancestor
1583 		 * page.  If you're confused, imagine that page A splits to A B and
1584 		 * then again, yielding A C B, while vacuum is in progress.  Tuples
1585 		 * originally in A could now be in either B or C, hence vacuum must
1586 		 * examine both pages.  But if D, our right sibling, has a different
1587 		 * cycleid then it could not contain any tuples that were in A when
1588 		 * the vacuum started.
1589 		 */
1590 		if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
1591 			ropaque->btpo_flags |= BTP_SPLIT_END;
1592 	}
1593 
1594 	/*
1595 	 * Right sibling is locked, new siblings are prepared, but original page
1596 	 * is not updated yet.
1597 	 *
1598 	 * NO EREPORT(ERROR) till right sibling is updated.  We can get away with
1599 	 * not starting the critical section till here because we haven't been
1600 	 * scribbling on the original page yet; see comments above.
1601 	 */
1602 	START_CRIT_SECTION();
1603 
1604 	/*
1605 	 * By here, the original data page has been split into two new halves, and
1606 	 * these are correct.  The algorithm requires that the left page never
1607 	 * move during a split, so we copy the new left page back on top of the
1608 	 * original.  Note that this is not a waste of time, since we also require
1609 	 * (in the page management code) that the center of a page always be
1610 	 * clean, and the most efficient way to guarantee this is just to compact
1611 	 * the data by reinserting it into a new left page.  (XXX the latter
1612 	 * comment is probably obsolete; but in any case it's good to not scribble
1613 	 * on the original page until we enter the critical section.)
1614 	 *
1615 	 * We need to do this before writing the WAL record, so that XLogInsert
1616 	 * can WAL log an image of the page if necessary.
1617 	 */
1618 	PageRestoreTempPage(leftpage, origpage);
1619 	/* leftpage, lopaque must not be used below here */
1620 
1621 	MarkBufferDirty(buf);
1622 	MarkBufferDirty(rbuf);
1623 
1624 	if (!P_RIGHTMOST(ropaque))
1625 	{
1626 		sopaque->btpo_prev = rightpagenumber;
1627 		MarkBufferDirty(sbuf);
1628 	}
1629 
1630 	/*
1631 	 * Clear INCOMPLETE_SPLIT flag on child if inserting the new item finishes
1632 	 * a split.
1633 	 */
1634 	if (!isleaf)
1635 	{
1636 		Page		cpage = BufferGetPage(cbuf);
1637 		BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
1638 
1639 		cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
1640 		MarkBufferDirty(cbuf);
1641 	}
1642 
1643 	/* XLOG stuff */
1644 	if (RelationNeedsWAL(rel))
1645 	{
1646 		xl_btree_split xlrec;
1647 		uint8		xlinfo;
1648 		XLogRecPtr	recptr;
1649 
1650 		xlrec.level = ropaque->btpo.level;
1651 		xlrec.firstright = firstright;
1652 		xlrec.newitemoff = newitemoff;
1653 
1654 		XLogBeginInsert();
1655 		XLogRegisterData((char *) &xlrec, SizeOfBtreeSplit);
1656 
1657 		XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
1658 		XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT);
1659 		/* Log the right sibling, because we've changed its prev-pointer. */
1660 		if (!P_RIGHTMOST(ropaque))
1661 			XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD);
1662 		if (BufferIsValid(cbuf))
1663 			XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD);
1664 
1665 		/*
1666 		 * Log the new item, if it was inserted on the left page. (If it was
1667 		 * put on the right page, we don't need to explicitly WAL log it
1668 		 * because it's included with all the other items on the right page.)
1669 		 * Show the new item as belonging to the left page buffer, so that it
1670 		 * is not stored if XLogInsert decides it needs a full-page image of
1671 		 * the left page.  We store the offset anyway, though, to support
1672 		 * archive compression of these records.
1673 		 */
1674 		if (newitemonleft)
1675 			XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz));
1676 
1677 		/* Log the left page's new high key */
1678 		itemid = PageGetItemId(origpage, P_HIKEY);
1679 		item = (IndexTuple) PageGetItem(origpage, itemid);
1680 		XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item)));
1681 
1682 		/*
1683 		 * Log the contents of the right page in the format understood by
1684 		 * _bt_restore_page().  The whole right page will be recreated.
1685 		 *
1686 		 * Direct access to the page is not ideal, but it is faster; we should
1687 		 * implement a new function in the page API.  Note we only store the tuples
1688 		 * themselves, knowing that they were inserted in item-number order
1689 		 * and so the line pointers can be reconstructed.  See comments for
1690 		 * _bt_restore_page().
1691 		 */
1692 		XLogRegisterBufData(1,
1693 							(char *) rightpage + ((PageHeader) rightpage)->pd_upper,
1694 							((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
1695 
1696 		xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
1697 		recptr = XLogInsert(RM_BTREE_ID, xlinfo);
1698 
1699 		PageSetLSN(origpage, recptr);
1700 		PageSetLSN(rightpage, recptr);
1701 		if (!P_RIGHTMOST(ropaque))
1702 		{
1703 			PageSetLSN(spage, recptr);
1704 		}
1705 		if (!isleaf)
1706 		{
1707 			PageSetLSN(BufferGetPage(cbuf), recptr);
1708 		}
1709 	}
1710 
1711 	END_CRIT_SECTION();
1712 
1713 	/* release the old right sibling */
1714 	if (!P_RIGHTMOST(ropaque))
1715 		_bt_relbuf(rel, sbuf);
1716 
1717 	/* release the child */
1718 	if (!isleaf)
1719 		_bt_relbuf(rel, cbuf);
1720 
1721 	/* split's done */
1722 	return rbuf;
1723 }
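
/*
 * In the normal flow, _bt_split() is reached from _bt_insertonpg() when the
 * target page lacks free space for the incoming tuple.  Once the split
 * itself has been performed and WAL-logged, the caller must still insert a
 * downlink for the new right sibling at the parent level; it does so with
 * _bt_insert_parent() below, which may recurse back into _bt_insertonpg()
 * and split the parent in turn.
 */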
1724 
1725 /*
1726  * _bt_insert_parent() -- Insert downlink into parent, completing split.
1727  *
1728  * On entry, buf and rbuf are the left and right split pages, which we
1729  * still hold write locks on.  Both locks will be released here.  We
1730  * release the rbuf lock once we have a write lock on the page that we
1731  * intend to insert a downlink to rbuf on (i.e. buf's current parent page).
1732  * The lock on buf is released at the same point as the lock on the parent
1733  * page, since buf's INCOMPLETE_SPLIT flag must be cleared by the same
1734  * atomic operation that completes the split by inserting a new downlink.
1735  *
1736  * stack - stack showing how we got here.  Will be NULL when splitting true
1737  *			root, or during concurrent root split, where we can be inefficient
1738  * is_root - we split the true root
1739  * is_only - we split a page alone on its level (might have been fast root)
1740  */
1741 static void
1742 _bt_insert_parent(Relation rel,
1743 				  Buffer buf,
1744 				  Buffer rbuf,
1745 				  BTStack stack,
1746 				  bool is_root,
1747 				  bool is_only)
1748 {
1749 	/*
1750 	 * Here we have to do something Lehman and Yao don't talk about: deal with
1751 	 * a root split and construction of a new root.  If our stack is empty
1752 	 * then we have just split a node on what had been the root level when we
1753 	 * descended the tree.  If it was still the root then we perform a
1754 	 * new-root construction.  If it *wasn't* the root anymore, search to find
1755 	 * the next higher level that someone constructed meanwhile, and find the
1756 	 * right place to insert as for the normal case.
1757 	 *
1758 	 * If we have to search for the parent level, we do so by re-descending
1759 	 * from the root.  This is not super-efficient, but it's rare enough not
1760 	 * to matter.
1761 	 */
1762 	if (is_root)
1763 	{
1764 		Buffer		rootbuf;
1765 
1766 		Assert(stack == NULL);
1767 		Assert(is_only);
1768 		/* create a new root node and update the metapage */
1769 		rootbuf = _bt_newroot(rel, buf, rbuf);
1770 		/* release the split buffers */
1771 		_bt_relbuf(rel, rootbuf);
1772 		_bt_relbuf(rel, rbuf);
1773 		_bt_relbuf(rel, buf);
1774 	}
1775 	else
1776 	{
1777 		BlockNumber bknum = BufferGetBlockNumber(buf);
1778 		BlockNumber rbknum = BufferGetBlockNumber(rbuf);
1779 		Page		page = BufferGetPage(buf);
1780 		IndexTuple	new_item;
1781 		BTStackData fakestack;
1782 		IndexTuple	ritem;
1783 		Buffer		pbuf;
1784 
1785 		if (stack == NULL)
1786 		{
1787 			BTPageOpaque lpageop;
1788 
1789 			elog(DEBUG2, "concurrent ROOT page split");
1790 			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
1791 			/* Find the leftmost page at the next level up */
1792 			pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false,
1793 									NULL);
1794 			/* Set up a phony stack entry pointing there */
1795 			stack = &fakestack;
1796 			stack->bts_blkno = BufferGetBlockNumber(pbuf);
1797 			stack->bts_offset = InvalidOffsetNumber;
1798 			stack->bts_btentry = InvalidBlockNumber;
1799 			stack->bts_parent = NULL;
1800 			_bt_relbuf(rel, pbuf);
1801 		}
1802 
1803 		/* get high key from left, a strict lower bound for new right page */
1804 		ritem = (IndexTuple) PageGetItem(page,
1805 										 PageGetItemId(page, P_HIKEY));
1806 
1807 		/* form an index tuple that points at the new right page */
1808 		new_item = CopyIndexTuple(ritem);
1809 		BTreeInnerTupleSetDownLink(new_item, rbknum);
1810 
1811 		/*
1812 		 * Re-find and write lock the parent of buf.
1813 		 *
1814 		 * It's possible that the location of buf's downlink has changed since
1815 		 * our initial _bt_search() descent.  _bt_getstackbuf() will detect
1816 		 * and recover from this, updating the stack, which ensures that the
1817 		 * new downlink will be inserted at the correct offset. Even buf's
1818 		 * parent may have changed.
1819 		 */
1820 		stack->bts_btentry = bknum;
1821 		pbuf = _bt_getstackbuf(rel, stack);
1822 
1823 		/*
1824 		 * Now we can unlock the right child. The left child will be unlocked
1825 		 * by _bt_insertonpg().
1826 		 */
1827 		_bt_relbuf(rel, rbuf);
1828 
1829 		if (pbuf == InvalidBuffer)
1830 			elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u",
1831 				 RelationGetRelationName(rel), bknum, rbknum);
1832 
1833 		/* Recursively update the parent */
1834 		_bt_insertonpg(rel, NULL, pbuf, buf, stack->bts_parent,
1835 					   new_item, stack->bts_offset + 1,
1836 					   is_only);
1837 
1838 		/* be tidy */
1839 		pfree(new_item);
1840 	}
1841 }
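
/*
 * For example, when a leaf (level 0) page splits, the code above inserts a
 * downlink into the level 1 parent via _bt_insertonpg().  If that insertion
 * splits the parent as well, _bt_insertonpg() calls _bt_insert_parent()
 * again for the level 1 split, and so on up the tree; a split of the true
 * root ends the recursion in _bt_newroot().
 */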
1842 
1843 /*
1844  * _bt_finish_split() -- Finish an incomplete split
1845  *
1846  * A crash or other failure can leave a split incomplete.  The insertion
1847  * routines won't allow insertion on a page that is incompletely split.
1848  * Before inserting on such a page, call _bt_finish_split().
1849  *
1850  * On entry, 'lbuf' must be locked in write-mode.  On exit, it is unlocked
1851  * and unpinned.
1852  */
1853 void
1854 _bt_finish_split(Relation rel, Buffer lbuf, BTStack stack)
1855 {
1856 	Page		lpage = BufferGetPage(lbuf);
1857 	BTPageOpaque lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
1858 	Buffer		rbuf;
1859 	Page		rpage;
1860 	BTPageOpaque rpageop;
1861 	bool		was_root;
1862 	bool		was_only;
1863 
1864 	Assert(P_INCOMPLETE_SPLIT(lpageop));
1865 
1866 	/* Lock right sibling, the one missing the downlink */
1867 	rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
1868 	rpage = BufferGetPage(rbuf);
1869 	rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
1870 
1871 	/* Could this be a root split? */
1872 	if (!stack)
1873 	{
1874 		Buffer		metabuf;
1875 		Page		metapg;
1876 		BTMetaPageData *metad;
1877 
1878 		/* acquire lock on the metapage */
1879 		metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1880 		metapg = BufferGetPage(metabuf);
1881 		metad = BTPageGetMeta(metapg);
1882 
1883 		was_root = (metad->btm_root == BufferGetBlockNumber(lbuf));
1884 
1885 		_bt_relbuf(rel, metabuf);
1886 	}
1887 	else
1888 		was_root = false;
1889 
1890 	/* Was this the only page on the level before split? */
1891 	was_only = (P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop));
1892 
1893 	elog(DEBUG1, "finishing incomplete split of %u/%u",
1894 		 BufferGetBlockNumber(lbuf), BufferGetBlockNumber(rbuf));
1895 
1896 	_bt_insert_parent(rel, lbuf, rbuf, stack, was_root, was_only);
1897 }
1898 
1899 /*
1900  *	_bt_getstackbuf() -- Walk back up the tree one step, and find the item
1901  *						 we last looked at in the parent.
1902  *
1903  *		This is possible because we save the downlink from the parent item,
1904  *		which is enough to uniquely identify it.  Insertions into the parent
1905  *		level could cause the item to move right; deletions could cause it
1906  *		to move left, but not left of the page we previously found it in.
1907  *
1908  *		Adjusts bts_blkno & bts_offset if changed.
1909  *
1910  *		Returns write-locked buffer, or InvalidBuffer if item not found
1911  *		(should not happen).
1912  */
1913 Buffer
1914 _bt_getstackbuf(Relation rel, BTStack stack)
1915 {
1916 	BlockNumber blkno;
1917 	OffsetNumber start;
1918 
1919 	blkno = stack->bts_blkno;
1920 	start = stack->bts_offset;
1921 
1922 	for (;;)
1923 	{
1924 		Buffer		buf;
1925 		Page		page;
1926 		BTPageOpaque opaque;
1927 
1928 		buf = _bt_getbuf(rel, blkno, BT_WRITE);
1929 		page = BufferGetPage(buf);
1930 		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1931 
1932 		if (P_INCOMPLETE_SPLIT(opaque))
1933 		{
1934 			_bt_finish_split(rel, buf, stack->bts_parent);
1935 			continue;
1936 		}
1937 
1938 		if (!P_IGNORE(opaque))
1939 		{
1940 			OffsetNumber offnum,
1941 						minoff,
1942 						maxoff;
1943 			ItemId		itemid;
1944 			IndexTuple	item;
1945 
1946 			minoff = P_FIRSTDATAKEY(opaque);
1947 			maxoff = PageGetMaxOffsetNumber(page);
1948 
1949 			/*
1950 			 * start = InvalidOffsetNumber means "search the whole page". We
1951 			 * need this test anyway due to the possibility that the page has
1952 			 * a high key now when it didn't before.
1953 			 */
1954 			if (start < minoff)
1955 				start = minoff;
1956 
1957 			/*
1958 			 * Need this check too, to guard against the possibility that the
1959 			 * page was split since we visited it originally.
1960 			 */
1961 			if (start > maxoff)
1962 				start = OffsetNumberNext(maxoff);
1963 
1964 			/*
1965 			 * These loops will check every item on the page --- but in an
1966 			 * order that's attuned to the probability of where it actually
1967 			 * is.  Scan to the right first, then to the left.
1968 			 */
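			/*
			 * For instance, if the downlink was at offset 10 when we
			 * descended and two tuples have since been inserted in front of
			 * it, the forward scan starting at offset 10 finds it at offset
			 * 12 on its third comparison.  The backward scan only matters
			 * when the item has moved left, e.g. after deletions of items
			 * that preceded it on this page.
			 */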
1969 			for (offnum = start;
1970 				 offnum <= maxoff;
1971 				 offnum = OffsetNumberNext(offnum))
1972 			{
1973 				itemid = PageGetItemId(page, offnum);
1974 				item = (IndexTuple) PageGetItem(page, itemid);
1975 
1976 				if (BTreeInnerTupleGetDownLink(item) == stack->bts_btentry)
1977 				{
1978 					/* Return accurate pointer to where link is now */
1979 					stack->bts_blkno = blkno;
1980 					stack->bts_offset = offnum;
1981 					return buf;
1982 				}
1983 			}
1984 
1985 			for (offnum = OffsetNumberPrev(start);
1986 				 offnum >= minoff;
1987 				 offnum = OffsetNumberPrev(offnum))
1988 			{
1989 				itemid = PageGetItemId(page, offnum);
1990 				item = (IndexTuple) PageGetItem(page, itemid);
1991 
1992 				if (BTreeInnerTupleGetDownLink(item) == stack->bts_btentry)
1993 				{
1994 					/* Return accurate pointer to where link is now */
1995 					stack->bts_blkno = blkno;
1996 					stack->bts_offset = offnum;
1997 					return buf;
1998 				}
1999 			}
2000 		}
2001 
2002 		/*
2003 		 * The item we're looking for moved right at least one page.
2004 		 */
2005 		if (P_RIGHTMOST(opaque))
2006 		{
2007 			_bt_relbuf(rel, buf);
2008 			return InvalidBuffer;
2009 		}
2010 		blkno = opaque->btpo_next;
2011 		start = InvalidOffsetNumber;
2012 		_bt_relbuf(rel, buf);
2013 	}
2014 }
2015 
2016 /*
2017  *	_bt_newroot() -- Create a new root page for the index.
2018  *
2019  *		We've just split the old root page and need to create a new one.
2020  *		In order to do this, we add a new root page to the file, then lock
2021  *		the metadata page and update it.  This is guaranteed to be deadlock-
2022  *		free, because all readers release their locks on the metadata page
2023  *		before trying to lock the root, and all writers lock the root before
2024  *		trying to lock the metadata page.  We have a write lock on the old
2025  *		root page, so we have not introduced any cycles into the waits-for
2026  *		graph.
2027  *
2028  *		On entry, lbuf (the old root) and rbuf (its new peer) are write-
2029  *		locked. On exit, a new root page exists with entries for the
2030  *		two new children; the metapage is updated and unlocked/unpinned.
2031  *		The new root buffer is returned to the caller, which must unlock/unpin
2032  *		lbuf, rbuf & rootbuf.
2033  */
2034 static Buffer
2035 _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
2036 {
2037 	Buffer		rootbuf;
2038 	Page		lpage,
2039 				rootpage;
2040 	BlockNumber lbkno,
2041 				rbkno;
2042 	BlockNumber rootblknum;
2043 	BTPageOpaque rootopaque;
2044 	BTPageOpaque lopaque;
2045 	ItemId		itemid;
2046 	IndexTuple	item;
2047 	IndexTuple	left_item;
2048 	Size		left_item_sz;
2049 	IndexTuple	right_item;
2050 	Size		right_item_sz;
2051 	Buffer		metabuf;
2052 	Page		metapg;
2053 	BTMetaPageData *metad;
2054 
2055 	lbkno = BufferGetBlockNumber(lbuf);
2056 	rbkno = BufferGetBlockNumber(rbuf);
2057 	lpage = BufferGetPage(lbuf);
2058 	lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
2059 
2060 	/* get a new root page */
2061 	rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
2062 	rootpage = BufferGetPage(rootbuf);
2063 	rootblknum = BufferGetBlockNumber(rootbuf);
2064 
2065 	/* acquire lock on the metapage */
2066 	metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
2067 	metapg = BufferGetPage(metabuf);
2068 	metad = BTPageGetMeta(metapg);
2069 
2070 	/*
2071 	 * Create downlink item for left page (old root).  Since this will be the
2072 	 * first item in a non-leaf page, it implicitly has minus-infinity key
2073 	 * value, so we need not store any actual key in it.
2074 	 */
2075 	left_item_sz = sizeof(IndexTupleData);
2076 	left_item = (IndexTuple) palloc(left_item_sz);
2077 	left_item->t_info = left_item_sz;
2078 	BTreeInnerTupleSetDownLink(left_item, lbkno);
2079 	BTreeTupleSetNAtts(left_item, 0);
2080 
2081 	/*
2082 	 * Create downlink item for right page.  The key for it is obtained from
2083 	 * the "high key" position in the left page.
2084 	 */
2085 	itemid = PageGetItemId(lpage, P_HIKEY);
2086 	right_item_sz = ItemIdGetLength(itemid);
2087 	item = (IndexTuple) PageGetItem(lpage, itemid);
2088 	right_item = CopyIndexTuple(item);
2089 	BTreeInnerTupleSetDownLink(right_item, rbkno);
2090 
2091 	/* NO EREPORT(ERROR) from here till newroot op is logged */
2092 	START_CRIT_SECTION();
2093 
2094 	/* upgrade metapage if needed */
2095 	if (metad->btm_version < BTREE_NOVAC_VERSION)
2096 		_bt_upgrademetapage(metapg);
2097 
2098 	/* set btree special data */
2099 	rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
2100 	rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
2101 	rootopaque->btpo_flags = BTP_ROOT;
2102 	rootopaque->btpo.level =
2103 		((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
2104 	rootopaque->btpo_cycleid = 0;
2105 
2106 	/* update metapage data */
2107 	metad->btm_root = rootblknum;
2108 	metad->btm_level = rootopaque->btpo.level;
2109 	metad->btm_fastroot = rootblknum;
2110 	metad->btm_fastlevel = rootopaque->btpo.level;
2111 
2112 	/*
2113 	 * Insert the left page pointer into the new root page.  The root page is
2114 	 * the rightmost page on its level so there is no "high key" in it; the
2115 	 * two items will go into positions P_HIKEY and P_FIRSTKEY.
2116 	 *
2117 	 * Note: we *must* insert the two items in item-number order, for the
2118 	 * benefit of _bt_restore_page().
2119 	 */
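	/*
	 * With hypothetical block numbers lbkno = 3 and rbkno = 7, the new root
	 * page ends up looking like this:
	 *
	 *   offset P_HIKEY (1):    minus-infinity pivot, downlink to block 3
	 *   offset P_FIRSTKEY (2): copy of old root's high key, downlink to block 7
	 *
	 * Because the root is the rightmost page on its level, offset 1 is read
	 * as the first data item rather than as a high key.
	 */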
2120 	Assert(BTreeTupleGetNAtts(left_item, rel) == 0);
2121 	if (PageAddItem(rootpage, (Item) left_item, left_item_sz, P_HIKEY,
2122 					false, false) == InvalidOffsetNumber)
2123 		elog(PANIC, "failed to add leftkey to new root page"
2124 			 " while splitting block %u of index \"%s\"",
2125 			 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
2126 
2127 	/*
2128 	 * insert the right page pointer into the new root page.
2129 	 */
2130 	Assert(BTreeTupleGetNAtts(right_item, rel) > 0);
2131 	Assert(BTreeTupleGetNAtts(right_item, rel) <=
2132 		   IndexRelationGetNumberOfKeyAttributes(rel));
2133 	if (PageAddItem(rootpage, (Item) right_item, right_item_sz, P_FIRSTKEY,
2134 					false, false) == InvalidOffsetNumber)
2135 		elog(PANIC, "failed to add rightkey to new root page"
2136 			 " while splitting block %u of index \"%s\"",
2137 			 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
2138 
2139 	/* Clear the incomplete-split flag in the left child */
2140 	Assert(P_INCOMPLETE_SPLIT(lopaque));
2141 	lopaque->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
2142 	MarkBufferDirty(lbuf);
2143 
2144 	MarkBufferDirty(rootbuf);
2145 	MarkBufferDirty(metabuf);
2146 
2147 	/* XLOG stuff */
2148 	if (RelationNeedsWAL(rel))
2149 	{
2150 		xl_btree_newroot xlrec;
2151 		XLogRecPtr	recptr;
2152 		xl_btree_metadata md;
2153 
2154 		xlrec.rootblk = rootblknum;
2155 		xlrec.level = metad->btm_level;
2156 
2157 		XLogBeginInsert();
2158 		XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot);
2159 
2160 		XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
2161 		XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
2162 		XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT | REGBUF_STANDARD);
2163 
2164 		Assert(metad->btm_version >= BTREE_NOVAC_VERSION);
2165 		md.version = metad->btm_version;
2166 		md.root = rootblknum;
2167 		md.level = metad->btm_level;
2168 		md.fastroot = rootblknum;
2169 		md.fastlevel = metad->btm_level;
2170 		md.oldest_btpo_xact = metad->btm_oldest_btpo_xact;
2171 		md.last_cleanup_num_heap_tuples = metad->btm_last_cleanup_num_heap_tuples;
2172 
2173 		XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata));
2174 
2175 		/*
2176 		 * Direct access to the page is not ideal, but it is faster; we should
2177 		 * implement a new function in the page API.
2178 		 */
2179 		XLogRegisterBufData(0,
2180 							(char *) rootpage + ((PageHeader) rootpage)->pd_upper,
2181 							((PageHeader) rootpage)->pd_special -
2182 							((PageHeader) rootpage)->pd_upper);
2183 
2184 		recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT);
2185 
2186 		PageSetLSN(lpage, recptr);
2187 		PageSetLSN(rootpage, recptr);
2188 		PageSetLSN(metapg, recptr);
2189 	}
2190 
2191 	END_CRIT_SECTION();
2192 
2193 	/* done with metapage */
2194 	_bt_relbuf(rel, metabuf);
2195 
2196 	pfree(left_item);
2197 	pfree(right_item);
2198 
2199 	return rootbuf;
2200 }
2201 
2202 /*
2203  *	_bt_pgaddtup() -- add a tuple to a particular page in the index.
2204  *
2205  *		This routine adds the tuple to the page as requested.  It does
2206  *		not affect pin/lock status, but you'd better have a write lock
2207  *		and pin on the target buffer!  Don't forget to write and release
2208  *		the buffer afterwards, either.
2209  *
2210  *		The main difference between this routine and a bare PageAddItem call
2211  *		is that this code knows that the leftmost index tuple on a non-leaf
2212  *		btree page doesn't need to have a key.  Therefore, it strips such
2213  *		tuples down to just the tuple header.  CAUTION: this works ONLY if
2214  *		we insert the tuples in order, so that the given itup_off does
2215  *		represent the final position of the tuple!
2216  */
2217 static bool
2218 _bt_pgaddtup(Page page,
2219 			 Size itemsize,
2220 			 IndexTuple itup,
2221 			 OffsetNumber itup_off)
2222 {
2223 	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2224 	IndexTupleData trunctuple;
2225 
2226 	if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
2227 	{
2228 		trunctuple = *itup;
2229 		trunctuple.t_info = sizeof(IndexTupleData);
2230 		/* Deliberately zero INDEX_ALT_TID_MASK bits */
2231 		BTreeTupleSetNAtts(&trunctuple, 0);
2232 		itup = &trunctuple;
2233 		itemsize = sizeof(IndexTupleData);
2234 	}
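	/*
	 * For example, the first data item added to a non-leaf page is stored
	 * as a bare IndexTupleData header with zero attributes: its key portion
	 * is discarded, since the leftmost downlink on an internal page acts as
	 * a minus-infinity key whose stored key would never be examined.
	 */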
2235 
2236 	if (PageAddItem(page, (Item) itup, itemsize, itup_off,
2237 					false, false) == InvalidOffsetNumber)
2238 		return false;
2239 
2240 	return true;
2241 }
2242 
2243 /*
2244  * _bt_vacuum_one_page - vacuum just one index page.
2245  *
2246  * Try to remove LP_DEAD items from the given page.  The passed buffer
2247  * must be exclusive-locked, but unlike a real VACUUM, we don't need a
2248  * super-exclusive "cleanup" lock (see nbtree/README).
2249  */
2250 static void
2251 _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel)
2252 {
2253 	OffsetNumber deletable[MaxOffsetNumber];
2254 	int			ndeletable = 0;
2255 	OffsetNumber offnum,
2256 				minoff,
2257 				maxoff;
2258 	Page		page = BufferGetPage(buffer);
2259 	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2260 
2261 	Assert(P_ISLEAF(opaque));
2262 
2263 	/*
2264 	 * Scan over all items to see which ones need to be deleted according to
2265 	 * LP_DEAD flags.
2266 	 */
2267 	minoff = P_FIRSTDATAKEY(opaque);
2268 	maxoff = PageGetMaxOffsetNumber(page);
2269 	for (offnum = minoff;
2270 		 offnum <= maxoff;
2271 		 offnum = OffsetNumberNext(offnum))
2272 	{
2273 		ItemId		itemId = PageGetItemId(page, offnum);
2274 
2275 		if (ItemIdIsDead(itemId))
2276 			deletable[ndeletable++] = offnum;
2277 	}
2278 
2279 	if (ndeletable > 0)
2280 		_bt_delitems_delete(rel, buffer, deletable, ndeletable, heapRel);
2281 
2282 	/*
2283 	 * Note: if we didn't find any LP_DEAD items, then the page's
2284 	 * BTP_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
2285 	 * separate write to clear it, however.  We will clear it when we split
2286 	 * the page.
2287 	 */
2288 }
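
/*
 * A typical caller is the insertion path in this file: when a leaf page that
 * is marked BTP_HAS_GARBAGE has no room for an incoming tuple, the insert
 * code calls _bt_vacuum_one_page() first, in the hope of freeing enough
 * space to avoid a page split.
 */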
2289