1 /*-------------------------------------------------------------------------
2  *
3  * spgscan.c
4  *	  routines for scanning SP-GiST indexes
5  *
6  *
7  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *			src/backend/access/spgist/spgscan.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/genam.h"
19 #include "access/relscan.h"
20 #include "access/spgist_private.h"
21 #include "miscadmin.h"
AST_MATCHER(FunctionDecl,hasTrailingReturn)22 #include "pgstat.h"
23 #include "storage/bufmgr.h"
24 #include "utils/datum.h"
25 #include "utils/float.h"
26 #include "utils/lsyscache.h"
27 #include "utils/memutils.h"
28 #include "utils/rel.h"
29 
30 typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr,
31 							   Datum leafValue, bool isNull, bool recheck,
32 							   bool recheckDistances, double *distances);
33 
34 /*
35  * Pairing heap comparison function for the SpGistSearchItem queue.
36  * KNN-searches currently only support NULLS LAST.  So, preserve this logic
37  * here.
38  */
39 static int
40 pairingheap_SpGistSearchItem_cmp(const pairingheap_node *a,
41 								 const pairingheap_node *b, void *arg)
42 {
43 	const SpGistSearchItem *sa = (const SpGistSearchItem *) a;
check(const MatchFinder::MatchResult & Result)44 	const SpGistSearchItem *sb = (const SpGistSearchItem *) b;
45 	SpGistScanOpaque so = (SpGistScanOpaque) arg;
46 	int			i;
47 
48 	if (sa->isNull)
49 	{
50 		if (!sb->isNull)
51 			return -1;
52 	}
53 	else if (sb->isNull)
54 	{
55 		return 1;
56 	}
57 	else
58 	{
59 		/* Order according to distance comparison */
60 		for (i = 0; i < so->numberOfNonNullOrderBys; i++)
61 		{
62 			if (isnan(sa->distances[i]) && isnan(sb->distances[i]))
63 				continue;		/* NaN == NaN */
64 			if (isnan(sa->distances[i]))
65 				return -1;		/* NaN > number */
66 			if (isnan(sb->distances[i]))
67 				return 1;		/* number < NaN */
68 			if (sa->distances[i] != sb->distances[i])
69 				return (sa->distances[i] < sb->distances[i]) ? 1 : -1;
70 		}
71 	}
72 
73 	/* Leaf items go before inner pages, to ensure a depth-first search */
74 	if (sa->isLeaf && !sb->isLeaf)
75 		return 1;
76 	if (!sa->isLeaf && sb->isLeaf)
77 		return -1;
78 
79 	return 0;
80 }
81 
82 static void
83 spgFreeSearchItem(SpGistScanOpaque so, SpGistSearchItem *item)
84 {
85 	if (!so->state.attLeafType.attbyval &&
86 		DatumGetPointer(item->value) != NULL)
87 		pfree(DatumGetPointer(item->value));
88 
89 	if (item->traversalValue)
90 		pfree(item->traversalValue);
91 
92 	pfree(item);
93 }
94 
95 /*
96  * Add SpGistSearchItem to queue
97  *
98  * Called in queue context
99  */
100 static void
101 spgAddSearchItemToQueue(SpGistScanOpaque so, SpGistSearchItem *item)
102 {
103 	pairingheap_add(so->scanQueue, &item->phNode);
104 }
105 
106 static SpGistSearchItem *
107 spgAllocSearchItem(SpGistScanOpaque so, bool isnull, double *distances)
108 {
109 	/* allocate distance array only for non-NULL items */
110 	SpGistSearchItem *item =
111 	palloc(SizeOfSpGistSearchItem(isnull ? 0 : so->numberOfNonNullOrderBys));
112 
113 	item->isNull = isnull;
114 
115 	if (!isnull && so->numberOfNonNullOrderBys > 0)
116 		memcpy(item->distances, distances,
117 			   sizeof(item->distances[0]) * so->numberOfNonNullOrderBys);
118 
119 	return item;
120 }
121 
122 static void
123 spgAddStartItem(SpGistScanOpaque so, bool isnull)
124 {
125 	SpGistSearchItem *startEntry =
126 	spgAllocSearchItem(so, isnull, so->zeroDistances);
127 
128 	ItemPointerSet(&startEntry->heapPtr,
129 				   isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO,
130 				   FirstOffsetNumber);
131 	startEntry->isLeaf = false;
132 	startEntry->level = 0;
133 	startEntry->value = (Datum) 0;
134 	startEntry->traversalValue = NULL;
135 	startEntry->recheck = false;
136 	startEntry->recheckDistances = false;
137 
138 	spgAddSearchItemToQueue(so, startEntry);
139 }
140 
141 /*
142  * Initialize queue to search the root page, resetting
143  * any previously active scan
144  */
145 static void
146 resetSpGistScanOpaque(SpGistScanOpaque so)
147 {
148 	MemoryContext oldCtx;
149 
150 	MemoryContextReset(so->traversalCxt);
151 
152 	oldCtx = MemoryContextSwitchTo(so->traversalCxt);
153 
154 	/* initialize queue only for distance-ordered scans */
155 	so->scanQueue = pairingheap_allocate(pairingheap_SpGistSearchItem_cmp, so);
156 
157 	if (so->searchNulls)
158 		/* Add a work item to scan the null index entries */
159 		spgAddStartItem(so, true);
160 
161 	if (so->searchNonNulls)
162 		/* Add a work item to scan the non-null index entries */
163 		spgAddStartItem(so, false);
164 
165 	MemoryContextSwitchTo(oldCtx);
166 
167 	if (so->numberOfOrderBys > 0)
168 	{
169 		/* Must pfree distances to avoid memory leak */
170 		int			i;
171 
172 		for (i = 0; i < so->nPtrs; i++)
173 			if (so->distances[i])
174 				pfree(so->distances[i]);
175 	}
176 
177 	if (so->want_itup)
178 	{
179 		/* Must pfree reconstructed tuples to avoid memory leak */
180 		int			i;
181 
182 		for (i = 0; i < so->nPtrs; i++)
183 			pfree(so->reconTups[i]);
184 	}
185 	so->iPtr = so->nPtrs = 0;
186 }
187 
188 /*
189  * Prepare scan keys in SpGistScanOpaque from caller-given scan keys
190  *
191  * Sets searchNulls, searchNonNulls, numberOfKeys, keyData fields of *so.
192  *
193  * The point here is to eliminate null-related considerations from what the
194  * opclass consistent functions need to deal with.  We assume all SPGiST-
195  * indexable operators are strict, so any null RHS value makes the scan
196  * condition unsatisfiable.  We also pull out any IS NULL/IS NOT NULL
197  * conditions; their effect is reflected into searchNulls/searchNonNulls.
198  */
199 static void
200 spgPrepareScanKeys(IndexScanDesc scan)
201 {
202 	SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
203 	bool		qual_ok;
204 	bool		haveIsNull;
205 	bool		haveNotNull;
206 	int			nkeys;
207 	int			i;
208 
209 	so->numberOfOrderBys = scan->numberOfOrderBys;
210 	so->orderByData = scan->orderByData;
211 
212 	if (so->numberOfOrderBys <= 0)
213 		so->numberOfNonNullOrderBys = 0;
214 	else
215 	{
216 		int			j = 0;
217 
218 		/*
219 		 * Remove all NULL keys, but remember their offsets in the original
220 		 * array.
221 		 */
222 		for (i = 0; i < scan->numberOfOrderBys; i++)
223 		{
224 			ScanKey		skey = &so->orderByData[i];
225 
226 			if (skey->sk_flags & SK_ISNULL)
227 				so->nonNullOrderByOffsets[i] = -1;
228 			else
229 			{
230 				if (i != j)
231 					so->orderByData[j] = *skey;
232 
233 				so->nonNullOrderByOffsets[i] = j++;
234 			}
235 		}
236 
237 		so->numberOfNonNullOrderBys = j;
238 	}
239 
240 	if (scan->numberOfKeys <= 0)
241 	{
242 		/* If no quals, whole-index scan is required */
243 		so->searchNulls = true;
244 		so->searchNonNulls = true;
245 		so->numberOfKeys = 0;
246 		return;
247 	}
248 
249 	/* Examine the given quals */
250 	qual_ok = true;
251 	haveIsNull = haveNotNull = false;
252 	nkeys = 0;
253 	for (i = 0; i < scan->numberOfKeys; i++)
254 	{
255 		ScanKey		skey = &scan->keyData[i];
256 
257 		if (skey->sk_flags & SK_SEARCHNULL)
258 			haveIsNull = true;
259 		else if (skey->sk_flags & SK_SEARCHNOTNULL)
260 			haveNotNull = true;
261 		else if (skey->sk_flags & SK_ISNULL)
262 		{
263 			/* ordinary qual with null argument - unsatisfiable */
264 			qual_ok = false;
265 			break;
266 		}
267 		else
268 		{
269 			/* ordinary qual, propagate into so->keyData */
270 			so->keyData[nkeys++] = *skey;
271 			/* this effectively creates a not-null requirement */
272 			haveNotNull = true;
273 		}
274 	}
275 
276 	/* IS NULL in combination with something else is unsatisfiable */
277 	if (haveIsNull && haveNotNull)
278 		qual_ok = false;
279 
280 	/* Emit results */
281 	if (qual_ok)
282 	{
283 		so->searchNulls = haveIsNull;
284 		so->searchNonNulls = haveNotNull;
285 		so->numberOfKeys = nkeys;
286 	}
287 	else
288 	{
289 		so->searchNulls = false;
290 		so->searchNonNulls = false;
291 		so->numberOfKeys = 0;
292 	}
293 }
294 
295 IndexScanDesc
296 spgbeginscan(Relation rel, int keysz, int orderbysz)
297 {
298 	IndexScanDesc scan;
299 	SpGistScanOpaque so;
300 	int			i;
301 
302 	scan = RelationGetIndexScan(rel, keysz, orderbysz);
303 
304 	so = (SpGistScanOpaque) palloc0(sizeof(SpGistScanOpaqueData));
305 	if (keysz > 0)
306 		so->keyData = (ScanKey) palloc(sizeof(ScanKeyData) * keysz);
307 	else
308 		so->keyData = NULL;
309 	initSpGistState(&so->state, scan->indexRelation);
310 
311 	so->tempCxt = AllocSetContextCreate(CurrentMemoryContext,
312 										"SP-GiST search temporary context",
313 										ALLOCSET_DEFAULT_SIZES);
314 	so->traversalCxt = AllocSetContextCreate(CurrentMemoryContext,
315 											 "SP-GiST traversal-value context",
316 											 ALLOCSET_DEFAULT_SIZES);
317 
318 	/* Set up indexTupDesc and xs_hitupdesc in case it's an index-only scan */
319 	so->indexTupDesc = scan->xs_hitupdesc = RelationGetDescr(rel);
320 
321 	/* Allocate various arrays needed for order-by scans */
322 	if (scan->numberOfOrderBys > 0)
323 	{
324 		/* This will be filled in spgrescan, but allocate the space here */
325 		so->orderByTypes = (Oid *)
326 			palloc(sizeof(Oid) * scan->numberOfOrderBys);
327 		so->nonNullOrderByOffsets = (int *)
328 			palloc(sizeof(int) * scan->numberOfOrderBys);
329 
330 		/* These arrays have constant contents, so we can fill them now */
331 		so->zeroDistances = (double *)
332 			palloc(sizeof(double) * scan->numberOfOrderBys);
333 		so->infDistances = (double *)
334 			palloc(sizeof(double) * scan->numberOfOrderBys);
335 
336 		for (i = 0; i < scan->numberOfOrderBys; i++)
337 		{
338 			so->zeroDistances[i] = 0.0;
339 			so->infDistances[i] = get_float8_infinity();
340 		}
341 
342 		scan->xs_orderbyvals = (Datum *)
343 			palloc0(sizeof(Datum) * scan->numberOfOrderBys);
344 		scan->xs_orderbynulls = (bool *)
345 			palloc(sizeof(bool) * scan->numberOfOrderBys);
346 		memset(scan->xs_orderbynulls, true,
347 			   sizeof(bool) * scan->numberOfOrderBys);
348 	}
349 
350 	fmgr_info_copy(&so->innerConsistentFn,
351 				   index_getprocinfo(rel, 1, SPGIST_INNER_CONSISTENT_PROC),
352 				   CurrentMemoryContext);
353 
354 	fmgr_info_copy(&so->leafConsistentFn,
355 				   index_getprocinfo(rel, 1, SPGIST_LEAF_CONSISTENT_PROC),
356 				   CurrentMemoryContext);
357 
358 	so->indexCollation = rel->rd_indcollation[0];
359 
360 	scan->opaque = so;
361 
362 	return scan;
363 }
364 
365 void
366 spgrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
367 		  ScanKey orderbys, int norderbys)
368 {
369 	SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
370 
371 	/* copy scankeys into local storage */
372 	if (scankey && scan->numberOfKeys > 0)
373 		memmove(scan->keyData, scankey,
374 				scan->numberOfKeys * sizeof(ScanKeyData));
375 
376 	/* initialize order-by data if needed */
377 	if (orderbys && scan->numberOfOrderBys > 0)
378 	{
379 		int			i;
380 
381 		memmove(scan->orderByData, orderbys,
382 				scan->numberOfOrderBys * sizeof(ScanKeyData));
383 
384 		for (i = 0; i < scan->numberOfOrderBys; i++)
385 		{
386 			ScanKey		skey = &scan->orderByData[i];
387 
388 			/*
389 			 * Look up the datatype returned by the original ordering
390 			 * operator. SP-GiST always uses a float8 for the distance
391 			 * function, but the ordering operator could be anything else.
392 			 *
393 			 * XXX: The distance function is only allowed to be lossy if the
394 			 * ordering operator's result type is float4 or float8.  Otherwise
395 			 * we don't know how to return the distance to the executor.  But
396 			 * we cannot check that here, as we won't know if the distance
397 			 * function is lossy until it returns *recheck = true for the
398 			 * first time.
399 			 */
400 			so->orderByTypes[i] = get_func_rettype(skey->sk_func.fn_oid);
401 		}
402 	}
403 
404 	/* preprocess scankeys, set up the representation in *so */
405 	spgPrepareScanKeys(scan);
406 
407 	/* set up starting queue entries */
408 	resetSpGistScanOpaque(so);
409 
410 	/* count an indexscan for stats */
411 	pgstat_count_index_scan(scan->indexRelation);
412 }
413 
414 void
415 spgendscan(IndexScanDesc scan)
416 {
417 	SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
418 
419 	MemoryContextDelete(so->tempCxt);
420 	MemoryContextDelete(so->traversalCxt);
421 
422 	if (so->keyData)
423 		pfree(so->keyData);
424 
425 	if (so->state.deadTupleStorage)
426 		pfree(so->state.deadTupleStorage);
427 
428 	if (scan->numberOfOrderBys > 0)
429 	{
430 		pfree(so->orderByTypes);
431 		pfree(so->nonNullOrderByOffsets);
432 		pfree(so->zeroDistances);
433 		pfree(so->infDistances);
434 		pfree(scan->xs_orderbyvals);
435 		pfree(scan->xs_orderbynulls);
436 	}
437 
438 	pfree(so);
439 }
440 
441 /*
442  * Leaf SpGistSearchItem constructor, called in queue context
443  */
444 static SpGistSearchItem *
445 spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointer heapPtr,
446 			   Datum leafValue, bool recheck, bool recheckDistances,
447 			   bool isnull, double *distances)
448 {
449 	SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances);
450 
451 	item->level = level;
452 	item->heapPtr = *heapPtr;
453 
454 	/*
455 	 * If we need the reconstructed value, copy it to queue cxt out of tmp
456 	 * cxt.  Caution: the leaf_consistent method may not have supplied a value
457 	 * if we didn't ask it to, and mildly-broken methods might supply one of
458 	 * the wrong type.  Also, while the correct leafValue type is attType not
459 	 * leafType, pre-v14 Postgres versions have historically used attLeafType
460 	 * here; let's not confuse matters even more by changing that in a minor
461 	 * release.
462 	 */
463 	if (so->want_itup)
464 		item->value = isnull ? (Datum) 0 :
465 			datumCopy(leafValue, so->state.attLeafType.attbyval,
466 					  so->state.attLeafType.attlen);
467 	else
468 		item->value = (Datum) 0;
469 	item->traversalValue = NULL;
470 	item->isLeaf = true;
471 	item->recheck = recheck;
472 	item->recheckDistances = recheckDistances;
473 
474 	return item;
475 }
476 
477 /*
478  * Test whether a leaf tuple satisfies all the scan keys
479  *
480  * *reportedSome is set to true if:
481  *		the scan is not ordered AND the item satisfies the scankeys
482  */
483 static bool
484 spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
485 			SpGistLeafTuple leafTuple, bool isnull,
486 			bool *reportedSome, storeRes_func storeRes)
487 {
488 	Datum		leafValue;
489 	double	   *distances;
490 	bool		result;
491 	bool		recheck;
492 	bool		recheckDistances;
493 
494 	if (isnull)
495 	{
496 		/* Should not have arrived on a nulls page unless nulls are wanted */
497 		Assert(so->searchNulls);
498 		leafValue = (Datum) 0;
499 		distances = NULL;
500 		recheck = false;
501 		recheckDistances = false;
502 		result = true;
503 	}
504 	else
505 	{
506 		spgLeafConsistentIn in;
507 		spgLeafConsistentOut out;
508 
509 		/* use temp context for calling leaf_consistent */
510 		MemoryContext oldCxt = MemoryContextSwitchTo(so->tempCxt);
511 
512 		in.scankeys = so->keyData;
513 		in.nkeys = so->numberOfKeys;
514 		in.orderbys = so->orderByData;
515 		in.norderbys = so->numberOfNonNullOrderBys;
516 		in.reconstructedValue = item->value;
517 		in.traversalValue = item->traversalValue;
518 		in.level = item->level;
519 		in.returnData = so->want_itup;
520 		in.leafDatum = SGLTDATUM(leafTuple, &so->state);
521 
522 		out.leafValue = (Datum) 0;
523 		out.recheck = false;
524 		out.distances = NULL;
525 		out.recheckDistances = false;
526 
527 		result = DatumGetBool(FunctionCall2Coll(&so->leafConsistentFn,
528 												so->indexCollation,
529 												PointerGetDatum(&in),
530 												PointerGetDatum(&out)));
531 		recheck = out.recheck;
532 		recheckDistances = out.recheckDistances;
533 		leafValue = out.leafValue;
534 		distances = out.distances;
535 
536 		MemoryContextSwitchTo(oldCxt);
537 	}
538 
539 	if (result)
540 	{
541 		/* item passes the scankeys */
542 		if (so->numberOfNonNullOrderBys > 0)
543 		{
544 			/* the scan is ordered -> add the item to the queue */
545 			MemoryContext oldCxt = MemoryContextSwitchTo(so->traversalCxt);
546 			SpGistSearchItem *heapItem = spgNewHeapItem(so, item->level,
547 														&leafTuple->heapPtr,
548 														leafValue,
549 														recheck,
550 														recheckDistances,
551 														isnull,
552 														distances);
553 
554 			spgAddSearchItemToQueue(so, heapItem);
555 
556 			MemoryContextSwitchTo(oldCxt);
557 		}
558 		else
559 		{
560 			/* non-ordered scan, so report the item right away */
561 			Assert(!recheckDistances);
562 			storeRes(so, &leafTuple->heapPtr, leafValue, isnull,
563 					 recheck, false, NULL);
564 			*reportedSome = true;
565 		}
566 	}
567 
568 	return result;
569 }
570 
571 /* A bundle initializer for inner_consistent methods */
572 static void
573 spgInitInnerConsistentIn(spgInnerConsistentIn *in,
574 						 SpGistScanOpaque so,
575 						 SpGistSearchItem *item,
576 						 SpGistInnerTuple innerTuple)
577 {
578 	in->scankeys = so->keyData;
579 	in->orderbys = so->orderByData;
580 	in->nkeys = so->numberOfKeys;
581 	in->norderbys = so->numberOfNonNullOrderBys;
582 	in->reconstructedValue = item->value;
583 	in->traversalMemoryContext = so->traversalCxt;
584 	in->traversalValue = item->traversalValue;
585 	in->level = item->level;
586 	in->returnData = so->want_itup;
587 	in->allTheSame = innerTuple->allTheSame;
588 	in->hasPrefix = (innerTuple->prefixSize > 0);
589 	in->prefixDatum = SGITDATUM(innerTuple, &so->state);
590 	in->nNodes = innerTuple->nNodes;
591 	in->nodeLabels = spgExtractNodeLabels(&so->state, innerTuple);
592 }
593 
594 static SpGistSearchItem *
595 spgMakeInnerItem(SpGistScanOpaque so,
596 				 SpGistSearchItem *parentItem,
597 				 SpGistNodeTuple tuple,
598 				 spgInnerConsistentOut *out, int i, bool isnull,
599 				 double *distances)
600 {
601 	SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances);
602 
603 	item->heapPtr = tuple->t_tid;
604 	item->level = out->levelAdds ? parentItem->level + out->levelAdds[i]
605 		: parentItem->level;
606 
607 	/* Must copy value out of temp context */
608 	item->value = out->reconstructedValues
609 		? datumCopy(out->reconstructedValues[i],
610 					so->state.attLeafType.attbyval,
611 					so->state.attLeafType.attlen)
612 		: (Datum) 0;
613 
614 	/*
615 	 * Elements of out.traversalValues should be allocated in
616 	 * in.traversalMemoryContext, which is actually a long lived context of
617 	 * index scan.
618 	 */
619 	item->traversalValue =
620 		out->traversalValues ? out->traversalValues[i] : NULL;
621 
622 	item->isLeaf = false;
623 	item->recheck = false;
624 	item->recheckDistances = false;
625 
626 	return item;
627 }
628 
629 static void
630 spgInnerTest(SpGistScanOpaque so, SpGistSearchItem *item,
631 			 SpGistInnerTuple innerTuple, bool isnull)
632 {
633 	MemoryContext oldCxt = MemoryContextSwitchTo(so->tempCxt);
634 	spgInnerConsistentOut out;
635 	int			nNodes = innerTuple->nNodes;
636 	int			i;
637 
638 	memset(&out, 0, sizeof(out));
639 
640 	if (!isnull)
641 	{
642 		spgInnerConsistentIn in;
643 
644 		spgInitInnerConsistentIn(&in, so, item, innerTuple);
645 
646 		/* use user-defined inner consistent method */
647 		FunctionCall2Coll(&so->innerConsistentFn,
648 						  so->indexCollation,
649 						  PointerGetDatum(&in),
650 						  PointerGetDatum(&out));
651 	}
652 	else
653 	{
654 		/* force all children to be visited */
655 		out.nNodes = nNodes;
656 		out.nodeNumbers = (int *) palloc(sizeof(int) * nNodes);
657 		for (i = 0; i < nNodes; i++)
658 			out.nodeNumbers[i] = i;
659 	}
660 
661 	/* If allTheSame, they should all or none of them match */
662 	if (innerTuple->allTheSame && out.nNodes != 0 && out.nNodes != nNodes)
663 		elog(ERROR, "inconsistent inner_consistent results for allTheSame inner tuple");
664 
665 	if (out.nNodes)
666 	{
667 		/* collect node pointers */
668 		SpGistNodeTuple node;
669 		SpGistNodeTuple *nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * nNodes);
670 
671 		SGITITERATE(innerTuple, i, node)
672 		{
673 			nodes[i] = node;
674 		}
675 
676 		MemoryContextSwitchTo(so->traversalCxt);
677 
678 		for (i = 0; i < out.nNodes; i++)
679 		{
680 			int			nodeN = out.nodeNumbers[i];
681 			SpGistSearchItem *innerItem;
682 			double	   *distances;
683 
684 			Assert(nodeN >= 0 && nodeN < nNodes);
685 
686 			node = nodes[nodeN];
687 
688 			if (!ItemPointerIsValid(&node->t_tid))
689 				continue;
690 
691 			/*
692 			 * Use infinity distances if innerConsistentFn() failed to return
693 			 * them or if is a NULL item (their distances are really unused).
694 			 */
695 			distances = out.distances ? out.distances[i] : so->infDistances;
696 
697 			innerItem = spgMakeInnerItem(so, item, node, &out, i, isnull,
698 										 distances);
699 
700 			spgAddSearchItemToQueue(so, innerItem);
701 		}
702 	}
703 
704 	MemoryContextSwitchTo(oldCxt);
705 }
706 
707 /* Returns a next item in an (ordered) scan or null if the index is exhausted */
708 static SpGistSearchItem *
709 spgGetNextQueueItem(SpGistScanOpaque so)
710 {
711 	if (pairingheap_is_empty(so->scanQueue))
712 		return NULL;			/* Done when both heaps are empty */
713 
714 	/* Return item; caller is responsible to pfree it */
715 	return (SpGistSearchItem *) pairingheap_remove_first(so->scanQueue);
716 }
717 
718 enum SpGistSpecialOffsetNumbers
719 {
720 	SpGistBreakOffsetNumber = InvalidOffsetNumber,
721 	SpGistRedirectOffsetNumber = MaxOffsetNumber + 1,
722 	SpGistErrorOffsetNumber = MaxOffsetNumber + 2
723 };
724 
725 static OffsetNumber
726 spgTestLeafTuple(SpGistScanOpaque so,
727 				 SpGistSearchItem *item,
728 				 Page page, OffsetNumber offset,
729 				 bool isnull, bool isroot,
730 				 bool *reportedSome,
731 				 storeRes_func storeRes)
732 {
733 	SpGistLeafTuple leafTuple = (SpGistLeafTuple)
734 	PageGetItem(page, PageGetItemId(page, offset));
735 
736 	if (leafTuple->tupstate != SPGIST_LIVE)
737 	{
738 		if (!isroot)			/* all tuples on root should be live */
739 		{
740 			if (leafTuple->tupstate == SPGIST_REDIRECT)
741 			{
742 				/* redirection tuple should be first in chain */
743 				Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr));
744 				/* transfer attention to redirect point */
745 				item->heapPtr = ((SpGistDeadTuple) leafTuple)->pointer;
746 				Assert(ItemPointerGetBlockNumber(&item->heapPtr) != SPGIST_METAPAGE_BLKNO);
747 				return SpGistRedirectOffsetNumber;
748 			}
749 
750 			if (leafTuple->tupstate == SPGIST_DEAD)
751 			{
752 				/* dead tuple should be first in chain */
753 				Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr));
754 				/* No live entries on this page */
755 				Assert(leafTuple->nextOffset == InvalidOffsetNumber);
756 				return SpGistBreakOffsetNumber;
757 			}
758 		}
759 
760 		/* We should not arrive at a placeholder */
761 		elog(ERROR, "unexpected SPGiST tuple state: %d", leafTuple->tupstate);
762 		return SpGistErrorOffsetNumber;
763 	}
764 
765 	Assert(ItemPointerIsValid(&leafTuple->heapPtr));
766 
767 	spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes);
768 
769 	return leafTuple->nextOffset;
770 }
771 
772 /*
773  * Walk the tree and report all tuples passing the scan quals to the storeRes
774  * subroutine.
775  *
776  * If scanWholeIndex is true, we'll do just that.  If not, we'll stop at the
777  * next page boundary once we have reported at least one tuple.
778  */
779 static void
780 spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex,
781 		storeRes_func storeRes, Snapshot snapshot)
782 {
783 	Buffer		buffer = InvalidBuffer;
784 	bool		reportedSome = false;
785 
786 	while (scanWholeIndex || !reportedSome)
787 	{
788 		SpGistSearchItem *item = spgGetNextQueueItem(so);
789 
790 		if (item == NULL)
791 			break;				/* No more items in queue -> done */
792 
793 redirect:
794 		/* Check for interrupts, just in case of infinite loop */
795 		CHECK_FOR_INTERRUPTS();
796 
797 		if (item->isLeaf)
798 		{
799 			/* We store heap items in the queue only in case of ordered search */
800 			Assert(so->numberOfNonNullOrderBys > 0);
801 			storeRes(so, &item->heapPtr, item->value, item->isNull,
802 					 item->recheck, item->recheckDistances, item->distances);
803 			reportedSome = true;
804 		}
805 		else
806 		{
807 			BlockNumber blkno = ItemPointerGetBlockNumber(&item->heapPtr);
808 			OffsetNumber offset = ItemPointerGetOffsetNumber(&item->heapPtr);
809 			Page		page;
810 			bool		isnull;
811 
812 			if (buffer == InvalidBuffer)
813 			{
814 				buffer = ReadBuffer(index, blkno);
815 				LockBuffer(buffer, BUFFER_LOCK_SHARE);
816 			}
817 			else if (blkno != BufferGetBlockNumber(buffer))
818 			{
819 				UnlockReleaseBuffer(buffer);
820 				buffer = ReadBuffer(index, blkno);
821 				LockBuffer(buffer, BUFFER_LOCK_SHARE);
822 			}
823 
824 			/* else new pointer points to the same page, no work needed */
825 
826 			page = BufferGetPage(buffer);
827 			TestForOldSnapshot(snapshot, index, page);
828 
829 			isnull = SpGistPageStoresNulls(page) ? true : false;
830 
831 			if (SpGistPageIsLeaf(page))
832 			{
833 				/* Page is a leaf - that is, all it's tuples are heap items */
834 				OffsetNumber max = PageGetMaxOffsetNumber(page);
835 
836 				if (SpGistBlockIsRoot(blkno))
837 				{
838 					/* When root is a leaf, examine all its tuples */
839 					for (offset = FirstOffsetNumber; offset <= max; offset++)
840 						(void) spgTestLeafTuple(so, item, page, offset,
841 												isnull, true,
842 												&reportedSome, storeRes);
843 				}
844 				else
845 				{
846 					/* Normal case: just examine the chain we arrived at */
847 					while (offset != InvalidOffsetNumber)
848 					{
849 						Assert(offset >= FirstOffsetNumber && offset <= max);
850 						offset = spgTestLeafTuple(so, item, page, offset,
851 												  isnull, false,
852 												  &reportedSome, storeRes);
853 						if (offset == SpGistRedirectOffsetNumber)
854 							goto redirect;
855 					}
856 				}
857 			}
858 			else				/* page is inner */
859 			{
860 				SpGistInnerTuple innerTuple = (SpGistInnerTuple)
861 				PageGetItem(page, PageGetItemId(page, offset));
862 
863 				if (innerTuple->tupstate != SPGIST_LIVE)
864 				{
865 					if (innerTuple->tupstate == SPGIST_REDIRECT)
866 					{
867 						/* transfer attention to redirect point */
868 						item->heapPtr = ((SpGistDeadTuple) innerTuple)->pointer;
869 						Assert(ItemPointerGetBlockNumber(&item->heapPtr) !=
870 							   SPGIST_METAPAGE_BLKNO);
871 						goto redirect;
872 					}
873 					elog(ERROR, "unexpected SPGiST tuple state: %d",
874 						 innerTuple->tupstate);
875 				}
876 
877 				spgInnerTest(so, item, innerTuple, isnull);
878 			}
879 		}
880 
881 		/* done with this scan item */
882 		spgFreeSearchItem(so, item);
883 		/* clear temp context before proceeding to the next one */
884 		MemoryContextReset(so->tempCxt);
885 	}
886 
887 	if (buffer != InvalidBuffer)
888 		UnlockReleaseBuffer(buffer);
889 }
890 
891 
892 /* storeRes subroutine for getbitmap case */
893 static void
894 storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
895 			Datum leafValue, bool isnull, bool recheck, bool recheckDistances,
896 			double *distances)
897 {
898 	Assert(!recheckDistances && !distances);
899 	tbm_add_tuples(so->tbm, heapPtr, 1, recheck);
900 	so->ntids++;
901 }
902 
903 int64
904 spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
905 {
906 	SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
907 
908 	/* Copy want_itup to *so so we don't need to pass it around separately */
909 	so->want_itup = false;
910 
911 	so->tbm = tbm;
912 	so->ntids = 0;
913 
914 	spgWalk(scan->indexRelation, so, true, storeBitmap, scan->xs_snapshot);
915 
916 	return so->ntids;
917 }
918 
919 /* storeRes subroutine for gettuple case */
920 static void
921 storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
922 			  Datum leafValue, bool isnull, bool recheck, bool recheckDistances,
923 			  double *nonNullDistances)
924 {
925 	Assert(so->nPtrs < MaxIndexTuplesPerPage);
926 	so->heapPtrs[so->nPtrs] = *heapPtr;
927 	so->recheck[so->nPtrs] = recheck;
928 	so->recheckDistances[so->nPtrs] = recheckDistances;
929 
930 	if (so->numberOfOrderBys > 0)
931 	{
932 		if (isnull || so->numberOfNonNullOrderBys <= 0)
933 			so->distances[so->nPtrs] = NULL;
934 		else
935 		{
936 			IndexOrderByDistance *distances =
937 			palloc(sizeof(distances[0]) * so->numberOfOrderBys);
938 			int			i;
939 
940 			for (i = 0; i < so->numberOfOrderBys; i++)
941 			{
942 				int			offset = so->nonNullOrderByOffsets[i];
943 
944 				if (offset >= 0)
945 				{
946 					/* Copy non-NULL distance value */
947 					distances[i].value = nonNullDistances[offset];
948 					distances[i].isnull = false;
949 				}
950 				else
951 				{
952 					/* Set distance's NULL flag. */
953 					distances[i].value = 0.0;
954 					distances[i].isnull = true;
955 				}
956 			}
957 
958 			so->distances[so->nPtrs] = distances;
959 		}
960 	}
961 
962 	if (so->want_itup)
963 	{
964 		/*
965 		 * Reconstruct index data.  We have to copy the datum out of the temp
966 		 * context anyway, so we may as well create the tuple here.
967 		 */
968 		so->reconTups[so->nPtrs] = heap_form_tuple(so->indexTupDesc,
969 												   &leafValue,
970 												   &isnull);
971 	}
972 	so->nPtrs++;
973 }
974 
975 bool
976 spggettuple(IndexScanDesc scan, ScanDirection dir)
977 {
978 	SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
979 
980 	if (dir != ForwardScanDirection)
981 		elog(ERROR, "SP-GiST only supports forward scan direction");
982 
983 	/* Copy want_itup to *so so we don't need to pass it around separately */
984 	so->want_itup = scan->xs_want_itup;
985 
986 	for (;;)
987 	{
988 		if (so->iPtr < so->nPtrs)
989 		{
990 			/* continuing to return reported tuples */
991 			scan->xs_heaptid = so->heapPtrs[so->iPtr];
992 			scan->xs_recheck = so->recheck[so->iPtr];
993 			scan->xs_hitup = so->reconTups[so->iPtr];
994 
995 			if (so->numberOfOrderBys > 0)
996 				index_store_float8_orderby_distances(scan, so->orderByTypes,
997 													 so->distances[so->iPtr],
998 													 so->recheckDistances[so->iPtr]);
999 			so->iPtr++;
1000 			return true;
1001 		}
1002 
1003 		if (so->numberOfOrderBys > 0)
1004 		{
1005 			/* Must pfree distances to avoid memory leak */
1006 			int			i;
1007 
1008 			for (i = 0; i < so->nPtrs; i++)
1009 				if (so->distances[i])
1010 					pfree(so->distances[i]);
1011 		}
1012 
1013 		if (so->want_itup)
1014 		{
1015 			/* Must pfree reconstructed tuples to avoid memory leak */
1016 			int			i;
1017 
1018 			for (i = 0; i < so->nPtrs; i++)
1019 				pfree(so->reconTups[i]);
1020 		}
1021 		so->iPtr = so->nPtrs = 0;
1022 
1023 		spgWalk(scan->indexRelation, so, false, storeGettuple,
1024 				scan->xs_snapshot);
1025 
1026 		if (so->nPtrs == 0)
1027 			break;				/* must have completed scan */
1028 	}
1029 
1030 	return false;
1031 }
1032 
1033 bool
1034 spgcanreturn(Relation index, int attno)
1035 {
1036 	SpGistCache *cache;
1037 
1038 	/* We can do it if the opclass config function says so */
1039 	cache = spgGetCache(index);
1040 
1041 	return cache->config.canReturnData;
1042 }
1043