1 /*-------------------------------------------------------------------------
2 *
3 * spgutils.c
4 * various support functions for SP-GiST
5 *
6 *
7 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/spgist/spgutils.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16 #include "postgres.h"
17
18 #include "access/amvalidate.h"
19 #include "access/htup_details.h"
20 #include "access/reloptions.h"
21 #include "access/spgist_private.h"
22 #include "access/toast_compression.h"
23 #include "access/transam.h"
24 #include "access/xact.h"
25 #include "catalog/pg_amop.h"
26 #include "commands/vacuum.h"
27 #include "nodes/nodeFuncs.h"
28 #include "storage/bufmgr.h"
29 #include "storage/indexfsm.h"
30 #include "storage/lmgr.h"
31 #include "utils/builtins.h"
32 #include "utils/catcache.h"
33 #include "utils/index_selfuncs.h"
34 #include "utils/lsyscache.h"
35 #include "utils/syscache.h"
36
37
38 /*
39 * SP-GiST handler function: return IndexAmRoutine with access method parameters
40 * and callbacks.
41 */
42 Datum
spghandler(PG_FUNCTION_ARGS)43 spghandler(PG_FUNCTION_ARGS)
44 {
45 IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
46
47 amroutine->amstrategies = 0;
48 amroutine->amsupport = SPGISTNProc;
49 amroutine->amoptsprocnum = SPGIST_OPTIONS_PROC;
50 amroutine->amcanorder = false;
51 amroutine->amcanorderbyop = true;
52 amroutine->amcanbackward = false;
53 amroutine->amcanunique = false;
54 amroutine->amcanmulticol = false;
55 amroutine->amoptionalkey = true;
56 amroutine->amsearcharray = false;
57 amroutine->amsearchnulls = true;
58 amroutine->amstorage = true;
59 amroutine->amclusterable = false;
60 amroutine->ampredlocks = false;
61 amroutine->amcanparallel = false;
62 amroutine->amcaninclude = true;
63 amroutine->amusemaintenanceworkmem = false;
64 amroutine->amparallelvacuumoptions =
65 VACUUM_OPTION_PARALLEL_BULKDEL | VACUUM_OPTION_PARALLEL_COND_CLEANUP;
66 amroutine->amkeytype = InvalidOid;
67
68 amroutine->ambuild = spgbuild;
69 amroutine->ambuildempty = spgbuildempty;
70 amroutine->aminsert = spginsert;
71 amroutine->ambulkdelete = spgbulkdelete;
72 amroutine->amvacuumcleanup = spgvacuumcleanup;
73 amroutine->amcanreturn = spgcanreturn;
74 amroutine->amcostestimate = spgcostestimate;
75 amroutine->amoptions = spgoptions;
76 amroutine->amproperty = spgproperty;
77 amroutine->ambuildphasename = NULL;
78 amroutine->amvalidate = spgvalidate;
79 amroutine->amadjustmembers = spgadjustmembers;
80 amroutine->ambeginscan = spgbeginscan;
81 amroutine->amrescan = spgrescan;
82 amroutine->amgettuple = spggettuple;
83 amroutine->amgetbitmap = spggetbitmap;
84 amroutine->amendscan = spgendscan;
85 amroutine->ammarkpos = NULL;
86 amroutine->amrestrpos = NULL;
87 amroutine->amestimateparallelscan = NULL;
88 amroutine->aminitparallelscan = NULL;
89 amroutine->amparallelrescan = NULL;
90
91 PG_RETURN_POINTER(amroutine);
92 }
93
94 /*
95 * GetIndexInputType
96 * Determine the nominal input data type for an index column
97 *
98 * We define the "nominal" input type as the associated opclass's opcintype,
99 * or if that is a polymorphic type, the base type of the heap column or
100 * expression that is the index's input. The reason for preferring the
101 * opcintype is that non-polymorphic opclasses probably don't want to hear
102 * about binary-compatible input types. For instance, if a text opclass
103 * is being used with a varchar heap column, we want to report "text" not
104 * "varchar". Likewise, opclasses don't want to hear about domain types,
105 * so if we do consult the actual input type, we make sure to flatten domains.
106 *
107 * At some point maybe this should go somewhere else, but it's not clear
108 * if any other index AMs have a use for it.
109 */
110 static Oid
GetIndexInputType(Relation index,AttrNumber indexcol)111 GetIndexInputType(Relation index, AttrNumber indexcol)
112 {
113 Oid opcintype;
114 AttrNumber heapcol;
115 List *indexprs;
116 ListCell *indexpr_item;
117
118 Assert(index->rd_index != NULL);
119 Assert(indexcol > 0 && indexcol <= index->rd_index->indnkeyatts);
120 opcintype = index->rd_opcintype[indexcol - 1];
121 if (!IsPolymorphicType(opcintype))
122 return opcintype;
123 heapcol = index->rd_index->indkey.values[indexcol - 1];
124 if (heapcol != 0) /* Simple index column? */
125 return getBaseType(get_atttype(index->rd_index->indrelid, heapcol));
126
127 /*
128 * If the index expressions are already cached, skip calling
129 * RelationGetIndexExpressions, as it will make a copy which is overkill.
130 * We're not going to modify the trees, and we're not going to do anything
131 * that would invalidate the relcache entry before we're done.
132 */
133 if (index->rd_indexprs)
134 indexprs = index->rd_indexprs;
135 else
136 indexprs = RelationGetIndexExpressions(index);
137 indexpr_item = list_head(indexprs);
138 for (int i = 1; i <= index->rd_index->indnkeyatts; i++)
139 {
140 if (index->rd_index->indkey.values[i - 1] == 0)
141 {
142 /* expression column */
143 if (indexpr_item == NULL)
144 elog(ERROR, "wrong number of index expressions");
145 if (i == indexcol)
146 return getBaseType(exprType((Node *) lfirst(indexpr_item)));
147 indexpr_item = lnext(indexprs, indexpr_item);
148 }
149 }
150 elog(ERROR, "wrong number of index expressions");
151 return InvalidOid; /* keep compiler quiet */
152 }
153
154 /* Fill in a SpGistTypeDesc struct with info about the specified data type */
155 static void
fillTypeDesc(SpGistTypeDesc * desc,Oid type)156 fillTypeDesc(SpGistTypeDesc *desc, Oid type)
157 {
158 HeapTuple tp;
159 Form_pg_type typtup;
160
161 desc->type = type;
162 tp = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type));
163 if (!HeapTupleIsValid(tp))
164 elog(ERROR, "cache lookup failed for type %u", type);
165 typtup = (Form_pg_type) GETSTRUCT(tp);
166 desc->attlen = typtup->typlen;
167 desc->attbyval = typtup->typbyval;
168 desc->attalign = typtup->typalign;
169 desc->attstorage = typtup->typstorage;
170 ReleaseSysCache(tp);
171 }
172
173 /*
174 * Fetch local cache of AM-specific info about the index, initializing it
175 * if necessary
176 */
177 SpGistCache *
spgGetCache(Relation index)178 spgGetCache(Relation index)
179 {
180 SpGistCache *cache;
181
182 if (index->rd_amcache == NULL)
183 {
184 Oid atttype;
185 spgConfigIn in;
186 FmgrInfo *procinfo;
187 Buffer metabuffer;
188 SpGistMetaPageData *metadata;
189
190 cache = MemoryContextAllocZero(index->rd_indexcxt,
191 sizeof(SpGistCache));
192
193 /* SPGiST must have one key column and can also have INCLUDE columns */
194 Assert(IndexRelationGetNumberOfKeyAttributes(index) == 1);
195 Assert(IndexRelationGetNumberOfAttributes(index) <= INDEX_MAX_KEYS);
196
197 /*
198 * Get the actual (well, nominal) data type of the key column. We
199 * pass this to the opclass config function so that polymorphic
200 * opclasses are possible.
201 */
202 atttype = GetIndexInputType(index, spgKeyColumn + 1);
203
204 /* Call the config function to get config info for the opclass */
205 in.attType = atttype;
206
207 procinfo = index_getprocinfo(index, 1, SPGIST_CONFIG_PROC);
208 FunctionCall2Coll(procinfo,
209 index->rd_indcollation[spgKeyColumn],
210 PointerGetDatum(&in),
211 PointerGetDatum(&cache->config));
212
213 /*
214 * If leafType isn't specified, use the declared index column type,
215 * which index.c will have derived from the opclass's opcintype.
216 * (Although we now make spgvalidate.c warn if these aren't the same,
217 * old user-defined opclasses may not set the STORAGE parameter
218 * correctly, so believe leafType if it's given.)
219 */
220 if (!OidIsValid(cache->config.leafType))
221 cache->config.leafType =
222 TupleDescAttr(RelationGetDescr(index), spgKeyColumn)->atttypid;
223
224 /* Get the information we need about each relevant datatype */
225 fillTypeDesc(&cache->attType, atttype);
226
227 if (cache->config.leafType != atttype)
228 {
229 if (!OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
230 ereport(ERROR,
231 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
232 errmsg("compress method must be defined when leaf type is different from input type")));
233
234 fillTypeDesc(&cache->attLeafType, cache->config.leafType);
235 }
236 else
237 {
238 /* Save lookups in this common case */
239 cache->attLeafType = cache->attType;
240 }
241
242 fillTypeDesc(&cache->attPrefixType, cache->config.prefixType);
243 fillTypeDesc(&cache->attLabelType, cache->config.labelType);
244
245 /* Last, get the lastUsedPages data from the metapage */
246 metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
247 LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
248
249 metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));
250
251 if (metadata->magicNumber != SPGIST_MAGIC_NUMBER)
252 elog(ERROR, "index \"%s\" is not an SP-GiST index",
253 RelationGetRelationName(index));
254
255 cache->lastUsedPages = metadata->lastUsedPages;
256
257 UnlockReleaseBuffer(metabuffer);
258
259 index->rd_amcache = (void *) cache;
260 }
261 else
262 {
263 /* assume it's up to date */
264 cache = (SpGistCache *) index->rd_amcache;
265 }
266
267 return cache;
268 }
269
270 /*
271 * Compute a tuple descriptor for leaf tuples or index-only-scan result tuples.
272 *
273 * We can use the relcache's tupdesc as-is in many cases, and it's always
274 * OK so far as any INCLUDE columns are concerned. However, the entry for
275 * the key column has to match leafType in the first case or attType in the
276 * second case. While the relcache's tupdesc *should* show leafType, this
277 * might not hold for legacy user-defined opclasses, since before v14 they
278 * were not allowed to declare their true storage type in CREATE OPCLASS.
279 * Also, attType can be different from what is in the relcache.
280 *
281 * This function gives back either a pointer to the relcache's tupdesc
282 * if that is suitable, or a palloc'd copy that's been adjusted to match
283 * the specified key column type. We can avoid doing any catalog lookups
284 * here by insisting that the caller pass an SpGistTypeDesc not just an OID.
285 */
286 TupleDesc
getSpGistTupleDesc(Relation index,SpGistTypeDesc * keyType)287 getSpGistTupleDesc(Relation index, SpGistTypeDesc *keyType)
288 {
289 TupleDesc outTupDesc;
290 Form_pg_attribute att;
291
292 if (keyType->type ==
293 TupleDescAttr(RelationGetDescr(index), spgKeyColumn)->atttypid)
294 outTupDesc = RelationGetDescr(index);
295 else
296 {
297 outTupDesc = CreateTupleDescCopy(RelationGetDescr(index));
298 att = TupleDescAttr(outTupDesc, spgKeyColumn);
299 /* It's sufficient to update the type-dependent fields of the column */
300 att->atttypid = keyType->type;
301 att->atttypmod = -1;
302 att->attlen = keyType->attlen;
303 att->attbyval = keyType->attbyval;
304 att->attalign = keyType->attalign;
305 att->attstorage = keyType->attstorage;
306 /* We shouldn't need to bother with making these valid: */
307 att->attcompression = InvalidCompressionMethod;
308 att->attcollation = InvalidOid;
309 /* In case we changed typlen, we'd better reset following offsets */
310 for (int i = spgFirstIncludeColumn; i < outTupDesc->natts; i++)
311 TupleDescAttr(outTupDesc, i)->attcacheoff = -1;
312 }
313 return outTupDesc;
314 }
315
316 /* Initialize SpGistState for working with the given index */
317 void
initSpGistState(SpGistState * state,Relation index)318 initSpGistState(SpGistState *state, Relation index)
319 {
320 SpGistCache *cache;
321
322 state->index = index;
323
324 /* Get cached static information about index */
325 cache = spgGetCache(index);
326
327 state->config = cache->config;
328 state->attType = cache->attType;
329 state->attLeafType = cache->attLeafType;
330 state->attPrefixType = cache->attPrefixType;
331 state->attLabelType = cache->attLabelType;
332
333 /* Ensure we have a valid descriptor for leaf tuples */
334 state->leafTupDesc = getSpGistTupleDesc(state->index, &state->attLeafType);
335
336 /* Make workspace for constructing dead tuples */
337 state->deadTupleStorage = palloc0(SGDTSIZE);
338
339 /* Set XID to use in redirection tuples */
340 state->myXid = GetTopTransactionIdIfAny();
341
342 /* Assume we're not in an index build (spgbuild will override) */
343 state->isBuild = false;
344 }
345
346 /*
347 * Allocate a new page (either by recycling, or by extending the index file).
348 *
349 * The returned buffer is already pinned and exclusive-locked.
350 * Caller is responsible for initializing the page by calling SpGistInitBuffer.
351 */
352 Buffer
SpGistNewBuffer(Relation index)353 SpGistNewBuffer(Relation index)
354 {
355 Buffer buffer;
356 bool needLock;
357
358 /* First, try to get a page from FSM */
359 for (;;)
360 {
361 BlockNumber blkno = GetFreeIndexPage(index);
362
363 if (blkno == InvalidBlockNumber)
364 break; /* nothing known to FSM */
365
366 /*
367 * The fixed pages shouldn't ever be listed in FSM, but just in case
368 * one is, ignore it.
369 */
370 if (SpGistBlockIsFixed(blkno))
371 continue;
372
373 buffer = ReadBuffer(index, blkno);
374
375 /*
376 * We have to guard against the possibility that someone else already
377 * recycled this page; the buffer may be locked if so.
378 */
379 if (ConditionalLockBuffer(buffer))
380 {
381 Page page = BufferGetPage(buffer);
382
383 if (PageIsNew(page))
384 return buffer; /* OK to use, if never initialized */
385
386 if (SpGistPageIsDeleted(page) || PageIsEmpty(page))
387 return buffer; /* OK to use */
388
389 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
390 }
391
392 /* Can't use it, so release buffer and try again */
393 ReleaseBuffer(buffer);
394 }
395
396 /* Must extend the file */
397 needLock = !RELATION_IS_LOCAL(index);
398 if (needLock)
399 LockRelationForExtension(index, ExclusiveLock);
400
401 buffer = ReadBuffer(index, P_NEW);
402 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
403
404 if (needLock)
405 UnlockRelationForExtension(index, ExclusiveLock);
406
407 return buffer;
408 }
409
410 /*
411 * Update index metapage's lastUsedPages info from local cache, if possible
412 *
413 * Updating meta page isn't critical for index working, so
414 * 1 use ConditionalLockBuffer to improve concurrency
415 * 2 don't WAL-log metabuffer changes to decrease WAL traffic
416 */
417 void
SpGistUpdateMetaPage(Relation index)418 SpGistUpdateMetaPage(Relation index)
419 {
420 SpGistCache *cache = (SpGistCache *) index->rd_amcache;
421
422 if (cache != NULL)
423 {
424 Buffer metabuffer;
425
426 metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
427
428 if (ConditionalLockBuffer(metabuffer))
429 {
430 Page metapage = BufferGetPage(metabuffer);
431 SpGistMetaPageData *metadata = SpGistPageGetMeta(metapage);
432
433 metadata->lastUsedPages = cache->lastUsedPages;
434
435 /*
436 * Set pd_lower just past the end of the metadata. This is
437 * essential, because without doing so, metadata will be lost if
438 * xlog.c compresses the page. (We must do this here because
439 * pre-v11 versions of PG did not set the metapage's pd_lower
440 * correctly, so a pg_upgraded index might contain the wrong
441 * value.)
442 */
443 ((PageHeader) metapage)->pd_lower =
444 ((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) metapage;
445
446 MarkBufferDirty(metabuffer);
447 UnlockReleaseBuffer(metabuffer);
448 }
449 else
450 {
451 ReleaseBuffer(metabuffer);
452 }
453 }
454 }
455
/*
 * Macro to select the proper element of the lastUsedPages cache depending on
 * flags.  Masking flags with SPGIST_CACHED_PAGES is just for paranoia's sake.
 */
#define GET_LUP(c, f) (&(c)->lastUsedPages.cachedPage[((unsigned int) (f)) % SPGIST_CACHED_PAGES])
459
460 /*
461 * Allocate and initialize a new buffer of the type and parity specified by
462 * flags. The returned buffer is already pinned and exclusive-locked.
463 *
464 * When requesting an inner page, if we get one with the wrong parity,
465 * we just release the buffer and try again. We will get a different page
466 * because GetFreeIndexPage will have marked the page used in FSM. The page
467 * is entered in our local lastUsedPages cache, so there's some hope of
468 * making use of it later in this session, but otherwise we rely on VACUUM
469 * to eventually re-enter the page in FSM, making it available for recycling.
470 * Note that such a page does not get marked dirty here, so unless it's used
471 * fairly soon, the buffer will just get discarded and the page will remain
472 * as it was on disk.
473 *
474 * When we return a buffer to the caller, the page is *not* entered into
475 * the lastUsedPages cache; we expect the caller will do so after it's taken
476 * whatever space it will use. This is because after the caller has used up
477 * some space, the page might have less space than whatever was cached already
478 * so we'd rather not trash the old cache entry.
479 */
480 static Buffer
allocNewBuffer(Relation index,int flags)481 allocNewBuffer(Relation index, int flags)
482 {
483 SpGistCache *cache = spgGetCache(index);
484 uint16 pageflags = 0;
485
486 if (GBUF_REQ_LEAF(flags))
487 pageflags |= SPGIST_LEAF;
488 if (GBUF_REQ_NULLS(flags))
489 pageflags |= SPGIST_NULLS;
490
491 for (;;)
492 {
493 Buffer buffer;
494
495 buffer = SpGistNewBuffer(index);
496 SpGistInitBuffer(buffer, pageflags);
497
498 if (pageflags & SPGIST_LEAF)
499 {
500 /* Leaf pages have no parity concerns, so just use it */
501 return buffer;
502 }
503 else
504 {
505 BlockNumber blkno = BufferGetBlockNumber(buffer);
506 int blkFlags = GBUF_INNER_PARITY(blkno);
507
508 if ((flags & GBUF_PARITY_MASK) == blkFlags)
509 {
510 /* Page has right parity, use it */
511 return buffer;
512 }
513 else
514 {
515 /* Page has wrong parity, record it in cache and try again */
516 if (pageflags & SPGIST_NULLS)
517 blkFlags |= GBUF_NULLS;
518 cache->lastUsedPages.cachedPage[blkFlags].blkno = blkno;
519 cache->lastUsedPages.cachedPage[blkFlags].freeSpace =
520 PageGetExactFreeSpace(BufferGetPage(buffer));
521 UnlockReleaseBuffer(buffer);
522 }
523 }
524 }
525 }
526
527 /*
528 * Get a buffer of the type and parity specified by flags, having at least
529 * as much free space as indicated by needSpace. We use the lastUsedPages
530 * cache to assign the same buffer previously requested when possible.
531 * The returned buffer is already pinned and exclusive-locked.
532 *
533 * *isNew is set true if the page was initialized here, false if it was
534 * already valid.
535 */
536 Buffer
SpGistGetBuffer(Relation index,int flags,int needSpace,bool * isNew)537 SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
538 {
539 SpGistCache *cache = spgGetCache(index);
540 SpGistLastUsedPage *lup;
541
542 /* Bail out if even an empty page wouldn't meet the demand */
543 if (needSpace > SPGIST_PAGE_CAPACITY)
544 elog(ERROR, "desired SPGiST tuple size is too big");
545
546 /*
547 * If possible, increase the space request to include relation's
548 * fillfactor. This ensures that when we add unrelated tuples to a page,
549 * we try to keep 100-fillfactor% available for adding tuples that are
550 * related to the ones already on it. But fillfactor mustn't cause an
551 * error for requests that would otherwise be legal.
552 */
553 needSpace += SpGistGetTargetPageFreeSpace(index);
554 needSpace = Min(needSpace, SPGIST_PAGE_CAPACITY);
555
556 /* Get the cache entry for this flags setting */
557 lup = GET_LUP(cache, flags);
558
559 /* If we have nothing cached, just turn it over to allocNewBuffer */
560 if (lup->blkno == InvalidBlockNumber)
561 {
562 *isNew = true;
563 return allocNewBuffer(index, flags);
564 }
565
566 /* fixed pages should never be in cache */
567 Assert(!SpGistBlockIsFixed(lup->blkno));
568
569 /* If cached freeSpace isn't enough, don't bother looking at the page */
570 if (lup->freeSpace >= needSpace)
571 {
572 Buffer buffer;
573 Page page;
574
575 buffer = ReadBuffer(index, lup->blkno);
576
577 if (!ConditionalLockBuffer(buffer))
578 {
579 /*
580 * buffer is locked by another process, so return a new buffer
581 */
582 ReleaseBuffer(buffer);
583 *isNew = true;
584 return allocNewBuffer(index, flags);
585 }
586
587 page = BufferGetPage(buffer);
588
589 if (PageIsNew(page) || SpGistPageIsDeleted(page) || PageIsEmpty(page))
590 {
591 /* OK to initialize the page */
592 uint16 pageflags = 0;
593
594 if (GBUF_REQ_LEAF(flags))
595 pageflags |= SPGIST_LEAF;
596 if (GBUF_REQ_NULLS(flags))
597 pageflags |= SPGIST_NULLS;
598 SpGistInitBuffer(buffer, pageflags);
599 lup->freeSpace = PageGetExactFreeSpace(page) - needSpace;
600 *isNew = true;
601 return buffer;
602 }
603
604 /*
605 * Check that page is of right type and has enough space. We must
606 * recheck this since our cache isn't necessarily up to date.
607 */
608 if ((GBUF_REQ_LEAF(flags) ? SpGistPageIsLeaf(page) : !SpGistPageIsLeaf(page)) &&
609 (GBUF_REQ_NULLS(flags) ? SpGistPageStoresNulls(page) : !SpGistPageStoresNulls(page)))
610 {
611 int freeSpace = PageGetExactFreeSpace(page);
612
613 if (freeSpace >= needSpace)
614 {
615 /* Success, update freespace info and return the buffer */
616 lup->freeSpace = freeSpace - needSpace;
617 *isNew = false;
618 return buffer;
619 }
620 }
621
622 /*
623 * fallback to allocation of new buffer
624 */
625 UnlockReleaseBuffer(buffer);
626 }
627
628 /* No success with cache, so return a new buffer */
629 *isNew = true;
630 return allocNewBuffer(index, flags);
631 }
632
633 /*
634 * Update lastUsedPages cache when done modifying a page.
635 *
636 * We update the appropriate cache entry if it already contained this page
637 * (its freeSpace is likely obsolete), or if this page has more space than
638 * whatever we had cached.
639 */
640 void
SpGistSetLastUsedPage(Relation index,Buffer buffer)641 SpGistSetLastUsedPage(Relation index, Buffer buffer)
642 {
643 SpGistCache *cache = spgGetCache(index);
644 SpGistLastUsedPage *lup;
645 int freeSpace;
646 Page page = BufferGetPage(buffer);
647 BlockNumber blkno = BufferGetBlockNumber(buffer);
648 int flags;
649
650 /* Never enter fixed pages (root pages) in cache, though */
651 if (SpGistBlockIsFixed(blkno))
652 return;
653
654 if (SpGistPageIsLeaf(page))
655 flags = GBUF_LEAF;
656 else
657 flags = GBUF_INNER_PARITY(blkno);
658 if (SpGistPageStoresNulls(page))
659 flags |= GBUF_NULLS;
660
661 lup = GET_LUP(cache, flags);
662
663 freeSpace = PageGetExactFreeSpace(page);
664 if (lup->blkno == InvalidBlockNumber || lup->blkno == blkno ||
665 lup->freeSpace < freeSpace)
666 {
667 lup->blkno = blkno;
668 lup->freeSpace = freeSpace;
669 }
670 }
671
672 /*
673 * Initialize an SPGiST page to empty, with specified flags
674 */
675 void
SpGistInitPage(Page page,uint16 f)676 SpGistInitPage(Page page, uint16 f)
677 {
678 SpGistPageOpaque opaque;
679
680 PageInit(page, BLCKSZ, sizeof(SpGistPageOpaqueData));
681 opaque = SpGistPageGetOpaque(page);
682 opaque->flags = f;
683 opaque->spgist_page_id = SPGIST_PAGE_ID;
684 }
685
686 /*
687 * Initialize a buffer's page to empty, with specified flags
688 */
689 void
SpGistInitBuffer(Buffer b,uint16 f)690 SpGistInitBuffer(Buffer b, uint16 f)
691 {
692 Assert(BufferGetPageSize(b) == BLCKSZ);
693 SpGistInitPage(BufferGetPage(b), f);
694 }
695
696 /*
697 * Initialize metadata page
698 */
699 void
SpGistInitMetapage(Page page)700 SpGistInitMetapage(Page page)
701 {
702 SpGistMetaPageData *metadata;
703 int i;
704
705 SpGistInitPage(page, SPGIST_META);
706 metadata = SpGistPageGetMeta(page);
707 memset(metadata, 0, sizeof(SpGistMetaPageData));
708 metadata->magicNumber = SPGIST_MAGIC_NUMBER;
709
710 /* initialize last-used-page cache to empty */
711 for (i = 0; i < SPGIST_CACHED_PAGES; i++)
712 metadata->lastUsedPages.cachedPage[i].blkno = InvalidBlockNumber;
713
714 /*
715 * Set pd_lower just past the end of the metadata. This is essential,
716 * because without doing so, metadata will be lost if xlog.c compresses
717 * the page.
718 */
719 ((PageHeader) page)->pd_lower =
720 ((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) page;
721 }
722
723 /*
724 * reloptions processing for SPGiST
725 */
726 bytea *
spgoptions(Datum reloptions,bool validate)727 spgoptions(Datum reloptions, bool validate)
728 {
729 static const relopt_parse_elt tab[] = {
730 {"fillfactor", RELOPT_TYPE_INT, offsetof(SpGistOptions, fillfactor)},
731 };
732
733 return (bytea *) build_reloptions(reloptions, validate,
734 RELOPT_KIND_SPGIST,
735 sizeof(SpGistOptions),
736 tab, lengthof(tab));
737
738 }
739
740 /*
741 * Get the space needed to store a non-null datum of the indicated type
742 * in an inner tuple (that is, as a prefix or node label).
743 * Note the result is already rounded up to a MAXALIGN boundary.
744 * Here we follow the convention that pass-by-val types are just stored
745 * in their Datum representation (compare memcpyInnerDatum).
746 */
747 unsigned int
SpGistGetInnerTypeSize(SpGistTypeDesc * att,Datum datum)748 SpGistGetInnerTypeSize(SpGistTypeDesc *att, Datum datum)
749 {
750 unsigned int size;
751
752 if (att->attbyval)
753 size = sizeof(Datum);
754 else if (att->attlen > 0)
755 size = att->attlen;
756 else
757 size = VARSIZE_ANY(datum);
758
759 return MAXALIGN(size);
760 }
761
762 /*
763 * Copy the given non-null datum to *target, in the inner-tuple case
764 */
765 static void
memcpyInnerDatum(void * target,SpGistTypeDesc * att,Datum datum)766 memcpyInnerDatum(void *target, SpGistTypeDesc *att, Datum datum)
767 {
768 unsigned int size;
769
770 if (att->attbyval)
771 {
772 memcpy(target, &datum, sizeof(Datum));
773 }
774 else
775 {
776 size = (att->attlen > 0) ? att->attlen : VARSIZE_ANY(datum);
777 memcpy(target, DatumGetPointer(datum), size);
778 }
779 }
780
781 /*
782 * Compute space required for a leaf tuple holding the given data.
783 *
784 * This must match the size-calculation portion of spgFormLeafTuple.
785 */
786 Size
SpGistGetLeafTupleSize(TupleDesc tupleDescriptor,Datum * datums,bool * isnulls)787 SpGistGetLeafTupleSize(TupleDesc tupleDescriptor,
788 Datum *datums, bool *isnulls)
789 {
790 Size size;
791 Size data_size;
792 bool needs_null_mask = false;
793 int natts = tupleDescriptor->natts;
794
795 /*
796 * Decide whether we need a nulls bitmask.
797 *
798 * If there is only a key attribute (natts == 1), never use a bitmask, for
799 * compatibility with the pre-v14 layout of leaf tuples. Otherwise, we
800 * need one if any attribute is null.
801 */
802 if (natts > 1)
803 {
804 for (int i = 0; i < natts; i++)
805 {
806 if (isnulls[i])
807 {
808 needs_null_mask = true;
809 break;
810 }
811 }
812 }
813
814 /*
815 * Calculate size of the data part; same as for heap tuples.
816 */
817 data_size = heap_compute_data_size(tupleDescriptor, datums, isnulls);
818
819 /*
820 * Compute total size.
821 */
822 size = SGLTHDRSZ(needs_null_mask);
823 size += data_size;
824 size = MAXALIGN(size);
825
826 /*
827 * Ensure that we can replace the tuple with a dead tuple later. This test
828 * is unnecessary when there are any non-null attributes, but be safe.
829 */
830 if (size < SGDTSIZE)
831 size = SGDTSIZE;
832
833 return size;
834 }
835
836 /*
837 * Construct a leaf tuple containing the given heap TID and datum values
838 */
839 SpGistLeafTuple
spgFormLeafTuple(SpGistState * state,ItemPointer heapPtr,Datum * datums,bool * isnulls)840 spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
841 Datum *datums, bool *isnulls)
842 {
843 SpGistLeafTuple tup;
844 TupleDesc tupleDescriptor = state->leafTupDesc;
845 Size size;
846 Size hoff;
847 Size data_size;
848 bool needs_null_mask = false;
849 int natts = tupleDescriptor->natts;
850 char *tp; /* ptr to tuple data */
851 uint16 tupmask = 0; /* unused heap_fill_tuple output */
852
853 /*
854 * Decide whether we need a nulls bitmask.
855 *
856 * If there is only a key attribute (natts == 1), never use a bitmask, for
857 * compatibility with the pre-v14 layout of leaf tuples. Otherwise, we
858 * need one if any attribute is null.
859 */
860 if (natts > 1)
861 {
862 for (int i = 0; i < natts; i++)
863 {
864 if (isnulls[i])
865 {
866 needs_null_mask = true;
867 break;
868 }
869 }
870 }
871
872 /*
873 * Calculate size of the data part; same as for heap tuples.
874 */
875 data_size = heap_compute_data_size(tupleDescriptor, datums, isnulls);
876
877 /*
878 * Compute total size.
879 */
880 hoff = SGLTHDRSZ(needs_null_mask);
881 size = hoff + data_size;
882 size = MAXALIGN(size);
883
884 /*
885 * Ensure that we can replace the tuple with a dead tuple later. This test
886 * is unnecessary when there are any non-null attributes, but be safe.
887 */
888 if (size < SGDTSIZE)
889 size = SGDTSIZE;
890
891 /* OK, form the tuple */
892 tup = (SpGistLeafTuple) palloc0(size);
893
894 tup->size = size;
895 SGLT_SET_NEXTOFFSET(tup, InvalidOffsetNumber);
896 tup->heapPtr = *heapPtr;
897
898 tp = (char *) tup + hoff;
899
900 if (needs_null_mask)
901 {
902 bits8 *bp; /* ptr to null bitmap in tuple */
903
904 /* Set nullmask presence bit in SpGistLeafTuple header */
905 SGLT_SET_HASNULLMASK(tup, true);
906 /* Fill the data area and null mask */
907 bp = (bits8 *) ((char *) tup + sizeof(SpGistLeafTupleData));
908 heap_fill_tuple(tupleDescriptor, datums, isnulls, tp, data_size,
909 &tupmask, bp);
910 }
911 else if (natts > 1 || !isnulls[spgKeyColumn])
912 {
913 /* Fill data area only */
914 heap_fill_tuple(tupleDescriptor, datums, isnulls, tp, data_size,
915 &tupmask, (bits8 *) NULL);
916 }
917 /* otherwise we have no data, nor a bitmap, to fill */
918
919 return tup;
920 }
921
922 /*
923 * Construct a node (to go into an inner tuple) containing the given label
924 *
925 * Note that the node's downlink is just set invalid here. Caller will fill
926 * it in later.
927 */
928 SpGistNodeTuple
spgFormNodeTuple(SpGistState * state,Datum label,bool isnull)929 spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
930 {
931 SpGistNodeTuple tup;
932 unsigned int size;
933 unsigned short infomask = 0;
934
935 /* compute space needed (note result is already maxaligned) */
936 size = SGNTHDRSZ;
937 if (!isnull)
938 size += SpGistGetInnerTypeSize(&state->attLabelType, label);
939
940 /*
941 * Here we make sure that the size will fit in the field reserved for it
942 * in t_info.
943 */
944 if ((size & INDEX_SIZE_MASK) != size)
945 ereport(ERROR,
946 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
947 errmsg("index row requires %zu bytes, maximum size is %zu",
948 (Size) size, (Size) INDEX_SIZE_MASK)));
949
950 tup = (SpGistNodeTuple) palloc0(size);
951
952 if (isnull)
953 infomask |= INDEX_NULL_MASK;
954 /* we don't bother setting the INDEX_VAR_MASK bit */
955 infomask |= size;
956 tup->t_info = infomask;
957
958 /* The TID field will be filled in later */
959 ItemPointerSetInvalid(&tup->t_tid);
960
961 if (!isnull)
962 memcpyInnerDatum(SGNTDATAPTR(tup), &state->attLabelType, label);
963
964 return tup;
965 }
966
967 /*
968 * Construct an inner tuple containing the given prefix and node array
969 */
970 SpGistInnerTuple
spgFormInnerTuple(SpGistState * state,bool hasPrefix,Datum prefix,int nNodes,SpGistNodeTuple * nodes)971 spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix,
972 int nNodes, SpGistNodeTuple *nodes)
973 {
974 SpGistInnerTuple tup;
975 unsigned int size;
976 unsigned int prefixSize;
977 int i;
978 char *ptr;
979
980 /* Compute size needed */
981 if (hasPrefix)
982 prefixSize = SpGistGetInnerTypeSize(&state->attPrefixType, prefix);
983 else
984 prefixSize = 0;
985
986 size = SGITHDRSZ + prefixSize;
987
988 /* Note: we rely on node tuple sizes to be maxaligned already */
989 for (i = 0; i < nNodes; i++)
990 size += IndexTupleSize(nodes[i]);
991
992 /*
993 * Ensure that we can replace the tuple with a dead tuple later. This
994 * test is unnecessary given current tuple layouts, but let's be safe.
995 */
996 if (size < SGDTSIZE)
997 size = SGDTSIZE;
998
999 /*
1000 * Inner tuple should be small enough to fit on a page
1001 */
1002 if (size > SPGIST_PAGE_CAPACITY - sizeof(ItemIdData))
1003 ereport(ERROR,
1004 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1005 errmsg("SP-GiST inner tuple size %zu exceeds maximum %zu",
1006 (Size) size,
1007 SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
1008 errhint("Values larger than a buffer page cannot be indexed.")));
1009
1010 /*
1011 * Check for overflow of header fields --- probably can't fail if the
1012 * above succeeded, but let's be paranoid
1013 */
1014 if (size > SGITMAXSIZE ||
1015 prefixSize > SGITMAXPREFIXSIZE ||
1016 nNodes > SGITMAXNNODES)
1017 elog(ERROR, "SPGiST inner tuple header field is too small");
1018
1019 /* OK, form the tuple */
1020 tup = (SpGistInnerTuple) palloc0(size);
1021
1022 tup->nNodes = nNodes;
1023 tup->prefixSize = prefixSize;
1024 tup->size = size;
1025
1026 if (hasPrefix)
1027 memcpyInnerDatum(SGITDATAPTR(tup), &state->attPrefixType, prefix);
1028
1029 ptr = (char *) SGITNODEPTR(tup);
1030
1031 for (i = 0; i < nNodes; i++)
1032 {
1033 SpGistNodeTuple node = nodes[i];
1034
1035 memcpy(ptr, node, IndexTupleSize(node));
1036 ptr += IndexTupleSize(node);
1037 }
1038
1039 return tup;
1040 }
1041
1042 /*
1043 * Construct a "dead" tuple to replace a tuple being deleted.
1044 *
1045 * The state can be SPGIST_REDIRECT, SPGIST_DEAD, or SPGIST_PLACEHOLDER.
1046 * For a REDIRECT tuple, a pointer (blkno+offset) must be supplied, and
1047 * the xid field is filled in automatically.
1048 *
1049 * This is called in critical sections, so we don't use palloc; the tuple
1050 * is built in preallocated storage. It should be copied before another
1051 * call with different parameters can occur.
1052 */
1053 SpGistDeadTuple
spgFormDeadTuple(SpGistState * state,int tupstate,BlockNumber blkno,OffsetNumber offnum)1054 spgFormDeadTuple(SpGistState *state, int tupstate,
1055 BlockNumber blkno, OffsetNumber offnum)
1056 {
1057 SpGistDeadTuple tuple = (SpGistDeadTuple) state->deadTupleStorage;
1058
1059 tuple->tupstate = tupstate;
1060 tuple->size = SGDTSIZE;
1061 SGLT_SET_NEXTOFFSET(tuple, InvalidOffsetNumber);
1062
1063 if (tupstate == SPGIST_REDIRECT)
1064 {
1065 ItemPointerSet(&tuple->pointer, blkno, offnum);
1066 Assert(TransactionIdIsValid(state->myXid));
1067 tuple->xid = state->myXid;
1068 }
1069 else
1070 {
1071 ItemPointerSetInvalid(&tuple->pointer);
1072 tuple->xid = InvalidTransactionId;
1073 }
1074
1075 return tuple;
1076 }
1077
1078 /*
1079 * Convert an SPGiST leaf tuple into Datum/isnull arrays.
1080 *
1081 * The caller must allocate sufficient storage for the output arrays.
1082 * (INDEX_MAX_KEYS entries should be enough.)
1083 */
1084 void
spgDeformLeafTuple(SpGistLeafTuple tup,TupleDesc tupleDescriptor,Datum * datums,bool * isnulls,bool keyColumnIsNull)1085 spgDeformLeafTuple(SpGistLeafTuple tup, TupleDesc tupleDescriptor,
1086 Datum *datums, bool *isnulls, bool keyColumnIsNull)
1087 {
1088 bool hasNullsMask = SGLT_GET_HASNULLMASK(tup);
1089 char *tp; /* ptr to tuple data */
1090 bits8 *bp; /* ptr to null bitmap in tuple */
1091
1092 if (keyColumnIsNull && tupleDescriptor->natts == 1)
1093 {
1094 /*
1095 * Trivial case: there is only the key attribute and we're in a nulls
1096 * tree. The hasNullsMask bit in the tuple header should not be set
1097 * (and thus we can't use index_deform_tuple_internal), but
1098 * nonetheless the result is NULL.
1099 *
1100 * Note: currently this is dead code, because noplace calls this when
1101 * there is only the key attribute. But we should cover the case.
1102 */
1103 Assert(!hasNullsMask);
1104
1105 datums[spgKeyColumn] = (Datum) 0;
1106 isnulls[spgKeyColumn] = true;
1107 return;
1108 }
1109
1110 tp = (char *) tup + SGLTHDRSZ(hasNullsMask);
1111 bp = (bits8 *) ((char *) tup + sizeof(SpGistLeafTupleData));
1112
1113 index_deform_tuple_internal(tupleDescriptor,
1114 datums, isnulls,
1115 tp, bp, hasNullsMask);
1116
1117 /*
1118 * Key column isnull value from the tuple should be consistent with
1119 * keyColumnIsNull flag from the caller.
1120 */
1121 Assert(keyColumnIsNull == isnulls[spgKeyColumn]);
1122 }
1123
1124 /*
1125 * Extract the label datums of the nodes within innerTuple
1126 *
1127 * Returns NULL if label datums are NULLs
1128 */
1129 Datum *
spgExtractNodeLabels(SpGistState * state,SpGistInnerTuple innerTuple)1130 spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
1131 {
1132 Datum *nodeLabels;
1133 int i;
1134 SpGistNodeTuple node;
1135
1136 /* Either all the labels must be NULL, or none. */
1137 node = SGITNODEPTR(innerTuple);
1138 if (IndexTupleHasNulls(node))
1139 {
1140 SGITITERATE(innerTuple, i, node)
1141 {
1142 if (!IndexTupleHasNulls(node))
1143 elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
1144 }
1145 /* They're all null, so just return NULL */
1146 return NULL;
1147 }
1148 else
1149 {
1150 nodeLabels = (Datum *) palloc(sizeof(Datum) * innerTuple->nNodes);
1151 SGITITERATE(innerTuple, i, node)
1152 {
1153 if (IndexTupleHasNulls(node))
1154 elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
1155 nodeLabels[i] = SGNTDATUM(node, state);
1156 }
1157 return nodeLabels;
1158 }
1159 }
1160
1161 /*
1162 * Add a new item to the page, replacing a PLACEHOLDER item if possible.
1163 * Return the location it's inserted at, or InvalidOffsetNumber on failure.
1164 *
1165 * If startOffset isn't NULL, we start searching for placeholders at
1166 * *startOffset, and update that to the next place to search. This is just
1167 * an optimization for repeated insertions.
1168 *
1169 * If errorOK is false, we throw error when there's not enough room,
1170 * rather than returning InvalidOffsetNumber.
1171 */
1172 OffsetNumber
SpGistPageAddNewItem(SpGistState * state,Page page,Item item,Size size,OffsetNumber * startOffset,bool errorOK)1173 SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size,
1174 OffsetNumber *startOffset, bool errorOK)
1175 {
1176 SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
1177 OffsetNumber i,
1178 maxoff,
1179 offnum;
1180
1181 if (opaque->nPlaceholder > 0 &&
1182 PageGetExactFreeSpace(page) + SGDTSIZE >= MAXALIGN(size))
1183 {
1184 /* Try to replace a placeholder */
1185 maxoff = PageGetMaxOffsetNumber(page);
1186 offnum = InvalidOffsetNumber;
1187
1188 for (;;)
1189 {
1190 if (startOffset && *startOffset != InvalidOffsetNumber)
1191 i = *startOffset;
1192 else
1193 i = FirstOffsetNumber;
1194 for (; i <= maxoff; i++)
1195 {
1196 SpGistDeadTuple it = (SpGistDeadTuple) PageGetItem(page,
1197 PageGetItemId(page, i));
1198
1199 if (it->tupstate == SPGIST_PLACEHOLDER)
1200 {
1201 offnum = i;
1202 break;
1203 }
1204 }
1205
1206 /* Done if we found a placeholder */
1207 if (offnum != InvalidOffsetNumber)
1208 break;
1209
1210 if (startOffset && *startOffset != InvalidOffsetNumber)
1211 {
1212 /* Hint was no good, re-search from beginning */
1213 *startOffset = InvalidOffsetNumber;
1214 continue;
1215 }
1216
1217 /* Hmm, no placeholder found? */
1218 opaque->nPlaceholder = 0;
1219 break;
1220 }
1221
1222 if (offnum != InvalidOffsetNumber)
1223 {
1224 /* Replace the placeholder tuple */
1225 PageIndexTupleDelete(page, offnum);
1226
1227 offnum = PageAddItem(page, item, size, offnum, false, false);
1228
1229 /*
1230 * We should not have failed given the size check at the top of
1231 * the function, but test anyway. If we did fail, we must PANIC
1232 * because we've already deleted the placeholder tuple, and
1233 * there's no other way to keep the damage from getting to disk.
1234 */
1235 if (offnum != InvalidOffsetNumber)
1236 {
1237 Assert(opaque->nPlaceholder > 0);
1238 opaque->nPlaceholder--;
1239 if (startOffset)
1240 *startOffset = offnum + 1;
1241 }
1242 else
1243 elog(PANIC, "failed to add item of size %u to SPGiST index page",
1244 (int) size);
1245
1246 return offnum;
1247 }
1248 }
1249
1250 /* No luck in replacing a placeholder, so just add it to the page */
1251 offnum = PageAddItem(page, item, size,
1252 InvalidOffsetNumber, false, false);
1253
1254 if (offnum == InvalidOffsetNumber && !errorOK)
1255 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1256 (int) size);
1257
1258 return offnum;
1259 }
1260
1261 /*
1262 * spgproperty() -- Check boolean properties of indexes.
1263 *
1264 * This is optional for most AMs, but is required for SP-GiST because the core
1265 * property code doesn't support AMPROP_DISTANCE_ORDERABLE.
1266 */
1267 bool
spgproperty(Oid index_oid,int attno,IndexAMProperty prop,const char * propname,bool * res,bool * isnull)1268 spgproperty(Oid index_oid, int attno,
1269 IndexAMProperty prop, const char *propname,
1270 bool *res, bool *isnull)
1271 {
1272 Oid opclass,
1273 opfamily,
1274 opcintype;
1275 CatCList *catlist;
1276 int i;
1277
1278 /* Only answer column-level inquiries */
1279 if (attno == 0)
1280 return false;
1281
1282 switch (prop)
1283 {
1284 case AMPROP_DISTANCE_ORDERABLE:
1285 break;
1286 default:
1287 return false;
1288 }
1289
1290 /*
1291 * Currently, SP-GiST distance-ordered scans require that there be a
1292 * distance operator in the opclass with the default types. So we assume
1293 * that if such a operator exists, then there's a reason for it.
1294 */
1295
1296 /* First we need to know the column's opclass. */
1297 opclass = get_index_column_opclass(index_oid, attno);
1298 if (!OidIsValid(opclass))
1299 {
1300 *isnull = true;
1301 return true;
1302 }
1303
1304 /* Now look up the opclass family and input datatype. */
1305 if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
1306 {
1307 *isnull = true;
1308 return true;
1309 }
1310
1311 /* And now we can check whether the operator is provided. */
1312 catlist = SearchSysCacheList1(AMOPSTRATEGY,
1313 ObjectIdGetDatum(opfamily));
1314
1315 *res = false;
1316
1317 for (i = 0; i < catlist->n_members; i++)
1318 {
1319 HeapTuple amoptup = &catlist->members[i]->tuple;
1320 Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(amoptup);
1321
1322 if (amopform->amoppurpose == AMOP_ORDER &&
1323 (amopform->amoplefttype == opcintype ||
1324 amopform->amoprighttype == opcintype) &&
1325 opfamily_can_sort_type(amopform->amopsortfamily,
1326 get_op_rettype(amopform->amopopr)))
1327 {
1328 *res = true;
1329 break;
1330 }
1331 }
1332
1333 ReleaseSysCacheList(catlist);
1334
1335 *isnull = false;
1336
1337 return true;
1338 }
1339