1 /*-------------------------------------------------------------------------
2 *
3 * gistutil.c
4 * utility routines for the postgres GiST index access method.
5 *
6 *
7 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/gist/gistutil.c
12 *-------------------------------------------------------------------------
13 */
14 #include "postgres.h"
15
16 #include <math.h>
17
18 #include "access/gist_private.h"
19 #include "access/htup_details.h"
20 #include "access/reloptions.h"
21 #include "catalog/pg_opclass.h"
22 #include "storage/indexfsm.h"
23 #include "storage/lmgr.h"
24 #include "utils/float.h"
25 #include "utils/lsyscache.h"
26 #include "utils/snapmgr.h"
27 #include "utils/syscache.h"
28
29 /*
30 * Write an itup vector to the page; does not check for available free space.
31 */
32 void
33 gistfillbuffer(Page page, IndexTuple *itup, int len, OffsetNumber off)
34 {
35 int i;
36
37 if (off == InvalidOffsetNumber)
38 off = (PageIsEmpty(page)) ? FirstOffsetNumber :
39 OffsetNumberNext(PageGetMaxOffsetNumber(page));
40
41 for (i = 0; i < len; i++)
42 {
43 Size sz = IndexTupleSize(itup[i]);
44 OffsetNumber l;
45
46 l = PageAddItem(page, (Item) itup[i], sz, off, false, false);
47 if (l == InvalidOffsetNumber)
48 elog(ERROR, "failed to add item to GiST index page, item %d out of %d, size %d bytes",
49 i, len, (int) sz);
50 off++;
51 }
52 }
53
54 /*
55 * Check space for itup vector on page
56 */
57 bool
58 gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace)
59 {
60 unsigned int size = freespace,
61 deleted = 0;
62 int i;
63
64 for (i = 0; i < len; i++)
65 size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
66
67 if (todelete != InvalidOffsetNumber)
68 {
69 IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete));
70
71 deleted = IndexTupleSize(itup) + sizeof(ItemIdData);
72 }
73
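	/* no space if the needed size exceeds free space plus what deleting "todelete" would reclaim */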
74 return (PageGetFreeSpace(page) + deleted < size);
75 }
76
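/*
 * Check whether an itup vector would fit on a single empty page
 */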
77 bool
78 gistfitpage(IndexTuple *itvec, int len)
79 {
80 int i;
81 Size size = 0;
82
83 for (i = 0; i < len; i++)
84 size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
85
86 /* TODO: Consider fillfactor */
87 return (size <= GiSTPageSize);
88 }
89
90 /*
91 * Read a page's tuples into an itup vector (entries point into the page; nothing is copied)
92 */
93 IndexTuple *
94 gistextractpage(Page page, int *len /* out */ )
95 {
96 OffsetNumber i,
97 maxoff;
98 IndexTuple *itvec;
99
100 maxoff = PageGetMaxOffsetNumber(page);
101 *len = maxoff;
102 itvec = palloc(sizeof(IndexTuple) * maxoff);
103 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
104 itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
105
106 return itvec;
107 }
108
109 /*
110 * join two vectors into one
111 */
112 IndexTuple *
113 gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
114 {
115 itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
116 memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
117 *len += addlen;
118 return itvec;
119 }
120
121 /*
122 * Pack an IndexTuple vector into a single contiguous chunk of memory (total size returned in *memlen)
123 */
124
125 IndexTupleData *
126 gistfillitupvec(IndexTuple *vec, int veclen, int *memlen)
127 {
128 char *ptr,
129 *ret;
130 int i;
131
132 *memlen = 0;
133
134 for (i = 0; i < veclen; i++)
135 *memlen += IndexTupleSize(vec[i]);
136
137 ptr = ret = palloc(*memlen);
138
139 for (i = 0; i < veclen; i++)
140 {
141 memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
142 ptr += IndexTupleSize(vec[i]);
143 }
144
145 return (IndexTupleData *) ret;
146 }
147
148 /*
149 * Make unions of keys in IndexTuple vector (one union datum per index column).
150 * Union Datums are returned into the attr/isnull arrays.
151 * Resulting Datums aren't compressed.
152 */
153 void
154 gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len,
155 Datum *attr, bool *isnull)
156 {
157 int i;
158 GistEntryVector *evec;
159 int attrsize;
160
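	/* sized for one entry per input tuple, plus spare room for the duplicate added below when only one non-null key is found */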
161 evec = (GistEntryVector *) palloc((len + 2) * sizeof(GISTENTRY) + GEVHDRSZ);
162
163 for (i = 0; i < giststate->nonLeafTupdesc->natts; i++)
164 {
165 int j;
166
167 /* Collect non-null datums for this column */
168 evec->n = 0;
169 for (j = 0; j < len; j++)
170 {
171 Datum datum;
172 bool IsNull;
173
174 datum = index_getattr(itvec[j], i + 1, giststate->leafTupdesc,
175 &IsNull);
176 if (IsNull)
177 continue;
178
179 gistdentryinit(giststate, i,
180 evec->vector + evec->n,
181 datum,
182 NULL, NULL, (OffsetNumber) 0,
183 false, IsNull);
184 evec->n++;
185 }
186
187 /* If this column was all NULLs, the union is NULL */
188 if (evec->n == 0)
189 {
190 attr[i] = (Datum) 0;
191 isnull[i] = true;
192 }
193 else
194 {
195 if (evec->n == 1)
196 {
197 /* unionFn may expect at least two inputs */
198 evec->n = 2;
199 evec->vector[1] = evec->vector[0];
200 }
201
202 /* Make union and store in attr array */
203 attr[i] = FunctionCall2Coll(&giststate->unionFn[i],
204 giststate->supportCollation[i],
205 PointerGetDatum(evec),
206 PointerGetDatum(&attrsize));
207
208 isnull[i] = false;
209 }
210 }
211 }
212
213 /*
214 * Return an IndexTuple containing the result of applying the "union"
215 * method to the specified IndexTuple vector.
216 */
217 IndexTuple
218 gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
219 {
220 Datum attr[INDEX_MAX_KEYS];
221 bool isnull[INDEX_MAX_KEYS];
222
223 gistMakeUnionItVec(giststate, itvec, len, attr, isnull);
224
225 return gistFormTuple(giststate, r, attr, isnull, false);
226 }
227
228 /*
229 * make the union of two keys
230 */
231 void
232 gistMakeUnionKey(GISTSTATE *giststate, int attno,
233 GISTENTRY *entry1, bool isnull1,
234 GISTENTRY *entry2, bool isnull2,
235 Datum *dst, bool *dstisnull)
236 {
237 /* we need a GistEntryVector with room for exactly 2 elements */
238 union
239 {
240 GistEntryVector gev;
241 char padding[2 * sizeof(GISTENTRY) + GEVHDRSZ];
242 } storage;
243 GistEntryVector *evec = &storage.gev;
244 int dstsize;
245
246 evec->n = 2;
247
248 if (isnull1 && isnull2)
249 {
250 *dstisnull = true;
251 *dst = (Datum) 0;
252 }
253 else
254 {
255 if (isnull1 == false && isnull2 == false)
256 {
257 evec->vector[0] = *entry1;
258 evec->vector[1] = *entry2;
259 }
260 else if (isnull1 == false)
261 {
262 evec->vector[0] = *entry1;
263 evec->vector[1] = *entry1;
264 }
265 else
266 {
267 evec->vector[0] = *entry2;
268 evec->vector[1] = *entry2;
269 }
270
271 *dstisnull = false;
272 *dst = FunctionCall2Coll(&giststate->unionFn[attno],
273 giststate->supportCollation[attno],
274 PointerGetDatum(evec),
275 PointerGetDatum(&dstsize));
276 }
277 }
278
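/*
 * Compare two keys with the opclass "equal" method; the support function
 * reports its result through the pointer passed as its third argument.
 */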
279 bool
280 gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b)
281 {
282 bool result;
283
284 FunctionCall3Coll(&giststate->equalFn[attno],
285 giststate->supportCollation[attno],
286 a, b,
287 PointerGetDatum(&result));
288 return result;
289 }
290
291 /*
292 * Decompress all keys in tuple
293 */
294 void
295 gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
296 OffsetNumber o, GISTENTRY *attdata, bool *isnull)
297 {
298 int i;
299
300 for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
301 {
302 Datum datum;
303
304 datum = index_getattr(tuple, i + 1, giststate->leafTupdesc, &isnull[i]);
305 gistdentryinit(giststate, i, &attdata[i],
306 datum, r, p, o,
307 false, isnull[i]);
308 }
309 }
310
311 /*
312 * Form the union of oldtup and addtup; if the union equals oldtup, return NULL
313 */
314 IndexTuple
315 gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
316 {
317 bool neednew = false;
318 GISTENTRY oldentries[INDEX_MAX_KEYS],
319 addentries[INDEX_MAX_KEYS];
320 bool oldisnull[INDEX_MAX_KEYS],
321 addisnull[INDEX_MAX_KEYS];
322 Datum attr[INDEX_MAX_KEYS];
323 bool isnull[INDEX_MAX_KEYS];
324 IndexTuple newtup = NULL;
325 int i;
326
327 gistDeCompressAtt(giststate, r, oldtup, NULL,
328 (OffsetNumber) 0, oldentries, oldisnull);
329
330 gistDeCompressAtt(giststate, r, addtup, NULL,
331 (OffsetNumber) 0, addentries, addisnull);
332
333 for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
334 {
335 gistMakeUnionKey(giststate, i,
336 oldentries + i, oldisnull[i],
337 addentries + i, addisnull[i],
338 attr + i, isnull + i);
339
340 if (neednew)
341 /* we already need new key, so we can skip check */
342 continue;
343
344 if (isnull[i])
345 /* union of key may be NULL if and only if both keys are NULL */
346 continue;
347
348 if (!addisnull[i])
349 {
350 if (oldisnull[i] ||
351 !gistKeyIsEQ(giststate, i, oldentries[i].key, attr[i]))
352 neednew = true;
353 }
354 }
355
356 if (neednew)
357 {
358 /* need to update key */
359 newtup = gistFormTuple(giststate, r, attr, isnull, false);
360 newtup->t_tid = oldtup->t_tid;
361 }
362
363 return newtup;
364 }
365
366 /*
367 * Search an upper index page for the entry with lowest penalty for insertion
368 * of the new index key contained in "it".
369 *
370 * Returns the index of the page entry to insert into.
371 */
372 OffsetNumber
373 gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
374 GISTSTATE *giststate)
375 {
376 OffsetNumber result;
377 OffsetNumber maxoff;
378 OffsetNumber i;
379 float best_penalty[INDEX_MAX_KEYS];
380 GISTENTRY entry,
381 identry[INDEX_MAX_KEYS];
382 bool isnull[INDEX_MAX_KEYS];
383 int keep_current_best;
384
385 Assert(!GistPageIsLeaf(p));
386
387 gistDeCompressAtt(giststate, r,
388 it, NULL, (OffsetNumber) 0,
389 identry, isnull);
390
391 /* we'll return FirstOffsetNumber if page is empty (shouldn't happen) */
392 result = FirstOffsetNumber;
393
394 /*
395 * The index may have multiple columns, and there's a penalty value for
396 * each column. The penalty associated with a column that appears earlier
397 * in the index definition is strictly more important than the penalty of
398 * a column that appears later in the index definition.
399 *
400 * best_penalty[j] is the best penalty we have seen so far for column j,
401 * or -1 when we haven't yet examined column j. Array entries to the
402 * right of the first -1 are undefined.
403 */
404 best_penalty[0] = -1;
405
406 /*
407 * If we find a tuple that's exactly as good as the currently best one, we
408 * could use either one. When inserting a lot of tuples with the same or
409 * similar keys, it's preferable to descend down the same path when
410 * possible, as that's more cache-friendly. On the other hand, if all
411 * inserts land on the same leaf page after a split, we're never going to
412 * insert anything to the other half of the split, and will end up using
413 * only 50% of the available space. Distributing the inserts evenly would
414 * lead to better space usage, but that hurts cache-locality during
415 * insertion. To get the best of both worlds, when we find a tuple that's
416 * exactly as good as the previous best, choose randomly whether to stick
417 * to the old best, or use the new one. Once we decide to stick to the
418 * old best, we keep sticking to it for any subsequent equally good tuples
419 * we might find. This favors tuples with low offsets, but still allows
420 * some inserts to go to other equally-good subtrees.
421 *
422 * keep_current_best is -1 if we haven't yet had to make a random choice
423 * whether to keep the current best tuple. If we have done so, and
424 * decided to keep it, keep_current_best is 1; if we've decided to
425 * replace, keep_current_best is 0. (This state will be reset to -1 as
426 * soon as we've made the replacement, but sometimes we make the choice in
427 * advance of actually finding a replacement best tuple.)
428 */
429 keep_current_best = -1;
430
431 /*
432 * Loop over tuples on page.
433 */
434 maxoff = PageGetMaxOffsetNumber(p);
435 Assert(maxoff >= FirstOffsetNumber);
436
437 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
438 {
439 IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
440 bool zero_penalty;
441 int j;
442
443 zero_penalty = true;
444
445 /* Loop over index attributes. */
446 for (j = 0; j < IndexRelationGetNumberOfKeyAttributes(r); j++)
447 {
448 Datum datum;
449 float usize;
450 bool IsNull;
451
452 /* Compute penalty for this column. */
453 datum = index_getattr(itup, j + 1, giststate->leafTupdesc,
454 &IsNull);
455 gistdentryinit(giststate, j, &entry, datum, r, p, i,
456 false, IsNull);
457 usize = gistpenalty(giststate, j, &entry, IsNull,
458 &identry[j], isnull[j]);
459 if (usize > 0)
460 zero_penalty = false;
461
462 if (best_penalty[j] < 0 || usize < best_penalty[j])
463 {
464 /*
465 * New best penalty for column. Tentatively select this tuple
466 * as the target, and record the best penalty. Then reset the
467 * next column's penalty to "unknown" (and indirectly, the
468 * same for all the ones to its right). This will force us to
469 * adopt this tuple's penalty values as the best for all the
470 * remaining columns during subsequent loop iterations.
471 */
472 result = i;
473 best_penalty[j] = usize;
474
475 if (j < IndexRelationGetNumberOfKeyAttributes(r) - 1)
476 best_penalty[j + 1] = -1;
477
478 /* we have new best, so reset keep-it decision */
479 keep_current_best = -1;
480 }
481 else if (best_penalty[j] == usize)
482 {
483 /*
484 * The current tuple is exactly as good for this column as the
485 * best tuple seen so far. The next iteration of this loop
486 * will compare the next column.
487 */
488 }
489 else
490 {
491 /*
492 * The current tuple is worse for this column than the best
493 * tuple seen so far. Skip the remaining columns and move on
494 * to the next tuple, if any.
495 */
496 zero_penalty = false; /* so outer loop won't exit */
497 break;
498 }
499 }
500
501 /*
502 * If we looped past the last column, and did not update "result",
503 * then this tuple is exactly as good as the prior best tuple.
504 */
505 if (j == IndexRelationGetNumberOfKeyAttributes(r) && result != i)
506 {
507 if (keep_current_best == -1)
508 {
509 /* we didn't make the random choice yet for this old best */
510 keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
511 }
512 if (keep_current_best == 0)
513 {
514 /* we choose to use the new tuple */
515 result = i;
516 /* choose again if there are even more exactly-as-good ones */
517 keep_current_best = -1;
518 }
519 }
520
521 /*
522 * If we find a tuple with zero penalty for all columns, and we've
523 * decided we don't want to search for another tuple with equal
524 * penalty, there's no need to examine remaining tuples; just break
525 * out of the loop and return it.
526 */
527 if (zero_penalty)
528 {
529 if (keep_current_best == -1)
530 {
531 /* we didn't make the random choice yet for this old best */
532 keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
533 }
534 if (keep_current_best == 1)
535 break;
536 }
537 }
538
539 return result;
540 }
541
542 /*
543 * initialize a GiST entry with a decompressed version of key
544 */
545 void
546 gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
547 Datum k, Relation r, Page pg, OffsetNumber o,
548 bool l, bool isNull)
549 {
550 if (!isNull)
551 {
552 GISTENTRY *dep;
553
554 gistentryinit(*e, k, r, pg, o, l);
555
556 /* there may not be a decompress function in opclass */
557 if (!OidIsValid(giststate->decompressFn[nkey].fn_oid))
558 return;
559
560 dep = (GISTENTRY *)
561 DatumGetPointer(FunctionCall1Coll(&giststate->decompressFn[nkey],
562 giststate->supportCollation[nkey],
563 PointerGetDatum(e)));
564 /* decompressFn may just return the given pointer */
565 if (dep != e)
566 gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
567 dep->leafkey);
568 }
569 else
570 gistentryinit(*e, (Datum) 0, r, pg, o, l);
571 }
572
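/*
 * Build an index tuple from the given datums, first compressing the key
 * attributes with the opclass compress method (see gistCompressValues).
 */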
573 IndexTuple
574 gistFormTuple(GISTSTATE *giststate, Relation r,
575 Datum *attdata, bool *isnull, bool isleaf)
576 {
577 Datum compatt[INDEX_MAX_KEYS];
578 IndexTuple res;
579
580 gistCompressValues(giststate, r, attdata, isnull, isleaf, compatt);
581
582 res = index_form_tuple(isleaf ? giststate->leafTupdesc :
583 giststate->nonLeafTupdesc,
584 compatt, isnull);
585
586 /*
587 * The offset number on tuples on internal pages is unused. For historical
588 * reasons, it is set to 0xffff.
589 */
590 ItemPointerSetOffsetNumber(&(res->t_tid), 0xffff);
591 return res;
592 }
593
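/*
 * Apply the compress method, if the opclass has one, to each key attribute,
 * storing the results in compatt[]; on leaf tuples, included (non-key)
 * attributes are copied through unchanged.
 */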
594 void
595 gistCompressValues(GISTSTATE *giststate, Relation r,
596 Datum *attdata, bool *isnull, bool isleaf, Datum *compatt)
597 {
598 int i;
599
600 /*
601 * Call the compress method on each attribute.
602 */
603 for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
604 {
605 if (isnull[i])
606 compatt[i] = (Datum) 0;
607 else
608 {
609 GISTENTRY centry;
610 GISTENTRY *cep;
611
612 gistentryinit(centry, attdata[i], r, NULL, (OffsetNumber) 0,
613 isleaf);
614 /* there may not be a compress function in opclass */
615 if (OidIsValid(giststate->compressFn[i].fn_oid))
616 cep = (GISTENTRY *)
617 DatumGetPointer(FunctionCall1Coll(&giststate->compressFn[i],
618 giststate->supportCollation[i],
619 PointerGetDatum(&centry)));
620 else
621 cep = &centry;
622 compatt[i] = cep->key;
623 }
624 }
625
626 if (isleaf)
627 {
628 /*
629 * Store each included (non-key) attribute as-is, if any.
630 */
631 for (; i < r->rd_att->natts; i++)
632 {
633 if (isnull[i])
634 compatt[i] = (Datum) 0;
635 else
636 compatt[i] = attdata[i];
637 }
638 }
639 }
640
641 /*
642 * initialize a GiST entry with fetched value in key field
643 */
644 static Datum
645 gistFetchAtt(GISTSTATE *giststate, int nkey, Datum k, Relation r)
646 {
647 GISTENTRY fentry;
648 GISTENTRY *fep;
649
650 gistentryinit(fentry, k, r, NULL, (OffsetNumber) 0, false);
651
652 fep = (GISTENTRY *)
653 DatumGetPointer(FunctionCall1Coll(&giststate->fetchFn[nkey],
654 giststate->supportCollation[nkey],
655 PointerGetDatum(&fentry)));
656
657 /* fetchFn sets 'key'; return it to the caller */
658 return fep->key;
659 }
660
661 /*
662 * Fetch all keys in tuple.
663 * Returns a new HeapTuple containing the originally-indexed data.
664 */
665 HeapTuple
666 gistFetchTuple(GISTSTATE *giststate, Relation r, IndexTuple tuple)
667 {
668 MemoryContext oldcxt = MemoryContextSwitchTo(giststate->tempCxt);
669 Datum fetchatt[INDEX_MAX_KEYS];
670 bool isnull[INDEX_MAX_KEYS];
671 int i;
672
673 for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
674 {
675 Datum datum;
676
677 datum = index_getattr(tuple, i + 1, giststate->leafTupdesc, &isnull[i]);
678
679 if (giststate->fetchFn[i].fn_oid != InvalidOid)
680 {
681 if (!isnull[i])
682 fetchatt[i] = gistFetchAtt(giststate, i, datum, r);
683 else
684 fetchatt[i] = (Datum) 0;
685 }
686 else if (giststate->compressFn[i].fn_oid == InvalidOid)
687 {
688 /*
689 * If the opclass does not provide a compress method that could alter the
690 * original value, the attribute is necessarily stored in its original form.
691 */
692 if (!isnull[i])
693 fetchatt[i] = datum;
694 else
695 fetchatt[i] = (Datum) 0;
696 }
697 else
698 {
699 /*
700 * Index-only scans not supported for this column. Since the
701 * planner chose an index-only scan anyway, it is not interested
702 * in this column, and we can replace it with a NULL.
703 */
704 isnull[i] = true;
705 fetchatt[i] = (Datum) 0;
706 }
707 }
708
709 /*
710 * Get each included attribute.
711 */
712 for (; i < r->rd_att->natts; i++)
713 {
714 fetchatt[i] = index_getattr(tuple, i + 1, giststate->leafTupdesc,
715 &isnull[i]);
716 }
717 MemoryContextSwitchTo(oldcxt);
718
719 return heap_form_tuple(giststate->fetchTupdesc, fetchatt, isnull);
720 }
721
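/*
 * Compute the penalty for inserting "add" into the subtree summarized by
 * "orig", using the opclass penalty method; NULL inputs are handled here
 * rather than by the support function (unless it is non-strict).
 */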
722 float
723 gistpenalty(GISTSTATE *giststate, int attno,
724 GISTENTRY *orig, bool isNullOrig,
725 GISTENTRY *add, bool isNullAdd)
726 {
727 float penalty = 0.0;
728
729 if (giststate->penaltyFn[attno].fn_strict == false ||
730 (isNullOrig == false && isNullAdd == false))
731 {
732 FunctionCall3Coll(&giststate->penaltyFn[attno],
733 giststate->supportCollation[attno],
734 PointerGetDatum(orig),
735 PointerGetDatum(add),
736 PointerGetDatum(&penalty));
737 /* disallow negative or NaN penalty */
738 if (isnan(penalty) || penalty < 0.0)
739 penalty = 0.0;
740 }
741 else if (isNullOrig && isNullAdd)
742 penalty = 0.0;
743 else
744 {
745 /* try to prevent mixing null and non-null values */
746 penalty = get_float4_infinity();
747 }
748
749 return penalty;
750 }
751
752 /*
753 * Initialize a new index page
754 */
755 void
756 gistinitpage(Page page, uint32 f)
757 {
758 GISTPageOpaque opaque;
759
760 PageInit(page, BLCKSZ, sizeof(GISTPageOpaqueData));
761
762 opaque = GistPageGetOpaque(page);
763 opaque->rightlink = InvalidBlockNumber;
764 opaque->flags = f;
765 opaque->gist_page_id = GIST_PAGE_ID;
766 }
767
768 /*
769 * Initialize a new index buffer
770 */
771 void
772 GISTInitBuffer(Buffer b, uint32 f)
773 {
774 Page page;
775
776 page = BufferGetPage(b);
777 gistinitpage(page, f);
778 }
779
780 /*
781 * Verify that a freshly-read page looks sane.
782 */
783 void
784 gistcheckpage(Relation rel, Buffer buf)
785 {
786 Page page = BufferGetPage(buf);
787
788 /*
789 * ReadBuffer verifies that every newly-read page passes
790 * PageHeaderIsValid, which means it either contains a reasonably sane
791 * page header or is all-zero. We have to defend against the all-zero
792 * case, however.
793 */
794 if (PageIsNew(page))
795 ereport(ERROR,
796 (errcode(ERRCODE_INDEX_CORRUPTED),
797 errmsg("index \"%s\" contains unexpected zero page at block %u",
798 RelationGetRelationName(rel),
799 BufferGetBlockNumber(buf)),
800 errhint("Please REINDEX it.")));
801
802 /*
803 * Additionally check that the special area looks sane.
804 */
805 if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GISTPageOpaqueData)))
806 ereport(ERROR,
807 (errcode(ERRCODE_INDEX_CORRUPTED),
808 errmsg("index \"%s\" contains corrupted page at block %u",
809 RelationGetRelationName(rel),
810 BufferGetBlockNumber(buf)),
811 errhint("Please REINDEX it.")));
812 }
813
814
815 /*
816 * Allocate a new page (either by recycling, or by extending the index file)
817 *
818 * The returned buffer is already pinned and exclusive-locked
819 *
820 * Caller is responsible for initializing the page by calling GISTInitBuffer
821 */
822 Buffer
823 gistNewBuffer(Relation r)
824 {
825 Buffer buffer;
826 bool needLock;
827
828 /* First, try to get a page from FSM */
829 for (;;)
830 {
831 BlockNumber blkno = GetFreeIndexPage(r);
832
833 if (blkno == InvalidBlockNumber)
834 break; /* nothing left in FSM */
835
836 buffer = ReadBuffer(r, blkno);
837
838 /*
839 * We have to guard against the possibility that someone else already
840 * recycled this page; the buffer may be locked if so.
841 */
842 if (ConditionalLockBuffer(buffer))
843 {
844 Page page = BufferGetPage(buffer);
845
846 /*
847 * If the page was never initialized, it's OK to use.
848 */
849 if (PageIsNew(page))
850 return buffer;
851
852 gistcheckpage(r, buffer);
853
854 /*
855 * Otherwise, recycle it if it has been deleted and is old enough that
856 * no running process can still be interested in it.
857 */
858 if (gistPageRecyclable(page))
859 {
860 /*
861 * If we are generating WAL for Hot Standby then create a WAL
862 * record that will allow us to conflict with queries running
863 * on standby, in case they have snapshots older than the
864 * page's deleteXid.
865 */
866 if (XLogStandbyInfoActive() && RelationNeedsWAL(r))
867 gistXLogPageReuse(r, blkno, GistPageGetDeleteXid(page));
868
869 return buffer;
870 }
871
872 LockBuffer(buffer, GIST_UNLOCK);
873 }
874
875 /* Can't use it, so release buffer and try again */
876 ReleaseBuffer(buffer);
877 }
878
879 /* Must extend the file */
880 needLock = !RELATION_IS_LOCAL(r);
881
882 if (needLock)
883 LockRelationForExtension(r, ExclusiveLock);
884
885 buffer = ReadBuffer(r, P_NEW);
886 LockBuffer(buffer, GIST_EXCLUSIVE);
887
888 if (needLock)
889 UnlockRelationForExtension(r, ExclusiveLock);
890
891 return buffer;
892 }
893
894 /* Can this page be recycled yet? */
895 bool
896 gistPageRecyclable(Page page)
897 {
898 if (PageIsNew(page))
899 return true;
900 if (GistPageIsDeleted(page))
901 {
902 /*
903 * The page was deleted, but when? If it was just deleted, a scan
904 * might have seen the downlink to it, and will read the page later.
905 * As long as that can happen, we must keep the deleted page around as
906 * a tombstone.
907 *
908 * To check that, see whether the deletion XID could still be visible to
909 * anyone. If not, then no scan that's still in progress could have
910 * seen its downlink, and we can recycle it.
911 */
912 FullTransactionId deletexid_full = GistPageGetDeleteXid(page);
913
914 return GlobalVisCheckRemovableFullXid(NULL, deletexid_full);
915 }
916 return false;
917 }
918
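/*
 * Parse the reloptions accepted by GiST indexes (fillfactor, buffering)
 */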
919 bytea *
920 gistoptions(Datum reloptions, bool validate)
921 {
922 static const relopt_parse_elt tab[] = {
923 {"fillfactor", RELOPT_TYPE_INT, offsetof(GiSTOptions, fillfactor)},
924 {"buffering", RELOPT_TYPE_ENUM, offsetof(GiSTOptions, buffering_mode)}
925 };
926
927 return (bytea *) build_reloptions(reloptions, validate,
928 RELOPT_KIND_GIST,
929 sizeof(GiSTOptions),
930 tab, lengthof(tab));
931 }
932
933 /*
934 * gistproperty() -- Check boolean properties of indexes.
935 *
936 * This is optional for most AMs, but is required for GiST because the core
937 * property code doesn't support AMPROP_DISTANCE_ORDERABLE. We also handle
938 * AMPROP_RETURNABLE here to save opening the rel to call gistcanreturn.
939 */
940 bool
941 gistproperty(Oid index_oid, int attno,
942 IndexAMProperty prop, const char *propname,
943 bool *res, bool *isnull)
944 {
945 Oid opclass,
946 opfamily,
947 opcintype;
948 int16 procno;
949
950 /* Only answer column-level inquiries */
951 if (attno == 0)
952 return false;
953
954 /*
955 * Currently, GiST distance-ordered scans require that there be a distance
956 * function in the opclass with the default types (i.e. the one loaded
957 * into the relcache entry, see initGISTstate). So we assume that if such
958 * a function exists, then there's a reason for it (rather than grubbing
959 * through all the opfamily's operators to find an ordered one).
960 *
961 * Essentially the same code can test whether we support returning the
962 * column data, since that's true if the opclass provides a fetch proc.
963 */
964
965 switch (prop)
966 {
967 case AMPROP_DISTANCE_ORDERABLE:
968 procno = GIST_DISTANCE_PROC;
969 break;
970 case AMPROP_RETURNABLE:
971 procno = GIST_FETCH_PROC;
972 break;
973 default:
974 return false;
975 }
976
977 /* First we need to know the column's opclass. */
978 opclass = get_index_column_opclass(index_oid, attno);
979 if (!OidIsValid(opclass))
980 {
981 *isnull = true;
982 return true;
983 }
984
985 /* Now look up the opclass family and input datatype. */
986 if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
987 {
988 *isnull = true;
989 return true;
990 }
991
992 /* And now we can check whether the function is provided. */
993
994 *res = SearchSysCacheExists4(AMPROCNUM,
995 ObjectIdGetDatum(opfamily),
996 ObjectIdGetDatum(opcintype),
997 ObjectIdGetDatum(opcintype),
998 Int16GetDatum(procno));
999
1000 /*
1001 * Special case: even without a fetch function, AMPROP_RETURNABLE is true
1002 * if the opclass has no compress function.
1003 */
1004 if (prop == AMPROP_RETURNABLE && !*res)
1005 {
1006 *res = !SearchSysCacheExists4(AMPROCNUM,
1007 ObjectIdGetDatum(opfamily),
1008 ObjectIdGetDatum(opcintype),
1009 ObjectIdGetDatum(opcintype),
1010 Int16GetDatum(GIST_COMPRESS_PROC));
1011 }
1012
1013 *isnull = false;
1014
1015 return true;
1016 }
1017
1018 /*
1019 * Some indexes are not WAL-logged, but we need LSNs to detect concurrent page
1020 * splits anyway. This function provides a fake sequence of LSNs for that
1021 * purpose.
1022 */
1023 XLogRecPtr
1024 gistGetFakeLSN(Relation rel)
1025 {
1026 if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
1027 {
1028 /*
1029 * Temporary relations are only accessible in our session, so a simple
1030 * backend-local counter will do.
1031 */
1032 static XLogRecPtr counter = FirstNormalUnloggedLSN;
1033
1034 return counter++;
1035 }
1036 else if (RelationIsPermanent(rel))
1037 {
1038 /*
1039 * WAL-logging on this relation will start after commit, so its LSNs
1040 * must be distinct numbers smaller than the LSN at the next commit.
1041 * Emit a dummy WAL record if insert-LSN hasn't advanced after the
1042 * last call.
1043 */
1044 static XLogRecPtr lastlsn = InvalidXLogRecPtr;
1045 XLogRecPtr currlsn = GetXLogInsertRecPtr();
1046
1047 /* Shouldn't be called for WAL-logging relations */
1048 Assert(!RelationNeedsWAL(rel));
1049
1050 /* No need for an actual record if we already have a distinct LSN */
1051 if (!XLogRecPtrIsInvalid(lastlsn) && lastlsn == currlsn)
1052 currlsn = gistXLogAssignLSN();
1053
1054 lastlsn = currlsn;
1055 return currlsn;
1056 }
1057 else
1058 {
1059 /*
1060 * Unlogged relations are accessible from other backends, and survive
1061 * (clean) restarts. GetFakeLSNForUnloggedRel() handles that for us.
1062 */
1063 Assert(rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED);
1064 return GetFakeLSNForUnloggedRel();
1065 }
1066 }
1067