/*-------------------------------------------------------------------------
 *
 * gist.c
 *	  interface routines for the postgres GiST index access method.
 *
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/gist/gist.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/gist_private.h"
#include "access/gistscan.h"
#include "catalog/pg_collation.h"
#include "miscadmin.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "nodes/execnodes.h"
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"
#include "utils/rel.h"


/* non-export function prototypes */
static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
static bool gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
							GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum);
static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
							 GISTSTATE *giststate,
							 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
							 Buffer leftchild, Buffer rightchild,
							 bool unlockbuf, bool unlockleftchild);
static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
							GISTSTATE *giststate, List *splitinfo, bool releasebuf);
static void gistprunepage(Relation rel, Page page, Buffer buffer,
						  Relation heapRel);


#define ROTATEDIST(d) do { \
	SplitedPageLayout *tmp=(SplitedPageLayout*)palloc0(sizeof(SplitedPageLayout)); \
	tmp->block.blkno = InvalidBlockNumber;	\
	tmp->buffer = InvalidBuffer;	\
	tmp->next = (d); \
	(d)=tmp; \
} while(0)
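
#ifdef NOT_USED
/*
 * Illustrative sketch only (hypothetical helper, not referenced anywhere):
 * ROTATEDIST pushes a freshly zeroed SplitedPageLayout onto the head of a
 * list, so a chain built with it ends up in reverse order of allocation.
 */
static SplitedPageLayout *
example_build_dist_chain(void)
{
	SplitedPageLayout *dist = NULL;

	ROTATEDIST(dist);			/* dist now points to one node */
	ROTATEDIST(dist);			/* new head; previous node is dist->next */

	return dist;
}
#endif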


/*
 * GiST handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
gisthandler(PG_FUNCTION_ARGS)
{
	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

	amroutine->amstrategies = 0;
	amroutine->amsupport = GISTNProcs;
	amroutine->amcanorder = false;
	amroutine->amcanorderbyop = true;
	amroutine->amcanbackward = false;
	amroutine->amcanunique = false;
	amroutine->amcanmulticol = true;
	amroutine->amoptionalkey = true;
	amroutine->amsearcharray = false;
	amroutine->amsearchnulls = true;
	amroutine->amstorage = true;
	amroutine->amclusterable = true;
	amroutine->ampredlocks = true;
	amroutine->amcanparallel = false;
	amroutine->amcaninclude = true;
	amroutine->amkeytype = InvalidOid;

	amroutine->ambuild = gistbuild;
	amroutine->ambuildempty = gistbuildempty;
	amroutine->aminsert = gistinsert;
	amroutine->ambulkdelete = gistbulkdelete;
	amroutine->amvacuumcleanup = gistvacuumcleanup;
	amroutine->amcanreturn = gistcanreturn;
	amroutine->amcostestimate = gistcostestimate;
	amroutine->amoptions = gistoptions;
	amroutine->amproperty = gistproperty;
	amroutine->ambuildphasename = NULL;
	amroutine->amvalidate = gistvalidate;
	amroutine->ambeginscan = gistbeginscan;
	amroutine->amrescan = gistrescan;
	amroutine->amgettuple = gistgettuple;
	amroutine->amgetbitmap = gistgetbitmap;
	amroutine->amendscan = gistendscan;
	amroutine->ammarkpos = NULL;
	amroutine->amrestrpos = NULL;
	amroutine->amestimateparallelscan = NULL;
	amroutine->aminitparallelscan = NULL;
	amroutine->amparallelrescan = NULL;

	PG_RETURN_POINTER(amroutine);
}

/*
 * Create and return a temporary memory context for use by GiST. We
 * _always_ invoke user-provided methods in a temporary memory
 * context, so that memory leaks in those functions cannot cause
 * problems. Also, we use some additional temporary contexts in the
 * GiST code itself, to avoid the need to do some awkward manual
 * memory management.
 */
MemoryContext
createTempGistContext(void)
{
	return AllocSetContextCreate(CurrentMemoryContext,
								 "GiST temporary context",
								 ALLOCSET_DEFAULT_SIZES);
}
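
#ifdef NOT_USED
/*
 * Illustrative sketch only (hypothetical helper, not referenced anywhere):
 * the usual pattern for the temporary context.  Work in tempCxt, switch
 * back, then reset, so anything leaked by user-provided methods is freed.
 * gistinsert() below follows exactly this pattern.
 */
static void
example_call_in_temp_context(GISTSTATE *giststate)
{
	MemoryContext oldCxt = MemoryContextSwitchTo(giststate->tempCxt);

	/* ... invoke opclass support functions here ... */

	MemoryContextSwitchTo(oldCxt);
	MemoryContextReset(giststate->tempCxt);
}
#endif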

/*
 *	gistbuildempty() -- build an empty gist index in the initialization fork
 */
void
gistbuildempty(Relation index)
{
	Buffer		buffer;

	/* Initialize the root page */
	buffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

	/* Initialize and xlog buffer */
	START_CRIT_SECTION();
	GISTInitBuffer(buffer, F_LEAF);
	MarkBufferDirty(buffer);
	log_newpage_buffer(buffer, true);
	END_CRIT_SECTION();

	/* Unlock and release the buffer */
	UnlockReleaseBuffer(buffer);
}

/*
 *	gistinsert -- wrapper for GiST tuple insertion.
 *
 *	  This is the public interface routine for tuple insertion in GiSTs.
 *	  It doesn't do any work; just locks the relation and passes the buck.
 */
bool
gistinsert(Relation r, Datum *values, bool *isnull,
		   ItemPointer ht_ctid, Relation heapRel,
		   IndexUniqueCheck checkUnique,
		   IndexInfo *indexInfo)
{
	GISTSTATE  *giststate = (GISTSTATE *) indexInfo->ii_AmCache;
	IndexTuple	itup;
	MemoryContext oldCxt;

	/* Initialize GISTSTATE cache if first call in this statement */
	if (giststate == NULL)
	{
		oldCxt = MemoryContextSwitchTo(indexInfo->ii_Context);
		giststate = initGISTstate(r);
		giststate->tempCxt = createTempGistContext();
		indexInfo->ii_AmCache = (void *) giststate;
		MemoryContextSwitchTo(oldCxt);
	}

	oldCxt = MemoryContextSwitchTo(giststate->tempCxt);

	itup = gistFormTuple(giststate, r,
						 values, isnull, true /* size is currently bogus */ );
	itup->t_tid = *ht_ctid;

	gistdoinsert(r, itup, 0, giststate, heapRel, false);

	/* cleanup */
	MemoryContextSwitchTo(oldCxt);
	MemoryContextReset(giststate->tempCxt);

	return false;
}


/*
 * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple
 * at that offset is atomically removed along with inserting the new tuples.
 * This is used to replace a tuple with a new one.
 *
 * If 'leftchildbuf' is valid, we're inserting the downlink for the page
 * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
 * The F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and the NSN is set.
 *
 * If 'markfollowright' is true and the page is split, the left child is
 * marked with the F_FOLLOW_RIGHT flag. That is the normal case. During a
 * buffered index build, however, there is no concurrent access and the page
 * splitting is done in a slightly simpler fashion, so false is passed.
 *
 * If there is not enough room on the page, it is split. All the split
 * pages are kept pinned and locked and returned in *splitinfo; the caller
 * is responsible for inserting the downlinks for them. However, if
 * 'buffer' is the root page and it needs to be split, gistplacetopage()
 * performs the split as one atomic operation, and *splitinfo is set to NIL.
 * In that case, we continue to hold the root page locked, and the child
 * pages are released; note that new tuple(s) are *not* on the root page
 * but in one of the new child pages.
 *
 * If 'newblkno' is not NULL, returns the block number of the page the first
 * new/updated tuple was inserted to. Usually it's the given page, but could
 * be its right sibling if the page was split.
 *
 * Returns 'true' if the page was split, 'false' otherwise.
 */
bool
gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
				Buffer buffer,
				IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
				BlockNumber *newblkno,
				Buffer leftchildbuf,
				List **splitinfo,
				bool markfollowright,
				Relation heapRel,
				bool is_build)
{
	BlockNumber blkno = BufferGetBlockNumber(buffer);
	Page		page = BufferGetPage(buffer);
	bool		is_leaf = (GistPageIsLeaf(page)) ? true : false;
	XLogRecPtr	recptr;
	int			i;
	bool		is_split;

	/*
	 * Refuse to modify a page that's incompletely split. This should not
	 * happen because we finish any incomplete splits while we walk down the
	 * tree. However, it's remotely possible that another concurrent inserter
	 * splits a parent page, and errors out before completing the split. We
	 * will just throw an error in that case, and leave any split we had in
	 * progress unfinished too. The next insert that comes along will clean up
	 * the mess.
	 */
	if (GistFollowRight(page))
		elog(ERROR, "concurrent GiST page split was incomplete");

	/* should never try to insert to a deleted page */
	Assert(!GistPageIsDeleted(page));

	*splitinfo = NIL;

	/*
	 * if isupdate, remove old key: This node's key has been modified, either
	 * because a child split occurred or because we needed to adjust our key
	 * for an insert in a child node. Therefore, remove the old version of
	 * this node's key.
	 *
	 * for WAL replay, in the non-split case we handle this by setting up a
	 * one-element todelete array; in the split case, it's handled implicitly
	 * because the tuple vector passed to gistSplit won't include this tuple.
	 */
	is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);

	/*
	 * If the leaf page is full, first try to delete dead tuples, then check
	 * again.
	 */
	if (is_split && GistPageIsLeaf(page) && GistPageHasGarbage(page))
	{
		gistprunepage(rel, page, buffer, heapRel);
		is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
	}

	if (is_split)
	{
		/* no space for insertion */
		IndexTuple *itvec;
		int			tlen;
		SplitedPageLayout *dist = NULL,
				   *ptr;
		BlockNumber oldrlink = InvalidBlockNumber;
		GistNSN		oldnsn = 0;
		SplitedPageLayout rootpg;
		bool		is_rootsplit;
		int			npage;

		is_rootsplit = (blkno == GIST_ROOT_BLKNO);

		/*
		 * Form the vector of index tuples to split. If we're replacing an
		 * old tuple, remove the old version from the vector.
		 */
		itvec = gistextractpage(page, &tlen);
		if (OffsetNumberIsValid(oldoffnum))
		{
			/* on inner page we should remove old tuple */
			int			pos = oldoffnum - FirstOffsetNumber;

			tlen--;
			if (pos != tlen)
				memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
		}
		itvec = gistjoinvector(itvec, &tlen, itup, ntup);
		dist = gistSplit(rel, page, itvec, tlen, giststate);

		/*
		 * Check that the split didn't produce too many pages.
		 */
		npage = 0;
		for (ptr = dist; ptr; ptr = ptr->next)
			npage++;
		/* in a root split, we'll add one more page to the list below */
		if (is_rootsplit)
			npage++;
		if (npage > GIST_MAX_SPLIT_PAGES)
			elog(ERROR, "GiST page split into too many halves (%d, maximum %d)",
				 npage, GIST_MAX_SPLIT_PAGES);

		/*
		 * Set up pages to work with. Allocate new buffers for all but the
		 * leftmost page. The original page becomes the new leftmost page, and
		 * is just replaced with the new contents.
		 *
		 * For a root-split, allocate new buffers for all child pages; the
		 * original page is overwritten with the new root page containing
		 * downlinks to the new child pages.
		 */
		ptr = dist;
		if (!is_rootsplit)
		{
			/* save old rightlink and NSN */
			oldrlink = GistPageGetOpaque(page)->rightlink;
			oldnsn = GistPageGetNSN(page);

			dist->buffer = buffer;
			dist->block.blkno = BufferGetBlockNumber(buffer);
			dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));

			/* clean all flags except F_LEAF */
			GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;

			ptr = ptr->next;
		}
		for (; ptr; ptr = ptr->next)
		{
			/* Allocate new page */
			ptr->buffer = gistNewBuffer(rel);
			GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
			ptr->page = BufferGetPage(ptr->buffer);
			ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
			PredicateLockPageSplit(rel,
								   BufferGetBlockNumber(buffer),
								   BufferGetBlockNumber(ptr->buffer));
		}

		/*
		 * Now that we know which blocks the new pages go to, set up downlink
		 * tuples to point to them.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
		{
			ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
			GistTupleSetValid(ptr->itup);
		}

		/*
		 * If this is a root split, we construct the new root page with the
		 * downlinks here directly, instead of requiring the caller to insert
		 * them. Add the new root page to the list along with the child pages.
		 */
		if (is_rootsplit)
		{
			IndexTuple *downlinks;
			int			ndownlinks = 0;
			int			i;

			rootpg.buffer = buffer;
			rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
			GistPageGetOpaque(rootpg.page)->flags = 0;

			/* Prepare a vector of all the downlinks */
			for (ptr = dist; ptr; ptr = ptr->next)
				ndownlinks++;
			downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
			for (i = 0, ptr = dist; ptr; ptr = ptr->next)
				downlinks[i++] = ptr->itup;

			rootpg.block.blkno = GIST_ROOT_BLKNO;
			rootpg.block.num = ndownlinks;
			rootpg.list = gistfillitupvec(downlinks, ndownlinks,
										  &(rootpg.lenlist));
			rootpg.itup = NULL;

			rootpg.next = dist;
			dist = &rootpg;
		}
		else
		{
			/* Prepare split-info to be returned to caller */
			for (ptr = dist; ptr; ptr = ptr->next)
			{
				GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));

				si->buf = ptr->buffer;
				si->downlink = ptr->itup;
				*splitinfo = lappend(*splitinfo, si);
			}
		}

		/*
		 * Fill all pages. All the pages are new, ie. freshly allocated empty
		 * pages, or a temporary copy of the old page.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
		{
			char	   *data = (char *) (ptr->list);

			for (i = 0; i < ptr->block.num; i++)
			{
				IndexTuple	thistup = (IndexTuple) data;

				if (PageAddItem(ptr->page, (Item) data, IndexTupleSize(thistup), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
					elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(rel));

				/*
				 * If this is the first inserted/updated tuple, let the caller
				 * know which page it landed on.
				 */
				if (newblkno && ItemPointerEquals(&thistup->t_tid, &(*itup)->t_tid))
					*newblkno = ptr->block.blkno;

				data += IndexTupleSize(thistup);
			}

			/* Set up rightlinks */
			if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
				GistPageGetOpaque(ptr->page)->rightlink =
					ptr->next->block.blkno;
			else
				GistPageGetOpaque(ptr->page)->rightlink = oldrlink;

			/*
			 * Mark all but the rightmost page with the follow-right flag. It
			 * will be cleared as soon as the downlink is inserted into the
			 * parent, but this ensures that if we error out before that, the
			 * index is still consistent. (In buffering build mode, any error
			 * will abort the index build anyway, so this is not needed.)
			 */
			if (ptr->next && !is_rootsplit && markfollowright)
				GistMarkFollowRight(ptr->page);
			else
				GistClearFollowRight(ptr->page);

			/*
			 * Copy the NSN of the original page to all pages. The
			 * F_FOLLOW_RIGHT flags ensure that scans will follow the
			 * rightlinks until the downlinks are inserted.
			 */
			GistPageSetNSN(ptr->page, oldnsn);
		}

		/*
		 * gistXLogSplit() needs to WAL log a lot of pages; prepare WAL
		 * insertion for that. NB: The number of pages and data segments
		 * specified here must match the calculations in gistXLogSplit()!
		 */
		if (!is_build && RelationNeedsWAL(rel))
			XLogEnsureRecordSpace(npage, 1 + npage * 2);

		START_CRIT_SECTION();

		/*
		 * Must mark buffers dirty before XLogInsert, even though we'll still
		 * be changing their opaque fields below.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
			MarkBufferDirty(ptr->buffer);
		if (BufferIsValid(leftchildbuf))
			MarkBufferDirty(leftchildbuf);

		/*
		 * The first page in the chain was a temporary working copy meant to
		 * replace the old page. Copy it over the old page.
		 */
		PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
		dist->page = BufferGetPage(dist->buffer);

		/*
		 * Write the WAL record.
		 *
		 * If we're building a new index, however, we don't WAL-log changes
		 * yet. The LSN-NSN interlock between parent and child requires that
		 * LSNs never move backwards, so set the LSNs to a value that's
		 * smaller than any real or fake unlogged LSN that might be generated
		 * later. (There can't be any concurrent scans during index build, so
		 * we don't need to be able to detect concurrent splits yet.)
		 */
		if (is_build)
			recptr = GistBuildLSN;
		else
		{
			if (RelationNeedsWAL(rel))
				recptr = gistXLogSplit(is_leaf,
									   dist, oldrlink, oldnsn, leftchildbuf,
									   markfollowright);
			else
				recptr = gistGetFakeLSN(rel);
		}

		for (ptr = dist; ptr; ptr = ptr->next)
			PageSetLSN(ptr->page, recptr);

		/*
		 * Return the new child buffers to the caller.
		 *
		 * If this was a root split, we've already inserted the downlink
		 * pointers, in the form of a new root page. Therefore we can release
		 * all the new buffers, and keep just the root page locked.
		 */
		if (is_rootsplit)
		{
			for (ptr = dist->next; ptr; ptr = ptr->next)
				UnlockReleaseBuffer(ptr->buffer);
		}
	}
	else
	{
		/*
		 * Enough space.  We always get here if ntup==0.
		 */
		START_CRIT_SECTION();

		/*
		 * Delete old tuple if any, then insert new tuple(s) if any.  If
		 * possible, use the fast path of PageIndexTupleOverwrite.
		 */
		if (OffsetNumberIsValid(oldoffnum))
		{
			if (ntup == 1)
			{
				/* One-for-one replacement, so use PageIndexTupleOverwrite */
				if (!PageIndexTupleOverwrite(page, oldoffnum, (Item) *itup,
											 IndexTupleSize(*itup)))
					elog(ERROR, "failed to add item to index page in \"%s\"",
						 RelationGetRelationName(rel));
			}
			else
			{
				/* Delete old, then append new tuple(s) to page */
				PageIndexTupleDelete(page, oldoffnum);
				gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
			}
		}
		else
		{
			/* Just append new tuples at the end of the page */
			gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
		}

		MarkBufferDirty(buffer);

		if (BufferIsValid(leftchildbuf))
			MarkBufferDirty(leftchildbuf);

		if (is_build)
			recptr = GistBuildLSN;
		else
		{
			if (RelationNeedsWAL(rel))
			{
				OffsetNumber ndeloffs = 0,
							deloffs[1];

				if (OffsetNumberIsValid(oldoffnum))
				{
					deloffs[0] = oldoffnum;
					ndeloffs = 1;
				}

				recptr = gistXLogUpdate(buffer,
										deloffs, ndeloffs, itup, ntup,
										leftchildbuf);
			}
			else
				recptr = gistGetFakeLSN(rel);
		}
		PageSetLSN(page, recptr);

		if (newblkno)
			*newblkno = blkno;
	}

	/*
	 * If we inserted the downlink for a child page, set NSN and clear
	 * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know to
	 * follow the rightlink if and only if they looked at the parent page
	 * before we inserted the downlink.
	 *
	 * Note that we do this *after* writing the WAL record. That means that
	 * the possible full page image in the WAL record does not include these
	 * changes, and they must be replayed even if the page is restored from
	 * the full page image. There's a chicken-and-egg problem: if we updated
	 * the child pages first, we wouldn't know the recptr of the WAL record
	 * we're about to write.
	 */
	if (BufferIsValid(leftchildbuf))
	{
		Page		leftpg = BufferGetPage(leftchildbuf);

		GistPageSetNSN(leftpg, recptr);
		GistClearFollowRight(leftpg);

		PageSetLSN(leftpg, recptr);
	}

	END_CRIT_SECTION();

	return is_split;
}
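
#ifdef NOT_USED
/*
 * Illustrative sketch only (hypothetical caller, not referenced anywhere):
 * the contract described above means that when gistplacetopage() reports a
 * split, the caller owns the locked pages in *splitinfo and must insert
 * their downlinks; gistinserttuples() below is the real caller and also
 * does predicate-lock checking first.
 */
static void
example_place_to_page(GISTInsertState *state, GISTSTATE *giststate,
					  IndexTuple tuple)
{
	List	   *splitinfo;

	(void) gistplacetopage(state->r, state->freespace, giststate,
						   state->stack->buffer,
						   &tuple, 1, InvalidOffsetNumber,
						   NULL,	/* don't need the resulting block number */
						   InvalidBuffer,	/* no left child */
						   &splitinfo,
						   true,	/* markfollowright */
						   state->heapRel,
						   state->is_build);

	/* On a root split, *splitinfo is NIL and there is nothing more to do */
	if (splitinfo)
		gistfinishsplit(state, state->stack, giststate, splitinfo, false);
}
#endif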

/*
 * Workhorse routine for doing insertion into a GiST index. Note that
 * this routine assumes it is invoked in a short-lived memory context,
 * so it does not bother releasing palloc'd allocations.
 */
void
gistdoinsert(Relation r, IndexTuple itup, Size freespace,
			 GISTSTATE *giststate, Relation heapRel, bool is_build)
{
	ItemId		iid;
	IndexTuple	idxtuple;
	GISTInsertStack firststack;
	GISTInsertStack *stack;
	GISTInsertState state;
	bool		xlocked = false;

	memset(&state, 0, sizeof(GISTInsertState));
	state.freespace = freespace;
	state.r = r;
	state.heapRel = heapRel;
	state.is_build = is_build;

	/* Start from the root */
	firststack.blkno = GIST_ROOT_BLKNO;
	firststack.lsn = 0;
	firststack.retry_from_parent = false;
	firststack.parent = NULL;
	firststack.downlinkoffnum = InvalidOffsetNumber;
	state.stack = stack = &firststack;

	/*
	 * Walk down along the path of smallest penalty, updating the parent
	 * pointers with the key we're inserting as we go. If we crash in the
	 * middle, the tree is consistent, although the possible parent updates
	 * were a waste.
	 */
	for (;;)
	{
		/*
		 * If we split an internal page while descending the tree, we have to
		 * retry at the parent. (Normally, the LSN-NSN interlock below would
		 * also catch this and cause us to retry. But LSNs are not updated
		 * during index build.)
		 */
		while (stack->retry_from_parent)
		{
			if (xlocked)
				LockBuffer(stack->buffer, GIST_UNLOCK);
			xlocked = false;
			ReleaseBuffer(stack->buffer);
			state.stack = stack = stack->parent;
		}

		if (XLogRecPtrIsInvalid(stack->lsn))
			stack->buffer = ReadBuffer(state.r, stack->blkno);

		/*
		 * Be optimistic and grab shared lock first. Swap it for an exclusive
		 * lock later if we need to update the page.
		 */
		if (!xlocked)
		{
			LockBuffer(stack->buffer, GIST_SHARE);
			gistcheckpage(state.r, stack->buffer);
		}

		stack->page = (Page) BufferGetPage(stack->buffer);
		stack->lsn = xlocked ?
			PageGetLSN(stack->page) : BufferGetLSNAtomic(stack->buffer);
		Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));

		/*
		 * If this page was split but the downlink was never inserted to the
		 * parent because the inserting backend crashed before doing that, fix
		 * that now.
		 */
		if (GistFollowRight(stack->page))
		{
			if (!xlocked)
			{
				LockBuffer(stack->buffer, GIST_UNLOCK);
				LockBuffer(stack->buffer, GIST_EXCLUSIVE);
				xlocked = true;
				/* someone might've completed the split when we unlocked */
				if (!GistFollowRight(stack->page))
					continue;
			}
			gistfixsplit(&state, giststate);

			UnlockReleaseBuffer(stack->buffer);
			xlocked = false;
			state.stack = stack = stack->parent;
			continue;
		}

		if ((stack->blkno != GIST_ROOT_BLKNO &&
			 stack->parent->lsn < GistPageGetNSN(stack->page)) ||
			GistPageIsDeleted(stack->page))
		{
			/*
			 * Concurrent split or page deletion detected. There's no
			 * guarantee that the downlink for this page is consistent with
			 * the tuple we're inserting anymore, so go back to parent and
			 * rechoose the best child.
			 */
			UnlockReleaseBuffer(stack->buffer);
			xlocked = false;
			state.stack = stack = stack->parent;
			continue;
		}

		if (!GistPageIsLeaf(stack->page))
		{
			/*
			 * This is an internal page so continue to walk down the tree.
			 * Find the child node that has the minimum insertion penalty.
			 */
			BlockNumber childblkno;
			IndexTuple	newtup;
			GISTInsertStack *item;
			OffsetNumber downlinkoffnum;

			downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate);
			iid = PageGetItemId(stack->page, downlinkoffnum);
			idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
			childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));

			/*
			 * Check that it's not a leftover invalid tuple from pre-9.1
			 */
			if (GistTupleIsInvalid(idxtuple))
				ereport(ERROR,
						(errmsg("index \"%s\" contains an inner tuple marked as invalid",
								RelationGetRelationName(r)),
						 errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
						 errhint("Please REINDEX it.")));

			/*
			 * Check that the key representing the target child node is
			 * consistent with the key we're inserting. Update it if it's not.
			 */
			newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
			if (newtup)
			{
				/*
				 * Swap shared lock for an exclusive one. Beware, the page may
				 * change while we unlock/lock the page...
				 */
				if (!xlocked)
				{
					LockBuffer(stack->buffer, GIST_UNLOCK);
					LockBuffer(stack->buffer, GIST_EXCLUSIVE);
					xlocked = true;
					stack->page = (Page) BufferGetPage(stack->buffer);

					if (PageGetLSN(stack->page) != stack->lsn)
					{
						/* the page was changed while we unlocked it, retry */
						continue;
					}
				}

				/*
				 * Update the tuple.
				 *
				 * We still hold the lock after gistinserttuple(), but it
				 * might have to split the page to make the updated tuple fit.
				 * In that case the updated tuple might migrate to the other
				 * half of the split, so we have to go back to the parent and
				 * descend back to the half that's a better fit for the new
				 * tuple.
				 */
				if (gistinserttuple(&state, stack, giststate, newtup,
									downlinkoffnum))
				{
					/*
					 * If this was a root split, the root page continues to be
					 * the parent and the updated tuple went to one of the
					 * child pages, so we just need to retry from the root
					 * page.
					 */
					if (stack->blkno != GIST_ROOT_BLKNO)
					{
						UnlockReleaseBuffer(stack->buffer);
						xlocked = false;
						state.stack = stack = stack->parent;
					}
					continue;
				}
			}
			LockBuffer(stack->buffer, GIST_UNLOCK);
			xlocked = false;

			/* descend to the chosen child */
			item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
			item->blkno = childblkno;
			item->parent = stack;
			item->downlinkoffnum = downlinkoffnum;
			state.stack = stack = item;
		}
		else
		{
			/*
			 * Leaf page. Insert the new key. We've already updated all the
			 * parents on the way down, but we might have to split the page if
			 * it doesn't fit. gistinserttuple() will take care of that.
			 */

			/*
			 * Swap shared lock for an exclusive one. Be careful, the page may
			 * change while we unlock/lock the page...
			 */
			if (!xlocked)
			{
				LockBuffer(stack->buffer, GIST_UNLOCK);
				LockBuffer(stack->buffer, GIST_EXCLUSIVE);
				xlocked = true;
				stack->page = (Page) BufferGetPage(stack->buffer);
				stack->lsn = PageGetLSN(stack->page);

				if (stack->blkno == GIST_ROOT_BLKNO)
				{
					/*
					 * the only page that can change from leaf to inner is the
					 * root page, so for the root we must recheck that
					 */
					if (!GistPageIsLeaf(stack->page))
					{
						/*
						 * very rare situation: while the page was unlocked,
						 * the one-page index grew beyond a single page
						 */
						LockBuffer(stack->buffer, GIST_UNLOCK);
						xlocked = false;
						continue;
					}

					/*
					 * we don't need to check for a root split, because
					 * checking leaf vs. inner is enough to recognize a split
					 * of the root
					 */
				}
				else if ((GistFollowRight(stack->page) ||
						  stack->parent->lsn < GistPageGetNSN(stack->page)) ||
						 GistPageIsDeleted(stack->page))
				{
					/*
					 * The page was split or deleted while we momentarily
					 * unlocked the page. Go back to parent.
					 */
					UnlockReleaseBuffer(stack->buffer);
					xlocked = false;
					state.stack = stack = stack->parent;
					continue;
				}
			}

			/* now state.stack->(page, buffer and blkno) points to leaf page */

			gistinserttuple(&state, stack, giststate, itup,
							InvalidOffsetNumber);
			LockBuffer(stack->buffer, GIST_UNLOCK);

			/* Release any pins we might still hold before exiting */
			for (; stack; stack = stack->parent)
				ReleaseBuffer(stack->buffer);
			break;
		}
	}
}

/*
 * Traverse the tree to find the path from the root page to the specified
 * "child" block.
 *
 * Returns a new insertion stack, starting from the parent of "child", up
 * to the root. *downlinkoffnum is set to the offset of the downlink in the
 * direct parent of child.
 *
 * To prevent deadlocks, this should lock only one page at a time.
 */
static GISTInsertStack *
gistFindPath(Relation r, BlockNumber child, OffsetNumber *downlinkoffnum)
{
	Page		page;
	Buffer		buffer;
	OffsetNumber i,
				maxoff;
	ItemId		iid;
	IndexTuple	idxtuple;
	List	   *fifo;
	GISTInsertStack *top,
			   *ptr;
	BlockNumber blkno;

	top = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
	top->blkno = GIST_ROOT_BLKNO;
	top->downlinkoffnum = InvalidOffsetNumber;

	fifo = list_make1(top);
	while (fifo != NIL)
	{
		/* Get next page to visit */
		top = linitial(fifo);
		fifo = list_delete_first(fifo);

		buffer = ReadBuffer(r, top->blkno);
		LockBuffer(buffer, GIST_SHARE);
		gistcheckpage(r, buffer);
		page = (Page) BufferGetPage(buffer);

		if (GistPageIsLeaf(page))
		{
			/*
			 * Because we scan the index top-down, all the rest of the pages
			 * in the queue must be leaf pages as well.
			 */
			UnlockReleaseBuffer(buffer);
			break;
		}

		/* currently, internal pages are never deleted */
		Assert(!GistPageIsDeleted(page));

		top->lsn = BufferGetLSNAtomic(buffer);

		/*
		 * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
		 * downlink. This should not normally happen.
		 */
		if (GistFollowRight(page))
			elog(ERROR, "concurrent GiST page split was incomplete");

		if (top->parent && top->parent->lsn < GistPageGetNSN(page) &&
			GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
		{
			/*
			 * Page was split while we looked elsewhere. We didn't see the
			 * downlink to the right page when we scanned the parent, so add
			 * it to the queue now.
			 *
			 * Put the right page ahead of the queue, so that we visit it
			 * next. That's important, because if this is the lowest internal
			 * level, just above leaves, we might already have queued up some
			 * leaf pages, and we assume that there can't be any non-leaf
			 * pages behind leaf pages.
			 */
			ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
			ptr->blkno = GistPageGetOpaque(page)->rightlink;
			ptr->downlinkoffnum = InvalidOffsetNumber;
			ptr->parent = top->parent;

			fifo = lcons(ptr, fifo);
		}

		maxoff = PageGetMaxOffsetNumber(page);

		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
		{
			iid = PageGetItemId(page, i);
			idxtuple = (IndexTuple) PageGetItem(page, iid);
			blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
			if (blkno == child)
			{
				/* Found it! */
				UnlockReleaseBuffer(buffer);
				*downlinkoffnum = i;
				return top;
			}
			else
			{
				/* Append this child to the list of pages to visit later */
				ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
				ptr->blkno = blkno;
				ptr->downlinkoffnum = i;
				ptr->parent = top;

				fifo = lappend(fifo, ptr);
			}
		}

		UnlockReleaseBuffer(buffer);
	}

	elog(ERROR, "failed to re-find parent of a page in index \"%s\", block %u",
		 RelationGetRelationName(r), child);
	return NULL;				/* keep compiler quiet */
}

/*
 * Updates the stack so that child->parent is the correct parent of the
 * child. child->parent must be exclusively locked on entry, and will
 * remain so at exit, but it might not be the same page anymore.
 */
static void
gistFindCorrectParent(Relation r, GISTInsertStack *child)
{
	GISTInsertStack *parent = child->parent;

	gistcheckpage(r, parent->buffer);
	parent->page = (Page) BufferGetPage(parent->buffer);

	/* here we don't need to distinguish between split and page update */
	if (child->downlinkoffnum == InvalidOffsetNumber ||
		parent->lsn != PageGetLSN(parent->page))
	{
		/* parent has changed, follow the rightlinks until the child is found */
		OffsetNumber i,
					maxoff;
		ItemId		iid;
		IndexTuple	idxtuple;
		GISTInsertStack *ptr;

		while (true)
		{
			maxoff = PageGetMaxOffsetNumber(parent->page);
			for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
			{
				iid = PageGetItemId(parent->page, i);
				idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
				if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
				{
					/* found it */
					child->downlinkoffnum = i;
					return;
				}
			}

			parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
			UnlockReleaseBuffer(parent->buffer);
			if (parent->blkno == InvalidBlockNumber)
			{
				/*
				 * Reached the end of the chain without finding the parent.
				 * This is a very rare situation that can happen when the
				 * root was split.
				 */
				break;
			}
			parent->buffer = ReadBuffer(r, parent->blkno);
			LockBuffer(parent->buffer, GIST_EXCLUSIVE);
			gistcheckpage(r, parent->buffer);
			parent->page = (Page) BufferGetPage(parent->buffer);
		}

		/*
		 * We need to search the whole tree to find the parent, but first we
		 * must release all the old parent buffers.
		 */

		ptr = child->parent->parent;	/* child->parent already released
										 * above */
		while (ptr)
		{
			ReleaseBuffer(ptr->buffer);
			ptr = ptr->parent;
		}

		/* ok, find new path */
		ptr = parent = gistFindPath(r, child->blkno, &child->downlinkoffnum);

		/* read all buffers as expected by caller */
		/* note we don't lock them or gistcheckpage them here! */
		while (ptr)
		{
			ptr->buffer = ReadBuffer(r, ptr->blkno);
			ptr->page = (Page) BufferGetPage(ptr->buffer);
			ptr = ptr->parent;
		}

		/* install new chain of parents to stack */
		child->parent = parent;

		/* make recursive call to normal processing */
		LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
		gistFindCorrectParent(r, child);
	}

	return;
}

/*
 * Form a downlink pointer for the page in 'buf'.
 */
static IndexTuple
gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
				 GISTInsertStack *stack)
{
	Page		page = BufferGetPage(buf);
	OffsetNumber maxoff;
	OffsetNumber offset;
	IndexTuple	downlink = NULL;

	maxoff = PageGetMaxOffsetNumber(page);
	for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
	{
		IndexTuple	ituple = (IndexTuple)
		PageGetItem(page, PageGetItemId(page, offset));

		if (downlink == NULL)
			downlink = CopyIndexTuple(ituple);
		else
		{
			IndexTuple	newdownlink;

			newdownlink = gistgetadjusted(rel, downlink, ituple,
										  giststate);
			if (newdownlink)
				downlink = newdownlink;
		}
	}

	/*
	 * If the page is completely empty, we can't form a meaningful downlink
	 * for it. But we have to insert a downlink for the page. Any key will do,
	 * as long as it's consistent with the downlink of the parent page, so
	 * that we can legally insert it to the parent. A minimal one that matches
	 * as few scans as possible would be best, to keep scans from doing
	 * useless work, but we don't know how to construct that. So we just use
	 * the downlink of the original page that was split - that's as far from
	 * optimal as it can get but will do.
	 */
	if (!downlink)
	{
		ItemId		iid;

		LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
		gistFindCorrectParent(rel, stack);
		iid = PageGetItemId(stack->parent->page, stack->downlinkoffnum);
		downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
		downlink = CopyIndexTuple(downlink);
		LockBuffer(stack->parent->buffer, GIST_UNLOCK);
	}

	ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
	GistTupleSetValid(downlink);

	return downlink;
}


/*
 * Complete the incomplete split of state->stack->page.
 */
static void
gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
{
	GISTInsertStack *stack = state->stack;
	Buffer		buf;
	Page		page;
	List	   *splitinfo = NIL;

	elog(LOG, "fixing incomplete split in index \"%s\", block %u",
		 RelationGetRelationName(state->r), stack->blkno);

	Assert(GistFollowRight(stack->page));
	Assert(OffsetNumberIsValid(stack->downlinkoffnum));

	buf = stack->buffer;

	/*
	 * Read the chain of split pages, following the rightlinks. Construct a
	 * downlink tuple for each page.
	 */
	for (;;)
	{
		GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
		IndexTuple	downlink;

		page = BufferGetPage(buf);

		/* Form the new downlink tuples to insert to parent */
		downlink = gistformdownlink(state->r, buf, giststate, stack);

		si->buf = buf;
		si->downlink = downlink;

		splitinfo = lappend(splitinfo, si);

		if (GistFollowRight(page))
		{
			/* lock next page */
			buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
			LockBuffer(buf, GIST_EXCLUSIVE);
		}
		else
			break;
	}

	/* Insert the downlinks */
	gistfinishsplit(state, stack, giststate, splitinfo, false);
}

/*
 * Insert or replace a tuple in stack->buffer. If 'oldoffnum' is valid, the
 * tuple at 'oldoffnum' is replaced, otherwise the tuple is inserted as new.
 * 'stack' represents the path from the root to the page being updated.
 *
 * The caller must hold an exclusive lock on stack->buffer.  The lock is still
 * held on return, but the page might not contain the inserted tuple if the
 * page was split. The function returns true if the page was split, false
 * otherwise.
 */
static bool
gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
				GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum)
{
	return gistinserttuples(state, stack, giststate, &tuple, 1, oldoffnum,
							InvalidBuffer, InvalidBuffer, false, false);
}

/* ----------------
 * An extended workhorse version of gistinserttuple(). This version allows
 * inserting multiple tuples, or replacing a single tuple with multiple tuples.
 * This is used to recursively update the downlinks in the parent when a page
 * is split.
 *
 * If leftchild and rightchild are valid, we're inserting/replacing the
 * downlink for rightchild, and leftchild is its left sibling. We clear the
 * F_FOLLOW_RIGHT flag and update NSN on leftchild, atomically with the
 * insertion of the downlink.
 *
 * To avoid holding locks for longer than necessary, when recursing up the
 * tree to update the parents, the locking is a bit peculiar here. On entry,
 * the caller must hold an exclusive lock on stack->buffer, as well as
 * leftchild and rightchild if given. On return:
 *
 *	- Lock on stack->buffer is released, if 'unlockbuf' is true. The page is
 *	  always kept pinned, however.
 *	- Lock on 'leftchild' is released, if 'unlockleftchild' is true. The page
 *	  is kept pinned.
 *	- Lock and pin on 'rightchild' are always released.
 *
 * Returns 'true' if the page had to be split. Note that if the page was
 * split, the inserted/updated tuples might've been inserted to a right
 * sibling of stack->buffer instead of stack->buffer itself.
 */
static bool
gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
				 GISTSTATE *giststate,
				 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
				 Buffer leftchild, Buffer rightchild,
				 bool unlockbuf, bool unlockleftchild)
{
	List	   *splitinfo;
	bool		is_split;

	/*
	 * Check for any rw conflicts (in serializable isolation level) just
	 * before we intend to modify the page
	 */
	CheckForSerializableConflictIn(state->r, NULL, stack->buffer);

	/* Insert the tuple(s) to the page, splitting the page if necessary */
	is_split = gistplacetopage(state->r, state->freespace, giststate,
							   stack->buffer,
							   tuples, ntup,
							   oldoffnum, NULL,
							   leftchild,
							   &splitinfo,
							   true,
							   state->heapRel,
							   state->is_build);

	/*
	 * Before recursing up in case the page was split, release locks on the
	 * child pages. We don't need to keep them locked when updating the
	 * parent.
	 */
	if (BufferIsValid(rightchild))
		UnlockReleaseBuffer(rightchild);
	if (BufferIsValid(leftchild) && unlockleftchild)
		LockBuffer(leftchild, GIST_UNLOCK);

	/*
	 * If we had to split, insert/update the downlinks in the parent. If the
	 * caller requested us to release the lock on stack->buffer, tell
	 * gistfinishsplit() to do that as soon as it's safe to do so. If we
	 * didn't have to split, release it ourselves.
	 */
	if (splitinfo)
		gistfinishsplit(state, stack, giststate, splitinfo, unlockbuf);
	else if (unlockbuf)
		LockBuffer(stack->buffer, GIST_UNLOCK);

	return is_split;
}

/*
 * Finish an incomplete split by inserting/updating the downlinks in parent
 * page. 'splitinfo' contains all the child pages involved in the split,
 * from left-to-right.
 *
 * On entry, the caller must hold a lock on stack->buffer and all the child
 * pages in 'splitinfo'. If 'unlockbuf' is true, the lock on stack->buffer is
 * released on return. The child pages are always unlocked and unpinned.
 */
static void
gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
				GISTSTATE *giststate, List *splitinfo, bool unlockbuf)
{
	ListCell   *lc;
	List	   *reversed;
	GISTPageSplitInfo *right;
	GISTPageSplitInfo *left;
	IndexTuple	tuples[2];

	/* A split always contains at least two halves */
	Assert(list_length(splitinfo) >= 2);

	/*
	 * We need to insert downlinks for each new page, and update the downlink
	 * for the original (leftmost) page in the split. Begin at the rightmost
	 * page, inserting one downlink at a time until there's only two pages
	 * left. Finally insert the downlink for the last new page and update the
	 * downlink for the original page as one operation.
	 */

	/* for convenience, create a copy of the list in reverse order */
	reversed = NIL;
	foreach(lc, splitinfo)
	{
		reversed = lcons(lfirst(lc), reversed);
	}

	LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);

	/*
	 * Insert downlinks for the siblings from right to left, until there are
	 * only two siblings left.
	 */
	while (list_length(reversed) > 2)
	{
		right = (GISTPageSplitInfo *) linitial(reversed);
		left = (GISTPageSplitInfo *) lsecond(reversed);

		gistFindCorrectParent(state->r, stack);
		if (gistinserttuples(state, stack->parent, giststate,
							 &right->downlink, 1,
							 InvalidOffsetNumber,
							 left->buf, right->buf, false, false))
		{
			/*
			 * If the parent page was split, the existing downlink might
			 * have moved.
			 */
			stack->downlinkoffnum = InvalidOffsetNumber;
		}
		/* gistinserttuples() released the lock on right->buf. */
		reversed = list_delete_first(reversed);
	}

	right = (GISTPageSplitInfo *) linitial(reversed);
	left = (GISTPageSplitInfo *) lsecond(reversed);

	/*
	 * Finally insert downlink for the remaining right page and update the
	 * downlink for the original page to not contain the tuples that were
	 * moved to the new pages.
	 */
	tuples[0] = left->downlink;
	tuples[1] = right->downlink;
	gistFindCorrectParent(state->r, stack);
	if (gistinserttuples(state, stack->parent, giststate,
						 tuples, 2,
						 stack->downlinkoffnum,
						 left->buf, right->buf,
						 true,		/* Unlock parent */
						 unlockbuf	/* Unlock stack->buffer if caller wants that */
			))
	{
		/*
		 * If the parent page was split, the downlink might have moved.
		 */
		stack->downlinkoffnum = InvalidOffsetNumber;
	}

	Assert(left->buf == stack->buffer);

	/*
	 * If we split the page because we had to adjust the downlink on an
	 * internal page, while descending the tree for inserting a new tuple,
	 * then this might no longer be the correct page for the new tuple. The
	 * downlink to this page might not cover the new tuple anymore, it might
	 * need to go to the newly-created right sibling instead. Tell the caller
	 * to walk back up the stack, to re-check at the parent which page to
	 * insert to.
	 *
	 * Normally, the LSN-NSN interlock during the tree descent would also
	 * detect that a concurrent split happened (by ourselves), and cause us to
	 * retry at the parent. But that mechanism doesn't work during index
	 * build, because we don't do WAL-logging, and don't update LSNs, during
	 * index build.
	 */
	stack->retry_from_parent = true;
}

/*
 * gistSplit -- split a page in the tree and fill the struct used for XLOG
 * and for writing the actual buffers. The function is recursive: it will
 * split pages until the keys fit on every page.
 */
SplitedPageLayout *
gistSplit(Relation r,
		  Page page,
		  IndexTuple *itup,		/* contains compressed entry */
		  int len,
		  GISTSTATE *giststate)
{
	IndexTuple *lvectup,
			   *rvectup;
	GistSplitVector v;
	int			i;
	SplitedPageLayout *res = NULL;

	/* this should never recurse very deeply, but better safe than sorry */
	check_stack_depth();

	/* there's no point in splitting an empty page */
	Assert(len > 0);

	/*
	 * If a single tuple doesn't fit on a page, no amount of splitting will
	 * help.
	 */
	if (len == 1)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
						IndexTupleSize(itup[0]), GiSTPageSize,
						RelationGetRelationName(r))));

	memset(v.spl_lisnull, true,
		   sizeof(bool) * giststate->nonLeafTupdesc->natts);
	memset(v.spl_risnull, true,
		   sizeof(bool) * giststate->nonLeafTupdesc->natts);
	gistSplitByKey(r, page, itup, len, giststate, &v, 0);

	/* form left and right vector */
	lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
	rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));

	for (i = 0; i < v.splitVector.spl_nleft; i++)
		lvectup[i] = itup[v.splitVector.spl_left[i] - 1];

	for (i = 0; i < v.splitVector.spl_nright; i++)
		rvectup[i] = itup[v.splitVector.spl_right[i] - 1];

	/* finalize splitting (may need another split) */
	if (!gistfitpage(rvectup, v.splitVector.spl_nright))
	{
		res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
	}
	else
	{
		ROTATEDIST(res);
		res->block.num = v.splitVector.spl_nright;
		res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
		res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
	}

	if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
	{
		SplitedPageLayout *resptr,
				   *subres;

		resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);

		/* install on list's tail */
		while (resptr->next)
			resptr = resptr->next;

		resptr->next = res;
		res = subres;
	}
	else
	{
		ROTATEDIST(res);
		res->block.num = v.splitVector.spl_nleft;
		res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
		res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
	}

	return res;
}
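
#ifdef NOT_USED
/*
 * Illustrative sketch only (hypothetical helper, not referenced anywhere):
 * each element of the chain returned by gistSplit() carries the tuples for
 * one page as a packed array in 'list' ('block.num' tuples, 'lenlist'
 * bytes), plus the downlink 'itup' for that page.  gistplacetopage() above
 * consumes the chain in essentially this way.
 */
static void
example_walk_split_layout(SplitedPageLayout *dist)
{
	SplitedPageLayout *ptr;

	for (ptr = dist; ptr; ptr = ptr->next)
	{
		char	   *data = (char *) (ptr->list);
		int			i;

		for (i = 0; i < ptr->block.num; i++)
		{
			IndexTuple	thistup = (IndexTuple) data;

			/* ... process one tuple destined for this page ... */
			data += IndexTupleSize(thistup);
		}
	}
}
#endif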

/*
 * Create a GISTSTATE and fill it with information about the index
 */
GISTSTATE *
initGISTstate(Relation index)
{
	GISTSTATE  *giststate;
	MemoryContext scanCxt;
	MemoryContext oldCxt;
	int			i;

	/* safety check to protect fixed-size arrays in GISTSTATE */
	if (index->rd_att->natts > INDEX_MAX_KEYS)
		elog(ERROR, "numberOfAttributes %d > %d",
			 index->rd_att->natts, INDEX_MAX_KEYS);

	/* Create the memory context that will hold the GISTSTATE */
	scanCxt = AllocSetContextCreate(CurrentMemoryContext,
									"GiST scan context",
									ALLOCSET_DEFAULT_SIZES);
	oldCxt = MemoryContextSwitchTo(scanCxt);

	/* Create and fill in the GISTSTATE */
	giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE));

	giststate->scanCxt = scanCxt;
	giststate->tempCxt = scanCxt;	/* caller must change this if needed */
	giststate->leafTupdesc = index->rd_att;

	/*
	 * The truncated tupdesc for non-leaf index tuples, which doesn't contain
	 * the INCLUDE attributes.
	 *
	 * It is used to form tuples during tuple adjustment and page split.
	 * B-tree creates a shortened tuple descriptor for every truncated tuple
	 * because it does this less often: it does not have to form truncated
	 * tuples during page split.  Also, B-tree does not adjust tuples on
	 * internal pages the way GiST does.
	 */
	giststate->nonLeafTupdesc = CreateTupleDescCopyConstr(index->rd_att);
	giststate->nonLeafTupdesc->natts =
		IndexRelationGetNumberOfKeyAttributes(index);

	for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(index); i++)
	{
		fmgr_info_copy(&(giststate->consistentFn[i]),
					   index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
					   scanCxt);
		fmgr_info_copy(&(giststate->unionFn[i]),
					   index_getprocinfo(index, i + 1, GIST_UNION_PROC),
					   scanCxt);

		/* opclasses are not required to provide a Compress method */
		if (OidIsValid(index_getprocid(index, i + 1, GIST_COMPRESS_PROC)))
			fmgr_info_copy(&(giststate->compressFn[i]),
						   index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
						   scanCxt);
		else
			giststate->compressFn[i].fn_oid = InvalidOid;

		/* opclasses are not required to provide a Decompress method */
		if (OidIsValid(index_getprocid(index, i + 1, GIST_DECOMPRESS_PROC)))
			fmgr_info_copy(&(giststate->decompressFn[i]),
						   index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
						   scanCxt);
		else
			giststate->decompressFn[i].fn_oid = InvalidOid;

		fmgr_info_copy(&(giststate->penaltyFn[i]),
					   index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
					   scanCxt);
		fmgr_info_copy(&(giststate->picksplitFn[i]),
					   index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
					   scanCxt);
		fmgr_info_copy(&(giststate->equalFn[i]),
					   index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
					   scanCxt);

		/* opclasses are not required to provide a Distance method */
		if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
			fmgr_info_copy(&(giststate->distanceFn[i]),
						   index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
						   scanCxt);
		else
			giststate->distanceFn[i].fn_oid = InvalidOid;

		/* opclasses are not required to provide a Fetch method */
		if (OidIsValid(index_getprocid(index, i + 1, GIST_FETCH_PROC)))
			fmgr_info_copy(&(giststate->fetchFn[i]),
						   index_getprocinfo(index, i + 1, GIST_FETCH_PROC),
						   scanCxt);
		else
			giststate->fetchFn[i].fn_oid = InvalidOid;

		/*
		 * If the index column has a specified collation, we should honor that
		 * while doing comparisons.  However, we may have a collatable storage
		 * type for a noncollatable indexed data type.  If there's no index
		 * collation then specify default collation in case the support
		 * functions need collation.  This is harmless if the support
		 * functions don't care about collation, so we just do it
		 * unconditionally.  (We could alternatively call get_typcollation,
		 * but that seems like expensive overkill --- there aren't going to be
		 * any cases where a GiST storage type has a nondefault collation.)
		 */
		if (OidIsValid(index->rd_indcollation[i]))
			giststate->supportCollation[i] = index->rd_indcollation[i];
		else
			giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
	}

	/* No opclass information for INCLUDE attributes */
	for (; i < index->rd_att->natts; i++)
	{
		giststate->consistentFn[i].fn_oid = InvalidOid;
		giststate->unionFn[i].fn_oid = InvalidOid;
		giststate->compressFn[i].fn_oid = InvalidOid;
		giststate->decompressFn[i].fn_oid = InvalidOid;
		giststate->penaltyFn[i].fn_oid = InvalidOid;
		giststate->picksplitFn[i].fn_oid = InvalidOid;
		giststate->equalFn[i].fn_oid = InvalidOid;
		giststate->distanceFn[i].fn_oid = InvalidOid;
		giststate->fetchFn[i].fn_oid = InvalidOid;
		giststate->supportCollation[i] = InvalidOid;
	}

	MemoryContextSwitchTo(oldCxt);

	return giststate;
}

void
freeGISTstate(GISTSTATE *giststate)
{
	/* It's sufficient to delete the scanCxt */
	MemoryContextDelete(giststate->scanCxt);
}
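
#ifdef NOT_USED
/*
 * Illustrative sketch only (hypothetical caller, not referenced anywhere):
 * the typical GISTSTATE lifecycle.  initGISTstate() defaults tempCxt to the
 * long-lived scanCxt, so callers that want per-tuple resets install a
 * separate temporary context, as gistinsert() does above.
 */
static void
example_giststate_lifecycle(Relation index)
{
	GISTSTATE  *giststate = initGISTstate(index);

	giststate->tempCxt = createTempGistContext();

	/* ... use giststate for inserts or scans ... */

	/* tempCxt is not a child of scanCxt, so delete it separately */
	MemoryContextDelete(giststate->tempCxt);
	freeGISTstate(giststate);	/* deletes scanCxt, which holds giststate */
}
#endif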

/*
 * gistprunepage() -- try to remove LP_DEAD items from the given page.
 * Function assumes that buffer is exclusively locked.
 */
static void
gistprunepage(Relation rel, Page page, Buffer buffer, Relation heapRel)
{
	OffsetNumber deletable[MaxIndexTuplesPerPage];
	int			ndeletable = 0;
	OffsetNumber offnum,
				maxoff;
	TransactionId latestRemovedXid = InvalidTransactionId;

	Assert(GistPageIsLeaf(page));

	/*
	 * Scan over all items to see which ones need to be deleted according to
	 * LP_DEAD flags.
	 */
	maxoff = PageGetMaxOffsetNumber(page);
	for (offnum = FirstOffsetNumber;
		 offnum <= maxoff;
		 offnum = OffsetNumberNext(offnum))
	{
		ItemId		itemId = PageGetItemId(page, offnum);

		if (ItemIdIsDead(itemId))
			deletable[ndeletable++] = offnum;
	}

	if (XLogStandbyInfoActive() && RelationNeedsWAL(rel))
		latestRemovedXid =
			index_compute_xid_horizon_for_tuples(rel, heapRel, buffer,
												 deletable, ndeletable);

	if (ndeletable > 0)
	{
		START_CRIT_SECTION();

		PageIndexMultiDelete(page, deletable, ndeletable);

		/*
		 * Mark the page as not containing any LP_DEAD items.  This is not
		 * necessarily true (there might be some that have recently been
		 * marked, but weren't included in our target-item list), but it will
		 * almost always be true and it doesn't seem worth an additional page
		 * scan to check it.  Remember that F_HAS_GARBAGE is only a hint
		 * anyway.
		 */
		GistClearPageHasGarbage(page);

		MarkBufferDirty(buffer);

		/* XLOG stuff */
		if (RelationNeedsWAL(rel))
		{
			XLogRecPtr	recptr;

			recptr = gistXLogDelete(buffer,
									deletable, ndeletable,
									latestRemovedXid);

			PageSetLSN(page, recptr);
		}
		else
			PageSetLSN(page, gistGetFakeLSN(rel));

		END_CRIT_SECTION();
	}

	/*
	 * Note: if we didn't find any LP_DEAD items, then the page's
	 * F_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
	 * separate write to clear it, however.  We will clear it when we split
	 * the page.
	 */
}