/*-------------------------------------------------------------------------
 *
 * gistutil.c
 *	  utility routines for the postgres GiST index access method.
 *
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			src/backend/access/gist/gistutil.c
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <math.h>

#include "access/gist_private.h"
#include "access/htup_details.h"
#include "access/reloptions.h"
#include "catalog/pg_opclass.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "utils/float.h"
#include "utils/syscache.h"
#include "utils/snapmgr.h"
#include "utils/lsyscache.h"


/*
 * Write itup vector to page; does no checking of free space.
 */
void
gistfillbuffer(Page page, IndexTuple *itup, int len, OffsetNumber off)
{
	OffsetNumber l = InvalidOffsetNumber;
	int			i;

	if (off == InvalidOffsetNumber)
		off = (PageIsEmpty(page)) ? FirstOffsetNumber :
			OffsetNumberNext(PageGetMaxOffsetNumber(page));

	for (i = 0; i < len; i++)
	{
		Size		sz = IndexTupleSize(itup[i]);

		l = PageAddItem(page, (Item) itup[i], sz, off, false, false);
		if (l == InvalidOffsetNumber)
			elog(ERROR, "failed to add item to GiST index page, item %d out of %d, size %d bytes",
				 i, len, (int) sz);
		off++;
	}
}

/*
 * Check free space for itup vector on page.  Returns true if the vector
 * does NOT fit (hence the name).
 */
bool
gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace)
{
	unsigned int size = freespace,
				deleted = 0;
	int			i;

	for (i = 0; i < len; i++)
		size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);

	if (todelete != InvalidOffsetNumber)
	{
		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete));

		deleted = IndexTupleSize(itup) + sizeof(ItemIdData);
	}

	return (PageGetFreeSpace(page) + deleted < size);
}
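
/*
 * A sketch of the typical caller pattern (see gistplacetopage() in gist.c):
 * check for free space first, and fall back to a page split when the tuples
 * do not fit:
 *
 *		if (gistnospace(page, itvec, len, InvalidOffsetNumber, freespace))
 *			... split the page, distributing itvec across the halves ...
 *		else
 *			gistfillbuffer(page, itvec, len, InvalidOffsetNumber);
 */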

bool
gistfitpage(IndexTuple *itvec, int len)
{
	int			i;
	Size		size = 0;

	for (i = 0; i < len; i++)
		size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);

	/* TODO: Consider fillfactor */
	return (size <= GiSTPageSize);
}

/*
 * Extract all tuples on a page into an itup vector.  The returned array
 * holds pointers into the page, not copies.
 */
IndexTuple *
gistextractpage(Page page, int *len /* out */ )
{
	OffsetNumber i,
				maxoff;
	IndexTuple *itvec;

	maxoff = PageGetMaxOffsetNumber(page);
	*len = maxoff;
	itvec = palloc(sizeof(IndexTuple) * maxoff);
	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
		itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));

	return itvec;
}

/*
 * Join two tuple vectors into one; only the pointers are copied.
 */
IndexTuple *
gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
{
	itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
	memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
	*len += addlen;
	return itvec;
}

/*
 * Pack a vector of IndexTuple pointers into one contiguous, palloc'd chunk
 * of IndexTupleData; *memlen receives the total size.
 */
IndexTupleData *
gistfillitupvec(IndexTuple *vec, int veclen, int *memlen)
{
	char	   *ptr,
			   *ret;
	int			i;

	*memlen = 0;

	for (i = 0; i < veclen; i++)
		*memlen += IndexTupleSize(vec[i]);

	ptr = ret = palloc(*memlen);

	for (i = 0; i < veclen; i++)
	{
		memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
		ptr += IndexTupleSize(vec[i]);
	}

	return (IndexTupleData *) ret;
}
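
/*
 * Note: the result is one contiguous palloc'd chunk; consumers (such as the
 * page-split code when it assembles WAL data) walk it tuple-by-tuple by
 * advancing a pointer with IndexTupleSize().
 */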

/*
 * Make unions of keys in IndexTuple vector (one union datum per index column).
 * Union Datums are returned into the attr/isnull arrays.
 * Resulting Datums aren't compressed.
 */
void
gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len,
				   Datum *attr, bool *isnull)
{
	int			i;
	GistEntryVector *evec;
	int			attrsize;

	evec = (GistEntryVector *) palloc((len + 2) * sizeof(GISTENTRY) + GEVHDRSZ);

	for (i = 0; i < giststate->nonLeafTupdesc->natts; i++)
	{
		int			j;

		/* Collect non-null datums for this column */
		evec->n = 0;
		for (j = 0; j < len; j++)
		{
			Datum		datum;
			bool		IsNull;

			datum = index_getattr(itvec[j], i + 1, giststate->leafTupdesc,
								  &IsNull);
			if (IsNull)
				continue;

			gistdentryinit(giststate, i,
						   evec->vector + evec->n,
						   datum,
						   NULL, NULL, (OffsetNumber) 0,
						   false, IsNull);
			evec->n++;
		}

		/* If this column was all NULLs, the union is NULL */
		if (evec->n == 0)
		{
			attr[i] = (Datum) 0;
			isnull[i] = true;
		}
		else
		{
			if (evec->n == 1)
			{
				/* unionFn may expect at least two inputs */
				evec->n = 2;
				evec->vector[1] = evec->vector[0];
			}

			/* Make union and store in attr array */
			attr[i] = FunctionCall2Coll(&giststate->unionFn[i],
										giststate->supportCollation[i],
										PointerGetDatum(evec),
										PointerGetDatum(&attrsize));

			isnull[i] = false;
		}
	}
}
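
/*
 * For illustration only: an opclass "union" support function consumes the
 * GistEntryVector built above and reports its result size through the second
 * argument.  A hypothetical opclass for a type MYTYPE (combine_keys() is a
 * stand-in for the real union logic) might look like:
 *
 *		Datum
 *		my_union(PG_FUNCTION_ARGS)
 *		{
 *			GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
 *			int		   *sizep = (int *) PG_GETARG_POINTER(1);
 *			MYTYPE	   *out = combine_keys(entryvec);
 *
 *			*sizep = sizeof(MYTYPE);
 *			PG_RETURN_POINTER(out);
 *		}
 */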

/*
 * Return an IndexTuple containing the result of applying the "union"
 * method to the specified IndexTuple vector.
 */
IndexTuple
gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
{
	Datum		attr[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];

	gistMakeUnionItVec(giststate, itvec, len, attr, isnull);

	return gistFormTuple(giststate, r, attr, isnull, false);
}

/*
 * Make the union of two keys.
 */
void
gistMakeUnionKey(GISTSTATE *giststate, int attno,
				 GISTENTRY *entry1, bool isnull1,
				 GISTENTRY *entry2, bool isnull2,
				 Datum *dst, bool *dstisnull)
{
	/* we need a GistEntryVector with room for exactly 2 elements */
	union
	{
		GistEntryVector gev;
		char		padding[2 * sizeof(GISTENTRY) + GEVHDRSZ];
	}			storage;
	GistEntryVector *evec = &storage.gev;
	int			dstsize;

	evec->n = 2;

	if (isnull1 && isnull2)
	{
		*dstisnull = true;
		*dst = (Datum) 0;
	}
	else
	{
		if (isnull1 == false && isnull2 == false)
		{
			evec->vector[0] = *entry1;
			evec->vector[1] = *entry2;
		}
		else if (isnull1 == false)
		{
			evec->vector[0] = *entry1;
			evec->vector[1] = *entry1;
		}
		else
		{
			evec->vector[0] = *entry2;
			evec->vector[1] = *entry2;
		}

		*dstisnull = false;
		*dst = FunctionCall2Coll(&giststate->unionFn[attno],
								 giststate->supportCollation[attno],
								 PointerGetDatum(evec),
								 PointerGetDatum(&dstsize));
	}
}

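/*
 * Test whether two keys are equal, using the opclass "same" support
 * function.  Note that the support function reports its result through the
 * third (pointer) argument rather than via its return value.
 */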
bool
gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b)
{
	bool		result;

	FunctionCall3Coll(&giststate->equalFn[attno],
					  giststate->supportCollation[attno],
					  a, b,
					  PointerGetDatum(&result));
	return result;
}

/*
 * Decompress all keys in tuple
 */
void
gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
				  OffsetNumber o, GISTENTRY *attdata, bool *isnull)
{
	int			i;

	for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
	{
		Datum		datum;

		datum = index_getattr(tuple, i + 1, giststate->leafTupdesc, &isnull[i]);
		gistdentryinit(giststate, i, &attdata[i],
					   datum, r, p, o,
					   false, isnull[i]);
	}
}

/*
 * Form the union of oldtup and addtup; if the union is the same as oldtup,
 * return NULL (the existing key needs no adjustment).
 */
IndexTuple
gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
{
	bool		neednew = false;
	GISTENTRY	oldentries[INDEX_MAX_KEYS],
				addentries[INDEX_MAX_KEYS];
	bool		oldisnull[INDEX_MAX_KEYS],
				addisnull[INDEX_MAX_KEYS];
	Datum		attr[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	IndexTuple	newtup = NULL;
	int			i;

	gistDeCompressAtt(giststate, r, oldtup, NULL,
					  (OffsetNumber) 0, oldentries, oldisnull);

	gistDeCompressAtt(giststate, r, addtup, NULL,
					  (OffsetNumber) 0, addentries, addisnull);

	for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
	{
		gistMakeUnionKey(giststate, i,
						 oldentries + i, oldisnull[i],
						 addentries + i, addisnull[i],
						 attr + i, isnull + i);

		if (neednew)
			/* we already need new key, so we can skip check */
			continue;

		if (isnull[i])
			/* union of key may be NULL if and only if both keys are NULL */
			continue;

		if (!addisnull[i])
		{
			if (oldisnull[i] ||
				!gistKeyIsEQ(giststate, i, oldentries[i].key, attr[i]))
				neednew = true;
		}
	}

	if (neednew)
	{
		/* need to update key */
		newtup = gistFormTuple(giststate, r, attr, isnull, false);
		newtup->t_tid = oldtup->t_tid;
	}

	return newtup;
}
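
/*
 * Callers (see gistdoinsert() in gist.c) treat a NULL result as meaning that
 * the existing downlink key already covers the new tuple, so no parent-key
 * update is needed.
 */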

/*
 * Search an upper index page for the entry with lowest penalty for insertion
 * of the new index key contained in "it".
 *
 * Returns the index of the page entry to insert into.
 */
OffsetNumber
gistchoose(Relation r, Page p, IndexTuple it,	/* it has compressed entry */
		   GISTSTATE *giststate)
{
	OffsetNumber result;
	OffsetNumber maxoff;
	OffsetNumber i;
	float		best_penalty[INDEX_MAX_KEYS];
	GISTENTRY	entry,
				identry[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	int			keep_current_best;

	Assert(!GistPageIsLeaf(p));

	gistDeCompressAtt(giststate, r,
					  it, NULL, (OffsetNumber) 0,
					  identry, isnull);

	/* we'll return FirstOffsetNumber if page is empty (shouldn't happen) */
	result = FirstOffsetNumber;

	/*
	 * The index may have multiple columns, and there's a penalty value for
	 * each column.  The penalty associated with a column that appears earlier
	 * in the index definition is strictly more important than the penalty of
	 * a column that appears later in the index definition.
	 *
	 * best_penalty[j] is the best penalty we have seen so far for column j,
	 * or -1 when we haven't yet examined column j.  Array entries to the
	 * right of the first -1 are undefined.
	 */
	best_penalty[0] = -1;

	/*
	 * If we find a tuple that's exactly as good as the currently best one, we
	 * could use either one.  When inserting a lot of tuples with the same or
	 * similar keys, it's preferable to descend down the same path when
	 * possible, as that's more cache-friendly.  On the other hand, if all
	 * inserts land on the same leaf page after a split, we're never going to
	 * insert anything to the other half of the split, and will end up using
	 * only 50% of the available space.  Distributing the inserts evenly would
	 * lead to better space usage, but that hurts cache-locality during
	 * insertion.  To get the best of both worlds, when we find a tuple that's
	 * exactly as good as the previous best, choose randomly whether to stick
	 * to the old best, or use the new one.  Once we decide to stick to the
	 * old best, we keep sticking to it for any subsequent equally good tuples
	 * we might find.  This favors tuples with low offsets, but still allows
	 * some inserts to go to other equally-good subtrees.
	 *
	 * keep_current_best is -1 if we haven't yet had to make a random choice
	 * whether to keep the current best tuple.  If we have done so, and
	 * decided to keep it, keep_current_best is 1; if we've decided to
	 * replace, keep_current_best is 0.  (This state will be reset to -1 as
	 * soon as we've made the replacement, but sometimes we make the choice in
	 * advance of actually finding a replacement best tuple.)
	 */
	keep_current_best = -1;

	/*
	 * Loop over tuples on page.
	 */
	maxoff = PageGetMaxOffsetNumber(p);
	Assert(maxoff >= FirstOffsetNumber);

	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
	{
		IndexTuple	itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
		bool		zero_penalty;
		int			j;

		zero_penalty = true;

		/* Loop over index attributes. */
		for (j = 0; j < IndexRelationGetNumberOfKeyAttributes(r); j++)
		{
			Datum		datum;
			float		usize;
			bool		IsNull;

			/* Compute penalty for this column. */
			datum = index_getattr(itup, j + 1, giststate->leafTupdesc,
								  &IsNull);
			gistdentryinit(giststate, j, &entry, datum, r, p, i,
						   false, IsNull);
			usize = gistpenalty(giststate, j, &entry, IsNull,
								&identry[j], isnull[j]);
			if (usize > 0)
				zero_penalty = false;

			if (best_penalty[j] < 0 || usize < best_penalty[j])
			{
				/*
				 * New best penalty for column.  Tentatively select this tuple
				 * as the target, and record the best penalty.  Then reset the
				 * next column's penalty to "unknown" (and indirectly, the
				 * same for all the ones to its right).  This will force us to
				 * adopt this tuple's penalty values as the best for all the
				 * remaining columns during subsequent loop iterations.
				 */
				result = i;
				best_penalty[j] = usize;

				if (j < IndexRelationGetNumberOfKeyAttributes(r) - 1)
					best_penalty[j + 1] = -1;

				/* we have new best, so reset keep-it decision */
				keep_current_best = -1;
			}
			else if (best_penalty[j] == usize)
			{
				/*
				 * The current tuple is exactly as good for this column as the
				 * best tuple seen so far.  The next iteration of this loop
				 * will compare the next column.
				 */
			}
			else
			{
				/*
				 * The current tuple is worse for this column than the best
				 * tuple seen so far.  Skip the remaining columns and move on
				 * to the next tuple, if any.
				 */
				zero_penalty = false;	/* so outer loop won't exit */
				break;
			}
		}

		/*
		 * If we looped past the last column, and did not update "result",
		 * then this tuple is exactly as good as the prior best tuple.
		 */
		if (j == IndexRelationGetNumberOfKeyAttributes(r) && result != i)
		{
			if (keep_current_best == -1)
			{
				/* we didn't make the random choice yet for this old best */
				keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
			}
			if (keep_current_best == 0)
			{
				/* we choose to use the new tuple */
				result = i;
				/* choose again if there are even more exactly-as-good ones */
				keep_current_best = -1;
			}
		}

		/*
		 * If we find a tuple with zero penalty for all columns, and we've
		 * decided we don't want to search for another tuple with equal
		 * penalty, there's no need to examine remaining tuples; just break
		 * out of the loop and return it.
		 */
		if (zero_penalty)
		{
			if (keep_current_best == -1)
			{
				/* we didn't make the random choice yet for this old best */
				keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
			}
			if (keep_current_best == 1)
				break;
		}
	}

	return result;
}

/*
 * initialize a GiST entry with a decompressed version of key
 */
void
gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
			   Datum k, Relation r, Page pg, OffsetNumber o,
			   bool l, bool isNull)
{
	if (!isNull)
	{
		GISTENTRY  *dep;

		gistentryinit(*e, k, r, pg, o, l);

		/* there may not be a decompress function in opclass */
		if (!OidIsValid(giststate->decompressFn[nkey].fn_oid))
			return;

		dep = (GISTENTRY *)
			DatumGetPointer(FunctionCall1Coll(&giststate->decompressFn[nkey],
											  giststate->supportCollation[nkey],
											  PointerGetDatum(e)));
		/* decompressFn may just return the given pointer */
		if (dep != e)
			gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
						  dep->leafkey);
	}
	else
		gistentryinit(*e, (Datum) 0, r, pg, o, l);
}
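
/*
 * For illustration only: the simplest valid "decompress" support function
 * returns its argument unchanged, a case the (dep != e) test above handles
 * without copying anything:
 *
 *		Datum
 *		my_decompress(PG_FUNCTION_ARGS)
 *		{
 *			PG_RETURN_POINTER(PG_GETARG_POINTER(0));
 *		}
 *
 * (An opclass may also omit the decompress function entirely, which the
 * OidIsValid() check above allows.)
 */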

IndexTuple
gistFormTuple(GISTSTATE *giststate, Relation r,
			  Datum attdata[], bool isnull[], bool isleaf)
{
	Datum		compatt[INDEX_MAX_KEYS];
	int			i;
	IndexTuple	res;

	/*
	 * Call the compress method on each attribute.
	 */
	for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
	{
		if (isnull[i])
			compatt[i] = (Datum) 0;
		else
		{
			GISTENTRY	centry;
			GISTENTRY  *cep;

			gistentryinit(centry, attdata[i], r, NULL, (OffsetNumber) 0,
						  isleaf);
			/* there may not be a compress function in opclass */
			if (OidIsValid(giststate->compressFn[i].fn_oid))
				cep = (GISTENTRY *)
					DatumGetPointer(FunctionCall1Coll(&giststate->compressFn[i],
													  giststate->supportCollation[i],
													  PointerGetDatum(&centry)));
			else
				cep = &centry;
			compatt[i] = cep->key;
		}
	}

	if (isleaf)
	{
		/*
		 * Copy each included attribute, if any.
		 */
		for (; i < r->rd_att->natts; i++)
		{
			if (isnull[i])
				compatt[i] = (Datum) 0;
			else
				compatt[i] = attdata[i];
		}
	}

	res = index_form_tuple(isleaf ? giststate->leafTupdesc :
						   giststate->nonLeafTupdesc,
						   compatt, isnull);

	/*
	 * The offset number on tuples on internal pages is unused. For historical
	 * reasons, it is set to 0xffff.
	 */
	ItemPointerSetOffsetNumber(&(res->t_tid), 0xffff);
	return res;
}

/*
 * initialize a GiST entry with fetched value in key field
 */
static Datum
gistFetchAtt(GISTSTATE *giststate, int nkey, Datum k, Relation r)
{
	GISTENTRY	fentry;
	GISTENTRY  *fep;

	gistentryinit(fentry, k, r, NULL, (OffsetNumber) 0, false);

	fep = (GISTENTRY *)
		DatumGetPointer(FunctionCall1Coll(&giststate->fetchFn[nkey],
										  giststate->supportCollation[nkey],
										  PointerGetDatum(&fentry)));

	/* fetchFn sets 'key'; return it to the caller */
	return fep->key;
}

/*
 * Fetch all keys in tuple.
 * Returns a new HeapTuple containing the originally-indexed data.
 */
HeapTuple
gistFetchTuple(GISTSTATE *giststate, Relation r, IndexTuple tuple)
{
	MemoryContext oldcxt = MemoryContextSwitchTo(giststate->tempCxt);
	Datum		fetchatt[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	int			i;

	for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
	{
		Datum		datum;

		datum = index_getattr(tuple, i + 1, giststate->leafTupdesc, &isnull[i]);

		if (giststate->fetchFn[i].fn_oid != InvalidOid)
		{
			if (!isnull[i])
				fetchatt[i] = gistFetchAtt(giststate, i, datum, r);
			else
				fetchatt[i] = (Datum) 0;
		}
		else if (giststate->compressFn[i].fn_oid == InvalidOid)
		{
			/*
			 * If the opclass provides no compress method that could alter
			 * the original value, the attribute is necessarily stored in its
			 * original form.
			 */
			if (!isnull[i])
				fetchatt[i] = datum;
			else
				fetchatt[i] = (Datum) 0;
		}
		else
		{
			/*
			 * Index-only scans are not supported for this column.  Since the
			 * planner chose an index-only scan anyway, it is not interested
			 * in this column, and we can replace it with a NULL.
			 */
			isnull[i] = true;
			fetchatt[i] = (Datum) 0;
		}
	}

	/*
	 * Get each included attribute.
	 */
	for (; i < r->rd_att->natts; i++)
	{
		fetchatt[i] = index_getattr(tuple, i + 1, giststate->leafTupdesc,
									&isnull[i]);
	}
	MemoryContextSwitchTo(oldcxt);
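
	/*
	 * Note: the per-attribute work above ran in giststate->tempCxt, but the
	 * result tuple is built after switching back, so it lives in the
	 * caller's memory context; heap_form_tuple() copies any by-reference
	 * datums it needs.
	 */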

	return heap_form_tuple(giststate->fetchTupdesc, fetchatt, isnull);
}

float
gistpenalty(GISTSTATE *giststate, int attno,
			GISTENTRY *orig, bool isNullOrig,
			GISTENTRY *add, bool isNullAdd)
{
	float		penalty = 0.0;

	if (giststate->penaltyFn[attno].fn_strict == false ||
		(isNullOrig == false && isNullAdd == false))
	{
		FunctionCall3Coll(&giststate->penaltyFn[attno],
						  giststate->supportCollation[attno],
						  PointerGetDatum(orig),
						  PointerGetDatum(add),
						  PointerGetDatum(&penalty));
		/* disallow negative or NaN penalty */
		if (isnan(penalty) || penalty < 0.0)
			penalty = 0.0;
	}
	else if (isNullOrig && isNullAdd)
		penalty = 0.0;
	else
	{
		/* try to prevent mixing null and non-null values */
		penalty = get_float4_infinity();
	}

	return penalty;
}
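
/*
 * For illustration only: an opclass "penalty" support function likewise
 * reports its result through a pointer argument (the float pointer passed
 * as the third argument) and might end with something like:
 *
 *		*result = (float) (union_size - orig_size);
 *		PG_RETURN_POINTER(result);
 */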

/*
 * Initialize a new index page
 */
void
GISTInitBuffer(Buffer b, uint32 f)
{
	GISTPageOpaque opaque;
	Page		page;
	Size		pageSize;

	pageSize = BufferGetPageSize(b);
	page = BufferGetPage(b);
	PageInit(page, pageSize, sizeof(GISTPageOpaqueData));

	opaque = GistPageGetOpaque(page);
	/* page was already zeroed by PageInit, so this is not needed: */
	/* memset(&(opaque->nsn), 0, sizeof(GistNSN)); */
	opaque->rightlink = InvalidBlockNumber;
	opaque->flags = f;
	opaque->gist_page_id = GIST_PAGE_ID;
}

/*
 * Verify that a freshly-read page looks sane.
 */
void
gistcheckpage(Relation rel, Buffer buf)
{
	Page		page = BufferGetPage(buf);

	/*
	 * ReadBuffer verifies that every newly-read page passes
	 * PageHeaderIsValid, which means it either contains a reasonably sane
	 * page header or is all-zero.  We have to defend against the all-zero
	 * case, however.
	 */
	if (PageIsNew(page))
		ereport(ERROR,
				(errcode(ERRCODE_INDEX_CORRUPTED),
				 errmsg("index \"%s\" contains unexpected zero page at block %u",
						RelationGetRelationName(rel),
						BufferGetBlockNumber(buf)),
				 errhint("Please REINDEX it.")));

	/*
	 * Additionally check that the special area looks sane.
	 */
	if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GISTPageOpaqueData)))
		ereport(ERROR,
				(errcode(ERRCODE_INDEX_CORRUPTED),
				 errmsg("index \"%s\" contains corrupted page at block %u",
						RelationGetRelationName(rel),
						BufferGetBlockNumber(buf)),
				 errhint("Please REINDEX it.")));
}


/*
 * Allocate a new page (either by recycling, or by extending the index file)
 *
 * The returned buffer is already pinned and exclusive-locked
 *
 * Caller is responsible for initializing the page by calling GISTInitBuffer
 */
Buffer
gistNewBuffer(Relation r)
{
	Buffer		buffer;
	bool		needLock;

	/* First, try to get a page from FSM */
	for (;;)
	{
		BlockNumber blkno = GetFreeIndexPage(r);

		if (blkno == InvalidBlockNumber)
			break;				/* nothing left in FSM */

		buffer = ReadBuffer(r, blkno);

		/*
		 * We have to guard against the possibility that someone else already
		 * recycled this page; the buffer may be locked if so.
		 */
		if (ConditionalLockBuffer(buffer))
		{
			Page		page = BufferGetPage(buffer);

			/*
			 * If the page was never initialized, it's OK to use.
			 */
			if (PageIsNew(page))
				return buffer;

			gistcheckpage(r, buffer);

			/*
			 * Otherwise, recycle it if deleted, and too old to have any
			 * processes interested in it.
			 */
			if (gistPageRecyclable(page))
			{
				/*
				 * If we are generating WAL for Hot Standby then create a WAL
				 * record that will allow us to conflict with queries running
				 * on standby, in case they have snapshots older than the
				 * page's deleteXid.
				 */
				if (XLogStandbyInfoActive() && RelationNeedsWAL(r))
					gistXLogPageReuse(r, blkno, GistPageGetDeleteXid(page));

				return buffer;
			}

			LockBuffer(buffer, GIST_UNLOCK);
		}

		/* Can't use it, so release buffer and try again */
		ReleaseBuffer(buffer);
	}

	/* Must extend the file */
	needLock = !RELATION_IS_LOCAL(r);

	if (needLock)
		LockRelationForExtension(r, ExclusiveLock);

	buffer = ReadBuffer(r, P_NEW);
	LockBuffer(buffer, GIST_EXCLUSIVE);

	if (needLock)
		UnlockRelationForExtension(r, ExclusiveLock);

	return buffer;
}
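
/*
 * A sketch of typical use (see the index-build code in gistbuild.c): a page
 * obtained here must be initialized by the caller, e.g.
 *
 *		buffer = gistNewBuffer(index);
 *		GISTInitBuffer(buffer, F_LEAF);
 */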

/* Can this page be recycled yet? */
bool
gistPageRecyclable(Page page)
{
	if (PageIsNew(page))
		return true;
	if (GistPageIsDeleted(page))
	{
		/*
		 * The page was deleted, but when? If it was just deleted, a scan
		 * might have seen the downlink to it, and will read the page later.
		 * As long as that can happen, we must keep the deleted page around as
		 * a tombstone.
		 *
		 * Compare the deletion XID with RecentGlobalXmin. If deleteXid <
		 * RecentGlobalXmin, then no scan that's still in progress could have
		 * seen its downlink, and we can recycle it.
		 */
		FullTransactionId deletexid_full = GistPageGetDeleteXid(page);
		FullTransactionId recentxmin_full = GetFullRecentGlobalXmin();

		if (FullTransactionIdPrecedes(deletexid_full, recentxmin_full))
			return true;
	}
	return false;
}

bytea *
gistoptions(Datum reloptions, bool validate)
{
	relopt_value *options;
	GiSTOptions *rdopts;
	int			numoptions;
	static const relopt_parse_elt tab[] = {
		{"fillfactor", RELOPT_TYPE_INT, offsetof(GiSTOptions, fillfactor)},
		{"buffering", RELOPT_TYPE_STRING, offsetof(GiSTOptions, bufferingModeOffset)}
	};

	options = parseRelOptions(reloptions, validate, RELOPT_KIND_GIST,
							  &numoptions);

	/* if none set, we're done */
	if (numoptions == 0)
		return NULL;

	rdopts = allocateReloptStruct(sizeof(GiSTOptions), options, numoptions);

	fillRelOptions((void *) rdopts, sizeof(GiSTOptions), options, numoptions,
				   validate, tab, lengthof(tab));

	pfree(options);

	return (bytea *) rdopts;
}
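
/*
 * For illustration only: both reloptions parsed above can be supplied at
 * index creation time; table and column names here are hypothetical:
 *
 *		CREATE INDEX ON my_table USING gist (geom)
 *			WITH (fillfactor = 90, buffering = auto);
 */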

/*
 *	gistproperty() -- Check boolean properties of indexes.
 *
 * This is optional for most AMs, but is required for GiST because the core
 * property code doesn't support AMPROP_DISTANCE_ORDERABLE.  We also handle
 * AMPROP_RETURNABLE here to save opening the rel to call gistcanreturn.
 */
bool
gistproperty(Oid index_oid, int attno,
			 IndexAMProperty prop, const char *propname,
			 bool *res, bool *isnull)
{
	Oid			opclass,
				opfamily,
				opcintype;
	int16		procno;

	/* Only answer column-level inquiries */
	if (attno == 0)
		return false;

	/*
	 * Currently, GiST distance-ordered scans require that there be a distance
	 * function in the opclass with the default types (i.e. the one loaded
	 * into the relcache entry, see initGISTstate).  So we assume that if such
	 * a function exists, then there's a reason for it (rather than grubbing
	 * through all the opfamily's operators to find an ordered one).
	 *
	 * Essentially the same code can test whether we support returning the
	 * column data, since that's true if the opclass provides a fetch proc.
	 */

	switch (prop)
	{
		case AMPROP_DISTANCE_ORDERABLE:
			procno = GIST_DISTANCE_PROC;
			break;
		case AMPROP_RETURNABLE:
			procno = GIST_FETCH_PROC;
			break;
		default:
			return false;
	}

	/* First we need to know the column's opclass. */
	opclass = get_index_column_opclass(index_oid, attno);
	if (!OidIsValid(opclass))
	{
		*isnull = true;
		return true;
	}

	/* Now look up the opclass family and input datatype. */
	if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
	{
		*isnull = true;
		return true;
	}

	/* And now we can check whether the function is provided. */

	*res = SearchSysCacheExists4(AMPROCNUM,
								 ObjectIdGetDatum(opfamily),
								 ObjectIdGetDatum(opcintype),
								 ObjectIdGetDatum(opcintype),
								 Int16GetDatum(procno));

	/*
	 * Special case: even without a fetch function, AMPROP_RETURNABLE is true
	 * if the opclass has no compress function.
	 */
	if (prop == AMPROP_RETURNABLE && !*res)
	{
		*res = !SearchSysCacheExists4(AMPROCNUM,
									  ObjectIdGetDatum(opfamily),
									  ObjectIdGetDatum(opcintype),
									  ObjectIdGetDatum(opcintype),
									  Int16GetDatum(GIST_COMPRESS_PROC));
	}

	*isnull = false;

	return true;
}

/*
 * Temporary and unlogged GiST indexes are not WAL-logged, but we need LSNs
 * to detect concurrent page splits anyway. This function provides a fake
 * sequence of LSNs for that purpose.
 */
XLogRecPtr
gistGetFakeLSN(Relation rel)
{
	static XLogRecPtr counter = FirstNormalUnloggedLSN;

	if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
	{
		/*
		 * Temporary relations are only accessible in our session, so a simple
		 * backend-local counter will do.
		 */
		return counter++;
	}
	else
	{
		/*
		 * Unlogged relations are accessible from other backends, and survive
		 * (clean) restarts. GetFakeLSNForUnloggedRel() handles that for us.
		 */
		Assert(rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED);
		return GetFakeLSNForUnloggedRel();
	}
}