1 /*-------------------------------------------------------------------------
2 *
3 * gist.c
4 * interface routines for the postgres GiST index access method.
5 *
6 *
7 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/gist/gist.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 #include "postgres.h"
16
17 #include "access/gist_private.h"
18 #include "access/gistscan.h"
19 #include "catalog/pg_collation.h"
20 #include "miscadmin.h"
21 #include "utils/index_selfuncs.h"
22 #include "utils/memutils.h"
23 #include "utils/rel.h"
24
25
26 /* non-export function prototypes */
27 static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
28 static bool gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
29 GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum);
30 static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
31 GISTSTATE *giststate,
32 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
33 Buffer leftchild, Buffer rightchild,
34 bool unlockbuf, bool unlockleftchild);
35 static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
36 GISTSTATE *giststate, List *splitinfo, bool releasebuf);
37 static void gistvacuumpage(Relation rel, Page page, Buffer buffer,
38 Relation heapRel);
39
40
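/*
 * ROTATEDIST: prepend a fresh, zero-initialized SplitedPageLayout element
 * to the list 'd'.
 */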
41 #define ROTATEDIST(d) do { \
42 SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
43 memset(tmp,0,sizeof(SplitedPageLayout)); \
44 tmp->block.blkno = InvalidBlockNumber; \
45 tmp->buffer = InvalidBuffer; \
46 tmp->next = (d); \
47 (d)=tmp; \
48 } while(0)
49
50
51 /*
52 * GiST handler function: return IndexAmRoutine with access method parameters
53 * and callbacks.
54 */
55 Datum
56 gisthandler(PG_FUNCTION_ARGS)
57 {
58 IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
59
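	/* GiST has no fixed set of strategy numbers; each operator class defines its own */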
60 amroutine->amstrategies = 0;
61 amroutine->amsupport = GISTNProcs;
62 amroutine->amcanorder = false;
63 amroutine->amcanorderbyop = true;
64 amroutine->amcanbackward = false;
65 amroutine->amcanunique = false;
66 amroutine->amcanmulticol = true;
67 amroutine->amoptionalkey = true;
68 amroutine->amsearcharray = false;
69 amroutine->amsearchnulls = true;
70 amroutine->amstorage = true;
71 amroutine->amclusterable = true;
72 amroutine->ampredlocks = false;
73 amroutine->amkeytype = InvalidOid;
74
75 amroutine->ambuild = gistbuild;
76 amroutine->ambuildempty = gistbuildempty;
77 amroutine->aminsert = gistinsert;
78 amroutine->ambulkdelete = gistbulkdelete;
79 amroutine->amvacuumcleanup = gistvacuumcleanup;
80 amroutine->amcanreturn = gistcanreturn;
81 amroutine->amcostestimate = gistcostestimate;
82 amroutine->amoptions = gistoptions;
83 amroutine->amproperty = gistproperty;
84 amroutine->amvalidate = gistvalidate;
85 amroutine->ambeginscan = gistbeginscan;
86 amroutine->amrescan = gistrescan;
87 amroutine->amgettuple = gistgettuple;
88 amroutine->amgetbitmap = gistgetbitmap;
89 amroutine->amendscan = gistendscan;
90 amroutine->ammarkpos = NULL;
91 amroutine->amrestrpos = NULL;
92
93 PG_RETURN_POINTER(amroutine);
94 }
95
96 /*
97 * Create and return a temporary memory context for use by GiST. We
98 * _always_ invoke user-provided methods in a temporary memory
99 * context, so that memory leaks in those functions cannot cause
100 * problems. Also, we use some additional temporary contexts in the
101 * GiST code itself, to avoid the need to do some awkward manual
102 * memory management.
103 */
104 MemoryContext
105 createTempGistContext(void)
106 {
107 return AllocSetContextCreate(CurrentMemoryContext,
108 "GiST temporary context",
109 ALLOCSET_DEFAULT_SIZES);
110 }
111
112 /*
113 * gistbuildempty() -- build an empty gist index in the initialization fork
114 */
115 void
116 gistbuildempty(Relation index)
117 {
118 Buffer buffer;
119
120 /* Initialize the root page */
121 buffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
122 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
123
124 /* Initialize and xlog buffer */
125 START_CRIT_SECTION();
126 GISTInitBuffer(buffer, F_LEAF);
127 MarkBufferDirty(buffer);
128 log_newpage_buffer(buffer, true);
129 END_CRIT_SECTION();
130
131 /* Unlock and release the buffer */
132 UnlockReleaseBuffer(buffer);
133 }
134
135 /*
136 * gistinsert -- wrapper for GiST tuple insertion.
137 *
138 * This is the public interface routine for tuple insertion in GiSTs.
139 * It doesn't do any work; just locks the relation and passes the buck.
140 */
141 bool
142 gistinsert(Relation r, Datum *values, bool *isnull,
143 ItemPointer ht_ctid, Relation heapRel,
144 IndexUniqueCheck checkUnique)
145 {
146 IndexTuple itup;
147 GISTSTATE *giststate;
148 MemoryContext oldCxt;
149
150 giststate = initGISTstate(r);
151
152 /*
153 * We use the giststate's scan context as temp context too. This means
154 * that any memory leaked by the support functions is not reclaimed until
155 * end of insert. In most cases, we aren't going to call the support
156 * functions very many times before finishing the insert, so this seems
157 * cheaper than resetting a temp context for each function call.
158 */
159 oldCxt = MemoryContextSwitchTo(giststate->tempCxt);
160
161 itup = gistFormTuple(giststate, r,
162 values, isnull, true /* size is currently bogus */ );
163 itup->t_tid = *ht_ctid;
164
165 gistdoinsert(r, itup, 0, giststate, heapRel);
166
167 /* cleanup */
168 MemoryContextSwitchTo(oldCxt);
169 freeGISTstate(giststate);
170
171 return false;
172 }
173
174
175 /*
176 * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple
177 * at that offset is atomically removed along with inserting the new tuples.
178 * This is used to replace a tuple with a new one.
179 *
180 * If 'leftchildbuf' is valid, we're inserting the downlink for the page
181 * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
182 * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
183 *
184 * If 'markfollowright' is true and the page is split, the left child is
185 * marked with F_FOLLOW_RIGHT flag. That is the normal case. During buffered
186 * index build, however, there is no concurrent access and the page splitting
187 * is done in a slightly simpler fashion, and false is passed.
188 *
189 * If there is not enough room on the page, it is split. All the split
190 * pages are kept pinned and locked and returned in *splitinfo, the caller
191 * is responsible for inserting the downlinks for them. However, if
192 * 'buffer' is the root page and it needs to be split, gistplacetopage()
193 * performs the split as one atomic operation, and *splitinfo is set to NIL.
194 * In that case, we continue to hold the root page locked, and the child
195 * pages are released; note that new tuple(s) are *not* on the root page
196 * but in one of the new child pages.
197 *
198 * If 'newblkno' is not NULL, returns the block number of the page the first
199 * new/updated tuple was inserted to. Usually it's the given page, but could
200 * be its right sibling if the page was split.
201 *
202 * Returns 'true' if the page was split, 'false' otherwise.
203 */
204 bool
205 gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
206 Buffer buffer,
207 IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
208 BlockNumber *newblkno,
209 Buffer leftchildbuf,
210 List **splitinfo,
211 bool markfollowright,
212 Relation heapRel)
213 {
214 BlockNumber blkno = BufferGetBlockNumber(buffer);
215 Page page = BufferGetPage(buffer);
216 bool is_leaf = (GistPageIsLeaf(page)) ? true : false;
217 XLogRecPtr recptr;
218 int i;
219 bool is_split;
220
221 /*
222 * Refuse to modify a page that's incompletely split. This should not
223 * happen because we finish any incomplete splits while we walk down the
224 * tree. However, it's remotely possible that another concurrent inserter
225 * splits a parent page, and errors out before completing the split. We
226 * will just throw an error in that case, and leave any split we had in
227 * progress unfinished too. The next insert that comes along will clean up
228 * the mess.
229 */
230 if (GistFollowRight(page))
231 elog(ERROR, "concurrent GiST page split was incomplete");
232
233 *splitinfo = NIL;
234
235 /*
236 * if isupdate, remove old key: This node's key has been modified, either
237 * because a child split occurred or because we needed to adjust our key
238 * for an insert in a child node. Therefore, remove the old version of
239 * this node's key.
240 *
241 * for WAL replay, in the non-split case we handle this by setting up a
242 * one-element todelete array; in the split case, it's handled implicitly
243 * because the tuple vector passed to gistSplit won't include this tuple.
244 */
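	/* Check whether the new tuple(s) fit on the page; if not, we must split it */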
245 is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
246
247 /*
248 * If the leaf page is full, first try to delete dead tuples, and then
249 * check again.
250 */
251 if (is_split && GistPageIsLeaf(page) && GistPageHasGarbage(page))
252 {
253 gistvacuumpage(rel, page, buffer, heapRel);
254 is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
255 }
256
257 if (is_split)
258 {
259 /* no space for insertion */
260 IndexTuple *itvec;
261 int tlen;
262 SplitedPageLayout *dist = NULL,
263 *ptr;
264 BlockNumber oldrlink = InvalidBlockNumber;
265 GistNSN oldnsn = 0;
266 SplitedPageLayout rootpg;
267 bool is_rootsplit;
268 int npage;
269
270 is_rootsplit = (blkno == GIST_ROOT_BLKNO);
271
272 /*
273 * Form index tuples vector to split. If we're replacing an old tuple,
274 * remove the old version from the vector.
275 */
276 itvec = gistextractpage(page, &tlen);
277 if (OffsetNumberIsValid(oldoffnum))
278 {
279 /* on an inner page we must remove the old version of the tuple being replaced */
280 int pos = oldoffnum - FirstOffsetNumber;
281
282 tlen--;
283 if (pos != tlen)
284 memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
285 }
286 itvec = gistjoinvector(itvec, &tlen, itup, ntup);
287 dist = gistSplit(rel, page, itvec, tlen, giststate);
288
289 /*
290 * Check that split didn't produce too many pages.
291 */
292 npage = 0;
293 for (ptr = dist; ptr; ptr = ptr->next)
294 npage++;
295 /* in a root split, we'll add one more page to the list below */
296 if (is_rootsplit)
297 npage++;
298 if (npage > GIST_MAX_SPLIT_PAGES)
299 elog(ERROR, "GiST page split into too many halves (%d, maximum %d)",
300 npage, GIST_MAX_SPLIT_PAGES);
301
302 /*
303 * Set up pages to work with. Allocate new buffers for all but the
304 * leftmost page. The original page becomes the new leftmost page, and
305 * is just replaced with the new contents.
306 *
307 * For a root split, allocate new buffers for all child pages; the
308 * original page is overwritten with a new root page containing
309 * downlinks to the new child pages.
310 */
311 ptr = dist;
312 if (!is_rootsplit)
313 {
314 /* save old rightlink and NSN */
315 oldrlink = GistPageGetOpaque(page)->rightlink;
316 oldnsn = GistPageGetNSN(page);
317
318 dist->buffer = buffer;
319 dist->block.blkno = BufferGetBlockNumber(buffer);
320 dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));
321
322 /* clean all flags except F_LEAF */
323 GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
324
325 ptr = ptr->next;
326 }
327 for (; ptr; ptr = ptr->next)
328 {
329 /* Allocate new page */
330 ptr->buffer = gistNewBuffer(rel);
331 GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
332 ptr->page = BufferGetPage(ptr->buffer);
333 ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
334 }
335
336 /*
337 * Now that we know which blocks the new pages go to, set up downlink
338 * tuples to point to them.
339 */
340 for (ptr = dist; ptr; ptr = ptr->next)
341 {
342 ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
343 GistTupleSetValid(ptr->itup);
344 }
345
346 /*
347 * If this is a root split, we construct the new root page with the
348 * downlinks here directly, instead of requiring the caller to insert
349 * them. Add the new root page to the list along with the child pages.
350 */
351 if (is_rootsplit)
352 {
353 IndexTuple *downlinks;
354 int ndownlinks = 0;
355 int i;
356
357 rootpg.buffer = buffer;
358 rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
359 GistPageGetOpaque(rootpg.page)->flags = 0;
360
361 /* Prepare a vector of all the downlinks */
362 for (ptr = dist; ptr; ptr = ptr->next)
363 ndownlinks++;
364 downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
365 for (i = 0, ptr = dist; ptr; ptr = ptr->next)
366 downlinks[i++] = ptr->itup;
367
368 rootpg.block.blkno = GIST_ROOT_BLKNO;
369 rootpg.block.num = ndownlinks;
370 rootpg.list = gistfillitupvec(downlinks, ndownlinks,
371 &(rootpg.lenlist));
372 rootpg.itup = NULL;
373
374 rootpg.next = dist;
375 dist = &rootpg;
376 }
377 else
378 {
379 /* Prepare split-info to be returned to caller */
380 for (ptr = dist; ptr; ptr = ptr->next)
381 {
382 GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
383
384 si->buf = ptr->buffer;
385 si->downlink = ptr->itup;
386 *splitinfo = lappend(*splitinfo, si);
387 }
388 }
389
390 /*
391 * Fill all pages. All the pages are new, ie. freshly allocated empty
392 * pages, or a temporary copy of the old page.
393 */
394 for (ptr = dist; ptr; ptr = ptr->next)
395 {
396 char *data = (char *) (ptr->list);
397
398 for (i = 0; i < ptr->block.num; i++)
399 {
400 IndexTuple thistup = (IndexTuple) data;
401
402 if (PageAddItem(ptr->page, (Item) data, IndexTupleSize(thistup), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
403 elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(rel));
404
405 /*
406 * If this is the first inserted/updated tuple, let the caller
407 * know which page it landed on.
408 */
409 if (newblkno && ItemPointerEquals(&thistup->t_tid, &(*itup)->t_tid))
410 *newblkno = ptr->block.blkno;
411
412 data += IndexTupleSize(thistup);
413 }
414
415 /* Set up rightlinks */
416 if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
417 GistPageGetOpaque(ptr->page)->rightlink =
418 ptr->next->block.blkno;
419 else
420 GistPageGetOpaque(ptr->page)->rightlink = oldrlink;
421
422 /*
423 * Mark all but the rightmost page with the follow-right
424 * flag. It will be cleared as soon as the downlink is inserted
425 * into the parent, but this ensures that if we error out before
426 * that, the index is still consistent. (in buffering build mode,
427 * any error will abort the index build anyway, so this is not
428 * needed.)
429 */
430 if (ptr->next && !is_rootsplit && markfollowright)
431 GistMarkFollowRight(ptr->page);
432 else
433 GistClearFollowRight(ptr->page);
434
435 /*
436 * Copy the NSN of the original page to all pages. The
437 * F_FOLLOW_RIGHT flags ensure that scans will follow the
438 * rightlinks until the downlinks are inserted.
439 */
440 GistPageSetNSN(ptr->page, oldnsn);
441 }
442
443 /*
444 * gistXLogSplit() needs to WAL log a lot of pages, prepare WAL
445 * insertion for that. NB: The number of pages and data segments
446 * specified here must match the calculations in gistXLogSplit()!
447 */
448 if (RelationNeedsWAL(rel))
449 XLogEnsureRecordSpace(npage, 1 + npage * 2);
450
451 START_CRIT_SECTION();
452
453 /*
454 * Must mark buffers dirty before XLogInsert, even though we'll still
455 * be changing their opaque fields below.
456 */
457 for (ptr = dist; ptr; ptr = ptr->next)
458 MarkBufferDirty(ptr->buffer);
459 if (BufferIsValid(leftchildbuf))
460 MarkBufferDirty(leftchildbuf);
461
462 /*
463 * The first page in the chain was a temporary working copy meant to
464 * replace the old page. Copy it over the old page.
465 */
466 PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
467 dist->page = BufferGetPage(dist->buffer);
468
469 /* Write the WAL record */
470 if (RelationNeedsWAL(rel))
471 recptr = gistXLogSplit(is_leaf,
472 dist, oldrlink, oldnsn, leftchildbuf,
473 markfollowright);
474 else
475 recptr = gistGetFakeLSN(rel);
476
477 for (ptr = dist; ptr; ptr = ptr->next)
478 {
479 PageSetLSN(ptr->page, recptr);
480 }
481
482 /*
483 * Return the new child buffers to the caller.
484 *
485 * If this was a root split, we've already inserted the downlink
486 * pointers, in the form of a new root page. Therefore we can release
487 * all the new buffers, and keep just the root page locked.
488 */
489 if (is_rootsplit)
490 {
491 for (ptr = dist->next; ptr; ptr = ptr->next)
492 UnlockReleaseBuffer(ptr->buffer);
493 }
494 }
495 else
496 {
497 /*
498 * Enough space. We also get here if ntuples==0.
499 */
500 START_CRIT_SECTION();
501
502 /*
503 * As long as we delete only one tuple at a time, it is OK to call
504 * PageIndexTupleDelete() here while gistRedoPageUpdateRecord() uses
505 * PageIndexMultiDelete().
506 */
507 if (OffsetNumberIsValid(oldoffnum))
508 PageIndexTupleDelete(page, oldoffnum);
509 gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
510
511 MarkBufferDirty(buffer);
512
513 if (BufferIsValid(leftchildbuf))
514 MarkBufferDirty(leftchildbuf);
515
516 if (RelationNeedsWAL(rel))
517 {
518 OffsetNumber ndeloffs = 0,
519 deloffs[1];
520
521 if (OffsetNumberIsValid(oldoffnum))
522 {
523 deloffs[0] = oldoffnum;
524 ndeloffs = 1;
525 }
526
527 recptr = gistXLogUpdate(buffer,
528 deloffs, ndeloffs, itup, ntup,
529 leftchildbuf, NULL);
530
531 PageSetLSN(page, recptr);
532 }
533 else
534 {
535 recptr = gistGetFakeLSN(rel);
536 PageSetLSN(page, recptr);
537 }
538
539 if (newblkno)
540 *newblkno = blkno;
541 }
542
543 /*
544 * If we inserted the downlink for a child page, set NSN and clear
545 * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know to
546 * follow the rightlink if and only if they looked at the parent page
547 * before we inserted the downlink.
548 *
549 * Note that we do this *after* writing the WAL record. That means that
550 * the possible full page image in the WAL record does not include these
551 * changes, and they must be replayed even if the page is restored from
552 * the full page image. There's a chicken-and-egg problem: if we updated
553 * the child pages first, we wouldn't know the recptr of the WAL record
554 * we're about to write.
555 */
556 if (BufferIsValid(leftchildbuf))
557 {
558 Page leftpg = BufferGetPage(leftchildbuf);
559
560 GistPageSetNSN(leftpg, recptr);
561 GistClearFollowRight(leftpg);
562
563 PageSetLSN(leftpg, recptr);
564 }
565
566 END_CRIT_SECTION();
567
568 return is_split;
569 }
570
571 /*
572 * Workhorse routine for doing insertion into a GiST index. Note that
573 * this routine assumes it is invoked in a short-lived memory context,
574 * so it does not bother releasing palloc'd allocations.
575 */
576 void
577 gistdoinsert(Relation r, IndexTuple itup, Size freespace,
578 GISTSTATE *giststate, Relation heapRel)
579 {
580 ItemId iid;
581 IndexTuple idxtuple;
582 GISTInsertStack firststack;
583 GISTInsertStack *stack;
584 GISTInsertState state;
585 bool xlocked = false;
586
587 memset(&state, 0, sizeof(GISTInsertState));
588 state.freespace = freespace;
589 state.r = r;
590 state.heapRel = heapRel;
591
592 /* Start from the root */
593 firststack.blkno = GIST_ROOT_BLKNO;
594 firststack.lsn = 0;
595 firststack.parent = NULL;
596 firststack.downlinkoffnum = InvalidOffsetNumber;
597 state.stack = stack = &firststack;
598
599 /*
600 * Walk down along the path of smallest penalty, updating the parent
601 * pointers with the key we're inserting as we go. If we crash in the
602 * middle, the tree is consistent, although the possible parent updates
603 * were a waste.
604 */
605 for (;;)
606 {
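		/*
		 * Read the page into a buffer on the first visit; when we come back
		 * up to a parent we have already visited, its lsn is set and we
		 * still hold a pin on its buffer.
		 */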
607 if (XLogRecPtrIsInvalid(stack->lsn))
608 stack->buffer = ReadBuffer(state.r, stack->blkno);
609
610 /*
611 * Be optimistic and grab shared lock first. Swap it for an exclusive
612 * lock later if we need to update the page.
613 */
614 if (!xlocked)
615 {
616 LockBuffer(stack->buffer, GIST_SHARE);
617 gistcheckpage(state.r, stack->buffer);
618 }
619
620 stack->page = (Page) BufferGetPage(stack->buffer);
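		/*
		 * Under a shared lock the page LSN must be read atomically; with an
		 * exclusive lock a plain PageGetLSN() is safe.
		 */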
621 stack->lsn = xlocked ?
622 PageGetLSN(stack->page) : BufferGetLSNAtomic(stack->buffer);
623 Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));
624
625 /*
626 * If this page was split but the downlink was never inserted to the
627 * parent because the inserting backend crashed before doing that, fix
628 * that now.
629 */
630 if (GistFollowRight(stack->page))
631 {
632 if (!xlocked)
633 {
634 LockBuffer(stack->buffer, GIST_UNLOCK);
635 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
636 xlocked = true;
637 /* someone might've completed the split when we unlocked */
638 if (!GistFollowRight(stack->page))
639 continue;
640 }
641 gistfixsplit(&state, giststate);
642
643 UnlockReleaseBuffer(stack->buffer);
644 xlocked = false;
645 state.stack = stack = stack->parent;
646 continue;
647 }
648
649 if (stack->blkno != GIST_ROOT_BLKNO &&
650 stack->parent->lsn < GistPageGetNSN(stack->page))
651 {
652 /*
653 * Concurrent split detected. There's no guarantee that the
654 * downlink for this page is consistent with the tuple we're
655 * inserting anymore, so go back to parent and rechoose the best
656 * child.
657 */
658 UnlockReleaseBuffer(stack->buffer);
659 xlocked = false;
660 state.stack = stack = stack->parent;
661 continue;
662 }
663
664 if (!GistPageIsLeaf(stack->page))
665 {
666 /*
667 * This is an internal page so continue to walk down the tree.
668 * Find the child node that has the minimum insertion penalty.
669 */
670 BlockNumber childblkno;
671 IndexTuple newtup;
672 GISTInsertStack *item;
673 OffsetNumber downlinkoffnum;
674
675 downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate);
676 iid = PageGetItemId(stack->page, downlinkoffnum);
677 idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
678 childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
679
680 /*
681 * Check that it's not a leftover invalid tuple from pre-9.1
682 */
683 if (GistTupleIsInvalid(idxtuple))
684 ereport(ERROR,
685 (errmsg("index \"%s\" contains an inner tuple marked as invalid",
686 RelationGetRelationName(r)),
687 errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
688 errhint("Please REINDEX it.")));
689
690 /*
691 * Check that the key representing the target child node is
692 * consistent with the key we're inserting. Update it if it's not.
693 */
694 newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
695 if (newtup)
696 {
697 /*
698 * Swap shared lock for an exclusive one. Beware, the page may
699 * change while we unlock/lock the page...
700 */
701 if (!xlocked)
702 {
703 LockBuffer(stack->buffer, GIST_UNLOCK);
704 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
705 xlocked = true;
706 stack->page = (Page) BufferGetPage(stack->buffer);
707
708 if (PageGetLSN(stack->page) != stack->lsn)
709 {
710 /* the page was changed while we unlocked it, retry */
711 continue;
712 }
713 }
714
715 /*
716 * Update the tuple.
717 *
718 * We still hold the lock after gistinserttuple(), but it
719 * might have to split the page to make the updated tuple fit.
720 * In that case the updated tuple might migrate to the other
721 * half of the split, so we have to go back to the parent and
722 * descend back to the half that's a better fit for the new
723 * tuple.
724 */
725 if (gistinserttuple(&state, stack, giststate, newtup,
726 downlinkoffnum))
727 {
728 /*
729 * If this was a root split, the root page continues to be
730 * the parent and the updated tuple went to one of the
731 * child pages, so we just need to retry from the root
732 * page.
733 */
734 if (stack->blkno != GIST_ROOT_BLKNO)
735 {
736 UnlockReleaseBuffer(stack->buffer);
737 xlocked = false;
738 state.stack = stack = stack->parent;
739 }
740 continue;
741 }
742 }
743 LockBuffer(stack->buffer, GIST_UNLOCK);
744 xlocked = false;
745
746 /* descend to the chosen child */
747 item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
748 item->blkno = childblkno;
749 item->parent = stack;
750 item->downlinkoffnum = downlinkoffnum;
751 state.stack = stack = item;
752 }
753 else
754 {
755 /*
756 * Leaf page. Insert the new key. We've already updated all the
757 * parents on the way down, but we might have to split the page if
758 * it doesn't fit. gistinserthere() will take care of that.
759 */
760
761 /*
762 * Swap shared lock for an exclusive one. Be careful, the page may
763 * change while we unlock/lock the page...
764 */
765 if (!xlocked)
766 {
767 LockBuffer(stack->buffer, GIST_UNLOCK);
768 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
769 xlocked = true;
770 stack->page = (Page) BufferGetPage(stack->buffer);
771 stack->lsn = PageGetLSN(stack->page);
772
773 if (stack->blkno == GIST_ROOT_BLKNO)
774 {
775 /*
776 * the root is the only page that can change from leaf to
777 * inner, so for the root we must recheck that here
778 */
779 if (!GistPageIsLeaf(stack->page))
780 {
781 /*
782 * very rare situation: while the page was unlocked, the
783 * one-page index grew and the root became an inner page
784 */
785 LockBuffer(stack->buffer, GIST_UNLOCK);
786 xlocked = false;
787 continue;
788 }
789
790 /*
791 * we don't need a separate check for a root split, because
792 * checking leaf vs. inner is enough to detect it for the root
793 */
794 }
795 else if (GistFollowRight(stack->page) ||
796 stack->parent->lsn < GistPageGetNSN(stack->page))
797 {
798 /*
799 * The page was split while we momentarily unlocked the
800 * page. Go back to parent.
801 */
802 UnlockReleaseBuffer(stack->buffer);
803 xlocked = false;
804 state.stack = stack = stack->parent;
805 continue;
806 }
807 }
808
809 /* now state.stack->(page, buffer and blkno) points to leaf page */
810
811 gistinserttuple(&state, stack, giststate, itup,
812 InvalidOffsetNumber);
813 LockBuffer(stack->buffer, GIST_UNLOCK);
814
815 /* Release any pins we might still hold before exiting */
816 for (; stack; stack = stack->parent)
817 ReleaseBuffer(stack->buffer);
818 break;
819 }
820 }
821 }
822
823 /*
824 * Traverse the tree to find path from root page to specified "child" block.
825 *
826 * returns a new insertion stack, starting from the parent of "child", up
827 * to the root. *downlinkoffnum is set to the offset of the downlink in the
828 * direct parent of child.
829 *
830 * To prevent deadlocks, this should lock only one page at a time.
831 */
832 static GISTInsertStack *
833 gistFindPath(Relation r, BlockNumber child, OffsetNumber *downlinkoffnum)
834 {
835 Page page;
836 Buffer buffer;
837 OffsetNumber i,
838 maxoff;
839 ItemId iid;
840 IndexTuple idxtuple;
841 List *fifo;
842 GISTInsertStack *top,
843 *ptr;
844 BlockNumber blkno;
845
846 top = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
847 top->blkno = GIST_ROOT_BLKNO;
848 top->downlinkoffnum = InvalidOffsetNumber;
849
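	/* Breadth-first search down from the root, locking only one page at a time */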
850 fifo = list_make1(top);
851 while (fifo != NIL)
852 {
853 /* Get next page to visit */
854 top = linitial(fifo);
855 fifo = list_delete_first(fifo);
856
857 buffer = ReadBuffer(r, top->blkno);
858 LockBuffer(buffer, GIST_SHARE);
859 gistcheckpage(r, buffer);
860 page = (Page) BufferGetPage(buffer);
861
862 if (GistPageIsLeaf(page))
863 {
864 /*
865 * Because we scan the index top-down, all the rest of the pages
866 * in the queue must be leaf pages as well.
867 */
868 UnlockReleaseBuffer(buffer);
869 break;
870 }
871
872 top->lsn = BufferGetLSNAtomic(buffer);
873
874 /*
875 * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
876 * downlink. This should not normally happen..
877 */
878 if (GistFollowRight(page))
879 elog(ERROR, "concurrent GiST page split was incomplete");
880
881 if (top->parent && top->parent->lsn < GistPageGetNSN(page) &&
882 GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
883 {
884 /*
885 * Page was split while we looked elsewhere. We didn't see the
886 * downlink to the right page when we scanned the parent, so add
887 * it to the queue now.
888 *
889 * Put the right page ahead of the queue, so that we visit it
890 * next. That's important, because if this is the lowest internal
891 * level, just above leaves, we might already have queued up some
892 * leaf pages, and we assume that there can't be any non-leaf
893 * pages behind leaf pages.
894 */
895 ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
896 ptr->blkno = GistPageGetOpaque(page)->rightlink;
897 ptr->downlinkoffnum = InvalidOffsetNumber;
898 ptr->parent = top->parent;
899
900 fifo = lcons(ptr, fifo);
901 }
902
903 maxoff = PageGetMaxOffsetNumber(page);
904
905 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
906 {
907 iid = PageGetItemId(page, i);
908 idxtuple = (IndexTuple) PageGetItem(page, iid);
909 blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
910 if (blkno == child)
911 {
912 /* Found it! */
913 UnlockReleaseBuffer(buffer);
914 *downlinkoffnum = i;
915 return top;
916 }
917 else
918 {
919 /* Append this child to the list of pages to visit later */
920 ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
921 ptr->blkno = blkno;
922 ptr->downlinkoffnum = i;
923 ptr->parent = top;
924
925 fifo = lappend(fifo, ptr);
926 }
927 }
928
929 UnlockReleaseBuffer(buffer);
930 }
931
932 elog(ERROR, "failed to re-find parent of a page in index \"%s\", block %u",
933 RelationGetRelationName(r), child);
934 return NULL; /* keep compiler quiet */
935 }
936
937 /*
938 * Updates the stack so that child->parent is the correct parent of the
939 * child. child->parent must be exclusively locked on entry, and will
940 * remain so at exit, but it might not be the same page anymore.
941 */
942 static void
943 gistFindCorrectParent(Relation r, GISTInsertStack *child)
944 {
945 GISTInsertStack *parent = child->parent;
946
947 gistcheckpage(r, parent->buffer);
948 parent->page = (Page) BufferGetPage(parent->buffer);
949
950 /* here we don't need to distinguish between split and page update */
951 if (child->downlinkoffnum == InvalidOffsetNumber ||
952 parent->lsn != PageGetLSN(parent->page))
953 {
954 /* parent has changed, follow the rightlinks until the child is found */
955 OffsetNumber i,
956 maxoff;
957 ItemId iid;
958 IndexTuple idxtuple;
959 GISTInsertStack *ptr;
960
961 while (true)
962 {
963 maxoff = PageGetMaxOffsetNumber(parent->page);
964 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
965 {
966 iid = PageGetItemId(parent->page, i);
967 idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
968 if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
969 {
970 /* yes!!, found */
971 child->downlinkoffnum = i;
972 return;
973 }
974 }
975
976 parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
977 UnlockReleaseBuffer(parent->buffer);
978 if (parent->blkno == InvalidBlockNumber)
979 {
980 /*
981 * End of chain and still didn't find the parent. This is a very
982 * rare situation that can happen when the root was split.
983 */
984 break;
985 }
986 parent->buffer = ReadBuffer(r, parent->blkno);
987 LockBuffer(parent->buffer, GIST_EXCLUSIVE);
988 gistcheckpage(r, parent->buffer);
989 parent->page = (Page) BufferGetPage(parent->buffer);
990 }
991
992 /*
993 * Awful: we need to search the tree to re-find the parent, but first
994 * we must release all the old parent pages we still hold.
995 */
996
997 ptr = child->parent->parent; /* child->parent already released
998 * above */
999 while (ptr)
1000 {
1001 ReleaseBuffer(ptr->buffer);
1002 ptr = ptr->parent;
1003 }
1004
1005 /* ok, find new path */
1006 ptr = parent = gistFindPath(r, child->blkno, &child->downlinkoffnum);
1007
1008 /* read all buffers as expected by caller */
1009 /* note we don't lock them or gistcheckpage them here! */
1010 while (ptr)
1011 {
1012 ptr->buffer = ReadBuffer(r, ptr->blkno);
1013 ptr->page = (Page) BufferGetPage(ptr->buffer);
1014 ptr = ptr->parent;
1015 }
1016
1017 /* install new chain of parents to stack */
1018 child->parent = parent;
1019
1020 /* make recursive call to normal processing */
1021 LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
1022 gistFindCorrectParent(r, child);
1023 }
1024
1025 return;
1026 }
1027
1028 /*
1029 * Form a downlink pointer for the page in 'buf'.
1030 */
1031 static IndexTuple
1032 gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
1033 GISTInsertStack *stack)
1034 {
1035 Page page = BufferGetPage(buf);
1036 OffsetNumber maxoff;
1037 OffsetNumber offset;
1038 IndexTuple downlink = NULL;
1039
1040 maxoff = PageGetMaxOffsetNumber(page);
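	/* Union all the tuples on the page to form a downlink covering them all */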
1041 for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
1042 {
1043 IndexTuple ituple = (IndexTuple)
1044 PageGetItem(page, PageGetItemId(page, offset));
1045
1046 if (downlink == NULL)
1047 downlink = CopyIndexTuple(ituple);
1048 else
1049 {
1050 IndexTuple newdownlink;
1051
1052 newdownlink = gistgetadjusted(rel, downlink, ituple,
1053 giststate);
1054 if (newdownlink)
1055 downlink = newdownlink;
1056 }
1057 }
1058
1059 /*
1060 * If the page is completely empty, we can't form a meaningful downlink
1061 * for it. But we have to insert a downlink for the page. Any key will do,
1062 * as long as it's consistent with the downlink of the parent page, so that
1063 * we can legally insert it into the parent. A minimal one that matches as few
1064 * scans as possible would be best, to keep scans from doing useless work,
1065 * but we don't know how to construct that. So we just use the downlink of
1066 * the original page that was split - that's as far from optimal as it can
1067 * get, but it will do.
1068 */
1069 if (!downlink)
1070 {
1071 ItemId iid;
1072
1073 LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1074 gistFindCorrectParent(rel, stack);
1075 iid = PageGetItemId(stack->parent->page, stack->downlinkoffnum);
1076 downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
1077 downlink = CopyIndexTuple(downlink);
1078 LockBuffer(stack->parent->buffer, GIST_UNLOCK);
1079 }
1080
1081 ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
1082 GistTupleSetValid(downlink);
1083
1084 return downlink;
1085 }
1086
1087
1088 /*
1089 * Complete the incomplete split of state->stack->page.
1090 */
1091 static void
1092 gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
1093 {
1094 GISTInsertStack *stack = state->stack;
1095 Buffer buf;
1096 Page page;
1097 List *splitinfo = NIL;
1098
1099 elog(LOG, "fixing incomplete split in index \"%s\", block %u",
1100 RelationGetRelationName(state->r), stack->blkno);
1101
1102 Assert(GistFollowRight(stack->page));
1103 Assert(OffsetNumberIsValid(stack->downlinkoffnum));
1104
1105 buf = stack->buffer;
1106
1107 /*
1108 * Read the chain of split pages, following the rightlinks. Construct a
1109 * downlink tuple for each page.
1110 */
1111 for (;;)
1112 {
1113 GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
1114 IndexTuple downlink;
1115
1116 page = BufferGetPage(buf);
1117
1118 /* Form the new downlink tuples to insert to parent */
1119 downlink = gistformdownlink(state->r, buf, giststate, stack);
1120
1121 si->buf = buf;
1122 si->downlink = downlink;
1123
1124 splitinfo = lappend(splitinfo, si);
1125
1126 if (GistFollowRight(page))
1127 {
1128 /* lock next page */
1129 buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
1130 LockBuffer(buf, GIST_EXCLUSIVE);
1131 }
1132 else
1133 break;
1134 }
1135
1136 /* Insert the downlinks */
1137 gistfinishsplit(state, stack, giststate, splitinfo, false);
1138 }
1139
1140 /*
1141 * Insert or replace a tuple in stack->buffer. If 'oldoffnum' is valid, the
1142 * tuple at 'oldoffnum' is replaced, otherwise the tuple is inserted as new.
1143 * 'stack' represents the path from the root to the page being updated.
1144 *
1145 * The caller must hold an exclusive lock on stack->buffer. The lock is still
1146 * held on return, but the page might not contain the inserted tuple if the
1147 * page was split. The function returns true if the page was split, false
1148 * otherwise.
1149 */
1150 static bool
1151 gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
1152 GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum)
1153 {
1154 return gistinserttuples(state, stack, giststate, &tuple, 1, oldoffnum,
1155 InvalidBuffer, InvalidBuffer, false, false);
1156 }
1157
1158 /* ----------------
1159 * An extended workhorse version of gistinserttuple(). This version allows
1160 * inserting multiple tuples, or replacing a single tuple with multiple tuples.
1161 * This is used to recursively update the downlinks in the parent when a page
1162 * is split.
1163 *
1164 * If leftchild and rightchild are valid, we're inserting/replacing the
1165 * downlink for rightchild, and leftchild is its left sibling. We clear the
1166 * F_FOLLOW_RIGHT flag and update NSN on leftchild, atomically with the
1167 * insertion of the downlink.
1168 *
1169 * To avoid holding locks for longer than necessary, when recursing up the
1170 * tree to update the parents, the locking is a bit peculiar here. On entry,
1171 * the caller must hold an exclusive lock on stack->buffer, as well as
1172 * leftchild and rightchild if given. On return:
1173 *
1174 * - Lock on stack->buffer is released, if 'unlockbuf' is true. The page is
1175 * always kept pinned, however.
1176 * - Lock on 'leftchild' is released, if 'unlockleftchild' is true. The page
1177 * is kept pinned.
1178 * - Lock and pin on 'rightchild' are always released.
1179 *
1180 * Returns 'true' if the page had to be split. Note that if the page was
1181 * split, the inserted/updated tuples might've been inserted to a right
1182 * sibling of stack->buffer instead of stack->buffer itself.
1183 */
1184 static bool
1185 gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
1186 GISTSTATE *giststate,
1187 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
1188 Buffer leftchild, Buffer rightchild,
1189 bool unlockbuf, bool unlockleftchild)
1190 {
1191 List *splitinfo;
1192 bool is_split;
1193
1194 /* Insert the tuple(s) to the page, splitting the page if necessary */
1195 is_split = gistplacetopage(state->r, state->freespace, giststate,
1196 stack->buffer,
1197 tuples, ntup,
1198 oldoffnum, NULL,
1199 leftchild,
1200 &splitinfo,
1201 true,
1202 state->heapRel);
1203
1204 /*
1205 * Before recursing up in case the page was split, release locks on the
1206 * child pages. We don't need to keep them locked when updating the
1207 * parent.
1208 */
1209 if (BufferIsValid(rightchild))
1210 UnlockReleaseBuffer(rightchild);
1211 if (BufferIsValid(leftchild) && unlockleftchild)
1212 LockBuffer(leftchild, GIST_UNLOCK);
1213
1214 /*
1215 * If we had to split, insert/update the downlinks in the parent. If the
1216 * caller requested us to release the lock on stack->buffer, tell
1217 * gistfinishsplit() to do that as soon as it's safe to do so. If we
1218 * didn't have to split, release it ourselves.
1219 */
1220 if (splitinfo)
1221 gistfinishsplit(state, stack, giststate, splitinfo, unlockbuf);
1222 else if (unlockbuf)
1223 LockBuffer(stack->buffer, GIST_UNLOCK);
1224
1225 return is_split;
1226 }
1227
1228 /*
1229 * Finish an incomplete split by inserting/updating the downlinks in parent
1230 * page. 'splitinfo' contains all the child pages involved in the split,
1231 * from left-to-right.
1232 *
1233 * On entry, the caller must hold a lock on stack->buffer and all the child
1234 * pages in 'splitinfo'. If 'unlockbuf' is true, the lock on stack->buffer is
1235 * released on return. The child pages are always unlocked and unpinned.
1236 */
1237 static void
1238 gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
1239 GISTSTATE *giststate, List *splitinfo, bool unlockbuf)
1240 {
1241 ListCell *lc;
1242 List *reversed;
1243 GISTPageSplitInfo *right;
1244 GISTPageSplitInfo *left;
1245 IndexTuple tuples[2];
1246
1247 /* A split always contains at least two halves */
1248 Assert(list_length(splitinfo) >= 2);
1249
1250 /*
1251 * We need to insert downlinks for each new page, and update the downlink
1252 * for the original (leftmost) page in the split. Begin at the rightmost
1253 * page, inserting one downlink at a time until there are only two pages
1254 * left. Finally insert the downlink for the last new page and update the
1255 * downlink for the original page as one operation.
1256 */
1257
1258 /* for convenience, create a copy of the list in reverse order */
1259 reversed = NIL;
1260 foreach(lc, splitinfo)
1261 {
1262 reversed = lcons(lfirst(lc), reversed);
1263 }
1264
1265 LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1266 gistFindCorrectParent(state->r, stack);
1267
1268 /*
1269 * insert downlinks for the siblings from right to left, until there are
1270 * only two siblings left.
1271 */
1272 while (list_length(reversed) > 2)
1273 {
1274 right = (GISTPageSplitInfo *) linitial(reversed);
1275 left = (GISTPageSplitInfo *) lsecond(reversed);
1276
1277 if (gistinserttuples(state, stack->parent, giststate,
1278 &right->downlink, 1,
1279 InvalidOffsetNumber,
1280 left->buf, right->buf, false, false))
1281 {
1282 /*
1283 * If the parent page was split, need to relocate the original
1284 * parent pointer.
1285 */
1286 gistFindCorrectParent(state->r, stack);
1287 }
1288 /* gistinserttuples() released the lock on right->buf. */
1289 reversed = list_delete_first(reversed);
1290 }
1291
1292 right = (GISTPageSplitInfo *) linitial(reversed);
1293 left = (GISTPageSplitInfo *) lsecond(reversed);
1294
1295 /*
1296 * Finally insert downlink for the remaining right page and update the
1297 * downlink for the original page to not contain the tuples that were
1298 * moved to the new pages.
1299 */
1300 tuples[0] = left->downlink;
1301 tuples[1] = right->downlink;
1302 gistinserttuples(state, stack->parent, giststate,
1303 tuples, 2,
1304 stack->downlinkoffnum,
1305 left->buf, right->buf,
1306 true, /* Unlock parent */
1307 unlockbuf /* Unlock stack->buffer if caller wants that */
1308 );
1309 Assert(left->buf == stack->buffer);
1310 }
1311
1312 /*
1313 * gistSplit -- split a page in the tree and fill the struct
1314 * used for XLOG and the actual page writes. The function is recursive:
1315 * it keeps splitting until the keys fit on every resulting page.
1316 */
1317 SplitedPageLayout *
1318 gistSplit(Relation r,
1319 Page page,
1320 IndexTuple *itup, /* contains compressed entry */
1321 int len,
1322 GISTSTATE *giststate)
1323 {
1324 IndexTuple *lvectup,
1325 *rvectup;
1326 GistSplitVector v;
1327 int i;
1328 SplitedPageLayout *res = NULL;
1329
1330 /* this should never recurse very deeply, but better safe than sorry */
1331 check_stack_depth();
1332
1333 /* there's no point in splitting an empty page */
1334 Assert(len > 0);
1335
1336 /*
1337 * If a single tuple doesn't fit on a page, no amount of splitting will
1338 * help.
1339 */
1340 if (len == 1)
1341 ereport(ERROR,
1342 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1343 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
1344 IndexTupleSize(itup[0]), GiSTPageSize,
1345 RelationGetRelationName(r))));
1346
1347 memset(v.spl_lisnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
1348 memset(v.spl_risnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
1349 gistSplitByKey(r, page, itup, len, giststate, &v, 0);
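	/*
	 * gistSplitByKey has filled 'v': splitVector.spl_left/spl_right hold
	 * 1-based indexes (into itup[]) of the tuples assigned to each half,
	 * and spl_lattr/spl_rattr hold the union keys for the two halves.
	 */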
1350
1351 /* form left and right vector */
1352 lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1353 rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1354
1355 for (i = 0; i < v.splitVector.spl_nleft; i++)
1356 lvectup[i] = itup[v.splitVector.spl_left[i] - 1];
1357
1358 for (i = 0; i < v.splitVector.spl_nright; i++)
1359 rvectup[i] = itup[v.splitVector.spl_right[i] - 1];
1360
1361 /* finalize splitting (may need another split) */
1362 if (!gistfitpage(rvectup, v.splitVector.spl_nright))
1363 {
1364 res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
1365 }
1366 else
1367 {
1368 ROTATEDIST(res);
1369 res->block.num = v.splitVector.spl_nright;
1370 res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
1371 res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
1372 }
1373
1374 if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
1375 {
1376 SplitedPageLayout *resptr,
1377 *subres;
1378
1379 resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);
1380
1381 /* install on list's tail */
1382 while (resptr->next)
1383 resptr = resptr->next;
1384
1385 resptr->next = res;
1386 res = subres;
1387 }
1388 else
1389 {
1390 ROTATEDIST(res);
1391 res->block.num = v.splitVector.spl_nleft;
1392 res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
1393 res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
1394 }
1395
1396 return res;
1397 }
1398
1399 /*
1400 * Create a GISTSTATE and fill it with information about the index
1401 */
1402 GISTSTATE *
1403 initGISTstate(Relation index)
1404 {
1405 GISTSTATE *giststate;
1406 MemoryContext scanCxt;
1407 MemoryContext oldCxt;
1408 int i;
1409
1410 /* safety check to protect fixed-size arrays in GISTSTATE */
1411 if (index->rd_att->natts > INDEX_MAX_KEYS)
1412 elog(ERROR, "numberOfAttributes %d > %d",
1413 index->rd_att->natts, INDEX_MAX_KEYS);
1414
1415 /* Create the memory context that will hold the GISTSTATE */
1416 scanCxt = AllocSetContextCreate(CurrentMemoryContext,
1417 "GiST scan context",
1418 ALLOCSET_DEFAULT_SIZES);
1419 oldCxt = MemoryContextSwitchTo(scanCxt);
1420
1421 /* Create and fill in the GISTSTATE */
1422 giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE));
1423
1424 giststate->scanCxt = scanCxt;
1425 giststate->tempCxt = scanCxt; /* caller must change this if needed */
1426 giststate->tupdesc = index->rd_att;
1427
1428 for (i = 0; i < index->rd_att->natts; i++)
1429 {
1430 fmgr_info_copy(&(giststate->consistentFn[i]),
1431 index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
1432 scanCxt);
1433 fmgr_info_copy(&(giststate->unionFn[i]),
1434 index_getprocinfo(index, i + 1, GIST_UNION_PROC),
1435 scanCxt);
1436 fmgr_info_copy(&(giststate->compressFn[i]),
1437 index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
1438 scanCxt);
1439 fmgr_info_copy(&(giststate->decompressFn[i]),
1440 index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
1441 scanCxt);
1442 fmgr_info_copy(&(giststate->penaltyFn[i]),
1443 index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
1444 scanCxt);
1445 fmgr_info_copy(&(giststate->picksplitFn[i]),
1446 index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
1447 scanCxt);
1448 fmgr_info_copy(&(giststate->equalFn[i]),
1449 index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
1450 scanCxt);
1451 /* opclasses are not required to provide a Distance method */
1452 if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
1453 fmgr_info_copy(&(giststate->distanceFn[i]),
1454 index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
1455 scanCxt);
1456 else
1457 giststate->distanceFn[i].fn_oid = InvalidOid;
1458
1459 /* opclasses are not required to provide a Fetch method */
1460 if (OidIsValid(index_getprocid(index, i + 1, GIST_FETCH_PROC)))
1461 fmgr_info_copy(&(giststate->fetchFn[i]),
1462 index_getprocinfo(index, i + 1, GIST_FETCH_PROC),
1463 scanCxt);
1464 else
1465 giststate->fetchFn[i].fn_oid = InvalidOid;
1466
1467 /*
1468 * If the index column has a specified collation, we should honor that
1469 * while doing comparisons. However, we may have a collatable storage
1470 * type for a noncollatable indexed data type. If there's no index
1471 * collation then specify default collation in case the support
1472 * functions need collation. This is harmless if the support
1473 * functions don't care about collation, so we just do it
1474 * unconditionally. (We could alternatively call get_typcollation,
1475 * but that seems like expensive overkill --- there aren't going to be
1476 * any cases where a GiST storage type has a nondefault collation.)
1477 */
1478 if (OidIsValid(index->rd_indcollation[i]))
1479 giststate->supportCollation[i] = index->rd_indcollation[i];
1480 else
1481 giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
1482 }
1483
1484 MemoryContextSwitchTo(oldCxt);
1485
1486 return giststate;
1487 }
1488
1489 void
1490 freeGISTstate(GISTSTATE *giststate)
1491 {
1492 /* It's sufficient to delete the scanCxt */
1493 MemoryContextDelete(giststate->scanCxt);
1494 }
1495
1496 /*
1497 * gistvacuumpage() -- try to remove LP_DEAD items from the given page.
1498 * Function assumes that buffer is exclusively locked.
1499 */
1500 static void
1501 gistvacuumpage(Relation rel, Page page, Buffer buffer, Relation heapRel)
1502 {
1503 OffsetNumber deletable[MaxIndexTuplesPerPage];
1504 int ndeletable = 0;
1505 OffsetNumber offnum,
1506 maxoff;
1507
1508 Assert(GistPageIsLeaf(page));
1509
1510 /*
1511 * Scan over all items to see which ones need to be deleted according to
1512 * LP_DEAD flags.
1513 */
1514 maxoff = PageGetMaxOffsetNumber(page);
1515 for (offnum = FirstOffsetNumber;
1516 offnum <= maxoff;
1517 offnum = OffsetNumberNext(offnum))
1518 {
1519 ItemId itemId = PageGetItemId(page, offnum);
1520
1521 if (ItemIdIsDead(itemId))
1522 deletable[ndeletable++] = offnum;
1523 }
1524
1525 if (ndeletable > 0)
1526 {
1527 START_CRIT_SECTION();
1528
1529 PageIndexMultiDelete(page, deletable, ndeletable);
1530
1531 /*
1532 * Mark the page as not containing any LP_DEAD items. This is not
1533 * necessarily true (there might be some that have recently been marked,
1534 * but weren't included in our target-item list), but it will almost
1535 * always be true and it doesn't seem worth an additional page scan to
1536 * check it. Remember that F_HAS_GARBAGE is only a hint anyway.
1537 */
1538 GistClearPageHasGarbage(page);
1539
1540 MarkBufferDirty(buffer);
1541
1542 /* XLOG stuff */
1543 if (RelationNeedsWAL(rel))
1544 {
1545 XLogRecPtr recptr;
1546
1547 recptr = gistXLogUpdate(buffer,
1548 deletable, ndeletable,
1549 NULL, 0, InvalidBuffer,
1550 &heapRel->rd_node);
1551
1552 PageSetLSN(page, recptr);
1553 }
1554 else
1555 PageSetLSN(page, gistGetFakeLSN(rel));
1556
1557 END_CRIT_SECTION();
1558 }
1559
1560 /*
1561 * Note: if we didn't find any LP_DEAD items, then the page's
1562 * F_HAS_GARBAGE hint bit is falsely set. We do not bother expending a
1563 * separate write to clear it, however. We will clear it when we split
1564 * the page.
1565 */
1566 }
1567