/*-------------------------------------------------------------------------
 *
 * rangetypes_gist.c
 *    GiST support for range types.
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/utils/adt/rangetypes_gist.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/gist.h"
#include "access/stratnum.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/rangetypes.h"


/*
 * Range class properties used to segregate different classes of ranges in
 * GiST.  Each unique combination of properties is a class.  CLS_EMPTY cannot
 * be combined with anything else.
 */
#define CLS_NORMAL          0   /* Ordinary finite range (no bits set) */
#define CLS_LOWER_INF       1   /* Lower bound is infinity */
#define CLS_UPPER_INF       2   /* Upper bound is infinity */
#define CLS_CONTAIN_EMPTY   4   /* Contains underlying empty ranges */
#define CLS_EMPTY           8   /* Special class for empty ranges */

#define CLS_COUNT           9   /* # of classes; includes all combinations of
                                 * properties.  CLS_EMPTY doesn't combine with
                                 * anything else, so it's only 2^3 + 1. */

/*
 * Minimum accepted ratio of split for items of the same class.  If the items
 * are of different classes, we will separate along those lines regardless of
 * the ratio.
 */
#define LIMIT_RATIO  0.3

/* Constants for fixed penalty values */
#define INFINITE_BOUND_PENALTY  2.0
#define CONTAIN_EMPTY_PENALTY  1.0
#define DEFAULT_SUBTYPE_DIFF_PENALTY  1.0

/*
 * Per-item data for range_gist_single_sorting_split.
 */
typedef struct
{
    int         index;          /* offset of the entry in the input vector */
    RangeBound  bound;          /* the bound (lower or upper) to sort on */
} SingleBoundSortItem;

/* place on left or right side of split? */
typedef enum
{
    SPLIT_LEFT = 0,             /* makes initialization to SPLIT_LEFT easier */
    SPLIT_RIGHT
} SplitLR;

/*
 * Context for range_gist_consider_split.
 */
typedef struct
{
    TypeCacheEntry *typcache;   /* typcache for range type */
    bool        has_subtype_diff;   /* does it have subtype_diff? */
    int         entries_count;  /* total number of entries being split */

    /* Information about currently selected split follows */

    bool        first;          /* true if no split was selected yet */

    RangeBound *left_upper;     /* upper bound of left interval */
    RangeBound *right_lower;    /* lower bound of right interval */

    float4      ratio;          /* split ratio */
    float4      overlap;        /* overlap between left and right predicate */
    int         common_left;    /* # common entries destined for each side */
    int         common_right;
} ConsiderSplitContext;

/*
 * Bounds extracted from a non-empty range, for use in
 * range_gist_double_sorting_split.
 */
typedef struct
{
    RangeBound  lower;
    RangeBound  upper;
} NonEmptyRange;

/*
 * Represents information about an entry that can be placed in either group
 * without affecting overlap over selected axis ("common entry").
 */
typedef struct
{
    /* Index of entry in the initial array */
    int         index;
    /* Delta between closeness of range to each of the two groups */
    double      delta;
} CommonEntry;

/* Helper macros to place an entry in the left or right group during split */
/* Note direct access to variables v, typcache, left_range, right_range */
#define PLACE_LEFT(range, off)                  \
    do {                                        \
        if (v->spl_nleft > 0)                   \
            left_range = range_super_union(typcache, left_range, range); \
        else                                    \
            left_range = (range);               \
        v->spl_left[v->spl_nleft++] = (off);    \
    } while(0)

#define PLACE_RIGHT(range, off)                 \
    do {                                        \
        if (v->spl_nright > 0)                  \
            right_range = range_super_union(typcache, right_range, range); \
        else                                    \
            right_range = (range);              \
        v->spl_right[v->spl_nright++] = (off);  \
    } while(0)

/* Copy a RangeType datum (hardwires typbyval and typlen for ranges...) */
#define rangeCopy(r) \
    ((RangeType *) DatumGetPointer(datumCopy(PointerGetDatum(r), \
                                             false, -1)))

static RangeType *range_super_union(TypeCacheEntry *typcache, RangeType *r1,
                  RangeType *r2);
static bool range_gist_consistent_int(TypeCacheEntry *typcache,
                          StrategyNumber strategy, RangeType *key,
                          Datum query);
static bool range_gist_consistent_leaf(TypeCacheEntry *typcache,
                           StrategyNumber strategy, RangeType *key,
                           Datum query);
static void range_gist_fallback_split(TypeCacheEntry *typcache,
                          GistEntryVector *entryvec,
                          GIST_SPLITVEC *v);
static void range_gist_class_split(TypeCacheEntry *typcache,
                       GistEntryVector *entryvec,
                       GIST_SPLITVEC *v,
                       SplitLR *classes_groups);
static void range_gist_single_sorting_split(TypeCacheEntry *typcache,
                                GistEntryVector *entryvec,
                                GIST_SPLITVEC *v,
                                bool use_upper_bound);
static void range_gist_double_sorting_split(TypeCacheEntry *typcache,
                                GistEntryVector *entryvec,
                                GIST_SPLITVEC *v);
static void range_gist_consider_split(ConsiderSplitContext *context,
                          RangeBound *right_lower, int min_left_count,
                          RangeBound *left_upper, int max_left_count);
static int  get_gist_range_class(RangeType *range);
static int  single_bound_cmp(const void *a, const void *b, void *arg);
static int  interval_cmp_lower(const void *a, const void *b, void *arg);
static int  interval_cmp_upper(const void *a, const void *b, void *arg);
static int  common_entry_cmp(const void *i1, const void *i2);
static float8 call_subtype_diff(TypeCacheEntry *typcache,
                  Datum val1, Datum val2);


/*
 * GiST query consistency check: dispatches to the leaf or internal-page
 * test depending on the kind of page the entry came from.
 */
Datum
range_gist_consistent(PG_FUNCTION_ARGS)
{
    GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
    Datum       query = PG_GETARG_DATUM(1);
    StrategyNumber strategy = (StrategyNumber) PG_GETARG_UINT16(2);

    /* Oid subtype = PG_GETARG_OID(3); */
    bool       *recheck = (bool *) PG_GETARG_POINTER(4);
    RangeType  *key = DatumGetRangeTypeP(entry->key);
    TypeCacheEntry *typcache;

    /* All operators served by this function are exact */
    *recheck = false;

    typcache = range_get_typcache(fcinfo, RangeTypeGetOid(key));

    if (GIST_LEAF(entry))
        PG_RETURN_BOOL(range_gist_consistent_leaf(typcache, strategy,
                                                  key, query));
    else
        PG_RETURN_BOOL(range_gist_consistent_int(typcache, strategy,
                                                 key, query));
}

/*
 * Form union range: fold all entries of the vector together with
 * range_super_union (NOT plain range_union; see its header comment).
 */
Datum
range_gist_union(PG_FUNCTION_ARGS)
{
    GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
    GISTENTRY  *ent = entryvec->vector;
    RangeType  *result_range;
    TypeCacheEntry *typcache;
    int         i;

    result_range = DatumGetRangeTypeP(ent[0].key);

    typcache = range_get_typcache(fcinfo, RangeTypeGetOid(result_range));

    for (i = 1; i < entryvec->n; i++)
    {
        result_range = range_super_union(typcache, result_range,
                                         DatumGetRangeTypeP(ent[i].key));
    }

    PG_RETURN_RANGE_P(result_range);
}

/*
 * We store ranges as ranges in GiST indexes, so we do not need
 * compress, decompress, or fetch functions.  Note this implies a limit
 * on the size of range values that can be indexed.
 */

/*
 * GiST page split penalty function.
 *
 * The penalty function has the following goals (in order from most to least
 * important):
 * - Keep normal ranges separate
 * - Avoid broadening the class of the original predicate
 * - Avoid broadening (as determined by subtype_diff) the original predicate
 * - Favor adding ranges to narrower original predicates
 *
 * Per GiST convention, the result is written through the third argument
 * (*penalty) and that pointer is also returned.
 */
Datum
range_gist_penalty(PG_FUNCTION_ARGS)
{
    GISTENTRY  *origentry = (GISTENTRY *) PG_GETARG_POINTER(0);
    GISTENTRY  *newentry = (GISTENTRY *) PG_GETARG_POINTER(1);
    float      *penalty = (float *) PG_GETARG_POINTER(2);
    RangeType  *orig = DatumGetRangeTypeP(origentry->key);
    RangeType  *new = DatumGetRangeTypeP(newentry->key);
    TypeCacheEntry *typcache;
    bool        has_subtype_diff;
    RangeBound  orig_lower,
                new_lower,
                orig_upper,
                new_upper;
    bool        orig_empty,
                new_empty;

    if (RangeTypeGetOid(orig) != RangeTypeGetOid(new))
        elog(ERROR, "range types do not match");

    typcache = range_get_typcache(fcinfo, RangeTypeGetOid(orig));

    has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid);

    range_deserialize(typcache, orig, &orig_lower, &orig_upper, &orig_empty);
    range_deserialize(typcache, new, &new_lower, &new_upper, &new_empty);

    /*
     * Distinct branches for handling distinct classes of ranges.  Note that
     * penalty values only need to be commensurate within the same class of
     * new range.
     */
    if (new_empty)
    {
        /* Handle insertion of empty range */
        if (orig_empty)
        {
            /*
             * The best case is to insert it to empty original range.
             * Insertion here means no broadening of original range. Also
             * original range is the most narrow.
             */
            *penalty = 0.0;
        }
        else if (RangeIsOrContainsEmpty(orig))
        {
            /*
             * The second case is to insert empty range into range which
             * contains at least one underlying empty range.  There is still
             * no broadening of original range, but original range is not as
             * narrow as possible.
             */
            *penalty = CONTAIN_EMPTY_PENALTY;
        }
        else if (orig_lower.infinite && orig_upper.infinite)
        {
            /*
             * Original range requires broadening.  (-inf; +inf) is most far
             * from normal range in this case.
             */
            *penalty = 2 * CONTAIN_EMPTY_PENALTY;
        }
        else if (orig_lower.infinite || orig_upper.infinite)
        {
            /*
             * (-inf, x) or (x, +inf) original ranges are closer to normal
             * ranges, so it's worse to mix it with empty ranges.
             */
            *penalty = 3 * CONTAIN_EMPTY_PENALTY;
        }
        else
        {
            /*
             * The least preferred case is broadening of normal range.
             */
            *penalty = 4 * CONTAIN_EMPTY_PENALTY;
        }
    }
    else if (new_lower.infinite && new_upper.infinite)
    {
        /* Handle insertion of (-inf, +inf) range */
        if (orig_lower.infinite && orig_upper.infinite)
        {
            /*
             * Best case is inserting to (-inf, +inf) original range.
             */
            *penalty = 0.0;
        }
        else if (orig_lower.infinite || orig_upper.infinite)
        {
            /*
             * When original range is (-inf, x) or (x, +inf) it requires
             * broadening of original range (extension of one bound to
             * infinity).
             */
            *penalty = INFINITE_BOUND_PENALTY;
        }
        else
        {
            /*
             * Insertion to normal original range is least preferred.
             */
            *penalty = 2 * INFINITE_BOUND_PENALTY;
        }

        if (RangeIsOrContainsEmpty(orig))
        {
            /*
             * Original range is narrower when it doesn't contain empty
             * ranges. Add additional penalty otherwise.
             */
            *penalty += CONTAIN_EMPTY_PENALTY;
        }
    }
    else if (new_lower.infinite)
    {
        /* Handle insertion of (-inf, x) range */
        if (!orig_empty && orig_lower.infinite)
        {
            if (orig_upper.infinite)
            {
                /*
                 * (-inf, +inf) range won't be extended by insertion of (-inf,
                 * x) range. It's a less desirable case than insertion to
                 * (-inf, y) original range without extension, because in that
                 * case original range is narrower. But we can't express that
                 * in single float value.
                 */
                *penalty = 0.0;
            }
            else
            {
                if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0)
                {
                    /*
                     * Get extension of original range using subtype_diff. Use
                     * constant if subtype_diff unavailable.
                     */
                    if (has_subtype_diff)
                        *penalty = call_subtype_diff(typcache,
                                                     new_upper.val,
                                                     orig_upper.val);
                    else
                        *penalty = DEFAULT_SUBTYPE_DIFF_PENALTY;
                }
                else
                {
                    /* No extension of original range */
                    *penalty = 0.0;
                }
            }
        }
        else
        {
            /*
             * If lower bound of original range is not -inf, then extension of
             * it is infinity.
             */
            *penalty = get_float4_infinity();
        }
    }
    else if (new_upper.infinite)
    {
        /* Handle insertion of (x, +inf) range -- mirror of the case above */
        if (!orig_empty && orig_upper.infinite)
        {
            if (orig_lower.infinite)
            {
                /*
                 * (-inf, +inf) range won't be extended by insertion of (x,
                 * +inf) range. It's a less desirable case than insertion to
                 * (y, +inf) original range without extension, because in that
                 * case original range is narrower. But we can't express that
                 * in single float value.
                 */
                *penalty = 0.0;
            }
            else
            {
                if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0)
                {
                    /*
                     * Get extension of original range using subtype_diff. Use
                     * constant if subtype_diff unavailable.
                     */
                    if (has_subtype_diff)
                        *penalty = call_subtype_diff(typcache,
                                                     orig_lower.val,
                                                     new_lower.val);
                    else
                        *penalty = DEFAULT_SUBTYPE_DIFF_PENALTY;
                }
                else
                {
                    /* No extension of original range */
                    *penalty = 0.0;
                }
            }
        }
        else
        {
            /*
             * If upper bound of original range is not +inf, then extension of
             * it is infinity.
             */
            *penalty = get_float4_infinity();
        }
    }
    else
    {
        /* Handle insertion of normal (non-empty, non-infinite) range */
        if (orig_empty || orig_lower.infinite || orig_upper.infinite)
        {
            /*
             * Avoid mixing normal ranges with infinite and empty ranges.
             */
            *penalty = get_float4_infinity();
        }
        else
        {
            /*
             * Calculate extension of original range by calling subtype_diff.
             * Use constant if subtype_diff unavailable.
             */
            float8      diff = 0.0;

            if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0)
            {
                if (has_subtype_diff)
                    diff += call_subtype_diff(typcache,
                                              orig_lower.val,
                                              new_lower.val);
                else
                    diff += DEFAULT_SUBTYPE_DIFF_PENALTY;
            }
            if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0)
            {
                if (has_subtype_diff)
                    diff += call_subtype_diff(typcache,
                                              new_upper.val,
                                              orig_upper.val);
                else
                    diff += DEFAULT_SUBTYPE_DIFF_PENALTY;
            }
            *penalty = diff;
        }
    }

    PG_RETURN_POINTER(penalty);
}

/*
 * The GiST PickSplit method for ranges
 *
 * Primarily, we try to segregate ranges of different classes.  If splitting
 * ranges of the same class, use the appropriate split method for that class.
 */
Datum
range_gist_picksplit(PG_FUNCTION_ARGS)
{
    GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
    GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
    TypeCacheEntry *typcache;
    OffsetNumber i;
    RangeType  *pred_left;
    int         nbytes;
    OffsetNumber maxoff;
    int         count_in_classes[CLS_COUNT];
    int         j;
    int         non_empty_classes_count = 0;
    int         biggest_class = -1;
    int         biggest_class_count = 0;
    int         total_count;

    /* use first item to look up range type's info */
    pred_left = DatumGetRangeTypeP(entryvec->vector[FirstOffsetNumber].key);
    typcache = range_get_typcache(fcinfo, RangeTypeGetOid(pred_left));

    maxoff = entryvec->n - 1;
    nbytes = (maxoff + 1) * sizeof(OffsetNumber);
    v->spl_left = (OffsetNumber *) palloc(nbytes);
    v->spl_right = (OffsetNumber *) palloc(nbytes);

    /*
     * Get count distribution of range classes (see get_gist_range_class and
     * the CLS_* bits at the top of this file).
     */
    memset(count_in_classes, 0, sizeof(count_in_classes));
    for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
    {
        RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);

        count_in_classes[get_gist_range_class(range)]++;
    }

    /*
     * Count non-empty classes and find biggest class.
     */
    total_count = maxoff;
    for (j = 0; j < CLS_COUNT; j++)
    {
        if (count_in_classes[j] > 0)
        {
            if (count_in_classes[j] > biggest_class_count)
            {
                biggest_class_count = count_in_classes[j];
                biggest_class = j;
            }
            non_empty_classes_count++;
        }
    }

    Assert(non_empty_classes_count > 0);

    if (non_empty_classes_count == 1)
    {
        /* One non-empty class, so split inside class */
        if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_NORMAL)
        {
            /* double sorting split for normal ranges */
            range_gist_double_sorting_split(typcache, entryvec, v);
        }
        else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_LOWER_INF)
        {
            /* upper bound sorting split for (-inf, x) ranges */
            range_gist_single_sorting_split(typcache, entryvec, v, true);
        }
        else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_UPPER_INF)
        {
            /* lower bound sorting split for (x, +inf) ranges */
            range_gist_single_sorting_split(typcache, entryvec, v, false);
        }
        else
        {
            /* trivial split for all (-inf, +inf) or all empty ranges */
            range_gist_fallback_split(typcache, entryvec, v);
        }
    }
    else
    {
        /*
         * Class based split.
         *
         * To which side of the split should each class go?  Initialize them
         * all to go to the left side.
         */
        SplitLR     classes_groups[CLS_COUNT];

        memset(classes_groups, 0, sizeof(classes_groups));

        if (count_in_classes[CLS_NORMAL] > 0)
        {
            /* separate normal ranges if any */
            classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
        }
        else
        {
            /*----------
             * Try to split classes in one of two ways:
             *  1) containing infinities - not containing infinities
             *  2) containing empty - not containing empty
             *
             * Select the way which balances the ranges between left and right
             * the best. If split in these ways is not possible, there are at
             * most 3 classes, so just separate biggest class.
             *----------
             */
            int         infCount,
                        nonInfCount;
            int         emptyCount,
                        nonEmptyCount;

            nonInfCount =
                count_in_classes[CLS_NORMAL] +
                count_in_classes[CLS_CONTAIN_EMPTY] +
                count_in_classes[CLS_EMPTY];
            infCount = total_count - nonInfCount;

            nonEmptyCount =
                count_in_classes[CLS_NORMAL] +
                count_in_classes[CLS_LOWER_INF] +
                count_in_classes[CLS_UPPER_INF] +
                count_in_classes[CLS_LOWER_INF | CLS_UPPER_INF];
            emptyCount = total_count - nonEmptyCount;

            if (infCount > 0 && nonInfCount > 0 &&
                (Abs(infCount - nonInfCount) <=
                 Abs(emptyCount - nonEmptyCount)))
            {
                classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
                classes_groups[CLS_CONTAIN_EMPTY] = SPLIT_RIGHT;
                classes_groups[CLS_EMPTY] = SPLIT_RIGHT;
            }
            else if (emptyCount > 0 && nonEmptyCount > 0)
            {
                classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
                classes_groups[CLS_LOWER_INF] = SPLIT_RIGHT;
                classes_groups[CLS_UPPER_INF] = SPLIT_RIGHT;
                classes_groups[CLS_LOWER_INF | CLS_UPPER_INF] = SPLIT_RIGHT;
            }
            else
            {
                /*
                 * Either total_count == emptyCount or total_count ==
                 * infCount.
                 */
                classes_groups[biggest_class] = SPLIT_RIGHT;
            }
        }

        range_gist_class_split(typcache, entryvec, v, classes_groups);
    }

    PG_RETURN_POINTER(v);
}

/* equality comparator for GiST */
Datum
range_gist_same(PG_FUNCTION_ARGS)
{
    RangeType  *r1 = PG_GETARG_RANGE_P(0);
    RangeType  *r2 = PG_GETARG_RANGE_P(1);
    bool       *result = (bool *) PG_GETARG_POINTER(2);

    /*
     * range_eq will ignore the RANGE_CONTAIN_EMPTY flag, so we have to check
     * that for ourselves.  More generally, if the entries have been properly
     * normalized, then unequal flags bytes must mean unequal ranges ... so
     * let's just test all the flag bits at once.
     */
    if (range_get_flags(r1) != range_get_flags(r2))
        *result = false;
    else
    {
        TypeCacheEntry *typcache;

        typcache = range_get_typcache(fcinfo, RangeTypeGetOid(r1));

        *result = range_eq_internal(typcache, r1, r2);
    }

    PG_RETURN_POINTER(result);
}

/*
 *----------------------------------------------------------
 * STATIC FUNCTIONS
 *----------------------------------------------------------
 */

/*
 * Return the smallest range that contains r1 and r2
 *
 * This differs from regular range_union in two critical ways:
 * 1. It won't throw an error for non-adjacent r1 and r2, but just absorb
 * the intervening values into the result range.
 * 2. We track whether any empty range has been union'd into the result,
 * so that contained_by searches can be indexed.  Note that this means
 * that *all* unions formed within the GiST index must go through here.
 */
static RangeType *
range_super_union(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
{
    RangeType  *result;
    RangeBound  lower1,
                lower2;
    RangeBound  upper1,
                upper2;
    bool        empty1,
                empty2;
    char        flags1,
                flags2;
    RangeBound *result_lower;
    RangeBound *result_upper;

    range_deserialize(typcache, r1, &lower1, &upper1, &empty1);
    range_deserialize(typcache, r2, &lower2, &upper2, &empty2);
    flags1 = range_get_flags(r1);
    flags2 = range_get_flags(r2);

    if (empty1)
    {
        /* We can return r2 as-is if it already is or contains empty */
        if (flags2 & (RANGE_EMPTY | RANGE_CONTAIN_EMPTY))
            return r2;
        /* Else we'd better copy it (modify-in-place isn't safe) */
        r2 = rangeCopy(r2);
        range_set_contain_empty(r2);
        return r2;
    }
    if (empty2)
    {
        /* We can return r1 as-is if it already is or contains empty */
        if (flags1 & (RANGE_EMPTY | RANGE_CONTAIN_EMPTY))
            return r1;
        /* Else we'd better copy it (modify-in-place isn't safe) */
        r1 = rangeCopy(r1);
        range_set_contain_empty(r1);
        return r1;
    }

    /* Both non-empty: take the lesser lower bound and greater upper bound */
    if (range_cmp_bounds(typcache, &lower1, &lower2) <= 0)
        result_lower = &lower1;
    else
        result_lower = &lower2;

    if (range_cmp_bounds(typcache, &upper1, &upper2) >= 0)
        result_upper = &upper1;
    else
        result_upper = &upper2;

    /* optimization to avoid constructing a new range */
    if (result_lower == &lower1 && result_upper == &upper1 &&
        ((flags1 & RANGE_CONTAIN_EMPTY) || !(flags2 & RANGE_CONTAIN_EMPTY)))
        return r1;
    if (result_lower == &lower2 && result_upper == &upper2 &&
        ((flags2 & RANGE_CONTAIN_EMPTY) || !(flags1 & RANGE_CONTAIN_EMPTY)))
        return r2;

    result = make_range(typcache, result_lower, result_upper, false);

    if ((flags1 & RANGE_CONTAIN_EMPTY) || (flags2 & RANGE_CONTAIN_EMPTY))
        range_set_contain_empty(result);

    return result;
}

/*
 * GiST consistent test on an index internal page: may the subtree under
 * "key" contain any tuple satisfying "strategy" against "query"?
 */
static bool
range_gist_consistent_int(TypeCacheEntry *typcache, StrategyNumber strategy,
                          RangeType *key, Datum query)
{
    switch (strategy)
    {
        case RANGESTRAT_BEFORE:
            if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeTypeP(query)))
                return false;
            return (!range_overright_internal(typcache, key,
                                              DatumGetRangeTypeP(query)));
        case RANGESTRAT_OVERLEFT:
            if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeTypeP(query)))
                return false;
            return (!range_after_internal(typcache, key,
                                          DatumGetRangeTypeP(query)));
        case RANGESTRAT_OVERLAPS:
            return range_overlaps_internal(typcache, key,
                                           DatumGetRangeTypeP(query));
        case RANGESTRAT_OVERRIGHT:
            if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeTypeP(query)))
                return false;
            return (!range_before_internal(typcache, key,
                                           DatumGetRangeTypeP(query)));
        case RANGESTRAT_AFTER:
            if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeTypeP(query)))
                return false;
            return (!range_overleft_internal(typcache, key,
                                             DatumGetRangeTypeP(query)));
        case RANGESTRAT_ADJACENT:
            if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeTypeP(query)))
                return false;
            if (range_adjacent_internal(typcache, key,
                                        DatumGetRangeTypeP(query)))
                return true;
            return range_overlaps_internal(typcache, key,
                                           DatumGetRangeTypeP(query));
        case RANGESTRAT_CONTAINS:
            return range_contains_internal(typcache, key,
                                           DatumGetRangeTypeP(query));
        case RANGESTRAT_CONTAINED_BY:

            /*
             * Empty ranges are contained by anything, so if key is or
             * contains any empty ranges, we must descend into it.  Otherwise,
             * descend only if key overlaps the query.
             */
            if (RangeIsOrContainsEmpty(key))
                return true;
            return range_overlaps_internal(typcache, key,
                                           DatumGetRangeTypeP(query));
        case RANGESTRAT_CONTAINS_ELEM:
            return range_contains_elem_internal(typcache, key, query);
        case RANGESTRAT_EQ:

            /*
             * If query is empty, descend only if the key is or contains any
             * empty ranges.  Otherwise, descend if key contains query.
             */
            if (RangeIsEmpty(DatumGetRangeTypeP(query)))
                return RangeIsOrContainsEmpty(key);
            return range_contains_internal(typcache, key,
                                           DatumGetRangeTypeP(query));
        default:
            elog(ERROR, "unrecognized range strategy: %d", strategy);
            return false;       /* keep compiler quiet */
    }
}

/*
 * GiST consistent test on an index leaf page: exact operator evaluation.
 */
static bool
range_gist_consistent_leaf(TypeCacheEntry *typcache, StrategyNumber strategy,
                           RangeType *key, Datum query)
{
    switch (strategy)
    {
        case RANGESTRAT_BEFORE:
            return range_before_internal(typcache, key,
                                         DatumGetRangeTypeP(query));
        case RANGESTRAT_OVERLEFT:
            return range_overleft_internal(typcache, key,
                                           DatumGetRangeTypeP(query));
        case RANGESTRAT_OVERLAPS:
            return range_overlaps_internal(typcache, key,
                                           DatumGetRangeTypeP(query));
        case RANGESTRAT_OVERRIGHT:
            return range_overright_internal(typcache, key,
                                            DatumGetRangeTypeP(query));
        case RANGESTRAT_AFTER:
            return range_after_internal(typcache, key,
                                        DatumGetRangeTypeP(query));
        case RANGESTRAT_ADJACENT:
            return range_adjacent_internal(typcache, key,
                                           DatumGetRangeTypeP(query));
        case RANGESTRAT_CONTAINS:
            return range_contains_internal(typcache, key,
                                           DatumGetRangeTypeP(query));
        case RANGESTRAT_CONTAINED_BY:
            return range_contained_by_internal(typcache, key,
                                               DatumGetRangeTypeP(query));
        case RANGESTRAT_CONTAINS_ELEM:
            return range_contains_elem_internal(typcache, key, query);
        case RANGESTRAT_EQ:
            return range_eq_internal(typcache, key, DatumGetRangeTypeP(query));
        default:
            elog(ERROR, "unrecognized range strategy: %d", strategy);
            return false;       /* keep compiler quiet */
    }
}

/*
 * Trivial split: half of entries will be placed on one page
 * and the other half on the other page.
 */
static void
range_gist_fallback_split(TypeCacheEntry *typcache,
                          GistEntryVector *entryvec,
                          GIST_SPLITVEC *v)
{
    RangeType  *left_range = NULL;
    RangeType  *right_range = NULL;
    OffsetNumber i,
                maxoff,
                split_idx;

    maxoff = entryvec->n - 1;
    /* Split entries before this to left page, after to right: */
    split_idx = (maxoff - FirstOffsetNumber) / 2 + FirstOffsetNumber;

    v->spl_nleft = 0;
    v->spl_nright = 0;
    for (i = FirstOffsetNumber; i <= maxoff; i++)
    {
        RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);

        /* PLACE_LEFT/PLACE_RIGHT update left_range/right_range and v */
        if (i < split_idx)
            PLACE_LEFT(range, i);
        else
            PLACE_RIGHT(range, i);
    }

    v->spl_ldatum = RangeTypePGetDatum(left_range);
    v->spl_rdatum = RangeTypePGetDatum(right_range);
}

/*
 * Split based on classes of ranges.
 *
 * See get_gist_range_class for class definitions.
 * classes_groups is an array of length CLS_COUNT indicating the side of the
 * split to which each class should go.
 */
static void
range_gist_class_split(TypeCacheEntry *typcache,
                       GistEntryVector *entryvec,
                       GIST_SPLITVEC *v,
                       SplitLR *classes_groups)
{
    RangeType  *left_range = NULL;
    RangeType  *right_range = NULL;
    OffsetNumber i,
                maxoff;

    maxoff = entryvec->n - 1;

    v->spl_nleft = 0;
    v->spl_nright = 0;
    for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
    {
        RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);
        int         class;

        /* Get class of range */
        class = get_gist_range_class(range);

        /* Place range to appropriate page */
        if (classes_groups[class] == SPLIT_LEFT)
            PLACE_LEFT(range, i);
        else
        {
            Assert(classes_groups[class] == SPLIT_RIGHT);
            PLACE_RIGHT(range, i);
        }
    }

    v->spl_ldatum = RangeTypePGetDatum(left_range);
    v->spl_rdatum = RangeTypePGetDatum(right_range);
}

/*
 * Sorting based split.
 * First half of entries according to the sort will be
 * placed to one page, and second half of entries will be placed to other
 * page. use_upper_bound parameter indicates whether to use upper or lower
 * bound for sorting.
 */
static void
range_gist_single_sorting_split(TypeCacheEntry *typcache,
                                GistEntryVector *entryvec,
                                GIST_SPLITVEC *v,
                                bool use_upper_bound)
{
    SingleBoundSortItem *sortItems;
    RangeType  *left_range = NULL;
    RangeType  *right_range = NULL;
    OffsetNumber i,
                maxoff,
                split_idx;

    maxoff = entryvec->n - 1;

    sortItems = (SingleBoundSortItem *)
        palloc(maxoff * sizeof(SingleBoundSortItem));

    /*
     * Prepare auxiliary array and sort the values.  Only the requested bound
     * is kept; the other bound goes into the scratch variable bound2.
     */
    for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
    {
        RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);
        RangeBound  bound2;
        bool        empty;

        sortItems[i - 1].index = i;
        /* Put appropriate bound into array */
        if (use_upper_bound)
            range_deserialize(typcache, range, &bound2,
                              &sortItems[i - 1].bound, &empty);
        else
            range_deserialize(typcache, range, &sortItems[i - 1].bound,
                              &bound2, &empty);
        Assert(!empty);         /* this split method never sees empty ranges */
    }

    qsort_arg(sortItems, maxoff, sizeof(SingleBoundSortItem),
              single_bound_cmp, typcache);

    split_idx = maxoff / 2;

    v->spl_nleft = 0;
    v->spl_nright = 0;

    for (i = 0; i < maxoff; i++)
    {
        int         idx = sortItems[i].index;
        RangeType  *range = DatumGetRangeTypeP(entryvec->vector[idx].key);

        if (i < split_idx)
            PLACE_LEFT(range, idx);
        else
            PLACE_RIGHT(range, idx);
    }

    v->spl_ldatum = RangeTypePGetDatum(left_range);
    v->spl_rdatum = RangeTypePGetDatum(right_range);
}

/*
 * Double sorting split algorithm.
 *
 * The algorithm considers dividing ranges into two groups. The first (left)
 * group contains general left bound.
The second (right) group contains 1027 * general right bound. The challenge is to find upper bound of left group 1028 * and lower bound of right group so that overlap of groups is minimal and 1029 * ratio of distribution is acceptable. Algorithm finds for each lower bound of 1030 * right group minimal upper bound of left group, and for each upper bound of 1031 * left group maximal lower bound of right group. For each found pair 1032 * range_gist_consider_split considers replacement of currently selected 1033 * split with the new one. 1034 * 1035 * After that, all the entries are divided into three groups: 1036 * 1) Entries which should be placed to the left group 1037 * 2) Entries which should be placed to the right group 1038 * 3) "Common entries" which can be placed to either group without affecting 1039 * amount of overlap. 1040 * 1041 * The common ranges are distributed by difference of distance from lower 1042 * bound of common range to lower bound of right group and distance from upper 1043 * bound of common range to upper bound of left group. 1044 * 1045 * For details see: 1046 * "A new double sorting-based node splitting algorithm for R-tree", 1047 * A. 
Korotkov
 * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36
 */
static void
range_gist_double_sorting_split(TypeCacheEntry *typcache,
								GistEntryVector *entryvec,
								GIST_SPLITVEC *v)
{
	ConsiderSplitContext context;
	OffsetNumber i,
				maxoff;
	RangeType  *range,
			   *left_range = NULL,
			   *right_range = NULL;
	int			common_entries_count;
	NonEmptyRange *by_lower,	/* entries sorted by lower bound */
			   *by_upper;		/* same entries sorted by upper bound */
	CommonEntry *common_entries;
	int			nentries,
				i1,
				i2;
	RangeBound *right_lower,
			   *left_upper;

	memset(&context, 0, sizeof(ConsiderSplitContext));
	context.typcache = typcache;
	context.has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid);

	maxoff = entryvec->n - 1;
	nentries = context.entries_count = maxoff - FirstOffsetNumber + 1;
	context.first = true;		/* no split selected yet */

	/* Allocate arrays for sorted range bounds */
	by_lower = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange));
	by_upper = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange));

	/* Fill arrays of bounds */
	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
	{
		/* NOTE(review): this declaration shadows the outer 'range' */
		RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);
		bool		empty;

		range_deserialize(typcache, range,
						  &by_lower[i - FirstOffsetNumber].lower,
						  &by_lower[i - FirstOffsetNumber].upper,
						  &empty);
		/* empty ranges should have been segregated into another class */
		Assert(!empty);
	}

	/*
	 * Make two arrays of range bounds: one sorted by lower bound and another
	 * sorted by upper bound.  (by_lower was used as scratch for the fill
	 * above; copy it before sorting each array by its own key.)
	 */
	memcpy(by_upper, by_lower, nentries * sizeof(NonEmptyRange));
	qsort_arg(by_lower, nentries, sizeof(NonEmptyRange),
			  interval_cmp_lower, typcache);
	qsort_arg(by_upper, nentries, sizeof(NonEmptyRange),
			  interval_cmp_upper, typcache);

	/*----------
	 * The goal is to form a left and right range, so that every entry
	 * range is contained by either left or right interval (or both).
	 *
	 * For example, with the ranges (0,1), (1,3), (2,3), (2,4):
	 *
	 * 0 1 2 3 4
	 * +-+
	 *   +---+
	 *     +-+
	 *     +---+
	 *
	 * The left and right ranges are of the form (0,a) and (b,4).
	 * We first consider splits where b is the lower bound of an entry.
	 * We iterate through all entries, and for each b, calculate the
	 * smallest possible a. Then we consider splits where a is the
	 * upper bound of an entry, and for each a, calculate the greatest
	 * possible b.
	 *
	 * In the above example, the first loop would consider splits:
	 * b=0: (0,1)-(0,4)
	 * b=1: (0,1)-(1,4)
	 * b=2: (0,3)-(2,4)
	 *
	 * And the second loop:
	 * a=1: (0,1)-(1,4)
	 * a=3: (0,3)-(2,4)
	 * a=4: (0,4)-(2,4)
	 *----------
	 */

	/*
	 * Iterate over lower bound of right group, finding smallest possible
	 * upper bound of left group.
	 */
	i1 = 0;
	i2 = 0;
	right_lower = &by_lower[i1].lower;
	/* seed with a lower bound: it can only be raised by the loop below */
	left_upper = &by_upper[i2].lower;
	while (true)
	{
		/*
		 * Find next lower bound of right group.
		 */
		while (i1 < nentries &&
			   range_cmp_bounds(typcache, right_lower,
								&by_lower[i1].lower) == 0)
		{
			if (range_cmp_bounds(typcache, &by_lower[i1].upper,
								 left_upper) > 0)
				left_upper = &by_lower[i1].upper;
			i1++;
		}
		if (i1 >= nentries)
			break;
		right_lower = &by_lower[i1].lower;

		/*
		 * Find count of ranges which anyway should be placed to the left
		 * group.
		 */
		while (i2 < nentries &&
			   range_cmp_bounds(typcache, &by_upper[i2].upper,
								left_upper) <= 0)
			i2++;

		/*
		 * Consider found split to see if it's better than what we had.
		 */
		range_gist_consider_split(&context, right_lower, i1, left_upper, i2);
	}

	/*
	 * Iterate over upper bound of left group finding greatest possible lower
	 * bound of right group.
	 */
	i1 = nentries - 1;
	i2 = nentries - 1;
	/* seed with an upper bound: it can only be lowered by the loop below */
	right_lower = &by_lower[i1].upper;
	left_upper = &by_upper[i2].upper;
	while (true)
	{
		/*
		 * Find next upper bound of left group.
		 */
		while (i2 >= 0 &&
			   range_cmp_bounds(typcache, left_upper,
								&by_upper[i2].upper) == 0)
		{
			if (range_cmp_bounds(typcache, &by_upper[i2].lower,
								 right_lower) < 0)
				right_lower = &by_upper[i2].lower;
			i2--;
		}
		if (i2 < 0)
			break;
		left_upper = &by_upper[i2].upper;

		/*
		 * Find count of intervals which anyway should be placed to the right
		 * group.
		 */
		while (i1 >= 0 &&
			   range_cmp_bounds(typcache, &by_lower[i1].lower,
								right_lower) >= 0)
			i1--;

		/*
		 * Consider found split to see if it's better than what we had.
		 */
		range_gist_consider_split(&context, right_lower, i1 + 1,
								  left_upper, i2 + 1);
	}

	/*
	 * If we failed to find any acceptable splits, use trivial split.
	 */
	if (context.first)
	{
		range_gist_fallback_split(typcache, entryvec, v);
		return;
	}

	/*
	 * Ok, we have now selected bounds of the groups. Now we have to
	 * distribute entries themselves. At first we distribute entries which can
	 * be placed unambiguously and collect "common entries" to array.
	 */

	/* Allocate vectors for results */
	v->spl_left = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
	v->spl_right = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
	v->spl_nleft = 0;
	v->spl_nright = 0;

	/*
	 * Allocate an array for "common entries" - entries which can be placed to
	 * either group without affecting overlap along selected axis.
	 */
	common_entries_count = 0;
	common_entries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));

	/*
	 * Distribute entries which can be distributed unambiguously, and collect
	 * common entries.
	 *
	 * NOTE(review): PLACE_LEFT/PLACE_RIGHT are macros defined earlier in
	 * this file; presumably they append to v->spl_left/spl_right and fold
	 * the entry into left_range/right_range — confirm against their
	 * definitions.
	 */
	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
	{
		RangeBound	lower,
					upper;
		bool		empty;

		/*
		 * Get upper and lower bounds along selected axis.
		 */
		range = DatumGetRangeTypeP(entryvec->vector[i].key);

		range_deserialize(typcache, range, &lower, &upper, &empty);

		if (range_cmp_bounds(typcache, &upper, context.left_upper) <= 0)
		{
			/* Fits in the left group */
			if (range_cmp_bounds(typcache, &lower, context.right_lower) >= 0)
			{
				/* Fits also in the right group, so "common entry" */
				common_entries[common_entries_count].index = i;
				if (context.has_subtype_diff)
				{
					/*
					 * delta = (lower - context.right_lower) -
					 * (context.left_upper - upper)
					 */
					common_entries[common_entries_count].delta =
						call_subtype_diff(typcache,
										  lower.val,
										  context.right_lower->val) -
						call_subtype_diff(typcache,
										  context.left_upper->val,
										  upper.val);
				}
				else
				{
					/* Without subtype_diff, take all deltas as zero */
					common_entries[common_entries_count].delta = 0;
				}
				common_entries_count++;
			}
			else
			{
				/* Doesn't fit to the right group, so join to the left group */
				PLACE_LEFT(range, i);
			}
		}
		else
		{
			/*
			 * Each entry should fit on either left or right group. Since this
			 * entry didn't fit in the left group, it better fit in the right
			 * group.
			 */
			Assert(range_cmp_bounds(typcache, &lower,
									context.right_lower) >= 0);
			PLACE_RIGHT(range, i);
		}
	}

	/*
	 * Distribute "common entries", if any.
	 */
	if (common_entries_count > 0)
	{
		/*
		 * Sort "common entries" by calculated deltas in order to distribute
		 * the most ambiguous entries first.
		 */
		qsort(common_entries, common_entries_count, sizeof(CommonEntry),
			  common_entry_cmp);

		/*
		 * Distribute "common entries" between groups according to sorting.
		 */
		for (i = 0; i < common_entries_count; i++)
		{
			int			idx = common_entries[i].index;

			range = DatumGetRangeTypeP(entryvec->vector[idx].key);

			/*
			 * Check if we have to place this entry in either group to achieve
			 * LIMIT_RATIO.
			 */
			if (i < context.common_left)
				PLACE_LEFT(range, idx);
			else
				PLACE_RIGHT(range, idx);
		}
	}

	v->spl_ldatum = PointerGetDatum(left_range);
	v->spl_rdatum = PointerGetDatum(right_range);
}

/*
 * Consider replacement of currently selected split with a better one
 * during range_gist_double_sorting_split.
 *
 * right_lower/left_upper describe the candidate split; min_left_count and
 * max_left_count bracket how many entries could go to the left group
 * (entries in between are "common" and can go either way).
 */
static void
range_gist_consider_split(ConsiderSplitContext *context,
						  RangeBound *right_lower, int min_left_count,
						  RangeBound *left_upper, int max_left_count)
{
	int			left_count,
				right_count;
	float4		ratio,
				overlap;

	/*
	 * Calculate entries distribution ratio assuming most uniform distribution
	 * of common entries.
	 */
	if (min_left_count >= (context->entries_count + 1) / 2)
		left_count = min_left_count;
	else if (max_left_count <= context->entries_count / 2)
		left_count = max_left_count;
	else
		left_count = context->entries_count / 2;
	right_count = context->entries_count - left_count;

	/*
	 * Ratio of split: quotient between size of smaller group and total
	 * entries count.  This is necessarily 0.5 or less; if it's less than
	 * LIMIT_RATIO then we will never accept the new split.
	 */
	ratio = ((float4) Min(left_count, right_count)) /
		((float4) context->entries_count);

	if (ratio > LIMIT_RATIO)
	{
		bool		selectthis = false;

		/*
		 * The ratio is acceptable, so compare current split with previously
		 * selected one.  We search for minimal overlap (allowing negative
		 * values) and minimal ratio secondarily.  If subtype_diff is
		 * available, it's used for overlap measure.  Without subtype_diff we
		 * use number of "common entries" as an overlap measure.
		 */
		if (context->has_subtype_diff)
			overlap = call_subtype_diff(context->typcache,
										left_upper->val,
										right_lower->val);
		else
			overlap = max_left_count - min_left_count;

		/* If there is no previous selection, select this split */
		if (context->first)
			selectthis = true;
		else
		{
			/*
			 * Choose the new split if it has a smaller overlap, or same
			 * overlap but better ratio.
			 */
			if (overlap < context->overlap ||
				(overlap == context->overlap && ratio > context->ratio))
				selectthis = true;
		}

		if (selectthis)
		{
			/* save information about selected split */
			context->first = false;
			context->ratio = ratio;
			context->overlap = overlap;
			context->right_lower = right_lower;
			context->left_upper = left_upper;
			context->common_left = max_left_count - left_count;
			context->common_right = left_count - min_left_count;
		}
	}
}

/*
 * Find class number for range.
 *
 * The class number is a valid combination of the properties of the
 * range.  Note: the highest possible number is 8, because CLS_EMPTY
 * can't be combined with anything else.
 */
static int
get_gist_range_class(RangeType *range)
{
	int			classNumber;
	char		flags;

	flags = range_get_flags(range);
	if (flags & RANGE_EMPTY)
	{
		classNumber = CLS_EMPTY;
	}
	else
	{
		/* OR together the applicable property bits */
		classNumber = 0;
		if (flags & RANGE_LB_INF)
			classNumber |= CLS_LOWER_INF;
		if (flags & RANGE_UB_INF)
			classNumber |= CLS_UPPER_INF;
		if (flags & RANGE_CONTAIN_EMPTY)
			classNumber |= CLS_CONTAIN_EMPTY;
	}
	return classNumber;
}

/*
 * Comparison function for range_gist_single_sorting_split.
 */
static int
single_bound_cmp(const void *a, const void *b, void *arg)
{
	SingleBoundSortItem *i1 = (SingleBoundSortItem *) a;
	SingleBoundSortItem *i2 = (SingleBoundSortItem *) b;
	TypeCacheEntry *typcache = (TypeCacheEntry *) arg;

	return range_cmp_bounds(typcache, &i1->bound, &i2->bound);
}

/*
 * Compare NonEmptyRanges by lower bound.
1474 */ 1475 static int 1476 interval_cmp_lower(const void *a, const void *b, void *arg) 1477 { 1478 NonEmptyRange *i1 = (NonEmptyRange *) a; 1479 NonEmptyRange *i2 = (NonEmptyRange *) b; 1480 TypeCacheEntry *typcache = (TypeCacheEntry *) arg; 1481 1482 return range_cmp_bounds(typcache, &i1->lower, &i2->lower); 1483 } 1484 1485 /* 1486 * Compare NonEmptyRanges by upper bound. 1487 */ 1488 static int 1489 interval_cmp_upper(const void *a, const void *b, void *arg) 1490 { 1491 NonEmptyRange *i1 = (NonEmptyRange *) a; 1492 NonEmptyRange *i2 = (NonEmptyRange *) b; 1493 TypeCacheEntry *typcache = (TypeCacheEntry *) arg; 1494 1495 return range_cmp_bounds(typcache, &i1->upper, &i2->upper); 1496 } 1497 1498 /* 1499 * Compare CommonEntrys by their deltas. 1500 */ 1501 static int 1502 common_entry_cmp(const void *i1, const void *i2) 1503 { 1504 double delta1 = ((CommonEntry *) i1)->delta; 1505 double delta2 = ((CommonEntry *) i2)->delta; 1506 1507 if (delta1 < delta2) 1508 return -1; 1509 else if (delta1 > delta2) 1510 return 1; 1511 else 1512 return 0; 1513 } 1514 1515 /* 1516 * Convenience function to invoke type-specific subtype_diff function. 1517 * Caller must have already checked that there is one for the range type. 1518 */ 1519 static float8 1520 call_subtype_diff(TypeCacheEntry *typcache, Datum val1, Datum val2) 1521 { 1522 float8 value; 1523 1524 value = DatumGetFloat8(FunctionCall2Coll(&typcache->rng_subdiff_finfo, 1525 typcache->rng_collation, 1526 val1, val2)); 1527 /* Cope with buggy subtype_diff function by returning zero */ 1528 if (value >= 0.0) 1529 return value; 1530 return 0.0; 1531 } 1532