1 /*-------------------------------------------------------------------------
2 *
3 * ginbtree.c
4 * page utilities routines for the postgres inverted index access method.
5 *
6 *
7 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/gin/ginbtree.c
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "access/gin_private.h"
18 #include "access/ginxlog.h"
19 #include "access/xloginsert.h"
20 #include "storage/predicate.h"
21 #include "miscadmin.h"
22 #include "utils/memutils.h"
23 #include "utils/rel.h"
24
25 static void ginFindParents(GinBtree btree, GinBtreeStack *stack);
26 static bool ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
27 void *insertdata, BlockNumber updateblkno,
28 Buffer childbuf, GinStatsData *buildStats);
29 static void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
30 bool freestack, GinStatsData *buildStats);
31
32 /*
33 * Lock buffer by needed method for search.
34 */
35 int
ginTraverseLock(Buffer buffer,bool searchMode)36 ginTraverseLock(Buffer buffer, bool searchMode)
37 {
38 Page page;
39 int access = GIN_SHARE;
40
41 LockBuffer(buffer, GIN_SHARE);
42 page = BufferGetPage(buffer);
43 if (GinPageIsLeaf(page))
44 {
45 if (searchMode == false)
46 {
47 /* we should relock our page */
48 LockBuffer(buffer, GIN_UNLOCK);
49 LockBuffer(buffer, GIN_EXCLUSIVE);
50
51 /* But root can become non-leaf during relock */
52 if (!GinPageIsLeaf(page))
53 {
54 /* restore old lock type (very rare) */
55 LockBuffer(buffer, GIN_UNLOCK);
56 LockBuffer(buffer, GIN_SHARE);
57 }
58 else
59 access = GIN_EXCLUSIVE;
60 }
61 }
62
63 return access;
64 }
65
66 /*
67 * Descend the tree to the leaf page that contains or would contain the key
68 * we're searching for. The key should already be filled in 'btree', in
69 * tree-type specific manner. If btree->fullScan is true, descends to the
70 * leftmost leaf page.
71 *
72 * If 'searchmode' is false, on return stack->buffer is exclusively locked,
73 * and the stack represents the full path to the root. Otherwise stack->buffer
74 * is share-locked, and stack->parent is NULL.
75 *
76 * If 'rootConflictCheck' is true, tree root is checked for serialization
77 * conflict.
78 */
79 GinBtreeStack *
ginFindLeafPage(GinBtree btree,bool searchMode,bool rootConflictCheck,Snapshot snapshot)80 ginFindLeafPage(GinBtree btree, bool searchMode,
81 bool rootConflictCheck, Snapshot snapshot)
82 {
83 GinBtreeStack *stack;
84
85 stack = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
86 stack->blkno = btree->rootBlkno;
87 stack->buffer = ReadBuffer(btree->index, btree->rootBlkno);
88 stack->parent = NULL;
89 stack->predictNumber = 1;
90
91 if (rootConflictCheck)
92 CheckForSerializableConflictIn(btree->index, NULL, stack->buffer);
93
94 for (;;)
95 {
96 Page page;
97 BlockNumber child;
98 int access;
99
100 stack->off = InvalidOffsetNumber;
101
102 page = BufferGetPage(stack->buffer);
103 TestForOldSnapshot(snapshot, btree->index, page);
104
105 access = ginTraverseLock(stack->buffer, searchMode);
106
107 /*
108 * If we're going to modify the tree, finish any incomplete splits we
109 * encounter on the way.
110 */
111 if (!searchMode && GinPageIsIncompleteSplit(page))
112 ginFinishSplit(btree, stack, false, NULL);
113
114 /*
115 * ok, page is correctly locked, we should check to move right ..,
116 * root never has a right link, so small optimization
117 */
118 while (btree->fullScan == false && stack->blkno != btree->rootBlkno &&
119 btree->isMoveRight(btree, page))
120 {
121 BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
122
123 if (rightlink == InvalidBlockNumber)
124 /* rightmost page */
125 break;
126
127 stack->buffer = ginStepRight(stack->buffer, btree->index, access);
128 stack->blkno = rightlink;
129 page = BufferGetPage(stack->buffer);
130 TestForOldSnapshot(snapshot, btree->index, page);
131
132 if (!searchMode && GinPageIsIncompleteSplit(page))
133 ginFinishSplit(btree, stack, false, NULL);
134 }
135
136 if (GinPageIsLeaf(page)) /* we found, return locked page */
137 return stack;
138
139 /* now we have correct buffer, try to find child */
140 child = btree->findChildPage(btree, stack);
141
142 LockBuffer(stack->buffer, GIN_UNLOCK);
143 Assert(child != InvalidBlockNumber);
144 Assert(stack->blkno != child);
145
146 if (searchMode)
147 {
148 /* in search mode we may forget path to leaf */
149 stack->blkno = child;
150 stack->buffer = ReleaseAndReadBuffer(stack->buffer, btree->index, stack->blkno);
151 }
152 else
153 {
154 GinBtreeStack *ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
155
156 ptr->parent = stack;
157 stack = ptr;
158 stack->blkno = child;
159 stack->buffer = ReadBuffer(btree->index, stack->blkno);
160 stack->predictNumber = 1;
161 }
162 }
163 }
164
165 /*
166 * Step right from current page.
167 *
168 * The next page is locked first, before releasing the current page. This is
169 * crucial to protect from concurrent page deletion (see comment in
170 * ginDeletePage).
171 */
172 Buffer
ginStepRight(Buffer buffer,Relation index,int lockmode)173 ginStepRight(Buffer buffer, Relation index, int lockmode)
174 {
175 Buffer nextbuffer;
176 Page page = BufferGetPage(buffer);
177 bool isLeaf = GinPageIsLeaf(page);
178 bool isData = GinPageIsData(page);
179 BlockNumber blkno = GinPageGetOpaque(page)->rightlink;
180
181 nextbuffer = ReadBuffer(index, blkno);
182 LockBuffer(nextbuffer, lockmode);
183 UnlockReleaseBuffer(buffer);
184
185 /* Sanity check that the page we stepped to is of similar kind. */
186 page = BufferGetPage(nextbuffer);
187 if (isLeaf != GinPageIsLeaf(page) || isData != GinPageIsData(page))
188 elog(ERROR, "right sibling of GIN page is of different type");
189
190 return nextbuffer;
191 }
192
193 void
freeGinBtreeStack(GinBtreeStack * stack)194 freeGinBtreeStack(GinBtreeStack *stack)
195 {
196 while (stack)
197 {
198 GinBtreeStack *tmp = stack->parent;
199
200 if (stack->buffer != InvalidBuffer)
201 ReleaseBuffer(stack->buffer);
202
203 pfree(stack);
204 stack = tmp;
205 }
206 }
207
208 /*
209 * Try to find parent for current stack position. Returns correct parent and
210 * child's offset in stack->parent. The root page is never released, to
211 * to prevent conflict with vacuum process.
212 */
213 static void
ginFindParents(GinBtree btree,GinBtreeStack * stack)214 ginFindParents(GinBtree btree, GinBtreeStack *stack)
215 {
216 Page page;
217 Buffer buffer;
218 BlockNumber blkno,
219 leftmostBlkno;
220 OffsetNumber offset;
221 GinBtreeStack *root;
222 GinBtreeStack *ptr;
223
224 /*
225 * Unwind the stack all the way up to the root, leaving only the root
226 * item.
227 *
228 * Be careful not to release the pin on the root page! The pin on root
229 * page is required to lock out concurrent vacuums on the tree.
230 */
231 root = stack->parent;
232 while (root->parent)
233 {
234 ReleaseBuffer(root->buffer);
235 root = root->parent;
236 }
237
238 Assert(root->blkno == btree->rootBlkno);
239 Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
240 root->off = InvalidOffsetNumber;
241
242 blkno = root->blkno;
243 buffer = root->buffer;
244 offset = InvalidOffsetNumber;
245
246 ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
247
248 for (;;)
249 {
250 LockBuffer(buffer, GIN_EXCLUSIVE);
251 page = BufferGetPage(buffer);
252 if (GinPageIsLeaf(page))
253 elog(ERROR, "Lost path");
254
255 if (GinPageIsIncompleteSplit(page))
256 {
257 Assert(blkno != btree->rootBlkno);
258 ptr->blkno = blkno;
259 ptr->buffer = buffer;
260
261 /*
262 * parent may be wrong, but if so, the ginFinishSplit call will
263 * recurse to call ginFindParents again to fix it.
264 */
265 ptr->parent = root;
266 ptr->off = InvalidOffsetNumber;
267
268 ginFinishSplit(btree, ptr, false, NULL);
269 }
270
271 leftmostBlkno = btree->getLeftMostChild(btree, page);
272
273 while ((offset = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) == InvalidOffsetNumber)
274 {
275 blkno = GinPageGetOpaque(page)->rightlink;
276 if (blkno == InvalidBlockNumber)
277 {
278 UnlockReleaseBuffer(buffer);
279 break;
280 }
281 buffer = ginStepRight(buffer, btree->index, GIN_EXCLUSIVE);
282 page = BufferGetPage(buffer);
283
284 /* finish any incomplete splits, as above */
285 if (GinPageIsIncompleteSplit(page))
286 {
287 Assert(blkno != btree->rootBlkno);
288 ptr->blkno = blkno;
289 ptr->buffer = buffer;
290 ptr->parent = root;
291 ptr->off = InvalidOffsetNumber;
292
293 ginFinishSplit(btree, ptr, false, NULL);
294 }
295 }
296
297 if (blkno != InvalidBlockNumber)
298 {
299 ptr->blkno = blkno;
300 ptr->buffer = buffer;
301 ptr->parent = root; /* it may be wrong, but in next call we will
302 * correct */
303 ptr->off = offset;
304 stack->parent = ptr;
305 return;
306 }
307
308 /* Descend down to next level */
309 blkno = leftmostBlkno;
310 buffer = ReadBuffer(btree->index, blkno);
311 }
312 }
313
314 /*
315 * Insert a new item to a page.
316 *
317 * Returns true if the insertion was finished. On false, the page was split and
318 * the parent needs to be updated. (A root split returns true as it doesn't
319 * need any further action by the caller to complete.)
320 *
321 * When inserting a downlink to an internal page, 'childbuf' contains the
322 * child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
323 * atomically with the insert. Also, the existing item at offset stack->off
324 * in the target page is updated to point to updateblkno.
325 *
326 * stack->buffer is locked on entry, and is kept locked.
327 * Likewise for childbuf, if given.
328 */
329 static bool
ginPlaceToPage(GinBtree btree,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,Buffer childbuf,GinStatsData * buildStats)330 ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
331 void *insertdata, BlockNumber updateblkno,
332 Buffer childbuf, GinStatsData *buildStats)
333 {
334 Page page = BufferGetPage(stack->buffer);
335 bool result;
336 GinPlaceToPageRC rc;
337 uint16 xlflags = 0;
338 Page childpage = NULL;
339 Page newlpage = NULL,
340 newrpage = NULL;
341 void *ptp_workspace = NULL;
342 MemoryContext tmpCxt;
343 MemoryContext oldCxt;
344
345 /*
346 * We do all the work of this function and its subfunctions in a temporary
347 * memory context. This avoids leakages and simplifies APIs, since some
348 * subfunctions allocate storage that has to survive until we've finished
349 * the WAL insertion.
350 */
351 tmpCxt = AllocSetContextCreate(CurrentMemoryContext,
352 "ginPlaceToPage temporary context",
353 ALLOCSET_DEFAULT_SIZES);
354 oldCxt = MemoryContextSwitchTo(tmpCxt);
355
356 if (GinPageIsData(page))
357 xlflags |= GIN_INSERT_ISDATA;
358 if (GinPageIsLeaf(page))
359 {
360 xlflags |= GIN_INSERT_ISLEAF;
361 Assert(!BufferIsValid(childbuf));
362 Assert(updateblkno == InvalidBlockNumber);
363 }
364 else
365 {
366 Assert(BufferIsValid(childbuf));
367 Assert(updateblkno != InvalidBlockNumber);
368 childpage = BufferGetPage(childbuf);
369 }
370
371 /*
372 * See if the incoming tuple will fit on the page. beginPlaceToPage will
373 * decide if the page needs to be split, and will compute the split
374 * contents if so. See comments for beginPlaceToPage and execPlaceToPage
375 * functions for more details of the API here.
376 */
377 rc = btree->beginPlaceToPage(btree, stack->buffer, stack,
378 insertdata, updateblkno,
379 &ptp_workspace,
380 &newlpage, &newrpage);
381
382 if (rc == GPTP_NO_WORK)
383 {
384 /* Nothing to do */
385 result = true;
386 }
387 else if (rc == GPTP_INSERT)
388 {
389 /* It will fit, perform the insertion */
390 START_CRIT_SECTION();
391
392 if (RelationNeedsWAL(btree->index))
393 {
394 XLogBeginInsert();
395 XLogRegisterBuffer(0, stack->buffer, REGBUF_STANDARD);
396 if (BufferIsValid(childbuf))
397 XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
398 }
399
400 /* Perform the page update, and register any extra WAL data */
401 btree->execPlaceToPage(btree, stack->buffer, stack,
402 insertdata, updateblkno, ptp_workspace);
403
404 MarkBufferDirty(stack->buffer);
405
406 /* An insert to an internal page finishes the split of the child. */
407 if (BufferIsValid(childbuf))
408 {
409 GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
410 MarkBufferDirty(childbuf);
411 }
412
413 if (RelationNeedsWAL(btree->index))
414 {
415 XLogRecPtr recptr;
416 ginxlogInsert xlrec;
417 BlockIdData childblknos[2];
418
419 xlrec.flags = xlflags;
420
421 XLogRegisterData((char *) &xlrec, sizeof(ginxlogInsert));
422
423 /*
424 * Log information about child if this was an insertion of a
425 * downlink.
426 */
427 if (BufferIsValid(childbuf))
428 {
429 BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf));
430 BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink);
431 XLogRegisterData((char *) childblknos,
432 sizeof(BlockIdData) * 2);
433 }
434
435 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT);
436 PageSetLSN(page, recptr);
437 if (BufferIsValid(childbuf))
438 PageSetLSN(childpage, recptr);
439 }
440
441 END_CRIT_SECTION();
442
443 /* Insertion is complete. */
444 result = true;
445 }
446 else if (rc == GPTP_SPLIT)
447 {
448 /*
449 * Didn't fit, need to split. The split has been computed in newlpage
450 * and newrpage, which are pointers to palloc'd pages, not associated
451 * with buffers. stack->buffer is not touched yet.
452 */
453 Buffer rbuffer;
454 BlockNumber savedRightLink;
455 ginxlogSplit data;
456 Buffer lbuffer = InvalidBuffer;
457 Page newrootpg = NULL;
458
459 /* Get a new index page to become the right page */
460 rbuffer = GinNewBuffer(btree->index);
461
462 /* During index build, count the new page */
463 if (buildStats)
464 {
465 if (btree->isData)
466 buildStats->nDataPages++;
467 else
468 buildStats->nEntryPages++;
469 }
470
471 savedRightLink = GinPageGetOpaque(page)->rightlink;
472
473 /* Begin setting up WAL record */
474 data.node = btree->index->rd_node;
475 data.flags = xlflags;
476 if (BufferIsValid(childbuf))
477 {
478 data.leftChildBlkno = BufferGetBlockNumber(childbuf);
479 data.rightChildBlkno = GinPageGetOpaque(childpage)->rightlink;
480 }
481 else
482 data.leftChildBlkno = data.rightChildBlkno = InvalidBlockNumber;
483
484 if (stack->parent == NULL)
485 {
486 /*
487 * splitting the root, so we need to allocate new left page and
488 * place pointers to left and right page on root page.
489 */
490 lbuffer = GinNewBuffer(btree->index);
491
492 /* During index build, count the new left page */
493 if (buildStats)
494 {
495 if (btree->isData)
496 buildStats->nDataPages++;
497 else
498 buildStats->nEntryPages++;
499 }
500
501 data.rrlink = InvalidBlockNumber;
502 data.flags |= GIN_SPLIT_ROOT;
503
504 GinPageGetOpaque(newrpage)->rightlink = InvalidBlockNumber;
505 GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
506
507 /*
508 * Construct a new root page containing downlinks to the new left
509 * and right pages. (Do this in a temporary copy rather than
510 * overwriting the original page directly, since we're not in the
511 * critical section yet.)
512 */
513 newrootpg = PageGetTempPage(newrpage);
514 GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~(GIN_LEAF | GIN_COMPRESSED), BLCKSZ);
515
516 btree->fillRoot(btree, newrootpg,
517 BufferGetBlockNumber(lbuffer), newlpage,
518 BufferGetBlockNumber(rbuffer), newrpage);
519
520 if (GinPageIsLeaf(BufferGetPage(stack->buffer)))
521 {
522
523 PredicateLockPageSplit(btree->index,
524 BufferGetBlockNumber(stack->buffer),
525 BufferGetBlockNumber(lbuffer));
526
527 PredicateLockPageSplit(btree->index,
528 BufferGetBlockNumber(stack->buffer),
529 BufferGetBlockNumber(rbuffer));
530 }
531
532 }
533 else
534 {
535 /* splitting a non-root page */
536 data.rrlink = savedRightLink;
537
538 GinPageGetOpaque(newrpage)->rightlink = savedRightLink;
539 GinPageGetOpaque(newlpage)->flags |= GIN_INCOMPLETE_SPLIT;
540 GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
541
542 if (GinPageIsLeaf(BufferGetPage(stack->buffer)))
543 {
544
545 PredicateLockPageSplit(btree->index,
546 BufferGetBlockNumber(stack->buffer),
547 BufferGetBlockNumber(rbuffer));
548 }
549 }
550
551 /*
552 * OK, we have the new contents of the left page in a temporary copy
553 * now (newlpage), and likewise for the new contents of the
554 * newly-allocated right block. The original page is still unchanged.
555 *
556 * If this is a root split, we also have a temporary page containing
557 * the new contents of the root.
558 */
559
560 START_CRIT_SECTION();
561
562 MarkBufferDirty(rbuffer);
563 MarkBufferDirty(stack->buffer);
564
565 /*
566 * Restore the temporary copies over the real buffers.
567 */
568 if (stack->parent == NULL)
569 {
570 /* Splitting the root, three pages to update */
571 MarkBufferDirty(lbuffer);
572 memcpy(page, newrootpg, BLCKSZ);
573 memcpy(BufferGetPage(lbuffer), newlpage, BLCKSZ);
574 memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
575 }
576 else
577 {
578 /* Normal split, only two pages to update */
579 memcpy(page, newlpage, BLCKSZ);
580 memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
581 }
582
583 /* We also clear childbuf's INCOMPLETE_SPLIT flag, if passed */
584 if (BufferIsValid(childbuf))
585 {
586 GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
587 MarkBufferDirty(childbuf);
588 }
589
590 /* write WAL record */
591 if (RelationNeedsWAL(btree->index))
592 {
593 XLogRecPtr recptr;
594
595 XLogBeginInsert();
596
597 /*
598 * We just take full page images of all the split pages. Splits
599 * are uncommon enough that it's not worth complicating the code
600 * to be more efficient.
601 */
602 if (stack->parent == NULL)
603 {
604 XLogRegisterBuffer(0, lbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
605 XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
606 XLogRegisterBuffer(2, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
607 }
608 else
609 {
610 XLogRegisterBuffer(0, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
611 XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
612 }
613 if (BufferIsValid(childbuf))
614 XLogRegisterBuffer(3, childbuf, REGBUF_STANDARD);
615
616 XLogRegisterData((char *) &data, sizeof(ginxlogSplit));
617
618 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT);
619
620 PageSetLSN(page, recptr);
621 PageSetLSN(BufferGetPage(rbuffer), recptr);
622 if (stack->parent == NULL)
623 PageSetLSN(BufferGetPage(lbuffer), recptr);
624 if (BufferIsValid(childbuf))
625 PageSetLSN(childpage, recptr);
626 }
627 END_CRIT_SECTION();
628
629 /*
630 * We can release the locks/pins on the new pages now, but keep
631 * stack->buffer locked. childbuf doesn't get unlocked either.
632 */
633 UnlockReleaseBuffer(rbuffer);
634 if (stack->parent == NULL)
635 UnlockReleaseBuffer(lbuffer);
636
637 /*
638 * If we split the root, we're done. Otherwise the split is not
639 * complete until the downlink for the new page has been inserted to
640 * the parent.
641 */
642 result = (stack->parent == NULL);
643 }
644 else
645 {
646 elog(ERROR, "invalid return code from GIN placeToPage method: %d", rc);
647 result = false; /* keep compiler quiet */
648 }
649
650 /* Clean up temp context */
651 MemoryContextSwitchTo(oldCxt);
652 MemoryContextDelete(tmpCxt);
653
654 return result;
655 }
656
657 /*
658 * Finish a split by inserting the downlink for the new page to parent.
659 *
660 * On entry, stack->buffer is exclusively locked.
661 *
662 * If freestack is true, all the buffers are released and unlocked as we
663 * crawl up the tree, and 'stack' is freed. Otherwise stack->buffer is kept
664 * locked, and stack is unmodified, except for possibly moving right to find
665 * the correct parent of page.
666 */
667 static void
ginFinishSplit(GinBtree btree,GinBtreeStack * stack,bool freestack,GinStatsData * buildStats)668 ginFinishSplit(GinBtree btree, GinBtreeStack *stack, bool freestack,
669 GinStatsData *buildStats)
670 {
671 Page page;
672 bool done;
673 bool first = true;
674
675 /*
676 * freestack == false when we encounter an incompletely split page during
677 * a scan, while freestack == true is used in the normal scenario that a
678 * split is finished right after the initial insert.
679 */
680 if (!freestack)
681 elog(DEBUG1, "finishing incomplete split of block %u in gin index \"%s\"",
682 stack->blkno, RelationGetRelationName(btree->index));
683
684 /* this loop crawls up the stack until the insertion is complete */
685 do
686 {
687 GinBtreeStack *parent = stack->parent;
688 void *insertdata;
689 BlockNumber updateblkno;
690
691 /* search parent to lock */
692 LockBuffer(parent->buffer, GIN_EXCLUSIVE);
693
694 /*
695 * If the parent page was incompletely split, finish that split first,
696 * then continue with the current one.
697 *
698 * Note: we have to finish *all* incomplete splits we encounter, even
699 * if we have to move right. Otherwise we might choose as the target a
700 * page that has no downlink in the parent, and splitting it further
701 * would fail.
702 */
703 if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
704 ginFinishSplit(btree, parent, false, buildStats);
705
706 /* move right if it's needed */
707 page = BufferGetPage(parent->buffer);
708 while ((parent->off = btree->findChildPtr(btree, page, stack->blkno, parent->off)) == InvalidOffsetNumber)
709 {
710 if (GinPageRightMost(page))
711 {
712 /*
713 * rightmost page, but we don't find parent, we should use
714 * plain search...
715 */
716 LockBuffer(parent->buffer, GIN_UNLOCK);
717 ginFindParents(btree, stack);
718 parent = stack->parent;
719 Assert(parent != NULL);
720 break;
721 }
722
723 parent->buffer = ginStepRight(parent->buffer, btree->index, GIN_EXCLUSIVE);
724 parent->blkno = BufferGetBlockNumber(parent->buffer);
725 page = BufferGetPage(parent->buffer);
726
727 if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
728 ginFinishSplit(btree, parent, false, buildStats);
729 }
730
731 /* insert the downlink */
732 insertdata = btree->prepareDownlink(btree, stack->buffer);
733 updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
734 done = ginPlaceToPage(btree, parent,
735 insertdata, updateblkno,
736 stack->buffer, buildStats);
737 pfree(insertdata);
738
739 /*
740 * If the caller requested to free the stack, unlock and release the
741 * child buffer now. Otherwise keep it pinned and locked, but if we
742 * have to recurse up the tree, we can unlock the upper pages, only
743 * keeping the page at the bottom of the stack locked.
744 */
745 if (!first || freestack)
746 LockBuffer(stack->buffer, GIN_UNLOCK);
747 if (freestack)
748 {
749 ReleaseBuffer(stack->buffer);
750 pfree(stack);
751 }
752 stack = parent;
753
754 first = false;
755 } while (!done);
756
757 /* unlock the parent */
758 LockBuffer(stack->buffer, GIN_UNLOCK);
759
760 if (freestack)
761 freeGinBtreeStack(stack);
762 }
763
764 /*
765 * Insert a value to tree described by stack.
766 *
767 * The value to be inserted is given in 'insertdata'. Its format depends
768 * on whether this is an entry or data tree, ginInsertValue just passes it
769 * through to the tree-specific callback function.
770 *
771 * During an index build, buildStats is non-null and the counters it contains
772 * are incremented as needed.
773 *
774 * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
775 */
776 void
ginInsertValue(GinBtree btree,GinBtreeStack * stack,void * insertdata,GinStatsData * buildStats)777 ginInsertValue(GinBtree btree, GinBtreeStack *stack, void *insertdata,
778 GinStatsData *buildStats)
779 {
780 bool done;
781
782 /* If the leaf page was incompletely split, finish the split first */
783 if (GinPageIsIncompleteSplit(BufferGetPage(stack->buffer)))
784 ginFinishSplit(btree, stack, false, buildStats);
785
786 done = ginPlaceToPage(btree, stack,
787 insertdata, InvalidBlockNumber,
788 InvalidBuffer, buildStats);
789 if (done)
790 {
791 LockBuffer(stack->buffer, GIN_UNLOCK);
792 freeGinBtreeStack(stack);
793 }
794 else
795 ginFinishSplit(btree, stack, true, buildStats);
796 }
797