1 /*-------------------------------------------------------------------------
2  *
3  * gindatapage.c
4  *	  routines for handling GIN posting tree pages.
5  *
6  *
7  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *			src/backend/access/gin/gindatapage.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/gin_private.h"
18 #include "access/xloginsert.h"
19 #include "lib/ilist.h"
20 #include "miscadmin.h"
21 #include "utils/rel.h"
22 
/*
 * Min, Max and Target size of posting lists stored on leaf pages, in bytes.
 *
 * The code can deal with any size, but random access is more efficient when
 * a number of smaller lists are stored, rather than one big list. If a
 * posting list would become larger than Max size as a result of insertions,
 * it is split into two. If a posting list would be smaller than minimum
 * size, it is merged with the next posting list.
 */
#define GinPostingListSegmentMaxSize 384
#define GinPostingListSegmentTargetSize 256
#define GinPostingListSegmentMinSize 128

/*
 * At least this many items fit in a segment that is
 * GinPostingListSegmentMaxSize bytes long.  dataBeginPlaceToPageLeaf
 * multiplies it by a number of full-sized segments to get a conservative
 * (lower-bound) estimate of how many new items are guaranteed to fit.
 *
 * NOTE(review): the constants presumably stand for 2 bytes of per-segment
 * overhead and a worst case of 6 bytes per encoded item -- confirm against
 * the posting list encoding before changing either number.
 */
#define MinTuplesPerSegment ((GinPostingListSegmentMaxSize - 2) / 6)
42 
/*
 * A working struct for manipulating a posting tree leaf page.
 *
 * It is produced by disassembleLeaf(), modified in memory (by insertion or
 * vacuum), and finally written back either to the original page
 * (dataPlaceToPageLeafRecompress) or to two new page images
 * (dataPlaceToPageLeafSplit).
 */
typedef struct
{
	dlist_head	segments;		/* a list of leafSegmentInfos */

	/*
	 * The following fields represent how the segments are split across pages,
	 * if a page split is required. Filled in by leafRepackItems; when
	 * splitting outside an index build, dataBeginPlaceToPageLeaf may
	 * afterwards shift 'lastleft' to the left and adjust the two sizes to
	 * balance the pages.
	 */
	dlist_node *lastleft;		/* last segment on left page */
	int			lsize;			/* total size on left page */
	int			rsize;			/* total size on right page */

	/*
	 * dataPlaceToPageLeafRecompress asserts that this is set whenever the
	 * target page is not yet marked as compressed.
	 */
	bool		oldformat;		/* page is in pre-9.4 format on disk */

	/*
	 * If we need WAL data representing the reconstructed leaf page, it's
	 * stored here by computeLeafRecompressWALData. The buffer is later
	 * handed to XLogRegisterBufData as-is.
	 */
	char	   *walinfo;		/* buffer start */
	int			walinfolen;		/* and length */
} disassembledLeaf;
67 
/*
 * In-memory representation of one posting list segment of a leaf page; the
 * elements of disassembledLeaf.segments.
 */
typedef struct
{
	dlist_node	node;			/* linked list pointers */

	/*-------------
	 * 'action' indicates the status of this in-memory segment, compared to
	 * what's on disk. It is one of the GIN_SEGMENT_* action codes:
	 *
	 * UNMODIFIED	no changes
	 * DELETE		the segment is to be removed. 'seg' and 'items' are
	 *				ignored
	 * INSERT		this is a completely new segment
	 * REPLACE		this replaces an existing segment with new content
	 * ADDITEMS		like REPLACE, but no items have been removed, and we track
	 *				in detail what items have been added to this segment, in
	 *				'modifieditems'
	 *-------------
	 */
	char		action;

	/* Items added to this segment; only meaningful for ADDITEMS. */
	ItemPointerData *modifieditems;
	uint16		nmodifieditems; /* # of entries in 'modifieditems' */

	/*
	 * The following fields represent the items in this segment. If 'items' is
	 * not NULL, it contains a palloc'd array of the items in this segment. If
	 * 'seg' is not NULL, it contains the items in an already-compressed
	 * format. It can point directly into an on-disk page (only if the segment
	 * has not been modified), or to a palloc'd segment in memory. If both are
	 * set, they must represent the same items.
	 */
	GinPostingList *seg;
	ItemPointer items;
	int			nitems;			/* # of items in 'items', if items != NULL */
} leafSegmentInfo;
102 
/* Prototypes for file-local routines that are used before their definition. */

/* Access to pre-9.4 (uncompressed) leaf pages, and internal page splits */
static ItemPointer dataLeafPageGetUncompressed(Page page, int *nitems);
static void dataSplitPageInternal(GinBtree btree, Buffer origbuf,
					  GinBtreeStack *stack,
					  void *insertdata, BlockNumber updateblkno,
					  Page *newlpage, Page *newrpage);

/* Taking a leaf page apart and modifying it in memory */
static disassembledLeaf *disassembleLeaf(Page page);
static bool leafRepackItems(disassembledLeaf *leaf, ItemPointer remaining);
static bool addItemsToLeaf(disassembledLeaf *leaf, ItemPointer newItems,
			   int nNewItems);

/* Writing a disassembled leaf back out, and describing the change for WAL */
static void computeLeafRecompressWALData(disassembledLeaf *leaf);
static void dataPlaceToPageLeafRecompress(Buffer buf, disassembledLeaf *leaf);
static void dataPlaceToPageLeafSplit(disassembledLeaf *leaf,
						 ItemPointerData lbound, ItemPointerData rbound,
						 Page lpage, Page rpage);
119 
120 /*
121  * Read TIDs from leaf data page to single uncompressed array. The TIDs are
122  * returned in ascending order.
123  *
124  * advancePast is a hint, indicating that the caller is only interested in
125  * TIDs > advancePast. To return all items, use ItemPointerSetMin.
126  *
127  * Note: This function can still return items smaller than advancePast that
128  * are in the same posting list as the items of interest, so the caller must
129  * still check all the returned items. But passing it allows this function to
130  * skip whole posting lists.
131  */
132 ItemPointer
GinDataLeafPageGetItems(Page page,int * nitems,ItemPointerData advancePast)133 GinDataLeafPageGetItems(Page page, int *nitems, ItemPointerData advancePast)
134 {
135 	ItemPointer result;
136 
137 	if (GinPageIsCompressed(page))
138 	{
139 		GinPostingList *seg = GinDataLeafPageGetPostingList(page);
140 		Size		len = GinDataLeafPageGetPostingListSize(page);
141 		Pointer		endptr = ((Pointer) seg) + len;
142 		GinPostingList *next;
143 
144 		/* Skip to the segment containing advancePast+1 */
145 		if (ItemPointerIsValid(&advancePast))
146 		{
147 			next = GinNextPostingListSegment(seg);
148 			while ((Pointer) next < endptr &&
149 				   ginCompareItemPointers(&next->first, &advancePast) <= 0)
150 			{
151 				seg = next;
152 				next = GinNextPostingListSegment(seg);
153 			}
154 			len = endptr - (Pointer) seg;
155 		}
156 
157 		if (len > 0)
158 			result = ginPostingListDecodeAllSegments(seg, len, nitems);
159 		else
160 		{
161 			result = NULL;
162 			*nitems = 0;
163 		}
164 	}
165 	else
166 	{
167 		ItemPointer tmp = dataLeafPageGetUncompressed(page, nitems);
168 
169 		result = palloc((*nitems) * sizeof(ItemPointerData));
170 		memcpy(result, tmp, (*nitems) * sizeof(ItemPointerData));
171 	}
172 
173 	return result;
174 }
175 
176 /*
177  * Places all TIDs from leaf data page to bitmap.
178  */
179 int
GinDataLeafPageGetItemsToTbm(Page page,TIDBitmap * tbm)180 GinDataLeafPageGetItemsToTbm(Page page, TIDBitmap *tbm)
181 {
182 	ItemPointer uncompressed;
183 	int			nitems;
184 
185 	if (GinPageIsCompressed(page))
186 	{
187 		GinPostingList *segment = GinDataLeafPageGetPostingList(page);
188 		Size		len = GinDataLeafPageGetPostingListSize(page);
189 
190 		nitems = ginPostingListDecodeAllSegmentsToTbm(segment, len, tbm);
191 	}
192 	else
193 	{
194 		uncompressed = dataLeafPageGetUncompressed(page, &nitems);
195 
196 		if (nitems > 0)
197 			tbm_add_tuples(tbm, uncompressed, nitems, false);
198 	}
199 
200 	return nitems;
201 }
202 
203 /*
204  * Get pointer to the uncompressed array of items on a pre-9.4 format
205  * uncompressed leaf page. The number of items in the array is returned in
206  * *nitems.
207  */
208 static ItemPointer
dataLeafPageGetUncompressed(Page page,int * nitems)209 dataLeafPageGetUncompressed(Page page, int *nitems)
210 {
211 	ItemPointer items;
212 
213 	Assert(!GinPageIsCompressed(page));
214 
215 	/*
216 	 * In the old pre-9.4 page format, the whole page content is used for
217 	 * uncompressed items, and the number of items is stored in 'maxoff'
218 	 */
219 	items = (ItemPointer) GinDataPageGetData(page);
220 	*nitems = GinPageGetOpaque(page)->maxoff;
221 
222 	return items;
223 }
224 
225 /*
226  * Check if we should follow the right link to find the item we're searching
227  * for.
228  *
229  * Compares inserting item pointer with the right bound of the current page.
230  */
231 static bool
dataIsMoveRight(GinBtree btree,Page page)232 dataIsMoveRight(GinBtree btree, Page page)
233 {
234 	ItemPointer iptr = GinDataPageGetRightBound(page);
235 
236 	if (GinPageRightMost(page))
237 		return FALSE;
238 
239 	if (GinPageIsDeleted(page))
240 		return TRUE;
241 
242 	return (ginCompareItemPointers(&btree->itemptr, iptr) > 0) ? TRUE : FALSE;
243 }
244 
245 /*
246  * Find correct PostingItem in non-leaf page. It is assumed that this is
247  * the correct page, and the searched value SHOULD be on the page.
248  */
249 static BlockNumber
dataLocateItem(GinBtree btree,GinBtreeStack * stack)250 dataLocateItem(GinBtree btree, GinBtreeStack *stack)
251 {
252 	OffsetNumber low,
253 				high,
254 				maxoff;
255 	PostingItem *pitem = NULL;
256 	int			result;
257 	Page		page = BufferGetPage(stack->buffer);
258 
259 	Assert(!GinPageIsLeaf(page));
260 	Assert(GinPageIsData(page));
261 
262 	if (btree->fullScan)
263 	{
264 		stack->off = FirstOffsetNumber;
265 		stack->predictNumber *= GinPageGetOpaque(page)->maxoff;
266 		return btree->getLeftMostChild(btree, page);
267 	}
268 
269 	low = FirstOffsetNumber;
270 	maxoff = high = GinPageGetOpaque(page)->maxoff;
271 	Assert(high >= low);
272 
273 	high++;
274 
275 	while (high > low)
276 	{
277 		OffsetNumber mid = low + ((high - low) / 2);
278 
279 		pitem = GinDataPageGetPostingItem(page, mid);
280 
281 		if (mid == maxoff)
282 		{
283 			/*
284 			 * Right infinity, page already correctly chosen with a help of
285 			 * dataIsMoveRight
286 			 */
287 			result = -1;
288 		}
289 		else
290 		{
291 			pitem = GinDataPageGetPostingItem(page, mid);
292 			result = ginCompareItemPointers(&btree->itemptr, &(pitem->key));
293 		}
294 
295 		if (result == 0)
296 		{
297 			stack->off = mid;
298 			return PostingItemGetBlockNumber(pitem);
299 		}
300 		else if (result > 0)
301 			low = mid + 1;
302 		else
303 			high = mid;
304 	}
305 
306 	Assert(high >= FirstOffsetNumber && high <= maxoff);
307 
308 	stack->off = high;
309 	pitem = GinDataPageGetPostingItem(page, high);
310 	return PostingItemGetBlockNumber(pitem);
311 }
312 
313 /*
314  * Find link to blkno on non-leaf page, returns offset of PostingItem
315  */
316 static OffsetNumber
dataFindChildPtr(GinBtree btree,Page page,BlockNumber blkno,OffsetNumber storedOff)317 dataFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber storedOff)
318 {
319 	OffsetNumber i,
320 				maxoff = GinPageGetOpaque(page)->maxoff;
321 	PostingItem *pitem;
322 
323 	Assert(!GinPageIsLeaf(page));
324 	Assert(GinPageIsData(page));
325 
326 	/* if page isn't changed, we return storedOff */
327 	if (storedOff >= FirstOffsetNumber && storedOff <= maxoff)
328 	{
329 		pitem = GinDataPageGetPostingItem(page, storedOff);
330 		if (PostingItemGetBlockNumber(pitem) == blkno)
331 			return storedOff;
332 
333 		/*
334 		 * we hope, that needed pointer goes to right. It's true if there
335 		 * wasn't a deletion
336 		 */
337 		for (i = storedOff + 1; i <= maxoff; i++)
338 		{
339 			pitem = GinDataPageGetPostingItem(page, i);
340 			if (PostingItemGetBlockNumber(pitem) == blkno)
341 				return i;
342 		}
343 
344 		maxoff = storedOff - 1;
345 	}
346 
347 	/* last chance */
348 	for (i = FirstOffsetNumber; i <= maxoff; i++)
349 	{
350 		pitem = GinDataPageGetPostingItem(page, i);
351 		if (PostingItemGetBlockNumber(pitem) == blkno)
352 			return i;
353 	}
354 
355 	return InvalidOffsetNumber;
356 }
357 
358 /*
359  * Return blkno of leftmost child
360  */
361 static BlockNumber
dataGetLeftMostPage(GinBtree btree,Page page)362 dataGetLeftMostPage(GinBtree btree, Page page)
363 {
364 	PostingItem *pitem;
365 
366 	Assert(!GinPageIsLeaf(page));
367 	Assert(GinPageIsData(page));
368 	Assert(GinPageGetOpaque(page)->maxoff >= FirstOffsetNumber);
369 
370 	pitem = GinDataPageGetPostingItem(page, FirstOffsetNumber);
371 	return PostingItemGetBlockNumber(pitem);
372 }
373 
374 /*
375  * Add PostingItem to a non-leaf page.
376  */
377 void
GinDataPageAddPostingItem(Page page,PostingItem * data,OffsetNumber offset)378 GinDataPageAddPostingItem(Page page, PostingItem *data, OffsetNumber offset)
379 {
380 	OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
381 	char	   *ptr;
382 
383 	Assert(PostingItemGetBlockNumber(data) != InvalidBlockNumber);
384 	Assert(!GinPageIsLeaf(page));
385 
386 	if (offset == InvalidOffsetNumber)
387 	{
388 		ptr = (char *) GinDataPageGetPostingItem(page, maxoff + 1);
389 	}
390 	else
391 	{
392 		ptr = (char *) GinDataPageGetPostingItem(page, offset);
393 		if (offset != maxoff + 1)
394 			memmove(ptr + sizeof(PostingItem),
395 					ptr,
396 					(maxoff - offset + 1) *sizeof(PostingItem));
397 	}
398 	memcpy(ptr, data, sizeof(PostingItem));
399 
400 	maxoff++;
401 	GinPageGetOpaque(page)->maxoff = maxoff;
402 
403 	/*
404 	 * Also set pd_lower to the end of the posting items, to follow the
405 	 * "standard" page layout, so that we can squeeze out the unused space
406 	 * from full-page images.
407 	 */
408 	GinDataPageSetDataSize(page, maxoff * sizeof(PostingItem));
409 }
410 
411 /*
412  * Delete posting item from non-leaf page
413  */
414 void
GinPageDeletePostingItem(Page page,OffsetNumber offset)415 GinPageDeletePostingItem(Page page, OffsetNumber offset)
416 {
417 	OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
418 
419 	Assert(!GinPageIsLeaf(page));
420 	Assert(offset >= FirstOffsetNumber && offset <= maxoff);
421 
422 	if (offset != maxoff)
423 		memmove(GinDataPageGetPostingItem(page, offset),
424 				GinDataPageGetPostingItem(page, offset + 1),
425 				sizeof(PostingItem) * (maxoff - offset));
426 
427 	maxoff--;
428 	GinPageGetOpaque(page)->maxoff = maxoff;
429 
430 	GinDataPageSetDataSize(page, maxoff * sizeof(PostingItem));
431 }
432 
/*
 * Prepare to insert data on a leaf data page.
 *
 * If it will fit, return GPTP_INSERT after doing whatever setup is needed
 * before we enter the insertion critical section.  *ptp_workspace can be
 * set to pass information along to the execPlaceToPage function.
 *
 * If it won't fit, perform a page split and return two temporary page
 * images into *newlpage and *newrpage, with result GPTP_SPLIT.
 *
 * If every candidate item turns out to be a duplicate (addItemsToLeaf
 * returns false), return GPTP_NO_WORK.
 *
 * In neither case should the given page buffer be modified here.
 *
 * Arguments:
 *	btree			only btree->isBuild and btree->index are consulted
 *	buf				buffer holding the target leaf page
 *	stack			not used by this function
 *	insertdata		a GinBtreeDataLeafInsertData; the items from 'curitem'
 *					up to 'nitem' are the candidates.  On every return path
 *					'curitem' is advanced by the number of items consumed
 *	ptp_workspace	set to the disassembled leaf when returning GPTP_INSERT
 *	newlpage,
 *	newrpage		set to palloc'd BLCKSZ-sized page images when returning
 *					GPTP_SPLIT
 */
static GinPlaceToPageRC
dataBeginPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
						 void *insertdata,
						 void **ptp_workspace,
						 Page *newlpage, Page *newrpage)
{
	GinBtreeDataLeafInsertData *items = insertdata;
	ItemPointer newItems = &items->items[items->curitem];
	int			maxitems = items->nitem - items->curitem;
	Page		page = BufferGetPage(buf);
	int			i;
	ItemPointerData rbound;
	ItemPointerData lbound;
	bool		needsplit;
	bool		append;
	int			segsize;
	Size		freespace;
	disassembledLeaf *leaf;
	leafSegmentInfo *lastleftinfo;
	ItemPointerData maxOldItem;
	ItemPointerData remaining;

	/* Remember the page's current right bound; only used if it is not rightmost */
	rbound = *GinDataPageGetRightBound(page);

	/*
	 * Count how many of the new items belong to this page.
	 */
	if (!GinPageRightMost(page))
	{
		for (i = 0; i < maxitems; i++)
		{
			if (ginCompareItemPointers(&newItems[i], &rbound) > 0)
			{
				/*
				 * This needs to go to some other location in the tree. (The
				 * caller should've chosen the insert location so that at
				 * least the first item goes here.)
				 */
				Assert(i > 0);
				break;
			}
		}
		maxitems = i;
	}

	/* Disassemble the data on the page */
	leaf = disassembleLeaf(page);

	/*
	 * Are we appending to the end of the page? IOW, are all the new items
	 * larger than any of the existing items.
	 *
	 * Only the first new item is compared against the largest existing item
	 * (the last item of the last segment).  NOTE(review): this presumably
	 * relies on the new items being sorted in ascending order -- confirm
	 * against the callers.
	 */
	if (!dlist_is_empty(&leaf->segments))
	{
		lastleftinfo = dlist_container(leafSegmentInfo, node,
									   dlist_tail_node(&leaf->segments));
		/* Decode the last segment on demand to get at its last item */
		if (!lastleftinfo->items)
			lastleftinfo->items = ginPostingListDecode(lastleftinfo->seg,
													   &lastleftinfo->nitems);
		maxOldItem = lastleftinfo->items[lastleftinfo->nitems - 1];
		if (ginCompareItemPointers(&newItems[0], &maxOldItem) >= 0)
			append = true;
		else
			append = false;
	}
	else
	{
		/* Empty page: anything we insert is an append */
		ItemPointerSetMin(&maxOldItem);
		append = true;
	}

	/*
	 * If we're appending to the end of the page, we will append as many items
	 * as we can fit (after splitting), and stop when the pages become full.
	 * Otherwise we have to limit the number of new items to insert, because
	 * once we start packing we can't just stop when we run out of space,
	 * because we must make sure that all the old items still fit.
	 */
	if (GinPageIsCompressed(page))
		freespace = GinDataLeafPageGetFreeSpace(page);
	else
		freespace = 0;
	if (append)
	{
		/*
		 * Even when appending, trying to append more items than will fit is
		 * not completely free, because we will merge the new items and old
		 * items into an array below. In the best case, every new item fits in
		 * a single byte, and we can use all the free space on the old page as
		 * well as the new page. For simplicity, ignore segment overhead etc.
		 */
		maxitems = Min(maxitems, freespace + GinDataPageMaxDataSize);
	}
	else
	{
		/*
		 * Calculate a conservative estimate of how many new items we can fit
		 * on the two pages after splitting.
		 *
		 * We can use any remaining free space on the old page to store full
		 * segments, as well as the new page. Each full-sized segment can hold
		 * at least MinTuplesPerSegment items
		 */
		int			nnewsegments;

		nnewsegments = freespace / GinPostingListSegmentMaxSize;
		nnewsegments += GinDataPageMaxDataSize / GinPostingListSegmentMaxSize;
		maxitems = Min(maxitems, nnewsegments * MinTuplesPerSegment);
	}

	/* Add the new items to the segment list */
	if (!addItemsToLeaf(leaf, newItems, maxitems))
	{
		/* all items were duplicates, we have nothing to do */
		items->curitem += maxitems;

		return GPTP_NO_WORK;
	}

	/*
	 * Pack the items back to compressed segments, ready for writing to disk.
	 * The result tells us whether a page split is needed; 'remaining' is
	 * checked below to see whether some items did not fit.
	 */
	needsplit = leafRepackItems(leaf, &remaining);

	/*
	 * Did all the new items fit?
	 *
	 * If we're appending, it's OK if they didn't. But as a sanity check,
	 * verify that all the old items fit.
	 */
	if (ItemPointerIsValid(&remaining))
	{
		if (!append || ItemPointerCompare(&maxOldItem, &remaining) >= 0)
			elog(ERROR, "could not split GIN page; all old items didn't fit");

		/*
		 * Count how many of the new items did fit: everything before the
		 * first new item that is >= 'remaining'.
		 */
		for (i = 0; i < maxitems; i++)
		{
			if (ginCompareItemPointers(&newItems[i], &remaining) >= 0)
				break;
		}
		if (i == 0)
			elog(ERROR, "could not split GIN page; no new items fit");
		maxitems = i;
	}

	if (!needsplit)
	{
		/*
		 * Great, all the items fit on a single page.  If needed, prepare data
		 * for a WAL record describing the changes we'll make.
		 */
		if (RelationNeedsWAL(btree->index))
			computeLeafRecompressWALData(leaf);

		/*
		 * We're ready to enter the critical section, but
		 * dataExecPlaceToPageLeaf will need access to the "leaf" data.
		 */
		*ptp_workspace = leaf;

		if (append)
			elog(DEBUG2, "appended %d new items to block %u; %d bytes (%d to go)",
				 maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize,
				 items->nitem - items->curitem - maxitems);
		else
			elog(DEBUG2, "inserted %d new items to block %u; %d bytes (%d to go)",
				 maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize,
				 items->nitem - items->curitem - maxitems);
	}
	else
	{
		/*
		 * Have to split.
		 *
		 * leafRepackItems already divided the segments between the left and
		 * the right page. It filled the left page as full as possible, and
		 * put the rest to the right page. When building a new index, that's
		 * good, because the table is scanned from beginning to end and there
		 * won't be any more insertions to the left page during the build.
		 * This packs the index as tight as possible. But otherwise, split
		 * 50/50, by moving segments from the left page to the right page
		 * until they're balanced.
		 *
		 * As a further heuristic, when appending items to the end of the
		 * page, try to make the left page 75% full, on the assumption that
		 * subsequent insertions will probably also go to the end. This packs
		 * the index somewhat tighter when appending to a table, which is very
		 * common.
		 */
		if (!btree->isBuild)
		{
			/* Walk 'lastleft' leftwards, but never past the first segment */
			while (dlist_has_prev(&leaf->segments, leaf->lastleft))
			{
				lastleftinfo = dlist_container(leafSegmentInfo, node, leaf->lastleft);

				/* ignore deleted segments */
				if (lastleftinfo->action != GIN_SEGMENT_DELETE)
				{
					segsize = SizeOfGinPostingList(lastleftinfo->seg);

					/*
					 * Note that we check that the right page doesn't become
					 * more full than the left page even when appending. It's
					 * possible that we added enough items to make both pages
					 * more than 75% full.
					 */
					if ((leaf->lsize - segsize) - (leaf->rsize + segsize) < 0)
						break;
					if (append)
					{
						/* Moving this segment would drop the left page below 75% */
						if ((leaf->lsize - segsize) < (BLCKSZ * 3) / 4)
							break;
					}

					/* Account for moving this segment to the right page */
					leaf->lsize -= segsize;
					leaf->rsize += segsize;
				}
				leaf->lastleft = dlist_prev_node(&leaf->segments, leaf->lastleft);
			}
		}
		Assert(leaf->lsize <= GinDataPageMaxDataSize);
		Assert(leaf->rsize <= GinDataPageMaxDataSize);

		/*
		 * Fetch the max item in the left page's last segment; it becomes the
		 * right bound of the page.
		 */
		lastleftinfo = dlist_container(leafSegmentInfo, node, leaf->lastleft);
		if (!lastleftinfo->items)
			lastleftinfo->items = ginPostingListDecode(lastleftinfo->seg,
													   &lastleftinfo->nitems);
		lbound = lastleftinfo->items[lastleftinfo->nitems - 1];

		/*
		 * Now allocate a couple of temporary page images, and fill them.
		 */
		*newlpage = palloc(BLCKSZ);
		*newrpage = palloc(BLCKSZ);

		dataPlaceToPageLeafSplit(leaf, lbound, rbound,
								 *newlpage, *newrpage);

		/* Unless we split the rightmost page, left bound must sort before right */
		Assert(GinPageRightMost(page) ||
			   ginCompareItemPointers(GinDataPageGetRightBound(*newlpage),
								   GinDataPageGetRightBound(*newrpage)) < 0);

		if (append)
			elog(DEBUG2, "appended %d items to block %u; split %d/%d (%d to go)",
				 maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize, (int) leaf->rsize,
				 items->nitem - items->curitem - maxitems);
		else
			elog(DEBUG2, "inserted %d items to block %u; split %d/%d (%d to go)",
				 maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize, (int) leaf->rsize,
				 items->nitem - items->curitem - maxitems);
	}

	/* Tell the caller how many of its items have been taken care of */
	items->curitem += maxitems;

	return needsplit ? GPTP_SPLIT : GPTP_INSERT;
}
706 
707 /*
708  * Perform data insertion after beginPlaceToPage has decided it will fit.
709  *
710  * This is invoked within a critical section, and XLOG record creation (if
711  * needed) is already started.  The target buffer is registered in slot 0.
712  */
713 static void
dataExecPlaceToPageLeaf(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertdata,void * ptp_workspace)714 dataExecPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
715 						void *insertdata, void *ptp_workspace)
716 {
717 	disassembledLeaf *leaf = (disassembledLeaf *) ptp_workspace;
718 
719 	/* Apply changes to page */
720 	dataPlaceToPageLeafRecompress(buf, leaf);
721 
722 	/* If needed, register WAL data built by computeLeafRecompressWALData */
723 	if (RelationNeedsWAL(btree->index))
724 	{
725 		XLogRegisterBufData(0, leaf->walinfo, leaf->walinfolen);
726 	}
727 }
728 
/*
 * Vacuum a posting tree leaf page.
 *
 * Every segment of the page in 'buffer' is decoded and passed through
 * ginVacuumItemPointers.  Segments that lost items are re-compressed
 * (REPLACE) or dropped entirely (DELETE).  If anything was removed, the page
 * is rewritten in place inside a critical section and, when 'indexrel' is
 * WAL-logged, an XLOG_GIN_VACUUM_DATA_LEAF_PAGE record is emitted.  If
 * nothing was removed, the page is left untouched.
 *
 * 'gvs' is passed through to ginVacuumItemPointers.
 */
void
ginVacuumPostingTreeLeaf(Relation indexrel, Buffer buffer, GinVacuumState *gvs)
{
	Page		page = BufferGetPage(buffer);
	disassembledLeaf *leaf;
	bool		removedsomething = false;
	dlist_iter	iter;

	leaf = disassembleLeaf(page);

	/* Vacuum each segment. */
	dlist_foreach(iter, &leaf->segments)
	{
		leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node, iter.cur);
		int			oldsegsize;
		ItemPointer cleaned;
		int			ncleaned;

		/* We need the segment's items in uncompressed form */
		if (!seginfo->items)
			seginfo->items = ginPostingListDecode(seginfo->seg,
												  &seginfo->nitems);

		/*
		 * The old size is the space budget for re-compressing the segment.
		 * Without a compressed form to measure, allow up to a whole page.
		 */
		if (seginfo->seg)
			oldsegsize = SizeOfGinPostingList(seginfo->seg);
		else
			oldsegsize = GinDataPageMaxDataSize;

		/* 'cleaned' is tested for NULL below to detect "nothing removed" */
		cleaned = ginVacuumItemPointers(gvs,
										seginfo->items,
										seginfo->nitems,
										&ncleaned);
		pfree(seginfo->items);
		seginfo->items = NULL;
		seginfo->nitems = 0;
		if (cleaned)
		{
			if (ncleaned > 0)
			{
				int			npacked;

				seginfo->seg = ginCompressPostingList(cleaned,
													  ncleaned,
													  oldsegsize,
													  &npacked);
				/* Removing an item never increases the size of the segment */
				if (npacked != ncleaned)
					elog(ERROR, "could not fit vacuumed posting list");
				seginfo->action = GIN_SEGMENT_REPLACE;
			}
			else
			{
				/* Every item was removed: drop the whole segment */
				seginfo->seg = NULL;
				seginfo->items = NULL;
				seginfo->action = GIN_SEGMENT_DELETE;
			}
			seginfo->nitems = ncleaned;

			removedsomething = true;
		}
	}

	/*
	 * If we removed any items, reconstruct the page from the pieces.
	 *
	 * We don't try to re-encode the segments here, even though some of them
	 * might be really small now that we've removed some items from them. It
	 * seems like a waste of effort, as there isn't really any benefit from
	 * larger segments per se; larger segments only help to pack more items in
	 * the same space. We might as well delay doing that until the next
	 * insertion, which will need to re-encode at least part of the page
	 * anyway.
	 *
	 * Also note if the page was in uncompressed, pre-9.4 format before, it is
	 * now represented as one huge segment that contains all the items. It
	 * might make sense to split that, to speed up random access, but we don't
	 * bother. You'll have to REINDEX anyway if you want the full gain of the
	 * new tighter index format.
	 */
	if (removedsomething)
	{
		bool		modified;

		/*
		 * Make sure we have a palloc'd copy of all segments, after the first
		 * segment that is modified. (dataPlaceToPageLeafRecompress requires
		 * this).
		 */
		modified = false;
		dlist_foreach(iter, &leaf->segments)
		{
			leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
													   iter.cur);

			if (seginfo->action != GIN_SEGMENT_UNMODIFIED)
				modified = true;
			if (modified && seginfo->action != GIN_SEGMENT_DELETE)
			{
				int			segsize = SizeOfGinPostingList(seginfo->seg);
				GinPostingList *tmp = (GinPostingList *) palloc(segsize);

				memcpy(tmp, seginfo->seg, segsize);
				seginfo->seg = tmp;
			}
		}

		/* Build the WAL payload before the critical section; it pallocs */
		if (RelationNeedsWAL(indexrel))
			computeLeafRecompressWALData(leaf);

		/* Apply changes to page */
		START_CRIT_SECTION();

		dataPlaceToPageLeafRecompress(buffer, leaf);

		MarkBufferDirty(buffer);

		if (RelationNeedsWAL(indexrel))
		{
			XLogRecPtr	recptr;

			XLogBeginInsert();
			XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
			XLogRegisterBufData(0, leaf->walinfo, leaf->walinfolen);
			recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_DATA_LEAF_PAGE);
			PageSetLSN(page, recptr);
		}

		END_CRIT_SECTION();
	}
}
860 
861 /*
862  * Construct a ginxlogRecompressDataLeaf record representing the changes
863  * in *leaf.  (Because this requires a palloc, we have to do it before
864  * we enter the critical section that actually updates the page.)
865  */
866 static void
computeLeafRecompressWALData(disassembledLeaf * leaf)867 computeLeafRecompressWALData(disassembledLeaf *leaf)
868 {
869 	int			nmodified = 0;
870 	char	   *walbufbegin;
871 	char	   *walbufend;
872 	dlist_iter	iter;
873 	int			segno;
874 	ginxlogRecompressDataLeaf *recompress_xlog;
875 
876 	/* Count the modified segments */
877 	dlist_foreach(iter, &leaf->segments)
878 	{
879 		leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
880 												   iter.cur);
881 
882 		if (seginfo->action != GIN_SEGMENT_UNMODIFIED)
883 			nmodified++;
884 	}
885 
886 	walbufbegin =
887 		palloc(sizeof(ginxlogRecompressDataLeaf) +
888 			   BLCKSZ +			/* max size needed to hold the segment data */
889 			   nmodified * 2	/* (segno + action) per action */
890 		);
891 	walbufend = walbufbegin;
892 
893 	recompress_xlog = (ginxlogRecompressDataLeaf *) walbufend;
894 	walbufend += sizeof(ginxlogRecompressDataLeaf);
895 
896 	recompress_xlog->nactions = nmodified;
897 
898 	segno = 0;
899 	dlist_foreach(iter, &leaf->segments)
900 	{
901 		leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
902 												   iter.cur);
903 		int			segsize = 0;
904 		int			datalen;
905 		uint8		action = seginfo->action;
906 
907 		if (action == GIN_SEGMENT_UNMODIFIED)
908 		{
909 			segno++;
910 			continue;
911 		}
912 
913 		if (action != GIN_SEGMENT_DELETE)
914 			segsize = SizeOfGinPostingList(seginfo->seg);
915 
916 		/*
917 		 * If storing the uncompressed list of added item pointers would take
918 		 * more space than storing the compressed segment as is, do that
919 		 * instead.
920 		 */
921 		if (action == GIN_SEGMENT_ADDITEMS &&
922 			seginfo->nmodifieditems * sizeof(ItemPointerData) > segsize)
923 		{
924 			action = GIN_SEGMENT_REPLACE;
925 		}
926 
927 		*((uint8 *) (walbufend++)) = segno;
928 		*(walbufend++) = action;
929 
930 		switch (action)
931 		{
932 			case GIN_SEGMENT_DELETE:
933 				datalen = 0;
934 				break;
935 
936 			case GIN_SEGMENT_ADDITEMS:
937 				datalen = seginfo->nmodifieditems * sizeof(ItemPointerData);
938 				memcpy(walbufend, &seginfo->nmodifieditems, sizeof(uint16));
939 				memcpy(walbufend + sizeof(uint16), seginfo->modifieditems, datalen);
940 				datalen += sizeof(uint16);
941 				break;
942 
943 			case GIN_SEGMENT_INSERT:
944 			case GIN_SEGMENT_REPLACE:
945 				datalen = SHORTALIGN(segsize);
946 				memcpy(walbufend, seginfo->seg, segsize);
947 				break;
948 
949 			default:
950 				elog(ERROR, "unexpected GIN leaf action %d", action);
951 		}
952 		walbufend += datalen;
953 
954 		if (action != GIN_SEGMENT_INSERT)
955 			segno++;
956 	}
957 
958 	/* Pass back the constructed info via *leaf */
959 	leaf->walinfo = walbufbegin;
960 	leaf->walinfolen = walbufend - walbufbegin;
961 }
962 
963 /*
964  * Assemble a disassembled posting tree leaf page back to a buffer.
965  *
966  * This just updates the target buffer; WAL stuff is caller's responsibility.
967  *
968  * NOTE: The segment pointers must not point directly to the same buffer,
969  * except for segments that have not been modified and whose preceding
970  * segments have not been modified either.
971  */
972 static void
dataPlaceToPageLeafRecompress(Buffer buf,disassembledLeaf * leaf)973 dataPlaceToPageLeafRecompress(Buffer buf, disassembledLeaf *leaf)
974 {
975 	Page		page = BufferGetPage(buf);
976 	char	   *ptr;
977 	int			newsize;
978 	bool		modified = false;
979 	dlist_iter	iter;
980 	int			segsize;
981 
982 	/*
983 	 * If the page was in pre-9.4 format before, convert the header, and force
984 	 * all segments to be copied to the page whether they were modified or
985 	 * not.
986 	 */
987 	if (!GinPageIsCompressed(page))
988 	{
989 		Assert(leaf->oldformat);
990 		GinPageSetCompressed(page);
991 		GinPageGetOpaque(page)->maxoff = InvalidOffsetNumber;
992 		modified = true;
993 	}
994 
995 	ptr = (char *) GinDataLeafPageGetPostingList(page);
996 	newsize = 0;
997 	dlist_foreach(iter, &leaf->segments)
998 	{
999 		leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node, iter.cur);
1000 
1001 		if (seginfo->action != GIN_SEGMENT_UNMODIFIED)
1002 			modified = true;
1003 
1004 		if (seginfo->action != GIN_SEGMENT_DELETE)
1005 		{
1006 			segsize = SizeOfGinPostingList(seginfo->seg);
1007 
1008 			if (modified)
1009 				memcpy(ptr, seginfo->seg, segsize);
1010 
1011 			ptr += segsize;
1012 			newsize += segsize;
1013 		}
1014 	}
1015 
1016 	Assert(newsize <= GinDataPageMaxDataSize);
1017 	GinDataPageSetDataSize(page, newsize);
1018 }
1019 
1020 /*
1021  * Like dataPlaceToPageLeafRecompress, but writes the disassembled leaf
1022  * segments to two pages instead of one.
1023  *
1024  * This is different from the non-split cases in that this does not modify
1025  * the original page directly, but writes to temporary in-memory copies of
1026  * the new left and right pages.
1027  */
1028 static void
dataPlaceToPageLeafSplit(disassembledLeaf * leaf,ItemPointerData lbound,ItemPointerData rbound,Page lpage,Page rpage)1029 dataPlaceToPageLeafSplit(disassembledLeaf *leaf,
1030 						 ItemPointerData lbound, ItemPointerData rbound,
1031 						 Page lpage, Page rpage)
1032 {
1033 	char	   *ptr;
1034 	int			segsize;
1035 	int			lsize;
1036 	int			rsize;
1037 	dlist_node *node;
1038 	dlist_node *firstright;
1039 	leafSegmentInfo *seginfo;
1040 
1041 	/* Initialize temporary pages to hold the new left and right pages */
1042 	GinInitPage(lpage, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ);
1043 	GinInitPage(rpage, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ);
1044 
1045 	/*
1046 	 * Copy the segments that go to the left page.
1047 	 *
1048 	 * XXX: We should skip copying the unmodified part of the left page, like
1049 	 * we do when recompressing.
1050 	 */
1051 	lsize = 0;
1052 	ptr = (char *) GinDataLeafPageGetPostingList(lpage);
1053 	firstright = dlist_next_node(&leaf->segments, leaf->lastleft);
1054 	for (node = dlist_head_node(&leaf->segments);
1055 		 node != firstright;
1056 		 node = dlist_next_node(&leaf->segments, node))
1057 	{
1058 		seginfo = dlist_container(leafSegmentInfo, node, node);
1059 
1060 		if (seginfo->action != GIN_SEGMENT_DELETE)
1061 		{
1062 			segsize = SizeOfGinPostingList(seginfo->seg);
1063 			memcpy(ptr, seginfo->seg, segsize);
1064 			ptr += segsize;
1065 			lsize += segsize;
1066 		}
1067 	}
1068 	Assert(lsize == leaf->lsize);
1069 	GinDataPageSetDataSize(lpage, lsize);
1070 	*GinDataPageGetRightBound(lpage) = lbound;
1071 
1072 	/* Copy the segments that go to the right page */
1073 	ptr = (char *) GinDataLeafPageGetPostingList(rpage);
1074 	rsize = 0;
1075 	for (node = firstright;
1076 		 ;
1077 		 node = dlist_next_node(&leaf->segments, node))
1078 	{
1079 		seginfo = dlist_container(leafSegmentInfo, node, node);
1080 
1081 		if (seginfo->action != GIN_SEGMENT_DELETE)
1082 		{
1083 			segsize = SizeOfGinPostingList(seginfo->seg);
1084 			memcpy(ptr, seginfo->seg, segsize);
1085 			ptr += segsize;
1086 			rsize += segsize;
1087 		}
1088 
1089 		if (!dlist_has_next(&leaf->segments, node))
1090 			break;
1091 	}
1092 	Assert(rsize == leaf->rsize);
1093 	GinDataPageSetDataSize(rpage, rsize);
1094 	*GinDataPageGetRightBound(rpage) = rbound;
1095 }
1096 
1097 /*
1098  * Prepare to insert data on an internal data page.
1099  *
1100  * If it will fit, return GPTP_INSERT after doing whatever setup is needed
1101  * before we enter the insertion critical section.  *ptp_workspace can be
1102  * set to pass information along to the execPlaceToPage function.
1103  *
1104  * If it won't fit, perform a page split and return two temporary page
1105  * images into *newlpage and *newrpage, with result GPTP_SPLIT.
1106  *
1107  * In neither case should the given page buffer be modified here.
1108  *
1109  * Note: on insertion to an internal node, in addition to inserting the given
1110  * item, the downlink of the existing item at stack->off will be updated to
1111  * point to updateblkno.
1112  */
1113 static GinPlaceToPageRC
dataBeginPlaceToPageInternal(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,void ** ptp_workspace,Page * newlpage,Page * newrpage)1114 dataBeginPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
1115 							 void *insertdata, BlockNumber updateblkno,
1116 							 void **ptp_workspace,
1117 							 Page *newlpage, Page *newrpage)
1118 {
1119 	Page		page = BufferGetPage(buf);
1120 
1121 	/* If it doesn't fit, deal with split case */
1122 	if (GinNonLeafDataPageGetFreeSpace(page) < sizeof(PostingItem))
1123 	{
1124 		dataSplitPageInternal(btree, buf, stack, insertdata, updateblkno,
1125 							  newlpage, newrpage);
1126 		return GPTP_SPLIT;
1127 	}
1128 
1129 	/* Else, we're ready to proceed with insertion */
1130 	return GPTP_INSERT;
1131 }
1132 
1133 /*
1134  * Perform data insertion after beginPlaceToPage has decided it will fit.
1135  *
1136  * This is invoked within a critical section, and XLOG record creation (if
1137  * needed) is already started.  The target buffer is registered in slot 0.
1138  */
1139 static void
dataExecPlaceToPageInternal(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,void * ptp_workspace)1140 dataExecPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
1141 							void *insertdata, BlockNumber updateblkno,
1142 							void *ptp_workspace)
1143 {
1144 	Page		page = BufferGetPage(buf);
1145 	OffsetNumber off = stack->off;
1146 	PostingItem *pitem;
1147 
1148 	/* Update existing downlink to point to next page (on internal page) */
1149 	pitem = GinDataPageGetPostingItem(page, off);
1150 	PostingItemSetBlockNumber(pitem, updateblkno);
1151 
1152 	/* Add new item */
1153 	pitem = (PostingItem *) insertdata;
1154 	GinDataPageAddPostingItem(page, pitem, off);
1155 
1156 	if (RelationNeedsWAL(btree->index))
1157 	{
1158 		/*
1159 		 * This must be static, because it has to survive until XLogInsert,
1160 		 * and we can't palloc here.  Ugly, but the XLogInsert infrastructure
1161 		 * isn't reentrant anyway.
1162 		 */
1163 		static ginxlogInsertDataInternal data;
1164 
1165 		data.offset = off;
1166 		data.newitem = *pitem;
1167 
1168 		XLogRegisterBufData(0, (char *) &data,
1169 							sizeof(ginxlogInsertDataInternal));
1170 	}
1171 }
1172 
1173 /*
1174  * Prepare to insert data on a posting-tree data page.
1175  *
1176  * If it will fit, return GPTP_INSERT after doing whatever setup is needed
1177  * before we enter the insertion critical section.  *ptp_workspace can be
1178  * set to pass information along to the execPlaceToPage function.
1179  *
1180  * If it won't fit, perform a page split and return two temporary page
1181  * images into *newlpage and *newrpage, with result GPTP_SPLIT.
1182  *
1183  * In neither case should the given page buffer be modified here.
1184  *
1185  * Note: on insertion to an internal node, in addition to inserting the given
1186  * item, the downlink of the existing item at stack->off will be updated to
1187  * point to updateblkno.
1188  *
1189  * Calls relevant function for internal or leaf page because they are handled
1190  * very differently.
1191  */
1192 static GinPlaceToPageRC
dataBeginPlaceToPage(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,void ** ptp_workspace,Page * newlpage,Page * newrpage)1193 dataBeginPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
1194 					 void *insertdata, BlockNumber updateblkno,
1195 					 void **ptp_workspace,
1196 					 Page *newlpage, Page *newrpage)
1197 {
1198 	Page		page = BufferGetPage(buf);
1199 
1200 	Assert(GinPageIsData(page));
1201 
1202 	if (GinPageIsLeaf(page))
1203 		return dataBeginPlaceToPageLeaf(btree, buf, stack, insertdata,
1204 										ptp_workspace,
1205 										newlpage, newrpage);
1206 	else
1207 		return dataBeginPlaceToPageInternal(btree, buf, stack,
1208 											insertdata, updateblkno,
1209 											ptp_workspace,
1210 											newlpage, newrpage);
1211 }
1212 
1213 /*
1214  * Perform data insertion after beginPlaceToPage has decided it will fit.
1215  *
1216  * This is invoked within a critical section, and XLOG record creation (if
1217  * needed) is already started.  The target buffer is registered in slot 0.
1218  *
1219  * Calls relevant function for internal or leaf page because they are handled
1220  * very differently.
1221  */
1222 static void
dataExecPlaceToPage(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,void * ptp_workspace)1223 dataExecPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
1224 					void *insertdata, BlockNumber updateblkno,
1225 					void *ptp_workspace)
1226 {
1227 	Page		page = BufferGetPage(buf);
1228 
1229 	if (GinPageIsLeaf(page))
1230 		dataExecPlaceToPageLeaf(btree, buf, stack, insertdata,
1231 								ptp_workspace);
1232 	else
1233 		dataExecPlaceToPageInternal(btree, buf, stack, insertdata,
1234 									updateblkno, ptp_workspace);
1235 }
1236 
1237 /*
1238  * Split internal page and insert new data.
1239  *
1240  * Returns new temp pages to *newlpage and *newrpage.
1241  * The original buffer is left untouched.
1242  */
1243 static void
dataSplitPageInternal(GinBtree btree,Buffer origbuf,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,Page * newlpage,Page * newrpage)1244 dataSplitPageInternal(GinBtree btree, Buffer origbuf,
1245 					  GinBtreeStack *stack,
1246 					  void *insertdata, BlockNumber updateblkno,
1247 					  Page *newlpage, Page *newrpage)
1248 {
1249 	Page		oldpage = BufferGetPage(origbuf);
1250 	OffsetNumber off = stack->off;
1251 	int			nitems = GinPageGetOpaque(oldpage)->maxoff;
1252 	int			nleftitems;
1253 	int			nrightitems;
1254 	Size		pageSize = PageGetPageSize(oldpage);
1255 	ItemPointerData oldbound = *GinDataPageGetRightBound(oldpage);
1256 	ItemPointer bound;
1257 	Page		lpage;
1258 	Page		rpage;
1259 	OffsetNumber separator;
1260 	PostingItem allitems[(BLCKSZ / sizeof(PostingItem)) + 1];
1261 
1262 	lpage = PageGetTempPage(oldpage);
1263 	rpage = PageGetTempPage(oldpage);
1264 	GinInitPage(lpage, GinPageGetOpaque(oldpage)->flags, pageSize);
1265 	GinInitPage(rpage, GinPageGetOpaque(oldpage)->flags, pageSize);
1266 
1267 	/*
1268 	 * First construct a new list of PostingItems, which includes all the old
1269 	 * items, and the new item.
1270 	 */
1271 	memcpy(allitems, GinDataPageGetPostingItem(oldpage, FirstOffsetNumber),
1272 		   (off - 1) * sizeof(PostingItem));
1273 
1274 	allitems[off - 1] = *((PostingItem *) insertdata);
1275 	memcpy(&allitems[off], GinDataPageGetPostingItem(oldpage, off),
1276 		   (nitems - (off - 1)) * sizeof(PostingItem));
1277 	nitems++;
1278 
1279 	/* Update existing downlink to point to next page */
1280 	PostingItemSetBlockNumber(&allitems[off], updateblkno);
1281 
1282 	/*
1283 	 * When creating a new index, fit as many tuples as possible on the left
1284 	 * page, on the assumption that the table is scanned from beginning to
1285 	 * end. This packs the index as tight as possible.
1286 	 */
1287 	if (btree->isBuild && GinPageRightMost(oldpage))
1288 		separator = GinNonLeafDataPageGetFreeSpace(rpage) / sizeof(PostingItem);
1289 	else
1290 		separator = nitems / 2;
1291 	nleftitems = separator;
1292 	nrightitems = nitems - separator;
1293 
1294 	memcpy(GinDataPageGetPostingItem(lpage, FirstOffsetNumber),
1295 		   allitems,
1296 		   nleftitems * sizeof(PostingItem));
1297 	GinPageGetOpaque(lpage)->maxoff = nleftitems;
1298 	memcpy(GinDataPageGetPostingItem(rpage, FirstOffsetNumber),
1299 		   &allitems[separator],
1300 		   nrightitems * sizeof(PostingItem));
1301 	GinPageGetOpaque(rpage)->maxoff = nrightitems;
1302 
1303 	/*
1304 	 * Also set pd_lower for both pages, like GinDataPageAddPostingItem does.
1305 	 */
1306 	GinDataPageSetDataSize(lpage, nleftitems * sizeof(PostingItem));
1307 	GinDataPageSetDataSize(rpage, nrightitems * sizeof(PostingItem));
1308 
1309 	/* set up right bound for left page */
1310 	bound = GinDataPageGetRightBound(lpage);
1311 	*bound = GinDataPageGetPostingItem(lpage, nleftitems)->key;
1312 
1313 	/* set up right bound for right page */
1314 	*GinDataPageGetRightBound(rpage) = oldbound;
1315 
1316 	/* return temp pages to caller */
1317 	*newlpage = lpage;
1318 	*newrpage = rpage;
1319 }
1320 
1321 /*
1322  * Construct insertion payload for inserting the downlink for given buffer.
1323  */
1324 static void *
dataPrepareDownlink(GinBtree btree,Buffer lbuf)1325 dataPrepareDownlink(GinBtree btree, Buffer lbuf)
1326 {
1327 	PostingItem *pitem = palloc(sizeof(PostingItem));
1328 	Page		lpage = BufferGetPage(lbuf);
1329 
1330 	PostingItemSetBlockNumber(pitem, BufferGetBlockNumber(lbuf));
1331 	pitem->key = *GinDataPageGetRightBound(lpage);
1332 
1333 	return pitem;
1334 }
1335 
1336 /*
1337  * Fills new root by right bound values from child.
1338  * Also called from ginxlog, should not use btree
1339  */
1340 void
ginDataFillRoot(GinBtree btree,Page root,BlockNumber lblkno,Page lpage,BlockNumber rblkno,Page rpage)1341 ginDataFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage)
1342 {
1343 	PostingItem li,
1344 				ri;
1345 
1346 	li.key = *GinDataPageGetRightBound(lpage);
1347 	PostingItemSetBlockNumber(&li, lblkno);
1348 	GinDataPageAddPostingItem(root, &li, InvalidOffsetNumber);
1349 
1350 	ri.key = *GinDataPageGetRightBound(rpage);
1351 	PostingItemSetBlockNumber(&ri, rblkno);
1352 	GinDataPageAddPostingItem(root, &ri, InvalidOffsetNumber);
1353 }
1354 
1355 
1356 /*** Functions to work with disassembled leaf pages ***/
1357 
1358 /*
1359  * Disassemble page into a disassembledLeaf struct.
1360  */
1361 static disassembledLeaf *
disassembleLeaf(Page page)1362 disassembleLeaf(Page page)
1363 {
1364 	disassembledLeaf *leaf;
1365 	GinPostingList *seg;
1366 	Pointer		segbegin;
1367 	Pointer		segend;
1368 
1369 	leaf = palloc0(sizeof(disassembledLeaf));
1370 	dlist_init(&leaf->segments);
1371 
1372 	if (GinPageIsCompressed(page))
1373 	{
1374 		/*
1375 		 * Create a leafSegment entry for each segment.
1376 		 */
1377 		seg = GinDataLeafPageGetPostingList(page);
1378 		segbegin = (Pointer) seg;
1379 		segend = segbegin + GinDataLeafPageGetPostingListSize(page);
1380 		while ((Pointer) seg < segend)
1381 		{
1382 			leafSegmentInfo *seginfo = palloc(sizeof(leafSegmentInfo));
1383 
1384 			seginfo->action = GIN_SEGMENT_UNMODIFIED;
1385 			seginfo->seg = seg;
1386 			seginfo->items = NULL;
1387 			seginfo->nitems = 0;
1388 			dlist_push_tail(&leaf->segments, &seginfo->node);
1389 
1390 			seg = GinNextPostingListSegment(seg);
1391 		}
1392 		leaf->oldformat = false;
1393 	}
1394 	else
1395 	{
1396 		/*
1397 		 * A pre-9.4 format uncompressed page is represented by a single
1398 		 * segment, with an array of items.  The corner case is uncompressed
1399 		 * page containing no items, which is represented as no segments.
1400 		 */
1401 		ItemPointer uncompressed;
1402 		int			nuncompressed;
1403 		leafSegmentInfo *seginfo;
1404 
1405 		uncompressed = dataLeafPageGetUncompressed(page, &nuncompressed);
1406 
1407 		if (nuncompressed > 0)
1408 		{
1409 			seginfo = palloc(sizeof(leafSegmentInfo));
1410 
1411 			seginfo->action = GIN_SEGMENT_REPLACE;
1412 			seginfo->seg = NULL;
1413 			seginfo->items = palloc(nuncompressed * sizeof(ItemPointerData));
1414 			memcpy(seginfo->items, uncompressed, nuncompressed * sizeof(ItemPointerData));
1415 			seginfo->nitems = nuncompressed;
1416 
1417 			dlist_push_tail(&leaf->segments, &seginfo->node);
1418 		}
1419 
1420 		leaf->oldformat = true;
1421 	}
1422 
1423 	return leaf;
1424 }
1425 
1426 /*
1427  * Distribute newItems to the segments.
1428  *
1429  * Any segments that acquire new items are decoded, and the new items are
1430  * merged with the old items.
1431  *
1432  * Returns true if any new items were added. False means they were all
1433  * duplicates of existing items on the page.
1434  */
1435 static bool
addItemsToLeaf(disassembledLeaf * leaf,ItemPointer newItems,int nNewItems)1436 addItemsToLeaf(disassembledLeaf *leaf, ItemPointer newItems, int nNewItems)
1437 {
1438 	dlist_iter	iter;
1439 	ItemPointer nextnew = newItems;
1440 	int			newleft = nNewItems;
1441 	bool		modified = false;
1442 	leafSegmentInfo *newseg;
1443 
1444 	/*
1445 	 * If the page is completely empty, just construct one new segment to hold
1446 	 * all the new items.
1447 	 */
1448 	if (dlist_is_empty(&leaf->segments))
1449 	{
1450 		newseg = palloc(sizeof(leafSegmentInfo));
1451 		newseg->seg = NULL;
1452 		newseg->items = newItems;
1453 		newseg->nitems = nNewItems;
1454 		newseg->action = GIN_SEGMENT_INSERT;
1455 		dlist_push_tail(&leaf->segments, &newseg->node);
1456 		return true;
1457 	}
1458 
1459 	dlist_foreach(iter, &leaf->segments)
1460 	{
1461 		leafSegmentInfo *cur = (leafSegmentInfo *) dlist_container(leafSegmentInfo, node, iter.cur);
1462 		int			nthis;
1463 		ItemPointer tmpitems;
1464 		int			ntmpitems;
1465 
1466 		/*
1467 		 * How many of the new items fall into this segment?
1468 		 */
1469 		if (!dlist_has_next(&leaf->segments, iter.cur))
1470 			nthis = newleft;
1471 		else
1472 		{
1473 			leafSegmentInfo *next;
1474 			ItemPointerData next_first;
1475 
1476 			next = (leafSegmentInfo *) dlist_container(leafSegmentInfo, node,
1477 								 dlist_next_node(&leaf->segments, iter.cur));
1478 			if (next->items)
1479 				next_first = next->items[0];
1480 			else
1481 			{
1482 				Assert(next->seg != NULL);
1483 				next_first = next->seg->first;
1484 			}
1485 
1486 			nthis = 0;
1487 			while (nthis < newleft && ginCompareItemPointers(&nextnew[nthis], &next_first) < 0)
1488 				nthis++;
1489 		}
1490 		if (nthis == 0)
1491 			continue;
1492 
1493 		/* Merge the new items with the existing items. */
1494 		if (!cur->items)
1495 			cur->items = ginPostingListDecode(cur->seg, &cur->nitems);
1496 
1497 		/*
1498 		 * Fast path for the important special case that we're appending to
1499 		 * the end of the page: don't let the last segment on the page grow
1500 		 * larger than the target, create a new segment before that happens.
1501 		 */
1502 		if (!dlist_has_next(&leaf->segments, iter.cur) &&
1503 			ginCompareItemPointers(&cur->items[cur->nitems - 1], &nextnew[0]) < 0 &&
1504 			cur->seg != NULL &&
1505 			SizeOfGinPostingList(cur->seg) >= GinPostingListSegmentTargetSize)
1506 		{
1507 			newseg = palloc(sizeof(leafSegmentInfo));
1508 			newseg->seg = NULL;
1509 			newseg->items = nextnew;
1510 			newseg->nitems = nthis;
1511 			newseg->action = GIN_SEGMENT_INSERT;
1512 			dlist_push_tail(&leaf->segments, &newseg->node);
1513 			modified = true;
1514 			break;
1515 		}
1516 
1517 		tmpitems = ginMergeItemPointers(cur->items, cur->nitems,
1518 										nextnew, nthis,
1519 										&ntmpitems);
1520 		if (ntmpitems != cur->nitems)
1521 		{
1522 			/*
1523 			 * If there are no duplicates, track the added items so that we
1524 			 * can emit a compact ADDITEMS WAL record later on. (it doesn't
1525 			 * seem worth re-checking which items were duplicates, if there
1526 			 * were any)
1527 			 */
1528 			if (ntmpitems == nthis + cur->nitems &&
1529 				cur->action == GIN_SEGMENT_UNMODIFIED)
1530 			{
1531 				cur->action = GIN_SEGMENT_ADDITEMS;
1532 				cur->modifieditems = nextnew;
1533 				cur->nmodifieditems = nthis;
1534 			}
1535 			else
1536 				cur->action = GIN_SEGMENT_REPLACE;
1537 
1538 			cur->items = tmpitems;
1539 			cur->nitems = ntmpitems;
1540 			cur->seg = NULL;
1541 			modified = true;
1542 		}
1543 
1544 		nextnew += nthis;
1545 		newleft -= nthis;
1546 		if (newleft == 0)
1547 			break;
1548 	}
1549 
1550 	return modified;
1551 }
1552 
/*
 * Recompresses all segments that have been modified.
 *
 * Walks the segment list of 'leaf'.  Segments without a compressed form
 * (seg == NULL) are compressed; if not all of a segment's items can be
 * packed within GinPostingListSegmentMaxSize, the segment is cut down to
 * GinPostingListSegmentTargetSize and the leftover items are moved to a new
 * segment inserted right after it.  Segments smaller than
 * GinPostingListSegmentMinSize are merged into the following segment.
 *
 * While doing so, the space used is added up.  leaf->lsize and leaf->rsize
 * are always filled in; leaf->lastleft is set only when a split is needed.
 *
 * If not all the items fit on two pages (ie. after split), we store as
 * many items as fit, and set *remaining to the first item that didn't fit.
 * If all items fit, *remaining is set to invalid.
 *
 * Returns true if the page has to be split.
 */
static bool
leafRepackItems(disassembledLeaf *leaf, ItemPointer remaining)
{
	int			pgused = 0;		/* bytes used on the page being filled */
	bool		needsplit = false;
	dlist_iter	iter;
	int			segsize;
	leafSegmentInfo *nextseg;
	int			npacked;
	bool		modified;
	dlist_node *cur_node;
	dlist_node *next_node;

	ItemPointerSetInvalid(remaining);

	/*
	 * cannot use dlist_foreach_modify here because we insert adjacent items
	 * while iterating.
	 */
	for (cur_node = dlist_head_node(&leaf->segments);
		 cur_node != NULL;
		 cur_node = next_node)
	{
		leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
												   cur_node);

		/*
		 * Determine the successor up front; it is replaced below if a new
		 * segment gets inserted after the current one.
		 */
		if (dlist_has_next(&leaf->segments, cur_node))
			next_node = dlist_next_node(&leaf->segments, cur_node);
		else
			next_node = NULL;

		/* Compress the posting list, if necessary */
		if (seginfo->action != GIN_SEGMENT_DELETE)
		{
			if (seginfo->seg == NULL)
			{
				/*
				 * The item count is compared against the byte limit here:
				 * with more items than bytes available, the compression
				 * attempt at max size is skipped altogether.
				 */
				if (seginfo->nitems > GinPostingListSegmentMaxSize)
					npacked = 0;	/* no chance that it would fit. */
				else
				{
					seginfo->seg = ginCompressPostingList(seginfo->items,
														  seginfo->nitems,
												GinPostingListSegmentMaxSize,
														  &npacked);
				}
				if (npacked != seginfo->nitems)
				{
					/*
					 * Too large. Compress again to the target size, and
					 * create a new segment to represent the remaining items.
					 * The new segment is inserted after this one, so it will
					 * be processed in the next iteration of this loop.
					 */
					if (seginfo->seg)
						pfree(seginfo->seg);
					seginfo->seg = ginCompressPostingList(seginfo->items,
														  seginfo->nitems,
											 GinPostingListSegmentTargetSize,
														  &npacked);
					if (seginfo->action != GIN_SEGMENT_INSERT)
						seginfo->action = GIN_SEGMENT_REPLACE;

					/* The new segment's items point into this one's array */
					nextseg = palloc(sizeof(leafSegmentInfo));
					nextseg->action = GIN_SEGMENT_INSERT;
					nextseg->seg = NULL;
					nextseg->items = &seginfo->items[npacked];
					nextseg->nitems = seginfo->nitems - npacked;
					next_node = &nextseg->node;
					dlist_insert_after(cur_node, next_node);
				}
			}

			/*
			 * If the segment is very small, merge it with the next segment.
			 */
			if (SizeOfGinPostingList(seginfo->seg) < GinPostingListSegmentMinSize && next_node)
			{
				int			nmerged;

				nextseg = dlist_container(leafSegmentInfo, node, next_node);

				/* Both segments are needed in decoded form for the merge */
				if (seginfo->items == NULL)
					seginfo->items = ginPostingListDecode(seginfo->seg,
														  &seginfo->nitems);
				if (nextseg->items == NULL)
					nextseg->items = ginPostingListDecode(nextseg->seg,
														  &nextseg->nitems);
				nextseg->items =
					ginMergeItemPointers(seginfo->items, seginfo->nitems,
										 nextseg->items, nextseg->nitems,
										 &nmerged);
				Assert(nmerged == seginfo->nitems + nextseg->nitems);
				nextseg->nitems = nmerged;
				nextseg->seg = NULL;

				/*
				 * The merged segment is marked REPLACE, and any ADDITEMS
				 * bookkeeping it carried is cleared.
				 */
				nextseg->action = GIN_SEGMENT_REPLACE;
				nextseg->modifieditems = NULL;
				nextseg->nmodifieditems = 0;

				/*
				 * A segment that was itself newly inserted is simply
				 * unlinked from the list; any other segment stays in the
				 * list, marked as deleted.
				 */
				if (seginfo->action == GIN_SEGMENT_INSERT)
				{
					dlist_delete(cur_node);
					continue;
				}
				else
				{
					seginfo->action = GIN_SEGMENT_DELETE;
					seginfo->seg = NULL;
				}
			}

			/* From here on only the compressed form of this segment is used */
			seginfo->items = NULL;
			seginfo->nitems = 0;
		}

		/* Deleted segments take up no space */
		if (seginfo->action == GIN_SEGMENT_DELETE)
			continue;

		/*
		 * OK, we now have a compressed version of this segment ready for
		 * copying to the page. Did we exceed the size that fits on one page?
		 */
		segsize = SizeOfGinPostingList(seginfo->seg);
		if (pgused + segsize > GinDataPageMaxDataSize)
		{
			if (!needsplit)
			{
				/* switch to right page */
				Assert(pgused > 0);
				leaf->lastleft = dlist_prev_node(&leaf->segments, cur_node);
				needsplit = true;
				leaf->lsize = pgused;
				pgused = 0;
			}
			else
			{
				/*
				 * Filled both pages. The last segment we constructed did not
				 * fit.
				 */
				*remaining = seginfo->seg->first;

				/*
				 * remove all segments that did not fit from the list.
				 */
				while (dlist_has_next(&leaf->segments, cur_node))
					dlist_delete(dlist_next_node(&leaf->segments, cur_node));
				dlist_delete(cur_node);
				break;
			}
		}

		pgused += segsize;
	}

	/* Without a split everything is on the left page */
	if (!needsplit)
	{
		leaf->lsize = pgused;
		leaf->rsize = 0;
	}
	else
		leaf->rsize = pgused;

	Assert(leaf->lsize <= GinDataPageMaxDataSize);
	Assert(leaf->rsize <= GinDataPageMaxDataSize);

	/*
	 * Make a palloc'd copy of every segment after the first modified one,
	 * because as we start copying items to the original page, we might
	 * overwrite an existing segment.
	 */
	modified = false;
	dlist_foreach(iter, &leaf->segments)
	{
		leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
												   iter.cur);

		if (!modified && seginfo->action != GIN_SEGMENT_UNMODIFIED)
		{
			modified = true;
		}
		else if (modified && seginfo->action == GIN_SEGMENT_UNMODIFIED)
		{
			GinPostingList *tmp;

			segsize = SizeOfGinPostingList(seginfo->seg);
			tmp = palloc(segsize);
			memcpy(tmp, seginfo->seg, segsize);
			seginfo->seg = tmp;
		}
	}

	return needsplit;
}
1756 
1757 
1758 /*** Functions that are exported to the rest of the GIN code ***/
1759 
1760 /*
1761  * Creates new posting tree containing the given TIDs. Returns the page
1762  * number of the root of the new posting tree.
1763  *
1764  * items[] must be in sorted order with no duplicates.
1765  */
1766 BlockNumber
createPostingTree(Relation index,ItemPointerData * items,uint32 nitems,GinStatsData * buildStats)1767 createPostingTree(Relation index, ItemPointerData *items, uint32 nitems,
1768 				  GinStatsData *buildStats)
1769 {
1770 	BlockNumber blkno;
1771 	Buffer		buffer;
1772 	Page		tmppage;
1773 	Page		page;
1774 	Pointer		ptr;
1775 	int			nrootitems;
1776 	int			rootsize;
1777 
1778 	/* Construct the new root page in memory first. */
1779 	tmppage = (Page) palloc(BLCKSZ);
1780 	GinInitPage(tmppage, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ);
1781 	GinPageGetOpaque(tmppage)->rightlink = InvalidBlockNumber;
1782 
1783 	/*
1784 	 * Write as many of the items to the root page as fit. In segments of max
1785 	 * GinPostingListSegmentMaxSize bytes each.
1786 	 */
1787 	nrootitems = 0;
1788 	rootsize = 0;
1789 	ptr = (Pointer) GinDataLeafPageGetPostingList(tmppage);
1790 	while (nrootitems < nitems)
1791 	{
1792 		GinPostingList *segment;
1793 		int			npacked;
1794 		int			segsize;
1795 
1796 		segment = ginCompressPostingList(&items[nrootitems],
1797 										 nitems - nrootitems,
1798 										 GinPostingListSegmentMaxSize,
1799 										 &npacked);
1800 		segsize = SizeOfGinPostingList(segment);
1801 		if (rootsize + segsize > GinDataPageMaxDataSize)
1802 			break;
1803 
1804 		memcpy(ptr, segment, segsize);
1805 		ptr += segsize;
1806 		rootsize += segsize;
1807 		nrootitems += npacked;
1808 		pfree(segment);
1809 	}
1810 	GinDataPageSetDataSize(tmppage, rootsize);
1811 
1812 	/*
1813 	 * All set. Get a new physical page, and copy the in-memory page to it.
1814 	 */
1815 	buffer = GinNewBuffer(index);
1816 	page = BufferGetPage(buffer);
1817 	blkno = BufferGetBlockNumber(buffer);
1818 
1819 	START_CRIT_SECTION();
1820 
1821 	PageRestoreTempPage(tmppage, page);
1822 	MarkBufferDirty(buffer);
1823 
1824 	if (RelationNeedsWAL(index))
1825 	{
1826 		XLogRecPtr	recptr;
1827 		ginxlogCreatePostingTree data;
1828 
1829 		data.size = rootsize;
1830 
1831 		XLogBeginInsert();
1832 		XLogRegisterData((char *) &data, sizeof(ginxlogCreatePostingTree));
1833 
1834 		XLogRegisterData((char *) GinDataLeafPageGetPostingList(page),
1835 						 rootsize);
1836 		XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
1837 
1838 		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_PTREE);
1839 		PageSetLSN(page, recptr);
1840 	}
1841 
1842 	UnlockReleaseBuffer(buffer);
1843 
1844 	END_CRIT_SECTION();
1845 
1846 	/* During index build, count the newly-added data page */
1847 	if (buildStats)
1848 		buildStats->nDataPages++;
1849 
1850 	elog(DEBUG2, "created GIN posting tree with %d items", nrootitems);
1851 
1852 	/*
1853 	 * Add any remaining TIDs to the newly-created posting tree.
1854 	 */
1855 	if (nitems > nrootitems)
1856 	{
1857 		ginInsertItemPointers(index, blkno,
1858 							  items + nrootitems,
1859 							  nitems - nrootitems,
1860 							  buildStats);
1861 	}
1862 
1863 	return blkno;
1864 }
1865 
1866 void
ginPrepareDataScan(GinBtree btree,Relation index,BlockNumber rootBlkno)1867 ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno)
1868 {
1869 	memset(btree, 0, sizeof(GinBtreeData));
1870 
1871 	btree->index = index;
1872 	btree->rootBlkno = rootBlkno;
1873 
1874 	btree->findChildPage = dataLocateItem;
1875 	btree->getLeftMostChild = dataGetLeftMostPage;
1876 	btree->isMoveRight = dataIsMoveRight;
1877 	btree->findItem = NULL;
1878 	btree->findChildPtr = dataFindChildPtr;
1879 	btree->beginPlaceToPage = dataBeginPlaceToPage;
1880 	btree->execPlaceToPage = dataExecPlaceToPage;
1881 	btree->fillRoot = ginDataFillRoot;
1882 	btree->prepareDownlink = dataPrepareDownlink;
1883 
1884 	btree->isData = TRUE;
1885 	btree->fullScan = FALSE;
1886 	btree->isBuild = FALSE;
1887 }
1888 
1889 /*
1890  * Inserts array of item pointers, may execute several tree scan (very rare)
1891  */
1892 void
ginInsertItemPointers(Relation index,BlockNumber rootBlkno,ItemPointerData * items,uint32 nitem,GinStatsData * buildStats)1893 ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
1894 					  ItemPointerData *items, uint32 nitem,
1895 					  GinStatsData *buildStats)
1896 {
1897 	GinBtreeData btree;
1898 	GinBtreeDataLeafInsertData insertdata;
1899 	GinBtreeStack *stack;
1900 
1901 	ginPrepareDataScan(&btree, index, rootBlkno);
1902 	btree.isBuild = (buildStats != NULL);
1903 	insertdata.items = items;
1904 	insertdata.nitem = nitem;
1905 	insertdata.curitem = 0;
1906 
1907 	while (insertdata.curitem < insertdata.nitem)
1908 	{
1909 		/* search for the leaf page where the first item should go to */
1910 		btree.itemptr = insertdata.items[insertdata.curitem];
1911 		stack = ginFindLeafPage(&btree, false, NULL);
1912 
1913 		ginInsertValue(&btree, stack, &insertdata, buildStats);
1914 	}
1915 }
1916 
1917 /*
1918  * Starts a new scan on a posting tree.
1919  */
1920 GinBtreeStack *
ginScanBeginPostingTree(GinBtree btree,Relation index,BlockNumber rootBlkno,Snapshot snapshot)1921 ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno,
1922 						Snapshot snapshot)
1923 {
1924 	GinBtreeStack *stack;
1925 
1926 	ginPrepareDataScan(btree, index, rootBlkno);
1927 
1928 	btree->fullScan = TRUE;
1929 
1930 	stack = ginFindLeafPage(btree, TRUE, snapshot);
1931 
1932 	return stack;
1933 }
1934