1 /*------------------------------------------------------------------------- 2 * 3 * spgutils.c 4 * various support functions for SP-GiST 5 * 6 * 7 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group 8 * Portions Copyright (c) 1994, Regents of the University of California 9 * 10 * IDENTIFICATION 11 * src/backend/access/spgist/spgutils.c 12 * 13 *------------------------------------------------------------------------- 14 */ 15 16 #include "postgres.h" 17 18 #include "access/amvalidate.h" 19 #include "access/htup_details.h" 20 #include "access/reloptions.h" 21 #include "access/spgist_private.h" 22 #include "access/transam.h" 23 #include "access/xact.h" 24 #include "catalog/pg_amop.h" 25 #include "storage/bufmgr.h" 26 #include "storage/indexfsm.h" 27 #include "storage/lmgr.h" 28 #include "utils/builtins.h" 29 #include "utils/catcache.h" 30 #include "utils/index_selfuncs.h" 31 #include "utils/lsyscache.h" 32 #include "utils/syscache.h" 33 34 35 /* 36 * SP-GiST handler function: return IndexAmRoutine with access method parameters 37 * and callbacks. 
 */
Datum
spghandler(PG_FUNCTION_ARGS)
{
	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

	/* SP-GiST has no fixed strategy numbers; operators come from opclasses */
	amroutine->amstrategies = 0;
	amroutine->amsupport = SPGISTNProc;
	amroutine->amcanorder = false;
	amroutine->amcanorderbyop = true;
	amroutine->amcanbackward = false;
	amroutine->amcanunique = false;
	amroutine->amcanmulticol = false;
	amroutine->amoptionalkey = true;
	amroutine->amsearcharray = false;
	amroutine->amsearchnulls = true;
	amroutine->amstorage = false;
	amroutine->amclusterable = false;
	amroutine->ampredlocks = false;
	amroutine->amcanparallel = false;
	amroutine->amcaninclude = false;
	amroutine->amkeytype = InvalidOid;

	amroutine->ambuild = spgbuild;
	amroutine->ambuildempty = spgbuildempty;
	amroutine->aminsert = spginsert;
	amroutine->ambulkdelete = spgbulkdelete;
	amroutine->amvacuumcleanup = spgvacuumcleanup;
	amroutine->amcanreturn = spgcanreturn;
	amroutine->amcostestimate = spgcostestimate;
	amroutine->amoptions = spgoptions;
	amroutine->amproperty = spgproperty;
	amroutine->ambuildphasename = NULL;
	amroutine->amvalidate = spgvalidate;
	amroutine->ambeginscan = spgbeginscan;
	amroutine->amrescan = spgrescan;
	amroutine->amgettuple = spggettuple;
	amroutine->amgetbitmap = spggetbitmap;
	amroutine->amendscan = spgendscan;
	amroutine->ammarkpos = NULL;
	amroutine->amrestrpos = NULL;
	amroutine->amestimateparallelscan = NULL;
	amroutine->aminitparallelscan = NULL;
	amroutine->amparallelrescan = NULL;

	PG_RETURN_POINTER(amroutine);
}

/* Fill in a SpGistTypeDesc struct with info about the specified data type */
static void
fillTypeDesc(SpGistTypeDesc *desc, Oid type)
{
	desc->type = type;
	get_typlenbyval(type, &desc->attlen, &desc->attbyval);
}

/*
 * Fetch local cache of AM-specific info about the index, initializing it
 * if necessary
 *
 * The result lives in the index's rd_indexcxt and is keyed off rd_amcache,
 * so it is built at most once per relcache entry.
 */
SpGistCache *
spgGetCache(Relation index)
{
	SpGistCache *cache;

	if (index->rd_amcache == NULL)
	{
		Oid			atttype;
		spgConfigIn in;
		FmgrInfo   *procinfo;
		Buffer		metabuffer;
		SpGistMetaPageData *metadata;

		cache = MemoryContextAllocZero(index->rd_indexcxt,
									   sizeof(SpGistCache));

		/* SPGiST doesn't support multi-column indexes */
		Assert(index->rd_att->natts == 1);

		/*
		 * Get the actual data type of the indexed column from the index
		 * tupdesc.  We pass this to the opclass config function so that
		 * polymorphic opclasses are possible.
		 */
		atttype = TupleDescAttr(index->rd_att, 0)->atttypid;

		/* Call the config function to get config info for the opclass */
		in.attType = atttype;

		procinfo = index_getprocinfo(index, 1, SPGIST_CONFIG_PROC);
		FunctionCall2Coll(procinfo,
						  index->rd_indcollation[0],
						  PointerGetDatum(&in),
						  PointerGetDatum(&cache->config));

		/* Get the information we need about each relevant datatype */
		fillTypeDesc(&cache->attType, atttype);

		if (OidIsValid(cache->config.leafType) &&
			cache->config.leafType != atttype)
		{
			/*
			 * A distinct leaf type only makes sense if there is a compress
			 * function to map input values to it.
			 */
			if (!OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("compress method must be defined when leaf type is different from input type")));

			fillTypeDesc(&cache->attLeafType, cache->config.leafType);
		}
		else
		{
			cache->attLeafType = cache->attType;
		}

		fillTypeDesc(&cache->attPrefixType, cache->config.prefixType);
		fillTypeDesc(&cache->attLabelType, cache->config.labelType);

		/* Last, get the lastUsedPages data from the metapage */
		metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
		LockBuffer(metabuffer, BUFFER_LOCK_SHARE);

		metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));

		/* Sanity check: reject relations that are not SP-GiST indexes */
		if (metadata->magicNumber != SPGIST_MAGIC_NUMBER)
			elog(ERROR, "index \"%s\" is not an SP-GiST index",
				 RelationGetRelationName(index));

		cache->lastUsedPages = metadata->lastUsedPages;

		UnlockReleaseBuffer(metabuffer);

		/* Install in relcache only after the cache is fully built */
		index->rd_amcache = (void *) cache;
	}
	else
	{
		/* assume it's up to date */
		cache = (SpGistCache *) index->rd_amcache;
	}

	return cache;
}

/* Initialize SpGistState for working with the given index */
void
initSpGistState(SpGistState *state, Relation index)
{
	SpGistCache *cache;

	/* Get cached static information about index */
	cache = spgGetCache(index);

	state->config = cache->config;
	state->attType = cache->attType;
	state->attLeafType = cache->attLeafType;
	state->attPrefixType = cache->attPrefixType;
	state->attLabelType = cache->attLabelType;

	/* Make workspace for constructing dead tuples */
	state->deadTupleStorage = palloc0(SGDTSIZE);

	/* Set XID to use in redirection tuples */
	state->myXid = GetTopTransactionIdIfAny();

	/* Assume we're not in an index build (spgbuild will override) */
	state->isBuild = false;
}

/*
 * Allocate a new page (either by recycling, or by extending the index file).
 *
 * The returned buffer is already pinned and exclusive-locked.
 * Caller is responsible for initializing the page by calling SpGistInitBuffer.
 */
Buffer
SpGistNewBuffer(Relation index)
{
	Buffer		buffer;
	bool		needLock;

	/* First, try to get a page from FSM */
	for (;;)
	{
		BlockNumber blkno = GetFreeIndexPage(index);

		if (blkno == InvalidBlockNumber)
			break;				/* nothing known to FSM */

		/*
		 * The fixed pages shouldn't ever be listed in FSM, but just in case
		 * one is, ignore it.
		 */
		if (SpGistBlockIsFixed(blkno))
			continue;

		buffer = ReadBuffer(index, blkno);

		/*
		 * We have to guard against the possibility that someone else already
		 * recycled this page; the buffer may be locked if so.  Use a
		 * conditional lock so we never wait on a concurrent user of the page.
		 */
		if (ConditionalLockBuffer(buffer))
		{
			Page		page = BufferGetPage(buffer);

			if (PageIsNew(page))
				return buffer;	/* OK to use, if never initialized */

			if (SpGistPageIsDeleted(page) || PageIsEmpty(page))
				return buffer;	/* OK to use */

			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		}

		/* Can't use it, so release buffer and try again */
		ReleaseBuffer(buffer);
	}

	/* Must extend the file */
	needLock = !RELATION_IS_LOCAL(index);
	if (needLock)
		LockRelationForExtension(index, ExclusiveLock);

	buffer = ReadBuffer(index, P_NEW);
	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

	/*
	 * Release the file-extension lock; it's now OK for someone else to extend
	 * the relation some more.
	 */
	if (needLock)
		UnlockRelationForExtension(index, ExclusiveLock);

	return buffer;
}

/*
 * Update index metapage's lastUsedPages info from local cache, if possible
 *
 * Updating meta page isn't critical for index working, so
 * 1 use ConditionalLockBuffer to improve concurrency
 * 2 don't WAL-log metabuffer changes to decrease WAL traffic
 */
void
SpGistUpdateMetaPage(Relation index)
{
	SpGistCache *cache = (SpGistCache *) index->rd_amcache;

	if (cache != NULL)
	{
		Buffer		metabuffer;

		metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);

		if (ConditionalLockBuffer(metabuffer))
		{
			Page		metapage = BufferGetPage(metabuffer);
			SpGistMetaPageData *metadata = SpGistPageGetMeta(metapage);

			metadata->lastUsedPages = cache->lastUsedPages;

			/*
			 * Set pd_lower just past the end of the metadata.  This is
			 * essential, because without doing so, metadata will be lost if
			 * xlog.c compresses the page.  (We must do this here because
			 * pre-v11 versions of PG did not set the metapage's pd_lower
			 * correctly, so a pg_upgraded index might contain the wrong
			 * value.)
			 */
			((PageHeader) metapage)->pd_lower =
				((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) metapage;

			MarkBufferDirty(metabuffer);
			UnlockReleaseBuffer(metabuffer);
		}
		else
		{
			/* Somebody else has the lock; skip the update this time */
			ReleaseBuffer(metabuffer);
		}
	}
}

/* Macro to select proper element of lastUsedPages cache depending on flags */
/* Masking flags with SPGIST_CACHED_PAGES is just for paranoia's sake */
#define GET_LUP(c, f) (&(c)->lastUsedPages.cachedPage[((unsigned int) (f)) % SPGIST_CACHED_PAGES])

/*
 * Allocate and initialize a new buffer of the type and parity specified by
 * flags.  The returned buffer is already pinned and exclusive-locked.
 *
 * When requesting an inner page, if we get one with the wrong parity,
 * we just release the buffer and try again.  We will get a different page
 * because GetFreeIndexPage will have marked the page used in FSM.  The page
 * is entered in our local lastUsedPages cache, so there's some hope of
 * making use of it later in this session, but otherwise we rely on VACUUM
 * to eventually re-enter the page in FSM, making it available for recycling.
 * Note that such a page does not get marked dirty here, so unless it's used
 * fairly soon, the buffer will just get discarded and the page will remain
 * as it was on disk.
 *
 * When we return a buffer to the caller, the page is *not* entered into
 * the lastUsedPages cache; we expect the caller will do so after it's taken
 * whatever space it will use.  This is because after the caller has used up
 * some space, the page might have less space than whatever was cached already
 * so we'd rather not trash the old cache entry.
 */
static Buffer
allocNewBuffer(Relation index, int flags)
{
	SpGistCache *cache = spgGetCache(index);
	uint16		pageflags = 0;

	if (GBUF_REQ_LEAF(flags))
		pageflags |= SPGIST_LEAF;
	if (GBUF_REQ_NULLS(flags))
		pageflags |= SPGIST_NULLS;

	for (;;)
	{
		Buffer		buffer;

		buffer = SpGistNewBuffer(index);
		SpGistInitBuffer(buffer, pageflags);

		if (pageflags & SPGIST_LEAF)
		{
			/* Leaf pages have no parity concerns, so just use it */
			return buffer;
		}
		else
		{
			BlockNumber blkno = BufferGetBlockNumber(buffer);
			int			blkFlags = GBUF_INNER_PARITY(blkno);

			if ((flags & GBUF_PARITY_MASK) == blkFlags)
			{
				/* Page has right parity, use it */
				return buffer;
			}
			else
			{
				/* Page has wrong parity, record it in cache and try again */
				if (pageflags & SPGIST_NULLS)
					blkFlags |= GBUF_NULLS;
				cache->lastUsedPages.cachedPage[blkFlags].blkno = blkno;
				cache->lastUsedPages.cachedPage[blkFlags].freeSpace =
					PageGetExactFreeSpace(BufferGetPage(buffer));
				UnlockReleaseBuffer(buffer);
			}
		}
	}
}

/*
 * Get a buffer of the type and parity specified by flags, having at least
 * as much free space as indicated by needSpace.  We use the lastUsedPages
 * cache to assign the same buffer previously requested when possible.
 * The returned buffer is already pinned and exclusive-locked.
 *
 * *isNew is set true if the page was initialized here, false if it was
 * already valid.
 */
Buffer
SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
{
	SpGistCache *cache = spgGetCache(index);
	SpGistLastUsedPage *lup;

	/* Bail out if even an empty page wouldn't meet the demand */
	if (needSpace > SPGIST_PAGE_CAPACITY)
		elog(ERROR, "desired SPGiST tuple size is too big");

	/*
	 * If possible, increase the space request to include relation's
	 * fillfactor.  This ensures that when we add unrelated tuples to a page,
	 * we try to keep 100-fillfactor% available for adding tuples that are
	 * related to the ones already on it.  But fillfactor mustn't cause an
	 * error for requests that would otherwise be legal.
	 */
	needSpace += RelationGetTargetPageFreeSpace(index,
												SPGIST_DEFAULT_FILLFACTOR);
	needSpace = Min(needSpace, SPGIST_PAGE_CAPACITY);

	/* Get the cache entry for this flags setting */
	lup = GET_LUP(cache, flags);

	/* If we have nothing cached, just turn it over to allocNewBuffer */
	if (lup->blkno == InvalidBlockNumber)
	{
		*isNew = true;
		return allocNewBuffer(index, flags);
	}

	/* fixed pages should never be in cache */
	Assert(!SpGistBlockIsFixed(lup->blkno));

	/* If cached freeSpace isn't enough, don't bother looking at the page */
	if (lup->freeSpace >= needSpace)
	{
		Buffer		buffer;
		Page		page;

		buffer = ReadBuffer(index, lup->blkno);

		if (!ConditionalLockBuffer(buffer))
		{
			/*
			 * buffer is locked by another process, so return a new buffer
			 */
			ReleaseBuffer(buffer);
			*isNew = true;
			return allocNewBuffer(index, flags);
		}

		page = BufferGetPage(buffer);

		if (PageIsNew(page) || SpGistPageIsDeleted(page) || PageIsEmpty(page))
		{
			/* OK to initialize the page */
			uint16		pageflags = 0;

			if (GBUF_REQ_LEAF(flags))
				pageflags |= SPGIST_LEAF;
			if (GBUF_REQ_NULLS(flags))
				pageflags |= SPGIST_NULLS;
			SpGistInitBuffer(buffer, pageflags);
			/* Cache the space that will remain after the caller's insertion */
			lup->freeSpace = PageGetExactFreeSpace(page) - needSpace;
			*isNew = true;
			return buffer;
		}

		/*
		 * Check that page is of right type and has enough space.  We must
		 * recheck this since our cache isn't necessarily up to date.
		 */
		if ((GBUF_REQ_LEAF(flags) ? SpGistPageIsLeaf(page) : !SpGistPageIsLeaf(page)) &&
			(GBUF_REQ_NULLS(flags) ? SpGistPageStoresNulls(page) : !SpGistPageStoresNulls(page)))
		{
			int			freeSpace = PageGetExactFreeSpace(page);

			if (freeSpace >= needSpace)
			{
				/* Success, update freespace info and return the buffer */
				lup->freeSpace = freeSpace - needSpace;
				*isNew = false;
				return buffer;
			}
		}

		/*
		 * fallback to allocation of new buffer
		 */
		UnlockReleaseBuffer(buffer);
	}

	/* No success with cache, so return a new buffer */
	*isNew = true;
	return allocNewBuffer(index, flags);
}

/*
 * Update lastUsedPages cache when done modifying a page.
 *
 * We update the appropriate cache entry if it already contained this page
 * (its freeSpace is likely obsolete), or if this page has more space than
 * whatever we had cached.
 */
void
SpGistSetLastUsedPage(Relation index, Buffer buffer)
{
	SpGistCache *cache = spgGetCache(index);
	SpGistLastUsedPage *lup;
	int			freeSpace;
	Page		page = BufferGetPage(buffer);
	BlockNumber blkno = BufferGetBlockNumber(buffer);
	int			flags;

	/* Never enter fixed pages (root pages) in cache, though */
	if (SpGistBlockIsFixed(blkno))
		return;

	/* Reconstruct the cache-slot flags from the page's actual state */
	if (SpGistPageIsLeaf(page))
		flags = GBUF_LEAF;
	else
		flags = GBUF_INNER_PARITY(blkno);
	if (SpGistPageStoresNulls(page))
		flags |= GBUF_NULLS;

	lup = GET_LUP(cache, flags);

	freeSpace = PageGetExactFreeSpace(page);
	if (lup->blkno == InvalidBlockNumber || lup->blkno == blkno ||
		lup->freeSpace < freeSpace)
	{
		lup->blkno = blkno;
		lup->freeSpace = freeSpace;
	}
}

/*
 * Initialize an SPGiST page to empty, with specified flags
 */
void
SpGistInitPage(Page page, uint16 f)
{
	SpGistPageOpaque opaque;

	PageInit(page, BLCKSZ, MAXALIGN(sizeof(SpGistPageOpaqueData)));
	opaque = SpGistPageGetOpaque(page);
	memset(opaque, 0, sizeof(SpGistPageOpaqueData));
	opaque->flags = f;
	opaque->spgist_page_id = SPGIST_PAGE_ID;
}

/*
 * Initialize a buffer's page to empty, with specified flags
 */
void
SpGistInitBuffer(Buffer b, uint16 f)
{
	Assert(BufferGetPageSize(b) == BLCKSZ);
	SpGistInitPage(BufferGetPage(b), f);
}

/*
 * Initialize metadata page
 */
void
SpGistInitMetapage(Page page)
{
	SpGistMetaPageData *metadata;
	int			i;

	SpGistInitPage(page, SPGIST_META);
	metadata = SpGistPageGetMeta(page);
	memset(metadata, 0, sizeof(SpGistMetaPageData));
	metadata->magicNumber = SPGIST_MAGIC_NUMBER;

	/* initialize last-used-page cache to empty */
	for (i = 0; i < SPGIST_CACHED_PAGES; i++)
		metadata->lastUsedPages.cachedPage[i].blkno = InvalidBlockNumber;

	/*
	 * Set pd_lower just past the end of the metadata.  This is essential,
	 * because without doing so, metadata will be lost if xlog.c compresses
	 * the page.
	 */
	((PageHeader) page)->pd_lower =
		((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) page;
}

/*
 * reloptions processing for SPGiST
 */
bytea *
spgoptions(Datum reloptions, bool validate)
{
	return default_reloptions(reloptions, validate, RELOPT_KIND_SPGIST);
}

/*
 * Get the space needed to store a non-null datum of the indicated type.
 * Note the result is already rounded up to a MAXALIGN boundary.
 * Also, we follow the SPGiST convention that pass-by-val types are
 * just stored in their Datum representation (compare memcpyDatum).
597 */ 598 unsigned int 599 SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum) 600 { 601 unsigned int size; 602 603 if (att->attbyval) 604 size = sizeof(Datum); 605 else if (att->attlen > 0) 606 size = att->attlen; 607 else 608 size = VARSIZE_ANY(datum); 609 610 return MAXALIGN(size); 611 } 612 613 /* 614 * Copy the given non-null datum to *target 615 */ 616 static void 617 memcpyDatum(void *target, SpGistTypeDesc *att, Datum datum) 618 { 619 unsigned int size; 620 621 if (att->attbyval) 622 { 623 memcpy(target, &datum, sizeof(Datum)); 624 } 625 else 626 { 627 size = (att->attlen > 0) ? att->attlen : VARSIZE_ANY(datum); 628 memcpy(target, DatumGetPointer(datum), size); 629 } 630 } 631 632 /* 633 * Construct a leaf tuple containing the given heap TID and datum value 634 */ 635 SpGistLeafTuple 636 spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, 637 Datum datum, bool isnull) 638 { 639 SpGistLeafTuple tup; 640 unsigned int size; 641 642 /* compute space needed (note result is already maxaligned) */ 643 size = SGLTHDRSZ; 644 if (!isnull) 645 size += SpGistGetTypeSize(&state->attLeafType, datum); 646 647 /* 648 * Ensure that we can replace the tuple with a dead tuple later. This 649 * test is unnecessary when !isnull, but let's be safe. 650 */ 651 if (size < SGDTSIZE) 652 size = SGDTSIZE; 653 654 /* OK, form the tuple */ 655 tup = (SpGistLeafTuple) palloc0(size); 656 657 tup->size = size; 658 tup->nextOffset = InvalidOffsetNumber; 659 tup->heapPtr = *heapPtr; 660 if (!isnull) 661 memcpyDatum(SGLTDATAPTR(tup), &state->attLeafType, datum); 662 663 return tup; 664 } 665 666 /* 667 * Construct a node (to go into an inner tuple) containing the given label 668 * 669 * Note that the node's downlink is just set invalid here. Caller will fill 670 * it in later. 
 */
SpGistNodeTuple
spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
{
	SpGistNodeTuple tup;
	unsigned int size;
	unsigned short infomask = 0;

	/* compute space needed (note result is already maxaligned) */
	size = SGNTHDRSZ;
	if (!isnull)
		size += SpGistGetTypeSize(&state->attLabelType, label);

	/*
	 * Here we make sure that the size will fit in the field reserved for it
	 * in t_info.
	 */
	if ((size & INDEX_SIZE_MASK) != size)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("index row requires %zu bytes, maximum size is %zu",
						(Size) size, (Size) INDEX_SIZE_MASK)));

	tup = (SpGistNodeTuple) palloc0(size);

	if (isnull)
		infomask |= INDEX_NULL_MASK;
	/* we don't bother setting the INDEX_VAR_MASK bit */
	infomask |= size;
	tup->t_info = infomask;

	/* The TID field will be filled in later */
	ItemPointerSetInvalid(&tup->t_tid);

	if (!isnull)
		memcpyDatum(SGNTDATAPTR(tup), &state->attLabelType, label);

	return tup;
}

/*
 * Construct an inner tuple containing the given prefix and node array
 */
SpGistInnerTuple
spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix,
				  int nNodes, SpGistNodeTuple *nodes)
{
	SpGistInnerTuple tup;
	unsigned int size;
	unsigned int prefixSize;
	int			i;
	char	   *ptr;

	/* Compute size needed */
	if (hasPrefix)
		prefixSize = SpGistGetTypeSize(&state->attPrefixType, prefix);
	else
		prefixSize = 0;

	size = SGITHDRSZ + prefixSize;

	/* Note: we rely on node tuple sizes to be maxaligned already */
	for (i = 0; i < nNodes; i++)
		size += IndexTupleSize(nodes[i]);

	/*
	 * Ensure that we can replace the tuple with a dead tuple later.  This
	 * test is unnecessary given current tuple layouts, but let's be safe.
	 */
	if (size < SGDTSIZE)
		size = SGDTSIZE;

	/*
	 * Inner tuple should be small enough to fit on a page
	 */
	if (size > SPGIST_PAGE_CAPACITY - sizeof(ItemIdData))
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("SP-GiST inner tuple size %zu exceeds maximum %zu",
						(Size) size,
						SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
				 errhint("Values larger than a buffer page cannot be indexed.")));

	/*
	 * Check for overflow of header fields --- probably can't fail if the
	 * above succeeded, but let's be paranoid
	 */
	if (size > SGITMAXSIZE ||
		prefixSize > SGITMAXPREFIXSIZE ||
		nNodes > SGITMAXNNODES)
		elog(ERROR, "SPGiST inner tuple header field is too small");

	/* OK, form the tuple */
	tup = (SpGistInnerTuple) palloc0(size);

	tup->nNodes = nNodes;
	tup->prefixSize = prefixSize;
	tup->size = size;

	if (hasPrefix)
		memcpyDatum(SGITDATAPTR(tup), &state->attPrefixType, prefix);

	/* Append the node tuples, back to back, after the prefix */
	ptr = (char *) SGITNODEPTR(tup);

	for (i = 0; i < nNodes; i++)
	{
		SpGistNodeTuple node = nodes[i];

		memcpy(ptr, node, IndexTupleSize(node));
		ptr += IndexTupleSize(node);
	}

	return tup;
}

/*
 * Construct a "dead" tuple to replace a tuple being deleted.
 *
 * The state can be SPGIST_REDIRECT, SPGIST_DEAD, or SPGIST_PLACEHOLDER.
 * For a REDIRECT tuple, a pointer (blkno+offset) must be supplied, and
 * the xid field is filled in automatically.
 *
 * This is called in critical sections, so we don't use palloc; the tuple
 * is built in preallocated storage.  It should be copied before another
 * call with different parameters can occur.
796 */ 797 SpGistDeadTuple 798 spgFormDeadTuple(SpGistState *state, int tupstate, 799 BlockNumber blkno, OffsetNumber offnum) 800 { 801 SpGistDeadTuple tuple = (SpGistDeadTuple) state->deadTupleStorage; 802 803 tuple->tupstate = tupstate; 804 tuple->size = SGDTSIZE; 805 tuple->nextOffset = InvalidOffsetNumber; 806 807 if (tupstate == SPGIST_REDIRECT) 808 { 809 ItemPointerSet(&tuple->pointer, blkno, offnum); 810 Assert(TransactionIdIsValid(state->myXid)); 811 tuple->xid = state->myXid; 812 } 813 else 814 { 815 ItemPointerSetInvalid(&tuple->pointer); 816 tuple->xid = InvalidTransactionId; 817 } 818 819 return tuple; 820 } 821 822 /* 823 * Extract the label datums of the nodes within innerTuple 824 * 825 * Returns NULL if label datums are NULLs 826 */ 827 Datum * 828 spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple) 829 { 830 Datum *nodeLabels; 831 int i; 832 SpGistNodeTuple node; 833 834 /* Either all the labels must be NULL, or none. */ 835 node = SGITNODEPTR(innerTuple); 836 if (IndexTupleHasNulls(node)) 837 { 838 SGITITERATE(innerTuple, i, node) 839 { 840 if (!IndexTupleHasNulls(node)) 841 elog(ERROR, "some but not all node labels are null in SPGiST inner tuple"); 842 } 843 /* They're all null, so just return NULL */ 844 return NULL; 845 } 846 else 847 { 848 nodeLabels = (Datum *) palloc(sizeof(Datum) * innerTuple->nNodes); 849 SGITITERATE(innerTuple, i, node) 850 { 851 if (IndexTupleHasNulls(node)) 852 elog(ERROR, "some but not all node labels are null in SPGiST inner tuple"); 853 nodeLabels[i] = SGNTDATUM(node, state); 854 } 855 return nodeLabels; 856 } 857 } 858 859 /* 860 * Add a new item to the page, replacing a PLACEHOLDER item if possible. 861 * Return the location it's inserted at, or InvalidOffsetNumber on failure. 862 * 863 * If startOffset isn't NULL, we start searching for placeholders at 864 * *startOffset, and update that to the next place to search. This is just 865 * an optimization for repeated insertions. 
 *
 * If errorOK is false, we throw error when there's not enough room,
 * rather than returning InvalidOffsetNumber.
 */
OffsetNumber
SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size,
					 OffsetNumber *startOffset, bool errorOK)
{
	SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
	OffsetNumber i,
				maxoff,
				offnum;

	/*
	 * Only hunt for a placeholder if the page has one and the net space
	 * change (new item minus the SGDTSIZE placeholder we'd remove) fits.
	 */
	if (opaque->nPlaceholder > 0 &&
		PageGetExactFreeSpace(page) + SGDTSIZE >= MAXALIGN(size))
	{
		/* Try to replace a placeholder */
		maxoff = PageGetMaxOffsetNumber(page);
		offnum = InvalidOffsetNumber;

		for (;;)
		{
			/* Start at the caller's hint, if any; else from the beginning */
			if (startOffset && *startOffset != InvalidOffsetNumber)
				i = *startOffset;
			else
				i = FirstOffsetNumber;
			for (; i <= maxoff; i++)
			{
				SpGistDeadTuple it = (SpGistDeadTuple) PageGetItem(page,
																   PageGetItemId(page, i));

				if (it->tupstate == SPGIST_PLACEHOLDER)
				{
					offnum = i;
					break;
				}
			}

			/* Done if we found a placeholder */
			if (offnum != InvalidOffsetNumber)
				break;

			if (startOffset && *startOffset != InvalidOffsetNumber)
			{
				/* Hint was no good, re-search from beginning */
				*startOffset = InvalidOffsetNumber;
				continue;
			}

			/* Hmm, no placeholder found?  Fix the counter and give up. */
			opaque->nPlaceholder = 0;
			break;
		}

		if (offnum != InvalidOffsetNumber)
		{
			/* Replace the placeholder tuple */
			PageIndexTupleDelete(page, offnum);

			offnum = PageAddItem(page, item, size, offnum, false, false);

			/*
			 * We should not have failed given the size check at the top of
			 * the function, but test anyway.  If we did fail, we must PANIC
			 * because we've already deleted the placeholder tuple, and
			 * there's no other way to keep the damage from getting to disk.
			 */
			if (offnum != InvalidOffsetNumber)
			{
				Assert(opaque->nPlaceholder > 0);
				opaque->nPlaceholder--;
				if (startOffset)
					*startOffset = offnum + 1;
			}
			else
				elog(PANIC, "failed to add item of size %u to SPGiST index page",
					 (int) size);

			return offnum;
		}
	}

	/* No luck in replacing a placeholder, so just add it to the page */
	offnum = PageAddItem(page, item, size,
						 InvalidOffsetNumber, false, false);

	if (offnum == InvalidOffsetNumber && !errorOK)
		elog(ERROR, "failed to add item of size %u to SPGiST index page",
			 (int) size);

	return offnum;
}

/*
 * spgproperty() -- Check boolean properties of indexes.
 *
 * This is optional for most AMs, but is required for SP-GiST because the core
 * property code doesn't support AMPROP_DISTANCE_ORDERABLE.
 */
bool
spgproperty(Oid index_oid, int attno,
			IndexAMProperty prop, const char *propname,
			bool *res, bool *isnull)
{
	Oid			opclass,
				opfamily,
				opcintype;
	CatCList   *catlist;
	int			i;

	/* Only answer column-level inquiries */
	if (attno == 0)
		return false;

	switch (prop)
	{
		case AMPROP_DISTANCE_ORDERABLE:
			break;
		default:
			/* punt on any other property; core code handles it */
			return false;
	}

	/*
	 * Currently, SP-GiST distance-ordered scans require that there be a
	 * distance operator in the opclass with the default types.  So we assume
	 * that if such a operator exists, then there's a reason for it.
	 */

	/* First we need to know the column's opclass. */
	opclass = get_index_column_opclass(index_oid, attno);
	if (!OidIsValid(opclass))
	{
		*isnull = true;
		return true;
	}

	/* Now look up the opclass family and input datatype. */
	if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
	{
		*isnull = true;
		return true;
	}

	/* And now we can check whether the operator is provided. */
	catlist = SearchSysCacheList1(AMOPSTRATEGY,
								  ObjectIdGetDatum(opfamily));

	*res = false;

	for (i = 0; i < catlist->n_members; i++)
	{
		HeapTuple	amoptup = &catlist->members[i]->tuple;
		Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(amoptup);

		/* Look for an ordering operator on the opclass's input type */
		if (amopform->amoppurpose == AMOP_ORDER &&
			(amopform->amoplefttype == opcintype ||
			 amopform->amoprighttype == opcintype) &&
			opfamily_can_sort_type(amopform->amopsortfamily,
								   get_op_rettype(amopform->amopopr)))
		{
			*res = true;
			break;
		}
	}

	ReleaseSysCacheList(catlist);

	*isnull = false;

	return true;
}