1 /*-------------------------------------------------------------------------
2  *
3  * ginentrypage.c
4  *	  routines for handling GIN entry tree pages.
5  *
6  *
7  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *			src/backend/access/gin/ginentrypage.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/gin_private.h"
18 #include "access/ginxlog.h"
19 #include "access/xloginsert.h"
20 #include "miscadmin.h"
21 #include "utils/rel.h"
22 
23 static void entrySplitPage(GinBtree btree, Buffer origbuf,
24 			   GinBtreeStack *stack,
25 			   GinBtreeEntryInsertData *insertData,
26 			   BlockNumber updateblkno,
27 			   Page *newlpage, Page *newrpage);
28 
29 /*
30  * Form a tuple for entry tree.
31  *
32  * If the tuple would be too big to be stored, function throws a suitable
33  * error if errorTooBig is TRUE, or returns NULL if errorTooBig is FALSE.
34  *
35  * See src/backend/access/gin/README for a description of the index tuple
36  * format that is being built here.  We build on the assumption that we
37  * are making a leaf-level key entry containing a posting list of nipd items.
38  * If the caller is actually trying to make a posting-tree entry, non-leaf
39  * entry, or pending-list entry, it should pass dataSize = 0 and then overwrite
40  * the t_tid fields as necessary.  In any case, 'data' can be NULL to skip
41  * filling in the posting list; the caller is responsible for filling it
42  * afterwards if data = NULL and nipd > 0.
43  */
44 IndexTuple
GinFormTuple(GinState * ginstate,OffsetNumber attnum,Datum key,GinNullCategory category,Pointer data,Size dataSize,int nipd,bool errorTooBig)45 GinFormTuple(GinState *ginstate,
46 			 OffsetNumber attnum, Datum key, GinNullCategory category,
47 			 Pointer data, Size dataSize, int nipd,
48 			 bool errorTooBig)
49 {
50 	Datum		datums[2];
51 	bool		isnull[2];
52 	IndexTuple	itup;
53 	uint32		newsize;
54 
55 	/* Build the basic tuple: optional column number, plus key datum */
56 	if (ginstate->oneCol)
57 	{
58 		datums[0] = key;
59 		isnull[0] = (category != GIN_CAT_NORM_KEY);
60 	}
61 	else
62 	{
63 		datums[0] = UInt16GetDatum(attnum);
64 		isnull[0] = false;
65 		datums[1] = key;
66 		isnull[1] = (category != GIN_CAT_NORM_KEY);
67 	}
68 
69 	itup = index_form_tuple(ginstate->tupdesc[attnum - 1], datums, isnull);
70 
71 	/*
72 	 * Determine and store offset to the posting list, making sure there is
73 	 * room for the category byte if needed.
74 	 *
75 	 * Note: because index_form_tuple MAXALIGNs the tuple size, there may well
76 	 * be some wasted pad space.  Is it worth recomputing the data length to
77 	 * prevent that?  That would also allow us to Assert that the real data
78 	 * doesn't overlap the GinNullCategory byte, which this code currently
79 	 * takes on faith.
80 	 */
81 	newsize = IndexTupleSize(itup);
82 
83 	if (IndexTupleHasNulls(itup))
84 	{
85 		uint32		minsize;
86 
87 		Assert(category != GIN_CAT_NORM_KEY);
88 		minsize = GinCategoryOffset(itup, ginstate) + sizeof(GinNullCategory);
89 		newsize = Max(newsize, minsize);
90 	}
91 
92 	newsize = SHORTALIGN(newsize);
93 
94 	GinSetPostingOffset(itup, newsize);
95 	GinSetNPosting(itup, nipd);
96 
97 	/*
98 	 * Add space needed for posting list, if any.  Then check that the tuple
99 	 * won't be too big to store.
100 	 */
101 	newsize += dataSize;
102 
103 	newsize = MAXALIGN(newsize);
104 
105 	if (newsize > GinMaxItemSize)
106 	{
107 		if (errorTooBig)
108 			ereport(ERROR,
109 					(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
110 					 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
111 							(Size) newsize, (Size) GinMaxItemSize,
112 							RelationGetRelationName(ginstate->index))));
113 		pfree(itup);
114 		return NULL;
115 	}
116 
117 	/*
118 	 * Resize tuple if needed
119 	 */
120 	if (newsize != IndexTupleSize(itup))
121 	{
122 		itup = repalloc(itup, newsize);
123 
124 		/*
125 		 * PostgreSQL 9.3 and earlier did not clear this new space, so we
126 		 * might find uninitialized padding when reading tuples from disk.
127 		 */
128 		memset((char *) itup + IndexTupleSize(itup),
129 			   0, newsize - IndexTupleSize(itup));
130 		/* set new size in tuple header */
131 		itup->t_info &= ~INDEX_SIZE_MASK;
132 		itup->t_info |= newsize;
133 	}
134 
135 	/*
136 	 * Copy in the posting list, if provided
137 	 */
138 	if (data)
139 	{
140 		char	   *ptr = GinGetPosting(itup);
141 
142 		memcpy(ptr, data, dataSize);
143 	}
144 
145 	/*
146 	 * Insert category byte, if needed
147 	 */
148 	if (category != GIN_CAT_NORM_KEY)
149 	{
150 		Assert(IndexTupleHasNulls(itup));
151 		GinSetNullCategory(itup, ginstate, category);
152 	}
153 	return itup;
154 }
155 
156 /*
157  * Read item pointers from leaf entry tuple.
158  *
159  * Returns a palloc'd array of ItemPointers. The number of items is returned
160  * in *nitems.
161  */
162 ItemPointer
ginReadTuple(GinState * ginstate,OffsetNumber attnum,IndexTuple itup,int * nitems)163 ginReadTuple(GinState *ginstate, OffsetNumber attnum, IndexTuple itup,
164 			 int *nitems)
165 {
166 	Pointer		ptr = GinGetPosting(itup);
167 	int			nipd = GinGetNPosting(itup);
168 	ItemPointer ipd;
169 	int			ndecoded;
170 
171 	if (GinItupIsCompressed(itup))
172 	{
173 		if (nipd > 0)
174 		{
175 			ipd = ginPostingListDecode((GinPostingList *) ptr, &ndecoded);
176 			if (nipd != ndecoded)
177 				elog(ERROR, "number of items mismatch in GIN entry tuple, %d in tuple header, %d decoded",
178 					 nipd, ndecoded);
179 		}
180 		else
181 		{
182 			ipd = palloc(0);
183 		}
184 	}
185 	else
186 	{
187 		ipd = (ItemPointer) palloc(sizeof(ItemPointerData) * nipd);
188 		memcpy(ipd, ptr, sizeof(ItemPointerData) * nipd);
189 	}
190 	*nitems = nipd;
191 	return ipd;
192 }
193 
194 /*
195  * Form a non-leaf entry tuple by copying the key data from the given tuple,
196  * which can be either a leaf or non-leaf entry tuple.
197  *
198  * Any posting list in the source tuple is not copied.  The specified child
199  * block number is inserted into t_tid.
200  */
201 static IndexTuple
GinFormInteriorTuple(IndexTuple itup,Page page,BlockNumber childblk)202 GinFormInteriorTuple(IndexTuple itup, Page page, BlockNumber childblk)
203 {
204 	IndexTuple	nitup;
205 
206 	if (GinPageIsLeaf(page) && !GinIsPostingTree(itup))
207 	{
208 		/* Tuple contains a posting list, just copy stuff before that */
209 		uint32		origsize = GinGetPostingOffset(itup);
210 
211 		origsize = MAXALIGN(origsize);
212 		nitup = (IndexTuple) palloc(origsize);
213 		memcpy(nitup, itup, origsize);
214 		/* ... be sure to fix the size header field ... */
215 		nitup->t_info &= ~INDEX_SIZE_MASK;
216 		nitup->t_info |= origsize;
217 	}
218 	else
219 	{
220 		/* Copy the tuple as-is */
221 		nitup = (IndexTuple) palloc(IndexTupleSize(itup));
222 		memcpy(nitup, itup, IndexTupleSize(itup));
223 	}
224 
225 	/* Now insert the correct downlink */
226 	GinSetDownlink(nitup, childblk);
227 
228 	return nitup;
229 }
230 
231 /*
232  * Entry tree is a "static", ie tuple never deletes from it,
233  * so we don't use right bound, we use rightmost key instead.
234  */
235 static IndexTuple
getRightMostTuple(Page page)236 getRightMostTuple(Page page)
237 {
238 	OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
239 
240 	return (IndexTuple) PageGetItem(page, PageGetItemId(page, maxoff));
241 }
242 
243 static bool
entryIsMoveRight(GinBtree btree,Page page)244 entryIsMoveRight(GinBtree btree, Page page)
245 {
246 	IndexTuple	itup;
247 	OffsetNumber attnum;
248 	Datum		key;
249 	GinNullCategory category;
250 
251 	if (GinPageRightMost(page))
252 		return FALSE;
253 
254 	itup = getRightMostTuple(page);
255 	attnum = gintuple_get_attrnum(btree->ginstate, itup);
256 	key = gintuple_get_key(btree->ginstate, itup, &category);
257 
258 	if (ginCompareAttEntries(btree->ginstate,
259 							 btree->entryAttnum, btree->entryKey, btree->entryCategory,
260 							 attnum, key, category) > 0)
261 		return TRUE;
262 
263 	return FALSE;
264 }
265 
266 /*
267  * Find correct tuple in non-leaf page. It supposed that
268  * page correctly chosen and searching value SHOULD be on page
269  */
270 static BlockNumber
entryLocateEntry(GinBtree btree,GinBtreeStack * stack)271 entryLocateEntry(GinBtree btree, GinBtreeStack *stack)
272 {
273 	OffsetNumber low,
274 				high,
275 				maxoff;
276 	IndexTuple	itup = NULL;
277 	int			result;
278 	Page		page = BufferGetPage(stack->buffer);
279 
280 	Assert(!GinPageIsLeaf(page));
281 	Assert(!GinPageIsData(page));
282 
283 	if (btree->fullScan)
284 	{
285 		stack->off = FirstOffsetNumber;
286 		stack->predictNumber *= PageGetMaxOffsetNumber(page);
287 		return btree->getLeftMostChild(btree, page);
288 	}
289 
290 	low = FirstOffsetNumber;
291 	maxoff = high = PageGetMaxOffsetNumber(page);
292 	Assert(high >= low);
293 
294 	high++;
295 
296 	while (high > low)
297 	{
298 		OffsetNumber mid = low + ((high - low) / 2);
299 
300 		if (mid == maxoff && GinPageRightMost(page))
301 		{
302 			/* Right infinity */
303 			result = -1;
304 		}
305 		else
306 		{
307 			OffsetNumber attnum;
308 			Datum		key;
309 			GinNullCategory category;
310 
311 			itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, mid));
312 			attnum = gintuple_get_attrnum(btree->ginstate, itup);
313 			key = gintuple_get_key(btree->ginstate, itup, &category);
314 			result = ginCompareAttEntries(btree->ginstate,
315 										  btree->entryAttnum,
316 										  btree->entryKey,
317 										  btree->entryCategory,
318 										  attnum, key, category);
319 		}
320 
321 		if (result == 0)
322 		{
323 			stack->off = mid;
324 			Assert(GinGetDownlink(itup) != GIN_ROOT_BLKNO);
325 			return GinGetDownlink(itup);
326 		}
327 		else if (result > 0)
328 			low = mid + 1;
329 		else
330 			high = mid;
331 	}
332 
333 	Assert(high >= FirstOffsetNumber && high <= maxoff);
334 
335 	stack->off = high;
336 	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, high));
337 	Assert(GinGetDownlink(itup) != GIN_ROOT_BLKNO);
338 	return GinGetDownlink(itup);
339 }
340 
341 /*
342  * Searches correct position for value on leaf page.
343  * Page should be correctly chosen.
344  * Returns true if value found on page.
345  */
346 static bool
entryLocateLeafEntry(GinBtree btree,GinBtreeStack * stack)347 entryLocateLeafEntry(GinBtree btree, GinBtreeStack *stack)
348 {
349 	Page		page = BufferGetPage(stack->buffer);
350 	OffsetNumber low,
351 				high;
352 
353 	Assert(GinPageIsLeaf(page));
354 	Assert(!GinPageIsData(page));
355 
356 	if (btree->fullScan)
357 	{
358 		stack->off = FirstOffsetNumber;
359 		return TRUE;
360 	}
361 
362 	low = FirstOffsetNumber;
363 	high = PageGetMaxOffsetNumber(page);
364 
365 	if (high < low)
366 	{
367 		stack->off = FirstOffsetNumber;
368 		return false;
369 	}
370 
371 	high++;
372 
373 	while (high > low)
374 	{
375 		OffsetNumber mid = low + ((high - low) / 2);
376 		IndexTuple	itup;
377 		OffsetNumber attnum;
378 		Datum		key;
379 		GinNullCategory category;
380 		int			result;
381 
382 		itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, mid));
383 		attnum = gintuple_get_attrnum(btree->ginstate, itup);
384 		key = gintuple_get_key(btree->ginstate, itup, &category);
385 		result = ginCompareAttEntries(btree->ginstate,
386 									  btree->entryAttnum,
387 									  btree->entryKey,
388 									  btree->entryCategory,
389 									  attnum, key, category);
390 		if (result == 0)
391 		{
392 			stack->off = mid;
393 			return true;
394 		}
395 		else if (result > 0)
396 			low = mid + 1;
397 		else
398 			high = mid;
399 	}
400 
401 	stack->off = high;
402 	return false;
403 }
404 
405 static OffsetNumber
entryFindChildPtr(GinBtree btree,Page page,BlockNumber blkno,OffsetNumber storedOff)406 entryFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber storedOff)
407 {
408 	OffsetNumber i,
409 				maxoff = PageGetMaxOffsetNumber(page);
410 	IndexTuple	itup;
411 
412 	Assert(!GinPageIsLeaf(page));
413 	Assert(!GinPageIsData(page));
414 
415 	/* if page isn't changed, we returns storedOff */
416 	if (storedOff >= FirstOffsetNumber && storedOff <= maxoff)
417 	{
418 		itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, storedOff));
419 		if (GinGetDownlink(itup) == blkno)
420 			return storedOff;
421 
422 		/*
423 		 * we hope, that needed pointer goes to right. It's true if there
424 		 * wasn't a deletion
425 		 */
426 		for (i = storedOff + 1; i <= maxoff; i++)
427 		{
428 			itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
429 			if (GinGetDownlink(itup) == blkno)
430 				return i;
431 		}
432 		maxoff = storedOff - 1;
433 	}
434 
435 	/* last chance */
436 	for (i = FirstOffsetNumber; i <= maxoff; i++)
437 	{
438 		itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
439 		if (GinGetDownlink(itup) == blkno)
440 			return i;
441 	}
442 
443 	return InvalidOffsetNumber;
444 }
445 
446 static BlockNumber
entryGetLeftMostPage(GinBtree btree,Page page)447 entryGetLeftMostPage(GinBtree btree, Page page)
448 {
449 	IndexTuple	itup;
450 
451 	Assert(!GinPageIsLeaf(page));
452 	Assert(!GinPageIsData(page));
453 	Assert(PageGetMaxOffsetNumber(page) >= FirstOffsetNumber);
454 
455 	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, FirstOffsetNumber));
456 	return GinGetDownlink(itup);
457 }
458 
459 static bool
entryIsEnoughSpace(GinBtree btree,Buffer buf,OffsetNumber off,GinBtreeEntryInsertData * insertData)460 entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off,
461 				   GinBtreeEntryInsertData *insertData)
462 {
463 	Size		releasedsz = 0;
464 	Size		addedsz;
465 	Page		page = BufferGetPage(buf);
466 
467 	Assert(insertData->entry);
468 	Assert(!GinPageIsData(page));
469 
470 	if (insertData->isDelete)
471 	{
472 		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
473 
474 		releasedsz = MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
475 	}
476 
477 	addedsz = MAXALIGN(IndexTupleSize(insertData->entry)) + sizeof(ItemIdData);
478 
479 	if (PageGetFreeSpace(page) + releasedsz >= addedsz)
480 		return true;
481 
482 	return false;
483 }
484 
485 /*
486  * Delete tuple on leaf page if tuples existed and we
487  * should update it, update old child blkno to new right page
488  * if child split occurred
489  */
490 static void
entryPreparePage(GinBtree btree,Page page,OffsetNumber off,GinBtreeEntryInsertData * insertData,BlockNumber updateblkno)491 entryPreparePage(GinBtree btree, Page page, OffsetNumber off,
492 				 GinBtreeEntryInsertData *insertData, BlockNumber updateblkno)
493 {
494 	Assert(insertData->entry);
495 	Assert(!GinPageIsData(page));
496 
497 	if (insertData->isDelete)
498 	{
499 		Assert(GinPageIsLeaf(page));
500 		PageIndexTupleDelete(page, off);
501 	}
502 
503 	if (!GinPageIsLeaf(page) && updateblkno != InvalidBlockNumber)
504 	{
505 		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
506 
507 		GinSetDownlink(itup, updateblkno);
508 	}
509 }
510 
511 /*
512  * Prepare to insert data on an entry page.
513  *
514  * If it will fit, return GPTP_INSERT after doing whatever setup is needed
515  * before we enter the insertion critical section.  *ptp_workspace can be
516  * set to pass information along to the execPlaceToPage function.
517  *
518  * If it won't fit, perform a page split and return two temporary page
519  * images into *newlpage and *newrpage, with result GPTP_SPLIT.
520  *
521  * In neither case should the given page buffer be modified here.
522  *
523  * Note: on insertion to an internal node, in addition to inserting the given
524  * item, the downlink of the existing item at stack->off will be updated to
525  * point to updateblkno.
526  */
527 static GinPlaceToPageRC
entryBeginPlaceToPage(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertPayload,BlockNumber updateblkno,void ** ptp_workspace,Page * newlpage,Page * newrpage)528 entryBeginPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
529 					  void *insertPayload, BlockNumber updateblkno,
530 					  void **ptp_workspace,
531 					  Page *newlpage, Page *newrpage)
532 {
533 	GinBtreeEntryInsertData *insertData = insertPayload;
534 	OffsetNumber off = stack->off;
535 
536 	/* If it doesn't fit, deal with split case */
537 	if (!entryIsEnoughSpace(btree, buf, off, insertData))
538 	{
539 		entrySplitPage(btree, buf, stack, insertData, updateblkno,
540 					   newlpage, newrpage);
541 		return GPTP_SPLIT;
542 	}
543 
544 	/* Else, we're ready to proceed with insertion */
545 	return GPTP_INSERT;
546 }
547 
548 /*
549  * Perform data insertion after beginPlaceToPage has decided it will fit.
550  *
551  * This is invoked within a critical section, and XLOG record creation (if
552  * needed) is already started.  The target buffer is registered in slot 0.
553  */
554 static void
entryExecPlaceToPage(GinBtree btree,Buffer buf,GinBtreeStack * stack,void * insertPayload,BlockNumber updateblkno,void * ptp_workspace)555 entryExecPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
556 					 void *insertPayload, BlockNumber updateblkno,
557 					 void *ptp_workspace)
558 {
559 	GinBtreeEntryInsertData *insertData = insertPayload;
560 	Page		page = BufferGetPage(buf);
561 	OffsetNumber off = stack->off;
562 	OffsetNumber placed;
563 
564 	entryPreparePage(btree, page, off, insertData, updateblkno);
565 
566 	placed = PageAddItem(page,
567 						 (Item) insertData->entry,
568 						 IndexTupleSize(insertData->entry),
569 						 off, false, false);
570 	if (placed != off)
571 		elog(ERROR, "failed to add item to index page in \"%s\"",
572 			 RelationGetRelationName(btree->index));
573 
574 	if (RelationNeedsWAL(btree->index))
575 	{
576 		/*
577 		 * This must be static, because it has to survive until XLogInsert,
578 		 * and we can't palloc here.  Ugly, but the XLogInsert infrastructure
579 		 * isn't reentrant anyway.
580 		 */
581 		static ginxlogInsertEntry data;
582 
583 		data.isDelete = insertData->isDelete;
584 		data.offset = off;
585 
586 		XLogRegisterBufData(0, (char *) &data,
587 							offsetof(ginxlogInsertEntry, tuple));
588 		XLogRegisterBufData(0, (char *) insertData->entry,
589 							IndexTupleSize(insertData->entry));
590 	}
591 }
592 
593 /*
594  * Split entry page and insert new data.
595  *
596  * Returns new temp pages to *newlpage and *newrpage.
597  * The original buffer is left untouched.
598  */
599 static void
entrySplitPage(GinBtree btree,Buffer origbuf,GinBtreeStack * stack,GinBtreeEntryInsertData * insertData,BlockNumber updateblkno,Page * newlpage,Page * newrpage)600 entrySplitPage(GinBtree btree, Buffer origbuf,
601 			   GinBtreeStack *stack,
602 			   GinBtreeEntryInsertData *insertData,
603 			   BlockNumber updateblkno,
604 			   Page *newlpage, Page *newrpage)
605 {
606 	OffsetNumber off = stack->off;
607 	OffsetNumber i,
608 				maxoff,
609 				separator = InvalidOffsetNumber;
610 	Size		totalsize = 0;
611 	Size		lsize = 0,
612 				size;
613 	char	   *ptr;
614 	IndexTuple	itup;
615 	Page		page;
616 	Page		lpage = PageGetTempPageCopy(BufferGetPage(origbuf));
617 	Page		rpage = PageGetTempPageCopy(BufferGetPage(origbuf));
618 	Size		pageSize = PageGetPageSize(lpage);
619 	PGAlignedBlock tupstore[2]; /* could need 2 pages' worth of tuples */
620 
621 	entryPreparePage(btree, lpage, off, insertData, updateblkno);
622 
623 	/*
624 	 * First, append all the existing tuples and the new tuple we're inserting
625 	 * one after another in a temporary workspace.
626 	 */
627 	maxoff = PageGetMaxOffsetNumber(lpage);
628 	ptr = tupstore[0].data;
629 	for (i = FirstOffsetNumber; i <= maxoff; i++)
630 	{
631 		if (i == off)
632 		{
633 			size = MAXALIGN(IndexTupleSize(insertData->entry));
634 			memcpy(ptr, insertData->entry, size);
635 			ptr += size;
636 			totalsize += size + sizeof(ItemIdData);
637 		}
638 
639 		itup = (IndexTuple) PageGetItem(lpage, PageGetItemId(lpage, i));
640 		size = MAXALIGN(IndexTupleSize(itup));
641 		memcpy(ptr, itup, size);
642 		ptr += size;
643 		totalsize += size + sizeof(ItemIdData);
644 	}
645 
646 	if (off == maxoff + 1)
647 	{
648 		size = MAXALIGN(IndexTupleSize(insertData->entry));
649 		memcpy(ptr, insertData->entry, size);
650 		ptr += size;
651 		totalsize += size + sizeof(ItemIdData);
652 	}
653 
654 	/*
655 	 * Initialize the left and right pages, and copy all the tuples back to
656 	 * them.
657 	 */
658 	GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
659 	GinInitPage(lpage, GinPageGetOpaque(rpage)->flags, pageSize);
660 
661 	ptr = tupstore[0].data;
662 	maxoff++;
663 	lsize = 0;
664 
665 	page = lpage;
666 	for (i = FirstOffsetNumber; i <= maxoff; i++)
667 	{
668 		itup = (IndexTuple) ptr;
669 
670 		/*
671 		 * Decide where to split.  We try to equalize the pages' total data
672 		 * size, not number of tuples.
673 		 */
674 		if (lsize > totalsize / 2)
675 		{
676 			if (separator == InvalidOffsetNumber)
677 				separator = i - 1;
678 			page = rpage;
679 		}
680 		else
681 		{
682 			lsize += MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
683 		}
684 
685 		if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
686 			elog(ERROR, "failed to add item to index page in \"%s\"",
687 				 RelationGetRelationName(btree->index));
688 		ptr += MAXALIGN(IndexTupleSize(itup));
689 	}
690 
691 	/* return temp pages to caller */
692 	*newlpage = lpage;
693 	*newrpage = rpage;
694 }
695 
696 /*
697  * Construct insertion payload for inserting the downlink for given buffer.
698  */
699 static void *
entryPrepareDownlink(GinBtree btree,Buffer lbuf)700 entryPrepareDownlink(GinBtree btree, Buffer lbuf)
701 {
702 	GinBtreeEntryInsertData *insertData;
703 	Page		lpage = BufferGetPage(lbuf);
704 	BlockNumber lblkno = BufferGetBlockNumber(lbuf);
705 	IndexTuple	itup;
706 
707 	itup = getRightMostTuple(lpage);
708 
709 	insertData = palloc(sizeof(GinBtreeEntryInsertData));
710 	insertData->entry = GinFormInteriorTuple(itup, lpage, lblkno);
711 	insertData->isDelete = false;
712 
713 	return insertData;
714 }
715 
716 /*
717  * Fills new root by rightest values from child.
718  * Also called from ginxlog, should not use btree
719  */
720 void
ginEntryFillRoot(GinBtree btree,Page root,BlockNumber lblkno,Page lpage,BlockNumber rblkno,Page rpage)721 ginEntryFillRoot(GinBtree btree, Page root,
722 				 BlockNumber lblkno, Page lpage,
723 				 BlockNumber rblkno, Page rpage)
724 {
725 	IndexTuple	itup;
726 
727 	itup = GinFormInteriorTuple(getRightMostTuple(lpage), lpage, lblkno);
728 	if (PageAddItem(root, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
729 		elog(ERROR, "failed to add item to index root page");
730 	pfree(itup);
731 
732 	itup = GinFormInteriorTuple(getRightMostTuple(rpage), rpage, rblkno);
733 	if (PageAddItem(root, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
734 		elog(ERROR, "failed to add item to index root page");
735 	pfree(itup);
736 }
737 
738 /*
739  * Set up GinBtree for entry page access
740  *
741  * Note: during WAL recovery, there may be no valid data in ginstate
742  * other than a faked-up Relation pointer; the key datum is bogus too.
743  */
744 void
ginPrepareEntryScan(GinBtree btree,OffsetNumber attnum,Datum key,GinNullCategory category,GinState * ginstate)745 ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum,
746 					Datum key, GinNullCategory category,
747 					GinState *ginstate)
748 {
749 	memset(btree, 0, sizeof(GinBtreeData));
750 
751 	btree->index = ginstate->index;
752 	btree->rootBlkno = GIN_ROOT_BLKNO;
753 	btree->ginstate = ginstate;
754 
755 	btree->findChildPage = entryLocateEntry;
756 	btree->getLeftMostChild = entryGetLeftMostPage;
757 	btree->isMoveRight = entryIsMoveRight;
758 	btree->findItem = entryLocateLeafEntry;
759 	btree->findChildPtr = entryFindChildPtr;
760 	btree->beginPlaceToPage = entryBeginPlaceToPage;
761 	btree->execPlaceToPage = entryExecPlaceToPage;
762 	btree->fillRoot = ginEntryFillRoot;
763 	btree->prepareDownlink = entryPrepareDownlink;
764 
765 	btree->isData = FALSE;
766 	btree->fullScan = FALSE;
767 	btree->isBuild = FALSE;
768 
769 	btree->entryAttnum = attnum;
770 	btree->entryKey = key;
771 	btree->entryCategory = category;
772 }
773