/*-------------------------------------------------------------------------
 *
 * gist.c
 *	  interface routines for the postgres GiST index access method.
 *
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/gist/gist.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/gist_private.h"
#include "access/gistscan.h"
#include "catalog/pg_collation.h"
#include "miscadmin.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "nodes/execnodes.h"
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"
#include "utils/rel.h"


/* non-export function prototypes */
static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
static bool gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
				GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum);
static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
				 GISTSTATE *giststate,
				 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
				 Buffer leftchild, Buffer rightchild,
				 bool unlockbuf, bool unlockleftchild);
static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
				GISTSTATE *giststate, List *splitinfo, bool releasebuf);
static void gistvacuumpage(Relation rel, Page page, Buffer buffer,
			   Relation heapRel);


#define ROTATEDIST(d) do { \
	SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
	memset(tmp,0,sizeof(SplitedPageLayout)); \
	tmp->block.blkno = InvalidBlockNumber;	\
	tmp->buffer = InvalidBuffer;	\
	tmp->next = (d); \
	(d)=tmp; \
} while(0)
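
/*
 * Illustrative sketch (not from the original sources): ROTATEDIST pushes a
 * fresh, zeroed SplitedPageLayout onto the head of a list, so building a
 * two-node chain looks roughly like this (nright/nleft are placeholder
 * names):
 *
 *		SplitedPageLayout *dist = NULL;
 *
 *		ROTATEDIST(dist);			dist -> nodeA
 *		dist->block.num = nright;
 *		ROTATEDIST(dist);			dist -> nodeB -> nodeA
 *		dist->block.num = nleft;
 *
 * gistSplit() below uses exactly this pattern to assemble the right and
 * then the left half of a split, so the resulting list runs left to right.
 */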


/*
 * GiST handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
gisthandler(PG_FUNCTION_ARGS)
{
	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

	amroutine->amstrategies = 0;
	amroutine->amsupport = GISTNProcs;
	amroutine->amcanorder = false;
	amroutine->amcanorderbyop = true;
	amroutine->amcanbackward = false;
	amroutine->amcanunique = false;
	amroutine->amcanmulticol = true;
	amroutine->amoptionalkey = true;
	amroutine->amsearcharray = false;
	amroutine->amsearchnulls = true;
	amroutine->amstorage = true;
	amroutine->amclusterable = true;
	amroutine->ampredlocks = true;
	amroutine->amcanparallel = false;
	amroutine->amcaninclude = false;
	amroutine->amkeytype = InvalidOid;

	amroutine->ambuild = gistbuild;
	amroutine->ambuildempty = gistbuildempty;
	amroutine->aminsert = gistinsert;
	amroutine->ambulkdelete = gistbulkdelete;
	amroutine->amvacuumcleanup = gistvacuumcleanup;
	amroutine->amcanreturn = gistcanreturn;
	amroutine->amcostestimate = gistcostestimate;
	amroutine->amoptions = gistoptions;
	amroutine->amproperty = gistproperty;
	amroutine->amvalidate = gistvalidate;
	amroutine->ambeginscan = gistbeginscan;
	amroutine->amrescan = gistrescan;
	amroutine->amgettuple = gistgettuple;
	amroutine->amgetbitmap = gistgetbitmap;
	amroutine->amendscan = gistendscan;
	amroutine->ammarkpos = NULL;
	amroutine->amrestrpos = NULL;
	amroutine->amestimateparallelscan = NULL;
	amroutine->aminitparallelscan = NULL;
	amroutine->amparallelrescan = NULL;

	PG_RETURN_POINTER(amroutine);
}
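
/*
 * For context: pg_am's amhandler column points at this function for the
 * built-in "gist" access method.  An equivalent registration from SQL would
 * look something like this (with a hypothetical name "mygist"):
 *
 *		CREATE ACCESS METHOD mygist TYPE INDEX HANDLER gisthandler;
 */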

/*
 * Create and return a temporary memory context for use by GiST. We
 * _always_ invoke user-provided methods in a temporary memory
 * context, so that memory leaks in those functions cannot cause
 * problems. Also, we use some additional temporary contexts in the
 * GiST code itself, to avoid the need to do some awkward manual
 * memory management.
 */
MemoryContext
createTempGistContext(void)
{
	return AllocSetContextCreate(CurrentMemoryContext,
								 "GiST temporary context",
								 ALLOCSET_DEFAULT_SIZES);
}
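
/*
 * A minimal sketch of the intended usage pattern (gistinsert() below does
 * exactly this): switch into the temporary context before calling opclass
 * support functions, switch back, and reset the context to reclaim whatever
 * they leaked.
 *
 *		MemoryContext tempCxt = createTempGistContext();
 *		MemoryContext oldCxt = MemoryContextSwitchTo(tempCxt);
 *
 *		... call user-provided support methods ...
 *
 *		MemoryContextSwitchTo(oldCxt);
 *		MemoryContextReset(tempCxt);
 */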

/*
 *	gistbuildempty() -- build an empty gist index in the initialization fork
 */
void
gistbuildempty(Relation index)
{
	Buffer		buffer;

	/* Initialize the root page */
	buffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

	/* Initialize and xlog buffer */
	START_CRIT_SECTION();
	GISTInitBuffer(buffer, F_LEAF);
	MarkBufferDirty(buffer);
	log_newpage_buffer(buffer, true);
	END_CRIT_SECTION();

	/* Unlock and release the buffer */
	UnlockReleaseBuffer(buffer);
}
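
/*
 * (The INIT_FORKNUM page written above is what makes unlogged GiST indexes
 * usable after a crash: at the end of crash recovery the init fork is copied
 * over the main fork, leaving an empty but structurally valid index.)
 */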

/*
 *	gistinsert -- wrapper for GiST tuple insertion.
 *
 *	  This is the public interface routine for tuple insertion in GiSTs.
 *	  It doesn't do any work; just locks the relation and passes the buck.
 */
bool
gistinsert(Relation r, Datum *values, bool *isnull,
		   ItemPointer ht_ctid, Relation heapRel,
		   IndexUniqueCheck checkUnique,
		   IndexInfo *indexInfo)
{
	GISTSTATE  *giststate = (GISTSTATE *) indexInfo->ii_AmCache;
	IndexTuple	itup;
	MemoryContext oldCxt;

	/* Initialize GISTSTATE cache if first call in this statement */
	if (giststate == NULL)
	{
		oldCxt = MemoryContextSwitchTo(indexInfo->ii_Context);
		giststate = initGISTstate(r);
		giststate->tempCxt = createTempGistContext();
		indexInfo->ii_AmCache = (void *) giststate;
		MemoryContextSwitchTo(oldCxt);
	}

	oldCxt = MemoryContextSwitchTo(giststate->tempCxt);

	itup = gistFormTuple(giststate, r,
						 values, isnull, true /* size is currently bogus */ );
	itup->t_tid = *ht_ctid;

	gistdoinsert(r, itup, 0, giststate, heapRel);

	/* cleanup */
	MemoryContextSwitchTo(oldCxt);
	MemoryContextReset(giststate->tempCxt);

	return false;
}


/*
 * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple
 * at that offset is atomically removed along with inserting the new tuples.
 * This is used to replace a tuple with a new one.
 *
 * If 'leftchildbuf' is valid, we're inserting the downlink for the page
 * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
 * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
 *
 * If 'markfollowright' is true and the page is split, the left child is
 * marked with F_FOLLOW_RIGHT flag. That is the normal case. During buffered
 * index build, however, there is no concurrent access and the page splitting
 * is done in a slightly simpler fashion, and false is passed.
 *
 * If there is not enough room on the page, it is split. All the split
 * pages are kept pinned and locked and returned in *splitinfo; the caller
 * is responsible for inserting the downlinks for them. However, if
 * 'buffer' is the root page and it needs to be split, gistplacetopage()
 * performs the split as one atomic operation, and *splitinfo is set to NIL.
 * In that case, we continue to hold the root page locked, and the child
 * pages are released; note that new tuple(s) are *not* on the root page
 * but in one of the new child pages.
 *
 * If 'newblkno' is not NULL, returns the block number of the page the first
 * new/updated tuple was inserted to. Usually it's the given page, but it
 * could be its right sibling if the page was split.
 *
 * Returns 'true' if the page was split, 'false' otherwise.
 */
bool
gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
				Buffer buffer,
				IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
				BlockNumber *newblkno,
				Buffer leftchildbuf,
				List **splitinfo,
				bool markfollowright,
				Relation heapRel)
{
	BlockNumber blkno = BufferGetBlockNumber(buffer);
	Page		page = BufferGetPage(buffer);
	bool		is_leaf = (GistPageIsLeaf(page)) ? true : false;
	XLogRecPtr	recptr;
	int			i;
	bool		is_split;

	/*
	 * Refuse to modify a page that's incompletely split. This should not
	 * happen because we finish any incomplete splits while we walk down the
	 * tree. However, it's remotely possible that another concurrent inserter
	 * splits a parent page, and errors out before completing the split. We
	 * will just throw an error in that case, and leave any split we had in
	 * progress unfinished too. The next insert that comes along will clean up
	 * the mess.
	 */
	if (GistFollowRight(page))
		elog(ERROR, "concurrent GiST page split was incomplete");

	*splitinfo = NIL;

	/*
	 * if isupdate, remove old key: This node's key has been modified, either
	 * because a child split occurred or because we needed to adjust our key
	 * for an insert in a child node. Therefore, remove the old version of
	 * this node's key.
	 *
	 * for WAL replay, in the non-split case we handle this by setting up a
	 * one-element todelete array; in the split case, it's handled implicitly
	 * because the tuple vector passed to gistSplit won't include this tuple.
	 */
	is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);

	/*
	 * If the leaf page is full, first try to delete dead tuples, and then
	 * check for free space again.
	 */
	if (is_split && GistPageIsLeaf(page) && GistPageHasGarbage(page))
	{
		gistvacuumpage(rel, page, buffer, heapRel);
		is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
	}

	if (is_split)
	{
		/* no space for insertion */
		IndexTuple *itvec;
		int			tlen;
		SplitedPageLayout *dist = NULL,
				   *ptr;
		BlockNumber oldrlink = InvalidBlockNumber;
		GistNSN		oldnsn = 0;
		SplitedPageLayout rootpg;
		bool		is_rootsplit;
		int			npage;

		is_rootsplit = (blkno == GIST_ROOT_BLKNO);

		/*
		 * Form index tuples vector to split. If we're replacing an old tuple,
		 * remove the old version from the vector.
		 */
		itvec = gistextractpage(page, &tlen);
		if (OffsetNumberIsValid(oldoffnum))
		{
			/* on inner page we should remove old tuple */
			int			pos = oldoffnum - FirstOffsetNumber;

			tlen--;
			if (pos != tlen)
				memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
		}
		itvec = gistjoinvector(itvec, &tlen, itup, ntup);
		dist = gistSplit(rel, page, itvec, tlen, giststate);

		/*
		 * Check that split didn't produce too many pages.
		 */
		npage = 0;
		for (ptr = dist; ptr; ptr = ptr->next)
			npage++;
		/* in a root split, we'll add one more page to the list below */
		if (is_rootsplit)
			npage++;
		if (npage > GIST_MAX_SPLIT_PAGES)
			elog(ERROR, "GiST page split into too many halves (%d, maximum %d)",
				 npage, GIST_MAX_SPLIT_PAGES);
		/*
		 * Set up pages to work with. Allocate new buffers for all but the
		 * leftmost page. The original page becomes the new leftmost page, and
		 * is just replaced with the new contents.
		 *
		 * For a root split, allocate new buffers for all child pages; the
		 * original page is overwritten with the new root page containing
		 * downlinks to the new child pages.
		 */
		ptr = dist;
		if (!is_rootsplit)
		{
			/* save old rightlink and NSN */
			oldrlink = GistPageGetOpaque(page)->rightlink;
			oldnsn = GistPageGetNSN(page);

			dist->buffer = buffer;
			dist->block.blkno = BufferGetBlockNumber(buffer);
			dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));

			/* clean all flags except F_LEAF */
			GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;

			ptr = ptr->next;
		}
		for (; ptr; ptr = ptr->next)
		{
			/* Allocate new page */
			ptr->buffer = gistNewBuffer(rel);
			GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
			ptr->page = BufferGetPage(ptr->buffer);
			ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
			PredicateLockPageSplit(rel,
								   BufferGetBlockNumber(buffer),
								   BufferGetBlockNumber(ptr->buffer));
		}

		/*
		 * Now that we know which blocks the new pages go to, set up downlink
		 * tuples to point to them.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
		{
			ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
			GistTupleSetValid(ptr->itup);
		}

		/*
		 * If this is a root split, we construct the new root page with the
		 * downlinks here directly, instead of requiring the caller to insert
		 * them. Add the new root page to the list along with the child pages.
		 */
		if (is_rootsplit)
		{
			IndexTuple *downlinks;
			int			ndownlinks = 0;
			int			i;

			rootpg.buffer = buffer;
			rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
			GistPageGetOpaque(rootpg.page)->flags = 0;

			/* Prepare a vector of all the downlinks */
			for (ptr = dist; ptr; ptr = ptr->next)
				ndownlinks++;
			downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
			for (i = 0, ptr = dist; ptr; ptr = ptr->next)
				downlinks[i++] = ptr->itup;

			rootpg.block.blkno = GIST_ROOT_BLKNO;
			rootpg.block.num = ndownlinks;
			rootpg.list = gistfillitupvec(downlinks, ndownlinks,
										  &(rootpg.lenlist));
			rootpg.itup = NULL;

			rootpg.next = dist;
			dist = &rootpg;
		}
		else
		{
			/* Prepare split-info to be returned to caller */
			for (ptr = dist; ptr; ptr = ptr->next)
			{
				GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));

				si->buf = ptr->buffer;
				si->downlink = ptr->itup;
				*splitinfo = lappend(*splitinfo, si);
			}
		}

		/*
		 * Fill all pages. All the pages are new, i.e. freshly allocated empty
		 * pages, or a temporary copy of the old page.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
		{
			char	   *data = (char *) (ptr->list);

			for (i = 0; i < ptr->block.num; i++)
			{
				IndexTuple	thistup = (IndexTuple) data;

				if (PageAddItem(ptr->page, (Item) data, IndexTupleSize(thistup), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
					elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(rel));

				/*
				 * If this is the first inserted/updated tuple, let the caller
				 * know which page it landed on.
				 */
				if (newblkno && ItemPointerEquals(&thistup->t_tid, &(*itup)->t_tid))
					*newblkno = ptr->block.blkno;

				data += IndexTupleSize(thistup);
			}

			/* Set up rightlinks */
			if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
				GistPageGetOpaque(ptr->page)->rightlink =
					ptr->next->block.blkno;
			else
				GistPageGetOpaque(ptr->page)->rightlink = oldrlink;

			/*
			 * Mark all but the rightmost page with the follow-right flag. It
			 * will be cleared as soon as the downlink is inserted into the
			 * parent, but this ensures that if we error out before that, the
			 * index is still consistent. (In buffering build mode, any error
			 * will abort the index build anyway, so this is not needed.)
			 */
			if (ptr->next && !is_rootsplit && markfollowright)
				GistMarkFollowRight(ptr->page);
			else
				GistClearFollowRight(ptr->page);

			/*
			 * Copy the NSN of the original page to all pages. The
			 * F_FOLLOW_RIGHT flags ensure that scans will follow the
			 * rightlinks until the downlinks are inserted.
			 */
			GistPageSetNSN(ptr->page, oldnsn);
		}

		/*
		 * gistXLogSplit() needs to WAL log a lot of pages, prepare WAL
		 * insertion for that. NB: The number of pages and data segments
		 * specified here must match the calculations in gistXLogSplit()!
		 */
		if (RelationNeedsWAL(rel))
			XLogEnsureRecordSpace(npage, 1 + npage * 2);

		START_CRIT_SECTION();

		/*
		 * Must mark buffers dirty before XLogInsert, even though we'll still
		 * be changing their opaque fields below.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
			MarkBufferDirty(ptr->buffer);
		if (BufferIsValid(leftchildbuf))
			MarkBufferDirty(leftchildbuf);

		/*
		 * The first page in the chain was a temporary working copy meant to
		 * replace the old page. Copy it over the old page.
		 */
		PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
		dist->page = BufferGetPage(dist->buffer);

		/* Write the WAL record */
		if (RelationNeedsWAL(rel))
			recptr = gistXLogSplit(is_leaf,
								   dist, oldrlink, oldnsn, leftchildbuf,
								   markfollowright);
		else
			recptr = gistGetFakeLSN(rel);

		for (ptr = dist; ptr; ptr = ptr->next)
		{
			PageSetLSN(ptr->page, recptr);
		}

		/*
		 * Return the new child buffers to the caller.
		 *
		 * If this was a root split, we've already inserted the downlink
		 * pointers, in the form of a new root page. Therefore we can release
		 * all the new buffers, and keep just the root page locked.
		 */
		if (is_rootsplit)
		{
			for (ptr = dist->next; ptr; ptr = ptr->next)
				UnlockReleaseBuffer(ptr->buffer);
		}
	}
	else
	{
		/*
		 * Enough space.  We always get here if ntup==0.
		 */
		START_CRIT_SECTION();

		/*
		 * Delete old tuple if any, then insert new tuple(s) if any.  If
		 * possible, use the fast path of PageIndexTupleOverwrite.
		 */
		if (OffsetNumberIsValid(oldoffnum))
		{
			if (ntup == 1)
			{
				/* One-for-one replacement, so use PageIndexTupleOverwrite */
				if (!PageIndexTupleOverwrite(page, oldoffnum, (Item) *itup,
											 IndexTupleSize(*itup)))
					elog(ERROR, "failed to add item to index page in \"%s\"",
						 RelationGetRelationName(rel));
			}
			else
			{
				/* Delete old, then append new tuple(s) to page */
				PageIndexTupleDelete(page, oldoffnum);
				gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
			}
		}
		else
		{
			/* Just append new tuples at the end of the page */
			gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
		}

		MarkBufferDirty(buffer);

		if (BufferIsValid(leftchildbuf))
			MarkBufferDirty(leftchildbuf);

		if (RelationNeedsWAL(rel))
		{
			OffsetNumber ndeloffs = 0,
						deloffs[1];

			if (OffsetNumberIsValid(oldoffnum))
			{
				deloffs[0] = oldoffnum;
				ndeloffs = 1;
			}

			recptr = gistXLogUpdate(buffer,
									deloffs, ndeloffs, itup, ntup,
									leftchildbuf, NULL);

			PageSetLSN(page, recptr);
		}
		else
		{
			recptr = gistGetFakeLSN(rel);
			PageSetLSN(page, recptr);
		}

		if (newblkno)
			*newblkno = blkno;
	}

	/*
	 * If we inserted the downlink for a child page, set NSN and clear
	 * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know to
	 * follow the rightlink if and only if they looked at the parent page
	 * before we inserted the downlink.
	 *
	 * Note that we do this *after* writing the WAL record. That means that
	 * the possible full page image in the WAL record does not include these
	 * changes, and they must be replayed even if the page is restored from
	 * the full page image. There's a chicken-and-egg problem: if we updated
	 * the child pages first, we wouldn't know the recptr of the WAL record
	 * we're about to write.
	 */
	if (BufferIsValid(leftchildbuf))
	{
		Page		leftpg = BufferGetPage(leftchildbuf);

		GistPageSetNSN(leftpg, recptr);
		GistClearFollowRight(leftpg);

		PageSetLSN(leftpg, recptr);
	}

	END_CRIT_SECTION();

	return is_split;
}
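
/*
 * A sketch of the caller's side of the contract above, modeled on
 * gistinserttuples() below: when *splitinfo comes back non-NIL, the caller
 * still holds the locked child pages and must install their downlinks in
 * the parent (a root split instead returns NIL, because the downlinks were
 * already placed on the new root):
 *
 *		List	   *splitinfo;
 *
 *		gistplacetopage(rel, freespace, giststate, buffer, itup, ntup,
 *						oldoffnum, NULL, InvalidBuffer, &splitinfo,
 *						true, heapRel);
 *		if (splitinfo)
 *			... insert downlinks from splitinfo into the parent ...
 *		else
 *			LockBuffer(buffer, GIST_UNLOCK);
 */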

/*
 * Workhorse routine for doing insertion into a GiST index. Note that
 * this routine assumes it is invoked in a short-lived memory context,
 * so it does not bother releasing palloc'd allocations.
 */
void
gistdoinsert(Relation r, IndexTuple itup, Size freespace,
			 GISTSTATE *giststate, Relation heapRel)
{
	ItemId		iid;
	IndexTuple	idxtuple;
	GISTInsertStack firststack;
	GISTInsertStack *stack;
	GISTInsertState state;
	bool		xlocked = false;

	memset(&state, 0, sizeof(GISTInsertState));
	state.freespace = freespace;
	state.r = r;
	state.heapRel = heapRel;

	/* Start from the root */
	firststack.blkno = GIST_ROOT_BLKNO;
	firststack.lsn = 0;
	firststack.parent = NULL;
	firststack.downlinkoffnum = InvalidOffsetNumber;
	state.stack = stack = &firststack;

	/*
	 * Walk down along the path of smallest penalty, updating the parent
	 * pointers with the key we're inserting as we go. If we crash in the
	 * middle, the tree is consistent, although the possible parent updates
	 * were a waste.
	 */
	for (;;)
	{
		if (XLogRecPtrIsInvalid(stack->lsn))
			stack->buffer = ReadBuffer(state.r, stack->blkno);

		/*
		 * Be optimistic and grab shared lock first. Swap it for an exclusive
		 * lock later if we need to update the page.
		 */
		if (!xlocked)
		{
			LockBuffer(stack->buffer, GIST_SHARE);
			gistcheckpage(state.r, stack->buffer);
		}

		stack->page = (Page) BufferGetPage(stack->buffer);
		stack->lsn = xlocked ?
			PageGetLSN(stack->page) : BufferGetLSNAtomic(stack->buffer);
		Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));

		/*
		 * If this page was split but the downlink was never inserted to the
		 * parent because the inserting backend crashed before doing that, fix
		 * that now.
		 */
		if (GistFollowRight(stack->page))
		{
			if (!xlocked)
			{
				LockBuffer(stack->buffer, GIST_UNLOCK);
				LockBuffer(stack->buffer, GIST_EXCLUSIVE);
				xlocked = true;
				/* someone might've completed the split when we unlocked */
				if (!GistFollowRight(stack->page))
					continue;
			}
			gistfixsplit(&state, giststate);

			UnlockReleaseBuffer(stack->buffer);
			xlocked = false;
			state.stack = stack = stack->parent;
			continue;
		}

		if (stack->blkno != GIST_ROOT_BLKNO &&
			stack->parent->lsn < GistPageGetNSN(stack->page))
		{
			/*
			 * Concurrent split detected. There's no guarantee that the
			 * downlink for this page is consistent with the tuple we're
			 * inserting anymore, so go back to parent and rechoose the best
			 * child.
			 */
			UnlockReleaseBuffer(stack->buffer);
			xlocked = false;
			state.stack = stack = stack->parent;
			continue;
		}

		if (!GistPageIsLeaf(stack->page))
		{
			/*
			 * This is an internal page so continue to walk down the tree.
			 * Find the child node that has the minimum insertion penalty.
			 */
			BlockNumber childblkno;
			IndexTuple	newtup;
			GISTInsertStack *item;
			OffsetNumber downlinkoffnum;

			downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate);
			iid = PageGetItemId(stack->page, downlinkoffnum);
			idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
			childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));

			/*
			 * Check that it's not a leftover invalid tuple from pre-9.1
			 */
			if (GistTupleIsInvalid(idxtuple))
				ereport(ERROR,
						(errmsg("index \"%s\" contains an inner tuple marked as invalid",
								RelationGetRelationName(r)),
						 errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
						 errhint("Please REINDEX it.")));

			/*
			 * Check that the key representing the target child node is
			 * consistent with the key we're inserting. Update it if it's not.
			 */
			newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
			if (newtup)
			{
				/*
				 * Swap shared lock for an exclusive one. Beware, the page may
				 * change while we unlock/lock the page...
				 */
				if (!xlocked)
				{
					LockBuffer(stack->buffer, GIST_UNLOCK);
					LockBuffer(stack->buffer, GIST_EXCLUSIVE);
					xlocked = true;
					stack->page = (Page) BufferGetPage(stack->buffer);

					if (PageGetLSN(stack->page) != stack->lsn)
					{
						/* the page was changed while we unlocked it, retry */
						continue;
					}
				}

				/*
				 * Update the tuple.
				 *
				 * We still hold the lock after gistinserttuple(), but it
				 * might have to split the page to make the updated tuple fit.
				 * In that case the updated tuple might migrate to the other
				 * half of the split, so we have to go back to the parent and
				 * descend back to the half that's a better fit for the new
				 * tuple.
				 */
				if (gistinserttuple(&state, stack, giststate, newtup,
									downlinkoffnum))
				{
					/*
					 * If this was a root split, the root page continues to be
					 * the parent and the updated tuple went to one of the
					 * child pages, so we just need to retry from the root
					 * page.
					 */
					if (stack->blkno != GIST_ROOT_BLKNO)
					{
						UnlockReleaseBuffer(stack->buffer);
						xlocked = false;
						state.stack = stack = stack->parent;
					}
					continue;
				}
			}
			LockBuffer(stack->buffer, GIST_UNLOCK);
			xlocked = false;

			/* descend to the chosen child */
			item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
			item->blkno = childblkno;
			item->parent = stack;
			item->downlinkoffnum = downlinkoffnum;
			state.stack = stack = item;
		}
		else
		{
			/*
			 * Leaf page. Insert the new key. We've already updated all the
			 * parents on the way down, but we might have to split the page if
			 * it doesn't fit. gistinserttuple() will take care of that.
			 */

			/*
			 * Swap shared lock for an exclusive one. Be careful, the page may
			 * change while we unlock/lock the page...
			 */
			if (!xlocked)
			{
				LockBuffer(stack->buffer, GIST_UNLOCK);
				LockBuffer(stack->buffer, GIST_EXCLUSIVE);
				xlocked = true;
				stack->page = (Page) BufferGetPage(stack->buffer);
				stack->lsn = PageGetLSN(stack->page);

				if (stack->blkno == GIST_ROOT_BLKNO)
				{
					/*
					 * The root page is the only page that can change from
					 * leaf to inner, so for the root we must recheck that.
					 */
					if (!GistPageIsLeaf(stack->page))
					{
						/*
						 * Very rare situation: while the page was unlocked,
						 * the index grew from a single (root) page to more
						 * than one page, so the root is no longer a leaf.
						 */
						LockBuffer(stack->buffer, GIST_UNLOCK);
						xlocked = false;
						continue;
					}

					/*
					 * We don't need to check for a root split, because
					 * checking leaf/inner is enough to recognize a split of
					 * the root.
					 */
				}
				else if (GistFollowRight(stack->page) ||
						 stack->parent->lsn < GistPageGetNSN(stack->page))
				{
					/*
					 * The page was split while we momentarily unlocked the
					 * page. Go back to parent.
					 */
					UnlockReleaseBuffer(stack->buffer);
					xlocked = false;
					state.stack = stack = stack->parent;
					continue;
				}
			}

			/* now state.stack->(page, buffer and blkno) points to leaf page */

			gistinserttuple(&state, stack, giststate, itup,
							InvalidOffsetNumber);
			LockBuffer(stack->buffer, GIST_UNLOCK);

			/* Release any pins we might still hold before exiting */
			for (; stack; stack = stack->parent)
				ReleaseBuffer(stack->buffer);
			break;
		}
	}
}

/*
 * Traverse the tree to find path from root page to specified "child" block.
 *
 * Returns a new insertion stack, starting from the parent of "child", up
 * to the root. *downlinkoffnum is set to the offset of the downlink in the
 * direct parent of child.
 *
 * To prevent deadlocks, this should lock only one page at a time.
 */
static GISTInsertStack *
gistFindPath(Relation r, BlockNumber child, OffsetNumber *downlinkoffnum)
{
	Page		page;
	Buffer		buffer;
	OffsetNumber i,
				maxoff;
	ItemId		iid;
	IndexTuple	idxtuple;
	List	   *fifo;
	GISTInsertStack *top,
			   *ptr;
	BlockNumber blkno;

	top = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
	top->blkno = GIST_ROOT_BLKNO;
	top->downlinkoffnum = InvalidOffsetNumber;

	fifo = list_make1(top);
	while (fifo != NIL)
	{
		/* Get next page to visit */
		top = linitial(fifo);
		fifo = list_delete_first(fifo);

		buffer = ReadBuffer(r, top->blkno);
		LockBuffer(buffer, GIST_SHARE);
		gistcheckpage(r, buffer);
		page = (Page) BufferGetPage(buffer);

		if (GistPageIsLeaf(page))
		{
			/*
			 * Because we scan the index top-down, all the rest of the pages
			 * in the queue must be leaf pages as well.
			 */
			UnlockReleaseBuffer(buffer);
			break;
		}

		top->lsn = BufferGetLSNAtomic(buffer);

		/*
		 * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
		 * downlink. This should not normally happen.
		 */
		if (GistFollowRight(page))
			elog(ERROR, "concurrent GiST page split was incomplete");

		if (top->parent && top->parent->lsn < GistPageGetNSN(page) &&
			GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
		{
			/*
			 * Page was split while we looked elsewhere. We didn't see the
			 * downlink to the right page when we scanned the parent, so add
			 * it to the queue now.
			 *
			 * Put the right page ahead of the queue, so that we visit it
			 * next. That's important, because if this is the lowest internal
			 * level, just above leaves, we might already have queued up some
			 * leaf pages, and we assume that there can't be any non-leaf
			 * pages behind leaf pages.
			 */
			ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
			ptr->blkno = GistPageGetOpaque(page)->rightlink;
			ptr->downlinkoffnum = InvalidOffsetNumber;
			ptr->parent = top->parent;

			fifo = lcons(ptr, fifo);
		}

		maxoff = PageGetMaxOffsetNumber(page);

		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
		{
			iid = PageGetItemId(page, i);
			idxtuple = (IndexTuple) PageGetItem(page, iid);
			blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
			if (blkno == child)
			{
				/* Found it! */
				UnlockReleaseBuffer(buffer);
				*downlinkoffnum = i;
				return top;
			}
			else
			{
				/* Append this child to the list of pages to visit later */
				ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
				ptr->blkno = blkno;
				ptr->downlinkoffnum = i;
				ptr->parent = top;

				fifo = lappend(fifo, ptr);
			}
		}

		UnlockReleaseBuffer(buffer);
	}

	elog(ERROR, "failed to re-find parent of a page in index \"%s\", block %u",
		 RelationGetRelationName(r), child);
	return NULL;				/* keep compiler quiet */
}

/*
 * Updates the stack so that child->parent is the correct parent of the
 * child. child->parent must be exclusively locked on entry, and will
 * remain so at exit, but it might not be the same page anymore.
 */
static void
gistFindCorrectParent(Relation r, GISTInsertStack *child)
{
	GISTInsertStack *parent = child->parent;

	gistcheckpage(r, parent->buffer);
	parent->page = (Page) BufferGetPage(parent->buffer);

	/* here we don't need to distinguish between split and page update */
	if (child->downlinkoffnum == InvalidOffsetNumber ||
		parent->lsn != PageGetLSN(parent->page))
	{
		/* the parent has changed; follow rightlinks until we find the child */
		OffsetNumber i,
					maxoff;
		ItemId		iid;
		IndexTuple	idxtuple;
		GISTInsertStack *ptr;

		while (true)
		{
			maxoff = PageGetMaxOffsetNumber(parent->page);
			for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
			{
				iid = PageGetItemId(parent->page, i);
				idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
				if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
				{
					/* found it */
					child->downlinkoffnum = i;
					return;
				}
			}

			parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
			UnlockReleaseBuffer(parent->buffer);
			if (parent->blkno == InvalidBlockNumber)
			{
				/*
				 * End of chain and still didn't find the parent. This is a
				 * very rare situation that can occur when the root was split.
				 */
				break;
			}
			parent->buffer = ReadBuffer(r, parent->blkno);
			LockBuffer(parent->buffer, GIST_EXCLUSIVE);
			gistcheckpage(r, parent->buffer);
			parent->page = (Page) BufferGetPage(parent->buffer);
		}

		/*
		 * Worst case: we have to search the whole tree to find the parent.
		 * Before doing so, release all the old parent buffers we still hold.
		 */

		ptr = child->parent->parent;	/* child->parent was already released
										 * above */
		while (ptr)
		{
			ReleaseBuffer(ptr->buffer);
			ptr = ptr->parent;
		}

		/* ok, find new path */
		ptr = parent = gistFindPath(r, child->blkno, &child->downlinkoffnum);

		/* read all buffers as expected by caller */
		/* note we don't lock them or gistcheckpage them here! */
		while (ptr)
		{
			ptr->buffer = ReadBuffer(r, ptr->blkno);
			ptr->page = (Page) BufferGetPage(ptr->buffer);
			ptr = ptr->parent;
		}

		/* install new chain of parents to stack */
		child->parent = parent;

		/* make recursive call to normal processing */
		LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
		gistFindCorrectParent(r, child);
	}

	return;
}

/*
 * Form a downlink pointer for the page in 'buf'.
 */
static IndexTuple
gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
				 GISTInsertStack *stack)
{
	Page		page = BufferGetPage(buf);
	OffsetNumber maxoff;
	OffsetNumber offset;
	IndexTuple	downlink = NULL;

	maxoff = PageGetMaxOffsetNumber(page);
	for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
	{
		IndexTuple	ituple = (IndexTuple)
		PageGetItem(page, PageGetItemId(page, offset));

		if (downlink == NULL)
			downlink = CopyIndexTuple(ituple);
		else
		{
			IndexTuple	newdownlink;

			newdownlink = gistgetadjusted(rel, downlink, ituple,
										  giststate);
			if (newdownlink)
				downlink = newdownlink;
		}
	}

	/*
	 * If the page is completely empty, we can't form a meaningful downlink
	 * for it. But we have to insert a downlink for the page. Any key will do,
	 * as long as it's consistent with the downlink of the parent page, so
	 * that we can legally insert it into the parent. A minimal one that
	 * matches as few scans as possible would be best, to keep scans from
	 * doing useless work, but we don't know how to construct that. So we just
	 * use the downlink of the original page that was split - that's as far
	 * from optimal as it can get, but it will do.
	 */
	if (!downlink)
	{
		ItemId		iid;

		LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
		gistFindCorrectParent(rel, stack);
		iid = PageGetItemId(stack->parent->page, stack->downlinkoffnum);
		downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
		downlink = CopyIndexTuple(downlink);
		LockBuffer(stack->parent->buffer, GIST_UNLOCK);
	}

	ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
	GistTupleSetValid(downlink);

	return downlink;
}


/*
 * Complete the incomplete split of state->stack->page.
 */
static void
gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
{
	GISTInsertStack *stack = state->stack;
	Buffer		buf;
	Page		page;
	List	   *splitinfo = NIL;

	elog(LOG, "fixing incomplete split in index \"%s\", block %u",
		 RelationGetRelationName(state->r), stack->blkno);

	Assert(GistFollowRight(stack->page));
	Assert(OffsetNumberIsValid(stack->downlinkoffnum));

	buf = stack->buffer;

	/*
	 * Read the chain of split pages, following the rightlinks. Construct a
	 * downlink tuple for each page.
	 */
	for (;;)
	{
		GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
		IndexTuple	downlink;

		page = BufferGetPage(buf);

		/* Form the new downlink tuples to insert to parent */
		downlink = gistformdownlink(state->r, buf, giststate, stack);

		si->buf = buf;
		si->downlink = downlink;

		splitinfo = lappend(splitinfo, si);

		if (GistFollowRight(page))
		{
			/* lock next page */
			buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
			LockBuffer(buf, GIST_EXCLUSIVE);
		}
		else
			break;
	}

	/* Insert the downlinks */
	gistfinishsplit(state, stack, giststate, splitinfo, false);
}

/*
 * Insert or replace a tuple in stack->buffer. If 'oldoffnum' is valid, the
 * tuple at 'oldoffnum' is replaced, otherwise the tuple is inserted as new.
 * 'stack' represents the path from the root to the page being updated.
 *
 * The caller must hold an exclusive lock on stack->buffer.  The lock is still
 * held on return, but the page might not contain the inserted tuple if the
 * page was split. The function returns true if the page was split, false
 * otherwise.
 */
static bool
gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
				GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum)
{
	return gistinserttuples(state, stack, giststate, &tuple, 1, oldoffnum,
							InvalidBuffer, InvalidBuffer, false, false);
}

/* ----------------
 * An extended workhorse version of gistinserttuple(). This version allows
 * inserting multiple tuples, or replacing a single tuple with multiple tuples.
 * This is used to recursively update the downlinks in the parent when a page
 * is split.
 *
 * If leftchild and rightchild are valid, we're inserting/replacing the
 * downlink for rightchild, and leftchild is its left sibling. We clear the
 * F_FOLLOW_RIGHT flag and update NSN on leftchild, atomically with the
 * insertion of the downlink.
 *
 * To avoid holding locks for longer than necessary, when recursing up the
 * tree to update the parents, the locking is a bit peculiar here. On entry,
 * the caller must hold an exclusive lock on stack->buffer, as well as
 * leftchild and rightchild if given. On return:
 *
 *	- Lock on stack->buffer is released, if 'unlockbuf' is true. The page is
 *	  always kept pinned, however.
 *	- Lock on 'leftchild' is released, if 'unlockleftchild' is true. The page
 *	  is kept pinned.
 *	- Lock and pin on 'rightchild' are always released.
 *
 * Returns 'true' if the page had to be split. Note that if the page was
 * split, the inserted/updated tuples might've been inserted to a right
 * sibling of stack->buffer instead of stack->buffer itself.
 */
static bool
gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
				 GISTSTATE *giststate,
				 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
				 Buffer leftchild, Buffer rightchild,
				 bool unlockbuf, bool unlockleftchild)
{
	List	   *splitinfo;
	bool		is_split;

	/*
	 * Check for any rw conflicts (in serializable isolation level) just
	 * before we intend to modify the page
	 */
	CheckForSerializableConflictIn(state->r, NULL, stack->buffer);

	/* Insert the tuple(s) to the page, splitting the page if necessary */
	is_split = gistplacetopage(state->r, state->freespace, giststate,
							   stack->buffer,
							   tuples, ntup,
							   oldoffnum, NULL,
							   leftchild,
							   &splitinfo,
							   true,
							   state->heapRel);

	/*
	 * Before recursing up in case the page was split, release locks on the
	 * child pages. We don't need to keep them locked when updating the
	 * parent.
	 */
	if (BufferIsValid(rightchild))
		UnlockReleaseBuffer(rightchild);
	if (BufferIsValid(leftchild) && unlockleftchild)
		LockBuffer(leftchild, GIST_UNLOCK);

	/*
	 * If we had to split, insert/update the downlinks in the parent. If the
	 * caller requested us to release the lock on stack->buffer, tell
	 * gistfinishsplit() to do that as soon as it's safe to do so. If we
	 * didn't have to split, release it ourselves.
	 */
	if (splitinfo)
		gistfinishsplit(state, stack, giststate, splitinfo, unlockbuf);
	else if (unlockbuf)
		LockBuffer(stack->buffer, GIST_UNLOCK);

	return is_split;
}

/*
 * Finish an incomplete split by inserting/updating the downlinks in parent
 * page. 'splitinfo' contains all the child pages involved in the split,
 * from left-to-right.
 *
 * On entry, the caller must hold a lock on stack->buffer and all the child
 * pages in 'splitinfo'. If 'unlockbuf' is true, the lock on stack->buffer is
 * released on return. The child pages are always unlocked and unpinned.
 */
static void
gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
				GISTSTATE *giststate, List *splitinfo, bool unlockbuf)
{
	ListCell   *lc;
	List	   *reversed;
	GISTPageSplitInfo *right;
	GISTPageSplitInfo *left;
	IndexTuple	tuples[2];

	/* A split always contains at least two halves */
	Assert(list_length(splitinfo) >= 2);

	/*
	 * We need to insert downlinks for each new page, and update the downlink
	 * for the original (leftmost) page in the split. Begin at the rightmost
	 * page, inserting one downlink at a time until there are only two pages
	 * left. Finally insert the downlink for the last new page and update the
	 * downlink for the original page as one operation.
	 */

	/* for convenience, create a copy of the list in reverse order */
	reversed = NIL;
	foreach(lc, splitinfo)
	{
		reversed = lcons(lfirst(lc), reversed);
	}

	LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
	gistFindCorrectParent(state->r, stack);

	/*
	 * insert downlinks for the siblings from right to left, until there are
	 * only two siblings left.
	 */
	while (list_length(reversed) > 2)
	{
		right = (GISTPageSplitInfo *) linitial(reversed);
		left = (GISTPageSplitInfo *) lsecond(reversed);

		if (gistinserttuples(state, stack->parent, giststate,
							 &right->downlink, 1,
							 InvalidOffsetNumber,
							 left->buf, right->buf, false, false))
		{
			/*
			 * If the parent page was split, need to relocate the original
			 * parent pointer.
			 */
			gistFindCorrectParent(state->r, stack);
		}
		/* gistinserttuples() released the lock on right->buf. */
		reversed = list_delete_first(reversed);
	}

	right = (GISTPageSplitInfo *) linitial(reversed);
	left = (GISTPageSplitInfo *) lsecond(reversed);

	/*
	 * Finally insert downlink for the remaining right page and update the
	 * downlink for the original page to not contain the tuples that were
	 * moved to the new pages.
	 */
	tuples[0] = left->downlink;
	tuples[1] = right->downlink;
	gistinserttuples(state, stack->parent, giststate,
					 tuples, 2,
					 stack->downlinkoffnum,
					 left->buf, right->buf,
					 true,		/* Unlock parent */
					 unlockbuf	/* Unlock stack->buffer if caller wants that */
		);
	Assert(left->buf == stack->buffer);
}
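
/*
 * Worked example for the loop above: suppose a page split into four halves
 * A B C D (splitinfo = [A, B, C, D], left to right).  The downlinks for D
 * and then C are inserted as plain insertions; the final gistinserttuples()
 * call then atomically replaces the original downlink (which still points
 * at A) with the pair {A', B}, where A' carries A's adjusted key.  At each
 * step, the F_FOLLOW_RIGHT flag on the left sibling keeps concurrent scans
 * correct until the right sibling's downlink exists.
 */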

/*
 * gistSplit -- split a page in the tree and fill the structs used for XLOG
 * and for actually writing out the buffers. The function is recursive: it
 * will split pages until the keys fit on every page.
 */
SplitedPageLayout *
gistSplit(Relation r,
		  Page page,
		  IndexTuple *itup,		/* contains compressed entry */
		  int len,
		  GISTSTATE *giststate)
{
	IndexTuple *lvectup,
			   *rvectup;
	GistSplitVector v;
	int			i;
	SplitedPageLayout *res = NULL;

	/* this should never recurse very deeply, but better safe than sorry */
	check_stack_depth();

	/* there's no point in splitting an empty page */
	Assert(len > 0);

	/*
	 * If a single tuple doesn't fit on a page, no amount of splitting will
	 * help.
	 */
	if (len == 1)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
						IndexTupleSize(itup[0]), GiSTPageSize,
						RelationGetRelationName(r))));

	memset(v.spl_lisnull, true, sizeof(bool) * giststate->tupdesc->natts);
	memset(v.spl_risnull, true, sizeof(bool) * giststate->tupdesc->natts);
	gistSplitByKey(r, page, itup, len, giststate, &v, 0);

	/* form left and right vector */
	lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
	rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));

	for (i = 0; i < v.splitVector.spl_nleft; i++)
		lvectup[i] = itup[v.splitVector.spl_left[i] - 1];

	for (i = 0; i < v.splitVector.spl_nright; i++)
		rvectup[i] = itup[v.splitVector.spl_right[i] - 1];

	/* finalize splitting (may need another split) */
	if (!gistfitpage(rvectup, v.splitVector.spl_nright))
	{
		res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
	}
	else
	{
		ROTATEDIST(res);
		res->block.num = v.splitVector.spl_nright;
		res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
		res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
	}

	if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
	{
		SplitedPageLayout *resptr,
				   *subres;

		resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);

		/* install on list's tail */
		while (resptr->next)
			resptr = resptr->next;

		resptr->next = res;
		res = subres;
	}
	else
	{
		ROTATEDIST(res);
		res->block.num = v.splitVector.spl_nleft;
		res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
		res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
	}

	return res;
}
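
/*
 * Example of the recursion above: if the picksplit result leaves a right
 * half that still doesn't fit on one page, gistSplit() calls itself on just
 * those tuples, and likewise for the left half; the left half's chain is
 * then linked in front of the right half's.  The result is always a
 * left-to-right list of SplitedPageLayout nodes, one per output page.
 */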

/*
 * Create a GISTSTATE and fill it with information about the index
 */
GISTSTATE *
initGISTstate(Relation index)
{
	GISTSTATE  *giststate;
	MemoryContext scanCxt;
	MemoryContext oldCxt;
	int			i;

	/* safety check to protect fixed-size arrays in GISTSTATE */
	if (index->rd_att->natts > INDEX_MAX_KEYS)
		elog(ERROR, "numberOfAttributes %d > %d",
			 index->rd_att->natts, INDEX_MAX_KEYS);

	/* Create the memory context that will hold the GISTSTATE */
	scanCxt = AllocSetContextCreate(CurrentMemoryContext,
									"GiST scan context",
									ALLOCSET_DEFAULT_SIZES);
	oldCxt = MemoryContextSwitchTo(scanCxt);

	/* Create and fill in the GISTSTATE */
	giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE));

	giststate->scanCxt = scanCxt;
	giststate->tempCxt = scanCxt;	/* caller must change this if needed */
	giststate->tupdesc = index->rd_att;

	for (i = 0; i < index->rd_att->natts; i++)
	{
		fmgr_info_copy(&(giststate->consistentFn[i]),
					   index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
					   scanCxt);
		fmgr_info_copy(&(giststate->unionFn[i]),
					   index_getprocinfo(index, i + 1, GIST_UNION_PROC),
					   scanCxt);

		/* opclasses are not required to provide a Compress method */
		if (OidIsValid(index_getprocid(index, i + 1, GIST_COMPRESS_PROC)))
			fmgr_info_copy(&(giststate->compressFn[i]),
						   index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
						   scanCxt);
		else
			giststate->compressFn[i].fn_oid = InvalidOid;

		/* opclasses are not required to provide a Decompress method */
		if (OidIsValid(index_getprocid(index, i + 1, GIST_DECOMPRESS_PROC)))
			fmgr_info_copy(&(giststate->decompressFn[i]),
						   index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
						   scanCxt);
		else
			giststate->decompressFn[i].fn_oid = InvalidOid;

		fmgr_info_copy(&(giststate->penaltyFn[i]),
					   index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
					   scanCxt);
		fmgr_info_copy(&(giststate->picksplitFn[i]),
					   index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
					   scanCxt);
		fmgr_info_copy(&(giststate->equalFn[i]),
					   index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
					   scanCxt);

		/* opclasses are not required to provide a Distance method */
		if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
			fmgr_info_copy(&(giststate->distanceFn[i]),
						   index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
						   scanCxt);
		else
			giststate->distanceFn[i].fn_oid = InvalidOid;

		/* opclasses are not required to provide a Fetch method */
		if (OidIsValid(index_getprocid(index, i + 1, GIST_FETCH_PROC)))
			fmgr_info_copy(&(giststate->fetchFn[i]),
						   index_getprocinfo(index, i + 1, GIST_FETCH_PROC),
						   scanCxt);
		else
			giststate->fetchFn[i].fn_oid = InvalidOid;

		/*
		 * If the index column has a specified collation, we should honor that
		 * while doing comparisons.  However, we may have a collatable storage
		 * type for a noncollatable indexed data type.  If there's no index
		 * collation then specify default collation in case the support
		 * functions need collation.  This is harmless if the support
		 * functions don't care about collation, so we just do it
		 * unconditionally.  (We could alternatively call get_typcollation,
		 * but that seems like expensive overkill --- there aren't going to be
		 * any cases where a GiST storage type has a nondefault collation.)
		 */
		if (OidIsValid(index->rd_indcollation[i]))
			giststate->supportCollation[i] = index->rd_indcollation[i];
		else
			giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
	}

	MemoryContextSwitchTo(oldCxt);

	return giststate;
}

void
freeGISTstate(GISTSTATE *giststate)
{
	/* It's sufficient to delete the scanCxt */
	MemoryContextDelete(giststate->scanCxt);
}

/*
 * gistvacuumpage() -- try to remove LP_DEAD items from the given page.
 * Function assumes that buffer is exclusively locked.
 */
static void
gistvacuumpage(Relation rel, Page page, Buffer buffer, Relation heapRel)
{
	OffsetNumber deletable[MaxIndexTuplesPerPage];
	int			ndeletable = 0;
	OffsetNumber offnum,
				maxoff;

	Assert(GistPageIsLeaf(page));

	/*
	 * Scan over all items to see which ones need to be deleted according to
	 * LP_DEAD flags.
	 */
	maxoff = PageGetMaxOffsetNumber(page);
	for (offnum = FirstOffsetNumber;
		 offnum <= maxoff;
		 offnum = OffsetNumberNext(offnum))
	{
		ItemId		itemId = PageGetItemId(page, offnum);

		if (ItemIdIsDead(itemId))
			deletable[ndeletable++] = offnum;
	}

	if (ndeletable > 0)
	{
		START_CRIT_SECTION();

		PageIndexMultiDelete(page, deletable, ndeletable);

		/*
		 * Mark the page as not containing any LP_DEAD items.  This is not
		 * necessarily true (there might be some that were marked recently,
		 * but weren't included in our target-item list), but it will almost
		 * always be true and it doesn't seem worth an additional page scan to
		 * check it. Remember that F_HAS_GARBAGE is only a hint anyway.
		 */
		GistClearPageHasGarbage(page);

		MarkBufferDirty(buffer);

		/* XLOG stuff */
		if (RelationNeedsWAL(rel))
		{
			XLogRecPtr	recptr;

			recptr = gistXLogUpdate(buffer,
									deletable, ndeletable,
									NULL, 0, InvalidBuffer,
									&heapRel->rd_node);

			PageSetLSN(page, recptr);
		}
		else
			PageSetLSN(page, gistGetFakeLSN(rel));

		END_CRIT_SECTION();
	}

	/*
	 * Note: if we didn't find any LP_DEAD items, then the page's
	 * F_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
	 * separate write to clear it, however.  We will clear it when we split
	 * the page.
	 */
}