1 /*-------------------------------------------------------------------------
2 *
3 * rangetypes_gist.c
4 * GiST support for range types.
5 *
6 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/utils/adt/rangetypes_gist.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 #include "postgres.h"
16
17 #include "access/gist.h"
18 #include "access/stratnum.h"
19 #include "utils/builtins.h"
20 #include "utils/datum.h"
21 #include "utils/rangetypes.h"
22
23
/*
 * Range class properties used to segregate different classes of ranges in
 * GiST.  Each unique combination of properties is a class.  CLS_EMPTY cannot
 * be combined with anything else.  The class value doubles as an index into
 * per-class arrays (see count_in_classes[] in range_gist_picksplit).
 */
#define CLS_NORMAL			0	/* Ordinary finite range (no bits set) */
#define CLS_LOWER_INF		1	/* Lower bound is infinity */
#define CLS_UPPER_INF		2	/* Upper bound is infinity */
#define CLS_CONTAIN_EMPTY	4	/* Contains underlying empty ranges */
#define CLS_EMPTY			8	/* Special class for empty ranges */

#define CLS_COUNT			9	/* # of classes; includes all combinations of
								 * properties.  CLS_EMPTY doesn't combine with
								 * anything else, so it's only 2^3 + 1. */

/*
 * Minimum accepted ratio of split for items of the same class.  If the items
 * are of different classes, we will separate along those lines regardless of
 * the ratio.
 */
#define LIMIT_RATIO  0.3

/* Constants for fixed penalty values */
#define INFINITE_BOUND_PENALTY	2.0
#define CONTAIN_EMPTY_PENALTY  1.0
#define DEFAULT_SUBTYPE_DIFF_PENALTY  1.0
50
/*
 * Per-item data for range_gist_single_sorting_split.
 */
typedef struct
{
	int			index;			/* original offset in the entry vector */
	RangeBound	bound;			/* the (lower or upper) bound sorted on */
} SingleBoundSortItem;

/* place on left or right side of split? */
typedef enum
{
	SPLIT_LEFT = 0,				/* makes initialization to SPLIT_LEFT easier */
	SPLIT_RIGHT
} SplitLR;
66
/*
 * Context for range_gist_consider_split.
 */
typedef struct
{
	TypeCacheEntry *typcache;	/* typcache for range type */
	bool		has_subtype_diff;	/* does it have subtype_diff? */
	int			entries_count;	/* total number of entries being split */

	/* Information about currently selected split follows */

	bool		first;			/* true if no split was selected yet */

	RangeBound *left_upper;		/* upper bound of left interval */
	RangeBound *right_lower;	/* lower bound of right interval */

	float4		ratio;			/* split ratio */
	float4		overlap;		/* overlap between left and right predicate */
	int			common_left;	/* # common entries destined for each side */
	int			common_right;
} ConsiderSplitContext;

/*
 * Bounds extracted from a non-empty range, for use in
 * range_gist_double_sorting_split.
 */
typedef struct
{
	RangeBound	lower;
	RangeBound	upper;
} NonEmptyRange;

/*
 * Represents information about an entry that can be placed in either group
 * without affecting overlap over selected axis ("common entry").
 */
typedef struct
{
	/* Index of entry in the initial array */
	int			index;
	/* Delta between closeness of range to each of the two groups */
	double		delta;
} CommonEntry;
110
/* Helper macros to place an entry in the left or right group during split */
/* Note direct access to variables v, typcache, left_range, right_range;
 * callers must declare those names with exactly these spellings. */
#define PLACE_LEFT(range, off)					\
	do {										\
		if (v->spl_nleft > 0)					\
			left_range = range_super_union(typcache, left_range, range); \
		else									\
			left_range = (range);				\
		v->spl_left[v->spl_nleft++] = (off);	\
	} while(0)

#define PLACE_RIGHT(range, off)					\
	do {										\
		if (v->spl_nright > 0)					\
			right_range = range_super_union(typcache, right_range, range); \
		else									\
			right_range = (range);				\
		v->spl_right[v->spl_nright++] = (off);	\
	} while(0)

/* Copy a RangeType datum (hardwires typbyval and typlen for ranges...) */
#define rangeCopy(r) \
	((RangeType *) DatumGetPointer(datumCopy(PointerGetDatum(r), \
											 false, -1)))
135
/* Forward declarations of local support routines */
static RangeType *range_super_union(TypeCacheEntry *typcache, RangeType *r1,
				  RangeType *r2);
static bool range_gist_consistent_int(TypeCacheEntry *typcache,
						  StrategyNumber strategy, RangeType *key,
						  Datum query);
static bool range_gist_consistent_leaf(TypeCacheEntry *typcache,
						   StrategyNumber strategy, RangeType *key,
						   Datum query);
static void range_gist_fallback_split(TypeCacheEntry *typcache,
						  GistEntryVector *entryvec,
						  GIST_SPLITVEC *v);
static void range_gist_class_split(TypeCacheEntry *typcache,
					   GistEntryVector *entryvec,
					   GIST_SPLITVEC *v,
					   SplitLR *classes_groups);
static void range_gist_single_sorting_split(TypeCacheEntry *typcache,
								GistEntryVector *entryvec,
								GIST_SPLITVEC *v,
								bool use_upper_bound);
static void range_gist_double_sorting_split(TypeCacheEntry *typcache,
								GistEntryVector *entryvec,
								GIST_SPLITVEC *v);
static void range_gist_consider_split(ConsiderSplitContext *context,
						  RangeBound *right_lower, int min_left_count,
						  RangeBound *left_upper, int max_left_count);
static int	get_gist_range_class(RangeType *range);
static int	single_bound_cmp(const void *a, const void *b, void *arg);
static int	interval_cmp_lower(const void *a, const void *b, void *arg);
static int	interval_cmp_upper(const void *a, const void *b, void *arg);
static int	common_entry_cmp(const void *i1, const void *i2);
static float8 call_subtype_diff(TypeCacheEntry *typcache,
				  Datum val1, Datum val2);
168
169
170 /* GiST query consistency check */
171 Datum
range_gist_consistent(PG_FUNCTION_ARGS)172 range_gist_consistent(PG_FUNCTION_ARGS)
173 {
174 GISTENTRY *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
175 Datum query = PG_GETARG_DATUM(1);
176 StrategyNumber strategy = (StrategyNumber) PG_GETARG_UINT16(2);
177
178 /* Oid subtype = PG_GETARG_OID(3); */
179 bool *recheck = (bool *) PG_GETARG_POINTER(4);
180 RangeType *key = DatumGetRangeType(entry->key);
181 TypeCacheEntry *typcache;
182
183 /* All operators served by this function are exact */
184 *recheck = false;
185
186 typcache = range_get_typcache(fcinfo, RangeTypeGetOid(key));
187
188 if (GIST_LEAF(entry))
189 PG_RETURN_BOOL(range_gist_consistent_leaf(typcache, strategy,
190 key, query));
191 else
192 PG_RETURN_BOOL(range_gist_consistent_int(typcache, strategy,
193 key, query));
194 }
195
196 /* form union range */
197 Datum
range_gist_union(PG_FUNCTION_ARGS)198 range_gist_union(PG_FUNCTION_ARGS)
199 {
200 GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
201 GISTENTRY *ent = entryvec->vector;
202 RangeType *result_range;
203 TypeCacheEntry *typcache;
204 int i;
205
206 result_range = DatumGetRangeType(ent[0].key);
207
208 typcache = range_get_typcache(fcinfo, RangeTypeGetOid(result_range));
209
210 for (i = 1; i < entryvec->n; i++)
211 {
212 result_range = range_super_union(typcache, result_range,
213 DatumGetRangeType(ent[i].key));
214 }
215
216 PG_RETURN_RANGE(result_range);
217 }
218
219 /* compress, decompress, fetch are no-ops */
220 Datum
range_gist_compress(PG_FUNCTION_ARGS)221 range_gist_compress(PG_FUNCTION_ARGS)
222 {
223 GISTENTRY *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
224
225 PG_RETURN_POINTER(entry);
226 }
227
228 Datum
range_gist_decompress(PG_FUNCTION_ARGS)229 range_gist_decompress(PG_FUNCTION_ARGS)
230 {
231 GISTENTRY *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
232
233 PG_RETURN_POINTER(entry);
234 }
235
236 Datum
range_gist_fetch(PG_FUNCTION_ARGS)237 range_gist_fetch(PG_FUNCTION_ARGS)
238 {
239 GISTENTRY *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
240
241 PG_RETURN_POINTER(entry);
242 }
243
/*
 * GiST page split penalty function.
 *
 * The penalty function has the following goals (in order from most to least
 * important):
 * - Keep normal ranges separate
 * - Avoid broadening the class of the original predicate
 * - Avoid broadening (as determined by subtype_diff) the original predicate
 * - Favor adding ranges to narrower original predicates
 *
 * The computed penalty is returned through the float pointer in arg 2; the
 * return value is just that pointer.  Penalty values are only comparable
 * among original entries of the same class as the new range (see below).
 */
Datum
range_gist_penalty(PG_FUNCTION_ARGS)
{
	GISTENTRY  *origentry = (GISTENTRY *) PG_GETARG_POINTER(0);
	GISTENTRY  *newentry = (GISTENTRY *) PG_GETARG_POINTER(1);
	float	   *penalty = (float *) PG_GETARG_POINTER(2);
	RangeType  *orig = DatumGetRangeType(origentry->key);
	RangeType  *new = DatumGetRangeType(newentry->key);
	TypeCacheEntry *typcache;
	bool		has_subtype_diff;
	RangeBound	orig_lower,
				new_lower,
				orig_upper,
				new_upper;
	bool		orig_empty,
				new_empty;

	if (RangeTypeGetOid(orig) != RangeTypeGetOid(new))
		elog(ERROR, "range types do not match");

	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(orig));

	/* subtype_diff is optional for a range type; fall back to constants */
	has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid);

	range_deserialize(typcache, orig, &orig_lower, &orig_upper, &orig_empty);
	range_deserialize(typcache, new, &new_lower, &new_upper, &new_empty);

	/*
	 * Distinct branches for handling distinct classes of ranges.  Note that
	 * penalty values only need to be commensurate within the same class of
	 * new range.
	 */
	if (new_empty)
	{
		/* Handle insertion of empty range */
		if (orig_empty)
		{
			/*
			 * The best case is to insert it to empty original range.
			 * Insertion here means no broadening of original range. Also
			 * original range is the most narrow.
			 */
			*penalty = 0.0;
		}
		else if (RangeIsOrContainsEmpty(orig))
		{
			/*
			 * The second case is to insert empty range into range which
			 * contains at least one underlying empty range.  There is still
			 * no broadening of original range, but original range is not as
			 * narrow as possible.
			 */
			*penalty = CONTAIN_EMPTY_PENALTY;
		}
		else if (orig_lower.infinite && orig_upper.infinite)
		{
			/*
			 * Original range requires broadening.  (-inf; +inf) is most far
			 * from normal range in this case.
			 */
			*penalty = 2 * CONTAIN_EMPTY_PENALTY;
		}
		else if (orig_lower.infinite || orig_upper.infinite)
		{
			/*
			 * (-inf, x) or (x, +inf) original ranges are closer to normal
			 * ranges, so it's worse to mix it with empty ranges.
			 */
			*penalty = 3 * CONTAIN_EMPTY_PENALTY;
		}
		else
		{
			/*
			 * The least preferred case is broadening of normal range.
			 */
			*penalty = 4 * CONTAIN_EMPTY_PENALTY;
		}
	}
	else if (new_lower.infinite && new_upper.infinite)
	{
		/* Handle insertion of (-inf, +inf) range */
		if (orig_lower.infinite && orig_upper.infinite)
		{
			/*
			 * Best case is inserting to (-inf, +inf) original range.
			 */
			*penalty = 0.0;
		}
		else if (orig_lower.infinite || orig_upper.infinite)
		{
			/*
			 * When original range is (-inf, x) or (x, +inf) it requires
			 * broadening of original range (extension of one bound to
			 * infinity).
			 */
			*penalty = INFINITE_BOUND_PENALTY;
		}
		else
		{
			/*
			 * Insertion to normal original range is least preferred.
			 */
			*penalty = 2 * INFINITE_BOUND_PENALTY;
		}

		if (RangeIsOrContainsEmpty(orig))
		{
			/*
			 * Original range is narrower when it doesn't contain empty
			 * ranges. Add additional penalty otherwise.
			 */
			*penalty += CONTAIN_EMPTY_PENALTY;
		}
	}
	else if (new_lower.infinite)
	{
		/* Handle insertion of (-inf, x) range */
		if (!orig_empty && orig_lower.infinite)
		{
			if (orig_upper.infinite)
			{
				/*
				 * (-inf, +inf) range won't be extended by insertion of (-inf,
				 * x) range. It's a less desirable case than insertion to
				 * (-inf, y) original range without extension, because in that
				 * case original range is narrower. But we can't express that
				 * in single float value.
				 */
				*penalty = 0.0;
			}
			else
			{
				if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0)
				{
					/*
					 * Get extension of original range using subtype_diff. Use
					 * constant if subtype_diff unavailable.
					 */
					if (has_subtype_diff)
						*penalty = call_subtype_diff(typcache,
													 new_upper.val,
													 orig_upper.val);
					else
						*penalty = DEFAULT_SUBTYPE_DIFF_PENALTY;
				}
				else
				{
					/* No extension of original range */
					*penalty = 0.0;
				}
			}
		}
		else
		{
			/*
			 * If lower bound of original range is not -inf, then extension of
			 * it is infinity.
			 */
			*penalty = get_float4_infinity();
		}
	}
	else if (new_upper.infinite)
	{
		/* Handle insertion of (x, +inf) range; mirror image of the above */
		if (!orig_empty && orig_upper.infinite)
		{
			if (orig_lower.infinite)
			{
				/*
				 * (-inf, +inf) range won't be extended by insertion of (x,
				 * +inf) range. It's a less desirable case than insertion to
				 * (y, +inf) original range without extension, because in that
				 * case original range is narrower. But we can't express that
				 * in single float value.
				 */
				*penalty = 0.0;
			}
			else
			{
				if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0)
				{
					/*
					 * Get extension of original range using subtype_diff. Use
					 * constant if subtype_diff unavailable.
					 */
					if (has_subtype_diff)
						*penalty = call_subtype_diff(typcache,
													 orig_lower.val,
													 new_lower.val);
					else
						*penalty = DEFAULT_SUBTYPE_DIFF_PENALTY;
				}
				else
				{
					/* No extension of original range */
					*penalty = 0.0;
				}
			}
		}
		else
		{
			/*
			 * If upper bound of original range is not +inf, then extension of
			 * it is infinity.
			 */
			*penalty = get_float4_infinity();
		}
	}
	else
	{
		/* Handle insertion of normal (non-empty, non-infinite) range */
		if (orig_empty || orig_lower.infinite || orig_upper.infinite)
		{
			/*
			 * Avoid mixing normal ranges with infinite and empty ranges.
			 */
			*penalty = get_float4_infinity();
		}
		else
		{
			/*
			 * Calculate extension of original range by calling subtype_diff.
			 * Use constant if subtype_diff unavailable.  Extensions at both
			 * ends accumulate.
			 */
			float8		diff = 0.0;

			if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0)
			{
				if (has_subtype_diff)
					diff += call_subtype_diff(typcache,
											  orig_lower.val,
											  new_lower.val);
				else
					diff += DEFAULT_SUBTYPE_DIFF_PENALTY;
			}
			if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0)
			{
				if (has_subtype_diff)
					diff += call_subtype_diff(typcache,
											  new_upper.val,
											  orig_upper.val);
				else
					diff += DEFAULT_SUBTYPE_DIFF_PENALTY;
			}
			*penalty = diff;
		}
	}

	PG_RETURN_POINTER(penalty);
}
504
/*
 * The GiST PickSplit method for ranges
 *
 * Primarily, we try to segregate ranges of different classes.  If splitting
 * ranges of the same class, use the appropriate split method for that class.
 */
Datum
range_gist_picksplit(PG_FUNCTION_ARGS)
{
	GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
	GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
	TypeCacheEntry *typcache;
	OffsetNumber i;
	RangeType  *pred_left;
	int			nbytes;
	OffsetNumber maxoff;
	int			count_in_classes[CLS_COUNT];
	int			j;
	int			non_empty_classes_count = 0;
	int			biggest_class = -1;
	int			biggest_class_count = 0;
	int			total_count;

	/* use first item to look up range type's info */
	pred_left = DatumGetRangeType(entryvec->vector[FirstOffsetNumber].key);
	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(pred_left));

	maxoff = entryvec->n - 1;
	nbytes = (maxoff + 1) * sizeof(OffsetNumber);
	v->spl_left = (OffsetNumber *) palloc(nbytes);
	v->spl_right = (OffsetNumber *) palloc(nbytes);

	/*
	 * Get count distribution of range classes.
	 */
	memset(count_in_classes, 0, sizeof(count_in_classes));
	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
	{
		RangeType  *range = DatumGetRangeType(entryvec->vector[i].key);

		count_in_classes[get_gist_range_class(range)]++;
	}

	/*
	 * Count non-empty classes and find biggest class.
	 */
	total_count = maxoff;
	for (j = 0; j < CLS_COUNT; j++)
	{
		if (count_in_classes[j] > 0)
		{
			if (count_in_classes[j] > biggest_class_count)
			{
				biggest_class_count = count_in_classes[j];
				biggest_class = j;
			}
			non_empty_classes_count++;
		}
	}

	Assert(non_empty_classes_count > 0);

	if (non_empty_classes_count == 1)
	{
		/* One non-empty class, so split inside class */
		if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_NORMAL)
		{
			/* double sorting split for normal ranges */
			range_gist_double_sorting_split(typcache, entryvec, v);
		}
		else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_LOWER_INF)
		{
			/* upper bound sorting split for (-inf, x) ranges */
			range_gist_single_sorting_split(typcache, entryvec, v, true);
		}
		else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_UPPER_INF)
		{
			/* lower bound sorting split for (x, +inf) ranges */
			range_gist_single_sorting_split(typcache, entryvec, v, false);
		}
		else
		{
			/* trivial split for all (-inf, +inf) or all empty ranges */
			range_gist_fallback_split(typcache, entryvec, v);
		}
	}
	else
	{
		/*
		 * Class based split.
		 *
		 * To which side of the split should each class go?  Initialize them
		 * all to go to the left side (relies on SPLIT_LEFT being zero, so
		 * memset suffices).
		 */
		SplitLR		classes_groups[CLS_COUNT];

		memset(classes_groups, 0, sizeof(classes_groups));

		if (count_in_classes[CLS_NORMAL] > 0)
		{
			/* separate normal ranges if any */
			classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
		}
		else
		{
			/*----------
			 * Try to split classes in one of two ways:
			 *  1) containing infinities - not containing infinities
			 *  2) containing empty - not containing empty
			 *
			 * Select the way which balances the ranges between left and right
			 * the best. If split in these ways is not possible, there are at
			 * most 3 classes, so just separate biggest class.
			 *----------
			 */
			int			infCount,
						nonInfCount;
			int			emptyCount,
						nonEmptyCount;

			nonInfCount =
				count_in_classes[CLS_NORMAL] +
				count_in_classes[CLS_CONTAIN_EMPTY] +
				count_in_classes[CLS_EMPTY];
			infCount = total_count - nonInfCount;

			nonEmptyCount =
				count_in_classes[CLS_NORMAL] +
				count_in_classes[CLS_LOWER_INF] +
				count_in_classes[CLS_UPPER_INF] +
				count_in_classes[CLS_LOWER_INF | CLS_UPPER_INF];
			emptyCount = total_count - nonEmptyCount;

			if (infCount > 0 && nonInfCount > 0 &&
				(Abs(infCount - nonInfCount) <=
				 Abs(emptyCount - nonEmptyCount)))
			{
				/* infinite vs non-infinite split balances at least as well */
				classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
				classes_groups[CLS_CONTAIN_EMPTY] = SPLIT_RIGHT;
				classes_groups[CLS_EMPTY] = SPLIT_RIGHT;
			}
			else if (emptyCount > 0 && nonEmptyCount > 0)
			{
				/* empty vs non-empty split */
				classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
				classes_groups[CLS_LOWER_INF] = SPLIT_RIGHT;
				classes_groups[CLS_UPPER_INF] = SPLIT_RIGHT;
				classes_groups[CLS_LOWER_INF | CLS_UPPER_INF] = SPLIT_RIGHT;
			}
			else
			{
				/*
				 * Either total_count == emptyCount or total_count ==
				 * infCount.
				 */
				classes_groups[biggest_class] = SPLIT_RIGHT;
			}
		}

		range_gist_class_split(typcache, entryvec, v, classes_groups);
	}

	PG_RETURN_POINTER(v);
}
668
669 /* equality comparator for GiST */
670 Datum
range_gist_same(PG_FUNCTION_ARGS)671 range_gist_same(PG_FUNCTION_ARGS)
672 {
673 RangeType *r1 = PG_GETARG_RANGE(0);
674 RangeType *r2 = PG_GETARG_RANGE(1);
675 bool *result = (bool *) PG_GETARG_POINTER(2);
676
677 /*
678 * range_eq will ignore the RANGE_CONTAIN_EMPTY flag, so we have to check
679 * that for ourselves. More generally, if the entries have been properly
680 * normalized, then unequal flags bytes must mean unequal ranges ... so
681 * let's just test all the flag bits at once.
682 */
683 if (range_get_flags(r1) != range_get_flags(r2))
684 *result = false;
685 else
686 {
687 TypeCacheEntry *typcache;
688
689 typcache = range_get_typcache(fcinfo, RangeTypeGetOid(r1));
690
691 *result = range_eq_internal(typcache, r1, r2);
692 }
693
694 PG_RETURN_POINTER(result);
695 }
696
697 /*
698 *----------------------------------------------------------
699 * STATIC FUNCTIONS
700 *----------------------------------------------------------
701 */
702
/*
 * Return the smallest range that contains r1 and r2
 *
 * This differs from regular range_union in two critical ways:
 * 1. It won't throw an error for non-adjacent r1 and r2, but just absorb
 * the intervening values into the result range.
 * 2. We track whether any empty range has been union'd into the result,
 * so that contained_by searches can be indexed.  Note that this means
 * that *all* unions formed within the GiST index must go through here.
 *
 * May return r1 or r2 itself (not a copy) when one input already covers the
 * other; callers must not modify the result in place.
 */
static RangeType *
range_super_union(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
{
	RangeType  *result;
	RangeBound	lower1,
				lower2;
	RangeBound	upper1,
				upper2;
	bool		empty1,
				empty2;
	char		flags1,
				flags2;
	RangeBound *result_lower;
	RangeBound *result_upper;

	range_deserialize(typcache, r1, &lower1, &upper1, &empty1);
	range_deserialize(typcache, r2, &lower2, &upper2, &empty2);
	flags1 = range_get_flags(r1);
	flags2 = range_get_flags(r2);

	if (empty1)
	{
		/* We can return r2 as-is if it already is or contains empty */
		if (flags2 & (RANGE_EMPTY | RANGE_CONTAIN_EMPTY))
			return r2;
		/* Else we'd better copy it (modify-in-place isn't safe) */
		r2 = rangeCopy(r2);
		range_set_contain_empty(r2);
		return r2;
	}
	if (empty2)
	{
		/* We can return r1 as-is if it already is or contains empty */
		if (flags1 & (RANGE_EMPTY | RANGE_CONTAIN_EMPTY))
			return r1;
		/* Else we'd better copy it (modify-in-place isn't safe) */
		r1 = rangeCopy(r1);
		range_set_contain_empty(r1);
		return r1;
	}

	/* Both inputs are non-empty; pick the wider bound at each end */
	if (range_cmp_bounds(typcache, &lower1, &lower2) <= 0)
		result_lower = &lower1;
	else
		result_lower = &lower2;

	if (range_cmp_bounds(typcache, &upper1, &upper2) >= 0)
		result_upper = &upper1;
	else
		result_upper = &upper2;

	/* optimization to avoid constructing a new range */
	if (result_lower == &lower1 && result_upper == &upper1 &&
		((flags1 & RANGE_CONTAIN_EMPTY) || !(flags2 & RANGE_CONTAIN_EMPTY)))
		return r1;
	if (result_lower == &lower2 && result_upper == &upper2 &&
		((flags2 & RANGE_CONTAIN_EMPTY) || !(flags1 & RANGE_CONTAIN_EMPTY)))
		return r2;

	result = make_range(typcache, result_lower, result_upper, false);

	/* union contains empty if either input did */
	if ((flags1 & RANGE_CONTAIN_EMPTY) || (flags2 & RANGE_CONTAIN_EMPTY))
		range_set_contain_empty(result);

	return result;
}
779
780 /*
781 * GiST consistent test on an index internal page
782 */
783 static bool
range_gist_consistent_int(TypeCacheEntry * typcache,StrategyNumber strategy,RangeType * key,Datum query)784 range_gist_consistent_int(TypeCacheEntry *typcache, StrategyNumber strategy,
785 RangeType *key, Datum query)
786 {
787 switch (strategy)
788 {
789 case RANGESTRAT_BEFORE:
790 if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeType(query)))
791 return false;
792 return (!range_overright_internal(typcache, key,
793 DatumGetRangeType(query)));
794 case RANGESTRAT_OVERLEFT:
795 if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeType(query)))
796 return false;
797 return (!range_after_internal(typcache, key,
798 DatumGetRangeType(query)));
799 case RANGESTRAT_OVERLAPS:
800 return range_overlaps_internal(typcache, key,
801 DatumGetRangeType(query));
802 case RANGESTRAT_OVERRIGHT:
803 if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeType(query)))
804 return false;
805 return (!range_before_internal(typcache, key,
806 DatumGetRangeType(query)));
807 case RANGESTRAT_AFTER:
808 if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeType(query)))
809 return false;
810 return (!range_overleft_internal(typcache, key,
811 DatumGetRangeType(query)));
812 case RANGESTRAT_ADJACENT:
813 if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeType(query)))
814 return false;
815 if (range_adjacent_internal(typcache, key,
816 DatumGetRangeType(query)))
817 return true;
818 return range_overlaps_internal(typcache, key,
819 DatumGetRangeType(query));
820 case RANGESTRAT_CONTAINS:
821 return range_contains_internal(typcache, key,
822 DatumGetRangeType(query));
823 case RANGESTRAT_CONTAINED_BY:
824
825 /*
826 * Empty ranges are contained by anything, so if key is or
827 * contains any empty ranges, we must descend into it. Otherwise,
828 * descend only if key overlaps the query.
829 */
830 if (RangeIsOrContainsEmpty(key))
831 return true;
832 return range_overlaps_internal(typcache, key,
833 DatumGetRangeType(query));
834 case RANGESTRAT_CONTAINS_ELEM:
835 return range_contains_elem_internal(typcache, key, query);
836 case RANGESTRAT_EQ:
837
838 /*
839 * If query is empty, descend only if the key is or contains any
840 * empty ranges. Otherwise, descend if key contains query.
841 */
842 if (RangeIsEmpty(DatumGetRangeType(query)))
843 return RangeIsOrContainsEmpty(key);
844 return range_contains_internal(typcache, key,
845 DatumGetRangeType(query));
846 default:
847 elog(ERROR, "unrecognized range strategy: %d", strategy);
848 return false; /* keep compiler quiet */
849 }
850 }
851
852 /*
853 * GiST consistent test on an index leaf page
854 */
855 static bool
range_gist_consistent_leaf(TypeCacheEntry * typcache,StrategyNumber strategy,RangeType * key,Datum query)856 range_gist_consistent_leaf(TypeCacheEntry *typcache, StrategyNumber strategy,
857 RangeType *key, Datum query)
858 {
859 switch (strategy)
860 {
861 case RANGESTRAT_BEFORE:
862 return range_before_internal(typcache, key,
863 DatumGetRangeType(query));
864 case RANGESTRAT_OVERLEFT:
865 return range_overleft_internal(typcache, key,
866 DatumGetRangeType(query));
867 case RANGESTRAT_OVERLAPS:
868 return range_overlaps_internal(typcache, key,
869 DatumGetRangeType(query));
870 case RANGESTRAT_OVERRIGHT:
871 return range_overright_internal(typcache, key,
872 DatumGetRangeType(query));
873 case RANGESTRAT_AFTER:
874 return range_after_internal(typcache, key,
875 DatumGetRangeType(query));
876 case RANGESTRAT_ADJACENT:
877 return range_adjacent_internal(typcache, key,
878 DatumGetRangeType(query));
879 case RANGESTRAT_CONTAINS:
880 return range_contains_internal(typcache, key,
881 DatumGetRangeType(query));
882 case RANGESTRAT_CONTAINED_BY:
883 return range_contained_by_internal(typcache, key,
884 DatumGetRangeType(query));
885 case RANGESTRAT_CONTAINS_ELEM:
886 return range_contains_elem_internal(typcache, key, query);
887 case RANGESTRAT_EQ:
888 return range_eq_internal(typcache, key, DatumGetRangeType(query));
889 default:
890 elog(ERROR, "unrecognized range strategy: %d", strategy);
891 return false; /* keep compiler quiet */
892 }
893 }
894
895 /*
896 * Trivial split: half of entries will be placed on one page
897 * and the other half on the other page.
898 */
899 static void
range_gist_fallback_split(TypeCacheEntry * typcache,GistEntryVector * entryvec,GIST_SPLITVEC * v)900 range_gist_fallback_split(TypeCacheEntry *typcache,
901 GistEntryVector *entryvec,
902 GIST_SPLITVEC *v)
903 {
904 RangeType *left_range = NULL;
905 RangeType *right_range = NULL;
906 OffsetNumber i,
907 maxoff,
908 split_idx;
909
910 maxoff = entryvec->n - 1;
911 /* Split entries before this to left page, after to right: */
912 split_idx = (maxoff - FirstOffsetNumber) / 2 + FirstOffsetNumber;
913
914 v->spl_nleft = 0;
915 v->spl_nright = 0;
916 for (i = FirstOffsetNumber; i <= maxoff; i++)
917 {
918 RangeType *range = DatumGetRangeType(entryvec->vector[i].key);
919
920 if (i < split_idx)
921 PLACE_LEFT(range, i);
922 else
923 PLACE_RIGHT(range, i);
924 }
925
926 v->spl_ldatum = RangeTypeGetDatum(left_range);
927 v->spl_rdatum = RangeTypeGetDatum(right_range);
928 }
929
930 /*
931 * Split based on classes of ranges.
932 *
933 * See get_gist_range_class for class definitions.
934 * classes_groups is an array of length CLS_COUNT indicating the side of the
935 * split to which each class should go.
936 */
937 static void
range_gist_class_split(TypeCacheEntry * typcache,GistEntryVector * entryvec,GIST_SPLITVEC * v,SplitLR * classes_groups)938 range_gist_class_split(TypeCacheEntry *typcache,
939 GistEntryVector *entryvec,
940 GIST_SPLITVEC *v,
941 SplitLR *classes_groups)
942 {
943 RangeType *left_range = NULL;
944 RangeType *right_range = NULL;
945 OffsetNumber i,
946 maxoff;
947
948 maxoff = entryvec->n - 1;
949
950 v->spl_nleft = 0;
951 v->spl_nright = 0;
952 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
953 {
954 RangeType *range = DatumGetRangeType(entryvec->vector[i].key);
955 int class;
956
957 /* Get class of range */
958 class = get_gist_range_class(range);
959
960 /* Place range to appropriate page */
961 if (classes_groups[class] == SPLIT_LEFT)
962 PLACE_LEFT(range, i);
963 else
964 {
965 Assert(classes_groups[class] == SPLIT_RIGHT);
966 PLACE_RIGHT(range, i);
967 }
968 }
969
970 v->spl_ldatum = RangeTypeGetDatum(left_range);
971 v->spl_rdatum = RangeTypeGetDatum(right_range);
972 }
973
974 /*
975 * Sorting based split. First half of entries according to the sort will be
976 * placed to one page, and second half of entries will be placed to other
977 * page. use_upper_bound parameter indicates whether to use upper or lower
978 * bound for sorting.
979 */
980 static void
range_gist_single_sorting_split(TypeCacheEntry * typcache,GistEntryVector * entryvec,GIST_SPLITVEC * v,bool use_upper_bound)981 range_gist_single_sorting_split(TypeCacheEntry *typcache,
982 GistEntryVector *entryvec,
983 GIST_SPLITVEC *v,
984 bool use_upper_bound)
985 {
986 SingleBoundSortItem *sortItems;
987 RangeType *left_range = NULL;
988 RangeType *right_range = NULL;
989 OffsetNumber i,
990 maxoff,
991 split_idx;
992
993 maxoff = entryvec->n - 1;
994
995 sortItems = (SingleBoundSortItem *)
996 palloc(maxoff * sizeof(SingleBoundSortItem));
997
998 /*
999 * Prepare auxiliary array and sort the values.
1000 */
1001 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
1002 {
1003 RangeType *range = DatumGetRangeType(entryvec->vector[i].key);
1004 RangeBound bound2;
1005 bool empty;
1006
1007 sortItems[i - 1].index = i;
1008 /* Put appropriate bound into array */
1009 if (use_upper_bound)
1010 range_deserialize(typcache, range, &bound2,
1011 &sortItems[i - 1].bound, &empty);
1012 else
1013 range_deserialize(typcache, range, &sortItems[i - 1].bound,
1014 &bound2, &empty);
1015 Assert(!empty);
1016 }
1017
1018 qsort_arg(sortItems, maxoff, sizeof(SingleBoundSortItem),
1019 single_bound_cmp, typcache);
1020
1021 split_idx = maxoff / 2;
1022
1023 v->spl_nleft = 0;
1024 v->spl_nright = 0;
1025
1026 for (i = 0; i < maxoff; i++)
1027 {
1028 int idx = sortItems[i].index;
1029 RangeType *range = DatumGetRangeType(entryvec->vector[idx].key);
1030
1031 if (i < split_idx)
1032 PLACE_LEFT(range, idx);
1033 else
1034 PLACE_RIGHT(range, idx);
1035 }
1036
1037 v->spl_ldatum = RangeTypeGetDatum(left_range);
1038 v->spl_rdatum = RangeTypeGetDatum(right_range);
1039 }
1040
1041 /*
1042 * Double sorting split algorithm.
1043 *
1044 * The algorithm considers dividing ranges into two groups. The first (left)
1045 * group contains general left bound. The second (right) group contains
1046 * general right bound. The challenge is to find upper bound of left group
1047 * and lower bound of right group so that overlap of groups is minimal and
1048 * ratio of distribution is acceptable. Algorithm finds for each lower bound of
1049 * right group minimal upper bound of left group, and for each upper bound of
1050 * left group maximal lower bound of right group. For each found pair
1051 * range_gist_consider_split considers replacement of currently selected
1052 * split with the new one.
1053 *
1054 * After that, all the entries are divided into three groups:
1055 * 1) Entries which should be placed to the left group
1056 * 2) Entries which should be placed to the right group
1057 * 3) "Common entries" which can be placed to either group without affecting
1058 * amount of overlap.
1059 *
1060 * The common ranges are distributed by difference of distance from lower
1061 * bound of common range to lower bound of right group and distance from upper
1062 * bound of common range to upper bound of left group.
1063 *
1064 * For details see:
1065 * "A new double sorting-based node splitting algorithm for R-tree",
1066 * A. Korotkov
1067 * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36
1068 */
static void
range_gist_double_sorting_split(TypeCacheEntry *typcache,
								GistEntryVector *entryvec,
								GIST_SPLITVEC *v)
{
	ConsiderSplitContext context;
	OffsetNumber i,
				maxoff;
	RangeType  *range,
			   *left_range = NULL,
			   *right_range = NULL;
	int			common_entries_count;
	NonEmptyRange *by_lower,
			   *by_upper;
	CommonEntry *common_entries;
	int			nentries,
				i1,
				i2;
	RangeBound *right_lower,
			   *left_upper;

	memset(&context, 0, sizeof(ConsiderSplitContext));
	context.typcache = typcache;
	/* subtype_diff is optional; without it, overlap is measured by counts */
	context.has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid);

	maxoff = entryvec->n - 1;
	nentries = context.entries_count = maxoff - FirstOffsetNumber + 1;
	context.first = true;		/* no candidate split selected yet */

	/* Allocate arrays for sorted range bounds */
	by_lower = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange));
	by_upper = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange));

	/* Fill arrays of bounds */
	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
	{
		RangeType  *range = DatumGetRangeType(entryvec->vector[i].key);
		bool		empty;

		range_deserialize(typcache, range,
						  &by_lower[i - FirstOffsetNumber].lower,
						  &by_lower[i - FirstOffsetNumber].upper,
						  &empty);
		/* caller must not hand us empty ranges here */
		Assert(!empty);
	}

	/*
	 * Make two arrays of range bounds: one sorted by lower bound and another
	 * sorted by upper bound.
	 */
	memcpy(by_upper, by_lower, nentries * sizeof(NonEmptyRange));
	qsort_arg(by_lower, nentries, sizeof(NonEmptyRange),
			  interval_cmp_lower, typcache);
	qsort_arg(by_upper, nentries, sizeof(NonEmptyRange),
			  interval_cmp_upper, typcache);

	/*----------
	 * The goal is to form a left and right range, so that every entry
	 * range is contained by either left or right interval (or both).
	 *
	 * For example, with the ranges (0,1), (1,3), (2,3), (2,4):
	 *
	 * 0 1 2 3 4
	 * +-+
	 * +---+
	 *   +-+
	 *   +---+
	 *
	 * The left and right ranges are of the form (0,a) and (b,4).
	 * We first consider splits where b is the lower bound of an entry.
	 * We iterate through all entries, and for each b, calculate the
	 * smallest possible a. Then we consider splits where a is the
	 * upper bound of an entry, and for each a, calculate the greatest
	 * possible b.
	 *
	 * In the above example, the first loop would consider splits:
	 * b=0: (0,1)-(0,4)
	 * b=1: (0,1)-(1,4)
	 * b=2: (0,3)-(2,4)
	 *
	 * And the second loop:
	 * a=1: (0,1)-(1,4)
	 * a=3: (0,3)-(2,4)
	 * a=4: (0,4)-(2,4)
	 *----------
	 */

	/*
	 * Iterate over lower bound of right group, finding smallest possible
	 * upper bound of left group.
	 */
	i1 = 0;
	i2 = 0;
	right_lower = &by_lower[i1].lower;

	/*
	 * Seed left_upper with by_upper[0].lower: a range's lower bound never
	 * exceeds its own upper bound, so this seed cannot exceed any entry's
	 * upper bound and will be raised as needed inside the loop.
	 */
	left_upper = &by_upper[i2].lower;
	while (true)
	{
		/*
		 * Find next lower bound of right group.  While scanning entries that
		 * share the current right_lower, raise left_upper enough to cover
		 * them on the left side.
		 */
		while (i1 < nentries &&
			   range_cmp_bounds(typcache, right_lower,
								&by_lower[i1].lower) == 0)
		{
			if (range_cmp_bounds(typcache, &by_lower[i1].upper,
								 left_upper) > 0)
				left_upper = &by_lower[i1].upper;
			i1++;
		}
		if (i1 >= nentries)
			break;
		right_lower = &by_lower[i1].lower;

		/*
		 * Find count of ranges which anyway should be placed to the left
		 * group.  (Those whose upper bound fits under left_upper.)
		 */
		while (i2 < nentries &&
			   range_cmp_bounds(typcache, &by_upper[i2].upper,
								left_upper) <= 0)
			i2++;

		/*
		 * Consider found split to see if it's better than what we had.
		 */
		range_gist_consider_split(&context, right_lower, i1, left_upper, i2);
	}

	/*
	 * Iterate over upper bound of left group finding greatest possible lower
	 * bound of right group.  This mirrors the loop above, scanning from the
	 * high end of both sorted arrays.
	 */
	i1 = nentries - 1;
	i2 = nentries - 1;

	/*
	 * Seed right_lower with by_lower[last].upper: a range's upper bound is
	 * never below its own lower bound, so this seed is at least as large as
	 * any entry's lower bound and will be lowered as needed inside the loop.
	 */
	right_lower = &by_lower[i1].upper;
	left_upper = &by_upper[i2].upper;
	while (true)
	{
		/*
		 * Find next upper bound of left group.
		 */
		while (i2 >= 0 &&
			   range_cmp_bounds(typcache, left_upper,
								&by_upper[i2].upper) == 0)
		{
			if (range_cmp_bounds(typcache, &by_upper[i2].lower,
								 right_lower) < 0)
				right_lower = &by_upper[i2].lower;
			i2--;
		}
		if (i2 < 0)
			break;
		left_upper = &by_upper[i2].upper;

		/*
		 * Find count of intervals which anyway should be placed to the right
		 * group.
		 */
		while (i1 >= 0 &&
			   range_cmp_bounds(typcache, &by_lower[i1].lower,
								right_lower) >= 0)
			i1--;

		/*
		 * Consider found split to see if it's better than what we had.
		 */
		range_gist_consider_split(&context, right_lower, i1 + 1,
								  left_upper, i2 + 1);
	}

	/*
	 * If we failed to find any acceptable splits, use trivial split.
	 */
	if (context.first)
	{
		range_gist_fallback_split(typcache, entryvec, v);
		return;
	}

	/*
	 * Ok, we have now selected bounds of the groups. Now we have to
	 * distribute entries themselves. At first we distribute entries which can
	 * be placed unambiguously and collect "common entries" to array.
	 */

	/* Allocate vectors for results */
	v->spl_left = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
	v->spl_right = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
	v->spl_nleft = 0;
	v->spl_nright = 0;

	/*
	 * Allocate an array for "common entries" - entries which can be placed to
	 * either group without affecting overlap along selected axis.
	 */
	common_entries_count = 0;
	common_entries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));

	/*
	 * Distribute entries which can be distributed unambiguously, and collect
	 * common entries.
	 */
	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
	{
		RangeBound	lower,
					upper;
		bool		empty;

		/*
		 * Get upper and lower bounds along selected axis.
		 */
		range = DatumGetRangeType(entryvec->vector[i].key);

		range_deserialize(typcache, range, &lower, &upper, &empty);

		if (range_cmp_bounds(typcache, &upper, context.left_upper) <= 0)
		{
			/* Fits in the left group */
			if (range_cmp_bounds(typcache, &lower, context.right_lower) >= 0)
			{
				/* Fits also in the right group, so "common entry" */
				common_entries[common_entries_count].index = i;
				if (context.has_subtype_diff)
				{
					/*
					 * delta = (lower - context.right_lower) -
					 * (context.left_upper - upper)
					 *
					 * A negative delta means the entry sits closer to the
					 * right group's lower bound than to the left group's
					 * upper bound; sorting on delta distributes the most
					 * ambiguous entries first.
					 */
					common_entries[common_entries_count].delta =
						call_subtype_diff(typcache,
										  lower.val,
										  context.right_lower->val) -
						call_subtype_diff(typcache,
										  context.left_upper->val,
										  upper.val);
				}
				else
				{
					/* Without subtype_diff, take all deltas as zero */
					common_entries[common_entries_count].delta = 0;
				}
				common_entries_count++;
			}
			else
			{
				/* Doesn't fit to the right group, so join to the left group */
				PLACE_LEFT(range, i);
			}
		}
		else
		{
			/*
			 * Each entry should fit on either left or right group. Since this
			 * entry didn't fit in the left group, it better fit in the right
			 * group.
			 */
			Assert(range_cmp_bounds(typcache, &lower,
									context.right_lower) >= 0);
			PLACE_RIGHT(range, i);
		}
	}

	/*
	 * Distribute "common entries", if any.
	 */
	if (common_entries_count > 0)
	{
		/*
		 * Sort "common entries" by calculated deltas in order to distribute
		 * the most ambiguous entries first.
		 */
		qsort(common_entries, common_entries_count, sizeof(CommonEntry),
			  common_entry_cmp);

		/*
		 * Distribute "common entries" between groups according to sorting.
		 */
		for (i = 0; i < common_entries_count; i++)
		{
			int			idx = common_entries[i].index;

			range = DatumGetRangeType(entryvec->vector[idx].key);

			/*
			 * Check if we have to place this entry in either group to achieve
			 * LIMIT_RATIO.  context.common_left was fixed when the split was
			 * selected; the first common_left entries go left, the rest right.
			 */
			if (i < context.common_left)
				PLACE_LEFT(range, idx);
			else
				PLACE_RIGHT(range, idx);
		}
	}

	/* left_range/right_range have been built up by PLACE_LEFT/PLACE_RIGHT */
	v->spl_ldatum = PointerGetDatum(left_range);
	v->spl_rdatum = PointerGetDatum(right_range);
}
1366
1367 /*
1368 * Consider replacement of currently selected split with a better one
1369 * during range_gist_double_sorting_split.
1370 */
1371 static void
range_gist_consider_split(ConsiderSplitContext * context,RangeBound * right_lower,int min_left_count,RangeBound * left_upper,int max_left_count)1372 range_gist_consider_split(ConsiderSplitContext *context,
1373 RangeBound *right_lower, int min_left_count,
1374 RangeBound *left_upper, int max_left_count)
1375 {
1376 int left_count,
1377 right_count;
1378 float4 ratio,
1379 overlap;
1380
1381 /*
1382 * Calculate entries distribution ratio assuming most uniform distribution
1383 * of common entries.
1384 */
1385 if (min_left_count >= (context->entries_count + 1) / 2)
1386 left_count = min_left_count;
1387 else if (max_left_count <= context->entries_count / 2)
1388 left_count = max_left_count;
1389 else
1390 left_count = context->entries_count / 2;
1391 right_count = context->entries_count - left_count;
1392
1393 /*
1394 * Ratio of split: quotient between size of smaller group and total
1395 * entries count. This is necessarily 0.5 or less; if it's less than
1396 * LIMIT_RATIO then we will never accept the new split.
1397 */
1398 ratio = ((float4) Min(left_count, right_count)) /
1399 ((float4) context->entries_count);
1400
1401 if (ratio > LIMIT_RATIO)
1402 {
1403 bool selectthis = false;
1404
1405 /*
1406 * The ratio is acceptable, so compare current split with previously
1407 * selected one. We search for minimal overlap (allowing negative
1408 * values) and minimal ratio secondarily. If subtype_diff is
1409 * available, it's used for overlap measure. Without subtype_diff we
1410 * use number of "common entries" as an overlap measure.
1411 */
1412 if (context->has_subtype_diff)
1413 overlap = call_subtype_diff(context->typcache,
1414 left_upper->val,
1415 right_lower->val);
1416 else
1417 overlap = max_left_count - min_left_count;
1418
1419 /* If there is no previous selection, select this split */
1420 if (context->first)
1421 selectthis = true;
1422 else
1423 {
1424 /*
1425 * Choose the new split if it has a smaller overlap, or same
1426 * overlap but better ratio.
1427 */
1428 if (overlap < context->overlap ||
1429 (overlap == context->overlap && ratio > context->ratio))
1430 selectthis = true;
1431 }
1432
1433 if (selectthis)
1434 {
1435 /* save information about selected split */
1436 context->first = false;
1437 context->ratio = ratio;
1438 context->overlap = overlap;
1439 context->right_lower = right_lower;
1440 context->left_upper = left_upper;
1441 context->common_left = max_left_count - left_count;
1442 context->common_right = left_count - min_left_count;
1443 }
1444 }
1445 }
1446
1447 /*
1448 * Find class number for range.
1449 *
1450 * The class number is a valid combination of the properties of the
1451 * range. Note: the highest possible number is 8, because CLS_EMPTY
1452 * can't be combined with anything else.
1453 */
1454 static int
get_gist_range_class(RangeType * range)1455 get_gist_range_class(RangeType *range)
1456 {
1457 int classNumber;
1458 char flags;
1459
1460 flags = range_get_flags(range);
1461 if (flags & RANGE_EMPTY)
1462 {
1463 classNumber = CLS_EMPTY;
1464 }
1465 else
1466 {
1467 classNumber = 0;
1468 if (flags & RANGE_LB_INF)
1469 classNumber |= CLS_LOWER_INF;
1470 if (flags & RANGE_UB_INF)
1471 classNumber |= CLS_UPPER_INF;
1472 if (flags & RANGE_CONTAIN_EMPTY)
1473 classNumber |= CLS_CONTAIN_EMPTY;
1474 }
1475 return classNumber;
1476 }
1477
1478 /*
1479 * Comparison function for range_gist_single_sorting_split.
1480 */
1481 static int
single_bound_cmp(const void * a,const void * b,void * arg)1482 single_bound_cmp(const void *a, const void *b, void *arg)
1483 {
1484 SingleBoundSortItem *i1 = (SingleBoundSortItem *) a;
1485 SingleBoundSortItem *i2 = (SingleBoundSortItem *) b;
1486 TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
1487
1488 return range_cmp_bounds(typcache, &i1->bound, &i2->bound);
1489 }
1490
1491 /*
1492 * Compare NonEmptyRanges by lower bound.
1493 */
1494 static int
interval_cmp_lower(const void * a,const void * b,void * arg)1495 interval_cmp_lower(const void *a, const void *b, void *arg)
1496 {
1497 NonEmptyRange *i1 = (NonEmptyRange *) a;
1498 NonEmptyRange *i2 = (NonEmptyRange *) b;
1499 TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
1500
1501 return range_cmp_bounds(typcache, &i1->lower, &i2->lower);
1502 }
1503
1504 /*
1505 * Compare NonEmptyRanges by upper bound.
1506 */
1507 static int
interval_cmp_upper(const void * a,const void * b,void * arg)1508 interval_cmp_upper(const void *a, const void *b, void *arg)
1509 {
1510 NonEmptyRange *i1 = (NonEmptyRange *) a;
1511 NonEmptyRange *i2 = (NonEmptyRange *) b;
1512 TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
1513
1514 return range_cmp_bounds(typcache, &i1->upper, &i2->upper);
1515 }
1516
1517 /*
1518 * Compare CommonEntrys by their deltas.
1519 */
1520 static int
common_entry_cmp(const void * i1,const void * i2)1521 common_entry_cmp(const void *i1, const void *i2)
1522 {
1523 double delta1 = ((CommonEntry *) i1)->delta;
1524 double delta2 = ((CommonEntry *) i2)->delta;
1525
1526 if (delta1 < delta2)
1527 return -1;
1528 else if (delta1 > delta2)
1529 return 1;
1530 else
1531 return 0;
1532 }
1533
1534 /*
1535 * Convenience function to invoke type-specific subtype_diff function.
1536 * Caller must have already checked that there is one for the range type.
1537 */
1538 static float8
call_subtype_diff(TypeCacheEntry * typcache,Datum val1,Datum val2)1539 call_subtype_diff(TypeCacheEntry *typcache, Datum val1, Datum val2)
1540 {
1541 float8 value;
1542
1543 value = DatumGetFloat8(FunctionCall2Coll(&typcache->rng_subdiff_finfo,
1544 typcache->rng_collation,
1545 val1, val2));
1546 /* Cope with buggy subtype_diff function by returning zero */
1547 if (value >= 0.0)
1548 return value;
1549 return 0.0;
1550 }
1551