/*-------------------------------------------------------------------------
 *
 * gistutil.c
 *	  utility routines for the postgres GiST index access method.
 *
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			src/backend/access/gist/gistutil.c
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <math.h>

#include "access/gist_private.h"
#include "access/htup_details.h"
#include "access/reloptions.h"
#include "catalog/pg_opclass.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "utils/float.h"
#include "utils/lsyscache.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"

/*
 * Write itup vector to page; performs no free-space checking.
 */
void
gistfillbuffer(Page page, IndexTuple *itup, int len, OffsetNumber off)
{
	int			i;

	if (off == InvalidOffsetNumber)
		off = (PageIsEmpty(page)) ? FirstOffsetNumber :
			OffsetNumberNext(PageGetMaxOffsetNumber(page));

	for (i = 0; i < len; i++)
	{
		Size		sz = IndexTupleSize(itup[i]);
		OffsetNumber l;

		l = PageAddItem(page, (Item) itup[i], sz, off, false, false);
		if (l == InvalidOffsetNumber)
			elog(ERROR, "failed to add item to GiST index page, item %d out of %d, size %d bytes",
				 i, len, (int) sz);
		off++;
	}
}

/*
 * Check whether an itup vector fits on a page; returns true if there is
 * NOT enough free space, after accounting for deletion of the tuple at
 * "todelete" (if any) and reserving "freespace" bytes.
 */
bool
gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace)
{
	unsigned int size = freespace,
				deleted = 0;
	int			i;

	for (i = 0; i < len; i++)
		size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);

	if (todelete != InvalidOffsetNumber)
	{
		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete));

		deleted = IndexTupleSize(itup) + sizeof(ItemIdData);
	}

	return (PageGetFreeSpace(page) + deleted < size);
}

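/*
 * Check whether an itup vector would fit on one empty page.
 */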
bool
gistfitpage(IndexTuple *itvec, int len)
{
	int			i;
	Size		size = 0;

	for (i = 0; i < len; i++)
		size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);

	/* TODO: Consider fillfactor */
	return (size <= GiSTPageSize);
}

/*
 * Read all the tuples on a page into an itup vector
 */
IndexTuple *
gistextractpage(Page page, int *len /* out */ )
{
	OffsetNumber i,
				maxoff;
	IndexTuple *itvec;

	maxoff = PageGetMaxOffsetNumber(page);
	*len = maxoff;
	itvec = palloc(sizeof(IndexTuple) * maxoff);
	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
		itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));

	return itvec;
}

/*
 * join two vectors into one
 */
IndexTuple *
gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
{
	itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
	memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
	*len += addlen;
	return itvec;
}

/*
 * Flatten an IndexTuple pointer vector into one contiguous palloc'd chunk;
 * *memlen is set to the total size in bytes.
 */

IndexTupleData *
gistfillitupvec(IndexTuple *vec, int veclen, int *memlen)
{
	char	   *ptr,
			   *ret;
	int			i;

	*memlen = 0;

	for (i = 0; i < veclen; i++)
		*memlen += IndexTupleSize(vec[i]);

	ptr = ret = palloc(*memlen);

	for (i = 0; i < veclen; i++)
	{
		memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
		ptr += IndexTupleSize(vec[i]);
	}

	return (IndexTupleData *) ret;
}

/*
 * Make unions of keys in IndexTuple vector (one union datum per index column).
 * Union Datums are returned into the attr/isnull arrays.
 * Resulting Datums aren't compressed.
 */
void
gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len,
				   Datum *attr, bool *isnull)
{
	int			i;
	GistEntryVector *evec;
	int			attrsize;

	evec = (GistEntryVector *) palloc((len + 2) * sizeof(GISTENTRY) + GEVHDRSZ);

	for (i = 0; i < giststate->nonLeafTupdesc->natts; i++)
	{
		int			j;

		/* Collect non-null datums for this column */
		evec->n = 0;
		for (j = 0; j < len; j++)
		{
			Datum		datum;
			bool		IsNull;

			datum = index_getattr(itvec[j], i + 1, giststate->leafTupdesc,
								  &IsNull);
			if (IsNull)
				continue;

			gistdentryinit(giststate, i,
						   evec->vector + evec->n,
						   datum,
						   NULL, NULL, (OffsetNumber) 0,
						   false, IsNull);
			evec->n++;
		}

		/* If this column was all NULLs, the union is NULL */
		if (evec->n == 0)
		{
			attr[i] = (Datum) 0;
			isnull[i] = true;
		}
		else
		{
			if (evec->n == 1)
			{
				/* unionFn may expect at least two inputs */
				evec->n = 2;
				evec->vector[1] = evec->vector[0];
			}

			/* Make union and store in attr array */
			attr[i] = FunctionCall2Coll(&giststate->unionFn[i],
										giststate->supportCollation[i],
										PointerGetDatum(evec),
										PointerGetDatum(&attrsize));

			isnull[i] = false;
		}
	}
}
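
/*
 * For reference, the opclass "union" support function called above has the
 * C-level shape sketched here (all names are illustrative, not part of this
 * file):
 *
 *	Datum
 *	my_union(PG_FUNCTION_ARGS)
 *	{
 *		GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
 *		int		   *sizep = (int *) PG_GETARG_POINTER(1);
 *
 *		combine entryvec->vector[0 .. entryvec->n - 1] into a single key,
 *		store that key's size through sizep, and PG_RETURN_POINTER() it
 *	}
 */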

/*
 * Return an IndexTuple containing the result of applying the "union"
 * method to the specified IndexTuple vector.
 */
IndexTuple
gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
{
	Datum		attr[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];

	gistMakeUnionItVec(giststate, itvec, len, attr, isnull);

	return gistFormTuple(giststate, r, attr, isnull, false);
}

/*
 * makes union of two keys
 */
void
gistMakeUnionKey(GISTSTATE *giststate, int attno,
				 GISTENTRY *entry1, bool isnull1,
				 GISTENTRY *entry2, bool isnull2,
				 Datum *dst, bool *dstisnull)
{
	/* we need a GistEntryVector with room for exactly 2 elements */
	union
	{
		GistEntryVector gev;
		char		padding[2 * sizeof(GISTENTRY) + GEVHDRSZ];
	}			storage;
	GistEntryVector *evec = &storage.gev;
	int			dstsize;

	evec->n = 2;

	if (isnull1 && isnull2)
	{
		*dstisnull = true;
		*dst = (Datum) 0;
	}
	else
	{
		if (isnull1 == false && isnull2 == false)
		{
			evec->vector[0] = *entry1;
			evec->vector[1] = *entry2;
		}
		else if (isnull1 == false)
		{
			evec->vector[0] = *entry1;
			evec->vector[1] = *entry1;
		}
		else
		{
			evec->vector[0] = *entry2;
			evec->vector[1] = *entry2;
		}

		*dstisnull = false;
		*dst = FunctionCall2Coll(&giststate->unionFn[attno],
								 giststate->supportCollation[attno],
								 PointerGetDatum(evec),
								 PointerGetDatum(&dstsize));
	}
}

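/*
 * Compare two keys for equality, using the opclass "same" support function.
 * That function (sketched with illustrative names) reads the two keys from
 * arguments 0 and 1, stores its verdict through the bool pointer passed as
 * argument 2, and returns that pointer.
 */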
bool
gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b)
{
	bool		result;

	FunctionCall3Coll(&giststate->equalFn[attno],
					  giststate->supportCollation[attno],
					  a, b,
					  PointerGetDatum(&result));
	return result;
}

/*
 * Decompress all keys in tuple
 */
void
gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
				  OffsetNumber o, GISTENTRY *attdata, bool *isnull)
{
	int			i;

	for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
	{
		Datum		datum;

		datum = index_getattr(tuple, i + 1, giststate->leafTupdesc, &isnull[i]);
		gistdentryinit(giststate, i, &attdata[i],
					   datum, r, p, o,
					   false, isnull[i]);
	}
}

/*
 * Forms the union of oldtup and addtup; if the union equals oldtup (i.e. no
 * adjustment is needed), return NULL.
 */
IndexTuple
gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
{
	bool		neednew = false;
	GISTENTRY	oldentries[INDEX_MAX_KEYS],
				addentries[INDEX_MAX_KEYS];
	bool		oldisnull[INDEX_MAX_KEYS],
				addisnull[INDEX_MAX_KEYS];
	Datum		attr[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	IndexTuple	newtup = NULL;
	int			i;

	gistDeCompressAtt(giststate, r, oldtup, NULL,
					  (OffsetNumber) 0, oldentries, oldisnull);

	gistDeCompressAtt(giststate, r, addtup, NULL,
					  (OffsetNumber) 0, addentries, addisnull);

	for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
	{
		gistMakeUnionKey(giststate, i,
						 oldentries + i, oldisnull[i],
						 addentries + i, addisnull[i],
						 attr + i, isnull + i);

		if (neednew)
			/* we already need new key, so we can skip check */
			continue;

		if (isnull[i])
			/* union of key may be NULL if and only if both keys are NULL */
			continue;

		if (!addisnull[i])
		{
			if (oldisnull[i] ||
				!gistKeyIsEQ(giststate, i, oldentries[i].key, attr[i]))
				neednew = true;
		}
	}

	if (neednew)
	{
		/* need to update key */
		newtup = gistFormTuple(giststate, r, attr, isnull, false);
		newtup->t_tid = oldtup->t_tid;
	}

	return newtup;
}

/*
 * Search an upper index page for the entry with lowest penalty for insertion
 * of the new index key contained in "it".
 *
 * Returns the index of the page entry to insert into.
 */
OffsetNumber
gistchoose(Relation r, Page p, IndexTuple it,	/* it has compressed entry */
		   GISTSTATE *giststate)
{
	OffsetNumber result;
	OffsetNumber maxoff;
	OffsetNumber i;
	float		best_penalty[INDEX_MAX_KEYS];
	GISTENTRY	entry,
				identry[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	int			keep_current_best;

	Assert(!GistPageIsLeaf(p));

	gistDeCompressAtt(giststate, r,
					  it, NULL, (OffsetNumber) 0,
					  identry, isnull);

	/* we'll return FirstOffsetNumber if page is empty (shouldn't happen) */
	result = FirstOffsetNumber;

	/*
	 * The index may have multiple columns, and there's a penalty value for
	 * each column.  The penalty associated with a column that appears earlier
	 * in the index definition is strictly more important than the penalty of
	 * a column that appears later in the index definition.
	 *
	 * best_penalty[j] is the best penalty we have seen so far for column j,
	 * or -1 when we haven't yet examined column j.  Array entries to the
	 * right of the first -1 are undefined.
	 */
	best_penalty[0] = -1;

	/*
	 * If we find a tuple that's exactly as good as the currently best one, we
	 * could use either one.  When inserting a lot of tuples with the same or
	 * similar keys, it's preferable to descend down the same path when
	 * possible, as that's more cache-friendly.  On the other hand, if all
	 * inserts land on the same leaf page after a split, we're never going to
	 * insert anything to the other half of the split, and will end up using
	 * only 50% of the available space.  Distributing the inserts evenly would
	 * lead to better space usage, but that hurts cache-locality during
	 * insertion.  To get the best of both worlds, when we find a tuple that's
	 * exactly as good as the previous best, choose randomly whether to stick
	 * to the old best, or use the new one.  Once we decide to stick to the
	 * old best, we keep sticking to it for any subsequent equally good tuples
	 * we might find.  This favors tuples with low offsets, but still allows
	 * some inserts to go to other equally-good subtrees.
	 *
	 * keep_current_best is -1 if we haven't yet had to make a random choice
	 * whether to keep the current best tuple.  If we have done so, and
	 * decided to keep it, keep_current_best is 1; if we've decided to
	 * replace, keep_current_best is 0.  (This state will be reset to -1 as
	 * soon as we've made the replacement, but sometimes we make the choice in
	 * advance of actually finding a replacement best tuple.)
	 */
	keep_current_best = -1;

	/*
	 * Loop over tuples on page.
	 */
	maxoff = PageGetMaxOffsetNumber(p);
	Assert(maxoff >= FirstOffsetNumber);

	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
	{
		IndexTuple	itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
		bool		zero_penalty;
		int			j;

		zero_penalty = true;

		/* Loop over index attributes. */
		for (j = 0; j < IndexRelationGetNumberOfKeyAttributes(r); j++)
		{
			Datum		datum;
			float		usize;
			bool		IsNull;

			/* Compute penalty for this column. */
			datum = index_getattr(itup, j + 1, giststate->leafTupdesc,
								  &IsNull);
			gistdentryinit(giststate, j, &entry, datum, r, p, i,
						   false, IsNull);
			usize = gistpenalty(giststate, j, &entry, IsNull,
								&identry[j], isnull[j]);
			if (usize > 0)
				zero_penalty = false;

			if (best_penalty[j] < 0 || usize < best_penalty[j])
			{
				/*
				 * New best penalty for column.  Tentatively select this tuple
				 * as the target, and record the best penalty.  Then reset the
				 * next column's penalty to "unknown" (and indirectly, the
				 * same for all the ones to its right).  This will force us to
				 * adopt this tuple's penalty values as the best for all the
				 * remaining columns during subsequent loop iterations.
				 */
				result = i;
				best_penalty[j] = usize;

				if (j < IndexRelationGetNumberOfKeyAttributes(r) - 1)
					best_penalty[j + 1] = -1;

				/* we have new best, so reset keep-it decision */
				keep_current_best = -1;
			}
			else if (best_penalty[j] == usize)
			{
				/*
				 * The current tuple is exactly as good for this column as the
				 * best tuple seen so far.  The next iteration of this loop
				 * will compare the next column.
				 */
			}
			else
			{
				/*
				 * The current tuple is worse for this column than the best
				 * tuple seen so far.  Skip the remaining columns and move on
				 * to the next tuple, if any.
				 */
				zero_penalty = false;	/* so outer loop won't exit */
				break;
			}
		}

		/*
		 * If we looped past the last column, and did not update "result",
		 * then this tuple is exactly as good as the prior best tuple.
		 */
		if (j == IndexRelationGetNumberOfKeyAttributes(r) && result != i)
		{
			if (keep_current_best == -1)
			{
				/* we didn't make the random choice yet for this old best */
				keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
			}
			if (keep_current_best == 0)
			{
				/* we choose to use the new tuple */
				result = i;
				/* choose again if there are even more exactly-as-good ones */
				keep_current_best = -1;
			}
		}

		/*
		 * If we find a tuple with zero penalty for all columns, and we've
		 * decided we don't want to search for another tuple with equal
		 * penalty, there's no need to examine remaining tuples; just break
		 * out of the loop and return it.
		 */
		if (zero_penalty)
		{
			if (keep_current_best == -1)
			{
				/* we didn't make the random choice yet for this old best */
				keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
			}
			if (keep_current_best == 1)
				break;
		}
	}

	return result;
}

/*
 * initialize a GiST entry with a decompressed version of key
 */
void
gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
			   Datum k, Relation r, Page pg, OffsetNumber o,
			   bool l, bool isNull)
{
	if (!isNull)
	{
		GISTENTRY  *dep;

		gistentryinit(*e, k, r, pg, o, l);

		/* there may not be a decompress function in opclass */
		if (!OidIsValid(giststate->decompressFn[nkey].fn_oid))
			return;

		dep = (GISTENTRY *)
			DatumGetPointer(FunctionCall1Coll(&giststate->decompressFn[nkey],
											  giststate->supportCollation[nkey],
											  PointerGetDatum(e)));
		/* decompressFn may just return the given pointer */
		if (dep != e)
			gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
						  dep->leafkey);
	}
	else
		gistentryinit(*e, (Datum) 0, r, pg, o, l);
}

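/*
 * Build an index tuple from the given datums, compressing the key columns
 * first via gistCompressValues.
 */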
IndexTuple
gistFormTuple(GISTSTATE *giststate, Relation r,
			  Datum *attdata, bool *isnull, bool isleaf)
{
	Datum		compatt[INDEX_MAX_KEYS];
	IndexTuple	res;

	gistCompressValues(giststate, r, attdata, isnull, isleaf, compatt);

	res = index_form_tuple(isleaf ? giststate->leafTupdesc :
						   giststate->nonLeafTupdesc,
						   compatt, isnull);

	/*
	 * The offset number on tuples on internal pages is unused. For historical
	 * reasons, it is set to 0xffff.
	 */
	ItemPointerSetOffsetNumber(&(res->t_tid), 0xffff);
	return res;
}

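/*
 * Run each key attribute through the opclass "compress" support function,
 * if one is provided; INCLUDE columns (present only on leaf tuples) are
 * stored unchanged.
 */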
void
gistCompressValues(GISTSTATE *giststate, Relation r,
				   Datum *attdata, bool *isnull, bool isleaf, Datum *compatt)
{
	int			i;

	/*
	 * Call the compress method on each attribute.
	 */
	for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
	{
		if (isnull[i])
			compatt[i] = (Datum) 0;
		else
		{
			GISTENTRY	centry;
			GISTENTRY  *cep;

			gistentryinit(centry, attdata[i], r, NULL, (OffsetNumber) 0,
						  isleaf);
			/* there may not be a compress function in opclass */
			if (OidIsValid(giststate->compressFn[i].fn_oid))
				cep = (GISTENTRY *)
					DatumGetPointer(FunctionCall1Coll(&giststate->compressFn[i],
													  giststate->supportCollation[i],
													  PointerGetDatum(&centry)));
			else
				cep = &centry;
			compatt[i] = cep->key;
		}
	}

	if (isleaf)
	{
		/*
		 * Emplace each included attribute if any.
		 */
		for (; i < r->rd_att->natts; i++)
		{
			if (isnull[i])
				compatt[i] = (Datum) 0;
			else
				compatt[i] = attdata[i];
		}
	}
}
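
/*
 * Sketch of an opclass "compress" support function, as invoked above; all
 * names are illustrative only:
 *
 *	Datum
 *	my_compress(PG_FUNCTION_ARGS)
 *	{
 *		GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
 *		GISTENTRY  *retval;
 *
 *		if (entry->leafkey)
 *		{
 *			retval = palloc(sizeof(GISTENTRY));
 *			gistentryinit(*retval, <compressed datum>, entry->rel,
 *						  entry->page, entry->offset, false);
 *		}
 *		else
 *			retval = entry;
 *		PG_RETURN_POINTER(retval);
 *	}
 */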

/*
 * initialize a GiST entry with fetched value in key field
 */
static Datum
gistFetchAtt(GISTSTATE *giststate, int nkey, Datum k, Relation r)
{
	GISTENTRY	fentry;
	GISTENTRY  *fep;

	gistentryinit(fentry, k, r, NULL, (OffsetNumber) 0, false);

	fep = (GISTENTRY *)
		DatumGetPointer(FunctionCall1Coll(&giststate->fetchFn[nkey],
										  giststate->supportCollation[nkey],
										  PointerGetDatum(&fentry)));

	/* fetchFn set 'key', return it to the caller */
	return fep->key;
}
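
/*
 * A sketch of an opclass "fetch" support function, with illustrative names
 * only: it receives the GISTENTRY pointer as argument 0 and returns a
 * GISTENTRY (possibly newly palloc'd) whose key field holds the original
 * indexed datum reconstructed from the compressed key:
 *
 *	Datum
 *	my_fetch(PG_FUNCTION_ARGS)
 *	{
 *		GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
 *		GISTENTRY  *retval = palloc(sizeof(GISTENTRY));
 *
 *		gistentryinit(*retval, <reconstructed datum>, entry->rel,
 *					  entry->page, entry->offset, false);
 *		PG_RETURN_POINTER(retval);
 *	}
 */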

/*
 * Fetch all keys in tuple.
 * Returns a new HeapTuple containing the originally-indexed data.
 */
HeapTuple
gistFetchTuple(GISTSTATE *giststate, Relation r, IndexTuple tuple)
{
	MemoryContext oldcxt = MemoryContextSwitchTo(giststate->tempCxt);
	Datum		fetchatt[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	int			i;

	for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
	{
		Datum		datum;

		datum = index_getattr(tuple, i + 1, giststate->leafTupdesc, &isnull[i]);

		if (giststate->fetchFn[i].fn_oid != InvalidOid)
		{
			if (!isnull[i])
				fetchatt[i] = gistFetchAtt(giststate, i, datum, r);
			else
				fetchatt[i] = (Datum) 0;
		}
		else if (giststate->compressFn[i].fn_oid == InvalidOid)
		{
			/*
			 * If the opclass does not provide a compress method that could
			 * change the original value, the attribute is necessarily stored
			 * in its original form.
			 */
			if (!isnull[i])
				fetchatt[i] = datum;
			else
				fetchatt[i] = (Datum) 0;
		}
		else
		{
			/*
			 * Index-only scans not supported for this column. Since the
			 * planner chose an index-only scan anyway, it is not interested
			 * in this column, and we can replace it with a NULL.
			 */
			isnull[i] = true;
			fetchatt[i] = (Datum) 0;
		}
	}

	/*
	 * Get each included attribute.
	 */
	for (; i < r->rd_att->natts; i++)
	{
		fetchatt[i] = index_getattr(tuple, i + 1, giststate->leafTupdesc,
									&isnull[i]);
	}
	MemoryContextSwitchTo(oldcxt);

	return heap_form_tuple(giststate->fetchTupdesc, fetchatt, isnull);
}

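/*
 * Compute the penalty of inserting "add" into the subtree summarized by
 * "orig", using the opclass "penalty" support function.  Such a function
 * has the C-level shape sketched here (names are illustrative only):
 *
 *	Datum
 *	my_penalty(PG_FUNCTION_ARGS)
 *	{
 *		GISTENTRY  *origentry = (GISTENTRY *) PG_GETARG_POINTER(0);
 *		GISTENTRY  *newentry = (GISTENTRY *) PG_GETARG_POINTER(1);
 *		float	   *penalty = (float *) PG_GETARG_POINTER(2);
 *
 *		set *penalty, e.g. to how much origentry's key would have to
 *		grow to cover newentry's key, and PG_RETURN_POINTER(penalty)
 *	}
 */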
float
gistpenalty(GISTSTATE *giststate, int attno,
			GISTENTRY *orig, bool isNullOrig,
			GISTENTRY *add, bool isNullAdd)
{
	float		penalty = 0.0;

	if (giststate->penaltyFn[attno].fn_strict == false ||
		(isNullOrig == false && isNullAdd == false))
	{
		FunctionCall3Coll(&giststate->penaltyFn[attno],
						  giststate->supportCollation[attno],
						  PointerGetDatum(orig),
						  PointerGetDatum(add),
						  PointerGetDatum(&penalty));
		/* disallow negative or NaN penalty */
		if (isnan(penalty) || penalty < 0.0)
			penalty = 0.0;
	}
	else if (isNullOrig && isNullAdd)
		penalty = 0.0;
	else
	{
		/* try to prevent mixing null and non-null values */
		penalty = get_float4_infinity();
	}

	return penalty;
}

/*
 * Initialize a new index page
 */
void
gistinitpage(Page page, uint32 f)
{
	GISTPageOpaque opaque;

	PageInit(page, BLCKSZ, sizeof(GISTPageOpaqueData));

	opaque = GistPageGetOpaque(page);
	opaque->rightlink = InvalidBlockNumber;
	opaque->flags = f;
	opaque->gist_page_id = GIST_PAGE_ID;
}

/*
 * Initialize a new index buffer
 */
void
GISTInitBuffer(Buffer b, uint32 f)
{
	Page		page;

	page = BufferGetPage(b);
	gistinitpage(page, f);
}

/*
 * Verify that a freshly-read page looks sane.
 */
void
gistcheckpage(Relation rel, Buffer buf)
{
	Page		page = BufferGetPage(buf);

	/*
	 * ReadBuffer verifies that every newly-read page passes
	 * PageHeaderIsValid, which means it either contains a reasonably sane
	 * page header or is all-zero.  We have to defend against the all-zero
	 * case, however.
	 */
	if (PageIsNew(page))
		ereport(ERROR,
				(errcode(ERRCODE_INDEX_CORRUPTED),
				 errmsg("index \"%s\" contains unexpected zero page at block %u",
						RelationGetRelationName(rel),
						BufferGetBlockNumber(buf)),
				 errhint("Please REINDEX it.")));

	/*
	 * Additionally check that the special area looks sane.
	 */
	if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GISTPageOpaqueData)))
		ereport(ERROR,
				(errcode(ERRCODE_INDEX_CORRUPTED),
				 errmsg("index \"%s\" contains corrupted page at block %u",
						RelationGetRelationName(rel),
						BufferGetBlockNumber(buf)),
				 errhint("Please REINDEX it.")));
}


/*
 * Allocate a new page (either by recycling, or by extending the index file)
 *
 * The returned buffer is already pinned and exclusive-locked
 *
 * Caller is responsible for initializing the page by calling GISTInitBuffer
 */
Buffer
gistNewBuffer(Relation r)
{
	Buffer		buffer;
	bool		needLock;

	/* First, try to get a page from FSM */
	for (;;)
	{
		BlockNumber blkno = GetFreeIndexPage(r);

		if (blkno == InvalidBlockNumber)
			break;				/* nothing left in FSM */

		buffer = ReadBuffer(r, blkno);

		/*
		 * We have to guard against the possibility that someone else already
		 * recycled this page; the buffer may be locked if so.
		 */
		if (ConditionalLockBuffer(buffer))
		{
			Page		page = BufferGetPage(buffer);

			/*
			 * If the page was never initialized, it's OK to use.
			 */
			if (PageIsNew(page))
				return buffer;

			gistcheckpage(r, buffer);

			/*
			 * Otherwise, recycle it if it is deleted and old enough that no
			 * process can still be interested in it.
			 */
			if (gistPageRecyclable(page))
			{
				/*
				 * If we are generating WAL for Hot Standby then create a WAL
				 * record that will allow us to conflict with queries running
				 * on standby, in case they have snapshots older than the
				 * page's deleteXid.
				 */
				if (XLogStandbyInfoActive() && RelationNeedsWAL(r))
					gistXLogPageReuse(r, blkno, GistPageGetDeleteXid(page));

				return buffer;
			}

			LockBuffer(buffer, GIST_UNLOCK);
		}

		/* Can't use it, so release buffer and try again */
		ReleaseBuffer(buffer);
	}

	/* Must extend the file */
	needLock = !RELATION_IS_LOCAL(r);

	if (needLock)
		LockRelationForExtension(r, ExclusiveLock);

	buffer = ReadBuffer(r, P_NEW);
	LockBuffer(buffer, GIST_EXCLUSIVE);

	if (needLock)
		UnlockRelationForExtension(r, ExclusiveLock);

	return buffer;
}

/* Can this page be recycled yet? */
bool
gistPageRecyclable(Page page)
{
	if (PageIsNew(page))
		return true;
	if (GistPageIsDeleted(page))
	{
		/*
		 * The page was deleted, but when? If it was just deleted, a scan
		 * might have seen the downlink to it, and will read the page later.
		 * As long as that can happen, we must keep the deleted page around as
		 * a tombstone.
		 *
		 * For that, check whether the deletion XID could still be visible to
		 * anyone.  If not, then no scan that's still in progress could have
		 * seen its downlink, and we can recycle the page.
		 */
		FullTransactionId deletexid_full = GistPageGetDeleteXid(page);

		return GlobalVisCheckRemovableFullXid(NULL, deletexid_full);
	}
	return false;
}

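/*
 * Parse and validate the reloptions applicable to GiST indexes
 * (fillfactor and buffering).
 */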
bytea *
gistoptions(Datum reloptions, bool validate)
{
	static const relopt_parse_elt tab[] = {
		{"fillfactor", RELOPT_TYPE_INT, offsetof(GiSTOptions, fillfactor)},
		{"buffering", RELOPT_TYPE_ENUM, offsetof(GiSTOptions, buffering_mode)}
	};

	return (bytea *) build_reloptions(reloptions, validate,
									  RELOPT_KIND_GIST,
									  sizeof(GiSTOptions),
									  tab, lengthof(tab));
}

/*
 *	gistproperty() -- Check boolean properties of indexes.
 *
 * This is optional for most AMs, but is required for GiST because the core
 * property code doesn't support AMPROP_DISTANCE_ORDERABLE.  We also handle
 * AMPROP_RETURNABLE here to save opening the rel to call gistcanreturn.
 */
bool
gistproperty(Oid index_oid, int attno,
			 IndexAMProperty prop, const char *propname,
			 bool *res, bool *isnull)
{
	Oid			opclass,
				opfamily,
				opcintype;
	int16		procno;

	/* Only answer column-level inquiries */
	if (attno == 0)
		return false;

	/*
	 * Currently, GiST distance-ordered scans require that there be a distance
	 * function in the opclass with the default types (i.e. the one loaded
	 * into the relcache entry, see initGISTstate).  So we assume that if such
	 * a function exists, then there's a reason for it (rather than grubbing
	 * through all the opfamily's operators to find an ordered one).
	 *
	 * Essentially the same code can test whether we support returning the
	 * column data, since that's true if the opclass provides a fetch proc.
	 */

	switch (prop)
	{
		case AMPROP_DISTANCE_ORDERABLE:
			procno = GIST_DISTANCE_PROC;
			break;
		case AMPROP_RETURNABLE:
			procno = GIST_FETCH_PROC;
			break;
		default:
			return false;
	}

	/* First we need to know the column's opclass. */
	opclass = get_index_column_opclass(index_oid, attno);
	if (!OidIsValid(opclass))
	{
		*isnull = true;
		return true;
	}

	/* Now look up the opclass family and input datatype. */
	if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
	{
		*isnull = true;
		return true;
	}

	/* And now we can check whether the function is provided. */

	*res = SearchSysCacheExists4(AMPROCNUM,
								 ObjectIdGetDatum(opfamily),
								 ObjectIdGetDatum(opcintype),
								 ObjectIdGetDatum(opcintype),
								 Int16GetDatum(procno));

	/*
	 * Special case: even without a fetch function, AMPROP_RETURNABLE is true
	 * if the opclass has no compress function.
	 */
	if (prop == AMPROP_RETURNABLE && !*res)
	{
		*res = !SearchSysCacheExists4(AMPROCNUM,
									  ObjectIdGetDatum(opfamily),
									  ObjectIdGetDatum(opcintype),
									  ObjectIdGetDatum(opcintype),
									  Int16GetDatum(GIST_COMPRESS_PROC));
	}

	*isnull = false;

	return true;
}

/*
 * Some indexes are not WAL-logged, but we need LSNs to detect concurrent page
 * splits anyway. This function provides a fake sequence of LSNs for that
 * purpose.
 */
XLogRecPtr
gistGetFakeLSN(Relation rel)
{
	if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
	{
		/*
		 * Temporary relations are only accessible in our session, so a simple
		 * backend-local counter will do.
		 */
		static XLogRecPtr counter = FirstNormalUnloggedLSN;

		return counter++;
	}
	else if (RelationIsPermanent(rel))
	{
		/*
		 * WAL-logging on this relation will start after commit, so its LSNs
		 * must be distinct numbers smaller than the LSN at the next commit.
		 * Emit a dummy WAL record if insert-LSN hasn't advanced after the
		 * last call.
		 */
		static XLogRecPtr lastlsn = InvalidXLogRecPtr;
		XLogRecPtr	currlsn = GetXLogInsertRecPtr();

		/* Shouldn't be called for WAL-logging relations */
		Assert(!RelationNeedsWAL(rel));

		/* No need for an actual record if we already have a distinct LSN */
		if (!XLogRecPtrIsInvalid(lastlsn) && lastlsn == currlsn)
			currlsn = gistXLogAssignLSN();

		lastlsn = currlsn;
		return currlsn;
	}
	else
	{
		/*
		 * Unlogged relations are accessible from other backends, and survive
		 * (clean) restarts. GetFakeLSNForUnloggedRel() handles that for us.
		 */
		Assert(rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED);
		return GetFakeLSNForUnloggedRel();
	}
}