1 /*-------------------------------------------------------------------------
2 *
3 * ginbtree.c
4 * page utilities routines for the postgres inverted index access method.
5 *
6 *
7 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/gin/ginbtree.c
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "access/gin_private.h"
18 #include "access/xloginsert.h"
19 #include "miscadmin.h"
20 #include "utils/memutils.h"
21 #include "utils/rel.h"
22
23 static void ginFindParents(GinBtree btree, GinBtreeStack *stack);
24 static bool ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
25 void *insertdata, BlockNumber updateblkno,
26 Buffer childbuf, GinStatsData *buildStats);
27 static void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
28 bool freestack, GinStatsData *buildStats);
29
30 /*
31 * Lock buffer by needed method for search.
32 */
33 static int
ginTraverseLock(Buffer buffer,bool searchMode)34 ginTraverseLock(Buffer buffer, bool searchMode)
35 {
36 Page page;
37 int access = GIN_SHARE;
38
39 LockBuffer(buffer, GIN_SHARE);
40 page = BufferGetPage(buffer);
41 if (GinPageIsLeaf(page))
42 {
43 if (searchMode == FALSE)
44 {
45 /* we should relock our page */
46 LockBuffer(buffer, GIN_UNLOCK);
47 LockBuffer(buffer, GIN_EXCLUSIVE);
48
49 /* But root can become non-leaf during relock */
50 if (!GinPageIsLeaf(page))
51 {
52 /* restore old lock type (very rare) */
53 LockBuffer(buffer, GIN_UNLOCK);
54 LockBuffer(buffer, GIN_SHARE);
55 }
56 else
57 access = GIN_EXCLUSIVE;
58 }
59 }
60
61 return access;
62 }
63
64 /*
65 * Descend the tree to the leaf page that contains or would contain the key
66 * we're searching for. The key should already be filled in 'btree', in
67 * tree-type specific manner. If btree->fullScan is true, descends to the
68 * leftmost leaf page.
69 *
70 * If 'searchmode' is false, on return stack->buffer is exclusively locked,
71 * and the stack represents the full path to the root. Otherwise stack->buffer
72 * is share-locked, and stack->parent is NULL.
73 */
74 GinBtreeStack *
ginFindLeafPage(GinBtree btree,bool searchMode,Snapshot snapshot)75 ginFindLeafPage(GinBtree btree, bool searchMode, Snapshot snapshot)
76 {
77 GinBtreeStack *stack;
78
79 stack = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
80 stack->blkno = btree->rootBlkno;
81 stack->buffer = ReadBuffer(btree->index, btree->rootBlkno);
82 stack->parent = NULL;
83 stack->predictNumber = 1;
84
85 for (;;)
86 {
87 Page page;
88 BlockNumber child;
89 int access;
90
91 stack->off = InvalidOffsetNumber;
92
93 page = BufferGetPage(stack->buffer);
94 TestForOldSnapshot(snapshot, btree->index, page);
95
96 access = ginTraverseLock(stack->buffer, searchMode);
97
98 /*
99 * If we're going to modify the tree, finish any incomplete splits we
100 * encounter on the way.
101 */
102 if (!searchMode && GinPageIsIncompleteSplit(page))
103 ginFinishSplit(btree, stack, false, NULL);
104
105 /*
106 * ok, page is correctly locked, we should check to move right ..,
107 * root never has a right link, so small optimization
108 */
109 while (btree->fullScan == FALSE && stack->blkno != btree->rootBlkno &&
110 btree->isMoveRight(btree, page))
111 {
112 BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
113
114 if (rightlink == InvalidBlockNumber)
115 /* rightmost page */
116 break;
117
118 stack->buffer = ginStepRight(stack->buffer, btree->index, access);
119 stack->blkno = rightlink;
120 page = BufferGetPage(stack->buffer);
121 TestForOldSnapshot(snapshot, btree->index, page);
122
123 if (!searchMode && GinPageIsIncompleteSplit(page))
124 ginFinishSplit(btree, stack, false, NULL);
125 }
126
127 if (GinPageIsLeaf(page)) /* we found, return locked page */
128 return stack;
129
130 /* now we have correct buffer, try to find child */
131 child = btree->findChildPage(btree, stack);
132
133 LockBuffer(stack->buffer, GIN_UNLOCK);
134 Assert(child != InvalidBlockNumber);
135 Assert(stack->blkno != child);
136
137 if (searchMode)
138 {
139 /* in search mode we may forget path to leaf */
140 stack->blkno = child;
141 stack->buffer = ReleaseAndReadBuffer(stack->buffer, btree->index, stack->blkno);
142 }
143 else
144 {
145 GinBtreeStack *ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
146
147 ptr->parent = stack;
148 stack = ptr;
149 stack->blkno = child;
150 stack->buffer = ReadBuffer(btree->index, stack->blkno);
151 stack->predictNumber = 1;
152 }
153 }
154 }
155
156 /*
157 * Step right from current page.
158 *
159 * The next page is locked first, before releasing the current page. This is
160 * crucial to protect from concurrent page deletion (see comment in
161 * ginDeletePage).
162 */
163 Buffer
ginStepRight(Buffer buffer,Relation index,int lockmode)164 ginStepRight(Buffer buffer, Relation index, int lockmode)
165 {
166 Buffer nextbuffer;
167 Page page = BufferGetPage(buffer);
168 bool isLeaf = GinPageIsLeaf(page);
169 bool isData = GinPageIsData(page);
170 BlockNumber blkno = GinPageGetOpaque(page)->rightlink;
171
172 nextbuffer = ReadBuffer(index, blkno);
173 LockBuffer(nextbuffer, lockmode);
174 UnlockReleaseBuffer(buffer);
175
176 /* Sanity check that the page we stepped to is of similar kind. */
177 page = BufferGetPage(nextbuffer);
178 if (isLeaf != GinPageIsLeaf(page) || isData != GinPageIsData(page))
179 elog(ERROR, "right sibling of GIN page is of different type");
180
181 return nextbuffer;
182 }
183
184 void
freeGinBtreeStack(GinBtreeStack * stack)185 freeGinBtreeStack(GinBtreeStack *stack)
186 {
187 while (stack)
188 {
189 GinBtreeStack *tmp = stack->parent;
190
191 if (stack->buffer != InvalidBuffer)
192 ReleaseBuffer(stack->buffer);
193
194 pfree(stack);
195 stack = tmp;
196 }
197 }
198
199 /*
200 * Try to find parent for current stack position. Returns correct parent and
201 * child's offset in stack->parent. The root page is never released, to
202 * to prevent conflict with vacuum process.
203 */
204 static void
ginFindParents(GinBtree btree,GinBtreeStack * stack)205 ginFindParents(GinBtree btree, GinBtreeStack *stack)
206 {
207 Page page;
208 Buffer buffer;
209 BlockNumber blkno,
210 leftmostBlkno;
211 OffsetNumber offset;
212 GinBtreeStack *root;
213 GinBtreeStack *ptr;
214
215 /*
216 * Unwind the stack all the way up to the root, leaving only the root
217 * item.
218 *
219 * Be careful not to release the pin on the root page! The pin on root
220 * page is required to lock out concurrent vacuums on the tree.
221 */
222 root = stack->parent;
223 while (root->parent)
224 {
225 ReleaseBuffer(root->buffer);
226 root = root->parent;
227 }
228
229 Assert(root->blkno == btree->rootBlkno);
230 Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
231 root->off = InvalidOffsetNumber;
232
233 blkno = root->blkno;
234 buffer = root->buffer;
235 offset = InvalidOffsetNumber;
236
237 ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
238
239 for (;;)
240 {
241 LockBuffer(buffer, GIN_EXCLUSIVE);
242 page = BufferGetPage(buffer);
243 if (GinPageIsLeaf(page))
244 elog(ERROR, "Lost path");
245
246 if (GinPageIsIncompleteSplit(page))
247 {
248 Assert(blkno != btree->rootBlkno);
249 ptr->blkno = blkno;
250 ptr->buffer = buffer;
251
252 /*
253 * parent may be wrong, but if so, the ginFinishSplit call will
254 * recurse to call ginFindParents again to fix it.
255 */
256 ptr->parent = root;
257 ptr->off = InvalidOffsetNumber;
258
259 ginFinishSplit(btree, ptr, false, NULL);
260 }
261
262 leftmostBlkno = btree->getLeftMostChild(btree, page);
263
264 while ((offset = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) == InvalidOffsetNumber)
265 {
266 blkno = GinPageGetOpaque(page)->rightlink;
267 if (blkno == InvalidBlockNumber)
268 {
269 UnlockReleaseBuffer(buffer);
270 break;
271 }
272 buffer = ginStepRight(buffer, btree->index, GIN_EXCLUSIVE);
273 page = BufferGetPage(buffer);
274
275 /* finish any incomplete splits, as above */
276 if (GinPageIsIncompleteSplit(page))
277 {
278 Assert(blkno != btree->rootBlkno);
279 ptr->blkno = blkno;
280 ptr->buffer = buffer;
281 ptr->parent = root;
282 ptr->off = InvalidOffsetNumber;
283
284 ginFinishSplit(btree, ptr, false, NULL);
285 }
286 }
287
288 if (blkno != InvalidBlockNumber)
289 {
290 ptr->blkno = blkno;
291 ptr->buffer = buffer;
292 ptr->parent = root; /* it may be wrong, but in next call we will
293 * correct */
294 ptr->off = offset;
295 stack->parent = ptr;
296 return;
297 }
298
299 /* Descend down to next level */
300 blkno = leftmostBlkno;
301 buffer = ReadBuffer(btree->index, blkno);
302 }
303 }
304
305 /*
306 * Insert a new item to a page.
307 *
308 * Returns true if the insertion was finished. On false, the page was split and
309 * the parent needs to be updated. (A root split returns true as it doesn't
310 * need any further action by the caller to complete.)
311 *
312 * When inserting a downlink to an internal page, 'childbuf' contains the
313 * child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
314 * atomically with the insert. Also, the existing item at offset stack->off
315 * in the target page is updated to point to updateblkno.
316 *
317 * stack->buffer is locked on entry, and is kept locked.
318 * Likewise for childbuf, if given.
319 */
320 static bool
ginPlaceToPage(GinBtree btree,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,Buffer childbuf,GinStatsData * buildStats)321 ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
322 void *insertdata, BlockNumber updateblkno,
323 Buffer childbuf, GinStatsData *buildStats)
324 {
325 Page page = BufferGetPage(stack->buffer);
326 bool result;
327 GinPlaceToPageRC rc;
328 uint16 xlflags = 0;
329 Page childpage = NULL;
330 Page newlpage = NULL,
331 newrpage = NULL;
332 void *ptp_workspace = NULL;
333 MemoryContext tmpCxt;
334 MemoryContext oldCxt;
335
336 /*
337 * We do all the work of this function and its subfunctions in a temporary
338 * memory context. This avoids leakages and simplifies APIs, since some
339 * subfunctions allocate storage that has to survive until we've finished
340 * the WAL insertion.
341 */
342 tmpCxt = AllocSetContextCreate(CurrentMemoryContext,
343 "ginPlaceToPage temporary context",
344 ALLOCSET_DEFAULT_SIZES);
345 oldCxt = MemoryContextSwitchTo(tmpCxt);
346
347 if (GinPageIsData(page))
348 xlflags |= GIN_INSERT_ISDATA;
349 if (GinPageIsLeaf(page))
350 {
351 xlflags |= GIN_INSERT_ISLEAF;
352 Assert(!BufferIsValid(childbuf));
353 Assert(updateblkno == InvalidBlockNumber);
354 }
355 else
356 {
357 Assert(BufferIsValid(childbuf));
358 Assert(updateblkno != InvalidBlockNumber);
359 childpage = BufferGetPage(childbuf);
360 }
361
362 /*
363 * See if the incoming tuple will fit on the page. beginPlaceToPage will
364 * decide if the page needs to be split, and will compute the split
365 * contents if so. See comments for beginPlaceToPage and execPlaceToPage
366 * functions for more details of the API here.
367 */
368 rc = btree->beginPlaceToPage(btree, stack->buffer, stack,
369 insertdata, updateblkno,
370 &ptp_workspace,
371 &newlpage, &newrpage);
372
373 if (rc == GPTP_NO_WORK)
374 {
375 /* Nothing to do */
376 result = true;
377 }
378 else if (rc == GPTP_INSERT)
379 {
380 /* It will fit, perform the insertion */
381 START_CRIT_SECTION();
382
383 if (RelationNeedsWAL(btree->index))
384 {
385 XLogBeginInsert();
386 XLogRegisterBuffer(0, stack->buffer, REGBUF_STANDARD);
387 if (BufferIsValid(childbuf))
388 XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
389 }
390
391 /* Perform the page update, and register any extra WAL data */
392 btree->execPlaceToPage(btree, stack->buffer, stack,
393 insertdata, updateblkno, ptp_workspace);
394
395 MarkBufferDirty(stack->buffer);
396
397 /* An insert to an internal page finishes the split of the child. */
398 if (BufferIsValid(childbuf))
399 {
400 GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
401 MarkBufferDirty(childbuf);
402 }
403
404 if (RelationNeedsWAL(btree->index))
405 {
406 XLogRecPtr recptr;
407 ginxlogInsert xlrec;
408 BlockIdData childblknos[2];
409
410 xlrec.flags = xlflags;
411
412 XLogRegisterData((char *) &xlrec, sizeof(ginxlogInsert));
413
414 /*
415 * Log information about child if this was an insertion of a
416 * downlink.
417 */
418 if (BufferIsValid(childbuf))
419 {
420 BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf));
421 BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink);
422 XLogRegisterData((char *) childblknos,
423 sizeof(BlockIdData) * 2);
424 }
425
426 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT);
427 PageSetLSN(page, recptr);
428 if (BufferIsValid(childbuf))
429 PageSetLSN(childpage, recptr);
430 }
431
432 END_CRIT_SECTION();
433
434 /* Insertion is complete. */
435 result = true;
436 }
437 else if (rc == GPTP_SPLIT)
438 {
439 /*
440 * Didn't fit, need to split. The split has been computed in newlpage
441 * and newrpage, which are pointers to palloc'd pages, not associated
442 * with buffers. stack->buffer is not touched yet.
443 */
444 Buffer rbuffer;
445 BlockNumber savedRightLink;
446 ginxlogSplit data;
447 Buffer lbuffer = InvalidBuffer;
448 Page newrootpg = NULL;
449
450 /* Get a new index page to become the right page */
451 rbuffer = GinNewBuffer(btree->index);
452
453 /* During index build, count the new page */
454 if (buildStats)
455 {
456 if (btree->isData)
457 buildStats->nDataPages++;
458 else
459 buildStats->nEntryPages++;
460 }
461
462 savedRightLink = GinPageGetOpaque(page)->rightlink;
463
464 /* Begin setting up WAL record */
465 data.node = btree->index->rd_node;
466 data.flags = xlflags;
467 if (BufferIsValid(childbuf))
468 {
469 data.leftChildBlkno = BufferGetBlockNumber(childbuf);
470 data.rightChildBlkno = GinPageGetOpaque(childpage)->rightlink;
471 }
472 else
473 data.leftChildBlkno = data.rightChildBlkno = InvalidBlockNumber;
474
475 if (stack->parent == NULL)
476 {
477 /*
478 * splitting the root, so we need to allocate new left page and
479 * place pointers to left and right page on root page.
480 */
481 lbuffer = GinNewBuffer(btree->index);
482
483 /* During index build, count the new left page */
484 if (buildStats)
485 {
486 if (btree->isData)
487 buildStats->nDataPages++;
488 else
489 buildStats->nEntryPages++;
490 }
491
492 data.rrlink = InvalidBlockNumber;
493 data.flags |= GIN_SPLIT_ROOT;
494
495 GinPageGetOpaque(newrpage)->rightlink = InvalidBlockNumber;
496 GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
497
498 /*
499 * Construct a new root page containing downlinks to the new left
500 * and right pages. (Do this in a temporary copy rather than
501 * overwriting the original page directly, since we're not in the
502 * critical section yet.)
503 */
504 newrootpg = PageGetTempPage(newrpage);
505 GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~(GIN_LEAF | GIN_COMPRESSED), BLCKSZ);
506
507 btree->fillRoot(btree, newrootpg,
508 BufferGetBlockNumber(lbuffer), newlpage,
509 BufferGetBlockNumber(rbuffer), newrpage);
510 }
511 else
512 {
513 /* splitting a non-root page */
514 data.rrlink = savedRightLink;
515
516 GinPageGetOpaque(newrpage)->rightlink = savedRightLink;
517 GinPageGetOpaque(newlpage)->flags |= GIN_INCOMPLETE_SPLIT;
518 GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
519 }
520
521 /*
522 * OK, we have the new contents of the left page in a temporary copy
523 * now (newlpage), and likewise for the new contents of the
524 * newly-allocated right block. The original page is still unchanged.
525 *
526 * If this is a root split, we also have a temporary page containing
527 * the new contents of the root.
528 */
529
530 START_CRIT_SECTION();
531
532 MarkBufferDirty(rbuffer);
533 MarkBufferDirty(stack->buffer);
534
535 /*
536 * Restore the temporary copies over the real buffers.
537 */
538 if (stack->parent == NULL)
539 {
540 /* Splitting the root, three pages to update */
541 MarkBufferDirty(lbuffer);
542 memcpy(page, newrootpg, BLCKSZ);
543 memcpy(BufferGetPage(lbuffer), newlpage, BLCKSZ);
544 memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
545 }
546 else
547 {
548 /* Normal split, only two pages to update */
549 memcpy(page, newlpage, BLCKSZ);
550 memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
551 }
552
553 /* We also clear childbuf's INCOMPLETE_SPLIT flag, if passed */
554 if (BufferIsValid(childbuf))
555 {
556 GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
557 MarkBufferDirty(childbuf);
558 }
559
560 /* write WAL record */
561 if (RelationNeedsWAL(btree->index))
562 {
563 XLogRecPtr recptr;
564
565 XLogBeginInsert();
566
567 /*
568 * We just take full page images of all the split pages. Splits
569 * are uncommon enough that it's not worth complicating the code
570 * to be more efficient.
571 */
572 if (stack->parent == NULL)
573 {
574 XLogRegisterBuffer(0, lbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
575 XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
576 XLogRegisterBuffer(2, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
577 }
578 else
579 {
580 XLogRegisterBuffer(0, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
581 XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
582 }
583 if (BufferIsValid(childbuf))
584 XLogRegisterBuffer(3, childbuf, REGBUF_STANDARD);
585
586 XLogRegisterData((char *) &data, sizeof(ginxlogSplit));
587
588 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT);
589
590 PageSetLSN(page, recptr);
591 PageSetLSN(BufferGetPage(rbuffer), recptr);
592 if (stack->parent == NULL)
593 PageSetLSN(BufferGetPage(lbuffer), recptr);
594 if (BufferIsValid(childbuf))
595 PageSetLSN(childpage, recptr);
596 }
597 END_CRIT_SECTION();
598
599 /*
600 * We can release the locks/pins on the new pages now, but keep
601 * stack->buffer locked. childbuf doesn't get unlocked either.
602 */
603 UnlockReleaseBuffer(rbuffer);
604 if (stack->parent == NULL)
605 UnlockReleaseBuffer(lbuffer);
606
607 /*
608 * If we split the root, we're done. Otherwise the split is not
609 * complete until the downlink for the new page has been inserted to
610 * the parent.
611 */
612 result = (stack->parent == NULL);
613 }
614 else
615 {
616 elog(ERROR, "invalid return code from GIN placeToPage method: %d", rc);
617 result = false; /* keep compiler quiet */
618 }
619
620 /* Clean up temp context */
621 MemoryContextSwitchTo(oldCxt);
622 MemoryContextDelete(tmpCxt);
623
624 return result;
625 }
626
627 /*
628 * Finish a split by inserting the downlink for the new page to parent.
629 *
630 * On entry, stack->buffer is exclusively locked.
631 *
632 * If freestack is true, all the buffers are released and unlocked as we
633 * crawl up the tree, and 'stack' is freed. Otherwise stack->buffer is kept
634 * locked, and stack is unmodified, except for possibly moving right to find
635 * the correct parent of page.
636 */
637 static void
ginFinishSplit(GinBtree btree,GinBtreeStack * stack,bool freestack,GinStatsData * buildStats)638 ginFinishSplit(GinBtree btree, GinBtreeStack *stack, bool freestack,
639 GinStatsData *buildStats)
640 {
641 Page page;
642 bool done;
643 bool first = true;
644
645 /*
646 * freestack == false when we encounter an incompletely split page during
647 * a scan, while freestack == true is used in the normal scenario that a
648 * split is finished right after the initial insert.
649 */
650 if (!freestack)
651 elog(DEBUG1, "finishing incomplete split of block %u in gin index \"%s\"",
652 stack->blkno, RelationGetRelationName(btree->index));
653
654 /* this loop crawls up the stack until the insertion is complete */
655 do
656 {
657 GinBtreeStack *parent = stack->parent;
658 void *insertdata;
659 BlockNumber updateblkno;
660
661 /* search parent to lock */
662 LockBuffer(parent->buffer, GIN_EXCLUSIVE);
663
664 /*
665 * If the parent page was incompletely split, finish that split first,
666 * then continue with the current one.
667 *
668 * Note: we have to finish *all* incomplete splits we encounter, even
669 * if we have to move right. Otherwise we might choose as the target a
670 * page that has no downlink in the parent, and splitting it further
671 * would fail.
672 */
673 if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
674 ginFinishSplit(btree, parent, false, buildStats);
675
676 /* move right if it's needed */
677 page = BufferGetPage(parent->buffer);
678 while ((parent->off = btree->findChildPtr(btree, page, stack->blkno, parent->off)) == InvalidOffsetNumber)
679 {
680 if (GinPageRightMost(page))
681 {
682 /*
683 * rightmost page, but we don't find parent, we should use
684 * plain search...
685 */
686 LockBuffer(parent->buffer, GIN_UNLOCK);
687 ginFindParents(btree, stack);
688 parent = stack->parent;
689 Assert(parent != NULL);
690 break;
691 }
692
693 parent->buffer = ginStepRight(parent->buffer, btree->index, GIN_EXCLUSIVE);
694 parent->blkno = BufferGetBlockNumber(parent->buffer);
695 page = BufferGetPage(parent->buffer);
696
697 if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
698 ginFinishSplit(btree, parent, false, buildStats);
699 }
700
701 /* insert the downlink */
702 insertdata = btree->prepareDownlink(btree, stack->buffer);
703 updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
704 done = ginPlaceToPage(btree, parent,
705 insertdata, updateblkno,
706 stack->buffer, buildStats);
707 pfree(insertdata);
708
709 /*
710 * If the caller requested to free the stack, unlock and release the
711 * child buffer now. Otherwise keep it pinned and locked, but if we
712 * have to recurse up the tree, we can unlock the upper pages, only
713 * keeping the page at the bottom of the stack locked.
714 */
715 if (!first || freestack)
716 LockBuffer(stack->buffer, GIN_UNLOCK);
717 if (freestack)
718 {
719 ReleaseBuffer(stack->buffer);
720 pfree(stack);
721 }
722 stack = parent;
723
724 first = false;
725 } while (!done);
726
727 /* unlock the parent */
728 LockBuffer(stack->buffer, GIN_UNLOCK);
729
730 if (freestack)
731 freeGinBtreeStack(stack);
732 }
733
734 /*
735 * Insert a value to tree described by stack.
736 *
737 * The value to be inserted is given in 'insertdata'. Its format depends
738 * on whether this is an entry or data tree, ginInsertValue just passes it
739 * through to the tree-specific callback function.
740 *
741 * During an index build, buildStats is non-null and the counters it contains
742 * are incremented as needed.
743 *
744 * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
745 */
746 void
ginInsertValue(GinBtree btree,GinBtreeStack * stack,void * insertdata,GinStatsData * buildStats)747 ginInsertValue(GinBtree btree, GinBtreeStack *stack, void *insertdata,
748 GinStatsData *buildStats)
749 {
750 bool done;
751
752 /* If the leaf page was incompletely split, finish the split first */
753 if (GinPageIsIncompleteSplit(BufferGetPage(stack->buffer)))
754 ginFinishSplit(btree, stack, false, buildStats);
755
756 done = ginPlaceToPage(btree, stack,
757 insertdata, InvalidBlockNumber,
758 InvalidBuffer, buildStats);
759 if (done)
760 {
761 LockBuffer(stack->buffer, GIN_UNLOCK);
762 freeGinBtreeStack(stack);
763 }
764 else
765 ginFinishSplit(btree, stack, true, buildStats);
766 }
767