/*-------------------------------------------------------------------------
 *
 * gist.c
 *	  interface routines for the postgres GiST index access method.
 *
 *
 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/gist/gist.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/gist_private.h"
#include "access/gistscan.h"
#include "catalog/pg_collation.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"
#include "utils/rel.h"

/* non-export function prototypes */
static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
static bool gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
							GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum);
static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
							 GISTSTATE *giststate,
							 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
							 Buffer leftchild, Buffer rightchild,
							 bool unlockbuf, bool unlockleftchild);
static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
							GISTSTATE *giststate, List *splitinfo, bool unlockbuf);
static void gistprunepage(Relation rel, Page page, Buffer buffer,
						  Relation heapRel);

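/*
 * Prepend a freshly zeroed SplitedPageLayout element to the list 'd'.
 * gistSplit() uses this to build its result list from right to left, so
 * the leftmost page ends up at the head of the list.
 */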
#define ROTATEDIST(d) do { \
	SplitedPageLayout *tmp=(SplitedPageLayout*)palloc0(sizeof(SplitedPageLayout)); \
	tmp->block.blkno = InvalidBlockNumber;	\
	tmp->buffer = InvalidBuffer;	\
	tmp->next = (d); \
	(d)=tmp; \
} while(0)


/*
 * GiST handler function: return IndexAmRoutine with access method parameters
 * and callbacks.  This is the function that the "gist" entry in pg_am
 * references.
 */
Datum
gisthandler(PG_FUNCTION_ARGS)
{
	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

	amroutine->amstrategies = 0;
	amroutine->amsupport = GISTNProcs;
	amroutine->amoptsprocnum = GIST_OPTIONS_PROC;
	amroutine->amcanorder = false;
	amroutine->amcanorderbyop = true;
	amroutine->amcanbackward = false;
	amroutine->amcanunique = false;
	amroutine->amcanmulticol = true;
	amroutine->amoptionalkey = true;
	amroutine->amsearcharray = false;
	amroutine->amsearchnulls = true;
	amroutine->amstorage = true;
	amroutine->amclusterable = true;
	amroutine->ampredlocks = true;
	amroutine->amcanparallel = false;
	amroutine->amcaninclude = true;
	amroutine->amusemaintenanceworkmem = false;
	amroutine->amparallelvacuumoptions =
		VACUUM_OPTION_PARALLEL_BULKDEL | VACUUM_OPTION_PARALLEL_COND_CLEANUP;
	amroutine->amkeytype = InvalidOid;

	amroutine->ambuild = gistbuild;
	amroutine->ambuildempty = gistbuildempty;
	amroutine->aminsert = gistinsert;
	amroutine->ambulkdelete = gistbulkdelete;
	amroutine->amvacuumcleanup = gistvacuumcleanup;
	amroutine->amcanreturn = gistcanreturn;
	amroutine->amcostestimate = gistcostestimate;
	amroutine->amoptions = gistoptions;
	amroutine->amproperty = gistproperty;
	amroutine->ambuildphasename = NULL;
	amroutine->amvalidate = gistvalidate;
	amroutine->ambeginscan = gistbeginscan;
	amroutine->amrescan = gistrescan;
	amroutine->amgettuple = gistgettuple;
	amroutine->amgetbitmap = gistgetbitmap;
	amroutine->amendscan = gistendscan;
	amroutine->ammarkpos = NULL;
	amroutine->amrestrpos = NULL;
	amroutine->amestimateparallelscan = NULL;
	amroutine->aminitparallelscan = NULL;
	amroutine->amparallelrescan = NULL;

	PG_RETURN_POINTER(amroutine);
}

/*
 * Create and return a temporary memory context for use by GiST. We
 * _always_ invoke user-provided methods in a temporary memory
 * context, so that memory leaks in those functions cannot cause
 * problems. Also, we use some additional temporary contexts in the
 * GiST code itself, to avoid the need to do some awkward manual
 * memory management.
 */
MemoryContext
createTempGistContext(void)
{
	return AllocSetContextCreate(CurrentMemoryContext,
								 "GiST temporary context",
								 ALLOCSET_DEFAULT_SIZES);
}
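
/*
 * A typical use of the temporary context, sketched from gistinsert() below:
 * switch into it, invoke the user-provided support methods, switch back,
 * and reset the context to reclaim everything at once.
 *
 *		oldCxt = MemoryContextSwitchTo(giststate->tempCxt);
 *		... call user-provided support functions ...
 *		MemoryContextSwitchTo(oldCxt);
 *		MemoryContextReset(giststate->tempCxt);
 */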

/*
 *	gistbuildempty() -- build an empty gist index in the initialization fork
 */
void
gistbuildempty(Relation index)
{
	Buffer		buffer;

	/* Initialize the root page */
	buffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

	/* Initialize and xlog buffer */
	START_CRIT_SECTION();
	GISTInitBuffer(buffer, F_LEAF);
	MarkBufferDirty(buffer);
	log_newpage_buffer(buffer, true);
	END_CRIT_SECTION();

	/* Unlock and release the buffer */
	UnlockReleaseBuffer(buffer);
}

/*
 *	gistinsert -- wrapper for GiST tuple insertion.
 *
 *	  This is the public interface routine for tuple insertion in GiSTs.
 *	  It doesn't do any work; just locks the relation and passes the buck.
 */
bool
gistinsert(Relation r, Datum *values, bool *isnull,
		   ItemPointer ht_ctid, Relation heapRel,
		   IndexUniqueCheck checkUnique,
		   IndexInfo *indexInfo)
{
	GISTSTATE  *giststate = (GISTSTATE *) indexInfo->ii_AmCache;
	IndexTuple	itup;
	MemoryContext oldCxt;

	/* Initialize GISTSTATE cache if first call in this statement */
	if (giststate == NULL)
	{
		oldCxt = MemoryContextSwitchTo(indexInfo->ii_Context);
		giststate = initGISTstate(r);
		giststate->tempCxt = createTempGistContext();
		indexInfo->ii_AmCache = (void *) giststate;
		MemoryContextSwitchTo(oldCxt);
	}

	oldCxt = MemoryContextSwitchTo(giststate->tempCxt);

	itup = gistFormTuple(giststate, r,
						 values, isnull, true /* size is currently bogus */ );
	itup->t_tid = *ht_ctid;

	gistdoinsert(r, itup, 0, giststate, heapRel, false);

	/* cleanup */
	MemoryContextSwitchTo(oldCxt);
	MemoryContextReset(giststate->tempCxt);

	return false;
}

/*
 * Place tuples from 'itup' into 'buffer'. If 'oldoffnum' is valid, the tuple
 * at that offset is atomically removed along with inserting the new tuples.
 * This is used to replace a tuple with a new one.
 *
 * If 'leftchildbuf' is valid, we're inserting the downlink for the page
 * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
 * The F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and the NSN is set.
 *
 * If 'markfollowright' is true and the page is split, the left child is
 * marked with the F_FOLLOW_RIGHT flag. That is the normal case. During a
 * buffered index build, however, there is no concurrent access and the page
 * splitting is done in a slightly simpler fashion, so false is passed.
 *
 * If there is not enough room on the page, it is split. All the split
 * pages are kept pinned and locked and returned in *splitinfo; the caller
 * is responsible for inserting the downlinks for them. However, if
 * 'buffer' is the root page and it needs to be split, gistplacetopage()
 * performs the split as one atomic operation, and *splitinfo is set to NIL.
 * In that case, we continue to hold the root page locked, and the child
 * pages are released; note that the new tuple(s) are *not* on the root page
 * but in one of the new child pages.
 *
 * If 'newblkno' is not NULL, returns the block number of the page the first
 * new/updated tuple was inserted to. Usually it's the given page, but it
 * could be its right sibling if the page was split.
 *
 * Returns 'true' if the page was split, 'false' otherwise.
 */
bool
gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
				Buffer buffer,
				IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
				BlockNumber *newblkno,
				Buffer leftchildbuf,
				List **splitinfo,
				bool markfollowright,
				Relation heapRel,
				bool is_build)
{
	BlockNumber blkno = BufferGetBlockNumber(buffer);
	Page		page = BufferGetPage(buffer);
	bool		is_leaf = (GistPageIsLeaf(page)) ? true : false;
	XLogRecPtr	recptr;
	int			i;
	bool		is_split;

	/*
	 * Refuse to modify a page that's incompletely split. This should not
	 * happen because we finish any incomplete splits while we walk down the
	 * tree. However, it's remotely possible that another concurrent inserter
	 * splits a parent page, and errors out before completing the split. We
	 * will just throw an error in that case, and leave any split we had in
	 * progress unfinished too. The next insert that comes along will clean up
	 * the mess.
	 */
	if (GistFollowRight(page))
		elog(ERROR, "concurrent GiST page split was incomplete");

	/* should never try to insert to a deleted page */
	Assert(!GistPageIsDeleted(page));

	*splitinfo = NIL;

	/*
	 * if isupdate, remove old key: This node's key has been modified, either
	 * because a child split occurred or because we needed to adjust our key
	 * for an insert in a child node. Therefore, remove the old version of
	 * this node's key.
	 *
	 * for WAL replay, in the non-split case we handle this by setting up a
	 * one-element todelete array; in the split case, it's handled implicitly
	 * because the tuple vector passed to gistSplit won't include this tuple.
	 */
	is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);

	/*
	 * If the leaf page is full, first try to delete dead tuples, then check
	 * again.
	 */
	if (is_split && GistPageIsLeaf(page) && GistPageHasGarbage(page))
	{
		gistprunepage(rel, page, buffer, heapRel);
		is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
	}

	if (is_split)
	{
		/* no space for insertion */
		IndexTuple *itvec;
		int			tlen;
		SplitedPageLayout *dist = NULL,
				   *ptr;
		BlockNumber oldrlink = InvalidBlockNumber;
		GistNSN		oldnsn = 0;
		SplitedPageLayout rootpg;
		bool		is_rootsplit;
		int			npage;

		is_rootsplit = (blkno == GIST_ROOT_BLKNO);

		/*
		 * Form index tuples vector to split. If we're replacing an old tuple,
		 * remove the old version from the vector.
		 */
		itvec = gistextractpage(page, &tlen);
		if (OffsetNumberIsValid(oldoffnum))
		{
			/* on inner page we should remove old tuple */
			int			pos = oldoffnum - FirstOffsetNumber;

			tlen--;
			if (pos != tlen)
				memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
		}
		itvec = gistjoinvector(itvec, &tlen, itup, ntup);
		dist = gistSplit(rel, page, itvec, tlen, giststate);

		/*
		 * Check that split didn't produce too many pages.
		 */
		npage = 0;
		for (ptr = dist; ptr; ptr = ptr->next)
			npage++;
		/* in a root split, we'll add one more page to the list below */
		if (is_rootsplit)
			npage++;
		if (npage > GIST_MAX_SPLIT_PAGES)
			elog(ERROR, "GiST page split into too many halves (%d, maximum %d)",
				 npage, GIST_MAX_SPLIT_PAGES);

		/*
		 * Set up pages to work with. Allocate new buffers for all but the
		 * leftmost page. The original page becomes the new leftmost page, and
		 * is just replaced with the new contents.
		 *
		 * For a root-split, allocate new buffers for all child pages; the
		 * original page is overwritten with a new root page containing
		 * downlinks to the new child pages.
		 */
		ptr = dist;
		if (!is_rootsplit)
		{
			/* save old rightlink and NSN */
			oldrlink = GistPageGetOpaque(page)->rightlink;
			oldnsn = GistPageGetNSN(page);

			dist->buffer = buffer;
			dist->block.blkno = BufferGetBlockNumber(buffer);
			dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));

			/* clean all flags except F_LEAF */
			GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;

			ptr = ptr->next;
		}
		for (; ptr; ptr = ptr->next)
		{
			/* Allocate new page */
			ptr->buffer = gistNewBuffer(rel);
			GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
			ptr->page = BufferGetPage(ptr->buffer);
			ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
			PredicateLockPageSplit(rel,
								   BufferGetBlockNumber(buffer),
								   BufferGetBlockNumber(ptr->buffer));
		}

		/*
		 * Now that we know which blocks the new pages go to, set up downlink
		 * tuples to point to them.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
		{
			ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
			GistTupleSetValid(ptr->itup);
		}

		/*
		 * If this is a root split, we construct the new root page with the
		 * downlinks here directly, instead of requiring the caller to insert
		 * them. Add the new root page to the list along with the child pages.
		 */
		if (is_rootsplit)
		{
			IndexTuple *downlinks;
			int			ndownlinks = 0;
			int			i;

			rootpg.buffer = buffer;
			rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
			GistPageGetOpaque(rootpg.page)->flags = 0;

			/* Prepare a vector of all the downlinks */
			for (ptr = dist; ptr; ptr = ptr->next)
				ndownlinks++;
			downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
			for (i = 0, ptr = dist; ptr; ptr = ptr->next)
				downlinks[i++] = ptr->itup;

			rootpg.block.blkno = GIST_ROOT_BLKNO;
			rootpg.block.num = ndownlinks;
			rootpg.list = gistfillitupvec(downlinks, ndownlinks,
										  &(rootpg.lenlist));
			rootpg.itup = NULL;

			rootpg.next = dist;
			dist = &rootpg;
		}
		else
		{
			/* Prepare split-info to be returned to caller */
			for (ptr = dist; ptr; ptr = ptr->next)
			{
				GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));

				si->buf = ptr->buffer;
				si->downlink = ptr->itup;
				*splitinfo = lappend(*splitinfo, si);
			}
		}

		/*
		 * Fill all pages. All the pages are new, i.e. freshly allocated empty
		 * pages, or a temporary copy of the old page.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
		{
			char	   *data = (char *) (ptr->list);

			for (i = 0; i < ptr->block.num; i++)
			{
				IndexTuple	thistup = (IndexTuple) data;

				if (PageAddItem(ptr->page, (Item) data, IndexTupleSize(thistup), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
					elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(rel));

				/*
				 * If this is the first inserted/updated tuple, let the caller
				 * know which page it landed on.
				 */
				if (newblkno && ItemPointerEquals(&thistup->t_tid, &(*itup)->t_tid))
					*newblkno = ptr->block.blkno;

				data += IndexTupleSize(thistup);
			}

			/* Set up rightlinks */
			if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
				GistPageGetOpaque(ptr->page)->rightlink =
					ptr->next->block.blkno;
			else
				GistPageGetOpaque(ptr->page)->rightlink = oldrlink;

			/*
			 * Mark all but the right-most page with the follow-right
			 * flag. It will be cleared as soon as the downlink is inserted
			 * into the parent, but this ensures that if we error out before
			 * that, the index is still consistent. (In buffering build mode,
			 * any error will abort the index build anyway, so this is not
			 * needed.)
			 */
			if (ptr->next && !is_rootsplit && markfollowright)
				GistMarkFollowRight(ptr->page);
			else
				GistClearFollowRight(ptr->page);

			/*
			 * Copy the NSN of the original page to all pages. The
			 * F_FOLLOW_RIGHT flags ensure that scans will follow the
			 * rightlinks until the downlinks are inserted.
			 */
			GistPageSetNSN(ptr->page, oldnsn);
		}

		/*
		 * gistXLogSplit() needs to WAL log a lot of pages, prepare WAL
		 * insertion for that. NB: The number of pages and data segments
		 * specified here must match the calculations in gistXLogSplit()!
		 */
		if (!is_build && RelationNeedsWAL(rel))
			XLogEnsureRecordSpace(npage, 1 + npage * 2);

		START_CRIT_SECTION();

		/*
		 * Must mark buffers dirty before XLogInsert, even though we'll still
		 * be changing their opaque fields below.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
			MarkBufferDirty(ptr->buffer);
		if (BufferIsValid(leftchildbuf))
			MarkBufferDirty(leftchildbuf);

		/*
		 * The first page in the chain was a temporary working copy meant to
		 * replace the old page. Copy it over the old page.
		 */
		PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
		dist->page = BufferGetPage(dist->buffer);

		/*
		 * Write the WAL record.
		 *
		 * If we're building a new index, however, we don't WAL-log changes
		 * yet. The LSN-NSN interlock between parent and child requires that
		 * LSNs never move backwards, so set the LSNs to a value that's
		 * smaller than any real or fake unlogged LSN that might be generated
		 * later. (There can't be any concurrent scans during index build, so
		 * we don't need to be able to detect concurrent splits yet.)
		 */
		if (is_build)
			recptr = GistBuildLSN;
		else
		{
			if (RelationNeedsWAL(rel))
				recptr = gistXLogSplit(is_leaf,
									   dist, oldrlink, oldnsn, leftchildbuf,
									   markfollowright);
			else
				recptr = gistGetFakeLSN(rel);
		}

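		/* Stamp all the split pages with the same LSN */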
		for (ptr = dist; ptr; ptr = ptr->next)
			PageSetLSN(ptr->page, recptr);

		/*
		 * Return the new child buffers to the caller.
		 *
		 * If this was a root split, we've already inserted the downlink
		 * pointers, in the form of a new root page. Therefore we can release
		 * all the new buffers, and keep just the root page locked.
		 */
		if (is_rootsplit)
		{
			for (ptr = dist->next; ptr; ptr = ptr->next)
				UnlockReleaseBuffer(ptr->buffer);
		}
	}
	else
	{
		/*
		 * Enough space.  We always get here if ntup==0.
		 */
		START_CRIT_SECTION();

		/*
		 * Delete old tuple if any, then insert new tuple(s) if any.  If
		 * possible, use the fast path of PageIndexTupleOverwrite.
		 */
		if (OffsetNumberIsValid(oldoffnum))
		{
			if (ntup == 1)
			{
				/* One-for-one replacement, so use PageIndexTupleOverwrite */
				if (!PageIndexTupleOverwrite(page, oldoffnum, (Item) *itup,
											 IndexTupleSize(*itup)))
					elog(ERROR, "failed to add item to index page in \"%s\"",
						 RelationGetRelationName(rel));
			}
			else
			{
				/* Delete old, then append new tuple(s) to page */
				PageIndexTupleDelete(page, oldoffnum);
				gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
			}
		}
		else
		{
			/* Just append new tuples at the end of the page */
			gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
		}

		MarkBufferDirty(buffer);

		if (BufferIsValid(leftchildbuf))
			MarkBufferDirty(leftchildbuf);

		if (is_build)
			recptr = GistBuildLSN;
		else
		{
			if (RelationNeedsWAL(rel))
			{
				OffsetNumber ndeloffs = 0,
							deloffs[1];

				if (OffsetNumberIsValid(oldoffnum))
				{
					deloffs[0] = oldoffnum;
					ndeloffs = 1;
				}

				recptr = gistXLogUpdate(buffer,
										deloffs, ndeloffs, itup, ntup,
										leftchildbuf);
			}
			else
				recptr = gistGetFakeLSN(rel);
		}
		PageSetLSN(page, recptr);

		if (newblkno)
			*newblkno = blkno;
	}

	/*
	 * If we inserted the downlink for a child page, set NSN and clear
	 * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know to
	 * follow the rightlink if and only if they looked at the parent page
	 * before we inserted the downlink.
	 *
	 * Note that we do this *after* writing the WAL record. That means that
	 * the possible full page image in the WAL record does not include these
	 * changes, and they must be replayed even if the page is restored from
	 * the full page image. There's a chicken-and-egg problem: if we updated
	 * the child pages first, we wouldn't know the recptr of the WAL record
	 * we're about to write.
	 */
	if (BufferIsValid(leftchildbuf))
	{
		Page		leftpg = BufferGetPage(leftchildbuf);

		GistPageSetNSN(leftpg, recptr);
		GistClearFollowRight(leftpg);

		PageSetLSN(leftpg, recptr);
	}

	END_CRIT_SECTION();

	return is_split;
}

/*
 * Workhorse routine for doing insertion into a GiST index. Note that
 * this routine assumes it is invoked in a short-lived memory context,
 * so it does not bother releasing palloc'd allocations.
 */
void
gistdoinsert(Relation r, IndexTuple itup, Size freespace,
			 GISTSTATE *giststate, Relation heapRel, bool is_build)
{
	ItemId		iid;
	IndexTuple	idxtuple;
	GISTInsertStack firststack;
	GISTInsertStack *stack;
	GISTInsertState state;
	bool		xlocked = false;

	memset(&state, 0, sizeof(GISTInsertState));
	state.freespace = freespace;
	state.r = r;
	state.heapRel = heapRel;
	state.is_build = is_build;

	/* Start from the root */
	firststack.blkno = GIST_ROOT_BLKNO;
	firststack.lsn = 0;
	firststack.retry_from_parent = false;
	firststack.parent = NULL;
	firststack.downlinkoffnum = InvalidOffsetNumber;
	state.stack = stack = &firststack;

	/*
	 * Walk down along the path of smallest penalty, updating the parent
	 * pointers with the key we're inserting as we go. If we crash in the
	 * middle, the tree is consistent, although the possible parent updates
	 * were a waste.
	 */
	for (;;)
	{
		/*
		 * If we split an internal page while descending the tree, we have to
		 * retry at the parent. (Normally, the LSN-NSN interlock below would
		 * also catch this and cause us to retry. But LSNs are not updated
		 * during index build.)
		 */
		while (stack->retry_from_parent)
		{
			if (xlocked)
				LockBuffer(stack->buffer, GIST_UNLOCK);
			xlocked = false;
			ReleaseBuffer(stack->buffer);
			state.stack = stack = stack->parent;
		}

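		/*
		 * On the first visit to this page there is no LSN recorded in the
		 * stack yet, so read the buffer; on later visits we still hold a pin
		 * on it from before.
		 */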
		if (XLogRecPtrIsInvalid(stack->lsn))
			stack->buffer = ReadBuffer(state.r, stack->blkno);

		/*
		 * Be optimistic and grab shared lock first. Swap it for an exclusive
		 * lock later if we need to update the page.
		 */
		if (!xlocked)
		{
			LockBuffer(stack->buffer, GIST_SHARE);
			gistcheckpage(state.r, stack->buffer);
		}

		stack->page = (Page) BufferGetPage(stack->buffer);
		stack->lsn = xlocked ?
			PageGetLSN(stack->page) : BufferGetLSNAtomic(stack->buffer);
		Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));

		/*
		 * If this page was split but the downlink was never inserted to the
		 * parent because the inserting backend crashed before doing that, fix
		 * that now.
		 */
		if (GistFollowRight(stack->page))
		{
			if (!xlocked)
			{
				LockBuffer(stack->buffer, GIST_UNLOCK);
				LockBuffer(stack->buffer, GIST_EXCLUSIVE);
				xlocked = true;
				/* someone might've completed the split when we unlocked */
				if (!GistFollowRight(stack->page))
					continue;
			}
			gistfixsplit(&state, giststate);

			UnlockReleaseBuffer(stack->buffer);
			xlocked = false;
			state.stack = stack = stack->parent;
			continue;
		}

		if ((stack->blkno != GIST_ROOT_BLKNO &&
			 stack->parent->lsn < GistPageGetNSN(stack->page)) ||
			GistPageIsDeleted(stack->page))
		{
			/*
			 * Concurrent split or page deletion detected. There's no
			 * guarantee that the downlink for this page is consistent with
			 * the tuple we're inserting anymore, so go back to parent and
			 * rechoose the best child.
			 */
			UnlockReleaseBuffer(stack->buffer);
			xlocked = false;
			state.stack = stack = stack->parent;
			continue;
		}

		if (!GistPageIsLeaf(stack->page))
		{
			/*
			 * This is an internal page so continue to walk down the tree.
			 * Find the child node that has the minimum insertion penalty.
			 */
			BlockNumber childblkno;
			IndexTuple	newtup;
			GISTInsertStack *item;
			OffsetNumber downlinkoffnum;

			downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate);
			iid = PageGetItemId(stack->page, downlinkoffnum);
			idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
			childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));

			/*
			 * Check that it's not a leftover invalid tuple from pre-9.1
			 */
			if (GistTupleIsInvalid(idxtuple))
				ereport(ERROR,
						(errmsg("index \"%s\" contains an inner tuple marked as invalid",
								RelationGetRelationName(r)),
						 errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
						 errhint("Please REINDEX it.")));

			/*
			 * Check that the key representing the target child node is
			 * consistent with the key we're inserting. Update it if it's not.
			 */
			newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
			if (newtup)
			{
				/*
				 * Swap shared lock for an exclusive one. Beware, the page may
				 * change while we unlock/lock the page...
				 */
				if (!xlocked)
				{
					LockBuffer(stack->buffer, GIST_UNLOCK);
					LockBuffer(stack->buffer, GIST_EXCLUSIVE);
					xlocked = true;
					stack->page = (Page) BufferGetPage(stack->buffer);

					if (PageGetLSN(stack->page) != stack->lsn)
					{
						/* the page was changed while we unlocked it, retry */
						continue;
					}
				}

				/*
				 * Update the tuple.
				 *
				 * We still hold the lock after gistinserttuple(), but it
				 * might have to split the page to make the updated tuple fit.
				 * In that case the updated tuple might migrate to the other
				 * half of the split, so we have to go back to the parent and
				 * descend back to the half that's a better fit for the new
				 * tuple.
				 */
				if (gistinserttuple(&state, stack, giststate, newtup,
									downlinkoffnum))
				{
					/*
					 * If this was a root split, the root page continues to be
					 * the parent and the updated tuple went to one of the
					 * child pages, so we just need to retry from the root
					 * page.
					 */
					if (stack->blkno != GIST_ROOT_BLKNO)
					{
						UnlockReleaseBuffer(stack->buffer);
						xlocked = false;
						state.stack = stack = stack->parent;
					}
					continue;
				}
			}
			LockBuffer(stack->buffer, GIST_UNLOCK);
			xlocked = false;

			/* descend to the chosen child */
			item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
			item->blkno = childblkno;
			item->parent = stack;
			item->downlinkoffnum = downlinkoffnum;
			state.stack = stack = item;
		}
		else
		{
			/*
			 * Leaf page. Insert the new key. We've already updated all the
			 * parents on the way down, but we might have to split the page if
			 * it doesn't fit. gistinserttuple() will take care of that.
			 */

			/*
			 * Swap shared lock for an exclusive one. Be careful, the page may
			 * change while we unlock/lock the page...
			 */
			if (!xlocked)
			{
				LockBuffer(stack->buffer, GIST_UNLOCK);
				LockBuffer(stack->buffer, GIST_EXCLUSIVE);
				xlocked = true;
				stack->page = (Page) BufferGetPage(stack->buffer);
				stack->lsn = PageGetLSN(stack->page);

				if (stack->blkno == GIST_ROOT_BLKNO)
				{
					/*
					 * The only page that can change from leaf to inner is
					 * the root page, so for the root we must recheck it.
					 */
					if (!GistPageIsLeaf(stack->page))
					{
						/*
						 * Very rare situation: while the page was unlocked,
						 * the one-page index grew, so the root is no longer
						 * a leaf.
						 */
						LockBuffer(stack->buffer, GIST_UNLOCK);
						xlocked = false;
						continue;
					}

					/*
					 * we don't need to check root split, because checking
					 * leaf/inner is enough to recognize split for root
					 */
				}
				else if ((GistFollowRight(stack->page) ||
						  stack->parent->lsn < GistPageGetNSN(stack->page)) ||
						 GistPageIsDeleted(stack->page))
				{
					/*
					 * The page was split or deleted while we momentarily
					 * unlocked the page. Go back to parent.
					 */
					UnlockReleaseBuffer(stack->buffer);
					xlocked = false;
					state.stack = stack = stack->parent;
					continue;
				}
			}

			/* now state.stack->(page, buffer and blkno) points to leaf page */

			gistinserttuple(&state, stack, giststate, itup,
							InvalidOffsetNumber);
			LockBuffer(stack->buffer, GIST_UNLOCK);

			/* Release any pins we might still hold before exiting */
			for (; stack; stack = stack->parent)
				ReleaseBuffer(stack->buffer);
			break;
		}
	}
}

/*
 * Traverse the tree to find the path from the root page to the specified
 * "child" block.
 *
 * Returns a new insertion stack, starting from the parent of "child", up
 * to the root. *downlinkoffnum is set to the offset of the downlink in the
 * direct parent of child.
 *
 * To prevent deadlocks, this should lock only one page at a time.
 */
static GISTInsertStack *
gistFindPath(Relation r, BlockNumber child, OffsetNumber *downlinkoffnum)
{
	Page		page;
	Buffer		buffer;
	OffsetNumber i,
				maxoff;
	ItemId		iid;
	IndexTuple	idxtuple;
	List	   *fifo;
	GISTInsertStack *top,
			   *ptr;
	BlockNumber blkno;

	top = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
	top->blkno = GIST_ROOT_BLKNO;
	top->downlinkoffnum = InvalidOffsetNumber;

	fifo = list_make1(top);
	while (fifo != NIL)
	{
		/* Get next page to visit */
		top = linitial(fifo);
		fifo = list_delete_first(fifo);

		buffer = ReadBuffer(r, top->blkno);
		LockBuffer(buffer, GIST_SHARE);
		gistcheckpage(r, buffer);
		page = (Page) BufferGetPage(buffer);

		if (GistPageIsLeaf(page))
		{
			/*
			 * Because we scan the index top-down, all the rest of the pages
			 * in the queue must be leaf pages as well.
			 */
			UnlockReleaseBuffer(buffer);
			break;
		}

		/* currently, internal pages are never deleted */
		Assert(!GistPageIsDeleted(page));

		top->lsn = BufferGetLSNAtomic(buffer);

		/*
		 * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
		 * downlink. This should not normally happen.
		 */
		if (GistFollowRight(page))
			elog(ERROR, "concurrent GiST page split was incomplete");

		if (top->parent && top->parent->lsn < GistPageGetNSN(page) &&
			GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
		{
			/*
			 * Page was split while we looked elsewhere. We didn't see the
			 * downlink to the right page when we scanned the parent, so add
			 * it to the queue now.
			 *
			 * Put the right page ahead of the queue, so that we visit it
			 * next. That's important, because if this is the lowest internal
			 * level, just above leaves, we might already have queued up some
			 * leaf pages, and we assume that there can't be any non-leaf
			 * pages behind leaf pages.
			 */
			ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
			ptr->blkno = GistPageGetOpaque(page)->rightlink;
			ptr->downlinkoffnum = InvalidOffsetNumber;
			ptr->parent = top->parent;

			fifo = lcons(ptr, fifo);
		}

		maxoff = PageGetMaxOffsetNumber(page);

		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
		{
			iid = PageGetItemId(page, i);
			idxtuple = (IndexTuple) PageGetItem(page, iid);
			blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
			if (blkno == child)
			{
				/* Found it! */
				UnlockReleaseBuffer(buffer);
				*downlinkoffnum = i;
				return top;
			}
			else
			{
				/* Append this child to the list of pages to visit later */
				ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
				ptr->blkno = blkno;
				ptr->downlinkoffnum = i;
				ptr->parent = top;

				fifo = lappend(fifo, ptr);
			}
		}

		UnlockReleaseBuffer(buffer);
	}

	elog(ERROR, "failed to re-find parent of a page in index \"%s\", block %u",
		 RelationGetRelationName(r), child);
	return NULL;				/* keep compiler quiet */
}

/*
 * Updates the stack so that child->parent is the correct parent of the
 * child. child->parent must be exclusively locked on entry, and will
 * remain so at exit, but it might not be the same page anymore.
 */
static void
gistFindCorrectParent(Relation r, GISTInsertStack *child)
{
	GISTInsertStack *parent = child->parent;

	gistcheckpage(r, parent->buffer);
	parent->page = (Page) BufferGetPage(parent->buffer);

	/* here we don't need to distinguish between split and page update */
	if (child->downlinkoffnum == InvalidOffsetNumber ||
		parent->lsn != PageGetLSN(parent->page))
	{
		/* parent has changed; follow the rightlinks until the child is found */
		OffsetNumber i,
					maxoff;
		ItemId		iid;
		IndexTuple	idxtuple;
		GISTInsertStack *ptr;

		while (true)
		{
			maxoff = PageGetMaxOffsetNumber(parent->page);
			for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
			{
				iid = PageGetItemId(parent->page, i);
				idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
				if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
				{
					/* yes, found it */
					child->downlinkoffnum = i;
					return;
				}
			}

			parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
			UnlockReleaseBuffer(parent->buffer);
			if (parent->blkno == InvalidBlockNumber)
			{
				/*
				 * End of chain and still didn't find the parent. This is a
				 * very rare situation that happens when the root was split.
				 */
				break;
			}
			parent->buffer = ReadBuffer(r, parent->blkno);
			LockBuffer(parent->buffer, GIST_EXCLUSIVE);
			gistcheckpage(r, parent->buffer);
			parent->page = (Page) BufferGetPage(parent->buffer);
		}

		/*
		 * We need to search the whole tree to find the parent, but before
		 * that we must release all the old parent buffers.
		 */

		ptr = child->parent->parent;	/* child->parent already released
										 * above */
		while (ptr)
		{
			ReleaseBuffer(ptr->buffer);
			ptr = ptr->parent;
		}

		/* ok, find new path */
		ptr = parent = gistFindPath(r, child->blkno, &child->downlinkoffnum);

		/* read all buffers as expected by caller */
		/* note we don't lock them or gistcheckpage them here! */
		while (ptr)
		{
			ptr->buffer = ReadBuffer(r, ptr->blkno);
			ptr->page = (Page) BufferGetPage(ptr->buffer);
			ptr = ptr->parent;
		}

		/* install new chain of parents to stack */
		child->parent = parent;

		/* make recursive call to normal processing */
		LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
		gistFindCorrectParent(r, child);
	}
}

/*
 * Form a downlink pointer for the page in 'buf'.
 */
static IndexTuple
gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
				 GISTInsertStack *stack)
{
	Page		page = BufferGetPage(buf);
	OffsetNumber maxoff;
	OffsetNumber offset;
	IndexTuple	downlink = NULL;

	maxoff = PageGetMaxOffsetNumber(page);
	for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
	{
		IndexTuple	ituple = (IndexTuple)
		PageGetItem(page, PageGetItemId(page, offset));

		if (downlink == NULL)
			downlink = CopyIndexTuple(ituple);
		else
		{
			IndexTuple	newdownlink;

			newdownlink = gistgetadjusted(rel, downlink, ituple,
										  giststate);
			if (newdownlink)
				downlink = newdownlink;
		}
	}

	/*
	 * If the page is completely empty, we can't form a meaningful downlink
	 * for it. But we have to insert a downlink for the page. Any key will do,
	 * as long as it's consistent with the downlink of the parent page, so
	 * that we can legally insert it into the parent. A minimal one that
	 * matches as few scans as possible would be best, to keep scans from
	 * doing useless work, but we don't know how to construct that. So we just
	 * use the downlink of the original page that was split - that's as far
	 * from optimal as it can get, but it will do.
	 */
	if (!downlink)
	{
		ItemId		iid;

		LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
		gistFindCorrectParent(rel, stack);
		iid = PageGetItemId(stack->parent->page, stack->downlinkoffnum);
		downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
		downlink = CopyIndexTuple(downlink);
		LockBuffer(stack->parent->buffer, GIST_UNLOCK);
	}

	ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
	GistTupleSetValid(downlink);

	return downlink;
}


/*
 * Complete the incomplete split of state->stack->page.
 */
static void
gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
{
	GISTInsertStack *stack = state->stack;
	Buffer		buf;
	Page		page;
	List	   *splitinfo = NIL;

	elog(LOG, "fixing incomplete split in index \"%s\", block %u",
		 RelationGetRelationName(state->r), stack->blkno);

	Assert(GistFollowRight(stack->page));
	Assert(OffsetNumberIsValid(stack->downlinkoffnum));

	buf = stack->buffer;

	/*
	 * Read the chain of split pages, following the rightlinks. Construct a
	 * downlink tuple for each page.
	 */
	for (;;)
	{
		GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
		IndexTuple	downlink;

		page = BufferGetPage(buf);

		/* Form the new downlink tuples to insert to parent */
		downlink = gistformdownlink(state->r, buf, giststate, stack);

		si->buf = buf;
		si->downlink = downlink;

		splitinfo = lappend(splitinfo, si);

		if (GistFollowRight(page))
		{
			/* lock next page */
			buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
			LockBuffer(buf, GIST_EXCLUSIVE);
		}
		else
			break;
	}

	/* Insert the downlinks */
	gistfinishsplit(state, stack, giststate, splitinfo, false);
}

/*
 * Insert or replace a tuple in stack->buffer. If 'oldoffnum' is valid, the
 * tuple at 'oldoffnum' is replaced, otherwise the tuple is inserted as new.
 * 'stack' represents the path from the root to the page being updated.
 *
 * The caller must hold an exclusive lock on stack->buffer.  The lock is still
 * held on return, but the page might not contain the inserted tuple if the
 * page was split. The function returns true if the page was split, false
 * otherwise.
 */
static bool
gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
				GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum)
{
	return gistinserttuples(state, stack, giststate, &tuple, 1, oldoffnum,
							InvalidBuffer, InvalidBuffer, false, false);
}

/* ----------------
 * An extended workhorse version of gistinserttuple(). This version allows
 * inserting multiple tuples, or replacing a single tuple with multiple tuples.
 * This is used to recursively update the downlinks in the parent when a page
 * is split.
 *
 * If leftchild and rightchild are valid, we're inserting/replacing the
 * downlink for rightchild, and leftchild is its left sibling. We clear the
 * F_FOLLOW_RIGHT flag and update NSN on leftchild, atomically with the
 * insertion of the downlink.
 *
 * To avoid holding locks for longer than necessary, when recursing up the
 * tree to update the parents, the locking is a bit peculiar here. On entry,
 * the caller must hold an exclusive lock on stack->buffer, as well as
 * leftchild and rightchild if given. On return:
 *
 *	- Lock on stack->buffer is released, if 'unlockbuf' is true. The page is
 *	  always kept pinned, however.
 *	- Lock on 'leftchild' is released, if 'unlockleftchild' is true. The page
 *	  is kept pinned.
 *	- Lock and pin on 'rightchild' are always released.
 *
 * Returns 'true' if the page had to be split. Note that if the page was
 * split, the inserted/updated tuples might've been inserted to a right
 * sibling of stack->buffer instead of stack->buffer itself.
 */
static bool
gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
				 GISTSTATE *giststate,
				 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
				 Buffer leftchild, Buffer rightchild,
				 bool unlockbuf, bool unlockleftchild)
{
	List	   *splitinfo;
	bool		is_split;

	/*
	 * Check for any rw conflicts (in serializable isolation level) just
	 * before we intend to modify the page
	 */
	CheckForSerializableConflictIn(state->r, NULL, BufferGetBlockNumber(stack->buffer));

	/* Insert the tuple(s) to the page, splitting the page if necessary */
	is_split = gistplacetopage(state->r, state->freespace, giststate,
							   stack->buffer,
							   tuples, ntup,
							   oldoffnum, NULL,
							   leftchild,
							   &splitinfo,
							   true,
							   state->heapRel,
							   state->is_build);

	/*
	 * Before recursing up in case the page was split, release locks on the
	 * child pages. We don't need to keep them locked when updating the
	 * parent.
	 */
	if (BufferIsValid(rightchild))
		UnlockReleaseBuffer(rightchild);
	if (BufferIsValid(leftchild) && unlockleftchild)
		LockBuffer(leftchild, GIST_UNLOCK);

	/*
	 * If we had to split, insert/update the downlinks in the parent. If the
	 * caller requested us to release the lock on stack->buffer, tell
	 * gistfinishsplit() to do that as soon as it's safe to do so. If we
	 * didn't have to split, release it ourselves.
	 */
	if (splitinfo)
		gistfinishsplit(state, stack, giststate, splitinfo, unlockbuf);
	else if (unlockbuf)
		LockBuffer(stack->buffer, GIST_UNLOCK);

	return is_split;
}

/*
 * Finish an incomplete split by inserting/updating the downlinks in parent
 * page. 'splitinfo' contains all the child pages involved in the split,
 * from left-to-right.
 *
 * On entry, the caller must hold a lock on stack->buffer and all the child
 * pages in 'splitinfo'. If 'unlockbuf' is true, the lock on stack->buffer is
 * released on return. The child pages are always unlocked and unpinned.
 */
static void
gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
				GISTSTATE *giststate, List *splitinfo, bool unlockbuf)
{
	GISTPageSplitInfo *right;
	GISTPageSplitInfo *left;
	IndexTuple	tuples[2];

	/* A split always contains at least two halves */
	Assert(list_length(splitinfo) >= 2);

	/*
	 * We need to insert downlinks for each new page, and update the downlink
	 * for the original (leftmost) page in the split. Begin at the rightmost
	 * page, inserting one downlink at a time until there are only two pages
	 * left. Finally insert the downlink for the last new page and update the
	 * downlink for the original page as one operation.
	 */
	LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);

	/*
	 * Insert downlinks for the siblings from right to left, until there are
	 * only two siblings left.
	 */
	for (int pos = list_length(splitinfo) - 1; pos > 1; pos--)
	{
		right = (GISTPageSplitInfo *) list_nth(splitinfo, pos);
		left = (GISTPageSplitInfo *) list_nth(splitinfo, pos - 1);

		gistFindCorrectParent(state->r, stack);
		if (gistinserttuples(state, stack->parent, giststate,
							 &right->downlink, 1,
							 InvalidOffsetNumber,
							 left->buf, right->buf, false, false))
		{
			/*
			 * If the parent page was split, the existing downlink might have
			 * moved.
			 */
			stack->downlinkoffnum = InvalidOffsetNumber;
		}
		/* gistinserttuples() released the lock on right->buf. */
	}

	right = (GISTPageSplitInfo *) lsecond(splitinfo);
	left = (GISTPageSplitInfo *) linitial(splitinfo);

	/*
	 * Finally insert downlink for the remaining right page and update the
	 * downlink for the original page to not contain the tuples that were
	 * moved to the new pages.
	 */
	tuples[0] = left->downlink;
	tuples[1] = right->downlink;
	gistFindCorrectParent(state->r, stack);
	if (gistinserttuples(state, stack->parent, giststate,
						 tuples, 2,
						 stack->downlinkoffnum,
						 left->buf, right->buf,
						 true,	/* Unlock parent */
						 unlockbuf	/* Unlock stack->buffer if caller wants
									 * that */
						 ))
	{
		/*
		 * If the parent page was split, the downlink might have moved.
		 */
		stack->downlinkoffnum = InvalidOffsetNumber;
	}

	Assert(left->buf == stack->buffer);

	/*
	 * If we split the page because we had to adjust the downlink on an
	 * internal page, while descending the tree for inserting a new tuple,
	 * then this might no longer be the correct page for the new tuple. The
	 * downlink to this page might not cover the new tuple anymore, it might
	 * need to go to the newly-created right sibling instead. Tell the caller
	 * to walk back up the stack, to re-check at the parent which page to
	 * insert to.
	 *
	 * Normally, the LSN-NSN interlock during the tree descent would also
	 * detect that a concurrent split happened (by ourselves), and cause us to
	 * retry at the parent. But that mechanism doesn't work during index
	 * build, because we don't do WAL-logging, and don't update LSNs, during
	 * index build.
	 */
	stack->retry_from_parent = true;
}

/*
 * gistSplit -- split a page in the tree and fill the struct used for XLOG
 * and the actual buffer writes. The function is recursive, i.e. it will
 * split pages until the keys fit on every page.
 */
SplitedPageLayout *
gistSplit(Relation r,
		  Page page,
		  IndexTuple *itup,		/* contains compressed entry */
		  int len,
		  GISTSTATE *giststate)
{
	IndexTuple *lvectup,
			   *rvectup;
	GistSplitVector v;
	int			i;
	SplitedPageLayout *res = NULL;

	/* this should never recurse very deeply, but better safe than sorry */
	check_stack_depth();

	/* there's no point in splitting an empty page */
	Assert(len > 0);

	/*
	 * If a single tuple doesn't fit on a page, no amount of splitting will
	 * help.
	 */
	if (len == 1)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
						IndexTupleSize(itup[0]), GiSTPageSize,
						RelationGetRelationName(r))));

	memset(v.spl_lisnull, true,
		   sizeof(bool) * giststate->nonLeafTupdesc->natts);
	memset(v.spl_risnull, true,
		   sizeof(bool) * giststate->nonLeafTupdesc->natts);
	gistSplitByKey(r, page, itup, len, giststate, &v, 0);

	/* form left and right vector */
	lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
	rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));

	for (i = 0; i < v.splitVector.spl_nleft; i++)
		lvectup[i] = itup[v.splitVector.spl_left[i] - 1];

	for (i = 0; i < v.splitVector.spl_nright; i++)
		rvectup[i] = itup[v.splitVector.spl_right[i] - 1];

	/* finalize splitting (may need another split) */
	if (!gistfitpage(rvectup, v.splitVector.spl_nright))
	{
		res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
	}
	else
	{
		ROTATEDIST(res);
		res->block.num = v.splitVector.spl_nright;
		res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
		res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
	}

	if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
	{
		SplitedPageLayout *resptr,
				   *subres;

		resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);

		/* install on list's tail */
		while (resptr->next)
			resptr = resptr->next;

		resptr->next = res;
		res = subres;
	}
	else
	{
		ROTATEDIST(res);
		res->block.num = v.splitVector.spl_nleft;
		res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
		res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
	}

	return res;
}

/*
 * Create a GISTSTATE and fill it with information about the index
 */
GISTSTATE *
initGISTstate(Relation index)
{
	GISTSTATE  *giststate;
	MemoryContext scanCxt;
	MemoryContext oldCxt;
	int			i;

	/* safety check to protect fixed-size arrays in GISTSTATE */
	if (index->rd_att->natts > INDEX_MAX_KEYS)
		elog(ERROR, "numberOfAttributes %d > %d",
			 index->rd_att->natts, INDEX_MAX_KEYS);

	/* Create the memory context that will hold the GISTSTATE */
	scanCxt = AllocSetContextCreate(CurrentMemoryContext,
									"GiST scan context",
									ALLOCSET_DEFAULT_SIZES);
	oldCxt = MemoryContextSwitchTo(scanCxt);

	/* Create and fill in the GISTSTATE */
	giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE));

	giststate->scanCxt = scanCxt;
	giststate->tempCxt = scanCxt;	/* caller must change this if needed */
	giststate->leafTupdesc = index->rd_att;

	/*
	 * The truncated tupdesc for non-leaf index tuples, which doesn't contain
	 * the INCLUDE attributes.
	 *
	 * It is used to form tuples during tuple adjustment and page split.
	 * B-tree creates a shortened tuple descriptor for every truncated tuple,
	 * because it does this less often: it does not have to form truncated
	 * tuples during page splits.  Also, B-tree does not adjust tuples on
	 * internal pages the way GiST does.
	 */
	giststate->nonLeafTupdesc = CreateTupleDescCopyConstr(index->rd_att);
	giststate->nonLeafTupdesc->natts =
		IndexRelationGetNumberOfKeyAttributes(index);

	for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(index); i++)
	{
		fmgr_info_copy(&(giststate->consistentFn[i]),
					   index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
					   scanCxt);
		fmgr_info_copy(&(giststate->unionFn[i]),
					   index_getprocinfo(index, i + 1, GIST_UNION_PROC),
					   scanCxt);

		/* opclasses are not required to provide a Compress method */
		if (OidIsValid(index_getprocid(index, i + 1, GIST_COMPRESS_PROC)))
			fmgr_info_copy(&(giststate->compressFn[i]),
						   index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
						   scanCxt);
		else
			giststate->compressFn[i].fn_oid = InvalidOid;

		/* opclasses are not required to provide a Decompress method */
		if (OidIsValid(index_getprocid(index, i + 1, GIST_DECOMPRESS_PROC)))
			fmgr_info_copy(&(giststate->decompressFn[i]),
						   index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
						   scanCxt);
		else
			giststate->decompressFn[i].fn_oid = InvalidOid;

		fmgr_info_copy(&(giststate->penaltyFn[i]),
					   index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
					   scanCxt);
		fmgr_info_copy(&(giststate->picksplitFn[i]),
					   index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
					   scanCxt);
		fmgr_info_copy(&(giststate->equalFn[i]),
					   index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
					   scanCxt);

		/* opclasses are not required to provide a Distance method */
		if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
			fmgr_info_copy(&(giststate->distanceFn[i]),
						   index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
						   scanCxt);
		else
			giststate->distanceFn[i].fn_oid = InvalidOid;

		/* opclasses are not required to provide a Fetch method */
		if (OidIsValid(index_getprocid(index, i + 1, GIST_FETCH_PROC)))
			fmgr_info_copy(&(giststate->fetchFn[i]),
						   index_getprocinfo(index, i + 1, GIST_FETCH_PROC),
						   scanCxt);
		else
			giststate->fetchFn[i].fn_oid = InvalidOid;

		/*
		 * If the index column has a specified collation, we should honor that
		 * while doing comparisons.  However, we may have a collatable storage
		 * type for a noncollatable indexed data type.  If there's no index
		 * collation then specify default collation in case the support
		 * functions need collation.  This is harmless if the support
		 * functions don't care about collation, so we just do it
		 * unconditionally.  (We could alternatively call get_typcollation,
		 * but that seems like expensive overkill --- there aren't going to be
		 * any cases where a GiST storage type has a nondefault collation.)
		 */
		if (OidIsValid(index->rd_indcollation[i]))
			giststate->supportCollation[i] = index->rd_indcollation[i];
		else
			giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
	}

	/* No opclass information for INCLUDE attributes */
	for (; i < index->rd_att->natts; i++)
	{
		giststate->consistentFn[i].fn_oid = InvalidOid;
		giststate->unionFn[i].fn_oid = InvalidOid;
		giststate->compressFn[i].fn_oid = InvalidOid;
		giststate->decompressFn[i].fn_oid = InvalidOid;
		giststate->penaltyFn[i].fn_oid = InvalidOid;
		giststate->picksplitFn[i].fn_oid = InvalidOid;
		giststate->equalFn[i].fn_oid = InvalidOid;
		giststate->distanceFn[i].fn_oid = InvalidOid;
		giststate->fetchFn[i].fn_oid = InvalidOid;
		giststate->supportCollation[i] = InvalidOid;
	}

	MemoryContextSwitchTo(oldCxt);

	return giststate;
}

void
freeGISTstate(GISTSTATE *giststate)
{
	/* It's sufficient to delete the scanCxt */
	MemoryContextDelete(giststate->scanCxt);
}

/*
 * gistprunepage() -- try to remove LP_DEAD items from the given page.
 * Function assumes that buffer is exclusively locked.
 */
static void
gistprunepage(Relation rel, Page page, Buffer buffer, Relation heapRel)
{
	OffsetNumber deletable[MaxIndexTuplesPerPage];
	int			ndeletable = 0;
	OffsetNumber offnum,
				maxoff;
	TransactionId latestRemovedXid = InvalidTransactionId;

	Assert(GistPageIsLeaf(page));

	/*
	 * Scan over all items to see which ones need to be deleted according to
	 * LP_DEAD flags.
	 */
	maxoff = PageGetMaxOffsetNumber(page);
	for (offnum = FirstOffsetNumber;
		 offnum <= maxoff;
		 offnum = OffsetNumberNext(offnum))
	{
		ItemId		itemId = PageGetItemId(page, offnum);

		if (ItemIdIsDead(itemId))
			deletable[ndeletable++] = offnum;
	}

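	/*
	 * If we may need to generate recovery conflicts on a hot standby, compute
	 * the latest removed XID for the victim tuples before deleting them.
	 */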
	if (XLogStandbyInfoActive() && RelationNeedsWAL(rel))
		latestRemovedXid =
			index_compute_xid_horizon_for_tuples(rel, heapRel, buffer,
												 deletable, ndeletable);

	if (ndeletable > 0)
	{
		START_CRIT_SECTION();

		PageIndexMultiDelete(page, deletable, ndeletable);

		/*
		 * Mark the page as not containing any LP_DEAD items.  This is not
		 * necessarily true (there might be some that have recently been
		 * marked, but weren't included in our target-item list), but it will
		 * almost always be true and it doesn't seem worth an additional page
		 * scan to check it. Remember that F_HAS_GARBAGE is only a hint
		 * anyway.
		 */
		GistClearPageHasGarbage(page);

		MarkBufferDirty(buffer);

		/* XLOG stuff */
		if (RelationNeedsWAL(rel))
		{
			XLogRecPtr	recptr;

			recptr = gistXLogDelete(buffer,
									deletable, ndeletable,
									latestRemovedXid);

			PageSetLSN(page, recptr);
		}
		else
			PageSetLSN(page, gistGetFakeLSN(rel));

		END_CRIT_SECTION();
	}

	/*
	 * Note: if we didn't find any LP_DEAD items, then the page's
	 * F_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
	 * separate write to clear it, however.  We will clear it when we split
	 * the page.
	 */
}