1 /*-------------------------------------------------------------------------
2  *
3  * spgdoinsert.c
4  *	  implementation of insert algorithm
5  *
6  *
7  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *			src/backend/access/spgist/spgdoinsert.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/genam.h"
19 #include "access/spgist_private.h"
20 #include "access/spgxlog.h"
21 #include "access/xloginsert.h"
22 #include "miscadmin.h"
23 #include "storage/bufmgr.h"
24 #include "utils/rel.h"
25 
26 
27 /*
28  * SPPageDesc tracks all info about a page we are inserting into.  In some
29  * situations it actually identifies a tuple, or even a specific node within
30  * an inner tuple.  But any of the fields can be invalid.  If the buffer
31  * field is valid, it implies we hold pin and exclusive lock on that buffer.
32  * page pointer should be valid exactly when buffer is.
33  */
34 typedef struct SPPageDesc
35 {
36 	BlockNumber blkno;			/* block number, or InvalidBlockNumber */
37 	Buffer		buffer;			/* page's buffer number, or InvalidBuffer */
38 	Page		page;			/* pointer to page buffer, or NULL */
39 	OffsetNumber offnum;		/* offset of tuple, or InvalidOffsetNumber */
40 	int			node;			/* node number within inner tuple, or -1 */
41 } SPPageDesc;
42 
43 
44 /*
45  * Set the item pointer in the nodeN'th entry in inner tuple tup.  This
46  * is used to update the parent inner tuple's downlink after a move or
47  * split operation.
48  */
49 void
spgUpdateNodeLink(SpGistInnerTuple tup,int nodeN,BlockNumber blkno,OffsetNumber offset)50 spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
51 				  BlockNumber blkno, OffsetNumber offset)
52 {
53 	int			i;
54 	SpGistNodeTuple node;
55 
56 	SGITITERATE(tup, i, node)
57 	{
58 		if (i == nodeN)
59 		{
60 			ItemPointerSet(&node->t_tid, blkno, offset);
61 			return;
62 		}
63 	}
64 
65 	elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
66 		 nodeN);
67 }
68 
69 /*
70  * Form a new inner tuple containing one more node than the given one, with
71  * the specified label datum, inserted at offset "offset" in the node array.
72  * The new tuple's prefix is the same as the old one's.
73  *
74  * Note that the new node initially has an invalid downlink.  We'll find a
75  * page to point it to later.
76  */
77 static SpGistInnerTuple
addNode(SpGistState * state,SpGistInnerTuple tuple,Datum label,int offset)78 addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
79 {
80 	SpGistNodeTuple node,
81 			   *nodes;
82 	int			i;
83 
84 	/* if offset is negative, insert at end */
85 	if (offset < 0)
86 		offset = tuple->nNodes;
87 	else if (offset > tuple->nNodes)
88 		elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
89 
90 	nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
91 	SGITITERATE(tuple, i, node)
92 	{
93 		if (i < offset)
94 			nodes[i] = node;
95 		else
96 			nodes[i + 1] = node;
97 	}
98 
99 	nodes[offset] = spgFormNodeTuple(state, label, false);
100 
101 	return spgFormInnerTuple(state,
102 							 (tuple->prefixSize > 0),
103 							 SGITDATUM(tuple, state),
104 							 tuple->nNodes + 1,
105 							 nodes);
106 }
107 
108 /* qsort comparator for sorting OffsetNumbers */
109 static int
cmpOffsetNumbers(const void * a,const void * b)110 cmpOffsetNumbers(const void *a, const void *b)
111 {
112 	if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
113 		return 0;
114 	return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
115 }
116 
117 /*
118  * Delete multiple tuples from an index page, preserving tuple offset numbers.
119  *
120  * The first tuple in the given list is replaced with a dead tuple of type
121  * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
122  * with dead tuples of type "reststate".  If either firststate or reststate
123  * is REDIRECT, blkno/offnum specify where to link to.
124  *
125  * NB: this is used during WAL replay, so beware of trying to make it too
126  * smart.  In particular, it shouldn't use "state" except for calling
127  * spgFormDeadTuple().  This is also used in a critical section, so no
128  * pallocs either!
129  */
130 void
spgPageIndexMultiDelete(SpGistState * state,Page page,OffsetNumber * itemnos,int nitems,int firststate,int reststate,BlockNumber blkno,OffsetNumber offnum)131 spgPageIndexMultiDelete(SpGistState *state, Page page,
132 						OffsetNumber *itemnos, int nitems,
133 						int firststate, int reststate,
134 						BlockNumber blkno, OffsetNumber offnum)
135 {
136 	OffsetNumber firstItem;
137 	OffsetNumber sortednos[MaxIndexTuplesPerPage];
138 	SpGistDeadTuple tuple = NULL;
139 	int			i;
140 
141 	if (nitems == 0)
142 		return;					/* nothing to do */
143 
144 	/*
145 	 * For efficiency we want to use PageIndexMultiDelete, which requires the
146 	 * targets to be listed in sorted order, so we have to sort the itemnos
147 	 * array.  (This also greatly simplifies the math for reinserting the
148 	 * replacement tuples.)  However, we must not scribble on the caller's
149 	 * array, so we have to make a copy.
150 	 */
151 	memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
152 	if (nitems > 1)
153 		qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
154 
155 	PageIndexMultiDelete(page, sortednos, nitems);
156 
157 	firstItem = itemnos[0];
158 
159 	for (i = 0; i < nitems; i++)
160 	{
161 		OffsetNumber itemno = sortednos[i];
162 		int			tupstate;
163 
164 		tupstate = (itemno == firstItem) ? firststate : reststate;
165 		if (tuple == NULL || tuple->tupstate != tupstate)
166 			tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
167 
168 		if (PageAddItem(page, (Item) tuple, tuple->size,
169 						itemno, false, false) != itemno)
170 			elog(ERROR, "failed to add item of size %u to SPGiST index page",
171 				 tuple->size);
172 
173 		if (tupstate == SPGIST_REDIRECT)
174 			SpGistPageGetOpaque(page)->nRedirection++;
175 		else if (tupstate == SPGIST_PLACEHOLDER)
176 			SpGistPageGetOpaque(page)->nPlaceholder++;
177 	}
178 }
179 
180 /*
181  * Update the parent inner tuple's downlink, and mark the parent buffer
182  * dirty (this must be the last change to the parent page in the current
183  * WAL action).
184  */
185 static void
saveNodeLink(Relation index,SPPageDesc * parent,BlockNumber blkno,OffsetNumber offnum)186 saveNodeLink(Relation index, SPPageDesc *parent,
187 			 BlockNumber blkno, OffsetNumber offnum)
188 {
189 	SpGistInnerTuple innerTuple;
190 
191 	innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
192 												PageGetItemId(parent->page, parent->offnum));
193 
194 	spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
195 
196 	MarkBufferDirty(parent->buffer);
197 }
198 
199 /*
200  * Add a leaf tuple to a leaf page where there is known to be room for it
201  */
202 static void
addLeafTuple(Relation index,SpGistState * state,SpGistLeafTuple leafTuple,SPPageDesc * current,SPPageDesc * parent,bool isNulls,bool isNew)203 addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
204 			 SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
205 {
206 	spgxlogAddLeaf xlrec;
207 
208 	xlrec.newPage = isNew;
209 	xlrec.storesNulls = isNulls;
210 
211 	/* these will be filled below as needed */
212 	xlrec.offnumLeaf = InvalidOffsetNumber;
213 	xlrec.offnumHeadLeaf = InvalidOffsetNumber;
214 	xlrec.offnumParent = InvalidOffsetNumber;
215 	xlrec.nodeI = 0;
216 
217 	START_CRIT_SECTION();
218 
219 	if (current->offnum == InvalidOffsetNumber ||
220 		SpGistBlockIsRoot(current->blkno))
221 	{
222 		/* Tuple is not part of a chain */
223 		SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
224 		current->offnum = SpGistPageAddNewItem(state, current->page,
225 											   (Item) leafTuple, leafTuple->size,
226 											   NULL, false);
227 
228 		xlrec.offnumLeaf = current->offnum;
229 
230 		/* Must update parent's downlink if any */
231 		if (parent->buffer != InvalidBuffer)
232 		{
233 			xlrec.offnumParent = parent->offnum;
234 			xlrec.nodeI = parent->node;
235 
236 			saveNodeLink(index, parent, current->blkno, current->offnum);
237 		}
238 	}
239 	else
240 	{
241 		/*
242 		 * Tuple must be inserted into existing chain.  We mustn't change the
243 		 * chain's head address, but we don't need to chase the entire chain
244 		 * to put the tuple at the end; we can insert it second.
245 		 *
246 		 * Also, it's possible that the "chain" consists only of a DEAD tuple,
247 		 * in which case we should replace the DEAD tuple in-place.
248 		 */
249 		SpGistLeafTuple head;
250 		OffsetNumber offnum;
251 
252 		head = (SpGistLeafTuple) PageGetItem(current->page,
253 											 PageGetItemId(current->page, current->offnum));
254 		if (head->tupstate == SPGIST_LIVE)
255 		{
256 			SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head));
257 			offnum = SpGistPageAddNewItem(state, current->page,
258 										  (Item) leafTuple, leafTuple->size,
259 										  NULL, false);
260 
261 			/*
262 			 * re-get head of list because it could have been moved on page,
263 			 * and set new second element
264 			 */
265 			head = (SpGistLeafTuple) PageGetItem(current->page,
266 												 PageGetItemId(current->page, current->offnum));
267 			SGLT_SET_NEXTOFFSET(head, offnum);
268 
269 			xlrec.offnumLeaf = offnum;
270 			xlrec.offnumHeadLeaf = current->offnum;
271 		}
272 		else if (head->tupstate == SPGIST_DEAD)
273 		{
274 			SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
275 			PageIndexTupleDelete(current->page, current->offnum);
276 			if (PageAddItem(current->page,
277 							(Item) leafTuple, leafTuple->size,
278 							current->offnum, false, false) != current->offnum)
279 				elog(ERROR, "failed to add item of size %u to SPGiST index page",
280 					 leafTuple->size);
281 
282 			/* WAL replay distinguishes this case by equal offnums */
283 			xlrec.offnumLeaf = current->offnum;
284 			xlrec.offnumHeadLeaf = current->offnum;
285 		}
286 		else
287 			elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
288 	}
289 
290 	MarkBufferDirty(current->buffer);
291 
292 	if (RelationNeedsWAL(index) && !state->isBuild)
293 	{
294 		XLogRecPtr	recptr;
295 		int			flags;
296 
297 		XLogBeginInsert();
298 		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
299 		XLogRegisterData((char *) leafTuple, leafTuple->size);
300 
301 		flags = REGBUF_STANDARD;
302 		if (xlrec.newPage)
303 			flags |= REGBUF_WILL_INIT;
304 		XLogRegisterBuffer(0, current->buffer, flags);
305 		if (xlrec.offnumParent != InvalidOffsetNumber)
306 			XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
307 
308 		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
309 
310 		PageSetLSN(current->page, recptr);
311 
312 		/* update parent only if we actually changed it */
313 		if (xlrec.offnumParent != InvalidOffsetNumber)
314 		{
315 			PageSetLSN(parent->page, recptr);
316 		}
317 	}
318 
319 	END_CRIT_SECTION();
320 }
321 
322 /*
323  * Count the number and total size of leaf tuples in the chain starting at
324  * current->offnum.  Return number into *nToSplit and total size as function
325  * result.
326  *
327  * Klugy special case when considering the root page (i.e., root is a leaf
328  * page, but we're about to split for the first time): return fake large
329  * values to force spgdoinsert() to take the doPickSplit rather than
330  * moveLeafs code path.  moveLeafs is not prepared to deal with root page.
331  */
332 static int
checkSplitConditions(Relation index,SpGistState * state,SPPageDesc * current,int * nToSplit)333 checkSplitConditions(Relation index, SpGistState *state,
334 					 SPPageDesc *current, int *nToSplit)
335 {
336 	int			i,
337 				n = 0,
338 				totalSize = 0;
339 
340 	if (SpGistBlockIsRoot(current->blkno))
341 	{
342 		/* return impossible values to force split */
343 		*nToSplit = BLCKSZ;
344 		return BLCKSZ;
345 	}
346 
347 	i = current->offnum;
348 	while (i != InvalidOffsetNumber)
349 	{
350 		SpGistLeafTuple it;
351 
352 		Assert(i >= FirstOffsetNumber &&
353 			   i <= PageGetMaxOffsetNumber(current->page));
354 		it = (SpGistLeafTuple) PageGetItem(current->page,
355 										   PageGetItemId(current->page, i));
356 		if (it->tupstate == SPGIST_LIVE)
357 		{
358 			n++;
359 			totalSize += it->size + sizeof(ItemIdData);
360 		}
361 		else if (it->tupstate == SPGIST_DEAD)
362 		{
363 			/* We could see a DEAD tuple as first/only chain item */
364 			Assert(i == current->offnum);
365 			Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
366 			/* Don't count it in result, because it won't go to other page */
367 		}
368 		else
369 			elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
370 
371 		i = SGLT_GET_NEXTOFFSET(it);
372 	}
373 
374 	*nToSplit = n;
375 
376 	return totalSize;
377 }
378 
379 /*
380  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
381  * but the chain has to be moved because there's not enough room to add
382  * newLeafTuple to its page.  We use this method when the chain contains
383  * very little data so a split would be inefficient.  We are sure we can
384  * fit the chain plus newLeafTuple on one other page.
385  */
386 static void
moveLeafs(Relation index,SpGistState * state,SPPageDesc * current,SPPageDesc * parent,SpGistLeafTuple newLeafTuple,bool isNulls)387 moveLeafs(Relation index, SpGistState *state,
388 		  SPPageDesc *current, SPPageDesc *parent,
389 		  SpGistLeafTuple newLeafTuple, bool isNulls)
390 {
391 	int			i,
392 				nDelete,
393 				nInsert,
394 				size;
395 	Buffer		nbuf;
396 	Page		npage;
397 	SpGistLeafTuple it;
398 	OffsetNumber r = InvalidOffsetNumber,
399 				startOffset = InvalidOffsetNumber;
400 	bool		replaceDead = false;
401 	OffsetNumber *toDelete;
402 	OffsetNumber *toInsert;
403 	BlockNumber nblkno;
404 	spgxlogMoveLeafs xlrec;
405 	char	   *leafdata,
406 			   *leafptr;
407 
408 	/* This doesn't work on root page */
409 	Assert(parent->buffer != InvalidBuffer);
410 	Assert(parent->buffer != current->buffer);
411 
412 	/* Locate the tuples to be moved, and count up the space needed */
413 	i = PageGetMaxOffsetNumber(current->page);
414 	toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
415 	toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
416 
417 	size = newLeafTuple->size + sizeof(ItemIdData);
418 
419 	nDelete = 0;
420 	i = current->offnum;
421 	while (i != InvalidOffsetNumber)
422 	{
423 		SpGistLeafTuple it;
424 
425 		Assert(i >= FirstOffsetNumber &&
426 			   i <= PageGetMaxOffsetNumber(current->page));
427 		it = (SpGistLeafTuple) PageGetItem(current->page,
428 										   PageGetItemId(current->page, i));
429 
430 		if (it->tupstate == SPGIST_LIVE)
431 		{
432 			toDelete[nDelete] = i;
433 			size += it->size + sizeof(ItemIdData);
434 			nDelete++;
435 		}
436 		else if (it->tupstate == SPGIST_DEAD)
437 		{
438 			/* We could see a DEAD tuple as first/only chain item */
439 			Assert(i == current->offnum);
440 			Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
441 			/* We don't want to move it, so don't count it in size */
442 			toDelete[nDelete] = i;
443 			nDelete++;
444 			replaceDead = true;
445 		}
446 		else
447 			elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
448 
449 		i = SGLT_GET_NEXTOFFSET(it);
450 	}
451 
452 	/* Find a leaf page that will hold them */
453 	nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
454 						   size, &xlrec.newPage);
455 	npage = BufferGetPage(nbuf);
456 	nblkno = BufferGetBlockNumber(nbuf);
457 	Assert(nblkno != current->blkno);
458 
459 	leafdata = leafptr = palloc(size);
460 
461 	START_CRIT_SECTION();
462 
463 	/* copy all the old tuples to new page, unless they're dead */
464 	nInsert = 0;
465 	if (!replaceDead)
466 	{
467 		for (i = 0; i < nDelete; i++)
468 		{
469 			it = (SpGistLeafTuple) PageGetItem(current->page,
470 											   PageGetItemId(current->page, toDelete[i]));
471 			Assert(it->tupstate == SPGIST_LIVE);
472 
473 			/*
474 			 * Update chain link (notice the chain order gets reversed, but we
475 			 * don't care).  We're modifying the tuple on the source page
476 			 * here, but it's okay since we're about to delete it.
477 			 */
478 			SGLT_SET_NEXTOFFSET(it, r);
479 
480 			r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
481 									 &startOffset, false);
482 
483 			toInsert[nInsert] = r;
484 			nInsert++;
485 
486 			/* save modified tuple into leafdata as well */
487 			memcpy(leafptr, it, it->size);
488 			leafptr += it->size;
489 		}
490 	}
491 
492 	/* add the new tuple as well */
493 	SGLT_SET_NEXTOFFSET(newLeafTuple, r);
494 	r = SpGistPageAddNewItem(state, npage,
495 							 (Item) newLeafTuple, newLeafTuple->size,
496 							 &startOffset, false);
497 	toInsert[nInsert] = r;
498 	nInsert++;
499 	memcpy(leafptr, newLeafTuple, newLeafTuple->size);
500 	leafptr += newLeafTuple->size;
501 
502 	/*
503 	 * Now delete the old tuples, leaving a redirection pointer behind for the
504 	 * first one, unless we're doing an index build; in which case there can't
505 	 * be any concurrent scan so we need not provide a redirect.
506 	 */
507 	spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
508 							state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
509 							SPGIST_PLACEHOLDER,
510 							nblkno, r);
511 
512 	/* Update parent's downlink and mark parent page dirty */
513 	saveNodeLink(index, parent, nblkno, r);
514 
515 	/* Mark the leaf pages too */
516 	MarkBufferDirty(current->buffer);
517 	MarkBufferDirty(nbuf);
518 
519 	if (RelationNeedsWAL(index) && !state->isBuild)
520 	{
521 		XLogRecPtr	recptr;
522 
523 		/* prepare WAL info */
524 		STORE_STATE(state, xlrec.stateSrc);
525 
526 		xlrec.nMoves = nDelete;
527 		xlrec.replaceDead = replaceDead;
528 		xlrec.storesNulls = isNulls;
529 
530 		xlrec.offnumParent = parent->offnum;
531 		xlrec.nodeI = parent->node;
532 
533 		XLogBeginInsert();
534 		XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
535 		XLogRegisterData((char *) toDelete,
536 						 sizeof(OffsetNumber) * nDelete);
537 		XLogRegisterData((char *) toInsert,
538 						 sizeof(OffsetNumber) * nInsert);
539 		XLogRegisterData((char *) leafdata, leafptr - leafdata);
540 
541 		XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
542 		XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
543 		XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
544 
545 		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
546 
547 		PageSetLSN(current->page, recptr);
548 		PageSetLSN(npage, recptr);
549 		PageSetLSN(parent->page, recptr);
550 	}
551 
552 	END_CRIT_SECTION();
553 
554 	/* Update local free-space cache and release new buffer */
555 	SpGistSetLastUsedPage(index, nbuf);
556 	UnlockReleaseBuffer(nbuf);
557 }
558 
559 /*
560  * Update previously-created redirection tuple with appropriate destination
561  *
562  * We use this when it's not convenient to know the destination first.
563  * The tuple should have been made with the "impossible" destination of
564  * the metapage.
565  */
566 static void
setRedirectionTuple(SPPageDesc * current,OffsetNumber position,BlockNumber blkno,OffsetNumber offnum)567 setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
568 					BlockNumber blkno, OffsetNumber offnum)
569 {
570 	SpGistDeadTuple dt;
571 
572 	dt = (SpGistDeadTuple) PageGetItem(current->page,
573 									   PageGetItemId(current->page, position));
574 	Assert(dt->tupstate == SPGIST_REDIRECT);
575 	Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
576 	ItemPointerSet(&dt->pointer, blkno, offnum);
577 }
578 
579 /*
580  * Test to see if the user-defined picksplit function failed to do its job,
581  * ie, it put all the leaf tuples into the same node.
582  * If so, randomly divide the tuples into several nodes (all with the same
583  * label) and return true to select allTheSame mode for this inner tuple.
584  *
585  * (This code is also used to forcibly select allTheSame mode for nulls.)
586  *
587  * If we know that the leaf tuples wouldn't all fit on one page, then we
588  * exclude the last tuple (which is the incoming new tuple that forced a split)
589  * from the check to see if more than one node is used.  The reason for this
590  * is that if the existing tuples are put into only one chain, then even if
591  * we move them all to an empty page, there would still not be room for the
592  * new tuple, so we'd get into an infinite loop of picksplit attempts.
593  * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
594  * be split across pages.  (Exercise for the reader: figure out why this
595  * fixes the problem even when there is only one old tuple.)
596  */
597 static bool
checkAllTheSame(spgPickSplitIn * in,spgPickSplitOut * out,bool tooBig,bool * includeNew)598 checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
599 				bool *includeNew)
600 {
601 	int			theNode;
602 	int			limit;
603 	int			i;
604 
605 	/* For the moment, assume we can include the new leaf tuple */
606 	*includeNew = true;
607 
608 	/* If there's only the new leaf tuple, don't select allTheSame mode */
609 	if (in->nTuples <= 1)
610 		return false;
611 
612 	/* If tuple set doesn't fit on one page, ignore the new tuple in test */
613 	limit = tooBig ? in->nTuples - 1 : in->nTuples;
614 
615 	/* Check to see if more than one node is populated */
616 	theNode = out->mapTuplesToNodes[0];
617 	for (i = 1; i < limit; i++)
618 	{
619 		if (out->mapTuplesToNodes[i] != theNode)
620 			return false;
621 	}
622 
623 	/* Nope, so override the picksplit function's decisions */
624 
625 	/* If the new tuple is in its own node, it can't be included in split */
626 	if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
627 		*includeNew = false;
628 
629 	out->nNodes = 8;			/* arbitrary number of child nodes */
630 
631 	/* Random assignment of tuples to nodes (note we include new tuple) */
632 	for (i = 0; i < in->nTuples; i++)
633 		out->mapTuplesToNodes[i] = i % out->nNodes;
634 
635 	/* The opclass may not use node labels, but if it does, duplicate 'em */
636 	if (out->nodeLabels)
637 	{
638 		Datum		theLabel = out->nodeLabels[theNode];
639 
640 		out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
641 		for (i = 0; i < out->nNodes; i++)
642 			out->nodeLabels[i] = theLabel;
643 	}
644 
645 	/* We don't touch the prefix or the leaf tuple datum assignments */
646 
647 	return true;
648 }
649 
650 /*
651  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
652  * but the chain has to be split because there's not enough room to add
653  * newLeafTuple to its page.
654  *
655  * This function splits the leaf tuple set according to picksplit's rules,
656  * creating one or more new chains that are spread across the current page
657  * and an additional leaf page (we assume that two leaf pages will be
658  * sufficient).  A new inner tuple is created, and the parent downlink
659  * pointer is updated to point to that inner tuple instead of the leaf chain.
660  *
661  * On exit, current contains the address of the new inner tuple.
662  *
663  * Returns true if we successfully inserted newLeafTuple during this function,
664  * false if caller still has to do it (meaning another picksplit operation is
665  * probably needed).  Failure could occur if the picksplit result is fairly
666  * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
667  * Because we force the picksplit result to be at least two chains, each
668  * cycle will get rid of at least one leaf tuple from the chain, so the loop
669  * will eventually terminate if lack of balance is the issue.  If the tuple
670  * is too big, we assume that repeated picksplit operations will eventually
671  * make it small enough by repeated prefix-stripping.  A broken opclass could
672  * make this an infinite loop, though, so spgdoinsert() checks that the
673  * leaf datums get smaller each time.
674  */
675 static bool
doPickSplit(Relation index,SpGistState * state,SPPageDesc * current,SPPageDesc * parent,SpGistLeafTuple newLeafTuple,int level,bool isNulls,bool isNew)676 doPickSplit(Relation index, SpGistState *state,
677 			SPPageDesc *current, SPPageDesc *parent,
678 			SpGistLeafTuple newLeafTuple,
679 			int level, bool isNulls, bool isNew)
680 {
681 	bool		insertedNew = false;
682 	spgPickSplitIn in;
683 	spgPickSplitOut out;
684 	FmgrInfo   *procinfo;
685 	bool		includeNew;
686 	int			i,
687 				max,
688 				n;
689 	SpGistInnerTuple innerTuple;
690 	SpGistNodeTuple node,
691 			   *nodes;
692 	Buffer		newInnerBuffer,
693 				newLeafBuffer;
694 	uint8	   *leafPageSelect;
695 	int		   *leafSizes;
696 	OffsetNumber *toDelete;
697 	OffsetNumber *toInsert;
698 	OffsetNumber redirectTuplePos = InvalidOffsetNumber;
699 	OffsetNumber startOffsets[2];
700 	SpGistLeafTuple *oldLeafs;
701 	SpGistLeafTuple *newLeafs;
702 	Datum		leafDatums[INDEX_MAX_KEYS];
703 	bool		leafIsnulls[INDEX_MAX_KEYS];
704 	int			spaceToDelete;
705 	int			currentFreeSpace;
706 	int			totalLeafSizes;
707 	bool		allTheSame;
708 	spgxlogPickSplit xlrec;
709 	char	   *leafdata,
710 			   *leafptr;
711 	SPPageDesc	saveCurrent;
712 	int			nToDelete,
713 				nToInsert,
714 				maxToInclude;
715 
716 	in.level = level;
717 
718 	/*
719 	 * Allocate per-leaf-tuple work arrays with max possible size
720 	 */
721 	max = PageGetMaxOffsetNumber(current->page);
722 	n = max + 1;
723 	in.datums = (Datum *) palloc(sizeof(Datum) * n);
724 	toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
725 	toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
726 	oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
727 	newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
728 	leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
729 
730 	STORE_STATE(state, xlrec.stateSrc);
731 
732 	/*
733 	 * Form list of leaf tuples which will be distributed as split result;
734 	 * also, count up the amount of space that will be freed from current.
735 	 * (Note that in the non-root case, we won't actually delete the old
736 	 * tuples, only replace them with redirects or placeholders.)
737 	 */
738 	nToInsert = 0;
739 	nToDelete = 0;
740 	spaceToDelete = 0;
741 	if (SpGistBlockIsRoot(current->blkno))
742 	{
743 		/*
744 		 * We are splitting the root (which up to now is also a leaf page).
745 		 * Its tuples are not linked, so scan sequentially to get them all. We
746 		 * ignore the original value of current->offnum.
747 		 */
748 		for (i = FirstOffsetNumber; i <= max; i++)
749 		{
750 			SpGistLeafTuple it;
751 
752 			it = (SpGistLeafTuple) PageGetItem(current->page,
753 											   PageGetItemId(current->page, i));
754 			if (it->tupstate == SPGIST_LIVE)
755 			{
756 				in.datums[nToInsert] =
757 					isNulls ? (Datum) 0 : SGLTDATUM(it, state);
758 				oldLeafs[nToInsert] = it;
759 				nToInsert++;
760 				toDelete[nToDelete] = i;
761 				nToDelete++;
762 				/* we will delete the tuple altogether, so count full space */
763 				spaceToDelete += it->size + sizeof(ItemIdData);
764 			}
765 			else				/* tuples on root should be live */
766 				elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
767 		}
768 	}
769 	else
770 	{
771 		/* Normal case, just collect the leaf tuples in the chain */
772 		i = current->offnum;
773 		while (i != InvalidOffsetNumber)
774 		{
775 			SpGistLeafTuple it;
776 
777 			Assert(i >= FirstOffsetNumber && i <= max);
778 			it = (SpGistLeafTuple) PageGetItem(current->page,
779 											   PageGetItemId(current->page, i));
780 			if (it->tupstate == SPGIST_LIVE)
781 			{
782 				in.datums[nToInsert] =
783 					isNulls ? (Datum) 0 : SGLTDATUM(it, state);
784 				oldLeafs[nToInsert] = it;
785 				nToInsert++;
786 				toDelete[nToDelete] = i;
787 				nToDelete++;
788 				/* we will not delete the tuple, only replace with dead */
789 				Assert(it->size >= SGDTSIZE);
790 				spaceToDelete += it->size - SGDTSIZE;
791 			}
792 			else if (it->tupstate == SPGIST_DEAD)
793 			{
794 				/* We could see a DEAD tuple as first/only chain item */
795 				Assert(i == current->offnum);
796 				Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
797 				toDelete[nToDelete] = i;
798 				nToDelete++;
799 				/* replacing it with redirect will save no space */
800 			}
801 			else
802 				elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
803 
804 			i = SGLT_GET_NEXTOFFSET(it);
805 		}
806 	}
807 	in.nTuples = nToInsert;
808 
809 	/*
810 	 * We may not actually insert new tuple because another picksplit may be
811 	 * necessary due to too large value, but we will try to allocate enough
812 	 * space to include it; and in any case it has to be included in the input
813 	 * for the picksplit function.  So don't increment nToInsert yet.
814 	 */
815 	in.datums[in.nTuples] =
816 		isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
817 	oldLeafs[in.nTuples] = newLeafTuple;
818 	in.nTuples++;
819 
820 	memset(&out, 0, sizeof(out));
821 
822 	if (!isNulls)
823 	{
824 		/*
825 		 * Perform split using user-defined method.
826 		 */
827 		procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
828 		FunctionCall2Coll(procinfo,
829 						  index->rd_indcollation[0],
830 						  PointerGetDatum(&in),
831 						  PointerGetDatum(&out));
832 
833 		/*
834 		 * Form new leaf tuples and count up the total space needed.
835 		 */
836 		totalLeafSizes = 0;
837 		for (i = 0; i < in.nTuples; i++)
838 		{
839 			if (state->leafTupDesc->natts > 1)
840 				spgDeformLeafTuple(oldLeafs[i],
841 								   state->leafTupDesc,
842 								   leafDatums,
843 								   leafIsnulls,
844 								   isNulls);
845 
846 			leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
847 			leafIsnulls[spgKeyColumn] = false;
848 
849 			newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
850 										   leafDatums,
851 										   leafIsnulls);
852 			totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
853 		}
854 	}
855 	else
856 	{
857 		/*
858 		 * Perform dummy split that puts all tuples into one node.
859 		 * checkAllTheSame will override this and force allTheSame mode.
860 		 */
861 		out.hasPrefix = false;
862 		out.nNodes = 1;
863 		out.nodeLabels = NULL;
864 		out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
865 
866 		/*
867 		 * Form new leaf tuples and count up the total space needed.
868 		 */
869 		totalLeafSizes = 0;
870 		for (i = 0; i < in.nTuples; i++)
871 		{
872 			if (state->leafTupDesc->natts > 1)
873 				spgDeformLeafTuple(oldLeafs[i],
874 								   state->leafTupDesc,
875 								   leafDatums,
876 								   leafIsnulls,
877 								   isNulls);
878 
879 			/*
880 			 * Nulls tree can contain only null key values.
881 			 */
882 			leafDatums[spgKeyColumn] = (Datum) 0;
883 			leafIsnulls[spgKeyColumn] = true;
884 
885 			newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
886 										   leafDatums,
887 										   leafIsnulls);
888 			totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
889 		}
890 	}
891 
892 	/*
893 	 * Check to see if the picksplit function failed to separate the values,
894 	 * ie, it put them all into the same child node.  If so, select allTheSame
895 	 * mode and create a random split instead.  See comments for
896 	 * checkAllTheSame as to why we need to know if the new leaf tuples could
897 	 * fit on one page.
898 	 */
899 	allTheSame = checkAllTheSame(&in, &out,
900 								 totalLeafSizes > SPGIST_PAGE_CAPACITY,
901 								 &includeNew);
902 
903 	/*
904 	 * If checkAllTheSame decided we must exclude the new tuple, don't
905 	 * consider it any further.
906 	 */
907 	if (includeNew)
908 		maxToInclude = in.nTuples;
909 	else
910 	{
911 		maxToInclude = in.nTuples - 1;
912 		totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
913 	}
914 
915 	/*
916 	 * Allocate per-node work arrays.  Since checkAllTheSame could replace
917 	 * out.nNodes with a value larger than the number of tuples on the input
918 	 * page, we can't allocate these arrays before here.
919 	 */
920 	nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
921 	leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
922 
923 	/*
924 	 * Form nodes of inner tuple and inner tuple itself
925 	 */
926 	for (i = 0; i < out.nNodes; i++)
927 	{
928 		Datum		label = (Datum) 0;
929 		bool		labelisnull = (out.nodeLabels == NULL);
930 
931 		if (!labelisnull)
932 			label = out.nodeLabels[i];
933 		nodes[i] = spgFormNodeTuple(state, label, labelisnull);
934 	}
935 	innerTuple = spgFormInnerTuple(state,
936 								   out.hasPrefix, out.prefixDatum,
937 								   out.nNodes, nodes);
938 	innerTuple->allTheSame = allTheSame;
939 
940 	/*
941 	 * Update nodes[] array to point into the newly formed innerTuple, so that
942 	 * we can adjust their downlinks below.
943 	 */
944 	SGITITERATE(innerTuple, i, node)
945 	{
946 		nodes[i] = node;
947 	}
948 
949 	/*
950 	 * Re-scan new leaf tuples and count up the space needed under each node.
951 	 */
952 	for (i = 0; i < maxToInclude; i++)
953 	{
954 		n = out.mapTuplesToNodes[i];
955 		if (n < 0 || n >= out.nNodes)
956 			elog(ERROR, "inconsistent result of SPGiST picksplit function");
957 		leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
958 	}
959 
960 	/*
961 	 * To perform the split, we must insert a new inner tuple, which can't go
962 	 * on a leaf page; and unless we are splitting the root page, we must then
963 	 * update the parent tuple's downlink to point to the inner tuple.  If
964 	 * there is room, we'll put the new inner tuple on the same page as the
965 	 * parent tuple, otherwise we need another non-leaf buffer. But if the
966 	 * parent page is the root, we can't add the new inner tuple there,
967 	 * because the root page must have only one inner tuple.
968 	 */
969 	xlrec.initInner = false;
970 	if (parent->buffer != InvalidBuffer &&
971 		!SpGistBlockIsRoot(parent->blkno) &&
972 		(SpGistPageGetFreeSpace(parent->page, 1) >=
973 		 innerTuple->size + sizeof(ItemIdData)))
974 	{
975 		/* New inner tuple will fit on parent page */
976 		newInnerBuffer = parent->buffer;
977 	}
978 	else if (parent->buffer != InvalidBuffer)
979 	{
980 		/* Send tuple to page with next triple parity (see README) */
981 		newInnerBuffer = SpGistGetBuffer(index,
982 										 GBUF_INNER_PARITY(parent->blkno + 1) |
983 										 (isNulls ? GBUF_NULLS : 0),
984 										 innerTuple->size + sizeof(ItemIdData),
985 										 &xlrec.initInner);
986 	}
987 	else
988 	{
989 		/* Root page split ... inner tuple will go to root page */
990 		newInnerBuffer = InvalidBuffer;
991 	}
992 
993 	/*
994 	 * The new leaf tuples converted from the existing ones should require the
995 	 * same or less space, and therefore should all fit onto one page
996 	 * (although that's not necessarily the current page, since we can't
997 	 * delete the old tuples but only replace them with placeholders).
998 	 * However, the incoming new tuple might not also fit, in which case we
999 	 * might need another picksplit cycle to reduce it some more.
1000 	 *
1001 	 * If there's not room to put everything back onto the current page, then
1002 	 * we decide on a per-node basis which tuples go to the new page. (We do
1003 	 * it like that because leaf tuple chains can't cross pages, so we must
1004 	 * place all leaf tuples belonging to the same parent node on the same
1005 	 * page.)
1006 	 *
1007 	 * If we are splitting the root page (turning it from a leaf page into an
1008 	 * inner page), then no leaf tuples can go back to the current page; they
1009 	 * must all go somewhere else.
1010 	 */
1011 	if (!SpGistBlockIsRoot(current->blkno))
1012 		currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
1013 	else
1014 		currentFreeSpace = 0;	/* prevent assigning any tuples to current */
1015 
1016 	xlrec.initDest = false;
1017 
1018 	if (totalLeafSizes <= currentFreeSpace)
1019 	{
1020 		/* All the leaf tuples will fit on current page */
1021 		newLeafBuffer = InvalidBuffer;
1022 		/* mark new leaf tuple as included in insertions, if allowed */
1023 		if (includeNew)
1024 		{
1025 			nToInsert++;
1026 			insertedNew = true;
1027 		}
1028 		for (i = 0; i < nToInsert; i++)
1029 			leafPageSelect[i] = 0;	/* signifies current page */
1030 	}
1031 	else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1032 	{
1033 		/*
1034 		 * We're trying to split up a long value by repeated suffixing, but
1035 		 * it's not going to fit yet.  Don't bother allocating a second leaf
1036 		 * buffer that we won't be able to use.
1037 		 */
1038 		newLeafBuffer = InvalidBuffer;
1039 		Assert(includeNew);
1040 		Assert(nToInsert == 0);
1041 	}
1042 	else
1043 	{
1044 		/* We will need another leaf page */
1045 		uint8	   *nodePageSelect;
1046 		int			curspace;
1047 		int			newspace;
1048 
1049 		newLeafBuffer = SpGistGetBuffer(index,
1050 										GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1051 										Min(totalLeafSizes,
1052 											SPGIST_PAGE_CAPACITY),
1053 										&xlrec.initDest);
1054 
1055 		/*
1056 		 * Attempt to assign node groups to the two pages.  We might fail to
1057 		 * do so, even if totalLeafSizes is less than the available space,
1058 		 * because we can't split a group across pages.
1059 		 */
1060 		nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1061 
1062 		curspace = currentFreeSpace;
1063 		newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1064 		for (i = 0; i < out.nNodes; i++)
1065 		{
1066 			if (leafSizes[i] <= curspace)
1067 			{
1068 				nodePageSelect[i] = 0;	/* signifies current page */
1069 				curspace -= leafSizes[i];
1070 			}
1071 			else
1072 			{
1073 				nodePageSelect[i] = 1;	/* signifies new leaf page */
1074 				newspace -= leafSizes[i];
1075 			}
1076 		}
1077 		if (curspace >= 0 && newspace >= 0)
1078 		{
1079 			/* Successful assignment, so we can include the new leaf tuple */
1080 			if (includeNew)
1081 			{
1082 				nToInsert++;
1083 				insertedNew = true;
1084 			}
1085 		}
1086 		else if (includeNew)
1087 		{
1088 			/* We must exclude the new leaf tuple from the split */
1089 			int			nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1090 
1091 			leafSizes[nodeOfNewTuple] -=
1092 				newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1093 
1094 			/* Repeat the node assignment process --- should succeed now */
1095 			curspace = currentFreeSpace;
1096 			newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1097 			for (i = 0; i < out.nNodes; i++)
1098 			{
1099 				if (leafSizes[i] <= curspace)
1100 				{
1101 					nodePageSelect[i] = 0;	/* signifies current page */
1102 					curspace -= leafSizes[i];
1103 				}
1104 				else
1105 				{
1106 					nodePageSelect[i] = 1;	/* signifies new leaf page */
1107 					newspace -= leafSizes[i];
1108 				}
1109 			}
1110 			if (curspace < 0 || newspace < 0)
1111 				elog(ERROR, "failed to divide leaf tuple groups across pages");
1112 		}
1113 		else
1114 		{
1115 			/* oops, we already excluded new tuple ... should not get here */
1116 			elog(ERROR, "failed to divide leaf tuple groups across pages");
1117 		}
1118 		/* Expand the per-node assignments to be shown per leaf tuple */
1119 		for (i = 0; i < nToInsert; i++)
1120 		{
1121 			n = out.mapTuplesToNodes[i];
1122 			leafPageSelect[i] = nodePageSelect[n];
1123 		}
1124 	}
1125 
1126 	/* Start preparing WAL record */
1127 	xlrec.nDelete = 0;
1128 	xlrec.initSrc = isNew;
1129 	xlrec.storesNulls = isNulls;
1130 	xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1131 
1132 	leafdata = leafptr = (char *) palloc(totalLeafSizes);
1133 
1134 	/* Here we begin making the changes to the target pages */
1135 	START_CRIT_SECTION();
1136 
1137 	/*
1138 	 * Delete old leaf tuples from current buffer, except when we're splitting
1139 	 * the root; in that case there's no need because we'll re-init the page
1140 	 * below.  We do this first to make room for reinserting new leaf tuples.
1141 	 */
1142 	if (!SpGistBlockIsRoot(current->blkno))
1143 	{
1144 		/*
1145 		 * Init buffer instead of deleting individual tuples, but only if
1146 		 * there aren't any other live tuples and only during build; otherwise
1147 		 * we need to set a redirection tuple for concurrent scans.
1148 		 */
1149 		if (state->isBuild &&
1150 			nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1151 			PageGetMaxOffsetNumber(current->page))
1152 		{
1153 			SpGistInitBuffer(current->buffer,
1154 							 SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1155 			xlrec.initSrc = true;
1156 		}
1157 		else if (isNew)
1158 		{
1159 			/* don't expose the freshly init'd buffer as a backup block */
1160 			Assert(nToDelete == 0);
1161 		}
1162 		else
1163 		{
1164 			xlrec.nDelete = nToDelete;
1165 
1166 			if (!state->isBuild)
1167 			{
1168 				/*
1169 				 * Need to create redirect tuple (it will point to new inner
1170 				 * tuple) but right now the new tuple's location is not known
1171 				 * yet.  So, set the redirection pointer to "impossible" value
1172 				 * and remember its position to update tuple later.
1173 				 */
1174 				if (nToDelete > 0)
1175 					redirectTuplePos = toDelete[0];
1176 				spgPageIndexMultiDelete(state, current->page,
1177 										toDelete, nToDelete,
1178 										SPGIST_REDIRECT,
1179 										SPGIST_PLACEHOLDER,
1180 										SPGIST_METAPAGE_BLKNO,
1181 										FirstOffsetNumber);
1182 			}
1183 			else
1184 			{
1185 				/*
1186 				 * During index build there is not concurrent searches, so we
1187 				 * don't need to create redirection tuple.
1188 				 */
1189 				spgPageIndexMultiDelete(state, current->page,
1190 										toDelete, nToDelete,
1191 										SPGIST_PLACEHOLDER,
1192 										SPGIST_PLACEHOLDER,
1193 										InvalidBlockNumber,
1194 										InvalidOffsetNumber);
1195 			}
1196 		}
1197 	}
1198 
1199 	/*
1200 	 * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1201 	 * nodes.
1202 	 */
1203 	startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1204 	for (i = 0; i < nToInsert; i++)
1205 	{
1206 		SpGistLeafTuple it = newLeafs[i];
1207 		Buffer		leafBuffer;
1208 		BlockNumber leafBlock;
1209 		OffsetNumber newoffset;
1210 
1211 		/* Which page is it going to? */
1212 		leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1213 		leafBlock = BufferGetBlockNumber(leafBuffer);
1214 
1215 		/* Link tuple into correct chain for its node */
1216 		n = out.mapTuplesToNodes[i];
1217 
1218 		if (ItemPointerIsValid(&nodes[n]->t_tid))
1219 		{
1220 			Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1221 			SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
1222 		}
1223 		else
1224 			SGLT_SET_NEXTOFFSET(it, InvalidOffsetNumber);
1225 
1226 		/* Insert it on page */
1227 		newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1228 										 (Item) it, it->size,
1229 										 &startOffsets[leafPageSelect[i]],
1230 										 false);
1231 		toInsert[i] = newoffset;
1232 
1233 		/* ... and complete the chain linking */
1234 		ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1235 
1236 		/* Also copy leaf tuple into WAL data */
1237 		memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1238 		leafptr += newLeafs[i]->size;
1239 	}
1240 
1241 	/*
1242 	 * We're done modifying the other leaf buffer (if any), so mark it dirty.
1243 	 * current->buffer will be marked below, after we're entirely done
1244 	 * modifying it.
1245 	 */
1246 	if (newLeafBuffer != InvalidBuffer)
1247 	{
1248 		MarkBufferDirty(newLeafBuffer);
1249 	}
1250 
1251 	/* Remember current buffer, since we're about to change "current" */
1252 	saveCurrent = *current;
1253 
1254 	/*
1255 	 * Store the new innerTuple
1256 	 */
1257 	if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1258 	{
1259 		/*
1260 		 * new inner tuple goes to parent page
1261 		 */
1262 		Assert(current->buffer != parent->buffer);
1263 
1264 		/* Repoint "current" at the new inner tuple */
1265 		current->blkno = parent->blkno;
1266 		current->buffer = parent->buffer;
1267 		current->page = parent->page;
1268 		xlrec.offnumInner = current->offnum =
1269 			SpGistPageAddNewItem(state, current->page,
1270 								 (Item) innerTuple, innerTuple->size,
1271 								 NULL, false);
1272 
1273 		/*
1274 		 * Update parent node link and mark parent page dirty
1275 		 */
1276 		xlrec.innerIsParent = true;
1277 		xlrec.offnumParent = parent->offnum;
1278 		xlrec.nodeI = parent->node;
1279 		saveNodeLink(index, parent, current->blkno, current->offnum);
1280 
1281 		/*
1282 		 * Update redirection link (in old current buffer)
1283 		 */
1284 		if (redirectTuplePos != InvalidOffsetNumber)
1285 			setRedirectionTuple(&saveCurrent, redirectTuplePos,
1286 								current->blkno, current->offnum);
1287 
1288 		/* Done modifying old current buffer, mark it dirty */
1289 		MarkBufferDirty(saveCurrent.buffer);
1290 	}
1291 	else if (parent->buffer != InvalidBuffer)
1292 	{
1293 		/*
1294 		 * new inner tuple will be stored on a new page
1295 		 */
1296 		Assert(newInnerBuffer != InvalidBuffer);
1297 
1298 		/* Repoint "current" at the new inner tuple */
1299 		current->buffer = newInnerBuffer;
1300 		current->blkno = BufferGetBlockNumber(current->buffer);
1301 		current->page = BufferGetPage(current->buffer);
1302 		xlrec.offnumInner = current->offnum =
1303 			SpGistPageAddNewItem(state, current->page,
1304 								 (Item) innerTuple, innerTuple->size,
1305 								 NULL, false);
1306 
1307 		/* Done modifying new current buffer, mark it dirty */
1308 		MarkBufferDirty(current->buffer);
1309 
1310 		/*
1311 		 * Update parent node link and mark parent page dirty
1312 		 */
1313 		xlrec.innerIsParent = (parent->buffer == current->buffer);
1314 		xlrec.offnumParent = parent->offnum;
1315 		xlrec.nodeI = parent->node;
1316 		saveNodeLink(index, parent, current->blkno, current->offnum);
1317 
1318 		/*
1319 		 * Update redirection link (in old current buffer)
1320 		 */
1321 		if (redirectTuplePos != InvalidOffsetNumber)
1322 			setRedirectionTuple(&saveCurrent, redirectTuplePos,
1323 								current->blkno, current->offnum);
1324 
1325 		/* Done modifying old current buffer, mark it dirty */
1326 		MarkBufferDirty(saveCurrent.buffer);
1327 	}
1328 	else
1329 	{
1330 		/*
1331 		 * Splitting root page, which was a leaf but now becomes inner page
1332 		 * (and so "current" continues to point at it)
1333 		 */
1334 		Assert(SpGistBlockIsRoot(current->blkno));
1335 		Assert(redirectTuplePos == InvalidOffsetNumber);
1336 
1337 		SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1338 		xlrec.initInner = true;
1339 		xlrec.innerIsParent = false;
1340 
1341 		xlrec.offnumInner = current->offnum =
1342 			PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1343 						InvalidOffsetNumber, false, false);
1344 		if (current->offnum != FirstOffsetNumber)
1345 			elog(ERROR, "failed to add item of size %u to SPGiST index page",
1346 				 innerTuple->size);
1347 
1348 		/* No parent link to update, nor redirection to do */
1349 		xlrec.offnumParent = InvalidOffsetNumber;
1350 		xlrec.nodeI = 0;
1351 
1352 		/* Done modifying new current buffer, mark it dirty */
1353 		MarkBufferDirty(current->buffer);
1354 
1355 		/* saveCurrent doesn't represent a different buffer */
1356 		saveCurrent.buffer = InvalidBuffer;
1357 	}
1358 
1359 	if (RelationNeedsWAL(index) && !state->isBuild)
1360 	{
1361 		XLogRecPtr	recptr;
1362 		int			flags;
1363 
1364 		XLogBeginInsert();
1365 
1366 		xlrec.nInsert = nToInsert;
1367 		XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
1368 
1369 		XLogRegisterData((char *) toDelete,
1370 						 sizeof(OffsetNumber) * xlrec.nDelete);
1371 		XLogRegisterData((char *) toInsert,
1372 						 sizeof(OffsetNumber) * xlrec.nInsert);
1373 		XLogRegisterData((char *) leafPageSelect,
1374 						 sizeof(uint8) * xlrec.nInsert);
1375 		XLogRegisterData((char *) innerTuple, innerTuple->size);
1376 		XLogRegisterData(leafdata, leafptr - leafdata);
1377 
1378 		/* Old leaf page */
1379 		if (BufferIsValid(saveCurrent.buffer))
1380 		{
1381 			flags = REGBUF_STANDARD;
1382 			if (xlrec.initSrc)
1383 				flags |= REGBUF_WILL_INIT;
1384 			XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1385 		}
1386 
1387 		/* New leaf page */
1388 		if (BufferIsValid(newLeafBuffer))
1389 		{
1390 			flags = REGBUF_STANDARD;
1391 			if (xlrec.initDest)
1392 				flags |= REGBUF_WILL_INIT;
1393 			XLogRegisterBuffer(1, newLeafBuffer, flags);
1394 		}
1395 
1396 		/* Inner page */
1397 		flags = REGBUF_STANDARD;
1398 		if (xlrec.initInner)
1399 			flags |= REGBUF_WILL_INIT;
1400 		XLogRegisterBuffer(2, current->buffer, flags);
1401 
1402 		/* Parent page, if different from inner page */
1403 		if (parent->buffer != InvalidBuffer)
1404 		{
1405 			if (parent->buffer != current->buffer)
1406 				XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
1407 			else
1408 				Assert(xlrec.innerIsParent);
1409 		}
1410 
1411 		/* Issue the WAL record */
1412 		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1413 
1414 		/* Update page LSNs on all affected pages */
1415 		if (newLeafBuffer != InvalidBuffer)
1416 		{
1417 			Page		page = BufferGetPage(newLeafBuffer);
1418 
1419 			PageSetLSN(page, recptr);
1420 		}
1421 
1422 		if (saveCurrent.buffer != InvalidBuffer)
1423 		{
1424 			Page		page = BufferGetPage(saveCurrent.buffer);
1425 
1426 			PageSetLSN(page, recptr);
1427 		}
1428 
1429 		PageSetLSN(current->page, recptr);
1430 
1431 		if (parent->buffer != InvalidBuffer)
1432 		{
1433 			PageSetLSN(parent->page, recptr);
1434 		}
1435 	}
1436 
1437 	END_CRIT_SECTION();
1438 
1439 	/* Update local free-space cache and unlock buffers */
1440 	if (newLeafBuffer != InvalidBuffer)
1441 	{
1442 		SpGistSetLastUsedPage(index, newLeafBuffer);
1443 		UnlockReleaseBuffer(newLeafBuffer);
1444 	}
1445 	if (saveCurrent.buffer != InvalidBuffer)
1446 	{
1447 		SpGistSetLastUsedPage(index, saveCurrent.buffer);
1448 		UnlockReleaseBuffer(saveCurrent.buffer);
1449 	}
1450 
1451 	return insertedNew;
1452 }
1453 
1454 /*
1455  * spgMatchNode action: descend to N'th child node of current inner tuple
1456  */
1457 static void
spgMatchNodeAction(Relation index,SpGistState * state,SpGistInnerTuple innerTuple,SPPageDesc * current,SPPageDesc * parent,int nodeN)1458 spgMatchNodeAction(Relation index, SpGistState *state,
1459 				   SpGistInnerTuple innerTuple,
1460 				   SPPageDesc *current, SPPageDesc *parent, int nodeN)
1461 {
1462 	int			i;
1463 	SpGistNodeTuple node;
1464 
1465 	/* Release previous parent buffer if any */
1466 	if (parent->buffer != InvalidBuffer &&
1467 		parent->buffer != current->buffer)
1468 	{
1469 		SpGistSetLastUsedPage(index, parent->buffer);
1470 		UnlockReleaseBuffer(parent->buffer);
1471 	}
1472 
1473 	/* Repoint parent to specified node of current inner tuple */
1474 	parent->blkno = current->blkno;
1475 	parent->buffer = current->buffer;
1476 	parent->page = current->page;
1477 	parent->offnum = current->offnum;
1478 	parent->node = nodeN;
1479 
1480 	/* Locate that node */
1481 	SGITITERATE(innerTuple, i, node)
1482 	{
1483 		if (i == nodeN)
1484 			break;
1485 	}
1486 
1487 	if (i != nodeN)
1488 		elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1489 			 nodeN);
1490 
1491 	/* Point current to the downlink location, if any */
1492 	if (ItemPointerIsValid(&node->t_tid))
1493 	{
1494 		current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1495 		current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1496 	}
1497 	else
1498 	{
1499 		/* Downlink is empty, so we'll need to find a new page */
1500 		current->blkno = InvalidBlockNumber;
1501 		current->offnum = InvalidOffsetNumber;
1502 	}
1503 
1504 	current->buffer = InvalidBuffer;
1505 	current->page = NULL;
1506 }
1507 
1508 /*
1509  * spgAddNode action: add a node to the inner tuple at current
1510  */
1511 static void
spgAddNodeAction(Relation index,SpGistState * state,SpGistInnerTuple innerTuple,SPPageDesc * current,SPPageDesc * parent,int nodeN,Datum nodeLabel)1512 spgAddNodeAction(Relation index, SpGistState *state,
1513 				 SpGistInnerTuple innerTuple,
1514 				 SPPageDesc *current, SPPageDesc *parent,
1515 				 int nodeN, Datum nodeLabel)
1516 {
1517 	SpGistInnerTuple newInnerTuple;
1518 	spgxlogAddNode xlrec;
1519 
1520 	/* Should not be applied to nulls */
1521 	Assert(!SpGistPageStoresNulls(current->page));
1522 
1523 	/* Construct new inner tuple with additional node */
1524 	newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1525 
1526 	/* Prepare WAL record */
1527 	STORE_STATE(state, xlrec.stateSrc);
1528 	xlrec.offnum = current->offnum;
1529 
1530 	/* we don't fill these unless we need to change the parent downlink */
1531 	xlrec.parentBlk = -1;
1532 	xlrec.offnumParent = InvalidOffsetNumber;
1533 	xlrec.nodeI = 0;
1534 
1535 	/* we don't fill these unless tuple has to be moved */
1536 	xlrec.offnumNew = InvalidOffsetNumber;
1537 	xlrec.newPage = false;
1538 
1539 	if (PageGetExactFreeSpace(current->page) >=
1540 		newInnerTuple->size - innerTuple->size)
1541 	{
1542 		/*
1543 		 * We can replace the inner tuple by new version in-place
1544 		 */
1545 		START_CRIT_SECTION();
1546 
1547 		PageIndexTupleDelete(current->page, current->offnum);
1548 		if (PageAddItem(current->page,
1549 						(Item) newInnerTuple, newInnerTuple->size,
1550 						current->offnum, false, false) != current->offnum)
1551 			elog(ERROR, "failed to add item of size %u to SPGiST index page",
1552 				 newInnerTuple->size);
1553 
1554 		MarkBufferDirty(current->buffer);
1555 
1556 		if (RelationNeedsWAL(index) && !state->isBuild)
1557 		{
1558 			XLogRecPtr	recptr;
1559 
1560 			XLogBeginInsert();
1561 			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1562 			XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1563 
1564 			XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1565 
1566 			recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1567 
1568 			PageSetLSN(current->page, recptr);
1569 		}
1570 
1571 		END_CRIT_SECTION();
1572 	}
1573 	else
1574 	{
1575 		/*
1576 		 * move inner tuple to another page, and update parent
1577 		 */
1578 		SpGistDeadTuple dt;
1579 		SPPageDesc	saveCurrent;
1580 
1581 		/*
1582 		 * It should not be possible to get here for the root page, since we
1583 		 * allow only one inner tuple on the root page, and spgFormInnerTuple
1584 		 * always checks that inner tuples don't exceed the size of a page.
1585 		 */
1586 		if (SpGistBlockIsRoot(current->blkno))
1587 			elog(ERROR, "cannot enlarge root tuple any more");
1588 		Assert(parent->buffer != InvalidBuffer);
1589 
1590 		saveCurrent = *current;
1591 
1592 		xlrec.offnumParent = parent->offnum;
1593 		xlrec.nodeI = parent->node;
1594 
1595 		/*
1596 		 * obtain new buffer with the same parity as current, since it will be
1597 		 * a child of same parent tuple
1598 		 */
1599 		current->buffer = SpGistGetBuffer(index,
1600 										  GBUF_INNER_PARITY(current->blkno),
1601 										  newInnerTuple->size + sizeof(ItemIdData),
1602 										  &xlrec.newPage);
1603 		current->blkno = BufferGetBlockNumber(current->buffer);
1604 		current->page = BufferGetPage(current->buffer);
1605 
1606 		/*
1607 		 * Let's just make real sure new current isn't same as old.  Right now
1608 		 * that's impossible, but if SpGistGetBuffer ever got smart enough to
1609 		 * delete placeholder tuples before checking space, maybe it wouldn't
1610 		 * be impossible.  The case would appear to work except that WAL
1611 		 * replay would be subtly wrong, so I think a mere assert isn't enough
1612 		 * here.
1613 		 */
1614 		if (current->blkno == saveCurrent.blkno)
1615 			elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1616 
1617 		/*
1618 		 * New current and parent buffer will both be modified; but note that
1619 		 * parent buffer could be same as either new or old current.
1620 		 */
1621 		if (parent->buffer == saveCurrent.buffer)
1622 			xlrec.parentBlk = 0;
1623 		else if (parent->buffer == current->buffer)
1624 			xlrec.parentBlk = 1;
1625 		else
1626 			xlrec.parentBlk = 2;
1627 
1628 		START_CRIT_SECTION();
1629 
1630 		/* insert new ... */
1631 		xlrec.offnumNew = current->offnum =
1632 			SpGistPageAddNewItem(state, current->page,
1633 								 (Item) newInnerTuple, newInnerTuple->size,
1634 								 NULL, false);
1635 
1636 		MarkBufferDirty(current->buffer);
1637 
1638 		/* update parent's downlink and mark parent page dirty */
1639 		saveNodeLink(index, parent, current->blkno, current->offnum);
1640 
1641 		/*
1642 		 * Replace old tuple with a placeholder or redirection tuple.  Unless
1643 		 * doing an index build, we have to insert a redirection tuple for
1644 		 * possible concurrent scans.  We can't just delete it in any case,
1645 		 * because that could change the offsets of other tuples on the page,
1646 		 * breaking downlinks from their parents.
1647 		 */
1648 		if (state->isBuild)
1649 			dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
1650 								  InvalidBlockNumber, InvalidOffsetNumber);
1651 		else
1652 			dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1653 								  current->blkno, current->offnum);
1654 
1655 		PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1656 		if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1657 						saveCurrent.offnum,
1658 						false, false) != saveCurrent.offnum)
1659 			elog(ERROR, "failed to add item of size %u to SPGiST index page",
1660 				 dt->size);
1661 
1662 		if (state->isBuild)
1663 			SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1664 		else
1665 			SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1666 
1667 		MarkBufferDirty(saveCurrent.buffer);
1668 
1669 		if (RelationNeedsWAL(index) && !state->isBuild)
1670 		{
1671 			XLogRecPtr	recptr;
1672 			int			flags;
1673 
1674 			XLogBeginInsert();
1675 
1676 			/* orig page */
1677 			XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1678 			/* new page */
1679 			flags = REGBUF_STANDARD;
1680 			if (xlrec.newPage)
1681 				flags |= REGBUF_WILL_INIT;
1682 			XLogRegisterBuffer(1, current->buffer, flags);
1683 			/* parent page (if different from orig and new) */
1684 			if (xlrec.parentBlk == 2)
1685 				XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
1686 
1687 			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1688 			XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1689 
1690 			recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1691 
1692 			/* we don't bother to check if any of these are redundant */
1693 			PageSetLSN(current->page, recptr);
1694 			PageSetLSN(parent->page, recptr);
1695 			PageSetLSN(saveCurrent.page, recptr);
1696 		}
1697 
1698 		END_CRIT_SECTION();
1699 
1700 		/* Release saveCurrent if it's not same as current or parent */
1701 		if (saveCurrent.buffer != current->buffer &&
1702 			saveCurrent.buffer != parent->buffer)
1703 		{
1704 			SpGistSetLastUsedPage(index, saveCurrent.buffer);
1705 			UnlockReleaseBuffer(saveCurrent.buffer);
1706 		}
1707 	}
1708 }
1709 
1710 /*
1711  * spgSplitNode action: split inner tuple at current into prefix and postfix
1712  */
1713 static void
spgSplitNodeAction(Relation index,SpGistState * state,SpGistInnerTuple innerTuple,SPPageDesc * current,spgChooseOut * out)1714 spgSplitNodeAction(Relation index, SpGistState *state,
1715 				   SpGistInnerTuple innerTuple,
1716 				   SPPageDesc *current, spgChooseOut *out)
1717 {
1718 	SpGistInnerTuple prefixTuple,
1719 				postfixTuple;
1720 	SpGistNodeTuple node,
1721 			   *nodes;
1722 	BlockNumber postfixBlkno;
1723 	OffsetNumber postfixOffset;
1724 	int			i;
1725 	spgxlogSplitTuple xlrec;
1726 	Buffer		newBuffer = InvalidBuffer;
1727 
1728 	/* Should not be applied to nulls */
1729 	Assert(!SpGistPageStoresNulls(current->page));
1730 
1731 	/* Check opclass gave us sane values */
1732 	if (out->result.splitTuple.prefixNNodes <= 0 ||
1733 		out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
1734 		elog(ERROR, "invalid number of prefix nodes: %d",
1735 			 out->result.splitTuple.prefixNNodes);
1736 	if (out->result.splitTuple.childNodeN < 0 ||
1737 		out->result.splitTuple.childNodeN >=
1738 		out->result.splitTuple.prefixNNodes)
1739 		elog(ERROR, "invalid child node number: %d",
1740 			 out->result.splitTuple.childNodeN);
1741 
1742 	/*
1743 	 * Construct new prefix tuple with requested number of nodes.  We'll fill
1744 	 * in the childNodeN'th node's downlink below.
1745 	 */
1746 	nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
1747 									   out->result.splitTuple.prefixNNodes);
1748 
1749 	for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
1750 	{
1751 		Datum		label = (Datum) 0;
1752 		bool		labelisnull;
1753 
1754 		labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
1755 		if (!labelisnull)
1756 			label = out->result.splitTuple.prefixNodeLabels[i];
1757 		nodes[i] = spgFormNodeTuple(state, label, labelisnull);
1758 	}
1759 
1760 	prefixTuple = spgFormInnerTuple(state,
1761 									out->result.splitTuple.prefixHasPrefix,
1762 									out->result.splitTuple.prefixPrefixDatum,
1763 									out->result.splitTuple.prefixNNodes,
1764 									nodes);
1765 
1766 	/* it must fit in the space that innerTuple now occupies */
1767 	if (prefixTuple->size > innerTuple->size)
1768 		elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1769 
1770 	/*
1771 	 * Construct new postfix tuple, containing all nodes of innerTuple with
1772 	 * same node datums, but with the prefix specified by the picksplit
1773 	 * function.
1774 	 */
1775 	nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1776 	SGITITERATE(innerTuple, i, node)
1777 	{
1778 		nodes[i] = node;
1779 	}
1780 
1781 	postfixTuple = spgFormInnerTuple(state,
1782 									 out->result.splitTuple.postfixHasPrefix,
1783 									 out->result.splitTuple.postfixPrefixDatum,
1784 									 innerTuple->nNodes, nodes);
1785 
1786 	/* Postfix tuple is allTheSame if original tuple was */
1787 	postfixTuple->allTheSame = innerTuple->allTheSame;
1788 
1789 	/* prep data for WAL record */
1790 	xlrec.newPage = false;
1791 
1792 	/*
1793 	 * If we can't fit both tuples on the current page, get a new page for the
1794 	 * postfix tuple.  In particular, can't split to the root page.
1795 	 *
1796 	 * For the space calculation, note that prefixTuple replaces innerTuple
1797 	 * but postfixTuple will be a new entry.
1798 	 */
1799 	if (SpGistBlockIsRoot(current->blkno) ||
1800 		SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1801 		prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1802 	{
1803 		/*
1804 		 * Choose page with next triple parity, because postfix tuple is a
1805 		 * child of prefix one
1806 		 */
1807 		newBuffer = SpGistGetBuffer(index,
1808 									GBUF_INNER_PARITY(current->blkno + 1),
1809 									postfixTuple->size + sizeof(ItemIdData),
1810 									&xlrec.newPage);
1811 	}
1812 
1813 	START_CRIT_SECTION();
1814 
1815 	/*
1816 	 * Replace old tuple by prefix tuple
1817 	 */
1818 	PageIndexTupleDelete(current->page, current->offnum);
1819 	xlrec.offnumPrefix = PageAddItem(current->page,
1820 									 (Item) prefixTuple, prefixTuple->size,
1821 									 current->offnum, false, false);
1822 	if (xlrec.offnumPrefix != current->offnum)
1823 		elog(ERROR, "failed to add item of size %u to SPGiST index page",
1824 			 prefixTuple->size);
1825 
1826 	/*
1827 	 * put postfix tuple into appropriate page
1828 	 */
1829 	if (newBuffer == InvalidBuffer)
1830 	{
1831 		postfixBlkno = current->blkno;
1832 		xlrec.offnumPostfix = postfixOffset =
1833 			SpGistPageAddNewItem(state, current->page,
1834 								 (Item) postfixTuple, postfixTuple->size,
1835 								 NULL, false);
1836 		xlrec.postfixBlkSame = true;
1837 	}
1838 	else
1839 	{
1840 		postfixBlkno = BufferGetBlockNumber(newBuffer);
1841 		xlrec.offnumPostfix = postfixOffset =
1842 			SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
1843 								 (Item) postfixTuple, postfixTuple->size,
1844 								 NULL, false);
1845 		MarkBufferDirty(newBuffer);
1846 		xlrec.postfixBlkSame = false;
1847 	}
1848 
1849 	/*
1850 	 * And set downlink pointer in the prefix tuple to point to postfix tuple.
1851 	 * (We can't avoid this step by doing the above two steps in opposite
1852 	 * order, because there might not be enough space on the page to insert
1853 	 * the postfix tuple first.)  We have to update the local copy of the
1854 	 * prefixTuple too, because that's what will be written to WAL.
1855 	 */
1856 	spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1857 					  postfixBlkno, postfixOffset);
1858 	prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1859 												 PageGetItemId(current->page, current->offnum));
1860 	spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1861 					  postfixBlkno, postfixOffset);
1862 
1863 	MarkBufferDirty(current->buffer);
1864 
1865 	if (RelationNeedsWAL(index) && !state->isBuild)
1866 	{
1867 		XLogRecPtr	recptr;
1868 
1869 		XLogBeginInsert();
1870 		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1871 		XLogRegisterData((char *) prefixTuple, prefixTuple->size);
1872 		XLogRegisterData((char *) postfixTuple, postfixTuple->size);
1873 
1874 		XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1875 		if (newBuffer != InvalidBuffer)
1876 		{
1877 			int			flags;
1878 
1879 			flags = REGBUF_STANDARD;
1880 			if (xlrec.newPage)
1881 				flags |= REGBUF_WILL_INIT;
1882 			XLogRegisterBuffer(1, newBuffer, flags);
1883 		}
1884 
1885 		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1886 
1887 		PageSetLSN(current->page, recptr);
1888 
1889 		if (newBuffer != InvalidBuffer)
1890 		{
1891 			PageSetLSN(BufferGetPage(newBuffer), recptr);
1892 		}
1893 	}
1894 
1895 	END_CRIT_SECTION();
1896 
1897 	/* Update local free-space cache and release buffer */
1898 	if (newBuffer != InvalidBuffer)
1899 	{
1900 		SpGistSetLastUsedPage(index, newBuffer);
1901 		UnlockReleaseBuffer(newBuffer);
1902 	}
1903 }
1904 
1905 /*
1906  * Insert one item into the index.
1907  *
1908  * Returns true on success, false if we failed to complete the insertion
1909  * (typically because of conflict with a concurrent insert).  In the latter
1910  * case, caller should re-call spgdoinsert() with the same args.
1911  */
1912 bool
spgdoinsert(Relation index,SpGistState * state,ItemPointer heapPtr,Datum * datums,bool * isnulls)1913 spgdoinsert(Relation index, SpGistState *state,
1914 			ItemPointer heapPtr, Datum *datums, bool *isnulls)
1915 {
1916 	bool		result = true;
1917 	TupleDesc	leafDescriptor = state->leafTupDesc;
1918 	bool		isnull = isnulls[spgKeyColumn];
1919 	int			level = 0;
1920 	Datum		leafDatums[INDEX_MAX_KEYS];
1921 	int			leafSize;
1922 	int			bestLeafSize;
1923 	int			numNoProgressCycles = 0;
1924 	SPPageDesc	current,
1925 				parent;
1926 	FmgrInfo   *procinfo = NULL;
1927 
1928 	/*
1929 	 * Look up FmgrInfo of the user-defined choose function once, to save
1930 	 * cycles in the loop below.
1931 	 */
1932 	if (!isnull)
1933 		procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1934 
1935 	/*
1936 	 * Prepare the leaf datum to insert.
1937 	 *
1938 	 * If an optional "compress" method is provided, then call it to form the
1939 	 * leaf key datum from the input datum.  Otherwise, store the input datum
1940 	 * as is.  Since we don't use index_form_tuple in this AM, we have to make
1941 	 * sure value to be inserted is not toasted; FormIndexDatum doesn't
1942 	 * guarantee that.  But we assume the "compress" method to return an
1943 	 * untoasted value.
1944 	 */
1945 	if (!isnull)
1946 	{
1947 		if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
1948 		{
1949 			FmgrInfo   *compressProcinfo = NULL;
1950 
1951 			compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
1952 			leafDatums[spgKeyColumn] =
1953 				FunctionCall1Coll(compressProcinfo,
1954 								  index->rd_indcollation[spgKeyColumn],
1955 								  datums[spgKeyColumn]);
1956 		}
1957 		else
1958 		{
1959 			Assert(state->attLeafType.type == state->attType.type);
1960 
1961 			if (state->attType.attlen == -1)
1962 				leafDatums[spgKeyColumn] =
1963 					PointerGetDatum(PG_DETOAST_DATUM(datums[spgKeyColumn]));
1964 			else
1965 				leafDatums[spgKeyColumn] = datums[spgKeyColumn];
1966 		}
1967 	}
1968 	else
1969 		leafDatums[spgKeyColumn] = (Datum) 0;
1970 
1971 	/* Likewise, ensure that any INCLUDE values are not toasted */
1972 	for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
1973 	{
1974 		if (!isnulls[i])
1975 		{
1976 			if (TupleDescAttr(leafDescriptor, i)->attlen == -1)
1977 				leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
1978 			else
1979 				leafDatums[i] = datums[i];
1980 		}
1981 		else
1982 			leafDatums[i] = (Datum) 0;
1983 	}
1984 
1985 	/*
1986 	 * Compute space needed for a leaf tuple containing the given data.
1987 	 */
1988 	leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
1989 	/* Account for an item pointer, too */
1990 	leafSize += sizeof(ItemIdData);
1991 
1992 	/*
1993 	 * If it isn't gonna fit, and the opclass can't reduce the datum size by
1994 	 * suffixing, bail out now rather than doing a lot of useless work.
1995 	 */
1996 	if (leafSize > SPGIST_PAGE_CAPACITY &&
1997 		(isnull || !state->config.longValuesOK))
1998 		ereport(ERROR,
1999 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2000 				 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2001 						leafSize - sizeof(ItemIdData),
2002 						SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2003 						RelationGetRelationName(index)),
2004 				 errhint("Values larger than a buffer page cannot be indexed.")));
2005 	bestLeafSize = leafSize;
2006 
2007 	/* Initialize "current" to the appropriate root page */
2008 	current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
2009 	current.buffer = InvalidBuffer;
2010 	current.page = NULL;
2011 	current.offnum = FirstOffsetNumber;
2012 	current.node = -1;
2013 
2014 	/* "parent" is invalid for the moment */
2015 	parent.blkno = InvalidBlockNumber;
2016 	parent.buffer = InvalidBuffer;
2017 	parent.page = NULL;
2018 	parent.offnum = InvalidOffsetNumber;
2019 	parent.node = -1;
2020 
2021 	/*
2022 	 * Before entering the loop, try to clear any pending interrupt condition.
2023 	 * If a query cancel is pending, we might as well accept it now not later;
2024 	 * while if a non-canceling condition is pending, servicing it here avoids
2025 	 * having to restart the insertion and redo all the work so far.
2026 	 */
2027 	CHECK_FOR_INTERRUPTS();
2028 
2029 	for (;;)
2030 	{
2031 		bool		isNew = false;
2032 
2033 		/*
2034 		 * Bail out if query cancel is pending.  We must have this somewhere
2035 		 * in the loop since a broken opclass could produce an infinite
2036 		 * picksplit loop.  However, because we'll be holding buffer lock(s)
2037 		 * after the first iteration, ProcessInterrupts() wouldn't be able to
2038 		 * throw a cancel error here.  Hence, if we see that an interrupt is
2039 		 * pending, break out of the loop and deal with the situation below.
2040 		 * Set result = false because we must restart the insertion if the
2041 		 * interrupt isn't a query-cancel-or-die case.
2042 		 */
2043 		if (INTERRUPTS_PENDING_CONDITION())
2044 		{
2045 			result = false;
2046 			break;
2047 		}
2048 
2049 		if (current.blkno == InvalidBlockNumber)
2050 		{
2051 			/*
2052 			 * Create a leaf page.  If leafSize is too large to fit on a page,
2053 			 * we won't actually use the page yet, but it simplifies the API
2054 			 * for doPickSplit to always have a leaf page at hand; so just
2055 			 * quietly limit our request to a page size.
2056 			 */
2057 			current.buffer =
2058 				SpGistGetBuffer(index,
2059 								GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
2060 								Min(leafSize, SPGIST_PAGE_CAPACITY),
2061 								&isNew);
2062 			current.blkno = BufferGetBlockNumber(current.buffer);
2063 		}
2064 		else if (parent.buffer == InvalidBuffer)
2065 		{
2066 			/* we hold no parent-page lock, so no deadlock is possible */
2067 			current.buffer = ReadBuffer(index, current.blkno);
2068 			LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
2069 		}
2070 		else if (current.blkno != parent.blkno)
2071 		{
2072 			/* descend to a new child page */
2073 			current.buffer = ReadBuffer(index, current.blkno);
2074 
2075 			/*
2076 			 * Attempt to acquire lock on child page.  We must beware of
2077 			 * deadlock against another insertion process descending from that
2078 			 * page to our parent page (see README).  If we fail to get lock,
2079 			 * abandon the insertion and tell our caller to start over.
2080 			 *
2081 			 * XXX this could be improved, because failing to get lock on a
2082 			 * buffer is not proof of a deadlock situation; the lock might be
2083 			 * held by a reader, or even just background writer/checkpointer
2084 			 * process.  Perhaps it'd be worth retrying after sleeping a bit?
2085 			 */
2086 			if (!ConditionalLockBuffer(current.buffer))
2087 			{
2088 				ReleaseBuffer(current.buffer);
2089 				UnlockReleaseBuffer(parent.buffer);
2090 				return false;
2091 			}
2092 		}
2093 		else
2094 		{
2095 			/* inner tuple can be stored on the same page as parent one */
2096 			current.buffer = parent.buffer;
2097 		}
2098 		current.page = BufferGetPage(current.buffer);
2099 
2100 		/* should not arrive at a page of the wrong type */
2101 		if (isnull ? !SpGistPageStoresNulls(current.page) :
2102 			SpGistPageStoresNulls(current.page))
2103 			elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2104 				 current.blkno);
2105 
2106 		if (SpGistPageIsLeaf(current.page))
2107 		{
2108 			SpGistLeafTuple leafTuple;
2109 			int			nToSplit,
2110 						sizeToSplit;
2111 
2112 			leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
2113 			if (leafTuple->size + sizeof(ItemIdData) <=
2114 				SpGistPageGetFreeSpace(current.page, 1))
2115 			{
2116 				/* it fits on page, so insert it and we're done */
2117 				addLeafTuple(index, state, leafTuple,
2118 							 &current, &parent, isnull, isNew);
2119 				break;
2120 			}
2121 			else if ((sizeToSplit =
2122 					  checkSplitConditions(index, state, &current,
2123 										   &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
2124 					 nToSplit < 64 &&
2125 					 leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2126 			{
2127 				/*
2128 				 * the amount of data is pretty small, so just move the whole
2129 				 * chain to another leaf page rather than splitting it.
2130 				 */
2131 				Assert(!isNew);
2132 				moveLeafs(index, state, &current, &parent, leafTuple, isnull);
2133 				break;			/* we're done */
2134 			}
2135 			else
2136 			{
2137 				/* picksplit */
2138 				if (doPickSplit(index, state, &current, &parent,
2139 								leafTuple, level, isnull, isNew))
2140 					break;		/* doPickSplit installed new tuples */
2141 
2142 				/* leaf tuple will not be inserted yet */
2143 				pfree(leafTuple);
2144 
2145 				/*
2146 				 * current now describes new inner tuple, go insert into it
2147 				 */
2148 				Assert(!SpGistPageIsLeaf(current.page));
2149 				goto process_inner_tuple;
2150 			}
2151 		}
2152 		else					/* non-leaf page */
2153 		{
2154 			/*
2155 			 * Apply the opclass choose function to figure out how to insert
2156 			 * the given datum into the current inner tuple.
2157 			 */
2158 			SpGistInnerTuple innerTuple;
2159 			spgChooseIn in;
2160 			spgChooseOut out;
2161 
2162 			/*
2163 			 * spgAddNode and spgSplitTuple cases will loop back to here to
2164 			 * complete the insertion operation.  Just in case the choose
2165 			 * function is broken and produces add or split requests
2166 			 * repeatedly, check for query cancel (see comments above).
2167 			 */
2168 	process_inner_tuple:
2169 			if (INTERRUPTS_PENDING_CONDITION())
2170 			{
2171 				result = false;
2172 				break;
2173 			}
2174 
2175 			innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2176 														PageGetItemId(current.page, current.offnum));
2177 
2178 			in.datum = datums[spgKeyColumn];
2179 			in.leafDatum = leafDatums[spgKeyColumn];
2180 			in.level = level;
2181 			in.allTheSame = innerTuple->allTheSame;
2182 			in.hasPrefix = (innerTuple->prefixSize > 0);
2183 			in.prefixDatum = SGITDATUM(innerTuple, state);
2184 			in.nNodes = innerTuple->nNodes;
2185 			in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2186 
2187 			memset(&out, 0, sizeof(out));
2188 
2189 			if (!isnull)
2190 			{
2191 				/* use user-defined choose method */
2192 				FunctionCall2Coll(procinfo,
2193 								  index->rd_indcollation[0],
2194 								  PointerGetDatum(&in),
2195 								  PointerGetDatum(&out));
2196 			}
2197 			else
2198 			{
2199 				/* force "match" action (to insert to random subnode) */
2200 				out.resultType = spgMatchNode;
2201 			}
2202 
2203 			if (innerTuple->allTheSame)
2204 			{
2205 				/*
2206 				 * It's not allowed to do an AddNode at an allTheSame tuple.
2207 				 * Opclass must say "match", in which case we choose a random
2208 				 * one of the nodes to descend into, or "split".
2209 				 */
2210 				if (out.resultType == spgAddNode)
2211 					elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2212 				else if (out.resultType == spgMatchNode)
2213 					out.result.matchNode.nodeN = random() % innerTuple->nNodes;
2214 			}
2215 
2216 			switch (out.resultType)
2217 			{
2218 				case spgMatchNode:
2219 					/* Descend to N'th child node */
2220 					spgMatchNodeAction(index, state, innerTuple,
2221 									   &current, &parent,
2222 									   out.result.matchNode.nodeN);
2223 					/* Adjust level as per opclass request */
2224 					level += out.result.matchNode.levelAdd;
2225 					/* Replace leafDatum and recompute leafSize */
2226 					if (!isnull)
2227 					{
2228 						leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
2229 						leafSize = SpGistGetLeafTupleSize(leafDescriptor,
2230 														  leafDatums, isnulls);
2231 						leafSize += sizeof(ItemIdData);
2232 					}
2233 
2234 					/*
2235 					 * Check new tuple size; fail if it can't fit, unless the
2236 					 * opclass says it can handle the situation by suffixing.
2237 					 *
2238 					 * However, the opclass can only shorten the leaf datum,
2239 					 * which may not be enough to ever make the tuple fit,
2240 					 * since INCLUDE columns might alone use more than a page.
2241 					 * Depending on the opclass' behavior, that could lead to
2242 					 * an infinite loop --- spgtextproc.c, for example, will
2243 					 * just repeatedly generate an empty-string leaf datum
2244 					 * once it runs out of data.  Actual bugs in opclasses
2245 					 * might cause infinite looping, too.  To detect such a
2246 					 * loop, check to see if we are making progress by
2247 					 * reducing the leafSize in each pass.  This is a bit
2248 					 * tricky though.  Because of alignment considerations,
2249 					 * the total tuple size might not decrease on every pass.
2250 					 * Also, there are edge cases where the choose method
2251 					 * might seem to not make progress for a cycle or two.
2252 					 * Somewhat arbitrarily, we allow up to 10 no-progress
2253 					 * iterations before failing.  (This limit should be more
2254 					 * than MAXALIGN, to accommodate opclasses that trim one
2255 					 * byte from the leaf datum per pass.)
2256 					 */
2257 					if (leafSize > SPGIST_PAGE_CAPACITY)
2258 					{
2259 						bool		ok = false;
2260 
2261 						if (state->config.longValuesOK && !isnull)
2262 						{
2263 							if (leafSize < bestLeafSize)
2264 							{
2265 								ok = true;
2266 								bestLeafSize = leafSize;
2267 								numNoProgressCycles = 0;
2268 							}
2269 							else if (++numNoProgressCycles < 10)
2270 								ok = true;
2271 						}
2272 						if (!ok)
2273 							ereport(ERROR,
2274 									(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2275 									 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2276 											leafSize - sizeof(ItemIdData),
2277 											SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2278 											RelationGetRelationName(index)),
2279 									 errhint("Values larger than a buffer page cannot be indexed.")));
2280 					}
2281 
2282 					/*
2283 					 * Loop around and attempt to insert the new leafDatum at
2284 					 * "current" (which might reference an existing child
2285 					 * tuple, or might be invalid to force us to find a new
2286 					 * page for the tuple).
2287 					 */
2288 					break;
2289 				case spgAddNode:
2290 					/* AddNode is not sensible if nodes don't have labels */
2291 					if (in.nodeLabels == NULL)
2292 						elog(ERROR, "cannot add a node to an inner tuple without node labels");
2293 					/* Add node to inner tuple, per request */
2294 					spgAddNodeAction(index, state, innerTuple,
2295 									 &current, &parent,
2296 									 out.result.addNode.nodeN,
2297 									 out.result.addNode.nodeLabel);
2298 
2299 					/*
2300 					 * Retry insertion into the enlarged node.  We assume that
2301 					 * we'll get a MatchNode result this time.
2302 					 */
2303 					goto process_inner_tuple;
2304 					break;
2305 				case spgSplitTuple:
2306 					/* Split inner tuple, per request */
2307 					spgSplitNodeAction(index, state, innerTuple,
2308 									   &current, &out);
2309 
2310 					/* Retry insertion into the split node */
2311 					goto process_inner_tuple;
2312 					break;
2313 				default:
2314 					elog(ERROR, "unrecognized SPGiST choose result: %d",
2315 						 (int) out.resultType);
2316 					break;
2317 			}
2318 		}
2319 	}							/* end loop */
2320 
2321 	/*
2322 	 * Release any buffers we're still holding.  Beware of possibility that
2323 	 * current and parent reference same buffer.
2324 	 */
2325 	if (current.buffer != InvalidBuffer)
2326 	{
2327 		SpGistSetLastUsedPage(index, current.buffer);
2328 		UnlockReleaseBuffer(current.buffer);
2329 	}
2330 	if (parent.buffer != InvalidBuffer &&
2331 		parent.buffer != current.buffer)
2332 	{
2333 		SpGistSetLastUsedPage(index, parent.buffer);
2334 		UnlockReleaseBuffer(parent.buffer);
2335 	}
2336 
2337 	/*
2338 	 * We do not support being called while some outer function is holding a
2339 	 * buffer lock (or any other reason to postpone query cancels).  If that
2340 	 * were the case, telling the caller to retry would create an infinite
2341 	 * loop.
2342 	 */
2343 	Assert(INTERRUPTS_CAN_BE_PROCESSED());
2344 
2345 	/*
2346 	 * Finally, check for interrupts again.  If there was a query cancel,
2347 	 * ProcessInterrupts() will be able to throw the error here.  If it was
2348 	 * some other kind of interrupt that can just be cleared, return false to
2349 	 * tell our caller to retry.
2350 	 */
2351 	CHECK_FOR_INTERRUPTS();
2352 
2353 	return result;
2354 }
2355