1 /*-------------------------------------------------------------------------
2 *
3 * gist.c
4 * interface routines for the postgres GiST index access method.
5 *
6 *
7 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/gist/gist.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 #include "postgres.h"
16
17 #include "access/gist_private.h"
18 #include "access/gistscan.h"
19 #include "catalog/pg_collation.h"
20 #include "commands/vacuum.h"
21 #include "miscadmin.h"
22 #include "nodes/execnodes.h"
23 #include "storage/lmgr.h"
24 #include "storage/predicate.h"
25 #include "utils/builtins.h"
26 #include "utils/index_selfuncs.h"
27 #include "utils/memutils.h"
28 #include "utils/rel.h"
29
30 /* non-export function prototypes */
31 static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
32 static bool gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
33 GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum);
34 static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
35 GISTSTATE *giststate,
36 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
37 Buffer leftchild, Buffer rightchild,
38 bool unlockbuf, bool unlockleftchild);
39 static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
40 GISTSTATE *giststate, List *splitinfo, bool unlockbuf);
41 static void gistprunepage(Relation rel, Page page, Buffer buffer,
42 Relation heapRel);
43
44
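/*
 * ROTATEDIST prepends a freshly-zeroed SplitedPageLayout entry to the list
 * 'd'; gistSplit() uses it to build its chain of split-page descriptors.
 */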
45 #define ROTATEDIST(d) do { \
46 SplitedPageLayout *tmp=(SplitedPageLayout*)palloc0(sizeof(SplitedPageLayout)); \
47 tmp->block.blkno = InvalidBlockNumber; \
48 tmp->buffer = InvalidBuffer; \
49 tmp->next = (d); \
50 (d)=tmp; \
51 } while(0)
52
53
54 /*
55 * GiST handler function: return IndexAmRoutine with access method parameters
56 * and callbacks.
57 */
58 Datum
59 gisthandler(PG_FUNCTION_ARGS)
60 {
61 IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
62
63 amroutine->amstrategies = 0;
64 amroutine->amsupport = GISTNProcs;
65 amroutine->amoptsprocnum = GIST_OPTIONS_PROC;
66 amroutine->amcanorder = false;
67 amroutine->amcanorderbyop = true;
68 amroutine->amcanbackward = false;
69 amroutine->amcanunique = false;
70 amroutine->amcanmulticol = true;
71 amroutine->amoptionalkey = true;
72 amroutine->amsearcharray = false;
73 amroutine->amsearchnulls = true;
74 amroutine->amstorage = true;
75 amroutine->amclusterable = true;
76 amroutine->ampredlocks = true;
77 amroutine->amcanparallel = false;
78 amroutine->amcaninclude = true;
79 amroutine->amusemaintenanceworkmem = false;
80 amroutine->amparallelvacuumoptions =
81 VACUUM_OPTION_PARALLEL_BULKDEL | VACUUM_OPTION_PARALLEL_COND_CLEANUP;
82 amroutine->amkeytype = InvalidOid;
83
84 amroutine->ambuild = gistbuild;
85 amroutine->ambuildempty = gistbuildempty;
86 amroutine->aminsert = gistinsert;
87 amroutine->ambulkdelete = gistbulkdelete;
88 amroutine->amvacuumcleanup = gistvacuumcleanup;
89 amroutine->amcanreturn = gistcanreturn;
90 amroutine->amcostestimate = gistcostestimate;
91 amroutine->amoptions = gistoptions;
92 amroutine->amproperty = gistproperty;
93 amroutine->ambuildphasename = NULL;
94 amroutine->amvalidate = gistvalidate;
95 amroutine->ambeginscan = gistbeginscan;
96 amroutine->amrescan = gistrescan;
97 amroutine->amgettuple = gistgettuple;
98 amroutine->amgetbitmap = gistgetbitmap;
99 amroutine->amendscan = gistendscan;
100 amroutine->ammarkpos = NULL;
101 amroutine->amrestrpos = NULL;
102 amroutine->amestimateparallelscan = NULL;
103 amroutine->aminitparallelscan = NULL;
104 amroutine->amparallelrescan = NULL;
105
106 PG_RETURN_POINTER(amroutine);
107 }
108
109 /*
110 * Create and return a temporary memory context for use by GiST. We
111 * _always_ invoke user-provided methods in a temporary memory
112 * context, so that memory leaks in those functions cannot cause
113 * problems. Also, we use some additional temporary contexts in the
114 * GiST code itself, to avoid the need to do some awkward manual
115 * memory management.
116 */
117 MemoryContext
118 createTempGistContext(void)
119 {
120 return AllocSetContextCreate(CurrentMemoryContext,
121 "GiST temporary context",
122 ALLOCSET_DEFAULT_SIZES);
123 }
124
125 /*
126 * gistbuildempty() -- build an empty gist index in the initialization fork
127 */
128 void
129 gistbuildempty(Relation index)
130 {
131 Buffer buffer;
132
133 /* Initialize the root page */
134 buffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
135 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
136
137 /* Initialize and xlog buffer */
138 START_CRIT_SECTION();
139 GISTInitBuffer(buffer, F_LEAF);
140 MarkBufferDirty(buffer);
141 log_newpage_buffer(buffer, true);
142 END_CRIT_SECTION();
143
144 /* Unlock and release the buffer */
145 UnlockReleaseBuffer(buffer);
146 }
147
148 /*
149 * gistinsert -- wrapper for GiST tuple insertion.
150 *
151 * This is the public interface routine for tuple insertion in GiSTs.
152 * It doesn't do any work; just locks the relation and passes the buck.
153 */
154 bool
155 gistinsert(Relation r, Datum *values, bool *isnull,
156 ItemPointer ht_ctid, Relation heapRel,
157 IndexUniqueCheck checkUnique,
158 IndexInfo *indexInfo)
159 {
160 GISTSTATE *giststate = (GISTSTATE *) indexInfo->ii_AmCache;
161 IndexTuple itup;
162 MemoryContext oldCxt;
163
164 /* Initialize GISTSTATE cache if first call in this statement */
165 if (giststate == NULL)
166 {
167 oldCxt = MemoryContextSwitchTo(indexInfo->ii_Context);
168 giststate = initGISTstate(r);
169 giststate->tempCxt = createTempGistContext();
170 indexInfo->ii_AmCache = (void *) giststate;
171 MemoryContextSwitchTo(oldCxt);
172 }
173
174 oldCxt = MemoryContextSwitchTo(giststate->tempCxt);
175
176 itup = gistFormTuple(giststate, r,
177 values, isnull, true /* size is currently bogus */ );
178 itup->t_tid = *ht_ctid;
179
180 gistdoinsert(r, itup, 0, giststate, heapRel, false);
181
182 /* cleanup */
183 MemoryContextSwitchTo(oldCxt);
184 MemoryContextReset(giststate->tempCxt);
185
186 return false;
187 }
188
189
190 /*
191 * Place the tuples from 'itup' into 'buffer'. If 'oldoffnum' is valid, the tuple
192 * at that offset is removed atomically while the new tuples are inserted.
193 * This is used to replace a tuple with a new one.
194 *
195 * If 'leftchildbuf' is valid, we're inserting the downlink for the page
196 * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
197 * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
198 *
199 * If 'markfollowright' is true and the page is split, the left child is
200 * marked with F_FOLLOW_RIGHT flag. That is the normal case. During buffered
201 * index build, however, there is no concurrent access and the page splitting
202 * is done in a slightly simpler fashion, and false is passed.
203 *
204 * If there is not enough room on the page, it is split. All the split
205 * pages are kept pinned and locked and returned in *splitinfo, the caller
206 * is responsible for inserting the downlinks for them. However, if
207 * 'buffer' is the root page and it needs to be split, gistplacetopage()
208 * performs the split as one atomic operation, and *splitinfo is set to NIL.
209 * In that case, we continue to hold the root page locked, and the child
210 * pages are released; note that new tuple(s) are *not* on the root page
211 * but in one of the new child pages.
212 *
213 * If 'newblkno' is not NULL, returns the block number of the page that the first
214 * new/updated tuple was inserted to. Usually it's the given page, but it could
215 * be its right sibling if the page was split.
216 *
217 * Returns 'true' if the page was split, 'false' otherwise.
218 */
219 bool
220 gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
221 Buffer buffer,
222 IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
223 BlockNumber *newblkno,
224 Buffer leftchildbuf,
225 List **splitinfo,
226 bool markfollowright,
227 Relation heapRel,
228 bool is_build)
229 {
230 BlockNumber blkno = BufferGetBlockNumber(buffer);
231 Page page = BufferGetPage(buffer);
232 bool is_leaf = (GistPageIsLeaf(page)) ? true : false;
233 XLogRecPtr recptr;
234 int i;
235 bool is_split;
236
237 /*
238 * Refuse to modify a page that's incompletely split. This should not
239 * happen because we finish any incomplete splits while we walk down the
240 * tree. However, it's remotely possible that another concurrent inserter
241 * splits a parent page, and errors out before completing the split. We
242 * will just throw an error in that case, and leave any split we had in
243 * progress unfinished too. The next insert that comes along will clean up
244 * the mess.
245 */
246 if (GistFollowRight(page))
247 elog(ERROR, "concurrent GiST page split was incomplete");
248
249 /* should never try to insert to a deleted page */
250 Assert(!GistPageIsDeleted(page));
251
252 *splitinfo = NIL;
253
254 /*
255 * if isupdate, remove old key: This node's key has been modified, either
256 * because a child split occurred or because we needed to adjust our key
257 * for an insert in a child node. Therefore, remove the old version of
258 * this node's key.
259 *
260 * for WAL replay, in the non-split case we handle this by setting up a
261 * one-element todelete array; in the split case, it's handled implicitly
262 * because the tuple vector passed to gistSplit won't include this tuple.
263 */
264 is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
265
266 /*
267 * If the leaf page is full, first try to delete dead tuples, and then
268 * check again.
269 */
270 if (is_split && GistPageIsLeaf(page) && GistPageHasGarbage(page))
271 {
272 gistprunepage(rel, page, buffer, heapRel);
273 is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
274 }
275
276 if (is_split)
277 {
278 /* no space for insertion */
279 IndexTuple *itvec;
280 int tlen;
281 SplitedPageLayout *dist = NULL,
282 *ptr;
283 BlockNumber oldrlink = InvalidBlockNumber;
284 GistNSN oldnsn = 0;
285 SplitedPageLayout rootpg;
286 bool is_rootsplit;
287 int npage;
288
289 is_rootsplit = (blkno == GIST_ROOT_BLKNO);
290
291 /*
292 * Form index tuples vector to split. If we're replacing an old tuple,
293 * remove the old version from the vector.
294 */
295 itvec = gistextractpage(page, &tlen);
296 if (OffsetNumberIsValid(oldoffnum))
297 {
298 /* on inner page we should remove old tuple */
299 int pos = oldoffnum - FirstOffsetNumber;
300
301 tlen--;
302 if (pos != tlen)
303 memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
304 }
305 itvec = gistjoinvector(itvec, &tlen, itup, ntup);
306 dist = gistSplit(rel, page, itvec, tlen, giststate);
307
308 /*
309 * Check that split didn't produce too many pages.
310 */
311 npage = 0;
312 for (ptr = dist; ptr; ptr = ptr->next)
313 npage++;
314 /* in a root split, we'll add one more page to the list below */
315 if (is_rootsplit)
316 npage++;
317 if (npage > GIST_MAX_SPLIT_PAGES)
318 elog(ERROR, "GiST page split into too many halves (%d, maximum %d)",
319 npage, GIST_MAX_SPLIT_PAGES);
320
321 /*
322 * Set up pages to work with. Allocate new buffers for all but the
323 * leftmost page. The original page becomes the new leftmost page, and
324 * is just replaced with the new contents.
325 *
326 * For a root split, allocate new buffers for all child pages; the
327 * original page is overwritten with a new root page containing
328 * downlinks to the new child pages.
329 */
330 ptr = dist;
331 if (!is_rootsplit)
332 {
333 /* save old rightlink and NSN */
334 oldrlink = GistPageGetOpaque(page)->rightlink;
335 oldnsn = GistPageGetNSN(page);
336
337 dist->buffer = buffer;
338 dist->block.blkno = BufferGetBlockNumber(buffer);
339 dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));
340
341 /* clean all flags except F_LEAF */
342 GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
343
344 ptr = ptr->next;
345 }
346 for (; ptr; ptr = ptr->next)
347 {
348 /* Allocate new page */
349 ptr->buffer = gistNewBuffer(rel);
350 GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
351 ptr->page = BufferGetPage(ptr->buffer);
352 ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
353 PredicateLockPageSplit(rel,
354 BufferGetBlockNumber(buffer),
355 BufferGetBlockNumber(ptr->buffer));
356 }
357
358 /*
359 * Now that we know which blocks the new pages go to, set up downlink
360 * tuples to point to them.
361 */
362 for (ptr = dist; ptr; ptr = ptr->next)
363 {
364 ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
365 GistTupleSetValid(ptr->itup);
366 }
367
368 /*
369 * If this is a root split, we construct the new root page with the
370 * downlinks here directly, instead of requiring the caller to insert
371 * them. Add the new root page to the list along with the child pages.
372 */
373 if (is_rootsplit)
374 {
375 IndexTuple *downlinks;
376 int ndownlinks = 0;
377 int i;
378
379 rootpg.buffer = buffer;
380 rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
381 GistPageGetOpaque(rootpg.page)->flags = 0;
382
383 /* Prepare a vector of all the downlinks */
384 for (ptr = dist; ptr; ptr = ptr->next)
385 ndownlinks++;
386 downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
387 for (i = 0, ptr = dist; ptr; ptr = ptr->next)
388 downlinks[i++] = ptr->itup;
389
390 rootpg.block.blkno = GIST_ROOT_BLKNO;
391 rootpg.block.num = ndownlinks;
392 rootpg.list = gistfillitupvec(downlinks, ndownlinks,
393 &(rootpg.lenlist));
394 rootpg.itup = NULL;
395
396 rootpg.next = dist;
397 dist = &rootpg;
398 }
399 else
400 {
401 /* Prepare split-info to be returned to caller */
402 for (ptr = dist; ptr; ptr = ptr->next)
403 {
404 GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
405
406 si->buf = ptr->buffer;
407 si->downlink = ptr->itup;
408 *splitinfo = lappend(*splitinfo, si);
409 }
410 }
411
412 /*
413 * Fill all pages. All the pages are new, i.e. freshly allocated empty
414 * pages, or a temporary copy of the old page.
415 */
416 for (ptr = dist; ptr; ptr = ptr->next)
417 {
418 char *data = (char *) (ptr->list);
419
420 for (i = 0; i < ptr->block.num; i++)
421 {
422 IndexTuple thistup = (IndexTuple) data;
423
424 if (PageAddItem(ptr->page, (Item) data, IndexTupleSize(thistup), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
425 elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(rel));
426
427 /*
428 * If this is the first inserted/updated tuple, let the caller
429 * know which page it landed on.
430 */
431 if (newblkno && ItemPointerEquals(&thistup->t_tid, &(*itup)->t_tid))
432 *newblkno = ptr->block.blkno;
433
434 data += IndexTupleSize(thistup);
435 }
436
437 /* Set up rightlinks */
438 if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
439 GistPageGetOpaque(ptr->page)->rightlink =
440 ptr->next->block.blkno;
441 else
442 GistPageGetOpaque(ptr->page)->rightlink = oldrlink;
443
444 /*
445 * Mark all but the right-most page with the follow-right
446 * flag. It will be cleared as soon as the downlink is inserted
447 * into the parent, but this ensures that if we error out before
448 * that, the index is still consistent. (in buffering build mode,
449 * any error will abort the index build anyway, so this is not
450 * needed.)
451 */
452 if (ptr->next && !is_rootsplit && markfollowright)
453 GistMarkFollowRight(ptr->page);
454 else
455 GistClearFollowRight(ptr->page);
456
457 /*
458 * Copy the NSN of the original page to all pages. The
459 * F_FOLLOW_RIGHT flags ensure that scans will follow the
460 * rightlinks until the downlinks are inserted.
461 */
462 GistPageSetNSN(ptr->page, oldnsn);
463 }
464
465 /*
466 * gistXLogSplit() needs to WAL log a lot of pages, prepare WAL
467 * insertion for that. NB: The number of pages and data segments
468 * specified here must match the calculations in gistXLogSplit()!
469 */
470 if (!is_build && RelationNeedsWAL(rel))
471 XLogEnsureRecordSpace(npage, 1 + npage * 2);
472
473 START_CRIT_SECTION();
474
475 /*
476 * Must mark buffers dirty before XLogInsert, even though we'll still
477 * be changing their opaque fields below.
478 */
479 for (ptr = dist; ptr; ptr = ptr->next)
480 MarkBufferDirty(ptr->buffer);
481 if (BufferIsValid(leftchildbuf))
482 MarkBufferDirty(leftchildbuf);
483
484 /*
485 * The first page in the chain was a temporary working copy meant to
486 * replace the old page. Copy it over the old page.
487 */
488 PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
489 dist->page = BufferGetPage(dist->buffer);
490
491 /*
492 * Write the WAL record.
493 *
494 * If we're building a new index, however, we don't WAL-log changes
495 * yet. The LSN-NSN interlock between parent and child requires that
496 * LSNs never move backwards, so set the LSNs to a value that's
497 * smaller than any real or fake unlogged LSN that might be generated
498 * later. (There can't be any concurrent scans during index build, so
499 * we don't need to be able to detect concurrent splits yet.)
500 */
501 if (is_build)
502 recptr = GistBuildLSN;
503 else
504 {
505 if (RelationNeedsWAL(rel))
506 recptr = gistXLogSplit(is_leaf,
507 dist, oldrlink, oldnsn, leftchildbuf,
508 markfollowright);
509 else
510 recptr = gistGetFakeLSN(rel);
511 }
512
513 for (ptr = dist; ptr; ptr = ptr->next)
514 PageSetLSN(ptr->page, recptr);
515
516 /*
517 * Return the new child buffers to the caller.
518 *
519 * If this was a root split, we've already inserted the downlink
520 * pointers, in the form of a new root page. Therefore we can release
521 * all the new buffers, and keep just the root page locked.
522 */
523 if (is_rootsplit)
524 {
525 for (ptr = dist->next; ptr; ptr = ptr->next)
526 UnlockReleaseBuffer(ptr->buffer);
527 }
528 }
529 else
530 {
531 /*
532 * Enough space. We always get here if ntup==0.
533 */
534 START_CRIT_SECTION();
535
536 /*
537 * Delete old tuple if any, then insert new tuple(s) if any. If
538 * possible, use the fast path of PageIndexTupleOverwrite.
539 */
540 if (OffsetNumberIsValid(oldoffnum))
541 {
542 if (ntup == 1)
543 {
544 /* One-for-one replacement, so use PageIndexTupleOverwrite */
545 if (!PageIndexTupleOverwrite(page, oldoffnum, (Item) *itup,
546 IndexTupleSize(*itup)))
547 elog(ERROR, "failed to add item to index page in \"%s\"",
548 RelationGetRelationName(rel));
549 }
550 else
551 {
552 /* Delete old, then append new tuple(s) to page */
553 PageIndexTupleDelete(page, oldoffnum);
554 gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
555 }
556 }
557 else
558 {
559 /* Just append new tuples at the end of the page */
560 gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
561 }
562
563 MarkBufferDirty(buffer);
564
565 if (BufferIsValid(leftchildbuf))
566 MarkBufferDirty(leftchildbuf);
567
568 if (is_build)
569 recptr = GistBuildLSN;
570 else
571 {
572 if (RelationNeedsWAL(rel))
573 {
574 OffsetNumber ndeloffs = 0,
575 deloffs[1];
576
577 if (OffsetNumberIsValid(oldoffnum))
578 {
579 deloffs[0] = oldoffnum;
580 ndeloffs = 1;
581 }
582
583 recptr = gistXLogUpdate(buffer,
584 deloffs, ndeloffs, itup, ntup,
585 leftchildbuf);
586 }
587 else
588 recptr = gistGetFakeLSN(rel);
589 }
590 PageSetLSN(page, recptr);
591
592 if (newblkno)
593 *newblkno = blkno;
594 }
595
596 /*
597 * If we inserted the downlink for a child page, set NSN and clear
598 * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know to
599 * follow the rightlink if and only if they looked at the parent page
600 * before we inserted the downlink.
601 *
602 * Note that we do this *after* writing the WAL record. That means that
603 * the possible full page image in the WAL record does not include these
604 * changes, and they must be replayed even if the page is restored from
605 * the full page image. There's a chicken-and-egg problem: if we updated
606 * the child pages first, we wouldn't know the recptr of the WAL record
607 * we're about to write.
608 */
609 if (BufferIsValid(leftchildbuf))
610 {
611 Page leftpg = BufferGetPage(leftchildbuf);
612
613 GistPageSetNSN(leftpg, recptr);
614 GistClearFollowRight(leftpg);
615
616 PageSetLSN(leftpg, recptr);
617 }
618
619 END_CRIT_SECTION();
620
621 return is_split;
622 }
623
624 /*
625 * Workhorse routine for doing insertion into a GiST index. Note that
626 * this routine assumes it is invoked in a short-lived memory context,
627 * so it does not bother releasing palloc'd allocations.
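 *
 * Typical call pattern (see gistinsert() above): switch into
 * giststate->tempCxt, call gistdoinsert(), then reset that context.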
628 */
629 void
630 gistdoinsert(Relation r, IndexTuple itup, Size freespace,
631 GISTSTATE *giststate, Relation heapRel, bool is_build)
632 {
633 ItemId iid;
634 IndexTuple idxtuple;
635 GISTInsertStack firststack;
636 GISTInsertStack *stack;
637 GISTInsertState state;
638 bool xlocked = false;
639
640 memset(&state, 0, sizeof(GISTInsertState));
641 state.freespace = freespace;
642 state.r = r;
643 state.heapRel = heapRel;
644 state.is_build = is_build;
645
646 /* Start from the root */
647 firststack.blkno = GIST_ROOT_BLKNO;
648 firststack.lsn = 0;
649 firststack.retry_from_parent = false;
650 firststack.parent = NULL;
651 firststack.downlinkoffnum = InvalidOffsetNumber;
652 state.stack = stack = &firststack;
653
654 /*
655 * Walk down along the path of smallest penalty, updating the parent
656 * pointers with the key we're inserting as we go. If we crash in the
657 * middle, the tree is consistent, although the possible parent updates
658 * were a waste.
659 */
660 for (;;)
661 {
662 /*
663 * If we split an internal page while descending the tree, we have to
664 * retry at the parent. (Normally, the LSN-NSN interlock below would
665 * also catch this and cause us to retry. But LSNs are not updated
666 * during index build.)
667 */
668 while (stack->retry_from_parent)
669 {
670 if (xlocked)
671 LockBuffer(stack->buffer, GIST_UNLOCK);
672 xlocked = false;
673 ReleaseBuffer(stack->buffer);
674 state.stack = stack = stack->parent;
675 }
676
677 if (XLogRecPtrIsInvalid(stack->lsn))
678 stack->buffer = ReadBuffer(state.r, stack->blkno);
679
680 /*
681 * Be optimistic and grab shared lock first. Swap it for an exclusive
682 * lock later if we need to update the page.
683 */
684 if (!xlocked)
685 {
686 LockBuffer(stack->buffer, GIST_SHARE);
687 gistcheckpage(state.r, stack->buffer);
688 }
689
690 stack->page = (Page) BufferGetPage(stack->buffer);
691 stack->lsn = xlocked ?
692 PageGetLSN(stack->page) : BufferGetLSNAtomic(stack->buffer);
693 Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));
694
695 /*
696 * If this page was split but the downlink was never inserted to the
697 * parent because the inserting backend crashed before doing that, fix
698 * that now.
699 */
700 if (GistFollowRight(stack->page))
701 {
702 if (!xlocked)
703 {
704 LockBuffer(stack->buffer, GIST_UNLOCK);
705 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
706 xlocked = true;
707 /* someone might've completed the split when we unlocked */
708 if (!GistFollowRight(stack->page))
709 continue;
710 }
711 gistfixsplit(&state, giststate);
712
713 UnlockReleaseBuffer(stack->buffer);
714 xlocked = false;
715 state.stack = stack = stack->parent;
716 continue;
717 }
718
719 if ((stack->blkno != GIST_ROOT_BLKNO &&
720 stack->parent->lsn < GistPageGetNSN(stack->page)) ||
721 GistPageIsDeleted(stack->page))
722 {
723 /*
724 * Concurrent split or page deletion detected. There's no
725 * guarantee that the downlink for this page is consistent with
726 * the tuple we're inserting anymore, so go back to parent and
727 * rechoose the best child.
728 */
729 UnlockReleaseBuffer(stack->buffer);
730 xlocked = false;
731 state.stack = stack = stack->parent;
732 continue;
733 }
734
735 if (!GistPageIsLeaf(stack->page))
736 {
737 /*
738 * This is an internal page so continue to walk down the tree.
739 * Find the child node that has the minimum insertion penalty.
740 */
741 BlockNumber childblkno;
742 IndexTuple newtup;
743 GISTInsertStack *item;
744 OffsetNumber downlinkoffnum;
745
746 downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate);
747 iid = PageGetItemId(stack->page, downlinkoffnum);
748 idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
749 childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
750
751 /*
752 * Check that it's not a leftover invalid tuple from pre-9.1
753 */
754 if (GistTupleIsInvalid(idxtuple))
755 ereport(ERROR,
756 (errmsg("index \"%s\" contains an inner tuple marked as invalid",
757 RelationGetRelationName(r)),
758 errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
759 errhint("Please REINDEX it.")));
760
761 /*
762 * Check that the key representing the target child node is
763 * consistent with the key we're inserting. Update it if it's not.
764 */
765 newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
766 if (newtup)
767 {
768 /*
769 * Swap shared lock for an exclusive one. Beware, the page may
770 * change while we unlock/lock the page...
771 */
772 if (!xlocked)
773 {
774 LockBuffer(stack->buffer, GIST_UNLOCK);
775 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
776 xlocked = true;
777 stack->page = (Page) BufferGetPage(stack->buffer);
778
779 if (PageGetLSN(stack->page) != stack->lsn)
780 {
781 /* the page was changed while we unlocked it, retry */
782 continue;
783 }
784 }
785
786 /*
787 * Update the tuple.
788 *
789 * We still hold the lock after gistinserttuple(), but it
790 * might have to split the page to make the updated tuple fit.
791 * In that case the updated tuple might migrate to the other
792 * half of the split, so we have to go back to the parent and
793 * descend back to the half that's a better fit for the new
794 * tuple.
795 */
796 if (gistinserttuple(&state, stack, giststate, newtup,
797 downlinkoffnum))
798 {
799 /*
800 * If this was a root split, the root page continues to be
801 * the parent and the updated tuple went to one of the
802 * child pages, so we just need to retry from the root
803 * page.
804 */
805 if (stack->blkno != GIST_ROOT_BLKNO)
806 {
807 UnlockReleaseBuffer(stack->buffer);
808 xlocked = false;
809 state.stack = stack = stack->parent;
810 }
811 continue;
812 }
813 }
814 LockBuffer(stack->buffer, GIST_UNLOCK);
815 xlocked = false;
816
817 /* descend to the chosen child */
818 item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
819 item->blkno = childblkno;
820 item->parent = stack;
821 item->downlinkoffnum = downlinkoffnum;
822 state.stack = stack = item;
823 }
824 else
825 {
826 /*
827 * Leaf page. Insert the new key. We've already updated all the
828 * parents on the way down, but we might have to split the page if
829 * it doesn't fit. gistinserttuple() will take care of that.
830 */
831
832 /*
833 * Swap shared lock for an exclusive one. Be careful, the page may
834 * change while we unlock/lock the page...
835 */
836 if (!xlocked)
837 {
838 LockBuffer(stack->buffer, GIST_UNLOCK);
839 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
840 xlocked = true;
841 stack->page = (Page) BufferGetPage(stack->buffer);
842 stack->lsn = PageGetLSN(stack->page);
843
844 if (stack->blkno == GIST_ROOT_BLKNO)
845 {
846 /*
847 * the only page that can turn from a leaf into an inner
848 * page is the root, so we must recheck it here
849 */
850 if (!GistPageIsLeaf(stack->page))
851 {
852 /*
853 * very rare situation: while the page was unlocked, the
854 * index grew from a single (root) page
855 */
856 LockBuffer(stack->buffer, GIST_UNLOCK);
857 xlocked = false;
858 continue;
859 }
860
861 /*
862 * we don't need to check for a root split, because checking
863 * leaf vs. inner status is enough to detect one for the root
864 */
865 }
866 else if ((GistFollowRight(stack->page) ||
867 stack->parent->lsn < GistPageGetNSN(stack->page)) ||
868 GistPageIsDeleted(stack->page))
869 {
870 /*
871 * The page was split or deleted while we momentarily
872 * unlocked the page. Go back to parent.
873 */
874 UnlockReleaseBuffer(stack->buffer);
875 xlocked = false;
876 state.stack = stack = stack->parent;
877 continue;
878 }
879 }
880
881 /* now state.stack->(page, buffer and blkno) point to the leaf page */
882
883 gistinserttuple(&state, stack, giststate, itup,
884 InvalidOffsetNumber);
885 LockBuffer(stack->buffer, GIST_UNLOCK);
886
887 /* Release any pins we might still hold before exiting */
888 for (; stack; stack = stack->parent)
889 ReleaseBuffer(stack->buffer);
890 break;
891 }
892 }
893 }
894
895 /*
896 * Traverse the tree to find path from root page to specified "child" block.
897 *
898 * returns a new insertion stack, starting from the parent of "child", up
899 * to the root. *downlinkoffnum is set to the offset of the downlink in the
900 * direct parent of child.
901 *
902 * To prevent deadlocks, this should lock only one page at a time.
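 *
 * The search is breadth-first: pages to visit are kept in a FIFO list, and
 * only one page is locked and examined at a time.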
903 */
904 static GISTInsertStack *
905 gistFindPath(Relation r, BlockNumber child, OffsetNumber *downlinkoffnum)
906 {
907 Page page;
908 Buffer buffer;
909 OffsetNumber i,
910 maxoff;
911 ItemId iid;
912 IndexTuple idxtuple;
913 List *fifo;
914 GISTInsertStack *top,
915 *ptr;
916 BlockNumber blkno;
917
918 top = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
919 top->blkno = GIST_ROOT_BLKNO;
920 top->downlinkoffnum = InvalidOffsetNumber;
921
922 fifo = list_make1(top);
923 while (fifo != NIL)
924 {
925 /* Get next page to visit */
926 top = linitial(fifo);
927 fifo = list_delete_first(fifo);
928
929 buffer = ReadBuffer(r, top->blkno);
930 LockBuffer(buffer, GIST_SHARE);
931 gistcheckpage(r, buffer);
932 page = (Page) BufferGetPage(buffer);
933
934 if (GistPageIsLeaf(page))
935 {
936 /*
937 * Because we scan the index top-down, all the rest of the pages
938 * in the queue must be leaf pages as well.
939 */
940 UnlockReleaseBuffer(buffer);
941 break;
942 }
943
944 /* currently, internal pages are never deleted */
945 Assert(!GistPageIsDeleted(page));
946
947 top->lsn = BufferGetLSNAtomic(buffer);
948
949 /*
950 * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
951 * downlink. This should not normally happen.
952 */
953 if (GistFollowRight(page))
954 elog(ERROR, "concurrent GiST page split was incomplete");
955
956 if (top->parent && top->parent->lsn < GistPageGetNSN(page) &&
957 GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
958 {
959 /*
960 * Page was split while we looked elsewhere. We didn't see the
961 * downlink to the right page when we scanned the parent, so add
962 * it to the queue now.
963 *
964 * Put the right page ahead of the queue, so that we visit it
965 * next. That's important, because if this is the lowest internal
966 * level, just above leaves, we might already have queued up some
967 * leaf pages, and we assume that there can't be any non-leaf
968 * pages behind leaf pages.
969 */
970 ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
971 ptr->blkno = GistPageGetOpaque(page)->rightlink;
972 ptr->downlinkoffnum = InvalidOffsetNumber;
973 ptr->parent = top->parent;
974
975 fifo = lcons(ptr, fifo);
976 }
977
978 maxoff = PageGetMaxOffsetNumber(page);
979
980 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
981 {
982 iid = PageGetItemId(page, i);
983 idxtuple = (IndexTuple) PageGetItem(page, iid);
984 blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
985 if (blkno == child)
986 {
987 /* Found it! */
988 UnlockReleaseBuffer(buffer);
989 *downlinkoffnum = i;
990 return top;
991 }
992 else
993 {
994 /* Append this child to the list of pages to visit later */
995 ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
996 ptr->blkno = blkno;
997 ptr->downlinkoffnum = i;
998 ptr->parent = top;
999
1000 fifo = lappend(fifo, ptr);
1001 }
1002 }
1003
1004 UnlockReleaseBuffer(buffer);
1005 }
1006
1007 elog(ERROR, "failed to re-find parent of a page in index \"%s\", block %u",
1008 RelationGetRelationName(r), child);
1009 return NULL; /* keep compiler quiet */
1010 }
1011
1012 /*
1013 * Updates the stack so that child->parent is the correct parent of the
1014 * child. child->parent must be exclusively locked on entry, and will
1015 * remain so at exit, but it might not be the same page anymore.
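 *
 * Strategy: if the parent page is unchanged (its LSN still matches) and
 * downlinkoffnum is valid, there is nothing to do. Otherwise rescan the
 * parent page and follow its rightlinks looking for the downlink; as a last
 * resort, rebuild the whole parent chain with gistFindPath().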
1016 */
1017 static void
1018 gistFindCorrectParent(Relation r, GISTInsertStack *child)
1019 {
1020 GISTInsertStack *parent = child->parent;
1021
1022 gistcheckpage(r, parent->buffer);
1023 parent->page = (Page) BufferGetPage(parent->buffer);
1024
1025 /* here we don't need to distinguish between split and page update */
1026 if (child->downlinkoffnum == InvalidOffsetNumber ||
1027 parent->lsn != PageGetLSN(parent->page))
1028 {
1029 /* parent has changed; follow right links until we find the child */
1030 OffsetNumber i,
1031 maxoff;
1032 ItemId iid;
1033 IndexTuple idxtuple;
1034 GISTInsertStack *ptr;
1035
1036 while (true)
1037 {
1038 maxoff = PageGetMaxOffsetNumber(parent->page);
1039 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
1040 {
1041 iid = PageGetItemId(parent->page, i);
1042 idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
1043 if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
1044 {
1045 /* found it! */
1046 child->downlinkoffnum = i;
1047 return;
1048 }
1049 }
1050
1051 parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
1052 UnlockReleaseBuffer(parent->buffer);
1053 if (parent->blkno == InvalidBlockNumber)
1054 {
1055 /*
1056 * End of chain and still didn't find the parent. This is a very
1057 * rare situation that can happen when the root was split.
1058 */
1059 break;
1060 }
1061 parent->buffer = ReadBuffer(r, parent->blkno);
1062 LockBuffer(parent->buffer, GIST_EXCLUSIVE);
1063 gistcheckpage(r, parent->buffer);
1064 parent->page = (Page) BufferGetPage(parent->buffer);
1065 }
1066
1067 /*
1068 * Worst case: we must search the whole tree to find the parent, but
1069 * first release all the old parent buffers
1070 */
1071
1072 ptr = child->parent->parent; /* child->parent already released
1073 * above */
1074 while (ptr)
1075 {
1076 ReleaseBuffer(ptr->buffer);
1077 ptr = ptr->parent;
1078 }
1079
1080 /* ok, find new path */
1081 ptr = parent = gistFindPath(r, child->blkno, &child->downlinkoffnum);
1082
1083 /* read all buffers as expected by caller */
1084 /* note we don't lock them or gistcheckpage them here! */
1085 while (ptr)
1086 {
1087 ptr->buffer = ReadBuffer(r, ptr->blkno);
1088 ptr->page = (Page) BufferGetPage(ptr->buffer);
1089 ptr = ptr->parent;
1090 }
1091
1092 /* install new chain of parents to stack */
1093 child->parent = parent;
1094
1095 /* make recursive call to normal processing */
1096 LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
1097 gistFindCorrectParent(r, child);
1098 }
1099 }
1100
1101 /*
1102 * Form a downlink pointer for the page in 'buf'.
1103 */
1104 static IndexTuple
1105 gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
1106 GISTInsertStack *stack)
1107 {
1108 Page page = BufferGetPage(buf);
1109 OffsetNumber maxoff;
1110 OffsetNumber offset;
1111 IndexTuple downlink = NULL;
1112
1113 maxoff = PageGetMaxOffsetNumber(page);
1114 for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
1115 {
1116 IndexTuple ituple = (IndexTuple)
1117 PageGetItem(page, PageGetItemId(page, offset));
1118
1119 if (downlink == NULL)
1120 downlink = CopyIndexTuple(ituple);
1121 else
1122 {
1123 IndexTuple newdownlink;
1124
1125 newdownlink = gistgetadjusted(rel, downlink, ituple,
1126 giststate);
1127 if (newdownlink)
1128 downlink = newdownlink;
1129 }
1130 }
1131
1132 /*
1133 * If the page is completely empty, we can't form a meaningful downlink
1134 * for it. But we have to insert a downlink for the page. Any key will do,
1135 * as long as it's consistent with the parent page's downlink, so that we
1136 * can legally insert it into the parent. A minimal one that matches as few
1137 * scans as possible would be best, to keep scans from doing useless work,
1138 * but we don't know how to construct that. So we just use the downlink of
1139 * the original page that was split - that's as far from optimal as it can
1140 * get, but it will do.
1141 */
1142 if (!downlink)
1143 {
1144 ItemId iid;
1145
1146 LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1147 gistFindCorrectParent(rel, stack);
1148 iid = PageGetItemId(stack->parent->page, stack->downlinkoffnum);
1149 downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
1150 downlink = CopyIndexTuple(downlink);
1151 LockBuffer(stack->parent->buffer, GIST_UNLOCK);
1152 }
1153
1154 ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
1155 GistTupleSetValid(downlink);
1156
1157 return downlink;
1158 }
1159
1160
1161 /*
1162 * Complete the incomplete split of state->stack->page.
1163 */
1164 static void
1165 gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
1166 {
1167 GISTInsertStack *stack = state->stack;
1168 Buffer buf;
1169 Page page;
1170 List *splitinfo = NIL;
1171
1172 elog(LOG, "fixing incomplete split in index \"%s\", block %u",
1173 RelationGetRelationName(state->r), stack->blkno);
1174
1175 Assert(GistFollowRight(stack->page));
1176 Assert(OffsetNumberIsValid(stack->downlinkoffnum));
1177
1178 buf = stack->buffer;
1179
1180 /*
1181 * Read the chain of split pages, following the rightlinks. Construct a
1182 * downlink tuple for each page.
1183 */
1184 for (;;)
1185 {
1186 GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
1187 IndexTuple downlink;
1188
1189 page = BufferGetPage(buf);
1190
1191 /* Form the new downlink tuples to insert to parent */
1192 downlink = gistformdownlink(state->r, buf, giststate, stack);
1193
1194 si->buf = buf;
1195 si->downlink = downlink;
1196
1197 splitinfo = lappend(splitinfo, si);
1198
1199 if (GistFollowRight(page))
1200 {
1201 /* lock next page */
1202 buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
1203 LockBuffer(buf, GIST_EXCLUSIVE);
1204 }
1205 else
1206 break;
1207 }
1208
1209 /* Insert the downlinks */
1210 gistfinishsplit(state, stack, giststate, splitinfo, false);
1211 }
1212
1213 /*
1214 * Insert or replace a tuple in stack->buffer. If 'oldoffnum' is valid, the
1215 * tuple at 'oldoffnum' is replaced, otherwise the tuple is inserted as new.
1216 * 'stack' represents the path from the root to the page being updated.
1217 *
1218 * The caller must hold an exclusive lock on stack->buffer. The lock is still
1219 * held on return, but the page might not contain the inserted tuple if the
1220 * page was split. The function returns true if the page was split, false
1221 * otherwise.
1222 */
1223 static bool
1224 gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
1225 GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum)
1226 {
1227 return gistinserttuples(state, stack, giststate, &tuple, 1, oldoffnum,
1228 InvalidBuffer, InvalidBuffer, false, false);
1229 }
1230
1231 /* ----------------
1232 * An extended workhorse version of gistinserttuple(). This version allows
1233 * inserting multiple tuples, or replacing a single tuple with multiple tuples.
1234 * This is used to recursively update the downlinks in the parent when a page
1235 * is split.
1236 *
1237 * If leftchild and rightchild are valid, we're inserting/replacing the
1238 * downlink for rightchild, and leftchild is its left sibling. We clear the
1239 * F_FOLLOW_RIGHT flag and update NSN on leftchild, atomically with the
1240 * insertion of the downlink.
1241 *
1242 * To avoid holding locks for longer than necessary, when recursing up the
1243 * tree to update the parents, the locking is a bit peculiar here. On entry,
1244 * the caller must hold an exclusive lock on stack->buffer, as well as
1245 * leftchild and rightchild if given. On return:
1246 *
1247 * - Lock on stack->buffer is released, if 'unlockbuf' is true. The page is
1248 * always kept pinned, however.
1249 * - Lock on 'leftchild' is released, if 'unlockleftchild' is true. The page
1250 * is kept pinned.
1251 * - Lock and pin on 'rightchild' are always released.
1252 *
1253 * Returns 'true' if the page had to be split. Note that if the page was
1254 * split, the inserted/updated tuples might've been inserted to a right
1255 * sibling of stack->buffer instead of stack->buffer itself.
1256 */
1257 static bool
1258 gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
1259 GISTSTATE *giststate,
1260 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
1261 Buffer leftchild, Buffer rightchild,
1262 bool unlockbuf, bool unlockleftchild)
1263 {
1264 List *splitinfo;
1265 bool is_split;
1266
1267 /*
1268 * Check for any rw conflicts (in serializable isolation level) just
1269 * before we intend to modify the page
1270 */
1271 CheckForSerializableConflictIn(state->r, NULL, BufferGetBlockNumber(stack->buffer));
1272
1273 /* Insert the tuple(s) to the page, splitting the page if necessary */
1274 is_split = gistplacetopage(state->r, state->freespace, giststate,
1275 stack->buffer,
1276 tuples, ntup,
1277 oldoffnum, NULL,
1278 leftchild,
1279 &splitinfo,
1280 true,
1281 state->heapRel,
1282 state->is_build);
1283
1284 /*
1285 * Before recursing up in case the page was split, release locks on the
1286 * child pages. We don't need to keep them locked when updating the
1287 * parent.
1288 */
1289 if (BufferIsValid(rightchild))
1290 UnlockReleaseBuffer(rightchild);
1291 if (BufferIsValid(leftchild) && unlockleftchild)
1292 LockBuffer(leftchild, GIST_UNLOCK);
1293
1294 /*
1295 * If we had to split, insert/update the downlinks in the parent. If the
1296 * caller requested us to release the lock on stack->buffer, tell
1297 * gistfinishsplit() to do that as soon as it's safe to do so. If we
1298 * didn't have to split, release it ourselves.
1299 */
1300 if (splitinfo)
1301 gistfinishsplit(state, stack, giststate, splitinfo, unlockbuf);
1302 else if (unlockbuf)
1303 LockBuffer(stack->buffer, GIST_UNLOCK);
1304
1305 return is_split;
1306 }
1307
1308 /*
1309 * Finish an incomplete split by inserting/updating the downlinks in parent
1310 * page. 'splitinfo' contains all the child pages involved in the split,
1311 * from left-to-right.
1312 *
1313 * On entry, the caller must hold a lock on stack->buffer and all the child
1314 * pages in 'splitinfo'. If 'unlockbuf' is true, the lock on stack->buffer is
1315 * released on return. The child pages are always unlocked and unpinned.
1316 */
1317 static void
1318 gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
1319 GISTSTATE *giststate, List *splitinfo, bool unlockbuf)
1320 {
1321 GISTPageSplitInfo *right;
1322 GISTPageSplitInfo *left;
1323 IndexTuple tuples[2];
1324
1325 /* A split always contains at least two halves */
1326 Assert(list_length(splitinfo) >= 2);
1327
1328 /*
1329 * We need to insert downlinks for each new page, and update the downlink
1330 * for the original (leftmost) page in the split. Begin at the rightmost
1331 * page, inserting one downlink at a time until there are only two pages
1332 * left. Finally insert the downlink for the last new page and update the
1333 * downlink for the original page as one operation.
1334 */
1335 LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1336
1337 /*
1338 * Insert downlinks for the siblings from right to left, until there are
1339 * only two siblings left.
1340 */
1341 for (int pos = list_length(splitinfo) - 1; pos > 1; pos--)
1342 {
1343 right = (GISTPageSplitInfo *) list_nth(splitinfo, pos);
1344 left = (GISTPageSplitInfo *) list_nth(splitinfo, pos - 1);
1345
1346 gistFindCorrectParent(state->r, stack);
1347 if (gistinserttuples(state, stack->parent, giststate,
1348 &right->downlink, 1,
1349 InvalidOffsetNumber,
1350 left->buf, right->buf, false, false))
1351 {
1352 /*
1353 * If the parent page was split, the existing downlink might have
1354 * moved.
1355 */
1356 stack->downlinkoffnum = InvalidOffsetNumber;
1357 }
1358 /* gistinserttuples() released the lock on right->buf. */
1359 }
1360
1361 right = (GISTPageSplitInfo *) lsecond(splitinfo);
1362 left = (GISTPageSplitInfo *) linitial(splitinfo);
1363
1364 /*
1365 * Finally insert downlink for the remaining right page and update the
1366 * downlink for the original page to not contain the tuples that were
1367 * moved to the new pages.
1368 */
1369 tuples[0] = left->downlink;
1370 tuples[1] = right->downlink;
1371 gistFindCorrectParent(state->r, stack);
1372 if (gistinserttuples(state, stack->parent, giststate,
1373 tuples, 2,
1374 stack->downlinkoffnum,
1375 left->buf, right->buf,
1376 true, /* Unlock parent */
1377 unlockbuf /* Unlock stack->buffer if caller wants
1378 * that */
1379 ))
1380 {
1381 /*
1382 * If the parent page was split, the downlink might have moved.
1383 */
1384 stack->downlinkoffnum = InvalidOffsetNumber;
1385 }
1386
1387 Assert(left->buf == stack->buffer);
1388
1389 /*
1390 * If we split the page because we had to adjust the downlink on an
1391 * internal page, while descending the tree for inserting a new tuple,
1392 * then this might no longer be the correct page for the new tuple. The
1393 * downlink to this page might not cover the new tuple anymore, it might
1394 * need to go to the newly-created right sibling instead. Tell the caller
1395 * to walk back up the stack, to re-check at the parent which page to
1396 * insert to.
1397 *
1398 * Normally, the LSN-NSN interlock during the tree descent would also
1399 * detect that a concurrent split happened (by ourselves), and cause us to
1400 * retry at the parent. But that mechanism doesn't work during index
1401 * build, because we don't do WAL-logging, and don't update LSNs, during
1402 * index build.
1403 */
1404 stack->retry_from_parent = true;
1405 }
1406
1407 /*
1408 * gistSplit -- split a page in the tree and fill the struct
1409 * used for XLOG and for writing the real buffers. The function is
1410 * recursive: it keeps splitting until the keys fit on every page.
1411 */
1412 SplitedPageLayout *
1413 gistSplit(Relation r,
1414 Page page,
1415 IndexTuple *itup, /* contains compressed entry */
1416 int len,
1417 GISTSTATE *giststate)
1418 {
1419 IndexTuple *lvectup,
1420 *rvectup;
1421 GistSplitVector v;
1422 int i;
1423 SplitedPageLayout *res = NULL;
1424
1425 /* this should never recurse very deeply, but better safe than sorry */
1426 check_stack_depth();
1427
1428 /* there's no point in splitting an empty page */
1429 Assert(len > 0);
1430
1431 /*
1432 * If a single tuple doesn't fit on a page, no amount of splitting will
1433 * help.
1434 */
1435 if (len == 1)
1436 ereport(ERROR,
1437 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1438 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
1439 IndexTupleSize(itup[0]), GiSTPageSize,
1440 RelationGetRelationName(r))));
1441
1442 memset(v.spl_lisnull, true,
1443 sizeof(bool) * giststate->nonLeafTupdesc->natts);
1444 memset(v.spl_risnull, true,
1445 sizeof(bool) * giststate->nonLeafTupdesc->natts);
1446 gistSplitByKey(r, page, itup, len, giststate, &v, 0);
1447
1448 /* form left and right vector */
1449 lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1450 rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1451
1452 for (i = 0; i < v.splitVector.spl_nleft; i++)
1453 lvectup[i] = itup[v.splitVector.spl_left[i] - 1];
1454
1455 for (i = 0; i < v.splitVector.spl_nright; i++)
1456 rvectup[i] = itup[v.splitVector.spl_right[i] - 1];
1457
1458 /* finalize splitting (may need another split) */
1459 if (!gistfitpage(rvectup, v.splitVector.spl_nright))
1460 {
1461 res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
1462 }
1463 else
1464 {
1465 ROTATEDIST(res);
1466 res->block.num = v.splitVector.spl_nright;
1467 res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
1468 res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
1469 }
1470
1471 if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
1472 {
1473 SplitedPageLayout *resptr,
1474 *subres;
1475
1476 resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);
1477
1478 /* install on list's tail */
1479 while (resptr->next)
1480 resptr = resptr->next;
1481
1482 resptr->next = res;
1483 res = subres;
1484 }
1485 else
1486 {
1487 ROTATEDIST(res);
1488 res->block.num = v.splitVector.spl_nleft;
1489 res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
1490 res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
1491 }
1492
1493 return res;
1494 }
1495
1496 /*
1497 * Create a GISTSTATE and fill it with information about the index
1498 */
1499 GISTSTATE *
1500 initGISTstate(Relation index)
1501 {
1502 GISTSTATE *giststate;
1503 MemoryContext scanCxt;
1504 MemoryContext oldCxt;
1505 int i;
1506
1507 /* safety check to protect fixed-size arrays in GISTSTATE */
1508 if (index->rd_att->natts > INDEX_MAX_KEYS)
1509 elog(ERROR, "numberOfAttributes %d > %d",
1510 index->rd_att->natts, INDEX_MAX_KEYS);
1511
1512 /* Create the memory context that will hold the GISTSTATE */
1513 scanCxt = AllocSetContextCreate(CurrentMemoryContext,
1514 "GiST scan context",
1515 ALLOCSET_DEFAULT_SIZES);
1516 oldCxt = MemoryContextSwitchTo(scanCxt);
1517
1518 /* Create and fill in the GISTSTATE */
1519 giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE));
1520
1521 giststate->scanCxt = scanCxt;
1522 giststate->tempCxt = scanCxt; /* caller must change this if needed */
1523 giststate->leafTupdesc = index->rd_att;
1524
1525 /*
1526 * The truncated tupdesc for non-leaf index tuples, which doesn't contain
1527 * the INCLUDE attributes.
1528 *
1529 * It is used to form tuples during tuple adjustment and page split.
1530 * B-tree creates a shortened tuple descriptor for every truncated tuple
1531 * because it does this less often: it does not have to form truncated
1532 * tuples during page splits. Also, B-tree does not adjust tuples on
1533 * internal pages the way GiST does.
1534 */
1535 giststate->nonLeafTupdesc = CreateTupleDescCopyConstr(index->rd_att);
1536 giststate->nonLeafTupdesc->natts =
1537 IndexRelationGetNumberOfKeyAttributes(index);
1538
1539 for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(index); i++)
1540 {
1541 fmgr_info_copy(&(giststate->consistentFn[i]),
1542 index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
1543 scanCxt);
1544 fmgr_info_copy(&(giststate->unionFn[i]),
1545 index_getprocinfo(index, i + 1, GIST_UNION_PROC),
1546 scanCxt);
1547
1548 /* opclasses are not required to provide a Compress method */
1549 if (OidIsValid(index_getprocid(index, i + 1, GIST_COMPRESS_PROC)))
1550 fmgr_info_copy(&(giststate->compressFn[i]),
1551 index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
1552 scanCxt);
1553 else
1554 giststate->compressFn[i].fn_oid = InvalidOid;
1555
1556 /* opclasses are not required to provide a Decompress method */
1557 if (OidIsValid(index_getprocid(index, i + 1, GIST_DECOMPRESS_PROC)))
1558 fmgr_info_copy(&(giststate->decompressFn[i]),
1559 index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
1560 scanCxt);
1561 else
1562 giststate->decompressFn[i].fn_oid = InvalidOid;
1563
1564 fmgr_info_copy(&(giststate->penaltyFn[i]),
1565 index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
1566 scanCxt);
1567 fmgr_info_copy(&(giststate->picksplitFn[i]),
1568 index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
1569 scanCxt);
1570 fmgr_info_copy(&(giststate->equalFn[i]),
1571 index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
1572 scanCxt);
1573
1574 /* opclasses are not required to provide a Distance method */
1575 if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
1576 fmgr_info_copy(&(giststate->distanceFn[i]),
1577 index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
1578 scanCxt);
1579 else
1580 giststate->distanceFn[i].fn_oid = InvalidOid;
1581
1582 /* opclasses are not required to provide a Fetch method */
1583 if (OidIsValid(index_getprocid(index, i + 1, GIST_FETCH_PROC)))
1584 fmgr_info_copy(&(giststate->fetchFn[i]),
1585 index_getprocinfo(index, i + 1, GIST_FETCH_PROC),
1586 scanCxt);
1587 else
1588 giststate->fetchFn[i].fn_oid = InvalidOid;
1589
1590 /*
1591 * If the index column has a specified collation, we should honor that
1592 * while doing comparisons. However, we may have a collatable storage
1593 * type for a noncollatable indexed data type. If there's no index
1594 * collation then specify default collation in case the support
1595 * functions need collation. This is harmless if the support
1596 * functions don't care about collation, so we just do it
1597 * unconditionally. (We could alternatively call get_typcollation,
1598 * but that seems like expensive overkill --- there aren't going to be
1599 * any cases where a GiST storage type has a nondefault collation.)
1600 */
1601 if (OidIsValid(index->rd_indcollation[i]))
1602 giststate->supportCollation[i] = index->rd_indcollation[i];
1603 else
1604 giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
1605 }
1606
1607 /* No opclass information for INCLUDE attributes */
1608 for (; i < index->rd_att->natts; i++)
1609 {
1610 giststate->consistentFn[i].fn_oid = InvalidOid;
1611 giststate->unionFn[i].fn_oid = InvalidOid;
1612 giststate->compressFn[i].fn_oid = InvalidOid;
1613 giststate->decompressFn[i].fn_oid = InvalidOid;
1614 giststate->penaltyFn[i].fn_oid = InvalidOid;
1615 giststate->picksplitFn[i].fn_oid = InvalidOid;
1616 giststate->equalFn[i].fn_oid = InvalidOid;
1617 giststate->distanceFn[i].fn_oid = InvalidOid;
1618 giststate->fetchFn[i].fn_oid = InvalidOid;
1619 giststate->supportCollation[i] = InvalidOid;
1620 }
1621
1622 MemoryContextSwitchTo(oldCxt);
1623
1624 return giststate;
1625 }
1626
1627 void
1628 freeGISTstate(GISTSTATE *giststate)
1629 {
1630 /* It's sufficient to delete the scanCxt */
1631 MemoryContextDelete(giststate->scanCxt);
1632 }
1633
1634 /*
1635 * gistprunepage() -- try to remove LP_DEAD items from the given page.
1636 * Function assumes that buffer is exclusively locked.
1637 */
1638 static void
1639 gistprunepage(Relation rel, Page page, Buffer buffer, Relation heapRel)
1640 {
1641 OffsetNumber deletable[MaxIndexTuplesPerPage];
1642 int ndeletable = 0;
1643 OffsetNumber offnum,
1644 maxoff;
1645 TransactionId latestRemovedXid = InvalidTransactionId;
1646
1647 Assert(GistPageIsLeaf(page));
1648
1649 /*
1650 * Scan over all items to see which ones need to be deleted according to
1651 * LP_DEAD flags.
1652 */
1653 maxoff = PageGetMaxOffsetNumber(page);
1654 for (offnum = FirstOffsetNumber;
1655 offnum <= maxoff;
1656 offnum = OffsetNumberNext(offnum))
1657 {
1658 ItemId itemId = PageGetItemId(page, offnum);
1659
1660 if (ItemIdIsDead(itemId))
1661 deletable[ndeletable++] = offnum;
1662 }
1663
1664 if (XLogStandbyInfoActive() && RelationNeedsWAL(rel))
1665 latestRemovedXid =
1666 index_compute_xid_horizon_for_tuples(rel, heapRel, buffer,
1667 deletable, ndeletable);
1668
1669 if (ndeletable > 0)
1670 {
1671 START_CRIT_SECTION();
1672
1673 PageIndexMultiDelete(page, deletable, ndeletable);
1674
1675 /*
1676 * Mark the page as not containing any LP_DEAD items. This is not
1677 * necessarily true (there might be some that have recently been marked,
1678 * but weren't included in our target-item list), but it will almost
1679 * always be true and it doesn't seem worth an additional page scan to
1680 * check it. Remember that F_HAS_GARBAGE is only a hint anyway.
1681 */
1682 GistClearPageHasGarbage(page);
1683
1684 MarkBufferDirty(buffer);
1685
1686 /* XLOG stuff */
1687 if (RelationNeedsWAL(rel))
1688 {
1689 XLogRecPtr recptr;
1690
1691 recptr = gistXLogDelete(buffer,
1692 deletable, ndeletable,
1693 latestRemovedXid);
1694
1695 PageSetLSN(page, recptr);
1696 }
1697 else
1698 PageSetLSN(page, gistGetFakeLSN(rel));
1699
1700 END_CRIT_SECTION();
1701 }
1702
1703 /*
1704 * Note: if we didn't find any LP_DEAD items, then the page's
1705 * F_HAS_GARBAGE hint bit is falsely set. We do not bother expending a
1706 * separate write to clear it, however. We will clear it when we split
1707 * the page.
1708 */
1709 }
1710