1 /*-------------------------------------------------------------------------
2  *
3  * spgscan.c
4  *	  routines for scanning SP-GiST indexes
5  *
6  *
js_export()7  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *			src/backend/access/spgist/spgscan.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/genam.h"
19 #include "access/relscan.h"
20 #include "access/spgist_private.h"
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 #include "storage/bufmgr.h"
24 #include "utils/datum.h"
25 #include "utils/float.h"
26 #include "utils/lsyscache.h"
27 #include "utils/memutils.h"
28 #include "utils/rel.h"
29 
30 typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr,
31 							   Datum leafValue, bool isNull, bool recheck,
32 							   bool recheckDistances, double *distances);
33 
34 /*
35  * Pairing heap comparison function for the SpGistSearchItem queue.
36  * KNN-searches currently only support NULLS LAST.  So, preserve this logic
37  * here.
38  */
39 static int
40 pairingheap_SpGistSearchItem_cmp(const pairingheap_node *a,
41 								 const pairingheap_node *b, void *arg)
42 {
43 	const SpGistSearchItem *sa = (const SpGistSearchItem *) a;
44 	const SpGistSearchItem *sb = (const SpGistSearchItem *) b;
45 	SpGistScanOpaque so = (SpGistScanOpaque) arg;
46 	int			i;
47 
48 	if (sa->isNull)
49 	{
export()50 		if (!sb->isNull)
51 			return -1;
52 	}
53 	else if (sb->isNull)
54 	{
55 		return 1;
56 	}
57 	else
58 	{
59 		/* Order according to distance comparison */
60 		for (i = 0; i < so->numberOfNonNullOrderBys; i++)
61 		{
62 			if (isnan(sa->distances[i]) && isnan(sb->distances[i]))
63 				continue;		/* NaN == NaN */
64 			if (isnan(sa->distances[i]))
65 				return -1;		/* NaN > number */
66 			if (isnan(sb->distances[i]))
67 				return 1;		/* number < NaN */
68 			if (sa->distances[i] != sb->distances[i])
69 				return (sa->distances[i] < sb->distances[i]) ? 1 : -1;
70 		}
71 	}
72 
73 	/* Leaf items go before inner pages, to ensure a depth-first search */
74 	if (sa->isLeaf && !sb->isLeaf)
75 		return 1;
76 	if (!sa->isLeaf && sb->isLeaf)
77 		return -1;
78 
79 	return 0;
80 }
81 
82 static void
83 spgFreeSearchItem(SpGistScanOpaque so, SpGistSearchItem *item)
84 {
import()85 	if (!so->state.attLeafType.attbyval &&
86 		DatumGetPointer(item->value) != NULL)
87 		pfree(DatumGetPointer(item->value));
88 
89 	if (item->traversalValue)
90 		pfree(item->traversalValue);
91 
92 	pfree(item);
93 }
94 
95 /*
96  * Add SpGistSearchItem to queue
97  *
98  * Called in queue context
99  */
100 static void
101 spgAddSearchItemToQueue(SpGistScanOpaque so, SpGistSearchItem *item)
102 {
103 	pairingheap_add(so->scanQueue, &item->phNode);
104 }
105 
106 static SpGistSearchItem *
107 spgAllocSearchItem(SpGistScanOpaque so, bool isnull, double *distances)
108 {
109 	/* allocate distance array only for non-NULL items */
110 	SpGistSearchItem *item =
111 	palloc(SizeOfSpGistSearchItem(isnull ? 0 : so->numberOfNonNullOrderBys));
112 
113 	item->isNull = isnull;
pass_array()114 
115 	if (!isnull && so->numberOfNonNullOrderBys > 0)
116 		memcpy(item->distances, distances,
117 			   sizeof(item->distances[0]) * so->numberOfNonNullOrderBys);
118 
119 	return item;
120 }
121 
122 static void
123 spgAddStartItem(SpGistScanOpaque so, bool isnull)
124 {
125 	SpGistSearchItem *startEntry =
126 	spgAllocSearchItem(so, isnull, so->zeroDistances);
127 
128 	ItemPointerSet(&startEntry->heapPtr,
129 				   isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO,
130 				   FirstOffsetNumber);
131 	startEntry->isLeaf = false;
132 	startEntry->level = 0;
133 	startEntry->value = (Datum) 0;
134 	startEntry->traversalValue = NULL;
135 	startEntry->recheck = false;
136 	startEntry->recheckDistances = false;
137 
138 	spgAddSearchItemToQueue(so, startEntry);
139 }
140 
141 /*
142  * Initialize queue to search the root page, resetting
143  * any previously active scan
144  */
145 static void
146 resetSpGistScanOpaque(SpGistScanOpaque so)
147 {
148 	MemoryContext oldCtx;
149 
150 	/*
151 	 * clear traversal context before proceeding to the next scan; this must
152 	 * not happen before the freeScanStack above, else we get double-free
153 	 * crashes.
154 	 */
155 	MemoryContextReset(so->traversalCxt);
156 
157 	oldCtx = MemoryContextSwitchTo(so->traversalCxt);
158 
159 	/* initialize queue only for distance-ordered scans */
160 	so->scanQueue = pairingheap_allocate(pairingheap_SpGistSearchItem_cmp, so);
161 
162 	if (so->searchNulls)
163 		/* Add a work item to scan the null index entries */
164 		spgAddStartItem(so, true);
165 
166 	if (so->searchNonNulls)
167 		/* Add a work item to scan the non-null index entries */
168 		spgAddStartItem(so, false);
169 
170 	MemoryContextSwitchTo(oldCtx);
171 
172 	if (so->numberOfOrderBys > 0)
173 	{
174 		/* Must pfree distances to avoid memory leak */
175 		int			i;
176 
177 		for (i = 0; i < so->nPtrs; i++)
178 			if (so->distances[i])
179 				pfree(so->distances[i]);
180 	}
181 
182 	if (so->want_itup)
183 	{
184 		/* Must pfree reconstructed tuples to avoid memory leak */
185 		int			i;
186 
187 		for (i = 0; i < so->nPtrs; i++)
188 			pfree(so->reconTups[i]);
189 	}
190 	so->iPtr = so->nPtrs = 0;
191 }
192 
export_mut()193 /*
194  * Prepare scan keys in SpGistScanOpaque from caller-given scan keys
195  *
196  * Sets searchNulls, searchNonNulls, numberOfKeys, keyData fields of *so.
197  *
198  * The point here is to eliminate null-related considerations from what the
199  * opclass consistent functions need to deal with.  We assume all SPGiST-
200  * indexable operators are strict, so any null RHS value makes the scan
201  * condition unsatisfiable.  We also pull out any IS NULL/IS NOT NULL
202  * conditions; their effect is reflected into searchNulls/searchNonNulls.
203  */
204 static void
205 spgPrepareScanKeys(IndexScanDesc scan)
206 {
207 	SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
208 	bool		qual_ok;
209 	bool		haveIsNull;
210 	bool		haveNotNull;
211 	int			nkeys;
212 	int			i;
213 
214 	so->numberOfOrderBys = scan->numberOfOrderBys;
215 	so->orderByData = scan->orderByData;
216 
217 	if (so->numberOfOrderBys <= 0)
218 		so->numberOfNonNullOrderBys = 0;
219 	else
220 	{
221 		int			j = 0;
222 
223 		/*
224 		 * Remove all NULL keys, but remember their offsets in the original
225 		 * array.
226 		 */
227 		for (i = 0; i < scan->numberOfOrderBys; i++)
228 		{
229 			ScanKey		skey = &so->orderByData[i];
230 
231 			if (skey->sk_flags & SK_ISNULL)
232 				so->nonNullOrderByOffsets[i] = -1;
233 			else
234 			{
235 				if (i != j)
236 					so->orderByData[j] = *skey;
237 
238 				so->nonNullOrderByOffsets[i] = j++;
239 			}
240 		}
241 
242 		so->numberOfNonNullOrderBys = j;
243 	}
244 
245 	if (scan->numberOfKeys <= 0)
246 	{
247 		/* If no quals, whole-index scan is required */
248 		so->searchNulls = true;
249 		so->searchNonNulls = true;
250 		so->numberOfKeys = 0;
251 		return;
252 	}
253 
254 	/* Examine the given quals */
255 	qual_ok = true;
256 	haveIsNull = haveNotNull = false;
257 	nkeys = 0;
258 	for (i = 0; i < scan->numberOfKeys; i++)
259 	{
260 		ScanKey		skey = &scan->keyData[i];
261 
262 		if (skey->sk_flags & SK_SEARCHNULL)
263 			haveIsNull = true;
264 		else if (skey->sk_flags & SK_SEARCHNOTNULL)
265 			haveNotNull = true;
266 		else if (skey->sk_flags & SK_ISNULL)
267 		{
268 			/* ordinary qual with null argument - unsatisfiable */
269 			qual_ok = false;
270 			break;
271 		}
272 		else
273 		{
274 			/* ordinary qual, propagate into so->keyData */
275 			so->keyData[nkeys++] = *skey;
276 			/* this effectively creates a not-null requirement */
277 			haveNotNull = true;
278 		}
279 	}
280 
281 	/* IS NULL in combination with something else is unsatisfiable */
282 	if (haveIsNull && haveNotNull)
283 		qual_ok = false;
284 
285 	/* Emit results */
286 	if (qual_ok)
287 	{
288 		so->searchNulls = haveIsNull;
289 		so->searchNonNulls = haveNotNull;
290 		so->numberOfKeys = nkeys;
291 	}
292 	else
293 	{
294 		so->searchNulls = false;
295 		so->searchNonNulls = false;
296 		so->numberOfKeys = 0;
297 	}
298 }
299 
300 IndexScanDesc
301 spgbeginscan(Relation rel, int keysz, int orderbysz)
302 {
303 	IndexScanDesc scan;
304 	SpGistScanOpaque so;
305 	int			i;
306 
307 	scan = RelationGetIndexScan(rel, keysz, orderbysz);
308 
309 	so = (SpGistScanOpaque) palloc0(sizeof(SpGistScanOpaqueData));
310 	if (keysz > 0)
311 		so->keyData = (ScanKey) palloc(sizeof(ScanKeyData) * keysz);
312 	else
313 		so->keyData = NULL;
314 	initSpGistState(&so->state, scan->indexRelation);
315 
316 	so->tempCxt = AllocSetContextCreate(CurrentMemoryContext,
317 										"SP-GiST search temporary context",
318 										ALLOCSET_DEFAULT_SIZES);
319 	so->traversalCxt = AllocSetContextCreate(CurrentMemoryContext,
320 											 "SP-GiST traversal-value context",
321 											 ALLOCSET_DEFAULT_SIZES);
322 
323 	/* Set up indexTupDesc and xs_hitupdesc in case it's an index-only scan */
324 	so->indexTupDesc = scan->xs_hitupdesc = RelationGetDescr(rel);
325 
326 	/* Allocate various arrays needed for order-by scans */
327 	if (scan->numberOfOrderBys > 0)
328 	{
329 		/* This will be filled in spgrescan, but allocate the space here */
330 		so->orderByTypes = (Oid *)
331 			palloc(sizeof(Oid) * scan->numberOfOrderBys);
332 		so->nonNullOrderByOffsets = (int *)
333 			palloc(sizeof(int) * scan->numberOfOrderBys);
334 
335 		/* These arrays have constant contents, so we can fill them now */
336 		so->zeroDistances = (double *)
337 			palloc(sizeof(double) * scan->numberOfOrderBys);
338 		so->infDistances = (double *)
339 			palloc(sizeof(double) * scan->numberOfOrderBys);
340 
341 		for (i = 0; i < scan->numberOfOrderBys; i++)
342 		{
343 			so->zeroDistances[i] = 0.0;
344 			so->infDistances[i] = get_float8_infinity();
345 		}
346 
347 		scan->xs_orderbyvals = (Datum *)
348 			palloc0(sizeof(Datum) * scan->numberOfOrderBys);
349 		scan->xs_orderbynulls = (bool *)
350 			palloc(sizeof(bool) * scan->numberOfOrderBys);
351 		memset(scan->xs_orderbynulls, true,
352 			   sizeof(bool) * scan->numberOfOrderBys);
353 	}
354 
355 	fmgr_info_copy(&so->innerConsistentFn,
356 				   index_getprocinfo(rel, 1, SPGIST_INNER_CONSISTENT_PROC),
357 				   CurrentMemoryContext);
358 
359 	fmgr_info_copy(&so->leafConsistentFn,
360 				   index_getprocinfo(rel, 1, SPGIST_LEAF_CONSISTENT_PROC),
361 				   CurrentMemoryContext);
362 
363 	so->indexCollation = rel->rd_indcollation[0];
364 
365 	scan->opaque = so;
366 
367 	return scan;
368 }
369 
370 void
371 spgrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
372 		  ScanKey orderbys, int norderbys)
373 {
374 	SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
375 
376 	/* copy scankeys into local storage */
377 	if (scankey && scan->numberOfKeys > 0)
378 		memmove(scan->keyData, scankey,
379 				scan->numberOfKeys * sizeof(ScanKeyData));
380 
381 	/* initialize order-by data if needed */
382 	if (orderbys && scan->numberOfOrderBys > 0)
383 	{
384 		int			i;
385 
386 		memmove(scan->orderByData, orderbys,
387 				scan->numberOfOrderBys * sizeof(ScanKeyData));
388 
389 		for (i = 0; i < scan->numberOfOrderBys; i++)
390 		{
391 			ScanKey		skey = &scan->orderByData[i];
392 
393 			/*
394 			 * Look up the datatype returned by the original ordering
395 			 * operator. SP-GiST always uses a float8 for the distance
396 			 * function, but the ordering operator could be anything else.
397 			 *
398 			 * XXX: The distance function is only allowed to be lossy if the
399 			 * ordering operator's result type is float4 or float8.  Otherwise
400 			 * we don't know how to return the distance to the executor.  But
401 			 * we cannot check that here, as we won't know if the distance
402 			 * function is lossy until it returns *recheck = true for the
403 			 * first time.
404 			 */
405 			so->orderByTypes[i] = get_func_rettype(skey->sk_func.fn_oid);
406 		}
407 	}
408 
409 	/* preprocess scankeys, set up the representation in *so */
410 	spgPrepareScanKeys(scan);
411 
412 	/* set up starting queue entries */
413 	resetSpGistScanOpaque(so);
414 
415 	/* count an indexscan for stats */
416 	pgstat_count_index_scan(scan->indexRelation);
417 }
418 
419 void
420 spgendscan(IndexScanDesc scan)
421 {
422 	SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
423 
424 	MemoryContextDelete(so->tempCxt);
425 	MemoryContextDelete(so->traversalCxt);
426 
427 	if (so->keyData)
428 		pfree(so->keyData);
429 
430 	if (so->state.deadTupleStorage)
431 		pfree(so->state.deadTupleStorage);
432 
433 	if (scan->numberOfOrderBys > 0)
434 	{
435 		pfree(so->orderByTypes);
436 		pfree(so->nonNullOrderByOffsets);
437 		pfree(so->zeroDistances);
438 		pfree(so->infDistances);
439 		pfree(scan->xs_orderbyvals);
440 		pfree(scan->xs_orderbynulls);
441 	}
442 
443 	pfree(so);
444 }
445 
446 /*
447  * Leaf SpGistSearchItem constructor, called in queue context
448  */
449 static SpGistSearchItem *
450 spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointer heapPtr,
451 			   Datum leafValue, bool recheck, bool recheckDistances,
452 			   bool isnull, double *distances)
453 {
454 	SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances);
455 
456 	item->level = level;
457 	item->heapPtr = *heapPtr;
458 
459 	/*
460 	 * If we need the reconstructed value, copy it to queue cxt out of tmp
461 	 * cxt.  Caution: the leaf_consistent method may not have supplied a value
462 	 * if we didn't ask it to, and mildly-broken methods might supply one of
463 	 * the wrong type.  Also, while the correct leafValue type is attType not
464 	 * leafType, pre-v14 Postgres versions have historically used attLeafType
465 	 * here; let's not confuse matters even more by changing that in a minor
466 	 * release.
467 	 */
468 	if (so->want_itup)
469 		item->value = isnull ? (Datum) 0 :
470 			datumCopy(leafValue, so->state.attLeafType.attbyval,
471 					  so->state.attLeafType.attlen);
472 	else
473 		item->value = (Datum) 0;
474 	item->traversalValue = NULL;
475 	item->isLeaf = true;
476 	item->recheck = recheck;
477 	item->recheckDistances = recheckDistances;
478 
479 	return item;
480 }
481 
482 /*
483  * Test whether a leaf tuple satisfies all the scan keys
484  *
485  * *reportedSome is set to true if:
486  *		the scan is not ordered AND the item satisfies the scankeys
487  */
488 static bool
489 spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
490 			SpGistLeafTuple leafTuple, bool isnull,
491 			bool *reportedSome, storeRes_func storeRes)
492 {
493 	Datum		leafValue;
494 	double	   *distances;
495 	bool		result;
496 	bool		recheck;
497 	bool		recheckDistances;
498 
499 	if (isnull)
500 	{
501 		/* Should not have arrived on a nulls page unless nulls are wanted */
502 		Assert(so->searchNulls);
503 		leafValue = (Datum) 0;
504 		distances = NULL;
505 		recheck = false;
506 		recheckDistances = false;
507 		result = true;
508 	}
509 	else
510 	{
511 		spgLeafConsistentIn in;
512 		spgLeafConsistentOut out;
513 
514 		/* use temp context for calling leaf_consistent */
515 		MemoryContext oldCxt = MemoryContextSwitchTo(so->tempCxt);
516 
517 		in.scankeys = so->keyData;
518 		in.nkeys = so->numberOfKeys;
519 		in.orderbys = so->orderByData;
520 		in.norderbys = so->numberOfNonNullOrderBys;
521 		in.reconstructedValue = item->value;
522 		in.traversalValue = item->traversalValue;
523 		in.level = item->level;
524 		in.returnData = so->want_itup;
525 		in.leafDatum = SGLTDATUM(leafTuple, &so->state);
526 
527 		out.leafValue = (Datum) 0;
528 		out.recheck = false;
529 		out.distances = NULL;
530 		out.recheckDistances = false;
531 
532 		result = DatumGetBool(FunctionCall2Coll(&so->leafConsistentFn,
533 												so->indexCollation,
534 												PointerGetDatum(&in),
535 												PointerGetDatum(&out)));
536 		recheck = out.recheck;
537 		recheckDistances = out.recheckDistances;
538 		leafValue = out.leafValue;
539 		distances = out.distances;
540 
541 		MemoryContextSwitchTo(oldCxt);
542 	}
543 
544 	if (result)
545 	{
546 		/* item passes the scankeys */
547 		if (so->numberOfNonNullOrderBys > 0)
548 		{
549 			/* the scan is ordered -> add the item to the queue */
550 			MemoryContext oldCxt = MemoryContextSwitchTo(so->traversalCxt);
551 			SpGistSearchItem *heapItem = spgNewHeapItem(so, item->level,
552 														&leafTuple->heapPtr,
553 														leafValue,
554 														recheck,
555 														recheckDistances,
556 														isnull,
557 														distances);
558 
559 			spgAddSearchItemToQueue(so, heapItem);
560 
561 			MemoryContextSwitchTo(oldCxt);
562 		}
563 		else
564 		{
565 			/* non-ordered scan, so report the item right away */
566 			Assert(!recheckDistances);
567 			storeRes(so, &leafTuple->heapPtr, leafValue, isnull,
568 					 recheck, false, NULL);
569 			*reportedSome = true;
570 		}
571 	}
572 
573 	return result;
574 }
575 
576 /* A bundle initializer for inner_consistent methods */
577 static void
578 spgInitInnerConsistentIn(spgInnerConsistentIn *in,
579 						 SpGistScanOpaque so,
580 						 SpGistSearchItem *item,
581 						 SpGistInnerTuple innerTuple)
582 {
583 	in->scankeys = so->keyData;
584 	in->orderbys = so->orderByData;
585 	in->nkeys = so->numberOfKeys;
586 	in->norderbys = so->numberOfNonNullOrderBys;
587 	in->reconstructedValue = item->value;
588 	in->traversalMemoryContext = so->traversalCxt;
589 	in->traversalValue = item->traversalValue;
590 	in->level = item->level;
591 	in->returnData = so->want_itup;
592 	in->allTheSame = innerTuple->allTheSame;
593 	in->hasPrefix = (innerTuple->prefixSize > 0);
594 	in->prefixDatum = SGITDATUM(innerTuple, &so->state);
595 	in->nNodes = innerTuple->nNodes;
596 	in->nodeLabels = spgExtractNodeLabels(&so->state, innerTuple);
597 }
598 
599 static SpGistSearchItem *
600 spgMakeInnerItem(SpGistScanOpaque so,
601 				 SpGistSearchItem *parentItem,
602 				 SpGistNodeTuple tuple,
603 				 spgInnerConsistentOut *out, int i, bool isnull,
604 				 double *distances)
605 {
606 	SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances);
607 
608 	item->heapPtr = tuple->t_tid;
609 	item->level = out->levelAdds ? parentItem->level + out->levelAdds[i]
610 		: parentItem->level;
611 
612 	/* Must copy value out of temp context */
613 	item->value = out->reconstructedValues
614 		? datumCopy(out->reconstructedValues[i],
615 					so->state.attLeafType.attbyval,
616 					so->state.attLeafType.attlen)
617 		: (Datum) 0;
618 
619 	/*
620 	 * Elements of out.traversalValues should be allocated in
621 	 * in.traversalMemoryContext, which is actually a long lived context of
622 	 * index scan.
623 	 */
624 	item->traversalValue =
625 		out->traversalValues ? out->traversalValues[i] : NULL;
626 
627 	item->isLeaf = false;
628 	item->recheck = false;
629 	item->recheckDistances = false;
630 
631 	return item;
632 }
633 
634 static void
635 spgInnerTest(SpGistScanOpaque so, SpGistSearchItem *item,
636 			 SpGistInnerTuple innerTuple, bool isnull)
637 {
638 	MemoryContext oldCxt = MemoryContextSwitchTo(so->tempCxt);
639 	spgInnerConsistentOut out;
640 	int			nNodes = innerTuple->nNodes;
641 	int			i;
642 
643 	memset(&out, 0, sizeof(out));
644 
645 	if (!isnull)
646 	{
647 		spgInnerConsistentIn in;
648 
649 		spgInitInnerConsistentIn(&in, so, item, innerTuple);
650 
651 		/* use user-defined inner consistent method */
652 		FunctionCall2Coll(&so->innerConsistentFn,
653 						  so->indexCollation,
654 						  PointerGetDatum(&in),
655 						  PointerGetDatum(&out));
656 	}
657 	else
658 	{
659 		/* force all children to be visited */
660 		out.nNodes = nNodes;
661 		out.nodeNumbers = (int *) palloc(sizeof(int) * nNodes);
662 		for (i = 0; i < nNodes; i++)
663 			out.nodeNumbers[i] = i;
664 	}
665 
666 	/* If allTheSame, they should all or none of them match */
667 	if (innerTuple->allTheSame && out.nNodes != 0 && out.nNodes != nNodes)
668 		elog(ERROR, "inconsistent inner_consistent results for allTheSame inner tuple");
669 
670 	if (out.nNodes)
671 	{
672 		/* collect node pointers */
673 		SpGistNodeTuple node;
674 		SpGistNodeTuple *nodes = (SpGistNodeTuple *) palloc(
675 															sizeof(SpGistNodeTuple) * nNodes);
676 
677 		SGITITERATE(innerTuple, i, node)
678 		{
679 			nodes[i] = node;
680 		}
681 
682 		MemoryContextSwitchTo(so->traversalCxt);
683 
684 		for (i = 0; i < out.nNodes; i++)
685 		{
686 			int			nodeN = out.nodeNumbers[i];
687 			SpGistSearchItem *innerItem;
688 			double	   *distances;
689 
690 			Assert(nodeN >= 0 && nodeN < nNodes);
691 
692 			node = nodes[nodeN];
693 
694 			if (!ItemPointerIsValid(&node->t_tid))
695 				continue;
696 
697 			/*
698 			 * Use infinity distances if innerConsistent() failed to return
699 			 * them or if is a NULL item (their distances are really unused).
700 			 */
701 			distances = out.distances ? out.distances[i] : so->infDistances;
702 
703 			innerItem = spgMakeInnerItem(so, item, node, &out, i, isnull,
704 										 distances);
705 
706 			spgAddSearchItemToQueue(so, innerItem);
707 		}
708 	}
709 
710 	MemoryContextSwitchTo(oldCxt);
711 }
712 
713 /* Returns a next item in an (ordered) scan or null if the index is exhausted */
714 static SpGistSearchItem *
715 spgGetNextQueueItem(SpGistScanOpaque so)
716 {
717 	if (pairingheap_is_empty(so->scanQueue))
718 		return NULL;			/* Done when both heaps are empty */
719 
720 	/* Return item; caller is responsible to pfree it */
721 	return (SpGistSearchItem *) pairingheap_remove_first(so->scanQueue);
722 }
723 
724 enum SpGistSpecialOffsetNumbers
725 {
726 	SpGistBreakOffsetNumber = InvalidOffsetNumber,
727 	SpGistRedirectOffsetNumber = MaxOffsetNumber + 1,
728 	SpGistErrorOffsetNumber = MaxOffsetNumber + 2
729 };
730 
731 static OffsetNumber
732 spgTestLeafTuple(SpGistScanOpaque so,
733 				 SpGistSearchItem *item,
734 				 Page page, OffsetNumber offset,
735 				 bool isnull, bool isroot,
736 				 bool *reportedSome,
737 				 storeRes_func storeRes)
738 {
739 	SpGistLeafTuple leafTuple = (SpGistLeafTuple)
740 	PageGetItem(page, PageGetItemId(page, offset));
741 
742 	if (leafTuple->tupstate != SPGIST_LIVE)
743 	{
744 		if (!isroot)			/* all tuples on root should be live */
745 		{
746 			if (leafTuple->tupstate == SPGIST_REDIRECT)
747 			{
748 				/* redirection tuple should be first in chain */
749 				Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr));
750 				/* transfer attention to redirect point */
751 				item->heapPtr = ((SpGistDeadTuple) leafTuple)->pointer;
752 				Assert(ItemPointerGetBlockNumber(&item->heapPtr) != SPGIST_METAPAGE_BLKNO);
753 				return SpGistRedirectOffsetNumber;
754 			}
755 
756 			if (leafTuple->tupstate == SPGIST_DEAD)
757 			{
758 				/* dead tuple should be first in chain */
759 				Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr));
760 				/* No live entries on this page */
761 				Assert(leafTuple->nextOffset == InvalidOffsetNumber);
762 				return SpGistBreakOffsetNumber;
763 			}
764 		}
765 
766 		/* We should not arrive at a placeholder */
767 		elog(ERROR, "unexpected SPGiST tuple state: %d", leafTuple->tupstate);
768 		return SpGistErrorOffsetNumber;
769 	}
770 
771 	Assert(ItemPointerIsValid(&leafTuple->heapPtr));
772 
773 	spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes);
774 
775 	return leafTuple->nextOffset;
776 }
777 
778 /*
779  * Walk the tree and report all tuples passing the scan quals to the storeRes
780  * subroutine.
781  *
782  * If scanWholeIndex is true, we'll do just that.  If not, we'll stop at the
783  * next page boundary once we have reported at least one tuple.
784  */
785 static void
786 spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex,
787 		storeRes_func storeRes, Snapshot snapshot)
788 {
789 	Buffer		buffer = InvalidBuffer;
790 	bool		reportedSome = false;
791 
792 	while (scanWholeIndex || !reportedSome)
793 	{
794 		SpGistSearchItem *item = spgGetNextQueueItem(so);
795 
796 		if (item == NULL)
797 			break;				/* No more items in queue -> done */
798 
799 redirect:
800 		/* Check for interrupts, just in case of infinite loop */
801 		CHECK_FOR_INTERRUPTS();
802 
803 		if (item->isLeaf)
804 		{
805 			/* We store heap items in the queue only in case of ordered search */
806 			Assert(so->numberOfNonNullOrderBys > 0);
807 			storeRes(so, &item->heapPtr, item->value, item->isNull,
808 					 item->recheck, item->recheckDistances, item->distances);
809 			reportedSome = true;
810 		}
811 		else
812 		{
813 			BlockNumber blkno = ItemPointerGetBlockNumber(&item->heapPtr);
814 			OffsetNumber offset = ItemPointerGetOffsetNumber(&item->heapPtr);
815 			Page		page;
816 			bool		isnull;
817 
818 			if (buffer == InvalidBuffer)
819 			{
820 				buffer = ReadBuffer(index, blkno);
821 				LockBuffer(buffer, BUFFER_LOCK_SHARE);
822 			}
823 			else if (blkno != BufferGetBlockNumber(buffer))
824 			{
825 				UnlockReleaseBuffer(buffer);
826 				buffer = ReadBuffer(index, blkno);
827 				LockBuffer(buffer, BUFFER_LOCK_SHARE);
828 			}
829 
830 			/* else new pointer points to the same page, no work needed */
831 
832 			page = BufferGetPage(buffer);
833 			TestForOldSnapshot(snapshot, index, page);
834 
835 			isnull = SpGistPageStoresNulls(page) ? true : false;
836 
837 			if (SpGistPageIsLeaf(page))
838 			{
839 				/* Page is a leaf - that is, all it's tuples are heap items */
840 				OffsetNumber max = PageGetMaxOffsetNumber(page);
841 
842 				if (SpGistBlockIsRoot(blkno))
843 				{
844 					/* When root is a leaf, examine all its tuples */
845 					for (offset = FirstOffsetNumber; offset <= max; offset++)
846 						(void) spgTestLeafTuple(so, item, page, offset,
847 												isnull, true,
848 												&reportedSome, storeRes);
849 				}
850 				else
851 				{
852 					/* Normal case: just examine the chain we arrived at */
853 					while (offset != InvalidOffsetNumber)
854 					{
855 						Assert(offset >= FirstOffsetNumber && offset <= max);
856 						offset = spgTestLeafTuple(so, item, page, offset,
857 												  isnull, false,
858 												  &reportedSome, storeRes);
859 						if (offset == SpGistRedirectOffsetNumber)
860 							goto redirect;
861 					}
862 				}
863 			}
864 			else				/* page is inner */
865 			{
866 				SpGistInnerTuple innerTuple = (SpGistInnerTuple)
867 				PageGetItem(page, PageGetItemId(page, offset));
868 
869 				if (innerTuple->tupstate != SPGIST_LIVE)
870 				{
871 					if (innerTuple->tupstate == SPGIST_REDIRECT)
872 					{
873 						/* transfer attention to redirect point */
874 						item->heapPtr = ((SpGistDeadTuple) innerTuple)->pointer;
875 						Assert(ItemPointerGetBlockNumber(&item->heapPtr) !=
876 							   SPGIST_METAPAGE_BLKNO);
877 						goto redirect;
878 					}
879 					elog(ERROR, "unexpected SPGiST tuple state: %d",
880 						 innerTuple->tupstate);
881 				}
882 
883 				spgInnerTest(so, item, innerTuple, isnull);
884 			}
885 		}
886 
887 		/* done with this scan item */
888 		spgFreeSearchItem(so, item);
889 		/* clear temp context before proceeding to the next one */
890 		MemoryContextReset(so->tempCxt);
891 	}
892 
893 	if (buffer != InvalidBuffer)
894 		UnlockReleaseBuffer(buffer);
895 }
896 
897 
898 /* storeRes subroutine for getbitmap case */
899 static void
900 storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
901 			Datum leafValue, bool isnull, bool recheck, bool recheckDistances,
902 			double *distances)
903 {
904 	Assert(!recheckDistances && !distances);
905 	tbm_add_tuples(so->tbm, heapPtr, 1, recheck);
906 	so->ntids++;
907 }
908 
909 int64
910 spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
911 {
912 	SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
913 
914 	/* Copy want_itup to *so so we don't need to pass it around separately */
915 	so->want_itup = false;
916 
917 	so->tbm = tbm;
918 	so->ntids = 0;
919 
920 	spgWalk(scan->indexRelation, so, true, storeBitmap, scan->xs_snapshot);
921 
922 	return so->ntids;
923 }
924 
925 /* storeRes subroutine for gettuple case */
926 static void
927 storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
928 			  Datum leafValue, bool isnull, bool recheck, bool recheckDistances,
929 			  double *nonNullDistances)
930 {
931 	Assert(so->nPtrs < MaxIndexTuplesPerPage);
932 	so->heapPtrs[so->nPtrs] = *heapPtr;
933 	so->recheck[so->nPtrs] = recheck;
934 	so->recheckDistances[so->nPtrs] = recheckDistances;
935 
936 	if (so->numberOfOrderBys > 0)
937 	{
938 		if (isnull || so->numberOfNonNullOrderBys <= 0)
939 			so->distances[so->nPtrs] = NULL;
940 		else
941 		{
942 			IndexOrderByDistance *distances =
943 			palloc(sizeof(distances[0]) * so->numberOfOrderBys);
944 			int			i;
945 
946 			for (i = 0; i < so->numberOfOrderBys; i++)
947 			{
948 				int			offset = so->nonNullOrderByOffsets[i];
949 
950 				if (offset >= 0)
951 				{
952 					/* Copy non-NULL distance value */
953 					distances[i].value = nonNullDistances[offset];
954 					distances[i].isnull = false;
955 				}
956 				else
957 				{
958 					/* Set distance's NULL flag. */
959 					distances[i].value = 0.0;
960 					distances[i].isnull = true;
961 				}
962 			}
963 
964 			so->distances[so->nPtrs] = distances;
965 		}
966 	}
967 
968 	if (so->want_itup)
969 	{
970 		/*
971 		 * Reconstruct index data.  We have to copy the datum out of the temp
972 		 * context anyway, so we may as well create the tuple here.
973 		 */
974 		so->reconTups[so->nPtrs] = heap_form_tuple(so->indexTupDesc,
975 												   &leafValue,
976 												   &isnull);
977 	}
978 	so->nPtrs++;
979 }
980 
981 bool
982 spggettuple(IndexScanDesc scan, ScanDirection dir)
983 {
984 	SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
985 
986 	if (dir != ForwardScanDirection)
987 		elog(ERROR, "SP-GiST only supports forward scan direction");
988 
989 	/* Copy want_itup to *so so we don't need to pass it around separately */
990 	so->want_itup = scan->xs_want_itup;
991 
992 	for (;;)
993 	{
994 		if (so->iPtr < so->nPtrs)
995 		{
996 			/* continuing to return reported tuples */
997 			scan->xs_heaptid = so->heapPtrs[so->iPtr];
998 			scan->xs_recheck = so->recheck[so->iPtr];
999 			scan->xs_hitup = so->reconTups[so->iPtr];
1000 
1001 			if (so->numberOfOrderBys > 0)
1002 				index_store_float8_orderby_distances(scan, so->orderByTypes,
1003 													 so->distances[so->iPtr],
1004 													 so->recheckDistances[so->iPtr]);
1005 			so->iPtr++;
1006 			return true;
1007 		}
1008 
1009 		if (so->numberOfOrderBys > 0)
1010 		{
1011 			/* Must pfree distances to avoid memory leak */
1012 			int			i;
1013 
1014 			for (i = 0; i < so->nPtrs; i++)
1015 				if (so->distances[i])
1016 					pfree(so->distances[i]);
1017 		}
1018 
1019 		if (so->want_itup)
1020 		{
1021 			/* Must pfree reconstructed tuples to avoid memory leak */
1022 			int			i;
1023 
1024 			for (i = 0; i < so->nPtrs; i++)
1025 				pfree(so->reconTups[i]);
1026 		}
1027 		so->iPtr = so->nPtrs = 0;
1028 
1029 		spgWalk(scan->indexRelation, so, false, storeGettuple,
1030 				scan->xs_snapshot);
1031 
1032 		if (so->nPtrs == 0)
1033 			break;				/* must have completed scan */
1034 	}
1035 
1036 	return false;
1037 }
1038 
1039 bool
1040 spgcanreturn(Relation index, int attno)
1041 {
1042 	SpGistCache *cache;
1043 
1044 	/* We can do it if the opclass config function says so */
1045 	cache = spgGetCache(index);
1046 
1047 	return cache->config.canReturnData;
1048 }
1049