1 /*-------------------------------------------------------------------------
2  *
3  * spgutils.c
4  *	  various support functions for SP-GiST
5  *
6  *
7  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *			src/backend/access/spgist/spgutils.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/amvalidate.h"
19 #include "access/htup_details.h"
20 #include "access/reloptions.h"
21 #include "access/spgist_private.h"
22 #include "access/transam.h"
23 #include "access/xact.h"
24 #include "catalog/pg_amop.h"
25 #include "storage/bufmgr.h"
26 #include "storage/indexfsm.h"
27 #include "storage/lmgr.h"
28 #include "utils/builtins.h"
29 #include "utils/catcache.h"
30 #include "utils/index_selfuncs.h"
31 #include "utils/lsyscache.h"
32 #include "utils/syscache.h"
33 
34 
35 /*
36  * SP-GiST handler function: return IndexAmRoutine with access method parameters
37  * and callbacks.
38  */
39 Datum
40 spghandler(PG_FUNCTION_ARGS)
41 {
42 	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
43 
44 	amroutine->amstrategies = 0;
45 	amroutine->amsupport = SPGISTNProc;
46 	amroutine->amcanorder = false;
47 	amroutine->amcanorderbyop = true;
48 	amroutine->amcanbackward = false;
49 	amroutine->amcanunique = false;
50 	amroutine->amcanmulticol = false;
51 	amroutine->amoptionalkey = true;
52 	amroutine->amsearcharray = false;
53 	amroutine->amsearchnulls = true;
54 	amroutine->amstorage = false;
55 	amroutine->amclusterable = false;
56 	amroutine->ampredlocks = false;
57 	amroutine->amcanparallel = false;
58 	amroutine->amcaninclude = false;
59 	amroutine->amkeytype = InvalidOid;
60 
61 	amroutine->ambuild = spgbuild;
62 	amroutine->ambuildempty = spgbuildempty;
63 	amroutine->aminsert = spginsert;
64 	amroutine->ambulkdelete = spgbulkdelete;
65 	amroutine->amvacuumcleanup = spgvacuumcleanup;
66 	amroutine->amcanreturn = spgcanreturn;
67 	amroutine->amcostestimate = spgcostestimate;
68 	amroutine->amoptions = spgoptions;
69 	amroutine->amproperty = spgproperty;
70 	amroutine->ambuildphasename = NULL;
71 	amroutine->amvalidate = spgvalidate;
72 	amroutine->ambeginscan = spgbeginscan;
73 	amroutine->amrescan = spgrescan;
74 	amroutine->amgettuple = spggettuple;
75 	amroutine->amgetbitmap = spggetbitmap;
76 	amroutine->amendscan = spgendscan;
77 	amroutine->ammarkpos = NULL;
78 	amroutine->amrestrpos = NULL;
79 	amroutine->amestimateparallelscan = NULL;
80 	amroutine->aminitparallelscan = NULL;
81 	amroutine->amparallelrescan = NULL;
82 
83 	PG_RETURN_POINTER(amroutine);
84 }
85 
86 /* Fill in a SpGistTypeDesc struct with info about the specified data type */
87 static void
88 fillTypeDesc(SpGistTypeDesc *desc, Oid type)
89 {
90 	desc->type = type;
91 	get_typlenbyval(type, &desc->attlen, &desc->attbyval);
92 }
93 
94 /*
95  * Fetch local cache of AM-specific info about the index, initializing it
96  * if necessary
97  */
98 SpGistCache *
99 spgGetCache(Relation index)
100 {
101 	SpGistCache *cache;
102 
103 	if (index->rd_amcache == NULL)
104 	{
105 		Oid			atttype;
106 		spgConfigIn in;
107 		FmgrInfo   *procinfo;
108 		Buffer		metabuffer;
109 		SpGistMetaPageData *metadata;
110 
111 		cache = MemoryContextAllocZero(index->rd_indexcxt,
112 									   sizeof(SpGistCache));
113 
114 		/* SPGiST doesn't support multi-column indexes */
115 		Assert(index->rd_att->natts == 1);
116 
117 		/*
118 		 * Get the actual data type of the indexed column from the index
119 		 * tupdesc.  We pass this to the opclass config function so that
120 		 * polymorphic opclasses are possible.
121 		 */
122 		atttype = TupleDescAttr(index->rd_att, 0)->atttypid;
123 
124 		/* Call the config function to get config info for the opclass */
125 		in.attType = atttype;
126 
127 		procinfo = index_getprocinfo(index, 1, SPGIST_CONFIG_PROC);
128 		FunctionCall2Coll(procinfo,
129 						  index->rd_indcollation[0],
130 						  PointerGetDatum(&in),
131 						  PointerGetDatum(&cache->config));
132 
133 		/* Get the information we need about each relevant datatype */
134 		fillTypeDesc(&cache->attType, atttype);
135 
136 		if (OidIsValid(cache->config.leafType) &&
137 			cache->config.leafType != atttype)
138 		{
139 			if (!OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
140 				ereport(ERROR,
141 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
142 						 errmsg("compress method must be defined when leaf type is different from input type")));
143 
144 			fillTypeDesc(&cache->attLeafType, cache->config.leafType);
145 		}
146 		else
147 		{
148 			cache->attLeafType = cache->attType;
149 		}
150 
151 		fillTypeDesc(&cache->attPrefixType, cache->config.prefixType);
152 		fillTypeDesc(&cache->attLabelType, cache->config.labelType);
153 
154 		/* Last, get the lastUsedPages data from the metapage */
155 		metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
156 		LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
157 
158 		metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));
159 
160 		if (metadata->magicNumber != SPGIST_MAGIC_NUMBER)
161 			elog(ERROR, "index \"%s\" is not an SP-GiST index",
162 				 RelationGetRelationName(index));
163 
164 		cache->lastUsedPages = metadata->lastUsedPages;
165 
166 		UnlockReleaseBuffer(metabuffer);
167 
168 		index->rd_amcache = (void *) cache;
169 	}
170 	else
171 	{
172 		/* assume it's up to date */
173 		cache = (SpGistCache *) index->rd_amcache;
174 	}
175 
176 	return cache;
177 }
178 
179 /* Initialize SpGistState for working with the given index */
180 void
181 initSpGistState(SpGistState *state, Relation index)
182 {
183 	SpGistCache *cache;
184 
185 	/* Get cached static information about index */
186 	cache = spgGetCache(index);
187 
188 	state->config = cache->config;
189 	state->attType = cache->attType;
190 	state->attLeafType = cache->attLeafType;
191 	state->attPrefixType = cache->attPrefixType;
192 	state->attLabelType = cache->attLabelType;
193 
194 	/* Make workspace for constructing dead tuples */
195 	state->deadTupleStorage = palloc0(SGDTSIZE);
196 
197 	/* Set XID to use in redirection tuples */
198 	state->myXid = GetTopTransactionIdIfAny();
199 
200 	/* Assume we're not in an index build (spgbuild will override) */
201 	state->isBuild = false;
202 }
203 
204 /*
205  * Allocate a new page (either by recycling, or by extending the index file).
206  *
207  * The returned buffer is already pinned and exclusive-locked.
208  * Caller is responsible for initializing the page by calling SpGistInitBuffer.
209  */
210 Buffer
211 SpGistNewBuffer(Relation index)
212 {
213 	Buffer		buffer;
214 	bool		needLock;
215 
216 	/* First, try to get a page from FSM */
217 	for (;;)
218 	{
219 		BlockNumber blkno = GetFreeIndexPage(index);
220 
221 		if (blkno == InvalidBlockNumber)
222 			break;				/* nothing known to FSM */
223 
224 		/*
225 		 * The fixed pages shouldn't ever be listed in FSM, but just in case
226 		 * one is, ignore it.
227 		 */
228 		if (SpGistBlockIsFixed(blkno))
229 			continue;
230 
231 		buffer = ReadBuffer(index, blkno);
232 
233 		/*
234 		 * We have to guard against the possibility that someone else already
235 		 * recycled this page; the buffer may be locked if so.
236 		 */
237 		if (ConditionalLockBuffer(buffer))
238 		{
239 			Page		page = BufferGetPage(buffer);
240 
241 			if (PageIsNew(page))
242 				return buffer;	/* OK to use, if never initialized */
243 
244 			if (SpGistPageIsDeleted(page) || PageIsEmpty(page))
245 				return buffer;	/* OK to use */
246 
247 			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
248 		}
249 
250 		/* Can't use it, so release buffer and try again */
251 		ReleaseBuffer(buffer);
252 	}
253 
254 	/* Must extend the file */
255 	needLock = !RELATION_IS_LOCAL(index);
256 	if (needLock)
257 		LockRelationForExtension(index, ExclusiveLock);
258 
259 	buffer = ReadBuffer(index, P_NEW);
260 	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
261 
262 	if (needLock)
263 		UnlockRelationForExtension(index, ExclusiveLock);
264 
265 	return buffer;
266 }
267 
268 /*
269  * Update index metapage's lastUsedPages info from local cache, if possible
270  *
271  * Updating meta page isn't critical for index working, so
272  * 1 use ConditionalLockBuffer to improve concurrency
273  * 2 don't WAL-log metabuffer changes to decrease WAL traffic
274  */
275 void
276 SpGistUpdateMetaPage(Relation index)
277 {
278 	SpGistCache *cache = (SpGistCache *) index->rd_amcache;
279 
280 	if (cache != NULL)
281 	{
282 		Buffer		metabuffer;
283 
284 		metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
285 
286 		if (ConditionalLockBuffer(metabuffer))
287 		{
288 			Page		metapage = BufferGetPage(metabuffer);
289 			SpGistMetaPageData *metadata = SpGistPageGetMeta(metapage);
290 
291 			metadata->lastUsedPages = cache->lastUsedPages;
292 
293 			/*
294 			 * Set pd_lower just past the end of the metadata.  This is
295 			 * essential, because without doing so, metadata will be lost if
296 			 * xlog.c compresses the page.  (We must do this here because
297 			 * pre-v11 versions of PG did not set the metapage's pd_lower
298 			 * correctly, so a pg_upgraded index might contain the wrong
299 			 * value.)
300 			 */
301 			((PageHeader) metapage)->pd_lower =
302 				((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) metapage;
303 
304 			MarkBufferDirty(metabuffer);
305 			UnlockReleaseBuffer(metabuffer);
306 		}
307 		else
308 		{
309 			ReleaseBuffer(metabuffer);
310 		}
311 	}
312 }
313 
/*
 * Macro to select proper element of lastUsedPages cache depending on flags.
 *
 * The result is a pointer to the selected cachedPage[] entry of cache "c".
 * Reducing the flags value modulo SPGIST_CACHED_PAGES is just for paranoia's
 * sake: it keeps the array subscript below SPGIST_CACHED_PAGES whatever
 * value is passed in.
 */
#define GET_LUP(c, f)  (&(c)->lastUsedPages.cachedPage[((unsigned int) (f)) % SPGIST_CACHED_PAGES])
317 
318 /*
319  * Allocate and initialize a new buffer of the type and parity specified by
320  * flags.  The returned buffer is already pinned and exclusive-locked.
321  *
322  * When requesting an inner page, if we get one with the wrong parity,
323  * we just release the buffer and try again.  We will get a different page
324  * because GetFreeIndexPage will have marked the page used in FSM.  The page
325  * is entered in our local lastUsedPages cache, so there's some hope of
326  * making use of it later in this session, but otherwise we rely on VACUUM
327  * to eventually re-enter the page in FSM, making it available for recycling.
328  * Note that such a page does not get marked dirty here, so unless it's used
329  * fairly soon, the buffer will just get discarded and the page will remain
330  * as it was on disk.
331  *
332  * When we return a buffer to the caller, the page is *not* entered into
333  * the lastUsedPages cache; we expect the caller will do so after it's taken
334  * whatever space it will use.  This is because after the caller has used up
335  * some space, the page might have less space than whatever was cached already
336  * so we'd rather not trash the old cache entry.
337  */
338 static Buffer
339 allocNewBuffer(Relation index, int flags)
340 {
341 	SpGistCache *cache = spgGetCache(index);
342 	uint16		pageflags = 0;
343 
344 	if (GBUF_REQ_LEAF(flags))
345 		pageflags |= SPGIST_LEAF;
346 	if (GBUF_REQ_NULLS(flags))
347 		pageflags |= SPGIST_NULLS;
348 
349 	for (;;)
350 	{
351 		Buffer		buffer;
352 
353 		buffer = SpGistNewBuffer(index);
354 		SpGistInitBuffer(buffer, pageflags);
355 
356 		if (pageflags & SPGIST_LEAF)
357 		{
358 			/* Leaf pages have no parity concerns, so just use it */
359 			return buffer;
360 		}
361 		else
362 		{
363 			BlockNumber blkno = BufferGetBlockNumber(buffer);
364 			int			blkFlags = GBUF_INNER_PARITY(blkno);
365 
366 			if ((flags & GBUF_PARITY_MASK) == blkFlags)
367 			{
368 				/* Page has right parity, use it */
369 				return buffer;
370 			}
371 			else
372 			{
373 				/* Page has wrong parity, record it in cache and try again */
374 				if (pageflags & SPGIST_NULLS)
375 					blkFlags |= GBUF_NULLS;
376 				cache->lastUsedPages.cachedPage[blkFlags].blkno = blkno;
377 				cache->lastUsedPages.cachedPage[blkFlags].freeSpace =
378 					PageGetExactFreeSpace(BufferGetPage(buffer));
379 				UnlockReleaseBuffer(buffer);
380 			}
381 		}
382 	}
383 }
384 
385 /*
386  * Get a buffer of the type and parity specified by flags, having at least
387  * as much free space as indicated by needSpace.  We use the lastUsedPages
388  * cache to assign the same buffer previously requested when possible.
389  * The returned buffer is already pinned and exclusive-locked.
390  *
391  * *isNew is set true if the page was initialized here, false if it was
392  * already valid.
393  */
394 Buffer
395 SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
396 {
397 	SpGistCache *cache = spgGetCache(index);
398 	SpGistLastUsedPage *lup;
399 
400 	/* Bail out if even an empty page wouldn't meet the demand */
401 	if (needSpace > SPGIST_PAGE_CAPACITY)
402 		elog(ERROR, "desired SPGiST tuple size is too big");
403 
404 	/*
405 	 * If possible, increase the space request to include relation's
406 	 * fillfactor.  This ensures that when we add unrelated tuples to a page,
407 	 * we try to keep 100-fillfactor% available for adding tuples that are
408 	 * related to the ones already on it.  But fillfactor mustn't cause an
409 	 * error for requests that would otherwise be legal.
410 	 */
411 	needSpace += RelationGetTargetPageFreeSpace(index,
412 												SPGIST_DEFAULT_FILLFACTOR);
413 	needSpace = Min(needSpace, SPGIST_PAGE_CAPACITY);
414 
415 	/* Get the cache entry for this flags setting */
416 	lup = GET_LUP(cache, flags);
417 
418 	/* If we have nothing cached, just turn it over to allocNewBuffer */
419 	if (lup->blkno == InvalidBlockNumber)
420 	{
421 		*isNew = true;
422 		return allocNewBuffer(index, flags);
423 	}
424 
425 	/* fixed pages should never be in cache */
426 	Assert(!SpGistBlockIsFixed(lup->blkno));
427 
428 	/* If cached freeSpace isn't enough, don't bother looking at the page */
429 	if (lup->freeSpace >= needSpace)
430 	{
431 		Buffer		buffer;
432 		Page		page;
433 
434 		buffer = ReadBuffer(index, lup->blkno);
435 
436 		if (!ConditionalLockBuffer(buffer))
437 		{
438 			/*
439 			 * buffer is locked by another process, so return a new buffer
440 			 */
441 			ReleaseBuffer(buffer);
442 			*isNew = true;
443 			return allocNewBuffer(index, flags);
444 		}
445 
446 		page = BufferGetPage(buffer);
447 
448 		if (PageIsNew(page) || SpGistPageIsDeleted(page) || PageIsEmpty(page))
449 		{
450 			/* OK to initialize the page */
451 			uint16		pageflags = 0;
452 
453 			if (GBUF_REQ_LEAF(flags))
454 				pageflags |= SPGIST_LEAF;
455 			if (GBUF_REQ_NULLS(flags))
456 				pageflags |= SPGIST_NULLS;
457 			SpGistInitBuffer(buffer, pageflags);
458 			lup->freeSpace = PageGetExactFreeSpace(page) - needSpace;
459 			*isNew = true;
460 			return buffer;
461 		}
462 
463 		/*
464 		 * Check that page is of right type and has enough space.  We must
465 		 * recheck this since our cache isn't necessarily up to date.
466 		 */
467 		if ((GBUF_REQ_LEAF(flags) ? SpGistPageIsLeaf(page) : !SpGistPageIsLeaf(page)) &&
468 			(GBUF_REQ_NULLS(flags) ? SpGistPageStoresNulls(page) : !SpGistPageStoresNulls(page)))
469 		{
470 			int			freeSpace = PageGetExactFreeSpace(page);
471 
472 			if (freeSpace >= needSpace)
473 			{
474 				/* Success, update freespace info and return the buffer */
475 				lup->freeSpace = freeSpace - needSpace;
476 				*isNew = false;
477 				return buffer;
478 			}
479 		}
480 
481 		/*
482 		 * fallback to allocation of new buffer
483 		 */
484 		UnlockReleaseBuffer(buffer);
485 	}
486 
487 	/* No success with cache, so return a new buffer */
488 	*isNew = true;
489 	return allocNewBuffer(index, flags);
490 }
491 
492 /*
493  * Update lastUsedPages cache when done modifying a page.
494  *
495  * We update the appropriate cache entry if it already contained this page
496  * (its freeSpace is likely obsolete), or if this page has more space than
497  * whatever we had cached.
498  */
499 void
500 SpGistSetLastUsedPage(Relation index, Buffer buffer)
501 {
502 	SpGistCache *cache = spgGetCache(index);
503 	SpGistLastUsedPage *lup;
504 	int			freeSpace;
505 	Page		page = BufferGetPage(buffer);
506 	BlockNumber blkno = BufferGetBlockNumber(buffer);
507 	int			flags;
508 
509 	/* Never enter fixed pages (root pages) in cache, though */
510 	if (SpGistBlockIsFixed(blkno))
511 		return;
512 
513 	if (SpGistPageIsLeaf(page))
514 		flags = GBUF_LEAF;
515 	else
516 		flags = GBUF_INNER_PARITY(blkno);
517 	if (SpGistPageStoresNulls(page))
518 		flags |= GBUF_NULLS;
519 
520 	lup = GET_LUP(cache, flags);
521 
522 	freeSpace = PageGetExactFreeSpace(page);
523 	if (lup->blkno == InvalidBlockNumber || lup->blkno == blkno ||
524 		lup->freeSpace < freeSpace)
525 	{
526 		lup->blkno = blkno;
527 		lup->freeSpace = freeSpace;
528 	}
529 }
530 
531 /*
532  * Initialize an SPGiST page to empty, with specified flags
533  */
534 void
535 SpGistInitPage(Page page, uint16 f)
536 {
537 	SpGistPageOpaque opaque;
538 
539 	PageInit(page, BLCKSZ, MAXALIGN(sizeof(SpGistPageOpaqueData)));
540 	opaque = SpGistPageGetOpaque(page);
541 	memset(opaque, 0, sizeof(SpGistPageOpaqueData));
542 	opaque->flags = f;
543 	opaque->spgist_page_id = SPGIST_PAGE_ID;
544 }
545 
546 /*
547  * Initialize a buffer's page to empty, with specified flags
548  */
549 void
550 SpGistInitBuffer(Buffer b, uint16 f)
551 {
552 	Assert(BufferGetPageSize(b) == BLCKSZ);
553 	SpGistInitPage(BufferGetPage(b), f);
554 }
555 
556 /*
557  * Initialize metadata page
558  */
559 void
560 SpGistInitMetapage(Page page)
561 {
562 	SpGistMetaPageData *metadata;
563 	int			i;
564 
565 	SpGistInitPage(page, SPGIST_META);
566 	metadata = SpGistPageGetMeta(page);
567 	memset(metadata, 0, sizeof(SpGistMetaPageData));
568 	metadata->magicNumber = SPGIST_MAGIC_NUMBER;
569 
570 	/* initialize last-used-page cache to empty */
571 	for (i = 0; i < SPGIST_CACHED_PAGES; i++)
572 		metadata->lastUsedPages.cachedPage[i].blkno = InvalidBlockNumber;
573 
574 	/*
575 	 * Set pd_lower just past the end of the metadata.  This is essential,
576 	 * because without doing so, metadata will be lost if xlog.c compresses
577 	 * the page.
578 	 */
579 	((PageHeader) page)->pd_lower =
580 		((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) page;
581 }
582 
583 /*
584  * reloptions processing for SPGiST
585  */
586 bytea *
587 spgoptions(Datum reloptions, bool validate)
588 {
589 	return default_reloptions(reloptions, validate, RELOPT_KIND_SPGIST);
590 }
591 
592 /*
593  * Get the space needed to store a non-null datum of the indicated type.
594  * Note the result is already rounded up to a MAXALIGN boundary.
595  * Also, we follow the SPGiST convention that pass-by-val types are
596  * just stored in their Datum representation (compare memcpyDatum).
597  */
598 unsigned int
599 SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum)
600 {
601 	unsigned int size;
602 
603 	if (att->attbyval)
604 		size = sizeof(Datum);
605 	else if (att->attlen > 0)
606 		size = att->attlen;
607 	else
608 		size = VARSIZE_ANY(datum);
609 
610 	return MAXALIGN(size);
611 }
612 
613 /*
614  * Copy the given non-null datum to *target
615  */
616 static void
617 memcpyDatum(void *target, SpGistTypeDesc *att, Datum datum)
618 {
619 	unsigned int size;
620 
621 	if (att->attbyval)
622 	{
623 		memcpy(target, &datum, sizeof(Datum));
624 	}
625 	else
626 	{
627 		size = (att->attlen > 0) ? att->attlen : VARSIZE_ANY(datum);
628 		memcpy(target, DatumGetPointer(datum), size);
629 	}
630 }
631 
632 /*
633  * Construct a leaf tuple containing the given heap TID and datum value
634  */
635 SpGistLeafTuple
636 spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
637 				 Datum datum, bool isnull)
638 {
639 	SpGistLeafTuple tup;
640 	unsigned int size;
641 
642 	/* compute space needed (note result is already maxaligned) */
643 	size = SGLTHDRSZ;
644 	if (!isnull)
645 		size += SpGistGetTypeSize(&state->attLeafType, datum);
646 
647 	/*
648 	 * Ensure that we can replace the tuple with a dead tuple later.  This
649 	 * test is unnecessary when !isnull, but let's be safe.
650 	 */
651 	if (size < SGDTSIZE)
652 		size = SGDTSIZE;
653 
654 	/* OK, form the tuple */
655 	tup = (SpGistLeafTuple) palloc0(size);
656 
657 	tup->size = size;
658 	tup->nextOffset = InvalidOffsetNumber;
659 	tup->heapPtr = *heapPtr;
660 	if (!isnull)
661 		memcpyDatum(SGLTDATAPTR(tup), &state->attLeafType, datum);
662 
663 	return tup;
664 }
665 
666 /*
667  * Construct a node (to go into an inner tuple) containing the given label
668  *
669  * Note that the node's downlink is just set invalid here.  Caller will fill
670  * it in later.
671  */
672 SpGistNodeTuple
673 spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
674 {
675 	SpGistNodeTuple tup;
676 	unsigned int size;
677 	unsigned short infomask = 0;
678 
679 	/* compute space needed (note result is already maxaligned) */
680 	size = SGNTHDRSZ;
681 	if (!isnull)
682 		size += SpGistGetTypeSize(&state->attLabelType, label);
683 
684 	/*
685 	 * Here we make sure that the size will fit in the field reserved for it
686 	 * in t_info.
687 	 */
688 	if ((size & INDEX_SIZE_MASK) != size)
689 		ereport(ERROR,
690 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
691 				 errmsg("index row requires %zu bytes, maximum size is %zu",
692 						(Size) size, (Size) INDEX_SIZE_MASK)));
693 
694 	tup = (SpGistNodeTuple) palloc0(size);
695 
696 	if (isnull)
697 		infomask |= INDEX_NULL_MASK;
698 	/* we don't bother setting the INDEX_VAR_MASK bit */
699 	infomask |= size;
700 	tup->t_info = infomask;
701 
702 	/* The TID field will be filled in later */
703 	ItemPointerSetInvalid(&tup->t_tid);
704 
705 	if (!isnull)
706 		memcpyDatum(SGNTDATAPTR(tup), &state->attLabelType, label);
707 
708 	return tup;
709 }
710 
711 /*
712  * Construct an inner tuple containing the given prefix and node array
713  */
714 SpGistInnerTuple
715 spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix,
716 				  int nNodes, SpGistNodeTuple *nodes)
717 {
718 	SpGistInnerTuple tup;
719 	unsigned int size;
720 	unsigned int prefixSize;
721 	int			i;
722 	char	   *ptr;
723 
724 	/* Compute size needed */
725 	if (hasPrefix)
726 		prefixSize = SpGistGetTypeSize(&state->attPrefixType, prefix);
727 	else
728 		prefixSize = 0;
729 
730 	size = SGITHDRSZ + prefixSize;
731 
732 	/* Note: we rely on node tuple sizes to be maxaligned already */
733 	for (i = 0; i < nNodes; i++)
734 		size += IndexTupleSize(nodes[i]);
735 
736 	/*
737 	 * Ensure that we can replace the tuple with a dead tuple later.  This
738 	 * test is unnecessary given current tuple layouts, but let's be safe.
739 	 */
740 	if (size < SGDTSIZE)
741 		size = SGDTSIZE;
742 
743 	/*
744 	 * Inner tuple should be small enough to fit on a page
745 	 */
746 	if (size > SPGIST_PAGE_CAPACITY - sizeof(ItemIdData))
747 		ereport(ERROR,
748 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
749 				 errmsg("SP-GiST inner tuple size %zu exceeds maximum %zu",
750 						(Size) size,
751 						SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
752 				 errhint("Values larger than a buffer page cannot be indexed.")));
753 
754 	/*
755 	 * Check for overflow of header fields --- probably can't fail if the
756 	 * above succeeded, but let's be paranoid
757 	 */
758 	if (size > SGITMAXSIZE ||
759 		prefixSize > SGITMAXPREFIXSIZE ||
760 		nNodes > SGITMAXNNODES)
761 		elog(ERROR, "SPGiST inner tuple header field is too small");
762 
763 	/* OK, form the tuple */
764 	tup = (SpGistInnerTuple) palloc0(size);
765 
766 	tup->nNodes = nNodes;
767 	tup->prefixSize = prefixSize;
768 	tup->size = size;
769 
770 	if (hasPrefix)
771 		memcpyDatum(SGITDATAPTR(tup), &state->attPrefixType, prefix);
772 
773 	ptr = (char *) SGITNODEPTR(tup);
774 
775 	for (i = 0; i < nNodes; i++)
776 	{
777 		SpGistNodeTuple node = nodes[i];
778 
779 		memcpy(ptr, node, IndexTupleSize(node));
780 		ptr += IndexTupleSize(node);
781 	}
782 
783 	return tup;
784 }
785 
786 /*
787  * Construct a "dead" tuple to replace a tuple being deleted.
788  *
789  * The state can be SPGIST_REDIRECT, SPGIST_DEAD, or SPGIST_PLACEHOLDER.
790  * For a REDIRECT tuple, a pointer (blkno+offset) must be supplied, and
791  * the xid field is filled in automatically.
792  *
793  * This is called in critical sections, so we don't use palloc; the tuple
794  * is built in preallocated storage.  It should be copied before another
795  * call with different parameters can occur.
796  */
797 SpGistDeadTuple
798 spgFormDeadTuple(SpGistState *state, int tupstate,
799 				 BlockNumber blkno, OffsetNumber offnum)
800 {
801 	SpGistDeadTuple tuple = (SpGistDeadTuple) state->deadTupleStorage;
802 
803 	tuple->tupstate = tupstate;
804 	tuple->size = SGDTSIZE;
805 	tuple->nextOffset = InvalidOffsetNumber;
806 
807 	if (tupstate == SPGIST_REDIRECT)
808 	{
809 		ItemPointerSet(&tuple->pointer, blkno, offnum);
810 		Assert(TransactionIdIsValid(state->myXid));
811 		tuple->xid = state->myXid;
812 	}
813 	else
814 	{
815 		ItemPointerSetInvalid(&tuple->pointer);
816 		tuple->xid = InvalidTransactionId;
817 	}
818 
819 	return tuple;
820 }
821 
822 /*
823  * Extract the label datums of the nodes within innerTuple
824  *
825  * Returns NULL if label datums are NULLs
826  */
827 Datum *
828 spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
829 {
830 	Datum	   *nodeLabels;
831 	int			i;
832 	SpGistNodeTuple node;
833 
834 	/* Either all the labels must be NULL, or none. */
835 	node = SGITNODEPTR(innerTuple);
836 	if (IndexTupleHasNulls(node))
837 	{
838 		SGITITERATE(innerTuple, i, node)
839 		{
840 			if (!IndexTupleHasNulls(node))
841 				elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
842 		}
843 		/* They're all null, so just return NULL */
844 		return NULL;
845 	}
846 	else
847 	{
848 		nodeLabels = (Datum *) palloc(sizeof(Datum) * innerTuple->nNodes);
849 		SGITITERATE(innerTuple, i, node)
850 		{
851 			if (IndexTupleHasNulls(node))
852 				elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
853 			nodeLabels[i] = SGNTDATUM(node, state);
854 		}
855 		return nodeLabels;
856 	}
857 }
858 
859 /*
860  * Add a new item to the page, replacing a PLACEHOLDER item if possible.
861  * Return the location it's inserted at, or InvalidOffsetNumber on failure.
862  *
863  * If startOffset isn't NULL, we start searching for placeholders at
864  * *startOffset, and update that to the next place to search.  This is just
865  * an optimization for repeated insertions.
866  *
867  * If errorOK is false, we throw error when there's not enough room,
868  * rather than returning InvalidOffsetNumber.
869  */
870 OffsetNumber
871 SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size,
872 					 OffsetNumber *startOffset, bool errorOK)
873 {
874 	SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
875 	OffsetNumber i,
876 				maxoff,
877 				offnum;
878 
879 	if (opaque->nPlaceholder > 0 &&
880 		PageGetExactFreeSpace(page) + SGDTSIZE >= MAXALIGN(size))
881 	{
882 		/* Try to replace a placeholder */
883 		maxoff = PageGetMaxOffsetNumber(page);
884 		offnum = InvalidOffsetNumber;
885 
886 		for (;;)
887 		{
888 			if (startOffset && *startOffset != InvalidOffsetNumber)
889 				i = *startOffset;
890 			else
891 				i = FirstOffsetNumber;
892 			for (; i <= maxoff; i++)
893 			{
894 				SpGistDeadTuple it = (SpGistDeadTuple) PageGetItem(page,
895 																   PageGetItemId(page, i));
896 
897 				if (it->tupstate == SPGIST_PLACEHOLDER)
898 				{
899 					offnum = i;
900 					break;
901 				}
902 			}
903 
904 			/* Done if we found a placeholder */
905 			if (offnum != InvalidOffsetNumber)
906 				break;
907 
908 			if (startOffset && *startOffset != InvalidOffsetNumber)
909 			{
910 				/* Hint was no good, re-search from beginning */
911 				*startOffset = InvalidOffsetNumber;
912 				continue;
913 			}
914 
915 			/* Hmm, no placeholder found? */
916 			opaque->nPlaceholder = 0;
917 			break;
918 		}
919 
920 		if (offnum != InvalidOffsetNumber)
921 		{
922 			/* Replace the placeholder tuple */
923 			PageIndexTupleDelete(page, offnum);
924 
925 			offnum = PageAddItem(page, item, size, offnum, false, false);
926 
927 			/*
928 			 * We should not have failed given the size check at the top of
929 			 * the function, but test anyway.  If we did fail, we must PANIC
930 			 * because we've already deleted the placeholder tuple, and
931 			 * there's no other way to keep the damage from getting to disk.
932 			 */
933 			if (offnum != InvalidOffsetNumber)
934 			{
935 				Assert(opaque->nPlaceholder > 0);
936 				opaque->nPlaceholder--;
937 				if (startOffset)
938 					*startOffset = offnum + 1;
939 			}
940 			else
941 				elog(PANIC, "failed to add item of size %u to SPGiST index page",
942 					 (int) size);
943 
944 			return offnum;
945 		}
946 	}
947 
948 	/* No luck in replacing a placeholder, so just add it to the page */
949 	offnum = PageAddItem(page, item, size,
950 						 InvalidOffsetNumber, false, false);
951 
952 	if (offnum == InvalidOffsetNumber && !errorOK)
953 		elog(ERROR, "failed to add item of size %u to SPGiST index page",
954 			 (int) size);
955 
956 	return offnum;
957 }
958 
959 /*
960  *	spgproperty() -- Check boolean properties of indexes.
961  *
962  * This is optional for most AMs, but is required for SP-GiST because the core
963  * property code doesn't support AMPROP_DISTANCE_ORDERABLE.
964  */
965 bool
966 spgproperty(Oid index_oid, int attno,
967 			IndexAMProperty prop, const char *propname,
968 			bool *res, bool *isnull)
969 {
970 	Oid			opclass,
971 				opfamily,
972 				opcintype;
973 	CatCList   *catlist;
974 	int			i;
975 
976 	/* Only answer column-level inquiries */
977 	if (attno == 0)
978 		return false;
979 
980 	switch (prop)
981 	{
982 		case AMPROP_DISTANCE_ORDERABLE:
983 			break;
984 		default:
985 			return false;
986 	}
987 
988 	/*
989 	 * Currently, SP-GiST distance-ordered scans require that there be a
990 	 * distance operator in the opclass with the default types. So we assume
991 	 * that if such a operator exists, then there's a reason for it.
992 	 */
993 
994 	/* First we need to know the column's opclass. */
995 	opclass = get_index_column_opclass(index_oid, attno);
996 	if (!OidIsValid(opclass))
997 	{
998 		*isnull = true;
999 		return true;
1000 	}
1001 
1002 	/* Now look up the opclass family and input datatype. */
1003 	if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
1004 	{
1005 		*isnull = true;
1006 		return true;
1007 	}
1008 
1009 	/* And now we can check whether the operator is provided. */
1010 	catlist = SearchSysCacheList1(AMOPSTRATEGY,
1011 								  ObjectIdGetDatum(opfamily));
1012 
1013 	*res = false;
1014 
1015 	for (i = 0; i < catlist->n_members; i++)
1016 	{
1017 		HeapTuple	amoptup = &catlist->members[i]->tuple;
1018 		Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(amoptup);
1019 
1020 		if (amopform->amoppurpose == AMOP_ORDER &&
1021 			(amopform->amoplefttype == opcintype ||
1022 			 amopform->amoprighttype == opcintype) &&
1023 			opfamily_can_sort_type(amopform->amopsortfamily,
1024 								   get_op_rettype(amopform->amopopr)))
1025 		{
1026 			*res = true;
1027 			break;
1028 		}
1029 	}
1030 
1031 	ReleaseSysCacheList(catlist);
1032 
1033 	*isnull = false;
1034 
1035 	return true;
1036 }
1037