1 /*-------------------------------------------------------------------------
2 *
3 * gistutil.c
4 * utilities routines for the postgres GiST index access method.
5 *
6 *
7 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/gist/gistutil.c
12 *-------------------------------------------------------------------------
13 */
14 #include "postgres.h"
15
16 #include <math.h>
17
18 #include "access/gist_private.h"
19 #include "access/htup_details.h"
20 #include "access/reloptions.h"
21 #include "catalog/pg_opclass.h"
22 #include "storage/indexfsm.h"
23 #include "storage/lmgr.h"
24 #include "utils/builtins.h"
25 #include "utils/syscache.h"
26
27
28 /*
29 * Write itup vector to page, has no control of free space.
30 */
31 void
gistfillbuffer(Page page,IndexTuple * itup,int len,OffsetNumber off)32 gistfillbuffer(Page page, IndexTuple *itup, int len, OffsetNumber off)
33 {
34 OffsetNumber l = InvalidOffsetNumber;
35 int i;
36
37 if (off == InvalidOffsetNumber)
38 off = (PageIsEmpty(page)) ? FirstOffsetNumber :
39 OffsetNumberNext(PageGetMaxOffsetNumber(page));
40
41 for (i = 0; i < len; i++)
42 {
43 Size sz = IndexTupleSize(itup[i]);
44
45 l = PageAddItem(page, (Item) itup[i], sz, off, false, false);
46 if (l == InvalidOffsetNumber)
47 elog(ERROR, "failed to add item to GiST index page, item %d out of %d, size %d bytes",
48 i, len, (int) sz);
49 off++;
50 }
51 }
52
53 /*
54 * Check space for itup vector on page
55 */
56 bool
gistnospace(Page page,IndexTuple * itvec,int len,OffsetNumber todelete,Size freespace)57 gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace)
58 {
59 unsigned int size = freespace,
60 deleted = 0;
61 int i;
62
63 for (i = 0; i < len; i++)
64 size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
65
66 if (todelete != InvalidOffsetNumber)
67 {
68 IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete));
69
70 deleted = IndexTupleSize(itup) + sizeof(ItemIdData);
71 }
72
73 return (PageGetFreeSpace(page) + deleted < size);
74 }
75
76 bool
gistfitpage(IndexTuple * itvec,int len)77 gistfitpage(IndexTuple *itvec, int len)
78 {
79 int i;
80 Size size = 0;
81
82 for (i = 0; i < len; i++)
83 size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
84
85 /* TODO: Consider fillfactor */
86 return (size <= GiSTPageSize);
87 }
88
89 /*
90 * Read buffer into itup vector
91 */
92 IndexTuple *
gistextractpage(Page page,int * len)93 gistextractpage(Page page, int *len /* out */ )
94 {
95 OffsetNumber i,
96 maxoff;
97 IndexTuple *itvec;
98
99 maxoff = PageGetMaxOffsetNumber(page);
100 *len = maxoff;
101 itvec = palloc(sizeof(IndexTuple) * maxoff);
102 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
103 itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
104
105 return itvec;
106 }
107
108 /*
109 * join two vectors into one
110 */
111 IndexTuple *
gistjoinvector(IndexTuple * itvec,int * len,IndexTuple * additvec,int addlen)112 gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
113 {
114 itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
115 memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
116 *len += addlen;
117 return itvec;
118 }
119
120 /*
121 * make plain IndexTupleVector
122 */
123
124 IndexTupleData *
gistfillitupvec(IndexTuple * vec,int veclen,int * memlen)125 gistfillitupvec(IndexTuple *vec, int veclen, int *memlen)
126 {
127 char *ptr,
128 *ret;
129 int i;
130
131 *memlen = 0;
132
133 for (i = 0; i < veclen; i++)
134 *memlen += IndexTupleSize(vec[i]);
135
136 ptr = ret = palloc(*memlen);
137
138 for (i = 0; i < veclen; i++)
139 {
140 memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
141 ptr += IndexTupleSize(vec[i]);
142 }
143
144 return (IndexTupleData *) ret;
145 }
146
147 /*
148 * Make unions of keys in IndexTuple vector (one union datum per index column).
149 * Union Datums are returned into the attr/isnull arrays.
150 * Resulting Datums aren't compressed.
151 */
152 void
gistMakeUnionItVec(GISTSTATE * giststate,IndexTuple * itvec,int len,Datum * attr,bool * isnull)153 gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len,
154 Datum *attr, bool *isnull)
155 {
156 int i;
157 GistEntryVector *evec;
158 int attrsize;
159
160 evec = (GistEntryVector *) palloc((len + 2) * sizeof(GISTENTRY) + GEVHDRSZ);
161
162 for (i = 0; i < giststate->tupdesc->natts; i++)
163 {
164 int j;
165
166 /* Collect non-null datums for this column */
167 evec->n = 0;
168 for (j = 0; j < len; j++)
169 {
170 Datum datum;
171 bool IsNull;
172
173 datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
174 if (IsNull)
175 continue;
176
177 gistdentryinit(giststate, i,
178 evec->vector + evec->n,
179 datum,
180 NULL, NULL, (OffsetNumber) 0,
181 FALSE, IsNull);
182 evec->n++;
183 }
184
185 /* If this column was all NULLs, the union is NULL */
186 if (evec->n == 0)
187 {
188 attr[i] = (Datum) 0;
189 isnull[i] = TRUE;
190 }
191 else
192 {
193 if (evec->n == 1)
194 {
195 /* unionFn may expect at least two inputs */
196 evec->n = 2;
197 evec->vector[1] = evec->vector[0];
198 }
199
200 /* Make union and store in attr array */
201 attr[i] = FunctionCall2Coll(&giststate->unionFn[i],
202 giststate->supportCollation[i],
203 PointerGetDatum(evec),
204 PointerGetDatum(&attrsize));
205
206 isnull[i] = FALSE;
207 }
208 }
209 }
210
211 /*
212 * Return an IndexTuple containing the result of applying the "union"
213 * method to the specified IndexTuple vector.
214 */
215 IndexTuple
gistunion(Relation r,IndexTuple * itvec,int len,GISTSTATE * giststate)216 gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
217 {
218 Datum attr[INDEX_MAX_KEYS];
219 bool isnull[INDEX_MAX_KEYS];
220
221 gistMakeUnionItVec(giststate, itvec, len, attr, isnull);
222
223 return gistFormTuple(giststate, r, attr, isnull, false);
224 }
225
226 /*
227 * makes union of two key
228 */
229 void
gistMakeUnionKey(GISTSTATE * giststate,int attno,GISTENTRY * entry1,bool isnull1,GISTENTRY * entry2,bool isnull2,Datum * dst,bool * dstisnull)230 gistMakeUnionKey(GISTSTATE *giststate, int attno,
231 GISTENTRY *entry1, bool isnull1,
232 GISTENTRY *entry2, bool isnull2,
233 Datum *dst, bool *dstisnull)
234 {
235 /* we need a GistEntryVector with room for exactly 2 elements */
236 union
237 {
238 GistEntryVector gev;
239 char padding[2 * sizeof(GISTENTRY) + GEVHDRSZ];
240 } storage;
241 GistEntryVector *evec = &storage.gev;
242 int dstsize;
243
244 evec->n = 2;
245
246 if (isnull1 && isnull2)
247 {
248 *dstisnull = TRUE;
249 *dst = (Datum) 0;
250 }
251 else
252 {
253 if (isnull1 == FALSE && isnull2 == FALSE)
254 {
255 evec->vector[0] = *entry1;
256 evec->vector[1] = *entry2;
257 }
258 else if (isnull1 == FALSE)
259 {
260 evec->vector[0] = *entry1;
261 evec->vector[1] = *entry1;
262 }
263 else
264 {
265 evec->vector[0] = *entry2;
266 evec->vector[1] = *entry2;
267 }
268
269 *dstisnull = FALSE;
270 *dst = FunctionCall2Coll(&giststate->unionFn[attno],
271 giststate->supportCollation[attno],
272 PointerGetDatum(evec),
273 PointerGetDatum(&dstsize));
274 }
275 }
276
277 bool
gistKeyIsEQ(GISTSTATE * giststate,int attno,Datum a,Datum b)278 gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b)
279 {
280 bool result;
281
282 FunctionCall3Coll(&giststate->equalFn[attno],
283 giststate->supportCollation[attno],
284 a, b,
285 PointerGetDatum(&result));
286 return result;
287 }
288
289 /*
290 * Decompress all keys in tuple
291 */
292 void
gistDeCompressAtt(GISTSTATE * giststate,Relation r,IndexTuple tuple,Page p,OffsetNumber o,GISTENTRY * attdata,bool * isnull)293 gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
294 OffsetNumber o, GISTENTRY *attdata, bool *isnull)
295 {
296 int i;
297
298 for (i = 0; i < r->rd_att->natts; i++)
299 {
300 Datum datum;
301
302 datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
303 gistdentryinit(giststate, i, &attdata[i],
304 datum, r, p, o,
305 FALSE, isnull[i]);
306 }
307 }
308
309 /*
310 * Forms union of oldtup and addtup, if union == oldtup then return NULL
311 */
312 IndexTuple
gistgetadjusted(Relation r,IndexTuple oldtup,IndexTuple addtup,GISTSTATE * giststate)313 gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
314 {
315 bool neednew = FALSE;
316 GISTENTRY oldentries[INDEX_MAX_KEYS],
317 addentries[INDEX_MAX_KEYS];
318 bool oldisnull[INDEX_MAX_KEYS],
319 addisnull[INDEX_MAX_KEYS];
320 Datum attr[INDEX_MAX_KEYS];
321 bool isnull[INDEX_MAX_KEYS];
322 IndexTuple newtup = NULL;
323 int i;
324
325 gistDeCompressAtt(giststate, r, oldtup, NULL,
326 (OffsetNumber) 0, oldentries, oldisnull);
327
328 gistDeCompressAtt(giststate, r, addtup, NULL,
329 (OffsetNumber) 0, addentries, addisnull);
330
331 for (i = 0; i < r->rd_att->natts; i++)
332 {
333 gistMakeUnionKey(giststate, i,
334 oldentries + i, oldisnull[i],
335 addentries + i, addisnull[i],
336 attr + i, isnull + i);
337
338 if (neednew)
339 /* we already need new key, so we can skip check */
340 continue;
341
342 if (isnull[i])
343 /* union of key may be NULL if and only if both keys are NULL */
344 continue;
345
346 if (!addisnull[i])
347 {
348 if (oldisnull[i] ||
349 !gistKeyIsEQ(giststate, i, oldentries[i].key, attr[i]))
350 neednew = true;
351 }
352 }
353
354 if (neednew)
355 {
356 /* need to update key */
357 newtup = gistFormTuple(giststate, r, attr, isnull, false);
358 newtup->t_tid = oldtup->t_tid;
359 }
360
361 return newtup;
362 }
363
364 /*
365 * Search an upper index page for the entry with lowest penalty for insertion
366 * of the new index key contained in "it".
367 *
368 * Returns the index of the page entry to insert into.
369 */
370 OffsetNumber
gistchoose(Relation r,Page p,IndexTuple it,GISTSTATE * giststate)371 gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
372 GISTSTATE *giststate)
373 {
374 OffsetNumber result;
375 OffsetNumber maxoff;
376 OffsetNumber i;
377 float best_penalty[INDEX_MAX_KEYS];
378 GISTENTRY entry,
379 identry[INDEX_MAX_KEYS];
380 bool isnull[INDEX_MAX_KEYS];
381 int keep_current_best;
382
383 Assert(!GistPageIsLeaf(p));
384
385 gistDeCompressAtt(giststate, r,
386 it, NULL, (OffsetNumber) 0,
387 identry, isnull);
388
389 /* we'll return FirstOffsetNumber if page is empty (shouldn't happen) */
390 result = FirstOffsetNumber;
391
392 /*
393 * The index may have multiple columns, and there's a penalty value for
394 * each column. The penalty associated with a column that appears earlier
395 * in the index definition is strictly more important than the penalty of
396 * a column that appears later in the index definition.
397 *
398 * best_penalty[j] is the best penalty we have seen so far for column j,
399 * or -1 when we haven't yet examined column j. Array entries to the
400 * right of the first -1 are undefined.
401 */
402 best_penalty[0] = -1;
403
404 /*
405 * If we find a tuple that's exactly as good as the currently best one, we
406 * could use either one. When inserting a lot of tuples with the same or
407 * similar keys, it's preferable to descend down the same path when
408 * possible, as that's more cache-friendly. On the other hand, if all
409 * inserts land on the same leaf page after a split, we're never going to
410 * insert anything to the other half of the split, and will end up using
411 * only 50% of the available space. Distributing the inserts evenly would
412 * lead to better space usage, but that hurts cache-locality during
413 * insertion. To get the best of both worlds, when we find a tuple that's
414 * exactly as good as the previous best, choose randomly whether to stick
415 * to the old best, or use the new one. Once we decide to stick to the
416 * old best, we keep sticking to it for any subsequent equally good tuples
417 * we might find. This favors tuples with low offsets, but still allows
418 * some inserts to go to other equally-good subtrees.
419 *
420 * keep_current_best is -1 if we haven't yet had to make a random choice
421 * whether to keep the current best tuple. If we have done so, and
422 * decided to keep it, keep_current_best is 1; if we've decided to
423 * replace, keep_current_best is 0. (This state will be reset to -1 as
424 * soon as we've made the replacement, but sometimes we make the choice in
425 * advance of actually finding a replacement best tuple.)
426 */
427 keep_current_best = -1;
428
429 /*
430 * Loop over tuples on page.
431 */
432 maxoff = PageGetMaxOffsetNumber(p);
433 Assert(maxoff >= FirstOffsetNumber);
434
435 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
436 {
437 IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
438 bool zero_penalty;
439 int j;
440
441 zero_penalty = true;
442
443 /* Loop over index attributes. */
444 for (j = 0; j < r->rd_att->natts; j++)
445 {
446 Datum datum;
447 float usize;
448 bool IsNull;
449
450 /* Compute penalty for this column. */
451 datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
452 gistdentryinit(giststate, j, &entry, datum, r, p, i,
453 FALSE, IsNull);
454 usize = gistpenalty(giststate, j, &entry, IsNull,
455 &identry[j], isnull[j]);
456 if (usize > 0)
457 zero_penalty = false;
458
459 if (best_penalty[j] < 0 || usize < best_penalty[j])
460 {
461 /*
462 * New best penalty for column. Tentatively select this tuple
463 * as the target, and record the best penalty. Then reset the
464 * next column's penalty to "unknown" (and indirectly, the
465 * same for all the ones to its right). This will force us to
466 * adopt this tuple's penalty values as the best for all the
467 * remaining columns during subsequent loop iterations.
468 */
469 result = i;
470 best_penalty[j] = usize;
471
472 if (j < r->rd_att->natts - 1)
473 best_penalty[j + 1] = -1;
474
475 /* we have new best, so reset keep-it decision */
476 keep_current_best = -1;
477 }
478 else if (best_penalty[j] == usize)
479 {
480 /*
481 * The current tuple is exactly as good for this column as the
482 * best tuple seen so far. The next iteration of this loop
483 * will compare the next column.
484 */
485 }
486 else
487 {
488 /*
489 * The current tuple is worse for this column than the best
490 * tuple seen so far. Skip the remaining columns and move on
491 * to the next tuple, if any.
492 */
493 zero_penalty = false; /* so outer loop won't exit */
494 break;
495 }
496 }
497
498 /*
499 * If we looped past the last column, and did not update "result",
500 * then this tuple is exactly as good as the prior best tuple.
501 */
502 if (j == r->rd_att->natts && result != i)
503 {
504 if (keep_current_best == -1)
505 {
506 /* we didn't make the random choice yet for this old best */
507 keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
508 }
509 if (keep_current_best == 0)
510 {
511 /* we choose to use the new tuple */
512 result = i;
513 /* choose again if there are even more exactly-as-good ones */
514 keep_current_best = -1;
515 }
516 }
517
518 /*
519 * If we find a tuple with zero penalty for all columns, and we've
520 * decided we don't want to search for another tuple with equal
521 * penalty, there's no need to examine remaining tuples; just break
522 * out of the loop and return it.
523 */
524 if (zero_penalty)
525 {
526 if (keep_current_best == -1)
527 {
528 /* we didn't make the random choice yet for this old best */
529 keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
530 }
531 if (keep_current_best == 1)
532 break;
533 }
534 }
535
536 return result;
537 }
538
539 /*
540 * initialize a GiST entry with a decompressed version of key
541 */
542 void
gistdentryinit(GISTSTATE * giststate,int nkey,GISTENTRY * e,Datum k,Relation r,Page pg,OffsetNumber o,bool l,bool isNull)543 gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
544 Datum k, Relation r, Page pg, OffsetNumber o,
545 bool l, bool isNull)
546 {
547 if (!isNull)
548 {
549 GISTENTRY *dep;
550
551 gistentryinit(*e, k, r, pg, o, l);
552 dep = (GISTENTRY *)
553 DatumGetPointer(FunctionCall1Coll(&giststate->decompressFn[nkey],
554 giststate->supportCollation[nkey],
555 PointerGetDatum(e)));
556 /* decompressFn may just return the given pointer */
557 if (dep != e)
558 gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
559 dep->leafkey);
560 }
561 else
562 gistentryinit(*e, (Datum) 0, r, pg, o, l);
563 }
564
565 IndexTuple
gistFormTuple(GISTSTATE * giststate,Relation r,Datum attdata[],bool isnull[],bool isleaf)566 gistFormTuple(GISTSTATE *giststate, Relation r,
567 Datum attdata[], bool isnull[], bool isleaf)
568 {
569 Datum compatt[INDEX_MAX_KEYS];
570 int i;
571 IndexTuple res;
572
573 /*
574 * Call the compress method on each attribute.
575 */
576 for (i = 0; i < r->rd_att->natts; i++)
577 {
578 if (isnull[i])
579 compatt[i] = (Datum) 0;
580 else
581 {
582 GISTENTRY centry;
583 GISTENTRY *cep;
584
585 gistentryinit(centry, attdata[i], r, NULL, (OffsetNumber) 0,
586 isleaf);
587 cep = (GISTENTRY *)
588 DatumGetPointer(FunctionCall1Coll(&giststate->compressFn[i],
589 giststate->supportCollation[i],
590 PointerGetDatum(¢ry)));
591 compatt[i] = cep->key;
592 }
593 }
594
595 res = index_form_tuple(giststate->tupdesc, compatt, isnull);
596
597 /*
598 * The offset number on tuples on internal pages is unused. For historical
599 * reasons, it is set to 0xffff.
600 */
601 ItemPointerSetOffsetNumber(&(res->t_tid), 0xffff);
602 return res;
603 }
604
605 /*
606 * initialize a GiST entry with fetched value in key field
607 */
608 static Datum
gistFetchAtt(GISTSTATE * giststate,int nkey,Datum k,Relation r)609 gistFetchAtt(GISTSTATE *giststate, int nkey, Datum k, Relation r)
610 {
611 GISTENTRY fentry;
612 GISTENTRY *fep;
613
614 gistentryinit(fentry, k, r, NULL, (OffsetNumber) 0, false);
615
616 fep = (GISTENTRY *)
617 DatumGetPointer(FunctionCall1Coll(&giststate->fetchFn[nkey],
618 giststate->supportCollation[nkey],
619 PointerGetDatum(&fentry)));
620
621 /* fetchFn set 'key', return it to the caller */
622 return fep->key;
623 }
624
625 /*
626 * Fetch all keys in tuple.
627 * returns new IndexTuple that contains GISTENTRY with fetched data
628 */
629 IndexTuple
gistFetchTuple(GISTSTATE * giststate,Relation r,IndexTuple tuple)630 gistFetchTuple(GISTSTATE *giststate, Relation r, IndexTuple tuple)
631 {
632 MemoryContext oldcxt = MemoryContextSwitchTo(giststate->tempCxt);
633 Datum fetchatt[INDEX_MAX_KEYS];
634 bool isnull[INDEX_MAX_KEYS];
635 int i;
636
637 for (i = 0; i < r->rd_att->natts; i++)
638 {
639 Datum datum;
640
641 datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
642
643 if (giststate->fetchFn[i].fn_oid != InvalidOid)
644 {
645 if (!isnull[i])
646 fetchatt[i] = gistFetchAtt(giststate, i, datum, r);
647 else
648 fetchatt[i] = (Datum) 0;
649 }
650 else
651 {
652 /*
653 * Index-only scans not supported for this column. Since the
654 * planner chose an index-only scan anyway, it is not interested
655 * in this column, and we can replace it with a NULL.
656 */
657 isnull[i] = true;
658 fetchatt[i] = (Datum) 0;
659 }
660 }
661 MemoryContextSwitchTo(oldcxt);
662
663 return index_form_tuple(giststate->fetchTupdesc, fetchatt, isnull);
664 }
665
666 float
gistpenalty(GISTSTATE * giststate,int attno,GISTENTRY * orig,bool isNullOrig,GISTENTRY * add,bool isNullAdd)667 gistpenalty(GISTSTATE *giststate, int attno,
668 GISTENTRY *orig, bool isNullOrig,
669 GISTENTRY *add, bool isNullAdd)
670 {
671 float penalty = 0.0;
672
673 if (giststate->penaltyFn[attno].fn_strict == FALSE ||
674 (isNullOrig == FALSE && isNullAdd == FALSE))
675 {
676 FunctionCall3Coll(&giststate->penaltyFn[attno],
677 giststate->supportCollation[attno],
678 PointerGetDatum(orig),
679 PointerGetDatum(add),
680 PointerGetDatum(&penalty));
681 /* disallow negative or NaN penalty */
682 if (isnan(penalty) || penalty < 0.0)
683 penalty = 0.0;
684 }
685 else if (isNullOrig && isNullAdd)
686 penalty = 0.0;
687 else
688 {
689 /* try to prevent mixing null and non-null values */
690 penalty = get_float4_infinity();
691 }
692
693 return penalty;
694 }
695
696 /*
697 * Initialize a new index page
698 */
699 void
GISTInitBuffer(Buffer b,uint32 f)700 GISTInitBuffer(Buffer b, uint32 f)
701 {
702 GISTPageOpaque opaque;
703 Page page;
704 Size pageSize;
705
706 pageSize = BufferGetPageSize(b);
707 page = BufferGetPage(b);
708 PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
709
710 opaque = GistPageGetOpaque(page);
711 /* page was already zeroed by PageInit, so this is not needed: */
712 /* memset(&(opaque->nsn), 0, sizeof(GistNSN)); */
713 opaque->rightlink = InvalidBlockNumber;
714 opaque->flags = f;
715 opaque->gist_page_id = GIST_PAGE_ID;
716 }
717
718 /*
719 * Verify that a freshly-read page looks sane.
720 */
721 void
gistcheckpage(Relation rel,Buffer buf)722 gistcheckpage(Relation rel, Buffer buf)
723 {
724 Page page = BufferGetPage(buf);
725
726 /*
727 * ReadBuffer verifies that every newly-read page passes
728 * PageHeaderIsValid, which means it either contains a reasonably sane
729 * page header or is all-zero. We have to defend against the all-zero
730 * case, however.
731 */
732 if (PageIsNew(page))
733 ereport(ERROR,
734 (errcode(ERRCODE_INDEX_CORRUPTED),
735 errmsg("index \"%s\" contains unexpected zero page at block %u",
736 RelationGetRelationName(rel),
737 BufferGetBlockNumber(buf)),
738 errhint("Please REINDEX it.")));
739
740 /*
741 * Additionally check that the special area looks sane.
742 */
743 if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GISTPageOpaqueData)))
744 ereport(ERROR,
745 (errcode(ERRCODE_INDEX_CORRUPTED),
746 errmsg("index \"%s\" contains corrupted page at block %u",
747 RelationGetRelationName(rel),
748 BufferGetBlockNumber(buf)),
749 errhint("Please REINDEX it.")));
750 }
751
752
753 /*
754 * Allocate a new page (either by recycling, or by extending the index file)
755 *
756 * The returned buffer is already pinned and exclusive-locked
757 *
758 * Caller is responsible for initializing the page by calling GISTInitBuffer
759 */
760 Buffer
gistNewBuffer(Relation r)761 gistNewBuffer(Relation r)
762 {
763 Buffer buffer;
764 bool needLock;
765
766 /* First, try to get a page from FSM */
767 for (;;)
768 {
769 BlockNumber blkno = GetFreeIndexPage(r);
770
771 if (blkno == InvalidBlockNumber)
772 break; /* nothing left in FSM */
773
774 buffer = ReadBuffer(r, blkno);
775
776 /*
777 * We have to guard against the possibility that someone else already
778 * recycled this page; the buffer may be locked if so.
779 */
780 if (ConditionalLockBuffer(buffer))
781 {
782 Page page = BufferGetPage(buffer);
783
784 if (PageIsNew(page))
785 return buffer; /* OK to use, if never initialized */
786
787 gistcheckpage(r, buffer);
788
789 if (GistPageIsDeleted(page))
790 return buffer; /* OK to use */
791
792 LockBuffer(buffer, GIST_UNLOCK);
793 }
794
795 /* Can't use it, so release buffer and try again */
796 ReleaseBuffer(buffer);
797 }
798
799 /* Must extend the file */
800 needLock = !RELATION_IS_LOCAL(r);
801
802 if (needLock)
803 LockRelationForExtension(r, ExclusiveLock);
804
805 buffer = ReadBuffer(r, P_NEW);
806 LockBuffer(buffer, GIST_EXCLUSIVE);
807
808 if (needLock)
809 UnlockRelationForExtension(r, ExclusiveLock);
810
811 return buffer;
812 }
813
814 bytea *
gistoptions(Datum reloptions,bool validate)815 gistoptions(Datum reloptions, bool validate)
816 {
817 relopt_value *options;
818 GiSTOptions *rdopts;
819 int numoptions;
820 static const relopt_parse_elt tab[] = {
821 {"fillfactor", RELOPT_TYPE_INT, offsetof(GiSTOptions, fillfactor)},
822 {"buffering", RELOPT_TYPE_STRING, offsetof(GiSTOptions, bufferingModeOffset)}
823 };
824
825 options = parseRelOptions(reloptions, validate, RELOPT_KIND_GIST,
826 &numoptions);
827
828 /* if none set, we're done */
829 if (numoptions == 0)
830 return NULL;
831
832 rdopts = allocateReloptStruct(sizeof(GiSTOptions), options, numoptions);
833
834 fillRelOptions((void *) rdopts, sizeof(GiSTOptions), options, numoptions,
835 validate, tab, lengthof(tab));
836
837 pfree(options);
838
839 return (bytea *) rdopts;
840 }
841
842 /*
843 * gistproperty() -- Check boolean properties of indexes.
844 *
845 * This is optional for most AMs, but is required for GiST because the core
846 * property code doesn't support AMPROP_DISTANCE_ORDERABLE. We also handle
847 * AMPROP_RETURNABLE here to save opening the rel to call gistcanreturn.
848 */
849 bool
gistproperty(Oid index_oid,int attno,IndexAMProperty prop,const char * propname,bool * res,bool * isnull)850 gistproperty(Oid index_oid, int attno,
851 IndexAMProperty prop, const char *propname,
852 bool *res, bool *isnull)
853 {
854 HeapTuple tuple;
855 Form_pg_index rd_index PG_USED_FOR_ASSERTS_ONLY;
856 Form_pg_opclass rd_opclass;
857 Datum datum;
858 bool disnull;
859 oidvector *indclass;
860 Oid opclass,
861 opfamily,
862 opcintype;
863 int16 procno;
864
865 /* Only answer column-level inquiries */
866 if (attno == 0)
867 return false;
868
869 /*
870 * Currently, GiST distance-ordered scans require that there be a distance
871 * function in the opclass with the default types (i.e. the one loaded
872 * into the relcache entry, see initGISTstate). So we assume that if such
873 * a function exists, then there's a reason for it (rather than grubbing
874 * through all the opfamily's operators to find an ordered one).
875 *
876 * Essentially the same code can test whether we support returning the
877 * column data, since that's true if the opclass provides a fetch proc.
878 */
879
880 switch (prop)
881 {
882 case AMPROP_DISTANCE_ORDERABLE:
883 procno = GIST_DISTANCE_PROC;
884 break;
885 case AMPROP_RETURNABLE:
886 procno = GIST_FETCH_PROC;
887 break;
888 default:
889 return false;
890 }
891
892 /* First we need to know the column's opclass. */
893
894 tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(index_oid));
895 if (!HeapTupleIsValid(tuple))
896 {
897 *isnull = true;
898 return true;
899 }
900 rd_index = (Form_pg_index) GETSTRUCT(tuple);
901
902 /* caller is supposed to guarantee this */
903 Assert(attno > 0 && attno <= rd_index->indnatts);
904
905 datum = SysCacheGetAttr(INDEXRELID, tuple,
906 Anum_pg_index_indclass, &disnull);
907 Assert(!disnull);
908
909 indclass = ((oidvector *) DatumGetPointer(datum));
910 opclass = indclass->values[attno - 1];
911
912 ReleaseSysCache(tuple);
913
914 /* Now look up the opclass family and input datatype. */
915
916 tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
917 if (!HeapTupleIsValid(tuple))
918 {
919 *isnull = true;
920 return true;
921 }
922 rd_opclass = (Form_pg_opclass) GETSTRUCT(tuple);
923
924 opfamily = rd_opclass->opcfamily;
925 opcintype = rd_opclass->opcintype;
926
927 ReleaseSysCache(tuple);
928
929 /* And now we can check whether the function is provided. */
930
931 *res = SearchSysCacheExists4(AMPROCNUM,
932 ObjectIdGetDatum(opfamily),
933 ObjectIdGetDatum(opcintype),
934 ObjectIdGetDatum(opcintype),
935 Int16GetDatum(procno));
936 return true;
937 }
938
939 /*
940 * Temporary and unlogged GiST indexes are not WAL-logged, but we need LSNs
941 * to detect concurrent page splits anyway. This function provides a fake
942 * sequence of LSNs for that purpose.
943 */
944 XLogRecPtr
gistGetFakeLSN(Relation rel)945 gistGetFakeLSN(Relation rel)
946 {
947 static XLogRecPtr counter = 1;
948
949 if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
950 {
951 /*
952 * Temporary relations are only accessible in our session, so a simple
953 * backend-local counter will do.
954 */
955 return counter++;
956 }
957 else
958 {
959 /*
960 * Unlogged relations are accessible from other backends, and survive
961 * (clean) restarts. GetFakeLSNForUnloggedRel() handles that for us.
962 */
963 Assert(rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED);
964 return GetFakeLSNForUnloggedRel();
965 }
966 }
967