1 /*-------------------------------------------------------------------------
2  *
3  * rangetypes_spgist.c
4  *	  implementation of quad tree over ranges mapped to 2d-points for SP-GiST.
5  *
6  * Quad tree is a data structure similar to a binary tree, but is adapted to
7  * 2d data. Each inner node of a quad tree contains a point (centroid) which
8  * divides the 2d-space into 4 quadrants. Each quadrant is associated with a
9  * child node.
10  *
11  * Ranges are mapped to 2d-points so that the lower bound is one dimension,
12  * and the upper bound is another. By convention, we visualize the lower bound
13  * to be the horizontal axis, and upper bound the vertical axis.
14  *
15  * One quirk with this mapping is the handling of empty ranges. An empty range
16  * doesn't have lower and upper bounds, so it cannot be mapped to 2d space in
17  * a straightforward way. To cope with that, the root node can have a 5th
18  * quadrant, which is reserved for empty ranges. Furthermore, there can be
19  * inner nodes in the tree with no centroid. They contain only two child nodes,
20  * one for empty ranges and another for non-empty ones. Such a node can appear
21  * as the root node, or in the tree under the 5th child of the root node (in
22  * which case it will only contain empty nodes).
23  *
24  * The SP-GiST picksplit function uses medians along both axes as the centroid.
25  * This implementation only uses the comparison function of the range element
26  * datatype, therefore it works for any range type.
27  *
28  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
29  * Portions Copyright (c) 1994, Regents of the University of California
30  *
31  * IDENTIFICATION
32  *			src/backend/utils/adt/rangetypes_spgist.c
33  *
34  *-------------------------------------------------------------------------
35  */
36 
37 #include "postgres.h"
38 
39 #include "access/spgist.h"
40 #include "access/stratnum.h"
41 #include "catalog/pg_type.h"
42 #include "utils/builtins.h"
43 #include "utils/datum.h"
44 #include "utils/rangetypes.h"
45 
46 static int16 getQuadrant(TypeCacheEntry *typcache, RangeType *centroid,
47 			RangeType *tst);
48 static int	bound_cmp(const void *a, const void *b, void *arg);
49 
50 static int adjacent_inner_consistent(TypeCacheEntry *typcache,
51 						  RangeBound *arg, RangeBound *centroid,
52 						  RangeBound *prev);
53 static int adjacent_cmp_bounds(TypeCacheEntry *typcache, RangeBound *arg,
54 					RangeBound *centroid);
55 
56 /*
57  * SP-GiST 'config' interface function.
58  */
59 Datum
spg_range_quad_config(PG_FUNCTION_ARGS)60 spg_range_quad_config(PG_FUNCTION_ARGS)
61 {
62 	/* spgConfigIn *cfgin = (spgConfigIn *) PG_GETARG_POINTER(0); */
63 	spgConfigOut *cfg = (spgConfigOut *) PG_GETARG_POINTER(1);
64 
65 	cfg->prefixType = ANYRANGEOID;
66 	cfg->labelType = VOIDOID;	/* we don't need node labels */
67 	cfg->canReturnData = true;
68 	cfg->longValuesOK = false;
69 	PG_RETURN_VOID();
70 }
71 
72 /*----------
73  * Determine which quadrant a 2d-mapped range falls into, relative to the
74  * centroid.
75  *
76  * Quadrants are numbered like this:
77  *
78  *	 4	|  1
79  *	----+----
80  *	 3	|  2
81  *
82  * Where the lower bound of range is the horizontal axis and upper bound the
83  * vertical axis.
84  *
85  * Ranges on one of the axes are taken to lie in the quadrant with higher value
86  * along perpendicular axis. That is, a value on the horizontal axis is taken
87  * to belong to quadrant 1 or 4, and a value on the vertical axis is taken to
88  * belong to quadrant 1 or 2. A range equal to centroid is taken to lie in
89  * quadrant 1.
90  *
91  * Empty ranges are taken to lie in the special quadrant 5.
92  *----------
93  */
94 static int16
getQuadrant(TypeCacheEntry * typcache,RangeType * centroid,RangeType * tst)95 getQuadrant(TypeCacheEntry *typcache, RangeType *centroid, RangeType *tst)
96 {
97 	RangeBound	centroidLower,
98 				centroidUpper;
99 	bool		centroidEmpty;
100 	RangeBound	lower,
101 				upper;
102 	bool		empty;
103 
104 	range_deserialize(typcache, centroid, &centroidLower, &centroidUpper,
105 					  &centroidEmpty);
106 	range_deserialize(typcache, tst, &lower, &upper, &empty);
107 
108 	if (empty)
109 		return 5;
110 
111 	if (range_cmp_bounds(typcache, &lower, &centroidLower) >= 0)
112 	{
113 		if (range_cmp_bounds(typcache, &upper, &centroidUpper) >= 0)
114 			return 1;
115 		else
116 			return 2;
117 	}
118 	else
119 	{
120 		if (range_cmp_bounds(typcache, &upper, &centroidUpper) >= 0)
121 			return 4;
122 		else
123 			return 3;
124 	}
125 }
126 
127 /*
128  * Choose SP-GiST function: choose path for addition of new range.
129  */
130 Datum
spg_range_quad_choose(PG_FUNCTION_ARGS)131 spg_range_quad_choose(PG_FUNCTION_ARGS)
132 {
133 	spgChooseIn *in = (spgChooseIn *) PG_GETARG_POINTER(0);
134 	spgChooseOut *out = (spgChooseOut *) PG_GETARG_POINTER(1);
135 	RangeType  *inRange = DatumGetRangeTypeP(in->datum),
136 			   *centroid;
137 	int16		quadrant;
138 	TypeCacheEntry *typcache;
139 
140 	if (in->allTheSame)
141 	{
142 		out->resultType = spgMatchNode;
143 		/* nodeN will be set by core */
144 		out->result.matchNode.levelAdd = 0;
145 		out->result.matchNode.restDatum = RangeTypePGetDatum(inRange);
146 		PG_RETURN_VOID();
147 	}
148 
149 	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(inRange));
150 
151 	/*
152 	 * A node with no centroid divides ranges purely on whether they're empty
153 	 * or not. All empty ranges go to child node 0, all non-empty ranges go to
154 	 * node 1.
155 	 */
156 	if (!in->hasPrefix)
157 	{
158 		out->resultType = spgMatchNode;
159 		if (RangeIsEmpty(inRange))
160 			out->result.matchNode.nodeN = 0;
161 		else
162 			out->result.matchNode.nodeN = 1;
163 		out->result.matchNode.levelAdd = 1;
164 		out->result.matchNode.restDatum = RangeTypePGetDatum(inRange);
165 		PG_RETURN_VOID();
166 	}
167 
168 	centroid = DatumGetRangeTypeP(in->prefixDatum);
169 	quadrant = getQuadrant(typcache, centroid, inRange);
170 
171 	Assert(quadrant <= in->nNodes);
172 
173 	/* Select node matching to quadrant number */
174 	out->resultType = spgMatchNode;
175 	out->result.matchNode.nodeN = quadrant - 1;
176 	out->result.matchNode.levelAdd = 1;
177 	out->result.matchNode.restDatum = RangeTypePGetDatum(inRange);
178 
179 	PG_RETURN_VOID();
180 }
181 
182 /*
183  * Bound comparison for sorting.
184  */
185 static int
bound_cmp(const void * a,const void * b,void * arg)186 bound_cmp(const void *a, const void *b, void *arg)
187 {
188 	RangeBound *ba = (RangeBound *) a;
189 	RangeBound *bb = (RangeBound *) b;
190 	TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
191 
192 	return range_cmp_bounds(typcache, ba, bb);
193 }
194 
195 /*
196  * Picksplit SP-GiST function: split ranges into nodes. Select "centroid"
197  * range and distribute ranges according to quadrants.
198  */
199 Datum
spg_range_quad_picksplit(PG_FUNCTION_ARGS)200 spg_range_quad_picksplit(PG_FUNCTION_ARGS)
201 {
202 	spgPickSplitIn *in = (spgPickSplitIn *) PG_GETARG_POINTER(0);
203 	spgPickSplitOut *out = (spgPickSplitOut *) PG_GETARG_POINTER(1);
204 	int			i;
205 	int			j;
206 	int			nonEmptyCount;
207 	RangeType  *centroid;
208 	bool		empty;
209 	TypeCacheEntry *typcache;
210 
211 	/* Use the median values of lower and upper bounds as the centroid range */
212 	RangeBound *lowerBounds,
213 			   *upperBounds;
214 
215 	typcache = range_get_typcache(fcinfo,
216 								  RangeTypeGetOid(DatumGetRangeTypeP(in->datums[0])));
217 
218 	/* Allocate memory for bounds */
219 	lowerBounds = palloc(sizeof(RangeBound) * in->nTuples);
220 	upperBounds = palloc(sizeof(RangeBound) * in->nTuples);
221 	j = 0;
222 
223 	/* Deserialize bounds of ranges, count non-empty ranges */
224 	for (i = 0; i < in->nTuples; i++)
225 	{
226 		range_deserialize(typcache, DatumGetRangeTypeP(in->datums[i]),
227 						  &lowerBounds[j], &upperBounds[j], &empty);
228 		if (!empty)
229 			j++;
230 	}
231 	nonEmptyCount = j;
232 
233 	/*
234 	 * All the ranges are empty. The best we can do is to construct an inner
235 	 * node with no centroid, and put all ranges into node 0. If non-empty
236 	 * ranges are added later, they will be routed to node 1.
237 	 */
238 	if (nonEmptyCount == 0)
239 	{
240 		out->nNodes = 2;
241 		out->hasPrefix = false;
242 		/* Prefix is empty */
243 		out->prefixDatum = PointerGetDatum(NULL);
244 		out->nodeLabels = NULL;
245 
246 		out->mapTuplesToNodes = palloc(sizeof(int) * in->nTuples);
247 		out->leafTupleDatums = palloc(sizeof(Datum) * in->nTuples);
248 
249 		/* Place all ranges into node 0 */
250 		for (i = 0; i < in->nTuples; i++)
251 		{
252 			RangeType  *range = DatumGetRangeTypeP(in->datums[i]);
253 
254 			out->leafTupleDatums[i] = RangeTypePGetDatum(range);
255 			out->mapTuplesToNodes[i] = 0;
256 		}
257 		PG_RETURN_VOID();
258 	}
259 
260 	/* Sort range bounds in order to find medians */
261 	qsort_arg(lowerBounds, nonEmptyCount, sizeof(RangeBound),
262 			  bound_cmp, typcache);
263 	qsort_arg(upperBounds, nonEmptyCount, sizeof(RangeBound),
264 			  bound_cmp, typcache);
265 
266 	/* Construct "centroid" range from medians of lower and upper bounds */
267 	centroid = range_serialize(typcache, &lowerBounds[nonEmptyCount / 2],
268 							   &upperBounds[nonEmptyCount / 2], false);
269 	out->hasPrefix = true;
270 	out->prefixDatum = RangeTypePGetDatum(centroid);
271 
272 	/* Create node for empty ranges only if it is a root node */
273 	out->nNodes = (in->level == 0) ? 5 : 4;
274 	out->nodeLabels = NULL;		/* we don't need node labels */
275 
276 	out->mapTuplesToNodes = palloc(sizeof(int) * in->nTuples);
277 	out->leafTupleDatums = palloc(sizeof(Datum) * in->nTuples);
278 
279 	/*
280 	 * Assign ranges to corresponding nodes according to quadrants relative to
281 	 * "centroid" range.
282 	 */
283 	for (i = 0; i < in->nTuples; i++)
284 	{
285 		RangeType  *range = DatumGetRangeTypeP(in->datums[i]);
286 		int16		quadrant = getQuadrant(typcache, centroid, range);
287 
288 		out->leafTupleDatums[i] = RangeTypePGetDatum(range);
289 		out->mapTuplesToNodes[i] = quadrant - 1;
290 	}
291 
292 	PG_RETURN_VOID();
293 }
294 
295 /*
296  * SP-GiST consistent function for inner nodes: check which nodes are
297  * consistent with given set of queries.
298  */
299 Datum
spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)300 spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
301 {
302 	spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0);
303 	spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1);
304 	int			which;
305 	int			i;
306 	MemoryContext oldCtx;
307 
308 	/*
309 	 * For adjacent search we need also previous centroid (if any) to improve
310 	 * the precision of the consistent check. In this case needPrevious flag
311 	 * is set and centroid is passed into traversalValue.
312 	 */
313 	bool		needPrevious = false;
314 
315 	if (in->allTheSame)
316 	{
317 		/* Report that all nodes should be visited */
318 		out->nNodes = in->nNodes;
319 		out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
320 		for (i = 0; i < in->nNodes; i++)
321 			out->nodeNumbers[i] = i;
322 		PG_RETURN_VOID();
323 	}
324 
325 	if (!in->hasPrefix)
326 	{
327 		/*
328 		 * No centroid on this inner node. Such a node has two child nodes,
329 		 * the first for empty ranges, and the second for non-empty ones.
330 		 */
331 		Assert(in->nNodes == 2);
332 
333 		/*
334 		 * Nth bit of which variable means that (N - 1)th node should be
335 		 * visited. Initially all bits are set. Bits of nodes which should be
336 		 * skipped will be unset.
337 		 */
338 		which = (1 << 1) | (1 << 2);
339 		for (i = 0; i < in->nkeys; i++)
340 		{
341 			StrategyNumber strategy = in->scankeys[i].sk_strategy;
342 			bool		empty;
343 
344 			/*
345 			 * The only strategy when second argument of operator is not range
346 			 * is RANGESTRAT_CONTAINS_ELEM.
347 			 */
348 			if (strategy != RANGESTRAT_CONTAINS_ELEM)
349 				empty = RangeIsEmpty(
350 									 DatumGetRangeTypeP(in->scankeys[i].sk_argument));
351 			else
352 				empty = false;
353 
354 			switch (strategy)
355 			{
356 				case RANGESTRAT_BEFORE:
357 				case RANGESTRAT_OVERLEFT:
358 				case RANGESTRAT_OVERLAPS:
359 				case RANGESTRAT_OVERRIGHT:
360 				case RANGESTRAT_AFTER:
361 				case RANGESTRAT_ADJACENT:
362 					/* These strategies return false if any argument is empty */
363 					if (empty)
364 						which = 0;
365 					else
366 						which &= (1 << 2);
367 					break;
368 
369 				case RANGESTRAT_CONTAINS:
370 
371 					/*
372 					 * All ranges contain an empty range. Only non-empty
373 					 * ranges can contain a non-empty range.
374 					 */
375 					if (!empty)
376 						which &= (1 << 2);
377 					break;
378 
379 				case RANGESTRAT_CONTAINED_BY:
380 
381 					/*
382 					 * Only an empty range is contained by an empty range.
383 					 * Both empty and non-empty ranges can be contained by a
384 					 * non-empty range.
385 					 */
386 					if (empty)
387 						which &= (1 << 1);
388 					break;
389 
390 				case RANGESTRAT_CONTAINS_ELEM:
391 					which &= (1 << 2);
392 					break;
393 
394 				case RANGESTRAT_EQ:
395 					if (empty)
396 						which &= (1 << 1);
397 					else
398 						which &= (1 << 2);
399 					break;
400 
401 				default:
402 					elog(ERROR, "unrecognized range strategy: %d", strategy);
403 					break;
404 			}
405 			if (which == 0)
406 				break;			/* no need to consider remaining conditions */
407 		}
408 	}
409 	else
410 	{
411 		RangeBound	centroidLower,
412 					centroidUpper;
413 		bool		centroidEmpty;
414 		TypeCacheEntry *typcache;
415 		RangeType  *centroid;
416 
417 		/* This node has a centroid. Fetch it. */
418 		centroid = DatumGetRangeTypeP(in->prefixDatum);
419 		typcache = range_get_typcache(fcinfo,
420 									  RangeTypeGetOid(DatumGetRangeTypeP(centroid)));
421 		range_deserialize(typcache, centroid, &centroidLower, &centroidUpper,
422 						  &centroidEmpty);
423 
424 		Assert(in->nNodes == 4 || in->nNodes == 5);
425 
426 		/*
427 		 * Nth bit of which variable means that (N - 1)th node (Nth quadrant)
428 		 * should be visited. Initially all bits are set. Bits of nodes which
429 		 * can be skipped will be unset.
430 		 */
431 		which = (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5);
432 
433 		for (i = 0; i < in->nkeys; i++)
434 		{
435 			StrategyNumber strategy;
436 			RangeBound	lower,
437 						upper;
438 			bool		empty;
439 			RangeType  *range = NULL;
440 
441 			RangeType  *prevCentroid = NULL;
442 			RangeBound	prevLower,
443 						prevUpper;
444 			bool		prevEmpty;
445 
446 			/* Restrictions on range bounds according to scan strategy */
447 			RangeBound *minLower = NULL,
448 					   *maxLower = NULL,
449 					   *minUpper = NULL,
450 					   *maxUpper = NULL;
451 
452 			/* Are the restrictions on range bounds inclusive? */
453 			bool		inclusive = true;
454 			bool		strictEmpty = true;
455 			int			cmp,
456 						which1,
457 						which2;
458 
459 			strategy = in->scankeys[i].sk_strategy;
460 
461 			/*
462 			 * RANGESTRAT_CONTAINS_ELEM is just like RANGESTRAT_CONTAINS, but
463 			 * the argument is a single element. Expand the single element to
464 			 * a range containing only the element, and treat it like
465 			 * RANGESTRAT_CONTAINS.
466 			 */
467 			if (strategy == RANGESTRAT_CONTAINS_ELEM)
468 			{
469 				lower.inclusive = true;
470 				lower.infinite = false;
471 				lower.lower = true;
472 				lower.val = in->scankeys[i].sk_argument;
473 
474 				upper.inclusive = true;
475 				upper.infinite = false;
476 				upper.lower = false;
477 				upper.val = in->scankeys[i].sk_argument;
478 
479 				empty = false;
480 
481 				strategy = RANGESTRAT_CONTAINS;
482 			}
483 			else
484 			{
485 				range = DatumGetRangeTypeP(in->scankeys[i].sk_argument);
486 				range_deserialize(typcache, range, &lower, &upper, &empty);
487 			}
488 
489 			/*
490 			 * Most strategies are handled by forming a bounding box from the
491 			 * search key, defined by a minLower, maxLower, minUpper,
492 			 * maxUpper. Some modify 'which' directly, to specify exactly
493 			 * which quadrants need to be visited.
494 			 *
495 			 * For most strategies, nothing matches an empty search key, and
496 			 * an empty range never matches a non-empty key. If a strategy
497 			 * does not behave like that wrt. empty ranges, set strictEmpty to
498 			 * false.
499 			 */
500 			switch (strategy)
501 			{
502 				case RANGESTRAT_BEFORE:
503 
504 					/*
505 					 * Range A is before range B if upper bound of A is lower
506 					 * than lower bound of B.
507 					 */
508 					maxUpper = &lower;
509 					inclusive = false;
510 					break;
511 
512 				case RANGESTRAT_OVERLEFT:
513 
514 					/*
515 					 * Range A is overleft to range B if upper bound of A is
516 					 * less than or equal to upper bound of B.
517 					 */
518 					maxUpper = &upper;
519 					break;
520 
521 				case RANGESTRAT_OVERLAPS:
522 
523 					/*
524 					 * Non-empty ranges overlap, if lower bound of each range
525 					 * is lower or equal to upper bound of the other range.
526 					 */
527 					maxLower = &upper;
528 					minUpper = &lower;
529 					break;
530 
531 				case RANGESTRAT_OVERRIGHT:
532 
533 					/*
534 					 * Range A is overright to range B if lower bound of A is
535 					 * greater than or equal to lower bound of B.
536 					 */
537 					minLower = &lower;
538 					break;
539 
540 				case RANGESTRAT_AFTER:
541 
542 					/*
543 					 * Range A is after range B if lower bound of A is greater
544 					 * than upper bound of B.
545 					 */
546 					minLower = &upper;
547 					inclusive = false;
548 					break;
549 
550 				case RANGESTRAT_ADJACENT:
551 					if (empty)
552 						break;	/* Skip to strictEmpty check. */
553 
554 					/*
555 					 * Previously selected quadrant could exclude possibility
556 					 * for lower or upper bounds to be adjacent. Deserialize
557 					 * previous centroid range if present for checking this.
558 					 */
559 					if (in->traversalValue)
560 					{
561 						prevCentroid = DatumGetRangeTypeP(in->traversalValue);
562 						range_deserialize(typcache, prevCentroid,
563 										  &prevLower, &prevUpper, &prevEmpty);
564 					}
565 
566 					/*
567 					 * For a range's upper bound to be adjacent to the
568 					 * argument's lower bound, it will be found along the line
569 					 * adjacent to (and just below) Y=lower. Therefore, if the
570 					 * argument's lower bound is less than the centroid's
571 					 * upper bound, the line falls in quadrants 2 and 3; if
572 					 * greater, the line falls in quadrants 1 and 4. (see
573 					 * adjacent_cmp_bounds for description of edge cases).
574 					 */
575 					cmp = adjacent_inner_consistent(typcache, &lower,
576 													&centroidUpper,
577 													prevCentroid ? &prevUpper : NULL);
578 					if (cmp > 0)
579 						which1 = (1 << 1) | (1 << 4);
580 					else if (cmp < 0)
581 						which1 = (1 << 2) | (1 << 3);
582 					else
583 						which1 = 0;
584 
585 					/*
586 					 * Also search for ranges's adjacent to argument's upper
587 					 * bound. They will be found along the line adjacent to
588 					 * (and just right of) X=upper, which falls in quadrants 3
589 					 * and 4, or 1 and 2.
590 					 */
591 					cmp = adjacent_inner_consistent(typcache, &upper,
592 													&centroidLower,
593 													prevCentroid ? &prevLower : NULL);
594 					if (cmp > 0)
595 						which2 = (1 << 1) | (1 << 2);
596 					else if (cmp < 0)
597 						which2 = (1 << 3) | (1 << 4);
598 					else
599 						which2 = 0;
600 
601 					/* We must chase down ranges adjacent to either bound. */
602 					which &= which1 | which2;
603 
604 					needPrevious = true;
605 					break;
606 
607 				case RANGESTRAT_CONTAINS:
608 
609 					/*
610 					 * Non-empty range A contains non-empty range B if lower
611 					 * bound of A is lower or equal to lower bound of range B
612 					 * and upper bound of range A is greater than or equal to upper
613 					 * bound of range A.
614 					 *
615 					 * All non-empty ranges contain an empty range.
616 					 */
617 					strictEmpty = false;
618 					if (!empty)
619 					{
620 						which &= (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
621 						maxLower = &lower;
622 						minUpper = &upper;
623 					}
624 					break;
625 
626 				case RANGESTRAT_CONTAINED_BY:
627 					/* The opposite of contains. */
628 					strictEmpty = false;
629 					if (empty)
630 					{
631 						/* An empty range is only contained by an empty range */
632 						which &= (1 << 5);
633 					}
634 					else
635 					{
636 						minLower = &lower;
637 						maxUpper = &upper;
638 					}
639 					break;
640 
641 				case RANGESTRAT_EQ:
642 
643 					/*
644 					 * Equal range can be only in the same quadrant where
645 					 * argument would be placed to.
646 					 */
647 					strictEmpty = false;
648 					which &= (1 << getQuadrant(typcache, centroid, range));
649 					break;
650 
651 				default:
652 					elog(ERROR, "unrecognized range strategy: %d", strategy);
653 					break;
654 			}
655 
656 			if (strictEmpty)
657 			{
658 				if (empty)
659 				{
660 					/* Scan key is empty, no branches are satisfying */
661 					which = 0;
662 					break;
663 				}
664 				else
665 				{
666 					/* Shouldn't visit tree branch with empty ranges */
667 					which &= (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
668 				}
669 			}
670 
671 			/*
672 			 * Using the bounding box, see which quadrants we have to descend
673 			 * into.
674 			 */
675 			if (minLower)
676 			{
677 				/*
678 				 * If the centroid's lower bound is less than or equal to the
679 				 * minimum lower bound, anything in the 3rd and 4th quadrants
680 				 * will have an even smaller lower bound, and thus can't
681 				 * match.
682 				 */
683 				if (range_cmp_bounds(typcache, &centroidLower, minLower) <= 0)
684 					which &= (1 << 1) | (1 << 2) | (1 << 5);
685 			}
686 			if (maxLower)
687 			{
688 				/*
689 				 * If the centroid's lower bound is greater than the maximum
690 				 * lower bound, anything in the 1st and 2nd quadrants will
691 				 * also have a greater than or equal lower bound, and thus
692 				 * can't match. If the centroid's lower bound is equal to the
693 				 * maximum lower bound, we can still exclude the 1st and 2nd
694 				 * quadrants if we're looking for a value strictly greater
695 				 * than the maximum.
696 				 */
697 				int			cmp;
698 
699 				cmp = range_cmp_bounds(typcache, &centroidLower, maxLower);
700 				if (cmp > 0 || (!inclusive && cmp == 0))
701 					which &= (1 << 3) | (1 << 4) | (1 << 5);
702 			}
703 			if (minUpper)
704 			{
705 				/*
706 				 * If the centroid's upper bound is less than or equal to the
707 				 * minimum upper bound, anything in the 2nd and 3rd quadrants
708 				 * will have an even smaller upper bound, and thus can't
709 				 * match.
710 				 */
711 				if (range_cmp_bounds(typcache, &centroidUpper, minUpper) <= 0)
712 					which &= (1 << 1) | (1 << 4) | (1 << 5);
713 			}
714 			if (maxUpper)
715 			{
716 				/*
717 				 * If the centroid's upper bound is greater than the maximum
718 				 * upper bound, anything in the 1st and 4th quadrants will
719 				 * also have a greater than or equal upper bound, and thus
720 				 * can't match. If the centroid's upper bound is equal to the
721 				 * maximum upper bound, we can still exclude the 1st and 4th
722 				 * quadrants if we're looking for a value strictly greater
723 				 * than the maximum.
724 				 */
725 				int			cmp;
726 
727 				cmp = range_cmp_bounds(typcache, &centroidUpper, maxUpper);
728 				if (cmp > 0 || (!inclusive && cmp == 0))
729 					which &= (1 << 2) | (1 << 3) | (1 << 5);
730 			}
731 
732 			if (which == 0)
733 				break;			/* no need to consider remaining conditions */
734 		}
735 	}
736 
737 	/* We must descend into the quadrant(s) identified by 'which' */
738 	out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
739 	if (needPrevious)
740 		out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes);
741 	out->nNodes = 0;
742 
743 	/*
744 	 * Elements of traversalValues should be allocated in
745 	 * traversalMemoryContext
746 	 */
747 	oldCtx = MemoryContextSwitchTo(in->traversalMemoryContext);
748 
749 	for (i = 1; i <= in->nNodes; i++)
750 	{
751 		if (which & (1 << i))
752 		{
753 			/* Save previous prefix if needed */
754 			if (needPrevious)
755 			{
756 				Datum		previousCentroid;
757 
758 				/*
759 				 * We know, that in->prefixDatum in this place is varlena,
760 				 * because it's range
761 				 */
762 				previousCentroid = datumCopy(in->prefixDatum, false, -1);
763 				out->traversalValues[out->nNodes] = (void *) previousCentroid;
764 			}
765 			out->nodeNumbers[out->nNodes] = i - 1;
766 			out->nNodes++;
767 		}
768 	}
769 
770 	MemoryContextSwitchTo(oldCtx);
771 
772 	PG_RETURN_VOID();
773 }
774 
775 /*
776  * adjacent_cmp_bounds
777  *
778  * Given an argument and centroid bound, this function determines if any
779  * bounds that are adjacent to the argument are smaller than, or greater than
780  * or equal to centroid. For brevity, we call the arg < centroid "left", and
781  * arg >= centroid case "right". This corresponds to how the quadrants are
782  * arranged, if you imagine that "left" is equivalent to "down" and "right"
783  * is equivalent to "up".
784  *
785  * For the "left" case, returns -1, and for the "right" case, returns 1.
786  */
787 static int
adjacent_cmp_bounds(TypeCacheEntry * typcache,RangeBound * arg,RangeBound * centroid)788 adjacent_cmp_bounds(TypeCacheEntry *typcache, RangeBound *arg,
789 					RangeBound *centroid)
790 {
791 	int			cmp;
792 
793 	Assert(arg->lower != centroid->lower);
794 
795 	cmp = range_cmp_bounds(typcache, arg, centroid);
796 
797 	if (centroid->lower)
798 	{
799 		/*------
800 		 * The argument is an upper bound, we are searching for adjacent lower
801 		 * bounds. A matching adjacent lower bound must be *larger* than the
802 		 * argument, but only just.
803 		 *
804 		 * The following table illustrates the desired result with a fixed
805 		 * argument bound, and different centroids. The CMP column shows
806 		 * the value of 'cmp' variable, and ADJ shows whether the argument
807 		 * and centroid are adjacent, per bounds_adjacent(). (N) means we
808 		 * don't need to check for that case, because it's implied by CMP.
809 		 * With the argument range [..., 500), the adjacent range we're
810 		 * searching for is [500, ...):
811 		 *
812 		 *	ARGUMENT   CENTROID		CMP   ADJ
813 		 *	[..., 500) [498, ...)	 >	  (N)	[500, ...) is to the right
814 		 *	[..., 500) [499, ...)	 =	  (N)	[500, ...) is to the right
815 		 *	[..., 500) [500, ...)	 <	   Y	[500, ...) is to the right
816 		 *	[..., 500) [501, ...)	 <	   N	[500, ...) is to the left
817 		 *
818 		 * So, we must search left when the argument is smaller than, and not
819 		 * adjacent, to the centroid. Otherwise search right.
820 		 *------
821 		 */
822 		if (cmp < 0 && !bounds_adjacent(typcache, *arg, *centroid))
823 			return -1;
824 		else
825 			return 1;
826 	}
827 	else
828 	{
829 		/*------
830 		 * The argument is a lower bound, we are searching for adjacent upper
831 		 * bounds. A matching adjacent upper bound must be *smaller* than the
832 		 * argument, but only just.
833 		 *
834 		 *	ARGUMENT   CENTROID		CMP   ADJ
835 		 *	[500, ...) [..., 499)	 >	  (N)	[..., 500) is to the right
836 		 *	[500, ...) [..., 500)	 >	  (Y)	[..., 500) is to the right
837 		 *	[500, ...) [..., 501)	 =	  (N)	[..., 500) is to the left
838 		 *	[500, ...) [..., 502)	 <	  (N)	[..., 500) is to the left
839 		 *
840 		 * We must search left when the argument is smaller than or equal to
841 		 * the centroid. Otherwise search right. We don't need to check
842 		 * whether the argument is adjacent with the centroid, because it
843 		 * doesn't matter.
844 		 *------
845 		 */
846 		if (cmp <= 0)
847 			return -1;
848 		else
849 			return 1;
850 	}
851 }
852 
853 /*----------
854  * adjacent_inner_consistent
855  *
856  * Like adjacent_cmp_bounds, but also takes into account the previous
857  * level's centroid. We might've traversed left (or right) at the previous
858  * node, in search for ranges adjacent to the other bound, even though we
859  * already ruled out the possibility for any matches in that direction for
860  * this bound. By comparing the argument with the previous centroid, and
861  * the previous centroid with the current centroid, we can determine which
862  * direction we should've moved in at previous level, and which direction we
863  * actually moved.
864  *
865  * If there can be any matches to the left, returns -1. If to the right,
866  * returns 1. If there can be no matches below this centroid, because we
867  * already ruled them out at the previous level, returns 0.
868  *
869  * XXX: Comparing just the previous and current level isn't foolproof; we
870  * might still search some branches unnecessarily. For example, imagine that
871  * we are searching for value 15, and we traverse the following centroids
872  * (only considering one bound for the moment):
873  *
874  * Level 1: 20
875  * Level 2: 50
876  * Level 3: 25
877  *
878  * At this point, previous centroid is 50, current centroid is 25, and the
879  * target value is to the left. But because we already moved right from
880  * centroid 20 to 50 in the first level, there cannot be any values < 20 in
881  * the current branch. But we don't know that just by looking at the previous
882  * and current centroid, so we traverse left, unnecessarily. The reason we are
883  * down this branch is that we're searching for matches with the *other*
884  * bound. If we kept track of which bound we are searching for explicitly,
885  * instead of deducing that from the previous and current centroid, we could
886  * avoid some unnecessary work.
887  *----------
888  */
889 static int
adjacent_inner_consistent(TypeCacheEntry * typcache,RangeBound * arg,RangeBound * centroid,RangeBound * prev)890 adjacent_inner_consistent(TypeCacheEntry *typcache, RangeBound *arg,
891 						  RangeBound *centroid, RangeBound *prev)
892 {
893 	if (prev)
894 	{
895 		int			prevcmp;
896 		int			cmp;
897 
898 		/*
899 		 * Which direction were we supposed to traverse at previous level,
900 		 * left or right?
901 		 */
902 		prevcmp = adjacent_cmp_bounds(typcache, arg, prev);
903 
904 		/* and which direction did we actually go? */
905 		cmp = range_cmp_bounds(typcache, centroid, prev);
906 
907 		/* if the two don't agree, there's nothing to see here */
908 		if ((prevcmp < 0 && cmp >= 0) || (prevcmp > 0 && cmp < 0))
909 			return 0;
910 	}
911 
912 	return adjacent_cmp_bounds(typcache, arg, centroid);
913 }
914 
915 /*
916  * SP-GiST consistent function for leaf nodes: check leaf value against query
917  * using corresponding function.
918  */
919 Datum
spg_range_quad_leaf_consistent(PG_FUNCTION_ARGS)920 spg_range_quad_leaf_consistent(PG_FUNCTION_ARGS)
921 {
922 	spgLeafConsistentIn *in = (spgLeafConsistentIn *) PG_GETARG_POINTER(0);
923 	spgLeafConsistentOut *out = (spgLeafConsistentOut *) PG_GETARG_POINTER(1);
924 	RangeType  *leafRange = DatumGetRangeTypeP(in->leafDatum);
925 	TypeCacheEntry *typcache;
926 	bool		res;
927 	int			i;
928 
929 	/* all tests are exact */
930 	out->recheck = false;
931 
932 	/* leafDatum is what it is... */
933 	out->leafValue = in->leafDatum;
934 
935 	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(leafRange));
936 
937 	/* Perform the required comparison(s) */
938 	res = true;
939 	for (i = 0; i < in->nkeys; i++)
940 	{
941 		Datum		keyDatum = in->scankeys[i].sk_argument;
942 
943 		/* Call the function corresponding to the scan strategy */
944 		switch (in->scankeys[i].sk_strategy)
945 		{
946 			case RANGESTRAT_BEFORE:
947 				res = range_before_internal(typcache, leafRange,
948 											DatumGetRangeTypeP(keyDatum));
949 				break;
950 			case RANGESTRAT_OVERLEFT:
951 				res = range_overleft_internal(typcache, leafRange,
952 											  DatumGetRangeTypeP(keyDatum));
953 				break;
954 			case RANGESTRAT_OVERLAPS:
955 				res = range_overlaps_internal(typcache, leafRange,
956 											  DatumGetRangeTypeP(keyDatum));
957 				break;
958 			case RANGESTRAT_OVERRIGHT:
959 				res = range_overright_internal(typcache, leafRange,
960 											   DatumGetRangeTypeP(keyDatum));
961 				break;
962 			case RANGESTRAT_AFTER:
963 				res = range_after_internal(typcache, leafRange,
964 										   DatumGetRangeTypeP(keyDatum));
965 				break;
966 			case RANGESTRAT_ADJACENT:
967 				res = range_adjacent_internal(typcache, leafRange,
968 											  DatumGetRangeTypeP(keyDatum));
969 				break;
970 			case RANGESTRAT_CONTAINS:
971 				res = range_contains_internal(typcache, leafRange,
972 											  DatumGetRangeTypeP(keyDatum));
973 				break;
974 			case RANGESTRAT_CONTAINED_BY:
975 				res = range_contained_by_internal(typcache, leafRange,
976 												  DatumGetRangeTypeP(keyDatum));
977 				break;
978 			case RANGESTRAT_CONTAINS_ELEM:
979 				res = range_contains_elem_internal(typcache, leafRange,
980 												   keyDatum);
981 				break;
982 			case RANGESTRAT_EQ:
983 				res = range_eq_internal(typcache, leafRange,
984 										DatumGetRangeTypeP(keyDatum));
985 				break;
986 			default:
987 				elog(ERROR, "unrecognized range strategy: %d",
988 					 in->scankeys[i].sk_strategy);
989 				break;
990 		}
991 
992 		/*
993 		 * If leaf datum doesn't match to a query key, no need to check
994 		 * subsequent keys.
995 		 */
996 		if (!res)
997 			break;
998 	}
999 
1000 	PG_RETURN_BOOL(res);
1001 }
1002