1 /*-------------------------------------------------------------------------
2  *
3  * gistutil.c
4  *	  utilities routines for the postgres GiST index access method.
5  *
6  *
7  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *			src/backend/access/gist/gistutil.c
12  *-------------------------------------------------------------------------
13  */
14 #include "postgres.h"
15 
16 #include <float.h>
17 #include <math.h>
18 
19 #include "access/gist_private.h"
20 #include "access/htup_details.h"
21 #include "access/reloptions.h"
22 #include "catalog/pg_opclass.h"
23 #include "storage/indexfsm.h"
24 #include "storage/lmgr.h"
25 #include "utils/builtins.h"
26 #include "utils/syscache.h"
27 
28 
29 /*
30  * Write itup vector to page, has no control of free space.
31  */
32 void
gistfillbuffer(Page page,IndexTuple * itup,int len,OffsetNumber off)33 gistfillbuffer(Page page, IndexTuple *itup, int len, OffsetNumber off)
34 {
35 	OffsetNumber l = InvalidOffsetNumber;
36 	int			i;
37 
38 	if (off == InvalidOffsetNumber)
39 		off = (PageIsEmpty(page)) ? FirstOffsetNumber :
40 			OffsetNumberNext(PageGetMaxOffsetNumber(page));
41 
42 	for (i = 0; i < len; i++)
43 	{
44 		Size		sz = IndexTupleSize(itup[i]);
45 
46 		l = PageAddItem(page, (Item) itup[i], sz, off, false, false);
47 		if (l == InvalidOffsetNumber)
48 			elog(ERROR, "failed to add item to GiST index page, item %d out of %d, size %d bytes",
49 				 i, len, (int) sz);
50 		off++;
51 	}
52 }
53 
54 /*
55  * Check space for itup vector on page
56  */
57 bool
gistnospace(Page page,IndexTuple * itvec,int len,OffsetNumber todelete,Size freespace)58 gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace)
59 {
60 	unsigned int size = freespace,
61 				deleted = 0;
62 	int			i;
63 
64 	for (i = 0; i < len; i++)
65 		size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
66 
67 	if (todelete != InvalidOffsetNumber)
68 	{
69 		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete));
70 
71 		deleted = IndexTupleSize(itup) + sizeof(ItemIdData);
72 	}
73 
74 	return (PageGetFreeSpace(page) + deleted < size);
75 }
76 
77 bool
gistfitpage(IndexTuple * itvec,int len)78 gistfitpage(IndexTuple *itvec, int len)
79 {
80 	int			i;
81 	Size		size = 0;
82 
83 	for (i = 0; i < len; i++)
84 		size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
85 
86 	/* TODO: Consider fillfactor */
87 	return (size <= GiSTPageSize);
88 }
89 
90 /*
91  * Read buffer into itup vector
92  */
93 IndexTuple *
gistextractpage(Page page,int * len)94 gistextractpage(Page page, int *len /* out */ )
95 {
96 	OffsetNumber i,
97 				maxoff;
98 	IndexTuple *itvec;
99 
100 	maxoff = PageGetMaxOffsetNumber(page);
101 	*len = maxoff;
102 	itvec = palloc(sizeof(IndexTuple) * maxoff);
103 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
104 		itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
105 
106 	return itvec;
107 }
108 
109 /*
110  * join two vectors into one
111  */
112 IndexTuple *
gistjoinvector(IndexTuple * itvec,int * len,IndexTuple * additvec,int addlen)113 gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
114 {
115 	itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
116 	memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
117 	*len += addlen;
118 	return itvec;
119 }
120 
121 /*
122  * make plain IndexTupleVector
123  */
124 
125 IndexTupleData *
gistfillitupvec(IndexTuple * vec,int veclen,int * memlen)126 gistfillitupvec(IndexTuple *vec, int veclen, int *memlen)
127 {
128 	char	   *ptr,
129 			   *ret;
130 	int			i;
131 
132 	*memlen = 0;
133 
134 	for (i = 0; i < veclen; i++)
135 		*memlen += IndexTupleSize(vec[i]);
136 
137 	ptr = ret = palloc(*memlen);
138 
139 	for (i = 0; i < veclen; i++)
140 	{
141 		memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
142 		ptr += IndexTupleSize(vec[i]);
143 	}
144 
145 	return (IndexTupleData *) ret;
146 }
147 
/*
 * Make unions of keys in IndexTuple vector (one union datum per index column).
 * Union Datums are returned into the attr/isnull arrays.
 * Resulting Datums aren't compressed.
 */
void
gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len,
				   Datum *attr, bool *isnull)
{
	int			i;
	GistEntryVector *evec;
	int			attrsize;

	/*
	 * Workspace reused for every column: room for all "len" entries plus two
	 * extra slots (the single-entry duplication below needs a second slot).
	 */
	evec = (GistEntryVector *) palloc((len + 2) * sizeof(GISTENTRY) + GEVHDRSZ);

	for (i = 0; i < giststate->tupdesc->natts; i++)
	{
		int			j;

		/* Collect non-null datums for this column */
		evec->n = 0;
		for (j = 0; j < len; j++)
		{
			Datum		datum;
			bool		IsNull;

			datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
			if (IsNull)
				continue;

			/* Decompress the stored key into the next workspace slot */
			gistdentryinit(giststate, i,
						   evec->vector + evec->n,
						   datum,
						   NULL, NULL, (OffsetNumber) 0,
						   FALSE, IsNull);
			evec->n++;
		}

		/* If this column was all NULLs, the union is NULL */
		if (evec->n == 0)
		{
			attr[i] = (Datum) 0;
			isnull[i] = TRUE;
		}
		else
		{
			if (evec->n == 1)
			{
				/* unionFn may expect at least two inputs */
				evec->n = 2;
				evec->vector[1] = evec->vector[0];
			}

			/* Make union and store in attr array */
			attr[i] = FunctionCall2Coll(&giststate->unionFn[i],
										giststate->supportCollation[i],
										PointerGetDatum(evec),
										PointerGetDatum(&attrsize));

			isnull[i] = FALSE;
		}
	}
}
211 
212 /*
213  * Return an IndexTuple containing the result of applying the "union"
214  * method to the specified IndexTuple vector.
215  */
216 IndexTuple
gistunion(Relation r,IndexTuple * itvec,int len,GISTSTATE * giststate)217 gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
218 {
219 	Datum		attr[INDEX_MAX_KEYS];
220 	bool		isnull[INDEX_MAX_KEYS];
221 
222 	gistMakeUnionItVec(giststate, itvec, len, attr, isnull);
223 
224 	return gistFormTuple(giststate, r, attr, isnull, false);
225 }
226 
227 /*
228  * makes union of two key
229  */
230 void
gistMakeUnionKey(GISTSTATE * giststate,int attno,GISTENTRY * entry1,bool isnull1,GISTENTRY * entry2,bool isnull2,Datum * dst,bool * dstisnull)231 gistMakeUnionKey(GISTSTATE *giststate, int attno,
232 				 GISTENTRY *entry1, bool isnull1,
233 				 GISTENTRY *entry2, bool isnull2,
234 				 Datum *dst, bool *dstisnull)
235 {
236 	/* we need a GistEntryVector with room for exactly 2 elements */
237 	union
238 	{
239 		GistEntryVector gev;
240 		char		padding[2 * sizeof(GISTENTRY) + GEVHDRSZ];
241 	}			storage;
242 	GistEntryVector *evec = &storage.gev;
243 	int			dstsize;
244 
245 	evec->n = 2;
246 
247 	if (isnull1 && isnull2)
248 	{
249 		*dstisnull = TRUE;
250 		*dst = (Datum) 0;
251 	}
252 	else
253 	{
254 		if (isnull1 == FALSE && isnull2 == FALSE)
255 		{
256 			evec->vector[0] = *entry1;
257 			evec->vector[1] = *entry2;
258 		}
259 		else if (isnull1 == FALSE)
260 		{
261 			evec->vector[0] = *entry1;
262 			evec->vector[1] = *entry1;
263 		}
264 		else
265 		{
266 			evec->vector[0] = *entry2;
267 			evec->vector[1] = *entry2;
268 		}
269 
270 		*dstisnull = FALSE;
271 		*dst = FunctionCall2Coll(&giststate->unionFn[attno],
272 								 giststate->supportCollation[attno],
273 								 PointerGetDatum(evec),
274 								 PointerGetDatum(&dstsize));
275 	}
276 }
277 
278 bool
gistKeyIsEQ(GISTSTATE * giststate,int attno,Datum a,Datum b)279 gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b)
280 {
281 	bool		result;
282 
283 	FunctionCall3Coll(&giststate->equalFn[attno],
284 					  giststate->supportCollation[attno],
285 					  a, b,
286 					  PointerGetDatum(&result));
287 	return result;
288 }
289 
290 /*
291  * Decompress all keys in tuple
292  */
293 void
gistDeCompressAtt(GISTSTATE * giststate,Relation r,IndexTuple tuple,Page p,OffsetNumber o,GISTENTRY * attdata,bool * isnull)294 gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
295 				  OffsetNumber o, GISTENTRY *attdata, bool *isnull)
296 {
297 	int			i;
298 
299 	for (i = 0; i < r->rd_att->natts; i++)
300 	{
301 		Datum		datum;
302 
303 		datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
304 		gistdentryinit(giststate, i, &attdata[i],
305 					   datum, r, p, o,
306 					   FALSE, isnull[i]);
307 	}
308 }
309 
/*
 * Forms union of oldtup and addtup, if union == oldtup then return NULL
 *
 * On change, the returned tuple carries oldtup's TID; NULL means oldtup's
 * keys already cover addtup and need no update.
 */
IndexTuple
gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
{
	bool		neednew = FALSE;
	GISTENTRY	oldentries[INDEX_MAX_KEYS],
				addentries[INDEX_MAX_KEYS];
	bool		oldisnull[INDEX_MAX_KEYS],
				addisnull[INDEX_MAX_KEYS];
	Datum		attr[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	IndexTuple	newtup = NULL;
	int			i;

	/* Decompress both tuples' keys for the union/compare steps below */
	gistDeCompressAtt(giststate, r, oldtup, NULL,
					  (OffsetNumber) 0, oldentries, oldisnull);

	gistDeCompressAtt(giststate, r, addtup, NULL,
					  (OffsetNumber) 0, addentries, addisnull);

	for (i = 0; i < r->rd_att->natts; i++)
	{
		gistMakeUnionKey(giststate, i,
						 oldentries + i, oldisnull[i],
						 addentries + i, addisnull[i],
						 attr + i, isnull + i);

		if (neednew)
			/* we already need new key, so we can skip check */
			continue;

		if (isnull[i])
			/* union of key may be NULL if and only if both keys are NULL */
			continue;

		/*
		 * A new key is needed when the union differs from oldtup's key: that
		 * is, when addtup's key is non-NULL and either oldtup's key was NULL
		 * or compares unequal to the union.
		 */
		if (!addisnull[i])
		{
			if (oldisnull[i] ||
				!gistKeyIsEQ(giststate, i, oldentries[i].key, attr[i]))
				neednew = true;
		}
	}

	if (neednew)
	{
		/* need to update key */
		newtup = gistFormTuple(giststate, r, attr, isnull, false);
		newtup->t_tid = oldtup->t_tid;
	}

	return newtup;
}
364 
/*
 * Search an upper index page for the entry with lowest penalty for insertion
 * of the new index key contained in "it".
 *
 * Returns the index of the page entry to insert into.
 */
OffsetNumber
gistchoose(Relation r, Page p, IndexTuple it,	/* it has compressed entry */
		   GISTSTATE *giststate)
{
	OffsetNumber result;
	OffsetNumber maxoff;
	OffsetNumber i;
	float		best_penalty[INDEX_MAX_KEYS];
	GISTENTRY	entry,
				identry[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	int			keep_current_best;

	Assert(!GistPageIsLeaf(p));

	/* Decompress the new key's columns once, up front */
	gistDeCompressAtt(giststate, r,
					  it, NULL, (OffsetNumber) 0,
					  identry, isnull);

	/* we'll return FirstOffsetNumber if page is empty (shouldn't happen) */
	result = FirstOffsetNumber;

	/*
	 * The index may have multiple columns, and there's a penalty value for
	 * each column.  The penalty associated with a column that appears earlier
	 * in the index definition is strictly more important than the penalty of
	 * a column that appears later in the index definition.
	 *
	 * best_penalty[j] is the best penalty we have seen so far for column j,
	 * or -1 when we haven't yet examined column j.  Array entries to the
	 * right of the first -1 are undefined.
	 */
	best_penalty[0] = -1;

	/*
	 * If we find a tuple that's exactly as good as the currently best one, we
	 * could use either one.  When inserting a lot of tuples with the same or
	 * similar keys, it's preferable to descend down the same path when
	 * possible, as that's more cache-friendly.  On the other hand, if all
	 * inserts land on the same leaf page after a split, we're never going to
	 * insert anything to the other half of the split, and will end up using
	 * only 50% of the available space.  Distributing the inserts evenly would
	 * lead to better space usage, but that hurts cache-locality during
	 * insertion.  To get the best of both worlds, when we find a tuple that's
	 * exactly as good as the previous best, choose randomly whether to stick
	 * to the old best, or use the new one.  Once we decide to stick to the
	 * old best, we keep sticking to it for any subsequent equally good tuples
	 * we might find.  This favors tuples with low offsets, but still allows
	 * some inserts to go to other equally-good subtrees.
	 *
	 * keep_current_best is -1 if we haven't yet had to make a random choice
	 * whether to keep the current best tuple.  If we have done so, and
	 * decided to keep it, keep_current_best is 1; if we've decided to
	 * replace, keep_current_best is 0.  (This state will be reset to -1 as
	 * soon as we've made the replacement, but sometimes we make the choice in
	 * advance of actually finding a replacement best tuple.)
	 */
	keep_current_best = -1;

	/*
	 * Loop over tuples on page.
	 */
	maxoff = PageGetMaxOffsetNumber(p);
	Assert(maxoff >= FirstOffsetNumber);

	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
	{
		IndexTuple	itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
		bool		zero_penalty;
		int			j;			/* also consulted after the inner loop ends */

		zero_penalty = true;

		/* Loop over index attributes. */
		for (j = 0; j < r->rd_att->natts; j++)
		{
			Datum		datum;
			float		usize;
			bool		IsNull;

			/* Compute penalty for this column. */
			datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
			gistdentryinit(giststate, j, &entry, datum, r, p, i,
						   FALSE, IsNull);
			usize = gistpenalty(giststate, j, &entry, IsNull,
								&identry[j], isnull[j]);
			if (usize > 0)
				zero_penalty = false;

			if (best_penalty[j] < 0 || usize < best_penalty[j])
			{
				/*
				 * New best penalty for column.  Tentatively select this tuple
				 * as the target, and record the best penalty.  Then reset the
				 * next column's penalty to "unknown" (and indirectly, the
				 * same for all the ones to its right).  This will force us to
				 * adopt this tuple's penalty values as the best for all the
				 * remaining columns during subsequent loop iterations.
				 */
				result = i;
				best_penalty[j] = usize;

				if (j < r->rd_att->natts - 1)
					best_penalty[j + 1] = -1;

				/* we have new best, so reset keep-it decision */
				keep_current_best = -1;
			}
			else if (best_penalty[j] == usize)
			{
				/*
				 * The current tuple is exactly as good for this column as the
				 * best tuple seen so far.  The next iteration of this loop
				 * will compare the next column.
				 */
			}
			else
			{
				/*
				 * The current tuple is worse for this column than the best
				 * tuple seen so far.  Skip the remaining columns and move on
				 * to the next tuple, if any.
				 */
				zero_penalty = false;	/* so outer loop won't exit */
				break;
			}
		}

		/*
		 * If we looped past the last column, and did not update "result",
		 * then this tuple is exactly as good as the prior best tuple.
		 */
		if (j == r->rd_att->natts && result != i)
		{
			if (keep_current_best == -1)
			{
				/* we didn't make the random choice yet for this old best */
				keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
			}
			if (keep_current_best == 0)
			{
				/* we choose to use the new tuple */
				result = i;
				/* choose again if there are even more exactly-as-good ones */
				keep_current_best = -1;
			}
		}

		/*
		 * If we find a tuple with zero penalty for all columns, and we've
		 * decided we don't want to search for another tuple with equal
		 * penalty, there's no need to examine remaining tuples; just break
		 * out of the loop and return it.
		 */
		if (zero_penalty)
		{
			if (keep_current_best == -1)
			{
				/* we didn't make the random choice yet for this old best */
				keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
			}
			if (keep_current_best == 1)
				break;
		}
	}

	return result;
}
539 
/*
 * initialize a GiST entry with a decompressed version of key
 *
 * nkey is the 0-based column number; k is the stored key datum; r/pg/o
 * identify where the key came from; l is the "leafkey" flag.  For a NULL
 * key the entry is initialized with a zero Datum and the decompress
 * function is not called.
 */
void
gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
			   Datum k, Relation r, Page pg, OffsetNumber o,
			   bool l, bool isNull)
{
	if (!isNull)
	{
		GISTENTRY  *dep;

		gistentryinit(*e, k, r, pg, o, l);
		dep = (GISTENTRY *)
			DatumGetPointer(FunctionCall1Coll(&giststate->decompressFn[nkey],
											  giststate->supportCollation[nkey],
											  PointerGetDatum(e)));
		/* decompressFn may just return the given pointer */
		if (dep != e)
			gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
						  dep->leafkey);
	}
	else
		gistentryinit(*e, (Datum) 0, r, pg, o, l);
}
565 
566 IndexTuple
gistFormTuple(GISTSTATE * giststate,Relation r,Datum attdata[],bool isnull[],bool isleaf)567 gistFormTuple(GISTSTATE *giststate, Relation r,
568 			  Datum attdata[], bool isnull[], bool isleaf)
569 {
570 	Datum		compatt[INDEX_MAX_KEYS];
571 	int			i;
572 	IndexTuple	res;
573 
574 	/*
575 	 * Call the compress method on each attribute.
576 	 */
577 	for (i = 0; i < r->rd_att->natts; i++)
578 	{
579 		if (isnull[i])
580 			compatt[i] = (Datum) 0;
581 		else
582 		{
583 			GISTENTRY	centry;
584 			GISTENTRY  *cep;
585 
586 			gistentryinit(centry, attdata[i], r, NULL, (OffsetNumber) 0,
587 						  isleaf);
588 			cep = (GISTENTRY *)
589 				DatumGetPointer(FunctionCall1Coll(&giststate->compressFn[i],
590 												  giststate->supportCollation[i],
591 												  PointerGetDatum(&centry)));
592 			compatt[i] = cep->key;
593 		}
594 	}
595 
596 	res = index_form_tuple(giststate->tupdesc, compatt, isnull);
597 
598 	/*
599 	 * The offset number on tuples on internal pages is unused. For historical
600 	 * reasons, it is set to 0xffff.
601 	 */
602 	ItemPointerSetOffsetNumber(&(res->t_tid), 0xffff);
603 	return res;
604 }
605 
606 /*
607  * initialize a GiST entry with fetched value in key field
608  */
609 static Datum
gistFetchAtt(GISTSTATE * giststate,int nkey,Datum k,Relation r)610 gistFetchAtt(GISTSTATE *giststate, int nkey, Datum k, Relation r)
611 {
612 	GISTENTRY	fentry;
613 	GISTENTRY  *fep;
614 
615 	gistentryinit(fentry, k, r, NULL, (OffsetNumber) 0, false);
616 
617 	fep = (GISTENTRY *)
618 		DatumGetPointer(FunctionCall1Coll(&giststate->fetchFn[nkey],
619 										  giststate->supportCollation[nkey],
620 										  PointerGetDatum(&fentry)));
621 
622 	/* fetchFn set 'key', return it to the caller */
623 	return fep->key;
624 }
625 
/*
 * Fetch all keys in tuple.
 * Returns a new HeapTuple containing the originally-indexed data.
 *
 * The per-column fetch work runs in giststate->tempCxt; the result tuple is
 * formed in the caller's context after switching back.
 */
HeapTuple
gistFetchTuple(GISTSTATE *giststate, Relation r, IndexTuple tuple)
{
	MemoryContext oldcxt = MemoryContextSwitchTo(giststate->tempCxt);
	Datum		fetchatt[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	int			i;

	for (i = 0; i < r->rd_att->natts; i++)
	{
		Datum		datum;

		datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);

		/* A valid fetchFn OID means the opclass can reconstruct the value */
		if (giststate->fetchFn[i].fn_oid != InvalidOid)
		{
			if (!isnull[i])
				fetchatt[i] = gistFetchAtt(giststate, i, datum, r);
			else
				fetchatt[i] = (Datum) 0;
		}
		else
		{
			/*
			 * Index-only scans not supported for this column. Since the
			 * planner chose an index-only scan anyway, it is not interested
			 * in this column, and we can replace it with a NULL.
			 */
			isnull[i] = true;
			fetchatt[i] = (Datum) 0;
		}
	}
	MemoryContextSwitchTo(oldcxt);

	return heap_form_tuple(giststate->fetchTupdesc, fetchatt, isnull);
}
666 
667 float
gistpenalty(GISTSTATE * giststate,int attno,GISTENTRY * orig,bool isNullOrig,GISTENTRY * add,bool isNullAdd)668 gistpenalty(GISTSTATE *giststate, int attno,
669 			GISTENTRY *orig, bool isNullOrig,
670 			GISTENTRY *add, bool isNullAdd)
671 {
672 	float		penalty = 0.0;
673 
674 	if (giststate->penaltyFn[attno].fn_strict == FALSE ||
675 		(isNullOrig == FALSE && isNullAdd == FALSE))
676 	{
677 		FunctionCall3Coll(&giststate->penaltyFn[attno],
678 						  giststate->supportCollation[attno],
679 						  PointerGetDatum(orig),
680 						  PointerGetDatum(add),
681 						  PointerGetDatum(&penalty));
682 		/* disallow negative or NaN penalty */
683 		if (isnan(penalty) || penalty < 0.0)
684 			penalty = 0.0;
685 	}
686 	else if (isNullOrig && isNullAdd)
687 		penalty = 0.0;
688 	else
689 	{
690 		/* try to prevent mixing null and non-null values */
691 		penalty = get_float4_infinity();
692 	}
693 
694 	return penalty;
695 }
696 
697 /*
698  * Initialize a new index page
699  */
700 void
GISTInitBuffer(Buffer b,uint32 f)701 GISTInitBuffer(Buffer b, uint32 f)
702 {
703 	GISTPageOpaque opaque;
704 	Page		page;
705 	Size		pageSize;
706 
707 	pageSize = BufferGetPageSize(b);
708 	page = BufferGetPage(b);
709 	PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
710 
711 	opaque = GistPageGetOpaque(page);
712 	/* page was already zeroed by PageInit, so this is not needed: */
713 	/* memset(&(opaque->nsn), 0, sizeof(GistNSN)); */
714 	opaque->rightlink = InvalidBlockNumber;
715 	opaque->flags = f;
716 	opaque->gist_page_id = GIST_PAGE_ID;
717 }
718 
/*
 * Verify that a freshly-read page looks sane.
 *
 * Raises an ERRCODE_INDEX_CORRUPTED error (suggesting REINDEX) if the page
 * is all-zero or its special area has the wrong size.
 */
void
gistcheckpage(Relation rel, Buffer buf)
{
	Page		page = BufferGetPage(buf);

	/*
	 * ReadBuffer verifies that every newly-read page passes
	 * PageHeaderIsValid, which means it either contains a reasonably sane
	 * page header or is all-zero.  We have to defend against the all-zero
	 * case, however.
	 */
	if (PageIsNew(page))
		ereport(ERROR,
				(errcode(ERRCODE_INDEX_CORRUPTED),
				 errmsg("index \"%s\" contains unexpected zero page at block %u",
						RelationGetRelationName(rel),
						BufferGetBlockNumber(buf)),
				 errhint("Please REINDEX it.")));

	/*
	 * Additionally check that the special area looks sane.
	 */
	if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GISTPageOpaqueData)))
		ereport(ERROR,
				(errcode(ERRCODE_INDEX_CORRUPTED),
				 errmsg("index \"%s\" contains corrupted page at block %u",
						RelationGetRelationName(rel),
						BufferGetBlockNumber(buf)),
				 errhint("Please REINDEX it.")));
}
752 
753 
/*
 * Allocate a new page (either by recycling, or by extending the index file)
 *
 * The returned buffer is already pinned and exclusive-locked
 *
 * Caller is responsible for initializing the page by calling GISTInitBuffer
 */
Buffer
gistNewBuffer(Relation r)
{
	Buffer		buffer;
	bool		needLock;

	/* First, try to get a page from FSM */
	for (;;)
	{
		BlockNumber blkno = GetFreeIndexPage(r);

		if (blkno == InvalidBlockNumber)
			break;				/* nothing left in FSM */

		buffer = ReadBuffer(r, blkno);

		/*
		 * We have to guard against the possibility that someone else already
		 * recycled this page; the buffer may be locked if so.
		 */
		if (ConditionalLockBuffer(buffer))
		{
			Page		page = BufferGetPage(buffer);

			if (PageIsNew(page))
				return buffer;	/* OK to use, if never initialized */

			/* sanity-check the page before trusting its deleted flag */
			gistcheckpage(r, buffer);

			if (GistPageIsDeleted(page))
				return buffer;	/* OK to use */

			LockBuffer(buffer, GIST_UNLOCK);
		}

		/* Can't use it, so release buffer and try again */
		ReleaseBuffer(buffer);
	}

	/* Must extend the file */
	needLock = !RELATION_IS_LOCAL(r);

	/* serialize concurrent extensions of a shared relation */
	if (needLock)
		LockRelationForExtension(r, ExclusiveLock);

	buffer = ReadBuffer(r, P_NEW);
	LockBuffer(buffer, GIST_EXCLUSIVE);

	if (needLock)
		UnlockRelationForExtension(r, ExclusiveLock);

	return buffer;
}
814 
815 bytea *
gistoptions(Datum reloptions,bool validate)816 gistoptions(Datum reloptions, bool validate)
817 {
818 	relopt_value *options;
819 	GiSTOptions *rdopts;
820 	int			numoptions;
821 	static const relopt_parse_elt tab[] = {
822 		{"fillfactor", RELOPT_TYPE_INT, offsetof(GiSTOptions, fillfactor)},
823 		{"buffering", RELOPT_TYPE_STRING, offsetof(GiSTOptions, bufferingModeOffset)}
824 	};
825 
826 	options = parseRelOptions(reloptions, validate, RELOPT_KIND_GIST,
827 							  &numoptions);
828 
829 	/* if none set, we're done */
830 	if (numoptions == 0)
831 		return NULL;
832 
833 	rdopts = allocateReloptStruct(sizeof(GiSTOptions), options, numoptions);
834 
835 	fillRelOptions((void *) rdopts, sizeof(GiSTOptions), options, numoptions,
836 				   validate, tab, lengthof(tab));
837 
838 	pfree(options);
839 
840 	return (bytea *) rdopts;
841 }
842 
/*
 *	gistproperty() -- Check boolean properties of indexes.
 *
 * This is optional for most AMs, but is required for GiST because the core
 * property code doesn't support AMPROP_DISTANCE_ORDERABLE.  We also handle
 * AMPROP_RETURNABLE here to save opening the rel to call gistcanreturn.
 *
 * Returns true if it handled the property (storing the answer in *res, or
 * setting *isnull on syscache-lookup failure); false means "not my
 * property" and the core code should deal with it.
 */
bool
gistproperty(Oid index_oid, int attno,
			 IndexAMProperty prop, const char *propname,
			 bool *res, bool *isnull)
{
	HeapTuple	tuple;
	Form_pg_index rd_index PG_USED_FOR_ASSERTS_ONLY;
	Form_pg_opclass rd_opclass;
	Datum		datum;
	bool		disnull;
	oidvector  *indclass;
	Oid			opclass,
				opfamily,
				opcintype;
	int16		procno;

	/* Only answer column-level inquiries */
	if (attno == 0)
		return false;

	/*
	 * Currently, GiST distance-ordered scans require that there be a distance
	 * function in the opclass with the default types (i.e. the one loaded
	 * into the relcache entry, see initGISTstate).  So we assume that if such
	 * a function exists, then there's a reason for it (rather than grubbing
	 * through all the opfamily's operators to find an ordered one).
	 *
	 * Essentially the same code can test whether we support returning the
	 * column data, since that's true if the opclass provides a fetch proc.
	 */

	switch (prop)
	{
		case AMPROP_DISTANCE_ORDERABLE:
			procno = GIST_DISTANCE_PROC;
			break;
		case AMPROP_RETURNABLE:
			procno = GIST_FETCH_PROC;
			break;
		default:
			/* not a property we handle; let the core code answer */
			return false;
	}

	/* First we need to know the column's opclass. */

	tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(index_oid));
	if (!HeapTupleIsValid(tuple))
	{
		/* index not found: answer is "unknown", but the property was ours */
		*isnull = true;
		return true;
	}
	rd_index = (Form_pg_index) GETSTRUCT(tuple);

	/* caller is supposed to guarantee this */
	Assert(attno > 0 && attno <= rd_index->indnatts);

	datum = SysCacheGetAttr(INDEXRELID, tuple,
							Anum_pg_index_indclass, &disnull);
	Assert(!disnull);

	indclass = ((oidvector *) DatumGetPointer(datum));
	opclass = indclass->values[attno - 1];

	ReleaseSysCache(tuple);

	/* Now look up the opclass family and input datatype. */

	tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
	if (!HeapTupleIsValid(tuple))
	{
		*isnull = true;
		return true;
	}
	rd_opclass = (Form_pg_opclass) GETSTRUCT(tuple);

	opfamily = rd_opclass->opcfamily;
	opcintype = rd_opclass->opcintype;

	ReleaseSysCache(tuple);

	/* And now we can check whether the function is provided. */

	*res = SearchSysCacheExists4(AMPROCNUM,
								 ObjectIdGetDatum(opfamily),
								 ObjectIdGetDatum(opcintype),
								 ObjectIdGetDatum(opcintype),
								 Int16GetDatum(procno));
	return true;
}
939 
940 /*
941  * Temporary and unlogged GiST indexes are not WAL-logged, but we need LSNs
942  * to detect concurrent page splits anyway. This function provides a fake
943  * sequence of LSNs for that purpose.
944  */
945 XLogRecPtr
gistGetFakeLSN(Relation rel)946 gistGetFakeLSN(Relation rel)
947 {
948 	static XLogRecPtr counter = 1;
949 
950 	if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
951 	{
952 		/*
953 		 * Temporary relations are only accessible in our session, so a simple
954 		 * backend-local counter will do.
955 		 */
956 		return counter++;
957 	}
958 	else
959 	{
960 		/*
961 		 * Unlogged relations are accessible from other backends, and survive
962 		 * (clean) restarts. GetFakeLSNForUnloggedRel() handles that for us.
963 		 */
964 		Assert(rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED);
965 		return GetFakeLSNForUnloggedRel();
966 	}
967 }
968