1 /*-------------------------------------------------------------------------
2 *
3 * gistutil.c
4 * utilities routines for the postgres GiST index access method.
5 *
6 *
7 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/gist/gistutil.c
12 *-------------------------------------------------------------------------
13 */
14 #include "postgres.h"
15
16 #include <float.h>
17 #include <math.h>
18
19 #include "access/gist_private.h"
20 #include "access/htup_details.h"
21 #include "access/reloptions.h"
22 #include "catalog/pg_opclass.h"
23 #include "storage/indexfsm.h"
24 #include "storage/lmgr.h"
25 #include "utils/builtins.h"
26 #include "utils/syscache.h"
27
28
29 /*
30 * Write itup vector to page, has no control of free space.
31 */
32 void
gistfillbuffer(Page page,IndexTuple * itup,int len,OffsetNumber off)33 gistfillbuffer(Page page, IndexTuple *itup, int len, OffsetNumber off)
34 {
35 OffsetNumber l = InvalidOffsetNumber;
36 int i;
37
38 if (off == InvalidOffsetNumber)
39 off = (PageIsEmpty(page)) ? FirstOffsetNumber :
40 OffsetNumberNext(PageGetMaxOffsetNumber(page));
41
42 for (i = 0; i < len; i++)
43 {
44 Size sz = IndexTupleSize(itup[i]);
45
46 l = PageAddItem(page, (Item) itup[i], sz, off, false, false);
47 if (l == InvalidOffsetNumber)
48 elog(ERROR, "failed to add item to GiST index page, item %d out of %d, size %d bytes",
49 i, len, (int) sz);
50 off++;
51 }
52 }
53
54 /*
55 * Check space for itup vector on page
56 */
57 bool
gistnospace(Page page,IndexTuple * itvec,int len,OffsetNumber todelete,Size freespace)58 gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace)
59 {
60 unsigned int size = freespace,
61 deleted = 0;
62 int i;
63
64 for (i = 0; i < len; i++)
65 size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
66
67 if (todelete != InvalidOffsetNumber)
68 {
69 IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete));
70
71 deleted = IndexTupleSize(itup) + sizeof(ItemIdData);
72 }
73
74 return (PageGetFreeSpace(page) + deleted < size);
75 }
76
77 bool
gistfitpage(IndexTuple * itvec,int len)78 gistfitpage(IndexTuple *itvec, int len)
79 {
80 int i;
81 Size size = 0;
82
83 for (i = 0; i < len; i++)
84 size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
85
86 /* TODO: Consider fillfactor */
87 return (size <= GiSTPageSize);
88 }
89
90 /*
91 * Read buffer into itup vector
92 */
93 IndexTuple *
gistextractpage(Page page,int * len)94 gistextractpage(Page page, int *len /* out */ )
95 {
96 OffsetNumber i,
97 maxoff;
98 IndexTuple *itvec;
99
100 maxoff = PageGetMaxOffsetNumber(page);
101 *len = maxoff;
102 itvec = palloc(sizeof(IndexTuple) * maxoff);
103 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
104 itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
105
106 return itvec;
107 }
108
109 /*
110 * join two vectors into one
111 */
112 IndexTuple *
gistjoinvector(IndexTuple * itvec,int * len,IndexTuple * additvec,int addlen)113 gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
114 {
115 itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
116 memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
117 *len += addlen;
118 return itvec;
119 }
120
121 /*
122 * make plain IndexTupleVector
123 */
124
125 IndexTupleData *
gistfillitupvec(IndexTuple * vec,int veclen,int * memlen)126 gistfillitupvec(IndexTuple *vec, int veclen, int *memlen)
127 {
128 char *ptr,
129 *ret;
130 int i;
131
132 *memlen = 0;
133
134 for (i = 0; i < veclen; i++)
135 *memlen += IndexTupleSize(vec[i]);
136
137 ptr = ret = palloc(*memlen);
138
139 for (i = 0; i < veclen; i++)
140 {
141 memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
142 ptr += IndexTupleSize(vec[i]);
143 }
144
145 return (IndexTupleData *) ret;
146 }
147
148 /*
149 * Make unions of keys in IndexTuple vector (one union datum per index column).
150 * Union Datums are returned into the attr/isnull arrays.
151 * Resulting Datums aren't compressed.
152 */
153 void
gistMakeUnionItVec(GISTSTATE * giststate,IndexTuple * itvec,int len,Datum * attr,bool * isnull)154 gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len,
155 Datum *attr, bool *isnull)
156 {
157 int i;
158 GistEntryVector *evec;
159 int attrsize;
160
161 evec = (GistEntryVector *) palloc((len + 2) * sizeof(GISTENTRY) + GEVHDRSZ);
162
163 for (i = 0; i < giststate->tupdesc->natts; i++)
164 {
165 int j;
166
167 /* Collect non-null datums for this column */
168 evec->n = 0;
169 for (j = 0; j < len; j++)
170 {
171 Datum datum;
172 bool IsNull;
173
174 datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
175 if (IsNull)
176 continue;
177
178 gistdentryinit(giststate, i,
179 evec->vector + evec->n,
180 datum,
181 NULL, NULL, (OffsetNumber) 0,
182 FALSE, IsNull);
183 evec->n++;
184 }
185
186 /* If this column was all NULLs, the union is NULL */
187 if (evec->n == 0)
188 {
189 attr[i] = (Datum) 0;
190 isnull[i] = TRUE;
191 }
192 else
193 {
194 if (evec->n == 1)
195 {
196 /* unionFn may expect at least two inputs */
197 evec->n = 2;
198 evec->vector[1] = evec->vector[0];
199 }
200
201 /* Make union and store in attr array */
202 attr[i] = FunctionCall2Coll(&giststate->unionFn[i],
203 giststate->supportCollation[i],
204 PointerGetDatum(evec),
205 PointerGetDatum(&attrsize));
206
207 isnull[i] = FALSE;
208 }
209 }
210 }
211
212 /*
213 * Return an IndexTuple containing the result of applying the "union"
214 * method to the specified IndexTuple vector.
215 */
216 IndexTuple
gistunion(Relation r,IndexTuple * itvec,int len,GISTSTATE * giststate)217 gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
218 {
219 Datum attr[INDEX_MAX_KEYS];
220 bool isnull[INDEX_MAX_KEYS];
221
222 gistMakeUnionItVec(giststate, itvec, len, attr, isnull);
223
224 return gistFormTuple(giststate, r, attr, isnull, false);
225 }
226
227 /*
228 * makes union of two key
229 */
230 void
gistMakeUnionKey(GISTSTATE * giststate,int attno,GISTENTRY * entry1,bool isnull1,GISTENTRY * entry2,bool isnull2,Datum * dst,bool * dstisnull)231 gistMakeUnionKey(GISTSTATE *giststate, int attno,
232 GISTENTRY *entry1, bool isnull1,
233 GISTENTRY *entry2, bool isnull2,
234 Datum *dst, bool *dstisnull)
235 {
236 /* we need a GistEntryVector with room for exactly 2 elements */
237 union
238 {
239 GistEntryVector gev;
240 char padding[2 * sizeof(GISTENTRY) + GEVHDRSZ];
241 } storage;
242 GistEntryVector *evec = &storage.gev;
243 int dstsize;
244
245 evec->n = 2;
246
247 if (isnull1 && isnull2)
248 {
249 *dstisnull = TRUE;
250 *dst = (Datum) 0;
251 }
252 else
253 {
254 if (isnull1 == FALSE && isnull2 == FALSE)
255 {
256 evec->vector[0] = *entry1;
257 evec->vector[1] = *entry2;
258 }
259 else if (isnull1 == FALSE)
260 {
261 evec->vector[0] = *entry1;
262 evec->vector[1] = *entry1;
263 }
264 else
265 {
266 evec->vector[0] = *entry2;
267 evec->vector[1] = *entry2;
268 }
269
270 *dstisnull = FALSE;
271 *dst = FunctionCall2Coll(&giststate->unionFn[attno],
272 giststate->supportCollation[attno],
273 PointerGetDatum(evec),
274 PointerGetDatum(&dstsize));
275 }
276 }
277
278 bool
gistKeyIsEQ(GISTSTATE * giststate,int attno,Datum a,Datum b)279 gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b)
280 {
281 bool result;
282
283 FunctionCall3Coll(&giststate->equalFn[attno],
284 giststate->supportCollation[attno],
285 a, b,
286 PointerGetDatum(&result));
287 return result;
288 }
289
290 /*
291 * Decompress all keys in tuple
292 */
293 void
gistDeCompressAtt(GISTSTATE * giststate,Relation r,IndexTuple tuple,Page p,OffsetNumber o,GISTENTRY * attdata,bool * isnull)294 gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
295 OffsetNumber o, GISTENTRY *attdata, bool *isnull)
296 {
297 int i;
298
299 for (i = 0; i < r->rd_att->natts; i++)
300 {
301 Datum datum;
302
303 datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
304 gistdentryinit(giststate, i, &attdata[i],
305 datum, r, p, o,
306 FALSE, isnull[i]);
307 }
308 }
309
310 /*
311 * Forms union of oldtup and addtup, if union == oldtup then return NULL
312 */
313 IndexTuple
gistgetadjusted(Relation r,IndexTuple oldtup,IndexTuple addtup,GISTSTATE * giststate)314 gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
315 {
316 bool neednew = FALSE;
317 GISTENTRY oldentries[INDEX_MAX_KEYS],
318 addentries[INDEX_MAX_KEYS];
319 bool oldisnull[INDEX_MAX_KEYS],
320 addisnull[INDEX_MAX_KEYS];
321 Datum attr[INDEX_MAX_KEYS];
322 bool isnull[INDEX_MAX_KEYS];
323 IndexTuple newtup = NULL;
324 int i;
325
326 gistDeCompressAtt(giststate, r, oldtup, NULL,
327 (OffsetNumber) 0, oldentries, oldisnull);
328
329 gistDeCompressAtt(giststate, r, addtup, NULL,
330 (OffsetNumber) 0, addentries, addisnull);
331
332 for (i = 0; i < r->rd_att->natts; i++)
333 {
334 gistMakeUnionKey(giststate, i,
335 oldentries + i, oldisnull[i],
336 addentries + i, addisnull[i],
337 attr + i, isnull + i);
338
339 if (neednew)
340 /* we already need new key, so we can skip check */
341 continue;
342
343 if (isnull[i])
344 /* union of key may be NULL if and only if both keys are NULL */
345 continue;
346
347 if (!addisnull[i])
348 {
349 if (oldisnull[i] ||
350 !gistKeyIsEQ(giststate, i, oldentries[i].key, attr[i]))
351 neednew = true;
352 }
353 }
354
355 if (neednew)
356 {
357 /* need to update key */
358 newtup = gistFormTuple(giststate, r, attr, isnull, false);
359 newtup->t_tid = oldtup->t_tid;
360 }
361
362 return newtup;
363 }
364
/*
 * Search an upper index page for the entry with lowest penalty for insertion
 * of the new index key contained in "it".
 *
 * Returns the index of the page entry to insert into.
 *
 * Penalties are compared column-by-column, earlier index columns being
 * strictly more significant; ties between equally good tuples are broken
 * randomly (but sticky-randomly, see below) to balance cache locality
 * against space utilization.
 */
OffsetNumber
gistchoose(Relation r, Page p, IndexTuple it,	/* it has compressed entry */
		   GISTSTATE *giststate)
{
	OffsetNumber result;
	OffsetNumber maxoff;
	OffsetNumber i;
	float		best_penalty[INDEX_MAX_KEYS];
	GISTENTRY	entry,
				identry[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	int			keep_current_best;

	Assert(!GistPageIsLeaf(p));

	/* decompress the new key once, outside the per-tuple loop */
	gistDeCompressAtt(giststate, r,
					  it, NULL, (OffsetNumber) 0,
					  identry, isnull);

	/* we'll return FirstOffsetNumber if page is empty (shouldn't happen) */
	result = FirstOffsetNumber;

	/*
	 * The index may have multiple columns, and there's a penalty value for
	 * each column. The penalty associated with a column that appears earlier
	 * in the index definition is strictly more important than the penalty of
	 * a column that appears later in the index definition.
	 *
	 * best_penalty[j] is the best penalty we have seen so far for column j,
	 * or -1 when we haven't yet examined column j. Array entries to the
	 * right of the first -1 are undefined.
	 */
	best_penalty[0] = -1;

	/*
	 * If we find a tuple that's exactly as good as the currently best one, we
	 * could use either one. When inserting a lot of tuples with the same or
	 * similar keys, it's preferable to descend down the same path when
	 * possible, as that's more cache-friendly. On the other hand, if all
	 * inserts land on the same leaf page after a split, we're never going to
	 * insert anything to the other half of the split, and will end up using
	 * only 50% of the available space. Distributing the inserts evenly would
	 * lead to better space usage, but that hurts cache-locality during
	 * insertion. To get the best of both worlds, when we find a tuple that's
	 * exactly as good as the previous best, choose randomly whether to stick
	 * to the old best, or use the new one. Once we decide to stick to the
	 * old best, we keep sticking to it for any subsequent equally good tuples
	 * we might find. This favors tuples with low offsets, but still allows
	 * some inserts to go to other equally-good subtrees.
	 *
	 * keep_current_best is -1 if we haven't yet had to make a random choice
	 * whether to keep the current best tuple. If we have done so, and
	 * decided to keep it, keep_current_best is 1; if we've decided to
	 * replace, keep_current_best is 0. (This state will be reset to -1 as
	 * soon as we've made the replacement, but sometimes we make the choice in
	 * advance of actually finding a replacement best tuple.)
	 */
	keep_current_best = -1;

	/*
	 * Loop over tuples on page.
	 */
	maxoff = PageGetMaxOffsetNumber(p);
	Assert(maxoff >= FirstOffsetNumber);

	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
	{
		IndexTuple	itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
		bool		zero_penalty;
		int			j;

		/* assume zero penalty until a column proves otherwise */
		zero_penalty = true;

		/* Loop over index attributes. */
		for (j = 0; j < r->rd_att->natts; j++)
		{
			Datum		datum;
			float		usize;
			bool		IsNull;

			/* Compute penalty for this column. */
			datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
			gistdentryinit(giststate, j, &entry, datum, r, p, i,
						   FALSE, IsNull);
			usize = gistpenalty(giststate, j, &entry, IsNull,
								&identry[j], isnull[j]);
			if (usize > 0)
				zero_penalty = false;

			if (best_penalty[j] < 0 || usize < best_penalty[j])
			{
				/*
				 * New best penalty for column. Tentatively select this tuple
				 * as the target, and record the best penalty. Then reset the
				 * next column's penalty to "unknown" (and indirectly, the
				 * same for all the ones to its right). This will force us to
				 * adopt this tuple's penalty values as the best for all the
				 * remaining columns during subsequent loop iterations.
				 */
				result = i;
				best_penalty[j] = usize;

				if (j < r->rd_att->natts - 1)
					best_penalty[j + 1] = -1;

				/* we have new best, so reset keep-it decision */
				keep_current_best = -1;
			}
			else if (best_penalty[j] == usize)
			{
				/*
				 * The current tuple is exactly as good for this column as the
				 * best tuple seen so far. The next iteration of this loop
				 * will compare the next column.
				 */
			}
			else
			{
				/*
				 * The current tuple is worse for this column than the best
				 * tuple seen so far. Skip the remaining columns and move on
				 * to the next tuple, if any.
				 */
				zero_penalty = false;	/* so outer loop won't exit */
				break;
			}
		}

		/*
		 * If we looped past the last column, and did not update "result",
		 * then this tuple is exactly as good as the prior best tuple.
		 */
		if (j == r->rd_att->natts && result != i)
		{
			if (keep_current_best == -1)
			{
				/* we didn't make the random choice yet for this old best */
				keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
			}
			if (keep_current_best == 0)
			{
				/* we choose to use the new tuple */
				result = i;
				/* choose again if there are even more exactly-as-good ones */
				keep_current_best = -1;
			}
		}

		/*
		 * If we find a tuple with zero penalty for all columns, and we've
		 * decided we don't want to search for another tuple with equal
		 * penalty, there's no need to examine remaining tuples; just break
		 * out of the loop and return it.
		 */
		if (zero_penalty)
		{
			if (keep_current_best == -1)
			{
				/* we didn't make the random choice yet for this old best */
				keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
			}
			if (keep_current_best == 1)
				break;
		}
	}

	return result;
}
539
540 /*
541 * initialize a GiST entry with a decompressed version of key
542 */
543 void
gistdentryinit(GISTSTATE * giststate,int nkey,GISTENTRY * e,Datum k,Relation r,Page pg,OffsetNumber o,bool l,bool isNull)544 gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
545 Datum k, Relation r, Page pg, OffsetNumber o,
546 bool l, bool isNull)
547 {
548 if (!isNull)
549 {
550 GISTENTRY *dep;
551
552 gistentryinit(*e, k, r, pg, o, l);
553 dep = (GISTENTRY *)
554 DatumGetPointer(FunctionCall1Coll(&giststate->decompressFn[nkey],
555 giststate->supportCollation[nkey],
556 PointerGetDatum(e)));
557 /* decompressFn may just return the given pointer */
558 if (dep != e)
559 gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
560 dep->leafkey);
561 }
562 else
563 gistentryinit(*e, (Datum) 0, r, pg, o, l);
564 }
565
566 IndexTuple
gistFormTuple(GISTSTATE * giststate,Relation r,Datum attdata[],bool isnull[],bool isleaf)567 gistFormTuple(GISTSTATE *giststate, Relation r,
568 Datum attdata[], bool isnull[], bool isleaf)
569 {
570 Datum compatt[INDEX_MAX_KEYS];
571 int i;
572 IndexTuple res;
573
574 /*
575 * Call the compress method on each attribute.
576 */
577 for (i = 0; i < r->rd_att->natts; i++)
578 {
579 if (isnull[i])
580 compatt[i] = (Datum) 0;
581 else
582 {
583 GISTENTRY centry;
584 GISTENTRY *cep;
585
586 gistentryinit(centry, attdata[i], r, NULL, (OffsetNumber) 0,
587 isleaf);
588 cep = (GISTENTRY *)
589 DatumGetPointer(FunctionCall1Coll(&giststate->compressFn[i],
590 giststate->supportCollation[i],
591 PointerGetDatum(¢ry)));
592 compatt[i] = cep->key;
593 }
594 }
595
596 res = index_form_tuple(giststate->tupdesc, compatt, isnull);
597
598 /*
599 * The offset number on tuples on internal pages is unused. For historical
600 * reasons, it is set to 0xffff.
601 */
602 ItemPointerSetOffsetNumber(&(res->t_tid), 0xffff);
603 return res;
604 }
605
606 /*
607 * initialize a GiST entry with fetched value in key field
608 */
609 static Datum
gistFetchAtt(GISTSTATE * giststate,int nkey,Datum k,Relation r)610 gistFetchAtt(GISTSTATE *giststate, int nkey, Datum k, Relation r)
611 {
612 GISTENTRY fentry;
613 GISTENTRY *fep;
614
615 gistentryinit(fentry, k, r, NULL, (OffsetNumber) 0, false);
616
617 fep = (GISTENTRY *)
618 DatumGetPointer(FunctionCall1Coll(&giststate->fetchFn[nkey],
619 giststate->supportCollation[nkey],
620 PointerGetDatum(&fentry)));
621
622 /* fetchFn set 'key', return it to the caller */
623 return fep->key;
624 }
625
626 /*
627 * Fetch all keys in tuple.
628 * Returns a new HeapTuple containing the originally-indexed data.
629 */
630 HeapTuple
gistFetchTuple(GISTSTATE * giststate,Relation r,IndexTuple tuple)631 gistFetchTuple(GISTSTATE *giststate, Relation r, IndexTuple tuple)
632 {
633 MemoryContext oldcxt = MemoryContextSwitchTo(giststate->tempCxt);
634 Datum fetchatt[INDEX_MAX_KEYS];
635 bool isnull[INDEX_MAX_KEYS];
636 int i;
637
638 for (i = 0; i < r->rd_att->natts; i++)
639 {
640 Datum datum;
641
642 datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
643
644 if (giststate->fetchFn[i].fn_oid != InvalidOid)
645 {
646 if (!isnull[i])
647 fetchatt[i] = gistFetchAtt(giststate, i, datum, r);
648 else
649 fetchatt[i] = (Datum) 0;
650 }
651 else
652 {
653 /*
654 * Index-only scans not supported for this column. Since the
655 * planner chose an index-only scan anyway, it is not interested
656 * in this column, and we can replace it with a NULL.
657 */
658 isnull[i] = true;
659 fetchatt[i] = (Datum) 0;
660 }
661 }
662 MemoryContextSwitchTo(oldcxt);
663
664 return heap_form_tuple(giststate->fetchTupdesc, fetchatt, isnull);
665 }
666
667 float
gistpenalty(GISTSTATE * giststate,int attno,GISTENTRY * orig,bool isNullOrig,GISTENTRY * add,bool isNullAdd)668 gistpenalty(GISTSTATE *giststate, int attno,
669 GISTENTRY *orig, bool isNullOrig,
670 GISTENTRY *add, bool isNullAdd)
671 {
672 float penalty = 0.0;
673
674 if (giststate->penaltyFn[attno].fn_strict == FALSE ||
675 (isNullOrig == FALSE && isNullAdd == FALSE))
676 {
677 FunctionCall3Coll(&giststate->penaltyFn[attno],
678 giststate->supportCollation[attno],
679 PointerGetDatum(orig),
680 PointerGetDatum(add),
681 PointerGetDatum(&penalty));
682 /* disallow negative or NaN penalty */
683 if (isnan(penalty) || penalty < 0.0)
684 penalty = 0.0;
685 }
686 else if (isNullOrig && isNullAdd)
687 penalty = 0.0;
688 else
689 {
690 /* try to prevent mixing null and non-null values */
691 penalty = get_float4_infinity();
692 }
693
694 return penalty;
695 }
696
697 /*
698 * Initialize a new index page
699 */
700 void
GISTInitBuffer(Buffer b,uint32 f)701 GISTInitBuffer(Buffer b, uint32 f)
702 {
703 GISTPageOpaque opaque;
704 Page page;
705 Size pageSize;
706
707 pageSize = BufferGetPageSize(b);
708 page = BufferGetPage(b);
709 PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
710
711 opaque = GistPageGetOpaque(page);
712 /* page was already zeroed by PageInit, so this is not needed: */
713 /* memset(&(opaque->nsn), 0, sizeof(GistNSN)); */
714 opaque->rightlink = InvalidBlockNumber;
715 opaque->flags = f;
716 opaque->gist_page_id = GIST_PAGE_ID;
717 }
718
719 /*
720 * Verify that a freshly-read page looks sane.
721 */
722 void
gistcheckpage(Relation rel,Buffer buf)723 gistcheckpage(Relation rel, Buffer buf)
724 {
725 Page page = BufferGetPage(buf);
726
727 /*
728 * ReadBuffer verifies that every newly-read page passes
729 * PageHeaderIsValid, which means it either contains a reasonably sane
730 * page header or is all-zero. We have to defend against the all-zero
731 * case, however.
732 */
733 if (PageIsNew(page))
734 ereport(ERROR,
735 (errcode(ERRCODE_INDEX_CORRUPTED),
736 errmsg("index \"%s\" contains unexpected zero page at block %u",
737 RelationGetRelationName(rel),
738 BufferGetBlockNumber(buf)),
739 errhint("Please REINDEX it.")));
740
741 /*
742 * Additionally check that the special area looks sane.
743 */
744 if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GISTPageOpaqueData)))
745 ereport(ERROR,
746 (errcode(ERRCODE_INDEX_CORRUPTED),
747 errmsg("index \"%s\" contains corrupted page at block %u",
748 RelationGetRelationName(rel),
749 BufferGetBlockNumber(buf)),
750 errhint("Please REINDEX it.")));
751 }
752
/*
 * Allocate a new page (either by recycling, or by extending the index file)
 *
 * The returned buffer is already pinned and exclusive-locked
 *
 * Caller is responsible for initializing the page by calling GISTInitBuffer
 */
Buffer
gistNewBuffer(Relation r)
{
	Buffer		buffer;
	bool		needLock;

	/* First, try to get a page from FSM */
	for (;;)
	{
		BlockNumber blkno = GetFreeIndexPage(r);

		if (blkno == InvalidBlockNumber)
			break;				/* nothing left in FSM */

		buffer = ReadBuffer(r, blkno);

		/*
		 * We have to guard against the possibility that someone else already
		 * recycled this page; the buffer may be locked if so.  Use a
		 * conditional lock so we never block here.
		 */
		if (ConditionalLockBuffer(buffer))
		{
			Page		page = BufferGetPage(buffer);

			if (PageIsNew(page))
				return buffer;	/* OK to use, if never initialized */

			/* sanity-check the page before trusting its flags */
			gistcheckpage(r, buffer);

			if (GistPageIsDeleted(page))
				return buffer;	/* OK to use */

			/* page is live after all; unlock before retrying */
			LockBuffer(buffer, GIST_UNLOCK);
		}

		/* Can't use it, so release buffer and try again */
		ReleaseBuffer(buffer);
	}

	/* Must extend the file */
	/* local (temp) relations can't be extended concurrently, so no lock */
	needLock = !RELATION_IS_LOCAL(r);

	if (needLock)
		LockRelationForExtension(r, ExclusiveLock);

	buffer = ReadBuffer(r, P_NEW);
	LockBuffer(buffer, GIST_EXCLUSIVE);

	if (needLock)
		UnlockRelationForExtension(r, ExclusiveLock);

	return buffer;
}
814
815 bytea *
gistoptions(Datum reloptions,bool validate)816 gistoptions(Datum reloptions, bool validate)
817 {
818 relopt_value *options;
819 GiSTOptions *rdopts;
820 int numoptions;
821 static const relopt_parse_elt tab[] = {
822 {"fillfactor", RELOPT_TYPE_INT, offsetof(GiSTOptions, fillfactor)},
823 {"buffering", RELOPT_TYPE_STRING, offsetof(GiSTOptions, bufferingModeOffset)}
824 };
825
826 options = parseRelOptions(reloptions, validate, RELOPT_KIND_GIST,
827 &numoptions);
828
829 /* if none set, we're done */
830 if (numoptions == 0)
831 return NULL;
832
833 rdopts = allocateReloptStruct(sizeof(GiSTOptions), options, numoptions);
834
835 fillRelOptions((void *) rdopts, sizeof(GiSTOptions), options, numoptions,
836 validate, tab, lengthof(tab));
837
838 pfree(options);
839
840 return (bytea *) rdopts;
841 }
842
/*
 * gistproperty() -- Check boolean properties of indexes.
 *
 * This is optional for most AMs, but is required for GiST because the core
 * property code doesn't support AMPROP_DISTANCE_ORDERABLE.  We also handle
 * AMPROP_RETURNABLE here to save opening the rel to call gistcanreturn.
 *
 * Returns false when the property is not one we answer; otherwise returns
 * true with the answer in *res (or *isnull set when it can't be determined,
 * e.g. because a syscache lookup failed).
 */
bool
gistproperty(Oid index_oid, int attno,
			 IndexAMProperty prop, const char *propname,
			 bool *res, bool *isnull)
{
	HeapTuple	tuple;
	Form_pg_index rd_index PG_USED_FOR_ASSERTS_ONLY;
	Form_pg_opclass rd_opclass;
	Datum		datum;
	bool		disnull;
	oidvector  *indclass;
	Oid			opclass,
				opfamily,
				opcintype;
	int16		procno;

	/* Only answer column-level inquiries */
	if (attno == 0)
		return false;

	/*
	 * Currently, GiST distance-ordered scans require that there be a distance
	 * function in the opclass with the default types (i.e. the one loaded
	 * into the relcache entry, see initGISTstate).  So we assume that if such
	 * a function exists, then there's a reason for it (rather than grubbing
	 * through all the opfamily's operators to find an ordered one).
	 *
	 * Essentially the same code can test whether we support returning the
	 * column data, since that's true if the opclass provides a fetch proc.
	 */

	switch (prop)
	{
		case AMPROP_DISTANCE_ORDERABLE:
			procno = GIST_DISTANCE_PROC;
			break;
		case AMPROP_RETURNABLE:
			procno = GIST_FETCH_PROC;
			break;
		default:
			/* not a property we handle here */
			return false;
	}

	/* First we need to know the column's opclass. */

	tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(index_oid));
	if (!HeapTupleIsValid(tuple))
	{
		/* index's pg_index row is gone (e.g. concurrent drop) */
		*isnull = true;
		return true;
	}
	rd_index = (Form_pg_index) GETSTRUCT(tuple);

	/* caller is supposed to guarantee this */
	Assert(attno > 0 && attno <= rd_index->indnatts);

	datum = SysCacheGetAttr(INDEXRELID, tuple,
							Anum_pg_index_indclass, &disnull);
	Assert(!disnull);

	indclass = ((oidvector *) DatumGetPointer(datum));
	opclass = indclass->values[attno - 1];

	ReleaseSysCache(tuple);

	/* Now look up the opclass family and input datatype. */

	tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
	if (!HeapTupleIsValid(tuple))
	{
		/* opclass row is gone; can't determine the property */
		*isnull = true;
		return true;
	}
	rd_opclass = (Form_pg_opclass) GETSTRUCT(tuple);

	opfamily = rd_opclass->opcfamily;
	opcintype = rd_opclass->opcintype;

	ReleaseSysCache(tuple);

	/* And now we can check whether the function is provided. */

	*res = SearchSysCacheExists4(AMPROCNUM,
								 ObjectIdGetDatum(opfamily),
								 ObjectIdGetDatum(opcintype),
								 ObjectIdGetDatum(opcintype),
								 Int16GetDatum(procno));
	return true;
}
939
940 /*
941 * Temporary and unlogged GiST indexes are not WAL-logged, but we need LSNs
942 * to detect concurrent page splits anyway. This function provides a fake
943 * sequence of LSNs for that purpose.
944 */
945 XLogRecPtr
gistGetFakeLSN(Relation rel)946 gistGetFakeLSN(Relation rel)
947 {
948 static XLogRecPtr counter = 1;
949
950 if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
951 {
952 /*
953 * Temporary relations are only accessible in our session, so a simple
954 * backend-local counter will do.
955 */
956 return counter++;
957 }
958 else
959 {
960 /*
961 * Unlogged relations are accessible from other backends, and survive
962 * (clean) restarts. GetFakeLSNForUnloggedRel() handles that for us.
963 */
964 Assert(rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED);
965 return GetFakeLSNForUnloggedRel();
966 }
967 }
968