1 /*-------------------------------------------------------------------------
2 *
3 * rangetypes_spgist.c
4 * implementation of quad tree over ranges mapped to 2d-points for SP-GiST.
5 *
6 * Quad tree is a data structure similar to a binary tree, but is adapted to
7 * 2d data. Each inner node of a quad tree contains a point (centroid) which
8 * divides the 2d-space into 4 quadrants. Each quadrant is associated with a
9 * child node.
10 *
11 * Ranges are mapped to 2d-points so that the lower bound is one dimension,
12 * and the upper bound is another. By convention, we visualize the lower bound
13 * to be the horizontal axis, and upper bound the vertical axis.
14 *
15 * One quirk with this mapping is the handling of empty ranges. An empty range
16 * doesn't have lower and upper bounds, so it cannot be mapped to 2d space in
17 * a straightforward way. To cope with that, the root node can have a 5th
18 * quadrant, which is reserved for empty ranges. Furthermore, there can be
19 * inner nodes in the tree with no centroid. They contain only two child nodes,
20 * one for empty ranges and another for non-empty ones. Such a node can appear
21 * as the root node, or in the tree under the 5th child of the root node (in
22 * which case it will only contain empty nodes).
23 *
24 * The SP-GiST picksplit function uses medians along both axes as the centroid.
25 * This implementation only uses the comparison function of the range element
26 * datatype, therefore it works for any range type.
27 *
28 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
29 * Portions Copyright (c) 1994, Regents of the University of California
30 *
31 * IDENTIFICATION
32 * src/backend/utils/adt/rangetypes_spgist.c
33 *
34 *-------------------------------------------------------------------------
35 */
36
37 #include "postgres.h"
38
39 #include "access/spgist.h"
40 #include "access/stratnum.h"
41 #include "catalog/pg_type.h"
42 #include "utils/builtins.h"
43 #include "utils/datum.h"
44 #include "utils/rangetypes.h"
45
46 static int16 getQuadrant(TypeCacheEntry *typcache, const RangeType *centroid,
47 const RangeType *tst);
48 static int bound_cmp(const void *a, const void *b, void *arg);
49
50 static int adjacent_inner_consistent(TypeCacheEntry *typcache,
51 const RangeBound *arg, const RangeBound *centroid,
52 const RangeBound *prev);
53 static int adjacent_cmp_bounds(TypeCacheEntry *typcache, const RangeBound *arg,
54 const RangeBound *centroid);
55
56 /*
57 * SP-GiST 'config' interface function.
58 */
59 Datum
spg_range_quad_config(PG_FUNCTION_ARGS)60 spg_range_quad_config(PG_FUNCTION_ARGS)
61 {
62 /* spgConfigIn *cfgin = (spgConfigIn *) PG_GETARG_POINTER(0); */
63 spgConfigOut *cfg = (spgConfigOut *) PG_GETARG_POINTER(1);
64
65 cfg->prefixType = ANYRANGEOID;
66 cfg->labelType = VOIDOID; /* we don't need node labels */
67 cfg->canReturnData = true;
68 cfg->longValuesOK = false;
69 PG_RETURN_VOID();
70 }
71
72 /*----------
73 * Determine which quadrant a 2d-mapped range falls into, relative to the
74 * centroid.
75 *
76 * Quadrants are numbered like this:
77 *
78 * 4 | 1
79 * ----+----
80 * 3 | 2
81 *
82 * Where the lower bound of range is the horizontal axis and upper bound the
83 * vertical axis.
84 *
85 * Ranges on one of the axes are taken to lie in the quadrant with higher value
86 * along perpendicular axis. That is, a value on the horizontal axis is taken
87 * to belong to quadrant 1 or 4, and a value on the vertical axis is taken to
88 * belong to quadrant 1 or 2. A range equal to centroid is taken to lie in
89 * quadrant 1.
90 *
91 * Empty ranges are taken to lie in the special quadrant 5.
92 *----------
93 */
94 static int16
getQuadrant(TypeCacheEntry * typcache,const RangeType * centroid,const RangeType * tst)95 getQuadrant(TypeCacheEntry *typcache, const RangeType *centroid, const RangeType *tst)
96 {
97 RangeBound centroidLower,
98 centroidUpper;
99 bool centroidEmpty;
100 RangeBound lower,
101 upper;
102 bool empty;
103
104 range_deserialize(typcache, centroid, ¢roidLower, ¢roidUpper,
105 ¢roidEmpty);
106 range_deserialize(typcache, tst, &lower, &upper, &empty);
107
108 if (empty)
109 return 5;
110
111 if (range_cmp_bounds(typcache, &lower, ¢roidLower) >= 0)
112 {
113 if (range_cmp_bounds(typcache, &upper, ¢roidUpper) >= 0)
114 return 1;
115 else
116 return 2;
117 }
118 else
119 {
120 if (range_cmp_bounds(typcache, &upper, ¢roidUpper) >= 0)
121 return 4;
122 else
123 return 3;
124 }
125 }
126
127 /*
128 * Choose SP-GiST function: choose path for addition of new range.
129 */
130 Datum
spg_range_quad_choose(PG_FUNCTION_ARGS)131 spg_range_quad_choose(PG_FUNCTION_ARGS)
132 {
133 spgChooseIn *in = (spgChooseIn *) PG_GETARG_POINTER(0);
134 spgChooseOut *out = (spgChooseOut *) PG_GETARG_POINTER(1);
135 RangeType *inRange = DatumGetRangeTypeP(in->datum),
136 *centroid;
137 int16 quadrant;
138 TypeCacheEntry *typcache;
139
140 if (in->allTheSame)
141 {
142 out->resultType = spgMatchNode;
143 /* nodeN will be set by core */
144 out->result.matchNode.levelAdd = 0;
145 out->result.matchNode.restDatum = RangeTypePGetDatum(inRange);
146 PG_RETURN_VOID();
147 }
148
149 typcache = range_get_typcache(fcinfo, RangeTypeGetOid(inRange));
150
151 /*
152 * A node with no centroid divides ranges purely on whether they're empty
153 * or not. All empty ranges go to child node 0, all non-empty ranges go to
154 * node 1.
155 */
156 if (!in->hasPrefix)
157 {
158 out->resultType = spgMatchNode;
159 if (RangeIsEmpty(inRange))
160 out->result.matchNode.nodeN = 0;
161 else
162 out->result.matchNode.nodeN = 1;
163 out->result.matchNode.levelAdd = 1;
164 out->result.matchNode.restDatum = RangeTypePGetDatum(inRange);
165 PG_RETURN_VOID();
166 }
167
168 centroid = DatumGetRangeTypeP(in->prefixDatum);
169 quadrant = getQuadrant(typcache, centroid, inRange);
170
171 Assert(quadrant <= in->nNodes);
172
173 /* Select node matching to quadrant number */
174 out->resultType = spgMatchNode;
175 out->result.matchNode.nodeN = quadrant - 1;
176 out->result.matchNode.levelAdd = 1;
177 out->result.matchNode.restDatum = RangeTypePGetDatum(inRange);
178
179 PG_RETURN_VOID();
180 }
181
182 /*
183 * Bound comparison for sorting.
184 */
185 static int
bound_cmp(const void * a,const void * b,void * arg)186 bound_cmp(const void *a, const void *b, void *arg)
187 {
188 RangeBound *ba = (RangeBound *) a;
189 RangeBound *bb = (RangeBound *) b;
190 TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
191
192 return range_cmp_bounds(typcache, ba, bb);
193 }
194
195 /*
196 * Picksplit SP-GiST function: split ranges into nodes. Select "centroid"
197 * range and distribute ranges according to quadrants.
198 */
199 Datum
spg_range_quad_picksplit(PG_FUNCTION_ARGS)200 spg_range_quad_picksplit(PG_FUNCTION_ARGS)
201 {
202 spgPickSplitIn *in = (spgPickSplitIn *) PG_GETARG_POINTER(0);
203 spgPickSplitOut *out = (spgPickSplitOut *) PG_GETARG_POINTER(1);
204 int i;
205 int j;
206 int nonEmptyCount;
207 RangeType *centroid;
208 bool empty;
209 TypeCacheEntry *typcache;
210
211 /* Use the median values of lower and upper bounds as the centroid range */
212 RangeBound *lowerBounds,
213 *upperBounds;
214
215 typcache = range_get_typcache(fcinfo,
216 RangeTypeGetOid(DatumGetRangeTypeP(in->datums[0])));
217
218 /* Allocate memory for bounds */
219 lowerBounds = palloc(sizeof(RangeBound) * in->nTuples);
220 upperBounds = palloc(sizeof(RangeBound) * in->nTuples);
221 j = 0;
222
223 /* Deserialize bounds of ranges, count non-empty ranges */
224 for (i = 0; i < in->nTuples; i++)
225 {
226 range_deserialize(typcache, DatumGetRangeTypeP(in->datums[i]),
227 &lowerBounds[j], &upperBounds[j], &empty);
228 if (!empty)
229 j++;
230 }
231 nonEmptyCount = j;
232
233 /*
234 * All the ranges are empty. The best we can do is to construct an inner
235 * node with no centroid, and put all ranges into node 0. If non-empty
236 * ranges are added later, they will be routed to node 1.
237 */
238 if (nonEmptyCount == 0)
239 {
240 out->nNodes = 2;
241 out->hasPrefix = false;
242 /* Prefix is empty */
243 out->prefixDatum = PointerGetDatum(NULL);
244 out->nodeLabels = NULL;
245
246 out->mapTuplesToNodes = palloc(sizeof(int) * in->nTuples);
247 out->leafTupleDatums = palloc(sizeof(Datum) * in->nTuples);
248
249 /* Place all ranges into node 0 */
250 for (i = 0; i < in->nTuples; i++)
251 {
252 RangeType *range = DatumGetRangeTypeP(in->datums[i]);
253
254 out->leafTupleDatums[i] = RangeTypePGetDatum(range);
255 out->mapTuplesToNodes[i] = 0;
256 }
257 PG_RETURN_VOID();
258 }
259
260 /* Sort range bounds in order to find medians */
261 qsort_arg(lowerBounds, nonEmptyCount, sizeof(RangeBound),
262 bound_cmp, typcache);
263 qsort_arg(upperBounds, nonEmptyCount, sizeof(RangeBound),
264 bound_cmp, typcache);
265
266 /* Construct "centroid" range from medians of lower and upper bounds */
267 centroid = range_serialize(typcache, &lowerBounds[nonEmptyCount / 2],
268 &upperBounds[nonEmptyCount / 2], false);
269 out->hasPrefix = true;
270 out->prefixDatum = RangeTypePGetDatum(centroid);
271
272 /* Create node for empty ranges only if it is a root node */
273 out->nNodes = (in->level == 0) ? 5 : 4;
274 out->nodeLabels = NULL; /* we don't need node labels */
275
276 out->mapTuplesToNodes = palloc(sizeof(int) * in->nTuples);
277 out->leafTupleDatums = palloc(sizeof(Datum) * in->nTuples);
278
279 /*
280 * Assign ranges to corresponding nodes according to quadrants relative to
281 * "centroid" range.
282 */
283 for (i = 0; i < in->nTuples; i++)
284 {
285 RangeType *range = DatumGetRangeTypeP(in->datums[i]);
286 int16 quadrant = getQuadrant(typcache, centroid, range);
287
288 out->leafTupleDatums[i] = RangeTypePGetDatum(range);
289 out->mapTuplesToNodes[i] = quadrant - 1;
290 }
291
292 PG_RETURN_VOID();
293 }
294
295 /*
296 * SP-GiST consistent function for inner nodes: check which nodes are
297 * consistent with given set of queries.
298 */
299 Datum
spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)300 spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
301 {
302 spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0);
303 spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1);
304 int which;
305 int i;
306 MemoryContext oldCtx;
307
308 /*
309 * For adjacent search we need also previous centroid (if any) to improve
310 * the precision of the consistent check. In this case needPrevious flag
311 * is set and centroid is passed into traversalValue.
312 */
313 bool needPrevious = false;
314
315 if (in->allTheSame)
316 {
317 /* Report that all nodes should be visited */
318 out->nNodes = in->nNodes;
319 out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
320 for (i = 0; i < in->nNodes; i++)
321 out->nodeNumbers[i] = i;
322 PG_RETURN_VOID();
323 }
324
325 if (!in->hasPrefix)
326 {
327 /*
328 * No centroid on this inner node. Such a node has two child nodes,
329 * the first for empty ranges, and the second for non-empty ones.
330 */
331 Assert(in->nNodes == 2);
332
333 /*
334 * Nth bit of which variable means that (N - 1)th node should be
335 * visited. Initially all bits are set. Bits of nodes which should be
336 * skipped will be unset.
337 */
338 which = (1 << 1) | (1 << 2);
339 for (i = 0; i < in->nkeys; i++)
340 {
341 StrategyNumber strategy = in->scankeys[i].sk_strategy;
342 bool empty;
343
344 /*
345 * The only strategy when second argument of operator is not range
346 * is RANGESTRAT_CONTAINS_ELEM.
347 */
348 if (strategy != RANGESTRAT_CONTAINS_ELEM)
349 empty = RangeIsEmpty(DatumGetRangeTypeP(in->scankeys[i].sk_argument));
350 else
351 empty = false;
352
353 switch (strategy)
354 {
355 case RANGESTRAT_BEFORE:
356 case RANGESTRAT_OVERLEFT:
357 case RANGESTRAT_OVERLAPS:
358 case RANGESTRAT_OVERRIGHT:
359 case RANGESTRAT_AFTER:
360 case RANGESTRAT_ADJACENT:
361 /* These strategies return false if any argument is empty */
362 if (empty)
363 which = 0;
364 else
365 which &= (1 << 2);
366 break;
367
368 case RANGESTRAT_CONTAINS:
369
370 /*
371 * All ranges contain an empty range. Only non-empty
372 * ranges can contain a non-empty range.
373 */
374 if (!empty)
375 which &= (1 << 2);
376 break;
377
378 case RANGESTRAT_CONTAINED_BY:
379
380 /*
381 * Only an empty range is contained by an empty range.
382 * Both empty and non-empty ranges can be contained by a
383 * non-empty range.
384 */
385 if (empty)
386 which &= (1 << 1);
387 break;
388
389 case RANGESTRAT_CONTAINS_ELEM:
390 which &= (1 << 2);
391 break;
392
393 case RANGESTRAT_EQ:
394 if (empty)
395 which &= (1 << 1);
396 else
397 which &= (1 << 2);
398 break;
399
400 default:
401 elog(ERROR, "unrecognized range strategy: %d", strategy);
402 break;
403 }
404 if (which == 0)
405 break; /* no need to consider remaining conditions */
406 }
407 }
408 else
409 {
410 RangeBound centroidLower,
411 centroidUpper;
412 bool centroidEmpty;
413 TypeCacheEntry *typcache;
414 RangeType *centroid;
415
416 /* This node has a centroid. Fetch it. */
417 centroid = DatumGetRangeTypeP(in->prefixDatum);
418 typcache = range_get_typcache(fcinfo,
419 RangeTypeGetOid(DatumGetRangeTypeP(centroid)));
420 range_deserialize(typcache, centroid, ¢roidLower, ¢roidUpper,
421 ¢roidEmpty);
422
423 Assert(in->nNodes == 4 || in->nNodes == 5);
424
425 /*
426 * Nth bit of which variable means that (N - 1)th node (Nth quadrant)
427 * should be visited. Initially all bits are set. Bits of nodes which
428 * can be skipped will be unset.
429 */
430 which = (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5);
431
432 for (i = 0; i < in->nkeys; i++)
433 {
434 StrategyNumber strategy;
435 RangeBound lower,
436 upper;
437 bool empty;
438 RangeType *range = NULL;
439
440 RangeType *prevCentroid = NULL;
441 RangeBound prevLower,
442 prevUpper;
443 bool prevEmpty;
444
445 /* Restrictions on range bounds according to scan strategy */
446 RangeBound *minLower = NULL,
447 *maxLower = NULL,
448 *minUpper = NULL,
449 *maxUpper = NULL;
450
451 /* Are the restrictions on range bounds inclusive? */
452 bool inclusive = true;
453 bool strictEmpty = true;
454 int cmp,
455 which1,
456 which2;
457
458 strategy = in->scankeys[i].sk_strategy;
459
460 /*
461 * RANGESTRAT_CONTAINS_ELEM is just like RANGESTRAT_CONTAINS, but
462 * the argument is a single element. Expand the single element to
463 * a range containing only the element, and treat it like
464 * RANGESTRAT_CONTAINS.
465 */
466 if (strategy == RANGESTRAT_CONTAINS_ELEM)
467 {
468 lower.inclusive = true;
469 lower.infinite = false;
470 lower.lower = true;
471 lower.val = in->scankeys[i].sk_argument;
472
473 upper.inclusive = true;
474 upper.infinite = false;
475 upper.lower = false;
476 upper.val = in->scankeys[i].sk_argument;
477
478 empty = false;
479
480 strategy = RANGESTRAT_CONTAINS;
481 }
482 else
483 {
484 range = DatumGetRangeTypeP(in->scankeys[i].sk_argument);
485 range_deserialize(typcache, range, &lower, &upper, &empty);
486 }
487
488 /*
489 * Most strategies are handled by forming a bounding box from the
490 * search key, defined by a minLower, maxLower, minUpper,
491 * maxUpper. Some modify 'which' directly, to specify exactly
492 * which quadrants need to be visited.
493 *
494 * For most strategies, nothing matches an empty search key, and
495 * an empty range never matches a non-empty key. If a strategy
496 * does not behave like that wrt. empty ranges, set strictEmpty to
497 * false.
498 */
499 switch (strategy)
500 {
501 case RANGESTRAT_BEFORE:
502
503 /*
504 * Range A is before range B if upper bound of A is lower
505 * than lower bound of B.
506 */
507 maxUpper = &lower;
508 inclusive = false;
509 break;
510
511 case RANGESTRAT_OVERLEFT:
512
513 /*
514 * Range A is overleft to range B if upper bound of A is
515 * less than or equal to upper bound of B.
516 */
517 maxUpper = &upper;
518 break;
519
520 case RANGESTRAT_OVERLAPS:
521
522 /*
523 * Non-empty ranges overlap, if lower bound of each range
524 * is lower or equal to upper bound of the other range.
525 */
526 maxLower = &upper;
527 minUpper = &lower;
528 break;
529
530 case RANGESTRAT_OVERRIGHT:
531
532 /*
533 * Range A is overright to range B if lower bound of A is
534 * greater than or equal to lower bound of B.
535 */
536 minLower = &lower;
537 break;
538
539 case RANGESTRAT_AFTER:
540
541 /*
542 * Range A is after range B if lower bound of A is greater
543 * than upper bound of B.
544 */
545 minLower = &upper;
546 inclusive = false;
547 break;
548
549 case RANGESTRAT_ADJACENT:
550 if (empty)
551 break; /* Skip to strictEmpty check. */
552
553 /*
554 * Previously selected quadrant could exclude possibility
555 * for lower or upper bounds to be adjacent. Deserialize
556 * previous centroid range if present for checking this.
557 */
558 if (in->traversalValue)
559 {
560 prevCentroid = DatumGetRangeTypeP(in->traversalValue);
561 range_deserialize(typcache, prevCentroid,
562 &prevLower, &prevUpper, &prevEmpty);
563 }
564
565 /*
566 * For a range's upper bound to be adjacent to the
567 * argument's lower bound, it will be found along the line
568 * adjacent to (and just below) Y=lower. Therefore, if the
569 * argument's lower bound is less than the centroid's
570 * upper bound, the line falls in quadrants 2 and 3; if
571 * greater, the line falls in quadrants 1 and 4. (see
572 * adjacent_cmp_bounds for description of edge cases).
573 */
574 cmp = adjacent_inner_consistent(typcache, &lower,
575 ¢roidUpper,
576 prevCentroid ? &prevUpper : NULL);
577 if (cmp > 0)
578 which1 = (1 << 1) | (1 << 4);
579 else if (cmp < 0)
580 which1 = (1 << 2) | (1 << 3);
581 else
582 which1 = 0;
583
584 /*
585 * Also search for ranges's adjacent to argument's upper
586 * bound. They will be found along the line adjacent to
587 * (and just right of) X=upper, which falls in quadrants 3
588 * and 4, or 1 and 2.
589 */
590 cmp = adjacent_inner_consistent(typcache, &upper,
591 ¢roidLower,
592 prevCentroid ? &prevLower : NULL);
593 if (cmp > 0)
594 which2 = (1 << 1) | (1 << 2);
595 else if (cmp < 0)
596 which2 = (1 << 3) | (1 << 4);
597 else
598 which2 = 0;
599
600 /* We must chase down ranges adjacent to either bound. */
601 which &= which1 | which2;
602
603 needPrevious = true;
604 break;
605
606 case RANGESTRAT_CONTAINS:
607
608 /*
609 * Non-empty range A contains non-empty range B if lower
610 * bound of A is lower or equal to lower bound of range B
611 * and upper bound of range A is greater than or equal to upper
612 * bound of range A.
613 *
614 * All non-empty ranges contain an empty range.
615 */
616 strictEmpty = false;
617 if (!empty)
618 {
619 which &= (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
620 maxLower = &lower;
621 minUpper = &upper;
622 }
623 break;
624
625 case RANGESTRAT_CONTAINED_BY:
626 /* The opposite of contains. */
627 strictEmpty = false;
628 if (empty)
629 {
630 /* An empty range is only contained by an empty range */
631 which &= (1 << 5);
632 }
633 else
634 {
635 minLower = &lower;
636 maxUpper = &upper;
637 }
638 break;
639
640 case RANGESTRAT_EQ:
641
642 /*
643 * Equal range can be only in the same quadrant where
644 * argument would be placed to.
645 */
646 strictEmpty = false;
647 which &= (1 << getQuadrant(typcache, centroid, range));
648 break;
649
650 default:
651 elog(ERROR, "unrecognized range strategy: %d", strategy);
652 break;
653 }
654
655 if (strictEmpty)
656 {
657 if (empty)
658 {
659 /* Scan key is empty, no branches are satisfying */
660 which = 0;
661 break;
662 }
663 else
664 {
665 /* Shouldn't visit tree branch with empty ranges */
666 which &= (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
667 }
668 }
669
670 /*
671 * Using the bounding box, see which quadrants we have to descend
672 * into.
673 */
674 if (minLower)
675 {
676 /*
677 * If the centroid's lower bound is less than or equal to the
678 * minimum lower bound, anything in the 3rd and 4th quadrants
679 * will have an even smaller lower bound, and thus can't
680 * match.
681 */
682 if (range_cmp_bounds(typcache, ¢roidLower, minLower) <= 0)
683 which &= (1 << 1) | (1 << 2) | (1 << 5);
684 }
685 if (maxLower)
686 {
687 /*
688 * If the centroid's lower bound is greater than the maximum
689 * lower bound, anything in the 1st and 2nd quadrants will
690 * also have a greater than or equal lower bound, and thus
691 * can't match. If the centroid's lower bound is equal to the
692 * maximum lower bound, we can still exclude the 1st and 2nd
693 * quadrants if we're looking for a value strictly greater
694 * than the maximum.
695 */
696 int cmp;
697
698 cmp = range_cmp_bounds(typcache, ¢roidLower, maxLower);
699 if (cmp > 0 || (!inclusive && cmp == 0))
700 which &= (1 << 3) | (1 << 4) | (1 << 5);
701 }
702 if (minUpper)
703 {
704 /*
705 * If the centroid's upper bound is less than or equal to the
706 * minimum upper bound, anything in the 2nd and 3rd quadrants
707 * will have an even smaller upper bound, and thus can't
708 * match.
709 */
710 if (range_cmp_bounds(typcache, ¢roidUpper, minUpper) <= 0)
711 which &= (1 << 1) | (1 << 4) | (1 << 5);
712 }
713 if (maxUpper)
714 {
715 /*
716 * If the centroid's upper bound is greater than the maximum
717 * upper bound, anything in the 1st and 4th quadrants will
718 * also have a greater than or equal upper bound, and thus
719 * can't match. If the centroid's upper bound is equal to the
720 * maximum upper bound, we can still exclude the 1st and 4th
721 * quadrants if we're looking for a value strictly greater
722 * than the maximum.
723 */
724 int cmp;
725
726 cmp = range_cmp_bounds(typcache, ¢roidUpper, maxUpper);
727 if (cmp > 0 || (!inclusive && cmp == 0))
728 which &= (1 << 2) | (1 << 3) | (1 << 5);
729 }
730
731 if (which == 0)
732 break; /* no need to consider remaining conditions */
733 }
734 }
735
736 /* We must descend into the quadrant(s) identified by 'which' */
737 out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
738 if (needPrevious)
739 out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes);
740 out->nNodes = 0;
741
742 /*
743 * Elements of traversalValues should be allocated in
744 * traversalMemoryContext
745 */
746 oldCtx = MemoryContextSwitchTo(in->traversalMemoryContext);
747
748 for (i = 1; i <= in->nNodes; i++)
749 {
750 if (which & (1 << i))
751 {
752 /* Save previous prefix if needed */
753 if (needPrevious)
754 {
755 Datum previousCentroid;
756
757 /*
758 * We know, that in->prefixDatum in this place is varlena,
759 * because it's range
760 */
761 previousCentroid = datumCopy(in->prefixDatum, false, -1);
762 out->traversalValues[out->nNodes] = (void *) previousCentroid;
763 }
764 out->nodeNumbers[out->nNodes] = i - 1;
765 out->nNodes++;
766 }
767 }
768
769 MemoryContextSwitchTo(oldCtx);
770
771 PG_RETURN_VOID();
772 }
773
774 /*
775 * adjacent_cmp_bounds
776 *
777 * Given an argument and centroid bound, this function determines if any
778 * bounds that are adjacent to the argument are smaller than, or greater than
779 * or equal to centroid. For brevity, we call the arg < centroid "left", and
780 * arg >= centroid case "right". This corresponds to how the quadrants are
781 * arranged, if you imagine that "left" is equivalent to "down" and "right"
782 * is equivalent to "up".
783 *
784 * For the "left" case, returns -1, and for the "right" case, returns 1.
785 */
786 static int
adjacent_cmp_bounds(TypeCacheEntry * typcache,const RangeBound * arg,const RangeBound * centroid)787 adjacent_cmp_bounds(TypeCacheEntry *typcache, const RangeBound *arg,
788 const RangeBound *centroid)
789 {
790 int cmp;
791
792 Assert(arg->lower != centroid->lower);
793
794 cmp = range_cmp_bounds(typcache, arg, centroid);
795
796 if (centroid->lower)
797 {
798 /*------
799 * The argument is an upper bound, we are searching for adjacent lower
800 * bounds. A matching adjacent lower bound must be *larger* than the
801 * argument, but only just.
802 *
803 * The following table illustrates the desired result with a fixed
804 * argument bound, and different centroids. The CMP column shows
805 * the value of 'cmp' variable, and ADJ shows whether the argument
806 * and centroid are adjacent, per bounds_adjacent(). (N) means we
807 * don't need to check for that case, because it's implied by CMP.
808 * With the argument range [..., 500), the adjacent range we're
809 * searching for is [500, ...):
810 *
811 * ARGUMENT CENTROID CMP ADJ
812 * [..., 500) [498, ...) > (N) [500, ...) is to the right
813 * [..., 500) [499, ...) = (N) [500, ...) is to the right
814 * [..., 500) [500, ...) < Y [500, ...) is to the right
815 * [..., 500) [501, ...) < N [500, ...) is to the left
816 *
817 * So, we must search left when the argument is smaller than, and not
818 * adjacent, to the centroid. Otherwise search right.
819 *------
820 */
821 if (cmp < 0 && !bounds_adjacent(typcache, *arg, *centroid))
822 return -1;
823 else
824 return 1;
825 }
826 else
827 {
828 /*------
829 * The argument is a lower bound, we are searching for adjacent upper
830 * bounds. A matching adjacent upper bound must be *smaller* than the
831 * argument, but only just.
832 *
833 * ARGUMENT CENTROID CMP ADJ
834 * [500, ...) [..., 499) > (N) [..., 500) is to the right
835 * [500, ...) [..., 500) > (Y) [..., 500) is to the right
836 * [500, ...) [..., 501) = (N) [..., 500) is to the left
837 * [500, ...) [..., 502) < (N) [..., 500) is to the left
838 *
839 * We must search left when the argument is smaller than or equal to
840 * the centroid. Otherwise search right. We don't need to check
841 * whether the argument is adjacent with the centroid, because it
842 * doesn't matter.
843 *------
844 */
845 if (cmp <= 0)
846 return -1;
847 else
848 return 1;
849 }
850 }
851
852 /*----------
853 * adjacent_inner_consistent
854 *
855 * Like adjacent_cmp_bounds, but also takes into account the previous
856 * level's centroid. We might've traversed left (or right) at the previous
857 * node, in search for ranges adjacent to the other bound, even though we
858 * already ruled out the possibility for any matches in that direction for
859 * this bound. By comparing the argument with the previous centroid, and
860 * the previous centroid with the current centroid, we can determine which
861 * direction we should've moved in at previous level, and which direction we
862 * actually moved.
863 *
864 * If there can be any matches to the left, returns -1. If to the right,
865 * returns 1. If there can be no matches below this centroid, because we
866 * already ruled them out at the previous level, returns 0.
867 *
868 * XXX: Comparing just the previous and current level isn't foolproof; we
869 * might still search some branches unnecessarily. For example, imagine that
870 * we are searching for value 15, and we traverse the following centroids
871 * (only considering one bound for the moment):
872 *
873 * Level 1: 20
874 * Level 2: 50
875 * Level 3: 25
876 *
877 * At this point, previous centroid is 50, current centroid is 25, and the
878 * target value is to the left. But because we already moved right from
879 * centroid 20 to 50 in the first level, there cannot be any values < 20 in
880 * the current branch. But we don't know that just by looking at the previous
881 * and current centroid, so we traverse left, unnecessarily. The reason we are
882 * down this branch is that we're searching for matches with the *other*
883 * bound. If we kept track of which bound we are searching for explicitly,
884 * instead of deducing that from the previous and current centroid, we could
885 * avoid some unnecessary work.
886 *----------
887 */
888 static int
adjacent_inner_consistent(TypeCacheEntry * typcache,const RangeBound * arg,const RangeBound * centroid,const RangeBound * prev)889 adjacent_inner_consistent(TypeCacheEntry *typcache, const RangeBound *arg,
890 const RangeBound *centroid, const RangeBound *prev)
891 {
892 if (prev)
893 {
894 int prevcmp;
895 int cmp;
896
897 /*
898 * Which direction were we supposed to traverse at previous level,
899 * left or right?
900 */
901 prevcmp = adjacent_cmp_bounds(typcache, arg, prev);
902
903 /* and which direction did we actually go? */
904 cmp = range_cmp_bounds(typcache, centroid, prev);
905
906 /* if the two don't agree, there's nothing to see here */
907 if ((prevcmp < 0 && cmp >= 0) || (prevcmp > 0 && cmp < 0))
908 return 0;
909 }
910
911 return adjacent_cmp_bounds(typcache, arg, centroid);
912 }
913
914 /*
915 * SP-GiST consistent function for leaf nodes: check leaf value against query
916 * using corresponding function.
917 */
918 Datum
spg_range_quad_leaf_consistent(PG_FUNCTION_ARGS)919 spg_range_quad_leaf_consistent(PG_FUNCTION_ARGS)
920 {
921 spgLeafConsistentIn *in = (spgLeafConsistentIn *) PG_GETARG_POINTER(0);
922 spgLeafConsistentOut *out = (spgLeafConsistentOut *) PG_GETARG_POINTER(1);
923 RangeType *leafRange = DatumGetRangeTypeP(in->leafDatum);
924 TypeCacheEntry *typcache;
925 bool res;
926 int i;
927
928 /* all tests are exact */
929 out->recheck = false;
930
931 /* leafDatum is what it is... */
932 out->leafValue = in->leafDatum;
933
934 typcache = range_get_typcache(fcinfo, RangeTypeGetOid(leafRange));
935
936 /* Perform the required comparison(s) */
937 res = true;
938 for (i = 0; i < in->nkeys; i++)
939 {
940 Datum keyDatum = in->scankeys[i].sk_argument;
941
942 /* Call the function corresponding to the scan strategy */
943 switch (in->scankeys[i].sk_strategy)
944 {
945 case RANGESTRAT_BEFORE:
946 res = range_before_internal(typcache, leafRange,
947 DatumGetRangeTypeP(keyDatum));
948 break;
949 case RANGESTRAT_OVERLEFT:
950 res = range_overleft_internal(typcache, leafRange,
951 DatumGetRangeTypeP(keyDatum));
952 break;
953 case RANGESTRAT_OVERLAPS:
954 res = range_overlaps_internal(typcache, leafRange,
955 DatumGetRangeTypeP(keyDatum));
956 break;
957 case RANGESTRAT_OVERRIGHT:
958 res = range_overright_internal(typcache, leafRange,
959 DatumGetRangeTypeP(keyDatum));
960 break;
961 case RANGESTRAT_AFTER:
962 res = range_after_internal(typcache, leafRange,
963 DatumGetRangeTypeP(keyDatum));
964 break;
965 case RANGESTRAT_ADJACENT:
966 res = range_adjacent_internal(typcache, leafRange,
967 DatumGetRangeTypeP(keyDatum));
968 break;
969 case RANGESTRAT_CONTAINS:
970 res = range_contains_internal(typcache, leafRange,
971 DatumGetRangeTypeP(keyDatum));
972 break;
973 case RANGESTRAT_CONTAINED_BY:
974 res = range_contained_by_internal(typcache, leafRange,
975 DatumGetRangeTypeP(keyDatum));
976 break;
977 case RANGESTRAT_CONTAINS_ELEM:
978 res = range_contains_elem_internal(typcache, leafRange,
979 keyDatum);
980 break;
981 case RANGESTRAT_EQ:
982 res = range_eq_internal(typcache, leafRange,
983 DatumGetRangeTypeP(keyDatum));
984 break;
985 default:
986 elog(ERROR, "unrecognized range strategy: %d",
987 in->scankeys[i].sk_strategy);
988 break;
989 }
990
991 /*
992 * If leaf datum doesn't match to a query key, no need to check
993 * subsequent keys.
994 */
995 if (!res)
996 break;
997 }
998
999 PG_RETURN_BOOL(res);
1000 }
1001