1 /*-------------------------------------------------------------------------
2 *
3 * spgscan.c
4 * routines for scanning SP-GiST indexes
5 *
6 *
7 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/spgist/spgscan.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16 #include "postgres.h"
17
18 #include "access/genam.h"
19 #include "access/relscan.h"
20 #include "access/spgist_private.h"
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 #include "storage/bufmgr.h"
24 #include "utils/datum.h"
25 #include "utils/float.h"
26 #include "utils/lsyscache.h"
27 #include "utils/memutils.h"
28 #include "utils/rel.h"
29
30 typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr,
31 Datum leafValue, bool isNull, bool recheck,
32 bool recheckDistances, double *distances);
33
34 /*
35 * Pairing heap comparison function for the SpGistSearchItem queue.
36 * KNN-searches currently only support NULLS LAST. So, preserve this logic
37 * here.
38 */
39 static int
pairingheap_SpGistSearchItem_cmp(const pairingheap_node * a,const pairingheap_node * b,void * arg)40 pairingheap_SpGistSearchItem_cmp(const pairingheap_node *a,
41 const pairingheap_node *b, void *arg)
42 {
43 const SpGistSearchItem *sa = (const SpGistSearchItem *) a;
44 const SpGistSearchItem *sb = (const SpGistSearchItem *) b;
45 SpGistScanOpaque so = (SpGistScanOpaque) arg;
46 int i;
47
48 if (sa->isNull)
49 {
50 if (!sb->isNull)
51 return -1;
52 }
53 else if (sb->isNull)
54 {
55 return 1;
56 }
57 else
58 {
59 /* Order according to distance comparison */
60 for (i = 0; i < so->numberOfNonNullOrderBys; i++)
61 {
62 if (isnan(sa->distances[i]) && isnan(sb->distances[i]))
63 continue; /* NaN == NaN */
64 if (isnan(sa->distances[i]))
65 return -1; /* NaN > number */
66 if (isnan(sb->distances[i]))
67 return 1; /* number < NaN */
68 if (sa->distances[i] != sb->distances[i])
69 return (sa->distances[i] < sb->distances[i]) ? 1 : -1;
70 }
71 }
72
73 /* Leaf items go before inner pages, to ensure a depth-first search */
74 if (sa->isLeaf && !sb->isLeaf)
75 return 1;
76 if (!sa->isLeaf && sb->isLeaf)
77 return -1;
78
79 return 0;
80 }
81
82 static void
spgFreeSearchItem(SpGistScanOpaque so,SpGistSearchItem * item)83 spgFreeSearchItem(SpGistScanOpaque so, SpGistSearchItem *item)
84 {
85 if (!so->state.attLeafType.attbyval &&
86 DatumGetPointer(item->value) != NULL)
87 pfree(DatumGetPointer(item->value));
88
89 if (item->traversalValue)
90 pfree(item->traversalValue);
91
92 pfree(item);
93 }
94
95 /*
96 * Add SpGistSearchItem to queue
97 *
98 * Called in queue context
99 */
100 static void
spgAddSearchItemToQueue(SpGistScanOpaque so,SpGistSearchItem * item)101 spgAddSearchItemToQueue(SpGistScanOpaque so, SpGistSearchItem *item)
102 {
103 pairingheap_add(so->scanQueue, &item->phNode);
104 }
105
106 static SpGistSearchItem *
spgAllocSearchItem(SpGistScanOpaque so,bool isnull,double * distances)107 spgAllocSearchItem(SpGistScanOpaque so, bool isnull, double *distances)
108 {
109 /* allocate distance array only for non-NULL items */
110 SpGistSearchItem *item =
111 palloc(SizeOfSpGistSearchItem(isnull ? 0 : so->numberOfNonNullOrderBys));
112
113 item->isNull = isnull;
114
115 if (!isnull && so->numberOfNonNullOrderBys > 0)
116 memcpy(item->distances, distances,
117 sizeof(item->distances[0]) * so->numberOfNonNullOrderBys);
118
119 return item;
120 }
121
122 static void
spgAddStartItem(SpGistScanOpaque so,bool isnull)123 spgAddStartItem(SpGistScanOpaque so, bool isnull)
124 {
125 SpGistSearchItem *startEntry =
126 spgAllocSearchItem(so, isnull, so->zeroDistances);
127
128 ItemPointerSet(&startEntry->heapPtr,
129 isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO,
130 FirstOffsetNumber);
131 startEntry->isLeaf = false;
132 startEntry->level = 0;
133 startEntry->value = (Datum) 0;
134 startEntry->traversalValue = NULL;
135 startEntry->recheck = false;
136 startEntry->recheckDistances = false;
137
138 spgAddSearchItemToQueue(so, startEntry);
139 }
140
141 /*
142 * Initialize queue to search the root page, resetting
143 * any previously active scan
144 */
145 static void
resetSpGistScanOpaque(SpGistScanOpaque so)146 resetSpGistScanOpaque(SpGistScanOpaque so)
147 {
148 MemoryContext oldCtx;
149
150 MemoryContextReset(so->traversalCxt);
151
152 oldCtx = MemoryContextSwitchTo(so->traversalCxt);
153
154 /* initialize queue only for distance-ordered scans */
155 so->scanQueue = pairingheap_allocate(pairingheap_SpGistSearchItem_cmp, so);
156
157 if (so->searchNulls)
158 /* Add a work item to scan the null index entries */
159 spgAddStartItem(so, true);
160
161 if (so->searchNonNulls)
162 /* Add a work item to scan the non-null index entries */
163 spgAddStartItem(so, false);
164
165 MemoryContextSwitchTo(oldCtx);
166
167 if (so->numberOfOrderBys > 0)
168 {
169 /* Must pfree distances to avoid memory leak */
170 int i;
171
172 for (i = 0; i < so->nPtrs; i++)
173 if (so->distances[i])
174 pfree(so->distances[i]);
175 }
176
177 if (so->want_itup)
178 {
179 /* Must pfree reconstructed tuples to avoid memory leak */
180 int i;
181
182 for (i = 0; i < so->nPtrs; i++)
183 pfree(so->reconTups[i]);
184 }
185 so->iPtr = so->nPtrs = 0;
186 }
187
188 /*
189 * Prepare scan keys in SpGistScanOpaque from caller-given scan keys
190 *
191 * Sets searchNulls, searchNonNulls, numberOfKeys, keyData fields of *so.
192 *
193 * The point here is to eliminate null-related considerations from what the
194 * opclass consistent functions need to deal with. We assume all SPGiST-
195 * indexable operators are strict, so any null RHS value makes the scan
196 * condition unsatisfiable. We also pull out any IS NULL/IS NOT NULL
197 * conditions; their effect is reflected into searchNulls/searchNonNulls.
198 */
199 static void
spgPrepareScanKeys(IndexScanDesc scan)200 spgPrepareScanKeys(IndexScanDesc scan)
201 {
202 SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
203 bool qual_ok;
204 bool haveIsNull;
205 bool haveNotNull;
206 int nkeys;
207 int i;
208
209 so->numberOfOrderBys = scan->numberOfOrderBys;
210 so->orderByData = scan->orderByData;
211
212 if (so->numberOfOrderBys <= 0)
213 so->numberOfNonNullOrderBys = 0;
214 else
215 {
216 int j = 0;
217
218 /*
219 * Remove all NULL keys, but remember their offsets in the original
220 * array.
221 */
222 for (i = 0; i < scan->numberOfOrderBys; i++)
223 {
224 ScanKey skey = &so->orderByData[i];
225
226 if (skey->sk_flags & SK_ISNULL)
227 so->nonNullOrderByOffsets[i] = -1;
228 else
229 {
230 if (i != j)
231 so->orderByData[j] = *skey;
232
233 so->nonNullOrderByOffsets[i] = j++;
234 }
235 }
236
237 so->numberOfNonNullOrderBys = j;
238 }
239
240 if (scan->numberOfKeys <= 0)
241 {
242 /* If no quals, whole-index scan is required */
243 so->searchNulls = true;
244 so->searchNonNulls = true;
245 so->numberOfKeys = 0;
246 return;
247 }
248
249 /* Examine the given quals */
250 qual_ok = true;
251 haveIsNull = haveNotNull = false;
252 nkeys = 0;
253 for (i = 0; i < scan->numberOfKeys; i++)
254 {
255 ScanKey skey = &scan->keyData[i];
256
257 if (skey->sk_flags & SK_SEARCHNULL)
258 haveIsNull = true;
259 else if (skey->sk_flags & SK_SEARCHNOTNULL)
260 haveNotNull = true;
261 else if (skey->sk_flags & SK_ISNULL)
262 {
263 /* ordinary qual with null argument - unsatisfiable */
264 qual_ok = false;
265 break;
266 }
267 else
268 {
269 /* ordinary qual, propagate into so->keyData */
270 so->keyData[nkeys++] = *skey;
271 /* this effectively creates a not-null requirement */
272 haveNotNull = true;
273 }
274 }
275
276 /* IS NULL in combination with something else is unsatisfiable */
277 if (haveIsNull && haveNotNull)
278 qual_ok = false;
279
280 /* Emit results */
281 if (qual_ok)
282 {
283 so->searchNulls = haveIsNull;
284 so->searchNonNulls = haveNotNull;
285 so->numberOfKeys = nkeys;
286 }
287 else
288 {
289 so->searchNulls = false;
290 so->searchNonNulls = false;
291 so->numberOfKeys = 0;
292 }
293 }
294
295 IndexScanDesc
spgbeginscan(Relation rel,int keysz,int orderbysz)296 spgbeginscan(Relation rel, int keysz, int orderbysz)
297 {
298 IndexScanDesc scan;
299 SpGistScanOpaque so;
300 int i;
301
302 scan = RelationGetIndexScan(rel, keysz, orderbysz);
303
304 so = (SpGistScanOpaque) palloc0(sizeof(SpGistScanOpaqueData));
305 if (keysz > 0)
306 so->keyData = (ScanKey) palloc(sizeof(ScanKeyData) * keysz);
307 else
308 so->keyData = NULL;
309 initSpGistState(&so->state, scan->indexRelation);
310
311 so->tempCxt = AllocSetContextCreate(CurrentMemoryContext,
312 "SP-GiST search temporary context",
313 ALLOCSET_DEFAULT_SIZES);
314 so->traversalCxt = AllocSetContextCreate(CurrentMemoryContext,
315 "SP-GiST traversal-value context",
316 ALLOCSET_DEFAULT_SIZES);
317
318 /* Set up indexTupDesc and xs_hitupdesc in case it's an index-only scan */
319 so->indexTupDesc = scan->xs_hitupdesc = RelationGetDescr(rel);
320
321 /* Allocate various arrays needed for order-by scans */
322 if (scan->numberOfOrderBys > 0)
323 {
324 /* This will be filled in spgrescan, but allocate the space here */
325 so->orderByTypes = (Oid *)
326 palloc(sizeof(Oid) * scan->numberOfOrderBys);
327 so->nonNullOrderByOffsets = (int *)
328 palloc(sizeof(int) * scan->numberOfOrderBys);
329
330 /* These arrays have constant contents, so we can fill them now */
331 so->zeroDistances = (double *)
332 palloc(sizeof(double) * scan->numberOfOrderBys);
333 so->infDistances = (double *)
334 palloc(sizeof(double) * scan->numberOfOrderBys);
335
336 for (i = 0; i < scan->numberOfOrderBys; i++)
337 {
338 so->zeroDistances[i] = 0.0;
339 so->infDistances[i] = get_float8_infinity();
340 }
341
342 scan->xs_orderbyvals = (Datum *)
343 palloc0(sizeof(Datum) * scan->numberOfOrderBys);
344 scan->xs_orderbynulls = (bool *)
345 palloc(sizeof(bool) * scan->numberOfOrderBys);
346 memset(scan->xs_orderbynulls, true,
347 sizeof(bool) * scan->numberOfOrderBys);
348 }
349
350 fmgr_info_copy(&so->innerConsistentFn,
351 index_getprocinfo(rel, 1, SPGIST_INNER_CONSISTENT_PROC),
352 CurrentMemoryContext);
353
354 fmgr_info_copy(&so->leafConsistentFn,
355 index_getprocinfo(rel, 1, SPGIST_LEAF_CONSISTENT_PROC),
356 CurrentMemoryContext);
357
358 so->indexCollation = rel->rd_indcollation[0];
359
360 scan->opaque = so;
361
362 return scan;
363 }
364
365 void
spgrescan(IndexScanDesc scan,ScanKey scankey,int nscankeys,ScanKey orderbys,int norderbys)366 spgrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
367 ScanKey orderbys, int norderbys)
368 {
369 SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
370
371 /* copy scankeys into local storage */
372 if (scankey && scan->numberOfKeys > 0)
373 memmove(scan->keyData, scankey,
374 scan->numberOfKeys * sizeof(ScanKeyData));
375
376 /* initialize order-by data if needed */
377 if (orderbys && scan->numberOfOrderBys > 0)
378 {
379 int i;
380
381 memmove(scan->orderByData, orderbys,
382 scan->numberOfOrderBys * sizeof(ScanKeyData));
383
384 for (i = 0; i < scan->numberOfOrderBys; i++)
385 {
386 ScanKey skey = &scan->orderByData[i];
387
388 /*
389 * Look up the datatype returned by the original ordering
390 * operator. SP-GiST always uses a float8 for the distance
391 * function, but the ordering operator could be anything else.
392 *
393 * XXX: The distance function is only allowed to be lossy if the
394 * ordering operator's result type is float4 or float8. Otherwise
395 * we don't know how to return the distance to the executor. But
396 * we cannot check that here, as we won't know if the distance
397 * function is lossy until it returns *recheck = true for the
398 * first time.
399 */
400 so->orderByTypes[i] = get_func_rettype(skey->sk_func.fn_oid);
401 }
402 }
403
404 /* preprocess scankeys, set up the representation in *so */
405 spgPrepareScanKeys(scan);
406
407 /* set up starting queue entries */
408 resetSpGistScanOpaque(so);
409
410 /* count an indexscan for stats */
411 pgstat_count_index_scan(scan->indexRelation);
412 }
413
414 void
spgendscan(IndexScanDesc scan)415 spgendscan(IndexScanDesc scan)
416 {
417 SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
418
419 MemoryContextDelete(so->tempCxt);
420 MemoryContextDelete(so->traversalCxt);
421
422 if (so->keyData)
423 pfree(so->keyData);
424
425 if (so->state.deadTupleStorage)
426 pfree(so->state.deadTupleStorage);
427
428 if (scan->numberOfOrderBys > 0)
429 {
430 pfree(so->orderByTypes);
431 pfree(so->nonNullOrderByOffsets);
432 pfree(so->zeroDistances);
433 pfree(so->infDistances);
434 pfree(scan->xs_orderbyvals);
435 pfree(scan->xs_orderbynulls);
436 }
437
438 pfree(so);
439 }
440
441 /*
442 * Leaf SpGistSearchItem constructor, called in queue context
443 */
444 static SpGistSearchItem *
spgNewHeapItem(SpGistScanOpaque so,int level,ItemPointer heapPtr,Datum leafValue,bool recheck,bool recheckDistances,bool isnull,double * distances)445 spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointer heapPtr,
446 Datum leafValue, bool recheck, bool recheckDistances,
447 bool isnull, double *distances)
448 {
449 SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances);
450
451 item->level = level;
452 item->heapPtr = *heapPtr;
453
454 /*
455 * If we need the reconstructed value, copy it to queue cxt out of tmp
456 * cxt. Caution: the leaf_consistent method may not have supplied a value
457 * if we didn't ask it to, and mildly-broken methods might supply one of
458 * the wrong type. Also, while the correct leafValue type is attType not
459 * leafType, pre-v14 Postgres versions have historically used attLeafType
460 * here; let's not confuse matters even more by changing that in a minor
461 * release.
462 */
463 if (so->want_itup)
464 item->value = isnull ? (Datum) 0 :
465 datumCopy(leafValue, so->state.attLeafType.attbyval,
466 so->state.attLeafType.attlen);
467 else
468 item->value = (Datum) 0;
469 item->traversalValue = NULL;
470 item->isLeaf = true;
471 item->recheck = recheck;
472 item->recheckDistances = recheckDistances;
473
474 return item;
475 }
476
477 /*
478 * Test whether a leaf tuple satisfies all the scan keys
479 *
480 * *reportedSome is set to true if:
481 * the scan is not ordered AND the item satisfies the scankeys
482 */
483 static bool
spgLeafTest(SpGistScanOpaque so,SpGistSearchItem * item,SpGistLeafTuple leafTuple,bool isnull,bool * reportedSome,storeRes_func storeRes)484 spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
485 SpGistLeafTuple leafTuple, bool isnull,
486 bool *reportedSome, storeRes_func storeRes)
487 {
488 Datum leafValue;
489 double *distances;
490 bool result;
491 bool recheck;
492 bool recheckDistances;
493
494 if (isnull)
495 {
496 /* Should not have arrived on a nulls page unless nulls are wanted */
497 Assert(so->searchNulls);
498 leafValue = (Datum) 0;
499 distances = NULL;
500 recheck = false;
501 recheckDistances = false;
502 result = true;
503 }
504 else
505 {
506 spgLeafConsistentIn in;
507 spgLeafConsistentOut out;
508
509 /* use temp context for calling leaf_consistent */
510 MemoryContext oldCxt = MemoryContextSwitchTo(so->tempCxt);
511
512 in.scankeys = so->keyData;
513 in.nkeys = so->numberOfKeys;
514 in.orderbys = so->orderByData;
515 in.norderbys = so->numberOfNonNullOrderBys;
516 in.reconstructedValue = item->value;
517 in.traversalValue = item->traversalValue;
518 in.level = item->level;
519 in.returnData = so->want_itup;
520 in.leafDatum = SGLTDATUM(leafTuple, &so->state);
521
522 out.leafValue = (Datum) 0;
523 out.recheck = false;
524 out.distances = NULL;
525 out.recheckDistances = false;
526
527 result = DatumGetBool(FunctionCall2Coll(&so->leafConsistentFn,
528 so->indexCollation,
529 PointerGetDatum(&in),
530 PointerGetDatum(&out)));
531 recheck = out.recheck;
532 recheckDistances = out.recheckDistances;
533 leafValue = out.leafValue;
534 distances = out.distances;
535
536 MemoryContextSwitchTo(oldCxt);
537 }
538
539 if (result)
540 {
541 /* item passes the scankeys */
542 if (so->numberOfNonNullOrderBys > 0)
543 {
544 /* the scan is ordered -> add the item to the queue */
545 MemoryContext oldCxt = MemoryContextSwitchTo(so->traversalCxt);
546 SpGistSearchItem *heapItem = spgNewHeapItem(so, item->level,
547 &leafTuple->heapPtr,
548 leafValue,
549 recheck,
550 recheckDistances,
551 isnull,
552 distances);
553
554 spgAddSearchItemToQueue(so, heapItem);
555
556 MemoryContextSwitchTo(oldCxt);
557 }
558 else
559 {
560 /* non-ordered scan, so report the item right away */
561 Assert(!recheckDistances);
562 storeRes(so, &leafTuple->heapPtr, leafValue, isnull,
563 recheck, false, NULL);
564 *reportedSome = true;
565 }
566 }
567
568 return result;
569 }
570
571 /* A bundle initializer for inner_consistent methods */
572 static void
spgInitInnerConsistentIn(spgInnerConsistentIn * in,SpGistScanOpaque so,SpGistSearchItem * item,SpGistInnerTuple innerTuple)573 spgInitInnerConsistentIn(spgInnerConsistentIn *in,
574 SpGistScanOpaque so,
575 SpGistSearchItem *item,
576 SpGistInnerTuple innerTuple)
577 {
578 in->scankeys = so->keyData;
579 in->orderbys = so->orderByData;
580 in->nkeys = so->numberOfKeys;
581 in->norderbys = so->numberOfNonNullOrderBys;
582 in->reconstructedValue = item->value;
583 in->traversalMemoryContext = so->traversalCxt;
584 in->traversalValue = item->traversalValue;
585 in->level = item->level;
586 in->returnData = so->want_itup;
587 in->allTheSame = innerTuple->allTheSame;
588 in->hasPrefix = (innerTuple->prefixSize > 0);
589 in->prefixDatum = SGITDATUM(innerTuple, &so->state);
590 in->nNodes = innerTuple->nNodes;
591 in->nodeLabels = spgExtractNodeLabels(&so->state, innerTuple);
592 }
593
594 static SpGistSearchItem *
spgMakeInnerItem(SpGistScanOpaque so,SpGistSearchItem * parentItem,SpGistNodeTuple tuple,spgInnerConsistentOut * out,int i,bool isnull,double * distances)595 spgMakeInnerItem(SpGistScanOpaque so,
596 SpGistSearchItem *parentItem,
597 SpGistNodeTuple tuple,
598 spgInnerConsistentOut *out, int i, bool isnull,
599 double *distances)
600 {
601 SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances);
602
603 item->heapPtr = tuple->t_tid;
604 item->level = out->levelAdds ? parentItem->level + out->levelAdds[i]
605 : parentItem->level;
606
607 /* Must copy value out of temp context */
608 item->value = out->reconstructedValues
609 ? datumCopy(out->reconstructedValues[i],
610 so->state.attLeafType.attbyval,
611 so->state.attLeafType.attlen)
612 : (Datum) 0;
613
614 /*
615 * Elements of out.traversalValues should be allocated in
616 * in.traversalMemoryContext, which is actually a long lived context of
617 * index scan.
618 */
619 item->traversalValue =
620 out->traversalValues ? out->traversalValues[i] : NULL;
621
622 item->isLeaf = false;
623 item->recheck = false;
624 item->recheckDistances = false;
625
626 return item;
627 }
628
629 static void
spgInnerTest(SpGistScanOpaque so,SpGistSearchItem * item,SpGistInnerTuple innerTuple,bool isnull)630 spgInnerTest(SpGistScanOpaque so, SpGistSearchItem *item,
631 SpGistInnerTuple innerTuple, bool isnull)
632 {
633 MemoryContext oldCxt = MemoryContextSwitchTo(so->tempCxt);
634 spgInnerConsistentOut out;
635 int nNodes = innerTuple->nNodes;
636 int i;
637
638 memset(&out, 0, sizeof(out));
639
640 if (!isnull)
641 {
642 spgInnerConsistentIn in;
643
644 spgInitInnerConsistentIn(&in, so, item, innerTuple);
645
646 /* use user-defined inner consistent method */
647 FunctionCall2Coll(&so->innerConsistentFn,
648 so->indexCollation,
649 PointerGetDatum(&in),
650 PointerGetDatum(&out));
651 }
652 else
653 {
654 /* force all children to be visited */
655 out.nNodes = nNodes;
656 out.nodeNumbers = (int *) palloc(sizeof(int) * nNodes);
657 for (i = 0; i < nNodes; i++)
658 out.nodeNumbers[i] = i;
659 }
660
661 /* If allTheSame, they should all or none of them match */
662 if (innerTuple->allTheSame && out.nNodes != 0 && out.nNodes != nNodes)
663 elog(ERROR, "inconsistent inner_consistent results for allTheSame inner tuple");
664
665 if (out.nNodes)
666 {
667 /* collect node pointers */
668 SpGistNodeTuple node;
669 SpGistNodeTuple *nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * nNodes);
670
671 SGITITERATE(innerTuple, i, node)
672 {
673 nodes[i] = node;
674 }
675
676 MemoryContextSwitchTo(so->traversalCxt);
677
678 for (i = 0; i < out.nNodes; i++)
679 {
680 int nodeN = out.nodeNumbers[i];
681 SpGistSearchItem *innerItem;
682 double *distances;
683
684 Assert(nodeN >= 0 && nodeN < nNodes);
685
686 node = nodes[nodeN];
687
688 if (!ItemPointerIsValid(&node->t_tid))
689 continue;
690
691 /*
692 * Use infinity distances if innerConsistentFn() failed to return
693 * them or if is a NULL item (their distances are really unused).
694 */
695 distances = out.distances ? out.distances[i] : so->infDistances;
696
697 innerItem = spgMakeInnerItem(so, item, node, &out, i, isnull,
698 distances);
699
700 spgAddSearchItemToQueue(so, innerItem);
701 }
702 }
703
704 MemoryContextSwitchTo(oldCxt);
705 }
706
707 /* Returns a next item in an (ordered) scan or null if the index is exhausted */
708 static SpGistSearchItem *
spgGetNextQueueItem(SpGistScanOpaque so)709 spgGetNextQueueItem(SpGistScanOpaque so)
710 {
711 if (pairingheap_is_empty(so->scanQueue))
712 return NULL; /* Done when both heaps are empty */
713
714 /* Return item; caller is responsible to pfree it */
715 return (SpGistSearchItem *) pairingheap_remove_first(so->scanQueue);
716 }
717
718 enum SpGistSpecialOffsetNumbers
719 {
720 SpGistBreakOffsetNumber = InvalidOffsetNumber,
721 SpGistRedirectOffsetNumber = MaxOffsetNumber + 1,
722 SpGistErrorOffsetNumber = MaxOffsetNumber + 2
723 };
724
725 static OffsetNumber
spgTestLeafTuple(SpGistScanOpaque so,SpGistSearchItem * item,Page page,OffsetNumber offset,bool isnull,bool isroot,bool * reportedSome,storeRes_func storeRes)726 spgTestLeafTuple(SpGistScanOpaque so,
727 SpGistSearchItem *item,
728 Page page, OffsetNumber offset,
729 bool isnull, bool isroot,
730 bool *reportedSome,
731 storeRes_func storeRes)
732 {
733 SpGistLeafTuple leafTuple = (SpGistLeafTuple)
734 PageGetItem(page, PageGetItemId(page, offset));
735
736 if (leafTuple->tupstate != SPGIST_LIVE)
737 {
738 if (!isroot) /* all tuples on root should be live */
739 {
740 if (leafTuple->tupstate == SPGIST_REDIRECT)
741 {
742 /* redirection tuple should be first in chain */
743 Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr));
744 /* transfer attention to redirect point */
745 item->heapPtr = ((SpGistDeadTuple) leafTuple)->pointer;
746 Assert(ItemPointerGetBlockNumber(&item->heapPtr) != SPGIST_METAPAGE_BLKNO);
747 return SpGistRedirectOffsetNumber;
748 }
749
750 if (leafTuple->tupstate == SPGIST_DEAD)
751 {
752 /* dead tuple should be first in chain */
753 Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr));
754 /* No live entries on this page */
755 Assert(leafTuple->nextOffset == InvalidOffsetNumber);
756 return SpGistBreakOffsetNumber;
757 }
758 }
759
760 /* We should not arrive at a placeholder */
761 elog(ERROR, "unexpected SPGiST tuple state: %d", leafTuple->tupstate);
762 return SpGistErrorOffsetNumber;
763 }
764
765 Assert(ItemPointerIsValid(&leafTuple->heapPtr));
766
767 spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes);
768
769 return leafTuple->nextOffset;
770 }
771
772 /*
773 * Walk the tree and report all tuples passing the scan quals to the storeRes
774 * subroutine.
775 *
776 * If scanWholeIndex is true, we'll do just that. If not, we'll stop at the
777 * next page boundary once we have reported at least one tuple.
778 */
779 static void
spgWalk(Relation index,SpGistScanOpaque so,bool scanWholeIndex,storeRes_func storeRes,Snapshot snapshot)780 spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex,
781 storeRes_func storeRes, Snapshot snapshot)
782 {
783 Buffer buffer = InvalidBuffer;
784 bool reportedSome = false;
785
786 while (scanWholeIndex || !reportedSome)
787 {
788 SpGistSearchItem *item = spgGetNextQueueItem(so);
789
790 if (item == NULL)
791 break; /* No more items in queue -> done */
792
793 redirect:
794 /* Check for interrupts, just in case of infinite loop */
795 CHECK_FOR_INTERRUPTS();
796
797 if (item->isLeaf)
798 {
799 /* We store heap items in the queue only in case of ordered search */
800 Assert(so->numberOfNonNullOrderBys > 0);
801 storeRes(so, &item->heapPtr, item->value, item->isNull,
802 item->recheck, item->recheckDistances, item->distances);
803 reportedSome = true;
804 }
805 else
806 {
807 BlockNumber blkno = ItemPointerGetBlockNumber(&item->heapPtr);
808 OffsetNumber offset = ItemPointerGetOffsetNumber(&item->heapPtr);
809 Page page;
810 bool isnull;
811
812 if (buffer == InvalidBuffer)
813 {
814 buffer = ReadBuffer(index, blkno);
815 LockBuffer(buffer, BUFFER_LOCK_SHARE);
816 }
817 else if (blkno != BufferGetBlockNumber(buffer))
818 {
819 UnlockReleaseBuffer(buffer);
820 buffer = ReadBuffer(index, blkno);
821 LockBuffer(buffer, BUFFER_LOCK_SHARE);
822 }
823
824 /* else new pointer points to the same page, no work needed */
825
826 page = BufferGetPage(buffer);
827 TestForOldSnapshot(snapshot, index, page);
828
829 isnull = SpGistPageStoresNulls(page) ? true : false;
830
831 if (SpGistPageIsLeaf(page))
832 {
833 /* Page is a leaf - that is, all it's tuples are heap items */
834 OffsetNumber max = PageGetMaxOffsetNumber(page);
835
836 if (SpGistBlockIsRoot(blkno))
837 {
838 /* When root is a leaf, examine all its tuples */
839 for (offset = FirstOffsetNumber; offset <= max; offset++)
840 (void) spgTestLeafTuple(so, item, page, offset,
841 isnull, true,
842 &reportedSome, storeRes);
843 }
844 else
845 {
846 /* Normal case: just examine the chain we arrived at */
847 while (offset != InvalidOffsetNumber)
848 {
849 Assert(offset >= FirstOffsetNumber && offset <= max);
850 offset = spgTestLeafTuple(so, item, page, offset,
851 isnull, false,
852 &reportedSome, storeRes);
853 if (offset == SpGistRedirectOffsetNumber)
854 goto redirect;
855 }
856 }
857 }
858 else /* page is inner */
859 {
860 SpGistInnerTuple innerTuple = (SpGistInnerTuple)
861 PageGetItem(page, PageGetItemId(page, offset));
862
863 if (innerTuple->tupstate != SPGIST_LIVE)
864 {
865 if (innerTuple->tupstate == SPGIST_REDIRECT)
866 {
867 /* transfer attention to redirect point */
868 item->heapPtr = ((SpGistDeadTuple) innerTuple)->pointer;
869 Assert(ItemPointerGetBlockNumber(&item->heapPtr) !=
870 SPGIST_METAPAGE_BLKNO);
871 goto redirect;
872 }
873 elog(ERROR, "unexpected SPGiST tuple state: %d",
874 innerTuple->tupstate);
875 }
876
877 spgInnerTest(so, item, innerTuple, isnull);
878 }
879 }
880
881 /* done with this scan item */
882 spgFreeSearchItem(so, item);
883 /* clear temp context before proceeding to the next one */
884 MemoryContextReset(so->tempCxt);
885 }
886
887 if (buffer != InvalidBuffer)
888 UnlockReleaseBuffer(buffer);
889 }
890
891
892 /* storeRes subroutine for getbitmap case */
893 static void
storeBitmap(SpGistScanOpaque so,ItemPointer heapPtr,Datum leafValue,bool isnull,bool recheck,bool recheckDistances,double * distances)894 storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
895 Datum leafValue, bool isnull, bool recheck, bool recheckDistances,
896 double *distances)
897 {
898 Assert(!recheckDistances && !distances);
899 tbm_add_tuples(so->tbm, heapPtr, 1, recheck);
900 so->ntids++;
901 }
902
903 int64
spggetbitmap(IndexScanDesc scan,TIDBitmap * tbm)904 spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
905 {
906 SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
907
908 /* Copy want_itup to *so so we don't need to pass it around separately */
909 so->want_itup = false;
910
911 so->tbm = tbm;
912 so->ntids = 0;
913
914 spgWalk(scan->indexRelation, so, true, storeBitmap, scan->xs_snapshot);
915
916 return so->ntids;
917 }
918
919 /* storeRes subroutine for gettuple case */
920 static void
storeGettuple(SpGistScanOpaque so,ItemPointer heapPtr,Datum leafValue,bool isnull,bool recheck,bool recheckDistances,double * nonNullDistances)921 storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
922 Datum leafValue, bool isnull, bool recheck, bool recheckDistances,
923 double *nonNullDistances)
924 {
925 Assert(so->nPtrs < MaxIndexTuplesPerPage);
926 so->heapPtrs[so->nPtrs] = *heapPtr;
927 so->recheck[so->nPtrs] = recheck;
928 so->recheckDistances[so->nPtrs] = recheckDistances;
929
930 if (so->numberOfOrderBys > 0)
931 {
932 if (isnull || so->numberOfNonNullOrderBys <= 0)
933 so->distances[so->nPtrs] = NULL;
934 else
935 {
936 IndexOrderByDistance *distances =
937 palloc(sizeof(distances[0]) * so->numberOfOrderBys);
938 int i;
939
940 for (i = 0; i < so->numberOfOrderBys; i++)
941 {
942 int offset = so->nonNullOrderByOffsets[i];
943
944 if (offset >= 0)
945 {
946 /* Copy non-NULL distance value */
947 distances[i].value = nonNullDistances[offset];
948 distances[i].isnull = false;
949 }
950 else
951 {
952 /* Set distance's NULL flag. */
953 distances[i].value = 0.0;
954 distances[i].isnull = true;
955 }
956 }
957
958 so->distances[so->nPtrs] = distances;
959 }
960 }
961
962 if (so->want_itup)
963 {
964 /*
965 * Reconstruct index data. We have to copy the datum out of the temp
966 * context anyway, so we may as well create the tuple here.
967 */
968 so->reconTups[so->nPtrs] = heap_form_tuple(so->indexTupDesc,
969 &leafValue,
970 &isnull);
971 }
972 so->nPtrs++;
973 }
974
975 bool
spggettuple(IndexScanDesc scan,ScanDirection dir)976 spggettuple(IndexScanDesc scan, ScanDirection dir)
977 {
978 SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
979
980 if (dir != ForwardScanDirection)
981 elog(ERROR, "SP-GiST only supports forward scan direction");
982
983 /* Copy want_itup to *so so we don't need to pass it around separately */
984 so->want_itup = scan->xs_want_itup;
985
986 for (;;)
987 {
988 if (so->iPtr < so->nPtrs)
989 {
990 /* continuing to return reported tuples */
991 scan->xs_heaptid = so->heapPtrs[so->iPtr];
992 scan->xs_recheck = so->recheck[so->iPtr];
993 scan->xs_hitup = so->reconTups[so->iPtr];
994
995 if (so->numberOfOrderBys > 0)
996 index_store_float8_orderby_distances(scan, so->orderByTypes,
997 so->distances[so->iPtr],
998 so->recheckDistances[so->iPtr]);
999 so->iPtr++;
1000 return true;
1001 }
1002
1003 if (so->numberOfOrderBys > 0)
1004 {
1005 /* Must pfree distances to avoid memory leak */
1006 int i;
1007
1008 for (i = 0; i < so->nPtrs; i++)
1009 if (so->distances[i])
1010 pfree(so->distances[i]);
1011 }
1012
1013 if (so->want_itup)
1014 {
1015 /* Must pfree reconstructed tuples to avoid memory leak */
1016 int i;
1017
1018 for (i = 0; i < so->nPtrs; i++)
1019 pfree(so->reconTups[i]);
1020 }
1021 so->iPtr = so->nPtrs = 0;
1022
1023 spgWalk(scan->indexRelation, so, false, storeGettuple,
1024 scan->xs_snapshot);
1025
1026 if (so->nPtrs == 0)
1027 break; /* must have completed scan */
1028 }
1029
1030 return false;
1031 }
1032
1033 bool
spgcanreturn(Relation index,int attno)1034 spgcanreturn(Relation index, int attno)
1035 {
1036 SpGistCache *cache;
1037
1038 /* We can do it if the opclass config function says so */
1039 cache = spgGetCache(index);
1040
1041 return cache->config.canReturnData;
1042 }
1043