1 /*-------------------------------------------------------------------------
2  *
3  * gindatapage.c
4  *	  routines for handling GIN posting tree pages.
5  *
6  *
7  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *			src/backend/access/gin/gindatapage.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/gin_private.h"
18 #include "access/ginxlog.h"
19 #include "access/xloginsert.h"
20 #include "lib/ilist.h"
21 #include "miscadmin.h"
22 #include "storage/predicate.h"
23 #include "utils/rel.h"
24 
25 /*
26  * Min, Max and Target size of posting lists stored on leaf pages, in bytes.
27  *
28  * The code can deal with any size, but random access is more efficient when
29  * a number of smaller lists are stored, rather than one big list. If a
30  * posting list would become larger than Max size as a result of insertions,
31  * it is split into two. If a posting list would be smaller than minimum
32  * size, it is merged with the next posting list.
33  */
34 #define GinPostingListSegmentMaxSize 384
35 #define GinPostingListSegmentTargetSize 256
36 #define GinPostingListSegmentMinSize 128
37 
38 /*
39  * At least this many items fit in a GinPostingListSegmentMaxSize-bytes
40  * long segment. This is used when estimating how much space is required
41  * for N items, at minimum.
42  */
43 #define MinTuplesPerSegment ((GinPostingListSegmentMaxSize - 2) / 6)
44 
45 /*
46  * A working struct for manipulating a posting tree leaf page.
47  */
48 typedef struct
49 {
50 	dlist_head	segments;		/* a list of leafSegmentInfos */
51 
52 	/*
53 	 * The following fields represent how the segments are split across pages,
54 	 * if a page split is required. Filled in by leafRepackItems.
55 	 */
56 	dlist_node *lastleft;		/* last segment on left page */
57 	int			lsize;			/* total size on left page */
58 	int			rsize;			/* total size on right page */
59 
60 	bool		oldformat;		/* page is in pre-9.4 format on disk */
61 
62 	/*
63 	 * If we need WAL data representing the reconstructed leaf page, it's
64 	 * stored here by computeLeafRecompressWALData.
65 	 */
66 	char	   *walinfo;		/* buffer start */
67 	int			walinfolen;		/* and length */
68 } disassembledLeaf;
69 
70 typedef struct
71 {
72 	dlist_node	node;			/* linked list pointers */
73 
74 	/*-------------
75 	 * 'action' indicates the status of this in-memory segment, compared to
76 	 * what's on disk. It is one of the GIN_SEGMENT_* action codes:
77 	 *
78 	 * UNMODIFIED	no changes
79 	 * DELETE		the segment is to be removed. 'seg' and 'items' are
80 	 *				ignored
81 	 * INSERT		this is a completely new segment
82 	 * REPLACE		this replaces an existing segment with new content
83 	 * ADDITEMS		like REPLACE, but no items have been removed, and we track
84 	 *				in detail what items have been added to this segment, in
85 	 *				'modifieditems'
86 	 *-------------
87 	 */
88 	char		action;
89 
90 	ItemPointerData *modifieditems;
91 	uint16		nmodifieditems;
92 
93 	/*
94 	 * The following fields represent the items in this segment. If 'items' is
95 	 * not NULL, it contains a palloc'd array of the itemsin this segment. If
96 	 * 'seg' is not NULL, it contains the items in an already-compressed
97 	 * format. It can point to an on-disk page (!modified), or a palloc'd
98 	 * segment in memory. If both are set, they must represent the same items.
99 	 */
100 	GinPostingList *seg;
101 	ItemPointer items;
102 	int			nitems;			/* # of items in 'items', if items != NULL */
103 } leafSegmentInfo;
104 
105 static ItemPointer dataLeafPageGetUncompressed(Page page, int *nitems);
106 static void dataSplitPageInternal(GinBtree btree, Buffer origbuf,
107 					  GinBtreeStack *stack,
108 					  void *insertdata, BlockNumber updateblkno,
109 					  Page *newlpage, Page *newrpage);
110 
111 static disassembledLeaf *disassembleLeaf(Page page);
112 static bool leafRepackItems(disassembledLeaf *leaf, ItemPointer remaining);
113 static bool addItemsToLeaf(disassembledLeaf *leaf, ItemPointer newItems,
114 			   int nNewItems);
115 
116 static void computeLeafRecompressWALData(disassembledLeaf *leaf);
117 static void dataPlaceToPageLeafRecompress(Buffer buf, disassembledLeaf *leaf);
118 static void dataPlaceToPageLeafSplit(disassembledLeaf *leaf,
119 						 ItemPointerData lbound, ItemPointerData rbound,
120 						 Page lpage, Page rpage);
121 
122 /*
123  * Read TIDs from leaf data page to single uncompressed array. The TIDs are
124  * returned in ascending order.
125  *
126  * advancePast is a hint, indicating that the caller is only interested in
127  * TIDs > advancePast. To return all items, use ItemPointerSetMin.
128  *
129  * Note: This function can still return items smaller than advancePast that
130  * are in the same posting list as the items of interest, so the caller must
131  * still check all the returned items. But passing it allows this function to
132  * skip whole posting lists.
133  */
134 ItemPointer
GinDataLeafPageGetItems(Page page,int * nitems,ItemPointerData advancePast)135 GinDataLeafPageGetItems(Page page, int *nitems, ItemPointerData advancePast)
136 {
137 	ItemPointer result;
138 
139 	if (GinPageIsCompressed(page))
140 	{
141 		GinPostingList *seg = GinDataLeafPageGetPostingList(page);
142 		Size		len = GinDataLeafPageGetPostingListSize(page);
143 		Pointer		endptr = ((Pointer) seg) + len;
144 		GinPostingList *next;
145 
146 		/* Skip to the segment containing advancePast+1 */
147 		if (ItemPointerIsValid(&advancePast))
148 		{
149 			next = GinNextPostingListSegment(seg);
150 			while ((Pointer) next < endptr &&
151 				   ginCompareItemPointers(&next->first, &advancePast) <= 0)
152 			{
153 				seg = next;
154 				next = GinNextPostingListSegment(seg);
155 			}
156 			len = endptr - (Pointer) seg;
157 		}
158 
159 		if (len > 0)
160 			result = ginPostingListDecodeAllSegments(seg, len, nitems);
161 		else
162 		{
163 			result = NULL;
164 			*nitems = 0;
165 		}
166 	}
167 	else
168 	{
169 		ItemPointer tmp = dataLeafPageGetUncompressed(page, nitems);
170 
171 		result = palloc((*nitems) * sizeof(ItemPointerData));
172 		memcpy(result, tmp, (*nitems) * sizeof(ItemPointerData));
173 	}
174 
175 	return result;
176 }
177 
178 /*
179  * Places all TIDs from leaf data page to bitmap.
180  */
181 int
GinDataLeafPageGetItemsToTbm(Page page,TIDBitmap * tbm)182 GinDataLeafPageGetItemsToTbm(Page page, TIDBitmap *tbm)
183 {
184 	ItemPointer uncompressed;
185 	int			nitems;
186 
187 	if (GinPageIsCompressed(page))
188 	{
189 		GinPostingList *segment = GinDataLeafPageGetPostingList(page);
190 		Size		len = GinDataLeafPageGetPostingListSize(page);
191 
192 		nitems = ginPostingListDecodeAllSegmentsToTbm(segment, len, tbm);
193 	}
194 	else
195 	{
196 		uncompressed = dataLeafPageGetUncompressed(page, &nitems);
197 
198 		if (nitems > 0)
199 			tbm_add_tuples(tbm, uncompressed, nitems, false);
200 	}
201 
202 	return nitems;
203 }
204 
205 /*
206  * Get pointer to the uncompressed array of items on a pre-9.4 format
207  * uncompressed leaf page. The number of items in the array is returned in
208  * *nitems.
209  */
210 static ItemPointer
dataLeafPageGetUncompressed(Page page,int * nitems)211 dataLeafPageGetUncompressed(Page page, int *nitems)
212 {
213 	ItemPointer items;
214 
215 	Assert(!GinPageIsCompressed(page));
216 
217 	/*
218 	 * In the old pre-9.4 page format, the whole page content is used for
219 	 * uncompressed items, and the number of items is stored in 'maxoff'
220 	 */
221 	items = (ItemPointer) GinDataPageGetData(page);
222 	*nitems = GinPageGetOpaque(page)->maxoff;
223 
224 	return items;
225 }
226 
227 /*
228  * Check if we should follow the right link to find the item we're searching
229  * for.
230  *
231  * Compares inserting item pointer with the right bound of the current page.
232  */
233 static bool
dataIsMoveRight(GinBtree btree,Page page)234 dataIsMoveRight(GinBtree btree, Page page)
235 {
236 	ItemPointer iptr = GinDataPageGetRightBound(page);
237 
238 	if (GinPageRightMost(page))
239 		return false;
240 
241 	if (GinPageIsDeleted(page))
242 		return true;
243 
244 	return (ginCompareItemPointers(&btree->itemptr, iptr) > 0) ? true : false;
245 }
246 
247 /*
248  * Find correct PostingItem in non-leaf page. It is assumed that this is
249  * the correct page, and the searched value SHOULD be on the page.
250  */
251 static BlockNumber
dataLocateItem(GinBtree btree,GinBtreeStack * stack)252 dataLocateItem(GinBtree btree, GinBtreeStack *stack)
253 {
254 	OffsetNumber low,
255 				high,
256 				maxoff;
257 	PostingItem *pitem = NULL;
258 	int			result;
259 	Page		page = BufferGetPage(stack->buffer);
260 
261 	Assert(!GinPageIsLeaf(page));
262 	Assert(GinPageIsData(page));
263 
264 	if (btree->fullScan)
265 	{
266 		stack->off = FirstOffsetNumber;
267 		stack->predictNumber *= GinPageGetOpaque(page)->maxoff;
268 		return btree->getLeftMostChild(btree, page);
269 	}
270 
271 	low = FirstOffsetNumber;
272 	maxoff = high = GinPageGetOpaque(page)->maxoff;
273 	Assert(high >= low);
274 
275 	high++;
276 
277 	while (high > low)
278 	{
279 		OffsetNumber mid = low + ((high - low) / 2);
280 
281 		pitem = GinDataPageGetPostingItem(page, mid);
282 
283 		if (mid == maxoff)
284 		{
285 			/*
286 			 * Right infinity, page already correctly chosen with a help of
287 			 * dataIsMoveRight
288 			 */
289 			result = -1;
290 		}
291 		else
292 		{
293 			pitem = GinDataPageGetPostingItem(page, mid);
294 			result = ginCompareItemPointers(&btree->itemptr, &(pitem->key));
295 		}
296 
297 		if (result == 0)
298 		{
299 			stack->off = mid;
300 			return PostingItemGetBlockNumber(pitem);
301 		}
302 		else if (result > 0)
303 			low = mid + 1;
304 		else
305 			high = mid;
306 	}
307 
308 	Assert(high >= FirstOffsetNumber && high <= maxoff);
309 
310 	stack->off = high;
311 	pitem = GinDataPageGetPostingItem(page, high);
312 	return PostingItemGetBlockNumber(pitem);
313 }
314 
315 /*
316  * Find link to blkno on non-leaf page, returns offset of PostingItem
317  */
318 static OffsetNumber
dataFindChildPtr(GinBtree btree,Page page,BlockNumber blkno,OffsetNumber storedOff)319 dataFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber storedOff)
320 {
321 	OffsetNumber i,
322 				maxoff = GinPageGetOpaque(page)->maxoff;
323 	PostingItem *pitem;
324 
325 	Assert(!GinPageIsLeaf(page));
326 	Assert(GinPageIsData(page));
327 
328 	/* if page isn't changed, we return storedOff */
329 	if (storedOff >= FirstOffsetNumber && storedOff <= maxoff)
330 	{
331 		pitem = GinDataPageGetPostingItem(page, storedOff);
332 		if (PostingItemGetBlockNumber(pitem) == blkno)
333 			return storedOff;
334 
335 		/*
336 		 * we hope, that needed pointer goes to right. It's true if there
337 		 * wasn't a deletion
338 		 */
339 		for (i = storedOff + 1; i <= maxoff; i++)
340 		{
341 			pitem = GinDataPageGetPostingItem(page, i);
342 			if (PostingItemGetBlockNumber(pitem) == blkno)
343 				return i;
344 		}
345 
346 		maxoff = storedOff - 1;
347 	}
348 
349 	/* last chance */
350 	for (i = FirstOffsetNumber; i <= maxoff; i++)
351 	{
352 		pitem = GinDataPageGetPostingItem(page, i);
353 		if (PostingItemGetBlockNumber(pitem) == blkno)
354 			return i;
355 	}
356 
357 	return InvalidOffsetNumber;
358 }
359 
360 /*
361  * Return blkno of leftmost child
362  */
363 static BlockNumber
dataGetLeftMostPage(GinBtree btree,Page page)364 dataGetLeftMostPage(GinBtree btree, Page page)
365 {
366 	PostingItem *pitem;
367 
368 	Assert(!GinPageIsLeaf(page));
369 	Assert(GinPageIsData(page));
370 	Assert(GinPageGetOpaque(page)->maxoff >= FirstOffsetNumber);
371 
372 	pitem = GinDataPageGetPostingItem(page, FirstOffsetNumber);
373 	return PostingItemGetBlockNumber(pitem);
374 }
375 
376 /*
377  * Add PostingItem to a non-leaf page.
378  */
379 void
GinDataPageAddPostingItem(Page page,PostingItem * data,OffsetNumber offset)380 GinDataPageAddPostingItem(Page page, PostingItem *data, OffsetNumber offset)
381 {
382 	OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
383 	char	   *ptr;
384 
385 	Assert(PostingItemGetBlockNumber(data) != InvalidBlockNumber);
386 	Assert(!GinPageIsLeaf(page));
387 
388 	if (offset == InvalidOffsetNumber)
389 	{
390 		ptr = (char *) GinDataPageGetPostingItem(page, maxoff + 1);
391 	}
392 	else
393 	{
394 		ptr = (char *) GinDataPageGetPostingItem(page, offset);
395 		if (offset != maxoff + 1)
396 			memmove(ptr + sizeof(PostingItem),
397 					ptr,
398 					(maxoff - offset + 1) * sizeof(PostingItem));
399 	}
400 	memcpy(ptr, data, sizeof(PostingItem));
401 
402 	maxoff++;
403 	GinPageGetOpaque(page)->maxoff = maxoff;
404 
405 	/*
406 	 * Also set pd_lower to the end of the posting items, to follow the
407 	 * "standard" page layout, so that we can squeeze out the unused space
408 	 * from full-page images.
409 	 */
410 	GinDataPageSetDataSize(page, maxoff * sizeof(PostingItem));
411 }
412 
413 /*
414  * Delete posting item from non-leaf page
415  */
416 void
GinPageDeletePostingItem(Page page,OffsetNumber offset)417 GinPageDeletePostingItem(Page page, OffsetNumber offset)
418 {
419 	OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
420 
421 	Assert(!GinPageIsLeaf(page));
422 	Assert(offset >= FirstOffsetNumber && offset <= maxoff);
423 
424 	if (offset != maxoff)
425 		memmove(GinDataPageGetPostingItem(page, offset),
426 				GinDataPageGetPostingItem(page, offset + 1),
427 				sizeof(PostingItem) * (maxoff - offset));
428 
429 	maxoff--;
430 	GinPageGetOpaque(page)->maxoff = maxoff;
431 
432 	GinDataPageSetDataSize(page, maxoff * sizeof(PostingItem));
433 }
434 
435 /*
436  * Prepare to insert data on a leaf data page.
437  *
438  * If it will fit, return GPTP_INSERT after doing whatever setup is needed
439  * before we enter the insertion critical section.  *ptp_workspace can be
440  * set to pass information along to the execPlaceToPage function.
441  *
442  * If it won't fit, perform a page split and return two temporary page
443  * images into *newlpage and *newrpage, with result GPTP_SPLIT.
444  *
445  * In neither case should the given page buffer be modified here.
446  */
447 static GinPlaceToPageRC
dataBeginPlaceToPageLeaf(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertdata,void ** ptp_workspace,Page * newlpage,Page * newrpage)448 dataBeginPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
449 						 void *insertdata,
450 						 void **ptp_workspace,
451 						 Page *newlpage, Page *newrpage)
452 {
453 	GinBtreeDataLeafInsertData *items = insertdata;
454 	ItemPointer newItems = &items->items[items->curitem];
455 	int			maxitems = items->nitem - items->curitem;
456 	Page		page = BufferGetPage(buf);
457 	int			i;
458 	ItemPointerData rbound;
459 	ItemPointerData lbound;
460 	bool		needsplit;
461 	bool		append;
462 	int			segsize;
463 	Size		freespace;
464 	disassembledLeaf *leaf;
465 	leafSegmentInfo *lastleftinfo;
466 	ItemPointerData maxOldItem;
467 	ItemPointerData remaining;
468 
469 	rbound = *GinDataPageGetRightBound(page);
470 
471 	/*
472 	 * Count how many of the new items belong to this page.
473 	 */
474 	if (!GinPageRightMost(page))
475 	{
476 		for (i = 0; i < maxitems; i++)
477 		{
478 			if (ginCompareItemPointers(&newItems[i], &rbound) > 0)
479 			{
480 				/*
481 				 * This needs to go to some other location in the tree. (The
482 				 * caller should've chosen the insert location so that at
483 				 * least the first item goes here.)
484 				 */
485 				Assert(i > 0);
486 				break;
487 			}
488 		}
489 		maxitems = i;
490 	}
491 
492 	/* Disassemble the data on the page */
493 	leaf = disassembleLeaf(page);
494 
495 	/*
496 	 * Are we appending to the end of the page? IOW, are all the new items
497 	 * larger than any of the existing items.
498 	 */
499 	if (!dlist_is_empty(&leaf->segments))
500 	{
501 		lastleftinfo = dlist_container(leafSegmentInfo, node,
502 									   dlist_tail_node(&leaf->segments));
503 		if (!lastleftinfo->items)
504 			lastleftinfo->items = ginPostingListDecode(lastleftinfo->seg,
505 													   &lastleftinfo->nitems);
506 		maxOldItem = lastleftinfo->items[lastleftinfo->nitems - 1];
507 		if (ginCompareItemPointers(&newItems[0], &maxOldItem) >= 0)
508 			append = true;
509 		else
510 			append = false;
511 	}
512 	else
513 	{
514 		ItemPointerSetMin(&maxOldItem);
515 		append = true;
516 	}
517 
518 	/*
519 	 * If we're appending to the end of the page, we will append as many items
520 	 * as we can fit (after splitting), and stop when the pages becomes full.
521 	 * Otherwise we have to limit the number of new items to insert, because
522 	 * once we start packing we can't just stop when we run out of space,
523 	 * because we must make sure that all the old items still fit.
524 	 */
525 	if (GinPageIsCompressed(page))
526 		freespace = GinDataLeafPageGetFreeSpace(page);
527 	else
528 		freespace = 0;
529 	if (append)
530 	{
531 		/*
532 		 * Even when appending, trying to append more items than will fit is
533 		 * not completely free, because we will merge the new items and old
534 		 * items into an array below. In the best case, every new item fits in
535 		 * a single byte, and we can use all the free space on the old page as
536 		 * well as the new page. For simplicity, ignore segment overhead etc.
537 		 */
538 		maxitems = Min(maxitems, freespace + GinDataPageMaxDataSize);
539 	}
540 	else
541 	{
542 		/*
543 		 * Calculate a conservative estimate of how many new items we can fit
544 		 * on the two pages after splitting.
545 		 *
546 		 * We can use any remaining free space on the old page to store full
547 		 * segments, as well as the new page. Each full-sized segment can hold
548 		 * at least MinTuplesPerSegment items
549 		 */
550 		int			nnewsegments;
551 
552 		nnewsegments = freespace / GinPostingListSegmentMaxSize;
553 		nnewsegments += GinDataPageMaxDataSize / GinPostingListSegmentMaxSize;
554 		maxitems = Min(maxitems, nnewsegments * MinTuplesPerSegment);
555 	}
556 
557 	/* Add the new items to the segment list */
558 	if (!addItemsToLeaf(leaf, newItems, maxitems))
559 	{
560 		/* all items were duplicates, we have nothing to do */
561 		items->curitem += maxitems;
562 
563 		return GPTP_NO_WORK;
564 	}
565 
566 	/*
567 	 * Pack the items back to compressed segments, ready for writing to disk.
568 	 */
569 	needsplit = leafRepackItems(leaf, &remaining);
570 
571 	/*
572 	 * Did all the new items fit?
573 	 *
574 	 * If we're appending, it's OK if they didn't. But as a sanity check,
575 	 * verify that all the old items fit.
576 	 */
577 	if (ItemPointerIsValid(&remaining))
578 	{
579 		if (!append || ItemPointerCompare(&maxOldItem, &remaining) >= 0)
580 			elog(ERROR, "could not split GIN page; all old items didn't fit");
581 
582 		/* Count how many of the new items did fit. */
583 		for (i = 0; i < maxitems; i++)
584 		{
585 			if (ginCompareItemPointers(&newItems[i], &remaining) >= 0)
586 				break;
587 		}
588 		if (i == 0)
589 			elog(ERROR, "could not split GIN page; no new items fit");
590 		maxitems = i;
591 	}
592 
593 	if (!needsplit)
594 	{
595 		/*
596 		 * Great, all the items fit on a single page.  If needed, prepare data
597 		 * for a WAL record describing the changes we'll make.
598 		 */
599 		if (RelationNeedsWAL(btree->index))
600 			computeLeafRecompressWALData(leaf);
601 
602 		/*
603 		 * We're ready to enter the critical section, but
604 		 * dataExecPlaceToPageLeaf will need access to the "leaf" data.
605 		 */
606 		*ptp_workspace = leaf;
607 
608 		if (append)
609 			elog(DEBUG2, "appended %d new items to block %u; %d bytes (%d to go)",
610 				 maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize,
611 				 items->nitem - items->curitem - maxitems);
612 		else
613 			elog(DEBUG2, "inserted %d new items to block %u; %d bytes (%d to go)",
614 				 maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize,
615 				 items->nitem - items->curitem - maxitems);
616 	}
617 	else
618 	{
619 		/*
620 		 * Have to split.
621 		 *
622 		 * leafRepackItems already divided the segments between the left and
623 		 * the right page. It filled the left page as full as possible, and
624 		 * put the rest to the right page. When building a new index, that's
625 		 * good, because the table is scanned from beginning to end and there
626 		 * won't be any more insertions to the left page during the build.
627 		 * This packs the index as tight as possible. But otherwise, split
628 		 * 50/50, by moving segments from the left page to the right page
629 		 * until they're balanced.
630 		 *
631 		 * As a further heuristic, when appending items to the end of the
632 		 * page, try to make the left page 75% full, on the assumption that
633 		 * subsequent insertions will probably also go to the end. This packs
634 		 * the index somewhat tighter when appending to a table, which is very
635 		 * common.
636 		 */
637 		if (!btree->isBuild)
638 		{
639 			while (dlist_has_prev(&leaf->segments, leaf->lastleft))
640 			{
641 				lastleftinfo = dlist_container(leafSegmentInfo, node, leaf->lastleft);
642 
643 				/* ignore deleted segments */
644 				if (lastleftinfo->action != GIN_SEGMENT_DELETE)
645 				{
646 					segsize = SizeOfGinPostingList(lastleftinfo->seg);
647 
648 					/*
649 					 * Note that we check that the right page doesn't become
650 					 * more full than the left page even when appending. It's
651 					 * possible that we added enough items to make both pages
652 					 * more than 75% full.
653 					 */
654 					if ((leaf->lsize - segsize) - (leaf->rsize + segsize) < 0)
655 						break;
656 					if (append)
657 					{
658 						if ((leaf->lsize - segsize) < (BLCKSZ * 3) / 4)
659 							break;
660 					}
661 
662 					leaf->lsize -= segsize;
663 					leaf->rsize += segsize;
664 				}
665 				leaf->lastleft = dlist_prev_node(&leaf->segments, leaf->lastleft);
666 			}
667 		}
668 		Assert(leaf->lsize <= GinDataPageMaxDataSize);
669 		Assert(leaf->rsize <= GinDataPageMaxDataSize);
670 
671 		/*
672 		 * Fetch the max item in the left page's last segment; it becomes the
673 		 * right bound of the page.
674 		 */
675 		lastleftinfo = dlist_container(leafSegmentInfo, node, leaf->lastleft);
676 		if (!lastleftinfo->items)
677 			lastleftinfo->items = ginPostingListDecode(lastleftinfo->seg,
678 													   &lastleftinfo->nitems);
679 		lbound = lastleftinfo->items[lastleftinfo->nitems - 1];
680 
681 		/*
682 		 * Now allocate a couple of temporary page images, and fill them.
683 		 */
684 		*newlpage = palloc(BLCKSZ);
685 		*newrpage = palloc(BLCKSZ);
686 
687 		dataPlaceToPageLeafSplit(leaf, lbound, rbound,
688 								 *newlpage, *newrpage);
689 
690 		Assert(GinPageRightMost(page) ||
691 			   ginCompareItemPointers(GinDataPageGetRightBound(*newlpage),
692 									  GinDataPageGetRightBound(*newrpage)) < 0);
693 
694 		if (append)
695 			elog(DEBUG2, "appended %d items to block %u; split %d/%d (%d to go)",
696 				 maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize, (int) leaf->rsize,
697 				 items->nitem - items->curitem - maxitems);
698 		else
699 			elog(DEBUG2, "inserted %d items to block %u; split %d/%d (%d to go)",
700 				 maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize, (int) leaf->rsize,
701 				 items->nitem - items->curitem - maxitems);
702 	}
703 
704 	items->curitem += maxitems;
705 
706 	return needsplit ? GPTP_SPLIT : GPTP_INSERT;
707 }
708 
709 /*
710  * Perform data insertion after beginPlaceToPage has decided it will fit.
711  *
712  * This is invoked within a critical section, and XLOG record creation (if
713  * needed) is already started.  The target buffer is registered in slot 0.
714  */
715 static void
dataExecPlaceToPageLeaf(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertdata,void * ptp_workspace)716 dataExecPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
717 						void *insertdata, void *ptp_workspace)
718 {
719 	disassembledLeaf *leaf = (disassembledLeaf *) ptp_workspace;
720 
721 	/* Apply changes to page */
722 	dataPlaceToPageLeafRecompress(buf, leaf);
723 
724 	/* If needed, register WAL data built by computeLeafRecompressWALData */
725 	if (RelationNeedsWAL(btree->index))
726 	{
727 		XLogRegisterBufData(0, leaf->walinfo, leaf->walinfolen);
728 	}
729 }
730 
731 /*
732  * Vacuum a posting tree leaf page.
733  */
734 void
ginVacuumPostingTreeLeaf(Relation indexrel,Buffer buffer,GinVacuumState * gvs)735 ginVacuumPostingTreeLeaf(Relation indexrel, Buffer buffer, GinVacuumState *gvs)
736 {
737 	Page		page = BufferGetPage(buffer);
738 	disassembledLeaf *leaf;
739 	bool		removedsomething = false;
740 	dlist_iter	iter;
741 
742 	leaf = disassembleLeaf(page);
743 
744 	/* Vacuum each segment. */
745 	dlist_foreach(iter, &leaf->segments)
746 	{
747 		leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node, iter.cur);
748 		int			oldsegsize;
749 		ItemPointer cleaned;
750 		int			ncleaned;
751 
752 		if (!seginfo->items)
753 			seginfo->items = ginPostingListDecode(seginfo->seg,
754 												  &seginfo->nitems);
755 		if (seginfo->seg)
756 			oldsegsize = SizeOfGinPostingList(seginfo->seg);
757 		else
758 			oldsegsize = GinDataPageMaxDataSize;
759 
760 		cleaned = ginVacuumItemPointers(gvs,
761 										seginfo->items,
762 										seginfo->nitems,
763 										&ncleaned);
764 		pfree(seginfo->items);
765 		seginfo->items = NULL;
766 		seginfo->nitems = 0;
767 		if (cleaned)
768 		{
769 			if (ncleaned > 0)
770 			{
771 				int			npacked;
772 
773 				seginfo->seg = ginCompressPostingList(cleaned,
774 													  ncleaned,
775 													  oldsegsize,
776 													  &npacked);
777 				/* Removing an item never increases the size of the segment */
778 				if (npacked != ncleaned)
779 					elog(ERROR, "could not fit vacuumed posting list");
780 				seginfo->action = GIN_SEGMENT_REPLACE;
781 			}
782 			else
783 			{
784 				seginfo->seg = NULL;
785 				seginfo->items = NULL;
786 				seginfo->action = GIN_SEGMENT_DELETE;
787 			}
788 			seginfo->nitems = ncleaned;
789 
790 			removedsomething = true;
791 		}
792 	}
793 
794 	/*
795 	 * If we removed any items, reconstruct the page from the pieces.
796 	 *
797 	 * We don't try to re-encode the segments here, even though some of them
798 	 * might be really small now that we've removed some items from them. It
799 	 * seems like a waste of effort, as there isn't really any benefit from
800 	 * larger segments per se; larger segments only help to pack more items in
801 	 * the same space. We might as well delay doing that until the next
802 	 * insertion, which will need to re-encode at least part of the page
803 	 * anyway.
804 	 *
805 	 * Also note if the page was in uncompressed, pre-9.4 format before, it is
806 	 * now represented as one huge segment that contains all the items. It
807 	 * might make sense to split that, to speed up random access, but we don't
808 	 * bother. You'll have to REINDEX anyway if you want the full gain of the
809 	 * new tighter index format.
810 	 */
811 	if (removedsomething)
812 	{
813 		bool		modified;
814 
815 		/*
816 		 * Make sure we have a palloc'd copy of all segments, after the first
817 		 * segment that is modified. (dataPlaceToPageLeafRecompress requires
818 		 * this).
819 		 */
820 		modified = false;
821 		dlist_foreach(iter, &leaf->segments)
822 		{
823 			leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
824 													   iter.cur);
825 
826 			if (seginfo->action != GIN_SEGMENT_UNMODIFIED)
827 				modified = true;
828 			if (modified && seginfo->action != GIN_SEGMENT_DELETE)
829 			{
830 				int			segsize = SizeOfGinPostingList(seginfo->seg);
831 				GinPostingList *tmp = (GinPostingList *) palloc(segsize);
832 
833 				memcpy(tmp, seginfo->seg, segsize);
834 				seginfo->seg = tmp;
835 			}
836 		}
837 
838 		if (RelationNeedsWAL(indexrel))
839 			computeLeafRecompressWALData(leaf);
840 
841 		/* Apply changes to page */
842 		START_CRIT_SECTION();
843 
844 		dataPlaceToPageLeafRecompress(buffer, leaf);
845 
846 		MarkBufferDirty(buffer);
847 
848 		if (RelationNeedsWAL(indexrel))
849 		{
850 			XLogRecPtr	recptr;
851 
852 			XLogBeginInsert();
853 			XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
854 			XLogRegisterBufData(0, leaf->walinfo, leaf->walinfolen);
855 			recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_DATA_LEAF_PAGE);
856 			PageSetLSN(page, recptr);
857 		}
858 
859 		END_CRIT_SECTION();
860 	}
861 }
862 
863 /*
864  * Construct a ginxlogRecompressDataLeaf record representing the changes
865  * in *leaf.  (Because this requires a palloc, we have to do it before
866  * we enter the critical section that actually updates the page.)
867  */
868 static void
computeLeafRecompressWALData(disassembledLeaf * leaf)869 computeLeafRecompressWALData(disassembledLeaf *leaf)
870 {
871 	int			nmodified = 0;
872 	char	   *walbufbegin;
873 	char	   *walbufend;
874 	dlist_iter	iter;
875 	int			segno;
876 	ginxlogRecompressDataLeaf *recompress_xlog;
877 
878 	/* Count the modified segments */
879 	dlist_foreach(iter, &leaf->segments)
880 	{
881 		leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
882 												   iter.cur);
883 
884 		if (seginfo->action != GIN_SEGMENT_UNMODIFIED)
885 			nmodified++;
886 	}
887 
888 	walbufbegin =
889 		palloc(sizeof(ginxlogRecompressDataLeaf) +
890 			   BLCKSZ +			/* max size needed to hold the segment data */
891 			   nmodified * 2	/* (segno + action) per action */
892 		);
893 	walbufend = walbufbegin;
894 
895 	recompress_xlog = (ginxlogRecompressDataLeaf *) walbufend;
896 	walbufend += sizeof(ginxlogRecompressDataLeaf);
897 
898 	recompress_xlog->nactions = nmodified;
899 
900 	segno = 0;
901 	dlist_foreach(iter, &leaf->segments)
902 	{
903 		leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
904 												   iter.cur);
905 		int			segsize = 0;
906 		int			datalen;
907 		uint8		action = seginfo->action;
908 
909 		if (action == GIN_SEGMENT_UNMODIFIED)
910 		{
911 			segno++;
912 			continue;
913 		}
914 
915 		if (action != GIN_SEGMENT_DELETE)
916 			segsize = SizeOfGinPostingList(seginfo->seg);
917 
918 		/*
919 		 * If storing the uncompressed list of added item pointers would take
920 		 * more space than storing the compressed segment as is, do that
921 		 * instead.
922 		 */
923 		if (action == GIN_SEGMENT_ADDITEMS &&
924 			seginfo->nmodifieditems * sizeof(ItemPointerData) > segsize)
925 		{
926 			action = GIN_SEGMENT_REPLACE;
927 		}
928 
929 		*((uint8 *) (walbufend++)) = segno;
930 		*(walbufend++) = action;
931 
932 		switch (action)
933 		{
934 			case GIN_SEGMENT_DELETE:
935 				datalen = 0;
936 				break;
937 
938 			case GIN_SEGMENT_ADDITEMS:
939 				datalen = seginfo->nmodifieditems * sizeof(ItemPointerData);
940 				memcpy(walbufend, &seginfo->nmodifieditems, sizeof(uint16));
941 				memcpy(walbufend + sizeof(uint16), seginfo->modifieditems, datalen);
942 				datalen += sizeof(uint16);
943 				break;
944 
945 			case GIN_SEGMENT_INSERT:
946 			case GIN_SEGMENT_REPLACE:
947 				datalen = SHORTALIGN(segsize);
948 				memcpy(walbufend, seginfo->seg, segsize);
949 				break;
950 
951 			default:
952 				elog(ERROR, "unexpected GIN leaf action %d", action);
953 		}
954 		walbufend += datalen;
955 
956 		if (action != GIN_SEGMENT_INSERT)
957 			segno++;
958 	}
959 
960 	/* Pass back the constructed info via *leaf */
961 	leaf->walinfo = walbufbegin;
962 	leaf->walinfolen = walbufend - walbufbegin;
963 }
964 
965 /*
966  * Assemble a disassembled posting tree leaf page back to a buffer.
967  *
968  * This just updates the target buffer; WAL stuff is caller's responsibility.
969  *
970  * NOTE: The segment pointers must not point directly to the same buffer,
971  * except for segments that have not been modified and whose preceding
972  * segments have not been modified either.
973  */
974 static void
dataPlaceToPageLeafRecompress(Buffer buf,disassembledLeaf * leaf)975 dataPlaceToPageLeafRecompress(Buffer buf, disassembledLeaf *leaf)
976 {
977 	Page		page = BufferGetPage(buf);
978 	char	   *ptr;
979 	int			newsize;
980 	bool		modified = false;
981 	dlist_iter	iter;
982 	int			segsize;
983 
984 	/*
985 	 * If the page was in pre-9.4 format before, convert the header, and force
986 	 * all segments to be copied to the page whether they were modified or
987 	 * not.
988 	 */
989 	if (!GinPageIsCompressed(page))
990 	{
991 		Assert(leaf->oldformat);
992 		GinPageSetCompressed(page);
993 		GinPageGetOpaque(page)->maxoff = InvalidOffsetNumber;
994 		modified = true;
995 	}
996 
997 	ptr = (char *) GinDataLeafPageGetPostingList(page);
998 	newsize = 0;
999 	dlist_foreach(iter, &leaf->segments)
1000 	{
1001 		leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node, iter.cur);
1002 
1003 		if (seginfo->action != GIN_SEGMENT_UNMODIFIED)
1004 			modified = true;
1005 
1006 		if (seginfo->action != GIN_SEGMENT_DELETE)
1007 		{
1008 			segsize = SizeOfGinPostingList(seginfo->seg);
1009 
1010 			if (modified)
1011 				memcpy(ptr, seginfo->seg, segsize);
1012 
1013 			ptr += segsize;
1014 			newsize += segsize;
1015 		}
1016 	}
1017 
1018 	Assert(newsize <= GinDataPageMaxDataSize);
1019 	GinDataPageSetDataSize(page, newsize);
1020 }
1021 
1022 /*
1023  * Like dataPlaceToPageLeafRecompress, but writes the disassembled leaf
1024  * segments to two pages instead of one.
1025  *
1026  * This is different from the non-split cases in that this does not modify
1027  * the original page directly, but writes to temporary in-memory copies of
1028  * the new left and right pages.
1029  */
1030 static void
dataPlaceToPageLeafSplit(disassembledLeaf * leaf,ItemPointerData lbound,ItemPointerData rbound,Page lpage,Page rpage)1031 dataPlaceToPageLeafSplit(disassembledLeaf *leaf,
1032 						 ItemPointerData lbound, ItemPointerData rbound,
1033 						 Page lpage, Page rpage)
1034 {
1035 	char	   *ptr;
1036 	int			segsize;
1037 	int			lsize;
1038 	int			rsize;
1039 	dlist_node *node;
1040 	dlist_node *firstright;
1041 	leafSegmentInfo *seginfo;
1042 
1043 	/* Initialize temporary pages to hold the new left and right pages */
1044 	GinInitPage(lpage, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ);
1045 	GinInitPage(rpage, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ);
1046 
1047 	/*
1048 	 * Copy the segments that go to the left page.
1049 	 *
1050 	 * XXX: We should skip copying the unmodified part of the left page, like
1051 	 * we do when recompressing.
1052 	 */
1053 	lsize = 0;
1054 	ptr = (char *) GinDataLeafPageGetPostingList(lpage);
1055 	firstright = dlist_next_node(&leaf->segments, leaf->lastleft);
1056 	for (node = dlist_head_node(&leaf->segments);
1057 		 node != firstright;
1058 		 node = dlist_next_node(&leaf->segments, node))
1059 	{
1060 		seginfo = dlist_container(leafSegmentInfo, node, node);
1061 
1062 		if (seginfo->action != GIN_SEGMENT_DELETE)
1063 		{
1064 			segsize = SizeOfGinPostingList(seginfo->seg);
1065 			memcpy(ptr, seginfo->seg, segsize);
1066 			ptr += segsize;
1067 			lsize += segsize;
1068 		}
1069 	}
1070 	Assert(lsize == leaf->lsize);
1071 	GinDataPageSetDataSize(lpage, lsize);
1072 	*GinDataPageGetRightBound(lpage) = lbound;
1073 
1074 	/* Copy the segments that go to the right page */
1075 	ptr = (char *) GinDataLeafPageGetPostingList(rpage);
1076 	rsize = 0;
1077 	for (node = firstright;
1078 		 ;
1079 		 node = dlist_next_node(&leaf->segments, node))
1080 	{
1081 		seginfo = dlist_container(leafSegmentInfo, node, node);
1082 
1083 		if (seginfo->action != GIN_SEGMENT_DELETE)
1084 		{
1085 			segsize = SizeOfGinPostingList(seginfo->seg);
1086 			memcpy(ptr, seginfo->seg, segsize);
1087 			ptr += segsize;
1088 			rsize += segsize;
1089 		}
1090 
1091 		if (!dlist_has_next(&leaf->segments, node))
1092 			break;
1093 	}
1094 	Assert(rsize == leaf->rsize);
1095 	GinDataPageSetDataSize(rpage, rsize);
1096 	*GinDataPageGetRightBound(rpage) = rbound;
1097 }
1098 
1099 /*
1100  * Prepare to insert data on an internal data page.
1101  *
1102  * If it will fit, return GPTP_INSERT after doing whatever setup is needed
1103  * before we enter the insertion critical section.  *ptp_workspace can be
1104  * set to pass information along to the execPlaceToPage function.
1105  *
1106  * If it won't fit, perform a page split and return two temporary page
1107  * images into *newlpage and *newrpage, with result GPTP_SPLIT.
1108  *
1109  * In neither case should the given page buffer be modified here.
1110  *
1111  * Note: on insertion to an internal node, in addition to inserting the given
1112  * item, the downlink of the existing item at stack->off will be updated to
1113  * point to updateblkno.
1114  */
1115 static GinPlaceToPageRC
dataBeginPlaceToPageInternal(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,void ** ptp_workspace,Page * newlpage,Page * newrpage)1116 dataBeginPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
1117 							 void *insertdata, BlockNumber updateblkno,
1118 							 void **ptp_workspace,
1119 							 Page *newlpage, Page *newrpage)
1120 {
1121 	Page		page = BufferGetPage(buf);
1122 
1123 	/* If it doesn't fit, deal with split case */
1124 	if (GinNonLeafDataPageGetFreeSpace(page) < sizeof(PostingItem))
1125 	{
1126 		dataSplitPageInternal(btree, buf, stack, insertdata, updateblkno,
1127 							  newlpage, newrpage);
1128 		return GPTP_SPLIT;
1129 	}
1130 
1131 	/* Else, we're ready to proceed with insertion */
1132 	return GPTP_INSERT;
1133 }
1134 
1135 /*
1136  * Perform data insertion after beginPlaceToPage has decided it will fit.
1137  *
1138  * This is invoked within a critical section, and XLOG record creation (if
1139  * needed) is already started.  The target buffer is registered in slot 0.
1140  */
1141 static void
dataExecPlaceToPageInternal(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,void * ptp_workspace)1142 dataExecPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
1143 							void *insertdata, BlockNumber updateblkno,
1144 							void *ptp_workspace)
1145 {
1146 	Page		page = BufferGetPage(buf);
1147 	OffsetNumber off = stack->off;
1148 	PostingItem *pitem;
1149 
1150 	/* Update existing downlink to point to next page (on internal page) */
1151 	pitem = GinDataPageGetPostingItem(page, off);
1152 	PostingItemSetBlockNumber(pitem, updateblkno);
1153 
1154 	/* Add new item */
1155 	pitem = (PostingItem *) insertdata;
1156 	GinDataPageAddPostingItem(page, pitem, off);
1157 
1158 	if (RelationNeedsWAL(btree->index))
1159 	{
1160 		/*
1161 		 * This must be static, because it has to survive until XLogInsert,
1162 		 * and we can't palloc here.  Ugly, but the XLogInsert infrastructure
1163 		 * isn't reentrant anyway.
1164 		 */
1165 		static ginxlogInsertDataInternal data;
1166 
1167 		data.offset = off;
1168 		data.newitem = *pitem;
1169 
1170 		XLogRegisterBufData(0, (char *) &data,
1171 							sizeof(ginxlogInsertDataInternal));
1172 	}
1173 }
1174 
1175 /*
1176  * Prepare to insert data on a posting-tree data page.
1177  *
1178  * If it will fit, return GPTP_INSERT after doing whatever setup is needed
1179  * before we enter the insertion critical section.  *ptp_workspace can be
1180  * set to pass information along to the execPlaceToPage function.
1181  *
1182  * If it won't fit, perform a page split and return two temporary page
1183  * images into *newlpage and *newrpage, with result GPTP_SPLIT.
1184  *
1185  * In neither case should the given page buffer be modified here.
1186  *
1187  * Note: on insertion to an internal node, in addition to inserting the given
1188  * item, the downlink of the existing item at stack->off will be updated to
1189  * point to updateblkno.
1190  *
1191  * Calls relevant function for internal or leaf page because they are handled
1192  * very differently.
1193  */
1194 static GinPlaceToPageRC
dataBeginPlaceToPage(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,void ** ptp_workspace,Page * newlpage,Page * newrpage)1195 dataBeginPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
1196 					 void *insertdata, BlockNumber updateblkno,
1197 					 void **ptp_workspace,
1198 					 Page *newlpage, Page *newrpage)
1199 {
1200 	Page		page = BufferGetPage(buf);
1201 
1202 	Assert(GinPageIsData(page));
1203 
1204 	if (GinPageIsLeaf(page))
1205 		return dataBeginPlaceToPageLeaf(btree, buf, stack, insertdata,
1206 										ptp_workspace,
1207 										newlpage, newrpage);
1208 	else
1209 		return dataBeginPlaceToPageInternal(btree, buf, stack,
1210 											insertdata, updateblkno,
1211 											ptp_workspace,
1212 											newlpage, newrpage);
1213 }
1214 
1215 /*
1216  * Perform data insertion after beginPlaceToPage has decided it will fit.
1217  *
1218  * This is invoked within a critical section, and XLOG record creation (if
1219  * needed) is already started.  The target buffer is registered in slot 0.
1220  *
1221  * Calls relevant function for internal or leaf page because they are handled
1222  * very differently.
1223  */
1224 static void
dataExecPlaceToPage(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,void * ptp_workspace)1225 dataExecPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
1226 					void *insertdata, BlockNumber updateblkno,
1227 					void *ptp_workspace)
1228 {
1229 	Page		page = BufferGetPage(buf);
1230 
1231 	if (GinPageIsLeaf(page))
1232 		dataExecPlaceToPageLeaf(btree, buf, stack, insertdata,
1233 								ptp_workspace);
1234 	else
1235 		dataExecPlaceToPageInternal(btree, buf, stack, insertdata,
1236 									updateblkno, ptp_workspace);
1237 }
1238 
1239 /*
1240  * Split internal page and insert new data.
1241  *
1242  * Returns new temp pages to *newlpage and *newrpage.
1243  * The original buffer is left untouched.
1244  */
1245 static void
dataSplitPageInternal(GinBtree btree,Buffer origbuf,GinBtreeStack * stack,void * insertdata,BlockNumber updateblkno,Page * newlpage,Page * newrpage)1246 dataSplitPageInternal(GinBtree btree, Buffer origbuf,
1247 					  GinBtreeStack *stack,
1248 					  void *insertdata, BlockNumber updateblkno,
1249 					  Page *newlpage, Page *newrpage)
1250 {
1251 	Page		oldpage = BufferGetPage(origbuf);
1252 	OffsetNumber off = stack->off;
1253 	int			nitems = GinPageGetOpaque(oldpage)->maxoff;
1254 	int			nleftitems;
1255 	int			nrightitems;
1256 	Size		pageSize = PageGetPageSize(oldpage);
1257 	ItemPointerData oldbound = *GinDataPageGetRightBound(oldpage);
1258 	ItemPointer bound;
1259 	Page		lpage;
1260 	Page		rpage;
1261 	OffsetNumber separator;
1262 	PostingItem allitems[(BLCKSZ / sizeof(PostingItem)) + 1];
1263 
1264 	lpage = PageGetTempPage(oldpage);
1265 	rpage = PageGetTempPage(oldpage);
1266 	GinInitPage(lpage, GinPageGetOpaque(oldpage)->flags, pageSize);
1267 	GinInitPage(rpage, GinPageGetOpaque(oldpage)->flags, pageSize);
1268 
1269 	/*
1270 	 * First construct a new list of PostingItems, which includes all the old
1271 	 * items, and the new item.
1272 	 */
1273 	memcpy(allitems, GinDataPageGetPostingItem(oldpage, FirstOffsetNumber),
1274 		   (off - 1) * sizeof(PostingItem));
1275 
1276 	allitems[off - 1] = *((PostingItem *) insertdata);
1277 	memcpy(&allitems[off], GinDataPageGetPostingItem(oldpage, off),
1278 		   (nitems - (off - 1)) * sizeof(PostingItem));
1279 	nitems++;
1280 
1281 	/* Update existing downlink to point to next page */
1282 	PostingItemSetBlockNumber(&allitems[off], updateblkno);
1283 
1284 	/*
1285 	 * When creating a new index, fit as many tuples as possible on the left
1286 	 * page, on the assumption that the table is scanned from beginning to
1287 	 * end. This packs the index as tight as possible.
1288 	 */
1289 	if (btree->isBuild && GinPageRightMost(oldpage))
1290 		separator = GinNonLeafDataPageGetFreeSpace(rpage) / sizeof(PostingItem);
1291 	else
1292 		separator = nitems / 2;
1293 	nleftitems = separator;
1294 	nrightitems = nitems - separator;
1295 
1296 	memcpy(GinDataPageGetPostingItem(lpage, FirstOffsetNumber),
1297 		   allitems,
1298 		   nleftitems * sizeof(PostingItem));
1299 	GinPageGetOpaque(lpage)->maxoff = nleftitems;
1300 	memcpy(GinDataPageGetPostingItem(rpage, FirstOffsetNumber),
1301 		   &allitems[separator],
1302 		   nrightitems * sizeof(PostingItem));
1303 	GinPageGetOpaque(rpage)->maxoff = nrightitems;
1304 
1305 	/*
1306 	 * Also set pd_lower for both pages, like GinDataPageAddPostingItem does.
1307 	 */
1308 	GinDataPageSetDataSize(lpage, nleftitems * sizeof(PostingItem));
1309 	GinDataPageSetDataSize(rpage, nrightitems * sizeof(PostingItem));
1310 
1311 	/* set up right bound for left page */
1312 	bound = GinDataPageGetRightBound(lpage);
1313 	*bound = GinDataPageGetPostingItem(lpage, nleftitems)->key;
1314 
1315 	/* set up right bound for right page */
1316 	*GinDataPageGetRightBound(rpage) = oldbound;
1317 
1318 	/* return temp pages to caller */
1319 	*newlpage = lpage;
1320 	*newrpage = rpage;
1321 }
1322 
1323 /*
1324  * Construct insertion payload for inserting the downlink for given buffer.
1325  */
1326 static void *
dataPrepareDownlink(GinBtree btree,Buffer lbuf)1327 dataPrepareDownlink(GinBtree btree, Buffer lbuf)
1328 {
1329 	PostingItem *pitem = palloc(sizeof(PostingItem));
1330 	Page		lpage = BufferGetPage(lbuf);
1331 
1332 	PostingItemSetBlockNumber(pitem, BufferGetBlockNumber(lbuf));
1333 	pitem->key = *GinDataPageGetRightBound(lpage);
1334 
1335 	return pitem;
1336 }
1337 
1338 /*
1339  * Fills new root by right bound values from child.
1340  * Also called from ginxlog, should not use btree
1341  */
1342 void
ginDataFillRoot(GinBtree btree,Page root,BlockNumber lblkno,Page lpage,BlockNumber rblkno,Page rpage)1343 ginDataFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage)
1344 {
1345 	PostingItem li,
1346 				ri;
1347 
1348 	li.key = *GinDataPageGetRightBound(lpage);
1349 	PostingItemSetBlockNumber(&li, lblkno);
1350 	GinDataPageAddPostingItem(root, &li, InvalidOffsetNumber);
1351 
1352 	ri.key = *GinDataPageGetRightBound(rpage);
1353 	PostingItemSetBlockNumber(&ri, rblkno);
1354 	GinDataPageAddPostingItem(root, &ri, InvalidOffsetNumber);
1355 }
1356 
1357 
1358 /*** Functions to work with disassembled leaf pages ***/
1359 
1360 /*
1361  * Disassemble page into a disassembledLeaf struct.
1362  */
1363 static disassembledLeaf *
disassembleLeaf(Page page)1364 disassembleLeaf(Page page)
1365 {
1366 	disassembledLeaf *leaf;
1367 	GinPostingList *seg;
1368 	Pointer		segbegin;
1369 	Pointer		segend;
1370 
1371 	leaf = palloc0(sizeof(disassembledLeaf));
1372 	dlist_init(&leaf->segments);
1373 
1374 	if (GinPageIsCompressed(page))
1375 	{
1376 		/*
1377 		 * Create a leafSegment entry for each segment.
1378 		 */
1379 		seg = GinDataLeafPageGetPostingList(page);
1380 		segbegin = (Pointer) seg;
1381 		segend = segbegin + GinDataLeafPageGetPostingListSize(page);
1382 		while ((Pointer) seg < segend)
1383 		{
1384 			leafSegmentInfo *seginfo = palloc(sizeof(leafSegmentInfo));
1385 
1386 			seginfo->action = GIN_SEGMENT_UNMODIFIED;
1387 			seginfo->seg = seg;
1388 			seginfo->items = NULL;
1389 			seginfo->nitems = 0;
1390 			dlist_push_tail(&leaf->segments, &seginfo->node);
1391 
1392 			seg = GinNextPostingListSegment(seg);
1393 		}
1394 		leaf->oldformat = false;
1395 	}
1396 	else
1397 	{
1398 		/*
1399 		 * A pre-9.4 format uncompressed page is represented by a single
1400 		 * segment, with an array of items.  The corner case is uncompressed
1401 		 * page containing no items, which is represented as no segments.
1402 		 */
1403 		ItemPointer uncompressed;
1404 		int			nuncompressed;
1405 		leafSegmentInfo *seginfo;
1406 
1407 		uncompressed = dataLeafPageGetUncompressed(page, &nuncompressed);
1408 
1409 		if (nuncompressed > 0)
1410 		{
1411 			seginfo = palloc(sizeof(leafSegmentInfo));
1412 
1413 			seginfo->action = GIN_SEGMENT_REPLACE;
1414 			seginfo->seg = NULL;
1415 			seginfo->items = palloc(nuncompressed * sizeof(ItemPointerData));
1416 			memcpy(seginfo->items, uncompressed, nuncompressed * sizeof(ItemPointerData));
1417 			seginfo->nitems = nuncompressed;
1418 
1419 			dlist_push_tail(&leaf->segments, &seginfo->node);
1420 		}
1421 
1422 		leaf->oldformat = true;
1423 	}
1424 
1425 	return leaf;
1426 }
1427 
1428 /*
1429  * Distribute newItems to the segments.
1430  *
1431  * Any segments that acquire new items are decoded, and the new items are
1432  * merged with the old items.
1433  *
1434  * Returns true if any new items were added. False means they were all
1435  * duplicates of existing items on the page.
1436  */
1437 static bool
addItemsToLeaf(disassembledLeaf * leaf,ItemPointer newItems,int nNewItems)1438 addItemsToLeaf(disassembledLeaf *leaf, ItemPointer newItems, int nNewItems)
1439 {
1440 	dlist_iter	iter;
1441 	ItemPointer nextnew = newItems;
1442 	int			newleft = nNewItems;
1443 	bool		modified = false;
1444 	leafSegmentInfo *newseg;
1445 
1446 	/*
1447 	 * If the page is completely empty, just construct one new segment to hold
1448 	 * all the new items.
1449 	 */
1450 	if (dlist_is_empty(&leaf->segments))
1451 	{
1452 		newseg = palloc(sizeof(leafSegmentInfo));
1453 		newseg->seg = NULL;
1454 		newseg->items = newItems;
1455 		newseg->nitems = nNewItems;
1456 		newseg->action = GIN_SEGMENT_INSERT;
1457 		dlist_push_tail(&leaf->segments, &newseg->node);
1458 		return true;
1459 	}
1460 
1461 	dlist_foreach(iter, &leaf->segments)
1462 	{
1463 		leafSegmentInfo *cur = (leafSegmentInfo *) dlist_container(leafSegmentInfo, node, iter.cur);
1464 		int			nthis;
1465 		ItemPointer tmpitems;
1466 		int			ntmpitems;
1467 
1468 		/*
1469 		 * How many of the new items fall into this segment?
1470 		 */
1471 		if (!dlist_has_next(&leaf->segments, iter.cur))
1472 			nthis = newleft;
1473 		else
1474 		{
1475 			leafSegmentInfo *next;
1476 			ItemPointerData next_first;
1477 
1478 			next = (leafSegmentInfo *) dlist_container(leafSegmentInfo, node,
1479 													   dlist_next_node(&leaf->segments, iter.cur));
1480 			if (next->items)
1481 				next_first = next->items[0];
1482 			else
1483 			{
1484 				Assert(next->seg != NULL);
1485 				next_first = next->seg->first;
1486 			}
1487 
1488 			nthis = 0;
1489 			while (nthis < newleft && ginCompareItemPointers(&nextnew[nthis], &next_first) < 0)
1490 				nthis++;
1491 		}
1492 		if (nthis == 0)
1493 			continue;
1494 
1495 		/* Merge the new items with the existing items. */
1496 		if (!cur->items)
1497 			cur->items = ginPostingListDecode(cur->seg, &cur->nitems);
1498 
1499 		/*
1500 		 * Fast path for the important special case that we're appending to
1501 		 * the end of the page: don't let the last segment on the page grow
1502 		 * larger than the target, create a new segment before that happens.
1503 		 */
1504 		if (!dlist_has_next(&leaf->segments, iter.cur) &&
1505 			ginCompareItemPointers(&cur->items[cur->nitems - 1], &nextnew[0]) < 0 &&
1506 			cur->seg != NULL &&
1507 			SizeOfGinPostingList(cur->seg) >= GinPostingListSegmentTargetSize)
1508 		{
1509 			newseg = palloc(sizeof(leafSegmentInfo));
1510 			newseg->seg = NULL;
1511 			newseg->items = nextnew;
1512 			newseg->nitems = nthis;
1513 			newseg->action = GIN_SEGMENT_INSERT;
1514 			dlist_push_tail(&leaf->segments, &newseg->node);
1515 			modified = true;
1516 			break;
1517 		}
1518 
1519 		tmpitems = ginMergeItemPointers(cur->items, cur->nitems,
1520 										nextnew, nthis,
1521 										&ntmpitems);
1522 		if (ntmpitems != cur->nitems)
1523 		{
1524 			/*
1525 			 * If there are no duplicates, track the added items so that we
1526 			 * can emit a compact ADDITEMS WAL record later on. (it doesn't
1527 			 * seem worth re-checking which items were duplicates, if there
1528 			 * were any)
1529 			 */
1530 			if (ntmpitems == nthis + cur->nitems &&
1531 				cur->action == GIN_SEGMENT_UNMODIFIED)
1532 			{
1533 				cur->action = GIN_SEGMENT_ADDITEMS;
1534 				cur->modifieditems = nextnew;
1535 				cur->nmodifieditems = nthis;
1536 			}
1537 			else
1538 				cur->action = GIN_SEGMENT_REPLACE;
1539 
1540 			cur->items = tmpitems;
1541 			cur->nitems = ntmpitems;
1542 			cur->seg = NULL;
1543 			modified = true;
1544 		}
1545 
1546 		nextnew += nthis;
1547 		newleft -= nthis;
1548 		if (newleft == 0)
1549 			break;
1550 	}
1551 
1552 	return modified;
1553 }
1554 
1555 /*
1556  * Recompresses all segments that have been modified.
1557  *
1558  * If not all the items fit on two pages (ie. after split), we store as
1559  * many items as fit, and set *remaining to the first item that didn't fit.
1560  * If all items fit, *remaining is set to invalid.
1561  *
1562  * Returns true if the page has to be split.
1563  */
1564 static bool
leafRepackItems(disassembledLeaf * leaf,ItemPointer remaining)1565 leafRepackItems(disassembledLeaf *leaf, ItemPointer remaining)
1566 {
1567 	int			pgused = 0;
1568 	bool		needsplit = false;
1569 	dlist_iter	iter;
1570 	int			segsize;
1571 	leafSegmentInfo *nextseg;
1572 	int			npacked;
1573 	bool		modified;
1574 	dlist_node *cur_node;
1575 	dlist_node *next_node;
1576 
1577 	ItemPointerSetInvalid(remaining);
1578 
1579 	/*
1580 	 * cannot use dlist_foreach_modify here because we insert adjacent items
1581 	 * while iterating.
1582 	 */
1583 	for (cur_node = dlist_head_node(&leaf->segments);
1584 		 cur_node != NULL;
1585 		 cur_node = next_node)
1586 	{
1587 		leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
1588 												   cur_node);
1589 
1590 		if (dlist_has_next(&leaf->segments, cur_node))
1591 			next_node = dlist_next_node(&leaf->segments, cur_node);
1592 		else
1593 			next_node = NULL;
1594 
1595 		/* Compress the posting list, if necessary */
1596 		if (seginfo->action != GIN_SEGMENT_DELETE)
1597 		{
1598 			if (seginfo->seg == NULL)
1599 			{
1600 				if (seginfo->nitems > GinPostingListSegmentMaxSize)
1601 					npacked = 0;	/* no chance that it would fit. */
1602 				else
1603 				{
1604 					seginfo->seg = ginCompressPostingList(seginfo->items,
1605 														  seginfo->nitems,
1606 														  GinPostingListSegmentMaxSize,
1607 														  &npacked);
1608 				}
1609 				if (npacked != seginfo->nitems)
1610 				{
1611 					/*
1612 					 * Too large. Compress again to the target size, and
1613 					 * create a new segment to represent the remaining items.
1614 					 * The new segment is inserted after this one, so it will
1615 					 * be processed in the next iteration of this loop.
1616 					 */
1617 					if (seginfo->seg)
1618 						pfree(seginfo->seg);
1619 					seginfo->seg = ginCompressPostingList(seginfo->items,
1620 														  seginfo->nitems,
1621 														  GinPostingListSegmentTargetSize,
1622 														  &npacked);
1623 					if (seginfo->action != GIN_SEGMENT_INSERT)
1624 						seginfo->action = GIN_SEGMENT_REPLACE;
1625 
1626 					nextseg = palloc(sizeof(leafSegmentInfo));
1627 					nextseg->action = GIN_SEGMENT_INSERT;
1628 					nextseg->seg = NULL;
1629 					nextseg->items = &seginfo->items[npacked];
1630 					nextseg->nitems = seginfo->nitems - npacked;
1631 					next_node = &nextseg->node;
1632 					dlist_insert_after(cur_node, next_node);
1633 				}
1634 			}
1635 
1636 			/*
1637 			 * If the segment is very small, merge it with the next segment.
1638 			 */
1639 			if (SizeOfGinPostingList(seginfo->seg) < GinPostingListSegmentMinSize && next_node)
1640 			{
1641 				int			nmerged;
1642 
1643 				nextseg = dlist_container(leafSegmentInfo, node, next_node);
1644 
1645 				if (seginfo->items == NULL)
1646 					seginfo->items = ginPostingListDecode(seginfo->seg,
1647 														  &seginfo->nitems);
1648 				if (nextseg->items == NULL)
1649 					nextseg->items = ginPostingListDecode(nextseg->seg,
1650 														  &nextseg->nitems);
1651 				nextseg->items =
1652 					ginMergeItemPointers(seginfo->items, seginfo->nitems,
1653 										 nextseg->items, nextseg->nitems,
1654 										 &nmerged);
1655 				Assert(nmerged == seginfo->nitems + nextseg->nitems);
1656 				nextseg->nitems = nmerged;
1657 				nextseg->seg = NULL;
1658 
1659 				nextseg->action = GIN_SEGMENT_REPLACE;
1660 				nextseg->modifieditems = NULL;
1661 				nextseg->nmodifieditems = 0;
1662 
1663 				if (seginfo->action == GIN_SEGMENT_INSERT)
1664 				{
1665 					dlist_delete(cur_node);
1666 					continue;
1667 				}
1668 				else
1669 				{
1670 					seginfo->action = GIN_SEGMENT_DELETE;
1671 					seginfo->seg = NULL;
1672 				}
1673 			}
1674 
1675 			seginfo->items = NULL;
1676 			seginfo->nitems = 0;
1677 		}
1678 
1679 		if (seginfo->action == GIN_SEGMENT_DELETE)
1680 			continue;
1681 
1682 		/*
1683 		 * OK, we now have a compressed version of this segment ready for
1684 		 * copying to the page. Did we exceed the size that fits on one page?
1685 		 */
1686 		segsize = SizeOfGinPostingList(seginfo->seg);
1687 		if (pgused + segsize > GinDataPageMaxDataSize)
1688 		{
1689 			if (!needsplit)
1690 			{
1691 				/* switch to right page */
1692 				Assert(pgused > 0);
1693 				leaf->lastleft = dlist_prev_node(&leaf->segments, cur_node);
1694 				needsplit = true;
1695 				leaf->lsize = pgused;
1696 				pgused = 0;
1697 			}
1698 			else
1699 			{
1700 				/*
1701 				 * Filled both pages. The last segment we constructed did not
1702 				 * fit.
1703 				 */
1704 				*remaining = seginfo->seg->first;
1705 
1706 				/*
1707 				 * remove all segments that did not fit from the list.
1708 				 */
1709 				while (dlist_has_next(&leaf->segments, cur_node))
1710 					dlist_delete(dlist_next_node(&leaf->segments, cur_node));
1711 				dlist_delete(cur_node);
1712 				break;
1713 			}
1714 		}
1715 
1716 		pgused += segsize;
1717 	}
1718 
1719 	if (!needsplit)
1720 	{
1721 		leaf->lsize = pgused;
1722 		leaf->rsize = 0;
1723 	}
1724 	else
1725 		leaf->rsize = pgused;
1726 
1727 	Assert(leaf->lsize <= GinDataPageMaxDataSize);
1728 	Assert(leaf->rsize <= GinDataPageMaxDataSize);
1729 
1730 	/*
1731 	 * Make a palloc'd copy of every segment after the first modified one,
1732 	 * because as we start copying items to the original page, we might
1733 	 * overwrite an existing segment.
1734 	 */
1735 	modified = false;
1736 	dlist_foreach(iter, &leaf->segments)
1737 	{
1738 		leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
1739 												   iter.cur);
1740 
1741 		if (!modified && seginfo->action != GIN_SEGMENT_UNMODIFIED)
1742 		{
1743 			modified = true;
1744 		}
1745 		else if (modified && seginfo->action == GIN_SEGMENT_UNMODIFIED)
1746 		{
1747 			GinPostingList *tmp;
1748 
1749 			segsize = SizeOfGinPostingList(seginfo->seg);
1750 			tmp = palloc(segsize);
1751 			memcpy(tmp, seginfo->seg, segsize);
1752 			seginfo->seg = tmp;
1753 		}
1754 	}
1755 
1756 	return needsplit;
1757 }
1758 
1759 
1760 /*** Functions that are exported to the rest of the GIN code ***/
1761 
1762 /*
1763  * Creates new posting tree containing the given TIDs. Returns the page
1764  * number of the root of the new posting tree.
1765  *
1766  * items[] must be in sorted order with no duplicates.
1767  */
1768 BlockNumber
createPostingTree(Relation index,ItemPointerData * items,uint32 nitems,GinStatsData * buildStats,Buffer entrybuffer)1769 createPostingTree(Relation index, ItemPointerData *items, uint32 nitems,
1770 				  GinStatsData *buildStats, Buffer entrybuffer)
1771 {
1772 	BlockNumber blkno;
1773 	Buffer		buffer;
1774 	Page		tmppage;
1775 	Page		page;
1776 	Pointer		ptr;
1777 	int			nrootitems;
1778 	int			rootsize;
1779 
1780 	/* Construct the new root page in memory first. */
1781 	tmppage = (Page) palloc(BLCKSZ);
1782 	GinInitPage(tmppage, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ);
1783 	GinPageGetOpaque(tmppage)->rightlink = InvalidBlockNumber;
1784 
1785 	/*
1786 	 * Write as many of the items to the root page as fit. In segments of max
1787 	 * GinPostingListSegmentMaxSize bytes each.
1788 	 */
1789 	nrootitems = 0;
1790 	rootsize = 0;
1791 	ptr = (Pointer) GinDataLeafPageGetPostingList(tmppage);
1792 	while (nrootitems < nitems)
1793 	{
1794 		GinPostingList *segment;
1795 		int			npacked;
1796 		int			segsize;
1797 
1798 		segment = ginCompressPostingList(&items[nrootitems],
1799 										 nitems - nrootitems,
1800 										 GinPostingListSegmentMaxSize,
1801 										 &npacked);
1802 		segsize = SizeOfGinPostingList(segment);
1803 		if (rootsize + segsize > GinDataPageMaxDataSize)
1804 			break;
1805 
1806 		memcpy(ptr, segment, segsize);
1807 		ptr += segsize;
1808 		rootsize += segsize;
1809 		nrootitems += npacked;
1810 		pfree(segment);
1811 	}
1812 	GinDataPageSetDataSize(tmppage, rootsize);
1813 
1814 	/*
1815 	 * All set. Get a new physical page, and copy the in-memory page to it.
1816 	 */
1817 	buffer = GinNewBuffer(index);
1818 	page = BufferGetPage(buffer);
1819 	blkno = BufferGetBlockNumber(buffer);
1820 
1821 	/*
1822 	 * Copy any predicate locks from the entry tree leaf (containing posting
1823 	 * list) to the posting tree.
1824 	 */
1825 	PredicateLockPageSplit(index, BufferGetBlockNumber(entrybuffer), blkno);
1826 
1827 	START_CRIT_SECTION();
1828 
1829 	PageRestoreTempPage(tmppage, page);
1830 	MarkBufferDirty(buffer);
1831 
1832 	if (RelationNeedsWAL(index))
1833 	{
1834 		XLogRecPtr	recptr;
1835 		ginxlogCreatePostingTree data;
1836 
1837 		data.size = rootsize;
1838 
1839 		XLogBeginInsert();
1840 		XLogRegisterData((char *) &data, sizeof(ginxlogCreatePostingTree));
1841 
1842 		XLogRegisterData((char *) GinDataLeafPageGetPostingList(page),
1843 						 rootsize);
1844 		XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
1845 
1846 		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_PTREE);
1847 		PageSetLSN(page, recptr);
1848 	}
1849 
1850 	UnlockReleaseBuffer(buffer);
1851 
1852 	END_CRIT_SECTION();
1853 
1854 	/* During index build, count the newly-added data page */
1855 	if (buildStats)
1856 		buildStats->nDataPages++;
1857 
1858 	elog(DEBUG2, "created GIN posting tree with %d items", nrootitems);
1859 
1860 	/*
1861 	 * Add any remaining TIDs to the newly-created posting tree.
1862 	 */
1863 	if (nitems > nrootitems)
1864 	{
1865 		ginInsertItemPointers(index, blkno,
1866 							  items + nrootitems,
1867 							  nitems - nrootitems,
1868 							  buildStats);
1869 	}
1870 
1871 	return blkno;
1872 }
1873 
1874 static void
ginPrepareDataScan(GinBtree btree,Relation index,BlockNumber rootBlkno)1875 ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno)
1876 {
1877 	memset(btree, 0, sizeof(GinBtreeData));
1878 
1879 	btree->index = index;
1880 	btree->rootBlkno = rootBlkno;
1881 
1882 	btree->findChildPage = dataLocateItem;
1883 	btree->getLeftMostChild = dataGetLeftMostPage;
1884 	btree->isMoveRight = dataIsMoveRight;
1885 	btree->findItem = NULL;
1886 	btree->findChildPtr = dataFindChildPtr;
1887 	btree->beginPlaceToPage = dataBeginPlaceToPage;
1888 	btree->execPlaceToPage = dataExecPlaceToPage;
1889 	btree->fillRoot = ginDataFillRoot;
1890 	btree->prepareDownlink = dataPrepareDownlink;
1891 
1892 	btree->isData = true;
1893 	btree->fullScan = false;
1894 	btree->isBuild = false;
1895 }
1896 
1897 /*
1898  * Inserts array of item pointers, may execute several tree scan (very rare)
1899  */
1900 void
ginInsertItemPointers(Relation index,BlockNumber rootBlkno,ItemPointerData * items,uint32 nitem,GinStatsData * buildStats)1901 ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
1902 					  ItemPointerData *items, uint32 nitem,
1903 					  GinStatsData *buildStats)
1904 {
1905 	GinBtreeData btree;
1906 	GinBtreeDataLeafInsertData insertdata;
1907 	GinBtreeStack *stack;
1908 
1909 	ginPrepareDataScan(&btree, index, rootBlkno);
1910 	btree.isBuild = (buildStats != NULL);
1911 	insertdata.items = items;
1912 	insertdata.nitem = nitem;
1913 	insertdata.curitem = 0;
1914 
1915 	while (insertdata.curitem < insertdata.nitem)
1916 	{
1917 		/* search for the leaf page where the first item should go to */
1918 		btree.itemptr = insertdata.items[insertdata.curitem];
1919 		stack = ginFindLeafPage(&btree, false, true, NULL);
1920 
1921 		ginInsertValue(&btree, stack, &insertdata, buildStats);
1922 	}
1923 }
1924 
1925 /*
1926  * Starts a new scan on a posting tree.
1927  */
1928 GinBtreeStack *
ginScanBeginPostingTree(GinBtree btree,Relation index,BlockNumber rootBlkno,Snapshot snapshot)1929 ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno,
1930 						Snapshot snapshot)
1931 {
1932 	GinBtreeStack *stack;
1933 
1934 	ginPrepareDataScan(btree, index, rootBlkno);
1935 
1936 	btree->fullScan = true;
1937 
1938 	stack = ginFindLeafPage(btree, true, false, snapshot);
1939 
1940 	return stack;
1941 }
1942