1 /*-------------------------------------------------------------------------
2 *
3 * spgdoinsert.c
4 * implementation of insert algorithm
5 *
6 *
7 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/spgist/spgdoinsert.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16 #include "postgres.h"
17
18 #include "access/genam.h"
19 #include "access/spgist_private.h"
20 #include "access/xloginsert.h"
21 #include "miscadmin.h"
22 #include "storage/bufmgr.h"
23 #include "utils/rel.h"
24
25
26 /*
27 * SPPageDesc tracks all info about a page we are inserting into. In some
28 * situations it actually identifies a tuple, or even a specific node within
29 * an inner tuple. But any of the fields can be invalid. If the buffer
30 * field is valid, it implies we hold pin and exclusive lock on that buffer.
31 * page pointer should be valid exactly when buffer is.
32 */
typedef struct SPPageDesc
{
	/* Any field may be invalid; buffer/page validity travel together. */
	BlockNumber blkno;			/* block number, or InvalidBlockNumber */
	Buffer		buffer;			/* page's buffer number, or InvalidBuffer */
	Page		page;			/* pointer to page buffer, or NULL */
	OffsetNumber offnum;		/* offset of tuple, or InvalidOffsetNumber */
	int			node;			/* node number within inner tuple, or -1 */
} SPPageDesc;
41
42
43 /*
44 * Set the item pointer in the nodeN'th entry in inner tuple tup. This
45 * is used to update the parent inner tuple's downlink after a move or
46 * split operation.
47 */
48 void
spgUpdateNodeLink(SpGistInnerTuple tup,int nodeN,BlockNumber blkno,OffsetNumber offset)49 spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
50 BlockNumber blkno, OffsetNumber offset)
51 {
52 int i;
53 SpGistNodeTuple node;
54
55 SGITITERATE(tup, i, node)
56 {
57 if (i == nodeN)
58 {
59 ItemPointerSet(&node->t_tid, blkno, offset);
60 return;
61 }
62 }
63
64 elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
65 nodeN);
66 }
67
68 /*
69 * Form a new inner tuple containing one more node than the given one, with
70 * the specified label datum, inserted at offset "offset" in the node array.
71 * The new tuple's prefix is the same as the old one's.
72 *
73 * Note that the new node initially has an invalid downlink. We'll find a
74 * page to point it to later.
75 */
76 static SpGistInnerTuple
addNode(SpGistState * state,SpGistInnerTuple tuple,Datum label,int offset)77 addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
78 {
79 SpGistNodeTuple node,
80 *nodes;
81 int i;
82
83 /* if offset is negative, insert at end */
84 if (offset < 0)
85 offset = tuple->nNodes;
86 else if (offset > tuple->nNodes)
87 elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
88
89 nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
90 SGITITERATE(tuple, i, node)
91 {
92 if (i < offset)
93 nodes[i] = node;
94 else
95 nodes[i + 1] = node;
96 }
97
98 nodes[offset] = spgFormNodeTuple(state, label, false);
99
100 return spgFormInnerTuple(state,
101 (tuple->prefixSize > 0),
102 SGITDATUM(tuple, state),
103 tuple->nNodes + 1,
104 nodes);
105 }
106
107 /* qsort comparator for sorting OffsetNumbers */
108 static int
cmpOffsetNumbers(const void * a,const void * b)109 cmpOffsetNumbers(const void *a, const void *b)
110 {
111 if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
112 return 0;
113 return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
114 }
115
116 /*
117 * Delete multiple tuples from an index page, preserving tuple offset numbers.
118 *
119 * The first tuple in the given list is replaced with a dead tuple of type
120 * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
121 * with dead tuples of type "reststate". If either firststate or reststate
122 * is REDIRECT, blkno/offnum specify where to link to.
123 *
124 * NB: this is used during WAL replay, so beware of trying to make it too
125 * smart. In particular, it shouldn't use "state" except for calling
126 * spgFormDeadTuple(). This is also used in a critical section, so no
127 * pallocs either!
128 */
void
spgPageIndexMultiDelete(SpGistState *state, Page page,
						OffsetNumber *itemnos, int nitems,
						int firststate, int reststate,
						BlockNumber blkno, OffsetNumber offnum)
{
	OffsetNumber firstItem;
	OffsetNumber sortednos[MaxIndexTuplesPerPage];
	SpGistDeadTuple tuple = NULL;	/* lazily (re)built as tupstate changes */
	int			i;

	if (nitems == 0)
		return;					/* nothing to do */

	/*
	 * For efficiency we want to use PageIndexMultiDelete, which requires the
	 * targets to be listed in sorted order, so we have to sort the itemnos
	 * array.  (This also greatly simplifies the math for reinserting the
	 * replacement tuples.)  However, we must not scribble on the caller's
	 * array, so we have to make a copy.
	 */
	memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
	if (nitems > 1)
		qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);

	PageIndexMultiDelete(page, sortednos, nitems);

	/* itemnos[0] (pre-sort) identifies the tuple that gets "firststate" */
	firstItem = itemnos[0];

	for (i = 0; i < nitems; i++)
	{
		OffsetNumber itemno = sortednos[i];
		int			tupstate;

		tupstate = (itemno == firstItem) ? firststate : reststate;

		/* re-use the previously formed dead tuple if its state matches */
		if (tuple == NULL || tuple->tupstate != tupstate)
			tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);

		/* reinsert at the exact offset the deleted tuple occupied */
		if (PageAddItem(page, (Item) tuple, tuple->size,
						itemno, false, false) != itemno)
			elog(ERROR, "failed to add item of size %u to SPGiST index page",
				 tuple->size);

		/* keep the page's redirect/placeholder counters in sync */
		if (tupstate == SPGIST_REDIRECT)
			SpGistPageGetOpaque(page)->nRedirection++;
		else if (tupstate == SPGIST_PLACEHOLDER)
			SpGistPageGetOpaque(page)->nPlaceholder++;
	}
}
178
179 /*
180 * Update the parent inner tuple's downlink, and mark the parent buffer
181 * dirty (this must be the last change to the parent page in the current
182 * WAL action).
183 */
184 static void
saveNodeLink(Relation index,SPPageDesc * parent,BlockNumber blkno,OffsetNumber offnum)185 saveNodeLink(Relation index, SPPageDesc *parent,
186 BlockNumber blkno, OffsetNumber offnum)
187 {
188 SpGistInnerTuple innerTuple;
189
190 innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
191 PageGetItemId(parent->page, parent->offnum));
192
193 spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
194
195 MarkBufferDirty(parent->buffer);
196 }
197
198 /*
199 * Add a leaf tuple to a leaf page where there is known to be room for it
200 */
static void
addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
			 SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
{
	spgxlogAddLeaf xlrec;

	xlrec.newPage = isNew;
	xlrec.storesNulls = isNulls;

	/* these will be filled below as needed */
	xlrec.offnumLeaf = InvalidOffsetNumber;
	xlrec.offnumHeadLeaf = InvalidOffsetNumber;
	xlrec.offnumParent = InvalidOffsetNumber;
	xlrec.nodeI = 0;

	/* all page modifications plus the WAL insert happen atomically */
	START_CRIT_SECTION();

	if (current->offnum == InvalidOffsetNumber ||
		SpGistBlockIsRoot(current->blkno))
	{
		/* Tuple is not part of a chain */
		leafTuple->nextOffset = InvalidOffsetNumber;
		current->offnum = SpGistPageAddNewItem(state, current->page,
											   (Item) leafTuple, leafTuple->size,
											   NULL, false);

		xlrec.offnumLeaf = current->offnum;

		/* Must update parent's downlink if any */
		if (parent->buffer != InvalidBuffer)
		{
			xlrec.offnumParent = parent->offnum;
			xlrec.nodeI = parent->node;

			saveNodeLink(index, parent, current->blkno, current->offnum);
		}
	}
	else
	{
		/*
		 * Tuple must be inserted into existing chain.  We mustn't change the
		 * chain's head address, but we don't need to chase the entire chain
		 * to put the tuple at the end; we can insert it second.
		 *
		 * Also, it's possible that the "chain" consists only of a DEAD tuple,
		 * in which case we should replace the DEAD tuple in-place.
		 */
		SpGistLeafTuple head;
		OffsetNumber offnum;

		head = (SpGistLeafTuple) PageGetItem(current->page,
											 PageGetItemId(current->page, current->offnum));
		if (head->tupstate == SPGIST_LIVE)
		{
			/* link new tuple in as the second chain element */
			leafTuple->nextOffset = head->nextOffset;
			offnum = SpGistPageAddNewItem(state, current->page,
										  (Item) leafTuple, leafTuple->size,
										  NULL, false);

			/*
			 * re-get head of list because it could have been moved on page,
			 * and set new second element
			 */
			head = (SpGistLeafTuple) PageGetItem(current->page,
												 PageGetItemId(current->page, current->offnum));
			head->nextOffset = offnum;

			xlrec.offnumLeaf = offnum;
			xlrec.offnumHeadLeaf = current->offnum;
		}
		else if (head->tupstate == SPGIST_DEAD)
		{
			/* replace the DEAD tuple in-place at the same offset */
			leafTuple->nextOffset = InvalidOffsetNumber;
			PageIndexTupleDelete(current->page, current->offnum);
			if (PageAddItem(current->page,
							(Item) leafTuple, leafTuple->size,
							current->offnum, false, false) != current->offnum)
				elog(ERROR, "failed to add item of size %u to SPGiST index page",
					 leafTuple->size);

			/* WAL replay distinguishes this case by equal offnums */
			xlrec.offnumLeaf = current->offnum;
			xlrec.offnumHeadLeaf = current->offnum;
		}
		else
			elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
	}

	MarkBufferDirty(current->buffer);

	if (RelationNeedsWAL(index))
	{
		XLogRecPtr	recptr;
		int			flags;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
		XLogRegisterData((char *) leafTuple, leafTuple->size);

		flags = REGBUF_STANDARD;
		if (xlrec.newPage)
			flags |= REGBUF_WILL_INIT;	/* replay reinitializes the page */
		XLogRegisterBuffer(0, current->buffer, flags);
		if (xlrec.offnumParent != InvalidOffsetNumber)
			XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);

		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);

		PageSetLSN(current->page, recptr);

		/* update parent only if we actually changed it */
		if (xlrec.offnumParent != InvalidOffsetNumber)
		{
			PageSetLSN(parent->page, recptr);
		}
	}

	END_CRIT_SECTION();
}
320
321 /*
322 * Count the number and total size of leaf tuples in the chain starting at
323 * current->offnum. Return number into *nToSplit and total size as function
324 * result.
325 *
326 * Klugy special case when considering the root page (i.e., root is a leaf
327 * page, but we're about to split for the first time): return fake large
328 * values to force spgdoinsert() to take the doPickSplit rather than
329 * moveLeafs code path. moveLeafs is not prepared to deal with root page.
330 */
331 static int
checkSplitConditions(Relation index,SpGistState * state,SPPageDesc * current,int * nToSplit)332 checkSplitConditions(Relation index, SpGistState *state,
333 SPPageDesc *current, int *nToSplit)
334 {
335 int i,
336 n = 0,
337 totalSize = 0;
338
339 if (SpGistBlockIsRoot(current->blkno))
340 {
341 /* return impossible values to force split */
342 *nToSplit = BLCKSZ;
343 return BLCKSZ;
344 }
345
346 i = current->offnum;
347 while (i != InvalidOffsetNumber)
348 {
349 SpGistLeafTuple it;
350
351 Assert(i >= FirstOffsetNumber &&
352 i <= PageGetMaxOffsetNumber(current->page));
353 it = (SpGistLeafTuple) PageGetItem(current->page,
354 PageGetItemId(current->page, i));
355 if (it->tupstate == SPGIST_LIVE)
356 {
357 n++;
358 totalSize += it->size + sizeof(ItemIdData);
359 }
360 else if (it->tupstate == SPGIST_DEAD)
361 {
362 /* We could see a DEAD tuple as first/only chain item */
363 Assert(i == current->offnum);
364 Assert(it->nextOffset == InvalidOffsetNumber);
365 /* Don't count it in result, because it won't go to other page */
366 }
367 else
368 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
369
370 i = it->nextOffset;
371 }
372
373 *nToSplit = n;
374
375 return totalSize;
376 }
377
378 /*
379 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
380 * but the chain has to be moved because there's not enough room to add
381 * newLeafTuple to its page. We use this method when the chain contains
382 * very little data so a split would be inefficient. We are sure we can
383 * fit the chain plus newLeafTuple on one other page.
384 */
static void
moveLeafs(Relation index, SpGistState *state,
		  SPPageDesc *current, SPPageDesc *parent,
		  SpGistLeafTuple newLeafTuple, bool isNulls)
{
	int			i,
				nDelete,
				nInsert,
				size;
	Buffer		nbuf;			/* destination leaf page's buffer */
	Page		npage;
	SpGistLeafTuple it;
	OffsetNumber r = InvalidOffsetNumber,
				startOffset = InvalidOffsetNumber;
	bool		replaceDead = false;
	OffsetNumber *toDelete;
	OffsetNumber *toInsert;
	BlockNumber nblkno;
	spgxlogMoveLeafs xlrec;
	char	   *leafdata,		/* flat copy of moved tuples, for WAL */
			   *leafptr;

	/* This doesn't work on root page */
	Assert(parent->buffer != InvalidBuffer);
	Assert(parent->buffer != current->buffer);

	/* Locate the tuples to be moved, and count up the space needed */
	i = PageGetMaxOffsetNumber(current->page);
	toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
	toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));

	size = newLeafTuple->size + sizeof(ItemIdData);

	nDelete = 0;
	i = current->offnum;
	while (i != InvalidOffsetNumber)
	{
		SpGistLeafTuple it;

		Assert(i >= FirstOffsetNumber &&
			   i <= PageGetMaxOffsetNumber(current->page));
		it = (SpGistLeafTuple) PageGetItem(current->page,
										   PageGetItemId(current->page, i));

		if (it->tupstate == SPGIST_LIVE)
		{
			toDelete[nDelete] = i;
			size += it->size + sizeof(ItemIdData);
			nDelete++;
		}
		else if (it->tupstate == SPGIST_DEAD)
		{
			/* We could see a DEAD tuple as first/only chain item */
			Assert(i == current->offnum);
			Assert(it->nextOffset == InvalidOffsetNumber);
			/* We don't want to move it, so don't count it in size */
			toDelete[nDelete] = i;
			nDelete++;
			replaceDead = true;
		}
		else
			elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);

		i = it->nextOffset;
	}

	/* Find a leaf page that will hold them */
	nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
						   size, &xlrec.newPage);
	npage = BufferGetPage(nbuf);
	nblkno = BufferGetBlockNumber(nbuf);
	Assert(nblkno != current->blkno);

	leafdata = leafptr = palloc(size);

	/* both pages are modified together with the WAL insert, atomically */
	START_CRIT_SECTION();

	/* copy all the old tuples to new page, unless they're dead */
	nInsert = 0;
	if (!replaceDead)
	{
		for (i = 0; i < nDelete; i++)
		{
			it = (SpGistLeafTuple) PageGetItem(current->page,
											   PageGetItemId(current->page, toDelete[i]));
			Assert(it->tupstate == SPGIST_LIVE);

			/*
			 * Update chain link (notice the chain order gets reversed, but we
			 * don't care).  We're modifying the tuple on the source page
			 * here, but it's okay since we're about to delete it.
			 */
			it->nextOffset = r;

			r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
									 &startOffset, false);

			toInsert[nInsert] = r;
			nInsert++;

			/* save modified tuple into leafdata as well */
			memcpy(leafptr, it, it->size);
			leafptr += it->size;
		}
	}

	/* add the new tuple as well; it becomes the head of the new chain */
	newLeafTuple->nextOffset = r;
	r = SpGistPageAddNewItem(state, npage,
							 (Item) newLeafTuple, newLeafTuple->size,
							 &startOffset, false);
	toInsert[nInsert] = r;
	nInsert++;
	memcpy(leafptr, newLeafTuple, newLeafTuple->size);
	leafptr += newLeafTuple->size;

	/*
	 * Now delete the old tuples, leaving a redirection pointer behind for the
	 * first one, unless we're doing an index build; in which case there can't
	 * be any concurrent scan so we need not provide a redirect.
	 */
	spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
							state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
							SPGIST_PLACEHOLDER,
							nblkno, r);

	/* Update parent's downlink and mark parent page dirty */
	saveNodeLink(index, parent, nblkno, r);

	/* Mark the leaf pages too */
	MarkBufferDirty(current->buffer);
	MarkBufferDirty(nbuf);

	if (RelationNeedsWAL(index))
	{
		XLogRecPtr	recptr;

		/* prepare WAL info */
		STORE_STATE(state, xlrec.stateSrc);

		xlrec.nMoves = nDelete;
		xlrec.replaceDead = replaceDead;
		xlrec.storesNulls = isNulls;

		xlrec.offnumParent = parent->offnum;
		xlrec.nodeI = parent->node;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
		XLogRegisterData((char *) toDelete,
						 sizeof(OffsetNumber) * nDelete);
		XLogRegisterData((char *) toInsert,
						 sizeof(OffsetNumber) * nInsert);
		XLogRegisterData((char *) leafdata, leafptr - leafdata);

		/* buffer 0 = source leaf, 1 = destination leaf, 2 = parent */
		XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
		XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
		XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);

		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);

		PageSetLSN(current->page, recptr);
		PageSetLSN(npage, recptr);
		PageSetLSN(parent->page, recptr);
	}

	END_CRIT_SECTION();

	/* Update local free-space cache and release new buffer */
	SpGistSetLastUsedPage(index, nbuf);
	UnlockReleaseBuffer(nbuf);
}
557
558 /*
559 * Update previously-created redirection tuple with appropriate destination
560 *
561 * We use this when it's not convenient to know the destination first.
562 * The tuple should have been made with the "impossible" destination of
563 * the metapage.
564 */
565 static void
setRedirectionTuple(SPPageDesc * current,OffsetNumber position,BlockNumber blkno,OffsetNumber offnum)566 setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
567 BlockNumber blkno, OffsetNumber offnum)
568 {
569 SpGistDeadTuple dt;
570
571 dt = (SpGistDeadTuple) PageGetItem(current->page,
572 PageGetItemId(current->page, position));
573 Assert(dt->tupstate == SPGIST_REDIRECT);
574 Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
575 ItemPointerSet(&dt->pointer, blkno, offnum);
576 }
577
578 /*
579 * Test to see if the user-defined picksplit function failed to do its job,
580 * ie, it put all the leaf tuples into the same node.
581 * If so, randomly divide the tuples into several nodes (all with the same
582 * label) and return TRUE to select allTheSame mode for this inner tuple.
583 *
584 * (This code is also used to forcibly select allTheSame mode for nulls.)
585 *
586 * If we know that the leaf tuples wouldn't all fit on one page, then we
587 * exclude the last tuple (which is the incoming new tuple that forced a split)
588 * from the check to see if more than one node is used. The reason for this
589 * is that if the existing tuples are put into only one chain, then even if
590 * we move them all to an empty page, there would still not be room for the
591 * new tuple, so we'd get into an infinite loop of picksplit attempts.
592 * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
593 * be split across pages. (Exercise for the reader: figure out why this
594 * fixes the problem even when there is only one old tuple.)
595 */
596 static bool
checkAllTheSame(spgPickSplitIn * in,spgPickSplitOut * out,bool tooBig,bool * includeNew)597 checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
598 bool *includeNew)
599 {
600 int theNode;
601 int limit;
602 int i;
603
604 /* For the moment, assume we can include the new leaf tuple */
605 *includeNew = true;
606
607 /* If there's only the new leaf tuple, don't select allTheSame mode */
608 if (in->nTuples <= 1)
609 return false;
610
611 /* If tuple set doesn't fit on one page, ignore the new tuple in test */
612 limit = tooBig ? in->nTuples - 1 : in->nTuples;
613
614 /* Check to see if more than one node is populated */
615 theNode = out->mapTuplesToNodes[0];
616 for (i = 1; i < limit; i++)
617 {
618 if (out->mapTuplesToNodes[i] != theNode)
619 return false;
620 }
621
622 /* Nope, so override the picksplit function's decisions */
623
624 /* If the new tuple is in its own node, it can't be included in split */
625 if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
626 *includeNew = false;
627
628 out->nNodes = 8; /* arbitrary number of child nodes */
629
630 /* Random assignment of tuples to nodes (note we include new tuple) */
631 for (i = 0; i < in->nTuples; i++)
632 out->mapTuplesToNodes[i] = i % out->nNodes;
633
634 /* The opclass may not use node labels, but if it does, duplicate 'em */
635 if (out->nodeLabels)
636 {
637 Datum theLabel = out->nodeLabels[theNode];
638
639 out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
640 for (i = 0; i < out->nNodes; i++)
641 out->nodeLabels[i] = theLabel;
642 }
643
644 /* We don't touch the prefix or the leaf tuple datum assignments */
645
646 return true;
647 }
648
649 /*
650 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
651 * but the chain has to be split because there's not enough room to add
652 * newLeafTuple to its page.
653 *
654 * This function splits the leaf tuple set according to picksplit's rules,
655 * creating one or more new chains that are spread across the current page
656 * and an additional leaf page (we assume that two leaf pages will be
657 * sufficient). A new inner tuple is created, and the parent downlink
658 * pointer is updated to point to that inner tuple instead of the leaf chain.
659 *
660 * On exit, current contains the address of the new inner tuple.
661 *
662 * Returns true if we successfully inserted newLeafTuple during this function,
663 * false if caller still has to do it (meaning another picksplit operation is
664 * probably needed). Failure could occur if the picksplit result is fairly
665 * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
666 * Because we force the picksplit result to be at least two chains, each
667 * cycle will get rid of at least one leaf tuple from the chain, so the loop
668 * will eventually terminate if lack of balance is the issue. If the tuple
669 * is too big, we assume that repeated picksplit operations will eventually
670 * make it small enough by repeated prefix-stripping. A broken opclass could
671 * make this an infinite loop, though, so spgdoinsert() checks that the
672 * leaf datums get smaller each time.
673 */
674 static bool
doPickSplit(Relation index,SpGistState * state,SPPageDesc * current,SPPageDesc * parent,SpGistLeafTuple newLeafTuple,int level,bool isNulls,bool isNew)675 doPickSplit(Relation index, SpGistState *state,
676 SPPageDesc *current, SPPageDesc *parent,
677 SpGistLeafTuple newLeafTuple,
678 int level, bool isNulls, bool isNew)
679 {
680 bool insertedNew = false;
681 spgPickSplitIn in;
682 spgPickSplitOut out;
683 FmgrInfo *procinfo;
684 bool includeNew;
685 int i,
686 max,
687 n;
688 SpGistInnerTuple innerTuple;
689 SpGistNodeTuple node,
690 *nodes;
691 Buffer newInnerBuffer,
692 newLeafBuffer;
693 ItemPointerData *heapPtrs;
694 uint8 *leafPageSelect;
695 int *leafSizes;
696 OffsetNumber *toDelete;
697 OffsetNumber *toInsert;
698 OffsetNumber redirectTuplePos = InvalidOffsetNumber;
699 OffsetNumber startOffsets[2];
700 SpGistLeafTuple *newLeafs;
701 int spaceToDelete;
702 int currentFreeSpace;
703 int totalLeafSizes;
704 bool allTheSame;
705 spgxlogPickSplit xlrec;
706 char *leafdata,
707 *leafptr;
708 SPPageDesc saveCurrent;
709 int nToDelete,
710 nToInsert,
711 maxToInclude;
712
713 in.level = level;
714
715 /*
716 * Allocate per-leaf-tuple work arrays with max possible size
717 */
718 max = PageGetMaxOffsetNumber(current->page);
719 n = max + 1;
720 in.datums = (Datum *) palloc(sizeof(Datum) * n);
721 heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
722 toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
723 toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
724 newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
725 leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
726
727 STORE_STATE(state, xlrec.stateSrc);
728
729 /*
730 * Form list of leaf tuples which will be distributed as split result;
731 * also, count up the amount of space that will be freed from current.
732 * (Note that in the non-root case, we won't actually delete the old
733 * tuples, only replace them with redirects or placeholders.)
734 *
735 * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
736 * page. For a pass-by-value data type we will fetch a word that must
737 * exist even though it may contain garbage (because of the fact that leaf
738 * tuples must have size at least SGDTSIZE). For a pass-by-reference type
739 * we are just computing a pointer that isn't going to get dereferenced.
740 * So it's not worth guarding the calls with isNulls checks.
741 */
742 nToInsert = 0;
743 nToDelete = 0;
744 spaceToDelete = 0;
745 if (SpGistBlockIsRoot(current->blkno))
746 {
747 /*
748 * We are splitting the root (which up to now is also a leaf page).
749 * Its tuples are not linked, so scan sequentially to get them all. We
750 * ignore the original value of current->offnum.
751 */
752 for (i = FirstOffsetNumber; i <= max; i++)
753 {
754 SpGistLeafTuple it;
755
756 it = (SpGistLeafTuple) PageGetItem(current->page,
757 PageGetItemId(current->page, i));
758 if (it->tupstate == SPGIST_LIVE)
759 {
760 in.datums[nToInsert] = SGLTDATUM(it, state);
761 heapPtrs[nToInsert] = it->heapPtr;
762 nToInsert++;
763 toDelete[nToDelete] = i;
764 nToDelete++;
765 /* we will delete the tuple altogether, so count full space */
766 spaceToDelete += it->size + sizeof(ItemIdData);
767 }
768 else /* tuples on root should be live */
769 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
770 }
771 }
772 else
773 {
774 /* Normal case, just collect the leaf tuples in the chain */
775 i = current->offnum;
776 while (i != InvalidOffsetNumber)
777 {
778 SpGistLeafTuple it;
779
780 Assert(i >= FirstOffsetNumber && i <= max);
781 it = (SpGistLeafTuple) PageGetItem(current->page,
782 PageGetItemId(current->page, i));
783 if (it->tupstate == SPGIST_LIVE)
784 {
785 in.datums[nToInsert] = SGLTDATUM(it, state);
786 heapPtrs[nToInsert] = it->heapPtr;
787 nToInsert++;
788 toDelete[nToDelete] = i;
789 nToDelete++;
790 /* we will not delete the tuple, only replace with dead */
791 Assert(it->size >= SGDTSIZE);
792 spaceToDelete += it->size - SGDTSIZE;
793 }
794 else if (it->tupstate == SPGIST_DEAD)
795 {
796 /* We could see a DEAD tuple as first/only chain item */
797 Assert(i == current->offnum);
798 Assert(it->nextOffset == InvalidOffsetNumber);
799 toDelete[nToDelete] = i;
800 nToDelete++;
801 /* replacing it with redirect will save no space */
802 }
803 else
804 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
805
806 i = it->nextOffset;
807 }
808 }
809 in.nTuples = nToInsert;
810
811 /*
812 * We may not actually insert new tuple because another picksplit may be
813 * necessary due to too large value, but we will try to allocate enough
814 * space to include it; and in any case it has to be included in the input
815 * for the picksplit function. So don't increment nToInsert yet.
816 */
817 in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
818 heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
819 in.nTuples++;
820
821 memset(&out, 0, sizeof(out));
822
823 if (!isNulls)
824 {
825 /*
826 * Perform split using user-defined method.
827 */
828 procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
829 FunctionCall2Coll(procinfo,
830 index->rd_indcollation[0],
831 PointerGetDatum(&in),
832 PointerGetDatum(&out));
833
834 /*
835 * Form new leaf tuples and count up the total space needed.
836 */
837 totalLeafSizes = 0;
838 for (i = 0; i < in.nTuples; i++)
839 {
840 newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
841 out.leafTupleDatums[i],
842 false);
843 totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
844 }
845 }
846 else
847 {
848 /*
849 * Perform dummy split that puts all tuples into one node.
850 * checkAllTheSame will override this and force allTheSame mode.
851 */
852 out.hasPrefix = false;
853 out.nNodes = 1;
854 out.nodeLabels = NULL;
855 out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
856
857 /*
858 * Form new leaf tuples and count up the total space needed.
859 */
860 totalLeafSizes = 0;
861 for (i = 0; i < in.nTuples; i++)
862 {
863 newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
864 (Datum) 0,
865 true);
866 totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
867 }
868 }
869
870 /*
871 * Check to see if the picksplit function failed to separate the values,
872 * ie, it put them all into the same child node. If so, select allTheSame
873 * mode and create a random split instead. See comments for
874 * checkAllTheSame as to why we need to know if the new leaf tuples could
875 * fit on one page.
876 */
877 allTheSame = checkAllTheSame(&in, &out,
878 totalLeafSizes > SPGIST_PAGE_CAPACITY,
879 &includeNew);
880
881 /*
882 * If checkAllTheSame decided we must exclude the new tuple, don't
883 * consider it any further.
884 */
885 if (includeNew)
886 maxToInclude = in.nTuples;
887 else
888 {
889 maxToInclude = in.nTuples - 1;
890 totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
891 }
892
893 /*
894 * Allocate per-node work arrays. Since checkAllTheSame could replace
895 * out.nNodes with a value larger than the number of tuples on the input
896 * page, we can't allocate these arrays before here.
897 */
898 nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
899 leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
900
901 /*
902 * Form nodes of inner tuple and inner tuple itself
903 */
904 for (i = 0; i < out.nNodes; i++)
905 {
906 Datum label = (Datum) 0;
907 bool labelisnull = (out.nodeLabels == NULL);
908
909 if (!labelisnull)
910 label = out.nodeLabels[i];
911 nodes[i] = spgFormNodeTuple(state, label, labelisnull);
912 }
913 innerTuple = spgFormInnerTuple(state,
914 out.hasPrefix, out.prefixDatum,
915 out.nNodes, nodes);
916 innerTuple->allTheSame = allTheSame;
917
918 /*
919 * Update nodes[] array to point into the newly formed innerTuple, so that
920 * we can adjust their downlinks below.
921 */
922 SGITITERATE(innerTuple, i, node)
923 {
924 nodes[i] = node;
925 }
926
927 /*
928 * Re-scan new leaf tuples and count up the space needed under each node.
929 */
930 for (i = 0; i < maxToInclude; i++)
931 {
932 n = out.mapTuplesToNodes[i];
933 if (n < 0 || n >= out.nNodes)
934 elog(ERROR, "inconsistent result of SPGiST picksplit function");
935 leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
936 }
937
938 /*
939 * To perform the split, we must insert a new inner tuple, which can't go
940 * on a leaf page; and unless we are splitting the root page, we must then
941 * update the parent tuple's downlink to point to the inner tuple. If
942 * there is room, we'll put the new inner tuple on the same page as the
943 * parent tuple, otherwise we need another non-leaf buffer. But if the
944 * parent page is the root, we can't add the new inner tuple there,
945 * because the root page must have only one inner tuple.
946 */
947 xlrec.initInner = false;
948 if (parent->buffer != InvalidBuffer &&
949 !SpGistBlockIsRoot(parent->blkno) &&
950 (SpGistPageGetFreeSpace(parent->page, 1) >=
951 innerTuple->size + sizeof(ItemIdData)))
952 {
953 /* New inner tuple will fit on parent page */
954 newInnerBuffer = parent->buffer;
955 }
956 else if (parent->buffer != InvalidBuffer)
957 {
958 /* Send tuple to page with next triple parity (see README) */
959 newInnerBuffer = SpGistGetBuffer(index,
960 GBUF_INNER_PARITY(parent->blkno + 1) |
961 (isNulls ? GBUF_NULLS : 0),
962 innerTuple->size + sizeof(ItemIdData),
963 &xlrec.initInner);
964 }
965 else
966 {
967 /* Root page split ... inner tuple will go to root page */
968 newInnerBuffer = InvalidBuffer;
969 }
970
971 /*
972 * The new leaf tuples converted from the existing ones should require the
973 * same or less space, and therefore should all fit onto one page
974 * (although that's not necessarily the current page, since we can't
975 * delete the old tuples but only replace them with placeholders).
976 * However, the incoming new tuple might not also fit, in which case we
977 * might need another picksplit cycle to reduce it some more.
978 *
979 * If there's not room to put everything back onto the current page, then
980 * we decide on a per-node basis which tuples go to the new page. (We do
981 * it like that because leaf tuple chains can't cross pages, so we must
982 * place all leaf tuples belonging to the same parent node on the same
983 * page.)
984 *
985 * If we are splitting the root page (turning it from a leaf page into an
986 * inner page), then no leaf tuples can go back to the current page; they
987 * must all go somewhere else.
988 */
989 if (!SpGistBlockIsRoot(current->blkno))
990 currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
991 else
992 currentFreeSpace = 0; /* prevent assigning any tuples to current */
993
994 xlrec.initDest = false;
995
996 if (totalLeafSizes <= currentFreeSpace)
997 {
998 /* All the leaf tuples will fit on current page */
999 newLeafBuffer = InvalidBuffer;
1000 /* mark new leaf tuple as included in insertions, if allowed */
1001 if (includeNew)
1002 {
1003 nToInsert++;
1004 insertedNew = true;
1005 }
1006 for (i = 0; i < nToInsert; i++)
1007 leafPageSelect[i] = 0; /* signifies current page */
1008 }
1009 else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1010 {
1011 /*
1012 * We're trying to split up a long value by repeated suffixing, but
1013 * it's not going to fit yet. Don't bother allocating a second leaf
1014 * buffer that we won't be able to use.
1015 */
1016 newLeafBuffer = InvalidBuffer;
1017 Assert(includeNew);
1018 Assert(nToInsert == 0);
1019 }
1020 else
1021 {
1022 /* We will need another leaf page */
1023 uint8 *nodePageSelect;
1024 int curspace;
1025 int newspace;
1026
1027 newLeafBuffer = SpGistGetBuffer(index,
1028 GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1029 Min(totalLeafSizes,
1030 SPGIST_PAGE_CAPACITY),
1031 &xlrec.initDest);
1032
1033 /*
1034 * Attempt to assign node groups to the two pages. We might fail to
1035 * do so, even if totalLeafSizes is less than the available space,
1036 * because we can't split a group across pages.
1037 */
1038 nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1039
1040 curspace = currentFreeSpace;
1041 newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1042 for (i = 0; i < out.nNodes; i++)
1043 {
1044 if (leafSizes[i] <= curspace)
1045 {
1046 nodePageSelect[i] = 0; /* signifies current page */
1047 curspace -= leafSizes[i];
1048 }
1049 else
1050 {
1051 nodePageSelect[i] = 1; /* signifies new leaf page */
1052 newspace -= leafSizes[i];
1053 }
1054 }
1055 if (curspace >= 0 && newspace >= 0)
1056 {
1057 /* Successful assignment, so we can include the new leaf tuple */
1058 if (includeNew)
1059 {
1060 nToInsert++;
1061 insertedNew = true;
1062 }
1063 }
1064 else if (includeNew)
1065 {
1066 /* We must exclude the new leaf tuple from the split */
1067 int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1068
1069 leafSizes[nodeOfNewTuple] -=
1070 newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1071
1072 /* Repeat the node assignment process --- should succeed now */
1073 curspace = currentFreeSpace;
1074 newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1075 for (i = 0; i < out.nNodes; i++)
1076 {
1077 if (leafSizes[i] <= curspace)
1078 {
1079 nodePageSelect[i] = 0; /* signifies current page */
1080 curspace -= leafSizes[i];
1081 }
1082 else
1083 {
1084 nodePageSelect[i] = 1; /* signifies new leaf page */
1085 newspace -= leafSizes[i];
1086 }
1087 }
1088 if (curspace < 0 || newspace < 0)
1089 elog(ERROR, "failed to divide leaf tuple groups across pages");
1090 }
1091 else
1092 {
1093 /* oops, we already excluded new tuple ... should not get here */
1094 elog(ERROR, "failed to divide leaf tuple groups across pages");
1095 }
1096 /* Expand the per-node assignments to be shown per leaf tuple */
1097 for (i = 0; i < nToInsert; i++)
1098 {
1099 n = out.mapTuplesToNodes[i];
1100 leafPageSelect[i] = nodePageSelect[n];
1101 }
1102 }
1103
1104 /* Start preparing WAL record */
1105 xlrec.nDelete = 0;
1106 xlrec.initSrc = isNew;
1107 xlrec.storesNulls = isNulls;
1108 xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1109
1110 leafdata = leafptr = (char *) palloc(totalLeafSizes);
1111
1112 /* Here we begin making the changes to the target pages */
1113 START_CRIT_SECTION();
1114
1115 /*
1116 * Delete old leaf tuples from current buffer, except when we're splitting
1117 * the root; in that case there's no need because we'll re-init the page
1118 * below. We do this first to make room for reinserting new leaf tuples.
1119 */
1120 if (!SpGistBlockIsRoot(current->blkno))
1121 {
1122 /*
1123 * Init buffer instead of deleting individual tuples, but only if
1124 * there aren't any other live tuples and only during build; otherwise
1125 * we need to set a redirection tuple for concurrent scans.
1126 */
1127 if (state->isBuild &&
1128 nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1129 PageGetMaxOffsetNumber(current->page))
1130 {
1131 SpGistInitBuffer(current->buffer,
1132 SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1133 xlrec.initSrc = true;
1134 }
1135 else if (isNew)
1136 {
1137 /* don't expose the freshly init'd buffer as a backup block */
1138 Assert(nToDelete == 0);
1139 }
1140 else
1141 {
1142 xlrec.nDelete = nToDelete;
1143
1144 if (!state->isBuild)
1145 {
1146 /*
1147 * Need to create redirect tuple (it will point to new inner
1148 * tuple) but right now the new tuple's location is not known
1149 * yet. So, set the redirection pointer to "impossible" value
1150 * and remember its position to update tuple later.
1151 */
1152 if (nToDelete > 0)
1153 redirectTuplePos = toDelete[0];
1154 spgPageIndexMultiDelete(state, current->page,
1155 toDelete, nToDelete,
1156 SPGIST_REDIRECT,
1157 SPGIST_PLACEHOLDER,
1158 SPGIST_METAPAGE_BLKNO,
1159 FirstOffsetNumber);
1160 }
1161 else
1162 {
				/*
				 * During an index build there are no concurrent searches, so
				 * we don't need to create a redirection tuple.
				 */
1167 spgPageIndexMultiDelete(state, current->page,
1168 toDelete, nToDelete,
1169 SPGIST_PLACEHOLDER,
1170 SPGIST_PLACEHOLDER,
1171 InvalidBlockNumber,
1172 InvalidOffsetNumber);
1173 }
1174 }
1175 }
1176
1177 /*
1178 * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1179 * nodes.
1180 */
1181 startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1182 for (i = 0; i < nToInsert; i++)
1183 {
1184 SpGistLeafTuple it = newLeafs[i];
1185 Buffer leafBuffer;
1186 BlockNumber leafBlock;
1187 OffsetNumber newoffset;
1188
1189 /* Which page is it going to? */
1190 leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1191 leafBlock = BufferGetBlockNumber(leafBuffer);
1192
1193 /* Link tuple into correct chain for its node */
1194 n = out.mapTuplesToNodes[i];
1195
1196 if (ItemPointerIsValid(&nodes[n]->t_tid))
1197 {
1198 Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1199 it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
1200 }
1201 else
1202 it->nextOffset = InvalidOffsetNumber;
1203
1204 /* Insert it on page */
1205 newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1206 (Item) it, it->size,
1207 &startOffsets[leafPageSelect[i]],
1208 false);
1209 toInsert[i] = newoffset;
1210
1211 /* ... and complete the chain linking */
1212 ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1213
1214 /* Also copy leaf tuple into WAL data */
1215 memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1216 leafptr += newLeafs[i]->size;
1217 }
1218
1219 /*
1220 * We're done modifying the other leaf buffer (if any), so mark it dirty.
1221 * current->buffer will be marked below, after we're entirely done
1222 * modifying it.
1223 */
1224 if (newLeafBuffer != InvalidBuffer)
1225 {
1226 MarkBufferDirty(newLeafBuffer);
1227 }
1228
1229 /* Remember current buffer, since we're about to change "current" */
1230 saveCurrent = *current;
1231
1232 /*
1233 * Store the new innerTuple
1234 */
1235 if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1236 {
1237 /*
1238 * new inner tuple goes to parent page
1239 */
1240 Assert(current->buffer != parent->buffer);
1241
1242 /* Repoint "current" at the new inner tuple */
1243 current->blkno = parent->blkno;
1244 current->buffer = parent->buffer;
1245 current->page = parent->page;
1246 xlrec.offnumInner = current->offnum =
1247 SpGistPageAddNewItem(state, current->page,
1248 (Item) innerTuple, innerTuple->size,
1249 NULL, false);
1250
1251 /*
1252 * Update parent node link and mark parent page dirty
1253 */
1254 xlrec.innerIsParent = true;
1255 xlrec.offnumParent = parent->offnum;
1256 xlrec.nodeI = parent->node;
1257 saveNodeLink(index, parent, current->blkno, current->offnum);
1258
1259 /*
1260 * Update redirection link (in old current buffer)
1261 */
1262 if (redirectTuplePos != InvalidOffsetNumber)
1263 setRedirectionTuple(&saveCurrent, redirectTuplePos,
1264 current->blkno, current->offnum);
1265
1266 /* Done modifying old current buffer, mark it dirty */
1267 MarkBufferDirty(saveCurrent.buffer);
1268 }
1269 else if (parent->buffer != InvalidBuffer)
1270 {
1271 /*
1272 * new inner tuple will be stored on a new page
1273 */
1274 Assert(newInnerBuffer != InvalidBuffer);
1275
1276 /* Repoint "current" at the new inner tuple */
1277 current->buffer = newInnerBuffer;
1278 current->blkno = BufferGetBlockNumber(current->buffer);
1279 current->page = BufferGetPage(current->buffer);
1280 xlrec.offnumInner = current->offnum =
1281 SpGistPageAddNewItem(state, current->page,
1282 (Item) innerTuple, innerTuple->size,
1283 NULL, false);
1284
1285 /* Done modifying new current buffer, mark it dirty */
1286 MarkBufferDirty(current->buffer);
1287
1288 /*
1289 * Update parent node link and mark parent page dirty
1290 */
1291 xlrec.innerIsParent = (parent->buffer == current->buffer);
1292 xlrec.offnumParent = parent->offnum;
1293 xlrec.nodeI = parent->node;
1294 saveNodeLink(index, parent, current->blkno, current->offnum);
1295
1296 /*
1297 * Update redirection link (in old current buffer)
1298 */
1299 if (redirectTuplePos != InvalidOffsetNumber)
1300 setRedirectionTuple(&saveCurrent, redirectTuplePos,
1301 current->blkno, current->offnum);
1302
1303 /* Done modifying old current buffer, mark it dirty */
1304 MarkBufferDirty(saveCurrent.buffer);
1305 }
1306 else
1307 {
1308 /*
1309 * Splitting root page, which was a leaf but now becomes inner page
1310 * (and so "current" continues to point at it)
1311 */
1312 Assert(SpGistBlockIsRoot(current->blkno));
1313 Assert(redirectTuplePos == InvalidOffsetNumber);
1314
1315 SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1316 xlrec.initInner = true;
1317 xlrec.innerIsParent = false;
1318
1319 xlrec.offnumInner = current->offnum =
1320 PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1321 InvalidOffsetNumber, false, false);
1322 if (current->offnum != FirstOffsetNumber)
1323 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1324 innerTuple->size);
1325
1326 /* No parent link to update, nor redirection to do */
1327 xlrec.offnumParent = InvalidOffsetNumber;
1328 xlrec.nodeI = 0;
1329
1330 /* Done modifying new current buffer, mark it dirty */
1331 MarkBufferDirty(current->buffer);
1332
1333 /* saveCurrent doesn't represent a different buffer */
1334 saveCurrent.buffer = InvalidBuffer;
1335 }
1336
1337 if (RelationNeedsWAL(index))
1338 {
1339 XLogRecPtr recptr;
1340 int flags;
1341
1342 XLogBeginInsert();
1343
1344 xlrec.nInsert = nToInsert;
1345 XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
1346
1347 XLogRegisterData((char *) toDelete,
1348 sizeof(OffsetNumber) * xlrec.nDelete);
1349 XLogRegisterData((char *) toInsert,
1350 sizeof(OffsetNumber) * xlrec.nInsert);
1351 XLogRegisterData((char *) leafPageSelect,
1352 sizeof(uint8) * xlrec.nInsert);
1353 XLogRegisterData((char *) innerTuple, innerTuple->size);
1354 XLogRegisterData(leafdata, leafptr - leafdata);
1355
1356 /* Old leaf page */
1357 if (BufferIsValid(saveCurrent.buffer))
1358 {
1359 flags = REGBUF_STANDARD;
1360 if (xlrec.initSrc)
1361 flags |= REGBUF_WILL_INIT;
1362 XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1363 }
1364
1365 /* New leaf page */
1366 if (BufferIsValid(newLeafBuffer))
1367 {
1368 flags = REGBUF_STANDARD;
1369 if (xlrec.initDest)
1370 flags |= REGBUF_WILL_INIT;
1371 XLogRegisterBuffer(1, newLeafBuffer, flags);
1372 }
1373
1374 /* Inner page */
1375 flags = REGBUF_STANDARD;
1376 if (xlrec.initInner)
1377 flags |= REGBUF_WILL_INIT;
1378 XLogRegisterBuffer(2, current->buffer, flags);
1379
1380 /* Parent page, if different from inner page */
1381 if (parent->buffer != InvalidBuffer)
1382 {
1383 if (parent->buffer != current->buffer)
1384 XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
1385 else
1386 Assert(xlrec.innerIsParent);
1387 }
1388
1389 /* Issue the WAL record */
1390 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1391
1392 /* Update page LSNs on all affected pages */
1393 if (newLeafBuffer != InvalidBuffer)
1394 {
1395 Page page = BufferGetPage(newLeafBuffer);
1396
1397 PageSetLSN(page, recptr);
1398 }
1399
1400 if (saveCurrent.buffer != InvalidBuffer)
1401 {
1402 Page page = BufferGetPage(saveCurrent.buffer);
1403
1404 PageSetLSN(page, recptr);
1405 }
1406
1407 PageSetLSN(current->page, recptr);
1408
1409 if (parent->buffer != InvalidBuffer)
1410 {
1411 PageSetLSN(parent->page, recptr);
1412 }
1413 }
1414
1415 END_CRIT_SECTION();
1416
1417 /* Update local free-space cache and unlock buffers */
1418 if (newLeafBuffer != InvalidBuffer)
1419 {
1420 SpGistSetLastUsedPage(index, newLeafBuffer);
1421 UnlockReleaseBuffer(newLeafBuffer);
1422 }
1423 if (saveCurrent.buffer != InvalidBuffer)
1424 {
1425 SpGistSetLastUsedPage(index, saveCurrent.buffer);
1426 UnlockReleaseBuffer(saveCurrent.buffer);
1427 }
1428
1429 return insertedNew;
1430 }
1431
1432 /*
1433 * spgMatchNode action: descend to N'th child node of current inner tuple
1434 */
1435 static void
spgMatchNodeAction(Relation index,SpGistState * state,SpGistInnerTuple innerTuple,SPPageDesc * current,SPPageDesc * parent,int nodeN)1436 spgMatchNodeAction(Relation index, SpGistState *state,
1437 SpGistInnerTuple innerTuple,
1438 SPPageDesc *current, SPPageDesc *parent, int nodeN)
1439 {
1440 int i;
1441 SpGistNodeTuple node;
1442
1443 /* Release previous parent buffer if any */
1444 if (parent->buffer != InvalidBuffer &&
1445 parent->buffer != current->buffer)
1446 {
1447 SpGistSetLastUsedPage(index, parent->buffer);
1448 UnlockReleaseBuffer(parent->buffer);
1449 }
1450
1451 /* Repoint parent to specified node of current inner tuple */
1452 parent->blkno = current->blkno;
1453 parent->buffer = current->buffer;
1454 parent->page = current->page;
1455 parent->offnum = current->offnum;
1456 parent->node = nodeN;
1457
1458 /* Locate that node */
1459 SGITITERATE(innerTuple, i, node)
1460 {
1461 if (i == nodeN)
1462 break;
1463 }
1464
1465 if (i != nodeN)
1466 elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1467 nodeN);
1468
1469 /* Point current to the downlink location, if any */
1470 if (ItemPointerIsValid(&node->t_tid))
1471 {
1472 current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1473 current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1474 }
1475 else
1476 {
1477 /* Downlink is empty, so we'll need to find a new page */
1478 current->blkno = InvalidBlockNumber;
1479 current->offnum = InvalidOffsetNumber;
1480 }
1481
1482 current->buffer = InvalidBuffer;
1483 current->page = NULL;
1484 }
1485
1486 /*
1487 * spgAddNode action: add a node to the inner tuple at current
1488 */
static void
spgAddNodeAction(Relation index, SpGistState *state,
				 SpGistInnerTuple innerTuple,
				 SPPageDesc *current, SPPageDesc *parent,
				 int nodeN, Datum nodeLabel)
{
	SpGistInnerTuple newInnerTuple;
	spgxlogAddNode xlrec;

	/* Should not be applied to nulls */
	Assert(!SpGistPageStoresNulls(current->page));

	/* Construct new inner tuple with additional node */
	newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);

	/* Prepare WAL record */
	STORE_STATE(state, xlrec.stateSrc);
	xlrec.offnum = current->offnum;

	/* we don't fill these unless we need to change the parent downlink */
	xlrec.parentBlk = -1;
	xlrec.offnumParent = InvalidOffsetNumber;
	xlrec.nodeI = 0;

	/* we don't fill these unless tuple has to be moved */
	xlrec.offnumNew = InvalidOffsetNumber;
	xlrec.newPage = false;

	if (PageGetExactFreeSpace(current->page) >=
		newInnerTuple->size - innerTuple->size)
	{
		/*
		 * We can replace the inner tuple by new version in-place.  Deleting
		 * and re-adding at the same offset leaves every other tuple's offset
		 * on this page unchanged, so downlinks pointing into this page from
		 * elsewhere remain valid.
		 */
		START_CRIT_SECTION();

		PageIndexTupleDelete(current->page, current->offnum);
		if (PageAddItem(current->page,
						(Item) newInnerTuple, newInnerTuple->size,
						current->offnum, false, false) != current->offnum)
			elog(ERROR, "failed to add item of size %u to SPGiST index page",
				 newInnerTuple->size);

		MarkBufferDirty(current->buffer);

		if (RelationNeedsWAL(index))
		{
			XLogRecPtr	recptr;

			XLogBeginInsert();
			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
			XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);

			XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);

			recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);

			PageSetLSN(current->page, recptr);
		}

		END_CRIT_SECTION();
	}
	else
	{
		/*
		 * move inner tuple to another page, and update parent
		 */
		SpGistDeadTuple dt;
		SPPageDesc	saveCurrent;

		/*
		 * It should not be possible to get here for the root page, since we
		 * allow only one inner tuple on the root page, and spgFormInnerTuple
		 * always checks that inner tuples don't exceed the size of a page.
		 */
		if (SpGistBlockIsRoot(current->blkno))
			elog(ERROR, "cannot enlarge root tuple any more");
		Assert(parent->buffer != InvalidBuffer);

		/* remember the page the tuple is moving away from */
		saveCurrent = *current;

		xlrec.offnumParent = parent->offnum;
		xlrec.nodeI = parent->node;

		/*
		 * obtain new buffer with the same parity as current, since it will be
		 * a child of same parent tuple
		 */
		current->buffer = SpGistGetBuffer(index,
										  GBUF_INNER_PARITY(current->blkno),
										  newInnerTuple->size + sizeof(ItemIdData),
										  &xlrec.newPage);
		current->blkno = BufferGetBlockNumber(current->buffer);
		current->page = BufferGetPage(current->buffer);

		/*
		 * Let's just make real sure new current isn't same as old.  Right
		 * now that's impossible, but if SpGistGetBuffer ever got smart
		 * enough to delete placeholder tuples before checking space, maybe
		 * it wouldn't be impossible.  The case would appear to work except
		 * that WAL replay would be subtly wrong, so I think a mere assert
		 * isn't enough here.
		 */
		if (current->blkno == saveCurrent.blkno)
			elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");

		/*
		 * New current and parent buffer will both be modified; but note that
		 * parent buffer could be same as either new or old current.
		 * parentBlk tells WAL replay which registered buffer holds the
		 * parent tuple: 0 = old current page, 1 = new current page, 2 = a
		 * separate page.
		 */
		if (parent->buffer == saveCurrent.buffer)
			xlrec.parentBlk = 0;
		else if (parent->buffer == current->buffer)
			xlrec.parentBlk = 1;
		else
			xlrec.parentBlk = 2;

		START_CRIT_SECTION();

		/* insert new ... */
		xlrec.offnumNew = current->offnum =
			SpGistPageAddNewItem(state, current->page,
								 (Item) newInnerTuple, newInnerTuple->size,
								 NULL, false);

		MarkBufferDirty(current->buffer);

		/* update parent's downlink and mark parent page dirty */
		saveNodeLink(index, parent, current->blkno, current->offnum);

		/*
		 * Replace old tuple with a placeholder or redirection tuple.  Unless
		 * doing an index build, we have to insert a redirection tuple for
		 * possible concurrent scans.  We can't just delete it in any case,
		 * because that could change the offsets of other tuples on the page,
		 * breaking downlinks from their parents.
		 */
		if (state->isBuild)
			dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
								  InvalidBlockNumber, InvalidOffsetNumber);
		else
			dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
								  current->blkno, current->offnum);

		PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
		if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
						saveCurrent.offnum,
						false, false) != saveCurrent.offnum)
			elog(ERROR, "failed to add item of size %u to SPGiST index page",
				 dt->size);

		/* keep the page's placeholder/redirection counters in sync */
		if (state->isBuild)
			SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
		else
			SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;

		MarkBufferDirty(saveCurrent.buffer);

		if (RelationNeedsWAL(index))
		{
			XLogRecPtr	recptr;
			int			flags;

			XLogBeginInsert();

			/* orig page */
			XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
			/* new page */
			flags = REGBUF_STANDARD;
			if (xlrec.newPage)
				flags |= REGBUF_WILL_INIT;
			XLogRegisterBuffer(1, current->buffer, flags);
			/* parent page (if different from orig and new) */
			if (xlrec.parentBlk == 2)
				XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);

			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
			XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);

			recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);

			/* we don't bother to check if any of these are redundant */
			PageSetLSN(current->page, recptr);
			PageSetLSN(parent->page, recptr);
			PageSetLSN(saveCurrent.page, recptr);
		}

		END_CRIT_SECTION();

		/* Release saveCurrent if it's not same as current or parent */
		if (saveCurrent.buffer != current->buffer &&
			saveCurrent.buffer != parent->buffer)
		{
			SpGistSetLastUsedPage(index, saveCurrent.buffer);
			UnlockReleaseBuffer(saveCurrent.buffer);
		}
	}
}
1687
1688 /*
1689 * spgSplitNode action: split inner tuple at current into prefix and postfix
1690 */
static void
spgSplitNodeAction(Relation index, SpGistState *state,
				   SpGistInnerTuple innerTuple,
				   SPPageDesc *current, spgChooseOut *out)
{
	SpGistInnerTuple prefixTuple,
				postfixTuple;
	SpGistNodeTuple node,
			   *nodes;
	BlockNumber postfixBlkno;
	OffsetNumber postfixOffset;
	int			i;
	spgxlogSplitTuple xlrec;
	Buffer		newBuffer = InvalidBuffer;

	/* Should not be applied to nulls */
	Assert(!SpGistPageStoresNulls(current->page));

	/*
	 * Construct new prefix tuple, containing a single node with the
	 * specified label.  (We'll update the node's downlink to point to the
	 * new postfix tuple, below.)
	 */
	node = spgFormNodeTuple(state, out->result.splitTuple.nodeLabel, false);

	prefixTuple = spgFormInnerTuple(state,
									out->result.splitTuple.prefixHasPrefix,
									out->result.splitTuple.prefixPrefixDatum,
									1, &node);

	/*
	 * it must fit in the space that innerTuple now occupies, because we will
	 * put it back at the same page offset (keeping the parent's downlink to
	 * it valid)
	 */
	if (prefixTuple->size > innerTuple->size)
		elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");

	/*
	 * Construct new postfix tuple, containing all nodes of innerTuple with
	 * same node datums, but with the prefix specified by the picksplit
	 * function.
	 */
	nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
	SGITITERATE(innerTuple, i, node)
	{
		nodes[i] = node;
	}

	postfixTuple = spgFormInnerTuple(state,
									 out->result.splitTuple.postfixHasPrefix,
									 out->result.splitTuple.postfixPrefixDatum,
									 innerTuple->nNodes, nodes);

	/* Postfix tuple is allTheSame if original tuple was */
	postfixTuple->allTheSame = innerTuple->allTheSame;

	/* prep data for WAL record */
	xlrec.newPage = false;

	/*
	 * If we can't fit both tuples on the current page, get a new page for
	 * the postfix tuple.  In particular, can't split to the root page.
	 *
	 * For the space calculation, note that prefixTuple replaces innerTuple
	 * but postfixTuple will be a new entry.
	 */
	if (SpGistBlockIsRoot(current->blkno) ||
		SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
		prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
	{
		/*
		 * Choose page with next triple parity, because postfix tuple is a
		 * child of prefix one
		 */
		newBuffer = SpGistGetBuffer(index,
									GBUF_INNER_PARITY(current->blkno + 1),
									postfixTuple->size + sizeof(ItemIdData),
									&xlrec.newPage);
	}

	START_CRIT_SECTION();

	/*
	 * Replace old tuple by prefix tuple; re-adding at the same offset keeps
	 * all other tuples' offsets on this page unchanged
	 */
	PageIndexTupleDelete(current->page, current->offnum);
	xlrec.offnumPrefix = PageAddItem(current->page,
									 (Item) prefixTuple, prefixTuple->size,
									 current->offnum, false, false);
	if (xlrec.offnumPrefix != current->offnum)
		elog(ERROR, "failed to add item of size %u to SPGiST index page",
			 prefixTuple->size);

	/*
	 * put postfix tuple into appropriate page
	 */
	if (newBuffer == InvalidBuffer)
	{
		postfixBlkno = current->blkno;
		xlrec.offnumPostfix = postfixOffset =
			SpGistPageAddNewItem(state, current->page,
								 (Item) postfixTuple, postfixTuple->size,
								 NULL, false);
		xlrec.postfixBlkSame = true;
	}
	else
	{
		postfixBlkno = BufferGetBlockNumber(newBuffer);
		xlrec.offnumPostfix = postfixOffset =
			SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
								 (Item) postfixTuple, postfixTuple->size,
								 NULL, false);
		MarkBufferDirty(newBuffer);
		xlrec.postfixBlkSame = false;
	}

	/*
	 * And set downlink pointer in the prefix tuple to point to postfix
	 * tuple.  (We can't avoid this step by doing the above two steps in
	 * opposite order, because there might not be enough space on the page to
	 * insert the postfix tuple first.)  We have to update the local copy of
	 * the prefixTuple too, because that's what will be written to WAL.
	 */
	spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset);
	prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
												 PageGetItemId(current->page, current->offnum));
	spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset);

	MarkBufferDirty(current->buffer);

	if (RelationNeedsWAL(index))
	{
		XLogRecPtr	recptr;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
		XLogRegisterData((char *) prefixTuple, prefixTuple->size);
		XLogRegisterData((char *) postfixTuple, postfixTuple->size);

		XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
		if (newBuffer != InvalidBuffer)
		{
			int			flags;

			flags = REGBUF_STANDARD;
			if (xlrec.newPage)
				flags |= REGBUF_WILL_INIT;
			XLogRegisterBuffer(1, newBuffer, flags);
		}

		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);

		PageSetLSN(current->page, recptr);

		if (newBuffer != InvalidBuffer)
		{
			PageSetLSN(BufferGetPage(newBuffer), recptr);
		}
	}

	END_CRIT_SECTION();

	/* Update local free-space cache and release buffer */
	if (newBuffer != InvalidBuffer)
	{
		SpGistSetLastUsedPage(index, newBuffer);
		UnlockReleaseBuffer(newBuffer);
	}
}
1857
1858 /*
1859 * Insert one item into the index.
1860 *
1861 * Returns true on success, false if we failed to complete the insertion
1862 * (typically because of conflict with a concurrent insert). In the latter
1863 * case, caller should re-call spgdoinsert() with the same args.
1864 */
1865 bool
spgdoinsert(Relation index,SpGistState * state,ItemPointer heapPtr,Datum datum,bool isnull)1866 spgdoinsert(Relation index, SpGistState *state,
1867 ItemPointer heapPtr, Datum datum, bool isnull)
1868 {
1869 bool result = true;
1870 int level = 0;
1871 Datum leafDatum;
1872 int leafSize;
1873 int bestLeafSize;
1874 int numNoProgressCycles = 0;
1875 SPPageDesc current,
1876 parent;
1877 FmgrInfo *procinfo = NULL;
1878
1879 /*
1880 * Look up FmgrInfo of the user-defined choose function once, to save
1881 * cycles in the loop below.
1882 */
1883 if (!isnull)
1884 procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1885
1886 /*
1887 * Since we don't use index_form_tuple in this AM, we have to make sure
1888 * value to be inserted is not toasted; FormIndexDatum doesn't guarantee
1889 * that.
1890 */
1891 if (!isnull && state->attType.attlen == -1)
1892 datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
1893
1894 leafDatum = datum;
1895
1896 /*
1897 * Compute space needed for a leaf tuple containing the given datum.
1898 *
1899 * If it isn't gonna fit, and the opclass can't reduce the datum size by
1900 * suffixing, bail out now rather than doing a lot of useless work.
1901 */
1902 if (!isnull)
1903 leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
1904 SpGistGetTypeSize(&state->attType, leafDatum);
1905 else
1906 leafSize = SGDTSIZE + sizeof(ItemIdData);
1907
1908 if (leafSize > SPGIST_PAGE_CAPACITY &&
1909 (isnull || !state->config.longValuesOK))
1910 ereport(ERROR,
1911 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1912 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
1913 leafSize - sizeof(ItemIdData),
1914 SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
1915 RelationGetRelationName(index)),
1916 errhint("Values larger than a buffer page cannot be indexed.")));
1917 bestLeafSize = leafSize;
1918
1919 /* Initialize "current" to the appropriate root page */
1920 current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
1921 current.buffer = InvalidBuffer;
1922 current.page = NULL;
1923 current.offnum = FirstOffsetNumber;
1924 current.node = -1;
1925
1926 /* "parent" is invalid for the moment */
1927 parent.blkno = InvalidBlockNumber;
1928 parent.buffer = InvalidBuffer;
1929 parent.page = NULL;
1930 parent.offnum = InvalidOffsetNumber;
1931 parent.node = -1;
1932
1933 /*
1934 * Before entering the loop, try to clear any pending interrupt condition.
1935 * If a query cancel is pending, we might as well accept it now not later;
1936 * while if a non-canceling condition is pending, servicing it here avoids
1937 * having to restart the insertion and redo all the work so far.
1938 */
1939 CHECK_FOR_INTERRUPTS();
1940
1941 for (;;)
1942 {
1943 bool isNew = false;
1944
1945 /*
1946 * Bail out if query cancel is pending. We must have this somewhere
1947 * in the loop since a broken opclass could produce an infinite
1948 * picksplit loop. However, because we'll be holding buffer lock(s)
1949 * after the first iteration, ProcessInterrupts() wouldn't be able to
1950 * throw a cancel error here. Hence, if we see that an interrupt is
1951 * pending, break out of the loop and deal with the situation below.
1952 * Set result = false because we must restart the insertion if the
1953 * interrupt isn't a query-cancel-or-die case.
1954 */
1955 if (INTERRUPTS_PENDING_CONDITION())
1956 {
1957 result = false;
1958 break;
1959 }
1960
1961 if (current.blkno == InvalidBlockNumber)
1962 {
1963 /*
1964 * Create a leaf page. If leafSize is too large to fit on a page,
1965 * we won't actually use the page yet, but it simplifies the API
1966 * for doPickSplit to always have a leaf page at hand; so just
1967 * quietly limit our request to a page size.
1968 */
1969 current.buffer =
1970 SpGistGetBuffer(index,
1971 GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
1972 Min(leafSize, SPGIST_PAGE_CAPACITY),
1973 &isNew);
1974 current.blkno = BufferGetBlockNumber(current.buffer);
1975 }
1976 else if (parent.buffer == InvalidBuffer)
1977 {
1978 /* we hold no parent-page lock, so no deadlock is possible */
1979 current.buffer = ReadBuffer(index, current.blkno);
1980 LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
1981 }
1982 else if (current.blkno != parent.blkno)
1983 {
1984 /* descend to a new child page */
1985 current.buffer = ReadBuffer(index, current.blkno);
1986
1987 /*
1988 * Attempt to acquire lock on child page. We must beware of
1989 * deadlock against another insertion process descending from that
1990 * page to our parent page (see README). If we fail to get lock,
1991 * abandon the insertion and tell our caller to start over.
1992 *
1993 * XXX this could be improved, because failing to get lock on a
1994 * buffer is not proof of a deadlock situation; the lock might be
1995 * held by a reader, or even just background writer/checkpointer
1996 * process. Perhaps it'd be worth retrying after sleeping a bit?
1997 */
1998 if (!ConditionalLockBuffer(current.buffer))
1999 {
2000 ReleaseBuffer(current.buffer);
2001 UnlockReleaseBuffer(parent.buffer);
2002 return false;
2003 }
2004 }
2005 else
2006 {
2007 /* inner tuple can be stored on the same page as parent one */
2008 current.buffer = parent.buffer;
2009 }
2010 current.page = BufferGetPage(current.buffer);
2011
2012 /* should not arrive at a page of the wrong type */
2013 if (isnull ? !SpGistPageStoresNulls(current.page) :
2014 SpGistPageStoresNulls(current.page))
2015 elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2016 current.blkno);
2017
2018 if (SpGistPageIsLeaf(current.page))
2019 {
2020 SpGistLeafTuple leafTuple;
2021 int nToSplit,
2022 sizeToSplit;
2023
2024 leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
2025 if (leafTuple->size + sizeof(ItemIdData) <=
2026 SpGistPageGetFreeSpace(current.page, 1))
2027 {
2028 /* it fits on page, so insert it and we're done */
2029 addLeafTuple(index, state, leafTuple,
2030 							 &current, &parent, isnull, isNew);
2031 break;
2032 }
2033 else if ((sizeToSplit =
2034 					  checkSplitConditions(index, state, &current,
2035 &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
2036 nToSplit < 64 &&
2037 leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2038 {
2039 /*
2040 * the amount of data is pretty small, so just move the whole
2041 * chain to another leaf page rather than splitting it.
2042 */
2043 Assert(!isNew);
2044 				moveLeafs(index, state, &current, &parent, leafTuple, isnull);
2045 break; /* we're done */
2046 }
2047 else
2048 {
2049 /* picksplit */
2050 				if (doPickSplit(index, state, &current, &parent,
2051 leafTuple, level, isnull, isNew))
2052 break; /* doPickSplit installed new tuples */
2053
2054 /* leaf tuple will not be inserted yet */
2055 pfree(leafTuple);
2056
2057 /*
2058 * current now describes new inner tuple, go insert into it
2059 */
2060 Assert(!SpGistPageIsLeaf(current.page));
2061 goto process_inner_tuple;
2062 }
2063 }
2064 else /* non-leaf page */
2065 {
2066 /*
2067 * Apply the opclass choose function to figure out how to insert
2068 * the given datum into the current inner tuple.
2069 */
2070 SpGistInnerTuple innerTuple;
2071 spgChooseIn in;
2072 spgChooseOut out;
2073
2074 /*
2075 * spgAddNode and spgSplitTuple cases will loop back to here to
2076 * complete the insertion operation. Just in case the choose
2077 * function is broken and produces add or split requests
2078 * repeatedly, check for query cancel (see comments above).
2079 */
2080 process_inner_tuple:
2081 if (INTERRUPTS_PENDING_CONDITION())
2082 {
2083 result = false;
2084 break;
2085 }
2086
2087 innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2088 PageGetItemId(current.page, current.offnum));
2089
2090 in.datum = datum;
2091 in.leafDatum = leafDatum;
2092 in.level = level;
2093 in.allTheSame = innerTuple->allTheSame;
2094 in.hasPrefix = (innerTuple->prefixSize > 0);
2095 in.prefixDatum = SGITDATUM(innerTuple, state);
2096 in.nNodes = innerTuple->nNodes;
2097 in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2098
2099 memset(&out, 0, sizeof(out));
2100
2101 if (!isnull)
2102 {
2103 /* use user-defined choose method */
2104 FunctionCall2Coll(procinfo,
2105 index->rd_indcollation[0],
2106 PointerGetDatum(&in),
2107 PointerGetDatum(&out));
2108 }
2109 else
2110 {
2111 /* force "match" action (to insert to random subnode) */
2112 out.resultType = spgMatchNode;
2113 }
2114
2115 if (innerTuple->allTheSame)
2116 {
2117 /*
2118 * It's not allowed to do an AddNode at an allTheSame tuple.
2119 * Opclass must say "match", in which case we choose a random
2120 * one of the nodes to descend into, or "split".
2121 */
2122 if (out.resultType == spgAddNode)
2123 elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2124 else if (out.resultType == spgMatchNode)
2125 out.result.matchNode.nodeN = random() % innerTuple->nNodes;
2126 }
2127
2128 switch (out.resultType)
2129 {
2130 case spgMatchNode:
2131 /* Descend to N'th child node */
2132 spgMatchNodeAction(index, state, innerTuple,
2133 									   &current, &parent,
2134 out.result.matchNode.nodeN);
2135 /* Adjust level as per opclass request */
2136 level += out.result.matchNode.levelAdd;
2137 /* Replace leafDatum and recompute leafSize */
2138 if (!isnull)
2139 {
2140 leafDatum = out.result.matchNode.restDatum;
2141 leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
2142 SpGistGetTypeSize(&state->attType, leafDatum);
2143 }
2144
2145 /*
2146 * Check new tuple size; fail if it can't fit, unless the
2147 * opclass says it can handle the situation by suffixing.
2148 *
2149 * A buggy opclass might not ever make the leaf datum
2150 * small enough, causing an infinite loop. To detect such
2151 * a loop, check to see if we are making progress by
2152 * reducing the leafSize in each pass. This is a bit
2153 * tricky though. Because of alignment considerations,
2154 * the total tuple size might not decrease on every pass.
2155 * Also, there are edge cases where the choose method
2156 * might seem to not make progress for a cycle or two.
2157 * Somewhat arbitrarily, we allow up to 10 no-progress
2158 * iterations before failing. (This limit should be more
2159 * than MAXALIGN, to accommodate opclasses that trim one
2160 * byte from the leaf datum per pass.)
2161 */
2162 if (leafSize > SPGIST_PAGE_CAPACITY)
2163 {
2164 bool ok = false;
2165
2166 if (state->config.longValuesOK && !isnull)
2167 {
2168 if (leafSize < bestLeafSize)
2169 {
2170 ok = true;
2171 bestLeafSize = leafSize;
2172 numNoProgressCycles = 0;
2173 }
2174 else if (++numNoProgressCycles < 10)
2175 ok = true;
2176 }
2177 if (!ok)
2178 ereport(ERROR,
2179 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2180 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2181 leafSize - sizeof(ItemIdData),
2182 SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2183 RelationGetRelationName(index)),
2184 errhint("Values larger than a buffer page cannot be indexed.")));
2185 }
2186
2187 /*
2188 * Loop around and attempt to insert the new leafDatum at
2189 * "current" (which might reference an existing child
2190 * tuple, or might be invalid to force us to find a new
2191 * page for the tuple).
2192 */
2193 break;
2194 case spgAddNode:
2195 /* AddNode is not sensible if nodes don't have labels */
2196 if (in.nodeLabels == NULL)
2197 elog(ERROR, "cannot add a node to an inner tuple without node labels");
2198 /* Add node to inner tuple, per request */
2199 spgAddNodeAction(index, state, innerTuple,
2200 									 &current, &parent,
2201 out.result.addNode.nodeN,
2202 out.result.addNode.nodeLabel);
2203
2204 /*
2205 * Retry insertion into the enlarged node. We assume that
2206 * we'll get a MatchNode result this time.
2207 */
2208 goto process_inner_tuple;
2209 break;
2210 case spgSplitTuple:
2211 /* Split inner tuple, per request */
2212 spgSplitNodeAction(index, state, innerTuple,
2213 									   &current, &out);
2214
2215 /* Retry insertion into the split node */
2216 goto process_inner_tuple;
2217 break;
2218 default:
2219 elog(ERROR, "unrecognized SPGiST choose result: %d",
2220 (int) out.resultType);
2221 break;
2222 }
2223 }
2224 } /* end loop */
2225
2226 /*
2227 * Release any buffers we're still holding. Beware of possibility that
2228 * current and parent reference same buffer.
2229 */
2230 if (current.buffer != InvalidBuffer)
2231 {
2232 SpGistSetLastUsedPage(index, current.buffer);
2233 UnlockReleaseBuffer(current.buffer);
2234 }
2235 if (parent.buffer != InvalidBuffer &&
2236 parent.buffer != current.buffer)
2237 {
2238 SpGistSetLastUsedPage(index, parent.buffer);
2239 UnlockReleaseBuffer(parent.buffer);
2240 }
2241
2242 /*
2243 * We do not support being called while some outer function is holding a
2244 * buffer lock (or any other reason to postpone query cancels). If that
2245 * were the case, telling the caller to retry would create an infinite
2246 * loop.
2247 */
2248 Assert(INTERRUPTS_CAN_BE_PROCESSED());
2249
2250 /*
2251 * Finally, check for interrupts again. If there was a query cancel,
2252 * ProcessInterrupts() will be able to throw the error here. If it was
2253 * some other kind of interrupt that can just be cleared, return false to
2254 * tell our caller to retry.
2255 */
2256 CHECK_FOR_INTERRUPTS();
2257
2258 return result;
2259 }
2260