1 /*-------------------------------------------------------------------------
2 *
3 * spgdoinsert.c
4 * implementation of insert algorithm
5 *
6 *
7 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/spgist/spgdoinsert.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16 #include "postgres.h"
17
18 #include "access/genam.h"
19 #include "access/spgist_private.h"
20 #include "access/spgxlog.h"
21 #include "access/xloginsert.h"
22 #include "miscadmin.h"
23 #include "storage/bufmgr.h"
24 #include "utils/rel.h"
25
26
27 /*
28 * SPPageDesc tracks all info about a page we are inserting into. In some
29 * situations it actually identifies a tuple, or even a specific node within
30 * an inner tuple. But any of the fields can be invalid. If the buffer
31 * field is valid, it implies we hold pin and exclusive lock on that buffer.
32 * page pointer should be valid exactly when buffer is.
33 */
34 typedef struct SPPageDesc
35 {
36 BlockNumber blkno; /* block number, or InvalidBlockNumber */
37 Buffer buffer; /* page's buffer number, or InvalidBuffer */
38 Page page; /* pointer to page buffer, or NULL */
39 OffsetNumber offnum; /* offset of tuple, or InvalidOffsetNumber */
40 int node; /* node number within inner tuple, or -1 */
41 } SPPageDesc;
42
43
44 /*
45 * Set the item pointer in the nodeN'th entry in inner tuple tup. This
46 * is used to update the parent inner tuple's downlink after a move or
47 * split operation.
48 */
49 void
spgUpdateNodeLink(SpGistInnerTuple tup,int nodeN,BlockNumber blkno,OffsetNumber offset)50 spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
51 BlockNumber blkno, OffsetNumber offset)
52 {
53 int i;
54 SpGistNodeTuple node;
55
56 SGITITERATE(tup, i, node)
57 {
58 if (i == nodeN)
59 {
60 ItemPointerSet(&node->t_tid, blkno, offset);
61 return;
62 }
63 }
64
65 elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
66 nodeN);
67 }
68
69 /*
70 * Form a new inner tuple containing one more node than the given one, with
71 * the specified label datum, inserted at offset "offset" in the node array.
72 * The new tuple's prefix is the same as the old one's.
73 *
74 * Note that the new node initially has an invalid downlink. We'll find a
75 * page to point it to later.
76 */
77 static SpGistInnerTuple
addNode(SpGistState * state,SpGistInnerTuple tuple,Datum label,int offset)78 addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
79 {
80 SpGistNodeTuple node,
81 *nodes;
82 int i;
83
84 /* if offset is negative, insert at end */
85 if (offset < 0)
86 offset = tuple->nNodes;
87 else if (offset > tuple->nNodes)
88 elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
89
90 nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
91 SGITITERATE(tuple, i, node)
92 {
93 if (i < offset)
94 nodes[i] = node;
95 else
96 nodes[i + 1] = node;
97 }
98
99 nodes[offset] = spgFormNodeTuple(state, label, false);
100
101 return spgFormInnerTuple(state,
102 (tuple->prefixSize > 0),
103 SGITDATUM(tuple, state),
104 tuple->nNodes + 1,
105 nodes);
106 }
107
108 /* qsort comparator for sorting OffsetNumbers */
109 static int
cmpOffsetNumbers(const void * a,const void * b)110 cmpOffsetNumbers(const void *a, const void *b)
111 {
112 if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
113 return 0;
114 return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
115 }
116
117 /*
118 * Delete multiple tuples from an index page, preserving tuple offset numbers.
119 *
120 * The first tuple in the given list is replaced with a dead tuple of type
121 * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
122 * with dead tuples of type "reststate". If either firststate or reststate
123 * is REDIRECT, blkno/offnum specify where to link to.
124 *
125 * NB: this is used during WAL replay, so beware of trying to make it too
126 * smart. In particular, it shouldn't use "state" except for calling
127 * spgFormDeadTuple(). This is also used in a critical section, so no
128 * pallocs either!
129 */
130 void
spgPageIndexMultiDelete(SpGistState * state,Page page,OffsetNumber * itemnos,int nitems,int firststate,int reststate,BlockNumber blkno,OffsetNumber offnum)131 spgPageIndexMultiDelete(SpGistState *state, Page page,
132 OffsetNumber *itemnos, int nitems,
133 int firststate, int reststate,
134 BlockNumber blkno, OffsetNumber offnum)
135 {
136 OffsetNumber firstItem;
137 OffsetNumber sortednos[MaxIndexTuplesPerPage];
138 SpGistDeadTuple tuple = NULL;
139 int i;
140
141 if (nitems == 0)
142 return; /* nothing to do */
143
144 /*
145 * For efficiency we want to use PageIndexMultiDelete, which requires the
146 * targets to be listed in sorted order, so we have to sort the itemnos
147 * array. (This also greatly simplifies the math for reinserting the
148 * replacement tuples.) However, we must not scribble on the caller's
149 * array, so we have to make a copy.
150 */
151 memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
152 if (nitems > 1)
153 qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
154
155 PageIndexMultiDelete(page, sortednos, nitems);
156
157 firstItem = itemnos[0];
158
159 for (i = 0; i < nitems; i++)
160 {
161 OffsetNumber itemno = sortednos[i];
162 int tupstate;
163
164 tupstate = (itemno == firstItem) ? firststate : reststate;
165 if (tuple == NULL || tuple->tupstate != tupstate)
166 tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
167
168 if (PageAddItem(page, (Item) tuple, tuple->size,
169 itemno, false, false) != itemno)
170 elog(ERROR, "failed to add item of size %u to SPGiST index page",
171 tuple->size);
172
173 if (tupstate == SPGIST_REDIRECT)
174 SpGistPageGetOpaque(page)->nRedirection++;
175 else if (tupstate == SPGIST_PLACEHOLDER)
176 SpGistPageGetOpaque(page)->nPlaceholder++;
177 }
178 }
179
180 /*
181 * Update the parent inner tuple's downlink, and mark the parent buffer
182 * dirty (this must be the last change to the parent page in the current
183 * WAL action).
184 */
185 static void
saveNodeLink(Relation index,SPPageDesc * parent,BlockNumber blkno,OffsetNumber offnum)186 saveNodeLink(Relation index, SPPageDesc *parent,
187 BlockNumber blkno, OffsetNumber offnum)
188 {
189 SpGistInnerTuple innerTuple;
190
191 innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
192 PageGetItemId(parent->page, parent->offnum));
193
194 spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
195
196 MarkBufferDirty(parent->buffer);
197 }
198
199 /*
200 * Add a leaf tuple to a leaf page where there is known to be room for it
201 */
202 static void
addLeafTuple(Relation index,SpGistState * state,SpGistLeafTuple leafTuple,SPPageDesc * current,SPPageDesc * parent,bool isNulls,bool isNew)203 addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
204 SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
205 {
206 spgxlogAddLeaf xlrec;
207
208 xlrec.newPage = isNew;
209 xlrec.storesNulls = isNulls;
210
211 /* these will be filled below as needed */
212 xlrec.offnumLeaf = InvalidOffsetNumber;
213 xlrec.offnumHeadLeaf = InvalidOffsetNumber;
214 xlrec.offnumParent = InvalidOffsetNumber;
215 xlrec.nodeI = 0;
216
217 START_CRIT_SECTION();
218
219 if (current->offnum == InvalidOffsetNumber ||
220 SpGistBlockIsRoot(current->blkno))
221 {
222 /* Tuple is not part of a chain */
223 SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
224 current->offnum = SpGistPageAddNewItem(state, current->page,
225 (Item) leafTuple, leafTuple->size,
226 NULL, false);
227
228 xlrec.offnumLeaf = current->offnum;
229
230 /* Must update parent's downlink if any */
231 if (parent->buffer != InvalidBuffer)
232 {
233 xlrec.offnumParent = parent->offnum;
234 xlrec.nodeI = parent->node;
235
236 saveNodeLink(index, parent, current->blkno, current->offnum);
237 }
238 }
239 else
240 {
241 /*
242 * Tuple must be inserted into existing chain. We mustn't change the
243 * chain's head address, but we don't need to chase the entire chain
244 * to put the tuple at the end; we can insert it second.
245 *
246 * Also, it's possible that the "chain" consists only of a DEAD tuple,
247 * in which case we should replace the DEAD tuple in-place.
248 */
249 SpGistLeafTuple head;
250 OffsetNumber offnum;
251
252 head = (SpGistLeafTuple) PageGetItem(current->page,
253 PageGetItemId(current->page, current->offnum));
254 if (head->tupstate == SPGIST_LIVE)
255 {
256 SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head));
257 offnum = SpGistPageAddNewItem(state, current->page,
258 (Item) leafTuple, leafTuple->size,
259 NULL, false);
260
261 /*
262 * re-get head of list because it could have been moved on page,
263 * and set new second element
264 */
265 head = (SpGistLeafTuple) PageGetItem(current->page,
266 PageGetItemId(current->page, current->offnum));
267 SGLT_SET_NEXTOFFSET(head, offnum);
268
269 xlrec.offnumLeaf = offnum;
270 xlrec.offnumHeadLeaf = current->offnum;
271 }
272 else if (head->tupstate == SPGIST_DEAD)
273 {
274 SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
275 PageIndexTupleDelete(current->page, current->offnum);
276 if (PageAddItem(current->page,
277 (Item) leafTuple, leafTuple->size,
278 current->offnum, false, false) != current->offnum)
279 elog(ERROR, "failed to add item of size %u to SPGiST index page",
280 leafTuple->size);
281
282 /* WAL replay distinguishes this case by equal offnums */
283 xlrec.offnumLeaf = current->offnum;
284 xlrec.offnumHeadLeaf = current->offnum;
285 }
286 else
287 elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
288 }
289
290 MarkBufferDirty(current->buffer);
291
292 if (RelationNeedsWAL(index) && !state->isBuild)
293 {
294 XLogRecPtr recptr;
295 int flags;
296
297 XLogBeginInsert();
298 XLogRegisterData((char *) &xlrec, sizeof(xlrec));
299 XLogRegisterData((char *) leafTuple, leafTuple->size);
300
301 flags = REGBUF_STANDARD;
302 if (xlrec.newPage)
303 flags |= REGBUF_WILL_INIT;
304 XLogRegisterBuffer(0, current->buffer, flags);
305 if (xlrec.offnumParent != InvalidOffsetNumber)
306 XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
307
308 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
309
310 PageSetLSN(current->page, recptr);
311
312 /* update parent only if we actually changed it */
313 if (xlrec.offnumParent != InvalidOffsetNumber)
314 {
315 PageSetLSN(parent->page, recptr);
316 }
317 }
318
319 END_CRIT_SECTION();
320 }
321
322 /*
323 * Count the number and total size of leaf tuples in the chain starting at
324 * current->offnum. Return number into *nToSplit and total size as function
325 * result.
326 *
327 * Klugy special case when considering the root page (i.e., root is a leaf
328 * page, but we're about to split for the first time): return fake large
329 * values to force spgdoinsert() to take the doPickSplit rather than
330 * moveLeafs code path. moveLeafs is not prepared to deal with root page.
331 */
332 static int
checkSplitConditions(Relation index,SpGistState * state,SPPageDesc * current,int * nToSplit)333 checkSplitConditions(Relation index, SpGistState *state,
334 SPPageDesc *current, int *nToSplit)
335 {
336 int i,
337 n = 0,
338 totalSize = 0;
339
340 if (SpGistBlockIsRoot(current->blkno))
341 {
342 /* return impossible values to force split */
343 *nToSplit = BLCKSZ;
344 return BLCKSZ;
345 }
346
347 i = current->offnum;
348 while (i != InvalidOffsetNumber)
349 {
350 SpGistLeafTuple it;
351
352 Assert(i >= FirstOffsetNumber &&
353 i <= PageGetMaxOffsetNumber(current->page));
354 it = (SpGistLeafTuple) PageGetItem(current->page,
355 PageGetItemId(current->page, i));
356 if (it->tupstate == SPGIST_LIVE)
357 {
358 n++;
359 totalSize += it->size + sizeof(ItemIdData);
360 }
361 else if (it->tupstate == SPGIST_DEAD)
362 {
363 /* We could see a DEAD tuple as first/only chain item */
364 Assert(i == current->offnum);
365 Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
366 /* Don't count it in result, because it won't go to other page */
367 }
368 else
369 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
370
371 i = SGLT_GET_NEXTOFFSET(it);
372 }
373
374 *nToSplit = n;
375
376 return totalSize;
377 }
378
379 /*
380 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
381 * but the chain has to be moved because there's not enough room to add
382 * newLeafTuple to its page. We use this method when the chain contains
383 * very little data so a split would be inefficient. We are sure we can
384 * fit the chain plus newLeafTuple on one other page.
385 */
386 static void
moveLeafs(Relation index,SpGistState * state,SPPageDesc * current,SPPageDesc * parent,SpGistLeafTuple newLeafTuple,bool isNulls)387 moveLeafs(Relation index, SpGistState *state,
388 SPPageDesc *current, SPPageDesc *parent,
389 SpGistLeafTuple newLeafTuple, bool isNulls)
390 {
391 int i,
392 nDelete,
393 nInsert,
394 size;
395 Buffer nbuf;
396 Page npage;
397 SpGistLeafTuple it;
398 OffsetNumber r = InvalidOffsetNumber,
399 startOffset = InvalidOffsetNumber;
400 bool replaceDead = false;
401 OffsetNumber *toDelete;
402 OffsetNumber *toInsert;
403 BlockNumber nblkno;
404 spgxlogMoveLeafs xlrec;
405 char *leafdata,
406 *leafptr;
407
408 /* This doesn't work on root page */
409 Assert(parent->buffer != InvalidBuffer);
410 Assert(parent->buffer != current->buffer);
411
412 /* Locate the tuples to be moved, and count up the space needed */
413 i = PageGetMaxOffsetNumber(current->page);
414 toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
415 toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
416
417 size = newLeafTuple->size + sizeof(ItemIdData);
418
419 nDelete = 0;
420 i = current->offnum;
421 while (i != InvalidOffsetNumber)
422 {
423 SpGistLeafTuple it;
424
425 Assert(i >= FirstOffsetNumber &&
426 i <= PageGetMaxOffsetNumber(current->page));
427 it = (SpGistLeafTuple) PageGetItem(current->page,
428 PageGetItemId(current->page, i));
429
430 if (it->tupstate == SPGIST_LIVE)
431 {
432 toDelete[nDelete] = i;
433 size += it->size + sizeof(ItemIdData);
434 nDelete++;
435 }
436 else if (it->tupstate == SPGIST_DEAD)
437 {
438 /* We could see a DEAD tuple as first/only chain item */
439 Assert(i == current->offnum);
440 Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
441 /* We don't want to move it, so don't count it in size */
442 toDelete[nDelete] = i;
443 nDelete++;
444 replaceDead = true;
445 }
446 else
447 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
448
449 i = SGLT_GET_NEXTOFFSET(it);
450 }
451
452 /* Find a leaf page that will hold them */
453 nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
454 size, &xlrec.newPage);
455 npage = BufferGetPage(nbuf);
456 nblkno = BufferGetBlockNumber(nbuf);
457 Assert(nblkno != current->blkno);
458
459 leafdata = leafptr = palloc(size);
460
461 START_CRIT_SECTION();
462
463 /* copy all the old tuples to new page, unless they're dead */
464 nInsert = 0;
465 if (!replaceDead)
466 {
467 for (i = 0; i < nDelete; i++)
468 {
469 it = (SpGistLeafTuple) PageGetItem(current->page,
470 PageGetItemId(current->page, toDelete[i]));
471 Assert(it->tupstate == SPGIST_LIVE);
472
473 /*
474 * Update chain link (notice the chain order gets reversed, but we
475 * don't care). We're modifying the tuple on the source page
476 * here, but it's okay since we're about to delete it.
477 */
478 SGLT_SET_NEXTOFFSET(it, r);
479
480 r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
481 &startOffset, false);
482
483 toInsert[nInsert] = r;
484 nInsert++;
485
486 /* save modified tuple into leafdata as well */
487 memcpy(leafptr, it, it->size);
488 leafptr += it->size;
489 }
490 }
491
492 /* add the new tuple as well */
493 SGLT_SET_NEXTOFFSET(newLeafTuple, r);
494 r = SpGistPageAddNewItem(state, npage,
495 (Item) newLeafTuple, newLeafTuple->size,
496 &startOffset, false);
497 toInsert[nInsert] = r;
498 nInsert++;
499 memcpy(leafptr, newLeafTuple, newLeafTuple->size);
500 leafptr += newLeafTuple->size;
501
502 /*
503 * Now delete the old tuples, leaving a redirection pointer behind for the
504 * first one, unless we're doing an index build; in which case there can't
505 * be any concurrent scan so we need not provide a redirect.
506 */
507 spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
508 state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
509 SPGIST_PLACEHOLDER,
510 nblkno, r);
511
512 /* Update parent's downlink and mark parent page dirty */
513 saveNodeLink(index, parent, nblkno, r);
514
515 /* Mark the leaf pages too */
516 MarkBufferDirty(current->buffer);
517 MarkBufferDirty(nbuf);
518
519 if (RelationNeedsWAL(index) && !state->isBuild)
520 {
521 XLogRecPtr recptr;
522
523 /* prepare WAL info */
524 STORE_STATE(state, xlrec.stateSrc);
525
526 xlrec.nMoves = nDelete;
527 xlrec.replaceDead = replaceDead;
528 xlrec.storesNulls = isNulls;
529
530 xlrec.offnumParent = parent->offnum;
531 xlrec.nodeI = parent->node;
532
533 XLogBeginInsert();
534 XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
535 XLogRegisterData((char *) toDelete,
536 sizeof(OffsetNumber) * nDelete);
537 XLogRegisterData((char *) toInsert,
538 sizeof(OffsetNumber) * nInsert);
539 XLogRegisterData((char *) leafdata, leafptr - leafdata);
540
541 XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
542 XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
543 XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
544
545 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
546
547 PageSetLSN(current->page, recptr);
548 PageSetLSN(npage, recptr);
549 PageSetLSN(parent->page, recptr);
550 }
551
552 END_CRIT_SECTION();
553
554 /* Update local free-space cache and release new buffer */
555 SpGistSetLastUsedPage(index, nbuf);
556 UnlockReleaseBuffer(nbuf);
557 }
558
559 /*
560 * Update previously-created redirection tuple with appropriate destination
561 *
562 * We use this when it's not convenient to know the destination first.
563 * The tuple should have been made with the "impossible" destination of
564 * the metapage.
565 */
566 static void
setRedirectionTuple(SPPageDesc * current,OffsetNumber position,BlockNumber blkno,OffsetNumber offnum)567 setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
568 BlockNumber blkno, OffsetNumber offnum)
569 {
570 SpGistDeadTuple dt;
571
572 dt = (SpGistDeadTuple) PageGetItem(current->page,
573 PageGetItemId(current->page, position));
574 Assert(dt->tupstate == SPGIST_REDIRECT);
575 Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
576 ItemPointerSet(&dt->pointer, blkno, offnum);
577 }
578
579 /*
580 * Test to see if the user-defined picksplit function failed to do its job,
581 * ie, it put all the leaf tuples into the same node.
582 * If so, randomly divide the tuples into several nodes (all with the same
583 * label) and return true to select allTheSame mode for this inner tuple.
584 *
585 * (This code is also used to forcibly select allTheSame mode for nulls.)
586 *
587 * If we know that the leaf tuples wouldn't all fit on one page, then we
588 * exclude the last tuple (which is the incoming new tuple that forced a split)
589 * from the check to see if more than one node is used. The reason for this
590 * is that if the existing tuples are put into only one chain, then even if
591 * we move them all to an empty page, there would still not be room for the
592 * new tuple, so we'd get into an infinite loop of picksplit attempts.
593 * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
594 * be split across pages. (Exercise for the reader: figure out why this
595 * fixes the problem even when there is only one old tuple.)
596 */
597 static bool
checkAllTheSame(spgPickSplitIn * in,spgPickSplitOut * out,bool tooBig,bool * includeNew)598 checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
599 bool *includeNew)
600 {
601 int theNode;
602 int limit;
603 int i;
604
605 /* For the moment, assume we can include the new leaf tuple */
606 *includeNew = true;
607
608 /* If there's only the new leaf tuple, don't select allTheSame mode */
609 if (in->nTuples <= 1)
610 return false;
611
612 /* If tuple set doesn't fit on one page, ignore the new tuple in test */
613 limit = tooBig ? in->nTuples - 1 : in->nTuples;
614
615 /* Check to see if more than one node is populated */
616 theNode = out->mapTuplesToNodes[0];
617 for (i = 1; i < limit; i++)
618 {
619 if (out->mapTuplesToNodes[i] != theNode)
620 return false;
621 }
622
623 /* Nope, so override the picksplit function's decisions */
624
625 /* If the new tuple is in its own node, it can't be included in split */
626 if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
627 *includeNew = false;
628
629 out->nNodes = 8; /* arbitrary number of child nodes */
630
631 /* Random assignment of tuples to nodes (note we include new tuple) */
632 for (i = 0; i < in->nTuples; i++)
633 out->mapTuplesToNodes[i] = i % out->nNodes;
634
635 /* The opclass may not use node labels, but if it does, duplicate 'em */
636 if (out->nodeLabels)
637 {
638 Datum theLabel = out->nodeLabels[theNode];
639
640 out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
641 for (i = 0; i < out->nNodes; i++)
642 out->nodeLabels[i] = theLabel;
643 }
644
645 /* We don't touch the prefix or the leaf tuple datum assignments */
646
647 return true;
648 }
649
650 /*
651 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
652 * but the chain has to be split because there's not enough room to add
653 * newLeafTuple to its page.
654 *
655 * This function splits the leaf tuple set according to picksplit's rules,
656 * creating one or more new chains that are spread across the current page
657 * and an additional leaf page (we assume that two leaf pages will be
658 * sufficient). A new inner tuple is created, and the parent downlink
659 * pointer is updated to point to that inner tuple instead of the leaf chain.
660 *
661 * On exit, current contains the address of the new inner tuple.
662 *
663 * Returns true if we successfully inserted newLeafTuple during this function,
664 * false if caller still has to do it (meaning another picksplit operation is
665 * probably needed). Failure could occur if the picksplit result is fairly
666 * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
667 * Because we force the picksplit result to be at least two chains, each
668 * cycle will get rid of at least one leaf tuple from the chain, so the loop
669 * will eventually terminate if lack of balance is the issue. If the tuple
670 * is too big, we assume that repeated picksplit operations will eventually
671 * make it small enough by repeated prefix-stripping. A broken opclass could
672 * make this an infinite loop, though, so spgdoinsert() checks that the
673 * leaf datums get smaller each time.
674 */
675 static bool
doPickSplit(Relation index,SpGistState * state,SPPageDesc * current,SPPageDesc * parent,SpGistLeafTuple newLeafTuple,int level,bool isNulls,bool isNew)676 doPickSplit(Relation index, SpGistState *state,
677 SPPageDesc *current, SPPageDesc *parent,
678 SpGistLeafTuple newLeafTuple,
679 int level, bool isNulls, bool isNew)
680 {
681 bool insertedNew = false;
682 spgPickSplitIn in;
683 spgPickSplitOut out;
684 FmgrInfo *procinfo;
685 bool includeNew;
686 int i,
687 max,
688 n;
689 SpGistInnerTuple innerTuple;
690 SpGistNodeTuple node,
691 *nodes;
692 Buffer newInnerBuffer,
693 newLeafBuffer;
694 uint8 *leafPageSelect;
695 int *leafSizes;
696 OffsetNumber *toDelete;
697 OffsetNumber *toInsert;
698 OffsetNumber redirectTuplePos = InvalidOffsetNumber;
699 OffsetNumber startOffsets[2];
700 SpGistLeafTuple *oldLeafs;
701 SpGistLeafTuple *newLeafs;
702 Datum leafDatums[INDEX_MAX_KEYS];
703 bool leafIsnulls[INDEX_MAX_KEYS];
704 int spaceToDelete;
705 int currentFreeSpace;
706 int totalLeafSizes;
707 bool allTheSame;
708 spgxlogPickSplit xlrec;
709 char *leafdata,
710 *leafptr;
711 SPPageDesc saveCurrent;
712 int nToDelete,
713 nToInsert,
714 maxToInclude;
715
716 in.level = level;
717
718 /*
719 * Allocate per-leaf-tuple work arrays with max possible size
720 */
721 max = PageGetMaxOffsetNumber(current->page);
722 n = max + 1;
723 in.datums = (Datum *) palloc(sizeof(Datum) * n);
724 toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
725 toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
726 oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
727 newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
728 leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
729
730 STORE_STATE(state, xlrec.stateSrc);
731
732 /*
733 * Form list of leaf tuples which will be distributed as split result;
734 * also, count up the amount of space that will be freed from current.
735 * (Note that in the non-root case, we won't actually delete the old
736 * tuples, only replace them with redirects or placeholders.)
737 */
738 nToInsert = 0;
739 nToDelete = 0;
740 spaceToDelete = 0;
741 if (SpGistBlockIsRoot(current->blkno))
742 {
743 /*
744 * We are splitting the root (which up to now is also a leaf page).
745 * Its tuples are not linked, so scan sequentially to get them all. We
746 * ignore the original value of current->offnum.
747 */
748 for (i = FirstOffsetNumber; i <= max; i++)
749 {
750 SpGistLeafTuple it;
751
752 it = (SpGistLeafTuple) PageGetItem(current->page,
753 PageGetItemId(current->page, i));
754 if (it->tupstate == SPGIST_LIVE)
755 {
756 in.datums[nToInsert] =
757 isNulls ? (Datum) 0 : SGLTDATUM(it, state);
758 oldLeafs[nToInsert] = it;
759 nToInsert++;
760 toDelete[nToDelete] = i;
761 nToDelete++;
762 /* we will delete the tuple altogether, so count full space */
763 spaceToDelete += it->size + sizeof(ItemIdData);
764 }
765 else /* tuples on root should be live */
766 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
767 }
768 }
769 else
770 {
771 /* Normal case, just collect the leaf tuples in the chain */
772 i = current->offnum;
773 while (i != InvalidOffsetNumber)
774 {
775 SpGistLeafTuple it;
776
777 Assert(i >= FirstOffsetNumber && i <= max);
778 it = (SpGistLeafTuple) PageGetItem(current->page,
779 PageGetItemId(current->page, i));
780 if (it->tupstate == SPGIST_LIVE)
781 {
782 in.datums[nToInsert] =
783 isNulls ? (Datum) 0 : SGLTDATUM(it, state);
784 oldLeafs[nToInsert] = it;
785 nToInsert++;
786 toDelete[nToDelete] = i;
787 nToDelete++;
788 /* we will not delete the tuple, only replace with dead */
789 Assert(it->size >= SGDTSIZE);
790 spaceToDelete += it->size - SGDTSIZE;
791 }
792 else if (it->tupstate == SPGIST_DEAD)
793 {
794 /* We could see a DEAD tuple as first/only chain item */
795 Assert(i == current->offnum);
796 Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
797 toDelete[nToDelete] = i;
798 nToDelete++;
799 /* replacing it with redirect will save no space */
800 }
801 else
802 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
803
804 i = SGLT_GET_NEXTOFFSET(it);
805 }
806 }
807 in.nTuples = nToInsert;
808
809 /*
810 * We may not actually insert new tuple because another picksplit may be
811 * necessary due to too large value, but we will try to allocate enough
812 * space to include it; and in any case it has to be included in the input
813 * for the picksplit function. So don't increment nToInsert yet.
814 */
815 in.datums[in.nTuples] =
816 isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
817 oldLeafs[in.nTuples] = newLeafTuple;
818 in.nTuples++;
819
820 memset(&out, 0, sizeof(out));
821
822 if (!isNulls)
823 {
824 /*
825 * Perform split using user-defined method.
826 */
827 procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
828 FunctionCall2Coll(procinfo,
829 index->rd_indcollation[0],
830 PointerGetDatum(&in),
831 PointerGetDatum(&out));
832
833 /*
834 * Form new leaf tuples and count up the total space needed.
835 */
836 totalLeafSizes = 0;
837 for (i = 0; i < in.nTuples; i++)
838 {
839 if (state->leafTupDesc->natts > 1)
840 spgDeformLeafTuple(oldLeafs[i],
841 state->leafTupDesc,
842 leafDatums,
843 leafIsnulls,
844 isNulls);
845
846 leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
847 leafIsnulls[spgKeyColumn] = false;
848
849 newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
850 leafDatums,
851 leafIsnulls);
852 totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
853 }
854 }
855 else
856 {
857 /*
858 * Perform dummy split that puts all tuples into one node.
859 * checkAllTheSame will override this and force allTheSame mode.
860 */
861 out.hasPrefix = false;
862 out.nNodes = 1;
863 out.nodeLabels = NULL;
864 out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
865
866 /*
867 * Form new leaf tuples and count up the total space needed.
868 */
869 totalLeafSizes = 0;
870 for (i = 0; i < in.nTuples; i++)
871 {
872 if (state->leafTupDesc->natts > 1)
873 spgDeformLeafTuple(oldLeafs[i],
874 state->leafTupDesc,
875 leafDatums,
876 leafIsnulls,
877 isNulls);
878
879 /*
880 * Nulls tree can contain only null key values.
881 */
882 leafDatums[spgKeyColumn] = (Datum) 0;
883 leafIsnulls[spgKeyColumn] = true;
884
885 newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
886 leafDatums,
887 leafIsnulls);
888 totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
889 }
890 }
891
892 /*
893 * Check to see if the picksplit function failed to separate the values,
894 * ie, it put them all into the same child node. If so, select allTheSame
895 * mode and create a random split instead. See comments for
896 * checkAllTheSame as to why we need to know if the new leaf tuples could
897 * fit on one page.
898 */
899 allTheSame = checkAllTheSame(&in, &out,
900 totalLeafSizes > SPGIST_PAGE_CAPACITY,
901 &includeNew);
902
903 /*
904 * If checkAllTheSame decided we must exclude the new tuple, don't
905 * consider it any further.
906 */
907 if (includeNew)
908 maxToInclude = in.nTuples;
909 else
910 {
911 maxToInclude = in.nTuples - 1;
912 totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
913 }
914
915 /*
916 * Allocate per-node work arrays. Since checkAllTheSame could replace
917 * out.nNodes with a value larger than the number of tuples on the input
918 * page, we can't allocate these arrays before here.
919 */
920 nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
921 leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
922
923 /*
924 * Form nodes of inner tuple and inner tuple itself
925 */
926 for (i = 0; i < out.nNodes; i++)
927 {
928 Datum label = (Datum) 0;
929 bool labelisnull = (out.nodeLabels == NULL);
930
931 if (!labelisnull)
932 label = out.nodeLabels[i];
933 nodes[i] = spgFormNodeTuple(state, label, labelisnull);
934 }
935 innerTuple = spgFormInnerTuple(state,
936 out.hasPrefix, out.prefixDatum,
937 out.nNodes, nodes);
938 innerTuple->allTheSame = allTheSame;
939
940 /*
941 * Update nodes[] array to point into the newly formed innerTuple, so that
942 * we can adjust their downlinks below.
943 */
944 SGITITERATE(innerTuple, i, node)
945 {
946 nodes[i] = node;
947 }
948
949 /*
950 * Re-scan new leaf tuples and count up the space needed under each node.
951 */
952 for (i = 0; i < maxToInclude; i++)
953 {
954 n = out.mapTuplesToNodes[i];
955 if (n < 0 || n >= out.nNodes)
956 elog(ERROR, "inconsistent result of SPGiST picksplit function");
957 leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
958 }
959
960 /*
961 * To perform the split, we must insert a new inner tuple, which can't go
962 * on a leaf page; and unless we are splitting the root page, we must then
963 * update the parent tuple's downlink to point to the inner tuple. If
964 * there is room, we'll put the new inner tuple on the same page as the
965 * parent tuple, otherwise we need another non-leaf buffer. But if the
966 * parent page is the root, we can't add the new inner tuple there,
967 * because the root page must have only one inner tuple.
968 */
969 xlrec.initInner = false;
970 if (parent->buffer != InvalidBuffer &&
971 !SpGistBlockIsRoot(parent->blkno) &&
972 (SpGistPageGetFreeSpace(parent->page, 1) >=
973 innerTuple->size + sizeof(ItemIdData)))
974 {
975 /* New inner tuple will fit on parent page */
976 newInnerBuffer = parent->buffer;
977 }
978 else if (parent->buffer != InvalidBuffer)
979 {
980 /* Send tuple to page with next triple parity (see README) */
981 newInnerBuffer = SpGistGetBuffer(index,
982 GBUF_INNER_PARITY(parent->blkno + 1) |
983 (isNulls ? GBUF_NULLS : 0),
984 innerTuple->size + sizeof(ItemIdData),
985 &xlrec.initInner);
986 }
987 else
988 {
989 /* Root page split ... inner tuple will go to root page */
990 newInnerBuffer = InvalidBuffer;
991 }
992
993 /*
994 * The new leaf tuples converted from the existing ones should require the
995 * same or less space, and therefore should all fit onto one page
996 * (although that's not necessarily the current page, since we can't
997 * delete the old tuples but only replace them with placeholders).
998 * However, the incoming new tuple might not also fit, in which case we
999 * might need another picksplit cycle to reduce it some more.
1000 *
1001 * If there's not room to put everything back onto the current page, then
1002 * we decide on a per-node basis which tuples go to the new page. (We do
1003 * it like that because leaf tuple chains can't cross pages, so we must
1004 * place all leaf tuples belonging to the same parent node on the same
1005 * page.)
1006 *
1007 * If we are splitting the root page (turning it from a leaf page into an
1008 * inner page), then no leaf tuples can go back to the current page; they
1009 * must all go somewhere else.
1010 */
1011 if (!SpGistBlockIsRoot(current->blkno))
1012 currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
1013 else
1014 currentFreeSpace = 0; /* prevent assigning any tuples to current */
1015
1016 xlrec.initDest = false;
1017
1018 if (totalLeafSizes <= currentFreeSpace)
1019 {
1020 /* All the leaf tuples will fit on current page */
1021 newLeafBuffer = InvalidBuffer;
1022 /* mark new leaf tuple as included in insertions, if allowed */
1023 if (includeNew)
1024 {
1025 nToInsert++;
1026 insertedNew = true;
1027 }
1028 for (i = 0; i < nToInsert; i++)
1029 leafPageSelect[i] = 0; /* signifies current page */
1030 }
1031 else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1032 {
1033 /*
1034 * We're trying to split up a long value by repeated suffixing, but
1035 * it's not going to fit yet. Don't bother allocating a second leaf
1036 * buffer that we won't be able to use.
1037 */
1038 newLeafBuffer = InvalidBuffer;
1039 Assert(includeNew);
1040 Assert(nToInsert == 0);
1041 }
1042 else
1043 {
1044 /* We will need another leaf page */
1045 uint8 *nodePageSelect;
1046 int curspace;
1047 int newspace;
1048
1049 newLeafBuffer = SpGistGetBuffer(index,
1050 GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1051 Min(totalLeafSizes,
1052 SPGIST_PAGE_CAPACITY),
1053 &xlrec.initDest);
1054
1055 /*
1056 * Attempt to assign node groups to the two pages. We might fail to
1057 * do so, even if totalLeafSizes is less than the available space,
1058 * because we can't split a group across pages.
1059 */
1060 nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1061
1062 curspace = currentFreeSpace;
1063 newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1064 for (i = 0; i < out.nNodes; i++)
1065 {
1066 if (leafSizes[i] <= curspace)
1067 {
1068 nodePageSelect[i] = 0; /* signifies current page */
1069 curspace -= leafSizes[i];
1070 }
1071 else
1072 {
1073 nodePageSelect[i] = 1; /* signifies new leaf page */
1074 newspace -= leafSizes[i];
1075 }
1076 }
1077 if (curspace >= 0 && newspace >= 0)
1078 {
1079 /* Successful assignment, so we can include the new leaf tuple */
1080 if (includeNew)
1081 {
1082 nToInsert++;
1083 insertedNew = true;
1084 }
1085 }
1086 else if (includeNew)
1087 {
1088 /* We must exclude the new leaf tuple from the split */
1089 int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1090
1091 leafSizes[nodeOfNewTuple] -=
1092 newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1093
1094 /* Repeat the node assignment process --- should succeed now */
1095 curspace = currentFreeSpace;
1096 newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1097 for (i = 0; i < out.nNodes; i++)
1098 {
1099 if (leafSizes[i] <= curspace)
1100 {
1101 nodePageSelect[i] = 0; /* signifies current page */
1102 curspace -= leafSizes[i];
1103 }
1104 else
1105 {
1106 nodePageSelect[i] = 1; /* signifies new leaf page */
1107 newspace -= leafSizes[i];
1108 }
1109 }
1110 if (curspace < 0 || newspace < 0)
1111 elog(ERROR, "failed to divide leaf tuple groups across pages");
1112 }
1113 else
1114 {
1115 /* oops, we already excluded new tuple ... should not get here */
1116 elog(ERROR, "failed to divide leaf tuple groups across pages");
1117 }
1118 /* Expand the per-node assignments to be shown per leaf tuple */
1119 for (i = 0; i < nToInsert; i++)
1120 {
1121 n = out.mapTuplesToNodes[i];
1122 leafPageSelect[i] = nodePageSelect[n];
1123 }
1124 }
1125
1126 /* Start preparing WAL record */
1127 xlrec.nDelete = 0;
1128 xlrec.initSrc = isNew;
1129 xlrec.storesNulls = isNulls;
1130 xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1131
1132 leafdata = leafptr = (char *) palloc(totalLeafSizes);
1133
1134 /* Here we begin making the changes to the target pages */
1135 START_CRIT_SECTION();
1136
1137 /*
1138 * Delete old leaf tuples from current buffer, except when we're splitting
1139 * the root; in that case there's no need because we'll re-init the page
1140 * below. We do this first to make room for reinserting new leaf tuples.
1141 */
1142 if (!SpGistBlockIsRoot(current->blkno))
1143 {
1144 /*
1145 * Init buffer instead of deleting individual tuples, but only if
1146 * there aren't any other live tuples and only during build; otherwise
1147 * we need to set a redirection tuple for concurrent scans.
1148 */
1149 if (state->isBuild &&
1150 nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1151 PageGetMaxOffsetNumber(current->page))
1152 {
1153 SpGistInitBuffer(current->buffer,
1154 SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1155 xlrec.initSrc = true;
1156 }
1157 else if (isNew)
1158 {
1159 /* don't expose the freshly init'd buffer as a backup block */
1160 Assert(nToDelete == 0);
1161 }
1162 else
1163 {
1164 xlrec.nDelete = nToDelete;
1165
1166 if (!state->isBuild)
1167 {
1168 /*
1169 * Need to create redirect tuple (it will point to new inner
1170 * tuple) but right now the new tuple's location is not known
1171 * yet. So, set the redirection pointer to "impossible" value
1172 * and remember its position to update tuple later.
1173 */
1174 if (nToDelete > 0)
1175 redirectTuplePos = toDelete[0];
1176 spgPageIndexMultiDelete(state, current->page,
1177 toDelete, nToDelete,
1178 SPGIST_REDIRECT,
1179 SPGIST_PLACEHOLDER,
1180 SPGIST_METAPAGE_BLKNO,
1181 FirstOffsetNumber);
1182 }
1183 else
1184 {
1185 /*
1186 * During index build there is not concurrent searches, so we
1187 * don't need to create redirection tuple.
1188 */
1189 spgPageIndexMultiDelete(state, current->page,
1190 toDelete, nToDelete,
1191 SPGIST_PLACEHOLDER,
1192 SPGIST_PLACEHOLDER,
1193 InvalidBlockNumber,
1194 InvalidOffsetNumber);
1195 }
1196 }
1197 }
1198
1199 /*
1200 * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1201 * nodes.
1202 */
1203 startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1204 for (i = 0; i < nToInsert; i++)
1205 {
1206 SpGistLeafTuple it = newLeafs[i];
1207 Buffer leafBuffer;
1208 BlockNumber leafBlock;
1209 OffsetNumber newoffset;
1210
1211 /* Which page is it going to? */
1212 leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1213 leafBlock = BufferGetBlockNumber(leafBuffer);
1214
1215 /* Link tuple into correct chain for its node */
1216 n = out.mapTuplesToNodes[i];
1217
1218 if (ItemPointerIsValid(&nodes[n]->t_tid))
1219 {
1220 Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1221 SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
1222 }
1223 else
1224 SGLT_SET_NEXTOFFSET(it, InvalidOffsetNumber);
1225
1226 /* Insert it on page */
1227 newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1228 (Item) it, it->size,
1229 &startOffsets[leafPageSelect[i]],
1230 false);
1231 toInsert[i] = newoffset;
1232
1233 /* ... and complete the chain linking */
1234 ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1235
1236 /* Also copy leaf tuple into WAL data */
1237 memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1238 leafptr += newLeafs[i]->size;
1239 }
1240
1241 /*
1242 * We're done modifying the other leaf buffer (if any), so mark it dirty.
1243 * current->buffer will be marked below, after we're entirely done
1244 * modifying it.
1245 */
1246 if (newLeafBuffer != InvalidBuffer)
1247 {
1248 MarkBufferDirty(newLeafBuffer);
1249 }
1250
1251 /* Remember current buffer, since we're about to change "current" */
1252 saveCurrent = *current;
1253
1254 /*
1255 * Store the new innerTuple
1256 */
1257 if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1258 {
1259 /*
1260 * new inner tuple goes to parent page
1261 */
1262 Assert(current->buffer != parent->buffer);
1263
1264 /* Repoint "current" at the new inner tuple */
1265 current->blkno = parent->blkno;
1266 current->buffer = parent->buffer;
1267 current->page = parent->page;
1268 xlrec.offnumInner = current->offnum =
1269 SpGistPageAddNewItem(state, current->page,
1270 (Item) innerTuple, innerTuple->size,
1271 NULL, false);
1272
1273 /*
1274 * Update parent node link and mark parent page dirty
1275 */
1276 xlrec.innerIsParent = true;
1277 xlrec.offnumParent = parent->offnum;
1278 xlrec.nodeI = parent->node;
1279 saveNodeLink(index, parent, current->blkno, current->offnum);
1280
1281 /*
1282 * Update redirection link (in old current buffer)
1283 */
1284 if (redirectTuplePos != InvalidOffsetNumber)
1285 setRedirectionTuple(&saveCurrent, redirectTuplePos,
1286 current->blkno, current->offnum);
1287
1288 /* Done modifying old current buffer, mark it dirty */
1289 MarkBufferDirty(saveCurrent.buffer);
1290 }
1291 else if (parent->buffer != InvalidBuffer)
1292 {
1293 /*
1294 * new inner tuple will be stored on a new page
1295 */
1296 Assert(newInnerBuffer != InvalidBuffer);
1297
1298 /* Repoint "current" at the new inner tuple */
1299 current->buffer = newInnerBuffer;
1300 current->blkno = BufferGetBlockNumber(current->buffer);
1301 current->page = BufferGetPage(current->buffer);
1302 xlrec.offnumInner = current->offnum =
1303 SpGistPageAddNewItem(state, current->page,
1304 (Item) innerTuple, innerTuple->size,
1305 NULL, false);
1306
1307 /* Done modifying new current buffer, mark it dirty */
1308 MarkBufferDirty(current->buffer);
1309
1310 /*
1311 * Update parent node link and mark parent page dirty
1312 */
1313 xlrec.innerIsParent = (parent->buffer == current->buffer);
1314 xlrec.offnumParent = parent->offnum;
1315 xlrec.nodeI = parent->node;
1316 saveNodeLink(index, parent, current->blkno, current->offnum);
1317
1318 /*
1319 * Update redirection link (in old current buffer)
1320 */
1321 if (redirectTuplePos != InvalidOffsetNumber)
1322 setRedirectionTuple(&saveCurrent, redirectTuplePos,
1323 current->blkno, current->offnum);
1324
1325 /* Done modifying old current buffer, mark it dirty */
1326 MarkBufferDirty(saveCurrent.buffer);
1327 }
1328 else
1329 {
1330 /*
1331 * Splitting root page, which was a leaf but now becomes inner page
1332 * (and so "current" continues to point at it)
1333 */
1334 Assert(SpGistBlockIsRoot(current->blkno));
1335 Assert(redirectTuplePos == InvalidOffsetNumber);
1336
1337 SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1338 xlrec.initInner = true;
1339 xlrec.innerIsParent = false;
1340
1341 xlrec.offnumInner = current->offnum =
1342 PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1343 InvalidOffsetNumber, false, false);
1344 if (current->offnum != FirstOffsetNumber)
1345 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1346 innerTuple->size);
1347
1348 /* No parent link to update, nor redirection to do */
1349 xlrec.offnumParent = InvalidOffsetNumber;
1350 xlrec.nodeI = 0;
1351
1352 /* Done modifying new current buffer, mark it dirty */
1353 MarkBufferDirty(current->buffer);
1354
1355 /* saveCurrent doesn't represent a different buffer */
1356 saveCurrent.buffer = InvalidBuffer;
1357 }
1358
1359 if (RelationNeedsWAL(index) && !state->isBuild)
1360 {
1361 XLogRecPtr recptr;
1362 int flags;
1363
1364 XLogBeginInsert();
1365
1366 xlrec.nInsert = nToInsert;
1367 XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
1368
1369 XLogRegisterData((char *) toDelete,
1370 sizeof(OffsetNumber) * xlrec.nDelete);
1371 XLogRegisterData((char *) toInsert,
1372 sizeof(OffsetNumber) * xlrec.nInsert);
1373 XLogRegisterData((char *) leafPageSelect,
1374 sizeof(uint8) * xlrec.nInsert);
1375 XLogRegisterData((char *) innerTuple, innerTuple->size);
1376 XLogRegisterData(leafdata, leafptr - leafdata);
1377
1378 /* Old leaf page */
1379 if (BufferIsValid(saveCurrent.buffer))
1380 {
1381 flags = REGBUF_STANDARD;
1382 if (xlrec.initSrc)
1383 flags |= REGBUF_WILL_INIT;
1384 XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1385 }
1386
1387 /* New leaf page */
1388 if (BufferIsValid(newLeafBuffer))
1389 {
1390 flags = REGBUF_STANDARD;
1391 if (xlrec.initDest)
1392 flags |= REGBUF_WILL_INIT;
1393 XLogRegisterBuffer(1, newLeafBuffer, flags);
1394 }
1395
1396 /* Inner page */
1397 flags = REGBUF_STANDARD;
1398 if (xlrec.initInner)
1399 flags |= REGBUF_WILL_INIT;
1400 XLogRegisterBuffer(2, current->buffer, flags);
1401
1402 /* Parent page, if different from inner page */
1403 if (parent->buffer != InvalidBuffer)
1404 {
1405 if (parent->buffer != current->buffer)
1406 XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
1407 else
1408 Assert(xlrec.innerIsParent);
1409 }
1410
1411 /* Issue the WAL record */
1412 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1413
1414 /* Update page LSNs on all affected pages */
1415 if (newLeafBuffer != InvalidBuffer)
1416 {
1417 Page page = BufferGetPage(newLeafBuffer);
1418
1419 PageSetLSN(page, recptr);
1420 }
1421
1422 if (saveCurrent.buffer != InvalidBuffer)
1423 {
1424 Page page = BufferGetPage(saveCurrent.buffer);
1425
1426 PageSetLSN(page, recptr);
1427 }
1428
1429 PageSetLSN(current->page, recptr);
1430
1431 if (parent->buffer != InvalidBuffer)
1432 {
1433 PageSetLSN(parent->page, recptr);
1434 }
1435 }
1436
1437 END_CRIT_SECTION();
1438
1439 /* Update local free-space cache and unlock buffers */
1440 if (newLeafBuffer != InvalidBuffer)
1441 {
1442 SpGistSetLastUsedPage(index, newLeafBuffer);
1443 UnlockReleaseBuffer(newLeafBuffer);
1444 }
1445 if (saveCurrent.buffer != InvalidBuffer)
1446 {
1447 SpGistSetLastUsedPage(index, saveCurrent.buffer);
1448 UnlockReleaseBuffer(saveCurrent.buffer);
1449 }
1450
1451 return insertedNew;
1452 }
1453
1454 /*
1455 * spgMatchNode action: descend to N'th child node of current inner tuple
1456 */
1457 static void
spgMatchNodeAction(Relation index,SpGistState * state,SpGistInnerTuple innerTuple,SPPageDesc * current,SPPageDesc * parent,int nodeN)1458 spgMatchNodeAction(Relation index, SpGistState *state,
1459 SpGistInnerTuple innerTuple,
1460 SPPageDesc *current, SPPageDesc *parent, int nodeN)
1461 {
1462 int i;
1463 SpGistNodeTuple node;
1464
1465 /* Release previous parent buffer if any */
1466 if (parent->buffer != InvalidBuffer &&
1467 parent->buffer != current->buffer)
1468 {
1469 SpGistSetLastUsedPage(index, parent->buffer);
1470 UnlockReleaseBuffer(parent->buffer);
1471 }
1472
1473 /* Repoint parent to specified node of current inner tuple */
1474 parent->blkno = current->blkno;
1475 parent->buffer = current->buffer;
1476 parent->page = current->page;
1477 parent->offnum = current->offnum;
1478 parent->node = nodeN;
1479
1480 /* Locate that node */
1481 SGITITERATE(innerTuple, i, node)
1482 {
1483 if (i == nodeN)
1484 break;
1485 }
1486
1487 if (i != nodeN)
1488 elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1489 nodeN);
1490
1491 /* Point current to the downlink location, if any */
1492 if (ItemPointerIsValid(&node->t_tid))
1493 {
1494 current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1495 current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1496 }
1497 else
1498 {
1499 /* Downlink is empty, so we'll need to find a new page */
1500 current->blkno = InvalidBlockNumber;
1501 current->offnum = InvalidOffsetNumber;
1502 }
1503
1504 current->buffer = InvalidBuffer;
1505 current->page = NULL;
1506 }
1507
1508 /*
1509 * spgAddNode action: add a node to the inner tuple at current
1510 */
1511 static void
spgAddNodeAction(Relation index,SpGistState * state,SpGistInnerTuple innerTuple,SPPageDesc * current,SPPageDesc * parent,int nodeN,Datum nodeLabel)1512 spgAddNodeAction(Relation index, SpGistState *state,
1513 SpGistInnerTuple innerTuple,
1514 SPPageDesc *current, SPPageDesc *parent,
1515 int nodeN, Datum nodeLabel)
1516 {
1517 SpGistInnerTuple newInnerTuple;
1518 spgxlogAddNode xlrec;
1519
1520 /* Should not be applied to nulls */
1521 Assert(!SpGistPageStoresNulls(current->page));
1522
1523 /* Construct new inner tuple with additional node */
1524 newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1525
1526 /* Prepare WAL record */
1527 STORE_STATE(state, xlrec.stateSrc);
1528 xlrec.offnum = current->offnum;
1529
1530 /* we don't fill these unless we need to change the parent downlink */
1531 xlrec.parentBlk = -1;
1532 xlrec.offnumParent = InvalidOffsetNumber;
1533 xlrec.nodeI = 0;
1534
1535 /* we don't fill these unless tuple has to be moved */
1536 xlrec.offnumNew = InvalidOffsetNumber;
1537 xlrec.newPage = false;
1538
1539 if (PageGetExactFreeSpace(current->page) >=
1540 newInnerTuple->size - innerTuple->size)
1541 {
1542 /*
1543 * We can replace the inner tuple by new version in-place
1544 */
1545 START_CRIT_SECTION();
1546
1547 PageIndexTupleDelete(current->page, current->offnum);
1548 if (PageAddItem(current->page,
1549 (Item) newInnerTuple, newInnerTuple->size,
1550 current->offnum, false, false) != current->offnum)
1551 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1552 newInnerTuple->size);
1553
1554 MarkBufferDirty(current->buffer);
1555
1556 if (RelationNeedsWAL(index) && !state->isBuild)
1557 {
1558 XLogRecPtr recptr;
1559
1560 XLogBeginInsert();
1561 XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1562 XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1563
1564 XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1565
1566 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1567
1568 PageSetLSN(current->page, recptr);
1569 }
1570
1571 END_CRIT_SECTION();
1572 }
1573 else
1574 {
1575 /*
1576 * move inner tuple to another page, and update parent
1577 */
1578 SpGistDeadTuple dt;
1579 SPPageDesc saveCurrent;
1580
1581 /*
1582 * It should not be possible to get here for the root page, since we
1583 * allow only one inner tuple on the root page, and spgFormInnerTuple
1584 * always checks that inner tuples don't exceed the size of a page.
1585 */
1586 if (SpGistBlockIsRoot(current->blkno))
1587 elog(ERROR, "cannot enlarge root tuple any more");
1588 Assert(parent->buffer != InvalidBuffer);
1589
1590 saveCurrent = *current;
1591
1592 xlrec.offnumParent = parent->offnum;
1593 xlrec.nodeI = parent->node;
1594
1595 /*
1596 * obtain new buffer with the same parity as current, since it will be
1597 * a child of same parent tuple
1598 */
1599 current->buffer = SpGistGetBuffer(index,
1600 GBUF_INNER_PARITY(current->blkno),
1601 newInnerTuple->size + sizeof(ItemIdData),
1602 &xlrec.newPage);
1603 current->blkno = BufferGetBlockNumber(current->buffer);
1604 current->page = BufferGetPage(current->buffer);
1605
1606 /*
1607 * Let's just make real sure new current isn't same as old. Right now
1608 * that's impossible, but if SpGistGetBuffer ever got smart enough to
1609 * delete placeholder tuples before checking space, maybe it wouldn't
1610 * be impossible. The case would appear to work except that WAL
1611 * replay would be subtly wrong, so I think a mere assert isn't enough
1612 * here.
1613 */
1614 if (current->blkno == saveCurrent.blkno)
1615 elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1616
1617 /*
1618 * New current and parent buffer will both be modified; but note that
1619 * parent buffer could be same as either new or old current.
1620 */
1621 if (parent->buffer == saveCurrent.buffer)
1622 xlrec.parentBlk = 0;
1623 else if (parent->buffer == current->buffer)
1624 xlrec.parentBlk = 1;
1625 else
1626 xlrec.parentBlk = 2;
1627
1628 START_CRIT_SECTION();
1629
1630 /* insert new ... */
1631 xlrec.offnumNew = current->offnum =
1632 SpGistPageAddNewItem(state, current->page,
1633 (Item) newInnerTuple, newInnerTuple->size,
1634 NULL, false);
1635
1636 MarkBufferDirty(current->buffer);
1637
1638 /* update parent's downlink and mark parent page dirty */
1639 saveNodeLink(index, parent, current->blkno, current->offnum);
1640
1641 /*
1642 * Replace old tuple with a placeholder or redirection tuple. Unless
1643 * doing an index build, we have to insert a redirection tuple for
1644 * possible concurrent scans. We can't just delete it in any case,
1645 * because that could change the offsets of other tuples on the page,
1646 * breaking downlinks from their parents.
1647 */
1648 if (state->isBuild)
1649 dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
1650 InvalidBlockNumber, InvalidOffsetNumber);
1651 else
1652 dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1653 current->blkno, current->offnum);
1654
1655 PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1656 if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1657 saveCurrent.offnum,
1658 false, false) != saveCurrent.offnum)
1659 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1660 dt->size);
1661
1662 if (state->isBuild)
1663 SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1664 else
1665 SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1666
1667 MarkBufferDirty(saveCurrent.buffer);
1668
1669 if (RelationNeedsWAL(index) && !state->isBuild)
1670 {
1671 XLogRecPtr recptr;
1672 int flags;
1673
1674 XLogBeginInsert();
1675
1676 /* orig page */
1677 XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1678 /* new page */
1679 flags = REGBUF_STANDARD;
1680 if (xlrec.newPage)
1681 flags |= REGBUF_WILL_INIT;
1682 XLogRegisterBuffer(1, current->buffer, flags);
1683 /* parent page (if different from orig and new) */
1684 if (xlrec.parentBlk == 2)
1685 XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
1686
1687 XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1688 XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1689
1690 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1691
1692 /* we don't bother to check if any of these are redundant */
1693 PageSetLSN(current->page, recptr);
1694 PageSetLSN(parent->page, recptr);
1695 PageSetLSN(saveCurrent.page, recptr);
1696 }
1697
1698 END_CRIT_SECTION();
1699
1700 /* Release saveCurrent if it's not same as current or parent */
1701 if (saveCurrent.buffer != current->buffer &&
1702 saveCurrent.buffer != parent->buffer)
1703 {
1704 SpGistSetLastUsedPage(index, saveCurrent.buffer);
1705 UnlockReleaseBuffer(saveCurrent.buffer);
1706 }
1707 }
1708 }
1709
1710 /*
1711 * spgSplitNode action: split inner tuple at current into prefix and postfix
1712 */
1713 static void
spgSplitNodeAction(Relation index,SpGistState * state,SpGistInnerTuple innerTuple,SPPageDesc * current,spgChooseOut * out)1714 spgSplitNodeAction(Relation index, SpGistState *state,
1715 SpGistInnerTuple innerTuple,
1716 SPPageDesc *current, spgChooseOut *out)
1717 {
1718 SpGistInnerTuple prefixTuple,
1719 postfixTuple;
1720 SpGistNodeTuple node,
1721 *nodes;
1722 BlockNumber postfixBlkno;
1723 OffsetNumber postfixOffset;
1724 int i;
1725 spgxlogSplitTuple xlrec;
1726 Buffer newBuffer = InvalidBuffer;
1727
1728 /* Should not be applied to nulls */
1729 Assert(!SpGistPageStoresNulls(current->page));
1730
1731 /* Check opclass gave us sane values */
1732 if (out->result.splitTuple.prefixNNodes <= 0 ||
1733 out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
1734 elog(ERROR, "invalid number of prefix nodes: %d",
1735 out->result.splitTuple.prefixNNodes);
1736 if (out->result.splitTuple.childNodeN < 0 ||
1737 out->result.splitTuple.childNodeN >=
1738 out->result.splitTuple.prefixNNodes)
1739 elog(ERROR, "invalid child node number: %d",
1740 out->result.splitTuple.childNodeN);
1741
1742 /*
1743 * Construct new prefix tuple with requested number of nodes. We'll fill
1744 * in the childNodeN'th node's downlink below.
1745 */
1746 nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
1747 out->result.splitTuple.prefixNNodes);
1748
1749 for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
1750 {
1751 Datum label = (Datum) 0;
1752 bool labelisnull;
1753
1754 labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
1755 if (!labelisnull)
1756 label = out->result.splitTuple.prefixNodeLabels[i];
1757 nodes[i] = spgFormNodeTuple(state, label, labelisnull);
1758 }
1759
1760 prefixTuple = spgFormInnerTuple(state,
1761 out->result.splitTuple.prefixHasPrefix,
1762 out->result.splitTuple.prefixPrefixDatum,
1763 out->result.splitTuple.prefixNNodes,
1764 nodes);
1765
1766 /* it must fit in the space that innerTuple now occupies */
1767 if (prefixTuple->size > innerTuple->size)
1768 elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1769
1770 /*
1771 * Construct new postfix tuple, containing all nodes of innerTuple with
1772 * same node datums, but with the prefix specified by the picksplit
1773 * function.
1774 */
1775 nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1776 SGITITERATE(innerTuple, i, node)
1777 {
1778 nodes[i] = node;
1779 }
1780
1781 postfixTuple = spgFormInnerTuple(state,
1782 out->result.splitTuple.postfixHasPrefix,
1783 out->result.splitTuple.postfixPrefixDatum,
1784 innerTuple->nNodes, nodes);
1785
1786 /* Postfix tuple is allTheSame if original tuple was */
1787 postfixTuple->allTheSame = innerTuple->allTheSame;
1788
1789 /* prep data for WAL record */
1790 xlrec.newPage = false;
1791
1792 /*
1793 * If we can't fit both tuples on the current page, get a new page for the
1794 * postfix tuple. In particular, can't split to the root page.
1795 *
1796 * For the space calculation, note that prefixTuple replaces innerTuple
1797 * but postfixTuple will be a new entry.
1798 */
1799 if (SpGistBlockIsRoot(current->blkno) ||
1800 SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1801 prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1802 {
1803 /*
1804 * Choose page with next triple parity, because postfix tuple is a
1805 * child of prefix one
1806 */
1807 newBuffer = SpGistGetBuffer(index,
1808 GBUF_INNER_PARITY(current->blkno + 1),
1809 postfixTuple->size + sizeof(ItemIdData),
1810 &xlrec.newPage);
1811 }
1812
1813 START_CRIT_SECTION();
1814
1815 /*
1816 * Replace old tuple by prefix tuple
1817 */
1818 PageIndexTupleDelete(current->page, current->offnum);
1819 xlrec.offnumPrefix = PageAddItem(current->page,
1820 (Item) prefixTuple, prefixTuple->size,
1821 current->offnum, false, false);
1822 if (xlrec.offnumPrefix != current->offnum)
1823 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1824 prefixTuple->size);
1825
1826 /*
1827 * put postfix tuple into appropriate page
1828 */
1829 if (newBuffer == InvalidBuffer)
1830 {
1831 postfixBlkno = current->blkno;
1832 xlrec.offnumPostfix = postfixOffset =
1833 SpGistPageAddNewItem(state, current->page,
1834 (Item) postfixTuple, postfixTuple->size,
1835 NULL, false);
1836 xlrec.postfixBlkSame = true;
1837 }
1838 else
1839 {
1840 postfixBlkno = BufferGetBlockNumber(newBuffer);
1841 xlrec.offnumPostfix = postfixOffset =
1842 SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
1843 (Item) postfixTuple, postfixTuple->size,
1844 NULL, false);
1845 MarkBufferDirty(newBuffer);
1846 xlrec.postfixBlkSame = false;
1847 }
1848
1849 /*
1850 * And set downlink pointer in the prefix tuple to point to postfix tuple.
1851 * (We can't avoid this step by doing the above two steps in opposite
1852 * order, because there might not be enough space on the page to insert
1853 * the postfix tuple first.) We have to update the local copy of the
1854 * prefixTuple too, because that's what will be written to WAL.
1855 */
1856 spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1857 postfixBlkno, postfixOffset);
1858 prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1859 PageGetItemId(current->page, current->offnum));
1860 spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1861 postfixBlkno, postfixOffset);
1862
1863 MarkBufferDirty(current->buffer);
1864
1865 if (RelationNeedsWAL(index) && !state->isBuild)
1866 {
1867 XLogRecPtr recptr;
1868
1869 XLogBeginInsert();
1870 XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1871 XLogRegisterData((char *) prefixTuple, prefixTuple->size);
1872 XLogRegisterData((char *) postfixTuple, postfixTuple->size);
1873
1874 XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1875 if (newBuffer != InvalidBuffer)
1876 {
1877 int flags;
1878
1879 flags = REGBUF_STANDARD;
1880 if (xlrec.newPage)
1881 flags |= REGBUF_WILL_INIT;
1882 XLogRegisterBuffer(1, newBuffer, flags);
1883 }
1884
1885 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1886
1887 PageSetLSN(current->page, recptr);
1888
1889 if (newBuffer != InvalidBuffer)
1890 {
1891 PageSetLSN(BufferGetPage(newBuffer), recptr);
1892 }
1893 }
1894
1895 END_CRIT_SECTION();
1896
1897 /* Update local free-space cache and release buffer */
1898 if (newBuffer != InvalidBuffer)
1899 {
1900 SpGistSetLastUsedPage(index, newBuffer);
1901 UnlockReleaseBuffer(newBuffer);
1902 }
1903 }
1904
1905 /*
1906 * Insert one item into the index.
1907 *
1908 * Returns true on success, false if we failed to complete the insertion
1909 * (typically because of conflict with a concurrent insert). In the latter
1910 * case, caller should re-call spgdoinsert() with the same args.
1911 */
1912 bool
spgdoinsert(Relation index,SpGistState * state,ItemPointer heapPtr,Datum * datums,bool * isnulls)1913 spgdoinsert(Relation index, SpGistState *state,
1914 ItemPointer heapPtr, Datum *datums, bool *isnulls)
1915 {
1916 bool result = true;
1917 TupleDesc leafDescriptor = state->leafTupDesc;
1918 bool isnull = isnulls[spgKeyColumn];
1919 int level = 0;
1920 Datum leafDatums[INDEX_MAX_KEYS];
1921 int leafSize;
1922 int bestLeafSize;
1923 int numNoProgressCycles = 0;
1924 SPPageDesc current,
1925 parent;
1926 FmgrInfo *procinfo = NULL;
1927
1928 /*
1929 * Look up FmgrInfo of the user-defined choose function once, to save
1930 * cycles in the loop below.
1931 */
1932 if (!isnull)
1933 procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1934
1935 /*
1936 * Prepare the leaf datum to insert.
1937 *
1938 * If an optional "compress" method is provided, then call it to form the
1939 * leaf key datum from the input datum. Otherwise, store the input datum
1940 * as is. Since we don't use index_form_tuple in this AM, we have to make
1941 * sure value to be inserted is not toasted; FormIndexDatum doesn't
1942 * guarantee that. But we assume the "compress" method to return an
1943 * untoasted value.
1944 */
1945 if (!isnull)
1946 {
1947 if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
1948 {
1949 FmgrInfo *compressProcinfo = NULL;
1950
1951 compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
1952 leafDatums[spgKeyColumn] =
1953 FunctionCall1Coll(compressProcinfo,
1954 index->rd_indcollation[spgKeyColumn],
1955 datums[spgKeyColumn]);
1956 }
1957 else
1958 {
1959 Assert(state->attLeafType.type == state->attType.type);
1960
1961 if (state->attType.attlen == -1)
1962 leafDatums[spgKeyColumn] =
1963 PointerGetDatum(PG_DETOAST_DATUM(datums[spgKeyColumn]));
1964 else
1965 leafDatums[spgKeyColumn] = datums[spgKeyColumn];
1966 }
1967 }
1968 else
1969 leafDatums[spgKeyColumn] = (Datum) 0;
1970
1971 /* Likewise, ensure that any INCLUDE values are not toasted */
1972 for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
1973 {
1974 if (!isnulls[i])
1975 {
1976 if (TupleDescAttr(leafDescriptor, i)->attlen == -1)
1977 leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
1978 else
1979 leafDatums[i] = datums[i];
1980 }
1981 else
1982 leafDatums[i] = (Datum) 0;
1983 }
1984
1985 /*
1986 * Compute space needed for a leaf tuple containing the given data.
1987 */
1988 leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
1989 /* Account for an item pointer, too */
1990 leafSize += sizeof(ItemIdData);
1991
1992 /*
1993 * If it isn't gonna fit, and the opclass can't reduce the datum size by
1994 * suffixing, bail out now rather than doing a lot of useless work.
1995 */
1996 if (leafSize > SPGIST_PAGE_CAPACITY &&
1997 (isnull || !state->config.longValuesOK))
1998 ereport(ERROR,
1999 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2000 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2001 leafSize - sizeof(ItemIdData),
2002 SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2003 RelationGetRelationName(index)),
2004 errhint("Values larger than a buffer page cannot be indexed.")));
2005 bestLeafSize = leafSize;
2006
2007 /* Initialize "current" to the appropriate root page */
2008 current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
2009 current.buffer = InvalidBuffer;
2010 current.page = NULL;
2011 current.offnum = FirstOffsetNumber;
2012 current.node = -1;
2013
2014 /* "parent" is invalid for the moment */
2015 parent.blkno = InvalidBlockNumber;
2016 parent.buffer = InvalidBuffer;
2017 parent.page = NULL;
2018 parent.offnum = InvalidOffsetNumber;
2019 parent.node = -1;
2020
2021 /*
2022 * Before entering the loop, try to clear any pending interrupt condition.
2023 * If a query cancel is pending, we might as well accept it now not later;
2024 * while if a non-canceling condition is pending, servicing it here avoids
2025 * having to restart the insertion and redo all the work so far.
2026 */
2027 CHECK_FOR_INTERRUPTS();
2028
2029 for (;;)
2030 {
2031 bool isNew = false;
2032
2033 /*
2034 * Bail out if query cancel is pending. We must have this somewhere
2035 * in the loop since a broken opclass could produce an infinite
2036 * picksplit loop. However, because we'll be holding buffer lock(s)
2037 * after the first iteration, ProcessInterrupts() wouldn't be able to
2038 * throw a cancel error here. Hence, if we see that an interrupt is
2039 * pending, break out of the loop and deal with the situation below.
2040 * Set result = false because we must restart the insertion if the
2041 * interrupt isn't a query-cancel-or-die case.
2042 */
2043 if (INTERRUPTS_PENDING_CONDITION())
2044 {
2045 result = false;
2046 break;
2047 }
2048
2049 if (current.blkno == InvalidBlockNumber)
2050 {
2051 /*
2052 * Create a leaf page. If leafSize is too large to fit on a page,
2053 * we won't actually use the page yet, but it simplifies the API
2054 * for doPickSplit to always have a leaf page at hand; so just
2055 * quietly limit our request to a page size.
2056 */
2057 current.buffer =
2058 SpGistGetBuffer(index,
2059 GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
2060 Min(leafSize, SPGIST_PAGE_CAPACITY),
2061 &isNew);
2062 current.blkno = BufferGetBlockNumber(current.buffer);
2063 }
2064 else if (parent.buffer == InvalidBuffer)
2065 {
2066 /* we hold no parent-page lock, so no deadlock is possible */
2067 current.buffer = ReadBuffer(index, current.blkno);
2068 LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
2069 }
2070 else if (current.blkno != parent.blkno)
2071 {
2072 /* descend to a new child page */
2073 current.buffer = ReadBuffer(index, current.blkno);
2074
2075 /*
2076 * Attempt to acquire lock on child page. We must beware of
2077 * deadlock against another insertion process descending from that
2078 * page to our parent page (see README). If we fail to get lock,
2079 * abandon the insertion and tell our caller to start over.
2080 *
2081 * XXX this could be improved, because failing to get lock on a
2082 * buffer is not proof of a deadlock situation; the lock might be
2083 * held by a reader, or even just background writer/checkpointer
2084 * process. Perhaps it'd be worth retrying after sleeping a bit?
2085 */
2086 if (!ConditionalLockBuffer(current.buffer))
2087 {
2088 ReleaseBuffer(current.buffer);
2089 UnlockReleaseBuffer(parent.buffer);
2090 return false;
2091 }
2092 }
2093 else
2094 {
2095 /* inner tuple can be stored on the same page as parent one */
2096 current.buffer = parent.buffer;
2097 }
2098 current.page = BufferGetPage(current.buffer);
2099
2100 /* should not arrive at a page of the wrong type */
2101 if (isnull ? !SpGistPageStoresNulls(current.page) :
2102 SpGistPageStoresNulls(current.page))
2103 elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2104 current.blkno);
2105
2106 if (SpGistPageIsLeaf(current.page))
2107 {
2108 SpGistLeafTuple leafTuple;
2109 int nToSplit,
2110 sizeToSplit;
2111
2112 leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
2113 if (leafTuple->size + sizeof(ItemIdData) <=
2114 SpGistPageGetFreeSpace(current.page, 1))
2115 {
2116 /* it fits on page, so insert it and we're done */
2117 addLeafTuple(index, state, leafTuple,
2118 ¤t, &parent, isnull, isNew);
2119 break;
2120 }
2121 else if ((sizeToSplit =
2122 checkSplitConditions(index, state, ¤t,
2123 &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
2124 nToSplit < 64 &&
2125 leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2126 {
2127 /*
2128 * the amount of data is pretty small, so just move the whole
2129 * chain to another leaf page rather than splitting it.
2130 */
2131 Assert(!isNew);
2132 moveLeafs(index, state, ¤t, &parent, leafTuple, isnull);
2133 break; /* we're done */
2134 }
2135 else
2136 {
2137 /* picksplit */
2138 if (doPickSplit(index, state, ¤t, &parent,
2139 leafTuple, level, isnull, isNew))
2140 break; /* doPickSplit installed new tuples */
2141
2142 /* leaf tuple will not be inserted yet */
2143 pfree(leafTuple);
2144
2145 /*
2146 * current now describes new inner tuple, go insert into it
2147 */
2148 Assert(!SpGistPageIsLeaf(current.page));
2149 goto process_inner_tuple;
2150 }
2151 }
2152 else /* non-leaf page */
2153 {
2154 /*
2155 * Apply the opclass choose function to figure out how to insert
2156 * the given datum into the current inner tuple.
2157 */
2158 SpGistInnerTuple innerTuple;
2159 spgChooseIn in;
2160 spgChooseOut out;
2161
2162 /*
2163 * spgAddNode and spgSplitTuple cases will loop back to here to
2164 * complete the insertion operation. Just in case the choose
2165 * function is broken and produces add or split requests
2166 * repeatedly, check for query cancel (see comments above).
2167 */
2168 process_inner_tuple:
2169 if (INTERRUPTS_PENDING_CONDITION())
2170 {
2171 result = false;
2172 break;
2173 }
2174
2175 innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2176 PageGetItemId(current.page, current.offnum));
2177
2178 in.datum = datums[spgKeyColumn];
2179 in.leafDatum = leafDatums[spgKeyColumn];
2180 in.level = level;
2181 in.allTheSame = innerTuple->allTheSame;
2182 in.hasPrefix = (innerTuple->prefixSize > 0);
2183 in.prefixDatum = SGITDATUM(innerTuple, state);
2184 in.nNodes = innerTuple->nNodes;
2185 in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2186
2187 memset(&out, 0, sizeof(out));
2188
2189 if (!isnull)
2190 {
2191 /* use user-defined choose method */
2192 FunctionCall2Coll(procinfo,
2193 index->rd_indcollation[0],
2194 PointerGetDatum(&in),
2195 PointerGetDatum(&out));
2196 }
2197 else
2198 {
2199 /* force "match" action (to insert to random subnode) */
2200 out.resultType = spgMatchNode;
2201 }
2202
2203 if (innerTuple->allTheSame)
2204 {
2205 /*
2206 * It's not allowed to do an AddNode at an allTheSame tuple.
2207 * Opclass must say "match", in which case we choose a random
2208 * one of the nodes to descend into, or "split".
2209 */
2210 if (out.resultType == spgAddNode)
2211 elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2212 else if (out.resultType == spgMatchNode)
2213 out.result.matchNode.nodeN = random() % innerTuple->nNodes;
2214 }
2215
2216 switch (out.resultType)
2217 {
2218 case spgMatchNode:
2219 /* Descend to N'th child node */
2220 spgMatchNodeAction(index, state, innerTuple,
2221 ¤t, &parent,
2222 out.result.matchNode.nodeN);
2223 /* Adjust level as per opclass request */
2224 level += out.result.matchNode.levelAdd;
2225 /* Replace leafDatum and recompute leafSize */
2226 if (!isnull)
2227 {
2228 leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
2229 leafSize = SpGistGetLeafTupleSize(leafDescriptor,
2230 leafDatums, isnulls);
2231 leafSize += sizeof(ItemIdData);
2232 }
2233
2234 /*
2235 * Check new tuple size; fail if it can't fit, unless the
2236 * opclass says it can handle the situation by suffixing.
2237 *
2238 * However, the opclass can only shorten the leaf datum,
2239 * which may not be enough to ever make the tuple fit,
2240 * since INCLUDE columns might alone use more than a page.
2241 * Depending on the opclass' behavior, that could lead to
2242 * an infinite loop --- spgtextproc.c, for example, will
2243 * just repeatedly generate an empty-string leaf datum
2244 * once it runs out of data. Actual bugs in opclasses
2245 * might cause infinite looping, too. To detect such a
2246 * loop, check to see if we are making progress by
2247 * reducing the leafSize in each pass. This is a bit
2248 * tricky though. Because of alignment considerations,
2249 * the total tuple size might not decrease on every pass.
2250 * Also, there are edge cases where the choose method
2251 * might seem to not make progress for a cycle or two.
2252 * Somewhat arbitrarily, we allow up to 10 no-progress
2253 * iterations before failing. (This limit should be more
2254 * than MAXALIGN, to accommodate opclasses that trim one
2255 * byte from the leaf datum per pass.)
2256 */
2257 if (leafSize > SPGIST_PAGE_CAPACITY)
2258 {
2259 bool ok = false;
2260
2261 if (state->config.longValuesOK && !isnull)
2262 {
2263 if (leafSize < bestLeafSize)
2264 {
2265 ok = true;
2266 bestLeafSize = leafSize;
2267 numNoProgressCycles = 0;
2268 }
2269 else if (++numNoProgressCycles < 10)
2270 ok = true;
2271 }
2272 if (!ok)
2273 ereport(ERROR,
2274 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2275 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2276 leafSize - sizeof(ItemIdData),
2277 SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2278 RelationGetRelationName(index)),
2279 errhint("Values larger than a buffer page cannot be indexed.")));
2280 }
2281
2282 /*
2283 * Loop around and attempt to insert the new leafDatum at
2284 * "current" (which might reference an existing child
2285 * tuple, or might be invalid to force us to find a new
2286 * page for the tuple).
2287 */
2288 break;
2289 case spgAddNode:
2290 /* AddNode is not sensible if nodes don't have labels */
2291 if (in.nodeLabels == NULL)
2292 elog(ERROR, "cannot add a node to an inner tuple without node labels");
2293 /* Add node to inner tuple, per request */
2294 spgAddNodeAction(index, state, innerTuple,
2295 ¤t, &parent,
2296 out.result.addNode.nodeN,
2297 out.result.addNode.nodeLabel);
2298
2299 /*
2300 * Retry insertion into the enlarged node. We assume that
2301 * we'll get a MatchNode result this time.
2302 */
2303 goto process_inner_tuple;
2304 break;
2305 case spgSplitTuple:
2306 /* Split inner tuple, per request */
2307 spgSplitNodeAction(index, state, innerTuple,
2308 ¤t, &out);
2309
2310 /* Retry insertion into the split node */
2311 goto process_inner_tuple;
2312 break;
2313 default:
2314 elog(ERROR, "unrecognized SPGiST choose result: %d",
2315 (int) out.resultType);
2316 break;
2317 }
2318 }
2319 } /* end loop */
2320
2321 /*
2322 * Release any buffers we're still holding. Beware of possibility that
2323 * current and parent reference same buffer.
2324 */
2325 if (current.buffer != InvalidBuffer)
2326 {
2327 SpGistSetLastUsedPage(index, current.buffer);
2328 UnlockReleaseBuffer(current.buffer);
2329 }
2330 if (parent.buffer != InvalidBuffer &&
2331 parent.buffer != current.buffer)
2332 {
2333 SpGistSetLastUsedPage(index, parent.buffer);
2334 UnlockReleaseBuffer(parent.buffer);
2335 }
2336
2337 /*
2338 * We do not support being called while some outer function is holding a
2339 * buffer lock (or any other reason to postpone query cancels). If that
2340 * were the case, telling the caller to retry would create an infinite
2341 * loop.
2342 */
2343 Assert(INTERRUPTS_CAN_BE_PROCESSED());
2344
2345 /*
2346 * Finally, check for interrupts again. If there was a query cancel,
2347 * ProcessInterrupts() will be able to throw the error here. If it was
2348 * some other kind of interrupt that can just be cleared, return false to
2349 * tell our caller to retry.
2350 */
2351 CHECK_FOR_INTERRUPTS();
2352
2353 return result;
2354 }
2355