1 /*-------------------------------------------------------------------------
2  *
3  * gindatapage.c
4  *	  routines for handling GIN posting tree pages.
5  *
6  *
7  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *			src/backend/access/gin/gindatapage.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/gin_private.h"
18 #include "access/ginxlog.h"
19 #include "access/xloginsert.h"
20 #include "lib/ilist.h"
21 #include "miscadmin.h"
22 #include "utils/rel.h"
23 
24 /*
25  * Min, Max and Target size of posting lists stored on leaf pages, in bytes.
26  *
27  * The code can deal with any size, but random access is more efficient when
28  * a number of smaller lists are stored, rather than one big list. If a
29  * posting list would become larger than Max size as a result of insertions,
30  * it is split into two. If a posting list would be smaller than minimum
31  * size, it is merged with the next posting list.
32  */
33 #define GinPostingListSegmentMaxSize 384
34 #define GinPostingListSegmentTargetSize 256
35 #define GinPostingListSegmentMinSize 128
36 
37 /*
38  * At least this many items fit in a GinPostingListSegmentMaxSize-bytes
39  * long segment. This is used when estimating how much space is required
40  * for N items, at minimum.
41  */
42 #define MinTuplesPerSegment ((GinPostingListSegmentMaxSize - 2) / 6)
43 
44 /*
45  * A working struct for manipulating a posting tree leaf page.
46  */
47 typedef struct
48 {
49 	dlist_head	segments;		/* a list of leafSegmentInfos */
50 
51 	/*
52 	 * The following fields represent how the segments are split across pages,
53 	 * if a page split is required. Filled in by leafRepackItems.
54 	 */
55 	dlist_node *lastleft;		/* last segment on left page */
56 	int			lsize;			/* total size on left page */
57 	int			rsize;			/* total size on right page */
58 
59 	bool		oldformat;		/* page is in pre-9.4 format on disk */
60 
61 	/*
62 	 * If we need WAL data representing the reconstructed leaf page, it's
63 	 * stored here by computeLeafRecompressWALData.
64 	 */
65 	char	   *walinfo;		/* buffer start */
66 	int			walinfolen;		/* and length */
67 } disassembledLeaf;
68 
69 typedef struct
70 {
71 	dlist_node	node;			/* linked list pointers */
72 
73 	/*-------------
74 	 * 'action' indicates the status of this in-memory segment, compared to
75 	 * what's on disk. It is one of the GIN_SEGMENT_* action codes:
76 	 *
77 	 * UNMODIFIED	no changes
78 	 * DELETE		the segment is to be removed. 'seg' and 'items' are
79 	 *				ignored
80 	 * INSERT		this is a completely new segment
81 	 * REPLACE		this replaces an existing segment with new content
82 	 * ADDITEMS		like REPLACE, but no items have been removed, and we track
83 	 *				in detail what items have been added to this segment, in
84 	 *				'modifieditems'
85 	 *-------------
86 	 */
87 	char		action;
88 
89 	ItemPointerData *modifieditems;
90 	uint16		nmodifieditems;
91 
92 	/*
93 	 * The following fields represent the items in this segment. If 'items' is
94 	 * not NULL, it contains a palloc'd array of the itemsin this segment. If
95 	 * 'seg' is not NULL, it contains the items in an already-compressed
96 	 * format. It can point to an on-disk page (!modified), or a palloc'd
97 	 * segment in memory. If both are set, they must represent the same items.
98 	 */
99 	GinPostingList *seg;
100 	ItemPointer items;
101 	int			nitems;			/* # of items in 'items', if items != NULL */
102 } leafSegmentInfo;
103 
104 static ItemPointer dataLeafPageGetUncompressed(Page page, int *nitems);
105 static void dataSplitPageInternal(GinBtree btree, Buffer origbuf,
106 					  GinBtreeStack *stack,
107 					  void *insertdata, BlockNumber updateblkno,
108 					  Page *newlpage, Page *newrpage);
109 
110 static disassembledLeaf *disassembleLeaf(Page page);
111 static bool leafRepackItems(disassembledLeaf *leaf, ItemPointer remaining);
112 static bool addItemsToLeaf(disassembledLeaf *leaf, ItemPointer newItems,
113 			   int nNewItems);
114 
115 static void computeLeafRecompressWALData(disassembledLeaf *leaf);
116 static void dataPlaceToPageLeafRecompress(Buffer buf, disassembledLeaf *leaf);
117 static void dataPlaceToPageLeafSplit(disassembledLeaf *leaf,
118 						 ItemPointerData lbound, ItemPointerData rbound,
119 						 Page lpage, Page rpage);
120 
121 /*
122  * Read TIDs from leaf data page to single uncompressed array. The TIDs are
123  * returned in ascending order.
124  *
125  * advancePast is a hint, indicating that the caller is only interested in
126  * TIDs > advancePast. To return all items, use ItemPointerSetMin.
127  *
128  * Note: This function can still return items smaller than advancePast that
129  * are in the same posting list as the items of interest, so the caller must
130  * still check all the returned items. But passing it allows this function to
131  * skip whole posting lists.
132  */
133 ItemPointer
GinDataLeafPageGetItems(Page page,int * nitems,ItemPointerData advancePast)134 GinDataLeafPageGetItems(Page page, int *nitems, ItemPointerData advancePast)
135 {
136 	ItemPointer result;
137 
138 	if (GinPageIsCompressed(page))
139 	{
140 		GinPostingList *seg = GinDataLeafPageGetPostingList(page);
141 		Size		len = GinDataLeafPageGetPostingListSize(page);
142 		Pointer		endptr = ((Pointer) seg) + len;
143 		GinPostingList *next;
144 
145 		/* Skip to the segment containing advancePast+1 */
146 		if (ItemPointerIsValid(&advancePast))
147 		{
148 			next = GinNextPostingListSegment(seg);
149 			while ((Pointer) next < endptr &&
150 				   ginCompareItemPointers(&next->first, &advancePast) <= 0)
151 			{
152 				seg = next;
153 				next = GinNextPostingListSegment(seg);
154 			}
155 			len = endptr - (Pointer) seg;
156 		}
157 
158 		if (len > 0)
159 			result = ginPostingListDecodeAllSegments(seg, len, nitems);
160 		else
161 		{
162 			result = NULL;
163 			*nitems = 0;
164 		}
165 	}
166 	else
167 	{
168 		ItemPointer tmp = dataLeafPageGetUncompressed(page, nitems);
169 
170 		result = palloc((*nitems) * sizeof(ItemPointerData));
171 		memcpy(result, tmp, (*nitems) * sizeof(ItemPointerData));
172 	}
173 
174 	return result;
175 }
176 
177 /*
178  * Places all TIDs from leaf data page to bitmap.
179  */
180 int
GinDataLeafPageGetItemsToTbm(Page page,TIDBitmap * tbm)181 GinDataLeafPageGetItemsToTbm(Page page, TIDBitmap *tbm)
182 {
183 	ItemPointer uncompressed;
184 	int			nitems;
185 
186 	if (GinPageIsCompressed(page))
187 	{
188 		GinPostingList *segment = GinDataLeafPageGetPostingList(page);
189 		Size		len = GinDataLeafPageGetPostingListSize(page);
190 
191 		nitems = ginPostingListDecodeAllSegmentsToTbm(segment, len, tbm);
192 	}
193 	else
194 	{
195 		uncompressed = dataLeafPageGetUncompressed(page, &nitems);
196 
197 		if (nitems > 0)
198 			tbm_add_tuples(tbm, uncompressed, nitems, false);
199 	}
200 
201 	return nitems;
202 }
203 
204 /*
205  * Get pointer to the uncompressed array of items on a pre-9.4 format
206  * uncompressed leaf page. The number of items in the array is returned in
207  * *nitems.
208  */
209 static ItemPointer
dataLeafPageGetUncompressed(Page page,int * nitems)210 dataLeafPageGetUncompressed(Page page, int *nitems)
211 {
212 	ItemPointer items;
213 
214 	Assert(!GinPageIsCompressed(page));
215 
216 	/*
217 	 * In the old pre-9.4 page format, the whole page content is used for
218 	 * uncompressed items, and the number of items is stored in 'maxoff'
219 	 */
220 	items = (ItemPointer) GinDataPageGetData(page);
221 	*nitems = GinPageGetOpaque(page)->maxoff;
222 
223 	return items;
224 }
225 
226 /*
227  * Check if we should follow the right link to find the item we're searching
228  * for.
229  *
230  * Compares inserting item pointer with the right bound of the current page.
231  */
232 static bool
dataIsMoveRight(GinBtree btree,Page page)233 dataIsMoveRight(GinBtree btree, Page page)
234 {
235 	ItemPointer iptr = GinDataPageGetRightBound(page);
236 
237 	if (GinPageRightMost(page))
238 		return FALSE;
239 
240 	if (GinPageIsDeleted(page))
241 		return TRUE;
242 
243 	return (ginCompareItemPointers(&btree->itemptr, iptr) > 0) ? TRUE : FALSE;
244 }
245 
246 /*
247  * Find correct PostingItem in non-leaf page. It is assumed that this is
248  * the correct page, and the searched value SHOULD be on the page.
249  */
250 static BlockNumber
dataLocateItem(GinBtree btree,GinBtreeStack * stack)251 dataLocateItem(GinBtree btree, GinBtreeStack *stack)
252 {
253 	OffsetNumber low,
254 				high,
255 				maxoff;
256 	PostingItem *pitem = NULL;
257 	int			result;
258 	Page		page = BufferGetPage(stack->buffer);
259 
260 	Assert(!GinPageIsLeaf(page));
261 	Assert(GinPageIsData(page));
262 
263 	if (btree->fullScan)
264 	{
265 		stack->off = FirstOffsetNumber;
266 		stack->predictNumber *= GinPageGetOpaque(page)->maxoff;
267 		return btree->getLeftMostChild(btree, page);
268 	}
269 
270 	low = FirstOffsetNumber;
271 	maxoff = high = GinPageGetOpaque(page)->maxoff;
272 	Assert(high >= low);
273 
274 	high++;
275 
276 	while (high > low)
277 	{
278 		OffsetNumber mid = low + ((high - low) / 2);
279 
280 		pitem = GinDataPageGetPostingItem(page, mid);
281 
282 		if (mid == maxoff)
283 		{
284 			/*
285 			 * Right infinity, page already correctly chosen with a help of
286 			 * dataIsMoveRight
287 			 */
288 			result = -1;
289 		}
290 		else
291 		{
292 			pitem = GinDataPageGetPostingItem(page, mid);
293 			result = ginCompareItemPointers(&btree->itemptr, &(pitem->key));
294 		}
295 
296 		if (result == 0)
297 		{
298 			stack->off = mid;
299 			return PostingItemGetBlockNumber(pitem);
300 		}
301 		else if (result > 0)
302 			low = mid + 1;
303 		else
304 			high = mid;
305 	}
306 
307 	Assert(high >= FirstOffsetNumber && high <= maxoff);
308 
309 	stack->off = high;
310 	pitem = GinDataPageGetPostingItem(page, high);
311 	return PostingItemGetBlockNumber(pitem);
312 }
313 
314 /*
315  * Find link to blkno on non-leaf page, returns offset of PostingItem
316  */
317 static OffsetNumber
dataFindChildPtr(GinBtree btree,Page page,BlockNumber blkno,OffsetNumber storedOff)318 dataFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber storedOff)
319 {
320 	OffsetNumber i,
321 				maxoff = GinPageGetOpaque(page)->maxoff;
322 	PostingItem *pitem;
323 
324 	Assert(!GinPageIsLeaf(page));
325 	Assert(GinPageIsData(page));
326 
327 	/* if page isn't changed, we return storedOff */
328 	if (storedOff >= FirstOffsetNumber && storedOff <= maxoff)
329 	{
330 		pitem = GinDataPageGetPostingItem(page, storedOff);
331 		if (PostingItemGetBlockNumber(pitem) == blkno)
332 			return storedOff;
333 
334 		/*
335 		 * we hope, that needed pointer goes to right. It's true if there
336 		 * wasn't a deletion
337 		 */
338 		for (i = storedOff + 1; i <= maxoff; i++)
339 		{
340 			pitem = GinDataPageGetPostingItem(page, i);
341 			if (PostingItemGetBlockNumber(pitem) == blkno)
342 				return i;
343 		}
344 
345 		maxoff = storedOff - 1;
346 	}
347 
348 	/* last chance */
349 	for (i = FirstOffsetNumber; i <= maxoff; i++)
350 	{
351 		pitem = GinDataPageGetPostingItem(page, i);
352 		if (PostingItemGetBlockNumber(pitem) == blkno)
353 			return i;
354 	}
355 
356 	return InvalidOffsetNumber;
357 }
358 
359 /*
360  * Return blkno of leftmost child
361  */
362 static BlockNumber
dataGetLeftMostPage(GinBtree btree,Page page)363 dataGetLeftMostPage(GinBtree btree, Page page)
364 {
365 	PostingItem *pitem;
366 
367 	Assert(!GinPageIsLeaf(page));
368 	Assert(GinPageIsData(page));
369 	Assert(GinPageGetOpaque(page)->maxoff >= FirstOffsetNumber);
370 
371 	pitem = GinDataPageGetPostingItem(page, FirstOffsetNumber);
372 	return PostingItemGetBlockNumber(pitem);
373 }
374 
375 /*
376  * Add PostingItem to a non-leaf page.
377  */
378 void
GinDataPageAddPostingItem(Page page,PostingItem * data,OffsetNumber offset)379 GinDataPageAddPostingItem(Page page, PostingItem *data, OffsetNumber offset)
380 {
381 	OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
382 	char	   *ptr;
383 
384 	Assert(PostingItemGetBlockNumber(data) != InvalidBlockNumber);
385 	Assert(!GinPageIsLeaf(page));
386 
387 	if (offset == InvalidOffsetNumber)
388 	{
389 		ptr = (char *) GinDataPageGetPostingItem(page, maxoff + 1);
390 	}
391 	else
392 	{
393 		ptr = (char *) GinDataPageGetPostingItem(page, offset);
394 		if (offset != maxoff + 1)
395 			memmove(ptr + sizeof(PostingItem),
396 					ptr,
397 					(maxoff - offset + 1) * sizeof(PostingItem));
398 	}
399 	memcpy(ptr, data, sizeof(PostingItem));
400 
401 	maxoff++;
402 	GinPageGetOpaque(page)->maxoff = maxoff;
403 
404 	/*
405 	 * Also set pd_lower to the end of the posting items, to follow the
406 	 * "standard" page layout, so that we can squeeze out the unused space
407 	 * from full-page images.
408 	 */
409 	GinDataPageSetDataSize(page, maxoff * sizeof(PostingItem));
410 }
411 
412 /*
413  * Delete posting item from non-leaf page
414  */
415 void
GinPageDeletePostingItem(Page page,OffsetNumber offset)416 GinPageDeletePostingItem(Page page, OffsetNumber offset)
417 {
418 	OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
419 
420 	Assert(!GinPageIsLeaf(page));
421 	Assert(offset >= FirstOffsetNumber && offset <= maxoff);
422 
423 	if (offset != maxoff)
424 		memmove(GinDataPageGetPostingItem(page, offset),
425 				GinDataPageGetPostingItem(page, offset + 1),
426 				sizeof(PostingItem) * (maxoff - offset));
427 
428 	maxoff--;
429 	GinPageGetOpaque(page)->maxoff = maxoff;
430 
431 	GinDataPageSetDataSize(page, maxoff * sizeof(PostingItem));
432 }
433 
434 /*
435  * Prepare to insert data on a leaf data page.
436  *
437  * If it will fit, return GPTP_INSERT after doing whatever setup is needed
438  * before we enter the insertion critical section.  *ptp_workspace can be
439  * set to pass information along to the execPlaceToPage function.
440  *
441  * If it won't fit, perform a page split and return two temporary page
442  * images into *newlpage and *newrpage, with result GPTP_SPLIT.
443  *
444  * In neither case should the given page buffer be modified here.
445  */
446 static GinPlaceToPageRC
dataBeginPlaceToPageLeaf(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertdata,void ** ptp_workspace,Page * newlpage,Page * newrpage)447 dataBeginPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
448 						 void *insertdata,
449 						 void **ptp_workspace,
450 						 Page *newlpage, Page *newrpage)
451 {
452 	GinBtreeDataLeafInsertData *items = insertdata;
453 	ItemPointer newItems = &items->items[items->curitem];
454 	int			maxitems = items->nitem - items->curitem;
455 	Page		page = BufferGetPage(buf);
456 	int			i;
457 	ItemPointerData rbound;
458 	ItemPointerData lbound;
459 	bool		needsplit;
460 	bool		append;
461 	int			segsize;
462 	Size		freespace;
463 	disassembledLeaf *leaf;
464 	leafSegmentInfo *lastleftinfo;
465 	ItemPointerData maxOldItem;
466 	ItemPointerData remaining;
467 
468 	rbound = *GinDataPageGetRightBound(page);
469 
470 	/*
471 	 * Count how many of the new items belong to this page.
472 	 */
473 	if (!GinPageRightMost(page))
474 	{
475 		for (i = 0; i < maxitems; i++)
476 		{
477 			if (ginCompareItemPointers(&newItems[i], &rbound) > 0)
478 			{
479 				/*
480 				 * This needs to go to some other location in the tree. (The
481 				 * caller should've chosen the insert location so that at
482 				 * least the first item goes here.)
483 				 */
484 				Assert(i > 0);
485 				break;
486 			}
487 		}
488 		maxitems = i;
489 	}
490 
491 	/* Disassemble the data on the page */
492 	leaf = disassembleLeaf(page);
493 
494 	/*
495 	 * Are we appending to the end of the page? IOW, are all the new items
496 	 * larger than any of the existing items.
497 	 */
498 	if (!dlist_is_empty(&leaf->segments))
499 	{
500 		lastleftinfo = dlist_container(leafSegmentInfo, node,
501 									   dlist_tail_node(&leaf->segments));
502 		if (!lastleftinfo->items)
503 			lastleftinfo->items = ginPostingListDecode(lastleftinfo->seg,
504 													   &lastleftinfo->nitems);
505 		maxOldItem = lastleftinfo->items[lastleftinfo->nitems - 1];
506 		if (ginCompareItemPointers(&newItems[0], &maxOldItem) >= 0)
507 			append = true;
508 		else
509 			append = false;
510 	}
511 	else
512 	{
513 		ItemPointerSetMin(&maxOldItem);
514 		append = true;
515 	}
516 
517 	/*
518 	 * If we're appending to the end of the page, we will append as many items
519 	 * as we can fit (after splitting), and stop when the pages becomes full.
520 	 * Otherwise we have to limit the number of new items to insert, because
521 	 * once we start packing we can't just stop when we run out of space,
522 	 * because we must make sure that all the old items still fit.
523 	 */
524 	if (GinPageIsCompressed(page))
525 		freespace = GinDataLeafPageGetFreeSpace(page);
526 	else
527 		freespace = 0;
528 	if (append)
529 	{
530 		/*
531 		 * Even when appending, trying to append more items than will fit is
532 		 * not completely free, because we will merge the new items and old
533 		 * items into an array below. In the best case, every new item fits in
534 		 * a single byte, and we can use all the free space on the old page as
535 		 * well as the new page. For simplicity, ignore segment overhead etc.
536 		 */
537 		maxitems = Min(maxitems, freespace + GinDataPageMaxDataSize);
538 	}
539 	else
540 	{
541 		/*
542 		 * Calculate a conservative estimate of how many new items we can fit
543 		 * on the two pages after splitting.
544 		 *
545 		 * We can use any remaining free space on the old page to store full
546 		 * segments, as well as the new page. Each full-sized segment can hold
547 		 * at least MinTuplesPerSegment items
548 		 */
549 		int			nnewsegments;
550 
551 		nnewsegments = freespace / GinPostingListSegmentMaxSize;
552 		nnewsegments += GinDataPageMaxDataSize / GinPostingListSegmentMaxSize;
553 		maxitems = Min(maxitems, nnewsegments * MinTuplesPerSegment);
554 	}
555 
556 	/* Add the new items to the segment list */
557 	if (!addItemsToLeaf(leaf, newItems, maxitems))
558 	{
559 		/* all items were duplicates, we have nothing to do */
560 		items->curitem += maxitems;
561 
562 		return GPTP_NO_WORK;
563 	}
564 
565 	/*
566 	 * Pack the items back to compressed segments, ready for writing to disk.
567 	 */
568 	needsplit = leafRepackItems(leaf, &remaining);
569 
570 	/*
571 	 * Did all the new items fit?
572 	 *
573 	 * If we're appending, it's OK if they didn't. But as a sanity check,
574 	 * verify that all the old items fit.
575 	 */
576 	if (ItemPointerIsValid(&remaining))
577 	{
578 		if (!append || ItemPointerCompare(&maxOldItem, &remaining) >= 0)
579 			elog(ERROR, "could not split GIN page; all old items didn't fit");
580 
581 		/* Count how many of the new items did fit. */
582 		for (i = 0; i < maxitems; i++)
583 		{
584 			if (ginCompareItemPointers(&newItems[i], &remaining) >= 0)
585 				break;
586 		}
587 		if (i == 0)
588 			elog(ERROR, "could not split GIN page; no new items fit");
589 		maxitems = i;
590 	}
591 
592 	if (!needsplit)
593 	{
594 		/*
595 		 * Great, all the items fit on a single page.  If needed, prepare data
596 		 * for a WAL record describing the changes we'll make.
597 		 */
598 		if (RelationNeedsWAL(btree->index))
599 			computeLeafRecompressWALData(leaf);
600 
601 		/*
602 		 * We're ready to enter the critical section, but
603 		 * dataExecPlaceToPageLeaf will need access to the "leaf" data.
604 		 */
605 		*ptp_workspace = leaf;
606 
607 		if (append)
608 			elog(DEBUG2, "appended %d new items to block %u; %d bytes (%d to go)",
609 				 maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize,
610 				 items->nitem - items->curitem - maxitems);
611 		else
612 			elog(DEBUG2, "inserted %d new items to block %u; %d bytes (%d to go)",
613 				 maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize,
614 				 items->nitem - items->curitem - maxitems);
615 	}
616 	else
617 	{
618 		/*
619 		 * Have to split.
620 		 *
621 		 * leafRepackItems already divided the segments between the left and
622 		 * the right page. It filled the left page as full as possible, and
623 		 * put the rest to the right page. When building a new index, that's
624 		 * good, because the table is scanned from beginning to end and there
625 		 * won't be any more insertions to the left page during the build.
626 		 * This packs the index as tight as possible. But otherwise, split
627 		 * 50/50, by moving segments from the left page to the right page
628 		 * until they're balanced.
629 		 *
630 		 * As a further heuristic, when appending items to the end of the
631 		 * page, try to make the left page 75% full, on the assumption that
632 		 * subsequent insertions will probably also go to the end. This packs
633 		 * the index somewhat tighter when appending to a table, which is very
634 		 * common.
635 		 */
636 		if (!btree->isBuild)
637 		{
638 			while (dlist_has_prev(&leaf->segments, leaf->lastleft))
639 			{
640 				lastleftinfo = dlist_container(leafSegmentInfo, node, leaf->lastleft);
641 
642 				/* ignore deleted segments */
643 				if (lastleftinfo->action != GIN_SEGMENT_DELETE)
644 				{
645 					segsize = SizeOfGinPostingList(lastleftinfo->seg);
646 
647 					/*
648 					 * Note that we check that the right page doesn't become
649 					 * more full than the left page even when appending. It's
650 					 * possible that we added enough items to make both pages
651 					 * more than 75% full.
652 					 */
653 					if ((leaf->lsize - segsize) - (leaf->rsize + segsize) < 0)
654 						break;
655 					if (append)
656 					{
657 						if ((leaf->lsize - segsize) < (BLCKSZ * 3) / 4)
658 							break;
659 					}
660 
661 					leaf->lsize -= segsize;
662 					leaf->rsize += segsize;
663 				}
664 				leaf->lastleft = dlist_prev_node(&leaf->segments, leaf->lastleft);
665 			}
666 		}
667 		Assert(leaf->lsize <= GinDataPageMaxDataSize);
668 		Assert(leaf->rsize <= GinDataPageMaxDataSize);
669 
670 		/*
671 		 * Fetch the max item in the left page's last segment; it becomes the
672 		 * right bound of the page.
673 		 */
674 		lastleftinfo = dlist_container(leafSegmentInfo, node, leaf->lastleft);
675 		if (!lastleftinfo->items)
676 			lastleftinfo->items = ginPostingListDecode(lastleftinfo->seg,
677 													   &lastleftinfo->nitems);
678 		lbound = lastleftinfo->items[lastleftinfo->nitems - 1];
679 
680 		/*
681 		 * Now allocate a couple of temporary page images, and fill them.
682 		 */
683 		*newlpage = palloc(BLCKSZ);
684 		*newrpage = palloc(BLCKSZ);
685 
686 		dataPlaceToPageLeafSplit(leaf, lbound, rbound,
687 								 *newlpage, *newrpage);
688 
689 		Assert(GinPageRightMost(page) ||
690 			   ginCompareItemPointers(GinDataPageGetRightBound(*newlpage),
691 									  GinDataPageGetRightBound(*newrpage)) < 0);
692 
693 		if (append)
694 			elog(DEBUG2, "appended %d items to block %u; split %d/%d (%d to go)",
695 				 maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize, (int) leaf->rsize,
696 				 items->nitem - items->curitem - maxitems);
697 		else
698 			elog(DEBUG2, "inserted %d items to block %u; split %d/%d (%d to go)",
699 				 maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize, (int) leaf->rsize,
700 				 items->nitem - items->curitem - maxitems);
701 	}
702 
703 	items->curitem += maxitems;
704 
705 	return needsplit ? GPTP_SPLIT : GPTP_INSERT;
706 }
707 
708 /*
709  * Perform data insertion after beginPlaceToPage has decided it will fit.
710  *
711  * This is invoked within a critical section, and XLOG record creation (if
712  * needed) is already started.  The target buffer is registered in slot 0.
713  */
714 static void
dataExecPlaceToPageLeaf(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertdata,void * ptp_workspace)715 dataExecPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
716 						void *insertdata, void *ptp_workspace)
717 {
718 	disassembledLeaf *leaf = (disassembledLeaf *) ptp_workspace;
719 
720 	/* Apply changes to page */
721 	dataPlaceToPageLeafRecompress(buf, leaf);
722 
723 	/* If needed, register WAL data built by computeLeafRecompressWALData */
724 	if (RelationNeedsWAL(btree->index))
725 	{
726 		XLogRegisterBufData(0, leaf->walinfo, leaf->walinfolen);
727 	}
728 }
729 
730 /*
731  * Vacuum a posting tree leaf page.
732  */
733 void
ginVacuumPostingTreeLeaf(Relation indexrel,Buffer buffer,GinVacuumState * gvs)734 ginVacuumPostingTreeLeaf(Relation indexrel, Buffer buffer, GinVacuumState *gvs)
735 {
736 	Page		page = BufferGetPage(buffer);
737 	disassembledLeaf *leaf;
738 	bool		removedsomething = false;
739 	dlist_iter	iter;
740 
741 	leaf = disassembleLeaf(page);
742 
743 	/* Vacuum each segment. */
744 	dlist_foreach(iter, &leaf->segments)
745 	{
746 		leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node, iter.cur);
747 		int			oldsegsize;
748 		ItemPointer cleaned;
749 		int			ncleaned;
750 
751 		if (!seginfo->items)
752 			seginfo->items = ginPostingListDecode(seginfo->seg,
753 												  &seginfo->nitems);
754 		if (seginfo->seg)
755 			oldsegsize = SizeOfGinPostingList(seginfo->seg);
756 		else
757 			oldsegsize = GinDataPageMaxDataSize;
758 
759 		cleaned = ginVacuumItemPointers(gvs,
760 										seginfo->items,
761 										seginfo->nitems,
762 										&ncleaned);
763 		pfree(seginfo->items);
764 		seginfo->items = NULL;
765 		seginfo->nitems = 0;
766 		if (cleaned)
767 		{
768 			if (ncleaned > 0)
769 			{
770 				int			npacked;
771 
772 				seginfo->seg = ginCompressPostingList(cleaned,
773 													  ncleaned,
774 													  oldsegsize,
775 													  &npacked);
776 				/* Removing an item never increases the size of the segment */
777 				if (npacked != ncleaned)
778 					elog(ERROR, "could not fit vacuumed posting list");
779 				seginfo->action = GIN_SEGMENT_REPLACE;
780 			}
781 			else
782 			{
783 				seginfo->seg = NULL;
784 				seginfo->items = NULL;
785 				seginfo->action = GIN_SEGMENT_DELETE;
786 			}
787 			seginfo->nitems = ncleaned;
788 
789 			removedsomething = true;
790 		}
791 	}
792 
793 	/*
794 	 * If we removed any items, reconstruct the page from the pieces.
795 	 *
796 	 * We don't try to re-encode the segments here, even though some of them
797 	 * might be really small now that we've removed some items from them. It
798 	 * seems like a waste of effort, as there isn't really any benefit from
799 	 * larger segments per se; larger segments only help to pack more items in
800 	 * the same space. We might as well delay doing that until the next
801 	 * insertion, which will need to re-encode at least part of the page
802 	 * anyway.
803 	 *
804 	 * Also note if the page was in uncompressed, pre-9.4 format before, it is
805 	 * now represented as one huge segment that contains all the items. It
806 	 * might make sense to split that, to speed up random access, but we don't
807 	 * bother. You'll have to REINDEX anyway if you want the full gain of the
808 	 * new tighter index format.
809 	 */
810 	if (removedsomething)
811 	{
812 		bool		modified;
813 
814 		/*
815 		 * Make sure we have a palloc'd copy of all segments, after the first
816 		 * segment that is modified. (dataPlaceToPageLeafRecompress requires
817 		 * this).
818 		 */
819 		modified = false;
820 		dlist_foreach(iter, &leaf->segments)
821 		{
822 			leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
823 													   iter.cur);
824 
825 			if (seginfo->action != GIN_SEGMENT_UNMODIFIED)
826 				modified = true;
827 			if (modified && seginfo->action != GIN_SEGMENT_DELETE)
828 			{
829 				int			segsize = SizeOfGinPostingList(seginfo->seg);
830 				GinPostingList *tmp = (GinPostingList *) palloc(segsize);
831 
832 				memcpy(tmp, seginfo->seg, segsize);
833 				seginfo->seg = tmp;
834 			}
835 		}
836 
837 		if (RelationNeedsWAL(indexrel))
838 			computeLeafRecompressWALData(leaf);
839 
840 		/* Apply changes to page */
841 		START_CRIT_SECTION();
842 
843 		dataPlaceToPageLeafRecompress(buffer, leaf);
844 
845 		MarkBufferDirty(buffer);
846 
847 		if (RelationNeedsWAL(indexrel))
848 		{
849 			XLogRecPtr	recptr;
850 
851 			XLogBeginInsert();
852 			XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
853 			XLogRegisterBufData(0, leaf->walinfo, leaf->walinfolen);
854 			recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_DATA_LEAF_PAGE);
855 			PageSetLSN(page, recptr);
856 		}
857 
858 		END_CRIT_SECTION();
859 	}
860 }
861 
862 /*
863  * Construct a ginxlogRecompressDataLeaf record representing the changes
864  * in *leaf.  (Because this requires a palloc, we have to do it before
865  * we enter the critical section that actually updates the page.)
866  */
867 static void
computeLeafRecompressWALData(disassembledLeaf * leaf)868 computeLeafRecompressWALData(disassembledLeaf *leaf)
869 {
870 	int			nmodified = 0;
871 	char	   *walbufbegin;
872 	char	   *walbufend;
873 	dlist_iter	iter;
874 	int			segno;
875 	ginxlogRecompressDataLeaf *recompress_xlog;
876 
877 	/* Count the modified segments */
878 	dlist_foreach(iter, &leaf->segments)
879 	{
880 		leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
881 												   iter.cur);
882 
883 		if (seginfo->action != GIN_SEGMENT_UNMODIFIED)
884 			nmodified++;
885 	}
886 
887 	walbufbegin =
888 		palloc(sizeof(ginxlogRecompressDataLeaf) +
889 			   BLCKSZ +			/* max size needed to hold the segment data */
890 			   nmodified * 2	/* (segno + action) per action */
891 		);
892 	walbufend = walbufbegin;
893 
894 	recompress_xlog = (ginxlogRecompressDataLeaf *) walbufend;
895 	walbufend += sizeof(ginxlogRecompressDataLeaf);
896 
897 	recompress_xlog->nactions = nmodified;
898 
899 	segno = 0;
900 	dlist_foreach(iter, &leaf->segments)
901 	{
902 		leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
903 												   iter.cur);
904 		int			segsize = 0;
905 		int			datalen;
906 		uint8		action = seginfo->action;
907 
908 		if (action == GIN_SEGMENT_UNMODIFIED)
909 		{
910 			segno++;
911 			continue;
912 		}
913 
914 		if (action != GIN_SEGMENT_DELETE)
915 			segsize = SizeOfGinPostingList(seginfo->seg);
916 
917 		/*
918 		 * If storing the uncompressed list of added item pointers would take
919 		 * more space than storing the compressed segment as is, do that
920 		 * instead.
921 		 */
922 		if (action == GIN_SEGMENT_ADDITEMS &&
923 			seginfo->nmodifieditems * sizeof(ItemPointerData) > segsize)
924 		{
925 			action = GIN_SEGMENT_REPLACE;
926 		}
927 
928 		*((uint8 *) (walbufend++)) = segno;
929 		*(walbufend++) = action;
930 
931 		switch (action)
932 		{
933 			case GIN_SEGMENT_DELETE:
934 				datalen = 0;
935 				break;
936 
937 			case GIN_SEGMENT_ADDITEMS:
938 				datalen = seginfo->nmodifieditems * sizeof(ItemPointerData);
939 				memcpy(walbufend, &seginfo->nmodifieditems, sizeof(uint16));
940 				memcpy(walbufend + sizeof(uint16), seginfo->modifieditems, datalen);
941 				datalen += sizeof(uint16);
942 				break;
943 
944 			case GIN_SEGMENT_INSERT:
945 			case GIN_SEGMENT_REPLACE:
946 				datalen = SHORTALIGN(segsize);
947 				memcpy(walbufend, seginfo->seg, segsize);
948 				break;
949 
950 			default:
951 				elog(ERROR, "unexpected GIN leaf action %d", action);
952 		}
953 		walbufend += datalen;
954 
955 		if (action != GIN_SEGMENT_INSERT)
956 			segno++;
957 	}
958 
959 	/* Pass back the constructed info via *leaf */
960 	leaf->walinfo = walbufbegin;
961 	leaf->walinfolen = walbufend - walbufbegin;
962 }
963 
964 /*
965  * Assemble a disassembled posting tree leaf page back to a buffer.
966  *
967  * This just updates the target buffer; WAL stuff is caller's responsibility.
968  *
969  * NOTE: The segment pointers must not point directly to the same buffer,
970  * except for segments that have not been modified and whose preceding
971  * segments have not been modified either.
972  */
973 static void
dataPlaceToPageLeafRecompress(Buffer buf,disassembledLeaf * leaf)974 dataPlaceToPageLeafRecompress(Buffer buf, disassembledLeaf *leaf)
975 {
976 	Page		page = BufferGetPage(buf);
977 	char	   *ptr;
978 	int			newsize;
979 	bool		modified = false;
980 	dlist_iter	iter;
981 	int			segsize;
982 
983 	/*
984 	 * If the page was in pre-9.4 format before, convert the header, and force
985 	 * all segments to be copied to the page whether they were modified or
986 	 * not.
987 	 */
988 	if (!GinPageIsCompressed(page))
989 	{
990 		Assert(leaf->oldformat);
991 		GinPageSetCompressed(page);
992 		GinPageGetOpaque(page)->maxoff = InvalidOffsetNumber;
993 		modified = true;
994 	}
995 
996 	ptr = (char *) GinDataLeafPageGetPostingList(page);
997 	newsize = 0;
998 	dlist_foreach(iter, &leaf->segments)
999 	{
1000 		leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node, iter.cur);
1001 
1002 		if (seginfo->action != GIN_SEGMENT_UNMODIFIED)
1003 			modified = true;
1004 
1005 		if (seginfo->action != GIN_SEGMENT_DELETE)
1006 		{
1007 			segsize = SizeOfGinPostingList(seginfo->seg);
1008 
1009 			if (modified)
1010 				memcpy(ptr, seginfo->seg, segsize);
1011 
1012 			ptr += segsize;
1013 			newsize += segsize;
1014 		}
1015 	}
1016 
1017 	Assert(newsize <= GinDataPageMaxDataSize);
1018 	GinDataPageSetDataSize(page, newsize);
1019 }
1020 
1021 /*
1022  * Like dataPlaceToPageLeafRecompress, but writes the disassembled leaf
1023  * segments to two pages instead of one.
1024  *
1025  * This is different from the non-split cases in that this does not modify
1026  * the original page directly, but writes to temporary in-memory copies of
1027  * the new left and right pages.
1028  */
1029 static void
dataPlaceToPageLeafSplit(disassembledLeaf * leaf,ItemPointerData lbound,ItemPointerData rbound,Page lpage,Page rpage)1030 dataPlaceToPageLeafSplit(disassembledLeaf *leaf,
1031 						 ItemPointerData lbound, ItemPointerData rbound,
1032 						 Page lpage, Page rpage)
1033 {
1034 	char	   *ptr;
1035 	int			segsize;
1036 	int			lsize;
1037 	int			rsize;
1038 	dlist_node *node;
1039 	dlist_node *firstright;
1040 	leafSegmentInfo *seginfo;
1041 
1042 	/* Initialize temporary pages to hold the new left and right pages */
1043 	GinInitPage(lpage, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ);
1044 	GinInitPage(rpage, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ);
1045 
1046 	/*
1047 	 * Copy the segments that go to the left page.
1048 	 *
1049 	 * XXX: We should skip copying the unmodified part of the left page, like
1050 	 * we do when recompressing.
1051 	 */
1052 	lsize = 0;
1053 	ptr = (char *) GinDataLeafPageGetPostingList(lpage);
1054 	firstright = dlist_next_node(&leaf->segments, leaf->lastleft);
1055 	for (node = dlist_head_node(&leaf->segments);
1056 		 node != firstright;
1057 		 node = dlist_next_node(&leaf->segments, node))
1058 	{
1059 		seginfo = dlist_container(leafSegmentInfo, node, node);
1060 
1061 		if (seginfo->action != GIN_SEGMENT_DELETE)
1062 		{
1063 			segsize = SizeOfGinPostingList(seginfo->seg);
1064 			memcpy(ptr, seginfo->seg, segsize);
1065 			ptr += segsize;
1066 			lsize += segsize;
1067 		}
1068 	}
1069 	Assert(lsize == leaf->lsize);
1070 	GinDataPageSetDataSize(lpage, lsize);
1071 	*GinDataPageGetRightBound(lpage) = lbound;
1072 
1073 	/* Copy the segments that go to the right page */
1074 	ptr = (char *) GinDataLeafPageGetPostingList(rpage);
1075 	rsize = 0;
1076 	for (node = firstright;
1077 		 ;
1078 		 node = dlist_next_node(&leaf->segments, node))
1079 	{
1080 		seginfo = dlist_container(leafSegmentInfo, node, node);
1081 
1082 		if (seginfo->action != GIN_SEGMENT_DELETE)
1083 		{
1084 			segsize = SizeOfGinPostingList(seginfo->seg);
1085 			memcpy(ptr, seginfo->seg, segsize);
1086 			ptr += segsize;
1087 			rsize += segsize;
1088 		}
1089 
1090 		if (!dlist_has_next(&leaf->segments, node))
1091 			break;
1092 	}
1093 	Assert(rsize == leaf->rsize);
1094 	GinDataPageSetDataSize(rpage, rsize);
1095 	*GinDataPageGetRightBound(rpage) = rbound;
1096 }
1097 
1098 /*
1099  * Prepare to insert data on an internal data page.
1100  *
1101  * If it will fit, return GPTP_INSERT after doing whatever setup is needed
1102  * before we enter the insertion critical section.  *ptp_workspace can be
1103  * set to pass information along to the execPlaceToPage function.
1104  *
1105  * If it won't fit, perform a page split and return two temporary page
1106  * images into *newlpage and *newrpage, with result GPTP_SPLIT.
1107  *
1108  * In neither case should the given page buffer be modified here.
1109  *
1110  * Note: on insertion to an internal node, in addition to inserting the given
1111  * item, the downlink of the existing item at stack->off will be updated to
1112  * point to updateblkno.
1113  */
1114 static GinPlaceToPageRC
dataBeginPlaceToPageInternal(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,void ** ptp_workspace,Page * newlpage,Page * newrpage)1115 dataBeginPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
1116 							 void *insertdata, BlockNumber updateblkno,
1117 							 void **ptp_workspace,
1118 							 Page *newlpage, Page *newrpage)
1119 {
1120 	Page		page = BufferGetPage(buf);
1121 
1122 	/* If it doesn't fit, deal with split case */
1123 	if (GinNonLeafDataPageGetFreeSpace(page) < sizeof(PostingItem))
1124 	{
1125 		dataSplitPageInternal(btree, buf, stack, insertdata, updateblkno,
1126 							  newlpage, newrpage);
1127 		return GPTP_SPLIT;
1128 	}
1129 
1130 	/* Else, we're ready to proceed with insertion */
1131 	return GPTP_INSERT;
1132 }
1133 
1134 /*
1135  * Perform data insertion after beginPlaceToPage has decided it will fit.
1136  *
1137  * This is invoked within a critical section, and XLOG record creation (if
1138  * needed) is already started.  The target buffer is registered in slot 0.
1139  */
1140 static void
dataExecPlaceToPageInternal(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,void * ptp_workspace)1141 dataExecPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
1142 							void *insertdata, BlockNumber updateblkno,
1143 							void *ptp_workspace)
1144 {
1145 	Page		page = BufferGetPage(buf);
1146 	OffsetNumber off = stack->off;
1147 	PostingItem *pitem;
1148 
1149 	/* Update existing downlink to point to next page (on internal page) */
1150 	pitem = GinDataPageGetPostingItem(page, off);
1151 	PostingItemSetBlockNumber(pitem, updateblkno);
1152 
1153 	/* Add new item */
1154 	pitem = (PostingItem *) insertdata;
1155 	GinDataPageAddPostingItem(page, pitem, off);
1156 
1157 	if (RelationNeedsWAL(btree->index))
1158 	{
1159 		/*
1160 		 * This must be static, because it has to survive until XLogInsert,
1161 		 * and we can't palloc here.  Ugly, but the XLogInsert infrastructure
1162 		 * isn't reentrant anyway.
1163 		 */
1164 		static ginxlogInsertDataInternal data;
1165 
1166 		data.offset = off;
1167 		data.newitem = *pitem;
1168 
1169 		XLogRegisterBufData(0, (char *) &data,
1170 							sizeof(ginxlogInsertDataInternal));
1171 	}
1172 }
1173 
1174 /*
1175  * Prepare to insert data on a posting-tree data page.
1176  *
1177  * If it will fit, return GPTP_INSERT after doing whatever setup is needed
1178  * before we enter the insertion critical section.  *ptp_workspace can be
1179  * set to pass information along to the execPlaceToPage function.
1180  *
1181  * If it won't fit, perform a page split and return two temporary page
1182  * images into *newlpage and *newrpage, with result GPTP_SPLIT.
1183  *
1184  * In neither case should the given page buffer be modified here.
1185  *
1186  * Note: on insertion to an internal node, in addition to inserting the given
1187  * item, the downlink of the existing item at stack->off will be updated to
1188  * point to updateblkno.
1189  *
1190  * Calls relevant function for internal or leaf page because they are handled
1191  * very differently.
1192  */
1193 static GinPlaceToPageRC
dataBeginPlaceToPage(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,void ** ptp_workspace,Page * newlpage,Page * newrpage)1194 dataBeginPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
1195 					 void *insertdata, BlockNumber updateblkno,
1196 					 void **ptp_workspace,
1197 					 Page *newlpage, Page *newrpage)
1198 {
1199 	Page		page = BufferGetPage(buf);
1200 
1201 	Assert(GinPageIsData(page));
1202 
1203 	if (GinPageIsLeaf(page))
1204 		return dataBeginPlaceToPageLeaf(btree, buf, stack, insertdata,
1205 										ptp_workspace,
1206 										newlpage, newrpage);
1207 	else
1208 		return dataBeginPlaceToPageInternal(btree, buf, stack,
1209 											insertdata, updateblkno,
1210 											ptp_workspace,
1211 											newlpage, newrpage);
1212 }
1213 
1214 /*
1215  * Perform data insertion after beginPlaceToPage has decided it will fit.
1216  *
1217  * This is invoked within a critical section, and XLOG record creation (if
1218  * needed) is already started.  The target buffer is registered in slot 0.
1219  *
1220  * Calls relevant function for internal or leaf page because they are handled
1221  * very differently.
1222  */
1223 static void
dataExecPlaceToPage(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,void * ptp_workspace)1224 dataExecPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
1225 					void *insertdata, BlockNumber updateblkno,
1226 					void *ptp_workspace)
1227 {
1228 	Page		page = BufferGetPage(buf);
1229 
1230 	if (GinPageIsLeaf(page))
1231 		dataExecPlaceToPageLeaf(btree, buf, stack, insertdata,
1232 								ptp_workspace);
1233 	else
1234 		dataExecPlaceToPageInternal(btree, buf, stack, insertdata,
1235 									updateblkno, ptp_workspace);
1236 }
1237 
1238 /*
1239  * Split internal page and insert new data.
1240  *
1241  * Returns new temp pages to *newlpage and *newrpage.
1242  * The original buffer is left untouched.
1243  */
1244 static void
dataSplitPageInternal(GinBtree btree,Buffer origbuf,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,Page * newlpage,Page * newrpage)1245 dataSplitPageInternal(GinBtree btree, Buffer origbuf,
1246 					  GinBtreeStack *stack,
1247 					  void *insertdata, BlockNumber updateblkno,
1248 					  Page *newlpage, Page *newrpage)
1249 {
1250 	Page		oldpage = BufferGetPage(origbuf);
1251 	OffsetNumber off = stack->off;
1252 	int			nitems = GinPageGetOpaque(oldpage)->maxoff;
1253 	int			nleftitems;
1254 	int			nrightitems;
1255 	Size		pageSize = PageGetPageSize(oldpage);
1256 	ItemPointerData oldbound = *GinDataPageGetRightBound(oldpage);
1257 	ItemPointer bound;
1258 	Page		lpage;
1259 	Page		rpage;
1260 	OffsetNumber separator;
1261 	PostingItem allitems[(BLCKSZ / sizeof(PostingItem)) + 1];
1262 
1263 	lpage = PageGetTempPage(oldpage);
1264 	rpage = PageGetTempPage(oldpage);
1265 	GinInitPage(lpage, GinPageGetOpaque(oldpage)->flags, pageSize);
1266 	GinInitPage(rpage, GinPageGetOpaque(oldpage)->flags, pageSize);
1267 
1268 	/*
1269 	 * First construct a new list of PostingItems, which includes all the old
1270 	 * items, and the new item.
1271 	 */
1272 	memcpy(allitems, GinDataPageGetPostingItem(oldpage, FirstOffsetNumber),
1273 		   (off - 1) * sizeof(PostingItem));
1274 
1275 	allitems[off - 1] = *((PostingItem *) insertdata);
1276 	memcpy(&allitems[off], GinDataPageGetPostingItem(oldpage, off),
1277 		   (nitems - (off - 1)) * sizeof(PostingItem));
1278 	nitems++;
1279 
1280 	/* Update existing downlink to point to next page */
1281 	PostingItemSetBlockNumber(&allitems[off], updateblkno);
1282 
1283 	/*
1284 	 * When creating a new index, fit as many tuples as possible on the left
1285 	 * page, on the assumption that the table is scanned from beginning to
1286 	 * end. This packs the index as tight as possible.
1287 	 */
1288 	if (btree->isBuild && GinPageRightMost(oldpage))
1289 		separator = GinNonLeafDataPageGetFreeSpace(rpage) / sizeof(PostingItem);
1290 	else
1291 		separator = nitems / 2;
1292 	nleftitems = separator;
1293 	nrightitems = nitems - separator;
1294 
1295 	memcpy(GinDataPageGetPostingItem(lpage, FirstOffsetNumber),
1296 		   allitems,
1297 		   nleftitems * sizeof(PostingItem));
1298 	GinPageGetOpaque(lpage)->maxoff = nleftitems;
1299 	memcpy(GinDataPageGetPostingItem(rpage, FirstOffsetNumber),
1300 		   &allitems[separator],
1301 		   nrightitems * sizeof(PostingItem));
1302 	GinPageGetOpaque(rpage)->maxoff = nrightitems;
1303 
1304 	/*
1305 	 * Also set pd_lower for both pages, like GinDataPageAddPostingItem does.
1306 	 */
1307 	GinDataPageSetDataSize(lpage, nleftitems * sizeof(PostingItem));
1308 	GinDataPageSetDataSize(rpage, nrightitems * sizeof(PostingItem));
1309 
1310 	/* set up right bound for left page */
1311 	bound = GinDataPageGetRightBound(lpage);
1312 	*bound = GinDataPageGetPostingItem(lpage, nleftitems)->key;
1313 
1314 	/* set up right bound for right page */
1315 	*GinDataPageGetRightBound(rpage) = oldbound;
1316 
1317 	/* return temp pages to caller */
1318 	*newlpage = lpage;
1319 	*newrpage = rpage;
1320 }
1321 
1322 /*
1323  * Construct insertion payload for inserting the downlink for given buffer.
1324  */
1325 static void *
dataPrepareDownlink(GinBtree btree,Buffer lbuf)1326 dataPrepareDownlink(GinBtree btree, Buffer lbuf)
1327 {
1328 	PostingItem *pitem = palloc(sizeof(PostingItem));
1329 	Page		lpage = BufferGetPage(lbuf);
1330 
1331 	PostingItemSetBlockNumber(pitem, BufferGetBlockNumber(lbuf));
1332 	pitem->key = *GinDataPageGetRightBound(lpage);
1333 
1334 	return pitem;
1335 }
1336 
1337 /*
1338  * Fills new root by right bound values from child.
1339  * Also called from ginxlog, should not use btree
1340  */
1341 void
ginDataFillRoot(GinBtree btree,Page root,BlockNumber lblkno,Page lpage,BlockNumber rblkno,Page rpage)1342 ginDataFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage)
1343 {
1344 	PostingItem li,
1345 				ri;
1346 
1347 	li.key = *GinDataPageGetRightBound(lpage);
1348 	PostingItemSetBlockNumber(&li, lblkno);
1349 	GinDataPageAddPostingItem(root, &li, InvalidOffsetNumber);
1350 
1351 	ri.key = *GinDataPageGetRightBound(rpage);
1352 	PostingItemSetBlockNumber(&ri, rblkno);
1353 	GinDataPageAddPostingItem(root, &ri, InvalidOffsetNumber);
1354 }
1355 
1356 
1357 /*** Functions to work with disassembled leaf pages ***/
1358 
1359 /*
1360  * Disassemble page into a disassembledLeaf struct.
1361  */
1362 static disassembledLeaf *
disassembleLeaf(Page page)1363 disassembleLeaf(Page page)
1364 {
1365 	disassembledLeaf *leaf;
1366 	GinPostingList *seg;
1367 	Pointer		segbegin;
1368 	Pointer		segend;
1369 
1370 	leaf = palloc0(sizeof(disassembledLeaf));
1371 	dlist_init(&leaf->segments);
1372 
1373 	if (GinPageIsCompressed(page))
1374 	{
1375 		/*
1376 		 * Create a leafSegment entry for each segment.
1377 		 */
1378 		seg = GinDataLeafPageGetPostingList(page);
1379 		segbegin = (Pointer) seg;
1380 		segend = segbegin + GinDataLeafPageGetPostingListSize(page);
1381 		while ((Pointer) seg < segend)
1382 		{
1383 			leafSegmentInfo *seginfo = palloc(sizeof(leafSegmentInfo));
1384 
1385 			seginfo->action = GIN_SEGMENT_UNMODIFIED;
1386 			seginfo->seg = seg;
1387 			seginfo->items = NULL;
1388 			seginfo->nitems = 0;
1389 			dlist_push_tail(&leaf->segments, &seginfo->node);
1390 
1391 			seg = GinNextPostingListSegment(seg);
1392 		}
1393 		leaf->oldformat = false;
1394 	}
1395 	else
1396 	{
1397 		/*
1398 		 * A pre-9.4 format uncompressed page is represented by a single
1399 		 * segment, with an array of items.  The corner case is uncompressed
1400 		 * page containing no items, which is represented as no segments.
1401 		 */
1402 		ItemPointer uncompressed;
1403 		int			nuncompressed;
1404 		leafSegmentInfo *seginfo;
1405 
1406 		uncompressed = dataLeafPageGetUncompressed(page, &nuncompressed);
1407 
1408 		if (nuncompressed > 0)
1409 		{
1410 			seginfo = palloc(sizeof(leafSegmentInfo));
1411 
1412 			seginfo->action = GIN_SEGMENT_REPLACE;
1413 			seginfo->seg = NULL;
1414 			seginfo->items = palloc(nuncompressed * sizeof(ItemPointerData));
1415 			memcpy(seginfo->items, uncompressed, nuncompressed * sizeof(ItemPointerData));
1416 			seginfo->nitems = nuncompressed;
1417 
1418 			dlist_push_tail(&leaf->segments, &seginfo->node);
1419 		}
1420 
1421 		leaf->oldformat = true;
1422 	}
1423 
1424 	return leaf;
1425 }
1426 
1427 /*
1428  * Distribute newItems to the segments.
1429  *
1430  * Any segments that acquire new items are decoded, and the new items are
1431  * merged with the old items.
1432  *
1433  * Returns true if any new items were added. False means they were all
1434  * duplicates of existing items on the page.
1435  */
1436 static bool
addItemsToLeaf(disassembledLeaf * leaf,ItemPointer newItems,int nNewItems)1437 addItemsToLeaf(disassembledLeaf *leaf, ItemPointer newItems, int nNewItems)
1438 {
1439 	dlist_iter	iter;
1440 	ItemPointer nextnew = newItems;
1441 	int			newleft = nNewItems;
1442 	bool		modified = false;
1443 	leafSegmentInfo *newseg;
1444 
1445 	/*
1446 	 * If the page is completely empty, just construct one new segment to hold
1447 	 * all the new items.
1448 	 */
1449 	if (dlist_is_empty(&leaf->segments))
1450 	{
1451 		newseg = palloc(sizeof(leafSegmentInfo));
1452 		newseg->seg = NULL;
1453 		newseg->items = newItems;
1454 		newseg->nitems = nNewItems;
1455 		newseg->action = GIN_SEGMENT_INSERT;
1456 		dlist_push_tail(&leaf->segments, &newseg->node);
1457 		return true;
1458 	}
1459 
1460 	dlist_foreach(iter, &leaf->segments)
1461 	{
1462 		leafSegmentInfo *cur = (leafSegmentInfo *) dlist_container(leafSegmentInfo, node, iter.cur);
1463 		int			nthis;
1464 		ItemPointer tmpitems;
1465 		int			ntmpitems;
1466 
1467 		/*
1468 		 * How many of the new items fall into this segment?
1469 		 */
1470 		if (!dlist_has_next(&leaf->segments, iter.cur))
1471 			nthis = newleft;
1472 		else
1473 		{
1474 			leafSegmentInfo *next;
1475 			ItemPointerData next_first;
1476 
1477 			next = (leafSegmentInfo *) dlist_container(leafSegmentInfo, node,
1478 													   dlist_next_node(&leaf->segments, iter.cur));
1479 			if (next->items)
1480 				next_first = next->items[0];
1481 			else
1482 			{
1483 				Assert(next->seg != NULL);
1484 				next_first = next->seg->first;
1485 			}
1486 
1487 			nthis = 0;
1488 			while (nthis < newleft && ginCompareItemPointers(&nextnew[nthis], &next_first) < 0)
1489 				nthis++;
1490 		}
1491 		if (nthis == 0)
1492 			continue;
1493 
1494 		/* Merge the new items with the existing items. */
1495 		if (!cur->items)
1496 			cur->items = ginPostingListDecode(cur->seg, &cur->nitems);
1497 
1498 		/*
1499 		 * Fast path for the important special case that we're appending to
1500 		 * the end of the page: don't let the last segment on the page grow
1501 		 * larger than the target, create a new segment before that happens.
1502 		 */
1503 		if (!dlist_has_next(&leaf->segments, iter.cur) &&
1504 			ginCompareItemPointers(&cur->items[cur->nitems - 1], &nextnew[0]) < 0 &&
1505 			cur->seg != NULL &&
1506 			SizeOfGinPostingList(cur->seg) >= GinPostingListSegmentTargetSize)
1507 		{
1508 			newseg = palloc(sizeof(leafSegmentInfo));
1509 			newseg->seg = NULL;
1510 			newseg->items = nextnew;
1511 			newseg->nitems = nthis;
1512 			newseg->action = GIN_SEGMENT_INSERT;
1513 			dlist_push_tail(&leaf->segments, &newseg->node);
1514 			modified = true;
1515 			break;
1516 		}
1517 
1518 		tmpitems = ginMergeItemPointers(cur->items, cur->nitems,
1519 										nextnew, nthis,
1520 										&ntmpitems);
1521 		if (ntmpitems != cur->nitems)
1522 		{
1523 			/*
1524 			 * If there are no duplicates, track the added items so that we
1525 			 * can emit a compact ADDITEMS WAL record later on. (it doesn't
1526 			 * seem worth re-checking which items were duplicates, if there
1527 			 * were any)
1528 			 */
1529 			if (ntmpitems == nthis + cur->nitems &&
1530 				cur->action == GIN_SEGMENT_UNMODIFIED)
1531 			{
1532 				cur->action = GIN_SEGMENT_ADDITEMS;
1533 				cur->modifieditems = nextnew;
1534 				cur->nmodifieditems = nthis;
1535 			}
1536 			else
1537 				cur->action = GIN_SEGMENT_REPLACE;
1538 
1539 			cur->items = tmpitems;
1540 			cur->nitems = ntmpitems;
1541 			cur->seg = NULL;
1542 			modified = true;
1543 		}
1544 
1545 		nextnew += nthis;
1546 		newleft -= nthis;
1547 		if (newleft == 0)
1548 			break;
1549 	}
1550 
1551 	return modified;
1552 }
1553 
1554 /*
1555  * Recompresses all segments that have been modified.
1556  *
1557  * If not all the items fit on two pages (ie. after split), we store as
1558  * many items as fit, and set *remaining to the first item that didn't fit.
1559  * If all items fit, *remaining is set to invalid.
1560  *
1561  * Returns true if the page has to be split.
1562  */
1563 static bool
leafRepackItems(disassembledLeaf * leaf,ItemPointer remaining)1564 leafRepackItems(disassembledLeaf *leaf, ItemPointer remaining)
1565 {
1566 	int			pgused = 0;
1567 	bool		needsplit = false;
1568 	dlist_iter	iter;
1569 	int			segsize;
1570 	leafSegmentInfo *nextseg;
1571 	int			npacked;
1572 	bool		modified;
1573 	dlist_node *cur_node;
1574 	dlist_node *next_node;
1575 
1576 	ItemPointerSetInvalid(remaining);
1577 
1578 	/*
1579 	 * cannot use dlist_foreach_modify here because we insert adjacent items
1580 	 * while iterating.
1581 	 */
1582 	for (cur_node = dlist_head_node(&leaf->segments);
1583 		 cur_node != NULL;
1584 		 cur_node = next_node)
1585 	{
1586 		leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
1587 												   cur_node);
1588 
1589 		if (dlist_has_next(&leaf->segments, cur_node))
1590 			next_node = dlist_next_node(&leaf->segments, cur_node);
1591 		else
1592 			next_node = NULL;
1593 
1594 		/* Compress the posting list, if necessary */
1595 		if (seginfo->action != GIN_SEGMENT_DELETE)
1596 		{
1597 			if (seginfo->seg == NULL)
1598 			{
1599 				if (seginfo->nitems > GinPostingListSegmentMaxSize)
1600 					npacked = 0;	/* no chance that it would fit. */
1601 				else
1602 				{
1603 					seginfo->seg = ginCompressPostingList(seginfo->items,
1604 														  seginfo->nitems,
1605 														  GinPostingListSegmentMaxSize,
1606 														  &npacked);
1607 				}
1608 				if (npacked != seginfo->nitems)
1609 				{
1610 					/*
1611 					 * Too large. Compress again to the target size, and
1612 					 * create a new segment to represent the remaining items.
1613 					 * The new segment is inserted after this one, so it will
1614 					 * be processed in the next iteration of this loop.
1615 					 */
1616 					if (seginfo->seg)
1617 						pfree(seginfo->seg);
1618 					seginfo->seg = ginCompressPostingList(seginfo->items,
1619 														  seginfo->nitems,
1620 														  GinPostingListSegmentTargetSize,
1621 														  &npacked);
1622 					if (seginfo->action != GIN_SEGMENT_INSERT)
1623 						seginfo->action = GIN_SEGMENT_REPLACE;
1624 
1625 					nextseg = palloc(sizeof(leafSegmentInfo));
1626 					nextseg->action = GIN_SEGMENT_INSERT;
1627 					nextseg->seg = NULL;
1628 					nextseg->items = &seginfo->items[npacked];
1629 					nextseg->nitems = seginfo->nitems - npacked;
1630 					next_node = &nextseg->node;
1631 					dlist_insert_after(cur_node, next_node);
1632 				}
1633 			}
1634 
1635 			/*
1636 			 * If the segment is very small, merge it with the next segment.
1637 			 */
1638 			if (SizeOfGinPostingList(seginfo->seg) < GinPostingListSegmentMinSize && next_node)
1639 			{
1640 				int			nmerged;
1641 
1642 				nextseg = dlist_container(leafSegmentInfo, node, next_node);
1643 
1644 				if (seginfo->items == NULL)
1645 					seginfo->items = ginPostingListDecode(seginfo->seg,
1646 														  &seginfo->nitems);
1647 				if (nextseg->items == NULL)
1648 					nextseg->items = ginPostingListDecode(nextseg->seg,
1649 														  &nextseg->nitems);
1650 				nextseg->items =
1651 					ginMergeItemPointers(seginfo->items, seginfo->nitems,
1652 										 nextseg->items, nextseg->nitems,
1653 										 &nmerged);
1654 				Assert(nmerged == seginfo->nitems + nextseg->nitems);
1655 				nextseg->nitems = nmerged;
1656 				nextseg->seg = NULL;
1657 
1658 				nextseg->action = GIN_SEGMENT_REPLACE;
1659 				nextseg->modifieditems = NULL;
1660 				nextseg->nmodifieditems = 0;
1661 
1662 				if (seginfo->action == GIN_SEGMENT_INSERT)
1663 				{
1664 					dlist_delete(cur_node);
1665 					continue;
1666 				}
1667 				else
1668 				{
1669 					seginfo->action = GIN_SEGMENT_DELETE;
1670 					seginfo->seg = NULL;
1671 				}
1672 			}
1673 
1674 			seginfo->items = NULL;
1675 			seginfo->nitems = 0;
1676 		}
1677 
1678 		if (seginfo->action == GIN_SEGMENT_DELETE)
1679 			continue;
1680 
1681 		/*
1682 		 * OK, we now have a compressed version of this segment ready for
1683 		 * copying to the page. Did we exceed the size that fits on one page?
1684 		 */
1685 		segsize = SizeOfGinPostingList(seginfo->seg);
1686 		if (pgused + segsize > GinDataPageMaxDataSize)
1687 		{
1688 			if (!needsplit)
1689 			{
1690 				/* switch to right page */
1691 				Assert(pgused > 0);
1692 				leaf->lastleft = dlist_prev_node(&leaf->segments, cur_node);
1693 				needsplit = true;
1694 				leaf->lsize = pgused;
1695 				pgused = 0;
1696 			}
1697 			else
1698 			{
1699 				/*
1700 				 * Filled both pages. The last segment we constructed did not
1701 				 * fit.
1702 				 */
1703 				*remaining = seginfo->seg->first;
1704 
1705 				/*
1706 				 * remove all segments that did not fit from the list.
1707 				 */
1708 				while (dlist_has_next(&leaf->segments, cur_node))
1709 					dlist_delete(dlist_next_node(&leaf->segments, cur_node));
1710 				dlist_delete(cur_node);
1711 				break;
1712 			}
1713 		}
1714 
1715 		pgused += segsize;
1716 	}
1717 
1718 	if (!needsplit)
1719 	{
1720 		leaf->lsize = pgused;
1721 		leaf->rsize = 0;
1722 	}
1723 	else
1724 		leaf->rsize = pgused;
1725 
1726 	Assert(leaf->lsize <= GinDataPageMaxDataSize);
1727 	Assert(leaf->rsize <= GinDataPageMaxDataSize);
1728 
1729 	/*
1730 	 * Make a palloc'd copy of every segment after the first modified one,
1731 	 * because as we start copying items to the original page, we might
1732 	 * overwrite an existing segment.
1733 	 */
1734 	modified = false;
1735 	dlist_foreach(iter, &leaf->segments)
1736 	{
1737 		leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
1738 												   iter.cur);
1739 
1740 		if (!modified && seginfo->action != GIN_SEGMENT_UNMODIFIED)
1741 		{
1742 			modified = true;
1743 		}
1744 		else if (modified && seginfo->action == GIN_SEGMENT_UNMODIFIED)
1745 		{
1746 			GinPostingList *tmp;
1747 
1748 			segsize = SizeOfGinPostingList(seginfo->seg);
1749 			tmp = palloc(segsize);
1750 			memcpy(tmp, seginfo->seg, segsize);
1751 			seginfo->seg = tmp;
1752 		}
1753 	}
1754 
1755 	return needsplit;
1756 }
1757 
1758 
1759 /*** Functions that are exported to the rest of the GIN code ***/
1760 
1761 /*
1762  * Creates new posting tree containing the given TIDs. Returns the page
1763  * number of the root of the new posting tree.
1764  *
1765  * items[] must be in sorted order with no duplicates.
1766  */
1767 BlockNumber
createPostingTree(Relation index,ItemPointerData * items,uint32 nitems,GinStatsData * buildStats)1768 createPostingTree(Relation index, ItemPointerData *items, uint32 nitems,
1769 				  GinStatsData *buildStats)
1770 {
1771 	BlockNumber blkno;
1772 	Buffer		buffer;
1773 	Page		tmppage;
1774 	Page		page;
1775 	Pointer		ptr;
1776 	int			nrootitems;
1777 	int			rootsize;
1778 
1779 	/* Construct the new root page in memory first. */
1780 	tmppage = (Page) palloc(BLCKSZ);
1781 	GinInitPage(tmppage, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ);
1782 	GinPageGetOpaque(tmppage)->rightlink = InvalidBlockNumber;
1783 
1784 	/*
1785 	 * Write as many of the items to the root page as fit. In segments of max
1786 	 * GinPostingListSegmentMaxSize bytes each.
1787 	 */
1788 	nrootitems = 0;
1789 	rootsize = 0;
1790 	ptr = (Pointer) GinDataLeafPageGetPostingList(tmppage);
1791 	while (nrootitems < nitems)
1792 	{
1793 		GinPostingList *segment;
1794 		int			npacked;
1795 		int			segsize;
1796 
1797 		segment = ginCompressPostingList(&items[nrootitems],
1798 										 nitems - nrootitems,
1799 										 GinPostingListSegmentMaxSize,
1800 										 &npacked);
1801 		segsize = SizeOfGinPostingList(segment);
1802 		if (rootsize + segsize > GinDataPageMaxDataSize)
1803 			break;
1804 
1805 		memcpy(ptr, segment, segsize);
1806 		ptr += segsize;
1807 		rootsize += segsize;
1808 		nrootitems += npacked;
1809 		pfree(segment);
1810 	}
1811 	GinDataPageSetDataSize(tmppage, rootsize);
1812 
1813 	/*
1814 	 * All set. Get a new physical page, and copy the in-memory page to it.
1815 	 */
1816 	buffer = GinNewBuffer(index);
1817 	page = BufferGetPage(buffer);
1818 	blkno = BufferGetBlockNumber(buffer);
1819 
1820 	START_CRIT_SECTION();
1821 
1822 	PageRestoreTempPage(tmppage, page);
1823 	MarkBufferDirty(buffer);
1824 
1825 	if (RelationNeedsWAL(index))
1826 	{
1827 		XLogRecPtr	recptr;
1828 		ginxlogCreatePostingTree data;
1829 
1830 		data.size = rootsize;
1831 
1832 		XLogBeginInsert();
1833 		XLogRegisterData((char *) &data, sizeof(ginxlogCreatePostingTree));
1834 
1835 		XLogRegisterData((char *) GinDataLeafPageGetPostingList(page),
1836 						 rootsize);
1837 		XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
1838 
1839 		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_PTREE);
1840 		PageSetLSN(page, recptr);
1841 	}
1842 
1843 	UnlockReleaseBuffer(buffer);
1844 
1845 	END_CRIT_SECTION();
1846 
1847 	/* During index build, count the newly-added data page */
1848 	if (buildStats)
1849 		buildStats->nDataPages++;
1850 
1851 	elog(DEBUG2, "created GIN posting tree with %d items", nrootitems);
1852 
1853 	/*
1854 	 * Add any remaining TIDs to the newly-created posting tree.
1855 	 */
1856 	if (nitems > nrootitems)
1857 	{
1858 		ginInsertItemPointers(index, blkno,
1859 							  items + nrootitems,
1860 							  nitems - nrootitems,
1861 							  buildStats);
1862 	}
1863 
1864 	return blkno;
1865 }
1866 
1867 void
ginPrepareDataScan(GinBtree btree,Relation index,BlockNumber rootBlkno)1868 ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno)
1869 {
1870 	memset(btree, 0, sizeof(GinBtreeData));
1871 
1872 	btree->index = index;
1873 	btree->rootBlkno = rootBlkno;
1874 
1875 	btree->findChildPage = dataLocateItem;
1876 	btree->getLeftMostChild = dataGetLeftMostPage;
1877 	btree->isMoveRight = dataIsMoveRight;
1878 	btree->findItem = NULL;
1879 	btree->findChildPtr = dataFindChildPtr;
1880 	btree->beginPlaceToPage = dataBeginPlaceToPage;
1881 	btree->execPlaceToPage = dataExecPlaceToPage;
1882 	btree->fillRoot = ginDataFillRoot;
1883 	btree->prepareDownlink = dataPrepareDownlink;
1884 
1885 	btree->isData = TRUE;
1886 	btree->fullScan = FALSE;
1887 	btree->isBuild = FALSE;
1888 }
1889 
1890 /*
1891  * Inserts array of item pointers, may execute several tree scan (very rare)
1892  */
1893 void
ginInsertItemPointers(Relation index,BlockNumber rootBlkno,ItemPointerData * items,uint32 nitem,GinStatsData * buildStats)1894 ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
1895 					  ItemPointerData *items, uint32 nitem,
1896 					  GinStatsData *buildStats)
1897 {
1898 	GinBtreeData btree;
1899 	GinBtreeDataLeafInsertData insertdata;
1900 	GinBtreeStack *stack;
1901 
1902 	ginPrepareDataScan(&btree, index, rootBlkno);
1903 	btree.isBuild = (buildStats != NULL);
1904 	insertdata.items = items;
1905 	insertdata.nitem = nitem;
1906 	insertdata.curitem = 0;
1907 
1908 	while (insertdata.curitem < insertdata.nitem)
1909 	{
1910 		/* search for the leaf page where the first item should go to */
1911 		btree.itemptr = insertdata.items[insertdata.curitem];
1912 		stack = ginFindLeafPage(&btree, false, NULL);
1913 
1914 		ginInsertValue(&btree, stack, &insertdata, buildStats);
1915 	}
1916 }
1917 
1918 /*
1919  * Starts a new scan on a posting tree.
1920  */
1921 GinBtreeStack *
ginScanBeginPostingTree(GinBtree btree,Relation index,BlockNumber rootBlkno,Snapshot snapshot)1922 ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno,
1923 						Snapshot snapshot)
1924 {
1925 	GinBtreeStack *stack;
1926 
1927 	ginPrepareDataScan(btree, index, rootBlkno);
1928 
1929 	btree->fullScan = TRUE;
1930 
1931 	stack = ginFindLeafPage(btree, TRUE, snapshot);
1932 
1933 	return stack;
1934 }
1935