/*-------------------------------------------------------------------------
 *
 * gist.c
 *	  interface routines for the postgres GiST index access method.
 *
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/gist/gist.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/gist_private.h"
#include "access/gistscan.h"
#include "catalog/pg_collation.h"
#include "miscadmin.h"
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"
#include "utils/rel.h"


/* non-export function prototypes */
static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
static bool gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
			 GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum);
static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
				 GISTSTATE *giststate,
				 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
				 Buffer leftchild, Buffer rightchild,
				 bool unlockbuf, bool unlockleftchild);
static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
				GISTSTATE *giststate, List *splitinfo, bool releasebuf);
static void gistvacuumpage(Relation rel, Page page, Buffer buffer,
			   Relation heapRel);


#define ROTATEDIST(d) do { \
	SplitedPageLayout *tmp = (SplitedPageLayout *) palloc(sizeof(SplitedPageLayout)); \
	memset(tmp, 0, sizeof(SplitedPageLayout)); \
	tmp->block.blkno = InvalidBlockNumber;	\
	tmp->buffer = InvalidBuffer;	\
	tmp->next = (d); \
	(d) = tmp; \
} while(0)
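
/*
 * Note on ROTATEDIST: it prepends a freshly zeroed SplitedPageLayout
 * element onto the list 'd'; gistSplit() below uses it to build up its
 * chain of split pages.  A minimal usage sketch (illustrative only, the
 * variable names are hypothetical):
 *
 *		SplitedPageLayout *dist = NULL;
 *
 *		ROTATEDIST(dist);		-- dist is now a one-element list
 *		dist->block.num = nrighttuples;
 *		ROTATEDIST(dist);		-- a second element, prepended to the first
 *		dist->block.num = nlefttuples;
 */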


/*
 * GiST handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
gisthandler(PG_FUNCTION_ARGS)
{
	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

	amroutine->amstrategies = 0;
	amroutine->amsupport = GISTNProcs;
	amroutine->amcanorder = false;
	amroutine->amcanorderbyop = true;
	amroutine->amcanbackward = false;
	amroutine->amcanunique = false;
	amroutine->amcanmulticol = true;
	amroutine->amoptionalkey = true;
	amroutine->amsearcharray = false;
	amroutine->amsearchnulls = true;
	amroutine->amstorage = true;
	amroutine->amclusterable = true;
	amroutine->ampredlocks = false;
	amroutine->amkeytype = InvalidOid;

	amroutine->ambuild = gistbuild;
	amroutine->ambuildempty = gistbuildempty;
	amroutine->aminsert = gistinsert;
	amroutine->ambulkdelete = gistbulkdelete;
	amroutine->amvacuumcleanup = gistvacuumcleanup;
	amroutine->amcanreturn = gistcanreturn;
	amroutine->amcostestimate = gistcostestimate;
	amroutine->amoptions = gistoptions;
	amroutine->amproperty = gistproperty;
	amroutine->amvalidate = gistvalidate;
	amroutine->ambeginscan = gistbeginscan;
	amroutine->amrescan = gistrescan;
	amroutine->amgettuple = gistgettuple;
	amroutine->amgetbitmap = gistgetbitmap;
	amroutine->amendscan = gistendscan;
	amroutine->ammarkpos = NULL;
	amroutine->amrestrpos = NULL;

	PG_RETURN_POINTER(amroutine);
}
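
/*
 * This handler is what ties GiST into the index access method machinery.
 * For illustration only (hypothetical; the built-in "gist" AM is created
 * from the initial catalog data, not this way), an equivalent AM could be
 * registered with:
 *
 *		CREATE ACCESS METHOD gist2 TYPE INDEX HANDLER gisthandler;
 */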

/*
 * Create and return a temporary memory context for use by GiST. We
 * _always_ invoke user-provided methods in a temporary memory
 * context, so that memory leaks in those functions cannot cause
 * problems. Also, we use some additional temporary contexts in the
 * GiST code itself, to avoid the need to do some awkward manual
 * memory management.
 */
MemoryContext
createTempGistContext(void)
{
	return AllocSetContextCreate(CurrentMemoryContext,
								 "GiST temporary context",
								 ALLOCSET_DEFAULT_SIZES);
}
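
/*
 * A sketch of the intended usage pattern (the scan and insert code are the
 * real callers; the details there vary):
 *
 *		giststate->tempCxt = createTempGistContext();
 *		oldCxt = MemoryContextSwitchTo(giststate->tempCxt);
 *		... invoke user-provided support functions ...
 *		MemoryContextSwitchTo(oldCxt);
 *		MemoryContextReset(giststate->tempCxt);
 */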

/*
 *	gistbuildempty() -- build an empty gist index in the initialization fork
 */
void
gistbuildempty(Relation index)
{
	Buffer		buffer;

	/* Initialize the root page */
	buffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

	/* Initialize and xlog buffer */
	START_CRIT_SECTION();
	GISTInitBuffer(buffer, F_LEAF);
	MarkBufferDirty(buffer);
	log_newpage_buffer(buffer, true);
	END_CRIT_SECTION();

	/* Unlock and release the buffer */
	UnlockReleaseBuffer(buffer);
}

/*
 *	gistinsert -- wrapper for GiST tuple insertion.
 *
 *	  This is the public interface routine for tuple insertion in GiSTs.
 *	  It doesn't do any work; just locks the relation and passes the buck.
 */
bool
gistinsert(Relation r, Datum *values, bool *isnull,
		   ItemPointer ht_ctid, Relation heapRel,
		   IndexUniqueCheck checkUnique)
{
	IndexTuple	itup;
	GISTSTATE  *giststate;
	MemoryContext oldCxt;

	giststate = initGISTstate(r);

	/*
	 * We use the giststate's scan context as temp context too.  This means
	 * that any memory leaked by the support functions is not reclaimed until
	 * end of insert.  In most cases, we aren't going to call the support
	 * functions very many times before finishing the insert, so this seems
	 * cheaper than resetting a temp context for each function call.
	 */
	oldCxt = MemoryContextSwitchTo(giststate->tempCxt);

	itup = gistFormTuple(giststate, r,
						 values, isnull, true /* size is currently bogus */ );
	itup->t_tid = *ht_ctid;

	gistdoinsert(r, itup, 0, giststate, heapRel);

	/* cleanup */
	MemoryContextSwitchTo(oldCxt);
	freeGISTstate(giststate);

	return false;
}


/*
 * Place tuples from 'itup' onto 'buffer'. If 'oldoffnum' is valid, the tuple
 * at that offset is atomically removed along with inserting the new tuples.
 * This is used to replace a tuple with a new one.
 *
 * If 'leftchildbuf' is valid, we're inserting the downlink for the page
 * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
 * The F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and its NSN is set.
 *
 * If 'markfollowright' is true and the page is split, the left child is
 * marked with the F_FOLLOW_RIGHT flag. That is the normal case. During a
 * buffered index build, however, there is no concurrent access and the page
 * splitting is done in a slightly simpler fashion, so false is passed.
 *
 * If there is not enough room on the page, it is split. All the split
 * pages are kept pinned and locked and returned in *splitinfo; the caller
 * is responsible for inserting the downlinks for them. However, if
 * 'buffer' is the root page and it needs to be split, gistplacetopage()
 * performs the split as one atomic operation, and *splitinfo is set to NIL.
 * In that case, we continue to hold the root page locked, and the child
 * pages are released; note that the new tuple(s) are *not* on the root page
 * but in one of the new child pages.
 *
 * If 'newblkno' is not NULL, it is set to the block number of the page that
 * the first new/updated tuple was inserted on. Usually that's the given
 * page, but it could be a right sibling if the page was split.
 *
 * Returns 'true' if the page was split, 'false' otherwise.
 */
bool
gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
				Buffer buffer,
				IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
				BlockNumber *newblkno,
				Buffer leftchildbuf,
				List **splitinfo,
				bool markfollowright,
				Relation heapRel)
{
	BlockNumber blkno = BufferGetBlockNumber(buffer);
	Page		page = BufferGetPage(buffer);
	bool		is_leaf = (GistPageIsLeaf(page)) ? true : false;
	XLogRecPtr	recptr;
	int			i;
	bool		is_split;

	/*
	 * Refuse to modify a page that's incompletely split. This should not
	 * happen because we finish any incomplete splits while we walk down the
	 * tree. However, it's remotely possible that another concurrent inserter
	 * splits a parent page, and errors out before completing the split. We
	 * will just throw an error in that case, and leave any split we had in
	 * progress unfinished too. The next insert that comes along will clean up
	 * the mess.
	 */
	if (GistFollowRight(page))
		elog(ERROR, "concurrent GiST page split was incomplete");

	*splitinfo = NIL;

	/*
	 * if isupdate, remove old key: This node's key has been modified, either
	 * because a child split occurred or because we needed to adjust our key
	 * for an insert in a child node. Therefore, remove the old version of
	 * this node's key.
	 *
	 * for WAL replay, in the non-split case we handle this by setting up a
	 * one-element todelete array; in the split case, it's handled implicitly
	 * because the tuple vector passed to gistSplit won't include this tuple.
	 */
	is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);

	/*
	 * If the leaf page is full, first try to delete any dead tuples, then
	 * check for free space again.
	 */
	if (is_split && GistPageIsLeaf(page) && GistPageHasGarbage(page))
	{
		gistvacuumpage(rel, page, buffer, heapRel);
		is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
	}

	if (is_split)
	{
		/* no space for insertion */
		IndexTuple *itvec;
		int			tlen;
		SplitedPageLayout *dist = NULL,
				   *ptr;
		BlockNumber oldrlink = InvalidBlockNumber;
		GistNSN		oldnsn = 0;
		SplitedPageLayout rootpg;
		bool		is_rootsplit;
		int			npage;

		is_rootsplit = (blkno == GIST_ROOT_BLKNO);

		/*
		 * Form index tuples vector to split. If we're replacing an old tuple,
		 * remove the old version from the vector.
		 */
		itvec = gistextractpage(page, &tlen);
		if (OffsetNumberIsValid(oldoffnum))
		{
			/* on inner page we should remove old tuple */
			int			pos = oldoffnum - FirstOffsetNumber;

			tlen--;
			if (pos != tlen)
				memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
		}
		itvec = gistjoinvector(itvec, &tlen, itup, ntup);
		dist = gistSplit(rel, page, itvec, tlen, giststate);

		/*
		 * Check that split didn't produce too many pages.
		 */
		npage = 0;
		for (ptr = dist; ptr; ptr = ptr->next)
			npage++;
		/* in a root split, we'll add one more page to the list below */
		if (is_rootsplit)
			npage++;
		if (npage > GIST_MAX_SPLIT_PAGES)
			elog(ERROR, "GiST page split into too many halves (%d, maximum %d)",
				 npage, GIST_MAX_SPLIT_PAGES);

		/*
		 * Set up pages to work with. Allocate new buffers for all but the
		 * leftmost page. The original page becomes the new leftmost page, and
		 * is just replaced with the new contents.
		 *
		 * For a root-split, allocate new buffers for all child pages; the
		 * original page is overwritten with a new root page containing
		 * downlinks to the new child pages.
		 */
		ptr = dist;
		if (!is_rootsplit)
		{
			/* save old rightlink and NSN */
			oldrlink = GistPageGetOpaque(page)->rightlink;
			oldnsn = GistPageGetNSN(page);

			dist->buffer = buffer;
			dist->block.blkno = BufferGetBlockNumber(buffer);
			dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));

			/* clean all flags except F_LEAF */
			GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;

			ptr = ptr->next;
		}
		for (; ptr; ptr = ptr->next)
		{
			/* Allocate new page */
			ptr->buffer = gistNewBuffer(rel);
			GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
			ptr->page = BufferGetPage(ptr->buffer);
			ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
		}

		/*
		 * Now that we know which blocks the new pages go to, set up downlink
		 * tuples to point to them.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
		{
			ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
			GistTupleSetValid(ptr->itup);
		}

		/*
		 * If this is a root split, we construct the new root page with the
		 * downlinks here directly, instead of requiring the caller to insert
		 * them. Add the new root page to the list along with the child pages.
		 */
		if (is_rootsplit)
		{
			IndexTuple *downlinks;
			int			ndownlinks = 0;
			int			i;

			rootpg.buffer = buffer;
			rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
			GistPageGetOpaque(rootpg.page)->flags = 0;

			/* Prepare a vector of all the downlinks */
			for (ptr = dist; ptr; ptr = ptr->next)
				ndownlinks++;
			downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
			for (i = 0, ptr = dist; ptr; ptr = ptr->next)
				downlinks[i++] = ptr->itup;

			rootpg.block.blkno = GIST_ROOT_BLKNO;
			rootpg.block.num = ndownlinks;
			rootpg.list = gistfillitupvec(downlinks, ndownlinks,
										  &(rootpg.lenlist));
			rootpg.itup = NULL;

			rootpg.next = dist;
			dist = &rootpg;
		}
		else
		{
			/* Prepare split-info to be returned to caller */
			for (ptr = dist; ptr; ptr = ptr->next)
			{
				GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));

				si->buf = ptr->buffer;
				si->downlink = ptr->itup;
				*splitinfo = lappend(*splitinfo, si);
			}
		}

		/*
		 * Fill all pages. All the pages are new, i.e. freshly allocated empty
		 * pages, or a temporary copy of the old page.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
		{
			char	   *data = (char *) (ptr->list);

			for (i = 0; i < ptr->block.num; i++)
			{
				IndexTuple	thistup = (IndexTuple) data;

				if (PageAddItem(ptr->page, (Item) data, IndexTupleSize(thistup), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
					elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(rel));

				/*
				 * If this is the first inserted/updated tuple, let the caller
				 * know which page it landed on.
				 */
				if (newblkno && ItemPointerEquals(&thistup->t_tid, &(*itup)->t_tid))
					*newblkno = ptr->block.blkno;

				data += IndexTupleSize(thistup);
			}

			/* Set up rightlinks */
			if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
				GistPageGetOpaque(ptr->page)->rightlink =
					ptr->next->block.blkno;
			else
				GistPageGetOpaque(ptr->page)->rightlink = oldrlink;

			/*
			 * Mark all but the right-most page with the follow-right
			 * flag. It will be cleared as soon as the downlink is inserted
			 * into the parent, but this ensures that if we error out before
			 * that, the index is still consistent. (In buffering build mode,
			 * any error will abort the index build anyway, so this is not
			 * needed.)
			 */
			if (ptr->next && !is_rootsplit && markfollowright)
				GistMarkFollowRight(ptr->page);
			else
				GistClearFollowRight(ptr->page);

			/*
			 * Copy the NSN of the original page to all pages. The
			 * F_FOLLOW_RIGHT flags ensure that scans will follow the
			 * rightlinks until the downlinks are inserted.
			 */
			GistPageSetNSN(ptr->page, oldnsn);
		}

		/*
		 * gistXLogSplit() needs to WAL-log a lot of pages; prepare WAL
		 * insertion for that. NB: The number of pages and data segments
		 * specified here must match the calculations in gistXLogSplit()!
		 */
		if (RelationNeedsWAL(rel))
			XLogEnsureRecordSpace(npage, 1 + npage * 2);

		START_CRIT_SECTION();

		/*
		 * Must mark buffers dirty before XLogInsert, even though we'll still
		 * be changing their opaque fields below.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
			MarkBufferDirty(ptr->buffer);
		if (BufferIsValid(leftchildbuf))
			MarkBufferDirty(leftchildbuf);

		/*
		 * The first page in the chain was a temporary working copy meant to
		 * replace the old page. Copy it over the old page.
		 */
		PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
		dist->page = BufferGetPage(dist->buffer);

		/* Write the WAL record */
		if (RelationNeedsWAL(rel))
			recptr = gistXLogSplit(is_leaf,
								   dist, oldrlink, oldnsn, leftchildbuf,
								   markfollowright);
		else
			recptr = gistGetFakeLSN(rel);

		for (ptr = dist; ptr; ptr = ptr->next)
		{
			PageSetLSN(ptr->page, recptr);
		}

		/*
		 * Return the new child buffers to the caller.
		 *
		 * If this was a root split, we've already inserted the downlink
		 * pointers, in the form of a new root page. Therefore we can release
		 * all the new buffers, and keep just the root page locked.
		 */
		if (is_rootsplit)
		{
			for (ptr = dist->next; ptr; ptr = ptr->next)
				UnlockReleaseBuffer(ptr->buffer);
		}
	}
	else
	{
		/*
		 * Enough space. We also get here if ntuples==0.
		 */
		START_CRIT_SECTION();

		/*
		 * As long as we delete only one tuple at a time, it's fine that this
		 * code uses PageIndexTupleDelete() while WAL replay uses
		 * PageIndexMultiDelete() in gistRedoPageUpdateRecord(); the two are
		 * equivalent for a single tuple.
		 */
		if (OffsetNumberIsValid(oldoffnum))
			PageIndexTupleDelete(page, oldoffnum);
		gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);

		MarkBufferDirty(buffer);

		if (BufferIsValid(leftchildbuf))
			MarkBufferDirty(leftchildbuf);

		if (RelationNeedsWAL(rel))
		{
			OffsetNumber ndeloffs = 0,
						deloffs[1];

			if (OffsetNumberIsValid(oldoffnum))
			{
				deloffs[0] = oldoffnum;
				ndeloffs = 1;
			}

			recptr = gistXLogUpdate(buffer,
									deloffs, ndeloffs, itup, ntup,
									leftchildbuf, NULL);

			PageSetLSN(page, recptr);
		}
		else
		{
			recptr = gistGetFakeLSN(rel);
			PageSetLSN(page, recptr);
		}

		if (newblkno)
			*newblkno = blkno;
	}

	/*
	 * If we inserted the downlink for a child page, set NSN and clear
	 * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know to
	 * follow the rightlink if and only if they looked at the parent page
	 * before we inserted the downlink.
	 *
	 * Note that we do this *after* writing the WAL record. That means that
	 * the possible full page image in the WAL record does not include these
	 * changes, and they must be replayed even if the page is restored from
	 * the full page image. There's a chicken-and-egg problem: if we updated
	 * the child pages first, we wouldn't know the recptr of the WAL record
	 * we're about to write.
	 */
	if (BufferIsValid(leftchildbuf))
	{
		Page		leftpg = BufferGetPage(leftchildbuf);

		GistPageSetNSN(leftpg, recptr);
		GistClearFollowRight(leftpg);

		PageSetLSN(leftpg, recptr);
	}

	END_CRIT_SECTION();

	return is_split;
}

/*
 * Workhorse routine for doing insertion into a GiST index. Note that
 * this routine assumes it is invoked in a short-lived memory context,
 * so it does not bother releasing palloc'd allocations.
 */
void
gistdoinsert(Relation r, IndexTuple itup, Size freespace,
			 GISTSTATE *giststate, Relation heapRel)
{
	ItemId		iid;
	IndexTuple	idxtuple;
	GISTInsertStack firststack;
	GISTInsertStack *stack;
	GISTInsertState state;
	bool		xlocked = false;

	memset(&state, 0, sizeof(GISTInsertState));
	state.freespace = freespace;
	state.r = r;
	state.heapRel = heapRel;

	/* Start from the root */
	firststack.blkno = GIST_ROOT_BLKNO;
	firststack.lsn = 0;
	firststack.parent = NULL;
	firststack.downlinkoffnum = InvalidOffsetNumber;
	state.stack = stack = &firststack;

	/*
	 * Walk down along the path of smallest penalty, updating the parent
	 * pointers with the key we're inserting as we go. If we crash in the
	 * middle, the tree is consistent, although the possible parent updates
	 * were a waste.
	 */
	for (;;)
	{
		if (XLogRecPtrIsInvalid(stack->lsn))
			stack->buffer = ReadBuffer(state.r, stack->blkno);

		/*
		 * Be optimistic and grab shared lock first. Swap it for an exclusive
		 * lock later if we need to update the page.
		 */
		if (!xlocked)
		{
			LockBuffer(stack->buffer, GIST_SHARE);
			gistcheckpage(state.r, stack->buffer);
		}

		stack->page = (Page) BufferGetPage(stack->buffer);
		stack->lsn = xlocked ?
			PageGetLSN(stack->page) : BufferGetLSNAtomic(stack->buffer);
		Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));

		/*
		 * If this page was split but the downlink was never inserted to the
		 * parent because the inserting backend crashed before doing that, fix
		 * that now.
		 */
		if (GistFollowRight(stack->page))
		{
			if (!xlocked)
			{
				LockBuffer(stack->buffer, GIST_UNLOCK);
				LockBuffer(stack->buffer, GIST_EXCLUSIVE);
				xlocked = true;
				/* someone might've completed the split when we unlocked */
				if (!GistFollowRight(stack->page))
					continue;
			}
			gistfixsplit(&state, giststate);

			UnlockReleaseBuffer(stack->buffer);
			xlocked = false;
			state.stack = stack = stack->parent;
			continue;
		}

		if (stack->blkno != GIST_ROOT_BLKNO &&
			stack->parent->lsn < GistPageGetNSN(stack->page))
		{
			/*
			 * Concurrent split detected. There's no guarantee that the
			 * downlink for this page is consistent with the tuple we're
			 * inserting anymore, so go back to parent and rechoose the best
			 * child.
			 */
			UnlockReleaseBuffer(stack->buffer);
			xlocked = false;
			state.stack = stack = stack->parent;
			continue;
		}

		if (!GistPageIsLeaf(stack->page))
		{
			/*
			 * This is an internal page so continue to walk down the tree.
			 * Find the child node that has the minimum insertion penalty.
			 */
			BlockNumber childblkno;
			IndexTuple	newtup;
			GISTInsertStack *item;
			OffsetNumber downlinkoffnum;

			downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate);
			iid = PageGetItemId(stack->page, downlinkoffnum);
			idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
			childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));

			/*
			 * Check that it's not a leftover invalid tuple from pre-9.1
			 */
			if (GistTupleIsInvalid(idxtuple))
				ereport(ERROR,
						(errmsg("index \"%s\" contains an inner tuple marked as invalid",
								RelationGetRelationName(r)),
						 errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
						 errhint("Please REINDEX it.")));

			/*
			 * Check that the key representing the target child node is
			 * consistent with the key we're inserting. Update it if it's not.
			 */
			newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
			if (newtup)
			{
				/*
				 * Swap shared lock for an exclusive one. Beware, the page may
				 * change while we unlock/lock the page...
				 */
				if (!xlocked)
				{
					LockBuffer(stack->buffer, GIST_UNLOCK);
					LockBuffer(stack->buffer, GIST_EXCLUSIVE);
					xlocked = true;
					stack->page = (Page) BufferGetPage(stack->buffer);

					if (PageGetLSN(stack->page) != stack->lsn)
					{
						/* the page was changed while we unlocked it, retry */
						continue;
					}
				}

				/*
				 * Update the tuple.
				 *
				 * We still hold the lock after gistinserttuple(), but it
				 * might have to split the page to make the updated tuple fit.
				 * In that case the updated tuple might migrate to the other
				 * half of the split, so we have to go back to the parent and
				 * descend back to the half that's a better fit for the new
				 * tuple.
				 */
				if (gistinserttuple(&state, stack, giststate, newtup,
									downlinkoffnum))
				{
					/*
					 * If this was a root split, the root page continues to be
					 * the parent and the updated tuple went to one of the
					 * child pages, so we just need to retry from the root
					 * page.
					 */
					if (stack->blkno != GIST_ROOT_BLKNO)
					{
						UnlockReleaseBuffer(stack->buffer);
						xlocked = false;
						state.stack = stack = stack->parent;
					}
					continue;
				}
			}
			LockBuffer(stack->buffer, GIST_UNLOCK);
			xlocked = false;

			/* descend to the chosen child */
			item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
			item->blkno = childblkno;
			item->parent = stack;
			item->downlinkoffnum = downlinkoffnum;
			state.stack = stack = item;
		}
		else
		{
			/*
			 * Leaf page. Insert the new key. We've already updated all the
			 * parents on the way down, but we might have to split the page if
			 * it doesn't fit. gistinserttuple() will take care of that.
			 */

			/*
			 * Swap shared lock for an exclusive one. Be careful, the page may
			 * change while we unlock/lock the page...
			 */
			if (!xlocked)
			{
				LockBuffer(stack->buffer, GIST_UNLOCK);
				LockBuffer(stack->buffer, GIST_EXCLUSIVE);
				xlocked = true;
				stack->page = (Page) BufferGetPage(stack->buffer);
				stack->lsn = PageGetLSN(stack->page);

				if (stack->blkno == GIST_ROOT_BLKNO)
				{
					/*
					 * The root page is the only page that can turn from a
					 * leaf into an inner page, so we must recheck that here.
					 */
					if (!GistPageIsLeaf(stack->page))
					{
						/*
						 * Very rare situation: while we were unlocked, the
						 * single-page index grew and the root is no longer
						 * a leaf.
						 */
						LockBuffer(stack->buffer, GIST_UNLOCK);
						xlocked = false;
						continue;
					}

					/*
					 * We don't need to check for a root split explicitly;
					 * the leaf/inner check is enough to recognize a split
					 * of the root.
					 */
				}
				else if (GistFollowRight(stack->page) ||
						 stack->parent->lsn < GistPageGetNSN(stack->page))
				{
					/*
					 * The page was split while we momentarily unlocked the
					 * page. Go back to parent.
					 */
					UnlockReleaseBuffer(stack->buffer);
					xlocked = false;
					state.stack = stack = stack->parent;
					continue;
				}
			}

			/* now state.stack's page, buffer and blkno refer to the leaf page */

			gistinserttuple(&state, stack, giststate, itup,
							InvalidOffsetNumber);
			LockBuffer(stack->buffer, GIST_UNLOCK);

			/* Release any pins we might still hold before exiting */
			for (; stack; stack = stack->parent)
				ReleaseBuffer(stack->buffer);
			break;
		}
	}
}

/*
 * Traverse the tree to find the path from the root page to the specified
 * "child" block.
 *
 * Returns a new insertion stack, starting from the parent of "child", up
 * to the root. *downlinkoffnum is set to the offset of the downlink in the
 * direct parent of child.
 *
 * To prevent deadlocks, this should lock only one page at a time.
 */
static GISTInsertStack *
gistFindPath(Relation r, BlockNumber child, OffsetNumber *downlinkoffnum)
{
	Page		page;
	Buffer		buffer;
	OffsetNumber i,
				maxoff;
	ItemId		iid;
	IndexTuple	idxtuple;
	List	   *fifo;
	GISTInsertStack *top,
			   *ptr;
	BlockNumber blkno;

	top = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
	top->blkno = GIST_ROOT_BLKNO;
	top->downlinkoffnum = InvalidOffsetNumber;

	fifo = list_make1(top);
	while (fifo != NIL)
	{
		/* Get next page to visit */
		top = linitial(fifo);
		fifo = list_delete_first(fifo);

		buffer = ReadBuffer(r, top->blkno);
		LockBuffer(buffer, GIST_SHARE);
		gistcheckpage(r, buffer);
		page = (Page) BufferGetPage(buffer);

		if (GistPageIsLeaf(page))
		{
			/*
			 * Because we scan the index top-down, all the rest of the pages
			 * in the queue must be leaf pages as well.
			 */
			UnlockReleaseBuffer(buffer);
			break;
		}

		top->lsn = BufferGetLSNAtomic(buffer);

		/*
		 * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
		 * downlink. This should not normally happen.
		 */
		if (GistFollowRight(page))
			elog(ERROR, "concurrent GiST page split was incomplete");

		if (top->parent && top->parent->lsn < GistPageGetNSN(page) &&
			GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
		{
			/*
			 * Page was split while we looked elsewhere. We didn't see the
			 * downlink to the right page when we scanned the parent, so add
			 * it to the queue now.
			 *
			 * Put the right page at the head of the queue, so that we visit
			 * it next. That's important, because if this is the lowest
			 * internal level, just above leaves, we might already have queued
			 * up some leaf pages, and we assume that there can't be any
			 * non-leaf pages behind leaf pages.
			 */
			ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
			ptr->blkno = GistPageGetOpaque(page)->rightlink;
			ptr->downlinkoffnum = InvalidOffsetNumber;
			ptr->parent = top->parent;

			fifo = lcons(ptr, fifo);
		}

		maxoff = PageGetMaxOffsetNumber(page);

		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
		{
			iid = PageGetItemId(page, i);
			idxtuple = (IndexTuple) PageGetItem(page, iid);
			blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
			if (blkno == child)
			{
				/* Found it! */
				UnlockReleaseBuffer(buffer);
				*downlinkoffnum = i;
				return top;
			}
			else
			{
				/* Append this child to the list of pages to visit later */
				ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
				ptr->blkno = blkno;
				ptr->downlinkoffnum = i;
				ptr->parent = top;

				fifo = lappend(fifo, ptr);
			}
		}

		UnlockReleaseBuffer(buffer);
	}

	elog(ERROR, "failed to re-find parent of a page in index \"%s\", block %u",
		 RelationGetRelationName(r), child);
	return NULL;				/* keep compiler quiet */
}

/*
 * Updates the stack so that child->parent is the correct parent of the
 * child. child->parent must be exclusively locked on entry, and will
 * remain so at exit, but it might not be the same page anymore.
 */
static void
gistFindCorrectParent(Relation r, GISTInsertStack *child)
{
	GISTInsertStack *parent = child->parent;

	gistcheckpage(r, parent->buffer);
	parent->page = (Page) BufferGetPage(parent->buffer);

	/* here we don't need to distinguish between split and page update */
	if (child->downlinkoffnum == InvalidOffsetNumber ||
		parent->lsn != PageGetLSN(parent->page))
	{
		/* parent has changed; look for the child in the rightlink chain */
		OffsetNumber i,
					maxoff;
		ItemId		iid;
		IndexTuple	idxtuple;
		GISTInsertStack *ptr;

		while (true)
		{
			maxoff = PageGetMaxOffsetNumber(parent->page);
			for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
			{
				iid = PageGetItemId(parent->page, i);
				idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
				if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
				{
					/* found it */
					child->downlinkoffnum = i;
					return;
				}
			}

			parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
			UnlockReleaseBuffer(parent->buffer);
			if (parent->blkno == InvalidBlockNumber)
			{
				/*
				 * End of chain and still didn't find the parent. This is a
				 * very rare situation that can happen when the root was
				 * split.
				 */
				break;
			}
			parent->buffer = ReadBuffer(r, parent->blkno);
			LockBuffer(parent->buffer, GIST_EXCLUSIVE);
			gistcheckpage(r, parent->buffer);
			parent->page = (Page) BufferGetPage(parent->buffer);
		}

		/*
		 * Worst case: we didn't find the parent via the rightlink chain, so
		 * we must search the tree for it. But first, release all the old
		 * parent buffers.
		 */

		ptr = child->parent->parent;	/* child->parent already released
										 * above */
		while (ptr)
		{
			ReleaseBuffer(ptr->buffer);
			ptr = ptr->parent;
		}

		/* ok, find the new path */
		ptr = parent = gistFindPath(r, child->blkno, &child->downlinkoffnum);

		/* read all buffers as expected by caller */
		/* note we don't lock them or gistcheckpage them here! */
		while (ptr)
		{
			ptr->buffer = ReadBuffer(r, ptr->blkno);
			ptr->page = (Page) BufferGetPage(ptr->buffer);
			ptr = ptr->parent;
		}

		/* install the new chain of parents into the stack */
		child->parent = parent;

		/* make a recursive call to normal processing */
		LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
		gistFindCorrectParent(r, child);
	}

	return;
}

/*
 * Form a downlink pointer for the page in 'buf'.
 */
static IndexTuple
gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
				 GISTInsertStack *stack)
{
	Page		page = BufferGetPage(buf);
	OffsetNumber maxoff;
	OffsetNumber offset;
	IndexTuple	downlink = NULL;

	maxoff = PageGetMaxOffsetNumber(page);
	for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
	{
		IndexTuple	ituple = (IndexTuple)
		PageGetItem(page, PageGetItemId(page, offset));

		if (downlink == NULL)
			downlink = CopyIndexTuple(ituple);
		else
		{
			IndexTuple	newdownlink;

			newdownlink = gistgetadjusted(rel, downlink, ituple,
										  giststate);
			if (newdownlink)
				downlink = newdownlink;
		}
	}

	/*
	 * If the page is completely empty, we can't form a meaningful downlink
	 * for it. But we have to insert a downlink for the page. Any key will do,
	 * as long as it's consistent with the downlink of the parent page, so
	 * that we can legally insert it into the parent. A minimal one that
	 * matches as few scans as possible would be best, to keep scans from
	 * doing useless work, but we don't know how to construct that. So we just
	 * use the downlink of the original page that was split; that's as far
	 * from optimal as it can get, but it will do.
	 */
	if (!downlink)
	{
		ItemId		iid;

		LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
		gistFindCorrectParent(rel, stack);
		iid = PageGetItemId(stack->parent->page, stack->downlinkoffnum);
		downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
		downlink = CopyIndexTuple(downlink);
		LockBuffer(stack->parent->buffer, GIST_UNLOCK);
	}

	ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
	GistTupleSetValid(downlink);

	return downlink;
}


/*
 * Complete the incomplete split of state->stack->page.
 */
static void
gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
{
	GISTInsertStack *stack = state->stack;
	Buffer		buf;
	Page		page;
	List	   *splitinfo = NIL;

	elog(LOG, "fixing incomplete split in index \"%s\", block %u",
		 RelationGetRelationName(state->r), stack->blkno);

	Assert(GistFollowRight(stack->page));
	Assert(OffsetNumberIsValid(stack->downlinkoffnum));

	buf = stack->buffer;

	/*
	 * Read the chain of split pages, following the rightlinks. Construct a
	 * downlink tuple for each page.
	 */
	for (;;)
	{
		GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
		IndexTuple	downlink;

		page = BufferGetPage(buf);

		/* Form the new downlink tuples to insert to parent */
		downlink = gistformdownlink(state->r, buf, giststate, stack);

		si->buf = buf;
		si->downlink = downlink;

		splitinfo = lappend(splitinfo, si);

		if (GistFollowRight(page))
		{
			/* lock next page */
			buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
			LockBuffer(buf, GIST_EXCLUSIVE);
		}
		else
			break;
	}

	/* Insert the downlinks */
	gistfinishsplit(state, stack, giststate, splitinfo, false);
}

/*
 * Insert or replace a tuple in stack->buffer. If 'oldoffnum' is valid, the
 * tuple at 'oldoffnum' is replaced, otherwise the tuple is inserted as new.
 * 'stack' represents the path from the root to the page being updated.
 *
 * The caller must hold an exclusive lock on stack->buffer.  The lock is still
 * held on return, but the page might not contain the inserted tuple if the
 * page was split. The function returns true if the page was split, false
 * otherwise.
 */
static bool
gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
			  GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum)
{
	return gistinserttuples(state, stack, giststate, &tuple, 1, oldoffnum,
							InvalidBuffer, InvalidBuffer, false, false);
}

/* ----------------
 * An extended workhorse version of gistinserttuple(). This version allows
 * inserting multiple tuples, or replacing a single tuple with multiple tuples.
 * This is used to recursively update the downlinks in the parent when a page
 * is split.
 *
 * If leftchild and rightchild are valid, we're inserting/replacing the
 * downlink for rightchild, and leftchild is its left sibling. We clear the
 * F_FOLLOW_RIGHT flag and update NSN on leftchild, atomically with the
 * insertion of the downlink.
 *
 * To avoid holding locks for longer than necessary, when recursing up the
 * tree to update the parents, the locking is a bit peculiar here. On entry,
 * the caller must hold an exclusive lock on stack->buffer, as well as
 * leftchild and rightchild if given. On return:
 *
 *	- Lock on stack->buffer is released, if 'unlockbuf' is true. The page is
 *	  always kept pinned, however.
 *	- Lock on 'leftchild' is released, if 'unlockleftchild' is true. The page
 *	  is kept pinned.
 *	- Lock and pin on 'rightchild' are always released.
 *
 * Returns 'true' if the page had to be split. Note that if the page was
 * split, the inserted/updated tuples might've been inserted to a right
 * sibling of stack->buffer instead of stack->buffer itself.
 */
static bool
gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
				 GISTSTATE *giststate,
				 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
				 Buffer leftchild, Buffer rightchild,
				 bool unlockbuf, bool unlockleftchild)
{
	List	   *splitinfo;
	bool		is_split;

	/* Insert the tuple(s) to the page, splitting the page if necessary */
	is_split = gistplacetopage(state->r, state->freespace, giststate,
							   stack->buffer,
							   tuples, ntup,
							   oldoffnum, NULL,
							   leftchild,
							   &splitinfo,
							   true,
							   state->heapRel);

	/*
	 * Before recursing up in case the page was split, release locks on the
	 * child pages. We don't need to keep them locked when updating the
	 * parent.
	 */
	if (BufferIsValid(rightchild))
		UnlockReleaseBuffer(rightchild);
	if (BufferIsValid(leftchild) && unlockleftchild)
		LockBuffer(leftchild, GIST_UNLOCK);

	/*
	 * If we had to split, insert/update the downlinks in the parent. If the
	 * caller requested us to release the lock on stack->buffer, tell
	 * gistfinishsplit() to do that as soon as it's safe to do so. If we
	 * didn't have to split, release it ourselves.
	 */
	if (splitinfo)
		gistfinishsplit(state, stack, giststate, splitinfo, unlockbuf);
	else if (unlockbuf)
		LockBuffer(stack->buffer, GIST_UNLOCK);

	return is_split;
}

/*
 * Finish an incomplete split by inserting/updating the downlinks in the
 * parent page. 'splitinfo' contains all the child pages involved in the
 * split, from left-to-right.
 *
 * On entry, the caller must hold a lock on stack->buffer and all the child
 * pages in 'splitinfo'. If 'unlockbuf' is true, the lock on stack->buffer is
 * released on return. The child pages are always unlocked and unpinned.
 */
static void
gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
				GISTSTATE *giststate, List *splitinfo, bool unlockbuf)
{
	ListCell   *lc;
	List	   *reversed;
	GISTPageSplitInfo *right;
	GISTPageSplitInfo *left;
	IndexTuple	tuples[2];

	/* A split always contains at least two halves */
	Assert(list_length(splitinfo) >= 2);

	/*
	 * We need to insert downlinks for each new page, and update the downlink
	 * for the original (leftmost) page in the split. Begin at the rightmost
	 * page, inserting one downlink at a time until only two pages are left.
	 * Finally insert the downlink for the last new page and update the
	 * downlink for the original page as one operation.
	 */
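	/*
	 * For example, if the split produced pages A, B, C and D (A being the
	 * original page), we first insert the downlink for D, then the one for
	 * C, and finally insert B's downlink and update A's existing downlink
	 * in a single atomic operation.
	 */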

	/* for convenience, create a copy of the list in reverse order */
	reversed = NIL;
	foreach(lc, splitinfo)
	{
		reversed = lcons(lfirst(lc), reversed);
	}

	LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
	gistFindCorrectParent(state->r, stack);

	/*
	 * insert downlinks for the siblings from right to left, until there are
	 * only two siblings left.
	 */
	while (list_length(reversed) > 2)
	{
		right = (GISTPageSplitInfo *) linitial(reversed);
		left = (GISTPageSplitInfo *) lsecond(reversed);

		if (gistinserttuples(state, stack->parent, giststate,
							 &right->downlink, 1,
							 InvalidOffsetNumber,
							 left->buf, right->buf, false, false))
		{
			/*
			 * If the parent page was split, need to relocate the original
			 * parent pointer.
			 */
			gistFindCorrectParent(state->r, stack);
		}
		/* gistinserttuples() released the lock on right->buf. */
		reversed = list_delete_first(reversed);
	}

	right = (GISTPageSplitInfo *) linitial(reversed);
	left = (GISTPageSplitInfo *) lsecond(reversed);

	/*
	 * Finally insert downlink for the remaining right page and update the
	 * downlink for the original page to not contain the tuples that were
	 * moved to the new pages.
	 */
	tuples[0] = left->downlink;
	tuples[1] = right->downlink;
	gistinserttuples(state, stack->parent, giststate,
					 tuples, 2,
					 stack->downlinkoffnum,
					 left->buf, right->buf,
					 true,		/* Unlock parent */
					 unlockbuf	/* Unlock stack->buffer if caller wants that */
		);
	Assert(left->buf == stack->buffer);
}

/*
 * gistSplit -- split a page in the tree and fill the struct that is used
 * for XLOG and for actually writing the buffers. The function is recursive:
 * it will keep splitting until the keys fit on every page.
 */
SplitedPageLayout *
gistSplit(Relation r,
		  Page page,
		  IndexTuple *itup,		/* contains compressed entry */
		  int len,
		  GISTSTATE *giststate)
{
	IndexTuple *lvectup,
			   *rvectup;
	GistSplitVector v;
	int			i;
	SplitedPageLayout *res = NULL;

	/* this should never recurse very deeply, but better safe than sorry */
	check_stack_depth();

	/* there's no point in splitting an empty page */
	Assert(len > 0);

	/*
	 * If a single tuple doesn't fit on a page, no amount of splitting will
	 * help.
	 */
	if (len == 1)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
			errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
				   IndexTupleSize(itup[0]), GiSTPageSize,
				   RelationGetRelationName(r))));

	memset(v.spl_lisnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
	memset(v.spl_risnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
	gistSplitByKey(r, page, itup, len, giststate, &v, 0);

	/* form left and right vector */
	lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
	rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));

	for (i = 0; i < v.splitVector.spl_nleft; i++)
		lvectup[i] = itup[v.splitVector.spl_left[i] - 1];

	for (i = 0; i < v.splitVector.spl_nright; i++)
		rvectup[i] = itup[v.splitVector.spl_right[i] - 1];

	/* finalize splitting (may need another split) */
	if (!gistfitpage(rvectup, v.splitVector.spl_nright))
	{
		res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
	}
	else
	{
		ROTATEDIST(res);
		res->block.num = v.splitVector.spl_nright;
		res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
		res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
	}

	if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
	{
		SplitedPageLayout *resptr,
				   *subres;

		resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);

		/* install on list's tail */
		while (resptr->next)
			resptr = resptr->next;

		resptr->next = res;
		res = subres;
	}
	else
	{
		ROTATEDIST(res);
		res->block.num = v.splitVector.spl_nleft;
		res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
		res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
	}

	return res;
}
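
/*
 * To illustrate with hypothetical numbers: splitting 100 tuples might
 * yield a right half of 40 tuples that fits on a page and a left half of
 * 60 that does not; the left half is then split recursively, and the
 * resulting chain is spliced in front of the right half's element.  The
 * returned list is thus ordered left-to-right, with the leftmost element
 * corresponding to the page that will replace the one being split.
 */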

/*
 * Create a GISTSTATE and fill it with information about the index
 */
GISTSTATE *
initGISTstate(Relation index)
{
	GISTSTATE  *giststate;
	MemoryContext scanCxt;
	MemoryContext oldCxt;
	int			i;

	/* safety check to protect fixed-size arrays in GISTSTATE */
	if (index->rd_att->natts > INDEX_MAX_KEYS)
		elog(ERROR, "numberOfAttributes %d > %d",
			 index->rd_att->natts, INDEX_MAX_KEYS);

	/* Create the memory context that will hold the GISTSTATE */
	scanCxt = AllocSetContextCreate(CurrentMemoryContext,
									"GiST scan context",
									ALLOCSET_DEFAULT_SIZES);
	oldCxt = MemoryContextSwitchTo(scanCxt);

	/* Create and fill in the GISTSTATE */
	giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE));

	giststate->scanCxt = scanCxt;
	giststate->tempCxt = scanCxt;		/* caller must change this if needed */
	giststate->tupdesc = index->rd_att;

	for (i = 0; i < index->rd_att->natts; i++)
	{
		fmgr_info_copy(&(giststate->consistentFn[i]),
					   index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
					   scanCxt);
		fmgr_info_copy(&(giststate->unionFn[i]),
					   index_getprocinfo(index, i + 1, GIST_UNION_PROC),
					   scanCxt);
		fmgr_info_copy(&(giststate->compressFn[i]),
					   index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
					   scanCxt);
		fmgr_info_copy(&(giststate->decompressFn[i]),
					   index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
					   scanCxt);
		fmgr_info_copy(&(giststate->penaltyFn[i]),
					   index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
					   scanCxt);
		fmgr_info_copy(&(giststate->picksplitFn[i]),
					   index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
					   scanCxt);
		fmgr_info_copy(&(giststate->equalFn[i]),
					   index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
					   scanCxt);
		/* opclasses are not required to provide a Distance method */
		if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
			fmgr_info_copy(&(giststate->distanceFn[i]),
						 index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
						   scanCxt);
		else
			giststate->distanceFn[i].fn_oid = InvalidOid;

		/* opclasses are not required to provide a Fetch method */
		if (OidIsValid(index_getprocid(index, i + 1, GIST_FETCH_PROC)))
			fmgr_info_copy(&(giststate->fetchFn[i]),
						   index_getprocinfo(index, i + 1, GIST_FETCH_PROC),
						   scanCxt);
		else
			giststate->fetchFn[i].fn_oid = InvalidOid;

		/*
		 * If the index column has a specified collation, we should honor that
		 * while doing comparisons.  However, we may have a collatable storage
		 * type for a noncollatable indexed data type.  If there's no index
		 * collation then specify default collation in case the support
		 * functions need collation.  This is harmless if the support
		 * functions don't care about collation, so we just do it
		 * unconditionally.  (We could alternatively call get_typcollation,
		 * but that seems like expensive overkill --- there aren't going to be
		 * any cases where a GiST storage type has a nondefault collation.)
		 */
		if (OidIsValid(index->rd_indcollation[i]))
			giststate->supportCollation[i] = index->rd_indcollation[i];
		else
			giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
	}

	MemoryContextSwitchTo(oldCxt);

	return giststate;
}

void
freeGISTstate(GISTSTATE *giststate)
{
	/* It's sufficient to delete the scanCxt */
	MemoryContextDelete(giststate->scanCxt);
}

/*
 * gistvacuumpage() -- try to remove LP_DEAD items from the given page.
 * Function assumes that buffer is exclusively locked.
 */
static void
gistvacuumpage(Relation rel, Page page, Buffer buffer, Relation heapRel)
{
	OffsetNumber deletable[MaxIndexTuplesPerPage];
	int			ndeletable = 0;
	OffsetNumber offnum,
				maxoff;

	Assert(GistPageIsLeaf(page));

	/*
	 * Scan over all items to see which ones need to be deleted according to
	 * LP_DEAD flags.
	 */
	maxoff = PageGetMaxOffsetNumber(page);
	for (offnum = FirstOffsetNumber;
		 offnum <= maxoff;
		 offnum = OffsetNumberNext(offnum))
	{
		ItemId		itemId = PageGetItemId(page, offnum);

		if (ItemIdIsDead(itemId))
			deletable[ndeletable++] = offnum;
	}

	if (ndeletable > 0)
	{
		START_CRIT_SECTION();

		PageIndexMultiDelete(page, deletable, ndeletable);

		/*
		 * Mark the page as not containing any LP_DEAD items.  This is not
		 * certainly true (there might be some that have recently been marked,
		 * but weren't included in our target-item list), but it will almost
		 * always be true and it doesn't seem worth an additional page scan to
		 * check it. Remember that F_HAS_GARBAGE is only a hint anyway.
		 */
		GistClearPageHasGarbage(page);

		MarkBufferDirty(buffer);

		/* XLOG stuff */
		if (RelationNeedsWAL(rel))
		{
			XLogRecPtr	recptr;

			recptr = gistXLogUpdate(buffer,
									deletable, ndeletable,
									NULL, 0, InvalidBuffer,
									&heapRel->rd_node);

			PageSetLSN(page, recptr);
		}
		else
			PageSetLSN(page, gistGetFakeLSN(rel));

		END_CRIT_SECTION();
	}

	/*
	 * Note: if we didn't find any LP_DEAD items, then the page's
	 * F_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
	 * separate write to clear it, however.  We will clear it when we split
	 * the page.
	 */
}