1 /*-------------------------------------------------------------------------
2  *
3  * spgdoinsert.c
4  *	  implementation of insert algorithm
5  *
6  *
7  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *			src/backend/access/spgist/spgdoinsert.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/genam.h"
19 #include "access/spgist_private.h"
20 #include "access/spgxlog.h"
21 #include "access/xloginsert.h"
22 #include "miscadmin.h"
23 #include "storage/bufmgr.h"
24 #include "utils/rel.h"
25 
26 
27 /*
28  * SPPageDesc tracks all info about a page we are inserting into.  In some
29  * situations it actually identifies a tuple, or even a specific node within
30  * an inner tuple.  But any of the fields can be invalid.  If the buffer
31  * field is valid, it implies we hold pin and exclusive lock on that buffer.
32  * page pointer should be valid exactly when buffer is.
33  */
34 typedef struct SPPageDesc
35 {
36 	BlockNumber blkno;			/* block number, or InvalidBlockNumber */
37 	Buffer		buffer;			/* page's buffer number, or InvalidBuffer */
38 	Page		page;			/* pointer to page buffer, or NULL */
39 	OffsetNumber offnum;		/* offset of tuple, or InvalidOffsetNumber */
40 	int			node;			/* node number within inner tuple, or -1 */
41 } SPPageDesc;
42 
43 
44 /*
45  * Set the item pointer in the nodeN'th entry in inner tuple tup.  This
46  * is used to update the parent inner tuple's downlink after a move or
47  * split operation.
48  */
49 void
spgUpdateNodeLink(SpGistInnerTuple tup,int nodeN,BlockNumber blkno,OffsetNumber offset)50 spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
51 				  BlockNumber blkno, OffsetNumber offset)
52 {
53 	int			i;
54 	SpGistNodeTuple node;
55 
56 	SGITITERATE(tup, i, node)
57 	{
58 		if (i == nodeN)
59 		{
60 			ItemPointerSet(&node->t_tid, blkno, offset);
61 			return;
62 		}
63 	}
64 
65 	elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
66 		 nodeN);
67 }
68 
69 /*
70  * Form a new inner tuple containing one more node than the given one, with
71  * the specified label datum, inserted at offset "offset" in the node array.
72  * The new tuple's prefix is the same as the old one's.
73  *
74  * Note that the new node initially has an invalid downlink.  We'll find a
75  * page to point it to later.
76  */
77 static SpGistInnerTuple
addNode(SpGistState * state,SpGistInnerTuple tuple,Datum label,int offset)78 addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
79 {
80 	SpGistNodeTuple node,
81 			   *nodes;
82 	int			i;
83 
84 	/* if offset is negative, insert at end */
85 	if (offset < 0)
86 		offset = tuple->nNodes;
87 	else if (offset > tuple->nNodes)
88 		elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
89 
90 	nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
91 	SGITITERATE(tuple, i, node)
92 	{
93 		if (i < offset)
94 			nodes[i] = node;
95 		else
96 			nodes[i + 1] = node;
97 	}
98 
99 	nodes[offset] = spgFormNodeTuple(state, label, false);
100 
101 	return spgFormInnerTuple(state,
102 							 (tuple->prefixSize > 0),
103 							 SGITDATUM(tuple, state),
104 							 tuple->nNodes + 1,
105 							 nodes);
106 }
107 
108 /* qsort comparator for sorting OffsetNumbers */
109 static int
cmpOffsetNumbers(const void * a,const void * b)110 cmpOffsetNumbers(const void *a, const void *b)
111 {
112 	if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
113 		return 0;
114 	return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
115 }
116 
117 /*
118  * Delete multiple tuples from an index page, preserving tuple offset numbers.
119  *
120  * The first tuple in the given list is replaced with a dead tuple of type
121  * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
122  * with dead tuples of type "reststate".  If either firststate or reststate
123  * is REDIRECT, blkno/offnum specify where to link to.
124  *
125  * NB: this is used during WAL replay, so beware of trying to make it too
126  * smart.  In particular, it shouldn't use "state" except for calling
127  * spgFormDeadTuple().  This is also used in a critical section, so no
128  * pallocs either!
129  */
130 void
spgPageIndexMultiDelete(SpGistState * state,Page page,OffsetNumber * itemnos,int nitems,int firststate,int reststate,BlockNumber blkno,OffsetNumber offnum)131 spgPageIndexMultiDelete(SpGistState *state, Page page,
132 						OffsetNumber *itemnos, int nitems,
133 						int firststate, int reststate,
134 						BlockNumber blkno, OffsetNumber offnum)
135 {
136 	OffsetNumber firstItem;
137 	OffsetNumber sortednos[MaxIndexTuplesPerPage];
138 	SpGistDeadTuple tuple = NULL;
139 	int			i;
140 
141 	if (nitems == 0)
142 		return;					/* nothing to do */
143 
144 	/*
145 	 * For efficiency we want to use PageIndexMultiDelete, which requires the
146 	 * targets to be listed in sorted order, so we have to sort the itemnos
147 	 * array.  (This also greatly simplifies the math for reinserting the
148 	 * replacement tuples.)  However, we must not scribble on the caller's
149 	 * array, so we have to make a copy.
150 	 */
151 	memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
152 	if (nitems > 1)
153 		qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
154 
155 	PageIndexMultiDelete(page, sortednos, nitems);
156 
157 	firstItem = itemnos[0];
158 
159 	for (i = 0; i < nitems; i++)
160 	{
161 		OffsetNumber itemno = sortednos[i];
162 		int			tupstate;
163 
164 		tupstate = (itemno == firstItem) ? firststate : reststate;
165 		if (tuple == NULL || tuple->tupstate != tupstate)
166 			tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
167 
168 		if (PageAddItem(page, (Item) tuple, tuple->size,
169 						itemno, false, false) != itemno)
170 			elog(ERROR, "failed to add item of size %u to SPGiST index page",
171 				 tuple->size);
172 
173 		if (tupstate == SPGIST_REDIRECT)
174 			SpGistPageGetOpaque(page)->nRedirection++;
175 		else if (tupstate == SPGIST_PLACEHOLDER)
176 			SpGistPageGetOpaque(page)->nPlaceholder++;
177 	}
178 }
179 
180 /*
181  * Update the parent inner tuple's downlink, and mark the parent buffer
182  * dirty (this must be the last change to the parent page in the current
183  * WAL action).
184  */
185 static void
saveNodeLink(Relation index,SPPageDesc * parent,BlockNumber blkno,OffsetNumber offnum)186 saveNodeLink(Relation index, SPPageDesc *parent,
187 			 BlockNumber blkno, OffsetNumber offnum)
188 {
189 	SpGistInnerTuple innerTuple;
190 
191 	innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
192 												PageGetItemId(parent->page, parent->offnum));
193 
194 	spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
195 
196 	MarkBufferDirty(parent->buffer);
197 }
198 
199 /*
200  * Add a leaf tuple to a leaf page where there is known to be room for it
201  */
202 static void
addLeafTuple(Relation index,SpGistState * state,SpGistLeafTuple leafTuple,SPPageDesc * current,SPPageDesc * parent,bool isNulls,bool isNew)203 addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
204 			 SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
205 {
206 	spgxlogAddLeaf xlrec;
207 
208 	xlrec.newPage = isNew;
209 	xlrec.storesNulls = isNulls;
210 
211 	/* these will be filled below as needed */
212 	xlrec.offnumLeaf = InvalidOffsetNumber;
213 	xlrec.offnumHeadLeaf = InvalidOffsetNumber;
214 	xlrec.offnumParent = InvalidOffsetNumber;
215 	xlrec.nodeI = 0;
216 
217 	START_CRIT_SECTION();
218 
219 	if (current->offnum == InvalidOffsetNumber ||
220 		SpGistBlockIsRoot(current->blkno))
221 	{
222 		/* Tuple is not part of a chain */
223 		leafTuple->nextOffset = InvalidOffsetNumber;
224 		current->offnum = SpGistPageAddNewItem(state, current->page,
225 											   (Item) leafTuple, leafTuple->size,
226 											   NULL, false);
227 
228 		xlrec.offnumLeaf = current->offnum;
229 
230 		/* Must update parent's downlink if any */
231 		if (parent->buffer != InvalidBuffer)
232 		{
233 			xlrec.offnumParent = parent->offnum;
234 			xlrec.nodeI = parent->node;
235 
236 			saveNodeLink(index, parent, current->blkno, current->offnum);
237 		}
238 	}
239 	else
240 	{
241 		/*
242 		 * Tuple must be inserted into existing chain.  We mustn't change the
243 		 * chain's head address, but we don't need to chase the entire chain
244 		 * to put the tuple at the end; we can insert it second.
245 		 *
246 		 * Also, it's possible that the "chain" consists only of a DEAD tuple,
247 		 * in which case we should replace the DEAD tuple in-place.
248 		 */
249 		SpGistLeafTuple head;
250 		OffsetNumber offnum;
251 
252 		head = (SpGistLeafTuple) PageGetItem(current->page,
253 											 PageGetItemId(current->page, current->offnum));
254 		if (head->tupstate == SPGIST_LIVE)
255 		{
256 			leafTuple->nextOffset = head->nextOffset;
257 			offnum = SpGistPageAddNewItem(state, current->page,
258 										  (Item) leafTuple, leafTuple->size,
259 										  NULL, false);
260 
261 			/*
262 			 * re-get head of list because it could have been moved on page,
263 			 * and set new second element
264 			 */
265 			head = (SpGistLeafTuple) PageGetItem(current->page,
266 												 PageGetItemId(current->page, current->offnum));
267 			head->nextOffset = offnum;
268 
269 			xlrec.offnumLeaf = offnum;
270 			xlrec.offnumHeadLeaf = current->offnum;
271 		}
272 		else if (head->tupstate == SPGIST_DEAD)
273 		{
274 			leafTuple->nextOffset = InvalidOffsetNumber;
275 			PageIndexTupleDelete(current->page, current->offnum);
276 			if (PageAddItem(current->page,
277 							(Item) leafTuple, leafTuple->size,
278 							current->offnum, false, false) != current->offnum)
279 				elog(ERROR, "failed to add item of size %u to SPGiST index page",
280 					 leafTuple->size);
281 
282 			/* WAL replay distinguishes this case by equal offnums */
283 			xlrec.offnumLeaf = current->offnum;
284 			xlrec.offnumHeadLeaf = current->offnum;
285 		}
286 		else
287 			elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
288 	}
289 
290 	MarkBufferDirty(current->buffer);
291 
292 	if (RelationNeedsWAL(index) && !state->isBuild)
293 	{
294 		XLogRecPtr	recptr;
295 		int			flags;
296 
297 		XLogBeginInsert();
298 		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
299 		XLogRegisterData((char *) leafTuple, leafTuple->size);
300 
301 		flags = REGBUF_STANDARD;
302 		if (xlrec.newPage)
303 			flags |= REGBUF_WILL_INIT;
304 		XLogRegisterBuffer(0, current->buffer, flags);
305 		if (xlrec.offnumParent != InvalidOffsetNumber)
306 			XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
307 
308 		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
309 
310 		PageSetLSN(current->page, recptr);
311 
312 		/* update parent only if we actually changed it */
313 		if (xlrec.offnumParent != InvalidOffsetNumber)
314 		{
315 			PageSetLSN(parent->page, recptr);
316 		}
317 	}
318 
319 	END_CRIT_SECTION();
320 }
321 
322 /*
323  * Count the number and total size of leaf tuples in the chain starting at
324  * current->offnum.  Return number into *nToSplit and total size as function
325  * result.
326  *
327  * Klugy special case when considering the root page (i.e., root is a leaf
328  * page, but we're about to split for the first time): return fake large
329  * values to force spgdoinsert() to take the doPickSplit rather than
330  * moveLeafs code path.  moveLeafs is not prepared to deal with root page.
331  */
332 static int
checkSplitConditions(Relation index,SpGistState * state,SPPageDesc * current,int * nToSplit)333 checkSplitConditions(Relation index, SpGistState *state,
334 					 SPPageDesc *current, int *nToSplit)
335 {
336 	int			i,
337 				n = 0,
338 				totalSize = 0;
339 
340 	if (SpGistBlockIsRoot(current->blkno))
341 	{
342 		/* return impossible values to force split */
343 		*nToSplit = BLCKSZ;
344 		return BLCKSZ;
345 	}
346 
347 	i = current->offnum;
348 	while (i != InvalidOffsetNumber)
349 	{
350 		SpGistLeafTuple it;
351 
352 		Assert(i >= FirstOffsetNumber &&
353 			   i <= PageGetMaxOffsetNumber(current->page));
354 		it = (SpGistLeafTuple) PageGetItem(current->page,
355 										   PageGetItemId(current->page, i));
356 		if (it->tupstate == SPGIST_LIVE)
357 		{
358 			n++;
359 			totalSize += it->size + sizeof(ItemIdData);
360 		}
361 		else if (it->tupstate == SPGIST_DEAD)
362 		{
363 			/* We could see a DEAD tuple as first/only chain item */
364 			Assert(i == current->offnum);
365 			Assert(it->nextOffset == InvalidOffsetNumber);
366 			/* Don't count it in result, because it won't go to other page */
367 		}
368 		else
369 			elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
370 
371 		i = it->nextOffset;
372 	}
373 
374 	*nToSplit = n;
375 
376 	return totalSize;
377 }
378 
379 /*
380  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
381  * but the chain has to be moved because there's not enough room to add
382  * newLeafTuple to its page.  We use this method when the chain contains
383  * very little data so a split would be inefficient.  We are sure we can
384  * fit the chain plus newLeafTuple on one other page.
385  */
386 static void
moveLeafs(Relation index,SpGistState * state,SPPageDesc * current,SPPageDesc * parent,SpGistLeafTuple newLeafTuple,bool isNulls)387 moveLeafs(Relation index, SpGistState *state,
388 		  SPPageDesc *current, SPPageDesc *parent,
389 		  SpGistLeafTuple newLeafTuple, bool isNulls)
390 {
391 	int			i,
392 				nDelete,
393 				nInsert,
394 				size;
395 	Buffer		nbuf;
396 	Page		npage;
397 	SpGistLeafTuple it;
398 	OffsetNumber r = InvalidOffsetNumber,
399 				startOffset = InvalidOffsetNumber;
400 	bool		replaceDead = false;
401 	OffsetNumber *toDelete;
402 	OffsetNumber *toInsert;
403 	BlockNumber nblkno;
404 	spgxlogMoveLeafs xlrec;
405 	char	   *leafdata,
406 			   *leafptr;
407 
408 	/* This doesn't work on root page */
409 	Assert(parent->buffer != InvalidBuffer);
410 	Assert(parent->buffer != current->buffer);
411 
412 	/* Locate the tuples to be moved, and count up the space needed */
413 	i = PageGetMaxOffsetNumber(current->page);
414 	toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
415 	toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
416 
417 	size = newLeafTuple->size + sizeof(ItemIdData);
418 
419 	nDelete = 0;
420 	i = current->offnum;
421 	while (i != InvalidOffsetNumber)
422 	{
423 		SpGistLeafTuple it;
424 
425 		Assert(i >= FirstOffsetNumber &&
426 			   i <= PageGetMaxOffsetNumber(current->page));
427 		it = (SpGistLeafTuple) PageGetItem(current->page,
428 										   PageGetItemId(current->page, i));
429 
430 		if (it->tupstate == SPGIST_LIVE)
431 		{
432 			toDelete[nDelete] = i;
433 			size += it->size + sizeof(ItemIdData);
434 			nDelete++;
435 		}
436 		else if (it->tupstate == SPGIST_DEAD)
437 		{
438 			/* We could see a DEAD tuple as first/only chain item */
439 			Assert(i == current->offnum);
440 			Assert(it->nextOffset == InvalidOffsetNumber);
441 			/* We don't want to move it, so don't count it in size */
442 			toDelete[nDelete] = i;
443 			nDelete++;
444 			replaceDead = true;
445 		}
446 		else
447 			elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
448 
449 		i = it->nextOffset;
450 	}
451 
452 	/* Find a leaf page that will hold them */
453 	nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
454 						   size, &xlrec.newPage);
455 	npage = BufferGetPage(nbuf);
456 	nblkno = BufferGetBlockNumber(nbuf);
457 	Assert(nblkno != current->blkno);
458 
459 	leafdata = leafptr = palloc(size);
460 
461 	START_CRIT_SECTION();
462 
463 	/* copy all the old tuples to new page, unless they're dead */
464 	nInsert = 0;
465 	if (!replaceDead)
466 	{
467 		for (i = 0; i < nDelete; i++)
468 		{
469 			it = (SpGistLeafTuple) PageGetItem(current->page,
470 											   PageGetItemId(current->page, toDelete[i]));
471 			Assert(it->tupstate == SPGIST_LIVE);
472 
473 			/*
474 			 * Update chain link (notice the chain order gets reversed, but we
475 			 * don't care).  We're modifying the tuple on the source page
476 			 * here, but it's okay since we're about to delete it.
477 			 */
478 			it->nextOffset = r;
479 
480 			r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
481 									 &startOffset, false);
482 
483 			toInsert[nInsert] = r;
484 			nInsert++;
485 
486 			/* save modified tuple into leafdata as well */
487 			memcpy(leafptr, it, it->size);
488 			leafptr += it->size;
489 		}
490 	}
491 
492 	/* add the new tuple as well */
493 	newLeafTuple->nextOffset = r;
494 	r = SpGistPageAddNewItem(state, npage,
495 							 (Item) newLeafTuple, newLeafTuple->size,
496 							 &startOffset, false);
497 	toInsert[nInsert] = r;
498 	nInsert++;
499 	memcpy(leafptr, newLeafTuple, newLeafTuple->size);
500 	leafptr += newLeafTuple->size;
501 
502 	/*
503 	 * Now delete the old tuples, leaving a redirection pointer behind for the
504 	 * first one, unless we're doing an index build; in which case there can't
505 	 * be any concurrent scan so we need not provide a redirect.
506 	 */
507 	spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
508 							state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
509 							SPGIST_PLACEHOLDER,
510 							nblkno, r);
511 
512 	/* Update parent's downlink and mark parent page dirty */
513 	saveNodeLink(index, parent, nblkno, r);
514 
515 	/* Mark the leaf pages too */
516 	MarkBufferDirty(current->buffer);
517 	MarkBufferDirty(nbuf);
518 
519 	if (RelationNeedsWAL(index) && !state->isBuild)
520 	{
521 		XLogRecPtr	recptr;
522 
523 		/* prepare WAL info */
524 		STORE_STATE(state, xlrec.stateSrc);
525 
526 		xlrec.nMoves = nDelete;
527 		xlrec.replaceDead = replaceDead;
528 		xlrec.storesNulls = isNulls;
529 
530 		xlrec.offnumParent = parent->offnum;
531 		xlrec.nodeI = parent->node;
532 
533 		XLogBeginInsert();
534 		XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
535 		XLogRegisterData((char *) toDelete,
536 						 sizeof(OffsetNumber) * nDelete);
537 		XLogRegisterData((char *) toInsert,
538 						 sizeof(OffsetNumber) * nInsert);
539 		XLogRegisterData((char *) leafdata, leafptr - leafdata);
540 
541 		XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
542 		XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
543 		XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
544 
545 		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
546 
547 		PageSetLSN(current->page, recptr);
548 		PageSetLSN(npage, recptr);
549 		PageSetLSN(parent->page, recptr);
550 	}
551 
552 	END_CRIT_SECTION();
553 
554 	/* Update local free-space cache and release new buffer */
555 	SpGistSetLastUsedPage(index, nbuf);
556 	UnlockReleaseBuffer(nbuf);
557 }
558 
559 /*
560  * Update previously-created redirection tuple with appropriate destination
561  *
562  * We use this when it's not convenient to know the destination first.
563  * The tuple should have been made with the "impossible" destination of
564  * the metapage.
565  */
566 static void
setRedirectionTuple(SPPageDesc * current,OffsetNumber position,BlockNumber blkno,OffsetNumber offnum)567 setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
568 					BlockNumber blkno, OffsetNumber offnum)
569 {
570 	SpGistDeadTuple dt;
571 
572 	dt = (SpGistDeadTuple) PageGetItem(current->page,
573 									   PageGetItemId(current->page, position));
574 	Assert(dt->tupstate == SPGIST_REDIRECT);
575 	Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
576 	ItemPointerSet(&dt->pointer, blkno, offnum);
577 }
578 
579 /*
580  * Test to see if the user-defined picksplit function failed to do its job,
581  * ie, it put all the leaf tuples into the same node.
582  * If so, randomly divide the tuples into several nodes (all with the same
583  * label) and return true to select allTheSame mode for this inner tuple.
584  *
585  * (This code is also used to forcibly select allTheSame mode for nulls.)
586  *
587  * If we know that the leaf tuples wouldn't all fit on one page, then we
588  * exclude the last tuple (which is the incoming new tuple that forced a split)
589  * from the check to see if more than one node is used.  The reason for this
590  * is that if the existing tuples are put into only one chain, then even if
591  * we move them all to an empty page, there would still not be room for the
592  * new tuple, so we'd get into an infinite loop of picksplit attempts.
593  * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
594  * be split across pages.  (Exercise for the reader: figure out why this
595  * fixes the problem even when there is only one old tuple.)
596  */
597 static bool
checkAllTheSame(spgPickSplitIn * in,spgPickSplitOut * out,bool tooBig,bool * includeNew)598 checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
599 				bool *includeNew)
600 {
601 	int			theNode;
602 	int			limit;
603 	int			i;
604 
605 	/* For the moment, assume we can include the new leaf tuple */
606 	*includeNew = true;
607 
608 	/* If there's only the new leaf tuple, don't select allTheSame mode */
609 	if (in->nTuples <= 1)
610 		return false;
611 
612 	/* If tuple set doesn't fit on one page, ignore the new tuple in test */
613 	limit = tooBig ? in->nTuples - 1 : in->nTuples;
614 
615 	/* Check to see if more than one node is populated */
616 	theNode = out->mapTuplesToNodes[0];
617 	for (i = 1; i < limit; i++)
618 	{
619 		if (out->mapTuplesToNodes[i] != theNode)
620 			return false;
621 	}
622 
623 	/* Nope, so override the picksplit function's decisions */
624 
625 	/* If the new tuple is in its own node, it can't be included in split */
626 	if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
627 		*includeNew = false;
628 
629 	out->nNodes = 8;			/* arbitrary number of child nodes */
630 
631 	/* Random assignment of tuples to nodes (note we include new tuple) */
632 	for (i = 0; i < in->nTuples; i++)
633 		out->mapTuplesToNodes[i] = i % out->nNodes;
634 
635 	/* The opclass may not use node labels, but if it does, duplicate 'em */
636 	if (out->nodeLabels)
637 	{
638 		Datum		theLabel = out->nodeLabels[theNode];
639 
640 		out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
641 		for (i = 0; i < out->nNodes; i++)
642 			out->nodeLabels[i] = theLabel;
643 	}
644 
645 	/* We don't touch the prefix or the leaf tuple datum assignments */
646 
647 	return true;
648 }
649 
650 /*
651  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
652  * but the chain has to be split because there's not enough room to add
653  * newLeafTuple to its page.
654  *
655  * This function splits the leaf tuple set according to picksplit's rules,
656  * creating one or more new chains that are spread across the current page
657  * and an additional leaf page (we assume that two leaf pages will be
658  * sufficient).  A new inner tuple is created, and the parent downlink
659  * pointer is updated to point to that inner tuple instead of the leaf chain.
660  *
661  * On exit, current contains the address of the new inner tuple.
662  *
663  * Returns true if we successfully inserted newLeafTuple during this function,
664  * false if caller still has to do it (meaning another picksplit operation is
665  * probably needed).  Failure could occur if the picksplit result is fairly
666  * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
667  * Because we force the picksplit result to be at least two chains, each
668  * cycle will get rid of at least one leaf tuple from the chain, so the loop
669  * will eventually terminate if lack of balance is the issue.  If the tuple
670  * is too big, we assume that repeated picksplit operations will eventually
671  * make it small enough by repeated prefix-stripping.  A broken opclass could
672  * make this an infinite loop, though, so spgdoinsert() checks that the
673  * leaf datums get smaller each time.
674  */
675 static bool
doPickSplit(Relation index,SpGistState * state,SPPageDesc * current,SPPageDesc * parent,SpGistLeafTuple newLeafTuple,int level,bool isNulls,bool isNew)676 doPickSplit(Relation index, SpGistState *state,
677 			SPPageDesc *current, SPPageDesc *parent,
678 			SpGistLeafTuple newLeafTuple,
679 			int level, bool isNulls, bool isNew)
680 {
681 	bool		insertedNew = false;
682 	spgPickSplitIn in;
683 	spgPickSplitOut out;
684 	FmgrInfo   *procinfo;
685 	bool		includeNew;
686 	int			i,
687 				max,
688 				n;
689 	SpGistInnerTuple innerTuple;
690 	SpGistNodeTuple node,
691 			   *nodes;
692 	Buffer		newInnerBuffer,
693 				newLeafBuffer;
694 	ItemPointerData *heapPtrs;
695 	uint8	   *leafPageSelect;
696 	int		   *leafSizes;
697 	OffsetNumber *toDelete;
698 	OffsetNumber *toInsert;
699 	OffsetNumber redirectTuplePos = InvalidOffsetNumber;
700 	OffsetNumber startOffsets[2];
701 	SpGistLeafTuple *newLeafs;
702 	int			spaceToDelete;
703 	int			currentFreeSpace;
704 	int			totalLeafSizes;
705 	bool		allTheSame;
706 	spgxlogPickSplit xlrec;
707 	char	   *leafdata,
708 			   *leafptr;
709 	SPPageDesc	saveCurrent;
710 	int			nToDelete,
711 				nToInsert,
712 				maxToInclude;
713 
714 	in.level = level;
715 
716 	/*
717 	 * Allocate per-leaf-tuple work arrays with max possible size
718 	 */
719 	max = PageGetMaxOffsetNumber(current->page);
720 	n = max + 1;
721 	in.datums = (Datum *) palloc(sizeof(Datum) * n);
722 	heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
723 	toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
724 	toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
725 	newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
726 	leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
727 
728 	STORE_STATE(state, xlrec.stateSrc);
729 
730 	/*
731 	 * Form list of leaf tuples which will be distributed as split result;
732 	 * also, count up the amount of space that will be freed from current.
733 	 * (Note that in the non-root case, we won't actually delete the old
734 	 * tuples, only replace them with redirects or placeholders.)
735 	 *
736 	 * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
737 	 * page.  For a pass-by-value data type we will fetch a word that must
738 	 * exist even though it may contain garbage (because of the fact that leaf
739 	 * tuples must have size at least SGDTSIZE).  For a pass-by-reference type
740 	 * we are just computing a pointer that isn't going to get dereferenced.
741 	 * So it's not worth guarding the calls with isNulls checks.
742 	 */
743 	nToInsert = 0;
744 	nToDelete = 0;
745 	spaceToDelete = 0;
746 	if (SpGistBlockIsRoot(current->blkno))
747 	{
748 		/*
749 		 * We are splitting the root (which up to now is also a leaf page).
750 		 * Its tuples are not linked, so scan sequentially to get them all. We
751 		 * ignore the original value of current->offnum.
752 		 */
753 		for (i = FirstOffsetNumber; i <= max; i++)
754 		{
755 			SpGistLeafTuple it;
756 
757 			it = (SpGistLeafTuple) PageGetItem(current->page,
758 											   PageGetItemId(current->page, i));
759 			if (it->tupstate == SPGIST_LIVE)
760 			{
761 				in.datums[nToInsert] = SGLTDATUM(it, state);
762 				heapPtrs[nToInsert] = it->heapPtr;
763 				nToInsert++;
764 				toDelete[nToDelete] = i;
765 				nToDelete++;
766 				/* we will delete the tuple altogether, so count full space */
767 				spaceToDelete += it->size + sizeof(ItemIdData);
768 			}
769 			else				/* tuples on root should be live */
770 				elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
771 		}
772 	}
773 	else
774 	{
775 		/* Normal case, just collect the leaf tuples in the chain */
776 		i = current->offnum;
777 		while (i != InvalidOffsetNumber)
778 		{
779 			SpGistLeafTuple it;
780 
781 			Assert(i >= FirstOffsetNumber && i <= max);
782 			it = (SpGistLeafTuple) PageGetItem(current->page,
783 											   PageGetItemId(current->page, i));
784 			if (it->tupstate == SPGIST_LIVE)
785 			{
786 				in.datums[nToInsert] = SGLTDATUM(it, state);
787 				heapPtrs[nToInsert] = it->heapPtr;
788 				nToInsert++;
789 				toDelete[nToDelete] = i;
790 				nToDelete++;
791 				/* we will not delete the tuple, only replace with dead */
792 				Assert(it->size >= SGDTSIZE);
793 				spaceToDelete += it->size - SGDTSIZE;
794 			}
795 			else if (it->tupstate == SPGIST_DEAD)
796 			{
797 				/* We could see a DEAD tuple as first/only chain item */
798 				Assert(i == current->offnum);
799 				Assert(it->nextOffset == InvalidOffsetNumber);
800 				toDelete[nToDelete] = i;
801 				nToDelete++;
802 				/* replacing it with redirect will save no space */
803 			}
804 			else
805 				elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
806 
807 			i = it->nextOffset;
808 		}
809 	}
810 	in.nTuples = nToInsert;
811 
812 	/*
813 	 * We may not actually insert new tuple because another picksplit may be
814 	 * necessary due to too large value, but we will try to allocate enough
815 	 * space to include it; and in any case it has to be included in the input
816 	 * for the picksplit function.  So don't increment nToInsert yet.
817 	 */
818 	in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
819 	heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
820 	in.nTuples++;
821 
822 	memset(&out, 0, sizeof(out));
823 
824 	if (!isNulls)
825 	{
826 		/*
827 		 * Perform split using user-defined method.
828 		 */
829 		procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
830 		FunctionCall2Coll(procinfo,
831 						  index->rd_indcollation[0],
832 						  PointerGetDatum(&in),
833 						  PointerGetDatum(&out));
834 
835 		/*
836 		 * Form new leaf tuples and count up the total space needed.
837 		 */
838 		totalLeafSizes = 0;
839 		for (i = 0; i < in.nTuples; i++)
840 		{
841 			newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
842 										   out.leafTupleDatums[i],
843 										   false);
844 			totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
845 		}
846 	}
847 	else
848 	{
849 		/*
850 		 * Perform dummy split that puts all tuples into one node.
851 		 * checkAllTheSame will override this and force allTheSame mode.
852 		 */
853 		out.hasPrefix = false;
854 		out.nNodes = 1;
855 		out.nodeLabels = NULL;
856 		out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
857 
858 		/*
859 		 * Form new leaf tuples and count up the total space needed.
860 		 */
861 		totalLeafSizes = 0;
862 		for (i = 0; i < in.nTuples; i++)
863 		{
864 			newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
865 										   (Datum) 0,
866 										   true);
867 			totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
868 		}
869 	}
870 
871 	/*
872 	 * Check to see if the picksplit function failed to separate the values,
873 	 * ie, it put them all into the same child node.  If so, select allTheSame
874 	 * mode and create a random split instead.  See comments for
875 	 * checkAllTheSame as to why we need to know if the new leaf tuples could
876 	 * fit on one page.
877 	 */
878 	allTheSame = checkAllTheSame(&in, &out,
879 								 totalLeafSizes > SPGIST_PAGE_CAPACITY,
880 								 &includeNew);
881 
882 	/*
883 	 * If checkAllTheSame decided we must exclude the new tuple, don't
884 	 * consider it any further.
885 	 */
886 	if (includeNew)
887 		maxToInclude = in.nTuples;
888 	else
889 	{
890 		maxToInclude = in.nTuples - 1;
891 		totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
892 	}
893 
894 	/*
895 	 * Allocate per-node work arrays.  Since checkAllTheSame could replace
896 	 * out.nNodes with a value larger than the number of tuples on the input
897 	 * page, we can't allocate these arrays before here.
898 	 */
899 	nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
900 	leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
901 
902 	/*
903 	 * Form nodes of inner tuple and inner tuple itself
904 	 */
905 	for (i = 0; i < out.nNodes; i++)
906 	{
907 		Datum		label = (Datum) 0;
908 		bool		labelisnull = (out.nodeLabels == NULL);
909 
910 		if (!labelisnull)
911 			label = out.nodeLabels[i];
912 		nodes[i] = spgFormNodeTuple(state, label, labelisnull);
913 	}
914 	innerTuple = spgFormInnerTuple(state,
915 								   out.hasPrefix, out.prefixDatum,
916 								   out.nNodes, nodes);
917 	innerTuple->allTheSame = allTheSame;
918 
919 	/*
920 	 * Update nodes[] array to point into the newly formed innerTuple, so that
921 	 * we can adjust their downlinks below.
922 	 */
923 	SGITITERATE(innerTuple, i, node)
924 	{
925 		nodes[i] = node;
926 	}
927 
928 	/*
929 	 * Re-scan new leaf tuples and count up the space needed under each node.
930 	 */
931 	for (i = 0; i < maxToInclude; i++)
932 	{
933 		n = out.mapTuplesToNodes[i];
934 		if (n < 0 || n >= out.nNodes)
935 			elog(ERROR, "inconsistent result of SPGiST picksplit function");
936 		leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
937 	}
938 
939 	/*
940 	 * To perform the split, we must insert a new inner tuple, which can't go
941 	 * on a leaf page; and unless we are splitting the root page, we must then
942 	 * update the parent tuple's downlink to point to the inner tuple.  If
943 	 * there is room, we'll put the new inner tuple on the same page as the
944 	 * parent tuple, otherwise we need another non-leaf buffer. But if the
945 	 * parent page is the root, we can't add the new inner tuple there,
946 	 * because the root page must have only one inner tuple.
947 	 */
948 	xlrec.initInner = false;
949 	if (parent->buffer != InvalidBuffer &&
950 		!SpGistBlockIsRoot(parent->blkno) &&
951 		(SpGistPageGetFreeSpace(parent->page, 1) >=
952 		 innerTuple->size + sizeof(ItemIdData)))
953 	{
954 		/* New inner tuple will fit on parent page */
955 		newInnerBuffer = parent->buffer;
956 	}
957 	else if (parent->buffer != InvalidBuffer)
958 	{
959 		/* Send tuple to page with next triple parity (see README) */
960 		newInnerBuffer = SpGistGetBuffer(index,
961 										 GBUF_INNER_PARITY(parent->blkno + 1) |
962 										 (isNulls ? GBUF_NULLS : 0),
963 										 innerTuple->size + sizeof(ItemIdData),
964 										 &xlrec.initInner);
965 	}
966 	else
967 	{
968 		/* Root page split ... inner tuple will go to root page */
969 		newInnerBuffer = InvalidBuffer;
970 	}
971 
972 	/*
973 	 * The new leaf tuples converted from the existing ones should require the
974 	 * same or less space, and therefore should all fit onto one page
975 	 * (although that's not necessarily the current page, since we can't
976 	 * delete the old tuples but only replace them with placeholders).
977 	 * However, the incoming new tuple might not also fit, in which case we
978 	 * might need another picksplit cycle to reduce it some more.
979 	 *
980 	 * If there's not room to put everything back onto the current page, then
981 	 * we decide on a per-node basis which tuples go to the new page. (We do
982 	 * it like that because leaf tuple chains can't cross pages, so we must
983 	 * place all leaf tuples belonging to the same parent node on the same
984 	 * page.)
985 	 *
986 	 * If we are splitting the root page (turning it from a leaf page into an
987 	 * inner page), then no leaf tuples can go back to the current page; they
988 	 * must all go somewhere else.
989 	 */
990 	if (!SpGistBlockIsRoot(current->blkno))
991 		currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
992 	else
993 		currentFreeSpace = 0;	/* prevent assigning any tuples to current */
994 
995 	xlrec.initDest = false;
996 
997 	if (totalLeafSizes <= currentFreeSpace)
998 	{
999 		/* All the leaf tuples will fit on current page */
1000 		newLeafBuffer = InvalidBuffer;
1001 		/* mark new leaf tuple as included in insertions, if allowed */
1002 		if (includeNew)
1003 		{
1004 			nToInsert++;
1005 			insertedNew = true;
1006 		}
1007 		for (i = 0; i < nToInsert; i++)
1008 			leafPageSelect[i] = 0;	/* signifies current page */
1009 	}
1010 	else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1011 	{
1012 		/*
1013 		 * We're trying to split up a long value by repeated suffixing, but
1014 		 * it's not going to fit yet.  Don't bother allocating a second leaf
1015 		 * buffer that we won't be able to use.
1016 		 */
1017 		newLeafBuffer = InvalidBuffer;
1018 		Assert(includeNew);
1019 		Assert(nToInsert == 0);
1020 	}
1021 	else
1022 	{
1023 		/* We will need another leaf page */
1024 		uint8	   *nodePageSelect;
1025 		int			curspace;
1026 		int			newspace;
1027 
1028 		newLeafBuffer = SpGistGetBuffer(index,
1029 										GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1030 										Min(totalLeafSizes,
1031 											SPGIST_PAGE_CAPACITY),
1032 										&xlrec.initDest);
1033 
1034 		/*
1035 		 * Attempt to assign node groups to the two pages.  We might fail to
1036 		 * do so, even if totalLeafSizes is less than the available space,
1037 		 * because we can't split a group across pages.
1038 		 */
1039 		nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1040 
1041 		curspace = currentFreeSpace;
1042 		newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1043 		for (i = 0; i < out.nNodes; i++)
1044 		{
1045 			if (leafSizes[i] <= curspace)
1046 			{
1047 				nodePageSelect[i] = 0;	/* signifies current page */
1048 				curspace -= leafSizes[i];
1049 			}
1050 			else
1051 			{
1052 				nodePageSelect[i] = 1;	/* signifies new leaf page */
1053 				newspace -= leafSizes[i];
1054 			}
1055 		}
1056 		if (curspace >= 0 && newspace >= 0)
1057 		{
1058 			/* Successful assignment, so we can include the new leaf tuple */
1059 			if (includeNew)
1060 			{
1061 				nToInsert++;
1062 				insertedNew = true;
1063 			}
1064 		}
1065 		else if (includeNew)
1066 		{
1067 			/* We must exclude the new leaf tuple from the split */
1068 			int			nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1069 
1070 			leafSizes[nodeOfNewTuple] -=
1071 				newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1072 
1073 			/* Repeat the node assignment process --- should succeed now */
1074 			curspace = currentFreeSpace;
1075 			newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1076 			for (i = 0; i < out.nNodes; i++)
1077 			{
1078 				if (leafSizes[i] <= curspace)
1079 				{
1080 					nodePageSelect[i] = 0;	/* signifies current page */
1081 					curspace -= leafSizes[i];
1082 				}
1083 				else
1084 				{
1085 					nodePageSelect[i] = 1;	/* signifies new leaf page */
1086 					newspace -= leafSizes[i];
1087 				}
1088 			}
1089 			if (curspace < 0 || newspace < 0)
1090 				elog(ERROR, "failed to divide leaf tuple groups across pages");
1091 		}
1092 		else
1093 		{
1094 			/* oops, we already excluded new tuple ... should not get here */
1095 			elog(ERROR, "failed to divide leaf tuple groups across pages");
1096 		}
1097 		/* Expand the per-node assignments to be shown per leaf tuple */
1098 		for (i = 0; i < nToInsert; i++)
1099 		{
1100 			n = out.mapTuplesToNodes[i];
1101 			leafPageSelect[i] = nodePageSelect[n];
1102 		}
1103 	}
1104 
1105 	/* Start preparing WAL record */
1106 	xlrec.nDelete = 0;
1107 	xlrec.initSrc = isNew;
1108 	xlrec.storesNulls = isNulls;
1109 	xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1110 
1111 	leafdata = leafptr = (char *) palloc(totalLeafSizes);
1112 
1113 	/* Here we begin making the changes to the target pages */
1114 	START_CRIT_SECTION();
1115 
1116 	/*
1117 	 * Delete old leaf tuples from current buffer, except when we're splitting
1118 	 * the root; in that case there's no need because we'll re-init the page
1119 	 * below.  We do this first to make room for reinserting new leaf tuples.
1120 	 */
1121 	if (!SpGistBlockIsRoot(current->blkno))
1122 	{
1123 		/*
1124 		 * Init buffer instead of deleting individual tuples, but only if
1125 		 * there aren't any other live tuples and only during build; otherwise
1126 		 * we need to set a redirection tuple for concurrent scans.
1127 		 */
1128 		if (state->isBuild &&
1129 			nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1130 			PageGetMaxOffsetNumber(current->page))
1131 		{
1132 			SpGistInitBuffer(current->buffer,
1133 							 SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1134 			xlrec.initSrc = true;
1135 		}
1136 		else if (isNew)
1137 		{
1138 			/* don't expose the freshly init'd buffer as a backup block */
1139 			Assert(nToDelete == 0);
1140 		}
1141 		else
1142 		{
1143 			xlrec.nDelete = nToDelete;
1144 
1145 			if (!state->isBuild)
1146 			{
1147 				/*
1148 				 * Need to create redirect tuple (it will point to new inner
1149 				 * tuple) but right now the new tuple's location is not known
1150 				 * yet.  So, set the redirection pointer to "impossible" value
1151 				 * and remember its position to update tuple later.
1152 				 */
1153 				if (nToDelete > 0)
1154 					redirectTuplePos = toDelete[0];
1155 				spgPageIndexMultiDelete(state, current->page,
1156 										toDelete, nToDelete,
1157 										SPGIST_REDIRECT,
1158 										SPGIST_PLACEHOLDER,
1159 										SPGIST_METAPAGE_BLKNO,
1160 										FirstOffsetNumber);
1161 			}
1162 			else
1163 			{
1164 				/*
1165 				 * During index build there is not concurrent searches, so we
1166 				 * don't need to create redirection tuple.
1167 				 */
1168 				spgPageIndexMultiDelete(state, current->page,
1169 										toDelete, nToDelete,
1170 										SPGIST_PLACEHOLDER,
1171 										SPGIST_PLACEHOLDER,
1172 										InvalidBlockNumber,
1173 										InvalidOffsetNumber);
1174 			}
1175 		}
1176 	}
1177 
1178 	/*
1179 	 * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1180 	 * nodes.
1181 	 */
1182 	startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1183 	for (i = 0; i < nToInsert; i++)
1184 	{
1185 		SpGistLeafTuple it = newLeafs[i];
1186 		Buffer		leafBuffer;
1187 		BlockNumber leafBlock;
1188 		OffsetNumber newoffset;
1189 
1190 		/* Which page is it going to? */
1191 		leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1192 		leafBlock = BufferGetBlockNumber(leafBuffer);
1193 
1194 		/* Link tuple into correct chain for its node */
1195 		n = out.mapTuplesToNodes[i];
1196 
1197 		if (ItemPointerIsValid(&nodes[n]->t_tid))
1198 		{
1199 			Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1200 			it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
1201 		}
1202 		else
1203 			it->nextOffset = InvalidOffsetNumber;
1204 
1205 		/* Insert it on page */
1206 		newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1207 										 (Item) it, it->size,
1208 										 &startOffsets[leafPageSelect[i]],
1209 										 false);
1210 		toInsert[i] = newoffset;
1211 
1212 		/* ... and complete the chain linking */
1213 		ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1214 
1215 		/* Also copy leaf tuple into WAL data */
1216 		memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1217 		leafptr += newLeafs[i]->size;
1218 	}
1219 
1220 	/*
1221 	 * We're done modifying the other leaf buffer (if any), so mark it dirty.
1222 	 * current->buffer will be marked below, after we're entirely done
1223 	 * modifying it.
1224 	 */
1225 	if (newLeafBuffer != InvalidBuffer)
1226 	{
1227 		MarkBufferDirty(newLeafBuffer);
1228 	}
1229 
1230 	/* Remember current buffer, since we're about to change "current" */
1231 	saveCurrent = *current;
1232 
1233 	/*
1234 	 * Store the new innerTuple
1235 	 */
1236 	if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1237 	{
1238 		/*
1239 		 * new inner tuple goes to parent page
1240 		 */
1241 		Assert(current->buffer != parent->buffer);
1242 
1243 		/* Repoint "current" at the new inner tuple */
1244 		current->blkno = parent->blkno;
1245 		current->buffer = parent->buffer;
1246 		current->page = parent->page;
1247 		xlrec.offnumInner = current->offnum =
1248 			SpGistPageAddNewItem(state, current->page,
1249 								 (Item) innerTuple, innerTuple->size,
1250 								 NULL, false);
1251 
1252 		/*
1253 		 * Update parent node link and mark parent page dirty
1254 		 */
1255 		xlrec.innerIsParent = true;
1256 		xlrec.offnumParent = parent->offnum;
1257 		xlrec.nodeI = parent->node;
1258 		saveNodeLink(index, parent, current->blkno, current->offnum);
1259 
1260 		/*
1261 		 * Update redirection link (in old current buffer)
1262 		 */
1263 		if (redirectTuplePos != InvalidOffsetNumber)
1264 			setRedirectionTuple(&saveCurrent, redirectTuplePos,
1265 								current->blkno, current->offnum);
1266 
1267 		/* Done modifying old current buffer, mark it dirty */
1268 		MarkBufferDirty(saveCurrent.buffer);
1269 	}
1270 	else if (parent->buffer != InvalidBuffer)
1271 	{
1272 		/*
1273 		 * new inner tuple will be stored on a new page
1274 		 */
1275 		Assert(newInnerBuffer != InvalidBuffer);
1276 
1277 		/* Repoint "current" at the new inner tuple */
1278 		current->buffer = newInnerBuffer;
1279 		current->blkno = BufferGetBlockNumber(current->buffer);
1280 		current->page = BufferGetPage(current->buffer);
1281 		xlrec.offnumInner = current->offnum =
1282 			SpGistPageAddNewItem(state, current->page,
1283 								 (Item) innerTuple, innerTuple->size,
1284 								 NULL, false);
1285 
1286 		/* Done modifying new current buffer, mark it dirty */
1287 		MarkBufferDirty(current->buffer);
1288 
1289 		/*
1290 		 * Update parent node link and mark parent page dirty
1291 		 */
1292 		xlrec.innerIsParent = (parent->buffer == current->buffer);
1293 		xlrec.offnumParent = parent->offnum;
1294 		xlrec.nodeI = parent->node;
1295 		saveNodeLink(index, parent, current->blkno, current->offnum);
1296 
1297 		/*
1298 		 * Update redirection link (in old current buffer)
1299 		 */
1300 		if (redirectTuplePos != InvalidOffsetNumber)
1301 			setRedirectionTuple(&saveCurrent, redirectTuplePos,
1302 								current->blkno, current->offnum);
1303 
1304 		/* Done modifying old current buffer, mark it dirty */
1305 		MarkBufferDirty(saveCurrent.buffer);
1306 	}
1307 	else
1308 	{
1309 		/*
1310 		 * Splitting root page, which was a leaf but now becomes inner page
1311 		 * (and so "current" continues to point at it)
1312 		 */
1313 		Assert(SpGistBlockIsRoot(current->blkno));
1314 		Assert(redirectTuplePos == InvalidOffsetNumber);
1315 
1316 		SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1317 		xlrec.initInner = true;
1318 		xlrec.innerIsParent = false;
1319 
1320 		xlrec.offnumInner = current->offnum =
1321 			PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1322 						InvalidOffsetNumber, false, false);
1323 		if (current->offnum != FirstOffsetNumber)
1324 			elog(ERROR, "failed to add item of size %u to SPGiST index page",
1325 				 innerTuple->size);
1326 
1327 		/* No parent link to update, nor redirection to do */
1328 		xlrec.offnumParent = InvalidOffsetNumber;
1329 		xlrec.nodeI = 0;
1330 
1331 		/* Done modifying new current buffer, mark it dirty */
1332 		MarkBufferDirty(current->buffer);
1333 
1334 		/* saveCurrent doesn't represent a different buffer */
1335 		saveCurrent.buffer = InvalidBuffer;
1336 	}
1337 
1338 	if (RelationNeedsWAL(index) && !state->isBuild)
1339 	{
1340 		XLogRecPtr	recptr;
1341 		int			flags;
1342 
1343 		XLogBeginInsert();
1344 
1345 		xlrec.nInsert = nToInsert;
1346 		XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
1347 
1348 		XLogRegisterData((char *) toDelete,
1349 						 sizeof(OffsetNumber) * xlrec.nDelete);
1350 		XLogRegisterData((char *) toInsert,
1351 						 sizeof(OffsetNumber) * xlrec.nInsert);
1352 		XLogRegisterData((char *) leafPageSelect,
1353 						 sizeof(uint8) * xlrec.nInsert);
1354 		XLogRegisterData((char *) innerTuple, innerTuple->size);
1355 		XLogRegisterData(leafdata, leafptr - leafdata);
1356 
1357 		/* Old leaf page */
1358 		if (BufferIsValid(saveCurrent.buffer))
1359 		{
1360 			flags = REGBUF_STANDARD;
1361 			if (xlrec.initSrc)
1362 				flags |= REGBUF_WILL_INIT;
1363 			XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1364 		}
1365 
1366 		/* New leaf page */
1367 		if (BufferIsValid(newLeafBuffer))
1368 		{
1369 			flags = REGBUF_STANDARD;
1370 			if (xlrec.initDest)
1371 				flags |= REGBUF_WILL_INIT;
1372 			XLogRegisterBuffer(1, newLeafBuffer, flags);
1373 		}
1374 
1375 		/* Inner page */
1376 		flags = REGBUF_STANDARD;
1377 		if (xlrec.initInner)
1378 			flags |= REGBUF_WILL_INIT;
1379 		XLogRegisterBuffer(2, current->buffer, flags);
1380 
1381 		/* Parent page, if different from inner page */
1382 		if (parent->buffer != InvalidBuffer)
1383 		{
1384 			if (parent->buffer != current->buffer)
1385 				XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
1386 			else
1387 				Assert(xlrec.innerIsParent);
1388 		}
1389 
1390 		/* Issue the WAL record */
1391 		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1392 
1393 		/* Update page LSNs on all affected pages */
1394 		if (newLeafBuffer != InvalidBuffer)
1395 		{
1396 			Page		page = BufferGetPage(newLeafBuffer);
1397 
1398 			PageSetLSN(page, recptr);
1399 		}
1400 
1401 		if (saveCurrent.buffer != InvalidBuffer)
1402 		{
1403 			Page		page = BufferGetPage(saveCurrent.buffer);
1404 
1405 			PageSetLSN(page, recptr);
1406 		}
1407 
1408 		PageSetLSN(current->page, recptr);
1409 
1410 		if (parent->buffer != InvalidBuffer)
1411 		{
1412 			PageSetLSN(parent->page, recptr);
1413 		}
1414 	}
1415 
1416 	END_CRIT_SECTION();
1417 
1418 	/* Update local free-space cache and unlock buffers */
1419 	if (newLeafBuffer != InvalidBuffer)
1420 	{
1421 		SpGistSetLastUsedPage(index, newLeafBuffer);
1422 		UnlockReleaseBuffer(newLeafBuffer);
1423 	}
1424 	if (saveCurrent.buffer != InvalidBuffer)
1425 	{
1426 		SpGistSetLastUsedPage(index, saveCurrent.buffer);
1427 		UnlockReleaseBuffer(saveCurrent.buffer);
1428 	}
1429 
1430 	return insertedNew;
1431 }
1432 
1433 /*
1434  * spgMatchNode action: descend to N'th child node of current inner tuple
1435  */
1436 static void
spgMatchNodeAction(Relation index,SpGistState * state,SpGistInnerTuple innerTuple,SPPageDesc * current,SPPageDesc * parent,int nodeN)1437 spgMatchNodeAction(Relation index, SpGistState *state,
1438 				   SpGistInnerTuple innerTuple,
1439 				   SPPageDesc *current, SPPageDesc *parent, int nodeN)
1440 {
1441 	int			i;
1442 	SpGistNodeTuple node;
1443 
1444 	/* Release previous parent buffer if any */
1445 	if (parent->buffer != InvalidBuffer &&
1446 		parent->buffer != current->buffer)
1447 	{
1448 		SpGistSetLastUsedPage(index, parent->buffer);
1449 		UnlockReleaseBuffer(parent->buffer);
1450 	}
1451 
1452 	/* Repoint parent to specified node of current inner tuple */
1453 	parent->blkno = current->blkno;
1454 	parent->buffer = current->buffer;
1455 	parent->page = current->page;
1456 	parent->offnum = current->offnum;
1457 	parent->node = nodeN;
1458 
1459 	/* Locate that node */
1460 	SGITITERATE(innerTuple, i, node)
1461 	{
1462 		if (i == nodeN)
1463 			break;
1464 	}
1465 
1466 	if (i != nodeN)
1467 		elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1468 			 nodeN);
1469 
1470 	/* Point current to the downlink location, if any */
1471 	if (ItemPointerIsValid(&node->t_tid))
1472 	{
1473 		current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1474 		current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1475 	}
1476 	else
1477 	{
1478 		/* Downlink is empty, so we'll need to find a new page */
1479 		current->blkno = InvalidBlockNumber;
1480 		current->offnum = InvalidOffsetNumber;
1481 	}
1482 
1483 	current->buffer = InvalidBuffer;
1484 	current->page = NULL;
1485 }
1486 
1487 /*
1488  * spgAddNode action: add a node to the inner tuple at current
1489  */
1490 static void
spgAddNodeAction(Relation index,SpGistState * state,SpGistInnerTuple innerTuple,SPPageDesc * current,SPPageDesc * parent,int nodeN,Datum nodeLabel)1491 spgAddNodeAction(Relation index, SpGistState *state,
1492 				 SpGistInnerTuple innerTuple,
1493 				 SPPageDesc *current, SPPageDesc *parent,
1494 				 int nodeN, Datum nodeLabel)
1495 {
1496 	SpGistInnerTuple newInnerTuple;
1497 	spgxlogAddNode xlrec;
1498 
1499 	/* Should not be applied to nulls */
1500 	Assert(!SpGistPageStoresNulls(current->page));
1501 
1502 	/* Construct new inner tuple with additional node */
1503 	newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1504 
1505 	/* Prepare WAL record */
1506 	STORE_STATE(state, xlrec.stateSrc);
1507 	xlrec.offnum = current->offnum;
1508 
1509 	/* we don't fill these unless we need to change the parent downlink */
1510 	xlrec.parentBlk = -1;
1511 	xlrec.offnumParent = InvalidOffsetNumber;
1512 	xlrec.nodeI = 0;
1513 
1514 	/* we don't fill these unless tuple has to be moved */
1515 	xlrec.offnumNew = InvalidOffsetNumber;
1516 	xlrec.newPage = false;
1517 
1518 	if (PageGetExactFreeSpace(current->page) >=
1519 		newInnerTuple->size - innerTuple->size)
1520 	{
1521 		/*
1522 		 * We can replace the inner tuple by new version in-place
1523 		 */
1524 		START_CRIT_SECTION();
1525 
1526 		PageIndexTupleDelete(current->page, current->offnum);
1527 		if (PageAddItem(current->page,
1528 						(Item) newInnerTuple, newInnerTuple->size,
1529 						current->offnum, false, false) != current->offnum)
1530 			elog(ERROR, "failed to add item of size %u to SPGiST index page",
1531 				 newInnerTuple->size);
1532 
1533 		MarkBufferDirty(current->buffer);
1534 
1535 		if (RelationNeedsWAL(index) && !state->isBuild)
1536 		{
1537 			XLogRecPtr	recptr;
1538 
1539 			XLogBeginInsert();
1540 			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1541 			XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1542 
1543 			XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1544 
1545 			recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1546 
1547 			PageSetLSN(current->page, recptr);
1548 		}
1549 
1550 		END_CRIT_SECTION();
1551 	}
1552 	else
1553 	{
1554 		/*
1555 		 * move inner tuple to another page, and update parent
1556 		 */
1557 		SpGistDeadTuple dt;
1558 		SPPageDesc	saveCurrent;
1559 
1560 		/*
1561 		 * It should not be possible to get here for the root page, since we
1562 		 * allow only one inner tuple on the root page, and spgFormInnerTuple
1563 		 * always checks that inner tuples don't exceed the size of a page.
1564 		 */
1565 		if (SpGistBlockIsRoot(current->blkno))
1566 			elog(ERROR, "cannot enlarge root tuple any more");
1567 		Assert(parent->buffer != InvalidBuffer);
1568 
1569 		saveCurrent = *current;
1570 
1571 		xlrec.offnumParent = parent->offnum;
1572 		xlrec.nodeI = parent->node;
1573 
1574 		/*
1575 		 * obtain new buffer with the same parity as current, since it will be
1576 		 * a child of same parent tuple
1577 		 */
1578 		current->buffer = SpGistGetBuffer(index,
1579 										  GBUF_INNER_PARITY(current->blkno),
1580 										  newInnerTuple->size + sizeof(ItemIdData),
1581 										  &xlrec.newPage);
1582 		current->blkno = BufferGetBlockNumber(current->buffer);
1583 		current->page = BufferGetPage(current->buffer);
1584 
1585 		/*
1586 		 * Let's just make real sure new current isn't same as old.  Right now
1587 		 * that's impossible, but if SpGistGetBuffer ever got smart enough to
1588 		 * delete placeholder tuples before checking space, maybe it wouldn't
1589 		 * be impossible.  The case would appear to work except that WAL
1590 		 * replay would be subtly wrong, so I think a mere assert isn't enough
1591 		 * here.
1592 		 */
1593 		if (current->blkno == saveCurrent.blkno)
1594 			elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1595 
1596 		/*
1597 		 * New current and parent buffer will both be modified; but note that
1598 		 * parent buffer could be same as either new or old current.
1599 		 */
1600 		if (parent->buffer == saveCurrent.buffer)
1601 			xlrec.parentBlk = 0;
1602 		else if (parent->buffer == current->buffer)
1603 			xlrec.parentBlk = 1;
1604 		else
1605 			xlrec.parentBlk = 2;
1606 
1607 		START_CRIT_SECTION();
1608 
1609 		/* insert new ... */
1610 		xlrec.offnumNew = current->offnum =
1611 			SpGistPageAddNewItem(state, current->page,
1612 								 (Item) newInnerTuple, newInnerTuple->size,
1613 								 NULL, false);
1614 
1615 		MarkBufferDirty(current->buffer);
1616 
1617 		/* update parent's downlink and mark parent page dirty */
1618 		saveNodeLink(index, parent, current->blkno, current->offnum);
1619 
1620 		/*
1621 		 * Replace old tuple with a placeholder or redirection tuple.  Unless
1622 		 * doing an index build, we have to insert a redirection tuple for
1623 		 * possible concurrent scans.  We can't just delete it in any case,
1624 		 * because that could change the offsets of other tuples on the page,
1625 		 * breaking downlinks from their parents.
1626 		 */
1627 		if (state->isBuild)
1628 			dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
1629 								  InvalidBlockNumber, InvalidOffsetNumber);
1630 		else
1631 			dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1632 								  current->blkno, current->offnum);
1633 
1634 		PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1635 		if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1636 						saveCurrent.offnum,
1637 						false, false) != saveCurrent.offnum)
1638 			elog(ERROR, "failed to add item of size %u to SPGiST index page",
1639 				 dt->size);
1640 
1641 		if (state->isBuild)
1642 			SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1643 		else
1644 			SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1645 
1646 		MarkBufferDirty(saveCurrent.buffer);
1647 
1648 		if (RelationNeedsWAL(index) && !state->isBuild)
1649 		{
1650 			XLogRecPtr	recptr;
1651 			int			flags;
1652 
1653 			XLogBeginInsert();
1654 
1655 			/* orig page */
1656 			XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1657 			/* new page */
1658 			flags = REGBUF_STANDARD;
1659 			if (xlrec.newPage)
1660 				flags |= REGBUF_WILL_INIT;
1661 			XLogRegisterBuffer(1, current->buffer, flags);
1662 			/* parent page (if different from orig and new) */
1663 			if (xlrec.parentBlk == 2)
1664 				XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
1665 
1666 			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1667 			XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1668 
1669 			recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1670 
1671 			/* we don't bother to check if any of these are redundant */
1672 			PageSetLSN(current->page, recptr);
1673 			PageSetLSN(parent->page, recptr);
1674 			PageSetLSN(saveCurrent.page, recptr);
1675 		}
1676 
1677 		END_CRIT_SECTION();
1678 
1679 		/* Release saveCurrent if it's not same as current or parent */
1680 		if (saveCurrent.buffer != current->buffer &&
1681 			saveCurrent.buffer != parent->buffer)
1682 		{
1683 			SpGistSetLastUsedPage(index, saveCurrent.buffer);
1684 			UnlockReleaseBuffer(saveCurrent.buffer);
1685 		}
1686 	}
1687 }
1688 
1689 /*
1690  * spgSplitNode action: split inner tuple at current into prefix and postfix
1691  */
1692 static void
spgSplitNodeAction(Relation index,SpGistState * state,SpGistInnerTuple innerTuple,SPPageDesc * current,spgChooseOut * out)1693 spgSplitNodeAction(Relation index, SpGistState *state,
1694 				   SpGistInnerTuple innerTuple,
1695 				   SPPageDesc *current, spgChooseOut *out)
1696 {
1697 	SpGistInnerTuple prefixTuple,
1698 				postfixTuple;
1699 	SpGistNodeTuple node,
1700 			   *nodes;
1701 	BlockNumber postfixBlkno;
1702 	OffsetNumber postfixOffset;
1703 	int			i;
1704 	spgxlogSplitTuple xlrec;
1705 	Buffer		newBuffer = InvalidBuffer;
1706 
1707 	/* Should not be applied to nulls */
1708 	Assert(!SpGistPageStoresNulls(current->page));
1709 
1710 	/* Check opclass gave us sane values */
1711 	if (out->result.splitTuple.prefixNNodes <= 0 ||
1712 		out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
1713 		elog(ERROR, "invalid number of prefix nodes: %d",
1714 			 out->result.splitTuple.prefixNNodes);
1715 	if (out->result.splitTuple.childNodeN < 0 ||
1716 		out->result.splitTuple.childNodeN >=
1717 		out->result.splitTuple.prefixNNodes)
1718 		elog(ERROR, "invalid child node number: %d",
1719 			 out->result.splitTuple.childNodeN);
1720 
1721 	/*
1722 	 * Construct new prefix tuple with requested number of nodes.  We'll fill
1723 	 * in the childNodeN'th node's downlink below.
1724 	 */
1725 	nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
1726 									   out->result.splitTuple.prefixNNodes);
1727 
1728 	for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
1729 	{
1730 		Datum		label = (Datum) 0;
1731 		bool		labelisnull;
1732 
1733 		labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
1734 		if (!labelisnull)
1735 			label = out->result.splitTuple.prefixNodeLabels[i];
1736 		nodes[i] = spgFormNodeTuple(state, label, labelisnull);
1737 	}
1738 
1739 	prefixTuple = spgFormInnerTuple(state,
1740 									out->result.splitTuple.prefixHasPrefix,
1741 									out->result.splitTuple.prefixPrefixDatum,
1742 									out->result.splitTuple.prefixNNodes,
1743 									nodes);
1744 
1745 	/* it must fit in the space that innerTuple now occupies */
1746 	if (prefixTuple->size > innerTuple->size)
1747 		elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1748 
1749 	/*
1750 	 * Construct new postfix tuple, containing all nodes of innerTuple with
1751 	 * same node datums, but with the prefix specified by the picksplit
1752 	 * function.
1753 	 */
1754 	nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1755 	SGITITERATE(innerTuple, i, node)
1756 	{
1757 		nodes[i] = node;
1758 	}
1759 
1760 	postfixTuple = spgFormInnerTuple(state,
1761 									 out->result.splitTuple.postfixHasPrefix,
1762 									 out->result.splitTuple.postfixPrefixDatum,
1763 									 innerTuple->nNodes, nodes);
1764 
1765 	/* Postfix tuple is allTheSame if original tuple was */
1766 	postfixTuple->allTheSame = innerTuple->allTheSame;
1767 
1768 	/* prep data for WAL record */
1769 	xlrec.newPage = false;
1770 
1771 	/*
1772 	 * If we can't fit both tuples on the current page, get a new page for the
1773 	 * postfix tuple.  In particular, can't split to the root page.
1774 	 *
1775 	 * For the space calculation, note that prefixTuple replaces innerTuple
1776 	 * but postfixTuple will be a new entry.
1777 	 */
1778 	if (SpGistBlockIsRoot(current->blkno) ||
1779 		SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1780 		prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1781 	{
1782 		/*
1783 		 * Choose page with next triple parity, because postfix tuple is a
1784 		 * child of prefix one
1785 		 */
1786 		newBuffer = SpGistGetBuffer(index,
1787 									GBUF_INNER_PARITY(current->blkno + 1),
1788 									postfixTuple->size + sizeof(ItemIdData),
1789 									&xlrec.newPage);
1790 	}
1791 
1792 	START_CRIT_SECTION();
1793 
1794 	/*
1795 	 * Replace old tuple by prefix tuple
1796 	 */
1797 	PageIndexTupleDelete(current->page, current->offnum);
1798 	xlrec.offnumPrefix = PageAddItem(current->page,
1799 									 (Item) prefixTuple, prefixTuple->size,
1800 									 current->offnum, false, false);
1801 	if (xlrec.offnumPrefix != current->offnum)
1802 		elog(ERROR, "failed to add item of size %u to SPGiST index page",
1803 			 prefixTuple->size);
1804 
1805 	/*
1806 	 * put postfix tuple into appropriate page
1807 	 */
1808 	if (newBuffer == InvalidBuffer)
1809 	{
1810 		postfixBlkno = current->blkno;
1811 		xlrec.offnumPostfix = postfixOffset =
1812 			SpGistPageAddNewItem(state, current->page,
1813 								 (Item) postfixTuple, postfixTuple->size,
1814 								 NULL, false);
1815 		xlrec.postfixBlkSame = true;
1816 	}
1817 	else
1818 	{
1819 		postfixBlkno = BufferGetBlockNumber(newBuffer);
1820 		xlrec.offnumPostfix = postfixOffset =
1821 			SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
1822 								 (Item) postfixTuple, postfixTuple->size,
1823 								 NULL, false);
1824 		MarkBufferDirty(newBuffer);
1825 		xlrec.postfixBlkSame = false;
1826 	}
1827 
1828 	/*
1829 	 * And set downlink pointer in the prefix tuple to point to postfix tuple.
1830 	 * (We can't avoid this step by doing the above two steps in opposite
1831 	 * order, because there might not be enough space on the page to insert
1832 	 * the postfix tuple first.)  We have to update the local copy of the
1833 	 * prefixTuple too, because that's what will be written to WAL.
1834 	 */
1835 	spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1836 					  postfixBlkno, postfixOffset);
1837 	prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1838 												 PageGetItemId(current->page, current->offnum));
1839 	spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1840 					  postfixBlkno, postfixOffset);
1841 
1842 	MarkBufferDirty(current->buffer);
1843 
1844 	if (RelationNeedsWAL(index) && !state->isBuild)
1845 	{
1846 		XLogRecPtr	recptr;
1847 
1848 		XLogBeginInsert();
1849 		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1850 		XLogRegisterData((char *) prefixTuple, prefixTuple->size);
1851 		XLogRegisterData((char *) postfixTuple, postfixTuple->size);
1852 
1853 		XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1854 		if (newBuffer != InvalidBuffer)
1855 		{
1856 			int			flags;
1857 
1858 			flags = REGBUF_STANDARD;
1859 			if (xlrec.newPage)
1860 				flags |= REGBUF_WILL_INIT;
1861 			XLogRegisterBuffer(1, newBuffer, flags);
1862 		}
1863 
1864 		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1865 
1866 		PageSetLSN(current->page, recptr);
1867 
1868 		if (newBuffer != InvalidBuffer)
1869 		{
1870 			PageSetLSN(BufferGetPage(newBuffer), recptr);
1871 		}
1872 	}
1873 
1874 	END_CRIT_SECTION();
1875 
1876 	/* Update local free-space cache and release buffer */
1877 	if (newBuffer != InvalidBuffer)
1878 	{
1879 		SpGistSetLastUsedPage(index, newBuffer);
1880 		UnlockReleaseBuffer(newBuffer);
1881 	}
1882 }
1883 
1884 /*
1885  * Insert one item into the index.
1886  *
1887  * Returns true on success, false if we failed to complete the insertion
1888  * (typically because of conflict with a concurrent insert).  In the latter
1889  * case, caller should re-call spgdoinsert() with the same args.
1890  */
1891 bool
spgdoinsert(Relation index,SpGistState * state,ItemPointer heapPtr,Datum datum,bool isnull)1892 spgdoinsert(Relation index, SpGistState *state,
1893 			ItemPointer heapPtr, Datum datum, bool isnull)
1894 {
1895 	bool		result = true;
1896 	int			level = 0;
1897 	Datum		leafDatum;
1898 	int			leafSize;
1899 	int			bestLeafSize;
1900 	int			numNoProgressCycles = 0;
1901 	SPPageDesc	current,
1902 				parent;
1903 	FmgrInfo   *procinfo = NULL;
1904 
1905 	/*
1906 	 * Look up FmgrInfo of the user-defined choose function once, to save
1907 	 * cycles in the loop below.
1908 	 */
1909 	if (!isnull)
1910 		procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1911 
1912 	/*
1913 	 * Prepare the leaf datum to insert.
1914 	 *
1915 	 * If an optional "compress" method is provided, then call it to form the
1916 	 * leaf datum from the input datum.  Otherwise store the input datum as
1917 	 * is.  Since we don't use index_form_tuple in this AM, we have to make
1918 	 * sure value to be inserted is not toasted; FormIndexDatum doesn't
1919 	 * guarantee that.  But we assume the "compress" method to return an
1920 	 * untoasted value.
1921 	 */
1922 	if (!isnull)
1923 	{
1924 		if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
1925 		{
1926 			FmgrInfo   *compressProcinfo = NULL;
1927 
1928 			compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
1929 			leafDatum = FunctionCall1Coll(compressProcinfo,
1930 										  index->rd_indcollation[0],
1931 										  datum);
1932 		}
1933 		else
1934 		{
1935 			Assert(state->attLeafType.type == state->attType.type);
1936 
1937 			if (state->attType.attlen == -1)
1938 				leafDatum = PointerGetDatum(PG_DETOAST_DATUM(datum));
1939 			else
1940 				leafDatum = datum;
1941 		}
1942 	}
1943 	else
1944 		leafDatum = (Datum) 0;
1945 
1946 	/*
1947 	 * Compute space needed for a leaf tuple containing the given datum.
1948 	 *
1949 	 * If it isn't gonna fit, and the opclass can't reduce the datum size by
1950 	 * suffixing, bail out now rather than doing a lot of useless work.
1951 	 */
1952 	if (!isnull)
1953 		leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
1954 			SpGistGetTypeSize(&state->attLeafType, leafDatum);
1955 	else
1956 		leafSize = SGDTSIZE + sizeof(ItemIdData);
1957 
1958 	if (leafSize > SPGIST_PAGE_CAPACITY &&
1959 		(isnull || !state->config.longValuesOK))
1960 		ereport(ERROR,
1961 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1962 				 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
1963 						leafSize - sizeof(ItemIdData),
1964 						SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
1965 						RelationGetRelationName(index)),
1966 				 errhint("Values larger than a buffer page cannot be indexed.")));
1967 	bestLeafSize = leafSize;
1968 
1969 	/* Initialize "current" to the appropriate root page */
1970 	current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
1971 	current.buffer = InvalidBuffer;
1972 	current.page = NULL;
1973 	current.offnum = FirstOffsetNumber;
1974 	current.node = -1;
1975 
1976 	/* "parent" is invalid for the moment */
1977 	parent.blkno = InvalidBlockNumber;
1978 	parent.buffer = InvalidBuffer;
1979 	parent.page = NULL;
1980 	parent.offnum = InvalidOffsetNumber;
1981 	parent.node = -1;
1982 
1983 	/*
1984 	 * Before entering the loop, try to clear any pending interrupt condition.
1985 	 * If a query cancel is pending, we might as well accept it now not later;
1986 	 * while if a non-canceling condition is pending, servicing it here avoids
1987 	 * having to restart the insertion and redo all the work so far.
1988 	 */
1989 	CHECK_FOR_INTERRUPTS();
1990 
1991 	for (;;)
1992 	{
1993 		bool		isNew = false;
1994 
1995 		/*
1996 		 * Bail out if query cancel is pending.  We must have this somewhere
1997 		 * in the loop since a broken opclass could produce an infinite
1998 		 * picksplit loop.  However, because we'll be holding buffer lock(s)
1999 		 * after the first iteration, ProcessInterrupts() wouldn't be able to
2000 		 * throw a cancel error here.  Hence, if we see that an interrupt is
2001 		 * pending, break out of the loop and deal with the situation below.
2002 		 * Set result = false because we must restart the insertion if the
2003 		 * interrupt isn't a query-cancel-or-die case.
2004 		 */
2005 		if (INTERRUPTS_PENDING_CONDITION())
2006 		{
2007 			result = false;
2008 			break;
2009 		}
2010 
2011 		if (current.blkno == InvalidBlockNumber)
2012 		{
2013 			/*
2014 			 * Create a leaf page.  If leafSize is too large to fit on a page,
2015 			 * we won't actually use the page yet, but it simplifies the API
2016 			 * for doPickSplit to always have a leaf page at hand; so just
2017 			 * quietly limit our request to a page size.
2018 			 */
2019 			current.buffer =
2020 				SpGistGetBuffer(index,
2021 								GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
2022 								Min(leafSize, SPGIST_PAGE_CAPACITY),
2023 								&isNew);
2024 			current.blkno = BufferGetBlockNumber(current.buffer);
2025 		}
2026 		else if (parent.buffer == InvalidBuffer)
2027 		{
2028 			/* we hold no parent-page lock, so no deadlock is possible */
2029 			current.buffer = ReadBuffer(index, current.blkno);
2030 			LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
2031 		}
2032 		else if (current.blkno != parent.blkno)
2033 		{
2034 			/* descend to a new child page */
2035 			current.buffer = ReadBuffer(index, current.blkno);
2036 
2037 			/*
2038 			 * Attempt to acquire lock on child page.  We must beware of
2039 			 * deadlock against another insertion process descending from that
2040 			 * page to our parent page (see README).  If we fail to get lock,
2041 			 * abandon the insertion and tell our caller to start over.
2042 			 *
2043 			 * XXX this could be improved, because failing to get lock on a
2044 			 * buffer is not proof of a deadlock situation; the lock might be
2045 			 * held by a reader, or even just background writer/checkpointer
2046 			 * process.  Perhaps it'd be worth retrying after sleeping a bit?
2047 			 */
2048 			if (!ConditionalLockBuffer(current.buffer))
2049 			{
2050 				ReleaseBuffer(current.buffer);
2051 				UnlockReleaseBuffer(parent.buffer);
2052 				return false;
2053 			}
2054 		}
2055 		else
2056 		{
2057 			/* inner tuple can be stored on the same page as parent one */
2058 			current.buffer = parent.buffer;
2059 		}
2060 		current.page = BufferGetPage(current.buffer);
2061 
2062 		/* should not arrive at a page of the wrong type */
2063 		if (isnull ? !SpGistPageStoresNulls(current.page) :
2064 			SpGistPageStoresNulls(current.page))
2065 			elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2066 				 current.blkno);
2067 
2068 		if (SpGistPageIsLeaf(current.page))
2069 		{
2070 			SpGistLeafTuple leafTuple;
2071 			int			nToSplit,
2072 						sizeToSplit;
2073 
2074 			leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
2075 			if (leafTuple->size + sizeof(ItemIdData) <=
2076 				SpGistPageGetFreeSpace(current.page, 1))
2077 			{
2078 				/* it fits on page, so insert it and we're done */
2079 				addLeafTuple(index, state, leafTuple,
2080 							 &current, &parent, isnull, isNew);
2081 				break;
2082 			}
2083 			else if ((sizeToSplit =
2084 					  checkSplitConditions(index, state, &current,
2085 										   &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
2086 					 nToSplit < 64 &&
2087 					 leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2088 			{
2089 				/*
2090 				 * the amount of data is pretty small, so just move the whole
2091 				 * chain to another leaf page rather than splitting it.
2092 				 */
2093 				Assert(!isNew);
2094 				moveLeafs(index, state, &current, &parent, leafTuple, isnull);
2095 				break;			/* we're done */
2096 			}
2097 			else
2098 			{
2099 				/* picksplit */
2100 				if (doPickSplit(index, state, &current, &parent,
2101 								leafTuple, level, isnull, isNew))
2102 					break;		/* doPickSplit installed new tuples */
2103 
2104 				/* leaf tuple will not be inserted yet */
2105 				pfree(leafTuple);
2106 
2107 				/*
2108 				 * current now describes new inner tuple, go insert into it
2109 				 */
2110 				Assert(!SpGistPageIsLeaf(current.page));
2111 				goto process_inner_tuple;
2112 			}
2113 		}
2114 		else					/* non-leaf page */
2115 		{
2116 			/*
2117 			 * Apply the opclass choose function to figure out how to insert
2118 			 * the given datum into the current inner tuple.
2119 			 */
2120 			SpGistInnerTuple innerTuple;
2121 			spgChooseIn in;
2122 			spgChooseOut out;
2123 
2124 			/*
2125 			 * spgAddNode and spgSplitTuple cases will loop back to here to
2126 			 * complete the insertion operation.  Just in case the choose
2127 			 * function is broken and produces add or split requests
2128 			 * repeatedly, check for query cancel (see comments above).
2129 			 */
2130 	process_inner_tuple:
2131 			if (INTERRUPTS_PENDING_CONDITION())
2132 			{
2133 				result = false;
2134 				break;
2135 			}
2136 
2137 			innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2138 														PageGetItemId(current.page, current.offnum));
2139 
2140 			in.datum = datum;
2141 			in.leafDatum = leafDatum;
2142 			in.level = level;
2143 			in.allTheSame = innerTuple->allTheSame;
2144 			in.hasPrefix = (innerTuple->prefixSize > 0);
2145 			in.prefixDatum = SGITDATUM(innerTuple, state);
2146 			in.nNodes = innerTuple->nNodes;
2147 			in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2148 
2149 			memset(&out, 0, sizeof(out));
2150 
2151 			if (!isnull)
2152 			{
2153 				/* use user-defined choose method */
2154 				FunctionCall2Coll(procinfo,
2155 								  index->rd_indcollation[0],
2156 								  PointerGetDatum(&in),
2157 								  PointerGetDatum(&out));
2158 			}
2159 			else
2160 			{
2161 				/* force "match" action (to insert to random subnode) */
2162 				out.resultType = spgMatchNode;
2163 			}
2164 
2165 			if (innerTuple->allTheSame)
2166 			{
2167 				/*
2168 				 * It's not allowed to do an AddNode at an allTheSame tuple.
2169 				 * Opclass must say "match", in which case we choose a random
2170 				 * one of the nodes to descend into, or "split".
2171 				 */
2172 				if (out.resultType == spgAddNode)
2173 					elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2174 				else if (out.resultType == spgMatchNode)
2175 					out.result.matchNode.nodeN = random() % innerTuple->nNodes;
2176 			}
2177 
2178 			switch (out.resultType)
2179 			{
2180 				case spgMatchNode:
2181 					/* Descend to N'th child node */
2182 					spgMatchNodeAction(index, state, innerTuple,
2183 									   &current, &parent,
2184 									   out.result.matchNode.nodeN);
2185 					/* Adjust level as per opclass request */
2186 					level += out.result.matchNode.levelAdd;
2187 					/* Replace leafDatum and recompute leafSize */
2188 					if (!isnull)
2189 					{
2190 						leafDatum = out.result.matchNode.restDatum;
2191 						leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
2192 							SpGistGetTypeSize(&state->attLeafType, leafDatum);
2193 					}
2194 
2195 					/*
2196 					 * Check new tuple size; fail if it can't fit, unless the
2197 					 * opclass says it can handle the situation by suffixing.
2198 					 *
2199 					 * A buggy opclass might not ever make the leaf datum
2200 					 * small enough, causing an infinite loop.  To detect such
2201 					 * a loop, check to see if we are making progress by
2202 					 * reducing the leafSize in each pass.  This is a bit
2203 					 * tricky though.  Because of alignment considerations,
2204 					 * the total tuple size might not decrease on every pass.
2205 					 * Also, there are edge cases where the choose method
2206 					 * might seem to not make progress for a cycle or two.
2207 					 * Somewhat arbitrarily, we allow up to 10 no-progress
2208 					 * iterations before failing.  (This limit should be more
2209 					 * than MAXALIGN, to accommodate opclasses that trim one
2210 					 * byte from the leaf datum per pass.)
2211 					 */
2212 					if (leafSize > SPGIST_PAGE_CAPACITY)
2213 					{
2214 						bool		ok = false;
2215 
2216 						if (state->config.longValuesOK && !isnull)
2217 						{
2218 							if (leafSize < bestLeafSize)
2219 							{
2220 								ok = true;
2221 								bestLeafSize = leafSize;
2222 								numNoProgressCycles = 0;
2223 							}
2224 							else if (++numNoProgressCycles < 10)
2225 								ok = true;
2226 						}
2227 						if (!ok)
2228 							ereport(ERROR,
2229 									(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2230 									 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2231 											leafSize - sizeof(ItemIdData),
2232 											SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2233 											RelationGetRelationName(index)),
2234 									 errhint("Values larger than a buffer page cannot be indexed.")));
2235 					}
2236 
2237 					/*
2238 					 * Loop around and attempt to insert the new leafDatum at
2239 					 * "current" (which might reference an existing child
2240 					 * tuple, or might be invalid to force us to find a new
2241 					 * page for the tuple).
2242 					 */
2243 					break;
2244 				case spgAddNode:
2245 					/* AddNode is not sensible if nodes don't have labels */
2246 					if (in.nodeLabels == NULL)
2247 						elog(ERROR, "cannot add a node to an inner tuple without node labels");
2248 					/* Add node to inner tuple, per request */
2249 					spgAddNodeAction(index, state, innerTuple,
2250 									 &current, &parent,
2251 									 out.result.addNode.nodeN,
2252 									 out.result.addNode.nodeLabel);
2253 
2254 					/*
2255 					 * Retry insertion into the enlarged node.  We assume that
2256 					 * we'll get a MatchNode result this time.
2257 					 */
2258 					goto process_inner_tuple;
2259 					break;
2260 				case spgSplitTuple:
2261 					/* Split inner tuple, per request */
2262 					spgSplitNodeAction(index, state, innerTuple,
2263 									   &current, &out);
2264 
2265 					/* Retry insertion into the split node */
2266 					goto process_inner_tuple;
2267 					break;
2268 				default:
2269 					elog(ERROR, "unrecognized SPGiST choose result: %d",
2270 						 (int) out.resultType);
2271 					break;
2272 			}
2273 		}
2274 	}							/* end loop */
2275 
2276 	/*
2277 	 * Release any buffers we're still holding.  Beware of possibility that
2278 	 * current and parent reference same buffer.
2279 	 */
2280 	if (current.buffer != InvalidBuffer)
2281 	{
2282 		SpGistSetLastUsedPage(index, current.buffer);
2283 		UnlockReleaseBuffer(current.buffer);
2284 	}
2285 	if (parent.buffer != InvalidBuffer &&
2286 		parent.buffer != current.buffer)
2287 	{
2288 		SpGistSetLastUsedPage(index, parent.buffer);
2289 		UnlockReleaseBuffer(parent.buffer);
2290 	}
2291 
2292 	/*
2293 	 * We do not support being called while some outer function is holding a
2294 	 * buffer lock (or any other reason to postpone query cancels).  If that
2295 	 * were the case, telling the caller to retry would create an infinite
2296 	 * loop.
2297 	 */
2298 	Assert(INTERRUPTS_CAN_BE_PROCESSED());
2299 
2300 	/*
2301 	 * Finally, check for interrupts again.  If there was a query cancel,
2302 	 * ProcessInterrupts() will be able to throw the error here.  If it was
2303 	 * some other kind of interrupt that can just be cleared, return false to
2304 	 * tell our caller to retry.
2305 	 */
2306 	CHECK_FOR_INTERRUPTS();
2307 
2308 	return result;
2309 }
2310