1 /*-------------------------------------------------------------------------
2 *
3 * gist.c
4 * interface routines for the postgres GiST index access method.
5 *
6 *
7 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/gist/gist.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 #include "postgres.h"
16
17 #include "access/gist_private.h"
18 #include "access/gistscan.h"
19 #include "catalog/pg_collation.h"
20 #include "miscadmin.h"
21 #include "storage/lmgr.h"
22 #include "storage/predicate.h"
23 #include "nodes/execnodes.h"
24 #include "utils/builtins.h"
25 #include "utils/index_selfuncs.h"
26 #include "utils/memutils.h"
27 #include "utils/rel.h"
28
29
30 /* non-export function prototypes */
31 static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
32 static bool gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
33 GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum);
34 static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
35 GISTSTATE *giststate,
36 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
37 Buffer leftchild, Buffer rightchild,
38 bool unlockbuf, bool unlockleftchild);
39 static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
40 GISTSTATE *giststate, List *splitinfo, bool releasebuf);
41 static void gistvacuumpage(Relation rel, Page page, Buffer buffer,
42 Relation heapRel);
43
44
/*
 * Prepend a freshly palloc'd, zeroed SplitedPageLayout node to the list 'd'.
 * The new node becomes the list head, with blkno/buffer initialized to the
 * invalid sentinels; 'd' is updated in place.
 */
#define ROTATEDIST(d) do { \
	SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
	memset(tmp,0,sizeof(SplitedPageLayout)); \
	tmp->block.blkno = InvalidBlockNumber;	\
	tmp->buffer = InvalidBuffer;	\
	tmp->next = (d); \
	(d)=tmp; \
} while(0)
53
54
55 /*
56 * GiST handler function: return IndexAmRoutine with access method parameters
57 * and callbacks.
58 */
59 Datum
gisthandler(PG_FUNCTION_ARGS)60 gisthandler(PG_FUNCTION_ARGS)
61 {
62 IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
63
64 amroutine->amstrategies = 0;
65 amroutine->amsupport = GISTNProcs;
66 amroutine->amcanorder = false;
67 amroutine->amcanorderbyop = true;
68 amroutine->amcanbackward = false;
69 amroutine->amcanunique = false;
70 amroutine->amcanmulticol = true;
71 amroutine->amoptionalkey = true;
72 amroutine->amsearcharray = false;
73 amroutine->amsearchnulls = true;
74 amroutine->amstorage = true;
75 amroutine->amclusterable = true;
76 amroutine->ampredlocks = true;
77 amroutine->amcanparallel = false;
78 amroutine->amcaninclude = false;
79 amroutine->amkeytype = InvalidOid;
80
81 amroutine->ambuild = gistbuild;
82 amroutine->ambuildempty = gistbuildempty;
83 amroutine->aminsert = gistinsert;
84 amroutine->ambulkdelete = gistbulkdelete;
85 amroutine->amvacuumcleanup = gistvacuumcleanup;
86 amroutine->amcanreturn = gistcanreturn;
87 amroutine->amcostestimate = gistcostestimate;
88 amroutine->amoptions = gistoptions;
89 amroutine->amproperty = gistproperty;
90 amroutine->amvalidate = gistvalidate;
91 amroutine->ambeginscan = gistbeginscan;
92 amroutine->amrescan = gistrescan;
93 amroutine->amgettuple = gistgettuple;
94 amroutine->amgetbitmap = gistgetbitmap;
95 amroutine->amendscan = gistendscan;
96 amroutine->ammarkpos = NULL;
97 amroutine->amrestrpos = NULL;
98 amroutine->amestimateparallelscan = NULL;
99 amroutine->aminitparallelscan = NULL;
100 amroutine->amparallelrescan = NULL;
101
102 PG_RETURN_POINTER(amroutine);
103 }
104
105 /*
106 * Create and return a temporary memory context for use by GiST. We
107 * _always_ invoke user-provided methods in a temporary memory
108 * context, so that memory leaks in those functions cannot cause
109 * problems. Also, we use some additional temporary contexts in the
110 * GiST code itself, to avoid the need to do some awkward manual
111 * memory management.
112 */
113 MemoryContext
createTempGistContext(void)114 createTempGistContext(void)
115 {
116 return AllocSetContextCreate(CurrentMemoryContext,
117 "GiST temporary context",
118 ALLOCSET_DEFAULT_SIZES);
119 }
120
121 /*
122 * gistbuildempty() -- build an empty gist index in the initialization fork
123 */
124 void
gistbuildempty(Relation index)125 gistbuildempty(Relation index)
126 {
127 Buffer buffer;
128
129 /* Initialize the root page */
130 buffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
131 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
132
133 /* Initialize and xlog buffer */
134 START_CRIT_SECTION();
135 GISTInitBuffer(buffer, F_LEAF);
136 MarkBufferDirty(buffer);
137 log_newpage_buffer(buffer, true);
138 END_CRIT_SECTION();
139
140 /* Unlock and release the buffer */
141 UnlockReleaseBuffer(buffer);
142 }
143
144 /*
145 * gistinsert -- wrapper for GiST tuple insertion.
146 *
147 * This is the public interface routine for tuple insertion in GiSTs.
148 * It doesn't do any work; just locks the relation and passes the buck.
149 */
150 bool
gistinsert(Relation r,Datum * values,bool * isnull,ItemPointer ht_ctid,Relation heapRel,IndexUniqueCheck checkUnique,IndexInfo * indexInfo)151 gistinsert(Relation r, Datum *values, bool *isnull,
152 ItemPointer ht_ctid, Relation heapRel,
153 IndexUniqueCheck checkUnique,
154 IndexInfo *indexInfo)
155 {
156 GISTSTATE *giststate = (GISTSTATE *) indexInfo->ii_AmCache;
157 IndexTuple itup;
158 MemoryContext oldCxt;
159
160 /* Initialize GISTSTATE cache if first call in this statement */
161 if (giststate == NULL)
162 {
163 oldCxt = MemoryContextSwitchTo(indexInfo->ii_Context);
164 giststate = initGISTstate(r);
165 giststate->tempCxt = createTempGistContext();
166 indexInfo->ii_AmCache = (void *) giststate;
167 MemoryContextSwitchTo(oldCxt);
168 }
169
170 oldCxt = MemoryContextSwitchTo(giststate->tempCxt);
171
172 itup = gistFormTuple(giststate, r,
173 values, isnull, true /* size is currently bogus */ );
174 itup->t_tid = *ht_ctid;
175
176 gistdoinsert(r, itup, 0, giststate, heapRel);
177
178 /* cleanup */
179 MemoryContextSwitchTo(oldCxt);
180 MemoryContextReset(giststate->tempCxt);
181
182 return false;
183 }
184
185
/*
 * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple
 * at that offset is atomically removed along with inserting the new tuples.
 * This is used to replace a tuple with a new one.
 *
 * If 'leftchildbuf' is valid, we're inserting the downlink for the page
 * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
 * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
 *
 * If 'markfollowright' is true and the page is split, the left child is
 * marked with F_FOLLOW_RIGHT flag. That is the normal case. During buffered
 * index build, however, there is no concurrent access and the page splitting
 * is done in a slightly simpler fashion, and false is passed.
 *
 * If there is not enough room on the page, it is split. All the split
 * pages are kept pinned and locked and returned in *splitinfo, the caller
 * is responsible for inserting the downlinks for them. However, if
 * 'buffer' is the root page and it needs to be split, gistplacetopage()
 * performs the split as one atomic operation, and *splitinfo is set to NIL.
 * In that case, we continue to hold the root page locked, and the child
 * pages are released; note that new tuple(s) are *not* on the root page
 * but in one of the new child pages.
 *
 * If 'newblkno' is not NULL, returns the block number of page the first
 * new/updated tuple was inserted to. Usually it's the given page, but could
 * be its right sibling if the page was split.
 *
 * Returns 'true' if the page was split, 'false' otherwise.
 *
 * On entry, caller must hold an exclusive lock on 'buffer' (and on
 * 'leftchildbuf' if valid) -- NOTE(review): inferred from the buffer usage
 * below; confirm against callers.
 */
bool
gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
				Buffer buffer,
				IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
				BlockNumber *newblkno,
				Buffer leftchildbuf,
				List **splitinfo,
				bool markfollowright,
				Relation heapRel)
{
	BlockNumber blkno = BufferGetBlockNumber(buffer);
	Page		page = BufferGetPage(buffer);
	bool		is_leaf = (GistPageIsLeaf(page)) ? true : false;
	XLogRecPtr	recptr;
	int			i;
	bool		is_split;

	/*
	 * Refuse to modify a page that's incompletely split. This should not
	 * happen because we finish any incomplete splits while we walk down the
	 * tree. However, it's remotely possible that another concurrent inserter
	 * splits a parent page, and errors out before completing the split. We
	 * will just throw an error in that case, and leave any split we had in
	 * progress unfinished too. The next insert that comes along will clean up
	 * the mess.
	 */
	if (GistFollowRight(page))
		elog(ERROR, "concurrent GiST page split was incomplete");

	*splitinfo = NIL;

	/*
	 * if isupdate, remove old key: This node's key has been modified, either
	 * because a child split occurred or because we needed to adjust our key
	 * for an insert in a child node. Therefore, remove the old version of
	 * this node's key.
	 *
	 * for WAL replay, in the non-split case we handle this by setting up a
	 * one-element todelete array; in the split case, it's handled implicitly
	 * because the tuple vector passed to gistSplit won't include this tuple.
	 */
	is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);

	/*
	 * If leaf page is full, try at first to delete dead tuples. And then
	 * check again.
	 */
	if (is_split && GistPageIsLeaf(page) && GistPageHasGarbage(page))
	{
		gistvacuumpage(rel, page, buffer, heapRel);
		is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
	}

	if (is_split)
	{
		/* no space for insertion */
		IndexTuple *itvec;
		int			tlen;
		SplitedPageLayout *dist = NULL,
				   *ptr;
		BlockNumber oldrlink = InvalidBlockNumber;
		GistNSN		oldnsn = 0;
		SplitedPageLayout rootpg;
		bool		is_rootsplit;
		int			npage;

		is_rootsplit = (blkno == GIST_ROOT_BLKNO);

		/*
		 * Form index tuples vector to split. If we're replacing an old tuple,
		 * remove the old version from the vector.
		 */
		itvec = gistextractpage(page, &tlen);
		if (OffsetNumberIsValid(oldoffnum))
		{
			/* on inner page we should remove old tuple */
			int			pos = oldoffnum - FirstOffsetNumber;

			tlen--;
			if (pos != tlen)
				memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
		}
		itvec = gistjoinvector(itvec, &tlen, itup, ntup);
		dist = gistSplit(rel, page, itvec, tlen, giststate);

		/*
		 * Check that split didn't produce too many pages.
		 */
		npage = 0;
		for (ptr = dist; ptr; ptr = ptr->next)
			npage++;
		/* in a root split, we'll add one more page to the list below */
		if (is_rootsplit)
			npage++;
		if (npage > GIST_MAX_SPLIT_PAGES)
			elog(ERROR, "GiST page split into too many halves (%d, maximum %d)",
				 npage, GIST_MAX_SPLIT_PAGES);

		/*
		 * Set up pages to work with. Allocate new buffers for all but the
		 * leftmost page. The original page becomes the new leftmost page, and
		 * is just replaced with the new contents.
		 *
		 * For a root-split, allocate new buffers for all child pages, the
		 * original page is overwritten with new root page containing
		 * downlinks to the new child pages.
		 */
		ptr = dist;
		if (!is_rootsplit)
		{
			/* save old rightlink and NSN */
			oldrlink = GistPageGetOpaque(page)->rightlink;
			oldnsn = GistPageGetNSN(page);

			dist->buffer = buffer;
			dist->block.blkno = BufferGetBlockNumber(buffer);
			/* work on a temp copy so the real page changes atomically later */
			dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));

			/* clean all flags except F_LEAF */
			GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;

			ptr = ptr->next;
		}
		for (; ptr; ptr = ptr->next)
		{
			/* Allocate new page */
			ptr->buffer = gistNewBuffer(rel);
			GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
			ptr->page = BufferGetPage(ptr->buffer);
			ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
			/* transfer predicate locks from the original page to the new one */
			PredicateLockPageSplit(rel,
								   BufferGetBlockNumber(buffer),
								   BufferGetBlockNumber(ptr->buffer));
		}

		/*
		 * Now that we know which blocks the new pages go to, set up downlink
		 * tuples to point to them.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
		{
			ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
			GistTupleSetValid(ptr->itup);
		}

		/*
		 * If this is a root split, we construct the new root page with the
		 * downlinks here directly, instead of requiring the caller to insert
		 * them. Add the new root page to the list along with the child pages.
		 */
		if (is_rootsplit)
		{
			IndexTuple *downlinks;
			int			ndownlinks = 0;
			int			i;

			rootpg.buffer = buffer;
			rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
			GistPageGetOpaque(rootpg.page)->flags = 0;

			/* Prepare a vector of all the downlinks */
			for (ptr = dist; ptr; ptr = ptr->next)
				ndownlinks++;
			downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
			for (i = 0, ptr = dist; ptr; ptr = ptr->next)
				downlinks[i++] = ptr->itup;

			rootpg.block.blkno = GIST_ROOT_BLKNO;
			rootpg.block.num = ndownlinks;
			rootpg.list = gistfillitupvec(downlinks, ndownlinks,
										  &(rootpg.lenlist));
			rootpg.itup = NULL;

			/* new root page goes at the head of the dist chain */
			rootpg.next = dist;
			dist = &rootpg;
		}
		else
		{
			/* Prepare split-info to be returned to caller */
			for (ptr = dist; ptr; ptr = ptr->next)
			{
				GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));

				si->buf = ptr->buffer;
				si->downlink = ptr->itup;
				*splitinfo = lappend(*splitinfo, si);
			}
		}

		/*
		 * Fill all pages. All the pages are new, ie. freshly allocated empty
		 * pages, or a temporary copy of the old page.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
		{
			char	   *data = (char *) (ptr->list);

			for (i = 0; i < ptr->block.num; i++)
			{
				IndexTuple	thistup = (IndexTuple) data;

				if (PageAddItem(ptr->page, (Item) data, IndexTupleSize(thistup), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
					elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(rel));

				/*
				 * If this is the first inserted/updated tuple, let the caller
				 * know which page it landed on.
				 */
				if (newblkno && ItemPointerEquals(&thistup->t_tid, &(*itup)->t_tid))
					*newblkno = ptr->block.blkno;

				data += IndexTupleSize(thistup);
			}

			/* Set up rightlinks */
			if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
				GistPageGetOpaque(ptr->page)->rightlink =
					ptr->next->block.blkno;
			else
				GistPageGetOpaque(ptr->page)->rightlink = oldrlink;

			/*
			 * Mark the all but the right-most page with the follow-right
			 * flag. It will be cleared as soon as the downlink is inserted
			 * into the parent, but this ensures that if we error out before
			 * that, the index is still consistent. (in buffering build mode,
			 * any error will abort the index build anyway, so this is not
			 * needed.)
			 */
			if (ptr->next && !is_rootsplit && markfollowright)
				GistMarkFollowRight(ptr->page);
			else
				GistClearFollowRight(ptr->page);

			/*
			 * Copy the NSN of the original page to all pages. The
			 * F_FOLLOW_RIGHT flags ensure that scans will follow the
			 * rightlinks until the downlinks are inserted.
			 */
			GistPageSetNSN(ptr->page, oldnsn);
		}

		/*
		 * gistXLogSplit() needs to WAL log a lot of pages, prepare WAL
		 * insertion for that. NB: The number of pages and data segments
		 * specified here must match the calculations in gistXLogSplit()!
		 */
		if (RelationNeedsWAL(rel))
			XLogEnsureRecordSpace(npage, 1 + npage * 2);

		START_CRIT_SECTION();

		/*
		 * Must mark buffers dirty before XLogInsert, even though we'll still
		 * be changing their opaque fields below.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
			MarkBufferDirty(ptr->buffer);
		if (BufferIsValid(leftchildbuf))
			MarkBufferDirty(leftchildbuf);

		/*
		 * The first page in the chain was a temporary working copy meant to
		 * replace the old page. Copy it over the old page.
		 */
		PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
		dist->page = BufferGetPage(dist->buffer);

		/* Write the WAL record */
		if (RelationNeedsWAL(rel))
			recptr = gistXLogSplit(is_leaf,
								   dist, oldrlink, oldnsn, leftchildbuf,
								   markfollowright);
		else
			recptr = gistGetFakeLSN(rel);

		for (ptr = dist; ptr; ptr = ptr->next)
		{
			PageSetLSN(ptr->page, recptr);
		}

		/*
		 * Return the new child buffers to the caller.
		 *
		 * If this was a root split, we've already inserted the downlink
		 * pointers, in the form of a new root page. Therefore we can release
		 * all the new buffers, and keep just the root page locked.
		 */
		if (is_rootsplit)
		{
			for (ptr = dist->next; ptr; ptr = ptr->next)
				UnlockReleaseBuffer(ptr->buffer);
		}
	}
	else
	{
		/*
		 * Enough space. We always get here if ntup==0.
		 */
		START_CRIT_SECTION();

		/*
		 * Delete old tuple if any, then insert new tuple(s) if any. If
		 * possible, use the fast path of PageIndexTupleOverwrite.
		 */
		if (OffsetNumberIsValid(oldoffnum))
		{
			if (ntup == 1)
			{
				/* One-for-one replacement, so use PageIndexTupleOverwrite */
				if (!PageIndexTupleOverwrite(page, oldoffnum, (Item) *itup,
											 IndexTupleSize(*itup)))
					elog(ERROR, "failed to add item to index page in \"%s\"",
						 RelationGetRelationName(rel));
			}
			else
			{
				/* Delete old, then append new tuple(s) to page */
				PageIndexTupleDelete(page, oldoffnum);
				gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
			}
		}
		else
		{
			/* Just append new tuples at the end of the page */
			gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
		}

		MarkBufferDirty(buffer);

		if (BufferIsValid(leftchildbuf))
			MarkBufferDirty(leftchildbuf);

		if (RelationNeedsWAL(rel))
		{
			OffsetNumber ndeloffs = 0,
						deloffs[1];

			if (OffsetNumberIsValid(oldoffnum))
			{
				deloffs[0] = oldoffnum;
				ndeloffs = 1;
			}

			recptr = gistXLogUpdate(buffer,
									deloffs, ndeloffs, itup, ntup,
									leftchildbuf, NULL);

			PageSetLSN(page, recptr);
		}
		else
		{
			/* unlogged/temp relation: fake LSN keeps NSN ordering working */
			recptr = gistGetFakeLSN(rel);
			PageSetLSN(page, recptr);
		}

		if (newblkno)
			*newblkno = blkno;
	}

	/*
	 * If we inserted the downlink for a child page, set NSN and clear
	 * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know to
	 * follow the rightlink if and only if they looked at the parent page
	 * before we inserted the downlink.
	 *
	 * Note that we do this *after* writing the WAL record. That means that
	 * the possible full page image in the WAL record does not include these
	 * changes, and they must be replayed even if the page is restored from
	 * the full page image. There's a chicken-and-egg problem: if we updated
	 * the child pages first, we wouldn't know the recptr of the WAL record
	 * we're about to write.
	 */
	if (BufferIsValid(leftchildbuf))
	{
		Page		leftpg = BufferGetPage(leftchildbuf);

		GistPageSetNSN(leftpg, recptr);
		GistClearFollowRight(leftpg);

		PageSetLSN(leftpg, recptr);
	}

	END_CRIT_SECTION();

	return is_split;
}
602
/*
 * Workhorse routine for doing insertion into a GiST index. Note that
 * this routine assumes it is invoked in a short-lived memory context,
 * so it does not bother releasing palloc'd allocations.
 *
 * Descends from the root along the minimum-penalty path, adjusting parent
 * keys on the way down as needed, and finally inserts 'itup' on a leaf
 * page.  Detects and recovers from concurrent page splits (via NSN and
 * F_FOLLOW_RIGHT checks), retrying from the parent when necessary.
 */
void
gistdoinsert(Relation r, IndexTuple itup, Size freespace,
			 GISTSTATE *giststate, Relation heapRel)
{
	ItemId		iid;
	IndexTuple	idxtuple;
	GISTInsertStack firststack;
	GISTInsertStack *stack;
	GISTInsertState state;
	bool		xlocked = false;	/* do we hold GIST_EXCLUSIVE on stack->buffer? */

	memset(&state, 0, sizeof(GISTInsertState));
	state.freespace = freespace;
	state.r = r;
	state.heapRel = heapRel;

	/* Start from the root */
	firststack.blkno = GIST_ROOT_BLKNO;
	firststack.lsn = 0;
	firststack.parent = NULL;
	firststack.downlinkoffnum = InvalidOffsetNumber;
	state.stack = stack = &firststack;

	/*
	 * Walk down along the path of smallest penalty, updating the parent
	 * pointers with the key we're inserting as we go. If we crash in the
	 * middle, the tree is consistent, although the possible parent updates
	 * were a waste.
	 */
	for (;;)
	{
		/* lsn == 0 means we haven't read this stack entry's page yet */
		if (XLogRecPtrIsInvalid(stack->lsn))
			stack->buffer = ReadBuffer(state.r, stack->blkno);

		/*
		 * Be optimistic and grab shared lock first. Swap it for an exclusive
		 * lock later if we need to update the page.
		 */
		if (!xlocked)
		{
			LockBuffer(stack->buffer, GIST_SHARE);
			gistcheckpage(state.r, stack->buffer);
		}

		stack->page = (Page) BufferGetPage(stack->buffer);
		stack->lsn = xlocked ?
			PageGetLSN(stack->page) : BufferGetLSNAtomic(stack->buffer);
		Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));

		/*
		 * If this page was split but the downlink was never inserted to the
		 * parent because the inserting backend crashed before doing that, fix
		 * that now.
		 */
		if (GistFollowRight(stack->page))
		{
			if (!xlocked)
			{
				LockBuffer(stack->buffer, GIST_UNLOCK);
				LockBuffer(stack->buffer, GIST_EXCLUSIVE);
				xlocked = true;
				/* someone might've completed the split when we unlocked */
				if (!GistFollowRight(stack->page))
					continue;
			}
			gistfixsplit(&state, giststate);

			UnlockReleaseBuffer(stack->buffer);
			xlocked = false;
			state.stack = stack = stack->parent;
			continue;
		}

		/* NSN newer than the parent's LSN means a split we haven't seen */
		if (stack->blkno != GIST_ROOT_BLKNO &&
			stack->parent->lsn < GistPageGetNSN(stack->page))
		{
			/*
			 * Concurrent split detected. There's no guarantee that the
			 * downlink for this page is consistent with the tuple we're
			 * inserting anymore, so go back to parent and rechoose the best
			 * child.
			 */
			UnlockReleaseBuffer(stack->buffer);
			xlocked = false;
			state.stack = stack = stack->parent;
			continue;
		}

		if (!GistPageIsLeaf(stack->page))
		{
			/*
			 * This is an internal page so continue to walk down the tree.
			 * Find the child node that has the minimum insertion penalty.
			 */
			BlockNumber childblkno;
			IndexTuple	newtup;
			GISTInsertStack *item;
			OffsetNumber downlinkoffnum;

			downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate);
			iid = PageGetItemId(stack->page, downlinkoffnum);
			idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
			childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));

			/*
			 * Check that it's not a leftover invalid tuple from pre-9.1
			 */
			if (GistTupleIsInvalid(idxtuple))
				ereport(ERROR,
						(errmsg("index \"%s\" contains an inner tuple marked as invalid",
								RelationGetRelationName(r)),
						 errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
						 errhint("Please REINDEX it.")));

			/*
			 * Check that the key representing the target child node is
			 * consistent with the key we're inserting. Update it if it's not.
			 */
			newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
			if (newtup)
			{
				/*
				 * Swap shared lock for an exclusive one. Beware, the page may
				 * change while we unlock/lock the page...
				 */
				if (!xlocked)
				{
					LockBuffer(stack->buffer, GIST_UNLOCK);
					LockBuffer(stack->buffer, GIST_EXCLUSIVE);
					xlocked = true;
					stack->page = (Page) BufferGetPage(stack->buffer);

					if (PageGetLSN(stack->page) != stack->lsn)
					{
						/* the page was changed while we unlocked it, retry */
						continue;
					}
				}

				/*
				 * Update the tuple.
				 *
				 * We still hold the lock after gistinserttuple(), but it
				 * might have to split the page to make the updated tuple fit.
				 * In that case the updated tuple might migrate to the other
				 * half of the split, so we have to go back to the parent and
				 * descend back to the half that's a better fit for the new
				 * tuple.
				 */
				if (gistinserttuple(&state, stack, giststate, newtup,
									downlinkoffnum))
				{
					/*
					 * If this was a root split, the root page continues to be
					 * the parent and the updated tuple went to one of the
					 * child pages, so we just need to retry from the root
					 * page.
					 */
					if (stack->blkno != GIST_ROOT_BLKNO)
					{
						UnlockReleaseBuffer(stack->buffer);
						xlocked = false;
						state.stack = stack = stack->parent;
					}
					continue;
				}
			}
			LockBuffer(stack->buffer, GIST_UNLOCK);
			xlocked = false;

			/* descend to the chosen child */
			item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
			item->blkno = childblkno;
			item->parent = stack;
			item->downlinkoffnum = downlinkoffnum;
			state.stack = stack = item;
		}
		else
		{
			/*
			 * Leaf page. Insert the new key. We've already updated all the
			 * parents on the way down, but we might have to split the page if
			 * it doesn't fit. gistinserthere() will take care of that.
			 */

			/*
			 * Swap shared lock for an exclusive one. Be careful, the page may
			 * change while we unlock/lock the page...
			 */
			if (!xlocked)
			{
				LockBuffer(stack->buffer, GIST_UNLOCK);
				LockBuffer(stack->buffer, GIST_EXCLUSIVE);
				xlocked = true;
				stack->page = (Page) BufferGetPage(stack->buffer);
				stack->lsn = PageGetLSN(stack->page);

				if (stack->blkno == GIST_ROOT_BLKNO)
				{
					/*
					 * the only page that can become inner instead of leaf is
					 * the root page, so for root we should recheck it
					 */
					if (!GistPageIsLeaf(stack->page))
					{
						/*
						 * very rare situation: during unlock/lock index with
						 * number of pages = 1 was increased
						 */
						LockBuffer(stack->buffer, GIST_UNLOCK);
						xlocked = false;
						continue;
					}

					/*
					 * we don't need to check root split, because checking
					 * leaf/inner is enough to recognize split for root
					 */
				}
				else if (GistFollowRight(stack->page) ||
						 stack->parent->lsn < GistPageGetNSN(stack->page))
				{
					/*
					 * The page was split while we momentarily unlocked the
					 * page. Go back to parent.
					 */
					UnlockReleaseBuffer(stack->buffer);
					xlocked = false;
					state.stack = stack = stack->parent;
					continue;
				}
			}

			/* now state.stack->(page, buffer and blkno) points to leaf page */

			gistinserttuple(&state, stack, giststate, itup,
							InvalidOffsetNumber);
			LockBuffer(stack->buffer, GIST_UNLOCK);

			/* Release any pins we might still hold before exiting */
			for (; stack; stack = stack->parent)
				ReleaseBuffer(stack->buffer);
			break;
		}
	}
}
854
/*
 * Traverse the tree to find path from root page to specified "child" block.
 *
 * returns a new insertion stack, starting from the parent of "child", up
 * to the root. *downlinkoffnum is set to the offset of the downlink in the
 * direct parent of child.
 *
 * To prevent deadlocks, this should lock only one page at a time.
 *
 * Performs a breadth-first search from the root; raises ERROR if the child
 * cannot be re-found (e.g. because of an incomplete concurrent split).
 */
static GISTInsertStack *
gistFindPath(Relation r, BlockNumber child, OffsetNumber *downlinkoffnum)
{
	Page		page;
	Buffer		buffer;
	OffsetNumber i,
				maxoff;
	ItemId		iid;
	IndexTuple	idxtuple;
	List	   *fifo;			/* BFS queue of GISTInsertStack nodes */
	GISTInsertStack *top,
			   *ptr;
	BlockNumber blkno;

	top = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
	top->blkno = GIST_ROOT_BLKNO;
	top->downlinkoffnum = InvalidOffsetNumber;

	fifo = list_make1(top);
	while (fifo != NIL)
	{
		/* Get next page to visit */
		top = linitial(fifo);
		fifo = list_delete_first(fifo);

		buffer = ReadBuffer(r, top->blkno);
		LockBuffer(buffer, GIST_SHARE);
		gistcheckpage(r, buffer);
		page = (Page) BufferGetPage(buffer);

		if (GistPageIsLeaf(page))
		{
			/*
			 * Because we scan the index top-down, all the rest of the pages
			 * in the queue must be leaf pages as well.
			 */
			UnlockReleaseBuffer(buffer);
			break;
		}

		top->lsn = BufferGetLSNAtomic(buffer);

		/*
		 * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
		 * downlink. This should not normally happen..
		 */
		if (GistFollowRight(page))
			elog(ERROR, "concurrent GiST page split was incomplete");

		if (top->parent && top->parent->lsn < GistPageGetNSN(page) &&
			GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
		{
			/*
			 * Page was split while we looked elsewhere. We didn't see the
			 * downlink to the right page when we scanned the parent, so add
			 * it to the queue now.
			 *
			 * Put the right page ahead of the queue, so that we visit it
			 * next. That's important, because if this is the lowest internal
			 * level, just above leaves, we might already have queued up some
			 * leaf pages, and we assume that there can't be any non-leaf
			 * pages behind leaf pages.
			 */
			ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
			ptr->blkno = GistPageGetOpaque(page)->rightlink;
			ptr->downlinkoffnum = InvalidOffsetNumber;
			ptr->parent = top->parent;

			fifo = lcons(ptr, fifo);
		}

		maxoff = PageGetMaxOffsetNumber(page);

		/* scan all downlinks on this page for the target child */
		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
		{
			iid = PageGetItemId(page, i);
			idxtuple = (IndexTuple) PageGetItem(page, iid);
			blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
			if (blkno == child)
			{
				/* Found it! */
				UnlockReleaseBuffer(buffer);
				*downlinkoffnum = i;
				return top;
			}
			else
			{
				/* Append this child to the list of pages to visit later */
				ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
				ptr->blkno = blkno;
				ptr->downlinkoffnum = i;
				ptr->parent = top;

				fifo = lappend(fifo, ptr);
			}
		}

		UnlockReleaseBuffer(buffer);
	}

	elog(ERROR, "failed to re-find parent of a page in index \"%s\", block %u",
		 RelationGetRelationName(r), child);
	return NULL;				/* keep compiler quiet */
}
968
/*
 * Updates the stack so that child->parent is the correct parent of the
 * child. child->parent must be exclusively locked on entry, and will
 * remain so at exit, but it might not be the same page anymore.
 *
 * If the remembered parent page has changed (or was never known), we first
 * chase rightlinks from it looking for the child's downlink; if that fails
 * (the root itself was split), we re-search the whole tree from the root
 * via gistFindPath() and rebuild the parent stack.
 */
static void
gistFindCorrectParent(Relation r, GISTInsertStack *child)
{
	GISTInsertStack *parent = child->parent;

	gistcheckpage(r, parent->buffer);
	parent->page = (Page) BufferGetPage(parent->buffer);

	/* here we don't need to distinguish between split and page update */
	if (child->downlinkoffnum == InvalidOffsetNumber ||
		parent->lsn != PageGetLSN(parent->page))
	{
		/* parent has changed; follow rightlinks until the child is found */
		OffsetNumber i,
					maxoff;
		ItemId		iid;
		IndexTuple	idxtuple;
		GISTInsertStack *ptr;

		while (true)
		{
			maxoff = PageGetMaxOffsetNumber(parent->page);
			for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
			{
				iid = PageGetItemId(parent->page, i);
				idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
				if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
				{
					/* yes!!, found */
					child->downlinkoffnum = i;
					return;
				}
			}

			/* not on this page; move right (releases lock + pin) */
			parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
			UnlockReleaseBuffer(parent->buffer);
			if (parent->blkno == InvalidBlockNumber)
			{
				/*
				 * End of chain and still didn't find parent. It's a very
				 * rare situation, when the root was split.
				 */
				break;
			}
			parent->buffer = ReadBuffer(r, parent->blkno);
			LockBuffer(parent->buffer, GIST_EXCLUSIVE);
			gistcheckpage(r, parent->buffer);
			parent->page = (Page) BufferGetPage(parent->buffer);
		}

		/*
		 * Awful: we need to search the tree to find the parent again.  But
		 * first release the pins on all the old parent pages.
		 */

		ptr = child->parent->parent;	/* child->parent already released
										 * above */
		while (ptr)
		{
			ReleaseBuffer(ptr->buffer);
			ptr = ptr->parent;
		}

		/* ok, find new path */
		ptr = parent = gistFindPath(r, child->blkno, &child->downlinkoffnum);

		/* read all buffers as expected by caller */
		/* note we don't lock them or gistcheckpage them here! */
		while (ptr)
		{
			ptr->buffer = ReadBuffer(r, ptr->blkno);
			ptr->page = (Page) BufferGetPage(ptr->buffer);
			ptr = ptr->parent;
		}

		/* install new chain of parents to stack */
		child->parent = parent;

		/* make recursive call to normal processing */
		LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
		gistFindCorrectParent(r, child);
	}

	return;
}
1059
1060 /*
1061 * Form a downlink pointer for the page in 'buf'.
1062 */
1063 static IndexTuple
gistformdownlink(Relation rel,Buffer buf,GISTSTATE * giststate,GISTInsertStack * stack)1064 gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
1065 GISTInsertStack *stack)
1066 {
1067 Page page = BufferGetPage(buf);
1068 OffsetNumber maxoff;
1069 OffsetNumber offset;
1070 IndexTuple downlink = NULL;
1071
1072 maxoff = PageGetMaxOffsetNumber(page);
1073 for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
1074 {
1075 IndexTuple ituple = (IndexTuple)
1076 PageGetItem(page, PageGetItemId(page, offset));
1077
1078 if (downlink == NULL)
1079 downlink = CopyIndexTuple(ituple);
1080 else
1081 {
1082 IndexTuple newdownlink;
1083
1084 newdownlink = gistgetadjusted(rel, downlink, ituple,
1085 giststate);
1086 if (newdownlink)
1087 downlink = newdownlink;
1088 }
1089 }
1090
1091 /*
1092 * If the page is completely empty, we can't form a meaningful downlink
1093 * for it. But we have to insert a downlink for the page. Any key will do,
1094 * as long as its consistent with the downlink of parent page, so that we
1095 * can legally insert it to the parent. A minimal one that matches as few
1096 * scans as possible would be best, to keep scans from doing useless work,
1097 * but we don't know how to construct that. So we just use the downlink of
1098 * the original page that was split - that's as far from optimal as it can
1099 * get but will do..
1100 */
1101 if (!downlink)
1102 {
1103 ItemId iid;
1104
1105 LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1106 gistFindCorrectParent(rel, stack);
1107 iid = PageGetItemId(stack->parent->page, stack->downlinkoffnum);
1108 downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
1109 downlink = CopyIndexTuple(downlink);
1110 LockBuffer(stack->parent->buffer, GIST_UNLOCK);
1111 }
1112
1113 ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
1114 GistTupleSetValid(downlink);
1115
1116 return downlink;
1117 }
1118
1119
1120 /*
1121 * Complete the incomplete split of state->stack->page.
1122 */
1123 static void
gistfixsplit(GISTInsertState * state,GISTSTATE * giststate)1124 gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
1125 {
1126 GISTInsertStack *stack = state->stack;
1127 Buffer buf;
1128 Page page;
1129 List *splitinfo = NIL;
1130
1131 elog(LOG, "fixing incomplete split in index \"%s\", block %u",
1132 RelationGetRelationName(state->r), stack->blkno);
1133
1134 Assert(GistFollowRight(stack->page));
1135 Assert(OffsetNumberIsValid(stack->downlinkoffnum));
1136
1137 buf = stack->buffer;
1138
1139 /*
1140 * Read the chain of split pages, following the rightlinks. Construct a
1141 * downlink tuple for each page.
1142 */
1143 for (;;)
1144 {
1145 GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
1146 IndexTuple downlink;
1147
1148 page = BufferGetPage(buf);
1149
1150 /* Form the new downlink tuples to insert to parent */
1151 downlink = gistformdownlink(state->r, buf, giststate, stack);
1152
1153 si->buf = buf;
1154 si->downlink = downlink;
1155
1156 splitinfo = lappend(splitinfo, si);
1157
1158 if (GistFollowRight(page))
1159 {
1160 /* lock next page */
1161 buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
1162 LockBuffer(buf, GIST_EXCLUSIVE);
1163 }
1164 else
1165 break;
1166 }
1167
1168 /* Insert the downlinks */
1169 gistfinishsplit(state, stack, giststate, splitinfo, false);
1170 }
1171
/*
 * Insert or replace a tuple in stack->buffer. If 'oldoffnum' is valid, the
 * tuple at 'oldoffnum' is replaced, otherwise the tuple is inserted as new.
 * 'stack' represents the path from the root to the page being updated.
 *
 * The caller must hold an exclusive lock on stack->buffer.  The lock is still
 * held on return, but the page might not contain the inserted tuple if the
 * page was split. The function returns true if the page was split, false
 * otherwise.
 */
static bool
gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
				GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum)
{
	/* Single-tuple convenience wrapper around the multi-tuple workhorse */
	return gistinserttuples(state, stack, giststate, &tuple, 1, oldoffnum,
							InvalidBuffer, InvalidBuffer, false, false);
}
1189
/* ----------------
 * An extended workhorse version of gistinserttuple(). This version allows
 * inserting multiple tuples, or replacing a single tuple with multiple tuples.
 * This is used to recursively update the downlinks in the parent when a page
 * is split.
 *
 * If leftchild and rightchild are valid, we're inserting/replacing the
 * downlink for rightchild, and leftchild is its left sibling. We clear the
 * F_FOLLOW_RIGHT flag and update NSN on leftchild, atomically with the
 * insertion of the downlink.
 *
 * To avoid holding locks for longer than necessary, when recursing up the
 * tree to update the parents, the locking is a bit peculiar here. On entry,
 * the caller must hold an exclusive lock on stack->buffer, as well as
 * leftchild and rightchild if given. On return:
 *
 * - Lock on stack->buffer is released, if 'unlockbuf' is true. The page is
 *	 always kept pinned, however.
 * - Lock on 'leftchild' is released, if 'unlockleftchild' is true. The page
 *	 is kept pinned.
 * - Lock and pin on 'rightchild' are always released.
 *
 * Returns 'true' if the page had to be split. Note that if the page was
 * split, the inserted/updated tuples might've been inserted to a right
 * sibling of stack->buffer instead of stack->buffer itself.
 */
static bool
gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
				 GISTSTATE *giststate,
				 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
				 Buffer leftchild, Buffer rightchild,
				 bool unlockbuf, bool unlockleftchild)
{
	List	   *splitinfo;
	bool		is_split;

	/*
	 * Check for any rw conflicts (in serializable isolation level) just
	 * before we intend to modify the page
	 */
	CheckForSerializableConflictIn(state->r, NULL, stack->buffer);

	/* Insert the tuple(s) to the page, splitting the page if necessary */
	is_split = gistplacetopage(state->r, state->freespace, giststate,
							   stack->buffer,
							   tuples, ntup,
							   oldoffnum, NULL,	/* don't need newblkno back */
							   leftchild,
							   &splitinfo,
							   true,	/* markfollowright */
							   state->heapRel);

	/*
	 * Before recursing up in case the page was split, release locks on the
	 * child pages. We don't need to keep them locked when updating the
	 * parent.
	 */
	if (BufferIsValid(rightchild))
		UnlockReleaseBuffer(rightchild);
	if (BufferIsValid(leftchild) && unlockleftchild)
		LockBuffer(leftchild, GIST_UNLOCK);

	/*
	 * If we had to split, insert/update the downlinks in the parent. If the
	 * caller requested us to release the lock on stack->buffer, tell
	 * gistfinishsplit() to do that as soon as it's safe to do so. If we
	 * didn't have to split, release it ourselves.
	 */
	if (splitinfo)
		gistfinishsplit(state, stack, giststate, splitinfo, unlockbuf);
	else if (unlockbuf)
		LockBuffer(stack->buffer, GIST_UNLOCK);

	return is_split;
}
1265
/*
 * Finish an incomplete split by inserting/updating the downlinks in parent
 * page. 'splitinfo' contains all the child pages involved in the split,
 * from left-to-right.
 *
 * On entry, the caller must hold a lock on stack->buffer and all the child
 * pages in 'splitinfo'. If 'unlockbuf' is true, the lock on stack->buffer is
 * released on return. The child pages are always unlocked and unpinned.
 */
static void
gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
				GISTSTATE *giststate, List *splitinfo, bool unlockbuf)
{
	ListCell   *lc;
	List	   *reversed;
	GISTPageSplitInfo *right;
	GISTPageSplitInfo *left;
	IndexTuple	tuples[2];

	/* A split always contains at least two halves */
	Assert(list_length(splitinfo) >= 2);

	/*
	 * We need to insert downlinks for each new page, and update the downlink
	 * for the original (leftmost) page in the split. Begin at the rightmost
	 * page, inserting one downlink at a time until there's only two pages
	 * left. Finally insert the downlink for the last new page and update the
	 * downlink for the original page as one operation.
	 */

	/* for convenience, create a copy of the list in reverse order */
	reversed = NIL;
	foreach(lc, splitinfo)
	{
		/* lcons prepends, so iterating left-to-right builds a reversed list */
		reversed = lcons(lfirst(lc), reversed);
	}

	/* Locate and lock the parent before touching its downlinks */
	LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
	gistFindCorrectParent(state->r, stack);

	/*
	 * insert downlinks for the siblings from right to left, until there are
	 * only two siblings left.
	 */
	while (list_length(reversed) > 2)
	{
		right = (GISTPageSplitInfo *) linitial(reversed);
		left = (GISTPageSplitInfo *) lsecond(reversed);

		if (gistinserttuples(state, stack->parent, giststate,
							 &right->downlink, 1,
							 InvalidOffsetNumber,
							 left->buf, right->buf, false, false))
		{
			/*
			 * If the parent page was split, need to relocate the original
			 * parent pointer.
			 */
			gistFindCorrectParent(state->r, stack);
		}
		/* gistinserttuples() released the lock on right->buf. */
		reversed = list_delete_first(reversed);
	}

	right = (GISTPageSplitInfo *) linitial(reversed);
	left = (GISTPageSplitInfo *) lsecond(reversed);

	/*
	 * Finally insert downlink for the remaining right page and update the
	 * downlink for the original page to not contain the tuples that were
	 * moved to the new pages.
	 */
	tuples[0] = left->downlink;
	tuples[1] = right->downlink;
	gistinserttuples(state, stack->parent, giststate,
					 tuples, 2,
					 stack->downlinkoffnum,
					 left->buf, right->buf,
					 true,		/* Unlock parent */
					 unlockbuf	/* Unlock stack->buffer if caller wants that */
		);
	/* leftmost half of the split must be the original page itself */
	Assert(left->buf == stack->buffer);
}
1349
1350 /*
1351 * gistSplit -- split a page in the tree and fill struct
1352 * used for XLOG and real writes buffers. Function is recursive, ie
1353 * it will split page until keys will fit in every page.
1354 */
1355 SplitedPageLayout *
gistSplit(Relation r,Page page,IndexTuple * itup,int len,GISTSTATE * giststate)1356 gistSplit(Relation r,
1357 Page page,
1358 IndexTuple *itup, /* contains compressed entry */
1359 int len,
1360 GISTSTATE *giststate)
1361 {
1362 IndexTuple *lvectup,
1363 *rvectup;
1364 GistSplitVector v;
1365 int i;
1366 SplitedPageLayout *res = NULL;
1367
1368 /* this should never recurse very deeply, but better safe than sorry */
1369 check_stack_depth();
1370
1371 /* there's no point in splitting an empty page */
1372 Assert(len > 0);
1373
1374 /*
1375 * If a single tuple doesn't fit on a page, no amount of splitting will
1376 * help.
1377 */
1378 if (len == 1)
1379 ereport(ERROR,
1380 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1381 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
1382 IndexTupleSize(itup[0]), GiSTPageSize,
1383 RelationGetRelationName(r))));
1384
1385 memset(v.spl_lisnull, true, sizeof(bool) * giststate->tupdesc->natts);
1386 memset(v.spl_risnull, true, sizeof(bool) * giststate->tupdesc->natts);
1387 gistSplitByKey(r, page, itup, len, giststate, &v, 0);
1388
1389 /* form left and right vector */
1390 lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1391 rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1392
1393 for (i = 0; i < v.splitVector.spl_nleft; i++)
1394 lvectup[i] = itup[v.splitVector.spl_left[i] - 1];
1395
1396 for (i = 0; i < v.splitVector.spl_nright; i++)
1397 rvectup[i] = itup[v.splitVector.spl_right[i] - 1];
1398
1399 /* finalize splitting (may need another split) */
1400 if (!gistfitpage(rvectup, v.splitVector.spl_nright))
1401 {
1402 res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
1403 }
1404 else
1405 {
1406 ROTATEDIST(res);
1407 res->block.num = v.splitVector.spl_nright;
1408 res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
1409 res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
1410 }
1411
1412 if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
1413 {
1414 SplitedPageLayout *resptr,
1415 *subres;
1416
1417 resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);
1418
1419 /* install on list's tail */
1420 while (resptr->next)
1421 resptr = resptr->next;
1422
1423 resptr->next = res;
1424 res = subres;
1425 }
1426 else
1427 {
1428 ROTATEDIST(res);
1429 res->block.num = v.splitVector.spl_nleft;
1430 res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
1431 res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
1432 }
1433
1434 return res;
1435 }
1436
1437 /*
1438 * Create a GISTSTATE and fill it with information about the index
1439 */
1440 GISTSTATE *
initGISTstate(Relation index)1441 initGISTstate(Relation index)
1442 {
1443 GISTSTATE *giststate;
1444 MemoryContext scanCxt;
1445 MemoryContext oldCxt;
1446 int i;
1447
1448 /* safety check to protect fixed-size arrays in GISTSTATE */
1449 if (index->rd_att->natts > INDEX_MAX_KEYS)
1450 elog(ERROR, "numberOfAttributes %d > %d",
1451 index->rd_att->natts, INDEX_MAX_KEYS);
1452
1453 /* Create the memory context that will hold the GISTSTATE */
1454 scanCxt = AllocSetContextCreate(CurrentMemoryContext,
1455 "GiST scan context",
1456 ALLOCSET_DEFAULT_SIZES);
1457 oldCxt = MemoryContextSwitchTo(scanCxt);
1458
1459 /* Create and fill in the GISTSTATE */
1460 giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE));
1461
1462 giststate->scanCxt = scanCxt;
1463 giststate->tempCxt = scanCxt; /* caller must change this if needed */
1464 giststate->tupdesc = index->rd_att;
1465
1466 for (i = 0; i < index->rd_att->natts; i++)
1467 {
1468 fmgr_info_copy(&(giststate->consistentFn[i]),
1469 index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
1470 scanCxt);
1471 fmgr_info_copy(&(giststate->unionFn[i]),
1472 index_getprocinfo(index, i + 1, GIST_UNION_PROC),
1473 scanCxt);
1474
1475 /* opclasses are not required to provide a Compress method */
1476 if (OidIsValid(index_getprocid(index, i + 1, GIST_COMPRESS_PROC)))
1477 fmgr_info_copy(&(giststate->compressFn[i]),
1478 index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
1479 scanCxt);
1480 else
1481 giststate->compressFn[i].fn_oid = InvalidOid;
1482
1483 /* opclasses are not required to provide a Decompress method */
1484 if (OidIsValid(index_getprocid(index, i + 1, GIST_DECOMPRESS_PROC)))
1485 fmgr_info_copy(&(giststate->decompressFn[i]),
1486 index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
1487 scanCxt);
1488 else
1489 giststate->decompressFn[i].fn_oid = InvalidOid;
1490
1491 fmgr_info_copy(&(giststate->penaltyFn[i]),
1492 index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
1493 scanCxt);
1494 fmgr_info_copy(&(giststate->picksplitFn[i]),
1495 index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
1496 scanCxt);
1497 fmgr_info_copy(&(giststate->equalFn[i]),
1498 index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
1499 scanCxt);
1500
1501 /* opclasses are not required to provide a Distance method */
1502 if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
1503 fmgr_info_copy(&(giststate->distanceFn[i]),
1504 index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
1505 scanCxt);
1506 else
1507 giststate->distanceFn[i].fn_oid = InvalidOid;
1508
1509 /* opclasses are not required to provide a Fetch method */
1510 if (OidIsValid(index_getprocid(index, i + 1, GIST_FETCH_PROC)))
1511 fmgr_info_copy(&(giststate->fetchFn[i]),
1512 index_getprocinfo(index, i + 1, GIST_FETCH_PROC),
1513 scanCxt);
1514 else
1515 giststate->fetchFn[i].fn_oid = InvalidOid;
1516
1517 /*
1518 * If the index column has a specified collation, we should honor that
1519 * while doing comparisons. However, we may have a collatable storage
1520 * type for a noncollatable indexed data type. If there's no index
1521 * collation then specify default collation in case the support
1522 * functions need collation. This is harmless if the support
1523 * functions don't care about collation, so we just do it
1524 * unconditionally. (We could alternatively call get_typcollation,
1525 * but that seems like expensive overkill --- there aren't going to be
1526 * any cases where a GiST storage type has a nondefault collation.)
1527 */
1528 if (OidIsValid(index->rd_indcollation[i]))
1529 giststate->supportCollation[i] = index->rd_indcollation[i];
1530 else
1531 giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
1532 }
1533
1534 MemoryContextSwitchTo(oldCxt);
1535
1536 return giststate;
1537 }
1538
/*
 * Release all memory held by a GISTSTATE built by initGISTstate().
 */
void
freeGISTstate(GISTSTATE *giststate)
{
	/* It's sufficient to delete the scanCxt */
	MemoryContextDelete(giststate->scanCxt);
}
1545
/*
 * gistvacuumpage() -- try to remove LP_DEAD items from the given page.
 * Function assumes that buffer is exclusively locked.
 *
 * 'page' must be the page in 'buffer', which must be a leaf page of 'rel';
 * 'heapRel' is the table the index belongs to (needed for WAL logging).
 */
static void
gistvacuumpage(Relation rel, Page page, Buffer buffer, Relation heapRel)
{
	OffsetNumber deletable[MaxIndexTuplesPerPage];
	int			ndeletable = 0;
	OffsetNumber offnum,
				maxoff;

	Assert(GistPageIsLeaf(page));

	/*
	 * Scan over all items to see which ones need to be deleted according to
	 * LP_DEAD flags.
	 */
	maxoff = PageGetMaxOffsetNumber(page);
	for (offnum = FirstOffsetNumber;
		 offnum <= maxoff;
		 offnum = OffsetNumberNext(offnum))
	{
		ItemId		itemId = PageGetItemId(page, offnum);

		if (ItemIdIsDead(itemId))
			deletable[ndeletable++] = offnum;
	}

	if (ndeletable > 0)
	{
		/* the deletion and the WAL record must appear atomic */
		START_CRIT_SECTION();

		PageIndexMultiDelete(page, deletable, ndeletable);

		/*
		 * Mark the page as not containing any LP_DEAD items. This is not
		 * certainly true (there might be some that have recently been marked,
		 * but weren't included in our target-item list), but it will almost
		 * always be true and it doesn't seem worth an additional page scan to
		 * check it. Remember that F_HAS_GARBAGE is only a hint anyway.
		 */
		GistClearPageHasGarbage(page);

		MarkBufferDirty(buffer);

		/* XLOG stuff */
		if (RelationNeedsWAL(rel))
		{
			XLogRecPtr	recptr;

			recptr = gistXLogUpdate(buffer,
									deletable, ndeletable,
									NULL, 0, InvalidBuffer,
									&heapRel->rd_node);

			PageSetLSN(page, recptr);
		}
		else
			PageSetLSN(page, gistGetFakeLSN(rel));

		END_CRIT_SECTION();
	}

	/*
	 * Note: if we didn't find any LP_DEAD items, then the page's
	 * F_HAS_GARBAGE hint bit is falsely set. We do not bother expending a
	 * separate write to clear it, however. We will clear it when we split
	 * the page.
	 */
}
1617