1 /*-------------------------------------------------------------------------
2  *
3  * rangetypes_spgist.c
4  *	  implementation of quad tree over ranges mapped to 2d-points for SP-GiST.
5  *
6  * Quad tree is a data structure similar to a binary tree, but is adapted to
7  * 2d data. Each inner node of a quad tree contains a point (centroid) which
8  * divides the 2d-space into 4 quadrants. Each quadrant is associated with a
9  * child node.
10  *
11  * Ranges are mapped to 2d-points so that the lower bound is one dimension,
12  * and the upper bound is another. By convention, we visualize the lower bound
13  * to be the horizontal axis, and upper bound the vertical axis.
14  *
15  * One quirk with this mapping is the handling of empty ranges. An empty range
16  * doesn't have lower and upper bounds, so it cannot be mapped to 2d space in
17  * a straightforward way. To cope with that, the root node can have a 5th
18  * quadrant, which is reserved for empty ranges. Furthermore, there can be
19  * inner nodes in the tree with no centroid. They contain only two child nodes,
20  * one for empty ranges and another for non-empty ones. Such a node can appear
21  * as the root node, or in the tree under the 5th child of the root node (in
22  * which case it will only contain empty nodes).
23  *
24  * The SP-GiST picksplit function uses medians along both axes as the centroid.
25  * This implementation only uses the comparison function of the range element
26  * datatype, therefore it works for any range type.
27  *
28  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
29  * Portions Copyright (c) 1994, Regents of the University of California
30  *
31  * IDENTIFICATION
32  *			src/backend/utils/adt/rangetypes_spgist.c
33  *
34  *-------------------------------------------------------------------------
35  */
36 
37 #include "postgres.h"
38 
39 #include "access/spgist.h"
40 #include "access/stratnum.h"
41 #include "catalog/pg_type.h"
42 #include "utils/builtins.h"
43 #include "utils/datum.h"
44 #include "utils/rangetypes.h"
45 
46 static int16 getQuadrant(TypeCacheEntry *typcache, const RangeType *centroid,
47 						 const RangeType *tst);
48 static int	bound_cmp(const void *a, const void *b, void *arg);
49 
50 static int	adjacent_inner_consistent(TypeCacheEntry *typcache,
51 									  const RangeBound *arg, const RangeBound *centroid,
52 									  const RangeBound *prev);
53 static int	adjacent_cmp_bounds(TypeCacheEntry *typcache, const RangeBound *arg,
54 								const RangeBound *centroid);
55 
56 /*
57  * SP-GiST 'config' interface function.
58  */
59 Datum
spg_range_quad_config(PG_FUNCTION_ARGS)60 spg_range_quad_config(PG_FUNCTION_ARGS)
61 {
62 	/* spgConfigIn *cfgin = (spgConfigIn *) PG_GETARG_POINTER(0); */
63 	spgConfigOut *cfg = (spgConfigOut *) PG_GETARG_POINTER(1);
64 
65 	cfg->prefixType = ANYRANGEOID;
66 	cfg->labelType = VOIDOID;	/* we don't need node labels */
67 	cfg->canReturnData = true;
68 	cfg->longValuesOK = false;
69 	PG_RETURN_VOID();
70 }
71 
72 /*----------
73  * Determine which quadrant a 2d-mapped range falls into, relative to the
74  * centroid.
75  *
76  * Quadrants are numbered like this:
77  *
78  *	 4	|  1
79  *	----+----
80  *	 3	|  2
81  *
82  * Where the lower bound of range is the horizontal axis and upper bound the
83  * vertical axis.
84  *
85  * Ranges on one of the axes are taken to lie in the quadrant with higher value
86  * along perpendicular axis. That is, a value on the horizontal axis is taken
87  * to belong to quadrant 1 or 4, and a value on the vertical axis is taken to
88  * belong to quadrant 1 or 2. A range equal to centroid is taken to lie in
89  * quadrant 1.
90  *
91  * Empty ranges are taken to lie in the special quadrant 5.
92  *----------
93  */
94 static int16
getQuadrant(TypeCacheEntry * typcache,const RangeType * centroid,const RangeType * tst)95 getQuadrant(TypeCacheEntry *typcache, const RangeType *centroid, const RangeType *tst)
96 {
97 	RangeBound	centroidLower,
98 				centroidUpper;
99 	bool		centroidEmpty;
100 	RangeBound	lower,
101 				upper;
102 	bool		empty;
103 
104 	range_deserialize(typcache, centroid, &centroidLower, &centroidUpper,
105 					  &centroidEmpty);
106 	range_deserialize(typcache, tst, &lower, &upper, &empty);
107 
108 	if (empty)
109 		return 5;
110 
111 	if (range_cmp_bounds(typcache, &lower, &centroidLower) >= 0)
112 	{
113 		if (range_cmp_bounds(typcache, &upper, &centroidUpper) >= 0)
114 			return 1;
115 		else
116 			return 2;
117 	}
118 	else
119 	{
120 		if (range_cmp_bounds(typcache, &upper, &centroidUpper) >= 0)
121 			return 4;
122 		else
123 			return 3;
124 	}
125 }
126 
127 /*
128  * Choose SP-GiST function: choose path for addition of new range.
129  */
130 Datum
spg_range_quad_choose(PG_FUNCTION_ARGS)131 spg_range_quad_choose(PG_FUNCTION_ARGS)
132 {
133 	spgChooseIn *in = (spgChooseIn *) PG_GETARG_POINTER(0);
134 	spgChooseOut *out = (spgChooseOut *) PG_GETARG_POINTER(1);
135 	RangeType  *inRange = DatumGetRangeTypeP(in->datum),
136 			   *centroid;
137 	int16		quadrant;
138 	TypeCacheEntry *typcache;
139 
140 	if (in->allTheSame)
141 	{
142 		out->resultType = spgMatchNode;
143 		/* nodeN will be set by core */
144 		out->result.matchNode.levelAdd = 0;
145 		out->result.matchNode.restDatum = RangeTypePGetDatum(inRange);
146 		PG_RETURN_VOID();
147 	}
148 
149 	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(inRange));
150 
151 	/*
152 	 * A node with no centroid divides ranges purely on whether they're empty
153 	 * or not. All empty ranges go to child node 0, all non-empty ranges go to
154 	 * node 1.
155 	 */
156 	if (!in->hasPrefix)
157 	{
158 		out->resultType = spgMatchNode;
159 		if (RangeIsEmpty(inRange))
160 			out->result.matchNode.nodeN = 0;
161 		else
162 			out->result.matchNode.nodeN = 1;
163 		out->result.matchNode.levelAdd = 1;
164 		out->result.matchNode.restDatum = RangeTypePGetDatum(inRange);
165 		PG_RETURN_VOID();
166 	}
167 
168 	centroid = DatumGetRangeTypeP(in->prefixDatum);
169 	quadrant = getQuadrant(typcache, centroid, inRange);
170 
171 	Assert(quadrant <= in->nNodes);
172 
173 	/* Select node matching to quadrant number */
174 	out->resultType = spgMatchNode;
175 	out->result.matchNode.nodeN = quadrant - 1;
176 	out->result.matchNode.levelAdd = 1;
177 	out->result.matchNode.restDatum = RangeTypePGetDatum(inRange);
178 
179 	PG_RETURN_VOID();
180 }
181 
182 /*
183  * Bound comparison for sorting.
184  */
185 static int
bound_cmp(const void * a,const void * b,void * arg)186 bound_cmp(const void *a, const void *b, void *arg)
187 {
188 	RangeBound *ba = (RangeBound *) a;
189 	RangeBound *bb = (RangeBound *) b;
190 	TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
191 
192 	return range_cmp_bounds(typcache, ba, bb);
193 }
194 
195 /*
196  * Picksplit SP-GiST function: split ranges into nodes. Select "centroid"
197  * range and distribute ranges according to quadrants.
198  */
199 Datum
spg_range_quad_picksplit(PG_FUNCTION_ARGS)200 spg_range_quad_picksplit(PG_FUNCTION_ARGS)
201 {
202 	spgPickSplitIn *in = (spgPickSplitIn *) PG_GETARG_POINTER(0);
203 	spgPickSplitOut *out = (spgPickSplitOut *) PG_GETARG_POINTER(1);
204 	int			i;
205 	int			j;
206 	int			nonEmptyCount;
207 	RangeType  *centroid;
208 	bool		empty;
209 	TypeCacheEntry *typcache;
210 
211 	/* Use the median values of lower and upper bounds as the centroid range */
212 	RangeBound *lowerBounds,
213 			   *upperBounds;
214 
215 	typcache = range_get_typcache(fcinfo,
216 								  RangeTypeGetOid(DatumGetRangeTypeP(in->datums[0])));
217 
218 	/* Allocate memory for bounds */
219 	lowerBounds = palloc(sizeof(RangeBound) * in->nTuples);
220 	upperBounds = palloc(sizeof(RangeBound) * in->nTuples);
221 	j = 0;
222 
223 	/* Deserialize bounds of ranges, count non-empty ranges */
224 	for (i = 0; i < in->nTuples; i++)
225 	{
226 		range_deserialize(typcache, DatumGetRangeTypeP(in->datums[i]),
227 						  &lowerBounds[j], &upperBounds[j], &empty);
228 		if (!empty)
229 			j++;
230 	}
231 	nonEmptyCount = j;
232 
233 	/*
234 	 * All the ranges are empty. The best we can do is to construct an inner
235 	 * node with no centroid, and put all ranges into node 0. If non-empty
236 	 * ranges are added later, they will be routed to node 1.
237 	 */
238 	if (nonEmptyCount == 0)
239 	{
240 		out->nNodes = 2;
241 		out->hasPrefix = false;
242 		/* Prefix is empty */
243 		out->prefixDatum = PointerGetDatum(NULL);
244 		out->nodeLabels = NULL;
245 
246 		out->mapTuplesToNodes = palloc(sizeof(int) * in->nTuples);
247 		out->leafTupleDatums = palloc(sizeof(Datum) * in->nTuples);
248 
249 		/* Place all ranges into node 0 */
250 		for (i = 0; i < in->nTuples; i++)
251 		{
252 			RangeType  *range = DatumGetRangeTypeP(in->datums[i]);
253 
254 			out->leafTupleDatums[i] = RangeTypePGetDatum(range);
255 			out->mapTuplesToNodes[i] = 0;
256 		}
257 		PG_RETURN_VOID();
258 	}
259 
260 	/* Sort range bounds in order to find medians */
261 	qsort_arg(lowerBounds, nonEmptyCount, sizeof(RangeBound),
262 			  bound_cmp, typcache);
263 	qsort_arg(upperBounds, nonEmptyCount, sizeof(RangeBound),
264 			  bound_cmp, typcache);
265 
266 	/* Construct "centroid" range from medians of lower and upper bounds */
267 	centroid = range_serialize(typcache, &lowerBounds[nonEmptyCount / 2],
268 							   &upperBounds[nonEmptyCount / 2], false);
269 	out->hasPrefix = true;
270 	out->prefixDatum = RangeTypePGetDatum(centroid);
271 
272 	/* Create node for empty ranges only if it is a root node */
273 	out->nNodes = (in->level == 0) ? 5 : 4;
274 	out->nodeLabels = NULL;		/* we don't need node labels */
275 
276 	out->mapTuplesToNodes = palloc(sizeof(int) * in->nTuples);
277 	out->leafTupleDatums = palloc(sizeof(Datum) * in->nTuples);
278 
279 	/*
280 	 * Assign ranges to corresponding nodes according to quadrants relative to
281 	 * "centroid" range.
282 	 */
283 	for (i = 0; i < in->nTuples; i++)
284 	{
285 		RangeType  *range = DatumGetRangeTypeP(in->datums[i]);
286 		int16		quadrant = getQuadrant(typcache, centroid, range);
287 
288 		out->leafTupleDatums[i] = RangeTypePGetDatum(range);
289 		out->mapTuplesToNodes[i] = quadrant - 1;
290 	}
291 
292 	PG_RETURN_VOID();
293 }
294 
295 /*
296  * SP-GiST consistent function for inner nodes: check which nodes are
297  * consistent with given set of queries.
298  */
299 Datum
spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)300 spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
301 {
302 	spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0);
303 	spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1);
304 	int			which;
305 	int			i;
306 	MemoryContext oldCtx;
307 
308 	/*
309 	 * For adjacent search we need also previous centroid (if any) to improve
310 	 * the precision of the consistent check. In this case needPrevious flag
311 	 * is set and centroid is passed into traversalValue.
312 	 */
313 	bool		needPrevious = false;
314 
315 	if (in->allTheSame)
316 	{
317 		/* Report that all nodes should be visited */
318 		out->nNodes = in->nNodes;
319 		out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
320 		for (i = 0; i < in->nNodes; i++)
321 			out->nodeNumbers[i] = i;
322 		PG_RETURN_VOID();
323 	}
324 
325 	if (!in->hasPrefix)
326 	{
327 		/*
328 		 * No centroid on this inner node. Such a node has two child nodes,
329 		 * the first for empty ranges, and the second for non-empty ones.
330 		 */
331 		Assert(in->nNodes == 2);
332 
333 		/*
334 		 * Nth bit of which variable means that (N - 1)th node should be
335 		 * visited. Initially all bits are set. Bits of nodes which should be
336 		 * skipped will be unset.
337 		 */
338 		which = (1 << 1) | (1 << 2);
339 		for (i = 0; i < in->nkeys; i++)
340 		{
341 			StrategyNumber strategy = in->scankeys[i].sk_strategy;
342 			bool		empty;
343 
344 			/*
345 			 * The only strategy when second argument of operator is not range
346 			 * is RANGESTRAT_CONTAINS_ELEM.
347 			 */
348 			if (strategy != RANGESTRAT_CONTAINS_ELEM)
349 				empty = RangeIsEmpty(DatumGetRangeTypeP(in->scankeys[i].sk_argument));
350 			else
351 				empty = false;
352 
353 			switch (strategy)
354 			{
355 				case RANGESTRAT_BEFORE:
356 				case RANGESTRAT_OVERLEFT:
357 				case RANGESTRAT_OVERLAPS:
358 				case RANGESTRAT_OVERRIGHT:
359 				case RANGESTRAT_AFTER:
360 				case RANGESTRAT_ADJACENT:
361 					/* These strategies return false if any argument is empty */
362 					if (empty)
363 						which = 0;
364 					else
365 						which &= (1 << 2);
366 					break;
367 
368 				case RANGESTRAT_CONTAINS:
369 
370 					/*
371 					 * All ranges contain an empty range. Only non-empty
372 					 * ranges can contain a non-empty range.
373 					 */
374 					if (!empty)
375 						which &= (1 << 2);
376 					break;
377 
378 				case RANGESTRAT_CONTAINED_BY:
379 
380 					/*
381 					 * Only an empty range is contained by an empty range.
382 					 * Both empty and non-empty ranges can be contained by a
383 					 * non-empty range.
384 					 */
385 					if (empty)
386 						which &= (1 << 1);
387 					break;
388 
389 				case RANGESTRAT_CONTAINS_ELEM:
390 					which &= (1 << 2);
391 					break;
392 
393 				case RANGESTRAT_EQ:
394 					if (empty)
395 						which &= (1 << 1);
396 					else
397 						which &= (1 << 2);
398 					break;
399 
400 				default:
401 					elog(ERROR, "unrecognized range strategy: %d", strategy);
402 					break;
403 			}
404 			if (which == 0)
405 				break;			/* no need to consider remaining conditions */
406 		}
407 	}
408 	else
409 	{
410 		RangeBound	centroidLower,
411 					centroidUpper;
412 		bool		centroidEmpty;
413 		TypeCacheEntry *typcache;
414 		RangeType  *centroid;
415 
416 		/* This node has a centroid. Fetch it. */
417 		centroid = DatumGetRangeTypeP(in->prefixDatum);
418 		typcache = range_get_typcache(fcinfo,
419 									  RangeTypeGetOid(DatumGetRangeTypeP(centroid)));
420 		range_deserialize(typcache, centroid, &centroidLower, &centroidUpper,
421 						  &centroidEmpty);
422 
423 		Assert(in->nNodes == 4 || in->nNodes == 5);
424 
425 		/*
426 		 * Nth bit of which variable means that (N - 1)th node (Nth quadrant)
427 		 * should be visited. Initially all bits are set. Bits of nodes which
428 		 * can be skipped will be unset.
429 		 */
430 		which = (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5);
431 
432 		for (i = 0; i < in->nkeys; i++)
433 		{
434 			StrategyNumber strategy;
435 			RangeBound	lower,
436 						upper;
437 			bool		empty;
438 			RangeType  *range = NULL;
439 
440 			RangeType  *prevCentroid = NULL;
441 			RangeBound	prevLower,
442 						prevUpper;
443 			bool		prevEmpty;
444 
445 			/* Restrictions on range bounds according to scan strategy */
446 			RangeBound *minLower = NULL,
447 					   *maxLower = NULL,
448 					   *minUpper = NULL,
449 					   *maxUpper = NULL;
450 
451 			/* Are the restrictions on range bounds inclusive? */
452 			bool		inclusive = true;
453 			bool		strictEmpty = true;
454 			int			cmp,
455 						which1,
456 						which2;
457 
458 			strategy = in->scankeys[i].sk_strategy;
459 
460 			/*
461 			 * RANGESTRAT_CONTAINS_ELEM is just like RANGESTRAT_CONTAINS, but
462 			 * the argument is a single element. Expand the single element to
463 			 * a range containing only the element, and treat it like
464 			 * RANGESTRAT_CONTAINS.
465 			 */
466 			if (strategy == RANGESTRAT_CONTAINS_ELEM)
467 			{
468 				lower.inclusive = true;
469 				lower.infinite = false;
470 				lower.lower = true;
471 				lower.val = in->scankeys[i].sk_argument;
472 
473 				upper.inclusive = true;
474 				upper.infinite = false;
475 				upper.lower = false;
476 				upper.val = in->scankeys[i].sk_argument;
477 
478 				empty = false;
479 
480 				strategy = RANGESTRAT_CONTAINS;
481 			}
482 			else
483 			{
484 				range = DatumGetRangeTypeP(in->scankeys[i].sk_argument);
485 				range_deserialize(typcache, range, &lower, &upper, &empty);
486 			}
487 
488 			/*
489 			 * Most strategies are handled by forming a bounding box from the
490 			 * search key, defined by a minLower, maxLower, minUpper,
491 			 * maxUpper. Some modify 'which' directly, to specify exactly
492 			 * which quadrants need to be visited.
493 			 *
494 			 * For most strategies, nothing matches an empty search key, and
495 			 * an empty range never matches a non-empty key. If a strategy
496 			 * does not behave like that wrt. empty ranges, set strictEmpty to
497 			 * false.
498 			 */
499 			switch (strategy)
500 			{
501 				case RANGESTRAT_BEFORE:
502 
503 					/*
504 					 * Range A is before range B if upper bound of A is lower
505 					 * than lower bound of B.
506 					 */
507 					maxUpper = &lower;
508 					inclusive = false;
509 					break;
510 
511 				case RANGESTRAT_OVERLEFT:
512 
513 					/*
514 					 * Range A is overleft to range B if upper bound of A is
515 					 * less than or equal to upper bound of B.
516 					 */
517 					maxUpper = &upper;
518 					break;
519 
520 				case RANGESTRAT_OVERLAPS:
521 
522 					/*
523 					 * Non-empty ranges overlap, if lower bound of each range
524 					 * is lower or equal to upper bound of the other range.
525 					 */
526 					maxLower = &upper;
527 					minUpper = &lower;
528 					break;
529 
530 				case RANGESTRAT_OVERRIGHT:
531 
532 					/*
533 					 * Range A is overright to range B if lower bound of A is
534 					 * greater than or equal to lower bound of B.
535 					 */
536 					minLower = &lower;
537 					break;
538 
539 				case RANGESTRAT_AFTER:
540 
541 					/*
542 					 * Range A is after range B if lower bound of A is greater
543 					 * than upper bound of B.
544 					 */
545 					minLower = &upper;
546 					inclusive = false;
547 					break;
548 
549 				case RANGESTRAT_ADJACENT:
550 					if (empty)
551 						break;	/* Skip to strictEmpty check. */
552 
553 					/*
554 					 * Previously selected quadrant could exclude possibility
555 					 * for lower or upper bounds to be adjacent. Deserialize
556 					 * previous centroid range if present for checking this.
557 					 */
558 					if (in->traversalValue)
559 					{
560 						prevCentroid = DatumGetRangeTypeP(in->traversalValue);
561 						range_deserialize(typcache, prevCentroid,
562 										  &prevLower, &prevUpper, &prevEmpty);
563 					}
564 
565 					/*
566 					 * For a range's upper bound to be adjacent to the
567 					 * argument's lower bound, it will be found along the line
568 					 * adjacent to (and just below) Y=lower. Therefore, if the
569 					 * argument's lower bound is less than the centroid's
570 					 * upper bound, the line falls in quadrants 2 and 3; if
571 					 * greater, the line falls in quadrants 1 and 4. (see
572 					 * adjacent_cmp_bounds for description of edge cases).
573 					 */
574 					cmp = adjacent_inner_consistent(typcache, &lower,
575 													&centroidUpper,
576 													prevCentroid ? &prevUpper : NULL);
577 					if (cmp > 0)
578 						which1 = (1 << 1) | (1 << 4);
579 					else if (cmp < 0)
580 						which1 = (1 << 2) | (1 << 3);
581 					else
582 						which1 = 0;
583 
584 					/*
585 					 * Also search for ranges's adjacent to argument's upper
586 					 * bound. They will be found along the line adjacent to
587 					 * (and just right of) X=upper, which falls in quadrants 3
588 					 * and 4, or 1 and 2.
589 					 */
590 					cmp = adjacent_inner_consistent(typcache, &upper,
591 													&centroidLower,
592 													prevCentroid ? &prevLower : NULL);
593 					if (cmp > 0)
594 						which2 = (1 << 1) | (1 << 2);
595 					else if (cmp < 0)
596 						which2 = (1 << 3) | (1 << 4);
597 					else
598 						which2 = 0;
599 
600 					/* We must chase down ranges adjacent to either bound. */
601 					which &= which1 | which2;
602 
603 					needPrevious = true;
604 					break;
605 
606 				case RANGESTRAT_CONTAINS:
607 
608 					/*
609 					 * Non-empty range A contains non-empty range B if lower
610 					 * bound of A is lower or equal to lower bound of range B
611 					 * and upper bound of range A is greater than or equal to upper
612 					 * bound of range A.
613 					 *
614 					 * All non-empty ranges contain an empty range.
615 					 */
616 					strictEmpty = false;
617 					if (!empty)
618 					{
619 						which &= (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
620 						maxLower = &lower;
621 						minUpper = &upper;
622 					}
623 					break;
624 
625 				case RANGESTRAT_CONTAINED_BY:
626 					/* The opposite of contains. */
627 					strictEmpty = false;
628 					if (empty)
629 					{
630 						/* An empty range is only contained by an empty range */
631 						which &= (1 << 5);
632 					}
633 					else
634 					{
635 						minLower = &lower;
636 						maxUpper = &upper;
637 					}
638 					break;
639 
640 				case RANGESTRAT_EQ:
641 
642 					/*
643 					 * Equal range can be only in the same quadrant where
644 					 * argument would be placed to.
645 					 */
646 					strictEmpty = false;
647 					which &= (1 << getQuadrant(typcache, centroid, range));
648 					break;
649 
650 				default:
651 					elog(ERROR, "unrecognized range strategy: %d", strategy);
652 					break;
653 			}
654 
655 			if (strictEmpty)
656 			{
657 				if (empty)
658 				{
659 					/* Scan key is empty, no branches are satisfying */
660 					which = 0;
661 					break;
662 				}
663 				else
664 				{
665 					/* Shouldn't visit tree branch with empty ranges */
666 					which &= (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
667 				}
668 			}
669 
670 			/*
671 			 * Using the bounding box, see which quadrants we have to descend
672 			 * into.
673 			 */
674 			if (minLower)
675 			{
676 				/*
677 				 * If the centroid's lower bound is less than or equal to the
678 				 * minimum lower bound, anything in the 3rd and 4th quadrants
679 				 * will have an even smaller lower bound, and thus can't
680 				 * match.
681 				 */
682 				if (range_cmp_bounds(typcache, &centroidLower, minLower) <= 0)
683 					which &= (1 << 1) | (1 << 2) | (1 << 5);
684 			}
685 			if (maxLower)
686 			{
687 				/*
688 				 * If the centroid's lower bound is greater than the maximum
689 				 * lower bound, anything in the 1st and 2nd quadrants will
690 				 * also have a greater than or equal lower bound, and thus
691 				 * can't match. If the centroid's lower bound is equal to the
692 				 * maximum lower bound, we can still exclude the 1st and 2nd
693 				 * quadrants if we're looking for a value strictly greater
694 				 * than the maximum.
695 				 */
696 				int			cmp;
697 
698 				cmp = range_cmp_bounds(typcache, &centroidLower, maxLower);
699 				if (cmp > 0 || (!inclusive && cmp == 0))
700 					which &= (1 << 3) | (1 << 4) | (1 << 5);
701 			}
702 			if (minUpper)
703 			{
704 				/*
705 				 * If the centroid's upper bound is less than or equal to the
706 				 * minimum upper bound, anything in the 2nd and 3rd quadrants
707 				 * will have an even smaller upper bound, and thus can't
708 				 * match.
709 				 */
710 				if (range_cmp_bounds(typcache, &centroidUpper, minUpper) <= 0)
711 					which &= (1 << 1) | (1 << 4) | (1 << 5);
712 			}
713 			if (maxUpper)
714 			{
715 				/*
716 				 * If the centroid's upper bound is greater than the maximum
717 				 * upper bound, anything in the 1st and 4th quadrants will
718 				 * also have a greater than or equal upper bound, and thus
719 				 * can't match. If the centroid's upper bound is equal to the
720 				 * maximum upper bound, we can still exclude the 1st and 4th
721 				 * quadrants if we're looking for a value strictly greater
722 				 * than the maximum.
723 				 */
724 				int			cmp;
725 
726 				cmp = range_cmp_bounds(typcache, &centroidUpper, maxUpper);
727 				if (cmp > 0 || (!inclusive && cmp == 0))
728 					which &= (1 << 2) | (1 << 3) | (1 << 5);
729 			}
730 
731 			if (which == 0)
732 				break;			/* no need to consider remaining conditions */
733 		}
734 	}
735 
736 	/* We must descend into the quadrant(s) identified by 'which' */
737 	out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
738 	if (needPrevious)
739 		out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes);
740 	out->nNodes = 0;
741 
742 	/*
743 	 * Elements of traversalValues should be allocated in
744 	 * traversalMemoryContext
745 	 */
746 	oldCtx = MemoryContextSwitchTo(in->traversalMemoryContext);
747 
748 	for (i = 1; i <= in->nNodes; i++)
749 	{
750 		if (which & (1 << i))
751 		{
752 			/* Save previous prefix if needed */
753 			if (needPrevious)
754 			{
755 				Datum		previousCentroid;
756 
757 				/*
758 				 * We know, that in->prefixDatum in this place is varlena,
759 				 * because it's range
760 				 */
761 				previousCentroid = datumCopy(in->prefixDatum, false, -1);
762 				out->traversalValues[out->nNodes] = (void *) previousCentroid;
763 			}
764 			out->nodeNumbers[out->nNodes] = i - 1;
765 			out->nNodes++;
766 		}
767 	}
768 
769 	MemoryContextSwitchTo(oldCtx);
770 
771 	PG_RETURN_VOID();
772 }
773 
774 /*
775  * adjacent_cmp_bounds
776  *
777  * Given an argument and centroid bound, this function determines if any
778  * bounds that are adjacent to the argument are smaller than, or greater than
779  * or equal to centroid. For brevity, we call the arg < centroid "left", and
780  * arg >= centroid case "right". This corresponds to how the quadrants are
781  * arranged, if you imagine that "left" is equivalent to "down" and "right"
782  * is equivalent to "up".
783  *
784  * For the "left" case, returns -1, and for the "right" case, returns 1.
785  */
786 static int
adjacent_cmp_bounds(TypeCacheEntry * typcache,const RangeBound * arg,const RangeBound * centroid)787 adjacent_cmp_bounds(TypeCacheEntry *typcache, const RangeBound *arg,
788 					const RangeBound *centroid)
789 {
790 	int			cmp;
791 
792 	Assert(arg->lower != centroid->lower);
793 
794 	cmp = range_cmp_bounds(typcache, arg, centroid);
795 
796 	if (centroid->lower)
797 	{
798 		/*------
799 		 * The argument is an upper bound, we are searching for adjacent lower
800 		 * bounds. A matching adjacent lower bound must be *larger* than the
801 		 * argument, but only just.
802 		 *
803 		 * The following table illustrates the desired result with a fixed
804 		 * argument bound, and different centroids. The CMP column shows
805 		 * the value of 'cmp' variable, and ADJ shows whether the argument
806 		 * and centroid are adjacent, per bounds_adjacent(). (N) means we
807 		 * don't need to check for that case, because it's implied by CMP.
808 		 * With the argument range [..., 500), the adjacent range we're
809 		 * searching for is [500, ...):
810 		 *
811 		 *	ARGUMENT   CENTROID		CMP   ADJ
812 		 *	[..., 500) [498, ...)	 >	  (N)	[500, ...) is to the right
813 		 *	[..., 500) [499, ...)	 =	  (N)	[500, ...) is to the right
814 		 *	[..., 500) [500, ...)	 <	   Y	[500, ...) is to the right
815 		 *	[..., 500) [501, ...)	 <	   N	[500, ...) is to the left
816 		 *
817 		 * So, we must search left when the argument is smaller than, and not
818 		 * adjacent, to the centroid. Otherwise search right.
819 		 *------
820 		 */
821 		if (cmp < 0 && !bounds_adjacent(typcache, *arg, *centroid))
822 			return -1;
823 		else
824 			return 1;
825 	}
826 	else
827 	{
828 		/*------
829 		 * The argument is a lower bound, we are searching for adjacent upper
830 		 * bounds. A matching adjacent upper bound must be *smaller* than the
831 		 * argument, but only just.
832 		 *
833 		 *	ARGUMENT   CENTROID		CMP   ADJ
834 		 *	[500, ...) [..., 499)	 >	  (N)	[..., 500) is to the right
835 		 *	[500, ...) [..., 500)	 >	  (Y)	[..., 500) is to the right
836 		 *	[500, ...) [..., 501)	 =	  (N)	[..., 500) is to the left
837 		 *	[500, ...) [..., 502)	 <	  (N)	[..., 500) is to the left
838 		 *
839 		 * We must search left when the argument is smaller than or equal to
840 		 * the centroid. Otherwise search right. We don't need to check
841 		 * whether the argument is adjacent with the centroid, because it
842 		 * doesn't matter.
843 		 *------
844 		 */
845 		if (cmp <= 0)
846 			return -1;
847 		else
848 			return 1;
849 	}
850 }
851 
852 /*----------
853  * adjacent_inner_consistent
854  *
855  * Like adjacent_cmp_bounds, but also takes into account the previous
856  * level's centroid. We might've traversed left (or right) at the previous
857  * node, in search for ranges adjacent to the other bound, even though we
858  * already ruled out the possibility for any matches in that direction for
859  * this bound. By comparing the argument with the previous centroid, and
860  * the previous centroid with the current centroid, we can determine which
861  * direction we should've moved in at previous level, and which direction we
862  * actually moved.
863  *
864  * If there can be any matches to the left, returns -1. If to the right,
865  * returns 1. If there can be no matches below this centroid, because we
866  * already ruled them out at the previous level, returns 0.
867  *
868  * XXX: Comparing just the previous and current level isn't foolproof; we
869  * might still search some branches unnecessarily. For example, imagine that
870  * we are searching for value 15, and we traverse the following centroids
871  * (only considering one bound for the moment):
872  *
873  * Level 1: 20
874  * Level 2: 50
875  * Level 3: 25
876  *
877  * At this point, previous centroid is 50, current centroid is 25, and the
878  * target value is to the left. But because we already moved right from
879  * centroid 20 to 50 in the first level, there cannot be any values < 20 in
880  * the current branch. But we don't know that just by looking at the previous
881  * and current centroid, so we traverse left, unnecessarily. The reason we are
882  * down this branch is that we're searching for matches with the *other*
883  * bound. If we kept track of which bound we are searching for explicitly,
884  * instead of deducing that from the previous and current centroid, we could
885  * avoid some unnecessary work.
886  *----------
887  */
888 static int
adjacent_inner_consistent(TypeCacheEntry * typcache,const RangeBound * arg,const RangeBound * centroid,const RangeBound * prev)889 adjacent_inner_consistent(TypeCacheEntry *typcache, const RangeBound *arg,
890 						  const RangeBound *centroid, const RangeBound *prev)
891 {
892 	if (prev)
893 	{
894 		int			prevcmp;
895 		int			cmp;
896 
897 		/*
898 		 * Which direction were we supposed to traverse at previous level,
899 		 * left or right?
900 		 */
901 		prevcmp = adjacent_cmp_bounds(typcache, arg, prev);
902 
903 		/* and which direction did we actually go? */
904 		cmp = range_cmp_bounds(typcache, centroid, prev);
905 
906 		/* if the two don't agree, there's nothing to see here */
907 		if ((prevcmp < 0 && cmp >= 0) || (prevcmp > 0 && cmp < 0))
908 			return 0;
909 	}
910 
911 	return adjacent_cmp_bounds(typcache, arg, centroid);
912 }
913 
914 /*
915  * SP-GiST consistent function for leaf nodes: check leaf value against query
916  * using corresponding function.
917  */
918 Datum
spg_range_quad_leaf_consistent(PG_FUNCTION_ARGS)919 spg_range_quad_leaf_consistent(PG_FUNCTION_ARGS)
920 {
921 	spgLeafConsistentIn *in = (spgLeafConsistentIn *) PG_GETARG_POINTER(0);
922 	spgLeafConsistentOut *out = (spgLeafConsistentOut *) PG_GETARG_POINTER(1);
923 	RangeType  *leafRange = DatumGetRangeTypeP(in->leafDatum);
924 	TypeCacheEntry *typcache;
925 	bool		res;
926 	int			i;
927 
928 	/* all tests are exact */
929 	out->recheck = false;
930 
931 	/* leafDatum is what it is... */
932 	out->leafValue = in->leafDatum;
933 
934 	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(leafRange));
935 
936 	/* Perform the required comparison(s) */
937 	res = true;
938 	for (i = 0; i < in->nkeys; i++)
939 	{
940 		Datum		keyDatum = in->scankeys[i].sk_argument;
941 
942 		/* Call the function corresponding to the scan strategy */
943 		switch (in->scankeys[i].sk_strategy)
944 		{
945 			case RANGESTRAT_BEFORE:
946 				res = range_before_internal(typcache, leafRange,
947 											DatumGetRangeTypeP(keyDatum));
948 				break;
949 			case RANGESTRAT_OVERLEFT:
950 				res = range_overleft_internal(typcache, leafRange,
951 											  DatumGetRangeTypeP(keyDatum));
952 				break;
953 			case RANGESTRAT_OVERLAPS:
954 				res = range_overlaps_internal(typcache, leafRange,
955 											  DatumGetRangeTypeP(keyDatum));
956 				break;
957 			case RANGESTRAT_OVERRIGHT:
958 				res = range_overright_internal(typcache, leafRange,
959 											   DatumGetRangeTypeP(keyDatum));
960 				break;
961 			case RANGESTRAT_AFTER:
962 				res = range_after_internal(typcache, leafRange,
963 										   DatumGetRangeTypeP(keyDatum));
964 				break;
965 			case RANGESTRAT_ADJACENT:
966 				res = range_adjacent_internal(typcache, leafRange,
967 											  DatumGetRangeTypeP(keyDatum));
968 				break;
969 			case RANGESTRAT_CONTAINS:
970 				res = range_contains_internal(typcache, leafRange,
971 											  DatumGetRangeTypeP(keyDatum));
972 				break;
973 			case RANGESTRAT_CONTAINED_BY:
974 				res = range_contained_by_internal(typcache, leafRange,
975 												  DatumGetRangeTypeP(keyDatum));
976 				break;
977 			case RANGESTRAT_CONTAINS_ELEM:
978 				res = range_contains_elem_internal(typcache, leafRange,
979 												   keyDatum);
980 				break;
981 			case RANGESTRAT_EQ:
982 				res = range_eq_internal(typcache, leafRange,
983 										DatumGetRangeTypeP(keyDatum));
984 				break;
985 			default:
986 				elog(ERROR, "unrecognized range strategy: %d",
987 					 in->scankeys[i].sk_strategy);
988 				break;
989 		}
990 
991 		/*
992 		 * If leaf datum doesn't match to a query key, no need to check
993 		 * subsequent keys.
994 		 */
995 		if (!res)
996 			break;
997 	}
998 
999 	PG_RETURN_BOOL(res);
1000 }
1001