1 /*-------------------------------------------------------------------------
2  *
3  * rangetypes_gist.c
4  *	  GiST support for range types.
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/utils/adt/rangetypes_gist.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/gist.h"
18 #include "access/stratnum.h"
19 #include "utils/builtins.h"
20 #include "utils/datum.h"
21 #include "utils/rangetypes.h"
22 
23 
24 /*
25  * Range class properties used to segregate different classes of ranges in
26  * GiST.  Each unique combination of properties is a class.  CLS_EMPTY cannot
27  * be combined with anything else.
28  */
29 #define CLS_NORMAL			0	/* Ordinary finite range (no bits set) */
30 #define CLS_LOWER_INF		1	/* Lower bound is infinity */
31 #define CLS_UPPER_INF		2	/* Upper bound is infinity */
32 #define CLS_CONTAIN_EMPTY	4	/* Contains underlying empty ranges */
33 #define CLS_EMPTY			8	/* Special class for empty ranges */
34 
35 #define CLS_COUNT			9	/* # of classes; includes all combinations of
36 								 * properties. CLS_EMPTY doesn't combine with
37 								 * anything else, so it's only 2^3 + 1. */
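/*
 * For example, an index key of the form (x, +inf) that also contains
 * underlying empty ranges belongs to class CLS_UPPER_INF | CLS_CONTAIN_EMPTY
 * (= 6), while an empty range is always in class CLS_EMPTY (= 8).
 */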
38 
39 /*
40  * Minimum accepted ratio of split for items of the same class.  If the items
41  * are of different classes, we will separate along those lines regardless of
42  * the ratio.
43  */
44 #define LIMIT_RATIO  0.3
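/*
 * For example, when splitting 100 entries of the same class, a candidate
 * split is considered only if the smaller side would receive more than 30
 * of them (see range_gist_consider_split).
 */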
45 
46 /* Constants for fixed penalty values */
47 #define INFINITE_BOUND_PENALTY	2.0
48 #define CONTAIN_EMPTY_PENALTY  1.0
49 #define DEFAULT_SUBTYPE_DIFF_PENALTY  1.0
50 
51 /*
52  * Per-item data for range_gist_single_sorting_split.
53  */
54 typedef struct
55 {
56 	int			index;
57 	RangeBound	bound;
58 } SingleBoundSortItem;
59 
60 /* place on left or right side of split? */
61 typedef enum
62 {
63 	SPLIT_LEFT = 0,				/* makes initialization to SPLIT_LEFT easier */
64 	SPLIT_RIGHT
65 } SplitLR;
66 
67 /*
68  * Context for range_gist_consider_split.
69  */
70 typedef struct
71 {
72 	TypeCacheEntry *typcache;	/* typcache for range type */
73 	bool		has_subtype_diff;	/* does it have subtype_diff? */
74 	int			entries_count;	/* total number of entries being split */
75 
76 	/* Information about currently selected split follows */
77 
78 	bool		first;			/* true if no split was selected yet */
79 
80 	RangeBound *left_upper;		/* upper bound of left interval */
81 	RangeBound *right_lower;	/* lower bound of right interval */
82 
83 	float4		ratio;			/* split ratio */
84 	float4		overlap;		/* overlap between left and right predicate */
85 	int			common_left;	/* # common entries destined for each side */
86 	int			common_right;
87 } ConsiderSplitContext;
88 
89 /*
90  * Bounds extracted from a non-empty range, for use in
91  * range_gist_double_sorting_split.
92  */
93 typedef struct
94 {
95 	RangeBound	lower;
96 	RangeBound	upper;
97 } NonEmptyRange;
98 
99 /*
 * Represents information about an entry that can be placed in either group
 * without affecting the overlap along the selected axis ("common entry").
102  */
103 typedef struct
104 {
105 	/* Index of entry in the initial array */
106 	int			index;
107 	/* Delta between closeness of range to each of the two groups */
108 	double		delta;
109 } CommonEntry;
110 
111 /* Helper macros to place an entry in the left or right group during split */
112 /* Note direct access to variables v, typcache, left_range, right_range */
113 #define PLACE_LEFT(range, off)					\
114 	do {										\
115 		if (v->spl_nleft > 0)					\
116 			left_range = range_super_union(typcache, left_range, range); \
117 		else									\
118 			left_range = (range);				\
119 		v->spl_left[v->spl_nleft++] = (off);	\
120 	} while(0)
121 
122 #define PLACE_RIGHT(range, off)					\
123 	do {										\
124 		if (v->spl_nright > 0)					\
125 			right_range = range_super_union(typcache, right_range, range); \
126 		else									\
127 			right_range = (range);				\
128 		v->spl_right[v->spl_nright++] = (off);	\
129 	} while(0)
130 
131 /* Copy a RangeType datum (hardwires typbyval and typlen for ranges...) */
132 #define rangeCopy(r) \
133 	((RangeType *) DatumGetPointer(datumCopy(PointerGetDatum(r), \
134 											 false, -1)))
135 
136 static RangeType *range_super_union(TypeCacheEntry *typcache, RangeType *r1,
137 				  RangeType *r2);
138 static bool range_gist_consistent_int(TypeCacheEntry *typcache,
139 						  StrategyNumber strategy, RangeType *key,
140 						  Datum query);
141 static bool range_gist_consistent_leaf(TypeCacheEntry *typcache,
142 						   StrategyNumber strategy, RangeType *key,
143 						   Datum query);
144 static void range_gist_fallback_split(TypeCacheEntry *typcache,
145 						  GistEntryVector *entryvec,
146 						  GIST_SPLITVEC *v);
147 static void range_gist_class_split(TypeCacheEntry *typcache,
148 					   GistEntryVector *entryvec,
149 					   GIST_SPLITVEC *v,
150 					   SplitLR *classes_groups);
151 static void range_gist_single_sorting_split(TypeCacheEntry *typcache,
152 								GistEntryVector *entryvec,
153 								GIST_SPLITVEC *v,
154 								bool use_upper_bound);
155 static void range_gist_double_sorting_split(TypeCacheEntry *typcache,
156 								GistEntryVector *entryvec,
157 								GIST_SPLITVEC *v);
158 static void range_gist_consider_split(ConsiderSplitContext *context,
159 						  RangeBound *right_lower, int min_left_count,
160 						  RangeBound *left_upper, int max_left_count);
161 static int	get_gist_range_class(RangeType *range);
162 static int	single_bound_cmp(const void *a, const void *b, void *arg);
163 static int	interval_cmp_lower(const void *a, const void *b, void *arg);
164 static int	interval_cmp_upper(const void *a, const void *b, void *arg);
165 static int	common_entry_cmp(const void *i1, const void *i2);
166 static float8 call_subtype_diff(TypeCacheEntry *typcache,
167 				  Datum val1, Datum val2);
168 
169 
170 /* GiST query consistency check */
171 Datum
172 range_gist_consistent(PG_FUNCTION_ARGS)
173 {
174 	GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
175 	Datum		query = PG_GETARG_DATUM(1);
176 	StrategyNumber strategy = (StrategyNumber) PG_GETARG_UINT16(2);
177 
178 	/* Oid subtype = PG_GETARG_OID(3); */
179 	bool	   *recheck = (bool *) PG_GETARG_POINTER(4);
180 	RangeType  *key = DatumGetRangeTypeP(entry->key);
181 	TypeCacheEntry *typcache;
182 
183 	/* All operators served by this function are exact */
184 	*recheck = false;
185 
186 	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(key));
187 
188 	if (GIST_LEAF(entry))
189 		PG_RETURN_BOOL(range_gist_consistent_leaf(typcache, strategy,
190 												  key, query));
191 	else
192 		PG_RETURN_BOOL(range_gist_consistent_int(typcache, strategy,
193 												 key, query));
194 }
195 
196 /* form union range */
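/*
 * All unions go through range_super_union, which tracks (via the
 * RANGE_CONTAIN_EMPTY flag) whether any empty range has been absorbed, so
 * contained-by searches remain indexable.
 */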
197 Datum
198 range_gist_union(PG_FUNCTION_ARGS)
199 {
200 	GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
201 	GISTENTRY  *ent = entryvec->vector;
202 	RangeType  *result_range;
203 	TypeCacheEntry *typcache;
204 	int			i;
205 
206 	result_range = DatumGetRangeTypeP(ent[0].key);
207 
208 	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(result_range));
209 
210 	for (i = 1; i < entryvec->n; i++)
211 	{
212 		result_range = range_super_union(typcache, result_range,
213 										 DatumGetRangeTypeP(ent[i].key));
214 	}
215 
216 	PG_RETURN_RANGE_P(result_range);
217 }
218 
219 /*
220  * We store ranges as ranges in GiST indexes, so we do not need
221  * compress, decompress, or fetch functions.  Note this implies a limit
222  * on the size of range values that can be indexed.
223  */
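/*
 * A typical use of this opclass (illustrative table and column names):
 *
 *		CREATE INDEX reservation_idx ON reservation USING gist (during);
 *
 * which accelerates operators such as && (overlaps) and @> (contains).
 */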
224 
225 /*
226  * GiST page split penalty function.
227  *
228  * The penalty function has the following goals (in order from most to least
229  * important):
230  * - Keep normal ranges separate
231  * - Avoid broadening the class of the original predicate
232  * - Avoid broadening (as determined by subtype_diff) the original predicate
233  * - Favor adding ranges to narrower original predicates
234  */
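/*
 * For example, when the new entry is an empty range, the penalty is 0 for an
 * empty original, CONTAIN_EMPTY_PENALTY for an original that already contains
 * empties, and 2x/3x/4x that value for (-inf,+inf), half-infinite, and normal
 * originals respectively, mirroring the goals above.
 */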
235 Datum
236 range_gist_penalty(PG_FUNCTION_ARGS)
237 {
238 	GISTENTRY  *origentry = (GISTENTRY *) PG_GETARG_POINTER(0);
239 	GISTENTRY  *newentry = (GISTENTRY *) PG_GETARG_POINTER(1);
240 	float	   *penalty = (float *) PG_GETARG_POINTER(2);
241 	RangeType  *orig = DatumGetRangeTypeP(origentry->key);
242 	RangeType  *new = DatumGetRangeTypeP(newentry->key);
243 	TypeCacheEntry *typcache;
244 	bool		has_subtype_diff;
245 	RangeBound	orig_lower,
246 				new_lower,
247 				orig_upper,
248 				new_upper;
249 	bool		orig_empty,
250 				new_empty;
251 
252 	if (RangeTypeGetOid(orig) != RangeTypeGetOid(new))
253 		elog(ERROR, "range types do not match");
254 
255 	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(orig));
256 
257 	has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid);
258 
259 	range_deserialize(typcache, orig, &orig_lower, &orig_upper, &orig_empty);
260 	range_deserialize(typcache, new, &new_lower, &new_upper, &new_empty);
261 
262 	/*
263 	 * Distinct branches for handling distinct classes of ranges.  Note that
264 	 * penalty values only need to be commensurate within the same class of
265 	 * new range.
266 	 */
267 	if (new_empty)
268 	{
269 		/* Handle insertion of empty range */
270 		if (orig_empty)
271 		{
272 			/*
			 * The best case is to insert it into an already-empty original
			 * range: this means no broadening of the original range, and the
			 * original range is as narrow as possible.
276 			 */
277 			*penalty = 0.0;
278 		}
279 		else if (RangeIsOrContainsEmpty(orig))
280 		{
281 			/*
			 * The next-best case is to insert the empty range into a range
			 * that already contains at least one underlying empty range.
			 * There is still no broadening of the original range, but the
			 * original range is not as narrow as possible.
286 			 */
287 			*penalty = CONTAIN_EMPTY_PENALTY;
288 		}
289 		else if (orig_lower.infinite && orig_upper.infinite)
290 		{
291 			/*
			 * The original range requires broadening.  A (-inf, +inf) range
			 * is the farthest from a normal range, so it is the best of the
			 * broadening cases.
294 			 */
295 			*penalty = 2 * CONTAIN_EMPTY_PENALTY;
296 		}
297 		else if (orig_lower.infinite || orig_upper.infinite)
298 		{
299 			/*
			 * (-inf, x) or (x, +inf) original ranges are closer to normal
			 * ranges, so it's worse to mix them with empty ranges.
302 			 */
303 			*penalty = 3 * CONTAIN_EMPTY_PENALTY;
304 		}
305 		else
306 		{
307 			/*
			 * The least preferred case is broadening a normal range.
309 			 */
310 			*penalty = 4 * CONTAIN_EMPTY_PENALTY;
311 		}
312 	}
313 	else if (new_lower.infinite && new_upper.infinite)
314 	{
315 		/* Handle insertion of (-inf, +inf) range */
316 		if (orig_lower.infinite && orig_upper.infinite)
317 		{
318 			/*
			 * The best case is inserting into a (-inf, +inf) original range.
320 			 */
321 			*penalty = 0.0;
322 		}
323 		else if (orig_lower.infinite || orig_upper.infinite)
324 		{
325 			/*
			 * When the original range is (-inf, x) or (x, +inf), insertion
			 * requires broadening it (extending one bound to infinity).
329 			 */
330 			*penalty = INFINITE_BOUND_PENALTY;
331 		}
332 		else
333 		{
334 			/*
			 * Insertion into a normal original range is least preferred.
336 			 */
337 			*penalty = 2 * INFINITE_BOUND_PENALTY;
338 		}
339 
340 		if (RangeIsOrContainsEmpty(orig))
341 		{
342 			/*
			 * The original range is narrower when it doesn't contain empty
			 * ranges, so add an additional penalty when it does.
345 			 */
346 			*penalty += CONTAIN_EMPTY_PENALTY;
347 		}
348 	}
349 	else if (new_lower.infinite)
350 	{
351 		/* Handle insertion of (-inf, x) range */
352 		if (!orig_empty && orig_lower.infinite)
353 		{
354 			if (orig_upper.infinite)
355 			{
356 				/*
				 * A (-inf, +inf) range won't be extended by inserting a
				 * (-inf, x) range.  It's still less desirable than inserting
				 * into a (-inf, y) original range that needs no extension,
				 * because that original range is narrower; but we can't
				 * express that distinction in a single float value.
362 				 */
363 				*penalty = 0.0;
364 			}
365 			else
366 			{
367 				if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0)
368 				{
369 					/*
370 					 * Get extension of original range using subtype_diff. Use
371 					 * constant if subtype_diff unavailable.
372 					 */
373 					if (has_subtype_diff)
374 						*penalty = call_subtype_diff(typcache,
375 													 new_upper.val,
376 													 orig_upper.val);
377 					else
378 						*penalty = DEFAULT_SUBTYPE_DIFF_PENALTY;
379 				}
380 				else
381 				{
382 					/* No extension of original range */
383 					*penalty = 0.0;
384 				}
385 			}
386 		}
387 		else
388 		{
389 			/*
			 * If the lower bound of the original range is not -inf, the
			 * required extension is infinite.
392 			 */
393 			*penalty = get_float4_infinity();
394 		}
395 	}
396 	else if (new_upper.infinite)
397 	{
398 		/* Handle insertion of (x, +inf) range */
399 		if (!orig_empty && orig_upper.infinite)
400 		{
401 			if (orig_lower.infinite)
402 			{
403 				/*
				 * A (-inf, +inf) range won't be extended by inserting an
				 * (x, +inf) range.  It's still less desirable than inserting
				 * into a (y, +inf) original range that needs no extension,
				 * because that original range is narrower; but we can't
				 * express that distinction in a single float value.
409 				 */
410 				*penalty = 0.0;
411 			}
412 			else
413 			{
414 				if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0)
415 				{
416 					/*
417 					 * Get extension of original range using subtype_diff. Use
418 					 * constant if subtype_diff unavailable.
419 					 */
420 					if (has_subtype_diff)
421 						*penalty = call_subtype_diff(typcache,
422 													 orig_lower.val,
423 													 new_lower.val);
424 					else
425 						*penalty = DEFAULT_SUBTYPE_DIFF_PENALTY;
426 				}
427 				else
428 				{
429 					/* No extension of original range */
430 					*penalty = 0.0;
431 				}
432 			}
433 		}
434 		else
435 		{
436 			/*
			 * If the upper bound of the original range is not +inf, the
			 * required extension is infinite.
439 			 */
440 			*penalty = get_float4_infinity();
441 		}
442 	}
443 	else
444 	{
445 		/* Handle insertion of normal (non-empty, non-infinite) range */
446 		if (orig_empty || orig_lower.infinite || orig_upper.infinite)
447 		{
448 			/*
449 			 * Avoid mixing normal ranges with infinite and empty ranges.
450 			 */
451 			*penalty = get_float4_infinity();
452 		}
453 		else
454 		{
455 			/*
456 			 * Calculate extension of original range by calling subtype_diff.
457 			 * Use constant if subtype_diff unavailable.
458 			 */
459 			float8		diff = 0.0;
460 
461 			if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0)
462 			{
463 				if (has_subtype_diff)
464 					diff += call_subtype_diff(typcache,
465 											  orig_lower.val,
466 											  new_lower.val);
467 				else
468 					diff += DEFAULT_SUBTYPE_DIFF_PENALTY;
469 			}
470 			if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0)
471 			{
472 				if (has_subtype_diff)
473 					diff += call_subtype_diff(typcache,
474 											  new_upper.val,
475 											  orig_upper.val);
476 				else
477 					diff += DEFAULT_SUBTYPE_DIFF_PENALTY;
478 			}
479 			*penalty = diff;
480 		}
481 	}
482 
483 	PG_RETURN_POINTER(penalty);
484 }
485 
486 /*
487  * The GiST PickSplit method for ranges
488  *
489  * Primarily, we try to segregate ranges of different classes.  If splitting
490  * ranges of the same class, use the appropriate split method for that class.
491  */
492 Datum
493 range_gist_picksplit(PG_FUNCTION_ARGS)
494 {
495 	GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
496 	GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
497 	TypeCacheEntry *typcache;
498 	OffsetNumber i;
499 	RangeType  *pred_left;
500 	int			nbytes;
501 	OffsetNumber maxoff;
502 	int			count_in_classes[CLS_COUNT];
503 	int			j;
504 	int			non_empty_classes_count = 0;
505 	int			biggest_class = -1;
506 	int			biggest_class_count = 0;
507 	int			total_count;
508 
509 	/* use first item to look up range type's info */
510 	pred_left = DatumGetRangeTypeP(entryvec->vector[FirstOffsetNumber].key);
511 	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(pred_left));
512 
513 	maxoff = entryvec->n - 1;
514 	nbytes = (maxoff + 1) * sizeof(OffsetNumber);
515 	v->spl_left = (OffsetNumber *) palloc(nbytes);
516 	v->spl_right = (OffsetNumber *) palloc(nbytes);
517 
518 	/*
519 	 * Get count distribution of range classes.
520 	 */
521 	memset(count_in_classes, 0, sizeof(count_in_classes));
522 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
523 	{
524 		RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);
525 
526 		count_in_classes[get_gist_range_class(range)]++;
527 	}
528 
529 	/*
530 	 * Count non-empty classes and find biggest class.
531 	 */
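	/* entries occupy offsets FirstOffsetNumber .. maxoff, so maxoff is also their count */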
532 	total_count = maxoff;
533 	for (j = 0; j < CLS_COUNT; j++)
534 	{
535 		if (count_in_classes[j] > 0)
536 		{
537 			if (count_in_classes[j] > biggest_class_count)
538 			{
539 				biggest_class_count = count_in_classes[j];
540 				biggest_class = j;
541 			}
542 			non_empty_classes_count++;
543 		}
544 	}
545 
546 	Assert(non_empty_classes_count > 0);
547 
548 	if (non_empty_classes_count == 1)
549 	{
550 		/* One non-empty class, so split inside class */
551 		if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_NORMAL)
552 		{
553 			/* double sorting split for normal ranges */
554 			range_gist_double_sorting_split(typcache, entryvec, v);
555 		}
556 		else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_LOWER_INF)
557 		{
558 			/* upper bound sorting split for (-inf, x) ranges */
559 			range_gist_single_sorting_split(typcache, entryvec, v, true);
560 		}
561 		else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_UPPER_INF)
562 		{
563 			/* lower bound sorting split for (x, +inf) ranges */
564 			range_gist_single_sorting_split(typcache, entryvec, v, false);
565 		}
566 		else
567 		{
568 			/* trivial split for all (-inf, +inf) or all empty ranges */
569 			range_gist_fallback_split(typcache, entryvec, v);
570 		}
571 	}
572 	else
573 	{
574 		/*
575 		 * Class based split.
576 		 *
577 		 * To which side of the split should each class go?  Initialize them
578 		 * all to go to the left side.
579 		 */
580 		SplitLR		classes_groups[CLS_COUNT];
581 
582 		memset(classes_groups, 0, sizeof(classes_groups));
583 
584 		if (count_in_classes[CLS_NORMAL] > 0)
585 		{
586 			/* separate normal ranges if any */
587 			classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
588 		}
589 		else
590 		{
591 			/*----------
592 			 * Try to split classes in one of two ways:
593 			 *	1) containing infinities - not containing infinities
594 			 *	2) containing empty - not containing empty
595 			 *
596 			 * Select the way which balances the ranges between left and right
597 			 * the best. If split in these ways is not possible, there are at
598 			 * most 3 classes, so just separate biggest class.
599 			 *----------
600 			 */
601 			int			infCount,
602 						nonInfCount;
603 			int			emptyCount,
604 						nonEmptyCount;
605 
606 			nonInfCount =
607 				count_in_classes[CLS_NORMAL] +
608 				count_in_classes[CLS_CONTAIN_EMPTY] +
609 				count_in_classes[CLS_EMPTY];
610 			infCount = total_count - nonInfCount;
611 
612 			nonEmptyCount =
613 				count_in_classes[CLS_NORMAL] +
614 				count_in_classes[CLS_LOWER_INF] +
615 				count_in_classes[CLS_UPPER_INF] +
616 				count_in_classes[CLS_LOWER_INF | CLS_UPPER_INF];
617 			emptyCount = total_count - nonEmptyCount;
618 
619 			if (infCount > 0 && nonInfCount > 0 &&
620 				(Abs(infCount - nonInfCount) <=
621 				 Abs(emptyCount - nonEmptyCount)))
622 			{
623 				classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
624 				classes_groups[CLS_CONTAIN_EMPTY] = SPLIT_RIGHT;
625 				classes_groups[CLS_EMPTY] = SPLIT_RIGHT;
626 			}
627 			else if (emptyCount > 0 && nonEmptyCount > 0)
628 			{
629 				classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
630 				classes_groups[CLS_LOWER_INF] = SPLIT_RIGHT;
631 				classes_groups[CLS_UPPER_INF] = SPLIT_RIGHT;
632 				classes_groups[CLS_LOWER_INF | CLS_UPPER_INF] = SPLIT_RIGHT;
633 			}
634 			else
635 			{
636 				/*
637 				 * Either total_count == emptyCount or total_count ==
638 				 * infCount.
639 				 */
640 				classes_groups[biggest_class] = SPLIT_RIGHT;
641 			}
642 		}
643 
644 		range_gist_class_split(typcache, entryvec, v, classes_groups);
645 	}
646 
647 	PG_RETURN_POINTER(v);
648 }
649 
650 /* equality comparator for GiST */
651 Datum
652 range_gist_same(PG_FUNCTION_ARGS)
653 {
654 	RangeType  *r1 = PG_GETARG_RANGE_P(0);
655 	RangeType  *r2 = PG_GETARG_RANGE_P(1);
656 	bool	   *result = (bool *) PG_GETARG_POINTER(2);
657 
658 	/*
659 	 * range_eq will ignore the RANGE_CONTAIN_EMPTY flag, so we have to check
660 	 * that for ourselves.  More generally, if the entries have been properly
661 	 * normalized, then unequal flags bytes must mean unequal ranges ... so
662 	 * let's just test all the flag bits at once.
663 	 */
664 	if (range_get_flags(r1) != range_get_flags(r2))
665 		*result = false;
666 	else
667 	{
668 		TypeCacheEntry *typcache;
669 
670 		typcache = range_get_typcache(fcinfo, RangeTypeGetOid(r1));
671 
672 		*result = range_eq_internal(typcache, r1, r2);
673 	}
674 
675 	PG_RETURN_POINTER(result);
676 }
677 
678 /*
679  *----------------------------------------------------------
680  * STATIC FUNCTIONS
681  *----------------------------------------------------------
682  */
683 
684 /*
685  * Return the smallest range that contains r1 and r2
686  *
687  * This differs from regular range_union in two critical ways:
688  * 1. It won't throw an error for non-adjacent r1 and r2, but just absorb
689  * the intervening values into the result range.
690  * 2. We track whether any empty range has been union'd into the result,
691  * so that contained_by searches can be indexed.  Note that this means
692  * that *all* unions formed within the GiST index must go through here.
693  */
694 static RangeType *
695 range_super_union(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
696 {
697 	RangeType  *result;
698 	RangeBound	lower1,
699 				lower2;
700 	RangeBound	upper1,
701 				upper2;
702 	bool		empty1,
703 				empty2;
704 	char		flags1,
705 				flags2;
706 	RangeBound *result_lower;
707 	RangeBound *result_upper;
708 
709 	range_deserialize(typcache, r1, &lower1, &upper1, &empty1);
710 	range_deserialize(typcache, r2, &lower2, &upper2, &empty2);
711 	flags1 = range_get_flags(r1);
712 	flags2 = range_get_flags(r2);
713 
714 	if (empty1)
715 	{
716 		/* We can return r2 as-is if it already is or contains empty */
717 		if (flags2 & (RANGE_EMPTY | RANGE_CONTAIN_EMPTY))
718 			return r2;
719 		/* Else we'd better copy it (modify-in-place isn't safe) */
720 		r2 = rangeCopy(r2);
721 		range_set_contain_empty(r2);
722 		return r2;
723 	}
724 	if (empty2)
725 	{
726 		/* We can return r1 as-is if it already is or contains empty */
727 		if (flags1 & (RANGE_EMPTY | RANGE_CONTAIN_EMPTY))
728 			return r1;
729 		/* Else we'd better copy it (modify-in-place isn't safe) */
730 		r1 = rangeCopy(r1);
731 		range_set_contain_empty(r1);
732 		return r1;
733 	}
734 
735 	if (range_cmp_bounds(typcache, &lower1, &lower2) <= 0)
736 		result_lower = &lower1;
737 	else
738 		result_lower = &lower2;
739 
740 	if (range_cmp_bounds(typcache, &upper1, &upper2) >= 0)
741 		result_upper = &upper1;
742 	else
743 		result_upper = &upper2;
744 
745 	/* optimization to avoid constructing a new range */
746 	if (result_lower == &lower1 && result_upper == &upper1 &&
747 		((flags1 & RANGE_CONTAIN_EMPTY) || !(flags2 & RANGE_CONTAIN_EMPTY)))
748 		return r1;
749 	if (result_lower == &lower2 && result_upper == &upper2 &&
750 		((flags2 & RANGE_CONTAIN_EMPTY) || !(flags1 & RANGE_CONTAIN_EMPTY)))
751 		return r2;
752 
753 	result = make_range(typcache, result_lower, result_upper, false);
754 
755 	if ((flags1 & RANGE_CONTAIN_EMPTY) || (flags2 & RANGE_CONTAIN_EMPTY))
756 		range_set_contain_empty(result);
757 
758 	return result;
759 }
760 
761 /*
762  * GiST consistent test on an index internal page
763  */
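/*
 * Note that the key here is the union of all entries in the subtree, so we
 * must return true whenever the condition could hold for some member of the
 * subtree.  For the ordering strategies this means testing the negation of
 * the "opposite" operator: e.g., some member can be strictly before the
 * query only if the union extends to the left of the query, i.e. the union
 * is not overright of it.
 */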
764 static bool
765 range_gist_consistent_int(TypeCacheEntry *typcache, StrategyNumber strategy,
766 						  RangeType *key, Datum query)
767 {
768 	switch (strategy)
769 	{
770 		case RANGESTRAT_BEFORE:
771 			if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeTypeP(query)))
772 				return false;
773 			return (!range_overright_internal(typcache, key,
774 											  DatumGetRangeTypeP(query)));
775 		case RANGESTRAT_OVERLEFT:
776 			if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeTypeP(query)))
777 				return false;
778 			return (!range_after_internal(typcache, key,
779 										  DatumGetRangeTypeP(query)));
780 		case RANGESTRAT_OVERLAPS:
781 			return range_overlaps_internal(typcache, key,
782 										   DatumGetRangeTypeP(query));
783 		case RANGESTRAT_OVERRIGHT:
784 			if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeTypeP(query)))
785 				return false;
786 			return (!range_before_internal(typcache, key,
787 										   DatumGetRangeTypeP(query)));
788 		case RANGESTRAT_AFTER:
789 			if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeTypeP(query)))
790 				return false;
791 			return (!range_overleft_internal(typcache, key,
792 											 DatumGetRangeTypeP(query)));
793 		case RANGESTRAT_ADJACENT:
794 			if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeTypeP(query)))
795 				return false;
796 			if (range_adjacent_internal(typcache, key,
797 										DatumGetRangeTypeP(query)))
798 				return true;
799 			return range_overlaps_internal(typcache, key,
800 										   DatumGetRangeTypeP(query));
801 		case RANGESTRAT_CONTAINS:
802 			return range_contains_internal(typcache, key,
803 										   DatumGetRangeTypeP(query));
804 		case RANGESTRAT_CONTAINED_BY:
805 
806 			/*
807 			 * Empty ranges are contained by anything, so if key is or
808 			 * contains any empty ranges, we must descend into it.  Otherwise,
809 			 * descend only if key overlaps the query.
810 			 */
811 			if (RangeIsOrContainsEmpty(key))
812 				return true;
813 			return range_overlaps_internal(typcache, key,
814 										   DatumGetRangeTypeP(query));
815 		case RANGESTRAT_CONTAINS_ELEM:
816 			return range_contains_elem_internal(typcache, key, query);
817 		case RANGESTRAT_EQ:
818 
819 			/*
820 			 * If query is empty, descend only if the key is or contains any
821 			 * empty ranges.  Otherwise, descend if key contains query.
822 			 */
823 			if (RangeIsEmpty(DatumGetRangeTypeP(query)))
824 				return RangeIsOrContainsEmpty(key);
825 			return range_contains_internal(typcache, key,
826 										   DatumGetRangeTypeP(query));
827 		default:
828 			elog(ERROR, "unrecognized range strategy: %d", strategy);
829 			return false;		/* keep compiler quiet */
830 	}
831 }
832 
833 /*
834  * GiST consistent test on an index leaf page
835  */
836 static bool
837 range_gist_consistent_leaf(TypeCacheEntry *typcache, StrategyNumber strategy,
838 						   RangeType *key, Datum query)
839 {
840 	switch (strategy)
841 	{
842 		case RANGESTRAT_BEFORE:
843 			return range_before_internal(typcache, key,
844 										 DatumGetRangeTypeP(query));
845 		case RANGESTRAT_OVERLEFT:
846 			return range_overleft_internal(typcache, key,
847 										   DatumGetRangeTypeP(query));
848 		case RANGESTRAT_OVERLAPS:
849 			return range_overlaps_internal(typcache, key,
850 										   DatumGetRangeTypeP(query));
851 		case RANGESTRAT_OVERRIGHT:
852 			return range_overright_internal(typcache, key,
853 											DatumGetRangeTypeP(query));
854 		case RANGESTRAT_AFTER:
855 			return range_after_internal(typcache, key,
856 										DatumGetRangeTypeP(query));
857 		case RANGESTRAT_ADJACENT:
858 			return range_adjacent_internal(typcache, key,
859 										   DatumGetRangeTypeP(query));
860 		case RANGESTRAT_CONTAINS:
861 			return range_contains_internal(typcache, key,
862 										   DatumGetRangeTypeP(query));
863 		case RANGESTRAT_CONTAINED_BY:
864 			return range_contained_by_internal(typcache, key,
865 											   DatumGetRangeTypeP(query));
866 		case RANGESTRAT_CONTAINS_ELEM:
867 			return range_contains_elem_internal(typcache, key, query);
868 		case RANGESTRAT_EQ:
869 			return range_eq_internal(typcache, key, DatumGetRangeTypeP(query));
870 		default:
871 			elog(ERROR, "unrecognized range strategy: %d", strategy);
872 			return false;		/* keep compiler quiet */
873 	}
874 }
875 
876 /*
877  * Trivial split: half of entries will be placed on one page
878  * and the other half on the other page.
879  */
880 static void
881 range_gist_fallback_split(TypeCacheEntry *typcache,
882 						  GistEntryVector *entryvec,
883 						  GIST_SPLITVEC *v)
884 {
885 	RangeType  *left_range = NULL;
886 	RangeType  *right_range = NULL;
887 	OffsetNumber i,
888 				maxoff,
889 				split_idx;
890 
891 	maxoff = entryvec->n - 1;
892 	/* Split entries before this to left page, after to right: */
893 	split_idx = (maxoff - FirstOffsetNumber) / 2 + FirstOffsetNumber;
894 
895 	v->spl_nleft = 0;
896 	v->spl_nright = 0;
897 	for (i = FirstOffsetNumber; i <= maxoff; i++)
898 	{
899 		RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);
900 
901 		if (i < split_idx)
902 			PLACE_LEFT(range, i);
903 		else
904 			PLACE_RIGHT(range, i);
905 	}
906 
907 	v->spl_ldatum = RangeTypePGetDatum(left_range);
908 	v->spl_rdatum = RangeTypePGetDatum(right_range);
909 }
910 
911 /*
912  * Split based on classes of ranges.
913  *
914  * See get_gist_range_class for class definitions.
915  * classes_groups is an array of length CLS_COUNT indicating the side of the
916  * split to which each class should go.
917  */
918 static void
919 range_gist_class_split(TypeCacheEntry *typcache,
920 					   GistEntryVector *entryvec,
921 					   GIST_SPLITVEC *v,
922 					   SplitLR *classes_groups)
923 {
924 	RangeType  *left_range = NULL;
925 	RangeType  *right_range = NULL;
926 	OffsetNumber i,
927 				maxoff;
928 
929 	maxoff = entryvec->n - 1;
930 
931 	v->spl_nleft = 0;
932 	v->spl_nright = 0;
933 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
934 	{
935 		RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);
936 		int			class;
937 
938 		/* Get class of range */
939 		class = get_gist_range_class(range);
940 
941 		/* Place range to appropriate page */
942 		if (classes_groups[class] == SPLIT_LEFT)
943 			PLACE_LEFT(range, i);
944 		else
945 		{
946 			Assert(classes_groups[class] == SPLIT_RIGHT);
947 			PLACE_RIGHT(range, i);
948 		}
949 	}
950 
951 	v->spl_ldatum = RangeTypePGetDatum(left_range);
952 	v->spl_rdatum = RangeTypePGetDatum(right_range);
953 }
954 
955 /*
 * Sorting-based split.  The first half of the entries in sort order are
 * placed on one page and the second half on the other.  The use_upper_bound
 * parameter selects whether to sort by upper or lower bound.
960  */
961 static void
962 range_gist_single_sorting_split(TypeCacheEntry *typcache,
963 								GistEntryVector *entryvec,
964 								GIST_SPLITVEC *v,
965 								bool use_upper_bound)
966 {
967 	SingleBoundSortItem *sortItems;
968 	RangeType  *left_range = NULL;
969 	RangeType  *right_range = NULL;
970 	OffsetNumber i,
971 				maxoff,
972 				split_idx;
973 
974 	maxoff = entryvec->n - 1;
975 
976 	sortItems = (SingleBoundSortItem *)
977 		palloc(maxoff * sizeof(SingleBoundSortItem));
978 
979 	/*
980 	 * Prepare auxiliary array and sort the values.
981 	 */
982 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
983 	{
984 		RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);
985 		RangeBound	bound2;
986 		bool		empty;
987 
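		/* sortItems[] is 0-based, while entryvec offsets start at FirstOffsetNumber */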
988 		sortItems[i - 1].index = i;
989 		/* Put appropriate bound into array */
990 		if (use_upper_bound)
991 			range_deserialize(typcache, range, &bound2,
992 							  &sortItems[i - 1].bound, &empty);
993 		else
994 			range_deserialize(typcache, range, &sortItems[i - 1].bound,
995 							  &bound2, &empty);
996 		Assert(!empty);
997 	}
998 
999 	qsort_arg(sortItems, maxoff, sizeof(SingleBoundSortItem),
1000 			  single_bound_cmp, typcache);
1001 
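	/* First half of the sorted entries go to the left page, the rest to the right */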
1002 	split_idx = maxoff / 2;
1003 
1004 	v->spl_nleft = 0;
1005 	v->spl_nright = 0;
1006 
1007 	for (i = 0; i < maxoff; i++)
1008 	{
1009 		int			idx = sortItems[i].index;
1010 		RangeType  *range = DatumGetRangeTypeP(entryvec->vector[idx].key);
1011 
1012 		if (i < split_idx)
1013 			PLACE_LEFT(range, idx);
1014 		else
1015 			PLACE_RIGHT(range, idx);
1016 	}
1017 
1018 	v->spl_ldatum = RangeTypePGetDatum(left_range);
1019 	v->spl_rdatum = RangeTypePGetDatum(right_range);
1020 }
1021 
1022 /*
1023  * Double sorting split algorithm.
1024  *
 * The algorithm divides the ranges into two groups.  The first (left) group
 * contains the overall lower bound, the second (right) group contains the
 * overall upper bound.  The challenge is to find an upper bound for the left
 * group and a lower bound for the right group so that the overlap between
 * the groups is minimal and the distribution ratio is acceptable.  For each
 * candidate lower bound of the right group, the algorithm finds the minimal
 * possible upper bound of the left group; and for each candidate upper bound
 * of the left group, the maximal possible lower bound of the right group.
 * For each such pair, range_gist_consider_split decides whether to replace
 * the currently selected split with the new one.
 *
 * After that, all the entries are divided into three groups:
 * 1) Entries which should be placed in the left group
 * 2) Entries which should be placed in the right group
 * 3) "Common entries" which can be placed in either group without affecting
 *	  the amount of overlap.
 *
 * The common entries are then distributed by the difference between the
 * distance from the entry's lower bound to the right group's lower bound and
 * the distance from the entry's upper bound to the left group's upper bound.
1044  *
1045  * For details see:
1046  * "A new double sorting-based node splitting algorithm for R-tree",
1047  * A. Korotkov
1048  * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36
1049  */
1050 static void
1051 range_gist_double_sorting_split(TypeCacheEntry *typcache,
1052 								GistEntryVector *entryvec,
1053 								GIST_SPLITVEC *v)
1054 {
1055 	ConsiderSplitContext context;
1056 	OffsetNumber i,
1057 				maxoff;
1058 	RangeType  *range,
1059 			   *left_range = NULL,
1060 			   *right_range = NULL;
1061 	int			common_entries_count;
1062 	NonEmptyRange *by_lower,
1063 			   *by_upper;
1064 	CommonEntry *common_entries;
1065 	int			nentries,
1066 				i1,
1067 				i2;
1068 	RangeBound *right_lower,
1069 			   *left_upper;
1070 
1071 	memset(&context, 0, sizeof(ConsiderSplitContext));
1072 	context.typcache = typcache;
1073 	context.has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid);
1074 
1075 	maxoff = entryvec->n - 1;
1076 	nentries = context.entries_count = maxoff - FirstOffsetNumber + 1;
1077 	context.first = true;
1078 
1079 	/* Allocate arrays for sorted range bounds */
1080 	by_lower = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange));
1081 	by_upper = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange));
1082 
1083 	/* Fill arrays of bounds */
1084 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
1085 	{
1086 		RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);
1087 		bool		empty;
1088 
1089 		range_deserialize(typcache, range,
1090 						  &by_lower[i - FirstOffsetNumber].lower,
1091 						  &by_lower[i - FirstOffsetNumber].upper,
1092 						  &empty);
1093 		Assert(!empty);
1094 	}
1095 
1096 	/*
1097 	 * Make two arrays of range bounds: one sorted by lower bound and another
1098 	 * sorted by upper bound.
1099 	 */
1100 	memcpy(by_upper, by_lower, nentries * sizeof(NonEmptyRange));
1101 	qsort_arg(by_lower, nentries, sizeof(NonEmptyRange),
1102 			  interval_cmp_lower, typcache);
1103 	qsort_arg(by_upper, nentries, sizeof(NonEmptyRange),
1104 			  interval_cmp_upper, typcache);
1105 
1106 	/*----------
1107 	 * The goal is to form a left and right range, so that every entry
1108 	 * range is contained by either left or right interval (or both).
1109 	 *
1110 	 * For example, with the ranges (0,1), (1,3), (2,3), (2,4):
1111 	 *
1112 	 * 0 1 2 3 4
1113 	 * +-+
1114 	 *	 +---+
1115 	 *	   +-+
1116 	 *	   +---+
1117 	 *
1118 	 * The left and right ranges are of the form (0,a) and (b,4).
1119 	 * We first consider splits where b is the lower bound of an entry.
1120 	 * We iterate through all entries, and for each b, calculate the
1121 	 * smallest possible a. Then we consider splits where a is the
1122 	 * upper bound of an entry, and for each a, calculate the greatest
1123 	 * possible b.
1124 	 *
1125 	 * In the above example, the first loop would consider splits:
1126 	 * b=0: (0,1)-(0,4)
1127 	 * b=1: (0,1)-(1,4)
1128 	 * b=2: (0,3)-(2,4)
1129 	 *
1130 	 * And the second loop:
1131 	 * a=1: (0,1)-(1,4)
1132 	 * a=3: (0,3)-(2,4)
1133 	 * a=4: (0,4)-(2,4)
1134 	 *----------
1135 	 */
1136 
1137 	/*
	 * Iterate over candidate lower bounds of the right group, finding the
	 * smallest possible upper bound of the left group for each.
1140 	 */
1141 	i1 = 0;
1142 	i2 = 0;
1143 	right_lower = &by_lower[i1].lower;
1144 	left_upper = &by_upper[i2].lower;
1145 	while (true)
1146 	{
1147 		/*
1148 		 * Find next lower bound of right group.
1149 		 */
1150 		while (i1 < nentries &&
1151 			   range_cmp_bounds(typcache, right_lower,
1152 								&by_lower[i1].lower) == 0)
1153 		{
1154 			if (range_cmp_bounds(typcache, &by_lower[i1].upper,
1155 								 left_upper) > 0)
1156 				left_upper = &by_lower[i1].upper;
1157 			i1++;
1158 		}
1159 		if (i1 >= nentries)
1160 			break;
1161 		right_lower = &by_lower[i1].lower;
1162 
1163 		/*
		 * Find the count of ranges which must be placed in the left group
		 * in any case.
1166 		 */
1167 		while (i2 < nentries &&
1168 			   range_cmp_bounds(typcache, &by_upper[i2].upper,
1169 								left_upper) <= 0)
1170 			i2++;
1171 
1172 		/*
1173 		 * Consider found split to see if it's better than what we had.
1174 		 */
1175 		range_gist_consider_split(&context, right_lower, i1, left_upper, i2);
1176 	}
1177 
1178 	/*
	 * Iterate over candidate upper bounds of the left group, finding the
	 * greatest possible lower bound of the right group for each.
1181 	 */
1182 	i1 = nentries - 1;
1183 	i2 = nentries - 1;
1184 	right_lower = &by_lower[i1].upper;
1185 	left_upper = &by_upper[i2].upper;
1186 	while (true)
1187 	{
1188 		/*
1189 		 * Find next upper bound of left group.
1190 		 */
1191 		while (i2 >= 0 &&
1192 			   range_cmp_bounds(typcache, left_upper,
1193 								&by_upper[i2].upper) == 0)
1194 		{
1195 			if (range_cmp_bounds(typcache, &by_upper[i2].lower,
1196 								 right_lower) < 0)
1197 				right_lower = &by_upper[i2].lower;
1198 			i2--;
1199 		}
1200 		if (i2 < 0)
1201 			break;
1202 		left_upper = &by_upper[i2].upper;
1203 
1204 		/*
		 * Find the count of ranges which must be placed in the right group
		 * in any case.
1207 		 */
1208 		while (i1 >= 0 &&
1209 			   range_cmp_bounds(typcache, &by_lower[i1].lower,
1210 								right_lower) >= 0)
1211 			i1--;
1212 
1213 		/*
1214 		 * Consider found split to see if it's better than what we had.
1215 		 */
1216 		range_gist_consider_split(&context, right_lower, i1 + 1,
1217 								  left_upper, i2 + 1);
1218 	}
1219 
1220 	/*
1221 	 * If we failed to find any acceptable splits, use trivial split.
1222 	 */
1223 	if (context.first)
1224 	{
1225 		range_gist_fallback_split(typcache, entryvec, v);
1226 		return;
1227 	}
1228 
1229 	/*
	 * Ok, we have now selected the bounds of the groups.  Now we have to
	 * distribute the entries themselves.  First we place the entries that
	 * can be assigned unambiguously, and collect the "common entries" into
	 * an array.
1233 	 */
1234 
1235 	/* Allocate vectors for results */
1236 	v->spl_left = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
1237 	v->spl_right = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
1238 	v->spl_nleft = 0;
1239 	v->spl_nright = 0;
1240 
1241 	/*
	 * Allocate an array for "common entries" --- entries which can be placed
	 * in either group without affecting the amount of overlap.
1244 	 */
1245 	common_entries_count = 0;
1246 	common_entries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));
1247 
1248 	/*
1249 	 * Distribute entries which can be distributed unambiguously, and collect
1250 	 * common entries.
1251 	 */
1252 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
1253 	{
1254 		RangeBound	lower,
1255 					upper;
1256 		bool		empty;
1257 
1258 		/*
1259 		 * Get upper and lower bounds along selected axis.
1260 		 */
1261 		range = DatumGetRangeTypeP(entryvec->vector[i].key);
1262 
1263 		range_deserialize(typcache, range, &lower, &upper, &empty);
1264 
1265 		if (range_cmp_bounds(typcache, &upper, context.left_upper) <= 0)
1266 		{
1267 			/* Fits in the left group */
1268 			if (range_cmp_bounds(typcache, &lower, context.right_lower) >= 0)
1269 			{
1270 				/* Fits also in the right group, so "common entry" */
1271 				common_entries[common_entries_count].index = i;
1272 				if (context.has_subtype_diff)
1273 				{
1274 					/*
1275 					 * delta = (lower - context.right_lower) -
1276 					 * (context.left_upper - upper)
1277 					 */
1278 					common_entries[common_entries_count].delta =
1279 						call_subtype_diff(typcache,
1280 										  lower.val,
1281 										  context.right_lower->val) -
1282 						call_subtype_diff(typcache,
1283 										  context.left_upper->val,
1284 										  upper.val);
1285 				}
1286 				else
1287 				{
1288 					/* Without subtype_diff, take all deltas as zero */
1289 					common_entries[common_entries_count].delta = 0;
1290 				}
1291 				common_entries_count++;
1292 			}
1293 			else
1294 			{
				/* Doesn't fit in the right group, so put it in the left group */
1296 				PLACE_LEFT(range, i);
1297 			}
1298 		}
1299 		else
1300 		{
1301 			/*
			 * Every entry must fit in either the left or the right group.
			 * Since this entry didn't fit in the left group, it must fit in
			 * the right group.
1305 			 */
1306 			Assert(range_cmp_bounds(typcache, &lower,
1307 									context.right_lower) >= 0);
1308 			PLACE_RIGHT(range, i);
1309 		}
1310 	}
1311 
1312 	/*
1313 	 * Distribute "common entries", if any.
1314 	 */
1315 	if (common_entries_count > 0)
1316 	{
1317 		/*
1318 		 * Sort "common entries" by calculated deltas in order to distribute
1319 		 * the most ambiguous entries first.
1320 		 */
1321 		qsort(common_entries, common_entries_count, sizeof(CommonEntry),
1322 			  common_entry_cmp);
1323 
1324 		/*
1325 		 * Distribute "common entries" between groups according to sorting.
1326 		 */
1327 		for (i = 0; i < common_entries_count; i++)
1328 		{
1329 			int			idx = common_entries[i].index;
1330 
1331 			range = DatumGetRangeTypeP(entryvec->vector[idx].key);
1332 
1333 			/*
			 * The first context.common_left entries (in sorted delta order)
			 * go to the left group; the rest go to the right group, per the
			 * counts chosen by range_gist_consider_split.
1336 			 */
1337 			if (i < context.common_left)
1338 				PLACE_LEFT(range, idx);
1339 			else
1340 				PLACE_RIGHT(range, idx);
1341 		}
1342 	}
1343 
1344 	v->spl_ldatum = PointerGetDatum(left_range);
1345 	v->spl_rdatum = PointerGetDatum(right_range);
1346 }
1347 
1348 /*
1349  * Consider replacement of currently selected split with a better one
1350  * during range_gist_double_sorting_split.
1351  */
1352 static void
1353 range_gist_consider_split(ConsiderSplitContext *context,
1354 						  RangeBound *right_lower, int min_left_count,
1355 						  RangeBound *left_upper, int max_left_count)
1356 {
1357 	int			left_count,
1358 				right_count;
1359 	float4		ratio,
1360 				overlap;
1361 
1362 	/*
	 * Calculate the entry distribution ratio, assuming the common entries
	 * are distributed as evenly as possible.
1365 	 */
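	/*
	 * That is, left_count is clamped into [min_left_count, max_left_count]
	 * while staying as close to entries_count / 2 as possible.
	 */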
1366 	if (min_left_count >= (context->entries_count + 1) / 2)
1367 		left_count = min_left_count;
1368 	else if (max_left_count <= context->entries_count / 2)
1369 		left_count = max_left_count;
1370 	else
1371 		left_count = context->entries_count / 2;
1372 	right_count = context->entries_count - left_count;
1373 
1374 	/*
1375 	 * Ratio of split: quotient between size of smaller group and total
1376 	 * entries count.  This is necessarily 0.5 or less; if it's less than
1377 	 * LIMIT_RATIO then we will never accept the new split.
1378 	 */
1379 	ratio = ((float4) Min(left_count, right_count)) /
1380 		((float4) context->entries_count);
1381 
1382 	if (ratio > LIMIT_RATIO)
1383 	{
1384 		bool		selectthis = false;
1385 
1386 		/*
		 * The ratio is acceptable, so compare the current split with the
		 * previously selected one.  We search for the smallest overlap
		 * (negative values allowed) and, secondarily, the largest ratio.
		 * If subtype_diff is available, it's used as the overlap measure;
		 * without subtype_diff we use the number of "common entries" as the
		 * overlap measure.
1392 		 */
1393 		if (context->has_subtype_diff)
1394 			overlap = call_subtype_diff(context->typcache,
1395 										left_upper->val,
1396 										right_lower->val);
1397 		else
1398 			overlap = max_left_count - min_left_count;
1399 
1400 		/* If there is no previous selection, select this split */
1401 		if (context->first)
1402 			selectthis = true;
1403 		else
1404 		{
1405 			/*
1406 			 * Choose the new split if it has a smaller overlap, or same
1407 			 * overlap but better ratio.
1408 			 */
1409 			if (overlap < context->overlap ||
1410 				(overlap == context->overlap && ratio > context->ratio))
1411 				selectthis = true;
1412 		}
1413 
1414 		if (selectthis)
1415 		{
1416 			/* save information about selected split */
1417 			context->first = false;
1418 			context->ratio = ratio;
1419 			context->overlap = overlap;
1420 			context->right_lower = right_lower;
1421 			context->left_upper = left_upper;
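			/*
			 * common_left and common_right are the numbers of "common
			 * entries" that will later be sent to the left and right groups,
			 * respectively; they sum to the total number of common entries,
			 * max_left_count - min_left_count.
			 */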
1422 			context->common_left = max_left_count - left_count;
1423 			context->common_right = left_count - min_left_count;
1424 		}
1425 	}
1426 }
1427 
1428 /*
1429  * Find class number for range.
1430  *
1431  * The class number is a valid combination of the properties of the
1432  * range.  Note: the highest possible number is 8, because CLS_EMPTY
1433  * can't be combined with anything else.
1434  */
1435 static int
1436 get_gist_range_class(RangeType *range)
1437 {
1438 	int			classNumber;
1439 	char		flags;
1440 
1441 	flags = range_get_flags(range);
1442 	if (flags & RANGE_EMPTY)
1443 	{
1444 		classNumber = CLS_EMPTY;
1445 	}
1446 	else
1447 	{
1448 		classNumber = 0;
1449 		if (flags & RANGE_LB_INF)
1450 			classNumber |= CLS_LOWER_INF;
1451 		if (flags & RANGE_UB_INF)
1452 			classNumber |= CLS_UPPER_INF;
1453 		if (flags & RANGE_CONTAIN_EMPTY)
1454 			classNumber |= CLS_CONTAIN_EMPTY;
1455 	}
1456 	return classNumber;
1457 }
1458 
1459 /*
1460  * Comparison function for range_gist_single_sorting_split.
1461  */
1462 static int
1463 single_bound_cmp(const void *a, const void *b, void *arg)
1464 {
1465 	SingleBoundSortItem *i1 = (SingleBoundSortItem *) a;
1466 	SingleBoundSortItem *i2 = (SingleBoundSortItem *) b;
1467 	TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
1468 
1469 	return range_cmp_bounds(typcache, &i1->bound, &i2->bound);
1470 }
1471 
1472 /*
1473  * Compare NonEmptyRanges by lower bound.
1474  */
1475 static int
1476 interval_cmp_lower(const void *a, const void *b, void *arg)
1477 {
1478 	NonEmptyRange *i1 = (NonEmptyRange *) a;
1479 	NonEmptyRange *i2 = (NonEmptyRange *) b;
1480 	TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
1481 
1482 	return range_cmp_bounds(typcache, &i1->lower, &i2->lower);
1483 }
1484 
1485 /*
1486  * Compare NonEmptyRanges by upper bound.
1487  */
1488 static int
1489 interval_cmp_upper(const void *a, const void *b, void *arg)
1490 {
1491 	NonEmptyRange *i1 = (NonEmptyRange *) a;
1492 	NonEmptyRange *i2 = (NonEmptyRange *) b;
1493 	TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
1494 
1495 	return range_cmp_bounds(typcache, &i1->upper, &i2->upper);
1496 }
1497 
1498 /*
1499  * Compare CommonEntrys by their deltas.
1500  */
1501 static int
1502 common_entry_cmp(const void *i1, const void *i2)
1503 {
1504 	double		delta1 = ((CommonEntry *) i1)->delta;
1505 	double		delta2 = ((CommonEntry *) i2)->delta;
1506 
1507 	if (delta1 < delta2)
1508 		return -1;
1509 	else if (delta1 > delta2)
1510 		return 1;
1511 	else
1512 		return 0;
1513 }
1514 
1515 /*
1516  * Convenience function to invoke type-specific subtype_diff function.
1517  * Caller must have already checked that there is one for the range type.
1518  */
1519 static float8
1520 call_subtype_diff(TypeCacheEntry *typcache, Datum val1, Datum val2)
1521 {
1522 	float8		value;
1523 
1524 	value = DatumGetFloat8(FunctionCall2Coll(&typcache->rng_subdiff_finfo,
1525 											 typcache->rng_collation,
1526 											 val1, val2));
1527 	/* Cope with buggy subtype_diff function by returning zero */
1528 	if (value >= 0.0)
1529 		return value;
1530 	return 0.0;
1531 }
1532