1 /*-------------------------------------------------------------------------
2  *
3  * ginbtree.c
4  *	  page utilities routines for the postgres inverted index access method.
5  *
6  *
7  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *			src/backend/access/gin/ginbtree.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/gin_private.h"
18 #include "access/ginxlog.h"
19 #include "access/xloginsert.h"
20 #include "storage/predicate.h"
21 #include "miscadmin.h"
22 #include "utils/memutils.h"
23 #include "utils/rel.h"
24 
25 static void ginFindParents(GinBtree btree, GinBtreeStack *stack);
26 static bool ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
27 			   void *insertdata, BlockNumber updateblkno,
28 			   Buffer childbuf, GinStatsData *buildStats);
29 static void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
30 			   bool freestack, GinStatsData *buildStats);
31 
32 /*
33  * Lock buffer by needed method for search.
34  */
35 int
ginTraverseLock(Buffer buffer,bool searchMode)36 ginTraverseLock(Buffer buffer, bool searchMode)
37 {
38 	Page		page;
39 	int			access = GIN_SHARE;
40 
41 	LockBuffer(buffer, GIN_SHARE);
42 	page = BufferGetPage(buffer);
43 	if (GinPageIsLeaf(page))
44 	{
45 		if (searchMode == false)
46 		{
47 			/* we should relock our page */
48 			LockBuffer(buffer, GIN_UNLOCK);
49 			LockBuffer(buffer, GIN_EXCLUSIVE);
50 
51 			/* But root can become non-leaf during relock */
52 			if (!GinPageIsLeaf(page))
53 			{
54 				/* restore old lock type (very rare) */
55 				LockBuffer(buffer, GIN_UNLOCK);
56 				LockBuffer(buffer, GIN_SHARE);
57 			}
58 			else
59 				access = GIN_EXCLUSIVE;
60 		}
61 	}
62 
63 	return access;
64 }
65 
66 /*
67  * Descend the tree to the leaf page that contains or would contain the key
68  * we're searching for. The key should already be filled in 'btree', in
69  * tree-type specific manner. If btree->fullScan is true, descends to the
70  * leftmost leaf page.
71  *
72  * If 'searchmode' is false, on return stack->buffer is exclusively locked,
73  * and the stack represents the full path to the root. Otherwise stack->buffer
74  * is share-locked, and stack->parent is NULL.
75  *
76  * If 'rootConflictCheck' is true, tree root is checked for serialization
77  * conflict.
78  */
79 GinBtreeStack *
ginFindLeafPage(GinBtree btree,bool searchMode,bool rootConflictCheck,Snapshot snapshot)80 ginFindLeafPage(GinBtree btree, bool searchMode,
81 				bool rootConflictCheck, Snapshot snapshot)
82 {
83 	GinBtreeStack *stack;
84 
85 	stack = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
86 	stack->blkno = btree->rootBlkno;
87 	stack->buffer = ReadBuffer(btree->index, btree->rootBlkno);
88 	stack->parent = NULL;
89 	stack->predictNumber = 1;
90 
91 	if (rootConflictCheck)
92 		CheckForSerializableConflictIn(btree->index, NULL, stack->buffer);
93 
94 	for (;;)
95 	{
96 		Page		page;
97 		BlockNumber child;
98 		int			access;
99 
100 		stack->off = InvalidOffsetNumber;
101 
102 		page = BufferGetPage(stack->buffer);
103 		TestForOldSnapshot(snapshot, btree->index, page);
104 
105 		access = ginTraverseLock(stack->buffer, searchMode);
106 
107 		/*
108 		 * If we're going to modify the tree, finish any incomplete splits we
109 		 * encounter on the way.
110 		 */
111 		if (!searchMode && GinPageIsIncompleteSplit(page))
112 			ginFinishSplit(btree, stack, false, NULL);
113 
114 		/*
115 		 * ok, page is correctly locked, we should check to move right ..,
116 		 * root never has a right link, so small optimization
117 		 */
118 		while (btree->fullScan == false && stack->blkno != btree->rootBlkno &&
119 			   btree->isMoveRight(btree, page))
120 		{
121 			BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
122 
123 			if (rightlink == InvalidBlockNumber)
124 				/* rightmost page */
125 				break;
126 
127 			stack->buffer = ginStepRight(stack->buffer, btree->index, access);
128 			stack->blkno = rightlink;
129 			page = BufferGetPage(stack->buffer);
130 			TestForOldSnapshot(snapshot, btree->index, page);
131 
132 			if (!searchMode && GinPageIsIncompleteSplit(page))
133 				ginFinishSplit(btree, stack, false, NULL);
134 		}
135 
136 		if (GinPageIsLeaf(page))	/* we found, return locked page */
137 			return stack;
138 
139 		/* now we have correct buffer, try to find child */
140 		child = btree->findChildPage(btree, stack);
141 
142 		LockBuffer(stack->buffer, GIN_UNLOCK);
143 		Assert(child != InvalidBlockNumber);
144 		Assert(stack->blkno != child);
145 
146 		if (searchMode)
147 		{
148 			/* in search mode we may forget path to leaf */
149 			stack->blkno = child;
150 			stack->buffer = ReleaseAndReadBuffer(stack->buffer, btree->index, stack->blkno);
151 		}
152 		else
153 		{
154 			GinBtreeStack *ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
155 
156 			ptr->parent = stack;
157 			stack = ptr;
158 			stack->blkno = child;
159 			stack->buffer = ReadBuffer(btree->index, stack->blkno);
160 			stack->predictNumber = 1;
161 		}
162 	}
163 }
164 
165 /*
166  * Step right from current page.
167  *
168  * The next page is locked first, before releasing the current page. This is
169  * crucial to protect from concurrent page deletion (see comment in
170  * ginDeletePage).
171  */
172 Buffer
ginStepRight(Buffer buffer,Relation index,int lockmode)173 ginStepRight(Buffer buffer, Relation index, int lockmode)
174 {
175 	Buffer		nextbuffer;
176 	Page		page = BufferGetPage(buffer);
177 	bool		isLeaf = GinPageIsLeaf(page);
178 	bool		isData = GinPageIsData(page);
179 	BlockNumber blkno = GinPageGetOpaque(page)->rightlink;
180 
181 	nextbuffer = ReadBuffer(index, blkno);
182 	LockBuffer(nextbuffer, lockmode);
183 	UnlockReleaseBuffer(buffer);
184 
185 	/* Sanity check that the page we stepped to is of similar kind. */
186 	page = BufferGetPage(nextbuffer);
187 	if (isLeaf != GinPageIsLeaf(page) || isData != GinPageIsData(page))
188 		elog(ERROR, "right sibling of GIN page is of different type");
189 
190 	return nextbuffer;
191 }
192 
193 void
freeGinBtreeStack(GinBtreeStack * stack)194 freeGinBtreeStack(GinBtreeStack *stack)
195 {
196 	while (stack)
197 	{
198 		GinBtreeStack *tmp = stack->parent;
199 
200 		if (stack->buffer != InvalidBuffer)
201 			ReleaseBuffer(stack->buffer);
202 
203 		pfree(stack);
204 		stack = tmp;
205 	}
206 }
207 
208 /*
209  * Try to find parent for current stack position. Returns correct parent and
210  * child's offset in stack->parent. The root page is never released, to
211  * to prevent conflict with vacuum process.
212  */
213 static void
ginFindParents(GinBtree btree,GinBtreeStack * stack)214 ginFindParents(GinBtree btree, GinBtreeStack *stack)
215 {
216 	Page		page;
217 	Buffer		buffer;
218 	BlockNumber blkno,
219 				leftmostBlkno;
220 	OffsetNumber offset;
221 	GinBtreeStack *root;
222 	GinBtreeStack *ptr;
223 
224 	/*
225 	 * Unwind the stack all the way up to the root, leaving only the root
226 	 * item.
227 	 *
228 	 * Be careful not to release the pin on the root page! The pin on root
229 	 * page is required to lock out concurrent vacuums on the tree.
230 	 */
231 	root = stack->parent;
232 	while (root->parent)
233 	{
234 		ReleaseBuffer(root->buffer);
235 		root = root->parent;
236 	}
237 
238 	Assert(root->blkno == btree->rootBlkno);
239 	Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
240 	root->off = InvalidOffsetNumber;
241 
242 	blkno = root->blkno;
243 	buffer = root->buffer;
244 	offset = InvalidOffsetNumber;
245 
246 	ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
247 
248 	for (;;)
249 	{
250 		LockBuffer(buffer, GIN_EXCLUSIVE);
251 		page = BufferGetPage(buffer);
252 		if (GinPageIsLeaf(page))
253 			elog(ERROR, "Lost path");
254 
255 		if (GinPageIsIncompleteSplit(page))
256 		{
257 			Assert(blkno != btree->rootBlkno);
258 			ptr->blkno = blkno;
259 			ptr->buffer = buffer;
260 
261 			/*
262 			 * parent may be wrong, but if so, the ginFinishSplit call will
263 			 * recurse to call ginFindParents again to fix it.
264 			 */
265 			ptr->parent = root;
266 			ptr->off = InvalidOffsetNumber;
267 
268 			ginFinishSplit(btree, ptr, false, NULL);
269 		}
270 
271 		leftmostBlkno = btree->getLeftMostChild(btree, page);
272 
273 		while ((offset = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) == InvalidOffsetNumber)
274 		{
275 			blkno = GinPageGetOpaque(page)->rightlink;
276 			if (blkno == InvalidBlockNumber)
277 			{
278 				UnlockReleaseBuffer(buffer);
279 				break;
280 			}
281 			buffer = ginStepRight(buffer, btree->index, GIN_EXCLUSIVE);
282 			page = BufferGetPage(buffer);
283 
284 			/* finish any incomplete splits, as above */
285 			if (GinPageIsIncompleteSplit(page))
286 			{
287 				Assert(blkno != btree->rootBlkno);
288 				ptr->blkno = blkno;
289 				ptr->buffer = buffer;
290 				ptr->parent = root;
291 				ptr->off = InvalidOffsetNumber;
292 
293 				ginFinishSplit(btree, ptr, false, NULL);
294 			}
295 		}
296 
297 		if (blkno != InvalidBlockNumber)
298 		{
299 			ptr->blkno = blkno;
300 			ptr->buffer = buffer;
301 			ptr->parent = root; /* it may be wrong, but in next call we will
302 								 * correct */
303 			ptr->off = offset;
304 			stack->parent = ptr;
305 			return;
306 		}
307 
308 		/* Descend down to next level */
309 		blkno = leftmostBlkno;
310 		buffer = ReadBuffer(btree->index, blkno);
311 	}
312 }
313 
314 /*
315  * Insert a new item to a page.
316  *
317  * Returns true if the insertion was finished. On false, the page was split and
318  * the parent needs to be updated. (A root split returns true as it doesn't
319  * need any further action by the caller to complete.)
320  *
321  * When inserting a downlink to an internal page, 'childbuf' contains the
322  * child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
323  * atomically with the insert. Also, the existing item at offset stack->off
324  * in the target page is updated to point to updateblkno.
325  *
326  * stack->buffer is locked on entry, and is kept locked.
327  * Likewise for childbuf, if given.
328  */
329 static bool
ginPlaceToPage(GinBtree btree,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,Buffer childbuf,GinStatsData * buildStats)330 ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
331 			   void *insertdata, BlockNumber updateblkno,
332 			   Buffer childbuf, GinStatsData *buildStats)
333 {
334 	Page		page = BufferGetPage(stack->buffer);
335 	bool		result;
336 	GinPlaceToPageRC rc;
337 	uint16		xlflags = 0;
338 	Page		childpage = NULL;
339 	Page		newlpage = NULL,
340 				newrpage = NULL;
341 	void	   *ptp_workspace = NULL;
342 	MemoryContext tmpCxt;
343 	MemoryContext oldCxt;
344 
345 	/*
346 	 * We do all the work of this function and its subfunctions in a temporary
347 	 * memory context.  This avoids leakages and simplifies APIs, since some
348 	 * subfunctions allocate storage that has to survive until we've finished
349 	 * the WAL insertion.
350 	 */
351 	tmpCxt = AllocSetContextCreate(CurrentMemoryContext,
352 								   "ginPlaceToPage temporary context",
353 								   ALLOCSET_DEFAULT_SIZES);
354 	oldCxt = MemoryContextSwitchTo(tmpCxt);
355 
356 	if (GinPageIsData(page))
357 		xlflags |= GIN_INSERT_ISDATA;
358 	if (GinPageIsLeaf(page))
359 	{
360 		xlflags |= GIN_INSERT_ISLEAF;
361 		Assert(!BufferIsValid(childbuf));
362 		Assert(updateblkno == InvalidBlockNumber);
363 	}
364 	else
365 	{
366 		Assert(BufferIsValid(childbuf));
367 		Assert(updateblkno != InvalidBlockNumber);
368 		childpage = BufferGetPage(childbuf);
369 	}
370 
371 	/*
372 	 * See if the incoming tuple will fit on the page.  beginPlaceToPage will
373 	 * decide if the page needs to be split, and will compute the split
374 	 * contents if so.  See comments for beginPlaceToPage and execPlaceToPage
375 	 * functions for more details of the API here.
376 	 */
377 	rc = btree->beginPlaceToPage(btree, stack->buffer, stack,
378 								 insertdata, updateblkno,
379 								 &ptp_workspace,
380 								 &newlpage, &newrpage);
381 
382 	if (rc == GPTP_NO_WORK)
383 	{
384 		/* Nothing to do */
385 		result = true;
386 	}
387 	else if (rc == GPTP_INSERT)
388 	{
389 		/* It will fit, perform the insertion */
390 		START_CRIT_SECTION();
391 
392 		if (RelationNeedsWAL(btree->index))
393 		{
394 			XLogBeginInsert();
395 			XLogRegisterBuffer(0, stack->buffer, REGBUF_STANDARD);
396 			if (BufferIsValid(childbuf))
397 				XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
398 		}
399 
400 		/* Perform the page update, and register any extra WAL data */
401 		btree->execPlaceToPage(btree, stack->buffer, stack,
402 							   insertdata, updateblkno, ptp_workspace);
403 
404 		MarkBufferDirty(stack->buffer);
405 
406 		/* An insert to an internal page finishes the split of the child. */
407 		if (BufferIsValid(childbuf))
408 		{
409 			GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
410 			MarkBufferDirty(childbuf);
411 		}
412 
413 		if (RelationNeedsWAL(btree->index))
414 		{
415 			XLogRecPtr	recptr;
416 			ginxlogInsert xlrec;
417 			BlockIdData childblknos[2];
418 
419 			xlrec.flags = xlflags;
420 
421 			XLogRegisterData((char *) &xlrec, sizeof(ginxlogInsert));
422 
423 			/*
424 			 * Log information about child if this was an insertion of a
425 			 * downlink.
426 			 */
427 			if (BufferIsValid(childbuf))
428 			{
429 				BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf));
430 				BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink);
431 				XLogRegisterData((char *) childblknos,
432 								 sizeof(BlockIdData) * 2);
433 			}
434 
435 			recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT);
436 			PageSetLSN(page, recptr);
437 			if (BufferIsValid(childbuf))
438 				PageSetLSN(childpage, recptr);
439 		}
440 
441 		END_CRIT_SECTION();
442 
443 		/* Insertion is complete. */
444 		result = true;
445 	}
446 	else if (rc == GPTP_SPLIT)
447 	{
448 		/*
449 		 * Didn't fit, need to split.  The split has been computed in newlpage
450 		 * and newrpage, which are pointers to palloc'd pages, not associated
451 		 * with buffers.  stack->buffer is not touched yet.
452 		 */
453 		Buffer		rbuffer;
454 		BlockNumber savedRightLink;
455 		ginxlogSplit data;
456 		Buffer		lbuffer = InvalidBuffer;
457 		Page		newrootpg = NULL;
458 
459 		/* Get a new index page to become the right page */
460 		rbuffer = GinNewBuffer(btree->index);
461 
462 		/* During index build, count the new page */
463 		if (buildStats)
464 		{
465 			if (btree->isData)
466 				buildStats->nDataPages++;
467 			else
468 				buildStats->nEntryPages++;
469 		}
470 
471 		savedRightLink = GinPageGetOpaque(page)->rightlink;
472 
473 		/* Begin setting up WAL record */
474 		data.node = btree->index->rd_node;
475 		data.flags = xlflags;
476 		if (BufferIsValid(childbuf))
477 		{
478 			data.leftChildBlkno = BufferGetBlockNumber(childbuf);
479 			data.rightChildBlkno = GinPageGetOpaque(childpage)->rightlink;
480 		}
481 		else
482 			data.leftChildBlkno = data.rightChildBlkno = InvalidBlockNumber;
483 
484 		if (stack->parent == NULL)
485 		{
486 			/*
487 			 * splitting the root, so we need to allocate new left page and
488 			 * place pointers to left and right page on root page.
489 			 */
490 			lbuffer = GinNewBuffer(btree->index);
491 
492 			/* During index build, count the new left page */
493 			if (buildStats)
494 			{
495 				if (btree->isData)
496 					buildStats->nDataPages++;
497 				else
498 					buildStats->nEntryPages++;
499 			}
500 
501 			data.rrlink = InvalidBlockNumber;
502 			data.flags |= GIN_SPLIT_ROOT;
503 
504 			GinPageGetOpaque(newrpage)->rightlink = InvalidBlockNumber;
505 			GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
506 
507 			/*
508 			 * Construct a new root page containing downlinks to the new left
509 			 * and right pages.  (Do this in a temporary copy rather than
510 			 * overwriting the original page directly, since we're not in the
511 			 * critical section yet.)
512 			 */
513 			newrootpg = PageGetTempPage(newrpage);
514 			GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~(GIN_LEAF | GIN_COMPRESSED), BLCKSZ);
515 
516 			btree->fillRoot(btree, newrootpg,
517 							BufferGetBlockNumber(lbuffer), newlpage,
518 							BufferGetBlockNumber(rbuffer), newrpage);
519 
520 			if (GinPageIsLeaf(BufferGetPage(stack->buffer)))
521 			{
522 
523 				PredicateLockPageSplit(btree->index,
524 									   BufferGetBlockNumber(stack->buffer),
525 									   BufferGetBlockNumber(lbuffer));
526 
527 				PredicateLockPageSplit(btree->index,
528 									   BufferGetBlockNumber(stack->buffer),
529 									   BufferGetBlockNumber(rbuffer));
530 			}
531 
532 		}
533 		else
534 		{
535 			/* splitting a non-root page */
536 			data.rrlink = savedRightLink;
537 
538 			GinPageGetOpaque(newrpage)->rightlink = savedRightLink;
539 			GinPageGetOpaque(newlpage)->flags |= GIN_INCOMPLETE_SPLIT;
540 			GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
541 
542 			if (GinPageIsLeaf(BufferGetPage(stack->buffer)))
543 			{
544 
545 				PredicateLockPageSplit(btree->index,
546 									   BufferGetBlockNumber(stack->buffer),
547 									   BufferGetBlockNumber(rbuffer));
548 			}
549 		}
550 
551 		/*
552 		 * OK, we have the new contents of the left page in a temporary copy
553 		 * now (newlpage), and likewise for the new contents of the
554 		 * newly-allocated right block. The original page is still unchanged.
555 		 *
556 		 * If this is a root split, we also have a temporary page containing
557 		 * the new contents of the root.
558 		 */
559 
560 		START_CRIT_SECTION();
561 
562 		MarkBufferDirty(rbuffer);
563 		MarkBufferDirty(stack->buffer);
564 
565 		/*
566 		 * Restore the temporary copies over the real buffers.
567 		 */
568 		if (stack->parent == NULL)
569 		{
570 			/* Splitting the root, three pages to update */
571 			MarkBufferDirty(lbuffer);
572 			memcpy(page, newrootpg, BLCKSZ);
573 			memcpy(BufferGetPage(lbuffer), newlpage, BLCKSZ);
574 			memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
575 		}
576 		else
577 		{
578 			/* Normal split, only two pages to update */
579 			memcpy(page, newlpage, BLCKSZ);
580 			memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
581 		}
582 
583 		/* We also clear childbuf's INCOMPLETE_SPLIT flag, if passed */
584 		if (BufferIsValid(childbuf))
585 		{
586 			GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
587 			MarkBufferDirty(childbuf);
588 		}
589 
590 		/* write WAL record */
591 		if (RelationNeedsWAL(btree->index))
592 		{
593 			XLogRecPtr	recptr;
594 
595 			XLogBeginInsert();
596 
597 			/*
598 			 * We just take full page images of all the split pages. Splits
599 			 * are uncommon enough that it's not worth complicating the code
600 			 * to be more efficient.
601 			 */
602 			if (stack->parent == NULL)
603 			{
604 				XLogRegisterBuffer(0, lbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
605 				XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
606 				XLogRegisterBuffer(2, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
607 			}
608 			else
609 			{
610 				XLogRegisterBuffer(0, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
611 				XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
612 			}
613 			if (BufferIsValid(childbuf))
614 				XLogRegisterBuffer(3, childbuf, REGBUF_STANDARD);
615 
616 			XLogRegisterData((char *) &data, sizeof(ginxlogSplit));
617 
618 			recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT);
619 
620 			PageSetLSN(page, recptr);
621 			PageSetLSN(BufferGetPage(rbuffer), recptr);
622 			if (stack->parent == NULL)
623 				PageSetLSN(BufferGetPage(lbuffer), recptr);
624 			if (BufferIsValid(childbuf))
625 				PageSetLSN(childpage, recptr);
626 		}
627 		END_CRIT_SECTION();
628 
629 		/*
630 		 * We can release the locks/pins on the new pages now, but keep
631 		 * stack->buffer locked.  childbuf doesn't get unlocked either.
632 		 */
633 		UnlockReleaseBuffer(rbuffer);
634 		if (stack->parent == NULL)
635 			UnlockReleaseBuffer(lbuffer);
636 
637 		/*
638 		 * If we split the root, we're done. Otherwise the split is not
639 		 * complete until the downlink for the new page has been inserted to
640 		 * the parent.
641 		 */
642 		result = (stack->parent == NULL);
643 	}
644 	else
645 	{
646 		elog(ERROR, "invalid return code from GIN placeToPage method: %d", rc);
647 		result = false;			/* keep compiler quiet */
648 	}
649 
650 	/* Clean up temp context */
651 	MemoryContextSwitchTo(oldCxt);
652 	MemoryContextDelete(tmpCxt);
653 
654 	return result;
655 }
656 
657 /*
658  * Finish a split by inserting the downlink for the new page to parent.
659  *
660  * On entry, stack->buffer is exclusively locked.
661  *
662  * If freestack is true, all the buffers are released and unlocked as we
663  * crawl up the tree, and 'stack' is freed. Otherwise stack->buffer is kept
664  * locked, and stack is unmodified, except for possibly moving right to find
665  * the correct parent of page.
666  */
667 static void
ginFinishSplit(GinBtree btree,GinBtreeStack * stack,bool freestack,GinStatsData * buildStats)668 ginFinishSplit(GinBtree btree, GinBtreeStack *stack, bool freestack,
669 			   GinStatsData *buildStats)
670 {
671 	Page		page;
672 	bool		done;
673 	bool		first = true;
674 
675 	/*
676 	 * freestack == false when we encounter an incompletely split page during
677 	 * a scan, while freestack == true is used in the normal scenario that a
678 	 * split is finished right after the initial insert.
679 	 */
680 	if (!freestack)
681 		elog(DEBUG1, "finishing incomplete split of block %u in gin index \"%s\"",
682 			 stack->blkno, RelationGetRelationName(btree->index));
683 
684 	/* this loop crawls up the stack until the insertion is complete */
685 	do
686 	{
687 		GinBtreeStack *parent = stack->parent;
688 		void	   *insertdata;
689 		BlockNumber updateblkno;
690 
691 		/* search parent to lock */
692 		LockBuffer(parent->buffer, GIN_EXCLUSIVE);
693 
694 		/*
695 		 * If the parent page was incompletely split, finish that split first,
696 		 * then continue with the current one.
697 		 *
698 		 * Note: we have to finish *all* incomplete splits we encounter, even
699 		 * if we have to move right. Otherwise we might choose as the target a
700 		 * page that has no downlink in the parent, and splitting it further
701 		 * would fail.
702 		 */
703 		if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
704 			ginFinishSplit(btree, parent, false, buildStats);
705 
706 		/* move right if it's needed */
707 		page = BufferGetPage(parent->buffer);
708 		while ((parent->off = btree->findChildPtr(btree, page, stack->blkno, parent->off)) == InvalidOffsetNumber)
709 		{
710 			if (GinPageRightMost(page))
711 			{
712 				/*
713 				 * rightmost page, but we don't find parent, we should use
714 				 * plain search...
715 				 */
716 				LockBuffer(parent->buffer, GIN_UNLOCK);
717 				ginFindParents(btree, stack);
718 				parent = stack->parent;
719 				Assert(parent != NULL);
720 				break;
721 			}
722 
723 			parent->buffer = ginStepRight(parent->buffer, btree->index, GIN_EXCLUSIVE);
724 			parent->blkno = BufferGetBlockNumber(parent->buffer);
725 			page = BufferGetPage(parent->buffer);
726 
727 			if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
728 				ginFinishSplit(btree, parent, false, buildStats);
729 		}
730 
731 		/* insert the downlink */
732 		insertdata = btree->prepareDownlink(btree, stack->buffer);
733 		updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
734 		done = ginPlaceToPage(btree, parent,
735 							  insertdata, updateblkno,
736 							  stack->buffer, buildStats);
737 		pfree(insertdata);
738 
739 		/*
740 		 * If the caller requested to free the stack, unlock and release the
741 		 * child buffer now. Otherwise keep it pinned and locked, but if we
742 		 * have to recurse up the tree, we can unlock the upper pages, only
743 		 * keeping the page at the bottom of the stack locked.
744 		 */
745 		if (!first || freestack)
746 			LockBuffer(stack->buffer, GIN_UNLOCK);
747 		if (freestack)
748 		{
749 			ReleaseBuffer(stack->buffer);
750 			pfree(stack);
751 		}
752 		stack = parent;
753 
754 		first = false;
755 	} while (!done);
756 
757 	/* unlock the parent */
758 	LockBuffer(stack->buffer, GIN_UNLOCK);
759 
760 	if (freestack)
761 		freeGinBtreeStack(stack);
762 }
763 
764 /*
765  * Insert a value to tree described by stack.
766  *
767  * The value to be inserted is given in 'insertdata'. Its format depends
768  * on whether this is an entry or data tree, ginInsertValue just passes it
769  * through to the tree-specific callback function.
770  *
771  * During an index build, buildStats is non-null and the counters it contains
772  * are incremented as needed.
773  *
774  * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
775  */
776 void
ginInsertValue(GinBtree btree,GinBtreeStack * stack,void * insertdata,GinStatsData * buildStats)777 ginInsertValue(GinBtree btree, GinBtreeStack *stack, void *insertdata,
778 			   GinStatsData *buildStats)
779 {
780 	bool		done;
781 
782 	/* If the leaf page was incompletely split, finish the split first */
783 	if (GinPageIsIncompleteSplit(BufferGetPage(stack->buffer)))
784 		ginFinishSplit(btree, stack, false, buildStats);
785 
786 	done = ginPlaceToPage(btree, stack,
787 						  insertdata, InvalidBlockNumber,
788 						  InvalidBuffer, buildStats);
789 	if (done)
790 	{
791 		LockBuffer(stack->buffer, GIN_UNLOCK);
792 		freeGinBtreeStack(stack);
793 	}
794 	else
795 		ginFinishSplit(btree, stack, true, buildStats);
796 }
797