1 /*-------------------------------------------------------------------------
2 *
3 * gist.c
4 * interface routines for the postgres GiST index access method.
5 *
6 *
7 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/gist/gist.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 #include "postgres.h"
16
17 #include "access/gist_private.h"
18 #include "access/gistscan.h"
19 #include "catalog/pg_collation.h"
20 #include "miscadmin.h"
21 #include "storage/lmgr.h"
22 #include "storage/predicate.h"
23 #include "nodes/execnodes.h"
24 #include "utils/builtins.h"
25 #include "utils/index_selfuncs.h"
26 #include "utils/memutils.h"
27 #include "utils/rel.h"
28
29
30 /* non-export function prototypes */
31 static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
32 static bool gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
33 GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum);
34 static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
35 GISTSTATE *giststate,
36 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
37 Buffer leftchild, Buffer rightchild,
38 bool unlockbuf, bool unlockleftchild);
39 static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
40 GISTSTATE *giststate, List *splitinfo, bool releasebuf);
41 static void gistprunepage(Relation rel, Page page, Buffer buffer,
42 Relation heapRel);
43

/*
 * Prepend a fresh, zeroed SplitedPageLayout element to list (d).
 * The new head's block number and buffer are set to invalid sentinels;
 * the caller is expected to fill them in later.  Multi-statement macro
 * wrapped in do/while(0) so it behaves as a single statement.
 */
#define ROTATEDIST(d) do { \
	SplitedPageLayout *tmp=(SplitedPageLayout*)palloc0(sizeof(SplitedPageLayout)); \
	tmp->block.blkno = InvalidBlockNumber;	\
	tmp->buffer = InvalidBuffer;	\
	tmp->next = (d); \
	(d)=tmp; \
} while(0)
52
53
54 /*
55 * GiST handler function: return IndexAmRoutine with access method parameters
56 * and callbacks.
57 */
58 Datum
gisthandler(PG_FUNCTION_ARGS)59 gisthandler(PG_FUNCTION_ARGS)
60 {
61 IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
62
63 amroutine->amstrategies = 0;
64 amroutine->amsupport = GISTNProcs;
65 amroutine->amcanorder = false;
66 amroutine->amcanorderbyop = true;
67 amroutine->amcanbackward = false;
68 amroutine->amcanunique = false;
69 amroutine->amcanmulticol = true;
70 amroutine->amoptionalkey = true;
71 amroutine->amsearcharray = false;
72 amroutine->amsearchnulls = true;
73 amroutine->amstorage = true;
74 amroutine->amclusterable = true;
75 amroutine->ampredlocks = true;
76 amroutine->amcanparallel = false;
77 amroutine->amcaninclude = true;
78 amroutine->amkeytype = InvalidOid;
79
80 amroutine->ambuild = gistbuild;
81 amroutine->ambuildempty = gistbuildempty;
82 amroutine->aminsert = gistinsert;
83 amroutine->ambulkdelete = gistbulkdelete;
84 amroutine->amvacuumcleanup = gistvacuumcleanup;
85 amroutine->amcanreturn = gistcanreturn;
86 amroutine->amcostestimate = gistcostestimate;
87 amroutine->amoptions = gistoptions;
88 amroutine->amproperty = gistproperty;
89 amroutine->ambuildphasename = NULL;
90 amroutine->amvalidate = gistvalidate;
91 amroutine->ambeginscan = gistbeginscan;
92 amroutine->amrescan = gistrescan;
93 amroutine->amgettuple = gistgettuple;
94 amroutine->amgetbitmap = gistgetbitmap;
95 amroutine->amendscan = gistendscan;
96 amroutine->ammarkpos = NULL;
97 amroutine->amrestrpos = NULL;
98 amroutine->amestimateparallelscan = NULL;
99 amroutine->aminitparallelscan = NULL;
100 amroutine->amparallelrescan = NULL;
101
102 PG_RETURN_POINTER(amroutine);
103 }
104
105 /*
106 * Create and return a temporary memory context for use by GiST. We
107 * _always_ invoke user-provided methods in a temporary memory
108 * context, so that memory leaks in those functions cannot cause
109 * problems. Also, we use some additional temporary contexts in the
110 * GiST code itself, to avoid the need to do some awkward manual
111 * memory management.
112 */
113 MemoryContext
createTempGistContext(void)114 createTempGistContext(void)
115 {
116 return AllocSetContextCreate(CurrentMemoryContext,
117 "GiST temporary context",
118 ALLOCSET_DEFAULT_SIZES);
119 }
120
121 /*
122 * gistbuildempty() -- build an empty gist index in the initialization fork
123 */
124 void
gistbuildempty(Relation index)125 gistbuildempty(Relation index)
126 {
127 Buffer buffer;
128
129 /* Initialize the root page */
130 buffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
131 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
132
133 /* Initialize and xlog buffer */
134 START_CRIT_SECTION();
135 GISTInitBuffer(buffer, F_LEAF);
136 MarkBufferDirty(buffer);
137 log_newpage_buffer(buffer, true);
138 END_CRIT_SECTION();
139
140 /* Unlock and release the buffer */
141 UnlockReleaseBuffer(buffer);
142 }
143
144 /*
145 * gistinsert -- wrapper for GiST tuple insertion.
146 *
147 * This is the public interface routine for tuple insertion in GiSTs.
148 * It doesn't do any work; just locks the relation and passes the buck.
149 */
150 bool
gistinsert(Relation r,Datum * values,bool * isnull,ItemPointer ht_ctid,Relation heapRel,IndexUniqueCheck checkUnique,IndexInfo * indexInfo)151 gistinsert(Relation r, Datum *values, bool *isnull,
152 ItemPointer ht_ctid, Relation heapRel,
153 IndexUniqueCheck checkUnique,
154 IndexInfo *indexInfo)
155 {
156 GISTSTATE *giststate = (GISTSTATE *) indexInfo->ii_AmCache;
157 IndexTuple itup;
158 MemoryContext oldCxt;
159
160 /* Initialize GISTSTATE cache if first call in this statement */
161 if (giststate == NULL)
162 {
163 oldCxt = MemoryContextSwitchTo(indexInfo->ii_Context);
164 giststate = initGISTstate(r);
165 giststate->tempCxt = createTempGistContext();
166 indexInfo->ii_AmCache = (void *) giststate;
167 MemoryContextSwitchTo(oldCxt);
168 }
169
170 oldCxt = MemoryContextSwitchTo(giststate->tempCxt);
171
172 itup = gistFormTuple(giststate, r,
173 values, isnull, true /* size is currently bogus */ );
174 itup->t_tid = *ht_ctid;
175
176 gistdoinsert(r, itup, 0, giststate, heapRel, false);
177
178 /* cleanup */
179 MemoryContextSwitchTo(oldCxt);
180 MemoryContextReset(giststate->tempCxt);
181
182 return false;
183 }
184
185
/*
 * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple
 * at that offset is atomically removed along with inserting the new tuples.
 * This is used to replace a tuple with a new one.
 *
 * If 'leftchildbuf' is valid, we're inserting the downlink for the page
 * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
 * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
 *
 * If 'markfollowright' is true and the page is split, the left child is
 * marked with F_FOLLOW_RIGHT flag. That is the normal case. During buffered
 * index build, however, there is no concurrent access and the page splitting
 * is done in a slightly simpler fashion, and false is passed.
 *
 * If there is not enough room on the page, it is split. All the split
 * pages are kept pinned and locked and returned in *splitinfo, the caller
 * is responsible for inserting the downlinks for them. However, if
 * 'buffer' is the root page and it needs to be split, gistplacetopage()
 * performs the split as one atomic operation, and *splitinfo is set to NIL.
 * In that case, we continue to hold the root page locked, and the child
 * pages are released; note that new tuple(s) are *not* on the root page
 * but in one of the new child pages.
 *
 * If 'newblkno' is not NULL, returns the block number of page the first
 * new/updated tuple was inserted to. Usually it's the given page, but could
 * be its right sibling if the page was split.
 *
 * Returns 'true' if the page was split, 'false' otherwise.
 */
bool
gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
				Buffer buffer,
				IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
				BlockNumber *newblkno,
				Buffer leftchildbuf,
				List **splitinfo,
				bool markfollowright,
				Relation heapRel,
				bool is_build)
{
	BlockNumber blkno = BufferGetBlockNumber(buffer);
	Page		page = BufferGetPage(buffer);
	bool		is_leaf = (GistPageIsLeaf(page)) ? true : false;
	XLogRecPtr	recptr;
	int			i;
	bool		is_split;

	/*
	 * Refuse to modify a page that's incompletely split. This should not
	 * happen because we finish any incomplete splits while we walk down the
	 * tree. However, it's remotely possible that another concurrent inserter
	 * splits a parent page, and errors out before completing the split. We
	 * will just throw an error in that case, and leave any split we had in
	 * progress unfinished too. The next insert that comes along will clean up
	 * the mess.
	 */
	if (GistFollowRight(page))
		elog(ERROR, "concurrent GiST page split was incomplete");

	/* should never try to insert to a deleted page */
	Assert(!GistPageIsDeleted(page));

	*splitinfo = NIL;

	/*
	 * if isupdate, remove old key: This node's key has been modified, either
	 * because a child split occurred or because we needed to adjust our key
	 * for an insert in a child node. Therefore, remove the old version of
	 * this node's key.
	 *
	 * for WAL replay, in the non-split case we handle this by setting up a
	 * one-element todelete array; in the split case, it's handled implicitly
	 * because the tuple vector passed to gistSplit won't include this tuple.
	 */
	/* At this point 'is_split' just means "tuples don't fit on the page" */
	is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);

	/*
	 * If leaf page is full, try at first to delete dead tuples. And then
	 * check again.
	 */
	if (is_split && GistPageIsLeaf(page) && GistPageHasGarbage(page))
	{
		gistprunepage(rel, page, buffer, heapRel);
		is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
	}

	if (is_split)
	{
		/* no space for insertion */
		IndexTuple *itvec;
		int			tlen;
		SplitedPageLayout *dist = NULL,
				   *ptr;
		BlockNumber oldrlink = InvalidBlockNumber;
		GistNSN		oldnsn = 0;
		SplitedPageLayout rootpg;
		bool		is_rootsplit;
		int			npage;

		is_rootsplit = (blkno == GIST_ROOT_BLKNO);

		/*
		 * Form index tuples vector to split. If we're replacing an old tuple,
		 * remove the old version from the vector.
		 */
		itvec = gistextractpage(page, &tlen);
		if (OffsetNumberIsValid(oldoffnum))
		{
			/* on inner page we should remove old tuple */
			int			pos = oldoffnum - FirstOffsetNumber;

			tlen--;
			if (pos != tlen)
				memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
		}
		itvec = gistjoinvector(itvec, &tlen, itup, ntup);
		dist = gistSplit(rel, page, itvec, tlen, giststate);

		/*
		 * Check that split didn't produce too many pages.
		 */
		npage = 0;
		for (ptr = dist; ptr; ptr = ptr->next)
			npage++;
		/* in a root split, we'll add one more page to the list below */
		if (is_rootsplit)
			npage++;
		if (npage > GIST_MAX_SPLIT_PAGES)
			elog(ERROR, "GiST page split into too many halves (%d, maximum %d)",
				 npage, GIST_MAX_SPLIT_PAGES);

		/*
		 * Set up pages to work with. Allocate new buffers for all but the
		 * leftmost page. The original page becomes the new leftmost page, and
		 * is just replaced with the new contents.
		 *
		 * For a root-split, allocate new buffers for all child pages, the
		 * original page is overwritten with new root page containing
		 * downlinks to the new child pages.
		 */
		ptr = dist;
		if (!is_rootsplit)
		{
			/* save old rightlink and NSN */
			oldrlink = GistPageGetOpaque(page)->rightlink;
			oldnsn = GistPageGetNSN(page);

			/* first list entry reuses the original buffer via a temp copy */
			dist->buffer = buffer;
			dist->block.blkno = BufferGetBlockNumber(buffer);
			dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));

			/* clean all flags except F_LEAF */
			GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;

			ptr = ptr->next;
		}
		for (; ptr; ptr = ptr->next)
		{
			/* Allocate new page */
			ptr->buffer = gistNewBuffer(rel);
			GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
			ptr->page = BufferGetPage(ptr->buffer);
			ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
			/* tell the predicate-lock machinery the page has been split */
			PredicateLockPageSplit(rel,
								   BufferGetBlockNumber(buffer),
								   BufferGetBlockNumber(ptr->buffer));
		}

		/*
		 * Now that we know which blocks the new pages go to, set up downlink
		 * tuples to point to them.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
		{
			ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
			GistTupleSetValid(ptr->itup);
		}

		/*
		 * If this is a root split, we construct the new root page with the
		 * downlinks here directly, instead of requiring the caller to insert
		 * them. Add the new root page to the list along with the child pages.
		 */
		if (is_rootsplit)
		{
			IndexTuple *downlinks;
			int			ndownlinks = 0;
			int			i;

			rootpg.buffer = buffer;
			rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
			GistPageGetOpaque(rootpg.page)->flags = 0;

			/* Prepare a vector of all the downlinks */
			for (ptr = dist; ptr; ptr = ptr->next)
				ndownlinks++;
			downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
			for (i = 0, ptr = dist; ptr; ptr = ptr->next)
				downlinks[i++] = ptr->itup;

			rootpg.block.blkno = GIST_ROOT_BLKNO;
			rootpg.block.num = ndownlinks;
			rootpg.list = gistfillitupvec(downlinks, ndownlinks,
										  &(rootpg.lenlist));
			rootpg.itup = NULL;

			/* the new root page becomes the head of the page list */
			rootpg.next = dist;
			dist = &rootpg;
		}
		else
		{
			/* Prepare split-info to be returned to caller */
			for (ptr = dist; ptr; ptr = ptr->next)
			{
				GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));

				si->buf = ptr->buffer;
				si->downlink = ptr->itup;
				*splitinfo = lappend(*splitinfo, si);
			}
		}

		/*
		 * Fill all pages. All the pages are new, ie. freshly allocated empty
		 * pages, or a temporary copy of the old page.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
		{
			/* ptr->list is a packed array of tuples; walk it by tuple size */
			char	   *data = (char *) (ptr->list);

			for (i = 0; i < ptr->block.num; i++)
			{
				IndexTuple	thistup = (IndexTuple) data;

				if (PageAddItem(ptr->page, (Item) data, IndexTupleSize(thistup), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
					elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(rel));

				/*
				 * If this is the first inserted/updated tuple, let the caller
				 * know which page it landed on.
				 */
				if (newblkno && ItemPointerEquals(&thistup->t_tid, &(*itup)->t_tid))
					*newblkno = ptr->block.blkno;

				data += IndexTupleSize(thistup);
			}

			/* Set up rightlinks */
			if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
				GistPageGetOpaque(ptr->page)->rightlink =
					ptr->next->block.blkno;
			else
				GistPageGetOpaque(ptr->page)->rightlink = oldrlink;

			/*
			 * Mark the all but the right-most page with the follow-right
			 * flag. It will be cleared as soon as the downlink is inserted
			 * into the parent, but this ensures that if we error out before
			 * that, the index is still consistent. (in buffering build mode,
			 * any error will abort the index build anyway, so this is not
			 * needed.)
			 */
			if (ptr->next && !is_rootsplit && markfollowright)
				GistMarkFollowRight(ptr->page);
			else
				GistClearFollowRight(ptr->page);

			/*
			 * Copy the NSN of the original page to all pages. The
			 * F_FOLLOW_RIGHT flags ensure that scans will follow the
			 * rightlinks until the downlinks are inserted.
			 */
			GistPageSetNSN(ptr->page, oldnsn);
		}

		/*
		 * gistXLogSplit() needs to WAL log a lot of pages, prepare WAL
		 * insertion for that. NB: The number of pages and data segments
		 * specified here must match the calculations in gistXLogSplit()!
		 */
		if (!is_build && RelationNeedsWAL(rel))
			XLogEnsureRecordSpace(npage, 1 + npage * 2);

		START_CRIT_SECTION();

		/*
		 * Must mark buffers dirty before XLogInsert, even though we'll still
		 * be changing their opaque fields below.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
			MarkBufferDirty(ptr->buffer);
		if (BufferIsValid(leftchildbuf))
			MarkBufferDirty(leftchildbuf);

		/*
		 * The first page in the chain was a temporary working copy meant to
		 * replace the old page. Copy it over the old page.
		 */
		PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
		dist->page = BufferGetPage(dist->buffer);

		/*
		 * Write the WAL record.
		 *
		 * If we're building a new index, however, we don't WAL-log changes
		 * yet. The LSN-NSN interlock between parent and child requires that
		 * LSNs never move backwards, so set the LSNs to a value that's
		 * smaller than any real or fake unlogged LSN that might be generated
		 * later. (There can't be any concurrent scans during index build, so
		 * we don't need to be able to detect concurrent splits yet.)
		 */
		if (is_build)
			recptr = GistBuildLSN;
		else
		{
			if (RelationNeedsWAL(rel))
				recptr = gistXLogSplit(is_leaf,
									   dist, oldrlink, oldnsn, leftchildbuf,
									   markfollowright);
			else
				recptr = gistGetFakeLSN(rel);
		}

		for (ptr = dist; ptr; ptr = ptr->next)
			PageSetLSN(ptr->page, recptr);

		/*
		 * Return the new child buffers to the caller.
		 *
		 * If this was a root split, we've already inserted the downlink
		 * pointers, in the form of a new root page. Therefore we can release
		 * all the new buffers, and keep just the root page locked.
		 */
		if (is_rootsplit)
		{
			for (ptr = dist->next; ptr; ptr = ptr->next)
				UnlockReleaseBuffer(ptr->buffer);
		}
	}
	else
	{
		/*
		 * Enough space.  We always get here if ntup==0.
		 */
		START_CRIT_SECTION();

		/*
		 * Delete old tuple if any, then insert new tuple(s) if any.  If
		 * possible, use the fast path of PageIndexTupleOverwrite.
		 */
		if (OffsetNumberIsValid(oldoffnum))
		{
			if (ntup == 1)
			{
				/* One-for-one replacement, so use PageIndexTupleOverwrite */
				if (!PageIndexTupleOverwrite(page, oldoffnum, (Item) *itup,
											 IndexTupleSize(*itup)))
					elog(ERROR, "failed to add item to index page in \"%s\"",
						 RelationGetRelationName(rel));
			}
			else
			{
				/* Delete old, then append new tuple(s) to page */
				PageIndexTupleDelete(page, oldoffnum);
				gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
			}
		}
		else
		{
			/* Just append new tuples at the end of the page */
			gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
		}

		MarkBufferDirty(buffer);

		if (BufferIsValid(leftchildbuf))
			MarkBufferDirty(leftchildbuf);

		/* WAL-log (or assign a fake/build LSN), mirroring the split path */
		if (is_build)
			recptr = GistBuildLSN;
		else
		{
			if (RelationNeedsWAL(rel))
			{
				OffsetNumber ndeloffs = 0,
							deloffs[1];

				if (OffsetNumberIsValid(oldoffnum))
				{
					deloffs[0] = oldoffnum;
					ndeloffs = 1;
				}

				recptr = gistXLogUpdate(buffer,
										deloffs, ndeloffs, itup, ntup,
										leftchildbuf);
			}
			else
				recptr = gistGetFakeLSN(rel);
		}
		PageSetLSN(page, recptr);

		if (newblkno)
			*newblkno = blkno;
	}

	/*
	 * If we inserted the downlink for a child page, set NSN and clear
	 * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know to
	 * follow the rightlink if and only if they looked at the parent page
	 * before we inserted the downlink.
	 *
	 * Note that we do this *after* writing the WAL record. That means that
	 * the possible full page image in the WAL record does not include these
	 * changes, and they must be replayed even if the page is restored from
	 * the full page image. There's a chicken-and-egg problem: if we updated
	 * the child pages first, we wouldn't know the recptr of the WAL record
	 * we're about to write.
	 */
	if (BufferIsValid(leftchildbuf))
	{
		Page		leftpg = BufferGetPage(leftchildbuf);

		GistPageSetNSN(leftpg, recptr);
		GistClearFollowRight(leftpg);

		PageSetLSN(leftpg, recptr);
	}

	END_CRIT_SECTION();

	return is_split;
}
619
/*
 * Workhouse routine for doing insertion into a GiST index. Note that
 * this routine assumes it is invoked in a short-lived memory context,
 * so it does not bother releasing palloc'd allocations.
 *
 * Descends from the root along the minimum-penalty path, updating parent
 * keys as needed, and finally inserts 'itup' on a leaf page.  'freespace'
 * is extra free space to leave on each page (0 for normal inserts).
 */
void
gistdoinsert(Relation r, IndexTuple itup, Size freespace,
			 GISTSTATE *giststate, Relation heapRel, bool is_build)
{
	ItemId		iid;
	IndexTuple	idxtuple;
	GISTInsertStack firststack;
	GISTInsertStack *stack;
	GISTInsertState state;
	bool		xlocked = false;	/* do we hold exclusive lock on stack->buffer? */

	memset(&state, 0, sizeof(GISTInsertState));
	state.freespace = freespace;
	state.r = r;
	state.heapRel = heapRel;
	state.is_build = is_build;

	/* Start from the root */
	firststack.blkno = GIST_ROOT_BLKNO;
	firststack.lsn = 0;
	firststack.retry_from_parent = false;
	firststack.parent = NULL;
	firststack.downlinkoffnum = InvalidOffsetNumber;
	state.stack = stack = &firststack;

	/*
	 * Walk down along the path of smallest penalty, updating the parent
	 * pointers with the key we're inserting as we go. If we crash in the
	 * middle, the tree is consistent, although the possible parent updates
	 * were a waste.
	 */
	for (;;)
	{
		/*
		 * If we split an internal page while descending the tree, we have to
		 * retry at the parent. (Normally, the LSN-NSN interlock below would
		 * also catch this and cause us to retry. But LSNs are not updated
		 * during index build.)
		 */
		while (stack->retry_from_parent)
		{
			if (xlocked)
				LockBuffer(stack->buffer, GIST_UNLOCK);
			xlocked = false;
			ReleaseBuffer(stack->buffer);
			state.stack = stack = stack->parent;
		}

		/* invalid lsn means we haven't read this stack entry's page yet */
		if (XLogRecPtrIsInvalid(stack->lsn))
			stack->buffer = ReadBuffer(state.r, stack->blkno);

		/*
		 * Be optimistic and grab shared lock first. Swap it for an exclusive
		 * lock later if we need to update the page.
		 */
		if (!xlocked)
		{
			LockBuffer(stack->buffer, GIST_SHARE);
			gistcheckpage(state.r, stack->buffer);
		}

		stack->page = (Page) BufferGetPage(stack->buffer);
		stack->lsn = xlocked ?
			PageGetLSN(stack->page) : BufferGetLSNAtomic(stack->buffer);
		Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));

		/*
		 * If this page was split but the downlink was never inserted to the
		 * parent because the inserting backend crashed before doing that, fix
		 * that now.
		 */
		if (GistFollowRight(stack->page))
		{
			if (!xlocked)
			{
				LockBuffer(stack->buffer, GIST_UNLOCK);
				LockBuffer(stack->buffer, GIST_EXCLUSIVE);
				xlocked = true;
				/* someone might've completed the split when we unlocked */
				if (!GistFollowRight(stack->page))
					continue;
			}
			gistfixsplit(&state, giststate);

			UnlockReleaseBuffer(stack->buffer);
			xlocked = false;
			state.stack = stack = stack->parent;
			continue;
		}

		/* NSN newer than parent's LSN means a split we didn't see via parent */
		if ((stack->blkno != GIST_ROOT_BLKNO &&
			 stack->parent->lsn < GistPageGetNSN(stack->page)) ||
			GistPageIsDeleted(stack->page))
		{
			/*
			 * Concurrent split or page deletion detected. There's no
			 * guarantee that the downlink for this page is consistent with
			 * the tuple we're inserting anymore, so go back to parent and
			 * rechoose the best child.
			 */
			UnlockReleaseBuffer(stack->buffer);
			xlocked = false;
			state.stack = stack = stack->parent;
			continue;
		}

		if (!GistPageIsLeaf(stack->page))
		{
			/*
			 * This is an internal page so continue to walk down the tree.
			 * Find the child node that has the minimum insertion penalty.
			 */
			BlockNumber childblkno;
			IndexTuple	newtup;
			GISTInsertStack *item;
			OffsetNumber downlinkoffnum;

			downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate);
			iid = PageGetItemId(stack->page, downlinkoffnum);
			idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
			childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));

			/*
			 * Check that it's not a leftover invalid tuple from pre-9.1
			 */
			if (GistTupleIsInvalid(idxtuple))
				ereport(ERROR,
						(errmsg("index \"%s\" contains an inner tuple marked as invalid",
								RelationGetRelationName(r)),
						 errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
						 errhint("Please REINDEX it.")));

			/*
			 * Check that the key representing the target child node is
			 * consistent with the key we're inserting. Update it if it's not.
			 */
			newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
			if (newtup)
			{
				/*
				 * Swap shared lock for an exclusive one. Beware, the page may
				 * change while we unlock/lock the page...
				 */
				if (!xlocked)
				{
					LockBuffer(stack->buffer, GIST_UNLOCK);
					LockBuffer(stack->buffer, GIST_EXCLUSIVE);
					xlocked = true;
					stack->page = (Page) BufferGetPage(stack->buffer);

					if (PageGetLSN(stack->page) != stack->lsn)
					{
						/* the page was changed while we unlocked it, retry */
						continue;
					}
				}

				/*
				 * Update the tuple.
				 *
				 * We still hold the lock after gistinserttuple(), but it
				 * might have to split the page to make the updated tuple fit.
				 * In that case the updated tuple might migrate to the other
				 * half of the split, so we have to go back to the parent and
				 * descend back to the half that's a better fit for the new
				 * tuple.
				 */
				if (gistinserttuple(&state, stack, giststate, newtup,
									downlinkoffnum))
				{
					/*
					 * If this was a root split, the root page continues to be
					 * the parent and the updated tuple went to one of the
					 * child pages, so we just need to retry from the root
					 * page.
					 */
					if (stack->blkno != GIST_ROOT_BLKNO)
					{
						UnlockReleaseBuffer(stack->buffer);
						xlocked = false;
						state.stack = stack = stack->parent;
					}
					continue;
				}
			}
			LockBuffer(stack->buffer, GIST_UNLOCK);
			xlocked = false;

			/* descend to the chosen child */
			item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
			item->blkno = childblkno;
			item->parent = stack;
			item->downlinkoffnum = downlinkoffnum;
			state.stack = stack = item;
		}
		else
		{
			/*
			 * Leaf page. Insert the new key. We've already updated all the
			 * parents on the way down, but we might have to split the page if
			 * it doesn't fit. gistinserthere() will take care of that.
			 */

			/*
			 * Swap shared lock for an exclusive one. Be careful, the page may
			 * change while we unlock/lock the page...
			 */
			if (!xlocked)
			{
				LockBuffer(stack->buffer, GIST_UNLOCK);
				LockBuffer(stack->buffer, GIST_EXCLUSIVE);
				xlocked = true;
				stack->page = (Page) BufferGetPage(stack->buffer);
				stack->lsn = PageGetLSN(stack->page);

				if (stack->blkno == GIST_ROOT_BLKNO)
				{
					/*
					 * the only page that can become inner instead of leaf is
					 * the root page, so for root we should recheck it
					 */
					if (!GistPageIsLeaf(stack->page))
					{
						/*
						 * very rare situation: during unlock/lock index with
						 * number of pages = 1 was increased
						 */
						LockBuffer(stack->buffer, GIST_UNLOCK);
						xlocked = false;
						continue;
					}

					/*
					 * we don't need to check root split, because checking
					 * leaf/inner is enough to recognize split for root
					 */
				}
				else if ((GistFollowRight(stack->page) ||
						  stack->parent->lsn < GistPageGetNSN(stack->page)) ||
						 GistPageIsDeleted(stack->page))
				{
					/*
					 * The page was split or deleted while we momentarily
					 * unlocked the page. Go back to parent.
					 */
					UnlockReleaseBuffer(stack->buffer);
					xlocked = false;
					state.stack = stack = stack->parent;
					continue;
				}
			}

			/* now state.stack->(page, buffer and blkno) points to leaf page */

			gistinserttuple(&state, stack, giststate, itup,
							InvalidOffsetNumber);
			LockBuffer(stack->buffer, GIST_UNLOCK);

			/* Release any pins we might still hold before exiting */
			for (; stack; stack = stack->parent)
				ReleaseBuffer(stack->buffer);
			break;
		}
	}
}
890
/*
 * Traverse the tree to find path from root page to specified "child" block.
 *
 * returns a new insertion stack, starting from the parent of "child", up
 * to the root. *downlinkoffnum is set to the offset of the downlink in the
 * direct parent of child.
 *
 * To prevent deadlocks, this should lock only one page at a time.
 *
 * Implemented as a breadth-first search from the root; errors out if the
 * child is never found (e.g. concurrent incomplete split).
 */
static GISTInsertStack *
gistFindPath(Relation r, BlockNumber child, OffsetNumber *downlinkoffnum)
{
	Page		page;
	Buffer		buffer;
	OffsetNumber i,
				maxoff;
	ItemId		iid;
	IndexTuple	idxtuple;
	List	   *fifo;		/* BFS queue of GISTInsertStack entries to visit */
	GISTInsertStack *top,
			   *ptr;
	BlockNumber blkno;

	top = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
	top->blkno = GIST_ROOT_BLKNO;
	top->downlinkoffnum = InvalidOffsetNumber;

	fifo = list_make1(top);
	while (fifo != NIL)
	{
		/* Get next page to visit */
		top = linitial(fifo);
		fifo = list_delete_first(fifo);

		buffer = ReadBuffer(r, top->blkno);
		LockBuffer(buffer, GIST_SHARE);
		gistcheckpage(r, buffer);
		page = (Page) BufferGetPage(buffer);

		if (GistPageIsLeaf(page))
		{
			/*
			 * Because we scan the index top-down, all the rest of the pages
			 * in the queue must be leaf pages as well.
			 */
			UnlockReleaseBuffer(buffer);
			break;
		}

		/* currently, internal pages are never deleted */
		Assert(!GistPageIsDeleted(page));

		top->lsn = BufferGetLSNAtomic(buffer);

		/*
		 * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
		 * downlink. This should not normally happen..
		 */
		if (GistFollowRight(page))
			elog(ERROR, "concurrent GiST page split was incomplete");

		if (top->parent && top->parent->lsn < GistPageGetNSN(page) &&
			GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
		{
			/*
			 * Page was split while we looked elsewhere. We didn't see the
			 * downlink to the right page when we scanned the parent, so add
			 * it to the queue now.
			 *
			 * Put the right page ahead of the queue, so that we visit it
			 * next. That's important, because if this is the lowest internal
			 * level, just above leaves, we might already have queued up some
			 * leaf pages, and we assume that there can't be any non-leaf
			 * pages behind leaf pages.
			 */
			ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
			ptr->blkno = GistPageGetOpaque(page)->rightlink;
			ptr->downlinkoffnum = InvalidOffsetNumber;
			ptr->parent = top->parent;

			fifo = lcons(ptr, fifo);
		}

		maxoff = PageGetMaxOffsetNumber(page);

		/* scan every downlink on this page, looking for the child */
		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
		{
			iid = PageGetItemId(page, i);
			idxtuple = (IndexTuple) PageGetItem(page, iid);
			blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
			if (blkno == child)
			{
				/* Found it! */
				UnlockReleaseBuffer(buffer);
				*downlinkoffnum = i;
				return top;
			}
			else
			{
				/* Append this child to the list of pages to visit later */
				ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
				ptr->blkno = blkno;
				ptr->downlinkoffnum = i;
				ptr->parent = top;

				fifo = lappend(fifo, ptr);
			}
		}

		UnlockReleaseBuffer(buffer);
	}

	elog(ERROR, "failed to re-find parent of a page in index \"%s\", block %u",
		 RelationGetRelationName(r), child);
	return NULL;				/* keep compiler quiet */
}
1007
1008 /*
1009 * Updates the stack so that child->parent is the correct parent of the
1010 * child. child->parent must be exclusively locked on entry, and will
1011 * remain so at exit, but it might not be the same page anymore.
1012 */
1013 static void
gistFindCorrectParent(Relation r,GISTInsertStack * child)1014 gistFindCorrectParent(Relation r, GISTInsertStack *child)
1015 {
1016 GISTInsertStack *parent = child->parent;
1017
1018 gistcheckpage(r, parent->buffer);
1019 parent->page = (Page) BufferGetPage(parent->buffer);
1020
1021 /* here we don't need to distinguish between split and page update */
1022 if (child->downlinkoffnum == InvalidOffsetNumber ||
1023 parent->lsn != PageGetLSN(parent->page))
1024 {
1025 /* parent is changed, look child in right links until found */
1026 OffsetNumber i,
1027 maxoff;
1028 ItemId iid;
1029 IndexTuple idxtuple;
1030 GISTInsertStack *ptr;
1031
1032 while (true)
1033 {
1034 maxoff = PageGetMaxOffsetNumber(parent->page);
1035 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
1036 {
1037 iid = PageGetItemId(parent->page, i);
1038 idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
1039 if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
1040 {
1041 /* yes!!, found */
1042 child->downlinkoffnum = i;
1043 return;
1044 }
1045 }
1046
1047 parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
1048 UnlockReleaseBuffer(parent->buffer);
1049 if (parent->blkno == InvalidBlockNumber)
1050 {
1051 /*
1052 * End of chain and still didn't find parent. It's a very-very
1053 * rare situation when root splited.
1054 */
1055 break;
1056 }
1057 parent->buffer = ReadBuffer(r, parent->blkno);
1058 LockBuffer(parent->buffer, GIST_EXCLUSIVE);
1059 gistcheckpage(r, parent->buffer);
1060 parent->page = (Page) BufferGetPage(parent->buffer);
1061 }
1062
1063 /*
1064 * awful!!, we need search tree to find parent ... , but before we
1065 * should release all old parent
1066 */
1067
1068 ptr = child->parent->parent; /* child->parent already released
1069 * above */
1070 while (ptr)
1071 {
1072 ReleaseBuffer(ptr->buffer);
1073 ptr = ptr->parent;
1074 }
1075
1076 /* ok, find new path */
1077 ptr = parent = gistFindPath(r, child->blkno, &child->downlinkoffnum);
1078
1079 /* read all buffers as expected by caller */
1080 /* note we don't lock them or gistcheckpage them here! */
1081 while (ptr)
1082 {
1083 ptr->buffer = ReadBuffer(r, ptr->blkno);
1084 ptr->page = (Page) BufferGetPage(ptr->buffer);
1085 ptr = ptr->parent;
1086 }
1087
1088 /* install new chain of parents to stack */
1089 child->parent = parent;
1090
1091 /* make recursive call to normal processing */
1092 LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
1093 gistFindCorrectParent(r, child);
1094 }
1095
1096 return;
1097 }
1098
1099 /*
1100 * Form a downlink pointer for the page in 'buf'.
1101 */
1102 static IndexTuple
gistformdownlink(Relation rel,Buffer buf,GISTSTATE * giststate,GISTInsertStack * stack)1103 gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
1104 GISTInsertStack *stack)
1105 {
1106 Page page = BufferGetPage(buf);
1107 OffsetNumber maxoff;
1108 OffsetNumber offset;
1109 IndexTuple downlink = NULL;
1110
1111 maxoff = PageGetMaxOffsetNumber(page);
1112 for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
1113 {
1114 IndexTuple ituple = (IndexTuple)
1115 PageGetItem(page, PageGetItemId(page, offset));
1116
1117 if (downlink == NULL)
1118 downlink = CopyIndexTuple(ituple);
1119 else
1120 {
1121 IndexTuple newdownlink;
1122
1123 newdownlink = gistgetadjusted(rel, downlink, ituple,
1124 giststate);
1125 if (newdownlink)
1126 downlink = newdownlink;
1127 }
1128 }
1129
1130 /*
1131 * If the page is completely empty, we can't form a meaningful downlink
1132 * for it. But we have to insert a downlink for the page. Any key will do,
1133 * as long as its consistent with the downlink of parent page, so that we
1134 * can legally insert it to the parent. A minimal one that matches as few
1135 * scans as possible would be best, to keep scans from doing useless work,
1136 * but we don't know how to construct that. So we just use the downlink of
1137 * the original page that was split - that's as far from optimal as it can
1138 * get but will do..
1139 */
1140 if (!downlink)
1141 {
1142 ItemId iid;
1143
1144 LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1145 gistFindCorrectParent(rel, stack);
1146 iid = PageGetItemId(stack->parent->page, stack->downlinkoffnum);
1147 downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
1148 downlink = CopyIndexTuple(downlink);
1149 LockBuffer(stack->parent->buffer, GIST_UNLOCK);
1150 }
1151
1152 ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
1153 GistTupleSetValid(downlink);
1154
1155 return downlink;
1156 }
1157
1158
1159 /*
1160 * Complete the incomplete split of state->stack->page.
1161 */
1162 static void
gistfixsplit(GISTInsertState * state,GISTSTATE * giststate)1163 gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
1164 {
1165 GISTInsertStack *stack = state->stack;
1166 Buffer buf;
1167 Page page;
1168 List *splitinfo = NIL;
1169
1170 elog(LOG, "fixing incomplete split in index \"%s\", block %u",
1171 RelationGetRelationName(state->r), stack->blkno);
1172
1173 Assert(GistFollowRight(stack->page));
1174 Assert(OffsetNumberIsValid(stack->downlinkoffnum));
1175
1176 buf = stack->buffer;
1177
1178 /*
1179 * Read the chain of split pages, following the rightlinks. Construct a
1180 * downlink tuple for each page.
1181 */
1182 for (;;)
1183 {
1184 GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
1185 IndexTuple downlink;
1186
1187 page = BufferGetPage(buf);
1188
1189 /* Form the new downlink tuples to insert to parent */
1190 downlink = gistformdownlink(state->r, buf, giststate, stack);
1191
1192 si->buf = buf;
1193 si->downlink = downlink;
1194
1195 splitinfo = lappend(splitinfo, si);
1196
1197 if (GistFollowRight(page))
1198 {
1199 /* lock next page */
1200 buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
1201 LockBuffer(buf, GIST_EXCLUSIVE);
1202 }
1203 else
1204 break;
1205 }
1206
1207 /* Insert the downlinks */
1208 gistfinishsplit(state, stack, giststate, splitinfo, false);
1209 }
1210
1211 /*
1212 * Insert or replace a tuple in stack->buffer. If 'oldoffnum' is valid, the
1213 * tuple at 'oldoffnum' is replaced, otherwise the tuple is inserted as new.
1214 * 'stack' represents the path from the root to the page being updated.
1215 *
1216 * The caller must hold an exclusive lock on stack->buffer. The lock is still
1217 * held on return, but the page might not contain the inserted tuple if the
1218 * page was split. The function returns true if the page was split, false
1219 * otherwise.
1220 */
1221 static bool
gistinserttuple(GISTInsertState * state,GISTInsertStack * stack,GISTSTATE * giststate,IndexTuple tuple,OffsetNumber oldoffnum)1222 gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
1223 GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum)
1224 {
1225 return gistinserttuples(state, stack, giststate, &tuple, 1, oldoffnum,
1226 InvalidBuffer, InvalidBuffer, false, false);
1227 }
1228
1229 /* ----------------
1230 * An extended workhorse version of gistinserttuple(). This version allows
1231 * inserting multiple tuples, or replacing a single tuple with multiple tuples.
1232 * This is used to recursively update the downlinks in the parent when a page
1233 * is split.
1234 *
1235 * If leftchild and rightchild are valid, we're inserting/replacing the
1236 * downlink for rightchild, and leftchild is its left sibling. We clear the
1237 * F_FOLLOW_RIGHT flag and update NSN on leftchild, atomically with the
1238 * insertion of the downlink.
1239 *
1240 * To avoid holding locks for longer than necessary, when recursing up the
1241 * tree to update the parents, the locking is a bit peculiar here. On entry,
1242 * the caller must hold an exclusive lock on stack->buffer, as well as
1243 * leftchild and rightchild if given. On return:
1244 *
1245 * - Lock on stack->buffer is released, if 'unlockbuf' is true. The page is
1246 * always kept pinned, however.
1247 * - Lock on 'leftchild' is released, if 'unlockleftchild' is true. The page
1248 * is kept pinned.
1249 * - Lock and pin on 'rightchild' are always released.
1250 *
1251 * Returns 'true' if the page had to be split. Note that if the page was
1252 * split, the inserted/updated tuples might've been inserted to a right
1253 * sibling of stack->buffer instead of stack->buffer itself.
1254 */
1255 static bool
gistinserttuples(GISTInsertState * state,GISTInsertStack * stack,GISTSTATE * giststate,IndexTuple * tuples,int ntup,OffsetNumber oldoffnum,Buffer leftchild,Buffer rightchild,bool unlockbuf,bool unlockleftchild)1256 gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
1257 GISTSTATE *giststate,
1258 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
1259 Buffer leftchild, Buffer rightchild,
1260 bool unlockbuf, bool unlockleftchild)
1261 {
1262 List *splitinfo;
1263 bool is_split;
1264
1265 /*
1266 * Check for any rw conflicts (in serializable isolation level) just
1267 * before we intend to modify the page
1268 */
1269 CheckForSerializableConflictIn(state->r, NULL, stack->buffer);
1270
1271 /* Insert the tuple(s) to the page, splitting the page if necessary */
1272 is_split = gistplacetopage(state->r, state->freespace, giststate,
1273 stack->buffer,
1274 tuples, ntup,
1275 oldoffnum, NULL,
1276 leftchild,
1277 &splitinfo,
1278 true,
1279 state->heapRel,
1280 state->is_build);
1281
1282 /*
1283 * Before recursing up in case the page was split, release locks on the
1284 * child pages. We don't need to keep them locked when updating the
1285 * parent.
1286 */
1287 if (BufferIsValid(rightchild))
1288 UnlockReleaseBuffer(rightchild);
1289 if (BufferIsValid(leftchild) && unlockleftchild)
1290 LockBuffer(leftchild, GIST_UNLOCK);
1291
1292 /*
1293 * If we had to split, insert/update the downlinks in the parent. If the
1294 * caller requested us to release the lock on stack->buffer, tell
1295 * gistfinishsplit() to do that as soon as it's safe to do so. If we
1296 * didn't have to split, release it ourselves.
1297 */
1298 if (splitinfo)
1299 gistfinishsplit(state, stack, giststate, splitinfo, unlockbuf);
1300 else if (unlockbuf)
1301 LockBuffer(stack->buffer, GIST_UNLOCK);
1302
1303 return is_split;
1304 }
1305
1306 /*
1307 * Finish an incomplete split by inserting/updating the downlinks in parent
1308 * page. 'splitinfo' contains all the child pages involved in the split,
1309 * from left-to-right.
1310 *
1311 * On entry, the caller must hold a lock on stack->buffer and all the child
1312 * pages in 'splitinfo'. If 'unlockbuf' is true, the lock on stack->buffer is
1313 * released on return. The child pages are always unlocked and unpinned.
1314 */
1315 static void
gistfinishsplit(GISTInsertState * state,GISTInsertStack * stack,GISTSTATE * giststate,List * splitinfo,bool unlockbuf)1316 gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
1317 GISTSTATE *giststate, List *splitinfo, bool unlockbuf)
1318 {
1319 ListCell *lc;
1320 List *reversed;
1321 GISTPageSplitInfo *right;
1322 GISTPageSplitInfo *left;
1323 IndexTuple tuples[2];
1324
1325 /* A split always contains at least two halves */
1326 Assert(list_length(splitinfo) >= 2);
1327
1328 /*
1329 * We need to insert downlinks for each new page, and update the downlink
1330 * for the original (leftmost) page in the split. Begin at the rightmost
1331 * page, inserting one downlink at a time until there's only two pages
1332 * left. Finally insert the downlink for the last new page and update the
1333 * downlink for the original page as one operation.
1334 */
1335
1336 /* for convenience, create a copy of the list in reverse order */
1337 reversed = NIL;
1338 foreach(lc, splitinfo)
1339 {
1340 reversed = lcons(lfirst(lc), reversed);
1341 }
1342
1343 LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1344
1345 /*
1346 * Insert downlinks for the siblings from right to left, until there are
1347 * only two siblings left.
1348 */
1349 while (list_length(reversed) > 2)
1350 {
1351 right = (GISTPageSplitInfo *) linitial(reversed);
1352 left = (GISTPageSplitInfo *) lsecond(reversed);
1353
1354 gistFindCorrectParent(state->r, stack);
1355 if (gistinserttuples(state, stack->parent, giststate,
1356 &right->downlink, 1,
1357 InvalidOffsetNumber,
1358 left->buf, right->buf, false, false))
1359 {
1360 /*
1361 * If the parent page was split, the existing downlink might
1362 * have moved.
1363 */
1364 stack->downlinkoffnum = InvalidOffsetNumber;
1365 }
1366 /* gistinserttuples() released the lock on right->buf. */
1367 reversed = list_delete_first(reversed);
1368 }
1369
1370 right = (GISTPageSplitInfo *) linitial(reversed);
1371 left = (GISTPageSplitInfo *) lsecond(reversed);
1372
1373 /*
1374 * Finally insert downlink for the remaining right page and update the
1375 * downlink for the original page to not contain the tuples that were
1376 * moved to the new pages.
1377 */
1378 tuples[0] = left->downlink;
1379 tuples[1] = right->downlink;
1380 gistFindCorrectParent(state->r, stack);
1381 if (gistinserttuples(state, stack->parent, giststate,
1382 tuples, 2,
1383 stack->downlinkoffnum,
1384 left->buf, right->buf,
1385 true, /* Unlock parent */
1386 unlockbuf /* Unlock stack->buffer if caller wants that */
1387 ))
1388 {
1389 /*
1390 * If the parent page was split, the downlink might have moved.
1391 */
1392 stack->downlinkoffnum = InvalidOffsetNumber;
1393 }
1394
1395 Assert(left->buf == stack->buffer);
1396
1397 /*
1398 * If we split the page because we had to adjust the downlink on an
1399 * internal page, while descending the tree for inserting a new tuple,
1400 * then this might no longer be the correct page for the new tuple. The
1401 * downlink to this page might not cover the new tuple anymore, it might
1402 * need to go to the newly-created right sibling instead. Tell the caller
1403 * to walk back up the stack, to re-check at the parent which page to
1404 * insert to.
1405 *
1406 * Normally, the LSN-NSN interlock during the tree descend would also
1407 * detect that a concurrent split happened (by ourselves), and cause us to
1408 * retry at the parent. But that mechanism doesn't work during index
1409 * build, because we don't do WAL-logging, and don't update LSNs, during
1410 * index build.
1411 */
1412 stack->retry_from_parent = true;
1413 }
1414
1415 /*
1416 * gistSplit -- split a page in the tree and fill struct
1417 * used for XLOG and real writes buffers. Function is recursive, ie
1418 * it will split page until keys will fit in every page.
1419 */
1420 SplitedPageLayout *
gistSplit(Relation r,Page page,IndexTuple * itup,int len,GISTSTATE * giststate)1421 gistSplit(Relation r,
1422 Page page,
1423 IndexTuple *itup, /* contains compressed entry */
1424 int len,
1425 GISTSTATE *giststate)
1426 {
1427 IndexTuple *lvectup,
1428 *rvectup;
1429 GistSplitVector v;
1430 int i;
1431 SplitedPageLayout *res = NULL;
1432
1433 /* this should never recurse very deeply, but better safe than sorry */
1434 check_stack_depth();
1435
1436 /* there's no point in splitting an empty page */
1437 Assert(len > 0);
1438
1439 /*
1440 * If a single tuple doesn't fit on a page, no amount of splitting will
1441 * help.
1442 */
1443 if (len == 1)
1444 ereport(ERROR,
1445 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1446 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
1447 IndexTupleSize(itup[0]), GiSTPageSize,
1448 RelationGetRelationName(r))));
1449
1450 memset(v.spl_lisnull, true,
1451 sizeof(bool) * giststate->nonLeafTupdesc->natts);
1452 memset(v.spl_risnull, true,
1453 sizeof(bool) * giststate->nonLeafTupdesc->natts);
1454 gistSplitByKey(r, page, itup, len, giststate, &v, 0);
1455
1456 /* form left and right vector */
1457 lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1458 rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1459
1460 for (i = 0; i < v.splitVector.spl_nleft; i++)
1461 lvectup[i] = itup[v.splitVector.spl_left[i] - 1];
1462
1463 for (i = 0; i < v.splitVector.spl_nright; i++)
1464 rvectup[i] = itup[v.splitVector.spl_right[i] - 1];
1465
1466 /* finalize splitting (may need another split) */
1467 if (!gistfitpage(rvectup, v.splitVector.spl_nright))
1468 {
1469 res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
1470 }
1471 else
1472 {
1473 ROTATEDIST(res);
1474 res->block.num = v.splitVector.spl_nright;
1475 res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
1476 res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
1477 }
1478
1479 if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
1480 {
1481 SplitedPageLayout *resptr,
1482 *subres;
1483
1484 resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);
1485
1486 /* install on list's tail */
1487 while (resptr->next)
1488 resptr = resptr->next;
1489
1490 resptr->next = res;
1491 res = subres;
1492 }
1493 else
1494 {
1495 ROTATEDIST(res);
1496 res->block.num = v.splitVector.spl_nleft;
1497 res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
1498 res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
1499 }
1500
1501 return res;
1502 }
1503
1504 /*
1505 * Create a GISTSTATE and fill it with information about the index
1506 */
1507 GISTSTATE *
initGISTstate(Relation index)1508 initGISTstate(Relation index)
1509 {
1510 GISTSTATE *giststate;
1511 MemoryContext scanCxt;
1512 MemoryContext oldCxt;
1513 int i;
1514
1515 /* safety check to protect fixed-size arrays in GISTSTATE */
1516 if (index->rd_att->natts > INDEX_MAX_KEYS)
1517 elog(ERROR, "numberOfAttributes %d > %d",
1518 index->rd_att->natts, INDEX_MAX_KEYS);
1519
1520 /* Create the memory context that will hold the GISTSTATE */
1521 scanCxt = AllocSetContextCreate(CurrentMemoryContext,
1522 "GiST scan context",
1523 ALLOCSET_DEFAULT_SIZES);
1524 oldCxt = MemoryContextSwitchTo(scanCxt);
1525
1526 /* Create and fill in the GISTSTATE */
1527 giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE));
1528
1529 giststate->scanCxt = scanCxt;
1530 giststate->tempCxt = scanCxt; /* caller must change this if needed */
1531 giststate->leafTupdesc = index->rd_att;
1532
1533 /*
1534 * The truncated tupdesc for non-leaf index tuples, which doesn't contain
1535 * the INCLUDE attributes.
1536 *
1537 * It is used to form tuples during tuple adjustment and page split.
1538 * B-tree creates shortened tuple descriptor for every truncated tuple,
1539 * because it is doing this less often: it does not have to form truncated
1540 * tuples during page split. Also, B-tree is not adjusting tuples on
1541 * internal pages the way GiST does.
1542 */
1543 giststate->nonLeafTupdesc = CreateTupleDescCopyConstr(index->rd_att);
1544 giststate->nonLeafTupdesc->natts =
1545 IndexRelationGetNumberOfKeyAttributes(index);
1546
1547 for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(index); i++)
1548 {
1549 fmgr_info_copy(&(giststate->consistentFn[i]),
1550 index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
1551 scanCxt);
1552 fmgr_info_copy(&(giststate->unionFn[i]),
1553 index_getprocinfo(index, i + 1, GIST_UNION_PROC),
1554 scanCxt);
1555
1556 /* opclasses are not required to provide a Compress method */
1557 if (OidIsValid(index_getprocid(index, i + 1, GIST_COMPRESS_PROC)))
1558 fmgr_info_copy(&(giststate->compressFn[i]),
1559 index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
1560 scanCxt);
1561 else
1562 giststate->compressFn[i].fn_oid = InvalidOid;
1563
1564 /* opclasses are not required to provide a Decompress method */
1565 if (OidIsValid(index_getprocid(index, i + 1, GIST_DECOMPRESS_PROC)))
1566 fmgr_info_copy(&(giststate->decompressFn[i]),
1567 index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
1568 scanCxt);
1569 else
1570 giststate->decompressFn[i].fn_oid = InvalidOid;
1571
1572 fmgr_info_copy(&(giststate->penaltyFn[i]),
1573 index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
1574 scanCxt);
1575 fmgr_info_copy(&(giststate->picksplitFn[i]),
1576 index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
1577 scanCxt);
1578 fmgr_info_copy(&(giststate->equalFn[i]),
1579 index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
1580 scanCxt);
1581
1582 /* opclasses are not required to provide a Distance method */
1583 if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
1584 fmgr_info_copy(&(giststate->distanceFn[i]),
1585 index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
1586 scanCxt);
1587 else
1588 giststate->distanceFn[i].fn_oid = InvalidOid;
1589
1590 /* opclasses are not required to provide a Fetch method */
1591 if (OidIsValid(index_getprocid(index, i + 1, GIST_FETCH_PROC)))
1592 fmgr_info_copy(&(giststate->fetchFn[i]),
1593 index_getprocinfo(index, i + 1, GIST_FETCH_PROC),
1594 scanCxt);
1595 else
1596 giststate->fetchFn[i].fn_oid = InvalidOid;
1597
1598 /*
1599 * If the index column has a specified collation, we should honor that
1600 * while doing comparisons. However, we may have a collatable storage
1601 * type for a noncollatable indexed data type. If there's no index
1602 * collation then specify default collation in case the support
1603 * functions need collation. This is harmless if the support
1604 * functions don't care about collation, so we just do it
1605 * unconditionally. (We could alternatively call get_typcollation,
1606 * but that seems like expensive overkill --- there aren't going to be
1607 * any cases where a GiST storage type has a nondefault collation.)
1608 */
1609 if (OidIsValid(index->rd_indcollation[i]))
1610 giststate->supportCollation[i] = index->rd_indcollation[i];
1611 else
1612 giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
1613 }
1614
1615 /* No opclass information for INCLUDE attributes */
1616 for (; i < index->rd_att->natts; i++)
1617 {
1618 giststate->consistentFn[i].fn_oid = InvalidOid;
1619 giststate->unionFn[i].fn_oid = InvalidOid;
1620 giststate->compressFn[i].fn_oid = InvalidOid;
1621 giststate->decompressFn[i].fn_oid = InvalidOid;
1622 giststate->penaltyFn[i].fn_oid = InvalidOid;
1623 giststate->picksplitFn[i].fn_oid = InvalidOid;
1624 giststate->equalFn[i].fn_oid = InvalidOid;
1625 giststate->distanceFn[i].fn_oid = InvalidOid;
1626 giststate->fetchFn[i].fn_oid = InvalidOid;
1627 giststate->supportCollation[i] = InvalidOid;
1628 }
1629
1630 MemoryContextSwitchTo(oldCxt);
1631
1632 return giststate;
1633 }
1634
/*
 * Release all resources of a GISTSTATE built by initGISTstate().
 */
void
freeGISTstate(GISTSTATE *giststate)
{
	/*
	 * It's sufficient to delete the scanCxt: the GISTSTATE struct itself
	 * and everything it points to were allocated in that context.
	 */
	MemoryContextDelete(giststate->scanCxt);
}
1641
1642 /*
1643 * gistprunepage() -- try to remove LP_DEAD items from the given page.
1644 * Function assumes that buffer is exclusively locked.
1645 */
1646 static void
gistprunepage(Relation rel,Page page,Buffer buffer,Relation heapRel)1647 gistprunepage(Relation rel, Page page, Buffer buffer, Relation heapRel)
1648 {
1649 OffsetNumber deletable[MaxIndexTuplesPerPage];
1650 int ndeletable = 0;
1651 OffsetNumber offnum,
1652 maxoff;
1653 TransactionId latestRemovedXid = InvalidTransactionId;
1654
1655 Assert(GistPageIsLeaf(page));
1656
1657 /*
1658 * Scan over all items to see which ones need to be deleted according to
1659 * LP_DEAD flags.
1660 */
1661 maxoff = PageGetMaxOffsetNumber(page);
1662 for (offnum = FirstOffsetNumber;
1663 offnum <= maxoff;
1664 offnum = OffsetNumberNext(offnum))
1665 {
1666 ItemId itemId = PageGetItemId(page, offnum);
1667
1668 if (ItemIdIsDead(itemId))
1669 deletable[ndeletable++] = offnum;
1670 }
1671
1672 if (XLogStandbyInfoActive() && RelationNeedsWAL(rel))
1673 latestRemovedXid =
1674 index_compute_xid_horizon_for_tuples(rel, heapRel, buffer,
1675 deletable, ndeletable);
1676
1677 if (ndeletable > 0)
1678 {
1679 START_CRIT_SECTION();
1680
1681 PageIndexMultiDelete(page, deletable, ndeletable);
1682
1683 /*
1684 * Mark the page as not containing any LP_DEAD items. This is not
1685 * certainly true (there might be some that have recently been marked,
1686 * but weren't included in our target-item list), but it will almost
1687 * always be true and it doesn't seem worth an additional page scan to
1688 * check it. Remember that F_HAS_GARBAGE is only a hint anyway.
1689 */
1690 GistClearPageHasGarbage(page);
1691
1692 MarkBufferDirty(buffer);
1693
1694 /* XLOG stuff */
1695 if (RelationNeedsWAL(rel))
1696 {
1697 XLogRecPtr recptr;
1698
1699 recptr = gistXLogDelete(buffer,
1700 deletable, ndeletable,
1701 latestRemovedXid);
1702
1703 PageSetLSN(page, recptr);
1704 }
1705 else
1706 PageSetLSN(page, gistGetFakeLSN(rel));
1707
1708 END_CRIT_SECTION();
1709 }
1710
1711 /*
1712 * Note: if we didn't find any LP_DEAD items, then the page's
1713 * F_HAS_GARBAGE hint bit is falsely set. We do not bother expending a
1714 * separate write to clear it, however. We will clear it when we split
1715 * the page.
1716 */
1717 }
1718