1 /*-------------------------------------------------------------------------
2  *
3  * gistutil.c
4  *	  utilities routines for the postgres GiST index access method.
5  *
6  *
7  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *			src/backend/access/gist/gistutil.c
12  *-------------------------------------------------------------------------
13  */
14 #include "postgres.h"
15 
16 #include <math.h>
17 
18 #include "access/gist_private.h"
19 #include "access/htup_details.h"
20 #include "access/reloptions.h"
21 #include "catalog/pg_opclass.h"
22 #include "storage/indexfsm.h"
23 #include "storage/lmgr.h"
24 #include "utils/builtins.h"
25 #include "utils/syscache.h"
26 
27 
28 /*
29  * Write itup vector to page, has no control of free space.
30  */
31 void
gistfillbuffer(Page page,IndexTuple * itup,int len,OffsetNumber off)32 gistfillbuffer(Page page, IndexTuple *itup, int len, OffsetNumber off)
33 {
34 	OffsetNumber l = InvalidOffsetNumber;
35 	int			i;
36 
37 	if (off == InvalidOffsetNumber)
38 		off = (PageIsEmpty(page)) ? FirstOffsetNumber :
39 			OffsetNumberNext(PageGetMaxOffsetNumber(page));
40 
41 	for (i = 0; i < len; i++)
42 	{
43 		Size		sz = IndexTupleSize(itup[i]);
44 
45 		l = PageAddItem(page, (Item) itup[i], sz, off, false, false);
46 		if (l == InvalidOffsetNumber)
47 			elog(ERROR, "failed to add item to GiST index page, item %d out of %d, size %d bytes",
48 				 i, len, (int) sz);
49 		off++;
50 	}
51 }
52 
53 /*
54  * Check space for itup vector on page
55  */
56 bool
gistnospace(Page page,IndexTuple * itvec,int len,OffsetNumber todelete,Size freespace)57 gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace)
58 {
59 	unsigned int size = freespace,
60 				deleted = 0;
61 	int			i;
62 
63 	for (i = 0; i < len; i++)
64 		size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
65 
66 	if (todelete != InvalidOffsetNumber)
67 	{
68 		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete));
69 
70 		deleted = IndexTupleSize(itup) + sizeof(ItemIdData);
71 	}
72 
73 	return (PageGetFreeSpace(page) + deleted < size);
74 }
75 
76 bool
gistfitpage(IndexTuple * itvec,int len)77 gistfitpage(IndexTuple *itvec, int len)
78 {
79 	int			i;
80 	Size		size = 0;
81 
82 	for (i = 0; i < len; i++)
83 		size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
84 
85 	/* TODO: Consider fillfactor */
86 	return (size <= GiSTPageSize);
87 }
88 
89 /*
90  * Read buffer into itup vector
91  */
92 IndexTuple *
gistextractpage(Page page,int * len)93 gistextractpage(Page page, int *len /* out */ )
94 {
95 	OffsetNumber i,
96 				maxoff;
97 	IndexTuple *itvec;
98 
99 	maxoff = PageGetMaxOffsetNumber(page);
100 	*len = maxoff;
101 	itvec = palloc(sizeof(IndexTuple) * maxoff);
102 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
103 		itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
104 
105 	return itvec;
106 }
107 
108 /*
109  * join two vectors into one
110  */
111 IndexTuple *
gistjoinvector(IndexTuple * itvec,int * len,IndexTuple * additvec,int addlen)112 gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
113 {
114 	itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
115 	memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
116 	*len += addlen;
117 	return itvec;
118 }
119 
120 /*
121  * make plain IndexTupleVector
122  */
123 
124 IndexTupleData *
gistfillitupvec(IndexTuple * vec,int veclen,int * memlen)125 gistfillitupvec(IndexTuple *vec, int veclen, int *memlen)
126 {
127 	char	   *ptr,
128 			   *ret;
129 	int			i;
130 
131 	*memlen = 0;
132 
133 	for (i = 0; i < veclen; i++)
134 		*memlen += IndexTupleSize(vec[i]);
135 
136 	ptr = ret = palloc(*memlen);
137 
138 	for (i = 0; i < veclen; i++)
139 	{
140 		memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
141 		ptr += IndexTupleSize(vec[i]);
142 	}
143 
144 	return (IndexTupleData *) ret;
145 }
146 
/*
 * Make unions of keys in IndexTuple vector (one union datum per index column).
 * Union Datums are returned into the attr/isnull arrays.
 * Resulting Datums aren't compressed.
 */
void
gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len,
				   Datum *attr, bool *isnull)
{
	int			i;
	GistEntryVector *evec;
	int			attrsize;		/* out-param for unionFn; value not used here */

	/*
	 * Workspace for up to "len" decompressed entries; the +2 leaves room for
	 * the single-input duplication trick below.
	 */
	evec = (GistEntryVector *) palloc((len + 2) * sizeof(GISTENTRY) + GEVHDRSZ);

	/* Compute one union datum per index column */
	for (i = 0; i < giststate->tupdesc->natts; i++)
	{
		int			j;

		/* Collect non-null datums for this column */
		evec->n = 0;
		for (j = 0; j < len; j++)
		{
			Datum		datum;
			bool		IsNull;

			datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
			if (IsNull)
				continue;

			/* decompress the stored key into the next workspace slot */
			gistdentryinit(giststate, i,
						   evec->vector + evec->n,
						   datum,
						   NULL, NULL, (OffsetNumber) 0,
						   FALSE, IsNull);
			evec->n++;
		}

		/* If this column was all NULLs, the union is NULL */
		if (evec->n == 0)
		{
			attr[i] = (Datum) 0;
			isnull[i] = TRUE;
		}
		else
		{
			if (evec->n == 1)
			{
				/* unionFn may expect at least two inputs */
				evec->n = 2;
				evec->vector[1] = evec->vector[0];
			}

			/* Make union and store in attr array */
			attr[i] = FunctionCall2Coll(&giststate->unionFn[i],
										giststate->supportCollation[i],
										PointerGetDatum(evec),
										PointerGetDatum(&attrsize));

			isnull[i] = FALSE;
		}
	}
}
210 
211 /*
212  * Return an IndexTuple containing the result of applying the "union"
213  * method to the specified IndexTuple vector.
214  */
215 IndexTuple
gistunion(Relation r,IndexTuple * itvec,int len,GISTSTATE * giststate)216 gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
217 {
218 	Datum		attr[INDEX_MAX_KEYS];
219 	bool		isnull[INDEX_MAX_KEYS];
220 
221 	gistMakeUnionItVec(giststate, itvec, len, attr, isnull);
222 
223 	return gistFormTuple(giststate, r, attr, isnull, false);
224 }
225 
/*
 * makes union of two key
 *
 * Result is delivered through *dst / *dstisnull.  A NULL input is ignored;
 * the union is NULL only when both inputs are NULL.
 */
void
gistMakeUnionKey(GISTSTATE *giststate, int attno,
				 GISTENTRY *entry1, bool isnull1,
				 GISTENTRY *entry2, bool isnull2,
				 Datum *dst, bool *dstisnull)
{
	/* we need a GistEntryVector with room for exactly 2 elements */
	union
	{
		GistEntryVector gev;
		char		padding[2 * sizeof(GISTENTRY) + GEVHDRSZ];
	}			storage;
	GistEntryVector *evec = &storage.gev;
	int			dstsize;		/* out-param for unionFn; value not used */

	evec->n = 2;

	if (isnull1 && isnull2)
	{
		*dstisnull = TRUE;
		*dst = (Datum) 0;
	}
	else
	{
		if (isnull1 == FALSE && isnull2 == FALSE)
		{
			evec->vector[0] = *entry1;
			evec->vector[1] = *entry2;
		}
		else if (isnull1 == FALSE)
		{
			/* only entry1 is non-null; duplicate it since unionFn wants two */
			evec->vector[0] = *entry1;
			evec->vector[1] = *entry1;
		}
		else
		{
			/* only entry2 is non-null; duplicate it likewise */
			evec->vector[0] = *entry2;
			evec->vector[1] = *entry2;
		}

		*dstisnull = FALSE;
		*dst = FunctionCall2Coll(&giststate->unionFn[attno],
								 giststate->supportCollation[attno],
								 PointerGetDatum(evec),
								 PointerGetDatum(&dstsize));
	}
}
276 
277 bool
gistKeyIsEQ(GISTSTATE * giststate,int attno,Datum a,Datum b)278 gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b)
279 {
280 	bool		result;
281 
282 	FunctionCall3Coll(&giststate->equalFn[attno],
283 					  giststate->supportCollation[attno],
284 					  a, b,
285 					  PointerGetDatum(&result));
286 	return result;
287 }
288 
289 /*
290  * Decompress all keys in tuple
291  */
292 void
gistDeCompressAtt(GISTSTATE * giststate,Relation r,IndexTuple tuple,Page p,OffsetNumber o,GISTENTRY * attdata,bool * isnull)293 gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
294 				  OffsetNumber o, GISTENTRY *attdata, bool *isnull)
295 {
296 	int			i;
297 
298 	for (i = 0; i < r->rd_att->natts; i++)
299 	{
300 		Datum		datum;
301 
302 		datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
303 		gistdentryinit(giststate, i, &attdata[i],
304 					   datum, r, p, o,
305 					   FALSE, isnull[i]);
306 	}
307 }
308 
/*
 * Forms union of oldtup and addtup, if union == oldtup then return NULL
 *
 * Used during insertion descent: a non-NULL result means the parent key
 * must be enlarged to cover addtup; NULL means oldtup already covers it.
 */
IndexTuple
gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
{
	bool		neednew = FALSE;
	GISTENTRY	oldentries[INDEX_MAX_KEYS],
				addentries[INDEX_MAX_KEYS];
	bool		oldisnull[INDEX_MAX_KEYS],
				addisnull[INDEX_MAX_KEYS];
	Datum		attr[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	IndexTuple	newtup = NULL;
	int			i;

	/* Decompress both tuples' keys, one entry per column */
	gistDeCompressAtt(giststate, r, oldtup, NULL,
					  (OffsetNumber) 0, oldentries, oldisnull);

	gistDeCompressAtt(giststate, r, addtup, NULL,
					  (OffsetNumber) 0, addentries, addisnull);

	for (i = 0; i < r->rd_att->natts; i++)
	{
		/* attr[i]/isnull[i] receive the per-column union */
		gistMakeUnionKey(giststate, i,
						 oldentries + i, oldisnull[i],
						 addentries + i, addisnull[i],
						 attr + i, isnull + i);

		if (neednew)
			/* we already need new key, so we can skip check */
			continue;

		if (isnull[i])
			/* union of key may be NULL if and only if both keys are NULL */
			continue;

		/*
		 * A new key is needed if the union differs from the old key; that
		 * can only happen when addtup contributed something non-null.
		 */
		if (!addisnull[i])
		{
			if (oldisnull[i] ||
				!gistKeyIsEQ(giststate, i, oldentries[i].key, attr[i]))
				neednew = true;
		}
	}

	if (neednew)
	{
		/* need to update key */
		newtup = gistFormTuple(giststate, r, attr, isnull, false);
		newtup->t_tid = oldtup->t_tid;
	}

	return newtup;
}
363 
/*
 * Search an upper index page for the entry with lowest penalty for insertion
 * of the new index key contained in "it".
 *
 * Returns the index of the page entry to insert into.
 */
OffsetNumber
gistchoose(Relation r, Page p, IndexTuple it,	/* it has compressed entry */
		   GISTSTATE *giststate)
{
	OffsetNumber result;
	OffsetNumber maxoff;
	OffsetNumber i;
	float		best_penalty[INDEX_MAX_KEYS];
	GISTENTRY	entry,
				identry[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	int			keep_current_best;

	Assert(!GistPageIsLeaf(p));

	/* Decompress the new tuple's keys once, up front */
	gistDeCompressAtt(giststate, r,
					  it, NULL, (OffsetNumber) 0,
					  identry, isnull);

	/* we'll return FirstOffsetNumber if page is empty (shouldn't happen) */
	result = FirstOffsetNumber;

	/*
	 * The index may have multiple columns, and there's a penalty value for
	 * each column.  The penalty associated with a column that appears earlier
	 * in the index definition is strictly more important than the penalty of
	 * a column that appears later in the index definition.
	 *
	 * best_penalty[j] is the best penalty we have seen so far for column j,
	 * or -1 when we haven't yet examined column j.  Array entries to the
	 * right of the first -1 are undefined.
	 */
	best_penalty[0] = -1;

	/*
	 * If we find a tuple that's exactly as good as the currently best one, we
	 * could use either one.  When inserting a lot of tuples with the same or
	 * similar keys, it's preferable to descend down the same path when
	 * possible, as that's more cache-friendly.  On the other hand, if all
	 * inserts land on the same leaf page after a split, we're never going to
	 * insert anything to the other half of the split, and will end up using
	 * only 50% of the available space.  Distributing the inserts evenly would
	 * lead to better space usage, but that hurts cache-locality during
	 * insertion.  To get the best of both worlds, when we find a tuple that's
	 * exactly as good as the previous best, choose randomly whether to stick
	 * to the old best, or use the new one.  Once we decide to stick to the
	 * old best, we keep sticking to it for any subsequent equally good tuples
	 * we might find.  This favors tuples with low offsets, but still allows
	 * some inserts to go to other equally-good subtrees.
	 *
	 * keep_current_best is -1 if we haven't yet had to make a random choice
	 * whether to keep the current best tuple.  If we have done so, and
	 * decided to keep it, keep_current_best is 1; if we've decided to
	 * replace, keep_current_best is 0.  (This state will be reset to -1 as
	 * soon as we've made the replacement, but sometimes we make the choice in
	 * advance of actually finding a replacement best tuple.)
	 */
	keep_current_best = -1;

	/*
	 * Loop over tuples on page.
	 */
	maxoff = PageGetMaxOffsetNumber(p);
	Assert(maxoff >= FirstOffsetNumber);

	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
	{
		IndexTuple	itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
		bool		zero_penalty;
		int			j;

		zero_penalty = true;

		/* Loop over index attributes. */
		for (j = 0; j < r->rd_att->natts; j++)
		{
			Datum		datum;
			float		usize;
			bool		IsNull;

			/* Compute penalty for this column. */
			datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
			gistdentryinit(giststate, j, &entry, datum, r, p, i,
						   FALSE, IsNull);
			usize = gistpenalty(giststate, j, &entry, IsNull,
								&identry[j], isnull[j]);
			if (usize > 0)
				zero_penalty = false;

			if (best_penalty[j] < 0 || usize < best_penalty[j])
			{
				/*
				 * New best penalty for column.  Tentatively select this tuple
				 * as the target, and record the best penalty.  Then reset the
				 * next column's penalty to "unknown" (and indirectly, the
				 * same for all the ones to its right).  This will force us to
				 * adopt this tuple's penalty values as the best for all the
				 * remaining columns during subsequent loop iterations.
				 */
				result = i;
				best_penalty[j] = usize;

				if (j < r->rd_att->natts - 1)
					best_penalty[j + 1] = -1;

				/* we have new best, so reset keep-it decision */
				keep_current_best = -1;
			}
			else if (best_penalty[j] == usize)
			{
				/*
				 * The current tuple is exactly as good for this column as the
				 * best tuple seen so far.  The next iteration of this loop
				 * will compare the next column.
				 */
			}
			else
			{
				/*
				 * The current tuple is worse for this column than the best
				 * tuple seen so far.  Skip the remaining columns and move on
				 * to the next tuple, if any.
				 */
				zero_penalty = false;	/* so outer loop won't exit */
				break;
			}
		}

		/*
		 * If we looped past the last column, and did not update "result",
		 * then this tuple is exactly as good as the prior best tuple.
		 */
		if (j == r->rd_att->natts && result != i)
		{
			if (keep_current_best == -1)
			{
				/* we didn't make the random choice yet for this old best */
				keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
			}
			if (keep_current_best == 0)
			{
				/* we choose to use the new tuple */
				result = i;
				/* choose again if there are even more exactly-as-good ones */
				keep_current_best = -1;
			}
		}

		/*
		 * If we find a tuple with zero penalty for all columns, and we've
		 * decided we don't want to search for another tuple with equal
		 * penalty, there's no need to examine remaining tuples; just break
		 * out of the loop and return it.
		 */
		if (zero_penalty)
		{
			if (keep_current_best == -1)
			{
				/* we didn't make the random choice yet for this old best */
				keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
			}
			if (keep_current_best == 1)
				break;
		}
	}

	return result;
}
538 
/*
 * initialize a GiST entry with a decompressed version of key
 *
 * "e" is filled in place; for a NULL key no decompression is attempted and
 * the entry's key is simply zero.
 */
void
gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
			   Datum k, Relation r, Page pg, OffsetNumber o,
			   bool l, bool isNull)
{
	if (!isNull)
	{
		GISTENTRY  *dep;

		gistentryinit(*e, k, r, pg, o, l);
		/* run the opclass decompress function on the entry */
		dep = (GISTENTRY *)
			DatumGetPointer(FunctionCall1Coll(&giststate->decompressFn[nkey],
										   giststate->supportCollation[nkey],
											  PointerGetDatum(e)));
		/* decompressFn may just return the given pointer */
		if (dep != e)
			gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
						  dep->leafkey);
	}
	else
		gistentryinit(*e, (Datum) 0, r, pg, o, l);
}
564 
565 IndexTuple
gistFormTuple(GISTSTATE * giststate,Relation r,Datum attdata[],bool isnull[],bool isleaf)566 gistFormTuple(GISTSTATE *giststate, Relation r,
567 			  Datum attdata[], bool isnull[], bool isleaf)
568 {
569 	Datum		compatt[INDEX_MAX_KEYS];
570 	int			i;
571 	IndexTuple	res;
572 
573 	/*
574 	 * Call the compress method on each attribute.
575 	 */
576 	for (i = 0; i < r->rd_att->natts; i++)
577 	{
578 		if (isnull[i])
579 			compatt[i] = (Datum) 0;
580 		else
581 		{
582 			GISTENTRY	centry;
583 			GISTENTRY  *cep;
584 
585 			gistentryinit(centry, attdata[i], r, NULL, (OffsetNumber) 0,
586 						  isleaf);
587 			cep = (GISTENTRY *)
588 				DatumGetPointer(FunctionCall1Coll(&giststate->compressFn[i],
589 											  giststate->supportCollation[i],
590 												  PointerGetDatum(&centry)));
591 			compatt[i] = cep->key;
592 		}
593 	}
594 
595 	res = index_form_tuple(giststate->tupdesc, compatt, isnull);
596 
597 	/*
598 	 * The offset number on tuples on internal pages is unused. For historical
599 	 * reasons, it is set to 0xffff.
600 	 */
601 	ItemPointerSetOffsetNumber(&(res->t_tid), 0xffff);
602 	return res;
603 }
604 
605 /*
606  * initialize a GiST entry with fetched value in key field
607  */
608 static Datum
gistFetchAtt(GISTSTATE * giststate,int nkey,Datum k,Relation r)609 gistFetchAtt(GISTSTATE *giststate, int nkey, Datum k, Relation r)
610 {
611 	GISTENTRY	fentry;
612 	GISTENTRY  *fep;
613 
614 	gistentryinit(fentry, k, r, NULL, (OffsetNumber) 0, false);
615 
616 	fep = (GISTENTRY *)
617 		DatumGetPointer(FunctionCall1Coll(&giststate->fetchFn[nkey],
618 										  giststate->supportCollation[nkey],
619 										  PointerGetDatum(&fentry)));
620 
621 	/* fetchFn set 'key', return it to the caller */
622 	return fep->key;
623 }
624 
/*
 * Fetch all keys in tuple.
 * returns new IndexTuple that contains GISTENTRY with fetched data
 *
 * Work is done in giststate->tempCxt; the caller is responsible for
 * resetting that context at a suitable time.
 */
IndexTuple
gistFetchTuple(GISTSTATE *giststate, Relation r, IndexTuple tuple)
{
	/* allocate fetch results in the short-lived temp context */
	MemoryContext oldcxt = MemoryContextSwitchTo(giststate->tempCxt);
	Datum		fetchatt[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	int			i;

	for (i = 0; i < r->rd_att->natts; i++)
	{
		Datum		datum;

		datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);

		if (giststate->fetchFn[i].fn_oid != InvalidOid)
		{
			/* opclass provides a fetch function for this column */
			if (!isnull[i])
				fetchatt[i] = gistFetchAtt(giststate, i, datum, r);
			else
				fetchatt[i] = (Datum) 0;
		}
		else
		{
			/*
			 * Index-only scans not supported for this column. Since the
			 * planner chose an index-only scan anyway, it is not interested
			 * in this column, and we can replace it with a NULL.
			 */
			isnull[i] = true;
			fetchatt[i] = (Datum) 0;
		}
	}
	MemoryContextSwitchTo(oldcxt);

	return index_form_tuple(giststate->fetchTupdesc, fetchatt, isnull);
}
665 
666 float
gistpenalty(GISTSTATE * giststate,int attno,GISTENTRY * orig,bool isNullOrig,GISTENTRY * add,bool isNullAdd)667 gistpenalty(GISTSTATE *giststate, int attno,
668 			GISTENTRY *orig, bool isNullOrig,
669 			GISTENTRY *add, bool isNullAdd)
670 {
671 	float		penalty = 0.0;
672 
673 	if (giststate->penaltyFn[attno].fn_strict == FALSE ||
674 		(isNullOrig == FALSE && isNullAdd == FALSE))
675 	{
676 		FunctionCall3Coll(&giststate->penaltyFn[attno],
677 						  giststate->supportCollation[attno],
678 						  PointerGetDatum(orig),
679 						  PointerGetDatum(add),
680 						  PointerGetDatum(&penalty));
681 		/* disallow negative or NaN penalty */
682 		if (isnan(penalty) || penalty < 0.0)
683 			penalty = 0.0;
684 	}
685 	else if (isNullOrig && isNullAdd)
686 		penalty = 0.0;
687 	else
688 	{
689 		/* try to prevent mixing null and non-null values */
690 		penalty = get_float4_infinity();
691 	}
692 
693 	return penalty;
694 }
695 
696 /*
697  * Initialize a new index page
698  */
699 void
GISTInitBuffer(Buffer b,uint32 f)700 GISTInitBuffer(Buffer b, uint32 f)
701 {
702 	GISTPageOpaque opaque;
703 	Page		page;
704 	Size		pageSize;
705 
706 	pageSize = BufferGetPageSize(b);
707 	page = BufferGetPage(b);
708 	PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
709 
710 	opaque = GistPageGetOpaque(page);
711 	/* page was already zeroed by PageInit, so this is not needed: */
712 	/* memset(&(opaque->nsn), 0, sizeof(GistNSN)); */
713 	opaque->rightlink = InvalidBlockNumber;
714 	opaque->flags = f;
715 	opaque->gist_page_id = GIST_PAGE_ID;
716 }
717 
/*
 * Verify that a freshly-read page looks sane.
 *
 * Raises ERROR (index corrupted) rather than returning a status; callers
 * can assume the page is usable if this returns.
 */
void
gistcheckpage(Relation rel, Buffer buf)
{
	Page		page = BufferGetPage(buf);

	/*
	 * ReadBuffer verifies that every newly-read page passes
	 * PageHeaderIsValid, which means it either contains a reasonably sane
	 * page header or is all-zero.  We have to defend against the all-zero
	 * case, however.
	 */
	if (PageIsNew(page))
		ereport(ERROR,
				(errcode(ERRCODE_INDEX_CORRUPTED),
			 errmsg("index \"%s\" contains unexpected zero page at block %u",
					RelationGetRelationName(rel),
					BufferGetBlockNumber(buf)),
				 errhint("Please REINDEX it.")));

	/*
	 * Additionally check that the special area looks sane.
	 */
	if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GISTPageOpaqueData)))
		ereport(ERROR,
				(errcode(ERRCODE_INDEX_CORRUPTED),
				 errmsg("index \"%s\" contains corrupted page at block %u",
						RelationGetRelationName(rel),
						BufferGetBlockNumber(buf)),
				 errhint("Please REINDEX it.")));
}
751 
752 
/*
 * Allocate a new page (either by recycling, or by extending the index file)
 *
 * The returned buffer is already pinned and exclusive-locked
 *
 * Caller is responsible for initializing the page by calling GISTInitBuffer
 */
Buffer
gistNewBuffer(Relation r)
{
	Buffer		buffer;
	bool		needLock;

	/* First, try to get a page from FSM */
	for (;;)
	{
		BlockNumber blkno = GetFreeIndexPage(r);

		if (blkno == InvalidBlockNumber)
			break;				/* nothing left in FSM */

		buffer = ReadBuffer(r, blkno);

		/*
		 * We have to guard against the possibility that someone else already
		 * recycled this page; the buffer may be locked if so.
		 */
		if (ConditionalLockBuffer(buffer))
		{
			Page		page = BufferGetPage(buffer);

			if (PageIsNew(page))
				return buffer;	/* OK to use, if never initialized */

			/* sanity-check the page before trusting its flags */
			gistcheckpage(r, buffer);

			if (GistPageIsDeleted(page))
				return buffer;	/* OK to use */

			/* page is live after all; someone else recycled it already */
			LockBuffer(buffer, GIST_UNLOCK);
		}

		/* Can't use it, so release buffer and try again */
		ReleaseBuffer(buffer);
	}

	/* Must extend the file */
	needLock = !RELATION_IS_LOCAL(r);

	/* serialize relation extension against other backends */
	if (needLock)
		LockRelationForExtension(r, ExclusiveLock);

	buffer = ReadBuffer(r, P_NEW);
	LockBuffer(buffer, GIST_EXCLUSIVE);

	if (needLock)
		UnlockRelationForExtension(r, ExclusiveLock);

	return buffer;
}
813 
814 bytea *
gistoptions(Datum reloptions,bool validate)815 gistoptions(Datum reloptions, bool validate)
816 {
817 	relopt_value *options;
818 	GiSTOptions *rdopts;
819 	int			numoptions;
820 	static const relopt_parse_elt tab[] = {
821 		{"fillfactor", RELOPT_TYPE_INT, offsetof(GiSTOptions, fillfactor)},
822 		{"buffering", RELOPT_TYPE_STRING, offsetof(GiSTOptions, bufferingModeOffset)}
823 	};
824 
825 	options = parseRelOptions(reloptions, validate, RELOPT_KIND_GIST,
826 							  &numoptions);
827 
828 	/* if none set, we're done */
829 	if (numoptions == 0)
830 		return NULL;
831 
832 	rdopts = allocateReloptStruct(sizeof(GiSTOptions), options, numoptions);
833 
834 	fillRelOptions((void *) rdopts, sizeof(GiSTOptions), options, numoptions,
835 				   validate, tab, lengthof(tab));
836 
837 	pfree(options);
838 
839 	return (bytea *) rdopts;
840 }
841 
/*
 *	gistproperty() -- Check boolean properties of indexes.
 *
 * This is optional for most AMs, but is required for GiST because the core
 * property code doesn't support AMPROP_DISTANCE_ORDERABLE.  We also handle
 * AMPROP_RETURNABLE here to save opening the rel to call gistcanreturn.
 *
 * Returns false if the property is not handled here; otherwise *res/*isnull
 * carry the answer and true is returned.
 */
bool
gistproperty(Oid index_oid, int attno,
			 IndexAMProperty prop, const char *propname,
			 bool *res, bool *isnull)
{
	HeapTuple	tuple;
	Form_pg_index rd_index PG_USED_FOR_ASSERTS_ONLY;
	Form_pg_opclass rd_opclass;
	Datum		datum;
	bool		disnull;
	oidvector  *indclass;
	Oid			opclass,
				opfamily,
				opcintype;
	int16		procno;

	/* Only answer column-level inquiries */
	if (attno == 0)
		return false;

	/*
	 * Currently, GiST distance-ordered scans require that there be a distance
	 * function in the opclass with the default types (i.e. the one loaded
	 * into the relcache entry, see initGISTstate).  So we assume that if such
	 * a function exists, then there's a reason for it (rather than grubbing
	 * through all the opfamily's operators to find an ordered one).
	 *
	 * Essentially the same code can test whether we support returning the
	 * column data, since that's true if the opclass provides a fetch proc.
	 */

	switch (prop)
	{
		case AMPROP_DISTANCE_ORDERABLE:
			procno = GIST_DISTANCE_PROC;
			break;
		case AMPROP_RETURNABLE:
			procno = GIST_FETCH_PROC;
			break;
		default:
			/* property not handled by this AM-specific routine */
			return false;
	}

	/* First we need to know the column's opclass. */

	tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(index_oid));
	if (!HeapTupleIsValid(tuple))
	{
		/* index not found: answer is NULL rather than an error */
		*isnull = true;
		return true;
	}
	rd_index = (Form_pg_index) GETSTRUCT(tuple);

	/* caller is supposed to guarantee this */
	Assert(attno > 0 && attno <= rd_index->indnatts);

	datum = SysCacheGetAttr(INDEXRELID, tuple,
							Anum_pg_index_indclass, &disnull);
	Assert(!disnull);

	indclass = ((oidvector *) DatumGetPointer(datum));
	opclass = indclass->values[attno - 1];

	ReleaseSysCache(tuple);

	/* Now look up the opclass family and input datatype. */

	tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
	if (!HeapTupleIsValid(tuple))
	{
		*isnull = true;
		return true;
	}
	rd_opclass = (Form_pg_opclass) GETSTRUCT(tuple);

	opfamily = rd_opclass->opcfamily;
	opcintype = rd_opclass->opcintype;

	ReleaseSysCache(tuple);

	/* And now we can check whether the function is provided. */

	*res = SearchSysCacheExists4(AMPROCNUM,
								 ObjectIdGetDatum(opfamily),
								 ObjectIdGetDatum(opcintype),
								 ObjectIdGetDatum(opcintype),
								 Int16GetDatum(procno));
	return true;
}
938 
939 /*
940  * Temporary and unlogged GiST indexes are not WAL-logged, but we need LSNs
941  * to detect concurrent page splits anyway. This function provides a fake
942  * sequence of LSNs for that purpose.
943  */
944 XLogRecPtr
gistGetFakeLSN(Relation rel)945 gistGetFakeLSN(Relation rel)
946 {
947 	static XLogRecPtr counter = 1;
948 
949 	if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
950 	{
951 		/*
952 		 * Temporary relations are only accessible in our session, so a simple
953 		 * backend-local counter will do.
954 		 */
955 		return counter++;
956 	}
957 	else
958 	{
959 		/*
960 		 * Unlogged relations are accessible from other backends, and survive
961 		 * (clean) restarts. GetFakeLSNForUnloggedRel() handles that for us.
962 		 */
963 		Assert(rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED);
964 		return GetFakeLSNForUnloggedRel();
965 	}
966 }
967