1 /*-------------------------------------------------------------------------
2 *
3 * spgdoinsert.c
4 * implementation of insert algorithm
5 *
6 *
7 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/spgist/spgdoinsert.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16 #include "postgres.h"
17
18 #include "access/genam.h"
19 #include "access/spgist_private.h"
20 #include "access/spgxlog.h"
21 #include "access/xloginsert.h"
22 #include "miscadmin.h"
23 #include "storage/bufmgr.h"
24 #include "utils/rel.h"
25
26
27 /*
28 * SPPageDesc tracks all info about a page we are inserting into. In some
29 * situations it actually identifies a tuple, or even a specific node within
30 * an inner tuple. But any of the fields can be invalid. If the buffer
31 * field is valid, it implies we hold pin and exclusive lock on that buffer.
32 * page pointer should be valid exactly when buffer is.
33 */
34 typedef struct SPPageDesc
35 {
36 BlockNumber blkno; /* block number, or InvalidBlockNumber */
37 Buffer buffer; /* page's buffer number, or InvalidBuffer */
38 Page page; /* pointer to page buffer, or NULL */
39 OffsetNumber offnum; /* offset of tuple, or InvalidOffsetNumber */
40 int node; /* node number within inner tuple, or -1 */
41 } SPPageDesc;
42
43
44 /*
45 * Set the item pointer in the nodeN'th entry in inner tuple tup. This
46 * is used to update the parent inner tuple's downlink after a move or
47 * split operation.
48 */
49 void
spgUpdateNodeLink(SpGistInnerTuple tup,int nodeN,BlockNumber blkno,OffsetNumber offset)50 spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
51 BlockNumber blkno, OffsetNumber offset)
52 {
53 int i;
54 SpGistNodeTuple node;
55
56 SGITITERATE(tup, i, node)
57 {
58 if (i == nodeN)
59 {
60 ItemPointerSet(&node->t_tid, blkno, offset);
61 return;
62 }
63 }
64
65 elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
66 nodeN);
67 }
68
69 /*
70 * Form a new inner tuple containing one more node than the given one, with
71 * the specified label datum, inserted at offset "offset" in the node array.
72 * The new tuple's prefix is the same as the old one's.
73 *
74 * Note that the new node initially has an invalid downlink. We'll find a
75 * page to point it to later.
76 */
77 static SpGistInnerTuple
addNode(SpGistState * state,SpGistInnerTuple tuple,Datum label,int offset)78 addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
79 {
80 SpGistNodeTuple node,
81 *nodes;
82 int i;
83
84 /* if offset is negative, insert at end */
85 if (offset < 0)
86 offset = tuple->nNodes;
87 else if (offset > tuple->nNodes)
88 elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
89
90 nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
91 SGITITERATE(tuple, i, node)
92 {
93 if (i < offset)
94 nodes[i] = node;
95 else
96 nodes[i + 1] = node;
97 }
98
99 nodes[offset] = spgFormNodeTuple(state, label, false);
100
101 return spgFormInnerTuple(state,
102 (tuple->prefixSize > 0),
103 SGITDATUM(tuple, state),
104 tuple->nNodes + 1,
105 nodes);
106 }
107
108 /* qsort comparator for sorting OffsetNumbers */
109 static int
cmpOffsetNumbers(const void * a,const void * b)110 cmpOffsetNumbers(const void *a, const void *b)
111 {
112 if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
113 return 0;
114 return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
115 }
116
117 /*
118 * Delete multiple tuples from an index page, preserving tuple offset numbers.
119 *
120 * The first tuple in the given list is replaced with a dead tuple of type
121 * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
122 * with dead tuples of type "reststate". If either firststate or reststate
123 * is REDIRECT, blkno/offnum specify where to link to.
124 *
125 * NB: this is used during WAL replay, so beware of trying to make it too
126 * smart. In particular, it shouldn't use "state" except for calling
127 * spgFormDeadTuple(). This is also used in a critical section, so no
128 * pallocs either!
129 */
130 void
spgPageIndexMultiDelete(SpGistState * state,Page page,OffsetNumber * itemnos,int nitems,int firststate,int reststate,BlockNumber blkno,OffsetNumber offnum)131 spgPageIndexMultiDelete(SpGistState *state, Page page,
132 OffsetNumber *itemnos, int nitems,
133 int firststate, int reststate,
134 BlockNumber blkno, OffsetNumber offnum)
135 {
136 OffsetNumber firstItem;
137 OffsetNumber sortednos[MaxIndexTuplesPerPage];
138 SpGistDeadTuple tuple = NULL;
139 int i;
140
141 if (nitems == 0)
142 return; /* nothing to do */
143
144 /*
145 * For efficiency we want to use PageIndexMultiDelete, which requires the
146 * targets to be listed in sorted order, so we have to sort the itemnos
147 * array. (This also greatly simplifies the math for reinserting the
148 * replacement tuples.) However, we must not scribble on the caller's
149 * array, so we have to make a copy.
150 */
151 memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
152 if (nitems > 1)
153 qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
154
155 PageIndexMultiDelete(page, sortednos, nitems);
156
157 firstItem = itemnos[0];
158
159 for (i = 0; i < nitems; i++)
160 {
161 OffsetNumber itemno = sortednos[i];
162 int tupstate;
163
164 tupstate = (itemno == firstItem) ? firststate : reststate;
165 if (tuple == NULL || tuple->tupstate != tupstate)
166 tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
167
168 if (PageAddItem(page, (Item) tuple, tuple->size,
169 itemno, false, false) != itemno)
170 elog(ERROR, "failed to add item of size %u to SPGiST index page",
171 tuple->size);
172
173 if (tupstate == SPGIST_REDIRECT)
174 SpGistPageGetOpaque(page)->nRedirection++;
175 else if (tupstate == SPGIST_PLACEHOLDER)
176 SpGistPageGetOpaque(page)->nPlaceholder++;
177 }
178 }
179
180 /*
181 * Update the parent inner tuple's downlink, and mark the parent buffer
182 * dirty (this must be the last change to the parent page in the current
183 * WAL action).
184 */
185 static void
saveNodeLink(Relation index,SPPageDesc * parent,BlockNumber blkno,OffsetNumber offnum)186 saveNodeLink(Relation index, SPPageDesc *parent,
187 BlockNumber blkno, OffsetNumber offnum)
188 {
189 SpGistInnerTuple innerTuple;
190
191 innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
192 PageGetItemId(parent->page, parent->offnum));
193
194 spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
195
196 MarkBufferDirty(parent->buffer);
197 }
198
199 /*
200 * Add a leaf tuple to a leaf page where there is known to be room for it
201 */
202 static void
addLeafTuple(Relation index,SpGistState * state,SpGistLeafTuple leafTuple,SPPageDesc * current,SPPageDesc * parent,bool isNulls,bool isNew)203 addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
204 SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
205 {
206 spgxlogAddLeaf xlrec;
207
208 xlrec.newPage = isNew;
209 xlrec.storesNulls = isNulls;
210
211 /* these will be filled below as needed */
212 xlrec.offnumLeaf = InvalidOffsetNumber;
213 xlrec.offnumHeadLeaf = InvalidOffsetNumber;
214 xlrec.offnumParent = InvalidOffsetNumber;
215 xlrec.nodeI = 0;
216
217 START_CRIT_SECTION();
218
219 if (current->offnum == InvalidOffsetNumber ||
220 SpGistBlockIsRoot(current->blkno))
221 {
222 /* Tuple is not part of a chain */
223 leafTuple->nextOffset = InvalidOffsetNumber;
224 current->offnum = SpGistPageAddNewItem(state, current->page,
225 (Item) leafTuple, leafTuple->size,
226 NULL, false);
227
228 xlrec.offnumLeaf = current->offnum;
229
230 /* Must update parent's downlink if any */
231 if (parent->buffer != InvalidBuffer)
232 {
233 xlrec.offnumParent = parent->offnum;
234 xlrec.nodeI = parent->node;
235
236 saveNodeLink(index, parent, current->blkno, current->offnum);
237 }
238 }
239 else
240 {
241 /*
242 * Tuple must be inserted into existing chain. We mustn't change the
243 * chain's head address, but we don't need to chase the entire chain
244 * to put the tuple at the end; we can insert it second.
245 *
246 * Also, it's possible that the "chain" consists only of a DEAD tuple,
247 * in which case we should replace the DEAD tuple in-place.
248 */
249 SpGistLeafTuple head;
250 OffsetNumber offnum;
251
252 head = (SpGistLeafTuple) PageGetItem(current->page,
253 PageGetItemId(current->page, current->offnum));
254 if (head->tupstate == SPGIST_LIVE)
255 {
256 leafTuple->nextOffset = head->nextOffset;
257 offnum = SpGistPageAddNewItem(state, current->page,
258 (Item) leafTuple, leafTuple->size,
259 NULL, false);
260
261 /*
262 * re-get head of list because it could have been moved on page,
263 * and set new second element
264 */
265 head = (SpGistLeafTuple) PageGetItem(current->page,
266 PageGetItemId(current->page, current->offnum));
267 head->nextOffset = offnum;
268
269 xlrec.offnumLeaf = offnum;
270 xlrec.offnumHeadLeaf = current->offnum;
271 }
272 else if (head->tupstate == SPGIST_DEAD)
273 {
274 leafTuple->nextOffset = InvalidOffsetNumber;
275 PageIndexTupleDelete(current->page, current->offnum);
276 if (PageAddItem(current->page,
277 (Item) leafTuple, leafTuple->size,
278 current->offnum, false, false) != current->offnum)
279 elog(ERROR, "failed to add item of size %u to SPGiST index page",
280 leafTuple->size);
281
282 /* WAL replay distinguishes this case by equal offnums */
283 xlrec.offnumLeaf = current->offnum;
284 xlrec.offnumHeadLeaf = current->offnum;
285 }
286 else
287 elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
288 }
289
290 MarkBufferDirty(current->buffer);
291
292 if (RelationNeedsWAL(index))
293 {
294 XLogRecPtr recptr;
295 int flags;
296
297 XLogBeginInsert();
298 XLogRegisterData((char *) &xlrec, sizeof(xlrec));
299 XLogRegisterData((char *) leafTuple, leafTuple->size);
300
301 flags = REGBUF_STANDARD;
302 if (xlrec.newPage)
303 flags |= REGBUF_WILL_INIT;
304 XLogRegisterBuffer(0, current->buffer, flags);
305 if (xlrec.offnumParent != InvalidOffsetNumber)
306 XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
307
308 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
309
310 PageSetLSN(current->page, recptr);
311
312 /* update parent only if we actually changed it */
313 if (xlrec.offnumParent != InvalidOffsetNumber)
314 {
315 PageSetLSN(parent->page, recptr);
316 }
317 }
318
319 END_CRIT_SECTION();
320 }
321
322 /*
323 * Count the number and total size of leaf tuples in the chain starting at
324 * current->offnum. Return number into *nToSplit and total size as function
325 * result.
326 *
327 * Klugy special case when considering the root page (i.e., root is a leaf
328 * page, but we're about to split for the first time): return fake large
329 * values to force spgdoinsert() to take the doPickSplit rather than
330 * moveLeafs code path. moveLeafs is not prepared to deal with root page.
331 */
332 static int
checkSplitConditions(Relation index,SpGistState * state,SPPageDesc * current,int * nToSplit)333 checkSplitConditions(Relation index, SpGistState *state,
334 SPPageDesc *current, int *nToSplit)
335 {
336 int i,
337 n = 0,
338 totalSize = 0;
339
340 if (SpGistBlockIsRoot(current->blkno))
341 {
342 /* return impossible values to force split */
343 *nToSplit = BLCKSZ;
344 return BLCKSZ;
345 }
346
347 i = current->offnum;
348 while (i != InvalidOffsetNumber)
349 {
350 SpGistLeafTuple it;
351
352 Assert(i >= FirstOffsetNumber &&
353 i <= PageGetMaxOffsetNumber(current->page));
354 it = (SpGistLeafTuple) PageGetItem(current->page,
355 PageGetItemId(current->page, i));
356 if (it->tupstate == SPGIST_LIVE)
357 {
358 n++;
359 totalSize += it->size + sizeof(ItemIdData);
360 }
361 else if (it->tupstate == SPGIST_DEAD)
362 {
363 /* We could see a DEAD tuple as first/only chain item */
364 Assert(i == current->offnum);
365 Assert(it->nextOffset == InvalidOffsetNumber);
366 /* Don't count it in result, because it won't go to other page */
367 }
368 else
369 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
370
371 i = it->nextOffset;
372 }
373
374 *nToSplit = n;
375
376 return totalSize;
377 }
378
379 /*
380 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
381 * but the chain has to be moved because there's not enough room to add
382 * newLeafTuple to its page. We use this method when the chain contains
383 * very little data so a split would be inefficient. We are sure we can
384 * fit the chain plus newLeafTuple on one other page.
385 */
386 static void
moveLeafs(Relation index,SpGistState * state,SPPageDesc * current,SPPageDesc * parent,SpGistLeafTuple newLeafTuple,bool isNulls)387 moveLeafs(Relation index, SpGistState *state,
388 SPPageDesc *current, SPPageDesc *parent,
389 SpGistLeafTuple newLeafTuple, bool isNulls)
390 {
391 int i,
392 nDelete,
393 nInsert,
394 size;
395 Buffer nbuf;
396 Page npage;
397 SpGistLeafTuple it;
398 OffsetNumber r = InvalidOffsetNumber,
399 startOffset = InvalidOffsetNumber;
400 bool replaceDead = false;
401 OffsetNumber *toDelete;
402 OffsetNumber *toInsert;
403 BlockNumber nblkno;
404 spgxlogMoveLeafs xlrec;
405 char *leafdata,
406 *leafptr;
407
408 /* This doesn't work on root page */
409 Assert(parent->buffer != InvalidBuffer);
410 Assert(parent->buffer != current->buffer);
411
412 /* Locate the tuples to be moved, and count up the space needed */
413 i = PageGetMaxOffsetNumber(current->page);
414 toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
415 toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
416
417 size = newLeafTuple->size + sizeof(ItemIdData);
418
419 nDelete = 0;
420 i = current->offnum;
421 while (i != InvalidOffsetNumber)
422 {
423 SpGistLeafTuple it;
424
425 Assert(i >= FirstOffsetNumber &&
426 i <= PageGetMaxOffsetNumber(current->page));
427 it = (SpGistLeafTuple) PageGetItem(current->page,
428 PageGetItemId(current->page, i));
429
430 if (it->tupstate == SPGIST_LIVE)
431 {
432 toDelete[nDelete] = i;
433 size += it->size + sizeof(ItemIdData);
434 nDelete++;
435 }
436 else if (it->tupstate == SPGIST_DEAD)
437 {
438 /* We could see a DEAD tuple as first/only chain item */
439 Assert(i == current->offnum);
440 Assert(it->nextOffset == InvalidOffsetNumber);
441 /* We don't want to move it, so don't count it in size */
442 toDelete[nDelete] = i;
443 nDelete++;
444 replaceDead = true;
445 }
446 else
447 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
448
449 i = it->nextOffset;
450 }
451
452 /* Find a leaf page that will hold them */
453 nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
454 size, &xlrec.newPage);
455 npage = BufferGetPage(nbuf);
456 nblkno = BufferGetBlockNumber(nbuf);
457 Assert(nblkno != current->blkno);
458
459 leafdata = leafptr = palloc(size);
460
461 START_CRIT_SECTION();
462
463 /* copy all the old tuples to new page, unless they're dead */
464 nInsert = 0;
465 if (!replaceDead)
466 {
467 for (i = 0; i < nDelete; i++)
468 {
469 it = (SpGistLeafTuple) PageGetItem(current->page,
470 PageGetItemId(current->page, toDelete[i]));
471 Assert(it->tupstate == SPGIST_LIVE);
472
473 /*
474 * Update chain link (notice the chain order gets reversed, but we
475 * don't care). We're modifying the tuple on the source page
476 * here, but it's okay since we're about to delete it.
477 */
478 it->nextOffset = r;
479
480 r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
481 &startOffset, false);
482
483 toInsert[nInsert] = r;
484 nInsert++;
485
486 /* save modified tuple into leafdata as well */
487 memcpy(leafptr, it, it->size);
488 leafptr += it->size;
489 }
490 }
491
492 /* add the new tuple as well */
493 newLeafTuple->nextOffset = r;
494 r = SpGistPageAddNewItem(state, npage,
495 (Item) newLeafTuple, newLeafTuple->size,
496 &startOffset, false);
497 toInsert[nInsert] = r;
498 nInsert++;
499 memcpy(leafptr, newLeafTuple, newLeafTuple->size);
500 leafptr += newLeafTuple->size;
501
502 /*
503 * Now delete the old tuples, leaving a redirection pointer behind for the
504 * first one, unless we're doing an index build; in which case there can't
505 * be any concurrent scan so we need not provide a redirect.
506 */
507 spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
508 state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
509 SPGIST_PLACEHOLDER,
510 nblkno, r);
511
512 /* Update parent's downlink and mark parent page dirty */
513 saveNodeLink(index, parent, nblkno, r);
514
515 /* Mark the leaf pages too */
516 MarkBufferDirty(current->buffer);
517 MarkBufferDirty(nbuf);
518
519 if (RelationNeedsWAL(index))
520 {
521 XLogRecPtr recptr;
522
523 /* prepare WAL info */
524 STORE_STATE(state, xlrec.stateSrc);
525
526 xlrec.nMoves = nDelete;
527 xlrec.replaceDead = replaceDead;
528 xlrec.storesNulls = isNulls;
529
530 xlrec.offnumParent = parent->offnum;
531 xlrec.nodeI = parent->node;
532
533 XLogBeginInsert();
534 XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
535 XLogRegisterData((char *) toDelete,
536 sizeof(OffsetNumber) * nDelete);
537 XLogRegisterData((char *) toInsert,
538 sizeof(OffsetNumber) * nInsert);
539 XLogRegisterData((char *) leafdata, leafptr - leafdata);
540
541 XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
542 XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
543 XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
544
545 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
546
547 PageSetLSN(current->page, recptr);
548 PageSetLSN(npage, recptr);
549 PageSetLSN(parent->page, recptr);
550 }
551
552 END_CRIT_SECTION();
553
554 /* Update local free-space cache and release new buffer */
555 SpGistSetLastUsedPage(index, nbuf);
556 UnlockReleaseBuffer(nbuf);
557 }
558
559 /*
560 * Update previously-created redirection tuple with appropriate destination
561 *
562 * We use this when it's not convenient to know the destination first.
563 * The tuple should have been made with the "impossible" destination of
564 * the metapage.
565 */
566 static void
setRedirectionTuple(SPPageDesc * current,OffsetNumber position,BlockNumber blkno,OffsetNumber offnum)567 setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
568 BlockNumber blkno, OffsetNumber offnum)
569 {
570 SpGistDeadTuple dt;
571
572 dt = (SpGistDeadTuple) PageGetItem(current->page,
573 PageGetItemId(current->page, position));
574 Assert(dt->tupstate == SPGIST_REDIRECT);
575 Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
576 ItemPointerSet(&dt->pointer, blkno, offnum);
577 }
578
579 /*
580 * Test to see if the user-defined picksplit function failed to do its job,
581 * ie, it put all the leaf tuples into the same node.
582 * If so, randomly divide the tuples into several nodes (all with the same
583 * label) and return TRUE to select allTheSame mode for this inner tuple.
584 *
585 * (This code is also used to forcibly select allTheSame mode for nulls.)
586 *
587 * If we know that the leaf tuples wouldn't all fit on one page, then we
588 * exclude the last tuple (which is the incoming new tuple that forced a split)
589 * from the check to see if more than one node is used. The reason for this
590 * is that if the existing tuples are put into only one chain, then even if
591 * we move them all to an empty page, there would still not be room for the
592 * new tuple, so we'd get into an infinite loop of picksplit attempts.
593 * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
594 * be split across pages. (Exercise for the reader: figure out why this
595 * fixes the problem even when there is only one old tuple.)
596 */
597 static bool
checkAllTheSame(spgPickSplitIn * in,spgPickSplitOut * out,bool tooBig,bool * includeNew)598 checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
599 bool *includeNew)
600 {
601 int theNode;
602 int limit;
603 int i;
604
605 /* For the moment, assume we can include the new leaf tuple */
606 *includeNew = true;
607
608 /* If there's only the new leaf tuple, don't select allTheSame mode */
609 if (in->nTuples <= 1)
610 return false;
611
612 /* If tuple set doesn't fit on one page, ignore the new tuple in test */
613 limit = tooBig ? in->nTuples - 1 : in->nTuples;
614
615 /* Check to see if more than one node is populated */
616 theNode = out->mapTuplesToNodes[0];
617 for (i = 1; i < limit; i++)
618 {
619 if (out->mapTuplesToNodes[i] != theNode)
620 return false;
621 }
622
623 /* Nope, so override the picksplit function's decisions */
624
625 /* If the new tuple is in its own node, it can't be included in split */
626 if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
627 *includeNew = false;
628
629 out->nNodes = 8; /* arbitrary number of child nodes */
630
631 /* Random assignment of tuples to nodes (note we include new tuple) */
632 for (i = 0; i < in->nTuples; i++)
633 out->mapTuplesToNodes[i] = i % out->nNodes;
634
635 /* The opclass may not use node labels, but if it does, duplicate 'em */
636 if (out->nodeLabels)
637 {
638 Datum theLabel = out->nodeLabels[theNode];
639
640 out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
641 for (i = 0; i < out->nNodes; i++)
642 out->nodeLabels[i] = theLabel;
643 }
644
645 /* We don't touch the prefix or the leaf tuple datum assignments */
646
647 return true;
648 }
649
650 /*
651 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
652 * but the chain has to be split because there's not enough room to add
653 * newLeafTuple to its page.
654 *
655 * This function splits the leaf tuple set according to picksplit's rules,
656 * creating one or more new chains that are spread across the current page
657 * and an additional leaf page (we assume that two leaf pages will be
658 * sufficient). A new inner tuple is created, and the parent downlink
659 * pointer is updated to point to that inner tuple instead of the leaf chain.
660 *
661 * On exit, current contains the address of the new inner tuple.
662 *
663 * Returns true if we successfully inserted newLeafTuple during this function,
664 * false if caller still has to do it (meaning another picksplit operation is
665 * probably needed). Failure could occur if the picksplit result is fairly
666 * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
667 * Because we force the picksplit result to be at least two chains, each
668 * cycle will get rid of at least one leaf tuple from the chain, so the loop
669 * will eventually terminate if lack of balance is the issue. If the tuple
670 * is too big, we assume that repeated picksplit operations will eventually
671 * make it small enough by repeated prefix-stripping. A broken opclass could
672 * make this an infinite loop, though, so spgdoinsert() checks that the
673 * leaf datums get smaller each time.
674 */
675 static bool
doPickSplit(Relation index,SpGistState * state,SPPageDesc * current,SPPageDesc * parent,SpGistLeafTuple newLeafTuple,int level,bool isNulls,bool isNew)676 doPickSplit(Relation index, SpGistState *state,
677 SPPageDesc *current, SPPageDesc *parent,
678 SpGistLeafTuple newLeafTuple,
679 int level, bool isNulls, bool isNew)
680 {
681 bool insertedNew = false;
682 spgPickSplitIn in;
683 spgPickSplitOut out;
684 FmgrInfo *procinfo;
685 bool includeNew;
686 int i,
687 max,
688 n;
689 SpGistInnerTuple innerTuple;
690 SpGistNodeTuple node,
691 *nodes;
692 Buffer newInnerBuffer,
693 newLeafBuffer;
694 ItemPointerData *heapPtrs;
695 uint8 *leafPageSelect;
696 int *leafSizes;
697 OffsetNumber *toDelete;
698 OffsetNumber *toInsert;
699 OffsetNumber redirectTuplePos = InvalidOffsetNumber;
700 OffsetNumber startOffsets[2];
701 SpGistLeafTuple *newLeafs;
702 int spaceToDelete;
703 int currentFreeSpace;
704 int totalLeafSizes;
705 bool allTheSame;
706 spgxlogPickSplit xlrec;
707 char *leafdata,
708 *leafptr;
709 SPPageDesc saveCurrent;
710 int nToDelete,
711 nToInsert,
712 maxToInclude;
713
714 in.level = level;
715
716 /*
717 * Allocate per-leaf-tuple work arrays with max possible size
718 */
719 max = PageGetMaxOffsetNumber(current->page);
720 n = max + 1;
721 in.datums = (Datum *) palloc(sizeof(Datum) * n);
722 heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
723 toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
724 toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
725 newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
726 leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
727
728 STORE_STATE(state, xlrec.stateSrc);
729
730 /*
731 * Form list of leaf tuples which will be distributed as split result;
732 * also, count up the amount of space that will be freed from current.
733 * (Note that in the non-root case, we won't actually delete the old
734 * tuples, only replace them with redirects or placeholders.)
735 *
736 * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
737 * page. For a pass-by-value data type we will fetch a word that must
738 * exist even though it may contain garbage (because of the fact that leaf
739 * tuples must have size at least SGDTSIZE). For a pass-by-reference type
740 * we are just computing a pointer that isn't going to get dereferenced.
741 * So it's not worth guarding the calls with isNulls checks.
742 */
743 nToInsert = 0;
744 nToDelete = 0;
745 spaceToDelete = 0;
746 if (SpGistBlockIsRoot(current->blkno))
747 {
748 /*
749 * We are splitting the root (which up to now is also a leaf page).
750 * Its tuples are not linked, so scan sequentially to get them all. We
751 * ignore the original value of current->offnum.
752 */
753 for (i = FirstOffsetNumber; i <= max; i++)
754 {
755 SpGistLeafTuple it;
756
757 it = (SpGistLeafTuple) PageGetItem(current->page,
758 PageGetItemId(current->page, i));
759 if (it->tupstate == SPGIST_LIVE)
760 {
761 in.datums[nToInsert] = SGLTDATUM(it, state);
762 heapPtrs[nToInsert] = it->heapPtr;
763 nToInsert++;
764 toDelete[nToDelete] = i;
765 nToDelete++;
766 /* we will delete the tuple altogether, so count full space */
767 spaceToDelete += it->size + sizeof(ItemIdData);
768 }
769 else /* tuples on root should be live */
770 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
771 }
772 }
773 else
774 {
775 /* Normal case, just collect the leaf tuples in the chain */
776 i = current->offnum;
777 while (i != InvalidOffsetNumber)
778 {
779 SpGistLeafTuple it;
780
781 Assert(i >= FirstOffsetNumber && i <= max);
782 it = (SpGistLeafTuple) PageGetItem(current->page,
783 PageGetItemId(current->page, i));
784 if (it->tupstate == SPGIST_LIVE)
785 {
786 in.datums[nToInsert] = SGLTDATUM(it, state);
787 heapPtrs[nToInsert] = it->heapPtr;
788 nToInsert++;
789 toDelete[nToDelete] = i;
790 nToDelete++;
791 /* we will not delete the tuple, only replace with dead */
792 Assert(it->size >= SGDTSIZE);
793 spaceToDelete += it->size - SGDTSIZE;
794 }
795 else if (it->tupstate == SPGIST_DEAD)
796 {
797 /* We could see a DEAD tuple as first/only chain item */
798 Assert(i == current->offnum);
799 Assert(it->nextOffset == InvalidOffsetNumber);
800 toDelete[nToDelete] = i;
801 nToDelete++;
802 /* replacing it with redirect will save no space */
803 }
804 else
805 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
806
807 i = it->nextOffset;
808 }
809 }
810 in.nTuples = nToInsert;
811
812 /*
813 * We may not actually insert new tuple because another picksplit may be
814 * necessary due to too large value, but we will try to allocate enough
815 * space to include it; and in any case it has to be included in the input
816 * for the picksplit function. So don't increment nToInsert yet.
817 */
818 in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
819 heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
820 in.nTuples++;
821
822 memset(&out, 0, sizeof(out));
823
824 if (!isNulls)
825 {
826 /*
827 * Perform split using user-defined method.
828 */
829 procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
830 FunctionCall2Coll(procinfo,
831 index->rd_indcollation[0],
832 PointerGetDatum(&in),
833 PointerGetDatum(&out));
834
835 /*
836 * Form new leaf tuples and count up the total space needed.
837 */
838 totalLeafSizes = 0;
839 for (i = 0; i < in.nTuples; i++)
840 {
841 newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
842 out.leafTupleDatums[i],
843 false);
844 totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
845 }
846 }
847 else
848 {
849 /*
850 * Perform dummy split that puts all tuples into one node.
851 * checkAllTheSame will override this and force allTheSame mode.
852 */
853 out.hasPrefix = false;
854 out.nNodes = 1;
855 out.nodeLabels = NULL;
856 out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
857
858 /*
859 * Form new leaf tuples and count up the total space needed.
860 */
861 totalLeafSizes = 0;
862 for (i = 0; i < in.nTuples; i++)
863 {
864 newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
865 (Datum) 0,
866 true);
867 totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
868 }
869 }
870
871 /*
872 * Check to see if the picksplit function failed to separate the values,
873 * ie, it put them all into the same child node. If so, select allTheSame
874 * mode and create a random split instead. See comments for
875 * checkAllTheSame as to why we need to know if the new leaf tuples could
876 * fit on one page.
877 */
878 allTheSame = checkAllTheSame(&in, &out,
879 totalLeafSizes > SPGIST_PAGE_CAPACITY,
880 &includeNew);
881
882 /*
883 * If checkAllTheSame decided we must exclude the new tuple, don't
884 * consider it any further.
885 */
886 if (includeNew)
887 maxToInclude = in.nTuples;
888 else
889 {
890 maxToInclude = in.nTuples - 1;
891 totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
892 }
893
894 /*
895 * Allocate per-node work arrays. Since checkAllTheSame could replace
896 * out.nNodes with a value larger than the number of tuples on the input
897 * page, we can't allocate these arrays before here.
898 */
899 nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
900 leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
901
902 /*
903 * Form nodes of inner tuple and inner tuple itself
904 */
905 for (i = 0; i < out.nNodes; i++)
906 {
907 Datum label = (Datum) 0;
908 bool labelisnull = (out.nodeLabels == NULL);
909
910 if (!labelisnull)
911 label = out.nodeLabels[i];
912 nodes[i] = spgFormNodeTuple(state, label, labelisnull);
913 }
914 innerTuple = spgFormInnerTuple(state,
915 out.hasPrefix, out.prefixDatum,
916 out.nNodes, nodes);
917 innerTuple->allTheSame = allTheSame;
918
919 /*
920 * Update nodes[] array to point into the newly formed innerTuple, so that
921 * we can adjust their downlinks below.
922 */
923 SGITITERATE(innerTuple, i, node)
924 {
925 nodes[i] = node;
926 }
927
928 /*
929 * Re-scan new leaf tuples and count up the space needed under each node.
930 */
931 for (i = 0; i < maxToInclude; i++)
932 {
933 n = out.mapTuplesToNodes[i];
934 if (n < 0 || n >= out.nNodes)
935 elog(ERROR, "inconsistent result of SPGiST picksplit function");
936 leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
937 }
938
939 /*
940 * To perform the split, we must insert a new inner tuple, which can't go
941 * on a leaf page; and unless we are splitting the root page, we must then
942 * update the parent tuple's downlink to point to the inner tuple. If
943 * there is room, we'll put the new inner tuple on the same page as the
944 * parent tuple, otherwise we need another non-leaf buffer. But if the
945 * parent page is the root, we can't add the new inner tuple there,
946 * because the root page must have only one inner tuple.
947 */
948 xlrec.initInner = false;
949 if (parent->buffer != InvalidBuffer &&
950 !SpGistBlockIsRoot(parent->blkno) &&
951 (SpGistPageGetFreeSpace(parent->page, 1) >=
952 innerTuple->size + sizeof(ItemIdData)))
953 {
954 /* New inner tuple will fit on parent page */
955 newInnerBuffer = parent->buffer;
956 }
957 else if (parent->buffer != InvalidBuffer)
958 {
959 /* Send tuple to page with next triple parity (see README) */
960 newInnerBuffer = SpGistGetBuffer(index,
961 GBUF_INNER_PARITY(parent->blkno + 1) |
962 (isNulls ? GBUF_NULLS : 0),
963 innerTuple->size + sizeof(ItemIdData),
964 &xlrec.initInner);
965 }
966 else
967 {
968 /* Root page split ... inner tuple will go to root page */
969 newInnerBuffer = InvalidBuffer;
970 }
971
972 /*
973 * The new leaf tuples converted from the existing ones should require the
974 * same or less space, and therefore should all fit onto one page
975 * (although that's not necessarily the current page, since we can't
976 * delete the old tuples but only replace them with placeholders).
977 * However, the incoming new tuple might not also fit, in which case we
978 * might need another picksplit cycle to reduce it some more.
979 *
980 * If there's not room to put everything back onto the current page, then
981 * we decide on a per-node basis which tuples go to the new page. (We do
982 * it like that because leaf tuple chains can't cross pages, so we must
983 * place all leaf tuples belonging to the same parent node on the same
984 * page.)
985 *
986 * If we are splitting the root page (turning it from a leaf page into an
987 * inner page), then no leaf tuples can go back to the current page; they
988 * must all go somewhere else.
989 */
990 if (!SpGistBlockIsRoot(current->blkno))
991 currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
992 else
993 currentFreeSpace = 0; /* prevent assigning any tuples to current */
994
995 xlrec.initDest = false;
996
997 if (totalLeafSizes <= currentFreeSpace)
998 {
999 /* All the leaf tuples will fit on current page */
1000 newLeafBuffer = InvalidBuffer;
1001 /* mark new leaf tuple as included in insertions, if allowed */
1002 if (includeNew)
1003 {
1004 nToInsert++;
1005 insertedNew = true;
1006 }
1007 for (i = 0; i < nToInsert; i++)
1008 leafPageSelect[i] = 0; /* signifies current page */
1009 }
1010 else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1011 {
1012 /*
1013 * We're trying to split up a long value by repeated suffixing, but
1014 * it's not going to fit yet. Don't bother allocating a second leaf
1015 * buffer that we won't be able to use.
1016 */
1017 newLeafBuffer = InvalidBuffer;
1018 Assert(includeNew);
1019 Assert(nToInsert == 0);
1020 }
1021 else
1022 {
1023 /* We will need another leaf page */
1024 uint8 *nodePageSelect;
1025 int curspace;
1026 int newspace;
1027
1028 newLeafBuffer = SpGistGetBuffer(index,
1029 GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1030 Min(totalLeafSizes,
1031 SPGIST_PAGE_CAPACITY),
1032 &xlrec.initDest);
1033
1034 /*
1035 * Attempt to assign node groups to the two pages. We might fail to
1036 * do so, even if totalLeafSizes is less than the available space,
1037 * because we can't split a group across pages.
1038 */
1039 nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1040
1041 curspace = currentFreeSpace;
1042 newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1043 for (i = 0; i < out.nNodes; i++)
1044 {
1045 if (leafSizes[i] <= curspace)
1046 {
1047 nodePageSelect[i] = 0; /* signifies current page */
1048 curspace -= leafSizes[i];
1049 }
1050 else
1051 {
1052 nodePageSelect[i] = 1; /* signifies new leaf page */
1053 newspace -= leafSizes[i];
1054 }
1055 }
1056 if (curspace >= 0 && newspace >= 0)
1057 {
1058 /* Successful assignment, so we can include the new leaf tuple */
1059 if (includeNew)
1060 {
1061 nToInsert++;
1062 insertedNew = true;
1063 }
1064 }
1065 else if (includeNew)
1066 {
1067 /* We must exclude the new leaf tuple from the split */
1068 int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1069
1070 leafSizes[nodeOfNewTuple] -=
1071 newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1072
1073 /* Repeat the node assignment process --- should succeed now */
1074 curspace = currentFreeSpace;
1075 newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1076 for (i = 0; i < out.nNodes; i++)
1077 {
1078 if (leafSizes[i] <= curspace)
1079 {
1080 nodePageSelect[i] = 0; /* signifies current page */
1081 curspace -= leafSizes[i];
1082 }
1083 else
1084 {
1085 nodePageSelect[i] = 1; /* signifies new leaf page */
1086 newspace -= leafSizes[i];
1087 }
1088 }
1089 if (curspace < 0 || newspace < 0)
1090 elog(ERROR, "failed to divide leaf tuple groups across pages");
1091 }
1092 else
1093 {
1094 /* oops, we already excluded new tuple ... should not get here */
1095 elog(ERROR, "failed to divide leaf tuple groups across pages");
1096 }
1097 /* Expand the per-node assignments to be shown per leaf tuple */
1098 for (i = 0; i < nToInsert; i++)
1099 {
1100 n = out.mapTuplesToNodes[i];
1101 leafPageSelect[i] = nodePageSelect[n];
1102 }
1103 }
1104
1105 /* Start preparing WAL record */
1106 xlrec.nDelete = 0;
1107 xlrec.initSrc = isNew;
1108 xlrec.storesNulls = isNulls;
1109 xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1110
1111 leafdata = leafptr = (char *) palloc(totalLeafSizes);
1112
1113 /* Here we begin making the changes to the target pages */
1114 START_CRIT_SECTION();
1115
1116 /*
1117 * Delete old leaf tuples from current buffer, except when we're splitting
1118 * the root; in that case there's no need because we'll re-init the page
1119 * below. We do this first to make room for reinserting new leaf tuples.
1120 */
1121 if (!SpGistBlockIsRoot(current->blkno))
1122 {
1123 /*
1124 * Init buffer instead of deleting individual tuples, but only if
1125 * there aren't any other live tuples and only during build; otherwise
1126 * we need to set a redirection tuple for concurrent scans.
1127 */
1128 if (state->isBuild &&
1129 nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1130 PageGetMaxOffsetNumber(current->page))
1131 {
1132 SpGistInitBuffer(current->buffer,
1133 SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1134 xlrec.initSrc = true;
1135 }
1136 else if (isNew)
1137 {
1138 /* don't expose the freshly init'd buffer as a backup block */
1139 Assert(nToDelete == 0);
1140 }
1141 else
1142 {
1143 xlrec.nDelete = nToDelete;
1144
1145 if (!state->isBuild)
1146 {
1147 /*
1148 * Need to create redirect tuple (it will point to new inner
1149 * tuple) but right now the new tuple's location is not known
1150 * yet. So, set the redirection pointer to "impossible" value
1151 * and remember its position to update tuple later.
1152 */
1153 if (nToDelete > 0)
1154 redirectTuplePos = toDelete[0];
1155 spgPageIndexMultiDelete(state, current->page,
1156 toDelete, nToDelete,
1157 SPGIST_REDIRECT,
1158 SPGIST_PLACEHOLDER,
1159 SPGIST_METAPAGE_BLKNO,
1160 FirstOffsetNumber);
1161 }
1162 else
1163 {
1164 /*
1165 * During index build there is not concurrent searches, so we
1166 * don't need to create redirection tuple.
1167 */
1168 spgPageIndexMultiDelete(state, current->page,
1169 toDelete, nToDelete,
1170 SPGIST_PLACEHOLDER,
1171 SPGIST_PLACEHOLDER,
1172 InvalidBlockNumber,
1173 InvalidOffsetNumber);
1174 }
1175 }
1176 }
1177
1178 /*
1179 * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1180 * nodes.
1181 */
1182 startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1183 for (i = 0; i < nToInsert; i++)
1184 {
1185 SpGistLeafTuple it = newLeafs[i];
1186 Buffer leafBuffer;
1187 BlockNumber leafBlock;
1188 OffsetNumber newoffset;
1189
1190 /* Which page is it going to? */
1191 leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1192 leafBlock = BufferGetBlockNumber(leafBuffer);
1193
1194 /* Link tuple into correct chain for its node */
1195 n = out.mapTuplesToNodes[i];
1196
1197 if (ItemPointerIsValid(&nodes[n]->t_tid))
1198 {
1199 Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1200 it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
1201 }
1202 else
1203 it->nextOffset = InvalidOffsetNumber;
1204
1205 /* Insert it on page */
1206 newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1207 (Item) it, it->size,
1208 &startOffsets[leafPageSelect[i]],
1209 false);
1210 toInsert[i] = newoffset;
1211
1212 /* ... and complete the chain linking */
1213 ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1214
1215 /* Also copy leaf tuple into WAL data */
1216 memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1217 leafptr += newLeafs[i]->size;
1218 }
1219
1220 /*
1221 * We're done modifying the other leaf buffer (if any), so mark it dirty.
1222 * current->buffer will be marked below, after we're entirely done
1223 * modifying it.
1224 */
1225 if (newLeafBuffer != InvalidBuffer)
1226 {
1227 MarkBufferDirty(newLeafBuffer);
1228 }
1229
1230 /* Remember current buffer, since we're about to change "current" */
1231 saveCurrent = *current;
1232
1233 /*
1234 * Store the new innerTuple
1235 */
1236 if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1237 {
1238 /*
1239 * new inner tuple goes to parent page
1240 */
1241 Assert(current->buffer != parent->buffer);
1242
1243 /* Repoint "current" at the new inner tuple */
1244 current->blkno = parent->blkno;
1245 current->buffer = parent->buffer;
1246 current->page = parent->page;
1247 xlrec.offnumInner = current->offnum =
1248 SpGistPageAddNewItem(state, current->page,
1249 (Item) innerTuple, innerTuple->size,
1250 NULL, false);
1251
1252 /*
1253 * Update parent node link and mark parent page dirty
1254 */
1255 xlrec.innerIsParent = true;
1256 xlrec.offnumParent = parent->offnum;
1257 xlrec.nodeI = parent->node;
1258 saveNodeLink(index, parent, current->blkno, current->offnum);
1259
1260 /*
1261 * Update redirection link (in old current buffer)
1262 */
1263 if (redirectTuplePos != InvalidOffsetNumber)
1264 setRedirectionTuple(&saveCurrent, redirectTuplePos,
1265 current->blkno, current->offnum);
1266
1267 /* Done modifying old current buffer, mark it dirty */
1268 MarkBufferDirty(saveCurrent.buffer);
1269 }
1270 else if (parent->buffer != InvalidBuffer)
1271 {
1272 /*
1273 * new inner tuple will be stored on a new page
1274 */
1275 Assert(newInnerBuffer != InvalidBuffer);
1276
1277 /* Repoint "current" at the new inner tuple */
1278 current->buffer = newInnerBuffer;
1279 current->blkno = BufferGetBlockNumber(current->buffer);
1280 current->page = BufferGetPage(current->buffer);
1281 xlrec.offnumInner = current->offnum =
1282 SpGistPageAddNewItem(state, current->page,
1283 (Item) innerTuple, innerTuple->size,
1284 NULL, false);
1285
1286 /* Done modifying new current buffer, mark it dirty */
1287 MarkBufferDirty(current->buffer);
1288
1289 /*
1290 * Update parent node link and mark parent page dirty
1291 */
1292 xlrec.innerIsParent = (parent->buffer == current->buffer);
1293 xlrec.offnumParent = parent->offnum;
1294 xlrec.nodeI = parent->node;
1295 saveNodeLink(index, parent, current->blkno, current->offnum);
1296
1297 /*
1298 * Update redirection link (in old current buffer)
1299 */
1300 if (redirectTuplePos != InvalidOffsetNumber)
1301 setRedirectionTuple(&saveCurrent, redirectTuplePos,
1302 current->blkno, current->offnum);
1303
1304 /* Done modifying old current buffer, mark it dirty */
1305 MarkBufferDirty(saveCurrent.buffer);
1306 }
1307 else
1308 {
1309 /*
1310 * Splitting root page, which was a leaf but now becomes inner page
1311 * (and so "current" continues to point at it)
1312 */
1313 Assert(SpGistBlockIsRoot(current->blkno));
1314 Assert(redirectTuplePos == InvalidOffsetNumber);
1315
1316 SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1317 xlrec.initInner = true;
1318 xlrec.innerIsParent = false;
1319
1320 xlrec.offnumInner = current->offnum =
1321 PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1322 InvalidOffsetNumber, false, false);
1323 if (current->offnum != FirstOffsetNumber)
1324 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1325 innerTuple->size);
1326
1327 /* No parent link to update, nor redirection to do */
1328 xlrec.offnumParent = InvalidOffsetNumber;
1329 xlrec.nodeI = 0;
1330
1331 /* Done modifying new current buffer, mark it dirty */
1332 MarkBufferDirty(current->buffer);
1333
1334 /* saveCurrent doesn't represent a different buffer */
1335 saveCurrent.buffer = InvalidBuffer;
1336 }
1337
1338 if (RelationNeedsWAL(index))
1339 {
1340 XLogRecPtr recptr;
1341 int flags;
1342
1343 XLogBeginInsert();
1344
1345 xlrec.nInsert = nToInsert;
1346 XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
1347
1348 XLogRegisterData((char *) toDelete,
1349 sizeof(OffsetNumber) * xlrec.nDelete);
1350 XLogRegisterData((char *) toInsert,
1351 sizeof(OffsetNumber) * xlrec.nInsert);
1352 XLogRegisterData((char *) leafPageSelect,
1353 sizeof(uint8) * xlrec.nInsert);
1354 XLogRegisterData((char *) innerTuple, innerTuple->size);
1355 XLogRegisterData(leafdata, leafptr - leafdata);
1356
1357 /* Old leaf page */
1358 if (BufferIsValid(saveCurrent.buffer))
1359 {
1360 flags = REGBUF_STANDARD;
1361 if (xlrec.initSrc)
1362 flags |= REGBUF_WILL_INIT;
1363 XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1364 }
1365
1366 /* New leaf page */
1367 if (BufferIsValid(newLeafBuffer))
1368 {
1369 flags = REGBUF_STANDARD;
1370 if (xlrec.initDest)
1371 flags |= REGBUF_WILL_INIT;
1372 XLogRegisterBuffer(1, newLeafBuffer, flags);
1373 }
1374
1375 /* Inner page */
1376 flags = REGBUF_STANDARD;
1377 if (xlrec.initInner)
1378 flags |= REGBUF_WILL_INIT;
1379 XLogRegisterBuffer(2, current->buffer, flags);
1380
1381 /* Parent page, if different from inner page */
1382 if (parent->buffer != InvalidBuffer)
1383 {
1384 if (parent->buffer != current->buffer)
1385 XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
1386 else
1387 Assert(xlrec.innerIsParent);
1388 }
1389
1390 /* Issue the WAL record */
1391 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1392
1393 /* Update page LSNs on all affected pages */
1394 if (newLeafBuffer != InvalidBuffer)
1395 {
1396 Page page = BufferGetPage(newLeafBuffer);
1397
1398 PageSetLSN(page, recptr);
1399 }
1400
1401 if (saveCurrent.buffer != InvalidBuffer)
1402 {
1403 Page page = BufferGetPage(saveCurrent.buffer);
1404
1405 PageSetLSN(page, recptr);
1406 }
1407
1408 PageSetLSN(current->page, recptr);
1409
1410 if (parent->buffer != InvalidBuffer)
1411 {
1412 PageSetLSN(parent->page, recptr);
1413 }
1414 }
1415
1416 END_CRIT_SECTION();
1417
1418 /* Update local free-space cache and unlock buffers */
1419 if (newLeafBuffer != InvalidBuffer)
1420 {
1421 SpGistSetLastUsedPage(index, newLeafBuffer);
1422 UnlockReleaseBuffer(newLeafBuffer);
1423 }
1424 if (saveCurrent.buffer != InvalidBuffer)
1425 {
1426 SpGistSetLastUsedPage(index, saveCurrent.buffer);
1427 UnlockReleaseBuffer(saveCurrent.buffer);
1428 }
1429
1430 return insertedNew;
1431 }
1432
1433 /*
1434 * spgMatchNode action: descend to N'th child node of current inner tuple
1435 */
1436 static void
spgMatchNodeAction(Relation index,SpGistState * state,SpGistInnerTuple innerTuple,SPPageDesc * current,SPPageDesc * parent,int nodeN)1437 spgMatchNodeAction(Relation index, SpGistState *state,
1438 SpGistInnerTuple innerTuple,
1439 SPPageDesc *current, SPPageDesc *parent, int nodeN)
1440 {
1441 int i;
1442 SpGistNodeTuple node;
1443
1444 /* Release previous parent buffer if any */
1445 if (parent->buffer != InvalidBuffer &&
1446 parent->buffer != current->buffer)
1447 {
1448 SpGistSetLastUsedPage(index, parent->buffer);
1449 UnlockReleaseBuffer(parent->buffer);
1450 }
1451
1452 /* Repoint parent to specified node of current inner tuple */
1453 parent->blkno = current->blkno;
1454 parent->buffer = current->buffer;
1455 parent->page = current->page;
1456 parent->offnum = current->offnum;
1457 parent->node = nodeN;
1458
1459 /* Locate that node */
1460 SGITITERATE(innerTuple, i, node)
1461 {
1462 if (i == nodeN)
1463 break;
1464 }
1465
1466 if (i != nodeN)
1467 elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1468 nodeN);
1469
1470 /* Point current to the downlink location, if any */
1471 if (ItemPointerIsValid(&node->t_tid))
1472 {
1473 current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1474 current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1475 }
1476 else
1477 {
1478 /* Downlink is empty, so we'll need to find a new page */
1479 current->blkno = InvalidBlockNumber;
1480 current->offnum = InvalidOffsetNumber;
1481 }
1482
1483 current->buffer = InvalidBuffer;
1484 current->page = NULL;
1485 }
1486
1487 /*
1488 * spgAddNode action: add a node to the inner tuple at current
1489 */
1490 static void
spgAddNodeAction(Relation index,SpGistState * state,SpGistInnerTuple innerTuple,SPPageDesc * current,SPPageDesc * parent,int nodeN,Datum nodeLabel)1491 spgAddNodeAction(Relation index, SpGistState *state,
1492 SpGistInnerTuple innerTuple,
1493 SPPageDesc *current, SPPageDesc *parent,
1494 int nodeN, Datum nodeLabel)
1495 {
1496 SpGistInnerTuple newInnerTuple;
1497 spgxlogAddNode xlrec;
1498
1499 /* Should not be applied to nulls */
1500 Assert(!SpGistPageStoresNulls(current->page));
1501
1502 /* Construct new inner tuple with additional node */
1503 newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1504
1505 /* Prepare WAL record */
1506 STORE_STATE(state, xlrec.stateSrc);
1507 xlrec.offnum = current->offnum;
1508
1509 /* we don't fill these unless we need to change the parent downlink */
1510 xlrec.parentBlk = -1;
1511 xlrec.offnumParent = InvalidOffsetNumber;
1512 xlrec.nodeI = 0;
1513
1514 /* we don't fill these unless tuple has to be moved */
1515 xlrec.offnumNew = InvalidOffsetNumber;
1516 xlrec.newPage = false;
1517
1518 if (PageGetExactFreeSpace(current->page) >=
1519 newInnerTuple->size - innerTuple->size)
1520 {
1521 /*
1522 * We can replace the inner tuple by new version in-place
1523 */
1524 START_CRIT_SECTION();
1525
1526 PageIndexTupleDelete(current->page, current->offnum);
1527 if (PageAddItem(current->page,
1528 (Item) newInnerTuple, newInnerTuple->size,
1529 current->offnum, false, false) != current->offnum)
1530 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1531 newInnerTuple->size);
1532
1533 MarkBufferDirty(current->buffer);
1534
1535 if (RelationNeedsWAL(index))
1536 {
1537 XLogRecPtr recptr;
1538
1539 XLogBeginInsert();
1540 XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1541 XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1542
1543 XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1544
1545 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1546
1547 PageSetLSN(current->page, recptr);
1548 }
1549
1550 END_CRIT_SECTION();
1551 }
1552 else
1553 {
1554 /*
1555 * move inner tuple to another page, and update parent
1556 */
1557 SpGistDeadTuple dt;
1558 SPPageDesc saveCurrent;
1559
1560 /*
1561 * It should not be possible to get here for the root page, since we
1562 * allow only one inner tuple on the root page, and spgFormInnerTuple
1563 * always checks that inner tuples don't exceed the size of a page.
1564 */
1565 if (SpGistBlockIsRoot(current->blkno))
1566 elog(ERROR, "cannot enlarge root tuple any more");
1567 Assert(parent->buffer != InvalidBuffer);
1568
1569 saveCurrent = *current;
1570
1571 xlrec.offnumParent = parent->offnum;
1572 xlrec.nodeI = parent->node;
1573
1574 /*
1575 * obtain new buffer with the same parity as current, since it will be
1576 * a child of same parent tuple
1577 */
1578 current->buffer = SpGistGetBuffer(index,
1579 GBUF_INNER_PARITY(current->blkno),
1580 newInnerTuple->size + sizeof(ItemIdData),
1581 &xlrec.newPage);
1582 current->blkno = BufferGetBlockNumber(current->buffer);
1583 current->page = BufferGetPage(current->buffer);
1584
1585 /*
1586 * Let's just make real sure new current isn't same as old. Right now
1587 * that's impossible, but if SpGistGetBuffer ever got smart enough to
1588 * delete placeholder tuples before checking space, maybe it wouldn't
1589 * be impossible. The case would appear to work except that WAL
1590 * replay would be subtly wrong, so I think a mere assert isn't enough
1591 * here.
1592 */
1593 if (current->blkno == saveCurrent.blkno)
1594 elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1595
1596 /*
1597 * New current and parent buffer will both be modified; but note that
1598 * parent buffer could be same as either new or old current.
1599 */
1600 if (parent->buffer == saveCurrent.buffer)
1601 xlrec.parentBlk = 0;
1602 else if (parent->buffer == current->buffer)
1603 xlrec.parentBlk = 1;
1604 else
1605 xlrec.parentBlk = 2;
1606
1607 START_CRIT_SECTION();
1608
1609 /* insert new ... */
1610 xlrec.offnumNew = current->offnum =
1611 SpGistPageAddNewItem(state, current->page,
1612 (Item) newInnerTuple, newInnerTuple->size,
1613 NULL, false);
1614
1615 MarkBufferDirty(current->buffer);
1616
1617 /* update parent's downlink and mark parent page dirty */
1618 saveNodeLink(index, parent, current->blkno, current->offnum);
1619
1620 /*
1621 * Replace old tuple with a placeholder or redirection tuple. Unless
1622 * doing an index build, we have to insert a redirection tuple for
1623 * possible concurrent scans. We can't just delete it in any case,
1624 * because that could change the offsets of other tuples on the page,
1625 * breaking downlinks from their parents.
1626 */
1627 if (state->isBuild)
1628 dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
1629 InvalidBlockNumber, InvalidOffsetNumber);
1630 else
1631 dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1632 current->blkno, current->offnum);
1633
1634 PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1635 if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1636 saveCurrent.offnum,
1637 false, false) != saveCurrent.offnum)
1638 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1639 dt->size);
1640
1641 if (state->isBuild)
1642 SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1643 else
1644 SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1645
1646 MarkBufferDirty(saveCurrent.buffer);
1647
1648 if (RelationNeedsWAL(index))
1649 {
1650 XLogRecPtr recptr;
1651 int flags;
1652
1653 XLogBeginInsert();
1654
1655 /* orig page */
1656 XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1657 /* new page */
1658 flags = REGBUF_STANDARD;
1659 if (xlrec.newPage)
1660 flags |= REGBUF_WILL_INIT;
1661 XLogRegisterBuffer(1, current->buffer, flags);
1662 /* parent page (if different from orig and new) */
1663 if (xlrec.parentBlk == 2)
1664 XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
1665
1666 XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1667 XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1668
1669 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1670
1671 /* we don't bother to check if any of these are redundant */
1672 PageSetLSN(current->page, recptr);
1673 PageSetLSN(parent->page, recptr);
1674 PageSetLSN(saveCurrent.page, recptr);
1675 }
1676
1677 END_CRIT_SECTION();
1678
1679 /* Release saveCurrent if it's not same as current or parent */
1680 if (saveCurrent.buffer != current->buffer &&
1681 saveCurrent.buffer != parent->buffer)
1682 {
1683 SpGistSetLastUsedPage(index, saveCurrent.buffer);
1684 UnlockReleaseBuffer(saveCurrent.buffer);
1685 }
1686 }
1687 }
1688
1689 /*
1690 * spgSplitNode action: split inner tuple at current into prefix and postfix
1691 */
1692 static void
spgSplitNodeAction(Relation index,SpGistState * state,SpGistInnerTuple innerTuple,SPPageDesc * current,spgChooseOut * out)1693 spgSplitNodeAction(Relation index, SpGistState *state,
1694 SpGistInnerTuple innerTuple,
1695 SPPageDesc *current, spgChooseOut *out)
1696 {
1697 SpGistInnerTuple prefixTuple,
1698 postfixTuple;
1699 SpGistNodeTuple node,
1700 *nodes;
1701 BlockNumber postfixBlkno;
1702 OffsetNumber postfixOffset;
1703 int i;
1704 spgxlogSplitTuple xlrec;
1705 Buffer newBuffer = InvalidBuffer;
1706
1707 /* Should not be applied to nulls */
1708 Assert(!SpGistPageStoresNulls(current->page));
1709
1710 /* Check opclass gave us sane values */
1711 if (out->result.splitTuple.prefixNNodes <= 0 ||
1712 out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
1713 elog(ERROR, "invalid number of prefix nodes: %d",
1714 out->result.splitTuple.prefixNNodes);
1715 if (out->result.splitTuple.childNodeN < 0 ||
1716 out->result.splitTuple.childNodeN >=
1717 out->result.splitTuple.prefixNNodes)
1718 elog(ERROR, "invalid child node number: %d",
1719 out->result.splitTuple.childNodeN);
1720
1721 /*
1722 * Construct new prefix tuple with requested number of nodes. We'll fill
1723 * in the childNodeN'th node's downlink below.
1724 */
1725 nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
1726 out->result.splitTuple.prefixNNodes);
1727
1728 for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
1729 {
1730 Datum label = (Datum) 0;
1731 bool labelisnull;
1732
1733 labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
1734 if (!labelisnull)
1735 label = out->result.splitTuple.prefixNodeLabels[i];
1736 nodes[i] = spgFormNodeTuple(state, label, labelisnull);
1737 }
1738
1739 prefixTuple = spgFormInnerTuple(state,
1740 out->result.splitTuple.prefixHasPrefix,
1741 out->result.splitTuple.prefixPrefixDatum,
1742 out->result.splitTuple.prefixNNodes,
1743 nodes);
1744
1745 /* it must fit in the space that innerTuple now occupies */
1746 if (prefixTuple->size > innerTuple->size)
1747 elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1748
1749 /*
1750 * Construct new postfix tuple, containing all nodes of innerTuple with
1751 * same node datums, but with the prefix specified by the picksplit
1752 * function.
1753 */
1754 nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1755 SGITITERATE(innerTuple, i, node)
1756 {
1757 nodes[i] = node;
1758 }
1759
1760 postfixTuple = spgFormInnerTuple(state,
1761 out->result.splitTuple.postfixHasPrefix,
1762 out->result.splitTuple.postfixPrefixDatum,
1763 innerTuple->nNodes, nodes);
1764
1765 /* Postfix tuple is allTheSame if original tuple was */
1766 postfixTuple->allTheSame = innerTuple->allTheSame;
1767
1768 /* prep data for WAL record */
1769 xlrec.newPage = false;
1770
1771 /*
1772 * If we can't fit both tuples on the current page, get a new page for the
1773 * postfix tuple. In particular, can't split to the root page.
1774 *
1775 * For the space calculation, note that prefixTuple replaces innerTuple
1776 * but postfixTuple will be a new entry.
1777 */
1778 if (SpGistBlockIsRoot(current->blkno) ||
1779 SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1780 prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1781 {
1782 /*
1783 * Choose page with next triple parity, because postfix tuple is a
1784 * child of prefix one
1785 */
1786 newBuffer = SpGistGetBuffer(index,
1787 GBUF_INNER_PARITY(current->blkno + 1),
1788 postfixTuple->size + sizeof(ItemIdData),
1789 &xlrec.newPage);
1790 }
1791
1792 START_CRIT_SECTION();
1793
1794 /*
1795 * Replace old tuple by prefix tuple
1796 */
1797 PageIndexTupleDelete(current->page, current->offnum);
1798 xlrec.offnumPrefix = PageAddItem(current->page,
1799 (Item) prefixTuple, prefixTuple->size,
1800 current->offnum, false, false);
1801 if (xlrec.offnumPrefix != current->offnum)
1802 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1803 prefixTuple->size);
1804
1805 /*
1806 * put postfix tuple into appropriate page
1807 */
1808 if (newBuffer == InvalidBuffer)
1809 {
1810 postfixBlkno = current->blkno;
1811 xlrec.offnumPostfix = postfixOffset =
1812 SpGistPageAddNewItem(state, current->page,
1813 (Item) postfixTuple, postfixTuple->size,
1814 NULL, false);
1815 xlrec.postfixBlkSame = true;
1816 }
1817 else
1818 {
1819 postfixBlkno = BufferGetBlockNumber(newBuffer);
1820 xlrec.offnumPostfix = postfixOffset =
1821 SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
1822 (Item) postfixTuple, postfixTuple->size,
1823 NULL, false);
1824 MarkBufferDirty(newBuffer);
1825 xlrec.postfixBlkSame = false;
1826 }
1827
1828 /*
1829 * And set downlink pointer in the prefix tuple to point to postfix tuple.
1830 * (We can't avoid this step by doing the above two steps in opposite
1831 * order, because there might not be enough space on the page to insert
1832 * the postfix tuple first.) We have to update the local copy of the
1833 * prefixTuple too, because that's what will be written to WAL.
1834 */
1835 spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1836 postfixBlkno, postfixOffset);
1837 prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1838 PageGetItemId(current->page, current->offnum));
1839 spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1840 postfixBlkno, postfixOffset);
1841
1842 MarkBufferDirty(current->buffer);
1843
1844 if (RelationNeedsWAL(index))
1845 {
1846 XLogRecPtr recptr;
1847
1848 XLogBeginInsert();
1849 XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1850 XLogRegisterData((char *) prefixTuple, prefixTuple->size);
1851 XLogRegisterData((char *) postfixTuple, postfixTuple->size);
1852
1853 XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1854 if (newBuffer != InvalidBuffer)
1855 {
1856 int flags;
1857
1858 flags = REGBUF_STANDARD;
1859 if (xlrec.newPage)
1860 flags |= REGBUF_WILL_INIT;
1861 XLogRegisterBuffer(1, newBuffer, flags);
1862 }
1863
1864 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1865
1866 PageSetLSN(current->page, recptr);
1867
1868 if (newBuffer != InvalidBuffer)
1869 {
1870 PageSetLSN(BufferGetPage(newBuffer), recptr);
1871 }
1872 }
1873
1874 END_CRIT_SECTION();
1875
1876 /* Update local free-space cache and release buffer */
1877 if (newBuffer != InvalidBuffer)
1878 {
1879 SpGistSetLastUsedPage(index, newBuffer);
1880 UnlockReleaseBuffer(newBuffer);
1881 }
1882 }
1883
1884 /*
1885 * Insert one item into the index.
1886 *
1887 * Returns true on success, false if we failed to complete the insertion
1888 * (typically because of conflict with a concurrent insert). In the latter
1889 * case, caller should re-call spgdoinsert() with the same args.
1890 */
1891 bool
spgdoinsert(Relation index,SpGistState * state,ItemPointer heapPtr,Datum datum,bool isnull)1892 spgdoinsert(Relation index, SpGistState *state,
1893 ItemPointer heapPtr, Datum datum, bool isnull)
1894 {
1895 bool result = true;
1896 int level = 0;
1897 Datum leafDatum;
1898 int leafSize;
1899 int bestLeafSize;
1900 int numNoProgressCycles = 0;
1901 SPPageDesc current,
1902 parent;
1903 FmgrInfo *procinfo = NULL;
1904
1905 /*
1906 * Look up FmgrInfo of the user-defined choose function once, to save
1907 * cycles in the loop below.
1908 */
1909 if (!isnull)
1910 procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1911
1912 /*
1913 * Since we don't use index_form_tuple in this AM, we have to make sure
1914 * value to be inserted is not toasted; FormIndexDatum doesn't guarantee
1915 * that.
1916 */
1917 if (!isnull && state->attType.attlen == -1)
1918 datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
1919
1920 leafDatum = datum;
1921
1922 /*
1923 * Compute space needed for a leaf tuple containing the given datum.
1924 *
1925 * If it isn't gonna fit, and the opclass can't reduce the datum size by
1926 * suffixing, bail out now rather than doing a lot of useless work.
1927 */
1928 if (!isnull)
1929 leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
1930 SpGistGetTypeSize(&state->attType, leafDatum);
1931 else
1932 leafSize = SGDTSIZE + sizeof(ItemIdData);
1933
1934 if (leafSize > SPGIST_PAGE_CAPACITY &&
1935 (isnull || !state->config.longValuesOK))
1936 ereport(ERROR,
1937 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1938 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
1939 leafSize - sizeof(ItemIdData),
1940 SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
1941 RelationGetRelationName(index)),
1942 errhint("Values larger than a buffer page cannot be indexed.")));
1943 bestLeafSize = leafSize;
1944
1945 /* Initialize "current" to the appropriate root page */
1946 current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
1947 current.buffer = InvalidBuffer;
1948 current.page = NULL;
1949 current.offnum = FirstOffsetNumber;
1950 current.node = -1;
1951
1952 /* "parent" is invalid for the moment */
1953 parent.blkno = InvalidBlockNumber;
1954 parent.buffer = InvalidBuffer;
1955 parent.page = NULL;
1956 parent.offnum = InvalidOffsetNumber;
1957 parent.node = -1;
1958
1959 /*
1960 * Before entering the loop, try to clear any pending interrupt condition.
1961 * If a query cancel is pending, we might as well accept it now not later;
1962 * while if a non-canceling condition is pending, servicing it here avoids
1963 * having to restart the insertion and redo all the work so far.
1964 */
1965 CHECK_FOR_INTERRUPTS();
1966
1967 for (;;)
1968 {
1969 bool isNew = false;
1970
1971 /*
1972 * Bail out if query cancel is pending. We must have this somewhere
1973 * in the loop since a broken opclass could produce an infinite
1974 * picksplit loop. However, because we'll be holding buffer lock(s)
1975 * after the first iteration, ProcessInterrupts() wouldn't be able to
1976 * throw a cancel error here. Hence, if we see that an interrupt is
1977 * pending, break out of the loop and deal with the situation below.
1978 * Set result = false because we must restart the insertion if the
1979 * interrupt isn't a query-cancel-or-die case.
1980 */
1981 if (INTERRUPTS_PENDING_CONDITION())
1982 {
1983 result = false;
1984 break;
1985 }
1986
1987 if (current.blkno == InvalidBlockNumber)
1988 {
1989 /*
1990 * Create a leaf page. If leafSize is too large to fit on a page,
1991 * we won't actually use the page yet, but it simplifies the API
1992 * for doPickSplit to always have a leaf page at hand; so just
1993 * quietly limit our request to a page size.
1994 */
1995 current.buffer =
1996 SpGistGetBuffer(index,
1997 GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
1998 Min(leafSize, SPGIST_PAGE_CAPACITY),
1999 &isNew);
2000 current.blkno = BufferGetBlockNumber(current.buffer);
2001 }
2002 else if (parent.buffer == InvalidBuffer)
2003 {
2004 /* we hold no parent-page lock, so no deadlock is possible */
2005 current.buffer = ReadBuffer(index, current.blkno);
2006 LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
2007 }
2008 else if (current.blkno != parent.blkno)
2009 {
2010 /* descend to a new child page */
2011 current.buffer = ReadBuffer(index, current.blkno);
2012
2013 /*
2014 * Attempt to acquire lock on child page. We must beware of
2015 * deadlock against another insertion process descending from that
2016 * page to our parent page (see README). If we fail to get lock,
2017 * abandon the insertion and tell our caller to start over.
2018 *
2019 * XXX this could be improved, because failing to get lock on a
2020 * buffer is not proof of a deadlock situation; the lock might be
2021 * held by a reader, or even just background writer/checkpointer
2022 * process. Perhaps it'd be worth retrying after sleeping a bit?
2023 */
2024 if (!ConditionalLockBuffer(current.buffer))
2025 {
2026 ReleaseBuffer(current.buffer);
2027 UnlockReleaseBuffer(parent.buffer);
2028 return false;
2029 }
2030 }
2031 else
2032 {
2033 /* inner tuple can be stored on the same page as parent one */
2034 current.buffer = parent.buffer;
2035 }
2036 current.page = BufferGetPage(current.buffer);
2037
2038 /* should not arrive at a page of the wrong type */
2039 if (isnull ? !SpGistPageStoresNulls(current.page) :
2040 SpGistPageStoresNulls(current.page))
2041 elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2042 current.blkno);
2043
2044 if (SpGistPageIsLeaf(current.page))
2045 {
2046 SpGistLeafTuple leafTuple;
2047 int nToSplit,
2048 sizeToSplit;
2049
2050 leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
2051 if (leafTuple->size + sizeof(ItemIdData) <=
2052 SpGistPageGetFreeSpace(current.page, 1))
2053 {
2054 /* it fits on page, so insert it and we're done */
2055 addLeafTuple(index, state, leafTuple,
2056 ¤t, &parent, isnull, isNew);
2057 break;
2058 }
2059 else if ((sizeToSplit =
2060 checkSplitConditions(index, state, ¤t,
2061 &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
2062 nToSplit < 64 &&
2063 leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2064 {
2065 /*
2066 * the amount of data is pretty small, so just move the whole
2067 * chain to another leaf page rather than splitting it.
2068 */
2069 Assert(!isNew);
2070 moveLeafs(index, state, ¤t, &parent, leafTuple, isnull);
2071 break; /* we're done */
2072 }
2073 else
2074 {
2075 /* picksplit */
2076 if (doPickSplit(index, state, ¤t, &parent,
2077 leafTuple, level, isnull, isNew))
2078 break; /* doPickSplit installed new tuples */
2079
2080 /* leaf tuple will not be inserted yet */
2081 pfree(leafTuple);
2082
2083 /*
2084 * current now describes new inner tuple, go insert into it
2085 */
2086 Assert(!SpGistPageIsLeaf(current.page));
2087 goto process_inner_tuple;
2088 }
2089 }
2090 else /* non-leaf page */
2091 {
2092 /*
2093 * Apply the opclass choose function to figure out how to insert
2094 * the given datum into the current inner tuple.
2095 */
2096 SpGistInnerTuple innerTuple;
2097 spgChooseIn in;
2098 spgChooseOut out;
2099
2100 /*
2101 * spgAddNode and spgSplitTuple cases will loop back to here to
2102 * complete the insertion operation. Just in case the choose
2103 * function is broken and produces add or split requests
2104 * repeatedly, check for query cancel (see comments above).
2105 */
2106 process_inner_tuple:
2107 if (INTERRUPTS_PENDING_CONDITION())
2108 {
2109 result = false;
2110 break;
2111 }
2112
2113 innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2114 PageGetItemId(current.page, current.offnum));
2115
2116 in.datum = datum;
2117 in.leafDatum = leafDatum;
2118 in.level = level;
2119 in.allTheSame = innerTuple->allTheSame;
2120 in.hasPrefix = (innerTuple->prefixSize > 0);
2121 in.prefixDatum = SGITDATUM(innerTuple, state);
2122 in.nNodes = innerTuple->nNodes;
2123 in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2124
2125 memset(&out, 0, sizeof(out));
2126
2127 if (!isnull)
2128 {
2129 /* use user-defined choose method */
2130 FunctionCall2Coll(procinfo,
2131 index->rd_indcollation[0],
2132 PointerGetDatum(&in),
2133 PointerGetDatum(&out));
2134 }
2135 else
2136 {
2137 /* force "match" action (to insert to random subnode) */
2138 out.resultType = spgMatchNode;
2139 }
2140
2141 if (innerTuple->allTheSame)
2142 {
2143 /*
2144 * It's not allowed to do an AddNode at an allTheSame tuple.
2145 * Opclass must say "match", in which case we choose a random
2146 * one of the nodes to descend into, or "split".
2147 */
2148 if (out.resultType == spgAddNode)
2149 elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2150 else if (out.resultType == spgMatchNode)
2151 out.result.matchNode.nodeN = random() % innerTuple->nNodes;
2152 }
2153
2154 switch (out.resultType)
2155 {
2156 case spgMatchNode:
2157 /* Descend to N'th child node */
2158 spgMatchNodeAction(index, state, innerTuple,
2159 ¤t, &parent,
2160 out.result.matchNode.nodeN);
2161 /* Adjust level as per opclass request */
2162 level += out.result.matchNode.levelAdd;
2163 /* Replace leafDatum and recompute leafSize */
2164 if (!isnull)
2165 {
2166 leafDatum = out.result.matchNode.restDatum;
2167 leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
2168 SpGistGetTypeSize(&state->attType, leafDatum);
2169 }
2170
2171 /*
2172 * Check new tuple size; fail if it can't fit, unless the
2173 * opclass says it can handle the situation by suffixing.
2174 *
2175 * A buggy opclass might not ever make the leaf datum
2176 * small enough, causing an infinite loop. To detect such
2177 * a loop, check to see if we are making progress by
2178 * reducing the leafSize in each pass. This is a bit
2179 * tricky though. Because of alignment considerations,
2180 * the total tuple size might not decrease on every pass.
2181 * Also, there are edge cases where the choose method
2182 * might seem to not make progress for a cycle or two.
2183 * Somewhat arbitrarily, we allow up to 10 no-progress
2184 * iterations before failing. (This limit should be more
2185 * than MAXALIGN, to accommodate opclasses that trim one
2186 * byte from the leaf datum per pass.)
2187 */
2188 if (leafSize > SPGIST_PAGE_CAPACITY)
2189 {
2190 bool ok = false;
2191
2192 if (state->config.longValuesOK && !isnull)
2193 {
2194 if (leafSize < bestLeafSize)
2195 {
2196 ok = true;
2197 bestLeafSize = leafSize;
2198 numNoProgressCycles = 0;
2199 }
2200 else if (++numNoProgressCycles < 10)
2201 ok = true;
2202 }
2203 if (!ok)
2204 ereport(ERROR,
2205 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2206 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2207 leafSize - sizeof(ItemIdData),
2208 SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2209 RelationGetRelationName(index)),
2210 errhint("Values larger than a buffer page cannot be indexed.")));
2211 }
2212
2213 /*
2214 * Loop around and attempt to insert the new leafDatum at
2215 * "current" (which might reference an existing child
2216 * tuple, or might be invalid to force us to find a new
2217 * page for the tuple).
2218 */
2219 break;
2220 case spgAddNode:
2221 /* AddNode is not sensible if nodes don't have labels */
2222 if (in.nodeLabels == NULL)
2223 elog(ERROR, "cannot add a node to an inner tuple without node labels");
2224 /* Add node to inner tuple, per request */
2225 spgAddNodeAction(index, state, innerTuple,
2226 ¤t, &parent,
2227 out.result.addNode.nodeN,
2228 out.result.addNode.nodeLabel);
2229
2230 /*
2231 * Retry insertion into the enlarged node. We assume that
2232 * we'll get a MatchNode result this time.
2233 */
2234 goto process_inner_tuple;
2235 break;
2236 case spgSplitTuple:
2237 /* Split inner tuple, per request */
2238 spgSplitNodeAction(index, state, innerTuple,
2239 ¤t, &out);
2240
2241 /* Retry insertion into the split node */
2242 goto process_inner_tuple;
2243 break;
2244 default:
2245 elog(ERROR, "unrecognized SPGiST choose result: %d",
2246 (int) out.resultType);
2247 break;
2248 }
2249 }
2250 } /* end loop */
2251
2252 /*
2253 * Release any buffers we're still holding. Beware of possibility that
2254 * current and parent reference same buffer.
2255 */
2256 if (current.buffer != InvalidBuffer)
2257 {
2258 SpGistSetLastUsedPage(index, current.buffer);
2259 UnlockReleaseBuffer(current.buffer);
2260 }
2261 if (parent.buffer != InvalidBuffer &&
2262 parent.buffer != current.buffer)
2263 {
2264 SpGistSetLastUsedPage(index, parent.buffer);
2265 UnlockReleaseBuffer(parent.buffer);
2266 }
2267
2268 /*
2269 * We do not support being called while some outer function is holding a
2270 * buffer lock (or any other reason to postpone query cancels). If that
2271 * were the case, telling the caller to retry would create an infinite
2272 * loop.
2273 */
2274 Assert(INTERRUPTS_CAN_BE_PROCESSED());
2275
2276 /*
2277 * Finally, check for interrupts again. If there was a query cancel,
2278 * ProcessInterrupts() will be able to throw the error here. If it was
2279 * some other kind of interrupt that can just be cleared, return false to
2280 * tell our caller to retry.
2281 */
2282 CHECK_FOR_INTERRUPTS();
2283
2284 return result;
2285 }
2286