1 /*-------------------------------------------------------------------------
2  *
3  * nbtinsert.c
4  *	  Item insertion in Lehman and Yao btrees for Postgres.
5  *
6  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/access/nbtree/nbtinsert.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/heapam.h"
19 #include "access/nbtree.h"
20 #include "access/transam.h"
21 #include "access/xloginsert.h"
22 #include "miscadmin.h"
23 #include "storage/lmgr.h"
24 #include "storage/predicate.h"
25 #include "utils/tqual.h"
26 
27 
28 typedef struct
29 {
30 	/* context data for _bt_checksplitloc */
31 	Size		newitemsz;		/* size of new item to be inserted */
32 	int			fillfactor;		/* needed when splitting rightmost page */
33 	bool		is_leaf;		/* T if splitting a leaf page */
34 	bool		is_rightmost;	/* T if splitting a rightmost page */
35 	OffsetNumber newitemoff;	/* where the new item is to be inserted */
36 	int			leftspace;		/* space available for items on left page */
37 	int			rightspace;		/* space available for items on right page */
38 	int			olddataitemstotal;		/* space taken by old items */
39 
40 	bool		have_split;		/* found a valid split? */
41 
42 	/* these fields valid only if have_split is true */
43 	bool		newitemonleft;	/* new item on left or right of best split */
44 	OffsetNumber firstright;	/* best split point */
45 	int			best_delta;		/* best size delta so far */
46 } FindSplitData;
47 
48 
49 static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
50 
51 static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
52 				 Relation heapRel, Buffer buf, OffsetNumber offset,
53 				 ScanKey itup_scankey,
54 				 IndexUniqueCheck checkUnique, bool *is_unique,
55 				 uint32 *speculativeToken);
56 static void _bt_findinsertloc(Relation rel,
57 				  Buffer *bufptr,
58 				  OffsetNumber *offsetptr,
59 				  int keysz,
60 				  ScanKey scankey,
61 				  IndexTuple newtup,
62 				  BTStack stack,
63 				  Relation heapRel);
64 static void _bt_insertonpg(Relation rel, Buffer buf, Buffer cbuf,
65 			   BTStack stack,
66 			   IndexTuple itup,
67 			   OffsetNumber newitemoff,
68 			   bool split_only_page);
69 static Buffer _bt_split(Relation rel, Buffer buf, Buffer cbuf,
70 		  OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz,
71 		  IndexTuple newitem, bool newitemonleft);
72 static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
73 				  BTStack stack, bool is_root, bool is_only);
74 static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
75 				 OffsetNumber newitemoff,
76 				 Size newitemsz,
77 				 bool *newitemonleft);
78 static void _bt_checksplitloc(FindSplitData *state,
79 				  OffsetNumber firstoldonright, bool newitemonleft,
80 				  int dataitemstoleft, Size firstoldonrightsz);
81 static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
82 			 OffsetNumber itup_off);
83 static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
84 			int keysz, ScanKey scankey);
85 static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
86 
87 
88 /*
89  *	_bt_doinsert() -- Handle insertion of a single index tuple in the tree.
90  *
91  *		This routine is called by the public interface routine, btinsert.
92  *		By here, itup is filled in, including the TID.
93  *
94  *		If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
95  *		will allow duplicates.  Otherwise (UNIQUE_CHECK_YES or
96  *		UNIQUE_CHECK_EXISTING) it will throw error for a duplicate.
97  *		For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and
98  *		don't actually insert.
99  *
100  *		The result value is only significant for UNIQUE_CHECK_PARTIAL:
101  *		it must be TRUE if the entry is known unique, else FALSE.
102  *		(In the current implementation we'll also return TRUE after a
103  *		successful UNIQUE_CHECK_YES or UNIQUE_CHECK_EXISTING call, but
104  *		that's just a coding artifact.)
105  */
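/*
 * For context, the public entry point in nbtree.c invokes this routine
 * roughly as sketched below (illustrative only; see btinsert() for the
 * authoritative version):
 *
 *		itup = index_form_tuple(RelationGetDescr(rel), values, isnull);
 *		itup->t_tid = *ht_ctid;
 *		result = _bt_doinsert(rel, itup, checkUnique, heapRel);
 *		pfree(itup);
 */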
106 bool
107 _bt_doinsert(Relation rel, IndexTuple itup,
108 			 IndexUniqueCheck checkUnique, Relation heapRel)
109 {
110 	bool		is_unique = false;
111 	int			natts = rel->rd_rel->relnatts;
112 	ScanKey		itup_scankey;
113 	BTStack		stack;
114 	Buffer		buf;
115 	OffsetNumber offset;
116 
117 	/* we need an insertion scan key to do our search, so build one */
118 	itup_scankey = _bt_mkscankey(rel, itup);
119 
120 top:
121 	/* find the first page containing this key */
122 	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE, NULL);
123 
124 	offset = InvalidOffsetNumber;
125 
126 	/* trade in our read lock for a write lock */
127 	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
128 	LockBuffer(buf, BT_WRITE);
129 
130 	/*
131 	 * If the page was split between the time that we surrendered our read
132 	 * lock and acquired our write lock, then this page may no longer be the
133 	 * right place for the key we want to insert.  In this case, we need to
134 	 * move right in the tree.  See Lehman and Yao for an excruciatingly
135 	 * precise description.
136 	 */
137 	buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
138 						true, stack, BT_WRITE, NULL);
139 
140 	/*
141 	 * If we're not allowing duplicates, make sure the key isn't already in
142 	 * the index.
143 	 *
144 	 * NOTE: obviously, _bt_check_unique can only detect keys that are already
145 	 * in the index; so it cannot defend against concurrent insertions of the
146 	 * same key.  We protect against that by means of holding a write lock on
147 	 * the target page.  Any other would-be inserter of the same key must
148 	 * acquire a write lock on the same target page, so only one would-be
149 	 * inserter can be making the check at one time.  Furthermore, once we are
150 	 * past the check we hold write locks continuously until we have performed
151 	 * our insertion, so no later inserter can fail to see our insertion.
152 	 * (This requires some care in _bt_insertonpg.)
153 	 *
154 	 * If we must wait for another xact, we release the lock while waiting,
155 	 * and then must start over completely.
156 	 *
157 	 * For a partial uniqueness check, we don't wait for the other xact. Just
158 	 * let the tuple in and return false for possibly non-unique, or true for
159 	 * definitely unique.
160 	 */
161 	if (checkUnique != UNIQUE_CHECK_NO)
162 	{
163 		TransactionId xwait;
164 		uint32		speculativeToken;
165 
166 		offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
167 		xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
168 								 checkUnique, &is_unique, &speculativeToken);
169 
170 		if (TransactionIdIsValid(xwait))
171 		{
172 			/* Have to wait for the other guy ... */
173 			_bt_relbuf(rel, buf);
174 
175 			/*
176 			 * If it's a speculative insertion, wait for it to finish (ie. to
177 			 * go ahead with the insertion, or kill the tuple).  Otherwise
178 			 * wait for the transaction to finish as usual.
179 			 */
180 			if (speculativeToken)
181 				SpeculativeInsertionWait(xwait, speculativeToken);
182 			else
183 				XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
184 
185 			/* start over... */
186 			_bt_freestack(stack);
187 			goto top;
188 		}
189 	}
190 
191 	if (checkUnique != UNIQUE_CHECK_EXISTING)
192 	{
193 		/*
194 		 * The only conflict predicate locking cares about for indexes is when
195 		 * an index tuple insert conflicts with an existing lock.  Since the
196 		 * actual location of the insert is hard to predict because of the
197 		 * random search used to prevent O(N^2) performance when there are
198 		 * many duplicate entries, we can just use the "first valid" page.
199 		 */
200 		CheckForSerializableConflictIn(rel, NULL, buf);
201 		/* do the insertion */
202 		_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
203 						  stack, heapRel);
204 		_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
205 	}
206 	else
207 	{
208 		/* just release the buffer */
209 		_bt_relbuf(rel, buf);
210 	}
211 
212 	/* be tidy */
213 	_bt_freestack(stack);
214 	_bt_freeskey(itup_scankey);
215 
216 	return is_unique;
217 }
218 
219 /*
220  *	_bt_check_unique() -- Check for violation of unique index constraint
221  *
222  * offset points to the first possible item that could conflict. It can
223  * also point to end-of-page, which means that the first tuple to check
224  * is the first tuple on the next page.
225  *
226  * Returns InvalidTransactionId if there is no conflict, else an xact ID
227  * we must wait for to see if it commits a conflicting tuple.   If an actual
228  * conflict is detected, no return --- just ereport().  If an xact ID is
229  * returned, and the conflicting tuple still has a speculative insertion in
230  * progress, *speculativeToken is set to non-zero, and the caller can wait for
231  * the verdict on the insertion using SpeculativeInsertionWait().
232  *
233  * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
234  * InvalidTransactionId because we don't want to wait.  In this case we
235  * set *is_unique to false if there is a potential conflict, and the
236  * core code must redo the uniqueness check later.
237  */
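/*
 * Caller-side handling of the result, as done in _bt_doinsert above
 * (condensed sketch):
 *
 *		xwait = _bt_check_unique(rel, itup, heapRel, buf, offset,
 *								 itup_scankey, checkUnique, &is_unique,
 *								 &speculativeToken);
 *		if (TransactionIdIsValid(xwait))
 *		{
 *			_bt_relbuf(rel, buf);
 *			if (speculativeToken)
 *				SpeculativeInsertionWait(xwait, speculativeToken);
 *			else
 *				XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
 *			... release the stack and restart from the top ...
 *		}
 */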
238 static TransactionId
239 _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
240 				 Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
241 				 IndexUniqueCheck checkUnique, bool *is_unique,
242 				 uint32 *speculativeToken)
243 {
244 	TupleDesc	itupdesc = RelationGetDescr(rel);
245 	int			natts = rel->rd_rel->relnatts;
246 	SnapshotData SnapshotDirty;
247 	OffsetNumber maxoff;
248 	Page		page;
249 	BTPageOpaque opaque;
250 	Buffer		nbuf = InvalidBuffer;
251 	bool		found = false;
252 
253 	/* Assume unique until we find a duplicate */
254 	*is_unique = true;
255 
256 	InitDirtySnapshot(SnapshotDirty);
257 
258 	page = BufferGetPage(buf);
259 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
260 	maxoff = PageGetMaxOffsetNumber(page);
261 
262 	/*
263 	 * Scan over all equal tuples, looking for live conflicts.
264 	 */
265 	for (;;)
266 	{
267 		ItemId		curitemid;
268 		IndexTuple	curitup;
269 		BlockNumber nblkno;
270 
271 		/*
272 		 * make sure the offset points to an actual item before trying to
273 		 * examine it...
274 		 */
275 		if (offset <= maxoff)
276 		{
277 			curitemid = PageGetItemId(page, offset);
278 
279 			/*
280 			 * We can skip items that are marked killed.
281 			 *
282 			 * Formerly, we applied _bt_isequal() before checking the kill
283 			 * flag, so as to fall out of the item loop as soon as possible.
284 			 * However, in the presence of heavy update activity an index may
285 			 * contain many killed items with the same key; running
286 			 * _bt_isequal() on each killed item gets expensive. Furthermore
287 			 * it is likely that the non-killed version of each key appears
288 			 * first, so that we didn't actually get to exit any sooner
289 			 * anyway. So now we just advance over killed items as quickly as
290 			 * we can. We only apply _bt_isequal() when we get to a non-killed
291 			 * item or the end of the page.
292 			 */
293 			if (!ItemIdIsDead(curitemid))
294 			{
295 				ItemPointerData htid;
296 				bool		all_dead;
297 
298 				/*
299 				 * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this is
300 				 * how we handle NULLs - so we must not use _bt_compare for
301 				 * real equality comparisons, but only for ordering/finding
302 				 * items on pages. - vadim 03/24/97
303 				 */
304 				if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
305 					break;		/* we're past all the equal tuples */
306 
307 				/* okay, we gotta fetch the heap tuple ... */
308 				curitup = (IndexTuple) PageGetItem(page, curitemid);
309 				htid = curitup->t_tid;
310 
311 				/*
312 				 * If we are doing a recheck, we expect to find the tuple we
313 				 * are rechecking.  It's not a duplicate, but we have to keep
314 				 * scanning.
315 				 */
316 				if (checkUnique == UNIQUE_CHECK_EXISTING &&
317 					ItemPointerCompare(&htid, &itup->t_tid) == 0)
318 				{
319 					found = true;
320 				}
321 
322 				/*
323 				 * We check the whole HOT-chain to see if there is any tuple
324 				 * that satisfies SnapshotDirty.  This is necessary because we
325 				 * have just a single index entry for the entire chain.
326 				 */
327 				else if (heap_hot_search(&htid, heapRel, &SnapshotDirty,
328 										 &all_dead))
329 				{
330 					TransactionId xwait;
331 
332 					/*
333 					 * It is a duplicate. If we are only doing a partial
334 					 * check, then don't bother checking if the tuple is being
335 					 * updated in another transaction. Just return the fact
336 					 * that it is a potential conflict and leave the full
337 					 * check till later.
338 					 */
339 					if (checkUnique == UNIQUE_CHECK_PARTIAL)
340 					{
341 						if (nbuf != InvalidBuffer)
342 							_bt_relbuf(rel, nbuf);
343 						*is_unique = false;
344 						return InvalidTransactionId;
345 					}
346 
347 					/*
348 					 * If this tuple is being updated by another transaction,
349 					 * then we have to wait for its commit/abort.
350 					 */
351 					xwait = (TransactionIdIsValid(SnapshotDirty.xmin)) ?
352 						SnapshotDirty.xmin : SnapshotDirty.xmax;
353 
354 					if (TransactionIdIsValid(xwait))
355 					{
356 						if (nbuf != InvalidBuffer)
357 							_bt_relbuf(rel, nbuf);
358 						/* Tell _bt_doinsert to wait... */
359 						*speculativeToken = SnapshotDirty.speculativeToken;
360 						return xwait;
361 					}
362 
363 					/*
364 					 * Otherwise we have a definite conflict.  But before
365 					 * complaining, look to see if the tuple we want to insert
366 					 * is itself now committed dead --- if so, don't complain.
367 					 * This is a waste of time in normal scenarios but we must
368 					 * do it to support CREATE INDEX CONCURRENTLY.
369 					 *
370 					 * We must follow HOT-chains here because during
371 					 * concurrent index build, we insert the root TID though
372 					 * the actual tuple may be somewhere in the HOT-chain.
373 					 * While following the chain we might not stop at the
374 					 * exact tuple which triggered the insert, but that's OK
375 					 * because if we find a live tuple anywhere in this chain,
376 					 * we have a unique key conflict.  The other live tuple is
377 					 * not part of this chain because it had a different index
378 					 * entry.
379 					 */
380 					htid = itup->t_tid;
381 					if (heap_hot_search(&htid, heapRel, SnapshotSelf, NULL))
382 					{
383 						/* Normal case --- it's still live */
384 					}
385 					else
386 					{
387 						/*
388 						 * It's been deleted, so no error, and no need to
389 						 * continue searching
390 						 */
391 						break;
392 					}
393 
394 					/*
395 					 * Check for a conflict-in as we would if we were going to
396 					 * write to this page.  We aren't actually going to write,
397 					 * but we want a chance to report SSI conflicts that would
398 					 * otherwise be masked by this unique constraint
399 					 * violation.
400 					 */
401 					CheckForSerializableConflictIn(rel, NULL, buf);
402 
403 					/*
404 					 * This is a definite conflict.  Break the tuple down into
405 					 * datums and report the error.  But first, make sure we
406 					 * release the buffer locks we're holding ---
407 					 * BuildIndexValueDescription could make catalog accesses,
408 					 * which in the worst case might touch this same index and
409 					 * cause deadlocks.
410 					 */
411 					if (nbuf != InvalidBuffer)
412 						_bt_relbuf(rel, nbuf);
413 					_bt_relbuf(rel, buf);
414 
415 					{
416 						Datum		values[INDEX_MAX_KEYS];
417 						bool		isnull[INDEX_MAX_KEYS];
418 						char	   *key_desc;
419 
420 						index_deform_tuple(itup, RelationGetDescr(rel),
421 										   values, isnull);
422 
423 						key_desc = BuildIndexValueDescription(rel, values,
424 															  isnull);
425 
426 						ereport(ERROR,
427 								(errcode(ERRCODE_UNIQUE_VIOLATION),
428 								 errmsg("duplicate key value violates unique constraint \"%s\"",
429 										RelationGetRelationName(rel)),
430 							   key_desc ? errdetail("Key %s already exists.",
431 													key_desc) : 0,
432 								 errtableconstraint(heapRel,
433 											 RelationGetRelationName(rel))));
434 					}
435 				}
436 				else if (all_dead)
437 				{
438 					/*
439 					 * The conflicting tuple (or whole HOT chain) is dead to
440 					 * everyone, so we may as well mark the index entry
441 					 * killed.
442 					 */
443 					ItemIdMarkDead(curitemid);
444 					opaque->btpo_flags |= BTP_HAS_GARBAGE;
445 
446 					/*
447 					 * Mark buffer with a dirty hint, since state is not
448 					 * crucial. Be sure to mark the proper buffer dirty.
449 					 */
450 					if (nbuf != InvalidBuffer)
451 						MarkBufferDirtyHint(nbuf, true);
452 					else
453 						MarkBufferDirtyHint(buf, true);
454 				}
455 			}
456 		}
457 
458 		/*
459 		 * Advance to next tuple to continue checking.
460 		 */
461 		if (offset < maxoff)
462 			offset = OffsetNumberNext(offset);
463 		else
464 		{
465 			/* If scankey == hikey we gotta check the next page too */
466 			if (P_RIGHTMOST(opaque))
467 				break;
468 			if (!_bt_isequal(itupdesc, page, P_HIKEY,
469 							 natts, itup_scankey))
470 				break;
471 			/* Advance to next non-dead page --- there must be one */
472 			for (;;)
473 			{
474 				nblkno = opaque->btpo_next;
475 				nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
476 				page = BufferGetPage(nbuf);
477 				opaque = (BTPageOpaque) PageGetSpecialPointer(page);
478 				if (!P_IGNORE(opaque))
479 					break;
480 				if (P_RIGHTMOST(opaque))
481 					elog(ERROR, "fell off the end of index \"%s\"",
482 						 RelationGetRelationName(rel));
483 			}
484 			maxoff = PageGetMaxOffsetNumber(page);
485 			offset = P_FIRSTDATAKEY(opaque);
486 		}
487 	}
488 
489 	/*
490 	 * If we are doing a recheck then we should have found the tuple we are
491 	 * checking.  Otherwise there's something very wrong --- probably, the
492 	 * index is on a non-immutable expression.
493 	 */
494 	if (checkUnique == UNIQUE_CHECK_EXISTING && !found)
495 		ereport(ERROR,
496 				(errcode(ERRCODE_INTERNAL_ERROR),
497 				 errmsg("failed to re-find tuple within index \"%s\"",
498 						RelationGetRelationName(rel)),
499 		 errhint("This may be because of a non-immutable index expression."),
500 				 errtableconstraint(heapRel,
501 									RelationGetRelationName(rel))));
502 
503 	if (nbuf != InvalidBuffer)
504 		_bt_relbuf(rel, nbuf);
505 
506 	return InvalidTransactionId;
507 }
508 
509 
510 /*
511  *	_bt_findinsertloc() -- Finds an insert location for a tuple
512  *
513  *		If the new key is equal to one or more existing keys, we can
514  *		legitimately place it anywhere in the series of equal keys --- in fact,
515  *		if the new key is equal to the page's "high key" we can place it on
516  *		the next page.  If it is equal to the high key, and there's not room
517  *		to insert the new tuple on the current page without splitting, then
518  *		we can move right hoping to find more free space and avoid a split.
519  *		(We should not move right indefinitely, however, since that leads to
520  *		O(N^2) insertion behavior in the presence of many equal keys.)
521  *		Once we have chosen the page to put the key on, we'll insert it before
522  *		any existing equal keys because of the way _bt_binsrch() works.
523  *
524  *		If there's not enough room on the page, we try to make room by
525  *		removing any LP_DEAD tuples.
526  *
527  *		On entry, *bufptr and *offsetptr point to the first legal position
528  *		where the new tuple could be inserted.  The caller should hold an
529  *		exclusive lock on *bufptr.  *offsetptr can also be set to
530  *		InvalidOffsetNumber, in which case the function will search for the
531  *		right location within the page if needed.  On exit, they point to the
532  *		chosen insert location.  If _bt_findinsertloc decides to move right,
533  *		the lock and pin on the original page will be released and the new
534  *		page returned to the caller is exclusively locked instead.
535  *
536  *		newtup is the new tuple we're inserting, and scankey is an insertion
537  *		type scan key for it.
538  */
539 static void
540 _bt_findinsertloc(Relation rel,
541 				  Buffer *bufptr,
542 				  OffsetNumber *offsetptr,
543 				  int keysz,
544 				  ScanKey scankey,
545 				  IndexTuple newtup,
546 				  BTStack stack,
547 				  Relation heapRel)
548 {
549 	Buffer		buf = *bufptr;
550 	Page		page = BufferGetPage(buf);
551 	Size		itemsz;
552 	BTPageOpaque lpageop;
553 	bool		movedright,
554 				vacuumed;
555 	OffsetNumber newitemoff;
556 	OffsetNumber firstlegaloff = *offsetptr;
557 
558 	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
559 
560 	itemsz = IndexTupleDSize(*newtup);
561 	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
562 								 * need to be consistent */
563 
564 	/*
565 	 * Check whether the item can fit on a btree page at all. (Eventually, we
566 	 * ought to try to apply TOAST methods if not.) We actually need to be
567 	 * able to fit three items on every page, so restrict any one item to 1/3
568 	 * the per-page available space. Note that at this point, itemsz doesn't
569 	 * include the ItemId.
570 	 *
571 	 * NOTE: if you change this, see also the similar code in _bt_buildadd().
572 	 */
573 	if (itemsz > BTMaxItemSize(page))
574 		ereport(ERROR,
575 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
576 			errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
577 				   itemsz, BTMaxItemSize(page),
578 				   RelationGetRelationName(rel)),
579 		errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
580 				"Consider a function index of an MD5 hash of the value, "
581 				"or use full text indexing."),
582 				 errtableconstraint(heapRel,
583 									RelationGetRelationName(rel))));
584 
585 	/*----------
586 	 * If we will need to split the page to put the item on this page,
587 	 * check whether we can put the tuple somewhere to the right,
588 	 * instead.  Keep scanning right until we
589 	 *		(a) find a page with enough free space,
590 	 *		(b) reach the last page where the tuple can legally go, or
591 	 *		(c) get tired of searching.
592 	 * (c) is not flippant; it is important because if there are many
593 	 * pages' worth of equal keys, it's better to split one of the early
594 	 * pages than to scan all the way to the end of the run of equal keys
595 	 * on every insert.  We implement "get tired" as a random choice,
596 	 * since stopping after scanning a fixed number of pages wouldn't work
597 	 * well (we'd never reach the right-hand side of previously split
598 	 * pages).  Currently the probability of moving right is set at 0.99,
599 	 * which may seem too high to change the behavior much, but it does an
600 	 * excellent job of preventing O(N^2) behavior with many equal keys.
601 	 *----------
602 	 */
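	/*
	 * Arithmetic note: assuming the usual definition of MAX_RANDOM_VALUE
	 * (2^31 - 1), the test "random() <= (MAX_RANDOM_VALUE / 100)" below
	 * succeeds about 1% of the time, so each iteration moves right with
	 * probability roughly 0.99, as stated above.
	 */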
603 	movedright = false;
604 	vacuumed = false;
605 	while (PageGetFreeSpace(page) < itemsz)
606 	{
607 		Buffer		rbuf;
608 		BlockNumber rblkno;
609 
610 		/*
611 		 * before considering moving right, see if we can obtain enough space
612 		 * by erasing LP_DEAD items
613 		 */
614 		if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
615 		{
616 			_bt_vacuum_one_page(rel, buf, heapRel);
617 
618 			/*
619 			 * remember that we vacuumed this page, because that makes the
620 			 * hint supplied by the caller invalid
621 			 */
622 			vacuumed = true;
623 
624 			if (PageGetFreeSpace(page) >= itemsz)
625 				break;			/* OK, now we have enough space */
626 		}
627 
628 		/*
629 		 * nope, so check conditions (b) and (c) enumerated above
630 		 */
631 		if (P_RIGHTMOST(lpageop) ||
632 			_bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
633 			random() <= (MAX_RANDOM_VALUE / 100))
634 			break;
635 
636 		/*
637 		 * step right to next non-dead page
638 		 *
639 		 * must write-lock that page before releasing write lock on current
640 		 * page; else someone else's _bt_check_unique scan could fail to see
641 		 * our insertion.  write locks on intermediate dead pages won't do
642 		 * because we don't know when they will get de-linked from the tree.
643 		 */
644 		rbuf = InvalidBuffer;
645 
646 		rblkno = lpageop->btpo_next;
647 		for (;;)
648 		{
649 			rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
650 			page = BufferGetPage(rbuf);
651 			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
652 
653 			/*
654 			 * If this page was incompletely split, finish the split now. We
655 			 * do this while holding a lock on the left sibling, which is not
656 			 * good because finishing the split could be a fairly lengthy
657 			 * operation.  But this should happen very seldom.
658 			 */
659 			if (P_INCOMPLETE_SPLIT(lpageop))
660 			{
661 				_bt_finish_split(rel, rbuf, stack);
662 				rbuf = InvalidBuffer;
663 				continue;
664 			}
665 
666 			if (!P_IGNORE(lpageop))
667 				break;
668 			if (P_RIGHTMOST(lpageop))
669 				elog(ERROR, "fell off the end of index \"%s\"",
670 					 RelationGetRelationName(rel));
671 
672 			rblkno = lpageop->btpo_next;
673 		}
674 		_bt_relbuf(rel, buf);
675 		buf = rbuf;
676 		movedright = true;
677 		vacuumed = false;
678 	}
679 
680 	/*
681 	 * Now we are on the right page, so find the insert position. If we moved
682 	 * right at all, we know we should insert at the start of the page. If we
683 	 * didn't move right, we can use the firstlegaloff hint if the caller
684 	 * supplied one, unless we vacuumed the page which might have moved tuples
685 	 * around making the hint invalid. If we didn't move right or can't use
686 	 * the hint, find the position by searching.
687 	 */
688 	if (movedright)
689 		newitemoff = P_FIRSTDATAKEY(lpageop);
690 	else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
691 		newitemoff = firstlegaloff;
692 	else
693 		newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
694 
695 	*bufptr = buf;
696 	*offsetptr = newitemoff;
697 }
698 
699 /*----------
700  *	_bt_insertonpg() -- Insert a tuple on a particular page in the index.
701  *
702  *		This recursive procedure does the following things:
703  *
704  *			+  if necessary, splits the target page (making sure that the
705  *			   split is equitable as far as post-insert free space goes).
706  *			+  inserts the tuple.
707  *			+  if the page was split, pops the parent stack, and finds the
708  *			   right place to insert the new child pointer (by walking
709  *			   right using information stored in the parent stack).
710  *			+  invokes itself with the appropriate tuple for the right
711  *			   child page on the parent.
712  *			+  updates the metapage if a true root or fast root is split.
713  *
714  *		On entry, we must have the correct buffer in which to do the
715  *		insertion, and the buffer must be pinned and write-locked.  On return,
716  *		we will have dropped both the pin and the lock on the buffer.
717  *
718  *		When inserting to a non-leaf page, 'cbuf' is the left-sibling of the
719  *		page we're inserting the downlink for.  This function will clear the
720  *		INCOMPLETE_SPLIT flag on it, and release the buffer.
721  *
722  *		The locking interactions in this code are critical.  You should
723  *		grok Lehman and Yao's paper before making any changes.  In addition,
724  *		you need to understand how we disambiguate duplicate keys in this
725  *		implementation, in order to be able to find our location using
726  *		L&Y "move right" operations.  Since we may insert duplicate user
727  *		keys, and since these dups may propagate up the tree, we use the
728  *		'afteritem' parameter to position ourselves correctly for the
729  *		insertion on internal pages.
730  *----------
731  */
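/*
 * Condensed sketch of the control flow below (illustrative, not exhaustive):
 *
 *		if (PageGetFreeSpace(page) < itemsz)
 *		{
 *			firstright = _bt_findsplitloc(...);
 *			rbuf = _bt_split(...);			-- also inserts the new tuple
 *			_bt_insert_parent(...);			-- may re-enter this routine to
 *											   insert the parent's downlink
 *		}
 *		else
 *			_bt_pgaddtup(...);				-- plain insertion, WAL-logged
 */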
732 static void
733 _bt_insertonpg(Relation rel,
734 			   Buffer buf,
735 			   Buffer cbuf,
736 			   BTStack stack,
737 			   IndexTuple itup,
738 			   OffsetNumber newitemoff,
739 			   bool split_only_page)
740 {
741 	Page		page;
742 	BTPageOpaque lpageop;
743 	OffsetNumber firstright = InvalidOffsetNumber;
744 	Size		itemsz;
745 
746 	page = BufferGetPage(buf);
747 	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
748 
749 	/* child buffer must be given iff inserting on an internal page */
750 	Assert(P_ISLEAF(lpageop) == !BufferIsValid(cbuf));
751 
752 	/* The caller should've finished any incomplete splits already. */
753 	if (P_INCOMPLETE_SPLIT(lpageop))
754 		elog(ERROR, "cannot insert to incompletely split page %u",
755 			 BufferGetBlockNumber(buf));
756 
757 	itemsz = IndexTupleDSize(*itup);
758 	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
759 								 * need to be consistent */
760 
761 	/*
762 	 * Do we need to split the page to fit the item on it?
763 	 *
764 	 * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
765 	 * so this comparison is correct even though we appear to be accounting
766 	 * only for the item and not for its line pointer.
767 	 */
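	/*
	 * Concrete illustration (assuming the usual 4-byte ItemIdData): a
	 * MAXALIGN'd tuple of 16 bytes really consumes 16 + 4 bytes of page
	 * space, but PageGetFreeSpace() has already subtracted those 4 bytes,
	 * so comparing its result directly against itemsz is correct.
	 */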
768 	if (PageGetFreeSpace(page) < itemsz)
769 	{
770 		bool		is_root = P_ISROOT(lpageop);
771 		bool		is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
772 		bool		newitemonleft;
773 		Buffer		rbuf;
774 
775 		/* Choose the split point */
776 		firstright = _bt_findsplitloc(rel, page,
777 									  newitemoff, itemsz,
778 									  &newitemonleft);
779 
780 		/* split the buffer into left and right halves */
781 		rbuf = _bt_split(rel, buf, cbuf, firstright,
782 						 newitemoff, itemsz, itup, newitemonleft);
783 		PredicateLockPageSplit(rel,
784 							   BufferGetBlockNumber(buf),
785 							   BufferGetBlockNumber(rbuf));
786 
787 		/*----------
788 		 * By here,
789 		 *
790 		 *		+  our target page has been split;
791 		 *		+  the original tuple has been inserted;
792 		 *		+  we have write locks on both the old (left half)
793 		 *		   and new (right half) buffers, after the split; and
794 		 *		+  we know the key we want to insert into the parent
795 		 *		   (it's the "high key" on the left child page).
796 		 *
797 		 * We're ready to do the parent insertion.  We need to hold onto the
798 		 * locks for the child pages until we locate the parent, but we can
799 		 * release them before doing the actual insertion (see Lehman and Yao
800 		 * for the reasoning).
801 		 *----------
802 		 */
803 		_bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
804 	}
805 	else
806 	{
807 		Buffer		metabuf = InvalidBuffer;
808 		Page		metapg = NULL;
809 		BTMetaPageData *metad = NULL;
810 		OffsetNumber itup_off;
811 		BlockNumber itup_blkno;
812 
813 		itup_off = newitemoff;
814 		itup_blkno = BufferGetBlockNumber(buf);
815 
816 		/*
817 		 * If we are doing this insert because we split a page that was the
818 		 * only one on its tree level, but was not the root, it may have been
819 		 * the "fast root".  We need to ensure that the fast root link points
820 		 * at or above the current page.  We can safely acquire a lock on the
821 		 * metapage here --- see comments for _bt_newroot().
822 		 */
823 		if (split_only_page)
824 		{
825 			Assert(!P_ISLEAF(lpageop));
826 
827 			metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
828 			metapg = BufferGetPage(metabuf);
829 			metad = BTPageGetMeta(metapg);
830 
831 			if (metad->btm_fastlevel >= lpageop->btpo.level)
832 			{
833 				/* no update wanted */
834 				_bt_relbuf(rel, metabuf);
835 				metabuf = InvalidBuffer;
836 			}
837 		}
838 
839 		/* Do the update.  No ereport(ERROR) until changes are logged */
840 		START_CRIT_SECTION();
841 
842 		if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
843 			elog(PANIC, "failed to add new item to block %u in index \"%s\"",
844 				 itup_blkno, RelationGetRelationName(rel));
845 
846 		MarkBufferDirty(buf);
847 
848 		if (BufferIsValid(metabuf))
849 		{
850 			metad->btm_fastroot = itup_blkno;
851 			metad->btm_fastlevel = lpageop->btpo.level;
852 			MarkBufferDirty(metabuf);
853 		}
854 
855 		/* clear INCOMPLETE_SPLIT flag on child if inserting a downlink */
856 		if (BufferIsValid(cbuf))
857 		{
858 			Page		cpage = BufferGetPage(cbuf);
859 			BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
860 
861 			Assert(P_INCOMPLETE_SPLIT(cpageop));
862 			cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
863 			MarkBufferDirty(cbuf);
864 		}
865 
866 		/* XLOG stuff */
867 		if (RelationNeedsWAL(rel))
868 		{
869 			xl_btree_insert xlrec;
870 			xl_btree_metadata xlmeta;
871 			uint8		xlinfo;
872 			XLogRecPtr	recptr;
873 			IndexTupleData trunctuple;
874 
875 			xlrec.offnum = itup_off;
876 
877 			XLogBeginInsert();
878 			XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);
879 
880 			if (P_ISLEAF(lpageop))
881 				xlinfo = XLOG_BTREE_INSERT_LEAF;
882 			else
883 			{
884 				/*
885 				 * Register the left child whose INCOMPLETE_SPLIT flag was
886 				 * cleared.
887 				 */
888 				XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD);
889 
890 				xlinfo = XLOG_BTREE_INSERT_UPPER;
891 			}
892 
893 			if (BufferIsValid(metabuf))
894 			{
895 				xlmeta.root = metad->btm_root;
896 				xlmeta.level = metad->btm_level;
897 				xlmeta.fastroot = metad->btm_fastroot;
898 				xlmeta.fastlevel = metad->btm_fastlevel;
899 
900 				XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
901 				XLogRegisterBufData(2, (char *) &xlmeta, sizeof(xl_btree_metadata));
902 
903 				xlinfo = XLOG_BTREE_INSERT_META;
904 			}
905 
906 			/* Read comments in _bt_pgaddtup */
907 			XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
908 			if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
909 			{
910 				trunctuple = *itup;
911 				trunctuple.t_info = sizeof(IndexTupleData);
912 				XLogRegisterBufData(0, (char *) &trunctuple,
913 									sizeof(IndexTupleData));
914 			}
915 			else
916 				XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));
917 
918 			recptr = XLogInsert(RM_BTREE_ID, xlinfo);
919 
920 			if (BufferIsValid(metabuf))
921 			{
922 				PageSetLSN(metapg, recptr);
923 			}
924 			if (BufferIsValid(cbuf))
925 			{
926 				PageSetLSN(BufferGetPage(cbuf), recptr);
927 			}
928 
929 			PageSetLSN(page, recptr);
930 		}
931 
932 		END_CRIT_SECTION();
933 
934 		/* release buffers */
935 		if (BufferIsValid(metabuf))
936 			_bt_relbuf(rel, metabuf);
937 		if (BufferIsValid(cbuf))
938 			_bt_relbuf(rel, cbuf);
939 		_bt_relbuf(rel, buf);
940 	}
941 }
942 
943 /*
944  *	_bt_split() -- split a page in the btree.
945  *
946  *		On entry, buf is the page to split, and is pinned and write-locked.
947  *		firstright is the item index of the first item to be moved to the
948  *		new right page.  newitemoff etc. tell us about the new item that
949  *		must be inserted along with the data from the old page.
950  *
951  *		When splitting a non-leaf page, 'cbuf' is the left-sibling of the
952  *		page we're inserting the downlink for.  This function will clear the
953  *		INCOMPLETE_SPLIT flag on it, and release the buffer.
954  *
955  *		Returns the new right sibling of buf, pinned and write-locked.
956  *		The pin and lock on buf are maintained.
957  */
958 static Buffer
959 _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
960 		  OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
961 		  bool newitemonleft)
962 {
963 	Buffer		rbuf;
964 	Page		origpage;
965 	Page		leftpage,
966 				rightpage;
967 	BlockNumber origpagenumber,
968 				rightpagenumber;
969 	BTPageOpaque ropaque,
970 				lopaque,
971 				oopaque;
972 	Buffer		sbuf = InvalidBuffer;
973 	Page		spage = NULL;
974 	BTPageOpaque sopaque = NULL;
975 	Size		itemsz;
976 	ItemId		itemid;
977 	IndexTuple	item;
978 	OffsetNumber leftoff,
979 				rightoff;
980 	OffsetNumber maxoff;
981 	OffsetNumber i;
982 	bool		isroot;
983 	bool		isleaf;
984 
985 	/* Acquire a new page to split into */
986 	rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
987 
988 	/*
989 	 * origpage is the original page to be split.  leftpage is a temporary
990 	 * buffer that receives the left-sibling data, which will be copied back
991 	 * into origpage on success.  rightpage is the new page that receives the
992 	 * right-sibling data.  If we fail before reaching the critical section,
993 	 * origpage hasn't been modified and leftpage is only workspace. In
994 	 * principle we shouldn't need to worry about rightpage either, because it
995 	 * hasn't been linked into the btree page structure; but to avoid leaving
996 	 * possibly-confusing junk behind, we are careful to rewrite rightpage as
997 	 * zeroes before throwing any error.
998 	 */
999 	origpage = BufferGetPage(buf);
1000 	leftpage = PageGetTempPage(origpage);
1001 	rightpage = BufferGetPage(rbuf);
1002 
1003 	origpagenumber = BufferGetBlockNumber(buf);
1004 	rightpagenumber = BufferGetBlockNumber(rbuf);
1005 
1006 	_bt_pageinit(leftpage, BufferGetPageSize(buf));
1007 	/* rightpage was already initialized by _bt_getbuf */
1008 
1009 	/*
1010 	 * Copy the original page's LSN into leftpage, which will become the
1011 	 * updated version of the page.  We need this because XLogInsert will
1012 	 * examine the LSN and possibly dump it in a page image.
1013 	 */
1014 	PageSetLSN(leftpage, PageGetLSN(origpage));
1015 
1016 	/* init btree private data */
1017 	oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
1018 	lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
1019 	ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
1020 
1021 	isroot = P_ISROOT(oopaque);
1022 	isleaf = P_ISLEAF(oopaque);
1023 
1024 	/* if we're splitting this page, it won't be the root when we're done */
1025 	/* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
1026 	lopaque->btpo_flags = oopaque->btpo_flags;
1027 	lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
1028 	ropaque->btpo_flags = lopaque->btpo_flags;
1029 	/* set flag in left page indicating that the right page has no downlink */
1030 	lopaque->btpo_flags |= BTP_INCOMPLETE_SPLIT;
1031 	lopaque->btpo_prev = oopaque->btpo_prev;
1032 	lopaque->btpo_next = rightpagenumber;
1033 	ropaque->btpo_prev = origpagenumber;
1034 	ropaque->btpo_next = oopaque->btpo_next;
1035 	lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
1036 	/* Since we already have write-lock on both pages, ok to read cycleid */
1037 	lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
1038 	ropaque->btpo_cycleid = lopaque->btpo_cycleid;
1039 
1040 	/*
1041 	 * If the page we're splitting is not the rightmost page at its level in
1042 	 * the tree, then the first entry on the page is the high key for the
1043 	 * page.  We need to copy that to the right half.  Otherwise (meaning the
1044 	 * rightmost page case), all the items on the right half will be user
1045 	 * data.
1046 	 */
1047 	rightoff = P_HIKEY;
1048 
1049 	if (!P_RIGHTMOST(oopaque))
1050 	{
1051 		itemid = PageGetItemId(origpage, P_HIKEY);
1052 		itemsz = ItemIdGetLength(itemid);
1053 		item = (IndexTuple) PageGetItem(origpage, itemid);
1054 		if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
1055 						false, false) == InvalidOffsetNumber)
1056 		{
1057 			memset(rightpage, 0, BufferGetPageSize(rbuf));
1058 			elog(ERROR, "failed to add hikey to the right sibling"
1059 				 " while splitting block %u of index \"%s\"",
1060 				 origpagenumber, RelationGetRelationName(rel));
1061 		}
1062 		rightoff = OffsetNumberNext(rightoff);
1063 	}
1064 
1065 	/*
1066 	 * The "high key" for the new left page will be the first key that's going
1067 	 * to go into the new right page.  This might be either the existing data
1068 	 * item at position firstright, or the incoming tuple.
1069 	 */
1070 	leftoff = P_HIKEY;
1071 	if (!newitemonleft && newitemoff == firstright)
1072 	{
1073 		/* incoming tuple will become first on right page */
1074 		itemsz = newitemsz;
1075 		item = newitem;
1076 	}
1077 	else
1078 	{
1079 		/* existing item at firstright will become first on right page */
1080 		itemid = PageGetItemId(origpage, firstright);
1081 		itemsz = ItemIdGetLength(itemid);
1082 		item = (IndexTuple) PageGetItem(origpage, itemid);
1083 	}
1084 	if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
1085 					false, false) == InvalidOffsetNumber)
1086 	{
1087 		memset(rightpage, 0, BufferGetPageSize(rbuf));
1088 		elog(ERROR, "failed to add hikey to the left sibling"
1089 			 " while splitting block %u of index \"%s\"",
1090 			 origpagenumber, RelationGetRelationName(rel));
1091 	}
1092 	leftoff = OffsetNumberNext(leftoff);
1093 
1094 	/*
1095 	 * Now transfer all the data items to the appropriate page.
1096 	 *
1097 	 * Note: we *must* insert at least the right page's items in item-number
1098 	 * order, for the benefit of _bt_restore_page().
1099 	 */
1100 	maxoff = PageGetMaxOffsetNumber(origpage);
1101 
1102 	for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
1103 	{
1104 		itemid = PageGetItemId(origpage, i);
1105 		itemsz = ItemIdGetLength(itemid);
1106 		item = (IndexTuple) PageGetItem(origpage, itemid);
1107 
1108 		/* does new item belong before this one? */
1109 		if (i == newitemoff)
1110 		{
1111 			if (newitemonleft)
1112 			{
1113 				if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
1114 				{
1115 					memset(rightpage, 0, BufferGetPageSize(rbuf));
1116 					elog(ERROR, "failed to add new item to the left sibling"
1117 						 " while splitting block %u of index \"%s\"",
1118 						 origpagenumber, RelationGetRelationName(rel));
1119 				}
1120 				leftoff = OffsetNumberNext(leftoff);
1121 			}
1122 			else
1123 			{
1124 				if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
1125 				{
1126 					memset(rightpage, 0, BufferGetPageSize(rbuf));
1127 					elog(ERROR, "failed to add new item to the right sibling"
1128 						 " while splitting block %u of index \"%s\"",
1129 						 origpagenumber, RelationGetRelationName(rel));
1130 				}
1131 				rightoff = OffsetNumberNext(rightoff);
1132 			}
1133 		}
1134 
1135 		/* decide which page to put it on */
1136 		if (i < firstright)
1137 		{
1138 			if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
1139 			{
1140 				memset(rightpage, 0, BufferGetPageSize(rbuf));
1141 				elog(ERROR, "failed to add old item to the left sibling"
1142 					 " while splitting block %u of index \"%s\"",
1143 					 origpagenumber, RelationGetRelationName(rel));
1144 			}
1145 			leftoff = OffsetNumberNext(leftoff);
1146 		}
1147 		else
1148 		{
1149 			if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
1150 			{
1151 				memset(rightpage, 0, BufferGetPageSize(rbuf));
1152 				elog(ERROR, "failed to add old item to the right sibling"
1153 					 " while splitting block %u of index \"%s\"",
1154 					 origpagenumber, RelationGetRelationName(rel));
1155 			}
1156 			rightoff = OffsetNumberNext(rightoff);
1157 		}
1158 	}
1159 
1160 	/* cope with possibility that newitem goes at the end */
1161 	if (i <= newitemoff)
1162 	{
1163 		/*
1164 		 * Can't have newitemonleft here; that would imply we were told to put
1165 		 * *everything* on the left page, which cannot fit (if it could, we'd
1166 		 * not be splitting the page).
1167 		 */
1168 		Assert(!newitemonleft);
1169 		if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
1170 		{
1171 			memset(rightpage, 0, BufferGetPageSize(rbuf));
1172 			elog(ERROR, "failed to add new item to the right sibling"
1173 				 " while splitting block %u of index \"%s\"",
1174 				 origpagenumber, RelationGetRelationName(rel));
1175 		}
1176 		rightoff = OffsetNumberNext(rightoff);
1177 	}
1178 
1179 	/*
1180 	 * We have to grab the right sibling (if any) and fix the prev pointer
1181 	 * there. We are guaranteed that this is deadlock-free since no other
1182 	 * writer will be holding a lock on that page and trying to move left, and
1183 	 * all readers release locks on a page before trying to fetch its
1184 	 * neighbors.
1185 	 */
1186 
1187 	if (!P_RIGHTMOST(oopaque))
1188 	{
1189 		sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
1190 		spage = BufferGetPage(sbuf);
1191 		sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
1192 		if (sopaque->btpo_prev != origpagenumber)
1193 		{
1194 			memset(rightpage, 0, BufferGetPageSize(rbuf));
1195 			elog(ERROR, "right sibling's left-link doesn't match: "
1196 			   "block %u links to %u instead of expected %u in index \"%s\"",
1197 				 oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
1198 				 RelationGetRelationName(rel));
1199 		}
1200 
1201 		/*
1202 		 * Check to see if we can set the SPLIT_END flag in the right-hand
1203 		 * split page; this can save some I/O for vacuum since it need not
1204 		 * proceed to the right sibling.  We can set the flag if the right
1205 		 * sibling has a different cycleid: that means it could not be part of
1206 		 * a group of pages that were all split off from the same ancestor
1207 		 * page.  If you're confused, imagine that page A splits to A B and
1208 		 * then again, yielding A C B, while vacuum is in progress.  Tuples
1209 		 * originally in A could now be in either B or C, hence vacuum must
1210 		 * examine both pages.  But if D, our right sibling, has a different
1211 		 * cycleid then it could not contain any tuples that were in A when
1212 		 * the vacuum started.
1213 		 */
1214 		if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
1215 			ropaque->btpo_flags |= BTP_SPLIT_END;
1216 	}
1217 
1218 	/*
1219 	 * Right sibling is locked, new siblings are prepared, but original page
1220 	 * is not updated yet.
1221 	 *
1222 	 * NO EREPORT(ERROR) till right sibling is updated.  We can get away with
1223 	 * not starting the critical section till here because we haven't been
1224 	 * scribbling on the original page yet; see comments above.
1225 	 */
1226 	START_CRIT_SECTION();
1227 
1228 	/*
1229 	 * By here, the original data page has been split into two new halves, and
1230 	 * these are correct.  The algorithm requires that the left page never
1231 	 * move during a split, so we copy the new left page back on top of the
1232 	 * original.  Note that this is not a waste of time, since we also require
1233 	 * (in the page management code) that the center of a page always be
1234 	 * clean, and the most efficient way to guarantee this is just to compact
1235 	 * the data by reinserting it into a new left page.  (XXX the latter
1236 	 * comment is probably obsolete; but in any case it's good to not scribble
1237 	 * on the original page until we enter the critical section.)
1238 	 *
1239 	 * We need to do this before writing the WAL record, so that XLogInsert
1240 	 * can WAL log an image of the page if necessary.
1241 	 */
1242 	PageRestoreTempPage(leftpage, origpage);
1243 	/* leftpage, lopaque must not be used below here */
1244 
1245 	MarkBufferDirty(buf);
1246 	MarkBufferDirty(rbuf);
1247 
1248 	if (!P_RIGHTMOST(ropaque))
1249 	{
1250 		sopaque->btpo_prev = rightpagenumber;
1251 		MarkBufferDirty(sbuf);
1252 	}
1253 
1254 	/*
1255 	 * Clear INCOMPLETE_SPLIT flag on child if inserting the new item finishes
1256 	 * a split.
1257 	 */
1258 	if (!isleaf)
1259 	{
1260 		Page		cpage = BufferGetPage(cbuf);
1261 		BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
1262 
1263 		cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
1264 		MarkBufferDirty(cbuf);
1265 	}
1266 
1267 	/* XLOG stuff */
1268 	if (RelationNeedsWAL(rel))
1269 	{
1270 		xl_btree_split xlrec;
1271 		uint8		xlinfo;
1272 		XLogRecPtr	recptr;
1273 
1274 		xlrec.level = ropaque->btpo.level;
1275 		xlrec.firstright = firstright;
1276 		xlrec.newitemoff = newitemoff;
1277 
1278 		XLogBeginInsert();
1279 		XLogRegisterData((char *) &xlrec, SizeOfBtreeSplit);
1280 
1281 		XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
1282 		XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT);
1283 		/* Log the right sibling, because we've changed its prev-pointer. */
1284 		if (!P_RIGHTMOST(ropaque))
1285 			XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD);
1286 		if (BufferIsValid(cbuf))
1287 			XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD);
1288 
1289 		/*
1290 		 * Log the new item, if it was inserted on the left page. (If it was
1291 		 * put on the right page, we don't need to explicitly WAL log it
1292 		 * because it's included with all the other items on the right page.)
1293 		 * Show the new item as belonging to the left page buffer, so that it
1294 		 * is not stored if XLogInsert decides it needs a full-page image of
1295 		 * the left page.  We store the offset anyway, though, to support
1296 		 * archive compression of these records.
1297 		 */
1298 		if (newitemonleft)
1299 			XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz));
1300 
1301 		/* Log left page */
1302 		if (!isleaf)
1303 		{
1304 			/*
1305 			 * We must also log the left page's high key, because the right
1306 			 * page's leftmost key is suppressed on non-leaf levels.  Show it
1307 			 * as belonging to the left page buffer, so that it is not stored
1308 			 * if XLogInsert decides it needs a full-page image of the left
1309 			 * page.
1310 			 */
1311 			itemid = PageGetItemId(origpage, P_HIKEY);
1312 			item = (IndexTuple) PageGetItem(origpage, itemid);
1313 			XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item)));
1314 		}
1315 
1316 		/*
1317 		 * Log the contents of the right page in the format understood by
1318 		 * _bt_restore_page(). We set lastrdata->buffer to InvalidBuffer,
1319 		 * because we're going to recreate the whole page anyway, so it should
1320 		 * never be stored by XLogInsert.
1321 		 *
1322 		 * Direct access to page is not good but faster - we should implement
1323 		 * some new func in page API.  Note we only store the tuples
1324 		 * themselves, knowing that they were inserted in item-number order
1325 		 * and so the item pointers can be reconstructed.  See comments for
1326 		 * _bt_restore_page().
1327 		 */
1328 		XLogRegisterBufData(1,
1329 					 (char *) rightpage + ((PageHeader) rightpage)->pd_upper,
1330 							((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
1331 
1332 		if (isroot)
1333 			xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
1334 		else
1335 			xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
1336 
1337 		recptr = XLogInsert(RM_BTREE_ID, xlinfo);
1338 
1339 		PageSetLSN(origpage, recptr);
1340 		PageSetLSN(rightpage, recptr);
1341 		if (!P_RIGHTMOST(ropaque))
1342 		{
1343 			PageSetLSN(spage, recptr);
1344 		}
1345 		if (!isleaf)
1346 		{
1347 			PageSetLSN(BufferGetPage(cbuf), recptr);
1348 		}
1349 	}
1350 
1351 	END_CRIT_SECTION();
1352 
1353 	/* release the old right sibling */
1354 	if (!P_RIGHTMOST(ropaque))
1355 		_bt_relbuf(rel, sbuf);
1356 
1357 	/* release the child */
1358 	if (!isleaf)
1359 		_bt_relbuf(rel, cbuf);
1360 
1361 	/* split's done */
1362 	return rbuf;
1363 }
1364 
1365 /*
1366  *	_bt_findsplitloc() -- find an appropriate place to split a page.
1367  *
1368  * The idea here is to equalize the free space that will be on each split
1369  * page, *after accounting for the inserted tuple*.  (If we fail to account
1370  * for it, we might find ourselves with too little room on the page that
1371  * it needs to go into!)
1372  *
1373  * If the page is the rightmost page on its level, we instead try to arrange
1374  * to leave the left split page fillfactor% full.  In this way, when we are
1375  * inserting successively increasing keys (consider sequences, timestamps,
1376  * etc) we will end up with a tree whose pages are about fillfactor% full,
1377  * instead of the 50% full result that we'd get without this special case.
1378  * This is the same as nbtsort.c produces for a newly-created tree.  Note
1379  * that leaf and nonleaf pages use different fillfactors.
1380  *
1381  * We are passed the intended insert position of the new tuple, expressed as
1382  * the offsetnumber of the tuple it must go in front of.  (This could be
1383  * maxoff+1 if the tuple is to go at the end.)
1384  *
1385  * We return the index of the first existing tuple that should go on the
1386  * righthand page, plus a boolean indicating whether the new tuple goes on
1387  * the left or right page.  The bool is necessary to disambiguate the case
1388  * where firstright == newitemoff.
1389  */
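/*
 * Worked example (assuming the default leaf fillfactor of 90): splitting a
 * full rightmost leaf page leaves roughly the first 90% of its data, by byte
 * count, on the left page, with only the remainder plus the new tuple going
 * right.  A strictly ascending insertion pattern therefore produces leaf
 * pages about 90% full, rather than the ~50% a middle-of-the-page split
 * would give.
 */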
1390 static OffsetNumber
1391 _bt_findsplitloc(Relation rel,
1392 				 Page page,
1393 				 OffsetNumber newitemoff,
1394 				 Size newitemsz,
1395 				 bool *newitemonleft)
1396 {
1397 	BTPageOpaque opaque;
1398 	OffsetNumber offnum;
1399 	OffsetNumber maxoff;
1400 	ItemId		itemid;
1401 	FindSplitData state;
1402 	int			leftspace,
1403 				rightspace,
1404 				goodenough,
1405 				olddataitemstotal,
1406 				olddataitemstoleft;
1407 	bool		goodenoughfound;
1408 
1409 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1410 
1411 	/* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
1412 	newitemsz += sizeof(ItemIdData);
1413 
1414 	/* Total free space available on a btree page, after fixed overhead */
1415 	leftspace = rightspace =
1416 		PageGetPageSize(page) - SizeOfPageHeaderData -
1417 		MAXALIGN(sizeof(BTPageOpaqueData));
1418 
1419 	/* The right page will have the same high key as the old page */
1420 	if (!P_RIGHTMOST(opaque))
1421 	{
1422 		itemid = PageGetItemId(page, P_HIKEY);
1423 		rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
1424 							 sizeof(ItemIdData));
1425 	}
1426 
1427 	/* Count up total space in data items without actually scanning 'em */
1428 	olddataitemstotal = rightspace - (int) PageGetExactFreeSpace(page);
1429 
1430 	state.newitemsz = newitemsz;
1431 	state.is_leaf = P_ISLEAF(opaque);
1432 	state.is_rightmost = P_RIGHTMOST(opaque);
1433 	state.have_split = false;
1434 	if (state.is_leaf)
1435 		state.fillfactor = RelationGetFillFactor(rel,
1436 												 BTREE_DEFAULT_FILLFACTOR);
1437 	else
1438 		state.fillfactor = BTREE_NONLEAF_FILLFACTOR;
1439 	state.newitemonleft = false;	/* these just to keep compiler quiet */
1440 	state.firstright = 0;
1441 	state.best_delta = 0;
1442 	state.leftspace = leftspace;
1443 	state.rightspace = rightspace;
1444 	state.olddataitemstotal = olddataitemstotal;
1445 	state.newitemoff = newitemoff;
1446 
1447 	/*
1448 	 * Finding the best possible split would require checking all the possible
1449 	 * split points, because of the high-key and left-key special cases.
1450 	 * That's probably more work than it's worth; instead, stop as soon as we
1451 	 * find a "good-enough" split, where good-enough is defined as an
1452 	 * imbalance in free space of no more than pagesize/16 (arbitrary...) This
1453 	 * should let us stop near the middle on most pages, instead of plowing to
1454 	 * the end.
1455 	 */
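	/*
	 * For the default 8 kB page size, leftspace is a bit under 8 kB after
	 * the fixed overhead, so "good enough" works out to roughly 500 bytes
	 * of free-space imbalance between the two halves.
	 */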
	goodenough = leftspace / 16;

	/*
	 * Scan through the data items and calculate space usage for a split at
	 * each possible position.
	 */
	olddataitemstoleft = 0;
	goodenoughfound = false;
	maxoff = PageGetMaxOffsetNumber(page);

	for (offnum = P_FIRSTDATAKEY(opaque);
		 offnum <= maxoff;
		 offnum = OffsetNumberNext(offnum))
	{
		Size		itemsz;

		itemid = PageGetItemId(page, offnum);
		itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);

		/*
		 * Will the new item go to left or right of split?
		 */
		if (offnum > newitemoff)
			_bt_checksplitloc(&state, offnum, true,
							  olddataitemstoleft, itemsz);

		else if (offnum < newitemoff)
			_bt_checksplitloc(&state, offnum, false,
							  olddataitemstoleft, itemsz);
		else
		{
			/* need to try it both ways! */
			_bt_checksplitloc(&state, offnum, true,
							  olddataitemstoleft, itemsz);

			_bt_checksplitloc(&state, offnum, false,
							  olddataitemstoleft, itemsz);
		}

		/* Abort scan once we find a good-enough choice */
		if (state.have_split && state.best_delta <= goodenough)
		{
			goodenoughfound = true;
			break;
		}

		olddataitemstoleft += itemsz;
	}

	/*
	 * If the new item goes as the last item, check for splitting so that all
	 * the old items go to the left page and the new item goes to the right
	 * page.
	 */
	if (newitemoff > maxoff && !goodenoughfound)
		_bt_checksplitloc(&state, newitemoff, false, olddataitemstotal, 0);

	/*
	 * I believe it is not possible to fail to find a feasible split, but just
	 * in case ...
	 */
	if (!state.have_split)
		elog(ERROR, "could not find a feasible split point for index \"%s\"",
			 RelationGetRelationName(rel));

	*newitemonleft = state.newitemonleft;
	return state.firstright;
}

/*
 * Subroutine to analyze a particular possible split choice (ie, firstright
 * and newitemonleft settings), and record the best split so far in *state.
 *
 * firstoldonright is the offset of the first item on the original page
 * that goes to the right page, and firstoldonrightsz is the size of that
 * tuple. firstoldonright can be > max offset, which means that all the old
 * items go to the left page and only the new item goes to the right page.
 * In that case, firstoldonrightsz is not used.
 *
 * olddataitemstoleft is the total size of all old items to the left of
 * firstoldonright.
 */
static void
_bt_checksplitloc(FindSplitData *state,
				  OffsetNumber firstoldonright,
				  bool newitemonleft,
				  int olddataitemstoleft,
				  Size firstoldonrightsz)
{
	int			leftfree,
				rightfree;
	Size		firstrightitemsz;
	bool		newitemisfirstonright;

	/* Is the new item going to be the first item on the right page? */
	newitemisfirstonright = (firstoldonright == state->newitemoff
							 && !newitemonleft);

	if (newitemisfirstonright)
		firstrightitemsz = state->newitemsz;
	else
		firstrightitemsz = firstoldonrightsz;

	/* Account for all the old tuples */
	leftfree = state->leftspace - olddataitemstoleft;
	rightfree = state->rightspace -
		(state->olddataitemstotal - olddataitemstoleft);

	/*
	 * The first item on the right page becomes the high key of the left page;
	 * therefore it counts against left space as well as right space.
	 */
	leftfree -= firstrightitemsz;

	/* account for the new item */
	if (newitemonleft)
		leftfree -= (int) state->newitemsz;
	else
		rightfree -= (int) state->newitemsz;

	/*
	 * If we are not on the leaf level, we will be able to discard the key
	 * data from the first item that winds up on the right page.
	 */
	if (!state->is_leaf)
		rightfree += (int) firstrightitemsz -
			(int) (MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData));
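
	/*
	 * (Cf. _bt_pgaddtup below: the leftmost tuple on a non-leaf page is
	 * stripped down to just the IndexTupleData header, which is the saving
	 * the adjustment above anticipates.)
	 */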

	/*
	 * If feasible split point, remember best delta.
	 */
	if (leftfree >= 0 && rightfree >= 0)
	{
		int			delta;

		if (state->is_rightmost)
		{
			/*
			 * If splitting a rightmost page, try to put (100-fillfactor)% of
			 * free space on left page. See comments for _bt_findsplitloc.
			 */
			delta = (state->fillfactor * leftfree)
				- ((100 - state->fillfactor) * rightfree);
		}
		else
		{
			/* Otherwise, aim for equal free space on both sides */
			delta = leftfree - rightfree;
		}
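
		/*
		 * For example, with the default leaf fillfactor of 90 this is
		 * delta = 90 * leftfree - 10 * rightfree, which is zero when only
		 * about a tenth of the remaining free space stays on the left page,
		 * i.e. the existing page ends up roughly 90% full and nearly all the
		 * free space goes to the new right page.
		 */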

		if (delta < 0)
			delta = -delta;
		if (!state->have_split || delta < state->best_delta)
		{
			state->have_split = true;
			state->newitemonleft = newitemonleft;
			state->firstright = firstoldonright;
			state->best_delta = delta;
		}
	}
}

/*
 * _bt_insert_parent() -- Insert downlink into parent after a page split.
 *
 * On entry, buf and rbuf are the left and right split pages, which we
 * still hold write locks on per the L&Y algorithm.  We release the
 * write locks once we have write lock on the parent page.  (Any sooner,
 * and it'd be possible for some other process to try to split or delete
 * one of these pages, and get confused because it cannot find the downlink.)
 *
 * stack - stack showing how we got here.  May be NULL in cases that don't
 *			have to be efficient (concurrent ROOT split, WAL recovery)
 * is_root - we split the true root
 * is_only - we split a page alone on its level (might have been fast root)
 */
static void
_bt_insert_parent(Relation rel,
				  Buffer buf,
				  Buffer rbuf,
				  BTStack stack,
				  bool is_root,
				  bool is_only)
{
	/*
	 * Here we have to do something Lehman and Yao don't talk about: deal with
	 * a root split and construction of a new root.  If our stack is empty
	 * then we have just split a node on what had been the root level when we
	 * descended the tree.  If it was still the root then we perform a
	 * new-root construction.  If it *wasn't* the root anymore, search to find
	 * the next higher level that someone constructed meanwhile, and find the
	 * right place to insert as for the normal case.
	 *
	 * If we have to search for the parent level, we do so by re-descending
	 * from the root.  This is not super-efficient, but it's rare enough not
	 * to matter.
	 */
	if (is_root)
	{
		Buffer		rootbuf;

		Assert(stack == NULL);
		Assert(is_only);
		/* create a new root node and update the metapage */
		rootbuf = _bt_newroot(rel, buf, rbuf);
		/* release the split buffers */
		_bt_relbuf(rel, rootbuf);
		_bt_relbuf(rel, rbuf);
		_bt_relbuf(rel, buf);
	}
	else
	{
		BlockNumber bknum = BufferGetBlockNumber(buf);
		BlockNumber rbknum = BufferGetBlockNumber(rbuf);
		Page		page = BufferGetPage(buf);
		IndexTuple	new_item;
		BTStackData fakestack;
		IndexTuple	ritem;
		Buffer		pbuf;

		if (stack == NULL)
		{
			BTPageOpaque lpageop;

			elog(DEBUG2, "concurrent ROOT page split");
			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
			/* Find the leftmost page at the next level up */
			pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false,
									NULL);
			/* Set up a phony stack entry pointing there */
			stack = &fakestack;
			stack->bts_blkno = BufferGetBlockNumber(pbuf);
			stack->bts_offset = InvalidOffsetNumber;
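			/*
			 * (InvalidOffsetNumber makes _bt_getstackbuf search the whole
			 * parent page rather than start from a remembered position.)
			 */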
			/* bts_btentry will be initialized below */
			stack->bts_parent = NULL;
			_bt_relbuf(rel, pbuf);
		}

		/* get high key from left page == lowest key on new right page */
		ritem = (IndexTuple) PageGetItem(page,
										 PageGetItemId(page, P_HIKEY));

		/* form an index tuple that points at the new right page */
		new_item = CopyIndexTuple(ritem);
		ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);

		/*
		 * Find the parent buffer and get the parent page.
		 *
		 * Oops - if we were moved right then we need to change stack item! We
		 * want to find parent pointing to where we are, right ?	- vadim
		 * 05/27/97
		 */
		ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
		pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);

		/*
		 * Now we can unlock the right child. The left child will be unlocked
		 * by _bt_insertonpg().
		 */
		_bt_relbuf(rel, rbuf);

		/* Check for error only after writing children */
		if (pbuf == InvalidBuffer)
			elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u",
				 RelationGetRelationName(rel), bknum, rbknum);

		/* Recursively update the parent */
		_bt_insertonpg(rel, pbuf, buf, stack->bts_parent,
					   new_item, stack->bts_offset + 1,
					   is_only);

		/* be tidy */
		pfree(new_item);
	}
}

/*
 * _bt_finish_split() -- Finish an incomplete split
 *
 * A crash or other failure can leave a split incomplete.  The insertion
 * routines won't insert onto a page that is incompletely split.
 * Before inserting on such a page, call _bt_finish_split().
 *
 * On entry, 'lbuf' must be locked in write-mode.  On exit, it is unlocked
 * and unpinned.
 */
void
_bt_finish_split(Relation rel, Buffer lbuf, BTStack stack)
{
	Page		lpage = BufferGetPage(lbuf);
	BTPageOpaque lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
	Buffer		rbuf;
	Page		rpage;
	BTPageOpaque rpageop;
	bool		was_root;
	bool		was_only;

	Assert(P_INCOMPLETE_SPLIT(lpageop));

	/* Lock right sibling, the one missing the downlink */
	rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
	rpage = BufferGetPage(rbuf);
	rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);

	/* Could this be a root split? */
	if (!stack)
	{
		Buffer		metabuf;
		Page		metapg;
		BTMetaPageData *metad;

		/* acquire lock on the metapage */
		metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
		metapg = BufferGetPage(metabuf);
		metad = BTPageGetMeta(metapg);

		was_root = (metad->btm_root == BufferGetBlockNumber(lbuf));

		_bt_relbuf(rel, metabuf);
	}
	else
		was_root = false;

	/* Was this the only page on the level before split? */
	was_only = (P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop));

	elog(DEBUG1, "finishing incomplete split of %u/%u",
		 BufferGetBlockNumber(lbuf), BufferGetBlockNumber(rbuf));

	_bt_insert_parent(rel, lbuf, rbuf, stack, was_root, was_only);
}

/*
 *	_bt_getstackbuf() -- Walk back up the tree one step, and find the item
 *						 we last looked at in the parent.
 *
 *		This is possible because we save the downlink from the parent item,
 *		which is enough to uniquely identify it.  Insertions into the parent
 *		level could cause the item to move right; deletions could cause it
 *		to move left, but not left of the page we previously found it in.
 *
 *		Adjusts bts_blkno & bts_offset if changed.
 *
 *		Returns InvalidBuffer if item not found (should not happen).
 */
Buffer
_bt_getstackbuf(Relation rel, BTStack stack, int access)
{
	BlockNumber blkno;
	OffsetNumber start;

	blkno = stack->bts_blkno;
	start = stack->bts_offset;

	for (;;)
	{
		Buffer		buf;
		Page		page;
		BTPageOpaque opaque;

		buf = _bt_getbuf(rel, blkno, access);
		page = BufferGetPage(buf);
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);

		if (access == BT_WRITE && P_INCOMPLETE_SPLIT(opaque))
		{
			_bt_finish_split(rel, buf, stack->bts_parent);
			continue;
		}

		if (!P_IGNORE(opaque))
		{
			OffsetNumber offnum,
						minoff,
						maxoff;
			ItemId		itemid;
			IndexTuple	item;

			minoff = P_FIRSTDATAKEY(opaque);
			maxoff = PageGetMaxOffsetNumber(page);

			/*
			 * start = InvalidOffsetNumber means "search the whole page".  We
			 * need this test anyway, because of the possibility that the page
			 * has a high key now when it didn't before.
			 */
			if (start < minoff)
				start = minoff;

			/*
			 * Need this check too, to guard against the possibility that the
			 * page split since we visited it originally.
			 */
			if (start > maxoff)
				start = OffsetNumberNext(maxoff);

			/*
			 * These loops will check every item on the page --- but in an
			 * order that's attuned to the probability of where it actually
			 * is.  Scan to the right first, then to the left.
			 */
			for (offnum = start;
				 offnum <= maxoff;
				 offnum = OffsetNumberNext(offnum))
			{
				itemid = PageGetItemId(page, offnum);
				item = (IndexTuple) PageGetItem(page, itemid);
				if (BTEntrySame(item, &stack->bts_btentry))
				{
					/* Return accurate pointer to where link is now */
					stack->bts_blkno = blkno;
					stack->bts_offset = offnum;
					return buf;
				}
			}

			for (offnum = OffsetNumberPrev(start);
				 offnum >= minoff;
				 offnum = OffsetNumberPrev(offnum))
			{
				itemid = PageGetItemId(page, offnum);
				item = (IndexTuple) PageGetItem(page, itemid);
				if (BTEntrySame(item, &stack->bts_btentry))
				{
					/* Return accurate pointer to where link is now */
					stack->bts_blkno = blkno;
					stack->bts_offset = offnum;
					return buf;
				}
			}
		}

		/*
		 * The item we're looking for moved right at least one page.
		 */
		if (P_RIGHTMOST(opaque))
		{
			_bt_relbuf(rel, buf);
			return InvalidBuffer;
		}
		blkno = opaque->btpo_next;
		start = InvalidOffsetNumber;
		_bt_relbuf(rel, buf);
	}
}

/*
 *	_bt_newroot() -- Create a new root page for the index.
 *
 *		We've just split the old root page and need to create a new one.
 *		In order to do this, we add a new root page to the file, then lock
 *		the metadata page and update it.  This is guaranteed to be deadlock-
 *		free, because all readers release their locks on the metadata page
 *		before trying to lock the root, and all writers lock the root before
 *		trying to lock the metadata page.  We have a write lock on the old
 *		root page, so we have not introduced any cycles into the waits-for
 *		graph.
 *
 *		On entry, lbuf (the old root) and rbuf (its new peer) are write-
 *		locked. On exit, a new root page exists with entries for the
 *		two new children, metapage is updated and unlocked/unpinned.
 *		The new root buffer is returned to the caller, which must unlock/unpin
 *		lbuf, rbuf & rootbuf.
 */
static Buffer
_bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
{
	Buffer		rootbuf;
	Page		lpage,
				rootpage;
	BlockNumber lbkno,
				rbkno;
	BlockNumber rootblknum;
	BTPageOpaque rootopaque;
	BTPageOpaque lopaque;
	ItemId		itemid;
	IndexTuple	item;
	IndexTuple	left_item;
	Size		left_item_sz;
	IndexTuple	right_item;
	Size		right_item_sz;
	Buffer		metabuf;
	Page		metapg;
	BTMetaPageData *metad;

	lbkno = BufferGetBlockNumber(lbuf);
	rbkno = BufferGetBlockNumber(rbuf);
	lpage = BufferGetPage(lbuf);
	lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);

	/* get a new root page */
	rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
	rootpage = BufferGetPage(rootbuf);
	rootblknum = BufferGetBlockNumber(rootbuf);

	/* acquire lock on the metapage */
	metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
	metapg = BufferGetPage(metabuf);
	metad = BTPageGetMeta(metapg);

	/*
	 * Create downlink item for left page (old root).  Since this will be the
	 * first item in a non-leaf page, it implicitly has minus-infinity key
	 * value, so we need not store any actual key in it.
	 */
	left_item_sz = sizeof(IndexTupleData);
	left_item = (IndexTuple) palloc(left_item_sz);
	left_item->t_info = left_item_sz;
	ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY);

	/*
	 * Create downlink item for right page.  The key for it is obtained from
	 * the "high key" position in the left page.
	 */
	itemid = PageGetItemId(lpage, P_HIKEY);
	right_item_sz = ItemIdGetLength(itemid);
	item = (IndexTuple) PageGetItem(lpage, itemid);
	right_item = CopyIndexTuple(item);
	ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);

	/* NO EREPORT(ERROR) from here till newroot op is logged */
	START_CRIT_SECTION();

	/* set btree special data */
	rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
	rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
	rootopaque->btpo_flags = BTP_ROOT;
	rootopaque->btpo.level =
		((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
	rootopaque->btpo_cycleid = 0;

	/* update metapage data */
	metad->btm_root = rootblknum;
	metad->btm_level = rootopaque->btpo.level;
	metad->btm_fastroot = rootblknum;
	metad->btm_fastlevel = rootopaque->btpo.level;

	/*
	 * Insert the left page pointer into the new root page.  The root page is
	 * the rightmost page on its level so there is no "high key" in it; the
	 * two items will go into positions P_HIKEY and P_FIRSTKEY.
	 *
	 * Note: we *must* insert the two items in item-number order, for the
	 * benefit of _bt_restore_page().
	 */
	if (PageAddItem(rootpage, (Item) left_item, left_item_sz, P_HIKEY,
					false, false) == InvalidOffsetNumber)
		elog(PANIC, "failed to add leftkey to new root page"
			 " while splitting block %u of index \"%s\"",
			 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));

	/*
	 * insert the right page pointer into the new root page.
	 */
	if (PageAddItem(rootpage, (Item) right_item, right_item_sz, P_FIRSTKEY,
					false, false) == InvalidOffsetNumber)
		elog(PANIC, "failed to add rightkey to new root page"
			 " while splitting block %u of index \"%s\"",
			 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));

	/* Clear the incomplete-split flag in the left child */
	Assert(P_INCOMPLETE_SPLIT(lopaque));
	lopaque->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
	MarkBufferDirty(lbuf);

	MarkBufferDirty(rootbuf);
	MarkBufferDirty(metabuf);

	/* XLOG stuff */
	if (RelationNeedsWAL(rel))
	{
		xl_btree_newroot xlrec;
		XLogRecPtr	recptr;
		xl_btree_metadata md;

		xlrec.rootblk = rootblknum;
		xlrec.level = metad->btm_level;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot);

		XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
		XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
		XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);

		md.root = rootblknum;
		md.level = metad->btm_level;
		md.fastroot = rootblknum;
		md.fastlevel = metad->btm_level;

		XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata));

		/*
		 * Direct access to the page is not ideal, but it is faster; we
		 * should implement some new function in the page API.
		 */
		XLogRegisterBufData(0,
					   (char *) rootpage + ((PageHeader) rootpage)->pd_upper,
							((PageHeader) rootpage)->pd_special -
							((PageHeader) rootpage)->pd_upper);

		recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT);

		PageSetLSN(lpage, recptr);
		PageSetLSN(rootpage, recptr);
		PageSetLSN(metapg, recptr);
	}

	END_CRIT_SECTION();

	/* done with metapage */
	_bt_relbuf(rel, metabuf);

	pfree(left_item);
	pfree(right_item);

	return rootbuf;
}

/*
 *	_bt_pgaddtup() -- add a tuple to a particular page in the index.
 *
 *		This routine adds the tuple to the page as requested.  It does
 *		not affect pin/lock status, but you'd better have a write lock
 *		and pin on the target buffer!  Don't forget to write and release
 *		the buffer afterwards, either.
 *
 *		The main difference between this routine and a bare PageAddItem call
 *		is that this code knows that the leftmost index tuple on a non-leaf
 *		btree page doesn't need to have a key.  Therefore, it strips such
 *		tuples down to just the tuple header.  CAUTION: this works ONLY if
 *		we insert the tuples in order, so that the given itup_off does
 *		represent the final position of the tuple!
 */
static bool
_bt_pgaddtup(Page page,
			 Size itemsize,
			 IndexTuple itup,
			 OffsetNumber itup_off)
{
	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	IndexTupleData trunctuple;

	if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
	{
		trunctuple = *itup;
		trunctuple.t_info = sizeof(IndexTupleData);
		itup = &trunctuple;
		itemsize = sizeof(IndexTupleData);
	}

	if (PageAddItem(page, (Item) itup, itemsize, itup_off,
					false, false) == InvalidOffsetNumber)
		return false;

	return true;
}

/*
 * _bt_isequal - used in _bt_doinsert in check for duplicates.
 *
 * This is very similar to _bt_compare, except for NULL handling.
 * The rule is simple: a NULL is never equal to anything, not even another NULL.
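 *
 * (One consequence: since NULLs never compare equal here, a unique index can
 * contain any number of entries whose key columns include a NULL.)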
 */
static bool
_bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
			int keysz, ScanKey scankey)
{
	IndexTuple	itup;
	int			i;

	/* Better be comparing to a leaf item */
	Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));

	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));

	for (i = 1; i <= keysz; i++)
	{
		AttrNumber	attno;
		Datum		datum;
		bool		isNull;
		int32		result;

		attno = scankey->sk_attno;
		Assert(attno == i);
		datum = index_getattr(itup, attno, itupdesc, &isNull);

		/* NULLs are never equal to anything */
		if (isNull || (scankey->sk_flags & SK_ISNULL))
			return false;

		result = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
												 scankey->sk_collation,
												 datum,
												 scankey->sk_argument));

		if (result != 0)
			return false;

		scankey++;
	}

	/* if we get here, the keys are equal */
	return true;
}

/*
 * _bt_vacuum_one_page - vacuum just one index page.
 *
 * Try to remove LP_DEAD items from the given page.  The passed buffer
 * must be exclusive-locked, but unlike a real VACUUM, we don't need a
 * super-exclusive "cleanup" lock (see nbtree/README).
 */
static void
_bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel)
{
	OffsetNumber deletable[MaxOffsetNumber];
	int			ndeletable = 0;
	OffsetNumber offnum,
				minoff,
				maxoff;
	Page		page = BufferGetPage(buffer);
	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);

	/*
	 * Scan over all items to see which ones need to be deleted according to
	 * LP_DEAD flags.
	 */
	minoff = P_FIRSTDATAKEY(opaque);
	maxoff = PageGetMaxOffsetNumber(page);
	for (offnum = minoff;
		 offnum <= maxoff;
		 offnum = OffsetNumberNext(offnum))
	{
		ItemId		itemId = PageGetItemId(page, offnum);

		if (ItemIdIsDead(itemId))
			deletable[ndeletable++] = offnum;
	}

	if (ndeletable > 0)
		_bt_delitems_delete(rel, buffer, deletable, ndeletable, heapRel);

	/*
	 * Note: if we didn't find any LP_DEAD items, then the page's
	 * BTP_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
	 * separate write to clear it, however.  We will clear it when we split
	 * the page.
	 */
}