1 /*-------------------------------------------------------------------------
2  *
3  * spgdoinsert.c
4  *	  implementation of insert algorithm
5  *
6  *
7  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *			src/backend/access/spgist/spgdoinsert.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/genam.h"
19 #include "access/spgist_private.h"
20 #include "access/xloginsert.h"
21 #include "miscadmin.h"
22 #include "storage/bufmgr.h"
23 #include "utils/rel.h"
24 
25 
/*
 * SPPageDesc tracks all info about a page we are inserting into.  In some
 * situations it actually identifies a tuple, or even a specific node within
 * an inner tuple.  But any of the fields can be invalid.  If the buffer
 * field is valid, it implies we hold pin and exclusive lock on that buffer.
 * The page pointer should be valid exactly when buffer is.
 *
 * Each field has its own "invalid" marker (noted per field below), so a
 * descriptor can be partially filled in.
 */
typedef struct SPPageDesc
{
	BlockNumber blkno;			/* block number, or InvalidBlockNumber */
	Buffer		buffer;			/* page's buffer number, or InvalidBuffer */
	Page		page;			/* pointer to page buffer, or NULL */
	OffsetNumber offnum;		/* offset of tuple, or InvalidOffsetNumber */
	int			node;			/* node number within inner tuple, or -1 */
} SPPageDesc;
41 
42 
43 /*
44  * Set the item pointer in the nodeN'th entry in inner tuple tup.  This
45  * is used to update the parent inner tuple's downlink after a move or
46  * split operation.
47  */
48 void
spgUpdateNodeLink(SpGistInnerTuple tup,int nodeN,BlockNumber blkno,OffsetNumber offset)49 spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
50 				  BlockNumber blkno, OffsetNumber offset)
51 {
52 	int			i;
53 	SpGistNodeTuple node;
54 
55 	SGITITERATE(tup, i, node)
56 	{
57 		if (i == nodeN)
58 		{
59 			ItemPointerSet(&node->t_tid, blkno, offset);
60 			return;
61 		}
62 	}
63 
64 	elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
65 		 nodeN);
66 }
67 
68 /*
69  * Form a new inner tuple containing one more node than the given one, with
70  * the specified label datum, inserted at offset "offset" in the node array.
71  * The new tuple's prefix is the same as the old one's.
72  *
73  * Note that the new node initially has an invalid downlink.  We'll find a
74  * page to point it to later.
75  */
76 static SpGistInnerTuple
addNode(SpGistState * state,SpGistInnerTuple tuple,Datum label,int offset)77 addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
78 {
79 	SpGistNodeTuple node,
80 			   *nodes;
81 	int			i;
82 
83 	/* if offset is negative, insert at end */
84 	if (offset < 0)
85 		offset = tuple->nNodes;
86 	else if (offset > tuple->nNodes)
87 		elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
88 
89 	nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
90 	SGITITERATE(tuple, i, node)
91 	{
92 		if (i < offset)
93 			nodes[i] = node;
94 		else
95 			nodes[i + 1] = node;
96 	}
97 
98 	nodes[offset] = spgFormNodeTuple(state, label, false);
99 
100 	return spgFormInnerTuple(state,
101 							 (tuple->prefixSize > 0),
102 							 SGITDATUM(tuple, state),
103 							 tuple->nNodes + 1,
104 							 nodes);
105 }
106 
107 /* qsort comparator for sorting OffsetNumbers */
108 static int
cmpOffsetNumbers(const void * a,const void * b)109 cmpOffsetNumbers(const void *a, const void *b)
110 {
111 	if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
112 		return 0;
113 	return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
114 }
115 
116 /*
117  * Delete multiple tuples from an index page, preserving tuple offset numbers.
118  *
119  * The first tuple in the given list is replaced with a dead tuple of type
120  * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
121  * with dead tuples of type "reststate".  If either firststate or reststate
122  * is REDIRECT, blkno/offnum specify where to link to.
123  *
124  * NB: this is used during WAL replay, so beware of trying to make it too
125  * smart.  In particular, it shouldn't use "state" except for calling
126  * spgFormDeadTuple().  This is also used in a critical section, so no
127  * pallocs either!
128  */
129 void
spgPageIndexMultiDelete(SpGistState * state,Page page,OffsetNumber * itemnos,int nitems,int firststate,int reststate,BlockNumber blkno,OffsetNumber offnum)130 spgPageIndexMultiDelete(SpGistState *state, Page page,
131 						OffsetNumber *itemnos, int nitems,
132 						int firststate, int reststate,
133 						BlockNumber blkno, OffsetNumber offnum)
134 {
135 	OffsetNumber firstItem;
136 	OffsetNumber sortednos[MaxIndexTuplesPerPage];
137 	SpGistDeadTuple tuple = NULL;
138 	int			i;
139 
140 	if (nitems == 0)
141 		return;					/* nothing to do */
142 
143 	/*
144 	 * For efficiency we want to use PageIndexMultiDelete, which requires the
145 	 * targets to be listed in sorted order, so we have to sort the itemnos
146 	 * array.  (This also greatly simplifies the math for reinserting the
147 	 * replacement tuples.)  However, we must not scribble on the caller's
148 	 * array, so we have to make a copy.
149 	 */
150 	memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
151 	if (nitems > 1)
152 		qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
153 
154 	PageIndexMultiDelete(page, sortednos, nitems);
155 
156 	firstItem = itemnos[0];
157 
158 	for (i = 0; i < nitems; i++)
159 	{
160 		OffsetNumber itemno = sortednos[i];
161 		int			tupstate;
162 
163 		tupstate = (itemno == firstItem) ? firststate : reststate;
164 		if (tuple == NULL || tuple->tupstate != tupstate)
165 			tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
166 
167 		if (PageAddItem(page, (Item) tuple, tuple->size,
168 						itemno, false, false) != itemno)
169 			elog(ERROR, "failed to add item of size %u to SPGiST index page",
170 				 tuple->size);
171 
172 		if (tupstate == SPGIST_REDIRECT)
173 			SpGistPageGetOpaque(page)->nRedirection++;
174 		else if (tupstate == SPGIST_PLACEHOLDER)
175 			SpGistPageGetOpaque(page)->nPlaceholder++;
176 	}
177 }
178 
179 /*
180  * Update the parent inner tuple's downlink, and mark the parent buffer
181  * dirty (this must be the last change to the parent page in the current
182  * WAL action).
183  */
184 static void
saveNodeLink(Relation index,SPPageDesc * parent,BlockNumber blkno,OffsetNumber offnum)185 saveNodeLink(Relation index, SPPageDesc *parent,
186 			 BlockNumber blkno, OffsetNumber offnum)
187 {
188 	SpGistInnerTuple innerTuple;
189 
190 	innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
191 								PageGetItemId(parent->page, parent->offnum));
192 
193 	spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
194 
195 	MarkBufferDirty(parent->buffer);
196 }
197 
198 /*
199  * Add a leaf tuple to a leaf page where there is known to be room for it
200  */
201 static void
addLeafTuple(Relation index,SpGistState * state,SpGistLeafTuple leafTuple,SPPageDesc * current,SPPageDesc * parent,bool isNulls,bool isNew)202 addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
203 		   SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
204 {
205 	spgxlogAddLeaf xlrec;
206 
207 	xlrec.newPage = isNew;
208 	xlrec.storesNulls = isNulls;
209 
210 	/* these will be filled below as needed */
211 	xlrec.offnumLeaf = InvalidOffsetNumber;
212 	xlrec.offnumHeadLeaf = InvalidOffsetNumber;
213 	xlrec.offnumParent = InvalidOffsetNumber;
214 	xlrec.nodeI = 0;
215 
216 	START_CRIT_SECTION();
217 
218 	if (current->offnum == InvalidOffsetNumber ||
219 		SpGistBlockIsRoot(current->blkno))
220 	{
221 		/* Tuple is not part of a chain */
222 		leafTuple->nextOffset = InvalidOffsetNumber;
223 		current->offnum = SpGistPageAddNewItem(state, current->page,
224 										   (Item) leafTuple, leafTuple->size,
225 											   NULL, false);
226 
227 		xlrec.offnumLeaf = current->offnum;
228 
229 		/* Must update parent's downlink if any */
230 		if (parent->buffer != InvalidBuffer)
231 		{
232 			xlrec.offnumParent = parent->offnum;
233 			xlrec.nodeI = parent->node;
234 
235 			saveNodeLink(index, parent, current->blkno, current->offnum);
236 		}
237 	}
238 	else
239 	{
240 		/*
241 		 * Tuple must be inserted into existing chain.  We mustn't change the
242 		 * chain's head address, but we don't need to chase the entire chain
243 		 * to put the tuple at the end; we can insert it second.
244 		 *
245 		 * Also, it's possible that the "chain" consists only of a DEAD tuple,
246 		 * in which case we should replace the DEAD tuple in-place.
247 		 */
248 		SpGistLeafTuple head;
249 		OffsetNumber offnum;
250 
251 		head = (SpGistLeafTuple) PageGetItem(current->page,
252 							  PageGetItemId(current->page, current->offnum));
253 		if (head->tupstate == SPGIST_LIVE)
254 		{
255 			leafTuple->nextOffset = head->nextOffset;
256 			offnum = SpGistPageAddNewItem(state, current->page,
257 										  (Item) leafTuple, leafTuple->size,
258 										  NULL, false);
259 
260 			/*
261 			 * re-get head of list because it could have been moved on page,
262 			 * and set new second element
263 			 */
264 			head = (SpGistLeafTuple) PageGetItem(current->page,
265 							  PageGetItemId(current->page, current->offnum));
266 			head->nextOffset = offnum;
267 
268 			xlrec.offnumLeaf = offnum;
269 			xlrec.offnumHeadLeaf = current->offnum;
270 		}
271 		else if (head->tupstate == SPGIST_DEAD)
272 		{
273 			leafTuple->nextOffset = InvalidOffsetNumber;
274 			PageIndexTupleDelete(current->page, current->offnum);
275 			if (PageAddItem(current->page,
276 							(Item) leafTuple, leafTuple->size,
277 							current->offnum, false, false) != current->offnum)
278 				elog(ERROR, "failed to add item of size %u to SPGiST index page",
279 					 leafTuple->size);
280 
281 			/* WAL replay distinguishes this case by equal offnums */
282 			xlrec.offnumLeaf = current->offnum;
283 			xlrec.offnumHeadLeaf = current->offnum;
284 		}
285 		else
286 			elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
287 	}
288 
289 	MarkBufferDirty(current->buffer);
290 
291 	if (RelationNeedsWAL(index))
292 	{
293 		XLogRecPtr	recptr;
294 		int			flags;
295 
296 		XLogBeginInsert();
297 		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
298 		XLogRegisterData((char *) leafTuple, leafTuple->size);
299 
300 		flags = REGBUF_STANDARD;
301 		if (xlrec.newPage)
302 			flags |= REGBUF_WILL_INIT;
303 		XLogRegisterBuffer(0, current->buffer, flags);
304 		if (xlrec.offnumParent != InvalidOffsetNumber)
305 			XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
306 
307 		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
308 
309 		PageSetLSN(current->page, recptr);
310 
311 		/* update parent only if we actually changed it */
312 		if (xlrec.offnumParent != InvalidOffsetNumber)
313 		{
314 			PageSetLSN(parent->page, recptr);
315 		}
316 	}
317 
318 	END_CRIT_SECTION();
319 }
320 
321 /*
322  * Count the number and total size of leaf tuples in the chain starting at
323  * current->offnum.  Return number into *nToSplit and total size as function
324  * result.
325  *
326  * Klugy special case when considering the root page (i.e., root is a leaf
327  * page, but we're about to split for the first time): return fake large
328  * values to force spgdoinsert() to take the doPickSplit rather than
329  * moveLeafs code path.  moveLeafs is not prepared to deal with root page.
330  */
331 static int
checkSplitConditions(Relation index,SpGistState * state,SPPageDesc * current,int * nToSplit)332 checkSplitConditions(Relation index, SpGistState *state,
333 					 SPPageDesc *current, int *nToSplit)
334 {
335 	int			i,
336 				n = 0,
337 				totalSize = 0;
338 
339 	if (SpGistBlockIsRoot(current->blkno))
340 	{
341 		/* return impossible values to force split */
342 		*nToSplit = BLCKSZ;
343 		return BLCKSZ;
344 	}
345 
346 	i = current->offnum;
347 	while (i != InvalidOffsetNumber)
348 	{
349 		SpGistLeafTuple it;
350 
351 		Assert(i >= FirstOffsetNumber &&
352 			   i <= PageGetMaxOffsetNumber(current->page));
353 		it = (SpGistLeafTuple) PageGetItem(current->page,
354 										   PageGetItemId(current->page, i));
355 		if (it->tupstate == SPGIST_LIVE)
356 		{
357 			n++;
358 			totalSize += it->size + sizeof(ItemIdData);
359 		}
360 		else if (it->tupstate == SPGIST_DEAD)
361 		{
362 			/* We could see a DEAD tuple as first/only chain item */
363 			Assert(i == current->offnum);
364 			Assert(it->nextOffset == InvalidOffsetNumber);
365 			/* Don't count it in result, because it won't go to other page */
366 		}
367 		else
368 			elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
369 
370 		i = it->nextOffset;
371 	}
372 
373 	*nToSplit = n;
374 
375 	return totalSize;
376 }
377 
378 /*
379  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
380  * but the chain has to be moved because there's not enough room to add
381  * newLeafTuple to its page.  We use this method when the chain contains
382  * very little data so a split would be inefficient.  We are sure we can
383  * fit the chain plus newLeafTuple on one other page.
384  */
385 static void
moveLeafs(Relation index,SpGistState * state,SPPageDesc * current,SPPageDesc * parent,SpGistLeafTuple newLeafTuple,bool isNulls)386 moveLeafs(Relation index, SpGistState *state,
387 		  SPPageDesc *current, SPPageDesc *parent,
388 		  SpGistLeafTuple newLeafTuple, bool isNulls)
389 {
390 	int			i,
391 				nDelete,
392 				nInsert,
393 				size;
394 	Buffer		nbuf;
395 	Page		npage;
396 	SpGistLeafTuple it;
397 	OffsetNumber r = InvalidOffsetNumber,
398 				startOffset = InvalidOffsetNumber;
399 	bool		replaceDead = false;
400 	OffsetNumber *toDelete;
401 	OffsetNumber *toInsert;
402 	BlockNumber nblkno;
403 	spgxlogMoveLeafs xlrec;
404 	char	   *leafdata,
405 			   *leafptr;
406 
407 	/* This doesn't work on root page */
408 	Assert(parent->buffer != InvalidBuffer);
409 	Assert(parent->buffer != current->buffer);
410 
411 	/* Locate the tuples to be moved, and count up the space needed */
412 	i = PageGetMaxOffsetNumber(current->page);
413 	toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
414 	toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
415 
416 	size = newLeafTuple->size + sizeof(ItemIdData);
417 
418 	nDelete = 0;
419 	i = current->offnum;
420 	while (i != InvalidOffsetNumber)
421 	{
422 		SpGistLeafTuple it;
423 
424 		Assert(i >= FirstOffsetNumber &&
425 			   i <= PageGetMaxOffsetNumber(current->page));
426 		it = (SpGistLeafTuple) PageGetItem(current->page,
427 										   PageGetItemId(current->page, i));
428 
429 		if (it->tupstate == SPGIST_LIVE)
430 		{
431 			toDelete[nDelete] = i;
432 			size += it->size + sizeof(ItemIdData);
433 			nDelete++;
434 		}
435 		else if (it->tupstate == SPGIST_DEAD)
436 		{
437 			/* We could see a DEAD tuple as first/only chain item */
438 			Assert(i == current->offnum);
439 			Assert(it->nextOffset == InvalidOffsetNumber);
440 			/* We don't want to move it, so don't count it in size */
441 			toDelete[nDelete] = i;
442 			nDelete++;
443 			replaceDead = true;
444 		}
445 		else
446 			elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
447 
448 		i = it->nextOffset;
449 	}
450 
451 	/* Find a leaf page that will hold them */
452 	nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
453 						   size, &xlrec.newPage);
454 	npage = BufferGetPage(nbuf);
455 	nblkno = BufferGetBlockNumber(nbuf);
456 	Assert(nblkno != current->blkno);
457 
458 	leafdata = leafptr = palloc(size);
459 
460 	START_CRIT_SECTION();
461 
462 	/* copy all the old tuples to new page, unless they're dead */
463 	nInsert = 0;
464 	if (!replaceDead)
465 	{
466 		for (i = 0; i < nDelete; i++)
467 		{
468 			it = (SpGistLeafTuple) PageGetItem(current->page,
469 								  PageGetItemId(current->page, toDelete[i]));
470 			Assert(it->tupstate == SPGIST_LIVE);
471 
472 			/*
473 			 * Update chain link (notice the chain order gets reversed, but we
474 			 * don't care).  We're modifying the tuple on the source page
475 			 * here, but it's okay since we're about to delete it.
476 			 */
477 			it->nextOffset = r;
478 
479 			r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
480 									 &startOffset, false);
481 
482 			toInsert[nInsert] = r;
483 			nInsert++;
484 
485 			/* save modified tuple into leafdata as well */
486 			memcpy(leafptr, it, it->size);
487 			leafptr += it->size;
488 		}
489 	}
490 
491 	/* add the new tuple as well */
492 	newLeafTuple->nextOffset = r;
493 	r = SpGistPageAddNewItem(state, npage,
494 							 (Item) newLeafTuple, newLeafTuple->size,
495 							 &startOffset, false);
496 	toInsert[nInsert] = r;
497 	nInsert++;
498 	memcpy(leafptr, newLeafTuple, newLeafTuple->size);
499 	leafptr += newLeafTuple->size;
500 
501 	/*
502 	 * Now delete the old tuples, leaving a redirection pointer behind for the
503 	 * first one, unless we're doing an index build; in which case there can't
504 	 * be any concurrent scan so we need not provide a redirect.
505 	 */
506 	spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
507 					   state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
508 							SPGIST_PLACEHOLDER,
509 							nblkno, r);
510 
511 	/* Update parent's downlink and mark parent page dirty */
512 	saveNodeLink(index, parent, nblkno, r);
513 
514 	/* Mark the leaf pages too */
515 	MarkBufferDirty(current->buffer);
516 	MarkBufferDirty(nbuf);
517 
518 	if (RelationNeedsWAL(index))
519 	{
520 		XLogRecPtr	recptr;
521 
522 		/* prepare WAL info */
523 		STORE_STATE(state, xlrec.stateSrc);
524 
525 		xlrec.nMoves = nDelete;
526 		xlrec.replaceDead = replaceDead;
527 		xlrec.storesNulls = isNulls;
528 
529 		xlrec.offnumParent = parent->offnum;
530 		xlrec.nodeI = parent->node;
531 
532 		XLogBeginInsert();
533 		XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
534 		XLogRegisterData((char *) toDelete,
535 						 sizeof(OffsetNumber) * nDelete);
536 		XLogRegisterData((char *) toInsert,
537 						 sizeof(OffsetNumber) * nInsert);
538 		XLogRegisterData((char *) leafdata, leafptr - leafdata);
539 
540 		XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
541 		XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
542 		XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
543 
544 		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
545 
546 		PageSetLSN(current->page, recptr);
547 		PageSetLSN(npage, recptr);
548 		PageSetLSN(parent->page, recptr);
549 	}
550 
551 	END_CRIT_SECTION();
552 
553 	/* Update local free-space cache and release new buffer */
554 	SpGistSetLastUsedPage(index, nbuf);
555 	UnlockReleaseBuffer(nbuf);
556 }
557 
558 /*
559  * Update previously-created redirection tuple with appropriate destination
560  *
561  * We use this when it's not convenient to know the destination first.
562  * The tuple should have been made with the "impossible" destination of
563  * the metapage.
564  */
565 static void
setRedirectionTuple(SPPageDesc * current,OffsetNumber position,BlockNumber blkno,OffsetNumber offnum)566 setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
567 					BlockNumber blkno, OffsetNumber offnum)
568 {
569 	SpGistDeadTuple dt;
570 
571 	dt = (SpGistDeadTuple) PageGetItem(current->page,
572 									 PageGetItemId(current->page, position));
573 	Assert(dt->tupstate == SPGIST_REDIRECT);
574 	Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
575 	ItemPointerSet(&dt->pointer, blkno, offnum);
576 }
577 
578 /*
579  * Test to see if the user-defined picksplit function failed to do its job,
580  * ie, it put all the leaf tuples into the same node.
581  * If so, randomly divide the tuples into several nodes (all with the same
582  * label) and return TRUE to select allTheSame mode for this inner tuple.
583  *
584  * (This code is also used to forcibly select allTheSame mode for nulls.)
585  *
586  * If we know that the leaf tuples wouldn't all fit on one page, then we
587  * exclude the last tuple (which is the incoming new tuple that forced a split)
588  * from the check to see if more than one node is used.  The reason for this
589  * is that if the existing tuples are put into only one chain, then even if
590  * we move them all to an empty page, there would still not be room for the
591  * new tuple, so we'd get into an infinite loop of picksplit attempts.
592  * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
593  * be split across pages.  (Exercise for the reader: figure out why this
594  * fixes the problem even when there is only one old tuple.)
595  */
596 static bool
checkAllTheSame(spgPickSplitIn * in,spgPickSplitOut * out,bool tooBig,bool * includeNew)597 checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
598 				bool *includeNew)
599 {
600 	int			theNode;
601 	int			limit;
602 	int			i;
603 
604 	/* For the moment, assume we can include the new leaf tuple */
605 	*includeNew = true;
606 
607 	/* If there's only the new leaf tuple, don't select allTheSame mode */
608 	if (in->nTuples <= 1)
609 		return false;
610 
611 	/* If tuple set doesn't fit on one page, ignore the new tuple in test */
612 	limit = tooBig ? in->nTuples - 1 : in->nTuples;
613 
614 	/* Check to see if more than one node is populated */
615 	theNode = out->mapTuplesToNodes[0];
616 	for (i = 1; i < limit; i++)
617 	{
618 		if (out->mapTuplesToNodes[i] != theNode)
619 			return false;
620 	}
621 
622 	/* Nope, so override the picksplit function's decisions */
623 
624 	/* If the new tuple is in its own node, it can't be included in split */
625 	if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
626 		*includeNew = false;
627 
628 	out->nNodes = 8;			/* arbitrary number of child nodes */
629 
630 	/* Random assignment of tuples to nodes (note we include new tuple) */
631 	for (i = 0; i < in->nTuples; i++)
632 		out->mapTuplesToNodes[i] = i % out->nNodes;
633 
634 	/* The opclass may not use node labels, but if it does, duplicate 'em */
635 	if (out->nodeLabels)
636 	{
637 		Datum		theLabel = out->nodeLabels[theNode];
638 
639 		out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
640 		for (i = 0; i < out->nNodes; i++)
641 			out->nodeLabels[i] = theLabel;
642 	}
643 
644 	/* We don't touch the prefix or the leaf tuple datum assignments */
645 
646 	return true;
647 }
648 
649 /*
650  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
651  * but the chain has to be split because there's not enough room to add
652  * newLeafTuple to its page.
653  *
654  * This function splits the leaf tuple set according to picksplit's rules,
655  * creating one or more new chains that are spread across the current page
656  * and an additional leaf page (we assume that two leaf pages will be
657  * sufficient).  A new inner tuple is created, and the parent downlink
658  * pointer is updated to point to that inner tuple instead of the leaf chain.
659  *
660  * On exit, current contains the address of the new inner tuple.
661  *
662  * Returns true if we successfully inserted newLeafTuple during this function,
663  * false if caller still has to do it (meaning another picksplit operation is
664  * probably needed).  Failure could occur if the picksplit result is fairly
665  * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
666  * Because we force the picksplit result to be at least two chains, each
667  * cycle will get rid of at least one leaf tuple from the chain, so the loop
668  * will eventually terminate if lack of balance is the issue.  If the tuple
669  * is too big, we assume that repeated picksplit operations will eventually
670  * make it small enough by repeated prefix-stripping.  A broken opclass could
671  * make this an infinite loop, though, so spgdoinsert() checks that the
672  * leaf datums get smaller each time.
673  */
674 static bool
doPickSplit(Relation index,SpGistState * state,SPPageDesc * current,SPPageDesc * parent,SpGistLeafTuple newLeafTuple,int level,bool isNulls,bool isNew)675 doPickSplit(Relation index, SpGistState *state,
676 			SPPageDesc *current, SPPageDesc *parent,
677 			SpGistLeafTuple newLeafTuple,
678 			int level, bool isNulls, bool isNew)
679 {
680 	bool		insertedNew = false;
681 	spgPickSplitIn in;
682 	spgPickSplitOut out;
683 	FmgrInfo   *procinfo;
684 	bool		includeNew;
685 	int			i,
686 				max,
687 				n;
688 	SpGistInnerTuple innerTuple;
689 	SpGistNodeTuple node,
690 			   *nodes;
691 	Buffer		newInnerBuffer,
692 				newLeafBuffer;
693 	ItemPointerData *heapPtrs;
694 	uint8	   *leafPageSelect;
695 	int		   *leafSizes;
696 	OffsetNumber *toDelete;
697 	OffsetNumber *toInsert;
698 	OffsetNumber redirectTuplePos = InvalidOffsetNumber;
699 	OffsetNumber startOffsets[2];
700 	SpGistLeafTuple *newLeafs;
701 	int			spaceToDelete;
702 	int			currentFreeSpace;
703 	int			totalLeafSizes;
704 	bool		allTheSame;
705 	spgxlogPickSplit xlrec;
706 	char	   *leafdata,
707 			   *leafptr;
708 	SPPageDesc	saveCurrent;
709 	int			nToDelete,
710 				nToInsert,
711 				maxToInclude;
712 
713 	in.level = level;
714 
715 	/*
716 	 * Allocate per-leaf-tuple work arrays with max possible size
717 	 */
718 	max = PageGetMaxOffsetNumber(current->page);
719 	n = max + 1;
720 	in.datums = (Datum *) palloc(sizeof(Datum) * n);
721 	heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
722 	toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
723 	toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
724 	newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
725 	leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
726 
727 	STORE_STATE(state, xlrec.stateSrc);
728 
729 	/*
730 	 * Form list of leaf tuples which will be distributed as split result;
731 	 * also, count up the amount of space that will be freed from current.
732 	 * (Note that in the non-root case, we won't actually delete the old
733 	 * tuples, only replace them with redirects or placeholders.)
734 	 *
735 	 * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
736 	 * page.  For a pass-by-value data type we will fetch a word that must
737 	 * exist even though it may contain garbage (because of the fact that leaf
738 	 * tuples must have size at least SGDTSIZE).  For a pass-by-reference type
739 	 * we are just computing a pointer that isn't going to get dereferenced.
740 	 * So it's not worth guarding the calls with isNulls checks.
741 	 */
742 	nToInsert = 0;
743 	nToDelete = 0;
744 	spaceToDelete = 0;
745 	if (SpGistBlockIsRoot(current->blkno))
746 	{
747 		/*
748 		 * We are splitting the root (which up to now is also a leaf page).
749 		 * Its tuples are not linked, so scan sequentially to get them all. We
750 		 * ignore the original value of current->offnum.
751 		 */
752 		for (i = FirstOffsetNumber; i <= max; i++)
753 		{
754 			SpGistLeafTuple it;
755 
756 			it = (SpGistLeafTuple) PageGetItem(current->page,
757 											PageGetItemId(current->page, i));
758 			if (it->tupstate == SPGIST_LIVE)
759 			{
760 				in.datums[nToInsert] = SGLTDATUM(it, state);
761 				heapPtrs[nToInsert] = it->heapPtr;
762 				nToInsert++;
763 				toDelete[nToDelete] = i;
764 				nToDelete++;
765 				/* we will delete the tuple altogether, so count full space */
766 				spaceToDelete += it->size + sizeof(ItemIdData);
767 			}
768 			else	/* tuples on root should be live */
769 				elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
770 		}
771 	}
772 	else
773 	{
774 		/* Normal case, just collect the leaf tuples in the chain */
775 		i = current->offnum;
776 		while (i != InvalidOffsetNumber)
777 		{
778 			SpGistLeafTuple it;
779 
780 			Assert(i >= FirstOffsetNumber && i <= max);
781 			it = (SpGistLeafTuple) PageGetItem(current->page,
782 											PageGetItemId(current->page, i));
783 			if (it->tupstate == SPGIST_LIVE)
784 			{
785 				in.datums[nToInsert] = SGLTDATUM(it, state);
786 				heapPtrs[nToInsert] = it->heapPtr;
787 				nToInsert++;
788 				toDelete[nToDelete] = i;
789 				nToDelete++;
790 				/* we will not delete the tuple, only replace with dead */
791 				Assert(it->size >= SGDTSIZE);
792 				spaceToDelete += it->size - SGDTSIZE;
793 			}
794 			else if (it->tupstate == SPGIST_DEAD)
795 			{
796 				/* We could see a DEAD tuple as first/only chain item */
797 				Assert(i == current->offnum);
798 				Assert(it->nextOffset == InvalidOffsetNumber);
799 				toDelete[nToDelete] = i;
800 				nToDelete++;
801 				/* replacing it with redirect will save no space */
802 			}
803 			else
804 				elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
805 
806 			i = it->nextOffset;
807 		}
808 	}
809 	in.nTuples = nToInsert;
810 
811 	/*
812 	 * We may not actually insert new tuple because another picksplit may be
813 	 * necessary due to too large value, but we will try to allocate enough
814 	 * space to include it; and in any case it has to be included in the input
815 	 * for the picksplit function.  So don't increment nToInsert yet.
816 	 */
817 	in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
818 	heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
819 	in.nTuples++;
820 
821 	memset(&out, 0, sizeof(out));
822 
823 	if (!isNulls)
824 	{
825 		/*
826 		 * Perform split using user-defined method.
827 		 */
828 		procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
829 		FunctionCall2Coll(procinfo,
830 						  index->rd_indcollation[0],
831 						  PointerGetDatum(&in),
832 						  PointerGetDatum(&out));
833 
834 		/*
835 		 * Form new leaf tuples and count up the total space needed.
836 		 */
837 		totalLeafSizes = 0;
838 		for (i = 0; i < in.nTuples; i++)
839 		{
840 			newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
841 										   out.leafTupleDatums[i],
842 										   false);
843 			totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
844 		}
845 	}
846 	else
847 	{
848 		/*
849 		 * Perform dummy split that puts all tuples into one node.
850 		 * checkAllTheSame will override this and force allTheSame mode.
851 		 */
852 		out.hasPrefix = false;
853 		out.nNodes = 1;
854 		out.nodeLabels = NULL;
855 		out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
856 
857 		/*
858 		 * Form new leaf tuples and count up the total space needed.
859 		 */
860 		totalLeafSizes = 0;
861 		for (i = 0; i < in.nTuples; i++)
862 		{
863 			newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
864 										   (Datum) 0,
865 										   true);
866 			totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
867 		}
868 	}
869 
870 	/*
871 	 * Check to see if the picksplit function failed to separate the values,
872 	 * ie, it put them all into the same child node.  If so, select allTheSame
873 	 * mode and create a random split instead.  See comments for
874 	 * checkAllTheSame as to why we need to know if the new leaf tuples could
875 	 * fit on one page.
876 	 */
877 	allTheSame = checkAllTheSame(&in, &out,
878 								 totalLeafSizes > SPGIST_PAGE_CAPACITY,
879 								 &includeNew);
880 
881 	/*
882 	 * If checkAllTheSame decided we must exclude the new tuple, don't
883 	 * consider it any further.
884 	 */
885 	if (includeNew)
886 		maxToInclude = in.nTuples;
887 	else
888 	{
889 		maxToInclude = in.nTuples - 1;
890 		totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
891 	}
892 
893 	/*
894 	 * Allocate per-node work arrays.  Since checkAllTheSame could replace
895 	 * out.nNodes with a value larger than the number of tuples on the input
896 	 * page, we can't allocate these arrays before here.
897 	 */
898 	nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
899 	leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
900 
901 	/*
902 	 * Form nodes of inner tuple and inner tuple itself
903 	 */
904 	for (i = 0; i < out.nNodes; i++)
905 	{
906 		Datum		label = (Datum) 0;
907 		bool		labelisnull = (out.nodeLabels == NULL);
908 
909 		if (!labelisnull)
910 			label = out.nodeLabels[i];
911 		nodes[i] = spgFormNodeTuple(state, label, labelisnull);
912 	}
913 	innerTuple = spgFormInnerTuple(state,
914 								   out.hasPrefix, out.prefixDatum,
915 								   out.nNodes, nodes);
916 	innerTuple->allTheSame = allTheSame;
917 
918 	/*
919 	 * Update nodes[] array to point into the newly formed innerTuple, so that
920 	 * we can adjust their downlinks below.
921 	 */
922 	SGITITERATE(innerTuple, i, node)
923 	{
924 		nodes[i] = node;
925 	}
926 
927 	/*
928 	 * Re-scan new leaf tuples and count up the space needed under each node.
929 	 */
930 	for (i = 0; i < maxToInclude; i++)
931 	{
932 		n = out.mapTuplesToNodes[i];
933 		if (n < 0 || n >= out.nNodes)
934 			elog(ERROR, "inconsistent result of SPGiST picksplit function");
935 		leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
936 	}
937 
938 	/*
939 	 * To perform the split, we must insert a new inner tuple, which can't go
940 	 * on a leaf page; and unless we are splitting the root page, we must then
941 	 * update the parent tuple's downlink to point to the inner tuple.  If
942 	 * there is room, we'll put the new inner tuple on the same page as the
943 	 * parent tuple, otherwise we need another non-leaf buffer. But if the
944 	 * parent page is the root, we can't add the new inner tuple there,
945 	 * because the root page must have only one inner tuple.
946 	 */
947 	xlrec.initInner = false;
948 	if (parent->buffer != InvalidBuffer &&
949 		!SpGistBlockIsRoot(parent->blkno) &&
950 		(SpGistPageGetFreeSpace(parent->page, 1) >=
951 		 innerTuple->size + sizeof(ItemIdData)))
952 	{
953 		/* New inner tuple will fit on parent page */
954 		newInnerBuffer = parent->buffer;
955 	}
956 	else if (parent->buffer != InvalidBuffer)
957 	{
958 		/* Send tuple to page with next triple parity (see README) */
959 		newInnerBuffer = SpGistGetBuffer(index,
960 									   GBUF_INNER_PARITY(parent->blkno + 1) |
961 										 (isNulls ? GBUF_NULLS : 0),
962 									   innerTuple->size + sizeof(ItemIdData),
963 										 &xlrec.initInner);
964 	}
965 	else
966 	{
967 		/* Root page split ... inner tuple will go to root page */
968 		newInnerBuffer = InvalidBuffer;
969 	}
970 
971 	/*
972 	 * The new leaf tuples converted from the existing ones should require the
973 	 * same or less space, and therefore should all fit onto one page
974 	 * (although that's not necessarily the current page, since we can't
975 	 * delete the old tuples but only replace them with placeholders).
976 	 * However, the incoming new tuple might not also fit, in which case we
977 	 * might need another picksplit cycle to reduce it some more.
978 	 *
979 	 * If there's not room to put everything back onto the current page, then
980 	 * we decide on a per-node basis which tuples go to the new page. (We do
981 	 * it like that because leaf tuple chains can't cross pages, so we must
982 	 * place all leaf tuples belonging to the same parent node on the same
983 	 * page.)
984 	 *
985 	 * If we are splitting the root page (turning it from a leaf page into an
986 	 * inner page), then no leaf tuples can go back to the current page; they
987 	 * must all go somewhere else.
988 	 */
989 	if (!SpGistBlockIsRoot(current->blkno))
990 		currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
991 	else
992 		currentFreeSpace = 0;	/* prevent assigning any tuples to current */
993 
994 	xlrec.initDest = false;
995 
996 	if (totalLeafSizes <= currentFreeSpace)
997 	{
998 		/* All the leaf tuples will fit on current page */
999 		newLeafBuffer = InvalidBuffer;
1000 		/* mark new leaf tuple as included in insertions, if allowed */
1001 		if (includeNew)
1002 		{
1003 			nToInsert++;
1004 			insertedNew = true;
1005 		}
1006 		for (i = 0; i < nToInsert; i++)
1007 			leafPageSelect[i] = 0;		/* signifies current page */
1008 	}
1009 	else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1010 	{
1011 		/*
1012 		 * We're trying to split up a long value by repeated suffixing, but
1013 		 * it's not going to fit yet.  Don't bother allocating a second leaf
1014 		 * buffer that we won't be able to use.
1015 		 */
1016 		newLeafBuffer = InvalidBuffer;
1017 		Assert(includeNew);
1018 		Assert(nToInsert == 0);
1019 	}
1020 	else
1021 	{
1022 		/* We will need another leaf page */
1023 		uint8	   *nodePageSelect;
1024 		int			curspace;
1025 		int			newspace;
1026 
1027 		newLeafBuffer = SpGistGetBuffer(index,
1028 									  GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1029 										Min(totalLeafSizes,
1030 											SPGIST_PAGE_CAPACITY),
1031 										&xlrec.initDest);
1032 
1033 		/*
1034 		 * Attempt to assign node groups to the two pages.  We might fail to
1035 		 * do so, even if totalLeafSizes is less than the available space,
1036 		 * because we can't split a group across pages.
1037 		 */
1038 		nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1039 
1040 		curspace = currentFreeSpace;
1041 		newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1042 		for (i = 0; i < out.nNodes; i++)
1043 		{
1044 			if (leafSizes[i] <= curspace)
1045 			{
1046 				nodePageSelect[i] = 0;	/* signifies current page */
1047 				curspace -= leafSizes[i];
1048 			}
1049 			else
1050 			{
1051 				nodePageSelect[i] = 1;	/* signifies new leaf page */
1052 				newspace -= leafSizes[i];
1053 			}
1054 		}
1055 		if (curspace >= 0 && newspace >= 0)
1056 		{
1057 			/* Successful assignment, so we can include the new leaf tuple */
1058 			if (includeNew)
1059 			{
1060 				nToInsert++;
1061 				insertedNew = true;
1062 			}
1063 		}
1064 		else if (includeNew)
1065 		{
1066 			/* We must exclude the new leaf tuple from the split */
1067 			int			nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1068 
1069 			leafSizes[nodeOfNewTuple] -=
1070 				newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1071 
1072 			/* Repeat the node assignment process --- should succeed now */
1073 			curspace = currentFreeSpace;
1074 			newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1075 			for (i = 0; i < out.nNodes; i++)
1076 			{
1077 				if (leafSizes[i] <= curspace)
1078 				{
1079 					nodePageSelect[i] = 0;		/* signifies current page */
1080 					curspace -= leafSizes[i];
1081 				}
1082 				else
1083 				{
1084 					nodePageSelect[i] = 1;		/* signifies new leaf page */
1085 					newspace -= leafSizes[i];
1086 				}
1087 			}
1088 			if (curspace < 0 || newspace < 0)
1089 				elog(ERROR, "failed to divide leaf tuple groups across pages");
1090 		}
1091 		else
1092 		{
1093 			/* oops, we already excluded new tuple ... should not get here */
1094 			elog(ERROR, "failed to divide leaf tuple groups across pages");
1095 		}
1096 		/* Expand the per-node assignments to be shown per leaf tuple */
1097 		for (i = 0; i < nToInsert; i++)
1098 		{
1099 			n = out.mapTuplesToNodes[i];
1100 			leafPageSelect[i] = nodePageSelect[n];
1101 		}
1102 	}
1103 
1104 	/* Start preparing WAL record */
1105 	xlrec.nDelete = 0;
1106 	xlrec.initSrc = isNew;
1107 	xlrec.storesNulls = isNulls;
1108 	xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1109 
1110 	leafdata = leafptr = (char *) palloc(totalLeafSizes);
1111 
1112 	/* Here we begin making the changes to the target pages */
1113 	START_CRIT_SECTION();
1114 
1115 	/*
1116 	 * Delete old leaf tuples from current buffer, except when we're splitting
1117 	 * the root; in that case there's no need because we'll re-init the page
1118 	 * below.  We do this first to make room for reinserting new leaf tuples.
1119 	 */
1120 	if (!SpGistBlockIsRoot(current->blkno))
1121 	{
1122 		/*
1123 		 * Init buffer instead of deleting individual tuples, but only if
1124 		 * there aren't any other live tuples and only during build; otherwise
1125 		 * we need to set a redirection tuple for concurrent scans.
1126 		 */
1127 		if (state->isBuild &&
1128 			nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1129 			PageGetMaxOffsetNumber(current->page))
1130 		{
1131 			SpGistInitBuffer(current->buffer,
1132 							 SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1133 			xlrec.initSrc = true;
1134 		}
1135 		else if (isNew)
1136 		{
1137 			/* don't expose the freshly init'd buffer as a backup block */
1138 			Assert(nToDelete == 0);
1139 		}
1140 		else
1141 		{
1142 			xlrec.nDelete = nToDelete;
1143 
1144 			if (!state->isBuild)
1145 			{
1146 				/*
1147 				 * Need to create redirect tuple (it will point to new inner
1148 				 * tuple) but right now the new tuple's location is not known
1149 				 * yet.  So, set the redirection pointer to "impossible" value
1150 				 * and remember its position to update tuple later.
1151 				 */
1152 				if (nToDelete > 0)
1153 					redirectTuplePos = toDelete[0];
1154 				spgPageIndexMultiDelete(state, current->page,
1155 										toDelete, nToDelete,
1156 										SPGIST_REDIRECT,
1157 										SPGIST_PLACEHOLDER,
1158 										SPGIST_METAPAGE_BLKNO,
1159 										FirstOffsetNumber);
1160 			}
1161 			else
1162 			{
1163 				/*
				 * During index build there are no concurrent searches, so we
				 * don't need to create a redirection tuple.
1166 				 */
1167 				spgPageIndexMultiDelete(state, current->page,
1168 										toDelete, nToDelete,
1169 										SPGIST_PLACEHOLDER,
1170 										SPGIST_PLACEHOLDER,
1171 										InvalidBlockNumber,
1172 										InvalidOffsetNumber);
1173 			}
1174 		}
1175 	}
1176 
1177 	/*
1178 	 * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1179 	 * nodes.
1180 	 */
1181 	startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1182 	for (i = 0; i < nToInsert; i++)
1183 	{
1184 		SpGistLeafTuple it = newLeafs[i];
1185 		Buffer		leafBuffer;
1186 		BlockNumber leafBlock;
1187 		OffsetNumber newoffset;
1188 
1189 		/* Which page is it going to? */
1190 		leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1191 		leafBlock = BufferGetBlockNumber(leafBuffer);
1192 
1193 		/* Link tuple into correct chain for its node */
1194 		n = out.mapTuplesToNodes[i];
1195 
1196 		if (ItemPointerIsValid(&nodes[n]->t_tid))
1197 		{
1198 			Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1199 			it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
1200 		}
1201 		else
1202 			it->nextOffset = InvalidOffsetNumber;
1203 
1204 		/* Insert it on page */
1205 		newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1206 										 (Item) it, it->size,
1207 										 &startOffsets[leafPageSelect[i]],
1208 										 false);
1209 		toInsert[i] = newoffset;
1210 
1211 		/* ... and complete the chain linking */
1212 		ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1213 
1214 		/* Also copy leaf tuple into WAL data */
1215 		memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1216 		leafptr += newLeafs[i]->size;
1217 	}
1218 
1219 	/*
1220 	 * We're done modifying the other leaf buffer (if any), so mark it dirty.
1221 	 * current->buffer will be marked below, after we're entirely done
1222 	 * modifying it.
1223 	 */
1224 	if (newLeafBuffer != InvalidBuffer)
1225 	{
1226 		MarkBufferDirty(newLeafBuffer);
1227 	}
1228 
1229 	/* Remember current buffer, since we're about to change "current" */
1230 	saveCurrent = *current;
1231 
1232 	/*
1233 	 * Store the new innerTuple
1234 	 */
1235 	if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1236 	{
1237 		/*
1238 		 * new inner tuple goes to parent page
1239 		 */
1240 		Assert(current->buffer != parent->buffer);
1241 
1242 		/* Repoint "current" at the new inner tuple */
1243 		current->blkno = parent->blkno;
1244 		current->buffer = parent->buffer;
1245 		current->page = parent->page;
1246 		xlrec.offnumInner = current->offnum =
1247 			SpGistPageAddNewItem(state, current->page,
1248 								 (Item) innerTuple, innerTuple->size,
1249 								 NULL, false);
1250 
1251 		/*
1252 		 * Update parent node link and mark parent page dirty
1253 		 */
1254 		xlrec.innerIsParent = true;
1255 		xlrec.offnumParent = parent->offnum;
1256 		xlrec.nodeI = parent->node;
1257 		saveNodeLink(index, parent, current->blkno, current->offnum);
1258 
1259 		/*
1260 		 * Update redirection link (in old current buffer)
1261 		 */
1262 		if (redirectTuplePos != InvalidOffsetNumber)
1263 			setRedirectionTuple(&saveCurrent, redirectTuplePos,
1264 								current->blkno, current->offnum);
1265 
1266 		/* Done modifying old current buffer, mark it dirty */
1267 		MarkBufferDirty(saveCurrent.buffer);
1268 	}
1269 	else if (parent->buffer != InvalidBuffer)
1270 	{
1271 		/*
1272 		 * new inner tuple will be stored on a new page
1273 		 */
1274 		Assert(newInnerBuffer != InvalidBuffer);
1275 
1276 		/* Repoint "current" at the new inner tuple */
1277 		current->buffer = newInnerBuffer;
1278 		current->blkno = BufferGetBlockNumber(current->buffer);
1279 		current->page = BufferGetPage(current->buffer);
1280 		xlrec.offnumInner = current->offnum =
1281 			SpGistPageAddNewItem(state, current->page,
1282 								 (Item) innerTuple, innerTuple->size,
1283 								 NULL, false);
1284 
1285 		/* Done modifying new current buffer, mark it dirty */
1286 		MarkBufferDirty(current->buffer);
1287 
1288 		/*
1289 		 * Update parent node link and mark parent page dirty
1290 		 */
1291 		xlrec.innerIsParent = (parent->buffer == current->buffer);
1292 		xlrec.offnumParent = parent->offnum;
1293 		xlrec.nodeI = parent->node;
1294 		saveNodeLink(index, parent, current->blkno, current->offnum);
1295 
1296 		/*
1297 		 * Update redirection link (in old current buffer)
1298 		 */
1299 		if (redirectTuplePos != InvalidOffsetNumber)
1300 			setRedirectionTuple(&saveCurrent, redirectTuplePos,
1301 								current->blkno, current->offnum);
1302 
1303 		/* Done modifying old current buffer, mark it dirty */
1304 		MarkBufferDirty(saveCurrent.buffer);
1305 	}
1306 	else
1307 	{
1308 		/*
1309 		 * Splitting root page, which was a leaf but now becomes inner page
1310 		 * (and so "current" continues to point at it)
1311 		 */
1312 		Assert(SpGistBlockIsRoot(current->blkno));
1313 		Assert(redirectTuplePos == InvalidOffsetNumber);
1314 
1315 		SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1316 		xlrec.initInner = true;
1317 		xlrec.innerIsParent = false;
1318 
1319 		xlrec.offnumInner = current->offnum =
1320 			PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1321 						InvalidOffsetNumber, false, false);
1322 		if (current->offnum != FirstOffsetNumber)
1323 			elog(ERROR, "failed to add item of size %u to SPGiST index page",
1324 				 innerTuple->size);
1325 
1326 		/* No parent link to update, nor redirection to do */
1327 		xlrec.offnumParent = InvalidOffsetNumber;
1328 		xlrec.nodeI = 0;
1329 
1330 		/* Done modifying new current buffer, mark it dirty */
1331 		MarkBufferDirty(current->buffer);
1332 
1333 		/* saveCurrent doesn't represent a different buffer */
1334 		saveCurrent.buffer = InvalidBuffer;
1335 	}
1336 
1337 	if (RelationNeedsWAL(index))
1338 	{
1339 		XLogRecPtr	recptr;
1340 		int			flags;
1341 
1342 		XLogBeginInsert();
1343 
1344 		xlrec.nInsert = nToInsert;
1345 		XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
1346 
1347 		XLogRegisterData((char *) toDelete,
1348 						 sizeof(OffsetNumber) * xlrec.nDelete);
1349 		XLogRegisterData((char *) toInsert,
1350 						 sizeof(OffsetNumber) * xlrec.nInsert);
1351 		XLogRegisterData((char *) leafPageSelect,
1352 						 sizeof(uint8) * xlrec.nInsert);
1353 		XLogRegisterData((char *) innerTuple, innerTuple->size);
1354 		XLogRegisterData(leafdata, leafptr - leafdata);
1355 
1356 		/* Old leaf page */
1357 		if (BufferIsValid(saveCurrent.buffer))
1358 		{
1359 			flags = REGBUF_STANDARD;
1360 			if (xlrec.initSrc)
1361 				flags |= REGBUF_WILL_INIT;
1362 			XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1363 		}
1364 
1365 		/* New leaf page */
1366 		if (BufferIsValid(newLeafBuffer))
1367 		{
1368 			flags = REGBUF_STANDARD;
1369 			if (xlrec.initDest)
1370 				flags |= REGBUF_WILL_INIT;
1371 			XLogRegisterBuffer(1, newLeafBuffer, flags);
1372 		}
1373 
1374 		/* Inner page */
1375 		flags = REGBUF_STANDARD;
1376 		if (xlrec.initInner)
1377 			flags |= REGBUF_WILL_INIT;
1378 		XLogRegisterBuffer(2, current->buffer, flags);
1379 
1380 		/* Parent page, if different from inner page */
1381 		if (parent->buffer != InvalidBuffer)
1382 		{
1383 			if (parent->buffer != current->buffer)
1384 				XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
1385 			else
1386 				Assert(xlrec.innerIsParent);
1387 		}
1388 
1389 		/* Issue the WAL record */
1390 		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1391 
1392 		/* Update page LSNs on all affected pages */
1393 		if (newLeafBuffer != InvalidBuffer)
1394 		{
1395 			Page		page = BufferGetPage(newLeafBuffer);
1396 
1397 			PageSetLSN(page, recptr);
1398 		}
1399 
1400 		if (saveCurrent.buffer != InvalidBuffer)
1401 		{
1402 			Page		page = BufferGetPage(saveCurrent.buffer);
1403 
1404 			PageSetLSN(page, recptr);
1405 		}
1406 
1407 		PageSetLSN(current->page, recptr);
1408 
1409 		if (parent->buffer != InvalidBuffer)
1410 		{
1411 			PageSetLSN(parent->page, recptr);
1412 		}
1413 	}
1414 
1415 	END_CRIT_SECTION();
1416 
1417 	/* Update local free-space cache and unlock buffers */
1418 	if (newLeafBuffer != InvalidBuffer)
1419 	{
1420 		SpGistSetLastUsedPage(index, newLeafBuffer);
1421 		UnlockReleaseBuffer(newLeafBuffer);
1422 	}
1423 	if (saveCurrent.buffer != InvalidBuffer)
1424 	{
1425 		SpGistSetLastUsedPage(index, saveCurrent.buffer);
1426 		UnlockReleaseBuffer(saveCurrent.buffer);
1427 	}
1428 
1429 	return insertedNew;
1430 }
1431 
1432 /*
1433  * spgMatchNode action: descend to N'th child node of current inner tuple
1434  */
1435 static void
spgMatchNodeAction(Relation index,SpGistState * state,SpGistInnerTuple innerTuple,SPPageDesc * current,SPPageDesc * parent,int nodeN)1436 spgMatchNodeAction(Relation index, SpGistState *state,
1437 				   SpGistInnerTuple innerTuple,
1438 				   SPPageDesc *current, SPPageDesc *parent, int nodeN)
1439 {
1440 	int			i;
1441 	SpGistNodeTuple node;
1442 
1443 	/* Release previous parent buffer if any */
1444 	if (parent->buffer != InvalidBuffer &&
1445 		parent->buffer != current->buffer)
1446 	{
1447 		SpGistSetLastUsedPage(index, parent->buffer);
1448 		UnlockReleaseBuffer(parent->buffer);
1449 	}
1450 
1451 	/* Repoint parent to specified node of current inner tuple */
1452 	parent->blkno = current->blkno;
1453 	parent->buffer = current->buffer;
1454 	parent->page = current->page;
1455 	parent->offnum = current->offnum;
1456 	parent->node = nodeN;
1457 
1458 	/* Locate that node */
1459 	SGITITERATE(innerTuple, i, node)
1460 	{
1461 		if (i == nodeN)
1462 			break;
1463 	}
1464 
1465 	if (i != nodeN)
1466 		elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1467 			 nodeN);
1468 
1469 	/* Point current to the downlink location, if any */
1470 	if (ItemPointerIsValid(&node->t_tid))
1471 	{
1472 		current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1473 		current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1474 	}
1475 	else
1476 	{
1477 		/* Downlink is empty, so we'll need to find a new page */
1478 		current->blkno = InvalidBlockNumber;
1479 		current->offnum = InvalidOffsetNumber;
1480 	}
1481 
1482 	current->buffer = InvalidBuffer;
1483 	current->page = NULL;
1484 }
1485 
1486 /*
1487  * spgAddNode action: add a node to the inner tuple at current
1488  */
1489 static void
spgAddNodeAction(Relation index,SpGistState * state,SpGistInnerTuple innerTuple,SPPageDesc * current,SPPageDesc * parent,int nodeN,Datum nodeLabel)1490 spgAddNodeAction(Relation index, SpGistState *state,
1491 				 SpGistInnerTuple innerTuple,
1492 				 SPPageDesc *current, SPPageDesc *parent,
1493 				 int nodeN, Datum nodeLabel)
1494 {
1495 	SpGistInnerTuple newInnerTuple;
1496 	spgxlogAddNode xlrec;
1497 
1498 	/* Should not be applied to nulls */
1499 	Assert(!SpGistPageStoresNulls(current->page));
1500 
1501 	/* Construct new inner tuple with additional node */
1502 	newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1503 
1504 	/* Prepare WAL record */
1505 	STORE_STATE(state, xlrec.stateSrc);
1506 	xlrec.offnum = current->offnum;
1507 
1508 	/* we don't fill these unless we need to change the parent downlink */
1509 	xlrec.parentBlk = -1;
1510 	xlrec.offnumParent = InvalidOffsetNumber;
1511 	xlrec.nodeI = 0;
1512 
1513 	/* we don't fill these unless tuple has to be moved */
1514 	xlrec.offnumNew = InvalidOffsetNumber;
1515 	xlrec.newPage = false;
1516 
1517 	if (PageGetExactFreeSpace(current->page) >=
1518 		newInnerTuple->size - innerTuple->size)
1519 	{
1520 		/*
1521 		 * We can replace the inner tuple by new version in-place
1522 		 */
1523 		START_CRIT_SECTION();
1524 
1525 		PageIndexTupleDelete(current->page, current->offnum);
1526 		if (PageAddItem(current->page,
1527 						(Item) newInnerTuple, newInnerTuple->size,
1528 						current->offnum, false, false) != current->offnum)
1529 			elog(ERROR, "failed to add item of size %u to SPGiST index page",
1530 				 newInnerTuple->size);
1531 
1532 		MarkBufferDirty(current->buffer);
1533 
1534 		if (RelationNeedsWAL(index))
1535 		{
1536 			XLogRecPtr	recptr;
1537 
1538 			XLogBeginInsert();
1539 			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1540 			XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1541 
1542 			XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1543 
1544 			recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1545 
1546 			PageSetLSN(current->page, recptr);
1547 		}
1548 
1549 		END_CRIT_SECTION();
1550 	}
1551 	else
1552 	{
1553 		/*
1554 		 * move inner tuple to another page, and update parent
1555 		 */
1556 		SpGistDeadTuple dt;
1557 		SPPageDesc	saveCurrent;
1558 
1559 		/*
1560 		 * It should not be possible to get here for the root page, since we
1561 		 * allow only one inner tuple on the root page, and spgFormInnerTuple
1562 		 * always checks that inner tuples don't exceed the size of a page.
1563 		 */
1564 		if (SpGistBlockIsRoot(current->blkno))
1565 			elog(ERROR, "cannot enlarge root tuple any more");
1566 		Assert(parent->buffer != InvalidBuffer);
1567 
1568 		saveCurrent = *current;
1569 
1570 		xlrec.offnumParent = parent->offnum;
1571 		xlrec.nodeI = parent->node;
1572 
1573 		/*
1574 		 * obtain new buffer with the same parity as current, since it will be
1575 		 * a child of same parent tuple
1576 		 */
1577 		current->buffer = SpGistGetBuffer(index,
1578 										  GBUF_INNER_PARITY(current->blkno),
1579 									newInnerTuple->size + sizeof(ItemIdData),
1580 										  &xlrec.newPage);
1581 		current->blkno = BufferGetBlockNumber(current->buffer);
1582 		current->page = BufferGetPage(current->buffer);
1583 
1584 		/*
1585 		 * Let's just make real sure new current isn't same as old.  Right now
1586 		 * that's impossible, but if SpGistGetBuffer ever got smart enough to
1587 		 * delete placeholder tuples before checking space, maybe it wouldn't
1588 		 * be impossible.  The case would appear to work except that WAL
1589 		 * replay would be subtly wrong, so I think a mere assert isn't enough
1590 		 * here.
1591 		 */
1592 		if (current->blkno == saveCurrent.blkno)
1593 			elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1594 
1595 		/*
1596 		 * New current and parent buffer will both be modified; but note that
1597 		 * parent buffer could be same as either new or old current.
1598 		 */
1599 		if (parent->buffer == saveCurrent.buffer)
1600 			xlrec.parentBlk = 0;
1601 		else if (parent->buffer == current->buffer)
1602 			xlrec.parentBlk = 1;
1603 		else
1604 			xlrec.parentBlk = 2;
1605 
1606 		START_CRIT_SECTION();
1607 
1608 		/* insert new ... */
1609 		xlrec.offnumNew = current->offnum =
1610 			SpGistPageAddNewItem(state, current->page,
1611 								 (Item) newInnerTuple, newInnerTuple->size,
1612 								 NULL, false);
1613 
1614 		MarkBufferDirty(current->buffer);
1615 
1616 		/* update parent's downlink and mark parent page dirty */
1617 		saveNodeLink(index, parent, current->blkno, current->offnum);
1618 
1619 		/*
1620 		 * Replace old tuple with a placeholder or redirection tuple.  Unless
1621 		 * doing an index build, we have to insert a redirection tuple for
1622 		 * possible concurrent scans.  We can't just delete it in any case,
1623 		 * because that could change the offsets of other tuples on the page,
1624 		 * breaking downlinks from their parents.
1625 		 */
1626 		if (state->isBuild)
1627 			dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
1628 								  InvalidBlockNumber, InvalidOffsetNumber);
1629 		else
1630 			dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1631 								  current->blkno, current->offnum);
1632 
1633 		PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1634 		if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1635 						saveCurrent.offnum,
1636 						false, false) != saveCurrent.offnum)
1637 			elog(ERROR, "failed to add item of size %u to SPGiST index page",
1638 				 dt->size);
1639 
1640 		if (state->isBuild)
1641 			SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1642 		else
1643 			SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1644 
1645 		MarkBufferDirty(saveCurrent.buffer);
1646 
1647 		if (RelationNeedsWAL(index))
1648 		{
1649 			XLogRecPtr	recptr;
1650 			int			flags;
1651 
1652 			XLogBeginInsert();
1653 
1654 			/* orig page */
1655 			XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1656 			/* new page */
1657 			flags = REGBUF_STANDARD;
1658 			if (xlrec.newPage)
1659 				flags |= REGBUF_WILL_INIT;
1660 			XLogRegisterBuffer(1, current->buffer, flags);
1661 			/* parent page (if different from orig and new) */
1662 			if (xlrec.parentBlk == 2)
1663 				XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
1664 
1665 			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1666 			XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1667 
1668 			recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1669 
1670 			/* we don't bother to check if any of these are redundant */
1671 			PageSetLSN(current->page, recptr);
1672 			PageSetLSN(parent->page, recptr);
1673 			PageSetLSN(saveCurrent.page, recptr);
1674 		}
1675 
1676 		END_CRIT_SECTION();
1677 
1678 		/* Release saveCurrent if it's not same as current or parent */
1679 		if (saveCurrent.buffer != current->buffer &&
1680 			saveCurrent.buffer != parent->buffer)
1681 		{
1682 			SpGistSetLastUsedPage(index, saveCurrent.buffer);
1683 			UnlockReleaseBuffer(saveCurrent.buffer);
1684 		}
1685 	}
1686 }
1687 
1688 /*
1689  * spgSplitNode action: split inner tuple at current into prefix and postfix
1690  */
1691 static void
spgSplitNodeAction(Relation index,SpGistState * state,SpGistInnerTuple innerTuple,SPPageDesc * current,spgChooseOut * out)1692 spgSplitNodeAction(Relation index, SpGistState *state,
1693 				   SpGistInnerTuple innerTuple,
1694 				   SPPageDesc *current, spgChooseOut *out)
1695 {
1696 	SpGistInnerTuple prefixTuple,
1697 				postfixTuple;
1698 	SpGistNodeTuple node,
1699 			   *nodes;
1700 	BlockNumber postfixBlkno;
1701 	OffsetNumber postfixOffset;
1702 	int			i;
1703 	spgxlogSplitTuple xlrec;
1704 	Buffer		newBuffer = InvalidBuffer;
1705 
1706 	/* Should not be applied to nulls */
1707 	Assert(!SpGistPageStoresNulls(current->page));
1708 
1709 	/*
1710 	 * Construct new prefix tuple, containing a single node with the specified
1711 	 * label.  (We'll update the node's downlink to point to the new postfix
1712 	 * tuple, below.)
1713 	 */
1714 	node = spgFormNodeTuple(state, out->result.splitTuple.nodeLabel, false);
1715 
1716 	prefixTuple = spgFormInnerTuple(state,
1717 									out->result.splitTuple.prefixHasPrefix,
1718 									out->result.splitTuple.prefixPrefixDatum,
1719 									1, &node);
1720 
1721 	/* it must fit in the space that innerTuple now occupies */
1722 	if (prefixTuple->size > innerTuple->size)
1723 		elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1724 
1725 	/*
1726 	 * Construct new postfix tuple, containing all nodes of innerTuple with
1727 	 * same node datums, but with the prefix specified by the picksplit
1728 	 * function.
1729 	 */
1730 	nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1731 	SGITITERATE(innerTuple, i, node)
1732 	{
1733 		nodes[i] = node;
1734 	}
1735 
1736 	postfixTuple = spgFormInnerTuple(state,
1737 									 out->result.splitTuple.postfixHasPrefix,
1738 								   out->result.splitTuple.postfixPrefixDatum,
1739 									 innerTuple->nNodes, nodes);
1740 
1741 	/* Postfix tuple is allTheSame if original tuple was */
1742 	postfixTuple->allTheSame = innerTuple->allTheSame;
1743 
1744 	/* prep data for WAL record */
1745 	xlrec.newPage = false;
1746 
1747 	/*
1748 	 * If we can't fit both tuples on the current page, get a new page for the
1749 	 * postfix tuple.  In particular, can't split to the root page.
1750 	 *
1751 	 * For the space calculation, note that prefixTuple replaces innerTuple
1752 	 * but postfixTuple will be a new entry.
1753 	 */
1754 	if (SpGistBlockIsRoot(current->blkno) ||
1755 		SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1756 		prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1757 	{
1758 		/*
1759 		 * Choose page with next triple parity, because postfix tuple is a
1760 		 * child of prefix one
1761 		 */
1762 		newBuffer = SpGistGetBuffer(index,
1763 									GBUF_INNER_PARITY(current->blkno + 1),
1764 									postfixTuple->size + sizeof(ItemIdData),
1765 									&xlrec.newPage);
1766 	}
1767 
1768 	START_CRIT_SECTION();
1769 
1770 	/*
1771 	 * Replace old tuple by prefix tuple
1772 	 */
1773 	PageIndexTupleDelete(current->page, current->offnum);
1774 	xlrec.offnumPrefix = PageAddItem(current->page,
1775 									 (Item) prefixTuple, prefixTuple->size,
1776 									 current->offnum, false, false);
1777 	if (xlrec.offnumPrefix != current->offnum)
1778 		elog(ERROR, "failed to add item of size %u to SPGiST index page",
1779 			 prefixTuple->size);
1780 
1781 	/*
1782 	 * put postfix tuple into appropriate page
1783 	 */
1784 	if (newBuffer == InvalidBuffer)
1785 	{
1786 		postfixBlkno = current->blkno;
1787 		xlrec.offnumPostfix = postfixOffset =
1788 			SpGistPageAddNewItem(state, current->page,
1789 								 (Item) postfixTuple, postfixTuple->size,
1790 								 NULL, false);
1791 		xlrec.postfixBlkSame = true;
1792 	}
1793 	else
1794 	{
1795 		postfixBlkno = BufferGetBlockNumber(newBuffer);
1796 		xlrec.offnumPostfix = postfixOffset =
1797 			SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
1798 								 (Item) postfixTuple, postfixTuple->size,
1799 								 NULL, false);
1800 		MarkBufferDirty(newBuffer);
1801 		xlrec.postfixBlkSame = false;
1802 	}
1803 
1804 	/*
1805 	 * And set downlink pointer in the prefix tuple to point to postfix tuple.
1806 	 * (We can't avoid this step by doing the above two steps in opposite
1807 	 * order, because there might not be enough space on the page to insert
1808 	 * the postfix tuple first.)  We have to update the local copy of the
1809 	 * prefixTuple too, because that's what will be written to WAL.
1810 	 */
1811 	spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset);
1812 	prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1813 							  PageGetItemId(current->page, current->offnum));
1814 	spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset);
1815 
1816 	MarkBufferDirty(current->buffer);
1817 
1818 	if (RelationNeedsWAL(index))
1819 	{
1820 		XLogRecPtr	recptr;
1821 
1822 		XLogBeginInsert();
1823 		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1824 		XLogRegisterData((char *) prefixTuple, prefixTuple->size);
1825 		XLogRegisterData((char *) postfixTuple, postfixTuple->size);
1826 
1827 		XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1828 		if (newBuffer != InvalidBuffer)
1829 		{
1830 			int			flags;
1831 
1832 			flags = REGBUF_STANDARD;
1833 			if (xlrec.newPage)
1834 				flags |= REGBUF_WILL_INIT;
1835 			XLogRegisterBuffer(1, newBuffer, flags);
1836 		}
1837 
1838 		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1839 
1840 		PageSetLSN(current->page, recptr);
1841 
1842 		if (newBuffer != InvalidBuffer)
1843 		{
1844 			PageSetLSN(BufferGetPage(newBuffer), recptr);
1845 		}
1846 	}
1847 
1848 	END_CRIT_SECTION();
1849 
1850 	/* Update local free-space cache and release buffer */
1851 	if (newBuffer != InvalidBuffer)
1852 	{
1853 		SpGistSetLastUsedPage(index, newBuffer);
1854 		UnlockReleaseBuffer(newBuffer);
1855 	}
1856 }
1857 
1858 /*
1859  * Insert one item into the index.
1860  *
1861  * Returns true on success, false if we failed to complete the insertion
1862  * (typically because of conflict with a concurrent insert).  In the latter
1863  * case, caller should re-call spgdoinsert() with the same args.
1864  */
1865 bool
spgdoinsert(Relation index,SpGistState * state,ItemPointer heapPtr,Datum datum,bool isnull)1866 spgdoinsert(Relation index, SpGistState *state,
1867 			ItemPointer heapPtr, Datum datum, bool isnull)
1868 {
1869 	bool		result = true;
1870 	int			level = 0;
1871 	Datum		leafDatum;
1872 	int			leafSize;
1873 	int			bestLeafSize;
1874 	int			numNoProgressCycles = 0;
1875 	SPPageDesc	current,
1876 				parent;
1877 	FmgrInfo   *procinfo = NULL;
1878 
1879 	/*
1880 	 * Look up FmgrInfo of the user-defined choose function once, to save
1881 	 * cycles in the loop below.
1882 	 */
1883 	if (!isnull)
1884 		procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1885 
1886 	/*
1887 	 * Since we don't use index_form_tuple in this AM, we have to make sure
1888 	 * value to be inserted is not toasted; FormIndexDatum doesn't guarantee
1889 	 * that.
1890 	 */
1891 	if (!isnull && state->attType.attlen == -1)
1892 		datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
1893 
1894 	leafDatum = datum;
1895 
1896 	/*
1897 	 * Compute space needed for a leaf tuple containing the given datum.
1898 	 *
1899 	 * If it isn't gonna fit, and the opclass can't reduce the datum size by
1900 	 * suffixing, bail out now rather than doing a lot of useless work.
1901 	 */
1902 	if (!isnull)
1903 		leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
1904 			SpGistGetTypeSize(&state->attType, leafDatum);
1905 	else
1906 		leafSize = SGDTSIZE + sizeof(ItemIdData);
1907 
1908 	if (leafSize > SPGIST_PAGE_CAPACITY &&
1909 		(isnull || !state->config.longValuesOK))
1910 		ereport(ERROR,
1911 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1912 			errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
1913 				   leafSize - sizeof(ItemIdData),
1914 				   SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
1915 				   RelationGetRelationName(index)),
1916 			errhint("Values larger than a buffer page cannot be indexed.")));
1917 	bestLeafSize = leafSize;
1918 
1919 	/* Initialize "current" to the appropriate root page */
1920 	current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
1921 	current.buffer = InvalidBuffer;
1922 	current.page = NULL;
1923 	current.offnum = FirstOffsetNumber;
1924 	current.node = -1;
1925 
1926 	/* "parent" is invalid for the moment */
1927 	parent.blkno = InvalidBlockNumber;
1928 	parent.buffer = InvalidBuffer;
1929 	parent.page = NULL;
1930 	parent.offnum = InvalidOffsetNumber;
1931 	parent.node = -1;
1932 
1933 	/*
1934 	 * Before entering the loop, try to clear any pending interrupt condition.
1935 	 * If a query cancel is pending, we might as well accept it now not later;
1936 	 * while if a non-canceling condition is pending, servicing it here avoids
1937 	 * having to restart the insertion and redo all the work so far.
1938 	 */
1939 	CHECK_FOR_INTERRUPTS();
1940 
1941 	for (;;)
1942 	{
1943 		bool		isNew = false;
1944 
1945 		/*
1946 		 * Bail out if query cancel is pending.  We must have this somewhere
1947 		 * in the loop since a broken opclass could produce an infinite
1948 		 * picksplit loop.  However, because we'll be holding buffer lock(s)
1949 		 * after the first iteration, ProcessInterrupts() wouldn't be able to
1950 		 * throw a cancel error here.  Hence, if we see that an interrupt is
1951 		 * pending, break out of the loop and deal with the situation below.
1952 		 * Set result = false because we must restart the insertion if the
1953 		 * interrupt isn't a query-cancel-or-die case.
1954 		 */
1955 		if (INTERRUPTS_PENDING_CONDITION())
1956 		{
1957 			result = false;
1958 			break;
1959 		}
1960 
1961 		if (current.blkno == InvalidBlockNumber)
1962 		{
1963 			/*
1964 			 * Create a leaf page.  If leafSize is too large to fit on a page,
1965 			 * we won't actually use the page yet, but it simplifies the API
1966 			 * for doPickSplit to always have a leaf page at hand; so just
1967 			 * quietly limit our request to a page size.
1968 			 */
1969 			current.buffer =
1970 				SpGistGetBuffer(index,
1971 								GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
1972 								Min(leafSize, SPGIST_PAGE_CAPACITY),
1973 								&isNew);
1974 			current.blkno = BufferGetBlockNumber(current.buffer);
1975 		}
1976 		else if (parent.buffer == InvalidBuffer)
1977 		{
1978 			/* we hold no parent-page lock, so no deadlock is possible */
1979 			current.buffer = ReadBuffer(index, current.blkno);
1980 			LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
1981 		}
1982 		else if (current.blkno != parent.blkno)
1983 		{
1984 			/* descend to a new child page */
1985 			current.buffer = ReadBuffer(index, current.blkno);
1986 
1987 			/*
1988 			 * Attempt to acquire lock on child page.  We must beware of
1989 			 * deadlock against another insertion process descending from that
1990 			 * page to our parent page (see README).  If we fail to get lock,
1991 			 * abandon the insertion and tell our caller to start over.
1992 			 *
1993 			 * XXX this could be improved, because failing to get lock on a
1994 			 * buffer is not proof of a deadlock situation; the lock might be
1995 			 * held by a reader, or even just background writer/checkpointer
1996 			 * process.  Perhaps it'd be worth retrying after sleeping a bit?
1997 			 */
1998 			if (!ConditionalLockBuffer(current.buffer))
1999 			{
2000 				ReleaseBuffer(current.buffer);
2001 				UnlockReleaseBuffer(parent.buffer);
2002 				return false;
2003 			}
2004 		}
2005 		else
2006 		{
2007 			/* inner tuple can be stored on the same page as parent one */
2008 			current.buffer = parent.buffer;
2009 		}
2010 		current.page = BufferGetPage(current.buffer);
2011 
2012 		/* should not arrive at a page of the wrong type */
2013 		if (isnull ? !SpGistPageStoresNulls(current.page) :
2014 			SpGistPageStoresNulls(current.page))
2015 			elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2016 				 current.blkno);
2017 
2018 		if (SpGistPageIsLeaf(current.page))
2019 		{
2020 			SpGistLeafTuple leafTuple;
2021 			int			nToSplit,
2022 						sizeToSplit;
2023 
2024 			leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
2025 			if (leafTuple->size + sizeof(ItemIdData) <=
2026 				SpGistPageGetFreeSpace(current.page, 1))
2027 			{
2028 				/* it fits on page, so insert it and we're done */
2029 				addLeafTuple(index, state, leafTuple,
2030 							 &current, &parent, isnull, isNew);
2031 				break;
2032 			}
2033 			else if ((sizeToSplit =
2034 					  checkSplitConditions(index, state, &current,
2035 									&nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
2036 					 nToSplit < 64 &&
2037 					 leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2038 			{
2039 				/*
2040 				 * the amount of data is pretty small, so just move the whole
2041 				 * chain to another leaf page rather than splitting it.
2042 				 */
2043 				Assert(!isNew);
2044 				moveLeafs(index, state, &current, &parent, leafTuple, isnull);
2045 				break;			/* we're done */
2046 			}
2047 			else
2048 			{
2049 				/* picksplit */
2050 				if (doPickSplit(index, state, &current, &parent,
2051 								leafTuple, level, isnull, isNew))
2052 					break;		/* doPickSplit installed new tuples */
2053 
2054 				/* leaf tuple will not be inserted yet */
2055 				pfree(leafTuple);
2056 
2057 				/*
2058 				 * current now describes new inner tuple, go insert into it
2059 				 */
2060 				Assert(!SpGistPageIsLeaf(current.page));
2061 				goto process_inner_tuple;
2062 			}
2063 		}
2064 		else	/* non-leaf page */
2065 		{
2066 			/*
2067 			 * Apply the opclass choose function to figure out how to insert
2068 			 * the given datum into the current inner tuple.
2069 			 */
2070 			SpGistInnerTuple innerTuple;
2071 			spgChooseIn in;
2072 			spgChooseOut out;
2073 
2074 			/*
2075 			 * spgAddNode and spgSplitTuple cases will loop back to here to
2076 			 * complete the insertion operation.  Just in case the choose
2077 			 * function is broken and produces add or split requests
2078 			 * repeatedly, check for query cancel (see comments above).
2079 			 */
2080 	process_inner_tuple:
2081 			if (INTERRUPTS_PENDING_CONDITION())
2082 			{
2083 				result = false;
2084 				break;
2085 			}
2086 
2087 			innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2088 								PageGetItemId(current.page, current.offnum));
2089 
2090 			in.datum = datum;
2091 			in.leafDatum = leafDatum;
2092 			in.level = level;
2093 			in.allTheSame = innerTuple->allTheSame;
2094 			in.hasPrefix = (innerTuple->prefixSize > 0);
2095 			in.prefixDatum = SGITDATUM(innerTuple, state);
2096 			in.nNodes = innerTuple->nNodes;
2097 			in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2098 
2099 			memset(&out, 0, sizeof(out));
2100 
2101 			if (!isnull)
2102 			{
2103 				/* use user-defined choose method */
2104 				FunctionCall2Coll(procinfo,
2105 								  index->rd_indcollation[0],
2106 								  PointerGetDatum(&in),
2107 								  PointerGetDatum(&out));
2108 			}
2109 			else
2110 			{
2111 				/* force "match" action (to insert to random subnode) */
2112 				out.resultType = spgMatchNode;
2113 			}
2114 
2115 			if (innerTuple->allTheSame)
2116 			{
2117 				/*
2118 				 * It's not allowed to do an AddNode at an allTheSame tuple.
2119 				 * Opclass must say "match", in which case we choose a random
2120 				 * one of the nodes to descend into, or "split".
2121 				 */
2122 				if (out.resultType == spgAddNode)
2123 					elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2124 				else if (out.resultType == spgMatchNode)
2125 					out.result.matchNode.nodeN = random() % innerTuple->nNodes;
2126 			}
2127 
2128 			switch (out.resultType)
2129 			{
2130 				case spgMatchNode:
2131 					/* Descend to N'th child node */
2132 					spgMatchNodeAction(index, state, innerTuple,
2133 									   &current, &parent,
2134 									   out.result.matchNode.nodeN);
2135 					/* Adjust level as per opclass request */
2136 					level += out.result.matchNode.levelAdd;
2137 					/* Replace leafDatum and recompute leafSize */
2138 					if (!isnull)
2139 					{
2140 						leafDatum = out.result.matchNode.restDatum;
2141 						leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
2142 							SpGistGetTypeSize(&state->attType, leafDatum);
2143 					}
2144 
2145 					/*
2146 					 * Check new tuple size; fail if it can't fit, unless the
2147 					 * opclass says it can handle the situation by suffixing.
2148 					 *
2149 					 * A buggy opclass might not ever make the leaf datum
2150 					 * small enough, causing an infinite loop.  To detect such
2151 					 * a loop, check to see if we are making progress by
2152 					 * reducing the leafSize in each pass.  This is a bit
2153 					 * tricky though.  Because of alignment considerations,
2154 					 * the total tuple size might not decrease on every pass.
2155 					 * Also, there are edge cases where the choose method
2156 					 * might seem to not make progress for a cycle or two.
2157 					 * Somewhat arbitrarily, we allow up to 10 no-progress
2158 					 * iterations before failing.  (This limit should be more
2159 					 * than MAXALIGN, to accommodate opclasses that trim one
2160 					 * byte from the leaf datum per pass.)
2161 					 */
2162 					if (leafSize > SPGIST_PAGE_CAPACITY)
2163 					{
2164 						bool		ok = false;
2165 
2166 						if (state->config.longValuesOK && !isnull)
2167 						{
2168 							if (leafSize < bestLeafSize)
2169 							{
2170 								ok = true;
2171 								bestLeafSize = leafSize;
2172 								numNoProgressCycles = 0;
2173 							}
2174 							else if (++numNoProgressCycles < 10)
2175 								ok = true;
2176 						}
2177 						if (!ok)
2178 							ereport(ERROR,
2179 									(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2180 									 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2181 											leafSize - sizeof(ItemIdData),
2182 											SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2183 											RelationGetRelationName(index)),
2184 									 errhint("Values larger than a buffer page cannot be indexed.")));
2185 					}
2186 
2187 					/*
2188 					 * Loop around and attempt to insert the new leafDatum at
2189 					 * "current" (which might reference an existing child
2190 					 * tuple, or might be invalid to force us to find a new
2191 					 * page for the tuple).
2192 					 */
2193 					break;
2194 				case spgAddNode:
2195 					/* AddNode is not sensible if nodes don't have labels */
2196 					if (in.nodeLabels == NULL)
2197 						elog(ERROR, "cannot add a node to an inner tuple without node labels");
2198 					/* Add node to inner tuple, per request */
2199 					spgAddNodeAction(index, state, innerTuple,
2200 									 &current, &parent,
2201 									 out.result.addNode.nodeN,
2202 									 out.result.addNode.nodeLabel);
2203 
2204 					/*
2205 					 * Retry insertion into the enlarged node.  We assume that
2206 					 * we'll get a MatchNode result this time.
2207 					 */
2208 					goto process_inner_tuple;
2209 					break;
2210 				case spgSplitTuple:
2211 					/* Split inner tuple, per request */
2212 					spgSplitNodeAction(index, state, innerTuple,
2213 									   &current, &out);
2214 
2215 					/* Retry insertion into the split node */
2216 					goto process_inner_tuple;
2217 					break;
2218 				default:
2219 					elog(ERROR, "unrecognized SPGiST choose result: %d",
2220 						 (int) out.resultType);
2221 					break;
2222 			}
2223 		}
2224 	}							/* end loop */
2225 
2226 	/*
2227 	 * Release any buffers we're still holding.  Beware of possibility that
2228 	 * current and parent reference same buffer.
2229 	 */
2230 	if (current.buffer != InvalidBuffer)
2231 	{
2232 		SpGistSetLastUsedPage(index, current.buffer);
2233 		UnlockReleaseBuffer(current.buffer);
2234 	}
2235 	if (parent.buffer != InvalidBuffer &&
2236 		parent.buffer != current.buffer)
2237 	{
2238 		SpGistSetLastUsedPage(index, parent.buffer);
2239 		UnlockReleaseBuffer(parent.buffer);
2240 	}
2241 
2242 	/*
2243 	 * We do not support being called while some outer function is holding a
2244 	 * buffer lock (or any other reason to postpone query cancels).  If that
2245 	 * were the case, telling the caller to retry would create an infinite
2246 	 * loop.
2247 	 */
2248 	Assert(INTERRUPTS_CAN_BE_PROCESSED());
2249 
2250 	/*
2251 	 * Finally, check for interrupts again.  If there was a query cancel,
2252 	 * ProcessInterrupts() will be able to throw the error here.  If it was
2253 	 * some other kind of interrupt that can just be cleared, return false to
2254 	 * tell our caller to retry.
2255 	 */
2256 	CHECK_FOR_INTERRUPTS();
2257 
2258 	return result;
2259 }
2260