1 /*-------------------------------------------------------------------------
2  *
3  * nbtinsert.c
4  *	  Item insertion in Lehman and Yao btrees for Postgres.
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/access/nbtree/nbtinsert.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/heapam.h"
19 #include "access/nbtree.h"
20 #include "access/nbtxlog.h"
21 #include "access/transam.h"
22 #include "access/xloginsert.h"
23 #include "miscadmin.h"
24 #include "storage/lmgr.h"
25 #include "storage/predicate.h"
26 #include "utils/tqual.h"
27 
28 
29 typedef struct
30 {
31 	/* context data for _bt_checksplitloc */
32 	Size		newitemsz;		/* size of new item to be inserted */
33 	int			fillfactor;		/* needed when splitting rightmost page */
34 	bool		is_leaf;		/* T if splitting a leaf page */
35 	bool		is_rightmost;	/* T if splitting a rightmost page */
36 	OffsetNumber newitemoff;	/* where the new item is to be inserted */
37 	int			leftspace;		/* space available for items on left page */
38 	int			rightspace;		/* space available for items on right page */
39 	int			olddataitemstotal;	/* space taken by old items */
40 
41 	bool		have_split;		/* found a valid split? */
42 
43 	/* these fields valid only if have_split is true */
44 	bool		newitemonleft;	/* new item on left or right of best split */
45 	OffsetNumber firstright;	/* best split point */
46 	int			best_delta;		/* best size delta so far */
47 } FindSplitData;
48 
49 
50 static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
51 
52 static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
53 				 Relation heapRel, Buffer buf, OffsetNumber offset,
54 				 ScanKey itup_scankey,
55 				 IndexUniqueCheck checkUnique, bool *is_unique,
56 				 uint32 *speculativeToken);
57 static void _bt_findinsertloc(Relation rel,
58 				  Buffer *bufptr,
59 				  OffsetNumber *offsetptr,
60 				  int keysz,
61 				  ScanKey scankey,
62 				  IndexTuple newtup,
63 				  BTStack stack,
64 				  Relation heapRel);
65 static void _bt_insertonpg(Relation rel, Buffer buf, Buffer cbuf,
66 			   BTStack stack,
67 			   IndexTuple itup,
68 			   OffsetNumber newitemoff,
69 			   bool split_only_page);
70 static Buffer _bt_split(Relation rel, Buffer buf, Buffer cbuf,
71 		  OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz,
72 		  IndexTuple newitem, bool newitemonleft);
73 static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
74 				  BTStack stack, bool is_root, bool is_only);
75 static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
76 				 OffsetNumber newitemoff,
77 				 Size newitemsz,
78 				 bool *newitemonleft);
79 static void _bt_checksplitloc(FindSplitData *state,
80 				  OffsetNumber firstoldonright, bool newitemonleft,
81 				  int dataitemstoleft, Size firstoldonrightsz);
82 static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
83 			 OffsetNumber itup_off);
84 static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
85 			int keysz, ScanKey scankey);
86 static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
87 
88 
89 /*
90  *	_bt_doinsert() -- Handle insertion of a single index tuple in the tree.
91  *
92  *		This routine is called by the public interface routine, btinsert.
93  *		By here, itup is filled in, including the TID.
94  *
95  *		If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
96  *		will allow duplicates.  Otherwise (UNIQUE_CHECK_YES or
97  *		UNIQUE_CHECK_EXISTING) it will throw error for a duplicate.
98  *		For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and
99  *		don't actually insert.
100  *
101  *		The result value is only significant for UNIQUE_CHECK_PARTIAL:
102  *		it must be TRUE if the entry is known unique, else FALSE.
103  *		(In the current implementation we'll also return TRUE after a
104  *		successful UNIQUE_CHECK_YES or UNIQUE_CHECK_EXISTING call, but
105  *		that's just a coding artifact.)
106  */
107 bool
108 _bt_doinsert(Relation rel, IndexTuple itup,
109 			 IndexUniqueCheck checkUnique, Relation heapRel)
110 {
111 	bool		is_unique = false;
112 	int			natts = rel->rd_rel->relnatts;
113 	ScanKey		itup_scankey;
114 	BTStack		stack;
115 	Buffer		buf;
116 	OffsetNumber offset;
117 
118 	/* we need an insertion scan key to do our search, so build one */
119 	itup_scankey = _bt_mkscankey(rel, itup);
120 
121 top:
122 	/* find the first page containing this key */
123 	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE, NULL);
124 
125 	offset = InvalidOffsetNumber;
126 
127 	/* trade in our read lock for a write lock */
128 	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
129 	LockBuffer(buf, BT_WRITE);
130 
131 	/*
132 	 * If the page was split between the time that we surrendered our read
133 	 * lock and acquired our write lock, then this page may no longer be the
134 	 * right place for the key we want to insert.  In this case, we need to
135 	 * move right in the tree.  See Lehman and Yao for an excruciatingly
136 	 * precise description.
137 	 */
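	/*
	 * A sketch of the race (illustrative key ranges, not real data): the
	 * search put us on a page P covering keys 10..50, but before we got
	 * the write lock P split, keeping 10..30 and moving 31..50 to a new
	 * right sibling P'.  If our key is, say, 42, P is no longer the right
	 * target, so _bt_moveright() follows right-links until it reaches the
	 * page whose key range still covers the key.
	 */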
138 	buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
139 						true, stack, BT_WRITE, NULL);
140 
141 	/*
142 	 * If we're not allowing duplicates, make sure the key isn't already in
143 	 * the index.
144 	 *
145 	 * NOTE: obviously, _bt_check_unique can only detect keys that are already
146 	 * in the index; so it cannot defend against concurrent insertions of the
147 	 * same key.  We protect against that by means of holding a write lock on
148 	 * the target page.  Any other would-be inserter of the same key must
149 	 * acquire a write lock on the same target page, so only one would-be
150 	 * inserter can be making the check at one time.  Furthermore, once we are
151 	 * past the check we hold write locks continuously until we have performed
152 	 * our insertion, so no later inserter can fail to see our insertion.
153 	 * (This requires some care in _bt_insertonpg.)
154 	 *
155 	 * If we must wait for another xact, we release the lock while waiting,
156 	 * and then must start over completely.
157 	 *
158 	 * For a partial uniqueness check, we don't wait for the other xact. Just
159 	 * let the tuple in and return false for possibly non-unique, or true for
160 	 * definitely unique.
161 	 */
162 	if (checkUnique != UNIQUE_CHECK_NO)
163 	{
164 		TransactionId xwait;
165 		uint32		speculativeToken;
166 
167 		offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
168 		xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
169 								 checkUnique, &is_unique, &speculativeToken);
170 
171 		if (TransactionIdIsValid(xwait))
172 		{
173 			/* Have to wait for the other guy ... */
174 			_bt_relbuf(rel, buf);
175 
176 			/*
177 			 * If it's a speculative insertion, wait for it to finish (ie. to
178 			 * go ahead with the insertion, or kill the tuple).  Otherwise
179 			 * wait for the transaction to finish as usual.
180 			 */
181 			if (speculativeToken)
182 				SpeculativeInsertionWait(xwait, speculativeToken);
183 			else
184 				XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
185 
186 			/* start over... */
187 			_bt_freestack(stack);
188 			goto top;
189 		}
190 	}
191 
192 	if (checkUnique != UNIQUE_CHECK_EXISTING)
193 	{
194 		/*
195 		 * The only conflict predicate locking cares about for indexes is when
196 		 * an index tuple insert conflicts with an existing lock.  Since the
197 		 * actual location of the insert is hard to predict because of the
198 		 * random search used to prevent O(N^2) performance when there are
199 		 * many duplicate entries, we can just use the "first valid" page.
200 		 */
201 		CheckForSerializableConflictIn(rel, NULL, buf);
202 		/* do the insertion */
203 		_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
204 						  stack, heapRel);
205 		_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
206 	}
207 	else
208 	{
209 		/* just release the buffer */
210 		_bt_relbuf(rel, buf);
211 	}
212 
213 	/* be tidy */
214 	_bt_freestack(stack);
215 	_bt_freeskey(itup_scankey);
216 
217 	return is_unique;
218 }
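/*
 * A minimal sketch (not the actual btinsert() source) of how the access
 * method's insert callback is expected to drive this routine: build the
 * index tuple, stamp it with the heap TID, and call _bt_doinsert():
 *
 *		itup = index_form_tuple(RelationGetDescr(rel), values, isnull);
 *		itup->t_tid = *ht_ctid;
 *		result = _bt_doinsert(rel, itup, checkUnique, heapRel);
 *		pfree(itup);
 */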
219 
220 /*
221  *	_bt_check_unique() -- Check for violation of unique index constraint
222  *
223  * offset points to the first possible item that could conflict. It can
224  * also point to end-of-page, which means that the first tuple to check
225  * is the first tuple on the next page.
226  *
227  * Returns InvalidTransactionId if there is no conflict, else an xact ID
228  * we must wait for, to see if it commits a conflicting tuple.  If an actual
229  * conflict is detected, no return --- just ereport().  If an xact ID is
230  * returned, and the conflicting tuple still has a speculative insertion in
231  * progress, *speculativeToken is set to non-zero, and the caller can wait for
232  * the verdict on the insertion using SpeculativeInsertionWait().
233  *
234  * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
235  * InvalidTransactionId because we don't want to wait.  In this case we
236  * set *is_unique to false if there is a potential conflict, and the
237  * core code must redo the uniqueness check later.
238  */
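/*
 * In short, the possible outcomes are (a summary of the contract above,
 * not an exhaustive specification):
 *
 *		no live conflict found               InvalidTransactionId, *is_unique true
 *		conflicting tuple, xact in flight    that xact's ID (plus *speculativeToken
 *		                                     if it is a speculative insertion)
 *		conflict, UNIQUE_CHECK_PARTIAL       InvalidTransactionId, *is_unique false
 *		definite conflict otherwise          ereport(ERROR), no return
 */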
239 static TransactionId
240 _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
241 				 Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
242 				 IndexUniqueCheck checkUnique, bool *is_unique,
243 				 uint32 *speculativeToken)
244 {
245 	TupleDesc	itupdesc = RelationGetDescr(rel);
246 	int			natts = rel->rd_rel->relnatts;
247 	SnapshotData SnapshotDirty;
248 	OffsetNumber maxoff;
249 	Page		page;
250 	BTPageOpaque opaque;
251 	Buffer		nbuf = InvalidBuffer;
252 	bool		found = false;
253 
254 	/* Assume unique until we find a duplicate */
255 	*is_unique = true;
256 
257 	InitDirtySnapshot(SnapshotDirty);
258 
259 	page = BufferGetPage(buf);
260 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
261 	maxoff = PageGetMaxOffsetNumber(page);
262 
263 	/*
264 	 * Scan over all equal tuples, looking for live conflicts.
265 	 */
266 	for (;;)
267 	{
268 		ItemId		curitemid;
269 		IndexTuple	curitup;
270 		BlockNumber nblkno;
271 
272 		/*
273 		 * make sure the offset points to an actual item before trying to
274 		 * examine it...
275 		 */
276 		if (offset <= maxoff)
277 		{
278 			curitemid = PageGetItemId(page, offset);
279 
280 			/*
281 			 * We can skip items that are marked killed.
282 			 *
283 			 * Formerly, we applied _bt_isequal() before checking the kill
284 			 * flag, so as to fall out of the item loop as soon as possible.
285 			 * However, in the presence of heavy update activity an index may
286 			 * contain many killed items with the same key; running
287 			 * _bt_isequal() on each killed item gets expensive. Furthermore
288 			 * it is likely that the non-killed version of each key appears
289 			 * first, so that we didn't actually get to exit any sooner
290 			 * anyway. So now we just advance over killed items as quickly as
291 			 * we can. We only apply _bt_isequal() when we get to a non-killed
292 			 * item or the end of the page.
293 			 */
294 			if (!ItemIdIsDead(curitemid))
295 			{
296 				ItemPointerData htid;
297 				bool		all_dead;
298 
299 				/*
300 				 * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this is
301 				 * how we handle NULLs - so we must not use _bt_compare for
302 				 * real comparisons, but only for ordering/finding items on
303 				 * pages. - vadim 03/24/97
304 				 */
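				/*
				 * For example (an illustrative pair, not taken from the code
				 * below): given index entries (1, NULL) and (1, NULL),
				 * _bt_compare() reports them as equal, which is what the
				 * search logic needs, while _bt_isequal() reports them as
				 * not equal, which is what SQL uniqueness semantics require.
				 */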
305 				if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
306 					break;		/* we're past all the equal tuples */
307 
308 				/* okay, we gotta fetch the heap tuple ... */
309 				curitup = (IndexTuple) PageGetItem(page, curitemid);
310 				htid = curitup->t_tid;
311 
312 				/*
313 				 * If we are doing a recheck, we expect to find the tuple we
314 				 * are rechecking.  It's not a duplicate, but we have to keep
315 				 * scanning.
316 				 */
317 				if (checkUnique == UNIQUE_CHECK_EXISTING &&
318 					ItemPointerCompare(&htid, &itup->t_tid) == 0)
319 				{
320 					found = true;
321 				}
322 
323 				/*
324 				 * We check the whole HOT-chain to see if there is any tuple
325 				 * that satisfies SnapshotDirty.  This is necessary because we
326 				 * have just a single index entry for the entire chain.
327 				 */
328 				else if (heap_hot_search(&htid, heapRel, &SnapshotDirty,
329 										 &all_dead))
330 				{
331 					TransactionId xwait;
332 
333 					/*
334 					 * It is a duplicate. If we are only doing a partial
335 					 * check, then don't bother checking if the tuple is being
336 					 * updated in another transaction. Just return the fact
337 					 * that it is a potential conflict and leave the full
338 					 * check till later.
339 					 */
340 					if (checkUnique == UNIQUE_CHECK_PARTIAL)
341 					{
342 						if (nbuf != InvalidBuffer)
343 							_bt_relbuf(rel, nbuf);
344 						*is_unique = false;
345 						return InvalidTransactionId;
346 					}
347 
348 					/*
349 					 * If this tuple is being updated by some other
350 					 * transaction then we have to wait for its commit/abort.
351 					 */
352 					xwait = (TransactionIdIsValid(SnapshotDirty.xmin)) ?
353 						SnapshotDirty.xmin : SnapshotDirty.xmax;
354 
355 					if (TransactionIdIsValid(xwait))
356 					{
357 						if (nbuf != InvalidBuffer)
358 							_bt_relbuf(rel, nbuf);
359 						/* Tell _bt_doinsert to wait... */
360 						*speculativeToken = SnapshotDirty.speculativeToken;
361 						return xwait;
362 					}
363 
364 					/*
365 					 * Otherwise we have a definite conflict.  But before
366 					 * complaining, look to see if the tuple we want to insert
367 					 * is itself now committed dead --- if so, don't complain.
368 					 * This is a waste of time in normal scenarios but we must
369 					 * do it to support CREATE INDEX CONCURRENTLY.
370 					 *
371 					 * We must follow HOT-chains here because during
372 					 * concurrent index build, we insert the root TID though
373 					 * the actual tuple may be somewhere in the HOT-chain.
374 					 * While following the chain we might not stop at the
375 					 * exact tuple which triggered the insert, but that's OK
376 					 * because if we find a live tuple anywhere in this chain,
377 					 * we have a unique key conflict.  The other live tuple is
378 					 * not part of this chain because it had a different index
379 					 * entry.
380 					 */
381 					htid = itup->t_tid;
382 					if (heap_hot_search(&htid, heapRel, SnapshotSelf, NULL))
383 					{
384 						/* Normal case --- it's still live */
385 					}
386 					else
387 					{
388 						/*
389 						 * It's been deleted, so no error, and no need to
390 						 * continue searching
391 						 */
392 						break;
393 					}
394 
395 					/*
396 					 * Check for a conflict-in as we would if we were going to
397 					 * write to this page.  We aren't actually going to write,
398 					 * but we want a chance to report SSI conflicts that would
399 					 * otherwise be masked by this unique constraint
400 					 * violation.
401 					 */
402 					CheckForSerializableConflictIn(rel, NULL, buf);
403 
404 					/*
405 					 * This is a definite conflict.  Break the tuple down into
406 					 * datums and report the error.  But first, make sure we
407 					 * release the buffer locks we're holding ---
408 					 * BuildIndexValueDescription could make catalog accesses,
409 					 * which in the worst case might touch this same index and
410 					 * cause deadlocks.
411 					 */
412 					if (nbuf != InvalidBuffer)
413 						_bt_relbuf(rel, nbuf);
414 					_bt_relbuf(rel, buf);
415 
416 					{
417 						Datum		values[INDEX_MAX_KEYS];
418 						bool		isnull[INDEX_MAX_KEYS];
419 						char	   *key_desc;
420 
421 						index_deform_tuple(itup, RelationGetDescr(rel),
422 										   values, isnull);
423 
424 						key_desc = BuildIndexValueDescription(rel, values,
425 															  isnull);
426 
427 						ereport(ERROR,
428 								(errcode(ERRCODE_UNIQUE_VIOLATION),
429 								 errmsg("duplicate key value violates unique constraint \"%s\"",
430 										RelationGetRelationName(rel)),
431 								 key_desc ? errdetail("Key %s already exists.",
432 													  key_desc) : 0,
433 								 errtableconstraint(heapRel,
434 													RelationGetRelationName(rel))));
435 					}
436 				}
437 				else if (all_dead)
438 				{
439 					/*
440 					 * The conflicting tuple (or whole HOT chain) is dead to
441 					 * everyone, so we may as well mark the index entry
442 					 * killed.
443 					 */
444 					ItemIdMarkDead(curitemid);
445 					opaque->btpo_flags |= BTP_HAS_GARBAGE;
446 
447 					/*
448 					 * Mark buffer with a dirty hint, since state is not
449 					 * crucial. Be sure to mark the proper buffer dirty.
450 					 */
451 					if (nbuf != InvalidBuffer)
452 						MarkBufferDirtyHint(nbuf, true);
453 					else
454 						MarkBufferDirtyHint(buf, true);
455 				}
456 			}
457 		}
458 
459 		/*
460 		 * Advance to next tuple to continue checking.
461 		 */
462 		if (offset < maxoff)
463 			offset = OffsetNumberNext(offset);
464 		else
465 		{
466 			/* If scankey == hikey we gotta check the next page too */
467 			if (P_RIGHTMOST(opaque))
468 				break;
469 			if (!_bt_isequal(itupdesc, page, P_HIKEY,
470 							 natts, itup_scankey))
471 				break;
472 			/* Advance to next non-dead page --- there must be one */
473 			for (;;)
474 			{
475 				nblkno = opaque->btpo_next;
476 				nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
477 				page = BufferGetPage(nbuf);
478 				opaque = (BTPageOpaque) PageGetSpecialPointer(page);
479 				if (!P_IGNORE(opaque))
480 					break;
481 				if (P_RIGHTMOST(opaque))
482 					elog(ERROR, "fell off the end of index \"%s\"",
483 						 RelationGetRelationName(rel));
484 			}
485 			maxoff = PageGetMaxOffsetNumber(page);
486 			offset = P_FIRSTDATAKEY(opaque);
487 		}
488 	}
489 
490 	/*
491 	 * If we are doing a recheck then we should have found the tuple we are
492 	 * checking.  Otherwise there's something very wrong --- probably, the
493 	 * index is on a non-immutable expression.
494 	 */
495 	if (checkUnique == UNIQUE_CHECK_EXISTING && !found)
496 		ereport(ERROR,
497 				(errcode(ERRCODE_INTERNAL_ERROR),
498 				 errmsg("failed to re-find tuple within index \"%s\"",
499 						RelationGetRelationName(rel)),
500 				 errhint("This may be because of a non-immutable index expression."),
501 				 errtableconstraint(heapRel,
502 									RelationGetRelationName(rel))));
503 
504 	if (nbuf != InvalidBuffer)
505 		_bt_relbuf(rel, nbuf);
506 
507 	return InvalidTransactionId;
508 }
509 
510 
511 /*
512  *	_bt_findinsertloc() -- Finds an insert location for a tuple
513  *
514  *		If the new key is equal to one or more existing keys, we can
515  *		legitimately place it anywhere in the series of equal keys --- in fact,
516  *		if the new key is equal to the page's "high key" we can place it on
517  *		the next page.  If it is equal to the high key, and there's not room
518  *		to insert the new tuple on the current page without splitting, then
519  *		we can move right hoping to find more free space and avoid a split.
520  *		(We should not move right indefinitely, however, since that leads to
521  *		O(N^2) insertion behavior in the presence of many equal keys.)
522  *		Once we have chosen the page to put the key on, we'll insert it before
523  *		any existing equal keys because of the way _bt_binsrch() works.
524  *
525  *		If there's not enough room on the current page, we try to make room
526  *		by removing any LP_DEAD tuples.
527  *
528  *		On entry, *bufptr and *offsetptr point to the first legal position
529  *		where the new tuple could be inserted.  The caller should hold an
530  *		exclusive lock on *bufptr.  *offsetptr can also be set to
531  *		InvalidOffsetNumber, in which case the function will search for the
532  *		right location within the page if needed.  On exit, they point to the
533  *		chosen insert location.  If _bt_findinsertloc decides to move right,
534  *		the lock and pin on the original page will be released and the new
535  *		page returned to the caller is exclusively locked instead.
536  *
537  *		newtup is the new tuple we're inserting, and scankey is an insertion
538  *		type scan key for it.
539  */
540 static void
541 _bt_findinsertloc(Relation rel,
542 				  Buffer *bufptr,
543 				  OffsetNumber *offsetptr,
544 				  int keysz,
545 				  ScanKey scankey,
546 				  IndexTuple newtup,
547 				  BTStack stack,
548 				  Relation heapRel)
549 {
550 	Buffer		buf = *bufptr;
551 	Page		page = BufferGetPage(buf);
552 	Size		itemsz;
553 	BTPageOpaque lpageop;
554 	bool		movedright,
555 				vacuumed;
556 	OffsetNumber newitemoff;
557 	OffsetNumber firstlegaloff = *offsetptr;
558 
559 	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
560 
561 	itemsz = IndexTupleDSize(*newtup);
562 	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
563 								 * need to be consistent */
564 
565 	/*
566 	 * Check whether the item can fit on a btree page at all. (Eventually, we
567 	 * ought to try to apply TOAST methods if not.) We actually need to be
568 	 * able to fit three items on every page, so restrict any one item to 1/3
569 	 * the per-page available space. Note that at this point, itemsz doesn't
570 	 * include the ItemId.
571 	 *
572 	 * NOTE: if you change this, see also the similar code in _bt_buildadd().
573 	 */
574 	if (itemsz > BTMaxItemSize(page))
575 		ereport(ERROR,
576 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
577 				 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
578 						itemsz, BTMaxItemSize(page),
579 						RelationGetRelationName(rel)),
580 				 errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
581 						 "Consider a function index of an MD5 hash of the value, "
582 						 "or use full text indexing."),
583 				 errtableconstraint(heapRel,
584 									RelationGetRelationName(rel))));
585 
586 	/*----------
587 	 * If we will need to split the page to put the item on this page,
588 	 * check whether we can put the tuple somewhere to the right,
589 	 * instead.  Keep scanning right until we
590 	 *		(a) find a page with enough free space,
591 	 *		(b) reach the last page where the tuple can legally go, or
592 	 *		(c) get tired of searching.
593 	 * (c) is not flippant; it is important because if there are many
594 	 * pages' worth of equal keys, it's better to split one of the early
595 	 * pages than to scan all the way to the end of the run of equal keys
596 	 * on every insert.  We implement "get tired" as a random choice,
597 	 * since stopping after scanning a fixed number of pages wouldn't work
598 	 * well (we'd never reach the right-hand side of previously split
599 	 * pages).  Currently the probability of moving right is set at 0.99,
600 	 * which may seem too high to change the behavior much, but it does an
601 	 * excellent job of preventing O(N^2) behavior with many equal keys.
602 	 *----------
603 	 */
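	/*
	 * To put a rough number on "get tired": the test below stops the scan
	 * with probability about 1/100 per page visited (random() falling in
	 * the bottom 1% of its range), so the expected number of pages scanned
	 * before giving up is on the order of 100, even when the run of equal
	 * keys spans far more pages than that.
	 */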
604 	movedright = false;
605 	vacuumed = false;
606 	while (PageGetFreeSpace(page) < itemsz)
607 	{
608 		Buffer		rbuf;
609 		BlockNumber rblkno;
610 
611 		/*
612 		 * before considering moving right, see if we can obtain enough space
613 		 * by erasing LP_DEAD items
614 		 */
615 		if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
616 		{
617 			_bt_vacuum_one_page(rel, buf, heapRel);
618 
619 			/*
620 			 * remember that we vacuumed this page, because that makes the
621 			 * hint supplied by the caller invalid
622 			 */
623 			vacuumed = true;
624 
625 			if (PageGetFreeSpace(page) >= itemsz)
626 				break;			/* OK, now we have enough space */
627 		}
628 
629 		/*
630 		 * nope, so check conditions (b) and (c) enumerated above
631 		 */
632 		if (P_RIGHTMOST(lpageop) ||
633 			_bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
634 			random() <= (MAX_RANDOM_VALUE / 100))
635 			break;
636 
637 		/*
638 		 * step right to next non-dead page
639 		 *
640 		 * must write-lock that page before releasing write lock on current
641 		 * page; else someone else's _bt_check_unique scan could fail to see
642 		 * our insertion.  write locks on intermediate dead pages won't do
643 		 * because we don't know when they will get de-linked from the tree.
644 		 */
645 		rbuf = InvalidBuffer;
646 
647 		rblkno = lpageop->btpo_next;
648 		for (;;)
649 		{
650 			rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
651 			page = BufferGetPage(rbuf);
652 			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
653 
654 			/*
655 			 * If this page was incompletely split, finish the split now. We
656 			 * do this while holding a lock on the left sibling, which is not
657 			 * good because finishing the split could be a fairly lengthy
658 			 * operation.  But this should happen very seldom.
659 			 */
660 			if (P_INCOMPLETE_SPLIT(lpageop))
661 			{
662 				_bt_finish_split(rel, rbuf, stack);
663 				rbuf = InvalidBuffer;
664 				continue;
665 			}
666 
667 			if (!P_IGNORE(lpageop))
668 				break;
669 			if (P_RIGHTMOST(lpageop))
670 				elog(ERROR, "fell off the end of index \"%s\"",
671 					 RelationGetRelationName(rel));
672 
673 			rblkno = lpageop->btpo_next;
674 		}
675 		_bt_relbuf(rel, buf);
676 		buf = rbuf;
677 		movedright = true;
678 		vacuumed = false;
679 	}
680 
681 	/*
682 	 * Now we are on the right page, so find the insert position. If we moved
683 	 * right at all, we know we should insert at the start of the page. If we
684 	 * didn't move right, we can use the firstlegaloff hint if the caller
685 	 * supplied one, unless we vacuumed the page which might have moved tuples
686 	 * around making the hint invalid. If we didn't move right or can't use
687 	 * the hint, find the position by searching.
688 	 */
689 	if (movedright)
690 		newitemoff = P_FIRSTDATAKEY(lpageop);
691 	else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
692 		newitemoff = firstlegaloff;
693 	else
694 		newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
695 
696 	*bufptr = buf;
697 	*offsetptr = newitemoff;
698 }
699 
700 /*----------
701  *	_bt_insertonpg() -- Insert a tuple on a particular page in the index.
702  *
703  *		This recursive procedure does the following things:
704  *
705  *			+  if necessary, splits the target page (making sure that the
706  *			   split is equitable as far as post-insert free space goes).
707  *			+  inserts the tuple.
708  *			+  if the page was split, pops the parent stack, and finds the
709  *			   right place to insert the new child pointer (by walking
710  *			   right using information stored in the parent stack).
711  *			+  invokes itself with the appropriate tuple for the right
712  *			   child page on the parent.
713  *			+  updates the metapage if a true root or fast root is split.
714  *
715  *		On entry, we must have the correct buffer in which to do the
716  *		insertion, and the buffer must be pinned and write-locked.  On return,
717  *		we will have dropped both the pin and the lock on the buffer.
718  *
719  *		When inserting to a non-leaf page, 'cbuf' is the left-sibling of the
720  *		page we're inserting the downlink for.  This function will clear the
721  *		INCOMPLETE_SPLIT flag on it, and release the buffer.
722  *
723  *		The locking interactions in this code are critical.  You should
724  *		grok Lehman and Yao's paper before making any changes.  In addition,
725  *		you need to understand how we disambiguate duplicate keys in this
726  *		implementation, in order to be able to find our location using
727  *		L&Y "move right" operations.  Since we may insert duplicate user
728  *		keys, and since these dups may propagate up the tree, we use the
729  *		'afteritem' parameter to position ourselves correctly for the
730  *		insertion on internal pages.
731  *----------
732  */
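/*
 * A sketch of the call chain when a split cascades upward (the names are
 * the real functions in this file, but this is only an outline):
 *
 *		_bt_insertonpg(leaf)
 *			_bt_split(leaf)             split page, insert the new tuple
 *			_bt_insert_parent()         find the parent using the stack
 *				_bt_insertonpg(parent)  insert downlink; may split again
 *					...                 possibly up to _bt_newroot()
 */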
733 static void
734 _bt_insertonpg(Relation rel,
735 			   Buffer buf,
736 			   Buffer cbuf,
737 			   BTStack stack,
738 			   IndexTuple itup,
739 			   OffsetNumber newitemoff,
740 			   bool split_only_page)
741 {
742 	Page		page;
743 	BTPageOpaque lpageop;
744 	OffsetNumber firstright = InvalidOffsetNumber;
745 	Size		itemsz;
746 
747 	page = BufferGetPage(buf);
748 	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
749 
750 	/* child buffer must be given iff inserting on an internal page */
751 	Assert(P_ISLEAF(lpageop) == !BufferIsValid(cbuf));
752 
753 	/* The caller should've finished any incomplete splits already. */
754 	if (P_INCOMPLETE_SPLIT(lpageop))
755 		elog(ERROR, "cannot insert to incompletely split page %u",
756 			 BufferGetBlockNumber(buf));
757 
758 	itemsz = IndexTupleDSize(*itup);
759 	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
760 								 * need to be consistent */
761 
762 	/*
763 	 * Do we need to split the page to fit the item on it?
764 	 *
765 	 * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
766 	 * so this comparison is correct even though we appear to be accounting
767 	 * only for the item and not for its line pointer.
768 	 */
769 	if (PageGetFreeSpace(page) < itemsz)
770 	{
771 		bool		is_root = P_ISROOT(lpageop);
772 		bool		is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
773 		bool		newitemonleft;
774 		Buffer		rbuf;
775 
776 		/* Choose the split point */
777 		firstright = _bt_findsplitloc(rel, page,
778 									  newitemoff, itemsz,
779 									  &newitemonleft);
780 
781 		/* split the buffer into left and right halves */
782 		rbuf = _bt_split(rel, buf, cbuf, firstright,
783 						 newitemoff, itemsz, itup, newitemonleft);
784 		PredicateLockPageSplit(rel,
785 							   BufferGetBlockNumber(buf),
786 							   BufferGetBlockNumber(rbuf));
787 
788 		/*----------
789 		 * By here,
790 		 *
791 		 *		+  our target page has been split;
792 		 *		+  the original tuple has been inserted;
793 		 *		+  we have write locks on both the old (left half)
794 		 *		   and new (right half) buffers, after the split; and
795 		 *		+  we know the key we want to insert into the parent
796 		 *		   (it's the "high key" on the left child page).
797 		 *
798 		 * We're ready to do the parent insertion.  We need to hold onto the
799 		 * locks for the child pages until we locate the parent, but we can
800 		 * release them before doing the actual insertion (see Lehman and Yao
801 		 * for the reasoning).
802 		 *----------
803 		 */
804 		_bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
805 	}
806 	else
807 	{
808 		Buffer		metabuf = InvalidBuffer;
809 		Page		metapg = NULL;
810 		BTMetaPageData *metad = NULL;
811 		OffsetNumber itup_off;
812 		BlockNumber itup_blkno;
813 
814 		itup_off = newitemoff;
815 		itup_blkno = BufferGetBlockNumber(buf);
816 
817 		/*
818 		 * If we are doing this insert because we split a page that was the
819 		 * only one on its tree level, but was not the root, it may have been
820 		 * the "fast root".  We need to ensure that the fast root link points
821 		 * at or above the current page.  We can safely acquire a lock on the
822 		 * metapage here --- see comments for _bt_newroot().
823 		 */
824 		if (split_only_page)
825 		{
826 			Assert(!P_ISLEAF(lpageop));
827 
828 			metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
829 			metapg = BufferGetPage(metabuf);
830 			metad = BTPageGetMeta(metapg);
831 
832 			if (metad->btm_fastlevel >= lpageop->btpo.level)
833 			{
834 				/* no update wanted */
835 				_bt_relbuf(rel, metabuf);
836 				metabuf = InvalidBuffer;
837 			}
838 		}
839 
840 		/* Do the update.  No ereport(ERROR) until changes are logged */
841 		START_CRIT_SECTION();
842 
843 		if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
844 			elog(PANIC, "failed to add new item to block %u in index \"%s\"",
845 				 itup_blkno, RelationGetRelationName(rel));
846 
847 		MarkBufferDirty(buf);
848 
849 		if (BufferIsValid(metabuf))
850 		{
851 			metad->btm_fastroot = itup_blkno;
852 			metad->btm_fastlevel = lpageop->btpo.level;
853 			MarkBufferDirty(metabuf);
854 		}
855 
856 		/* clear INCOMPLETE_SPLIT flag on child if inserting a downlink */
857 		if (BufferIsValid(cbuf))
858 		{
859 			Page		cpage = BufferGetPage(cbuf);
860 			BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
861 
862 			Assert(P_INCOMPLETE_SPLIT(cpageop));
863 			cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
864 			MarkBufferDirty(cbuf);
865 		}
866 
867 		/* XLOG stuff */
868 		if (RelationNeedsWAL(rel))
869 		{
870 			xl_btree_insert xlrec;
871 			xl_btree_metadata xlmeta;
872 			uint8		xlinfo;
873 			XLogRecPtr	recptr;
874 			IndexTupleData trunctuple;
875 
876 			xlrec.offnum = itup_off;
877 
878 			XLogBeginInsert();
879 			XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);
880 
881 			if (P_ISLEAF(lpageop))
882 				xlinfo = XLOG_BTREE_INSERT_LEAF;
883 			else
884 			{
885 				/*
886 				 * Register the left child whose INCOMPLETE_SPLIT flag was
887 				 * cleared.
888 				 */
889 				XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD);
890 
891 				xlinfo = XLOG_BTREE_INSERT_UPPER;
892 			}
893 
894 			if (BufferIsValid(metabuf))
895 			{
896 				xlmeta.root = metad->btm_root;
897 				xlmeta.level = metad->btm_level;
898 				xlmeta.fastroot = metad->btm_fastroot;
899 				xlmeta.fastlevel = metad->btm_fastlevel;
900 
901 				XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
902 				XLogRegisterBufData(2, (char *) &xlmeta, sizeof(xl_btree_metadata));
903 
904 				xlinfo = XLOG_BTREE_INSERT_META;
905 			}
906 
907 			/* Read comments in _bt_pgaddtup */
908 			XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
909 			if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
910 			{
911 				trunctuple = *itup;
912 				trunctuple.t_info = sizeof(IndexTupleData);
913 				XLogRegisterBufData(0, (char *) &trunctuple,
914 									sizeof(IndexTupleData));
915 			}
916 			else
917 				XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));
918 
919 			recptr = XLogInsert(RM_BTREE_ID, xlinfo);
920 
921 			if (BufferIsValid(metabuf))
922 			{
923 				PageSetLSN(metapg, recptr);
924 			}
925 			if (BufferIsValid(cbuf))
926 			{
927 				PageSetLSN(BufferGetPage(cbuf), recptr);
928 			}
929 
930 			PageSetLSN(page, recptr);
931 		}
932 
933 		END_CRIT_SECTION();
934 
935 		/* release buffers */
936 		if (BufferIsValid(metabuf))
937 			_bt_relbuf(rel, metabuf);
938 		if (BufferIsValid(cbuf))
939 			_bt_relbuf(rel, cbuf);
940 		_bt_relbuf(rel, buf);
941 	}
942 }
943 
944 /*
945  *	_bt_split() -- split a page in the btree.
946  *
947  *		On entry, buf is the page to split, and is pinned and write-locked.
948  *		firstright is the item index of the first item to be moved to the
949  *		new right page.  newitemoff etc. tell us about the new item that
950  *		must be inserted along with the data from the old page.
951  *
952  *		When splitting a non-leaf page, 'cbuf' is the left-sibling of the
953  *		page we're inserting the downlink for.  This function will clear the
954  *		INCOMPLETE_SPLIT flag on it, and release the buffer.
955  *
956  *		Returns the new right sibling of buf, pinned and write-locked.
957  *		The pin and lock on buf are maintained.
958  */
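/*
 * A simplified before/after picture (S is the old right sibling, which
 * may not exist if buf was the rightmost page at its level):
 *
 *		before:		... <-> buf <-> S <-> ...
 *		after:		... <-> buf (left half) <-> rbuf (right half) <-> S <-> ...
 *
 * buf keeps its original block number, so existing links to it remain
 * valid; only S's btpo_prev needs updating here, and the parent's new
 * downlink is installed later by the caller via _bt_insert_parent().
 */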
959 static Buffer
960 _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
961 		  OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
962 		  bool newitemonleft)
963 {
964 	Buffer		rbuf;
965 	Page		origpage;
966 	Page		leftpage,
967 				rightpage;
968 	BlockNumber origpagenumber,
969 				rightpagenumber;
970 	BTPageOpaque ropaque,
971 				lopaque,
972 				oopaque;
973 	Buffer		sbuf = InvalidBuffer;
974 	Page		spage = NULL;
975 	BTPageOpaque sopaque = NULL;
976 	Size		itemsz;
977 	ItemId		itemid;
978 	IndexTuple	item;
979 	OffsetNumber leftoff,
980 				rightoff;
981 	OffsetNumber maxoff;
982 	OffsetNumber i;
983 	bool		isroot;
984 	bool		isleaf;
985 
986 	/* Acquire a new page to split into */
987 	rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
988 
989 	/*
990 	 * origpage is the original page to be split.  leftpage is a temporary
991 	 * buffer that receives the left-sibling data, which will be copied back
992 	 * into origpage on success.  rightpage is the new page that receives the
993 	 * right-sibling data.  If we fail before reaching the critical section,
994 	 * origpage hasn't been modified and leftpage is only workspace. In
995 	 * principle we shouldn't need to worry about rightpage either, because it
996 	 * hasn't been linked into the btree page structure; but to avoid leaving
997 	 * possibly-confusing junk behind, we are careful to rewrite rightpage as
998 	 * zeroes before throwing any error.
999 	 */
1000 	origpage = BufferGetPage(buf);
1001 	leftpage = PageGetTempPage(origpage);
1002 	rightpage = BufferGetPage(rbuf);
1003 
1004 	origpagenumber = BufferGetBlockNumber(buf);
1005 	rightpagenumber = BufferGetBlockNumber(rbuf);
1006 
1007 	_bt_pageinit(leftpage, BufferGetPageSize(buf));
1008 	/* rightpage was already initialized by _bt_getbuf */
1009 
1010 	/*
1011 	 * Copy the original page's LSN into leftpage, which will become the
1012 	 * updated version of the page.  We need this because XLogInsert will
1013 	 * examine the LSN and possibly dump it in a page image.
1014 	 */
1015 	PageSetLSN(leftpage, PageGetLSN(origpage));
1016 
1017 	/* init btree private data */
1018 	oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
1019 	lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
1020 	ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
1021 
1022 	isroot = P_ISROOT(oopaque);
1023 	isleaf = P_ISLEAF(oopaque);
1024 
1025 	/* if we're splitting this page, it won't be the root when we're done */
1026 	/* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
1027 	lopaque->btpo_flags = oopaque->btpo_flags;
1028 	lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
1029 	ropaque->btpo_flags = lopaque->btpo_flags;
1030 	/* set flag in left page indicating that the right page has no downlink */
1031 	lopaque->btpo_flags |= BTP_INCOMPLETE_SPLIT;
1032 	lopaque->btpo_prev = oopaque->btpo_prev;
1033 	lopaque->btpo_next = rightpagenumber;
1034 	ropaque->btpo_prev = origpagenumber;
1035 	ropaque->btpo_next = oopaque->btpo_next;
1036 	lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
1037 	/* Since we already have write-lock on both pages, ok to read cycleid */
1038 	lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
1039 	ropaque->btpo_cycleid = lopaque->btpo_cycleid;
1040 
1041 	/*
1042 	 * If the page we're splitting is not the rightmost page at its level in
1043 	 * the tree, then the first entry on the page is the high key for the
1044 	 * page.  We need to copy that to the right half.  Otherwise (meaning the
1045 	 * rightmost page case), all the items on the right half will be user
1046 	 * data.
1047 	 */
1048 	rightoff = P_HIKEY;
1049 
1050 	if (!P_RIGHTMOST(oopaque))
1051 	{
1052 		itemid = PageGetItemId(origpage, P_HIKEY);
1053 		itemsz = ItemIdGetLength(itemid);
1054 		item = (IndexTuple) PageGetItem(origpage, itemid);
1055 		if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
1056 						false, false) == InvalidOffsetNumber)
1057 		{
1058 			memset(rightpage, 0, BufferGetPageSize(rbuf));
1059 			elog(ERROR, "failed to add hikey to the right sibling"
1060 				 " while splitting block %u of index \"%s\"",
1061 				 origpagenumber, RelationGetRelationName(rel));
1062 		}
1063 		rightoff = OffsetNumberNext(rightoff);
1064 	}
1065 
1066 	/*
1067 	 * The "high key" for the new left page will be the first key that's going
1068 	 * to go into the new right page.  This might be either the existing data
1069 	 * item at position firstright, or the incoming tuple.
1070 	 */
1071 	leftoff = P_HIKEY;
1072 	if (!newitemonleft && newitemoff == firstright)
1073 	{
1074 		/* incoming tuple will become first on right page */
1075 		itemsz = newitemsz;
1076 		item = newitem;
1077 	}
1078 	else
1079 	{
1080 		/* existing item at firstright will become first on right page */
1081 		itemid = PageGetItemId(origpage, firstright);
1082 		itemsz = ItemIdGetLength(itemid);
1083 		item = (IndexTuple) PageGetItem(origpage, itemid);
1084 	}
1085 	if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
1086 					false, false) == InvalidOffsetNumber)
1087 	{
1088 		memset(rightpage, 0, BufferGetPageSize(rbuf));
1089 		elog(ERROR, "failed to add hikey to the left sibling"
1090 			 " while splitting block %u of index \"%s\"",
1091 			 origpagenumber, RelationGetRelationName(rel));
1092 	}
1093 	leftoff = OffsetNumberNext(leftoff);
1094 
1095 	/*
1096 	 * Now transfer all the data items to the appropriate page.
1097 	 *
1098 	 * Note: we *must* insert at least the right page's items in item-number
1099 	 * order, for the benefit of _bt_restore_page().
1100 	 */
1101 	maxoff = PageGetMaxOffsetNumber(origpage);
1102 
1103 	for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
1104 	{
1105 		itemid = PageGetItemId(origpage, i);
1106 		itemsz = ItemIdGetLength(itemid);
1107 		item = (IndexTuple) PageGetItem(origpage, itemid);
1108 
1109 		/* does new item belong before this one? */
1110 		if (i == newitemoff)
1111 		{
1112 			if (newitemonleft)
1113 			{
1114 				if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
1115 				{
1116 					memset(rightpage, 0, BufferGetPageSize(rbuf));
1117 					elog(ERROR, "failed to add new item to the left sibling"
1118 						 " while splitting block %u of index \"%s\"",
1119 						 origpagenumber, RelationGetRelationName(rel));
1120 				}
1121 				leftoff = OffsetNumberNext(leftoff);
1122 			}
1123 			else
1124 			{
1125 				if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
1126 				{
1127 					memset(rightpage, 0, BufferGetPageSize(rbuf));
1128 					elog(ERROR, "failed to add new item to the right sibling"
1129 						 " while splitting block %u of index \"%s\"",
1130 						 origpagenumber, RelationGetRelationName(rel));
1131 				}
1132 				rightoff = OffsetNumberNext(rightoff);
1133 			}
1134 		}
1135 
1136 		/* decide which page to put it on */
1137 		if (i < firstright)
1138 		{
1139 			if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
1140 			{
1141 				memset(rightpage, 0, BufferGetPageSize(rbuf));
1142 				elog(ERROR, "failed to add old item to the left sibling"
1143 					 " while splitting block %u of index \"%s\"",
1144 					 origpagenumber, RelationGetRelationName(rel));
1145 			}
1146 			leftoff = OffsetNumberNext(leftoff);
1147 		}
1148 		else
1149 		{
1150 			if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
1151 			{
1152 				memset(rightpage, 0, BufferGetPageSize(rbuf));
1153 				elog(ERROR, "failed to add old item to the right sibling"
1154 					 " while splitting block %u of index \"%s\"",
1155 					 origpagenumber, RelationGetRelationName(rel));
1156 			}
1157 			rightoff = OffsetNumberNext(rightoff);
1158 		}
1159 	}
1160 
1161 	/* cope with possibility that newitem goes at the end */
1162 	if (i <= newitemoff)
1163 	{
1164 		/*
1165 		 * Can't have newitemonleft here; that would imply we were told to put
1166 		 * *everything* on the left page, which cannot fit (if it could, we'd
1167 		 * not be splitting the page).
1168 		 */
1169 		Assert(!newitemonleft);
1170 		if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
1171 		{
1172 			memset(rightpage, 0, BufferGetPageSize(rbuf));
1173 			elog(ERROR, "failed to add new item to the right sibling"
1174 				 " while splitting block %u of index \"%s\"",
1175 				 origpagenumber, RelationGetRelationName(rel));
1176 		}
1177 		rightoff = OffsetNumberNext(rightoff);
1178 	}
1179 
1180 	/*
1181 	 * We have to grab the right sibling (if any) and fix the prev pointer
1182 	 * there. We are guaranteed that this is deadlock-free since no other
1183 	 * writer will be holding a lock on that page and trying to move left, and
1184 	 * all readers release locks on a page before trying to fetch its
1185 	 * neighbors.
1186 	 */
1187 
1188 	if (!P_RIGHTMOST(oopaque))
1189 	{
1190 		sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
1191 		spage = BufferGetPage(sbuf);
1192 		sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
1193 		if (sopaque->btpo_prev != origpagenumber)
1194 		{
1195 			memset(rightpage, 0, BufferGetPageSize(rbuf));
1196 			elog(ERROR, "right sibling's left-link doesn't match: "
1197 				 "block %u links to %u instead of expected %u in index \"%s\"",
1198 				 oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
1199 				 RelationGetRelationName(rel));
1200 		}
1201 
1202 		/*
1203 		 * Check to see if we can set the SPLIT_END flag in the right-hand
1204 		 * split page; this can save some I/O for vacuum since it need not
1205 		 * proceed to the right sibling.  We can set the flag if the right
1206 		 * sibling has a different cycleid: that means it could not be part of
1207 		 * a group of pages that were all split off from the same ancestor
1208 		 * page.  If you're confused, imagine that page A splits to A B and
1209 		 * then again, yielding A C B, while vacuum is in progress.  Tuples
1210 		 * originally in A could now be in either B or C, hence vacuum must
1211 		 * examine both pages.  But if D, our right sibling, has a different
1212 		 * cycleid then it could not contain any tuples that were in A when
1213 		 * the vacuum started.
1214 		 */
1215 		if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
1216 			ropaque->btpo_flags |= BTP_SPLIT_END;
1217 	}
1218 
1219 	/*
1220 	 * Right sibling is locked, new siblings are prepared, but original page
1221 	 * is not updated yet.
1222 	 *
1223 	 * NO EREPORT(ERROR) till right sibling is updated.  We can get away with
1224 	 * not starting the critical section till here because we haven't been
1225 	 * scribbling on the original page yet; see comments above.
1226 	 */
1227 	START_CRIT_SECTION();
1228 
1229 	/*
1230 	 * By here, the original data page has been split into two new halves, and
1231 	 * these are correct.  The algorithm requires that the left page never
1232 	 * move during a split, so we copy the new left page back on top of the
1233 	 * original.  Note that this is not a waste of time, since we also require
1234 	 * (in the page management code) that the center of a page always be
1235 	 * clean, and the most efficient way to guarantee this is just to compact
1236 	 * the data by reinserting it into a new left page.  (XXX the latter
1237 	 * comment is probably obsolete; but in any case it's good to not scribble
1238 	 * on the original page until we enter the critical section.)
1239 	 *
1240 	 * We need to do this before writing the WAL record, so that XLogInsert
1241 	 * can WAL log an image of the page if necessary.
1242 	 */
1243 	PageRestoreTempPage(leftpage, origpage);
1244 	/* leftpage, lopaque must not be used below here */
1245 
1246 	MarkBufferDirty(buf);
1247 	MarkBufferDirty(rbuf);
1248 
1249 	if (!P_RIGHTMOST(ropaque))
1250 	{
1251 		sopaque->btpo_prev = rightpagenumber;
1252 		MarkBufferDirty(sbuf);
1253 	}
1254 
1255 	/*
1256 	 * Clear INCOMPLETE_SPLIT flag on child if inserting the new item finishes
1257 	 * a split.
1258 	 */
1259 	if (!isleaf)
1260 	{
1261 		Page		cpage = BufferGetPage(cbuf);
1262 		BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
1263 
1264 		cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
1265 		MarkBufferDirty(cbuf);
1266 	}
1267 
1268 	/* XLOG stuff */
1269 	if (RelationNeedsWAL(rel))
1270 	{
1271 		xl_btree_split xlrec;
1272 		uint8		xlinfo;
1273 		XLogRecPtr	recptr;
1274 
1275 		xlrec.level = ropaque->btpo.level;
1276 		xlrec.firstright = firstright;
1277 		xlrec.newitemoff = newitemoff;
1278 
1279 		XLogBeginInsert();
1280 		XLogRegisterData((char *) &xlrec, SizeOfBtreeSplit);
1281 
1282 		XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
1283 		XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT);
1284 		/* Log the right sibling, because we've changed its prev-pointer. */
1285 		if (!P_RIGHTMOST(ropaque))
1286 			XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD);
1287 		if (BufferIsValid(cbuf))
1288 			XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD);
1289 
1290 		/*
1291 		 * Log the new item, if it was inserted on the left page. (If it was
1292 		 * put on the right page, we don't need to explicitly WAL log it
1293 		 * because it's included with all the other items on the right page.)
1294 		 * Show the new item as belonging to the left page buffer, so that it
1295 		 * is not stored if XLogInsert decides it needs a full-page image of
1296 		 * the left page.  We store the offset anyway, though, to support
1297 		 * archive compression of these records.
1298 		 */
1299 		if (newitemonleft)
1300 			XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz));
1301 
1302 		/* Log left page */
1303 		if (!isleaf)
1304 		{
1305 			/*
1306 			 * We must also log the left page's high key, because the right
1307 			 * page's leftmost key is suppressed on non-leaf levels.  Show it
1308 			 * as belonging to the left page buffer, so that it is not stored
1309 			 * if XLogInsert decides it needs a full-page image of the left
1310 			 * page.
1311 			 */
1312 			itemid = PageGetItemId(origpage, P_HIKEY);
1313 			item = (IndexTuple) PageGetItem(origpage, itemid);
1314 			XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item)));
1315 		}
1316 
1317 		/*
1318 		 * Log the contents of the right page in the format understood by
1319 		 * _bt_restore_page().  The whole right page will simply be
1320 		 * recreated during WAL replay, so there is no need to store a
1321 		 * separate full-page image of it.
1322 		 *
1323 		 * Direct access to page is not good but faster - we should implement
1324 		 * some new func in page API.  Note we only store the tuples
1325 		 * themselves, knowing that they were inserted in item-number order
1326 		 * and so the item pointers can be reconstructed.  See comments for
1327 		 * _bt_restore_page().
1328 		 */
1329 		XLogRegisterBufData(1,
1330 							(char *) rightpage + ((PageHeader) rightpage)->pd_upper,
1331 							((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
1332 
1333 		if (isroot)
1334 			xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
1335 		else
1336 			xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
1337 
1338 		recptr = XLogInsert(RM_BTREE_ID, xlinfo);
1339 
1340 		PageSetLSN(origpage, recptr);
1341 		PageSetLSN(rightpage, recptr);
1342 		if (!P_RIGHTMOST(ropaque))
1343 		{
1344 			PageSetLSN(spage, recptr);
1345 		}
1346 		if (!isleaf)
1347 		{
1348 			PageSetLSN(BufferGetPage(cbuf), recptr);
1349 		}
1350 	}
1351 
1352 	END_CRIT_SECTION();
1353 
1354 	/* release the old right sibling */
1355 	if (!P_RIGHTMOST(ropaque))
1356 		_bt_relbuf(rel, sbuf);
1357 
1358 	/* release the child */
1359 	if (!isleaf)
1360 		_bt_relbuf(rel, cbuf);
1361 
1362 	/* split's done */
1363 	return rbuf;
1364 }
1365 
1366 /*
1367  *	_bt_findsplitloc() -- find an appropriate place to split a page.
1368  *
1369  * The idea here is to equalize the free space that will be on each split
1370  * page, *after accounting for the inserted tuple*.  (If we fail to account
1371  * for it, we might find ourselves with too little room on the page that
1372  * it needs to go into!)
1373  *
1374  * If the page is the rightmost page on its level, we instead try to arrange
1375  * to leave the left split page fillfactor% full.  In this way, when we are
1376  * inserting successively increasing keys (consider sequences, timestamps,
1377  * etc) we will end up with a tree whose pages are about fillfactor% full,
1378  * instead of the 50% full result that we'd get without this special case.
1379  * This is the same as nbtsort.c produces for a newly-created tree.  Note
1380  * that leaf and nonleaf pages use different fillfactors.
1381  *
1382  * We are passed the intended insert position of the new tuple, expressed as
1383  * the offsetnumber of the tuple it must go in front of.  (This could be
1384  * maxoff+1 if the tuple is to go at the end.)
1385  *
1386  * We return the index of the first existing tuple that should go on the
1387  * righthand page, plus a boolean indicating whether the new tuple goes on
1388  * the left or right page.  The bool is necessary to disambiguate the case
1389  * where firstright == newitemoff.
1390  */
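/*
 * For example (illustrative offsets only): if the page holds old items
 * 1..10 and newitemoff is 6, then returning firstright = 6 with
 * *newitemonleft = false means old items 1..5 stay on the left and the
 * new tuple becomes the first item on the right page, ahead of old items
 * 6..10; returning firstright = 6 with *newitemonleft = true means items
 * 1..5 plus the new tuple go left and old items 6..10 go right.
 */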
1391 static OffsetNumber
1392 _bt_findsplitloc(Relation rel,
1393 				 Page page,
1394 				 OffsetNumber newitemoff,
1395 				 Size newitemsz,
1396 				 bool *newitemonleft)
1397 {
1398 	BTPageOpaque opaque;
1399 	OffsetNumber offnum;
1400 	OffsetNumber maxoff;
1401 	ItemId		itemid;
1402 	FindSplitData state;
1403 	int			leftspace,
1404 				rightspace,
1405 				goodenough,
1406 				olddataitemstotal,
1407 				olddataitemstoleft;
1408 	bool		goodenoughfound;
1409 
1410 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1411 
1412 	/* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
1413 	newitemsz += sizeof(ItemIdData);
1414 
1415 	/* Total free space available on a btree page, after fixed overhead */
1416 	leftspace = rightspace =
1417 		PageGetPageSize(page) - SizeOfPageHeaderData -
1418 		MAXALIGN(sizeof(BTPageOpaqueData));
1419 
1420 	/* The right page will have the same high key as the old page */
1421 	if (!P_RIGHTMOST(opaque))
1422 	{
1423 		itemid = PageGetItemId(page, P_HIKEY);
1424 		rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
1425 							 sizeof(ItemIdData));
1426 	}
1427 
1428 	/* Count up total space in data items without actually scanning 'em */
1429 	olddataitemstotal = rightspace - (int) PageGetExactFreeSpace(page);
1430 
1431 	state.newitemsz = newitemsz;
1432 	state.is_leaf = P_ISLEAF(opaque);
1433 	state.is_rightmost = P_RIGHTMOST(opaque);
1434 	state.have_split = false;
1435 	if (state.is_leaf)
1436 		state.fillfactor = RelationGetFillFactor(rel,
1437 												 BTREE_DEFAULT_FILLFACTOR);
1438 	else
1439 		state.fillfactor = BTREE_NONLEAF_FILLFACTOR;
1440 	state.newitemonleft = false;	/* these just to keep compiler quiet */
1441 	state.firstright = 0;
1442 	state.best_delta = 0;
1443 	state.leftspace = leftspace;
1444 	state.rightspace = rightspace;
1445 	state.olddataitemstotal = olddataitemstotal;
1446 	state.newitemoff = newitemoff;
1447 
1448 	/*
1449 	 * Finding the best possible split would require checking all the possible
1450 	 * split points, because of the high-key and left-key special cases.
1451 	 * That's probably more work than it's worth; instead, stop as soon as we
1452 	 * find a "good-enough" split, where good-enough is defined as an
1453 	 * imbalance in free space of no more than pagesize/16 (arbitrary...) This
1454 	 * should let us stop near the middle on most pages, instead of plowing to
1455 	 * the end.
1456 	 */
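	/*
	 * With the default 8 kB block size this works out to roughly 500 bytes
	 * of tolerated imbalance; the exact figure depends on BLCKSZ and on the
	 * page header and special-space overhead subtracted above.
	 */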
	goodenough = leftspace / 16;

	/*
	 * Scan through the data items and calculate space usage for a split at
	 * each possible position.
	 */
	olddataitemstoleft = 0;
	goodenoughfound = false;
	maxoff = PageGetMaxOffsetNumber(page);

	for (offnum = P_FIRSTDATAKEY(opaque);
		 offnum <= maxoff;
		 offnum = OffsetNumberNext(offnum))
	{
		Size		itemsz;

		itemid = PageGetItemId(page, offnum);
		itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);

		/*
		 * Will the new item go to left or right of split?
		 */
		if (offnum > newitemoff)
			_bt_checksplitloc(&state, offnum, true,
							  olddataitemstoleft, itemsz);

		else if (offnum < newitemoff)
			_bt_checksplitloc(&state, offnum, false,
							  olddataitemstoleft, itemsz);
		else
		{
			/* need to try it both ways! */
			_bt_checksplitloc(&state, offnum, true,
							  olddataitemstoleft, itemsz);

			_bt_checksplitloc(&state, offnum, false,
							  olddataitemstoleft, itemsz);
		}

		/* Abort scan once we find a good-enough choice */
		if (state.have_split && state.best_delta <= goodenough)
		{
			goodenoughfound = true;
			break;
		}

		olddataitemstoleft += itemsz;
	}

	/*
	 * If the new item goes as the last item, check for splitting so that all
	 * the old items go to the left page and the new item goes to the right
	 * page.
	 */
	if (newitemoff > maxoff && !goodenoughfound)
		_bt_checksplitloc(&state, newitemoff, false, olddataitemstotal, 0);

	/*
	 * I believe it is not possible to fail to find a feasible split, but just
	 * in case ...
	 */
	if (!state.have_split)
		elog(ERROR, "could not find a feasible split point for index \"%s\"",
			 RelationGetRelationName(rel));

	*newitemonleft = state.newitemonleft;
	return state.firstright;
}

/*
 * Subroutine to analyze a particular possible split choice (ie, firstright
 * and newitemonleft settings), and record the best split so far in *state.
 *
 * firstoldonright is the offset of the first item on the original page
 * that goes to the right page, and firstoldonrightsz is the size of that
 * tuple. firstoldonright can be > max offset, which means that all the old
 * items go to the left page and only the new item goes to the right page.
 * In that case, firstoldonrightsz is not used.
 *
 * olddataitemstoleft is the total size of all old items to the left of
 * firstoldonright.
 */
static void
_bt_checksplitloc(FindSplitData *state,
				  OffsetNumber firstoldonright,
				  bool newitemonleft,
				  int olddataitemstoleft,
				  Size firstoldonrightsz)
{
	int			leftfree,
				rightfree;
	Size		firstrightitemsz;
	bool		newitemisfirstonright;

	/* Is the new item going to be the first item on the right page? */
	newitemisfirstonright = (firstoldonright == state->newitemoff
							 && !newitemonleft);

	if (newitemisfirstonright)
		firstrightitemsz = state->newitemsz;
	else
		firstrightitemsz = firstoldonrightsz;

	/* Account for all the old tuples */
	leftfree = state->leftspace - olddataitemstoleft;
	rightfree = state->rightspace -
		(state->olddataitemstotal - olddataitemstoleft);

	/*
	 * The first item on the right page becomes the high key of the left page;
	 * therefore it counts against left space as well as right space.
	 */
	leftfree -= firstrightitemsz;

	/* account for the new item */
	if (newitemonleft)
		leftfree -= (int) state->newitemsz;
	else
		rightfree -= (int) state->newitemsz;

	/*
	 * If we are not on the leaf level, we will be able to discard the key
	 * data from the first item that winds up on the right page.
	 */
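	/*
	 * (Illustrative cross-reference: when such a tuple is actually inserted,
	 * _bt_pgaddtup below truncates it to a bare IndexTupleData header, which
	 * is what the MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData)
	 * figure here accounts for.)
	 */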
	if (!state->is_leaf)
		rightfree += (int) firstrightitemsz -
			(int) (MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData));

	/*
	 * If feasible split point, remember best delta.
	 */
	if (leftfree >= 0 && rightfree >= 0)
	{
		int			delta;

		if (state->is_rightmost)
		{
			/*
			 * If splitting a rightmost page, try to put (100-fillfactor)% of
			 * free space on left page. See comments for _bt_findsplitloc.
			 */
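			/*
			 * Illustrative example: with the default leaf fillfactor of 90,
			 * the delta below is zero when about 10% of the combined free
			 * space is left on the left page, i.e. the left page ends up
			 * roughly 90% full.  (Exact behaviour depends on the index's
			 * fillfactor setting.)
			 */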
			delta = (state->fillfactor * leftfree)
				- ((100 - state->fillfactor) * rightfree);
		}
		else
		{
			/* Otherwise, aim for equal free space on both sides */
			delta = leftfree - rightfree;
		}

		if (delta < 0)
			delta = -delta;
		if (!state->have_split || delta < state->best_delta)
		{
			state->have_split = true;
			state->newitemonleft = newitemonleft;
			state->firstright = firstoldonright;
			state->best_delta = delta;
		}
	}
}

/*
 * _bt_insert_parent() -- Insert downlink into parent after a page split.
 *
 * On entry, buf and rbuf are the left and right split pages, which we
 * still hold write locks on per the L&Y algorithm.  We release the
 * write locks once we have write lock on the parent page.  (Any sooner,
 * and it'd be possible for some other process to try to split or delete
 * one of these pages, and get confused because it cannot find the downlink.)
 *
 * stack - stack showing how we got here.  May be NULL in cases that don't
 *			have to be efficient (concurrent ROOT split, WAL recovery)
 * is_root - we split the true root
 * is_only - we split a page alone on its level (might have been fast root)
 */
static void
_bt_insert_parent(Relation rel,
				  Buffer buf,
				  Buffer rbuf,
				  BTStack stack,
				  bool is_root,
				  bool is_only)
{
	/*
	 * Here we have to do something Lehman and Yao don't talk about: deal with
	 * a root split and construction of a new root.  If our stack is empty
	 * then we have just split a node on what had been the root level when we
	 * descended the tree.  If it was still the root then we perform a
	 * new-root construction.  If it *wasn't* the root anymore, search to find
	 * the next higher level that someone constructed meanwhile, and find the
	 * right place to insert as for the normal case.
	 *
	 * If we have to search for the parent level, we do so by re-descending
	 * from the root.  This is not super-efficient, but it's rare enough not
	 * to matter.
	 */
	if (is_root)
	{
		Buffer		rootbuf;

		Assert(stack == NULL);
		Assert(is_only);
		/* create a new root node and update the metapage */
		rootbuf = _bt_newroot(rel, buf, rbuf);
		/* release the split buffers */
		_bt_relbuf(rel, rootbuf);
		_bt_relbuf(rel, rbuf);
		_bt_relbuf(rel, buf);
	}
	else
	{
		BlockNumber bknum = BufferGetBlockNumber(buf);
		BlockNumber rbknum = BufferGetBlockNumber(rbuf);
		Page		page = BufferGetPage(buf);
		IndexTuple	new_item;
		BTStackData fakestack;
		IndexTuple	ritem;
		Buffer		pbuf;

		if (stack == NULL)
		{
			BTPageOpaque lpageop;

			elog(DEBUG2, "concurrent ROOT page split");
			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
			/* Find the leftmost page at the next level up */
			pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false,
									NULL);
			/* Set up a phony stack entry pointing there */
			stack = &fakestack;
			stack->bts_blkno = BufferGetBlockNumber(pbuf);
			stack->bts_offset = InvalidOffsetNumber;
			/* bts_btentry will be initialized below */
			stack->bts_parent = NULL;
			_bt_relbuf(rel, pbuf);
		}

		/* get high key from left page == lowest key on new right page */
		ritem = (IndexTuple) PageGetItem(page,
										 PageGetItemId(page, P_HIKEY));

		/* form an index tuple that points at the new right page */
		new_item = CopyIndexTuple(ritem);
		ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);

		/*
		 * Find the parent buffer and get the parent page.
		 *
		 * Oops - if we were moved right then we need to change the stack
		 * item!  We want to find the parent pointing to where we are,
		 * right?  - vadim 05/27/97
		 */
		ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
		pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);

		/*
		 * Now we can unlock the right child. The left child will be unlocked
		 * by _bt_insertonpg().
		 */
		_bt_relbuf(rel, rbuf);

		/* Check for error only after writing children */
		if (pbuf == InvalidBuffer)
			elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u",
				 RelationGetRelationName(rel), bknum, rbknum);

		/* Recursively update the parent */
		_bt_insertonpg(rel, pbuf, buf, stack->bts_parent,
					   new_item, stack->bts_offset + 1,
					   is_only);

		/* be tidy */
		pfree(new_item);
	}
}

/*
 * _bt_finish_split() -- Finish an incomplete split
 *
 * A crash or other failure can leave a split incomplete.  The insertion
 * routines refuse to insert on a page whose split is still incomplete.
 * Before inserting on such a page, call _bt_finish_split().
 *
 * On entry, 'lbuf' must be locked in write-mode.  On exit, it is unlocked
 * and unpinned.
 */
void
_bt_finish_split(Relation rel, Buffer lbuf, BTStack stack)
{
	Page		lpage = BufferGetPage(lbuf);
	BTPageOpaque lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
	Buffer		rbuf;
	Page		rpage;
	BTPageOpaque rpageop;
	bool		was_root;
	bool		was_only;

	Assert(P_INCOMPLETE_SPLIT(lpageop));

	/* Lock right sibling, the one missing the downlink */
	rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
	rpage = BufferGetPage(rbuf);
	rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);

	/* Could this be a root split? */
	if (!stack)
	{
		Buffer		metabuf;
		Page		metapg;
		BTMetaPageData *metad;

		/* acquire lock on the metapage */
		metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
		metapg = BufferGetPage(metabuf);
		metad = BTPageGetMeta(metapg);

		was_root = (metad->btm_root == BufferGetBlockNumber(lbuf));

		_bt_relbuf(rel, metabuf);
	}
	else
		was_root = false;

	/* Was this the only page on the level before split? */
	was_only = (P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop));

	elog(DEBUG1, "finishing incomplete split of %u/%u",
		 BufferGetBlockNumber(lbuf), BufferGetBlockNumber(rbuf));

	_bt_insert_parent(rel, lbuf, rbuf, stack, was_root, was_only);
}

/*
 *	_bt_getstackbuf() -- Walk back up the tree one step, and find the item
 *						 we last looked at in the parent.
 *
 *		This is possible because we save the downlink from the parent item,
 *		which is enough to uniquely identify it.  Insertions into the parent
 *		level could cause the item to move right; deletions could cause it
 *		to move left, but not left of the page we previously found it in.
 *
 *		Adjusts bts_blkno & bts_offset if changed.
 *
 *		Returns InvalidBuffer if item not found (should not happen).
 */
Buffer
_bt_getstackbuf(Relation rel, BTStack stack, int access)
{
	BlockNumber blkno;
	OffsetNumber start;

	blkno = stack->bts_blkno;
	start = stack->bts_offset;

	for (;;)
	{
		Buffer		buf;
		Page		page;
		BTPageOpaque opaque;

		buf = _bt_getbuf(rel, blkno, access);
		page = BufferGetPage(buf);
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);

		if (access == BT_WRITE && P_INCOMPLETE_SPLIT(opaque))
		{
			_bt_finish_split(rel, buf, stack->bts_parent);
			continue;
		}

		if (!P_IGNORE(opaque))
		{
			OffsetNumber offnum,
						minoff,
						maxoff;
			ItemId		itemid;
			IndexTuple	item;

			minoff = P_FIRSTDATAKEY(opaque);
			maxoff = PageGetMaxOffsetNumber(page);

			/*
			 * start = InvalidOffsetNumber means "search the whole page". We
			 * need this test anyway, due to the possibility that the page
			 * has a high key now when it didn't before.
			 */
			if (start < minoff)
				start = minoff;

			/*
			 * Need this check too, to guard against the possibility that
			 * the page has split since we visited it originally.
			 */
			if (start > maxoff)
				start = OffsetNumberNext(maxoff);

			/*
			 * These loops will check every item on the page --- but in an
			 * order that's attuned to the probability of where it actually
			 * is.  Scan to the right first, then to the left.
			 */
			for (offnum = start;
				 offnum <= maxoff;
				 offnum = OffsetNumberNext(offnum))
			{
				itemid = PageGetItemId(page, offnum);
				item = (IndexTuple) PageGetItem(page, itemid);
				if (BTEntrySame(item, &stack->bts_btentry))
				{
					/* Return accurate pointer to where link is now */
					stack->bts_blkno = blkno;
					stack->bts_offset = offnum;
					return buf;
				}
			}

			for (offnum = OffsetNumberPrev(start);
				 offnum >= minoff;
				 offnum = OffsetNumberPrev(offnum))
			{
				itemid = PageGetItemId(page, offnum);
				item = (IndexTuple) PageGetItem(page, itemid);
				if (BTEntrySame(item, &stack->bts_btentry))
				{
					/* Return accurate pointer to where link is now */
					stack->bts_blkno = blkno;
					stack->bts_offset = offnum;
					return buf;
				}
			}
		}

		/*
		 * The item we're looking for moved right at least one page.
		 */
		if (P_RIGHTMOST(opaque))
		{
			_bt_relbuf(rel, buf);
			return InvalidBuffer;
		}
		blkno = opaque->btpo_next;
		start = InvalidOffsetNumber;
		_bt_relbuf(rel, buf);
	}
}

/*
 *	_bt_newroot() -- Create a new root page for the index.
 *
 *		We've just split the old root page and need to create a new one.
 *		In order to do this, we add a new root page to the file, then lock
 *		the metadata page and update it.  This is guaranteed to be deadlock-
 *		free, because all readers release their locks on the metadata page
 *		before trying to lock the root, and all writers lock the root before
 *		trying to lock the metadata page.  We have a write lock on the old
 *		root page, so we have not introduced any cycles into the waits-for
 *		graph.
 *
 *		On entry, lbuf (the old root) and rbuf (its new peer) are write-
 *		locked. On exit, a new root page exists with entries for the
 *		two new children, metapage is updated and unlocked/unpinned.
 *		The new root buffer is returned to caller which has to unlock/unpin
 *		lbuf, rbuf & rootbuf.
 */
static Buffer
_bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
{
	Buffer		rootbuf;
	Page		lpage,
				rootpage;
	BlockNumber lbkno,
				rbkno;
	BlockNumber rootblknum;
	BTPageOpaque rootopaque;
	BTPageOpaque lopaque;
	ItemId		itemid;
	IndexTuple	item;
	IndexTuple	left_item;
	Size		left_item_sz;
	IndexTuple	right_item;
	Size		right_item_sz;
	Buffer		metabuf;
	Page		metapg;
	BTMetaPageData *metad;

	lbkno = BufferGetBlockNumber(lbuf);
	rbkno = BufferGetBlockNumber(rbuf);
	lpage = BufferGetPage(lbuf);
	lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);

	/* get a new root page */
	rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
	rootpage = BufferGetPage(rootbuf);
	rootblknum = BufferGetBlockNumber(rootbuf);

	/* acquire lock on the metapage */
	metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
	metapg = BufferGetPage(metabuf);
	metad = BTPageGetMeta(metapg);

	/*
	 * Create downlink item for left page (old root).  Since this will be the
	 * first item in a non-leaf page, it implicitly has minus-infinity key
	 * value, so we need not store any actual key in it.
	 */
	left_item_sz = sizeof(IndexTupleData);
	left_item = (IndexTuple) palloc(left_item_sz);
	left_item->t_info = left_item_sz;
	ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY);

	/*
	 * Create downlink item for right page.  The key for it is obtained from
	 * the "high key" position in the left page.
	 */
	itemid = PageGetItemId(lpage, P_HIKEY);
	right_item_sz = ItemIdGetLength(itemid);
	item = (IndexTuple) PageGetItem(lpage, itemid);
	right_item = CopyIndexTuple(item);
	ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);

	/* NO EREPORT(ERROR) from here till newroot op is logged */
	START_CRIT_SECTION();

	/* set btree special data */
	rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
	rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
	rootopaque->btpo_flags = BTP_ROOT;
	rootopaque->btpo.level =
		((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
	rootopaque->btpo_cycleid = 0;

	/* update metapage data */
	metad->btm_root = rootblknum;
	metad->btm_level = rootopaque->btpo.level;
	metad->btm_fastroot = rootblknum;
	metad->btm_fastlevel = rootopaque->btpo.level;

	/*
	 * Insert the left page pointer into the new root page.  The root page is
	 * the rightmost page on its level so there is no "high key" in it; the
	 * two items will go into positions P_HIKEY and P_FIRSTKEY.
	 *
	 * Note: we *must* insert the two items in item-number order, for the
	 * benefit of _bt_restore_page().
	 */
	if (PageAddItem(rootpage, (Item) left_item, left_item_sz, P_HIKEY,
					false, false) == InvalidOffsetNumber)
		elog(PANIC, "failed to add leftkey to new root page"
			 " while splitting block %u of index \"%s\"",
			 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));

	/*
	 * insert the right page pointer into the new root page.
	 */
	if (PageAddItem(rootpage, (Item) right_item, right_item_sz, P_FIRSTKEY,
					false, false) == InvalidOffsetNumber)
		elog(PANIC, "failed to add rightkey to new root page"
			 " while splitting block %u of index \"%s\"",
			 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));

	/* Clear the incomplete-split flag in the left child */
	Assert(P_INCOMPLETE_SPLIT(lopaque));
	lopaque->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
	MarkBufferDirty(lbuf);

	MarkBufferDirty(rootbuf);
	MarkBufferDirty(metabuf);

	/* XLOG stuff */
	if (RelationNeedsWAL(rel))
	{
		xl_btree_newroot xlrec;
		XLogRecPtr	recptr;
		xl_btree_metadata md;

		xlrec.rootblk = rootblknum;
		xlrec.level = metad->btm_level;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot);

		XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
		XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
		XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);

		md.root = rootblknum;
		md.level = metad->btm_level;
		md.fastroot = rootblknum;
		md.fastlevel = metad->btm_level;

		XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata));

		/*
		 * Direct access to the page is not ideal, but it is faster; we
		 * should implement a new function in the page API for this.
		 */
		XLogRegisterBufData(0,
							(char *) rootpage + ((PageHeader) rootpage)->pd_upper,
							((PageHeader) rootpage)->pd_special -
							((PageHeader) rootpage)->pd_upper);

		recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT);

		PageSetLSN(lpage, recptr);
		PageSetLSN(rootpage, recptr);
		PageSetLSN(metapg, recptr);
	}

	END_CRIT_SECTION();

	/* done with metapage */
	_bt_relbuf(rel, metabuf);

	pfree(left_item);
	pfree(right_item);

	return rootbuf;
}

/*
 *	_bt_pgaddtup() -- add a tuple to a particular page in the index.
 *
 *		This routine adds the tuple to the page as requested.  It does
 *		not affect pin/lock status, but you'd better have a write lock
 *		and pin on the target buffer!  Don't forget to write and release
 *		the buffer afterwards, either.
 *
 *		The main difference between this routine and a bare PageAddItem call
 *		is that this code knows that the leftmost index tuple on a non-leaf
 *		btree page doesn't need to have a key.  Therefore, it strips such
 *		tuples down to just the tuple header.  CAUTION: this works ONLY if
 *		we insert the tuples in order, so that the given itup_off does
 *		represent the final position of the tuple!
 */
static bool
_bt_pgaddtup(Page page,
			 Size itemsize,
			 IndexTuple itup,
			 OffsetNumber itup_off)
{
	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	IndexTupleData trunctuple;

	if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
	{
		trunctuple = *itup;
		trunctuple.t_info = sizeof(IndexTupleData);
		itup = &trunctuple;
		itemsize = sizeof(IndexTupleData);
	}

	if (PageAddItem(page, (Item) itup, itemsize, itup_off,
					false, false) == InvalidOffsetNumber)
		return false;

	return true;
}

/*
 * _bt_isequal - used in _bt_doinsert's check for duplicates.
 *
 * This is very similar to _bt_compare, except for NULL handling.
 * The rule is simple: a non-NULL value is never equal to NULL, and
 * NULL is not equal to NULL either.
 */
static bool
_bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
			int keysz, ScanKey scankey)
{
	IndexTuple	itup;
	int			i;

	/* Better be comparing to a leaf item */
	Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));

	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));

	for (i = 1; i <= keysz; i++)
	{
		AttrNumber	attno;
		Datum		datum;
		bool		isNull;
		int32		result;

		attno = scankey->sk_attno;
		Assert(attno == i);
		datum = index_getattr(itup, attno, itupdesc, &isNull);

		/* NULLs are never equal to anything */
		if (isNull || (scankey->sk_flags & SK_ISNULL))
			return false;

		result = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
												 scankey->sk_collation,
												 datum,
												 scankey->sk_argument));

		if (result != 0)
			return false;

		scankey++;
	}

	/* if we get here, the keys are equal */
	return true;
}

/*
 * _bt_vacuum_one_page - vacuum just one index page.
 *
 * Try to remove LP_DEAD items from the given page.  The passed buffer
 * must be exclusive-locked, but unlike a real VACUUM, we don't need a
 * super-exclusive "cleanup" lock (see nbtree/README).
 */
static void
_bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel)
{
	OffsetNumber deletable[MaxOffsetNumber];
	int			ndeletable = 0;
	OffsetNumber offnum,
				minoff,
				maxoff;
	Page		page = BufferGetPage(buffer);
	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);

	/*
	 * Scan over all items to see which ones need to be deleted according to
	 * LP_DEAD flags.
	 */
	minoff = P_FIRSTDATAKEY(opaque);
	maxoff = PageGetMaxOffsetNumber(page);
	for (offnum = minoff;
		 offnum <= maxoff;
		 offnum = OffsetNumberNext(offnum))
	{
		ItemId		itemId = PageGetItemId(page, offnum);

		if (ItemIdIsDead(itemId))
			deletable[ndeletable++] = offnum;
	}

	if (ndeletable > 0)
		_bt_delitems_delete(rel, buffer, deletable, ndeletable, heapRel);

	/*
	 * Note: if we didn't find any LP_DEAD items, then the page's
	 * BTP_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
	 * separate write to clear it, however.  We will clear it when we split
	 * the page.
	 */
}