1 /*-------------------------------------------------------------------------
2  *
3  * spgutils.c
4  *	  various support functions for SP-GiST
5  *
6  *
7  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *			src/backend/access/spgist/spgutils.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/amvalidate.h"
19 #include "access/htup_details.h"
20 #include "access/reloptions.h"
21 #include "access/spgist_private.h"
22 #include "access/toast_compression.h"
23 #include "access/transam.h"
24 #include "access/xact.h"
25 #include "catalog/pg_amop.h"
26 #include "commands/vacuum.h"
27 #include "nodes/nodeFuncs.h"
28 #include "storage/bufmgr.h"
29 #include "storage/indexfsm.h"
30 #include "storage/lmgr.h"
31 #include "utils/builtins.h"
32 #include "utils/catcache.h"
33 #include "utils/index_selfuncs.h"
34 #include "utils/lsyscache.h"
35 #include "utils/syscache.h"
36 
37 
38 /*
39  * SP-GiST handler function: return IndexAmRoutine with access method parameters
40  * and callbacks.
41  */
42 Datum
spghandler(PG_FUNCTION_ARGS)43 spghandler(PG_FUNCTION_ARGS)
44 {
45 	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
46 
47 	amroutine->amstrategies = 0;
48 	amroutine->amsupport = SPGISTNProc;
49 	amroutine->amoptsprocnum = SPGIST_OPTIONS_PROC;
50 	amroutine->amcanorder = false;
51 	amroutine->amcanorderbyop = true;
52 	amroutine->amcanbackward = false;
53 	amroutine->amcanunique = false;
54 	amroutine->amcanmulticol = false;
55 	amroutine->amoptionalkey = true;
56 	amroutine->amsearcharray = false;
57 	amroutine->amsearchnulls = true;
58 	amroutine->amstorage = true;
59 	amroutine->amclusterable = false;
60 	amroutine->ampredlocks = false;
61 	amroutine->amcanparallel = false;
62 	amroutine->amcaninclude = true;
63 	amroutine->amusemaintenanceworkmem = false;
64 	amroutine->amparallelvacuumoptions =
65 		VACUUM_OPTION_PARALLEL_BULKDEL | VACUUM_OPTION_PARALLEL_COND_CLEANUP;
66 	amroutine->amkeytype = InvalidOid;
67 
68 	amroutine->ambuild = spgbuild;
69 	amroutine->ambuildempty = spgbuildempty;
70 	amroutine->aminsert = spginsert;
71 	amroutine->ambulkdelete = spgbulkdelete;
72 	amroutine->amvacuumcleanup = spgvacuumcleanup;
73 	amroutine->amcanreturn = spgcanreturn;
74 	amroutine->amcostestimate = spgcostestimate;
75 	amroutine->amoptions = spgoptions;
76 	amroutine->amproperty = spgproperty;
77 	amroutine->ambuildphasename = NULL;
78 	amroutine->amvalidate = spgvalidate;
79 	amroutine->amadjustmembers = spgadjustmembers;
80 	amroutine->ambeginscan = spgbeginscan;
81 	amroutine->amrescan = spgrescan;
82 	amroutine->amgettuple = spggettuple;
83 	amroutine->amgetbitmap = spggetbitmap;
84 	amroutine->amendscan = spgendscan;
85 	amroutine->ammarkpos = NULL;
86 	amroutine->amrestrpos = NULL;
87 	amroutine->amestimateparallelscan = NULL;
88 	amroutine->aminitparallelscan = NULL;
89 	amroutine->amparallelrescan = NULL;
90 
91 	PG_RETURN_POINTER(amroutine);
92 }
93 
94 /*
95  * GetIndexInputType
96  *		Determine the nominal input data type for an index column
97  *
98  * We define the "nominal" input type as the associated opclass's opcintype,
99  * or if that is a polymorphic type, the base type of the heap column or
100  * expression that is the index's input.  The reason for preferring the
101  * opcintype is that non-polymorphic opclasses probably don't want to hear
102  * about binary-compatible input types.  For instance, if a text opclass
103  * is being used with a varchar heap column, we want to report "text" not
104  * "varchar".  Likewise, opclasses don't want to hear about domain types,
105  * so if we do consult the actual input type, we make sure to flatten domains.
106  *
107  * At some point maybe this should go somewhere else, but it's not clear
108  * if any other index AMs have a use for it.
109  */
110 static Oid
GetIndexInputType(Relation index,AttrNumber indexcol)111 GetIndexInputType(Relation index, AttrNumber indexcol)
112 {
113 	Oid			opcintype;
114 	AttrNumber	heapcol;
115 	List	   *indexprs;
116 	ListCell   *indexpr_item;
117 
118 	Assert(index->rd_index != NULL);
119 	Assert(indexcol > 0 && indexcol <= index->rd_index->indnkeyatts);
120 	opcintype = index->rd_opcintype[indexcol - 1];
121 	if (!IsPolymorphicType(opcintype))
122 		return opcintype;
123 	heapcol = index->rd_index->indkey.values[indexcol - 1];
124 	if (heapcol != 0)			/* Simple index column? */
125 		return getBaseType(get_atttype(index->rd_index->indrelid, heapcol));
126 
127 	/*
128 	 * If the index expressions are already cached, skip calling
129 	 * RelationGetIndexExpressions, as it will make a copy which is overkill.
130 	 * We're not going to modify the trees, and we're not going to do anything
131 	 * that would invalidate the relcache entry before we're done.
132 	 */
133 	if (index->rd_indexprs)
134 		indexprs = index->rd_indexprs;
135 	else
136 		indexprs = RelationGetIndexExpressions(index);
137 	indexpr_item = list_head(indexprs);
138 	for (int i = 1; i <= index->rd_index->indnkeyatts; i++)
139 	{
140 		if (index->rd_index->indkey.values[i - 1] == 0)
141 		{
142 			/* expression column */
143 			if (indexpr_item == NULL)
144 				elog(ERROR, "wrong number of index expressions");
145 			if (i == indexcol)
146 				return getBaseType(exprType((Node *) lfirst(indexpr_item)));
147 			indexpr_item = lnext(indexprs, indexpr_item);
148 		}
149 	}
150 	elog(ERROR, "wrong number of index expressions");
151 	return InvalidOid;			/* keep compiler quiet */
152 }
153 
154 /* Fill in a SpGistTypeDesc struct with info about the specified data type */
155 static void
fillTypeDesc(SpGistTypeDesc * desc,Oid type)156 fillTypeDesc(SpGistTypeDesc *desc, Oid type)
157 {
158 	HeapTuple	tp;
159 	Form_pg_type typtup;
160 
161 	desc->type = type;
162 	tp = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type));
163 	if (!HeapTupleIsValid(tp))
164 		elog(ERROR, "cache lookup failed for type %u", type);
165 	typtup = (Form_pg_type) GETSTRUCT(tp);
166 	desc->attlen = typtup->typlen;
167 	desc->attbyval = typtup->typbyval;
168 	desc->attalign = typtup->typalign;
169 	desc->attstorage = typtup->typstorage;
170 	ReleaseSysCache(tp);
171 }
172 
/*
 * Fetch local cache of AM-specific info about the index, initializing it
 * if necessary
 *
 * The cache is allocated in the index's rd_indexcxt and hung on
 * rd_amcache, so it persists until the relcache entry is invalidated
 * or closed, at which point we rebuild it on next call.
 */
SpGistCache *
spgGetCache(Relation index)
{
	SpGistCache *cache;

	if (index->rd_amcache == NULL)
	{
		Oid			atttype;
		spgConfigIn in;
		FmgrInfo   *procinfo;
		Buffer		metabuffer;
		SpGistMetaPageData *metadata;

		cache = MemoryContextAllocZero(index->rd_indexcxt,
									   sizeof(SpGistCache));

		/* SPGiST must have one key column and can also have INCLUDE columns */
		Assert(IndexRelationGetNumberOfKeyAttributes(index) == 1);
		Assert(IndexRelationGetNumberOfAttributes(index) <= INDEX_MAX_KEYS);

		/*
		 * Get the actual (well, nominal) data type of the key column.  We
		 * pass this to the opclass config function so that polymorphic
		 * opclasses are possible.
		 */
		atttype = GetIndexInputType(index, spgKeyColumn + 1);

		/* Call the config function to get config info for the opclass */
		in.attType = atttype;

		procinfo = index_getprocinfo(index, 1, SPGIST_CONFIG_PROC);
		FunctionCall2Coll(procinfo,
						  index->rd_indcollation[spgKeyColumn],
						  PointerGetDatum(&in),
						  PointerGetDatum(&cache->config));

		/*
		 * If leafType isn't specified, use the declared index column type,
		 * which index.c will have derived from the opclass's opcintype.
		 * (Although we now make spgvalidate.c warn if these aren't the same,
		 * old user-defined opclasses may not set the STORAGE parameter
		 * correctly, so believe leafType if it's given.)
		 */
		if (!OidIsValid(cache->config.leafType))
			cache->config.leafType =
				TupleDescAttr(RelationGetDescr(index), spgKeyColumn)->atttypid;

		/* Get the information we need about each relevant datatype */
		fillTypeDesc(&cache->attType, atttype);

		if (cache->config.leafType != atttype)
		{
			/* A leaf type different from the input type requires a compress proc */
			if (!OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("compress method must be defined when leaf type is different from input type")));

			fillTypeDesc(&cache->attLeafType, cache->config.leafType);
		}
		else
		{
			/* Save lookups in this common case */
			cache->attLeafType = cache->attType;
		}

		fillTypeDesc(&cache->attPrefixType, cache->config.prefixType);
		fillTypeDesc(&cache->attLabelType, cache->config.labelType);

		/* Last, get the lastUsedPages data from the metapage */
		metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
		LockBuffer(metabuffer, BUFFER_LOCK_SHARE);

		metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));

		/* Sanity-check that this really is an SP-GiST index */
		if (metadata->magicNumber != SPGIST_MAGIC_NUMBER)
			elog(ERROR, "index \"%s\" is not an SP-GiST index",
				 RelationGetRelationName(index));

		cache->lastUsedPages = metadata->lastUsedPages;

		UnlockReleaseBuffer(metabuffer);

		/* Publish the cache only after it is fully built */
		index->rd_amcache = (void *) cache;
	}
	else
	{
		/* assume it's up to date */
		cache = (SpGistCache *) index->rd_amcache;
	}

	return cache;
}
269 
270 /*
271  * Compute a tuple descriptor for leaf tuples or index-only-scan result tuples.
272  *
273  * We can use the relcache's tupdesc as-is in many cases, and it's always
274  * OK so far as any INCLUDE columns are concerned.  However, the entry for
275  * the key column has to match leafType in the first case or attType in the
276  * second case.  While the relcache's tupdesc *should* show leafType, this
277  * might not hold for legacy user-defined opclasses, since before v14 they
278  * were not allowed to declare their true storage type in CREATE OPCLASS.
279  * Also, attType can be different from what is in the relcache.
280  *
281  * This function gives back either a pointer to the relcache's tupdesc
282  * if that is suitable, or a palloc'd copy that's been adjusted to match
283  * the specified key column type.  We can avoid doing any catalog lookups
284  * here by insisting that the caller pass an SpGistTypeDesc not just an OID.
285  */
286 TupleDesc
getSpGistTupleDesc(Relation index,SpGistTypeDesc * keyType)287 getSpGistTupleDesc(Relation index, SpGistTypeDesc *keyType)
288 {
289 	TupleDesc	outTupDesc;
290 	Form_pg_attribute att;
291 
292 	if (keyType->type ==
293 		TupleDescAttr(RelationGetDescr(index), spgKeyColumn)->atttypid)
294 		outTupDesc = RelationGetDescr(index);
295 	else
296 	{
297 		outTupDesc = CreateTupleDescCopy(RelationGetDescr(index));
298 		att = TupleDescAttr(outTupDesc, spgKeyColumn);
299 		/* It's sufficient to update the type-dependent fields of the column */
300 		att->atttypid = keyType->type;
301 		att->atttypmod = -1;
302 		att->attlen = keyType->attlen;
303 		att->attbyval = keyType->attbyval;
304 		att->attalign = keyType->attalign;
305 		att->attstorage = keyType->attstorage;
306 		/* We shouldn't need to bother with making these valid: */
307 		att->attcompression = InvalidCompressionMethod;
308 		att->attcollation = InvalidOid;
309 		/* In case we changed typlen, we'd better reset following offsets */
310 		for (int i = spgFirstIncludeColumn; i < outTupDesc->natts; i++)
311 			TupleDescAttr(outTupDesc, i)->attcacheoff = -1;
312 	}
313 	return outTupDesc;
314 }
315 
316 /* Initialize SpGistState for working with the given index */
317 void
initSpGistState(SpGistState * state,Relation index)318 initSpGistState(SpGistState *state, Relation index)
319 {
320 	SpGistCache *cache;
321 
322 	state->index = index;
323 
324 	/* Get cached static information about index */
325 	cache = spgGetCache(index);
326 
327 	state->config = cache->config;
328 	state->attType = cache->attType;
329 	state->attLeafType = cache->attLeafType;
330 	state->attPrefixType = cache->attPrefixType;
331 	state->attLabelType = cache->attLabelType;
332 
333 	/* Ensure we have a valid descriptor for leaf tuples */
334 	state->leafTupDesc = getSpGistTupleDesc(state->index, &state->attLeafType);
335 
336 	/* Make workspace for constructing dead tuples */
337 	state->deadTupleStorage = palloc0(SGDTSIZE);
338 
339 	/* Set XID to use in redirection tuples */
340 	state->myXid = GetTopTransactionIdIfAny();
341 
342 	/* Assume we're not in an index build (spgbuild will override) */
343 	state->isBuild = false;
344 }
345 
346 /*
347  * Allocate a new page (either by recycling, or by extending the index file).
348  *
349  * The returned buffer is already pinned and exclusive-locked.
350  * Caller is responsible for initializing the page by calling SpGistInitBuffer.
351  */
352 Buffer
SpGistNewBuffer(Relation index)353 SpGistNewBuffer(Relation index)
354 {
355 	Buffer		buffer;
356 	bool		needLock;
357 
358 	/* First, try to get a page from FSM */
359 	for (;;)
360 	{
361 		BlockNumber blkno = GetFreeIndexPage(index);
362 
363 		if (blkno == InvalidBlockNumber)
364 			break;				/* nothing known to FSM */
365 
366 		/*
367 		 * The fixed pages shouldn't ever be listed in FSM, but just in case
368 		 * one is, ignore it.
369 		 */
370 		if (SpGistBlockIsFixed(blkno))
371 			continue;
372 
373 		buffer = ReadBuffer(index, blkno);
374 
375 		/*
376 		 * We have to guard against the possibility that someone else already
377 		 * recycled this page; the buffer may be locked if so.
378 		 */
379 		if (ConditionalLockBuffer(buffer))
380 		{
381 			Page		page = BufferGetPage(buffer);
382 
383 			if (PageIsNew(page))
384 				return buffer;	/* OK to use, if never initialized */
385 
386 			if (SpGistPageIsDeleted(page) || PageIsEmpty(page))
387 				return buffer;	/* OK to use */
388 
389 			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
390 		}
391 
392 		/* Can't use it, so release buffer and try again */
393 		ReleaseBuffer(buffer);
394 	}
395 
396 	/* Must extend the file */
397 	needLock = !RELATION_IS_LOCAL(index);
398 	if (needLock)
399 		LockRelationForExtension(index, ExclusiveLock);
400 
401 	buffer = ReadBuffer(index, P_NEW);
402 	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
403 
404 	if (needLock)
405 		UnlockRelationForExtension(index, ExclusiveLock);
406 
407 	return buffer;
408 }
409 
410 /*
411  * Update index metapage's lastUsedPages info from local cache, if possible
412  *
413  * Updating meta page isn't critical for index working, so
414  * 1 use ConditionalLockBuffer to improve concurrency
415  * 2 don't WAL-log metabuffer changes to decrease WAL traffic
416  */
417 void
SpGistUpdateMetaPage(Relation index)418 SpGistUpdateMetaPage(Relation index)
419 {
420 	SpGistCache *cache = (SpGistCache *) index->rd_amcache;
421 
422 	if (cache != NULL)
423 	{
424 		Buffer		metabuffer;
425 
426 		metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
427 
428 		if (ConditionalLockBuffer(metabuffer))
429 		{
430 			Page		metapage = BufferGetPage(metabuffer);
431 			SpGistMetaPageData *metadata = SpGistPageGetMeta(metapage);
432 
433 			metadata->lastUsedPages = cache->lastUsedPages;
434 
435 			/*
436 			 * Set pd_lower just past the end of the metadata.  This is
437 			 * essential, because without doing so, metadata will be lost if
438 			 * xlog.c compresses the page.  (We must do this here because
439 			 * pre-v11 versions of PG did not set the metapage's pd_lower
440 			 * correctly, so a pg_upgraded index might contain the wrong
441 			 * value.)
442 			 */
443 			((PageHeader) metapage)->pd_lower =
444 				((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) metapage;
445 
446 			MarkBufferDirty(metabuffer);
447 			UnlockReleaseBuffer(metabuffer);
448 		}
449 		else
450 		{
451 			ReleaseBuffer(metabuffer);
452 		}
453 	}
454 }
455 
/*
 * Macro to select the proper element of the lastUsedPages cache for the
 * given GBUF_xxx flags.  Masking the flags with SPGIST_CACHED_PAGES is
 * just for paranoia's sake; valid flag combinations already index within
 * the cachedPage array.
 */
#define GET_LUP(c, f)  (&(c)->lastUsedPages.cachedPage[((unsigned int) (f)) % SPGIST_CACHED_PAGES])
459 
460 /*
461  * Allocate and initialize a new buffer of the type and parity specified by
462  * flags.  The returned buffer is already pinned and exclusive-locked.
463  *
464  * When requesting an inner page, if we get one with the wrong parity,
465  * we just release the buffer and try again.  We will get a different page
466  * because GetFreeIndexPage will have marked the page used in FSM.  The page
467  * is entered in our local lastUsedPages cache, so there's some hope of
468  * making use of it later in this session, but otherwise we rely on VACUUM
469  * to eventually re-enter the page in FSM, making it available for recycling.
470  * Note that such a page does not get marked dirty here, so unless it's used
471  * fairly soon, the buffer will just get discarded and the page will remain
472  * as it was on disk.
473  *
474  * When we return a buffer to the caller, the page is *not* entered into
475  * the lastUsedPages cache; we expect the caller will do so after it's taken
476  * whatever space it will use.  This is because after the caller has used up
477  * some space, the page might have less space than whatever was cached already
478  * so we'd rather not trash the old cache entry.
479  */
480 static Buffer
allocNewBuffer(Relation index,int flags)481 allocNewBuffer(Relation index, int flags)
482 {
483 	SpGistCache *cache = spgGetCache(index);
484 	uint16		pageflags = 0;
485 
486 	if (GBUF_REQ_LEAF(flags))
487 		pageflags |= SPGIST_LEAF;
488 	if (GBUF_REQ_NULLS(flags))
489 		pageflags |= SPGIST_NULLS;
490 
491 	for (;;)
492 	{
493 		Buffer		buffer;
494 
495 		buffer = SpGistNewBuffer(index);
496 		SpGistInitBuffer(buffer, pageflags);
497 
498 		if (pageflags & SPGIST_LEAF)
499 		{
500 			/* Leaf pages have no parity concerns, so just use it */
501 			return buffer;
502 		}
503 		else
504 		{
505 			BlockNumber blkno = BufferGetBlockNumber(buffer);
506 			int			blkFlags = GBUF_INNER_PARITY(blkno);
507 
508 			if ((flags & GBUF_PARITY_MASK) == blkFlags)
509 			{
510 				/* Page has right parity, use it */
511 				return buffer;
512 			}
513 			else
514 			{
515 				/* Page has wrong parity, record it in cache and try again */
516 				if (pageflags & SPGIST_NULLS)
517 					blkFlags |= GBUF_NULLS;
518 				cache->lastUsedPages.cachedPage[blkFlags].blkno = blkno;
519 				cache->lastUsedPages.cachedPage[blkFlags].freeSpace =
520 					PageGetExactFreeSpace(BufferGetPage(buffer));
521 				UnlockReleaseBuffer(buffer);
522 			}
523 		}
524 	}
525 }
526 
/*
 * Get a buffer of the type and parity specified by flags, having at least
 * as much free space as indicated by needSpace.  We use the lastUsedPages
 * cache to assign the same buffer previously requested when possible.
 * The returned buffer is already pinned and exclusive-locked.
 *
 * *isNew is set true if the page was initialized here, false if it was
 * already valid.
 */
Buffer
SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
{
	SpGistCache *cache = spgGetCache(index);
	SpGistLastUsedPage *lup;

	/* Bail out if even an empty page wouldn't meet the demand */
	if (needSpace > SPGIST_PAGE_CAPACITY)
		elog(ERROR, "desired SPGiST tuple size is too big");

	/*
	 * If possible, increase the space request to include relation's
	 * fillfactor.  This ensures that when we add unrelated tuples to a page,
	 * we try to keep 100-fillfactor% available for adding tuples that are
	 * related to the ones already on it.  But fillfactor mustn't cause an
	 * error for requests that would otherwise be legal.
	 */
	needSpace += SpGistGetTargetPageFreeSpace(index);
	needSpace = Min(needSpace, SPGIST_PAGE_CAPACITY);

	/* Get the cache entry for this flags setting */
	lup = GET_LUP(cache, flags);

	/* If we have nothing cached, just turn it over to allocNewBuffer */
	if (lup->blkno == InvalidBlockNumber)
	{
		*isNew = true;
		return allocNewBuffer(index, flags);
	}

	/* fixed pages should never be in cache */
	Assert(!SpGistBlockIsFixed(lup->blkno));

	/* If cached freeSpace isn't enough, don't bother looking at the page */
	if (lup->freeSpace >= needSpace)
	{
		Buffer		buffer;
		Page		page;

		buffer = ReadBuffer(index, lup->blkno);

		if (!ConditionalLockBuffer(buffer))
		{
			/*
			 * buffer is locked by another process, so return a new buffer
			 */
			ReleaseBuffer(buffer);
			*isNew = true;
			return allocNewBuffer(index, flags);
		}

		page = BufferGetPage(buffer);

		if (PageIsNew(page) || SpGistPageIsDeleted(page) || PageIsEmpty(page))
		{
			/* OK to initialize the page */
			uint16		pageflags = 0;

			if (GBUF_REQ_LEAF(flags))
				pageflags |= SPGIST_LEAF;
			if (GBUF_REQ_NULLS(flags))
				pageflags |= SPGIST_NULLS;
			SpGistInitBuffer(buffer, pageflags);
			/* Pre-deduct needSpace, since the caller is about to consume it */
			lup->freeSpace = PageGetExactFreeSpace(page) - needSpace;
			*isNew = true;
			return buffer;
		}

		/*
		 * Check that page is of right type and has enough space.  We must
		 * recheck this since our cache isn't necessarily up to date.
		 */
		if ((GBUF_REQ_LEAF(flags) ? SpGistPageIsLeaf(page) : !SpGistPageIsLeaf(page)) &&
			(GBUF_REQ_NULLS(flags) ? SpGistPageStoresNulls(page) : !SpGistPageStoresNulls(page)))
		{
			int			freeSpace = PageGetExactFreeSpace(page);

			if (freeSpace >= needSpace)
			{
				/* Success, update freespace info and return the buffer */
				lup->freeSpace = freeSpace - needSpace;
				*isNew = false;
				return buffer;
			}
		}

		/*
		 * fallback to allocation of new buffer
		 */
		UnlockReleaseBuffer(buffer);
	}

	/* No success with cache, so return a new buffer */
	*isNew = true;
	return allocNewBuffer(index, flags);
}
632 
633 /*
634  * Update lastUsedPages cache when done modifying a page.
635  *
636  * We update the appropriate cache entry if it already contained this page
637  * (its freeSpace is likely obsolete), or if this page has more space than
638  * whatever we had cached.
639  */
640 void
SpGistSetLastUsedPage(Relation index,Buffer buffer)641 SpGistSetLastUsedPage(Relation index, Buffer buffer)
642 {
643 	SpGistCache *cache = spgGetCache(index);
644 	SpGistLastUsedPage *lup;
645 	int			freeSpace;
646 	Page		page = BufferGetPage(buffer);
647 	BlockNumber blkno = BufferGetBlockNumber(buffer);
648 	int			flags;
649 
650 	/* Never enter fixed pages (root pages) in cache, though */
651 	if (SpGistBlockIsFixed(blkno))
652 		return;
653 
654 	if (SpGistPageIsLeaf(page))
655 		flags = GBUF_LEAF;
656 	else
657 		flags = GBUF_INNER_PARITY(blkno);
658 	if (SpGistPageStoresNulls(page))
659 		flags |= GBUF_NULLS;
660 
661 	lup = GET_LUP(cache, flags);
662 
663 	freeSpace = PageGetExactFreeSpace(page);
664 	if (lup->blkno == InvalidBlockNumber || lup->blkno == blkno ||
665 		lup->freeSpace < freeSpace)
666 	{
667 		lup->blkno = blkno;
668 		lup->freeSpace = freeSpace;
669 	}
670 }
671 
672 /*
673  * Initialize an SPGiST page to empty, with specified flags
674  */
675 void
SpGistInitPage(Page page,uint16 f)676 SpGistInitPage(Page page, uint16 f)
677 {
678 	SpGistPageOpaque opaque;
679 
680 	PageInit(page, BLCKSZ, sizeof(SpGistPageOpaqueData));
681 	opaque = SpGistPageGetOpaque(page);
682 	opaque->flags = f;
683 	opaque->spgist_page_id = SPGIST_PAGE_ID;
684 }
685 
686 /*
687  * Initialize a buffer's page to empty, with specified flags
688  */
689 void
SpGistInitBuffer(Buffer b,uint16 f)690 SpGistInitBuffer(Buffer b, uint16 f)
691 {
692 	Assert(BufferGetPageSize(b) == BLCKSZ);
693 	SpGistInitPage(BufferGetPage(b), f);
694 }
695 
696 /*
697  * Initialize metadata page
698  */
699 void
SpGistInitMetapage(Page page)700 SpGistInitMetapage(Page page)
701 {
702 	SpGistMetaPageData *metadata;
703 	int			i;
704 
705 	SpGistInitPage(page, SPGIST_META);
706 	metadata = SpGistPageGetMeta(page);
707 	memset(metadata, 0, sizeof(SpGistMetaPageData));
708 	metadata->magicNumber = SPGIST_MAGIC_NUMBER;
709 
710 	/* initialize last-used-page cache to empty */
711 	for (i = 0; i < SPGIST_CACHED_PAGES; i++)
712 		metadata->lastUsedPages.cachedPage[i].blkno = InvalidBlockNumber;
713 
714 	/*
715 	 * Set pd_lower just past the end of the metadata.  This is essential,
716 	 * because without doing so, metadata will be lost if xlog.c compresses
717 	 * the page.
718 	 */
719 	((PageHeader) page)->pd_lower =
720 		((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) page;
721 }
722 
723 /*
724  * reloptions processing for SPGiST
725  */
726 bytea *
spgoptions(Datum reloptions,bool validate)727 spgoptions(Datum reloptions, bool validate)
728 {
729 	static const relopt_parse_elt tab[] = {
730 		{"fillfactor", RELOPT_TYPE_INT, offsetof(SpGistOptions, fillfactor)},
731 	};
732 
733 	return (bytea *) build_reloptions(reloptions, validate,
734 									  RELOPT_KIND_SPGIST,
735 									  sizeof(SpGistOptions),
736 									  tab, lengthof(tab));
737 
738 }
739 
740 /*
741  * Get the space needed to store a non-null datum of the indicated type
742  * in an inner tuple (that is, as a prefix or node label).
743  * Note the result is already rounded up to a MAXALIGN boundary.
744  * Here we follow the convention that pass-by-val types are just stored
745  * in their Datum representation (compare memcpyInnerDatum).
746  */
747 unsigned int
SpGistGetInnerTypeSize(SpGistTypeDesc * att,Datum datum)748 SpGistGetInnerTypeSize(SpGistTypeDesc *att, Datum datum)
749 {
750 	unsigned int size;
751 
752 	if (att->attbyval)
753 		size = sizeof(Datum);
754 	else if (att->attlen > 0)
755 		size = att->attlen;
756 	else
757 		size = VARSIZE_ANY(datum);
758 
759 	return MAXALIGN(size);
760 }
761 
762 /*
763  * Copy the given non-null datum to *target, in the inner-tuple case
764  */
765 static void
memcpyInnerDatum(void * target,SpGistTypeDesc * att,Datum datum)766 memcpyInnerDatum(void *target, SpGistTypeDesc *att, Datum datum)
767 {
768 	unsigned int size;
769 
770 	if (att->attbyval)
771 	{
772 		memcpy(target, &datum, sizeof(Datum));
773 	}
774 	else
775 	{
776 		size = (att->attlen > 0) ? att->attlen : VARSIZE_ANY(datum);
777 		memcpy(target, DatumGetPointer(datum), size);
778 	}
779 }
780 
781 /*
782  * Compute space required for a leaf tuple holding the given data.
783  *
784  * This must match the size-calculation portion of spgFormLeafTuple.
785  */
786 Size
SpGistGetLeafTupleSize(TupleDesc tupleDescriptor,Datum * datums,bool * isnulls)787 SpGistGetLeafTupleSize(TupleDesc tupleDescriptor,
788 					   Datum *datums, bool *isnulls)
789 {
790 	Size		size;
791 	Size		data_size;
792 	bool		needs_null_mask = false;
793 	int			natts = tupleDescriptor->natts;
794 
795 	/*
796 	 * Decide whether we need a nulls bitmask.
797 	 *
798 	 * If there is only a key attribute (natts == 1), never use a bitmask, for
799 	 * compatibility with the pre-v14 layout of leaf tuples.  Otherwise, we
800 	 * need one if any attribute is null.
801 	 */
802 	if (natts > 1)
803 	{
804 		for (int i = 0; i < natts; i++)
805 		{
806 			if (isnulls[i])
807 			{
808 				needs_null_mask = true;
809 				break;
810 			}
811 		}
812 	}
813 
814 	/*
815 	 * Calculate size of the data part; same as for heap tuples.
816 	 */
817 	data_size = heap_compute_data_size(tupleDescriptor, datums, isnulls);
818 
819 	/*
820 	 * Compute total size.
821 	 */
822 	size = SGLTHDRSZ(needs_null_mask);
823 	size += data_size;
824 	size = MAXALIGN(size);
825 
826 	/*
827 	 * Ensure that we can replace the tuple with a dead tuple later. This test
828 	 * is unnecessary when there are any non-null attributes, but be safe.
829 	 */
830 	if (size < SGDTSIZE)
831 		size = SGDTSIZE;
832 
833 	return size;
834 }
835 
/*
 * Construct a leaf tuple containing the given heap TID and datum values
 *
 * Returns a palloc0'd tuple; datums/isnulls must have one entry per
 * column of state->leafTupDesc.  The size computation here must stay in
 * sync with SpGistGetLeafTupleSize.
 */
SpGistLeafTuple
spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
				 Datum *datums, bool *isnulls)
{
	SpGistLeafTuple tup;
	TupleDesc	tupleDescriptor = state->leafTupDesc;
	Size		size;
	Size		hoff;
	Size		data_size;
	bool		needs_null_mask = false;
	int			natts = tupleDescriptor->natts;
	char	   *tp;				/* ptr to tuple data */
	uint16		tupmask = 0;	/* unused heap_fill_tuple output */

	/*
	 * Decide whether we need a nulls bitmask.
	 *
	 * If there is only a key attribute (natts == 1), never use a bitmask, for
	 * compatibility with the pre-v14 layout of leaf tuples.  Otherwise, we
	 * need one if any attribute is null.
	 */
	if (natts > 1)
	{
		for (int i = 0; i < natts; i++)
		{
			if (isnulls[i])
			{
				needs_null_mask = true;
				break;
			}
		}
	}

	/*
	 * Calculate size of the data part; same as for heap tuples.
	 */
	data_size = heap_compute_data_size(tupleDescriptor, datums, isnulls);

	/*
	 * Compute total size.
	 */
	hoff = SGLTHDRSZ(needs_null_mask);
	size = hoff + data_size;
	size = MAXALIGN(size);

	/*
	 * Ensure that we can replace the tuple with a dead tuple later. This test
	 * is unnecessary when there are any non-null attributes, but be safe.
	 */
	if (size < SGDTSIZE)
		size = SGDTSIZE;

	/* OK, form the tuple */
	tup = (SpGistLeafTuple) palloc0(size);

	tup->size = size;
	SGLT_SET_NEXTOFFSET(tup, InvalidOffsetNumber);
	tup->heapPtr = *heapPtr;

	/* Data area begins right after the (possibly bitmask-bearing) header */
	tp = (char *) tup + hoff;

	if (needs_null_mask)
	{
		bits8	   *bp;			/* ptr to null bitmap in tuple */

		/* Set nullmask presence bit in SpGistLeafTuple header */
		SGLT_SET_HASNULLMASK(tup, true);
		/* Fill the data area and null mask */
		bp = (bits8 *) ((char *) tup + sizeof(SpGistLeafTupleData));
		heap_fill_tuple(tupleDescriptor, datums, isnulls, tp, data_size,
						&tupmask, bp);
	}
	else if (natts > 1 || !isnulls[spgKeyColumn])
	{
		/* Fill data area only */
		heap_fill_tuple(tupleDescriptor, datums, isnulls, tp, data_size,
						&tupmask, (bits8 *) NULL);
	}
	/* otherwise we have no data, nor a bitmap, to fill */

	return tup;
}
921 
922 /*
923  * Construct a node (to go into an inner tuple) containing the given label
924  *
925  * Note that the node's downlink is just set invalid here.  Caller will fill
926  * it in later.
927  */
928 SpGistNodeTuple
spgFormNodeTuple(SpGistState * state,Datum label,bool isnull)929 spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
930 {
931 	SpGistNodeTuple tup;
932 	unsigned int size;
933 	unsigned short infomask = 0;
934 
935 	/* compute space needed (note result is already maxaligned) */
936 	size = SGNTHDRSZ;
937 	if (!isnull)
938 		size += SpGistGetInnerTypeSize(&state->attLabelType, label);
939 
940 	/*
941 	 * Here we make sure that the size will fit in the field reserved for it
942 	 * in t_info.
943 	 */
944 	if ((size & INDEX_SIZE_MASK) != size)
945 		ereport(ERROR,
946 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
947 				 errmsg("index row requires %zu bytes, maximum size is %zu",
948 						(Size) size, (Size) INDEX_SIZE_MASK)));
949 
950 	tup = (SpGistNodeTuple) palloc0(size);
951 
952 	if (isnull)
953 		infomask |= INDEX_NULL_MASK;
954 	/* we don't bother setting the INDEX_VAR_MASK bit */
955 	infomask |= size;
956 	tup->t_info = infomask;
957 
958 	/* The TID field will be filled in later */
959 	ItemPointerSetInvalid(&tup->t_tid);
960 
961 	if (!isnull)
962 		memcpyInnerDatum(SGNTDATAPTR(tup), &state->attLabelType, label);
963 
964 	return tup;
965 }
966 
967 /*
968  * Construct an inner tuple containing the given prefix and node array
969  */
970 SpGistInnerTuple
spgFormInnerTuple(SpGistState * state,bool hasPrefix,Datum prefix,int nNodes,SpGistNodeTuple * nodes)971 spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix,
972 				  int nNodes, SpGistNodeTuple *nodes)
973 {
974 	SpGistInnerTuple tup;
975 	unsigned int size;
976 	unsigned int prefixSize;
977 	int			i;
978 	char	   *ptr;
979 
980 	/* Compute size needed */
981 	if (hasPrefix)
982 		prefixSize = SpGistGetInnerTypeSize(&state->attPrefixType, prefix);
983 	else
984 		prefixSize = 0;
985 
986 	size = SGITHDRSZ + prefixSize;
987 
988 	/* Note: we rely on node tuple sizes to be maxaligned already */
989 	for (i = 0; i < nNodes; i++)
990 		size += IndexTupleSize(nodes[i]);
991 
992 	/*
993 	 * Ensure that we can replace the tuple with a dead tuple later.  This
994 	 * test is unnecessary given current tuple layouts, but let's be safe.
995 	 */
996 	if (size < SGDTSIZE)
997 		size = SGDTSIZE;
998 
999 	/*
1000 	 * Inner tuple should be small enough to fit on a page
1001 	 */
1002 	if (size > SPGIST_PAGE_CAPACITY - sizeof(ItemIdData))
1003 		ereport(ERROR,
1004 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1005 				 errmsg("SP-GiST inner tuple size %zu exceeds maximum %zu",
1006 						(Size) size,
1007 						SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
1008 				 errhint("Values larger than a buffer page cannot be indexed.")));
1009 
1010 	/*
1011 	 * Check for overflow of header fields --- probably can't fail if the
1012 	 * above succeeded, but let's be paranoid
1013 	 */
1014 	if (size > SGITMAXSIZE ||
1015 		prefixSize > SGITMAXPREFIXSIZE ||
1016 		nNodes > SGITMAXNNODES)
1017 		elog(ERROR, "SPGiST inner tuple header field is too small");
1018 
1019 	/* OK, form the tuple */
1020 	tup = (SpGistInnerTuple) palloc0(size);
1021 
1022 	tup->nNodes = nNodes;
1023 	tup->prefixSize = prefixSize;
1024 	tup->size = size;
1025 
1026 	if (hasPrefix)
1027 		memcpyInnerDatum(SGITDATAPTR(tup), &state->attPrefixType, prefix);
1028 
1029 	ptr = (char *) SGITNODEPTR(tup);
1030 
1031 	for (i = 0; i < nNodes; i++)
1032 	{
1033 		SpGistNodeTuple node = nodes[i];
1034 
1035 		memcpy(ptr, node, IndexTupleSize(node));
1036 		ptr += IndexTupleSize(node);
1037 	}
1038 
1039 	return tup;
1040 }
1041 
1042 /*
1043  * Construct a "dead" tuple to replace a tuple being deleted.
1044  *
1045  * The state can be SPGIST_REDIRECT, SPGIST_DEAD, or SPGIST_PLACEHOLDER.
1046  * For a REDIRECT tuple, a pointer (blkno+offset) must be supplied, and
1047  * the xid field is filled in automatically.
1048  *
1049  * This is called in critical sections, so we don't use palloc; the tuple
1050  * is built in preallocated storage.  It should be copied before another
1051  * call with different parameters can occur.
1052  */
1053 SpGistDeadTuple
spgFormDeadTuple(SpGistState * state,int tupstate,BlockNumber blkno,OffsetNumber offnum)1054 spgFormDeadTuple(SpGistState *state, int tupstate,
1055 				 BlockNumber blkno, OffsetNumber offnum)
1056 {
1057 	SpGistDeadTuple tuple = (SpGistDeadTuple) state->deadTupleStorage;
1058 
1059 	tuple->tupstate = tupstate;
1060 	tuple->size = SGDTSIZE;
1061 	SGLT_SET_NEXTOFFSET(tuple, InvalidOffsetNumber);
1062 
1063 	if (tupstate == SPGIST_REDIRECT)
1064 	{
1065 		ItemPointerSet(&tuple->pointer, blkno, offnum);
1066 		Assert(TransactionIdIsValid(state->myXid));
1067 		tuple->xid = state->myXid;
1068 	}
1069 	else
1070 	{
1071 		ItemPointerSetInvalid(&tuple->pointer);
1072 		tuple->xid = InvalidTransactionId;
1073 	}
1074 
1075 	return tuple;
1076 }
1077 
1078 /*
1079  * Convert an SPGiST leaf tuple into Datum/isnull arrays.
1080  *
1081  * The caller must allocate sufficient storage for the output arrays.
1082  * (INDEX_MAX_KEYS entries should be enough.)
1083  */
1084 void
spgDeformLeafTuple(SpGistLeafTuple tup,TupleDesc tupleDescriptor,Datum * datums,bool * isnulls,bool keyColumnIsNull)1085 spgDeformLeafTuple(SpGistLeafTuple tup, TupleDesc tupleDescriptor,
1086 				   Datum *datums, bool *isnulls, bool keyColumnIsNull)
1087 {
1088 	bool		hasNullsMask = SGLT_GET_HASNULLMASK(tup);
1089 	char	   *tp;				/* ptr to tuple data */
1090 	bits8	   *bp;				/* ptr to null bitmap in tuple */
1091 
1092 	if (keyColumnIsNull && tupleDescriptor->natts == 1)
1093 	{
1094 		/*
1095 		 * Trivial case: there is only the key attribute and we're in a nulls
1096 		 * tree.  The hasNullsMask bit in the tuple header should not be set
1097 		 * (and thus we can't use index_deform_tuple_internal), but
1098 		 * nonetheless the result is NULL.
1099 		 *
1100 		 * Note: currently this is dead code, because noplace calls this when
1101 		 * there is only the key attribute.  But we should cover the case.
1102 		 */
1103 		Assert(!hasNullsMask);
1104 
1105 		datums[spgKeyColumn] = (Datum) 0;
1106 		isnulls[spgKeyColumn] = true;
1107 		return;
1108 	}
1109 
1110 	tp = (char *) tup + SGLTHDRSZ(hasNullsMask);
1111 	bp = (bits8 *) ((char *) tup + sizeof(SpGistLeafTupleData));
1112 
1113 	index_deform_tuple_internal(tupleDescriptor,
1114 								datums, isnulls,
1115 								tp, bp, hasNullsMask);
1116 
1117 	/*
1118 	 * Key column isnull value from the tuple should be consistent with
1119 	 * keyColumnIsNull flag from the caller.
1120 	 */
1121 	Assert(keyColumnIsNull == isnulls[spgKeyColumn]);
1122 }
1123 
1124 /*
1125  * Extract the label datums of the nodes within innerTuple
1126  *
1127  * Returns NULL if label datums are NULLs
1128  */
1129 Datum *
spgExtractNodeLabels(SpGistState * state,SpGistInnerTuple innerTuple)1130 spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
1131 {
1132 	Datum	   *nodeLabels;
1133 	int			i;
1134 	SpGistNodeTuple node;
1135 
1136 	/* Either all the labels must be NULL, or none. */
1137 	node = SGITNODEPTR(innerTuple);
1138 	if (IndexTupleHasNulls(node))
1139 	{
1140 		SGITITERATE(innerTuple, i, node)
1141 		{
1142 			if (!IndexTupleHasNulls(node))
1143 				elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
1144 		}
1145 		/* They're all null, so just return NULL */
1146 		return NULL;
1147 	}
1148 	else
1149 	{
1150 		nodeLabels = (Datum *) palloc(sizeof(Datum) * innerTuple->nNodes);
1151 		SGITITERATE(innerTuple, i, node)
1152 		{
1153 			if (IndexTupleHasNulls(node))
1154 				elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
1155 			nodeLabels[i] = SGNTDATUM(node, state);
1156 		}
1157 		return nodeLabels;
1158 	}
1159 }
1160 
1161 /*
1162  * Add a new item to the page, replacing a PLACEHOLDER item if possible.
1163  * Return the location it's inserted at, or InvalidOffsetNumber on failure.
1164  *
1165  * If startOffset isn't NULL, we start searching for placeholders at
1166  * *startOffset, and update that to the next place to search.  This is just
1167  * an optimization for repeated insertions.
1168  *
1169  * If errorOK is false, we throw error when there's not enough room,
1170  * rather than returning InvalidOffsetNumber.
1171  */
1172 OffsetNumber
SpGistPageAddNewItem(SpGistState * state,Page page,Item item,Size size,OffsetNumber * startOffset,bool errorOK)1173 SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size,
1174 					 OffsetNumber *startOffset, bool errorOK)
1175 {
1176 	SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
1177 	OffsetNumber i,
1178 				maxoff,
1179 				offnum;
1180 
1181 	if (opaque->nPlaceholder > 0 &&
1182 		PageGetExactFreeSpace(page) + SGDTSIZE >= MAXALIGN(size))
1183 	{
1184 		/* Try to replace a placeholder */
1185 		maxoff = PageGetMaxOffsetNumber(page);
1186 		offnum = InvalidOffsetNumber;
1187 
1188 		for (;;)
1189 		{
1190 			if (startOffset && *startOffset != InvalidOffsetNumber)
1191 				i = *startOffset;
1192 			else
1193 				i = FirstOffsetNumber;
1194 			for (; i <= maxoff; i++)
1195 			{
1196 				SpGistDeadTuple it = (SpGistDeadTuple) PageGetItem(page,
1197 																   PageGetItemId(page, i));
1198 
1199 				if (it->tupstate == SPGIST_PLACEHOLDER)
1200 				{
1201 					offnum = i;
1202 					break;
1203 				}
1204 			}
1205 
1206 			/* Done if we found a placeholder */
1207 			if (offnum != InvalidOffsetNumber)
1208 				break;
1209 
1210 			if (startOffset && *startOffset != InvalidOffsetNumber)
1211 			{
1212 				/* Hint was no good, re-search from beginning */
1213 				*startOffset = InvalidOffsetNumber;
1214 				continue;
1215 			}
1216 
1217 			/* Hmm, no placeholder found? */
1218 			opaque->nPlaceholder = 0;
1219 			break;
1220 		}
1221 
1222 		if (offnum != InvalidOffsetNumber)
1223 		{
1224 			/* Replace the placeholder tuple */
1225 			PageIndexTupleDelete(page, offnum);
1226 
1227 			offnum = PageAddItem(page, item, size, offnum, false, false);
1228 
1229 			/*
1230 			 * We should not have failed given the size check at the top of
1231 			 * the function, but test anyway.  If we did fail, we must PANIC
1232 			 * because we've already deleted the placeholder tuple, and
1233 			 * there's no other way to keep the damage from getting to disk.
1234 			 */
1235 			if (offnum != InvalidOffsetNumber)
1236 			{
1237 				Assert(opaque->nPlaceholder > 0);
1238 				opaque->nPlaceholder--;
1239 				if (startOffset)
1240 					*startOffset = offnum + 1;
1241 			}
1242 			else
1243 				elog(PANIC, "failed to add item of size %u to SPGiST index page",
1244 					 (int) size);
1245 
1246 			return offnum;
1247 		}
1248 	}
1249 
1250 	/* No luck in replacing a placeholder, so just add it to the page */
1251 	offnum = PageAddItem(page, item, size,
1252 						 InvalidOffsetNumber, false, false);
1253 
1254 	if (offnum == InvalidOffsetNumber && !errorOK)
1255 		elog(ERROR, "failed to add item of size %u to SPGiST index page",
1256 			 (int) size);
1257 
1258 	return offnum;
1259 }
1260 
1261 /*
1262  *	spgproperty() -- Check boolean properties of indexes.
1263  *
1264  * This is optional for most AMs, but is required for SP-GiST because the core
1265  * property code doesn't support AMPROP_DISTANCE_ORDERABLE.
1266  */
1267 bool
spgproperty(Oid index_oid,int attno,IndexAMProperty prop,const char * propname,bool * res,bool * isnull)1268 spgproperty(Oid index_oid, int attno,
1269 			IndexAMProperty prop, const char *propname,
1270 			bool *res, bool *isnull)
1271 {
1272 	Oid			opclass,
1273 				opfamily,
1274 				opcintype;
1275 	CatCList   *catlist;
1276 	int			i;
1277 
1278 	/* Only answer column-level inquiries */
1279 	if (attno == 0)
1280 		return false;
1281 
1282 	switch (prop)
1283 	{
1284 		case AMPROP_DISTANCE_ORDERABLE:
1285 			break;
1286 		default:
1287 			return false;
1288 	}
1289 
1290 	/*
1291 	 * Currently, SP-GiST distance-ordered scans require that there be a
1292 	 * distance operator in the opclass with the default types. So we assume
1293 	 * that if such a operator exists, then there's a reason for it.
1294 	 */
1295 
1296 	/* First we need to know the column's opclass. */
1297 	opclass = get_index_column_opclass(index_oid, attno);
1298 	if (!OidIsValid(opclass))
1299 	{
1300 		*isnull = true;
1301 		return true;
1302 	}
1303 
1304 	/* Now look up the opclass family and input datatype. */
1305 	if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
1306 	{
1307 		*isnull = true;
1308 		return true;
1309 	}
1310 
1311 	/* And now we can check whether the operator is provided. */
1312 	catlist = SearchSysCacheList1(AMOPSTRATEGY,
1313 								  ObjectIdGetDatum(opfamily));
1314 
1315 	*res = false;
1316 
1317 	for (i = 0; i < catlist->n_members; i++)
1318 	{
1319 		HeapTuple	amoptup = &catlist->members[i]->tuple;
1320 		Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(amoptup);
1321 
1322 		if (amopform->amoppurpose == AMOP_ORDER &&
1323 			(amopform->amoplefttype == opcintype ||
1324 			 amopform->amoprighttype == opcintype) &&
1325 			opfamily_can_sort_type(amopform->amopsortfamily,
1326 								   get_op_rettype(amopform->amopopr)))
1327 		{
1328 			*res = true;
1329 			break;
1330 		}
1331 	}
1332 
1333 	ReleaseSysCacheList(catlist);
1334 
1335 	*isnull = false;
1336 
1337 	return true;
1338 }
1339