/*-------------------------------------------------------------------------
 *
 * gistutil.c
 *	  utilities routines for the postgres GiST index access method.
 *
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			src/backend/access/gist/gistutil.c
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <math.h>

#include "access/gist_private.h"
#include "access/htup_details.h"
#include "access/reloptions.h"
#include "catalog/pg_opclass.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "utils/float.h"
#include "utils/syscache.h"
#include "utils/snapmgr.h"
#include "utils/lsyscache.h"


/*
 * Write itup vector to page, has no control of free space.
 *
 * If "off" is InvalidOffsetNumber, tuples are appended after the current
 * last item (or at FirstOffsetNumber if the page is empty).  Any failure
 * of PageAddItem is reported as an ERROR.
 */
void
gistfillbuffer(Page page, IndexTuple *itup, int len, OffsetNumber off)
{
	OffsetNumber l = InvalidOffsetNumber;
	int			i;

	if (off == InvalidOffsetNumber)
		off = (PageIsEmpty(page)) ? FirstOffsetNumber :
			OffsetNumberNext(PageGetMaxOffsetNumber(page));

	for (i = 0; i < len; i++)
	{
		Size		sz = IndexTupleSize(itup[i]);

		l = PageAddItem(page, (Item) itup[i], sz, off, false, false);
		if (l == InvalidOffsetNumber)
			elog(ERROR, "failed to add item to GiST index page, item %d out of %d, size %d bytes",
				 i, len, (int) sz);
		off++;
	}
}

/*
 * Check space for itup vector on page
 *
 * Returns true if the page does NOT have room for the tuples (after
 * reserving "freespace" bytes and crediting back the space of the
 * "todelete" item, if any).
 */
bool
gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace)
{
	unsigned int size = freespace,
				deleted = 0;
	int			i;

	for (i = 0; i < len; i++)
		size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);

	if (todelete != InvalidOffsetNumber)
	{
		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete));

		deleted = IndexTupleSize(itup) + sizeof(ItemIdData);
	}

	return (PageGetFreeSpace(page) + deleted < size);
}

/*
 * Would a vector of "len" tuples (plus their line pointers) fit on an
 * otherwise-empty GiST page?
 */
bool
gistfitpage(IndexTuple *itvec, int len)
{
	int			i;
	Size		size = 0;

	for (i = 0; i < len; i++)
		size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);

	/* TODO: Consider fillfactor */
	return (size <= GiSTPageSize);
}

/*
 * Read buffer into itup vector
 *
 * Returns a palloc'd array of pointers to the tuples on the page; the
 * tuples themselves are not copied.  *len is set to the number of entries.
 */
IndexTuple *
gistextractpage(Page page, int *len /* out */ )
{
	OffsetNumber i,
				maxoff;
	IndexTuple *itvec;

	maxoff = PageGetMaxOffsetNumber(page);
	*len = maxoff;
	itvec = palloc(sizeof(IndexTuple) * maxoff);
	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
		itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));

	return itvec;
}

/*
 * join two vectors into one
 *
 * "itvec" is repalloc'd to hold the combined result; *len is updated.
 */
IndexTuple *
gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
{
	itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
	memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
	*len += addlen;
	return itvec;
}

/*
 * make plain IndexTupleVector
 *
 * Concatenates the tuples pointed to by "vec" into one contiguous
 * palloc'd chunk; *memlen receives the total byte length.
 */

IndexTupleData *
gistfillitupvec(IndexTuple *vec, int veclen, int *memlen)
{
	char	   *ptr,
			   *ret;
	int			i;

	*memlen = 0;

	for (i = 0; i < veclen; i++)
		*memlen += IndexTupleSize(vec[i]);

	ptr = ret = palloc(*memlen);

	for (i = 0; i < veclen; i++)
	{
		memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
		ptr += IndexTupleSize(vec[i]);
	}

	return (IndexTupleData *) ret;
}

/*
 * Make unions of keys in IndexTuple vector (one union datum per index column).
 * Union Datums are returned into the attr/isnull arrays.
 * Resulting Datums aren't compressed.
 */
void
gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len,
				   Datum *attr, bool *isnull)
{
	int			i;
	GistEntryVector *evec;
	int			attrsize;

	evec = (GistEntryVector *) palloc((len + 2) * sizeof(GISTENTRY) + GEVHDRSZ);

	for (i = 0; i < giststate->nonLeafTupdesc->natts; i++)
	{
		int			j;

		/* Collect non-null datums for this column */
		evec->n = 0;
		for (j = 0; j < len; j++)
		{
			Datum		datum;
			bool		IsNull;

			datum = index_getattr(itvec[j], i + 1, giststate->leafTupdesc,
								  &IsNull);
			if (IsNull)
				continue;

			gistdentryinit(giststate, i,
						   evec->vector + evec->n,
						   datum,
						   NULL, NULL, (OffsetNumber) 0,
						   false, IsNull);
			evec->n++;
		}

		/* If this column was all NULLs, the union is NULL */
		if (evec->n == 0)
		{
			attr[i] = (Datum) 0;
			isnull[i] = true;
		}
		else
		{
			if (evec->n == 1)
			{
				/* unionFn may expect at least two inputs */
				evec->n = 2;
				evec->vector[1] = evec->vector[0];
			}

			/* Make union and store in attr array */
			attr[i] = FunctionCall2Coll(&giststate->unionFn[i],
										giststate->supportCollation[i],
										PointerGetDatum(evec),
										PointerGetDatum(&attrsize));

			isnull[i] = false;
		}
	}
}

/*
 * Return an IndexTuple containing the result of applying the "union"
 * method to the specified IndexTuple vector.
 */
IndexTuple
gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
{
	Datum		attr[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];

	gistMakeUnionItVec(giststate, itvec, len, attr, isnull);

	return gistFormTuple(giststate, r, attr, isnull, false);
}

/*
 * makes union of two key
 *
 * The result datum and its null flag are returned in *dst / *dstisnull.
 * A NULL input is effectively ignored (the non-null key is duplicated to
 * satisfy the union support function); two NULLs union to NULL.
 */
void
gistMakeUnionKey(GISTSTATE *giststate, int attno,
				 GISTENTRY *entry1, bool isnull1,
				 GISTENTRY *entry2, bool isnull2,
				 Datum *dst, bool *dstisnull)
{
	/* we need a GistEntryVector with room for exactly 2 elements */
	union
	{
		GistEntryVector gev;
		char		padding[2 * sizeof(GISTENTRY) + GEVHDRSZ];
	}			storage;
	GistEntryVector *evec = &storage.gev;
	int			dstsize;

	evec->n = 2;

	if (isnull1 && isnull2)
	{
		*dstisnull = true;
		*dst = (Datum) 0;
	}
	else
	{
		if (isnull1 == false && isnull2 == false)
		{
			evec->vector[0] = *entry1;
			evec->vector[1] = *entry2;
		}
		else if (isnull1 == false)
		{
			evec->vector[0] = *entry1;
			evec->vector[1] = *entry1;
		}
		else
		{
			evec->vector[0] = *entry2;
			evec->vector[1] = *entry2;
		}

		*dstisnull = false;
		*dst = FunctionCall2Coll(&giststate->unionFn[attno],
								 giststate->supportCollation[attno],
								 PointerGetDatum(evec),
								 PointerGetDatum(&dstsize));
	}
}

/*
 * Compare two keys with the opclass "equal" support function.
 */
bool
gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b)
{
	bool		result;

	FunctionCall3Coll(&giststate->equalFn[attno],
					  giststate->supportCollation[attno],
					  a, b,
					  PointerGetDatum(&result));
	return result;
}

/*
 * Decompress all keys in tuple
 */
void
gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
				  OffsetNumber o, GISTENTRY *attdata, bool *isnull)
{
	int			i;

	for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
	{
		Datum		datum;

		datum = index_getattr(tuple, i + 1, giststate->leafTupdesc, &isnull[i]);
		gistdentryinit(giststate, i, &attdata[i],
					   datum, r, p, o,
					   false, isnull[i]);
	}
}

/*
 * Forms union of oldtup and addtup, if union == oldtup then return NULL
 */
IndexTuple
gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
{
	bool		neednew = false;
	GISTENTRY	oldentries[INDEX_MAX_KEYS],
				addentries[INDEX_MAX_KEYS];
	bool		oldisnull[INDEX_MAX_KEYS],
				addisnull[INDEX_MAX_KEYS];
	Datum		attr[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	IndexTuple	newtup = NULL;
	int			i;

	gistDeCompressAtt(giststate, r, oldtup, NULL,
					  (OffsetNumber) 0, oldentries, oldisnull);

	gistDeCompressAtt(giststate, r, addtup, NULL,
					  (OffsetNumber) 0, addentries, addisnull);

	for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
	{
		gistMakeUnionKey(giststate, i,
						 oldentries + i, oldisnull[i],
						 addentries + i, addisnull[i],
						 attr + i, isnull + i);

		if (neednew)
			/* we already need new key, so we can skip check */
			continue;

		if (isnull[i])
			/* union of key may be NULL if and only if both keys are NULL */
			continue;

		if (!addisnull[i])
		{
			if (oldisnull[i] ||
				!gistKeyIsEQ(giststate, i, oldentries[i].key, attr[i]))
				neednew = true;
		}
	}

	if (neednew)
	{
		/* need to update key */
		newtup = gistFormTuple(giststate, r, attr, isnull, false);
		newtup->t_tid = oldtup->t_tid;
	}

	return newtup;
}

/*
 * Search an upper index page for the entry with lowest penalty for insertion
 * of the new index key contained in "it".
 *
 * Returns the index of the page entry to insert into.
 */
OffsetNumber
gistchoose(Relation r, Page p, IndexTuple it,	/* it has compressed entry */
		   GISTSTATE *giststate)
{
	OffsetNumber result;
	OffsetNumber maxoff;
	OffsetNumber i;
	float		best_penalty[INDEX_MAX_KEYS];
	GISTENTRY	entry,
				identry[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	int			keep_current_best;

	Assert(!GistPageIsLeaf(p));

	gistDeCompressAtt(giststate, r,
					  it, NULL, (OffsetNumber) 0,
					  identry, isnull);

	/* we'll return FirstOffsetNumber if page is empty (shouldn't happen) */
	result = FirstOffsetNumber;

	/*
	 * The index may have multiple columns, and there's a penalty value for
	 * each column.  The penalty associated with a column that appears earlier
	 * in the index definition is strictly more important than the penalty of
	 * a column that appears later in the index definition.
	 *
	 * best_penalty[j] is the best penalty we have seen so far for column j,
	 * or -1 when we haven't yet examined column j.  Array entries to the
	 * right of the first -1 are undefined.
	 */
	best_penalty[0] = -1;

	/*
	 * If we find a tuple that's exactly as good as the currently best one, we
	 * could use either one.  When inserting a lot of tuples with the same or
	 * similar keys, it's preferable to descend down the same path when
	 * possible, as that's more cache-friendly.  On the other hand, if all
	 * inserts land on the same leaf page after a split, we're never going to
	 * insert anything to the other half of the split, and will end up using
	 * only 50% of the available space.  Distributing the inserts evenly would
	 * lead to better space usage, but that hurts cache-locality during
	 * insertion.  To get the best of both worlds, when we find a tuple that's
	 * exactly as good as the previous best, choose randomly whether to stick
	 * to the old best, or use the new one.  Once we decide to stick to the
	 * old best, we keep sticking to it for any subsequent equally good tuples
	 * we might find.  This favors tuples with low offsets, but still allows
	 * some inserts to go to other equally-good subtrees.
	 *
	 * keep_current_best is -1 if we haven't yet had to make a random choice
	 * whether to keep the current best tuple.  If we have done so, and
	 * decided to keep it, keep_current_best is 1; if we've decided to
	 * replace, keep_current_best is 0.  (This state will be reset to -1 as
	 * soon as we've made the replacement, but sometimes we make the choice in
	 * advance of actually finding a replacement best tuple.)
	 */
	keep_current_best = -1;

	/*
	 * Loop over tuples on page.
	 */
	maxoff = PageGetMaxOffsetNumber(p);
	Assert(maxoff >= FirstOffsetNumber);

	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
	{
		IndexTuple	itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
		bool		zero_penalty;
		int			j;

		zero_penalty = true;

		/* Loop over index attributes. */
		for (j = 0; j < IndexRelationGetNumberOfKeyAttributes(r); j++)
		{
			Datum		datum;
			float		usize;
			bool		IsNull;

			/* Compute penalty for this column. */
			datum = index_getattr(itup, j + 1, giststate->leafTupdesc,
								  &IsNull);
			gistdentryinit(giststate, j, &entry, datum, r, p, i,
						   false, IsNull);
			usize = gistpenalty(giststate, j, &entry, IsNull,
								&identry[j], isnull[j]);
			if (usize > 0)
				zero_penalty = false;

			if (best_penalty[j] < 0 || usize < best_penalty[j])
			{
				/*
				 * New best penalty for column.  Tentatively select this tuple
				 * as the target, and record the best penalty.  Then reset the
				 * next column's penalty to "unknown" (and indirectly, the
				 * same for all the ones to its right).  This will force us to
				 * adopt this tuple's penalty values as the best for all the
				 * remaining columns during subsequent loop iterations.
				 */
				result = i;
				best_penalty[j] = usize;

				if (j < IndexRelationGetNumberOfKeyAttributes(r) - 1)
					best_penalty[j + 1] = -1;

				/* we have new best, so reset keep-it decision */
				keep_current_best = -1;
			}
			else if (best_penalty[j] == usize)
			{
				/*
				 * The current tuple is exactly as good for this column as the
				 * best tuple seen so far.  The next iteration of this loop
				 * will compare the next column.
				 */
			}
			else
			{
				/*
				 * The current tuple is worse for this column than the best
				 * tuple seen so far.  Skip the remaining columns and move on
				 * to the next tuple, if any.
				 */
				zero_penalty = false;	/* so outer loop won't exit */
				break;
			}
		}

		/*
		 * If we looped past the last column, and did not update "result",
		 * then this tuple is exactly as good as the prior best tuple.
		 */
		if (j == IndexRelationGetNumberOfKeyAttributes(r) && result != i)
		{
			if (keep_current_best == -1)
			{
				/* we didn't make the random choice yet for this old best */
				keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
			}
			if (keep_current_best == 0)
			{
				/* we choose to use the new tuple */
				result = i;
				/* choose again if there are even more exactly-as-good ones */
				keep_current_best = -1;
			}
		}

		/*
		 * If we find a tuple with zero penalty for all columns, and we've
		 * decided we don't want to search for another tuple with equal
		 * penalty, there's no need to examine remaining tuples; just break
		 * out of the loop and return it.
		 */
		if (zero_penalty)
		{
			if (keep_current_best == -1)
			{
				/* we didn't make the random choice yet for this old best */
				keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
			}
			if (keep_current_best == 1)
				break;
		}
	}

	return result;
}

/*
 * initialize a GiST entry with a decompressed version of key
 */
void
gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
			   Datum k, Relation r, Page pg, OffsetNumber o,
			   bool l, bool isNull)
{
	if (!isNull)
	{
		GISTENTRY  *dep;

		gistentryinit(*e, k, r, pg, o, l);

		/* there may not be a decompress function in opclass */
		if (!OidIsValid(giststate->decompressFn[nkey].fn_oid))
			return;

		dep = (GISTENTRY *)
			DatumGetPointer(FunctionCall1Coll(&giststate->decompressFn[nkey],
											  giststate->supportCollation[nkey],
											  PointerGetDatum(e)));
		/* decompressFn may just return the given pointer */
		if (dep != e)
			gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
						  dep->leafkey);
	}
	else
		gistentryinit(*e, (Datum) 0, r, pg, o, l);
}

/*
 * Form an index tuple from the given (uncompressed) datums, applying the
 * opclass "compress" method to each key column that has one.
 */
IndexTuple
gistFormTuple(GISTSTATE *giststate, Relation r,
			  Datum attdata[], bool isnull[], bool isleaf)
{
	Datum		compatt[INDEX_MAX_KEYS];
	int			i;
	IndexTuple	res;

	/*
	 * Call the compress method on each attribute.
	 */
	for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
	{
		if (isnull[i])
			compatt[i] = (Datum) 0;
		else
		{
			GISTENTRY	centry;
			GISTENTRY  *cep;

			gistentryinit(centry, attdata[i], r, NULL, (OffsetNumber) 0,
						  isleaf);
			/* there may not be a compress function in opclass */
			if (OidIsValid(giststate->compressFn[i].fn_oid))
				cep = (GISTENTRY *)
					DatumGetPointer(FunctionCall1Coll(&giststate->compressFn[i],
													  giststate->supportCollation[i],
													  PointerGetDatum(&centry)));
			else
				cep = &centry;
			compatt[i] = cep->key;
		}
	}

	if (isleaf)
	{
		/*
		 * Emplace each included attribute if any.
		 */
		for (; i < r->rd_att->natts; i++)
		{
			if (isnull[i])
				compatt[i] = (Datum) 0;
			else
				compatt[i] = attdata[i];
		}
	}

	res = index_form_tuple(isleaf ? giststate->leafTupdesc :
						   giststate->nonLeafTupdesc,
						   compatt, isnull);

	/*
	 * The offset number on tuples on internal pages is unused. For historical
	 * reasons, it is set to 0xffff.
	 */
	ItemPointerSetOffsetNumber(&(res->t_tid), 0xffff);
	return res;
}

/*
 * initialize a GiST entry with fetched value in key field
 */
static Datum
gistFetchAtt(GISTSTATE *giststate, int nkey, Datum k, Relation r)
{
	GISTENTRY	fentry;
	GISTENTRY  *fep;

	gistentryinit(fentry, k, r, NULL, (OffsetNumber) 0, false);

	fep = (GISTENTRY *)
		DatumGetPointer(FunctionCall1Coll(&giststate->fetchFn[nkey],
										  giststate->supportCollation[nkey],
										  PointerGetDatum(&fentry)));

	/* fetchFn set 'key', return it to the caller */
	return fep->key;
}

/*
 * Fetch all keys in tuple.
 * Returns a new HeapTuple containing the originally-indexed data.
 */
HeapTuple
gistFetchTuple(GISTSTATE *giststate, Relation r, IndexTuple tuple)
{
	MemoryContext oldcxt = MemoryContextSwitchTo(giststate->tempCxt);
	Datum		fetchatt[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	int			i;

	for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
	{
		Datum		datum;

		datum = index_getattr(tuple, i + 1, giststate->leafTupdesc, &isnull[i]);

		if (giststate->fetchFn[i].fn_oid != InvalidOid)
		{
			if (!isnull[i])
				fetchatt[i] = gistFetchAtt(giststate, i, datum, r);
			else
				fetchatt[i] = (Datum) 0;
		}
		else if (giststate->compressFn[i].fn_oid == InvalidOid)
		{
			/*
			 * If opclass does not provide compress method that could change
			 * original value, att is necessarily stored in original form.
			 */
			if (!isnull[i])
				fetchatt[i] = datum;
			else
				fetchatt[i] = (Datum) 0;
		}
		else
		{
			/*
			 * Index-only scans not supported for this column.  Since the
			 * planner chose an index-only scan anyway, it is not interested
			 * in this column, and we can replace it with a NULL.
			 */
			isnull[i] = true;
			fetchatt[i] = (Datum) 0;
		}
	}

	/*
	 * Get each included attribute.
	 */
	for (; i < r->rd_att->natts; i++)
	{
		fetchatt[i] = index_getattr(tuple, i + 1, giststate->leafTupdesc,
									&isnull[i]);
	}
	MemoryContextSwitchTo(oldcxt);

	return heap_form_tuple(giststate->fetchTupdesc, fetchatt, isnull);
}

/*
 * Compute the penalty for inserting "add" into "orig", via the opclass
 * penalty support function.  Negative and NaN penalties are clamped to 0;
 * mixing a NULL with a non-NULL key yields +infinity to discourage it.
 */
float
gistpenalty(GISTSTATE *giststate, int attno,
			GISTENTRY *orig, bool isNullOrig,
			GISTENTRY *add, bool isNullAdd)
{
	float		penalty = 0.0;

	if (giststate->penaltyFn[attno].fn_strict == false ||
		(isNullOrig == false && isNullAdd == false))
	{
		FunctionCall3Coll(&giststate->penaltyFn[attno],
						  giststate->supportCollation[attno],
						  PointerGetDatum(orig),
						  PointerGetDatum(add),
						  PointerGetDatum(&penalty));
		/* disallow negative or NaN penalty */
		if (isnan(penalty) || penalty < 0.0)
			penalty = 0.0;
	}
	else if (isNullOrig && isNullAdd)
		penalty = 0.0;
	else
	{
		/* try to prevent mixing null and non-null values */
		penalty = get_float4_infinity();
	}

	return penalty;
}

/*
 * Initialize a new index page
 */
void
GISTInitBuffer(Buffer b, uint32 f)
{
	GISTPageOpaque opaque;
	Page		page;
	Size		pageSize;

	pageSize = BufferGetPageSize(b);
	page = BufferGetPage(b);
	PageInit(page, pageSize, sizeof(GISTPageOpaqueData));

	opaque = GistPageGetOpaque(page);
	/* page was already zeroed by PageInit, so this is not needed: */
	/* memset(&(opaque->nsn), 0, sizeof(GistNSN)); */
	opaque->rightlink = InvalidBlockNumber;
	opaque->flags = f;
	opaque->gist_page_id = GIST_PAGE_ID;
}

/*
 * Verify that a freshly-read page looks sane.
 */
void
gistcheckpage(Relation rel, Buffer buf)
{
	Page		page = BufferGetPage(buf);

	/*
	 * ReadBuffer verifies that every newly-read page passes
	 * PageHeaderIsValid, which means it either contains a reasonably sane
	 * page header or is all-zero.  We have to defend against the all-zero
	 * case, however.
	 */
	if (PageIsNew(page))
		ereport(ERROR,
				(errcode(ERRCODE_INDEX_CORRUPTED),
				 errmsg("index \"%s\" contains unexpected zero page at block %u",
						RelationGetRelationName(rel),
						BufferGetBlockNumber(buf)),
				 errhint("Please REINDEX it.")));

	/*
	 * Additionally check that the special area looks sane.
	 */
	if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GISTPageOpaqueData)))
		ereport(ERROR,
				(errcode(ERRCODE_INDEX_CORRUPTED),
				 errmsg("index \"%s\" contains corrupted page at block %u",
						RelationGetRelationName(rel),
						BufferGetBlockNumber(buf)),
				 errhint("Please REINDEX it.")));
}


/*
 * Allocate a new page (either by recycling, or by extending the index file)
 *
 * The returned buffer is already pinned and exclusive-locked
 *
 * Caller is responsible for initializing the page by calling GISTInitBuffer
 */
Buffer
gistNewBuffer(Relation r)
{
	Buffer		buffer;
	bool		needLock;

	/* First, try to get a page from FSM */
	for (;;)
	{
		BlockNumber blkno = GetFreeIndexPage(r);

		if (blkno == InvalidBlockNumber)
			break;				/* nothing left in FSM */

		buffer = ReadBuffer(r, blkno);

		/*
		 * We have to guard against the possibility that someone else already
		 * recycled this page; the buffer may be locked if so.
		 */
		if (ConditionalLockBuffer(buffer))
		{
			Page		page = BufferGetPage(buffer);

			/*
			 * If the page was never initialized, it's OK to use.
			 */
			if (PageIsNew(page))
				return buffer;

			gistcheckpage(r, buffer);

			/*
			 * Otherwise, recycle it if deleted, and too old to have any
			 * processes interested in it.
			 */
			if (gistPageRecyclable(page))
			{
				/*
				 * If we are generating WAL for Hot Standby then create a WAL
				 * record that will allow us to conflict with queries running
				 * on standby, in case they have snapshots older than the
				 * page's deleteXid.
				 */
				if (XLogStandbyInfoActive() && RelationNeedsWAL(r))
					gistXLogPageReuse(r, blkno, GistPageGetDeleteXid(page));

				return buffer;
			}

			LockBuffer(buffer, GIST_UNLOCK);
		}

		/* Can't use it, so release buffer and try again */
		ReleaseBuffer(buffer);
	}

	/* Must extend the file */
	needLock = !RELATION_IS_LOCAL(r);

	if (needLock)
		LockRelationForExtension(r, ExclusiveLock);

	buffer = ReadBuffer(r, P_NEW);
	LockBuffer(buffer, GIST_EXCLUSIVE);

	if (needLock)
		UnlockRelationForExtension(r, ExclusiveLock);

	return buffer;
}

/* Can this page be recycled yet? */
bool
gistPageRecyclable(Page page)
{
	if (PageIsNew(page))
		return true;
	if (GistPageIsDeleted(page))
	{
		/*
		 * The page was deleted, but when? If it was just deleted, a scan
		 * might have seen the downlink to it, and will read the page later.
		 * As long as that can happen, we must keep the deleted page around as
		 * a tombstone.
		 *
		 * Compare the deletion XID with RecentGlobalXmin. If deleteXid <
		 * RecentGlobalXmin, then no scan that's still in progress could have
		 * seen its downlink, and we can recycle it.
		 */
		FullTransactionId deletexid_full = GistPageGetDeleteXid(page);
		FullTransactionId recentxmin_full = GetFullRecentGlobalXmin();

		if (FullTransactionIdPrecedes(deletexid_full, recentxmin_full))
			return true;
	}
	return false;
}

/*
 * Parse and validate GiST-specific reloptions ("fillfactor", "buffering"),
 * returning a filled GiSTOptions struct, or NULL if none are set.
 */
bytea *
gistoptions(Datum reloptions, bool validate)
{
	relopt_value *options;
	GiSTOptions *rdopts;
	int			numoptions;
	static const relopt_parse_elt tab[] = {
		{"fillfactor", RELOPT_TYPE_INT, offsetof(GiSTOptions, fillfactor)},
		{"buffering", RELOPT_TYPE_STRING, offsetof(GiSTOptions, bufferingModeOffset)}
	};

	options = parseRelOptions(reloptions, validate, RELOPT_KIND_GIST,
							  &numoptions);

	/* if none set, we're done */
	if (numoptions == 0)
		return NULL;

	rdopts = allocateReloptStruct(sizeof(GiSTOptions), options, numoptions);

	fillRelOptions((void *) rdopts, sizeof(GiSTOptions), options, numoptions,
				   validate, tab, lengthof(tab));

	pfree(options);

	return (bytea *) rdopts;
}

/*
 * gistproperty() -- Check boolean properties of indexes.
 *
 * This is optional for most AMs, but is required for GiST because the core
 * property code doesn't support AMPROP_DISTANCE_ORDERABLE.  We also handle
 * AMPROP_RETURNABLE here to save opening the rel to call gistcanreturn.
 */
bool
gistproperty(Oid index_oid, int attno,
			 IndexAMProperty prop, const char *propname,
			 bool *res, bool *isnull)
{
	Oid			opclass,
				opfamily,
				opcintype;
	int16		procno;

	/* Only answer column-level inquiries */
	if (attno == 0)
		return false;

	/*
	 * Currently, GiST distance-ordered scans require that there be a distance
	 * function in the opclass with the default types (i.e. the one loaded
	 * into the relcache entry, see initGISTstate).  So we assume that if such
	 * a function exists, then there's a reason for it (rather than grubbing
	 * through all the opfamily's operators to find an ordered one).
	 *
	 * Essentially the same code can test whether we support returning the
	 * column data, since that's true if the opclass provides a fetch proc.
	 */

	switch (prop)
	{
		case AMPROP_DISTANCE_ORDERABLE:
			procno = GIST_DISTANCE_PROC;
			break;
		case AMPROP_RETURNABLE:
			procno = GIST_FETCH_PROC;
			break;
		default:
			return false;
	}

	/* First we need to know the column's opclass. */
	opclass = get_index_column_opclass(index_oid, attno);
	if (!OidIsValid(opclass))
	{
		*isnull = true;
		return true;
	}

	/* Now look up the opclass family and input datatype. */
	if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
	{
		*isnull = true;
		return true;
	}

	/* And now we can check whether the function is provided. */

	*res = SearchSysCacheExists4(AMPROCNUM,
								 ObjectIdGetDatum(opfamily),
								 ObjectIdGetDatum(opcintype),
								 ObjectIdGetDatum(opcintype),
								 Int16GetDatum(procno));

	/*
	 * Special case: even without a fetch function, AMPROP_RETURNABLE is true
	 * if the opclass has no compress function.
	 */
	if (prop == AMPROP_RETURNABLE && !*res)
	{
		*res = !SearchSysCacheExists4(AMPROCNUM,
									  ObjectIdGetDatum(opfamily),
									  ObjectIdGetDatum(opcintype),
									  ObjectIdGetDatum(opcintype),
									  Int16GetDatum(GIST_COMPRESS_PROC));
	}

	*isnull = false;

	return true;
}

/*
 * Temporary and unlogged GiST indexes are not WAL-logged, but we need LSNs
 * to detect concurrent page splits anyway. This function provides a fake
 * sequence of LSNs for that purpose.
 */
XLogRecPtr
gistGetFakeLSN(Relation rel)
{
	static XLogRecPtr counter = FirstNormalUnloggedLSN;

	if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
	{
		/*
		 * Temporary relations are only accessible in our session, so a simple
		 * backend-local counter will do.
		 */
		return counter++;
	}
	else
	{
		/*
		 * Unlogged relations are accessible from other backends, and survive
		 * (clean) restarts. GetFakeLSNForUnloggedRel() handles that for us.
		 */
		Assert(rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED);
		return GetFakeLSNForUnloggedRel();
	}
}