1 /*-------------------------------------------------------------------------
2  *
3  * ginbtree.c
4  *	  page utilities routines for the postgres inverted index access method.
5  *
6  *
7  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *			src/backend/access/gin/ginbtree.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/gin_private.h"
18 #include "access/xloginsert.h"
19 #include "miscadmin.h"
20 #include "utils/memutils.h"
21 #include "utils/rel.h"
22 
23 static void ginFindParents(GinBtree btree, GinBtreeStack *stack);
24 static bool ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
25 			   void *insertdata, BlockNumber updateblkno,
26 			   Buffer childbuf, GinStatsData *buildStats);
27 static void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
28 			   bool freestack, GinStatsData *buildStats);
29 
30 /*
31  * Lock buffer by needed method for search.
32  */
33 static int
ginTraverseLock(Buffer buffer,bool searchMode)34 ginTraverseLock(Buffer buffer, bool searchMode)
35 {
36 	Page		page;
37 	int			access = GIN_SHARE;
38 
39 	LockBuffer(buffer, GIN_SHARE);
40 	page = BufferGetPage(buffer);
41 	if (GinPageIsLeaf(page))
42 	{
43 		if (searchMode == FALSE)
44 		{
45 			/* we should relock our page */
46 			LockBuffer(buffer, GIN_UNLOCK);
47 			LockBuffer(buffer, GIN_EXCLUSIVE);
48 
49 			/* But root can become non-leaf during relock */
50 			if (!GinPageIsLeaf(page))
51 			{
52 				/* restore old lock type (very rare) */
53 				LockBuffer(buffer, GIN_UNLOCK);
54 				LockBuffer(buffer, GIN_SHARE);
55 			}
56 			else
57 				access = GIN_EXCLUSIVE;
58 		}
59 	}
60 
61 	return access;
62 }
63 
64 /*
65  * Descend the tree to the leaf page that contains or would contain the key
66  * we're searching for. The key should already be filled in 'btree', in
67  * tree-type specific manner. If btree->fullScan is true, descends to the
68  * leftmost leaf page.
69  *
70  * If 'searchmode' is false, on return stack->buffer is exclusively locked,
71  * and the stack represents the full path to the root. Otherwise stack->buffer
72  * is share-locked, and stack->parent is NULL.
73  */
74 GinBtreeStack *
ginFindLeafPage(GinBtree btree,bool searchMode,Snapshot snapshot)75 ginFindLeafPage(GinBtree btree, bool searchMode, Snapshot snapshot)
76 {
77 	GinBtreeStack *stack;
78 
79 	stack = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
80 	stack->blkno = btree->rootBlkno;
81 	stack->buffer = ReadBuffer(btree->index, btree->rootBlkno);
82 	stack->parent = NULL;
83 	stack->predictNumber = 1;
84 
85 	for (;;)
86 	{
87 		Page		page;
88 		BlockNumber child;
89 		int			access;
90 
91 		stack->off = InvalidOffsetNumber;
92 
93 		page = BufferGetPage(stack->buffer);
94 		TestForOldSnapshot(snapshot, btree->index, page);
95 
96 		access = ginTraverseLock(stack->buffer, searchMode);
97 
98 		/*
99 		 * If we're going to modify the tree, finish any incomplete splits we
100 		 * encounter on the way.
101 		 */
102 		if (!searchMode && GinPageIsIncompleteSplit(page))
103 			ginFinishSplit(btree, stack, false, NULL);
104 
105 		/*
106 		 * ok, page is correctly locked, we should check to move right ..,
107 		 * root never has a right link, so small optimization
108 		 */
109 		while (btree->fullScan == FALSE && stack->blkno != btree->rootBlkno &&
110 			   btree->isMoveRight(btree, page))
111 		{
112 			BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
113 
114 			if (rightlink == InvalidBlockNumber)
115 				/* rightmost page */
116 				break;
117 
118 			stack->buffer = ginStepRight(stack->buffer, btree->index, access);
119 			stack->blkno = rightlink;
120 			page = BufferGetPage(stack->buffer);
121 			TestForOldSnapshot(snapshot, btree->index, page);
122 
123 			if (!searchMode && GinPageIsIncompleteSplit(page))
124 				ginFinishSplit(btree, stack, false, NULL);
125 		}
126 
127 		if (GinPageIsLeaf(page))	/* we found, return locked page */
128 			return stack;
129 
130 		/* now we have correct buffer, try to find child */
131 		child = btree->findChildPage(btree, stack);
132 
133 		LockBuffer(stack->buffer, GIN_UNLOCK);
134 		Assert(child != InvalidBlockNumber);
135 		Assert(stack->blkno != child);
136 
137 		if (searchMode)
138 		{
139 			/* in search mode we may forget path to leaf */
140 			stack->blkno = child;
141 			stack->buffer = ReleaseAndReadBuffer(stack->buffer, btree->index, stack->blkno);
142 		}
143 		else
144 		{
145 			GinBtreeStack *ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
146 
147 			ptr->parent = stack;
148 			stack = ptr;
149 			stack->blkno = child;
150 			stack->buffer = ReadBuffer(btree->index, stack->blkno);
151 			stack->predictNumber = 1;
152 		}
153 	}
154 }
155 
156 /*
157  * Step right from current page.
158  *
159  * The next page is locked first, before releasing the current page. This is
160  * crucial to protect from concurrent page deletion (see comment in
161  * ginDeletePage).
162  */
163 Buffer
ginStepRight(Buffer buffer,Relation index,int lockmode)164 ginStepRight(Buffer buffer, Relation index, int lockmode)
165 {
166 	Buffer		nextbuffer;
167 	Page		page = BufferGetPage(buffer);
168 	bool		isLeaf = GinPageIsLeaf(page);
169 	bool		isData = GinPageIsData(page);
170 	BlockNumber blkno = GinPageGetOpaque(page)->rightlink;
171 
172 	nextbuffer = ReadBuffer(index, blkno);
173 	LockBuffer(nextbuffer, lockmode);
174 	UnlockReleaseBuffer(buffer);
175 
176 	/* Sanity check that the page we stepped to is of similar kind. */
177 	page = BufferGetPage(nextbuffer);
178 	if (isLeaf != GinPageIsLeaf(page) || isData != GinPageIsData(page))
179 		elog(ERROR, "right sibling of GIN page is of different type");
180 
181 	return nextbuffer;
182 }
183 
184 void
freeGinBtreeStack(GinBtreeStack * stack)185 freeGinBtreeStack(GinBtreeStack *stack)
186 {
187 	while (stack)
188 	{
189 		GinBtreeStack *tmp = stack->parent;
190 
191 		if (stack->buffer != InvalidBuffer)
192 			ReleaseBuffer(stack->buffer);
193 
194 		pfree(stack);
195 		stack = tmp;
196 	}
197 }
198 
199 /*
200  * Try to find parent for current stack position. Returns correct parent and
201  * child's offset in stack->parent. The root page is never released, to
202  * to prevent conflict with vacuum process.
203  */
204 static void
ginFindParents(GinBtree btree,GinBtreeStack * stack)205 ginFindParents(GinBtree btree, GinBtreeStack *stack)
206 {
207 	Page		page;
208 	Buffer		buffer;
209 	BlockNumber blkno,
210 				leftmostBlkno;
211 	OffsetNumber offset;
212 	GinBtreeStack *root;
213 	GinBtreeStack *ptr;
214 
215 	/*
216 	 * Unwind the stack all the way up to the root, leaving only the root
217 	 * item.
218 	 *
219 	 * Be careful not to release the pin on the root page! The pin on root
220 	 * page is required to lock out concurrent vacuums on the tree.
221 	 */
222 	root = stack->parent;
223 	while (root->parent)
224 	{
225 		ReleaseBuffer(root->buffer);
226 		root = root->parent;
227 	}
228 
229 	Assert(root->blkno == btree->rootBlkno);
230 	Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
231 	root->off = InvalidOffsetNumber;
232 
233 	blkno = root->blkno;
234 	buffer = root->buffer;
235 	offset = InvalidOffsetNumber;
236 
237 	ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
238 
239 	for (;;)
240 	{
241 		LockBuffer(buffer, GIN_EXCLUSIVE);
242 		page = BufferGetPage(buffer);
243 		if (GinPageIsLeaf(page))
244 			elog(ERROR, "Lost path");
245 
246 		if (GinPageIsIncompleteSplit(page))
247 		{
248 			Assert(blkno != btree->rootBlkno);
249 			ptr->blkno = blkno;
250 			ptr->buffer = buffer;
251 
252 			/*
253 			 * parent may be wrong, but if so, the ginFinishSplit call will
254 			 * recurse to call ginFindParents again to fix it.
255 			 */
256 			ptr->parent = root;
257 			ptr->off = InvalidOffsetNumber;
258 
259 			ginFinishSplit(btree, ptr, false, NULL);
260 		}
261 
262 		leftmostBlkno = btree->getLeftMostChild(btree, page);
263 
264 		while ((offset = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) == InvalidOffsetNumber)
265 		{
266 			blkno = GinPageGetOpaque(page)->rightlink;
267 			if (blkno == InvalidBlockNumber)
268 			{
269 				UnlockReleaseBuffer(buffer);
270 				break;
271 			}
272 			buffer = ginStepRight(buffer, btree->index, GIN_EXCLUSIVE);
273 			page = BufferGetPage(buffer);
274 
275 			/* finish any incomplete splits, as above */
276 			if (GinPageIsIncompleteSplit(page))
277 			{
278 				Assert(blkno != btree->rootBlkno);
279 				ptr->blkno = blkno;
280 				ptr->buffer = buffer;
281 				ptr->parent = root;
282 				ptr->off = InvalidOffsetNumber;
283 
284 				ginFinishSplit(btree, ptr, false, NULL);
285 			}
286 		}
287 
288 		if (blkno != InvalidBlockNumber)
289 		{
290 			ptr->blkno = blkno;
291 			ptr->buffer = buffer;
292 			ptr->parent = root; /* it may be wrong, but in next call we will
293 								 * correct */
294 			ptr->off = offset;
295 			stack->parent = ptr;
296 			return;
297 		}
298 
299 		/* Descend down to next level */
300 		blkno = leftmostBlkno;
301 		buffer = ReadBuffer(btree->index, blkno);
302 	}
303 }
304 
305 /*
306  * Insert a new item to a page.
307  *
308  * Returns true if the insertion was finished. On false, the page was split and
309  * the parent needs to be updated. (A root split returns true as it doesn't
310  * need any further action by the caller to complete.)
311  *
312  * When inserting a downlink to an internal page, 'childbuf' contains the
313  * child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
314  * atomically with the insert. Also, the existing item at offset stack->off
315  * in the target page is updated to point to updateblkno.
316  *
317  * stack->buffer is locked on entry, and is kept locked.
318  * Likewise for childbuf, if given.
319  */
320 static bool
ginPlaceToPage(GinBtree btree,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,Buffer childbuf,GinStatsData * buildStats)321 ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
322 			   void *insertdata, BlockNumber updateblkno,
323 			   Buffer childbuf, GinStatsData *buildStats)
324 {
325 	Page		page = BufferGetPage(stack->buffer);
326 	bool		result;
327 	GinPlaceToPageRC rc;
328 	uint16		xlflags = 0;
329 	Page		childpage = NULL;
330 	Page		newlpage = NULL,
331 				newrpage = NULL;
332 	void	   *ptp_workspace = NULL;
333 	MemoryContext tmpCxt;
334 	MemoryContext oldCxt;
335 
336 	/*
337 	 * We do all the work of this function and its subfunctions in a temporary
338 	 * memory context.  This avoids leakages and simplifies APIs, since some
339 	 * subfunctions allocate storage that has to survive until we've finished
340 	 * the WAL insertion.
341 	 */
342 	tmpCxt = AllocSetContextCreate(CurrentMemoryContext,
343 								   "ginPlaceToPage temporary context",
344 								   ALLOCSET_DEFAULT_SIZES);
345 	oldCxt = MemoryContextSwitchTo(tmpCxt);
346 
347 	if (GinPageIsData(page))
348 		xlflags |= GIN_INSERT_ISDATA;
349 	if (GinPageIsLeaf(page))
350 	{
351 		xlflags |= GIN_INSERT_ISLEAF;
352 		Assert(!BufferIsValid(childbuf));
353 		Assert(updateblkno == InvalidBlockNumber);
354 	}
355 	else
356 	{
357 		Assert(BufferIsValid(childbuf));
358 		Assert(updateblkno != InvalidBlockNumber);
359 		childpage = BufferGetPage(childbuf);
360 	}
361 
362 	/*
363 	 * See if the incoming tuple will fit on the page.  beginPlaceToPage will
364 	 * decide if the page needs to be split, and will compute the split
365 	 * contents if so.  See comments for beginPlaceToPage and execPlaceToPage
366 	 * functions for more details of the API here.
367 	 */
368 	rc = btree->beginPlaceToPage(btree, stack->buffer, stack,
369 								 insertdata, updateblkno,
370 								 &ptp_workspace,
371 								 &newlpage, &newrpage);
372 
373 	if (rc == GPTP_NO_WORK)
374 	{
375 		/* Nothing to do */
376 		result = true;
377 	}
378 	else if (rc == GPTP_INSERT)
379 	{
380 		/* It will fit, perform the insertion */
381 		START_CRIT_SECTION();
382 
383 		if (RelationNeedsWAL(btree->index))
384 		{
385 			XLogBeginInsert();
386 			XLogRegisterBuffer(0, stack->buffer, REGBUF_STANDARD);
387 			if (BufferIsValid(childbuf))
388 				XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
389 		}
390 
391 		/* Perform the page update, and register any extra WAL data */
392 		btree->execPlaceToPage(btree, stack->buffer, stack,
393 							   insertdata, updateblkno, ptp_workspace);
394 
395 		MarkBufferDirty(stack->buffer);
396 
397 		/* An insert to an internal page finishes the split of the child. */
398 		if (BufferIsValid(childbuf))
399 		{
400 			GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
401 			MarkBufferDirty(childbuf);
402 		}
403 
404 		if (RelationNeedsWAL(btree->index))
405 		{
406 			XLogRecPtr	recptr;
407 			ginxlogInsert xlrec;
408 			BlockIdData childblknos[2];
409 
410 			xlrec.flags = xlflags;
411 
412 			XLogRegisterData((char *) &xlrec, sizeof(ginxlogInsert));
413 
414 			/*
415 			 * Log information about child if this was an insertion of a
416 			 * downlink.
417 			 */
418 			if (BufferIsValid(childbuf))
419 			{
420 				BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf));
421 				BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink);
422 				XLogRegisterData((char *) childblknos,
423 								 sizeof(BlockIdData) * 2);
424 			}
425 
426 			recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT);
427 			PageSetLSN(page, recptr);
428 			if (BufferIsValid(childbuf))
429 				PageSetLSN(childpage, recptr);
430 		}
431 
432 		END_CRIT_SECTION();
433 
434 		/* Insertion is complete. */
435 		result = true;
436 	}
437 	else if (rc == GPTP_SPLIT)
438 	{
439 		/*
440 		 * Didn't fit, need to split.  The split has been computed in newlpage
441 		 * and newrpage, which are pointers to palloc'd pages, not associated
442 		 * with buffers.  stack->buffer is not touched yet.
443 		 */
444 		Buffer		rbuffer;
445 		BlockNumber savedRightLink;
446 		ginxlogSplit data;
447 		Buffer		lbuffer = InvalidBuffer;
448 		Page		newrootpg = NULL;
449 
450 		/* Get a new index page to become the right page */
451 		rbuffer = GinNewBuffer(btree->index);
452 
453 		/* During index build, count the new page */
454 		if (buildStats)
455 		{
456 			if (btree->isData)
457 				buildStats->nDataPages++;
458 			else
459 				buildStats->nEntryPages++;
460 		}
461 
462 		savedRightLink = GinPageGetOpaque(page)->rightlink;
463 
464 		/* Begin setting up WAL record */
465 		data.node = btree->index->rd_node;
466 		data.flags = xlflags;
467 		if (BufferIsValid(childbuf))
468 		{
469 			data.leftChildBlkno = BufferGetBlockNumber(childbuf);
470 			data.rightChildBlkno = GinPageGetOpaque(childpage)->rightlink;
471 		}
472 		else
473 			data.leftChildBlkno = data.rightChildBlkno = InvalidBlockNumber;
474 
475 		if (stack->parent == NULL)
476 		{
477 			/*
478 			 * splitting the root, so we need to allocate new left page and
479 			 * place pointers to left and right page on root page.
480 			 */
481 			lbuffer = GinNewBuffer(btree->index);
482 
483 			/* During index build, count the new left page */
484 			if (buildStats)
485 			{
486 				if (btree->isData)
487 					buildStats->nDataPages++;
488 				else
489 					buildStats->nEntryPages++;
490 			}
491 
492 			data.rrlink = InvalidBlockNumber;
493 			data.flags |= GIN_SPLIT_ROOT;
494 
495 			GinPageGetOpaque(newrpage)->rightlink = InvalidBlockNumber;
496 			GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
497 
498 			/*
499 			 * Construct a new root page containing downlinks to the new left
500 			 * and right pages.  (Do this in a temporary copy rather than
501 			 * overwriting the original page directly, since we're not in the
502 			 * critical section yet.)
503 			 */
504 			newrootpg = PageGetTempPage(newrpage);
505 			GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~(GIN_LEAF | GIN_COMPRESSED), BLCKSZ);
506 
507 			btree->fillRoot(btree, newrootpg,
508 							BufferGetBlockNumber(lbuffer), newlpage,
509 							BufferGetBlockNumber(rbuffer), newrpage);
510 		}
511 		else
512 		{
513 			/* splitting a non-root page */
514 			data.rrlink = savedRightLink;
515 
516 			GinPageGetOpaque(newrpage)->rightlink = savedRightLink;
517 			GinPageGetOpaque(newlpage)->flags |= GIN_INCOMPLETE_SPLIT;
518 			GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
519 		}
520 
521 		/*
522 		 * OK, we have the new contents of the left page in a temporary copy
523 		 * now (newlpage), and likewise for the new contents of the
524 		 * newly-allocated right block. The original page is still unchanged.
525 		 *
526 		 * If this is a root split, we also have a temporary page containing
527 		 * the new contents of the root.
528 		 */
529 
530 		START_CRIT_SECTION();
531 
532 		MarkBufferDirty(rbuffer);
533 		MarkBufferDirty(stack->buffer);
534 
535 		/*
536 		 * Restore the temporary copies over the real buffers.
537 		 */
538 		if (stack->parent == NULL)
539 		{
540 			/* Splitting the root, three pages to update */
541 			MarkBufferDirty(lbuffer);
542 			memcpy(page, newrootpg, BLCKSZ);
543 			memcpy(BufferGetPage(lbuffer), newlpage, BLCKSZ);
544 			memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
545 		}
546 		else
547 		{
548 			/* Normal split, only two pages to update */
549 			memcpy(page, newlpage, BLCKSZ);
550 			memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
551 		}
552 
553 		/* We also clear childbuf's INCOMPLETE_SPLIT flag, if passed */
554 		if (BufferIsValid(childbuf))
555 		{
556 			GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
557 			MarkBufferDirty(childbuf);
558 		}
559 
560 		/* write WAL record */
561 		if (RelationNeedsWAL(btree->index))
562 		{
563 			XLogRecPtr	recptr;
564 
565 			XLogBeginInsert();
566 
567 			/*
568 			 * We just take full page images of all the split pages. Splits
569 			 * are uncommon enough that it's not worth complicating the code
570 			 * to be more efficient.
571 			 */
572 			if (stack->parent == NULL)
573 			{
574 				XLogRegisterBuffer(0, lbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
575 				XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
576 				XLogRegisterBuffer(2, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
577 			}
578 			else
579 			{
580 				XLogRegisterBuffer(0, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
581 				XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
582 			}
583 			if (BufferIsValid(childbuf))
584 				XLogRegisterBuffer(3, childbuf, REGBUF_STANDARD);
585 
586 			XLogRegisterData((char *) &data, sizeof(ginxlogSplit));
587 
588 			recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT);
589 
590 			PageSetLSN(page, recptr);
591 			PageSetLSN(BufferGetPage(rbuffer), recptr);
592 			if (stack->parent == NULL)
593 				PageSetLSN(BufferGetPage(lbuffer), recptr);
594 			if (BufferIsValid(childbuf))
595 				PageSetLSN(childpage, recptr);
596 		}
597 		END_CRIT_SECTION();
598 
599 		/*
600 		 * We can release the locks/pins on the new pages now, but keep
601 		 * stack->buffer locked.  childbuf doesn't get unlocked either.
602 		 */
603 		UnlockReleaseBuffer(rbuffer);
604 		if (stack->parent == NULL)
605 			UnlockReleaseBuffer(lbuffer);
606 
607 		/*
608 		 * If we split the root, we're done. Otherwise the split is not
609 		 * complete until the downlink for the new page has been inserted to
610 		 * the parent.
611 		 */
612 		result = (stack->parent == NULL);
613 	}
614 	else
615 	{
616 		elog(ERROR, "invalid return code from GIN placeToPage method: %d", rc);
617 		result = false;			/* keep compiler quiet */
618 	}
619 
620 	/* Clean up temp context */
621 	MemoryContextSwitchTo(oldCxt);
622 	MemoryContextDelete(tmpCxt);
623 
624 	return result;
625 }
626 
627 /*
628  * Finish a split by inserting the downlink for the new page to parent.
629  *
630  * On entry, stack->buffer is exclusively locked.
631  *
632  * If freestack is true, all the buffers are released and unlocked as we
633  * crawl up the tree, and 'stack' is freed. Otherwise stack->buffer is kept
634  * locked, and stack is unmodified, except for possibly moving right to find
635  * the correct parent of page.
636  */
637 static void
ginFinishSplit(GinBtree btree,GinBtreeStack * stack,bool freestack,GinStatsData * buildStats)638 ginFinishSplit(GinBtree btree, GinBtreeStack *stack, bool freestack,
639 			   GinStatsData *buildStats)
640 {
641 	Page		page;
642 	bool		done;
643 	bool		first = true;
644 
645 	/*
646 	 * freestack == false when we encounter an incompletely split page during
647 	 * a scan, while freestack == true is used in the normal scenario that a
648 	 * split is finished right after the initial insert.
649 	 */
650 	if (!freestack)
651 		elog(DEBUG1, "finishing incomplete split of block %u in gin index \"%s\"",
652 			 stack->blkno, RelationGetRelationName(btree->index));
653 
654 	/* this loop crawls up the stack until the insertion is complete */
655 	do
656 	{
657 		GinBtreeStack *parent = stack->parent;
658 		void	   *insertdata;
659 		BlockNumber updateblkno;
660 
661 		/* search parent to lock */
662 		LockBuffer(parent->buffer, GIN_EXCLUSIVE);
663 
664 		/*
665 		 * If the parent page was incompletely split, finish that split first,
666 		 * then continue with the current one.
667 		 *
668 		 * Note: we have to finish *all* incomplete splits we encounter, even
669 		 * if we have to move right. Otherwise we might choose as the target a
670 		 * page that has no downlink in the parent, and splitting it further
671 		 * would fail.
672 		 */
673 		if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
674 			ginFinishSplit(btree, parent, false, buildStats);
675 
676 		/* move right if it's needed */
677 		page = BufferGetPage(parent->buffer);
678 		while ((parent->off = btree->findChildPtr(btree, page, stack->blkno, parent->off)) == InvalidOffsetNumber)
679 		{
680 			if (GinPageRightMost(page))
681 			{
682 				/*
683 				 * rightmost page, but we don't find parent, we should use
684 				 * plain search...
685 				 */
686 				LockBuffer(parent->buffer, GIN_UNLOCK);
687 				ginFindParents(btree, stack);
688 				parent = stack->parent;
689 				Assert(parent != NULL);
690 				break;
691 			}
692 
693 			parent->buffer = ginStepRight(parent->buffer, btree->index, GIN_EXCLUSIVE);
694 			parent->blkno = BufferGetBlockNumber(parent->buffer);
695 			page = BufferGetPage(parent->buffer);
696 
697 			if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
698 				ginFinishSplit(btree, parent, false, buildStats);
699 		}
700 
701 		/* insert the downlink */
702 		insertdata = btree->prepareDownlink(btree, stack->buffer);
703 		updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
704 		done = ginPlaceToPage(btree, parent,
705 							  insertdata, updateblkno,
706 							  stack->buffer, buildStats);
707 		pfree(insertdata);
708 
709 		/*
710 		 * If the caller requested to free the stack, unlock and release the
711 		 * child buffer now. Otherwise keep it pinned and locked, but if we
712 		 * have to recurse up the tree, we can unlock the upper pages, only
713 		 * keeping the page at the bottom of the stack locked.
714 		 */
715 		if (!first || freestack)
716 			LockBuffer(stack->buffer, GIN_UNLOCK);
717 		if (freestack)
718 		{
719 			ReleaseBuffer(stack->buffer);
720 			pfree(stack);
721 		}
722 		stack = parent;
723 
724 		first = false;
725 	} while (!done);
726 
727 	/* unlock the parent */
728 	LockBuffer(stack->buffer, GIN_UNLOCK);
729 
730 	if (freestack)
731 		freeGinBtreeStack(stack);
732 }
733 
734 /*
735  * Insert a value to tree described by stack.
736  *
737  * The value to be inserted is given in 'insertdata'. Its format depends
738  * on whether this is an entry or data tree, ginInsertValue just passes it
739  * through to the tree-specific callback function.
740  *
741  * During an index build, buildStats is non-null and the counters it contains
742  * are incremented as needed.
743  *
744  * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
745  */
746 void
ginInsertValue(GinBtree btree,GinBtreeStack * stack,void * insertdata,GinStatsData * buildStats)747 ginInsertValue(GinBtree btree, GinBtreeStack *stack, void *insertdata,
748 			   GinStatsData *buildStats)
749 {
750 	bool		done;
751 
752 	/* If the leaf page was incompletely split, finish the split first */
753 	if (GinPageIsIncompleteSplit(BufferGetPage(stack->buffer)))
754 		ginFinishSplit(btree, stack, false, buildStats);
755 
756 	done = ginPlaceToPage(btree, stack,
757 						  insertdata, InvalidBlockNumber,
758 						  InvalidBuffer, buildStats);
759 	if (done)
760 	{
761 		LockBuffer(stack->buffer, GIN_UNLOCK);
762 		freeGinBtreeStack(stack);
763 	}
764 	else
765 		ginFinishSplit(btree, stack, true, buildStats);
766 }
767