/*-------------------------------------------------------------------------
 *
 * spgscan.c
 *	  routines for scanning SP-GiST indexes
 *
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			src/backend/access/spgist/spgscan.c
 *
 *-------------------------------------------------------------------------
 */
15
16 #include "postgres.h"
17
18 #include "access/genam.h"
19 #include "access/relscan.h"
20 #include "access/spgist_private.h"
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 #include "storage/bufmgr.h"
24 #include "utils/datum.h"
25 #include "utils/float.h"
26 #include "utils/lsyscache.h"
27 #include "utils/memutils.h"
28 #include "utils/rel.h"
29
30 typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr,
31 Datum leafValue, bool isNull, bool recheck,
32 bool recheckDistances, double *distances);
33
34 /*
35 * Pairing heap comparison function for the SpGistSearchItem queue.
36 * KNN-searches currently only support NULLS LAST. So, preserve this logic
37 * here.
38 */
39 static int
40 pairingheap_SpGistSearchItem_cmp(const pairingheap_node *a,
41 const pairingheap_node *b, void *arg)
42 {
43 const SpGistSearchItem *sa = (const SpGistSearchItem *) a;
44 const SpGistSearchItem *sb = (const SpGistSearchItem *) b;
45 SpGistScanOpaque so = (SpGistScanOpaque) arg;
46 int i;
47
48 if (sa->isNull)
49 {
export()50 if (!sb->isNull)
51 return -1;
52 }
53 else if (sb->isNull)
54 {
55 return 1;
56 }
57 else
58 {
59 /* Order according to distance comparison */
60 for (i = 0; i < so->numberOfNonNullOrderBys; i++)
61 {
62 if (isnan(sa->distances[i]) && isnan(sb->distances[i]))
63 continue; /* NaN == NaN */
64 if (isnan(sa->distances[i]))
65 return -1; /* NaN > number */
66 if (isnan(sb->distances[i]))
67 return 1; /* number < NaN */
68 if (sa->distances[i] != sb->distances[i])
69 return (sa->distances[i] < sb->distances[i]) ? 1 : -1;
70 }
71 }
72
73 /* Leaf items go before inner pages, to ensure a depth-first search */
74 if (sa->isLeaf && !sb->isLeaf)
75 return 1;
76 if (!sa->isLeaf && sb->isLeaf)
77 return -1;
78
79 return 0;
80 }
81
82 static void
83 spgFreeSearchItem(SpGistScanOpaque so, SpGistSearchItem *item)
84 {
import()85 if (!so->state.attLeafType.attbyval &&
86 DatumGetPointer(item->value) != NULL)
87 pfree(DatumGetPointer(item->value));
88
89 if (item->traversalValue)
90 pfree(item->traversalValue);
91
92 pfree(item);
93 }
94
95 /*
96 * Add SpGistSearchItem to queue
97 *
98 * Called in queue context
99 */
100 static void
101 spgAddSearchItemToQueue(SpGistScanOpaque so, SpGistSearchItem *item)
102 {
103 pairingheap_add(so->scanQueue, &item->phNode);
104 }
105
106 static SpGistSearchItem *
107 spgAllocSearchItem(SpGistScanOpaque so, bool isnull, double *distances)
108 {
109 /* allocate distance array only for non-NULL items */
110 SpGistSearchItem *item =
111 palloc(SizeOfSpGistSearchItem(isnull ? 0 : so->numberOfNonNullOrderBys));
112
113 item->isNull = isnull;
pass_array()114
115 if (!isnull && so->numberOfNonNullOrderBys > 0)
116 memcpy(item->distances, distances,
117 sizeof(item->distances[0]) * so->numberOfNonNullOrderBys);
118
119 return item;
120 }
121
122 static void
123 spgAddStartItem(SpGistScanOpaque so, bool isnull)
124 {
125 SpGistSearchItem *startEntry =
126 spgAllocSearchItem(so, isnull, so->zeroDistances);
127
128 ItemPointerSet(&startEntry->heapPtr,
129 isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO,
130 FirstOffsetNumber);
131 startEntry->isLeaf = false;
132 startEntry->level = 0;
133 startEntry->value = (Datum) 0;
134 startEntry->traversalValue = NULL;
135 startEntry->recheck = false;
136 startEntry->recheckDistances = false;
137
138 spgAddSearchItemToQueue(so, startEntry);
139 }
140
141 /*
142 * Initialize queue to search the root page, resetting
143 * any previously active scan
144 */
145 static void
146 resetSpGistScanOpaque(SpGistScanOpaque so)
147 {
148 MemoryContext oldCtx;
149
150 /*
151 * clear traversal context before proceeding to the next scan; this must
152 * not happen before the freeScanStack above, else we get double-free
153 * crashes.
154 */
155 MemoryContextReset(so->traversalCxt);
156
157 oldCtx = MemoryContextSwitchTo(so->traversalCxt);
158
159 /* initialize queue only for distance-ordered scans */
160 so->scanQueue = pairingheap_allocate(pairingheap_SpGistSearchItem_cmp, so);
161
162 if (so->searchNulls)
163 /* Add a work item to scan the null index entries */
164 spgAddStartItem(so, true);
165
166 if (so->searchNonNulls)
167 /* Add a work item to scan the non-null index entries */
168 spgAddStartItem(so, false);
169
170 MemoryContextSwitchTo(oldCtx);
171
172 if (so->numberOfOrderBys > 0)
173 {
174 /* Must pfree distances to avoid memory leak */
175 int i;
176
177 for (i = 0; i < so->nPtrs; i++)
178 if (so->distances[i])
179 pfree(so->distances[i]);
180 }
181
182 if (so->want_itup)
183 {
184 /* Must pfree reconstructed tuples to avoid memory leak */
185 int i;
186
187 for (i = 0; i < so->nPtrs; i++)
188 pfree(so->reconTups[i]);
189 }
190 so->iPtr = so->nPtrs = 0;
191 }
192
export_mut()193 /*
194 * Prepare scan keys in SpGistScanOpaque from caller-given scan keys
195 *
196 * Sets searchNulls, searchNonNulls, numberOfKeys, keyData fields of *so.
197 *
198 * The point here is to eliminate null-related considerations from what the
199 * opclass consistent functions need to deal with. We assume all SPGiST-
200 * indexable operators are strict, so any null RHS value makes the scan
201 * condition unsatisfiable. We also pull out any IS NULL/IS NOT NULL
202 * conditions; their effect is reflected into searchNulls/searchNonNulls.
203 */
204 static void
205 spgPrepareScanKeys(IndexScanDesc scan)
206 {
207 SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
208 bool qual_ok;
209 bool haveIsNull;
210 bool haveNotNull;
211 int nkeys;
212 int i;
213
214 so->numberOfOrderBys = scan->numberOfOrderBys;
215 so->orderByData = scan->orderByData;
216
217 if (so->numberOfOrderBys <= 0)
218 so->numberOfNonNullOrderBys = 0;
219 else
220 {
221 int j = 0;
222
223 /*
224 * Remove all NULL keys, but remember their offsets in the original
225 * array.
226 */
227 for (i = 0; i < scan->numberOfOrderBys; i++)
228 {
229 ScanKey skey = &so->orderByData[i];
230
231 if (skey->sk_flags & SK_ISNULL)
232 so->nonNullOrderByOffsets[i] = -1;
233 else
234 {
235 if (i != j)
236 so->orderByData[j] = *skey;
237
238 so->nonNullOrderByOffsets[i] = j++;
239 }
240 }
241
242 so->numberOfNonNullOrderBys = j;
243 }
244
245 if (scan->numberOfKeys <= 0)
246 {
247 /* If no quals, whole-index scan is required */
248 so->searchNulls = true;
249 so->searchNonNulls = true;
250 so->numberOfKeys = 0;
251 return;
252 }
253
254 /* Examine the given quals */
255 qual_ok = true;
256 haveIsNull = haveNotNull = false;
257 nkeys = 0;
258 for (i = 0; i < scan->numberOfKeys; i++)
259 {
260 ScanKey skey = &scan->keyData[i];
261
262 if (skey->sk_flags & SK_SEARCHNULL)
263 haveIsNull = true;
264 else if (skey->sk_flags & SK_SEARCHNOTNULL)
265 haveNotNull = true;
266 else if (skey->sk_flags & SK_ISNULL)
267 {
268 /* ordinary qual with null argument - unsatisfiable */
269 qual_ok = false;
270 break;
271 }
272 else
273 {
274 /* ordinary qual, propagate into so->keyData */
275 so->keyData[nkeys++] = *skey;
276 /* this effectively creates a not-null requirement */
277 haveNotNull = true;
278 }
279 }
280
281 /* IS NULL in combination with something else is unsatisfiable */
282 if (haveIsNull && haveNotNull)
283 qual_ok = false;
284
285 /* Emit results */
286 if (qual_ok)
287 {
288 so->searchNulls = haveIsNull;
289 so->searchNonNulls = haveNotNull;
290 so->numberOfKeys = nkeys;
291 }
292 else
293 {
294 so->searchNulls = false;
295 so->searchNonNulls = false;
296 so->numberOfKeys = 0;
297 }
298 }
299
300 IndexScanDesc
301 spgbeginscan(Relation rel, int keysz, int orderbysz)
302 {
303 IndexScanDesc scan;
304 SpGistScanOpaque so;
305 int i;
306
307 scan = RelationGetIndexScan(rel, keysz, orderbysz);
308
309 so = (SpGistScanOpaque) palloc0(sizeof(SpGistScanOpaqueData));
310 if (keysz > 0)
311 so->keyData = (ScanKey) palloc(sizeof(ScanKeyData) * keysz);
312 else
313 so->keyData = NULL;
314 initSpGistState(&so->state, scan->indexRelation);
315
316 so->tempCxt = AllocSetContextCreate(CurrentMemoryContext,
317 "SP-GiST search temporary context",
318 ALLOCSET_DEFAULT_SIZES);
319 so->traversalCxt = AllocSetContextCreate(CurrentMemoryContext,
320 "SP-GiST traversal-value context",
321 ALLOCSET_DEFAULT_SIZES);
322
323 /* Set up indexTupDesc and xs_hitupdesc in case it's an index-only scan */
324 so->indexTupDesc = scan->xs_hitupdesc = RelationGetDescr(rel);
325
326 /* Allocate various arrays needed for order-by scans */
327 if (scan->numberOfOrderBys > 0)
328 {
329 /* This will be filled in spgrescan, but allocate the space here */
330 so->orderByTypes = (Oid *)
331 palloc(sizeof(Oid) * scan->numberOfOrderBys);
332 so->nonNullOrderByOffsets = (int *)
333 palloc(sizeof(int) * scan->numberOfOrderBys);
334
335 /* These arrays have constant contents, so we can fill them now */
336 so->zeroDistances = (double *)
337 palloc(sizeof(double) * scan->numberOfOrderBys);
338 so->infDistances = (double *)
339 palloc(sizeof(double) * scan->numberOfOrderBys);
340
341 for (i = 0; i < scan->numberOfOrderBys; i++)
342 {
343 so->zeroDistances[i] = 0.0;
344 so->infDistances[i] = get_float8_infinity();
345 }
346
347 scan->xs_orderbyvals = (Datum *)
348 palloc0(sizeof(Datum) * scan->numberOfOrderBys);
349 scan->xs_orderbynulls = (bool *)
350 palloc(sizeof(bool) * scan->numberOfOrderBys);
351 memset(scan->xs_orderbynulls, true,
352 sizeof(bool) * scan->numberOfOrderBys);
353 }
354
355 fmgr_info_copy(&so->innerConsistentFn,
356 index_getprocinfo(rel, 1, SPGIST_INNER_CONSISTENT_PROC),
357 CurrentMemoryContext);
358
359 fmgr_info_copy(&so->leafConsistentFn,
360 index_getprocinfo(rel, 1, SPGIST_LEAF_CONSISTENT_PROC),
361 CurrentMemoryContext);
362
363 so->indexCollation = rel->rd_indcollation[0];
364
365 scan->opaque = so;
366
367 return scan;
368 }
369
370 void
371 spgrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
372 ScanKey orderbys, int norderbys)
373 {
374 SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
375
376 /* copy scankeys into local storage */
377 if (scankey && scan->numberOfKeys > 0)
378 memmove(scan->keyData, scankey,
379 scan->numberOfKeys * sizeof(ScanKeyData));
380
381 /* initialize order-by data if needed */
382 if (orderbys && scan->numberOfOrderBys > 0)
383 {
384 int i;
385
386 memmove(scan->orderByData, orderbys,
387 scan->numberOfOrderBys * sizeof(ScanKeyData));
388
389 for (i = 0; i < scan->numberOfOrderBys; i++)
390 {
391 ScanKey skey = &scan->orderByData[i];
392
393 /*
394 * Look up the datatype returned by the original ordering
395 * operator. SP-GiST always uses a float8 for the distance
396 * function, but the ordering operator could be anything else.
397 *
398 * XXX: The distance function is only allowed to be lossy if the
399 * ordering operator's result type is float4 or float8. Otherwise
400 * we don't know how to return the distance to the executor. But
401 * we cannot check that here, as we won't know if the distance
402 * function is lossy until it returns *recheck = true for the
403 * first time.
404 */
405 so->orderByTypes[i] = get_func_rettype(skey->sk_func.fn_oid);
406 }
407 }
408
409 /* preprocess scankeys, set up the representation in *so */
410 spgPrepareScanKeys(scan);
411
412 /* set up starting queue entries */
413 resetSpGistScanOpaque(so);
414
415 /* count an indexscan for stats */
416 pgstat_count_index_scan(scan->indexRelation);
417 }
418
419 void
420 spgendscan(IndexScanDesc scan)
421 {
422 SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
423
424 MemoryContextDelete(so->tempCxt);
425 MemoryContextDelete(so->traversalCxt);
426
427 if (so->keyData)
428 pfree(so->keyData);
429
430 if (so->state.deadTupleStorage)
431 pfree(so->state.deadTupleStorage);
432
433 if (scan->numberOfOrderBys > 0)
434 {
435 pfree(so->orderByTypes);
436 pfree(so->nonNullOrderByOffsets);
437 pfree(so->zeroDistances);
438 pfree(so->infDistances);
439 pfree(scan->xs_orderbyvals);
440 pfree(scan->xs_orderbynulls);
441 }
442
443 pfree(so);
444 }
445
446 /*
447 * Leaf SpGistSearchItem constructor, called in queue context
448 */
449 static SpGistSearchItem *
450 spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointer heapPtr,
451 Datum leafValue, bool recheck, bool recheckDistances,
452 bool isnull, double *distances)
453 {
454 SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances);
455
456 item->level = level;
457 item->heapPtr = *heapPtr;
458
459 /*
460 * If we need the reconstructed value, copy it to queue cxt out of tmp
461 * cxt. Caution: the leaf_consistent method may not have supplied a value
462 * if we didn't ask it to, and mildly-broken methods might supply one of
463 * the wrong type. Also, while the correct leafValue type is attType not
464 * leafType, pre-v14 Postgres versions have historically used attLeafType
465 * here; let's not confuse matters even more by changing that in a minor
466 * release.
467 */
468 if (so->want_itup)
469 item->value = isnull ? (Datum) 0 :
470 datumCopy(leafValue, so->state.attLeafType.attbyval,
471 so->state.attLeafType.attlen);
472 else
473 item->value = (Datum) 0;
474 item->traversalValue = NULL;
475 item->isLeaf = true;
476 item->recheck = recheck;
477 item->recheckDistances = recheckDistances;
478
479 return item;
480 }
481
482 /*
483 * Test whether a leaf tuple satisfies all the scan keys
484 *
485 * *reportedSome is set to true if:
486 * the scan is not ordered AND the item satisfies the scankeys
487 */
488 static bool
489 spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
490 SpGistLeafTuple leafTuple, bool isnull,
491 bool *reportedSome, storeRes_func storeRes)
492 {
493 Datum leafValue;
494 double *distances;
495 bool result;
496 bool recheck;
497 bool recheckDistances;
498
499 if (isnull)
500 {
501 /* Should not have arrived on a nulls page unless nulls are wanted */
502 Assert(so->searchNulls);
503 leafValue = (Datum) 0;
504 distances = NULL;
505 recheck = false;
506 recheckDistances = false;
507 result = true;
508 }
509 else
510 {
511 spgLeafConsistentIn in;
512 spgLeafConsistentOut out;
513
514 /* use temp context for calling leaf_consistent */
515 MemoryContext oldCxt = MemoryContextSwitchTo(so->tempCxt);
516
517 in.scankeys = so->keyData;
518 in.nkeys = so->numberOfKeys;
519 in.orderbys = so->orderByData;
520 in.norderbys = so->numberOfNonNullOrderBys;
521 in.reconstructedValue = item->value;
522 in.traversalValue = item->traversalValue;
523 in.level = item->level;
524 in.returnData = so->want_itup;
525 in.leafDatum = SGLTDATUM(leafTuple, &so->state);
526
527 out.leafValue = (Datum) 0;
528 out.recheck = false;
529 out.distances = NULL;
530 out.recheckDistances = false;
531
532 result = DatumGetBool(FunctionCall2Coll(&so->leafConsistentFn,
533 so->indexCollation,
534 PointerGetDatum(&in),
535 PointerGetDatum(&out)));
536 recheck = out.recheck;
537 recheckDistances = out.recheckDistances;
538 leafValue = out.leafValue;
539 distances = out.distances;
540
541 MemoryContextSwitchTo(oldCxt);
542 }
543
544 if (result)
545 {
546 /* item passes the scankeys */
547 if (so->numberOfNonNullOrderBys > 0)
548 {
549 /* the scan is ordered -> add the item to the queue */
550 MemoryContext oldCxt = MemoryContextSwitchTo(so->traversalCxt);
551 SpGistSearchItem *heapItem = spgNewHeapItem(so, item->level,
552 &leafTuple->heapPtr,
553 leafValue,
554 recheck,
555 recheckDistances,
556 isnull,
557 distances);
558
559 spgAddSearchItemToQueue(so, heapItem);
560
561 MemoryContextSwitchTo(oldCxt);
562 }
563 else
564 {
565 /* non-ordered scan, so report the item right away */
566 Assert(!recheckDistances);
567 storeRes(so, &leafTuple->heapPtr, leafValue, isnull,
568 recheck, false, NULL);
569 *reportedSome = true;
570 }
571 }
572
573 return result;
574 }
575
576 /* A bundle initializer for inner_consistent methods */
577 static void
578 spgInitInnerConsistentIn(spgInnerConsistentIn *in,
579 SpGistScanOpaque so,
580 SpGistSearchItem *item,
581 SpGistInnerTuple innerTuple)
582 {
583 in->scankeys = so->keyData;
584 in->orderbys = so->orderByData;
585 in->nkeys = so->numberOfKeys;
586 in->norderbys = so->numberOfNonNullOrderBys;
587 in->reconstructedValue = item->value;
588 in->traversalMemoryContext = so->traversalCxt;
589 in->traversalValue = item->traversalValue;
590 in->level = item->level;
591 in->returnData = so->want_itup;
592 in->allTheSame = innerTuple->allTheSame;
593 in->hasPrefix = (innerTuple->prefixSize > 0);
594 in->prefixDatum = SGITDATUM(innerTuple, &so->state);
595 in->nNodes = innerTuple->nNodes;
596 in->nodeLabels = spgExtractNodeLabels(&so->state, innerTuple);
597 }
598
599 static SpGistSearchItem *
600 spgMakeInnerItem(SpGistScanOpaque so,
601 SpGistSearchItem *parentItem,
602 SpGistNodeTuple tuple,
603 spgInnerConsistentOut *out, int i, bool isnull,
604 double *distances)
605 {
606 SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances);
607
608 item->heapPtr = tuple->t_tid;
609 item->level = out->levelAdds ? parentItem->level + out->levelAdds[i]
610 : parentItem->level;
611
612 /* Must copy value out of temp context */
613 item->value = out->reconstructedValues
614 ? datumCopy(out->reconstructedValues[i],
615 so->state.attLeafType.attbyval,
616 so->state.attLeafType.attlen)
617 : (Datum) 0;
618
619 /*
620 * Elements of out.traversalValues should be allocated in
621 * in.traversalMemoryContext, which is actually a long lived context of
622 * index scan.
623 */
624 item->traversalValue =
625 out->traversalValues ? out->traversalValues[i] : NULL;
626
627 item->isLeaf = false;
628 item->recheck = false;
629 item->recheckDistances = false;
630
631 return item;
632 }
633
634 static void
635 spgInnerTest(SpGistScanOpaque so, SpGistSearchItem *item,
636 SpGistInnerTuple innerTuple, bool isnull)
637 {
638 MemoryContext oldCxt = MemoryContextSwitchTo(so->tempCxt);
639 spgInnerConsistentOut out;
640 int nNodes = innerTuple->nNodes;
641 int i;
642
643 memset(&out, 0, sizeof(out));
644
645 if (!isnull)
646 {
647 spgInnerConsistentIn in;
648
649 spgInitInnerConsistentIn(&in, so, item, innerTuple);
650
651 /* use user-defined inner consistent method */
652 FunctionCall2Coll(&so->innerConsistentFn,
653 so->indexCollation,
654 PointerGetDatum(&in),
655 PointerGetDatum(&out));
656 }
657 else
658 {
659 /* force all children to be visited */
660 out.nNodes = nNodes;
661 out.nodeNumbers = (int *) palloc(sizeof(int) * nNodes);
662 for (i = 0; i < nNodes; i++)
663 out.nodeNumbers[i] = i;
664 }
665
666 /* If allTheSame, they should all or none of them match */
667 if (innerTuple->allTheSame && out.nNodes != 0 && out.nNodes != nNodes)
668 elog(ERROR, "inconsistent inner_consistent results for allTheSame inner tuple");
669
670 if (out.nNodes)
671 {
672 /* collect node pointers */
673 SpGistNodeTuple node;
674 SpGistNodeTuple *nodes = (SpGistNodeTuple *) palloc(
675 sizeof(SpGistNodeTuple) * nNodes);
676
677 SGITITERATE(innerTuple, i, node)
678 {
679 nodes[i] = node;
680 }
681
682 MemoryContextSwitchTo(so->traversalCxt);
683
684 for (i = 0; i < out.nNodes; i++)
685 {
686 int nodeN = out.nodeNumbers[i];
687 SpGistSearchItem *innerItem;
688 double *distances;
689
690 Assert(nodeN >= 0 && nodeN < nNodes);
691
692 node = nodes[nodeN];
693
694 if (!ItemPointerIsValid(&node->t_tid))
695 continue;
696
697 /*
698 * Use infinity distances if innerConsistent() failed to return
699 * them or if is a NULL item (their distances are really unused).
700 */
701 distances = out.distances ? out.distances[i] : so->infDistances;
702
703 innerItem = spgMakeInnerItem(so, item, node, &out, i, isnull,
704 distances);
705
706 spgAddSearchItemToQueue(so, innerItem);
707 }
708 }
709
710 MemoryContextSwitchTo(oldCxt);
711 }
712
713 /* Returns a next item in an (ordered) scan or null if the index is exhausted */
714 static SpGistSearchItem *
715 spgGetNextQueueItem(SpGistScanOpaque so)
716 {
717 if (pairingheap_is_empty(so->scanQueue))
718 return NULL; /* Done when both heaps are empty */
719
720 /* Return item; caller is responsible to pfree it */
721 return (SpGistSearchItem *) pairingheap_remove_first(so->scanQueue);
722 }
723
724 enum SpGistSpecialOffsetNumbers
725 {
726 SpGistBreakOffsetNumber = InvalidOffsetNumber,
727 SpGistRedirectOffsetNumber = MaxOffsetNumber + 1,
728 SpGistErrorOffsetNumber = MaxOffsetNumber + 2
729 };
730
731 static OffsetNumber
732 spgTestLeafTuple(SpGistScanOpaque so,
733 SpGistSearchItem *item,
734 Page page, OffsetNumber offset,
735 bool isnull, bool isroot,
736 bool *reportedSome,
737 storeRes_func storeRes)
738 {
739 SpGistLeafTuple leafTuple = (SpGistLeafTuple)
740 PageGetItem(page, PageGetItemId(page, offset));
741
742 if (leafTuple->tupstate != SPGIST_LIVE)
743 {
744 if (!isroot) /* all tuples on root should be live */
745 {
746 if (leafTuple->tupstate == SPGIST_REDIRECT)
747 {
748 /* redirection tuple should be first in chain */
749 Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr));
750 /* transfer attention to redirect point */
751 item->heapPtr = ((SpGistDeadTuple) leafTuple)->pointer;
752 Assert(ItemPointerGetBlockNumber(&item->heapPtr) != SPGIST_METAPAGE_BLKNO);
753 return SpGistRedirectOffsetNumber;
754 }
755
756 if (leafTuple->tupstate == SPGIST_DEAD)
757 {
758 /* dead tuple should be first in chain */
759 Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr));
760 /* No live entries on this page */
761 Assert(leafTuple->nextOffset == InvalidOffsetNumber);
762 return SpGistBreakOffsetNumber;
763 }
764 }
765
766 /* We should not arrive at a placeholder */
767 elog(ERROR, "unexpected SPGiST tuple state: %d", leafTuple->tupstate);
768 return SpGistErrorOffsetNumber;
769 }
770
771 Assert(ItemPointerIsValid(&leafTuple->heapPtr));
772
773 spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes);
774
775 return leafTuple->nextOffset;
776 }
777
778 /*
779 * Walk the tree and report all tuples passing the scan quals to the storeRes
780 * subroutine.
781 *
782 * If scanWholeIndex is true, we'll do just that. If not, we'll stop at the
783 * next page boundary once we have reported at least one tuple.
784 */
785 static void
786 spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex,
787 storeRes_func storeRes, Snapshot snapshot)
788 {
789 Buffer buffer = InvalidBuffer;
790 bool reportedSome = false;
791
792 while (scanWholeIndex || !reportedSome)
793 {
794 SpGistSearchItem *item = spgGetNextQueueItem(so);
795
796 if (item == NULL)
797 break; /* No more items in queue -> done */
798
799 redirect:
800 /* Check for interrupts, just in case of infinite loop */
801 CHECK_FOR_INTERRUPTS();
802
803 if (item->isLeaf)
804 {
805 /* We store heap items in the queue only in case of ordered search */
806 Assert(so->numberOfNonNullOrderBys > 0);
807 storeRes(so, &item->heapPtr, item->value, item->isNull,
808 item->recheck, item->recheckDistances, item->distances);
809 reportedSome = true;
810 }
811 else
812 {
813 BlockNumber blkno = ItemPointerGetBlockNumber(&item->heapPtr);
814 OffsetNumber offset = ItemPointerGetOffsetNumber(&item->heapPtr);
815 Page page;
816 bool isnull;
817
818 if (buffer == InvalidBuffer)
819 {
820 buffer = ReadBuffer(index, blkno);
821 LockBuffer(buffer, BUFFER_LOCK_SHARE);
822 }
823 else if (blkno != BufferGetBlockNumber(buffer))
824 {
825 UnlockReleaseBuffer(buffer);
826 buffer = ReadBuffer(index, blkno);
827 LockBuffer(buffer, BUFFER_LOCK_SHARE);
828 }
829
830 /* else new pointer points to the same page, no work needed */
831
832 page = BufferGetPage(buffer);
833 TestForOldSnapshot(snapshot, index, page);
834
835 isnull = SpGistPageStoresNulls(page) ? true : false;
836
837 if (SpGistPageIsLeaf(page))
838 {
839 /* Page is a leaf - that is, all it's tuples are heap items */
840 OffsetNumber max = PageGetMaxOffsetNumber(page);
841
842 if (SpGistBlockIsRoot(blkno))
843 {
844 /* When root is a leaf, examine all its tuples */
845 for (offset = FirstOffsetNumber; offset <= max; offset++)
846 (void) spgTestLeafTuple(so, item, page, offset,
847 isnull, true,
848 &reportedSome, storeRes);
849 }
850 else
851 {
852 /* Normal case: just examine the chain we arrived at */
853 while (offset != InvalidOffsetNumber)
854 {
855 Assert(offset >= FirstOffsetNumber && offset <= max);
856 offset = spgTestLeafTuple(so, item, page, offset,
857 isnull, false,
858 &reportedSome, storeRes);
859 if (offset == SpGistRedirectOffsetNumber)
860 goto redirect;
861 }
862 }
863 }
864 else /* page is inner */
865 {
866 SpGistInnerTuple innerTuple = (SpGistInnerTuple)
867 PageGetItem(page, PageGetItemId(page, offset));
868
869 if (innerTuple->tupstate != SPGIST_LIVE)
870 {
871 if (innerTuple->tupstate == SPGIST_REDIRECT)
872 {
873 /* transfer attention to redirect point */
874 item->heapPtr = ((SpGistDeadTuple) innerTuple)->pointer;
875 Assert(ItemPointerGetBlockNumber(&item->heapPtr) !=
876 SPGIST_METAPAGE_BLKNO);
877 goto redirect;
878 }
879 elog(ERROR, "unexpected SPGiST tuple state: %d",
880 innerTuple->tupstate);
881 }
882
883 spgInnerTest(so, item, innerTuple, isnull);
884 }
885 }
886
887 /* done with this scan item */
888 spgFreeSearchItem(so, item);
889 /* clear temp context before proceeding to the next one */
890 MemoryContextReset(so->tempCxt);
891 }
892
893 if (buffer != InvalidBuffer)
894 UnlockReleaseBuffer(buffer);
895 }
896
897
898 /* storeRes subroutine for getbitmap case */
899 static void
900 storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
901 Datum leafValue, bool isnull, bool recheck, bool recheckDistances,
902 double *distances)
903 {
904 Assert(!recheckDistances && !distances);
905 tbm_add_tuples(so->tbm, heapPtr, 1, recheck);
906 so->ntids++;
907 }
908
909 int64
910 spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
911 {
912 SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
913
914 /* Copy want_itup to *so so we don't need to pass it around separately */
915 so->want_itup = false;
916
917 so->tbm = tbm;
918 so->ntids = 0;
919
920 spgWalk(scan->indexRelation, so, true, storeBitmap, scan->xs_snapshot);
921
922 return so->ntids;
923 }
924
925 /* storeRes subroutine for gettuple case */
926 static void
927 storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
928 Datum leafValue, bool isnull, bool recheck, bool recheckDistances,
929 double *nonNullDistances)
930 {
931 Assert(so->nPtrs < MaxIndexTuplesPerPage);
932 so->heapPtrs[so->nPtrs] = *heapPtr;
933 so->recheck[so->nPtrs] = recheck;
934 so->recheckDistances[so->nPtrs] = recheckDistances;
935
936 if (so->numberOfOrderBys > 0)
937 {
938 if (isnull || so->numberOfNonNullOrderBys <= 0)
939 so->distances[so->nPtrs] = NULL;
940 else
941 {
942 IndexOrderByDistance *distances =
943 palloc(sizeof(distances[0]) * so->numberOfOrderBys);
944 int i;
945
946 for (i = 0; i < so->numberOfOrderBys; i++)
947 {
948 int offset = so->nonNullOrderByOffsets[i];
949
950 if (offset >= 0)
951 {
952 /* Copy non-NULL distance value */
953 distances[i].value = nonNullDistances[offset];
954 distances[i].isnull = false;
955 }
956 else
957 {
958 /* Set distance's NULL flag. */
959 distances[i].value = 0.0;
960 distances[i].isnull = true;
961 }
962 }
963
964 so->distances[so->nPtrs] = distances;
965 }
966 }
967
968 if (so->want_itup)
969 {
970 /*
971 * Reconstruct index data. We have to copy the datum out of the temp
972 * context anyway, so we may as well create the tuple here.
973 */
974 so->reconTups[so->nPtrs] = heap_form_tuple(so->indexTupDesc,
975 &leafValue,
976 &isnull);
977 }
978 so->nPtrs++;
979 }
980
981 bool
982 spggettuple(IndexScanDesc scan, ScanDirection dir)
983 {
984 SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
985
986 if (dir != ForwardScanDirection)
987 elog(ERROR, "SP-GiST only supports forward scan direction");
988
989 /* Copy want_itup to *so so we don't need to pass it around separately */
990 so->want_itup = scan->xs_want_itup;
991
992 for (;;)
993 {
994 if (so->iPtr < so->nPtrs)
995 {
996 /* continuing to return reported tuples */
997 scan->xs_heaptid = so->heapPtrs[so->iPtr];
998 scan->xs_recheck = so->recheck[so->iPtr];
999 scan->xs_hitup = so->reconTups[so->iPtr];
1000
1001 if (so->numberOfOrderBys > 0)
1002 index_store_float8_orderby_distances(scan, so->orderByTypes,
1003 so->distances[so->iPtr],
1004 so->recheckDistances[so->iPtr]);
1005 so->iPtr++;
1006 return true;
1007 }
1008
1009 if (so->numberOfOrderBys > 0)
1010 {
1011 /* Must pfree distances to avoid memory leak */
1012 int i;
1013
1014 for (i = 0; i < so->nPtrs; i++)
1015 if (so->distances[i])
1016 pfree(so->distances[i]);
1017 }
1018
1019 if (so->want_itup)
1020 {
1021 /* Must pfree reconstructed tuples to avoid memory leak */
1022 int i;
1023
1024 for (i = 0; i < so->nPtrs; i++)
1025 pfree(so->reconTups[i]);
1026 }
1027 so->iPtr = so->nPtrs = 0;
1028
1029 spgWalk(scan->indexRelation, so, false, storeGettuple,
1030 scan->xs_snapshot);
1031
1032 if (so->nPtrs == 0)
1033 break; /* must have completed scan */
1034 }
1035
1036 return false;
1037 }
1038
1039 bool
1040 spgcanreturn(Relation index, int attno)
1041 {
1042 SpGistCache *cache;
1043
1044 /* We can do it if the opclass config function says so */
1045 cache = spgGetCache(index);
1046
1047 return cache->config.canReturnData;
1048 }
1049