1 /*-------------------------------------------------------------------------
2  *
3  * rangetypes_gist.c
4  *	  GiST support for range types.
5  *
6  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/utils/adt/rangetypes_gist.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/gist.h"
18 #include "access/stratnum.h"
19 #include "utils/builtins.h"
20 #include "utils/datum.h"
21 #include "utils/rangetypes.h"
22 
23 
24 /*
25  * Range class properties used to segregate different classes of ranges in
26  * GiST.  Each unique combination of properties is a class.  CLS_EMPTY cannot
27  * be combined with anything else.
28  */
29 #define CLS_NORMAL			0	/* Ordinary finite range (no bits set) */
30 #define CLS_LOWER_INF		1	/* Lower bound is infinity */
31 #define CLS_UPPER_INF		2	/* Upper bound is infinity */
32 #define CLS_CONTAIN_EMPTY	4	/* Contains underlying empty ranges */
33 #define CLS_EMPTY			8	/* Special class for empty ranges */
34 
35 #define CLS_COUNT			9	/* # of classes; includes all combinations of
36 								 * properties. CLS_EMPTY doesn't combine with
37 								 * anything else, so it's only 2^3 + 1. */
38 
39 /*
40  * Minimum accepted ratio of split for items of the same class.  If the items
41  * are of different classes, we will separate along those lines regardless of
42  * the ratio.
43  */
44 #define LIMIT_RATIO  0.3
45 
46 /* Constants for fixed penalty values */
47 #define INFINITE_BOUND_PENALTY	2.0
48 #define CONTAIN_EMPTY_PENALTY  1.0
49 #define DEFAULT_SUBTYPE_DIFF_PENALTY  1.0
50 
51 /*
52  * Per-item data for range_gist_single_sorting_split.
53  */
54 typedef struct
55 {
56 	int			index;
57 	RangeBound	bound;
58 } SingleBoundSortItem;
59 
60 /* place on left or right side of split? */
61 typedef enum
62 {
63 	SPLIT_LEFT = 0,				/* makes initialization to SPLIT_LEFT easier */
64 	SPLIT_RIGHT
65 } SplitLR;
66 
67 /*
68  * Context for range_gist_consider_split.
69  */
70 typedef struct
71 {
72 	TypeCacheEntry *typcache;	/* typcache for range type */
73 	bool		has_subtype_diff;		/* does it have subtype_diff? */
74 	int			entries_count;	/* total number of entries being split */
75 
76 	/* Information about currently selected split follows */
77 
78 	bool		first;			/* true if no split was selected yet */
79 
80 	RangeBound *left_upper;		/* upper bound of left interval */
81 	RangeBound *right_lower;	/* lower bound of right interval */
82 
83 	float4		ratio;			/* split ratio */
84 	float4		overlap;		/* overlap between left and right predicate */
85 	int			common_left;	/* # common entries destined for each side */
86 	int			common_right;
87 } ConsiderSplitContext;
88 
89 /*
90  * Bounds extracted from a non-empty range, for use in
91  * range_gist_double_sorting_split.
92  */
93 typedef struct
94 {
95 	RangeBound	lower;
96 	RangeBound	upper;
97 } NonEmptyRange;
98 
99 /*
100  * Represents information about an entry that can be placed in either group
101  * without affecting overlap over selected axis ("common entry").
102  */
103 typedef struct
104 {
105 	/* Index of entry in the initial array */
106 	int			index;
107 	/* Delta between closeness of range to each of the two groups */
108 	double		delta;
109 } CommonEntry;
110 
111 /* Helper macros to place an entry in the left or right group during split */
112 /* Note direct access to variables v, typcache, left_range, right_range */
113 #define PLACE_LEFT(range, off)					\
114 	do {										\
115 		if (v->spl_nleft > 0)					\
116 			left_range = range_super_union(typcache, left_range, range); \
117 		else									\
118 			left_range = (range);				\
119 		v->spl_left[v->spl_nleft++] = (off);	\
120 	} while(0)
121 
122 #define PLACE_RIGHT(range, off)					\
123 	do {										\
124 		if (v->spl_nright > 0)					\
125 			right_range = range_super_union(typcache, right_range, range); \
126 		else									\
127 			right_range = (range);				\
128 		v->spl_right[v->spl_nright++] = (off);	\
129 	} while(0)
130 
131 /* Copy a RangeType datum (hardwires typbyval and typlen for ranges...) */
132 #define rangeCopy(r) \
133 	((RangeType *) DatumGetPointer(datumCopy(PointerGetDatum(r), \
134 											 false, -1)))
135 
136 static RangeType *range_super_union(TypeCacheEntry *typcache, RangeType *r1,
137 				  RangeType *r2);
138 static bool range_gist_consistent_int(TypeCacheEntry *typcache,
139 						  StrategyNumber strategy, RangeType *key,
140 						  Datum query);
141 static bool range_gist_consistent_leaf(TypeCacheEntry *typcache,
142 						   StrategyNumber strategy, RangeType *key,
143 						   Datum query);
144 static void range_gist_fallback_split(TypeCacheEntry *typcache,
145 						  GistEntryVector *entryvec,
146 						  GIST_SPLITVEC *v);
147 static void range_gist_class_split(TypeCacheEntry *typcache,
148 					   GistEntryVector *entryvec,
149 					   GIST_SPLITVEC *v,
150 					   SplitLR *classes_groups);
151 static void range_gist_single_sorting_split(TypeCacheEntry *typcache,
152 								GistEntryVector *entryvec,
153 								GIST_SPLITVEC *v,
154 								bool use_upper_bound);
155 static void range_gist_double_sorting_split(TypeCacheEntry *typcache,
156 								GistEntryVector *entryvec,
157 								GIST_SPLITVEC *v);
158 static void range_gist_consider_split(ConsiderSplitContext *context,
159 						  RangeBound *right_lower, int min_left_count,
160 						  RangeBound *left_upper, int max_left_count);
161 static int	get_gist_range_class(RangeType *range);
162 static int	single_bound_cmp(const void *a, const void *b, void *arg);
163 static int	interval_cmp_lower(const void *a, const void *b, void *arg);
164 static int	interval_cmp_upper(const void *a, const void *b, void *arg);
165 static int	common_entry_cmp(const void *i1, const void *i2);
166 static float8 call_subtype_diff(TypeCacheEntry *typcache,
167 				  Datum val1, Datum val2);
168 
169 
170 /* GiST query consistency check */
171 Datum
range_gist_consistent(PG_FUNCTION_ARGS)172 range_gist_consistent(PG_FUNCTION_ARGS)
173 {
174 	GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
175 	Datum		query = PG_GETARG_DATUM(1);
176 	StrategyNumber strategy = (StrategyNumber) PG_GETARG_UINT16(2);
177 
178 	/* Oid subtype = PG_GETARG_OID(3); */
179 	bool	   *recheck = (bool *) PG_GETARG_POINTER(4);
180 	RangeType  *key = DatumGetRangeType(entry->key);
181 	TypeCacheEntry *typcache;
182 
183 	/* All operators served by this function are exact */
184 	*recheck = false;
185 
186 	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(key));
187 
188 	if (GIST_LEAF(entry))
189 		PG_RETURN_BOOL(range_gist_consistent_leaf(typcache, strategy,
190 												  key, query));
191 	else
192 		PG_RETURN_BOOL(range_gist_consistent_int(typcache, strategy,
193 												 key, query));
194 }
195 
196 /* form union range */
197 Datum
range_gist_union(PG_FUNCTION_ARGS)198 range_gist_union(PG_FUNCTION_ARGS)
199 {
200 	GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
201 	GISTENTRY  *ent = entryvec->vector;
202 	RangeType  *result_range;
203 	TypeCacheEntry *typcache;
204 	int			i;
205 
206 	result_range = DatumGetRangeType(ent[0].key);
207 
208 	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(result_range));
209 
210 	for (i = 1; i < entryvec->n; i++)
211 	{
212 		result_range = range_super_union(typcache, result_range,
213 										 DatumGetRangeType(ent[i].key));
214 	}
215 
216 	PG_RETURN_RANGE(result_range);
217 }
218 
219 /* compress, decompress, fetch are no-ops */
220 Datum
range_gist_compress(PG_FUNCTION_ARGS)221 range_gist_compress(PG_FUNCTION_ARGS)
222 {
223 	GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
224 
225 	PG_RETURN_POINTER(entry);
226 }
227 
228 Datum
range_gist_decompress(PG_FUNCTION_ARGS)229 range_gist_decompress(PG_FUNCTION_ARGS)
230 {
231 	GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
232 
233 	PG_RETURN_POINTER(entry);
234 }
235 
236 Datum
range_gist_fetch(PG_FUNCTION_ARGS)237 range_gist_fetch(PG_FUNCTION_ARGS)
238 {
239 	GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
240 
241 	PG_RETURN_POINTER(entry);
242 }
243 
244 /*
245  * GiST page split penalty function.
246  *
247  * The penalty function has the following goals (in order from most to least
248  * important):
249  * - Keep normal ranges separate
250  * - Avoid broadening the class of the original predicate
251  * - Avoid broadening (as determined by subtype_diff) the original predicate
252  * - Favor adding ranges to narrower original predicates
253  */
254 Datum
range_gist_penalty(PG_FUNCTION_ARGS)255 range_gist_penalty(PG_FUNCTION_ARGS)
256 {
257 	GISTENTRY  *origentry = (GISTENTRY *) PG_GETARG_POINTER(0);
258 	GISTENTRY  *newentry = (GISTENTRY *) PG_GETARG_POINTER(1);
259 	float	   *penalty = (float *) PG_GETARG_POINTER(2);
260 	RangeType  *orig = DatumGetRangeType(origentry->key);
261 	RangeType  *new = DatumGetRangeType(newentry->key);
262 	TypeCacheEntry *typcache;
263 	bool		has_subtype_diff;
264 	RangeBound	orig_lower,
265 				new_lower,
266 				orig_upper,
267 				new_upper;
268 	bool		orig_empty,
269 				new_empty;
270 
271 	if (RangeTypeGetOid(orig) != RangeTypeGetOid(new))
272 		elog(ERROR, "range types do not match");
273 
274 	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(orig));
275 
276 	has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid);
277 
278 	range_deserialize(typcache, orig, &orig_lower, &orig_upper, &orig_empty);
279 	range_deserialize(typcache, new, &new_lower, &new_upper, &new_empty);
280 
281 	/*
282 	 * Distinct branches for handling distinct classes of ranges.  Note that
283 	 * penalty values only need to be commensurate within the same class of
284 	 * new range.
285 	 */
286 	if (new_empty)
287 	{
288 		/* Handle insertion of empty range */
289 		if (orig_empty)
290 		{
291 			/*
292 			 * The best case is to insert it to empty original range.
293 			 * Insertion here means no broadening of original range. Also
294 			 * original range is the most narrow.
295 			 */
296 			*penalty = 0.0;
297 		}
298 		else if (RangeIsOrContainsEmpty(orig))
299 		{
300 			/*
301 			 * The second case is to insert empty range into range which
302 			 * contains at least one underlying empty range.  There is still
303 			 * no broadening of original range, but original range is not as
304 			 * narrow as possible.
305 			 */
306 			*penalty = CONTAIN_EMPTY_PENALTY;
307 		}
308 		else if (orig_lower.infinite && orig_upper.infinite)
309 		{
310 			/*
311 			 * Original range requires broadening.  (-inf; +inf) is most far
312 			 * from normal range in this case.
313 			 */
314 			*penalty = 2 * CONTAIN_EMPTY_PENALTY;
315 		}
316 		else if (orig_lower.infinite || orig_upper.infinite)
317 		{
318 			/*
319 			 * (-inf, x) or (x, +inf) original ranges are closer to normal
320 			 * ranges, so it's worse to mix it with empty ranges.
321 			 */
322 			*penalty = 3 * CONTAIN_EMPTY_PENALTY;
323 		}
324 		else
325 		{
326 			/*
327 			 * The least preferred case is broadening of normal range.
328 			 */
329 			*penalty = 4 * CONTAIN_EMPTY_PENALTY;
330 		}
331 	}
332 	else if (new_lower.infinite && new_upper.infinite)
333 	{
334 		/* Handle insertion of (-inf, +inf) range */
335 		if (orig_lower.infinite && orig_upper.infinite)
336 		{
337 			/*
338 			 * Best case is inserting to (-inf, +inf) original range.
339 			 */
340 			*penalty = 0.0;
341 		}
342 		else if (orig_lower.infinite || orig_upper.infinite)
343 		{
344 			/*
345 			 * When original range is (-inf, x) or (x, +inf) it requires
346 			 * broadening of original range (extension of one bound to
347 			 * infinity).
348 			 */
349 			*penalty = INFINITE_BOUND_PENALTY;
350 		}
351 		else
352 		{
353 			/*
354 			 * Insertion to normal original range is least preferred.
355 			 */
356 			*penalty = 2 * INFINITE_BOUND_PENALTY;
357 		}
358 
359 		if (RangeIsOrContainsEmpty(orig))
360 		{
361 			/*
362 			 * Original range is narrower when it doesn't contain empty
363 			 * ranges. Add additional penalty otherwise.
364 			 */
365 			*penalty += CONTAIN_EMPTY_PENALTY;
366 		}
367 	}
368 	else if (new_lower.infinite)
369 	{
370 		/* Handle insertion of (-inf, x) range */
371 		if (!orig_empty && orig_lower.infinite)
372 		{
373 			if (orig_upper.infinite)
374 			{
375 				/*
376 				 * (-inf, +inf) range won't be extended by insertion of (-inf,
377 				 * x) range. It's a less desirable case than insertion to
378 				 * (-inf, y) original range without extension, because in that
379 				 * case original range is narrower. But we can't express that
380 				 * in single float value.
381 				 */
382 				*penalty = 0.0;
383 			}
384 			else
385 			{
386 				if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0)
387 				{
388 					/*
389 					 * Get extension of original range using subtype_diff. Use
390 					 * constant if subtype_diff unavailable.
391 					 */
392 					if (has_subtype_diff)
393 						*penalty = call_subtype_diff(typcache,
394 													 new_upper.val,
395 													 orig_upper.val);
396 					else
397 						*penalty = DEFAULT_SUBTYPE_DIFF_PENALTY;
398 				}
399 				else
400 				{
401 					/* No extension of original range */
402 					*penalty = 0.0;
403 				}
404 			}
405 		}
406 		else
407 		{
408 			/*
409 			 * If lower bound of original range is not -inf, then extension of
410 			 * it is infinity.
411 			 */
412 			*penalty = get_float4_infinity();
413 		}
414 	}
415 	else if (new_upper.infinite)
416 	{
417 		/* Handle insertion of (x, +inf) range */
418 		if (!orig_empty && orig_upper.infinite)
419 		{
420 			if (orig_lower.infinite)
421 			{
422 				/*
423 				 * (-inf, +inf) range won't be extended by insertion of (x,
424 				 * +inf) range. It's a less desirable case than insertion to
425 				 * (y, +inf) original range without extension, because in that
426 				 * case original range is narrower. But we can't express that
427 				 * in single float value.
428 				 */
429 				*penalty = 0.0;
430 			}
431 			else
432 			{
433 				if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0)
434 				{
435 					/*
436 					 * Get extension of original range using subtype_diff. Use
437 					 * constant if subtype_diff unavailable.
438 					 */
439 					if (has_subtype_diff)
440 						*penalty = call_subtype_diff(typcache,
441 													 orig_lower.val,
442 													 new_lower.val);
443 					else
444 						*penalty = DEFAULT_SUBTYPE_DIFF_PENALTY;
445 				}
446 				else
447 				{
448 					/* No extension of original range */
449 					*penalty = 0.0;
450 				}
451 			}
452 		}
453 		else
454 		{
455 			/*
456 			 * If upper bound of original range is not +inf, then extension of
457 			 * it is infinity.
458 			 */
459 			*penalty = get_float4_infinity();
460 		}
461 	}
462 	else
463 	{
464 		/* Handle insertion of normal (non-empty, non-infinite) range */
465 		if (orig_empty || orig_lower.infinite || orig_upper.infinite)
466 		{
467 			/*
468 			 * Avoid mixing normal ranges with infinite and empty ranges.
469 			 */
470 			*penalty = get_float4_infinity();
471 		}
472 		else
473 		{
474 			/*
475 			 * Calculate extension of original range by calling subtype_diff.
476 			 * Use constant if subtype_diff unavailable.
477 			 */
478 			float8		diff = 0.0;
479 
480 			if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0)
481 			{
482 				if (has_subtype_diff)
483 					diff += call_subtype_diff(typcache,
484 											  orig_lower.val,
485 											  new_lower.val);
486 				else
487 					diff += DEFAULT_SUBTYPE_DIFF_PENALTY;
488 			}
489 			if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0)
490 			{
491 				if (has_subtype_diff)
492 					diff += call_subtype_diff(typcache,
493 											  new_upper.val,
494 											  orig_upper.val);
495 				else
496 					diff += DEFAULT_SUBTYPE_DIFF_PENALTY;
497 			}
498 			*penalty = diff;
499 		}
500 	}
501 
502 	PG_RETURN_POINTER(penalty);
503 }
504 
505 /*
506  * The GiST PickSplit method for ranges
507  *
508  * Primarily, we try to segregate ranges of different classes.  If splitting
509  * ranges of the same class, use the appropriate split method for that class.
510  */
511 Datum
range_gist_picksplit(PG_FUNCTION_ARGS)512 range_gist_picksplit(PG_FUNCTION_ARGS)
513 {
514 	GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
515 	GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
516 	TypeCacheEntry *typcache;
517 	OffsetNumber i;
518 	RangeType  *pred_left;
519 	int			nbytes;
520 	OffsetNumber maxoff;
521 	int			count_in_classes[CLS_COUNT];
522 	int			j;
523 	int			non_empty_classes_count = 0;
524 	int			biggest_class = -1;
525 	int			biggest_class_count = 0;
526 	int			total_count;
527 
528 	/* use first item to look up range type's info */
529 	pred_left = DatumGetRangeType(entryvec->vector[FirstOffsetNumber].key);
530 	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(pred_left));
531 
532 	maxoff = entryvec->n - 1;
533 	nbytes = (maxoff + 1) * sizeof(OffsetNumber);
534 	v->spl_left = (OffsetNumber *) palloc(nbytes);
535 	v->spl_right = (OffsetNumber *) palloc(nbytes);
536 
537 	/*
538 	 * Get count distribution of range classes.
539 	 */
540 	memset(count_in_classes, 0, sizeof(count_in_classes));
541 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
542 	{
543 		RangeType  *range = DatumGetRangeType(entryvec->vector[i].key);
544 
545 		count_in_classes[get_gist_range_class(range)]++;
546 	}
547 
548 	/*
549 	 * Count non-empty classes and find biggest class.
550 	 */
551 	total_count = maxoff;
552 	for (j = 0; j < CLS_COUNT; j++)
553 	{
554 		if (count_in_classes[j] > 0)
555 		{
556 			if (count_in_classes[j] > biggest_class_count)
557 			{
558 				biggest_class_count = count_in_classes[j];
559 				biggest_class = j;
560 			}
561 			non_empty_classes_count++;
562 		}
563 	}
564 
565 	Assert(non_empty_classes_count > 0);
566 
567 	if (non_empty_classes_count == 1)
568 	{
569 		/* One non-empty class, so split inside class */
570 		if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_NORMAL)
571 		{
572 			/* double sorting split for normal ranges */
573 			range_gist_double_sorting_split(typcache, entryvec, v);
574 		}
575 		else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_LOWER_INF)
576 		{
577 			/* upper bound sorting split for (-inf, x) ranges */
578 			range_gist_single_sorting_split(typcache, entryvec, v, true);
579 		}
580 		else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_UPPER_INF)
581 		{
582 			/* lower bound sorting split for (x, +inf) ranges */
583 			range_gist_single_sorting_split(typcache, entryvec, v, false);
584 		}
585 		else
586 		{
587 			/* trivial split for all (-inf, +inf) or all empty ranges */
588 			range_gist_fallback_split(typcache, entryvec, v);
589 		}
590 	}
591 	else
592 	{
593 		/*
594 		 * Class based split.
595 		 *
596 		 * To which side of the split should each class go?  Initialize them
597 		 * all to go to the left side.
598 		 */
599 		SplitLR		classes_groups[CLS_COUNT];
600 
601 		memset(classes_groups, 0, sizeof(classes_groups));
602 
603 		if (count_in_classes[CLS_NORMAL] > 0)
604 		{
605 			/* separate normal ranges if any */
606 			classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
607 		}
608 		else
609 		{
610 			/*----------
611 			 * Try to split classes in one of two ways:
612 			 *	1) containing infinities - not containing infinities
613 			 *	2) containing empty - not containing empty
614 			 *
615 			 * Select the way which balances the ranges between left and right
616 			 * the best. If split in these ways is not possible, there are at
617 			 * most 3 classes, so just separate biggest class.
618 			 *----------
619 			 */
620 			int			infCount,
621 						nonInfCount;
622 			int			emptyCount,
623 						nonEmptyCount;
624 
625 			nonInfCount =
626 				count_in_classes[CLS_NORMAL] +
627 				count_in_classes[CLS_CONTAIN_EMPTY] +
628 				count_in_classes[CLS_EMPTY];
629 			infCount = total_count - nonInfCount;
630 
631 			nonEmptyCount =
632 				count_in_classes[CLS_NORMAL] +
633 				count_in_classes[CLS_LOWER_INF] +
634 				count_in_classes[CLS_UPPER_INF] +
635 				count_in_classes[CLS_LOWER_INF | CLS_UPPER_INF];
636 			emptyCount = total_count - nonEmptyCount;
637 
638 			if (infCount > 0 && nonInfCount > 0 &&
639 				(Abs(infCount - nonInfCount) <=
640 				 Abs(emptyCount - nonEmptyCount)))
641 			{
642 				classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
643 				classes_groups[CLS_CONTAIN_EMPTY] = SPLIT_RIGHT;
644 				classes_groups[CLS_EMPTY] = SPLIT_RIGHT;
645 			}
646 			else if (emptyCount > 0 && nonEmptyCount > 0)
647 			{
648 				classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
649 				classes_groups[CLS_LOWER_INF] = SPLIT_RIGHT;
650 				classes_groups[CLS_UPPER_INF] = SPLIT_RIGHT;
651 				classes_groups[CLS_LOWER_INF | CLS_UPPER_INF] = SPLIT_RIGHT;
652 			}
653 			else
654 			{
655 				/*
656 				 * Either total_count == emptyCount or total_count ==
657 				 * infCount.
658 				 */
659 				classes_groups[biggest_class] = SPLIT_RIGHT;
660 			}
661 		}
662 
663 		range_gist_class_split(typcache, entryvec, v, classes_groups);
664 	}
665 
666 	PG_RETURN_POINTER(v);
667 }
668 
669 /* equality comparator for GiST */
670 Datum
range_gist_same(PG_FUNCTION_ARGS)671 range_gist_same(PG_FUNCTION_ARGS)
672 {
673 	RangeType  *r1 = PG_GETARG_RANGE(0);
674 	RangeType  *r2 = PG_GETARG_RANGE(1);
675 	bool	   *result = (bool *) PG_GETARG_POINTER(2);
676 
677 	/*
678 	 * range_eq will ignore the RANGE_CONTAIN_EMPTY flag, so we have to check
679 	 * that for ourselves.  More generally, if the entries have been properly
680 	 * normalized, then unequal flags bytes must mean unequal ranges ... so
681 	 * let's just test all the flag bits at once.
682 	 */
683 	if (range_get_flags(r1) != range_get_flags(r2))
684 		*result = false;
685 	else
686 	{
687 		TypeCacheEntry *typcache;
688 
689 		typcache = range_get_typcache(fcinfo, RangeTypeGetOid(r1));
690 
691 		*result = range_eq_internal(typcache, r1, r2);
692 	}
693 
694 	PG_RETURN_POINTER(result);
695 }
696 
697 /*
698  *----------------------------------------------------------
699  * STATIC FUNCTIONS
700  *----------------------------------------------------------
701  */
702 
703 /*
704  * Return the smallest range that contains r1 and r2
705  *
706  * This differs from regular range_union in two critical ways:
707  * 1. It won't throw an error for non-adjacent r1 and r2, but just absorb
708  * the intervening values into the result range.
709  * 2. We track whether any empty range has been union'd into the result,
710  * so that contained_by searches can be indexed.  Note that this means
711  * that *all* unions formed within the GiST index must go through here.
712  */
713 static RangeType *
range_super_union(TypeCacheEntry * typcache,RangeType * r1,RangeType * r2)714 range_super_union(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
715 {
716 	RangeType  *result;
717 	RangeBound	lower1,
718 				lower2;
719 	RangeBound	upper1,
720 				upper2;
721 	bool		empty1,
722 				empty2;
723 	char		flags1,
724 				flags2;
725 	RangeBound *result_lower;
726 	RangeBound *result_upper;
727 
728 	range_deserialize(typcache, r1, &lower1, &upper1, &empty1);
729 	range_deserialize(typcache, r2, &lower2, &upper2, &empty2);
730 	flags1 = range_get_flags(r1);
731 	flags2 = range_get_flags(r2);
732 
733 	if (empty1)
734 	{
735 		/* We can return r2 as-is if it already is or contains empty */
736 		if (flags2 & (RANGE_EMPTY | RANGE_CONTAIN_EMPTY))
737 			return r2;
738 		/* Else we'd better copy it (modify-in-place isn't safe) */
739 		r2 = rangeCopy(r2);
740 		range_set_contain_empty(r2);
741 		return r2;
742 	}
743 	if (empty2)
744 	{
745 		/* We can return r1 as-is if it already is or contains empty */
746 		if (flags1 & (RANGE_EMPTY | RANGE_CONTAIN_EMPTY))
747 			return r1;
748 		/* Else we'd better copy it (modify-in-place isn't safe) */
749 		r1 = rangeCopy(r1);
750 		range_set_contain_empty(r1);
751 		return r1;
752 	}
753 
754 	if (range_cmp_bounds(typcache, &lower1, &lower2) <= 0)
755 		result_lower = &lower1;
756 	else
757 		result_lower = &lower2;
758 
759 	if (range_cmp_bounds(typcache, &upper1, &upper2) >= 0)
760 		result_upper = &upper1;
761 	else
762 		result_upper = &upper2;
763 
764 	/* optimization to avoid constructing a new range */
765 	if (result_lower == &lower1 && result_upper == &upper1 &&
766 		((flags1 & RANGE_CONTAIN_EMPTY) || !(flags2 & RANGE_CONTAIN_EMPTY)))
767 		return r1;
768 	if (result_lower == &lower2 && result_upper == &upper2 &&
769 		((flags2 & RANGE_CONTAIN_EMPTY) || !(flags1 & RANGE_CONTAIN_EMPTY)))
770 		return r2;
771 
772 	result = make_range(typcache, result_lower, result_upper, false);
773 
774 	if ((flags1 & RANGE_CONTAIN_EMPTY) || (flags2 & RANGE_CONTAIN_EMPTY))
775 		range_set_contain_empty(result);
776 
777 	return result;
778 }
779 
780 /*
781  * GiST consistent test on an index internal page
782  */
783 static bool
range_gist_consistent_int(TypeCacheEntry * typcache,StrategyNumber strategy,RangeType * key,Datum query)784 range_gist_consistent_int(TypeCacheEntry *typcache, StrategyNumber strategy,
785 						  RangeType *key, Datum query)
786 {
787 	switch (strategy)
788 	{
789 		case RANGESTRAT_BEFORE:
790 			if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeType(query)))
791 				return false;
792 			return (!range_overright_internal(typcache, key,
793 											  DatumGetRangeType(query)));
794 		case RANGESTRAT_OVERLEFT:
795 			if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeType(query)))
796 				return false;
797 			return (!range_after_internal(typcache, key,
798 										  DatumGetRangeType(query)));
799 		case RANGESTRAT_OVERLAPS:
800 			return range_overlaps_internal(typcache, key,
801 										   DatumGetRangeType(query));
802 		case RANGESTRAT_OVERRIGHT:
803 			if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeType(query)))
804 				return false;
805 			return (!range_before_internal(typcache, key,
806 										   DatumGetRangeType(query)));
807 		case RANGESTRAT_AFTER:
808 			if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeType(query)))
809 				return false;
810 			return (!range_overleft_internal(typcache, key,
811 											 DatumGetRangeType(query)));
812 		case RANGESTRAT_ADJACENT:
813 			if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeType(query)))
814 				return false;
815 			if (range_adjacent_internal(typcache, key,
816 										DatumGetRangeType(query)))
817 				return true;
818 			return range_overlaps_internal(typcache, key,
819 										   DatumGetRangeType(query));
820 		case RANGESTRAT_CONTAINS:
821 			return range_contains_internal(typcache, key,
822 										   DatumGetRangeType(query));
823 		case RANGESTRAT_CONTAINED_BY:
824 
825 			/*
826 			 * Empty ranges are contained by anything, so if key is or
827 			 * contains any empty ranges, we must descend into it.  Otherwise,
828 			 * descend only if key overlaps the query.
829 			 */
830 			if (RangeIsOrContainsEmpty(key))
831 				return true;
832 			return range_overlaps_internal(typcache, key,
833 										   DatumGetRangeType(query));
834 		case RANGESTRAT_CONTAINS_ELEM:
835 			return range_contains_elem_internal(typcache, key, query);
836 		case RANGESTRAT_EQ:
837 
838 			/*
839 			 * If query is empty, descend only if the key is or contains any
840 			 * empty ranges.  Otherwise, descend if key contains query.
841 			 */
842 			if (RangeIsEmpty(DatumGetRangeType(query)))
843 				return RangeIsOrContainsEmpty(key);
844 			return range_contains_internal(typcache, key,
845 										   DatumGetRangeType(query));
846 		default:
847 			elog(ERROR, "unrecognized range strategy: %d", strategy);
848 			return false;		/* keep compiler quiet */
849 	}
850 }
851 
852 /*
853  * GiST consistent test on an index leaf page
854  */
855 static bool
range_gist_consistent_leaf(TypeCacheEntry * typcache,StrategyNumber strategy,RangeType * key,Datum query)856 range_gist_consistent_leaf(TypeCacheEntry *typcache, StrategyNumber strategy,
857 						   RangeType *key, Datum query)
858 {
859 	switch (strategy)
860 	{
861 		case RANGESTRAT_BEFORE:
862 			return range_before_internal(typcache, key,
863 										 DatumGetRangeType(query));
864 		case RANGESTRAT_OVERLEFT:
865 			return range_overleft_internal(typcache, key,
866 										   DatumGetRangeType(query));
867 		case RANGESTRAT_OVERLAPS:
868 			return range_overlaps_internal(typcache, key,
869 										   DatumGetRangeType(query));
870 		case RANGESTRAT_OVERRIGHT:
871 			return range_overright_internal(typcache, key,
872 											DatumGetRangeType(query));
873 		case RANGESTRAT_AFTER:
874 			return range_after_internal(typcache, key,
875 										DatumGetRangeType(query));
876 		case RANGESTRAT_ADJACENT:
877 			return range_adjacent_internal(typcache, key,
878 										   DatumGetRangeType(query));
879 		case RANGESTRAT_CONTAINS:
880 			return range_contains_internal(typcache, key,
881 										   DatumGetRangeType(query));
882 		case RANGESTRAT_CONTAINED_BY:
883 			return range_contained_by_internal(typcache, key,
884 											   DatumGetRangeType(query));
885 		case RANGESTRAT_CONTAINS_ELEM:
886 			return range_contains_elem_internal(typcache, key, query);
887 		case RANGESTRAT_EQ:
888 			return range_eq_internal(typcache, key, DatumGetRangeType(query));
889 		default:
890 			elog(ERROR, "unrecognized range strategy: %d", strategy);
891 			return false;		/* keep compiler quiet */
892 	}
893 }
894 
895 /*
896  * Trivial split: half of entries will be placed on one page
897  * and the other half on the other page.
898  */
899 static void
range_gist_fallback_split(TypeCacheEntry * typcache,GistEntryVector * entryvec,GIST_SPLITVEC * v)900 range_gist_fallback_split(TypeCacheEntry *typcache,
901 						  GistEntryVector *entryvec,
902 						  GIST_SPLITVEC *v)
903 {
904 	RangeType  *left_range = NULL;
905 	RangeType  *right_range = NULL;
906 	OffsetNumber i,
907 				maxoff,
908 				split_idx;
909 
910 	maxoff = entryvec->n - 1;
911 	/* Split entries before this to left page, after to right: */
912 	split_idx = (maxoff - FirstOffsetNumber) / 2 + FirstOffsetNumber;
913 
914 	v->spl_nleft = 0;
915 	v->spl_nright = 0;
916 	for (i = FirstOffsetNumber; i <= maxoff; i++)
917 	{
918 		RangeType  *range = DatumGetRangeType(entryvec->vector[i].key);
919 
920 		if (i < split_idx)
921 			PLACE_LEFT(range, i);
922 		else
923 			PLACE_RIGHT(range, i);
924 	}
925 
926 	v->spl_ldatum = RangeTypeGetDatum(left_range);
927 	v->spl_rdatum = RangeTypeGetDatum(right_range);
928 }
929 
930 /*
931  * Split based on classes of ranges.
932  *
933  * See get_gist_range_class for class definitions.
934  * classes_groups is an array of length CLS_COUNT indicating the side of the
935  * split to which each class should go.
936  */
937 static void
range_gist_class_split(TypeCacheEntry * typcache,GistEntryVector * entryvec,GIST_SPLITVEC * v,SplitLR * classes_groups)938 range_gist_class_split(TypeCacheEntry *typcache,
939 					   GistEntryVector *entryvec,
940 					   GIST_SPLITVEC *v,
941 					   SplitLR *classes_groups)
942 {
943 	RangeType  *left_range = NULL;
944 	RangeType  *right_range = NULL;
945 	OffsetNumber i,
946 				maxoff;
947 
948 	maxoff = entryvec->n - 1;
949 
950 	v->spl_nleft = 0;
951 	v->spl_nright = 0;
952 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
953 	{
954 		RangeType  *range = DatumGetRangeType(entryvec->vector[i].key);
955 		int			class;
956 
957 		/* Get class of range */
958 		class = get_gist_range_class(range);
959 
960 		/* Place range to appropriate page */
961 		if (classes_groups[class] == SPLIT_LEFT)
962 			PLACE_LEFT(range, i);
963 		else
964 		{
965 			Assert(classes_groups[class] == SPLIT_RIGHT);
966 			PLACE_RIGHT(range, i);
967 		}
968 	}
969 
970 	v->spl_ldatum = RangeTypeGetDatum(left_range);
971 	v->spl_rdatum = RangeTypeGetDatum(right_range);
972 }
973 
974 /*
975  * Sorting based split. First half of entries according to the sort will be
976  * placed to one page, and second half of entries will be placed to other
977  * page. use_upper_bound parameter indicates whether to use upper or lower
978  * bound for sorting.
979  */
980 static void
range_gist_single_sorting_split(TypeCacheEntry * typcache,GistEntryVector * entryvec,GIST_SPLITVEC * v,bool use_upper_bound)981 range_gist_single_sorting_split(TypeCacheEntry *typcache,
982 								GistEntryVector *entryvec,
983 								GIST_SPLITVEC *v,
984 								bool use_upper_bound)
985 {
986 	SingleBoundSortItem *sortItems;
987 	RangeType  *left_range = NULL;
988 	RangeType  *right_range = NULL;
989 	OffsetNumber i,
990 				maxoff,
991 				split_idx;
992 
993 	maxoff = entryvec->n - 1;
994 
995 	sortItems = (SingleBoundSortItem *)
996 		palloc(maxoff * sizeof(SingleBoundSortItem));
997 
998 	/*
999 	 * Prepare auxiliary array and sort the values.
1000 	 */
1001 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
1002 	{
1003 		RangeType  *range = DatumGetRangeType(entryvec->vector[i].key);
1004 		RangeBound	bound2;
1005 		bool		empty;
1006 
1007 		sortItems[i - 1].index = i;
1008 		/* Put appropriate bound into array */
1009 		if (use_upper_bound)
1010 			range_deserialize(typcache, range, &bound2,
1011 							  &sortItems[i - 1].bound, &empty);
1012 		else
1013 			range_deserialize(typcache, range, &sortItems[i - 1].bound,
1014 							  &bound2, &empty);
1015 		Assert(!empty);
1016 	}
1017 
1018 	qsort_arg(sortItems, maxoff, sizeof(SingleBoundSortItem),
1019 			  single_bound_cmp, typcache);
1020 
1021 	split_idx = maxoff / 2;
1022 
1023 	v->spl_nleft = 0;
1024 	v->spl_nright = 0;
1025 
1026 	for (i = 0; i < maxoff; i++)
1027 	{
1028 		int			idx = sortItems[i].index;
1029 		RangeType  *range = DatumGetRangeType(entryvec->vector[idx].key);
1030 
1031 		if (i < split_idx)
1032 			PLACE_LEFT(range, idx);
1033 		else
1034 			PLACE_RIGHT(range, idx);
1035 	}
1036 
1037 	v->spl_ldatum = RangeTypeGetDatum(left_range);
1038 	v->spl_rdatum = RangeTypeGetDatum(right_range);
1039 }
1040 
1041 /*
1042  * Double sorting split algorithm.
1043  *
1044  * The algorithm considers dividing ranges into two groups. The first (left)
1045  * group contains general left bound. The second (right) group contains
1046  * general right bound. The challenge is to find upper bound of left group
1047  * and lower bound of right group so that overlap of groups is minimal and
1048  * ratio of distribution is acceptable. Algorithm finds for each lower bound of
1049  * right group minimal upper bound of left group, and for each upper bound of
1050  * left group maximal lower bound of right group. For each found pair
1051  * range_gist_consider_split considers replacement of currently selected
1052  * split with the new one.
1053  *
1054  * After that, all the entries are divided into three groups:
1055  * 1) Entries which should be placed to the left group
1056  * 2) Entries which should be placed to the right group
1057  * 3) "Common entries" which can be placed to either group without affecting
1058  *	  amount of overlap.
1059  *
1060  * The common ranges are distributed by difference of distance from lower
1061  * bound of common range to lower bound of right group and distance from upper
1062  * bound of common range to upper bound of left group.
1063  *
1064  * For details see:
1065  * "A new double sorting-based node splitting algorithm for R-tree",
1066  * A. Korotkov
1067  * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36
1068  */
1069 static void
range_gist_double_sorting_split(TypeCacheEntry * typcache,GistEntryVector * entryvec,GIST_SPLITVEC * v)1070 range_gist_double_sorting_split(TypeCacheEntry *typcache,
1071 								GistEntryVector *entryvec,
1072 								GIST_SPLITVEC *v)
1073 {
1074 	ConsiderSplitContext context;
1075 	OffsetNumber i,
1076 				maxoff;
1077 	RangeType  *range,
1078 			   *left_range = NULL,
1079 			   *right_range = NULL;
1080 	int			common_entries_count;
1081 	NonEmptyRange *by_lower,
1082 			   *by_upper;
1083 	CommonEntry *common_entries;
1084 	int			nentries,
1085 				i1,
1086 				i2;
1087 	RangeBound *right_lower,
1088 			   *left_upper;
1089 
1090 	memset(&context, 0, sizeof(ConsiderSplitContext));
1091 	context.typcache = typcache;
1092 	context.has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid);
1093 
1094 	maxoff = entryvec->n - 1;
1095 	nentries = context.entries_count = maxoff - FirstOffsetNumber + 1;
1096 	context.first = true;
1097 
1098 	/* Allocate arrays for sorted range bounds */
1099 	by_lower = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange));
1100 	by_upper = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange));
1101 
1102 	/* Fill arrays of bounds */
1103 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
1104 	{
1105 		RangeType  *range = DatumGetRangeType(entryvec->vector[i].key);
1106 		bool		empty;
1107 
1108 		range_deserialize(typcache, range,
1109 						  &by_lower[i - FirstOffsetNumber].lower,
1110 						  &by_lower[i - FirstOffsetNumber].upper,
1111 						  &empty);
1112 		Assert(!empty);
1113 	}
1114 
1115 	/*
1116 	 * Make two arrays of range bounds: one sorted by lower bound and another
1117 	 * sorted by upper bound.
1118 	 */
1119 	memcpy(by_upper, by_lower, nentries * sizeof(NonEmptyRange));
1120 	qsort_arg(by_lower, nentries, sizeof(NonEmptyRange),
1121 			  interval_cmp_lower, typcache);
1122 	qsort_arg(by_upper, nentries, sizeof(NonEmptyRange),
1123 			  interval_cmp_upper, typcache);
1124 
1125 	/*----------
1126 	 * The goal is to form a left and right range, so that every entry
1127 	 * range is contained by either left or right interval (or both).
1128 	 *
1129 	 * For example, with the ranges (0,1), (1,3), (2,3), (2,4):
1130 	 *
1131 	 * 0 1 2 3 4
1132 	 * +-+
1133 	 *	 +---+
1134 	 *	   +-+
1135 	 *	   +---+
1136 	 *
1137 	 * The left and right ranges are of the form (0,a) and (b,4).
1138 	 * We first consider splits where b is the lower bound of an entry.
1139 	 * We iterate through all entries, and for each b, calculate the
1140 	 * smallest possible a. Then we consider splits where a is the
1141 	 * upper bound of an entry, and for each a, calculate the greatest
1142 	 * possible b.
1143 	 *
1144 	 * In the above example, the first loop would consider splits:
1145 	 * b=0: (0,1)-(0,4)
1146 	 * b=1: (0,1)-(1,4)
1147 	 * b=2: (0,3)-(2,4)
1148 	 *
1149 	 * And the second loop:
1150 	 * a=1: (0,1)-(1,4)
1151 	 * a=3: (0,3)-(2,4)
1152 	 * a=4: (0,4)-(2,4)
1153 	 *----------
1154 	 */
1155 
1156 	/*
1157 	 * Iterate over lower bound of right group, finding smallest possible
1158 	 * upper bound of left group.
1159 	 */
1160 	i1 = 0;
1161 	i2 = 0;
1162 	right_lower = &by_lower[i1].lower;
1163 	left_upper = &by_upper[i2].lower;
1164 	while (true)
1165 	{
1166 		/*
1167 		 * Find next lower bound of right group.
1168 		 */
1169 		while (i1 < nentries &&
1170 			   range_cmp_bounds(typcache, right_lower,
1171 								&by_lower[i1].lower) == 0)
1172 		{
1173 			if (range_cmp_bounds(typcache, &by_lower[i1].upper,
1174 								 left_upper) > 0)
1175 				left_upper = &by_lower[i1].upper;
1176 			i1++;
1177 		}
1178 		if (i1 >= nentries)
1179 			break;
1180 		right_lower = &by_lower[i1].lower;
1181 
1182 		/*
1183 		 * Find count of ranges which anyway should be placed to the left
1184 		 * group.
1185 		 */
1186 		while (i2 < nentries &&
1187 			   range_cmp_bounds(typcache, &by_upper[i2].upper,
1188 								left_upper) <= 0)
1189 			i2++;
1190 
1191 		/*
1192 		 * Consider found split to see if it's better than what we had.
1193 		 */
1194 		range_gist_consider_split(&context, right_lower, i1, left_upper, i2);
1195 	}
1196 
1197 	/*
1198 	 * Iterate over upper bound of left group finding greatest possible lower
1199 	 * bound of right group.
1200 	 */
1201 	i1 = nentries - 1;
1202 	i2 = nentries - 1;
1203 	right_lower = &by_lower[i1].upper;
1204 	left_upper = &by_upper[i2].upper;
1205 	while (true)
1206 	{
1207 		/*
1208 		 * Find next upper bound of left group.
1209 		 */
1210 		while (i2 >= 0 &&
1211 			   range_cmp_bounds(typcache, left_upper,
1212 								&by_upper[i2].upper) == 0)
1213 		{
1214 			if (range_cmp_bounds(typcache, &by_upper[i2].lower,
1215 								 right_lower) < 0)
1216 				right_lower = &by_upper[i2].lower;
1217 			i2--;
1218 		}
1219 		if (i2 < 0)
1220 			break;
1221 		left_upper = &by_upper[i2].upper;
1222 
1223 		/*
1224 		 * Find count of intervals which anyway should be placed to the right
1225 		 * group.
1226 		 */
1227 		while (i1 >= 0 &&
1228 			   range_cmp_bounds(typcache, &by_lower[i1].lower,
1229 								right_lower) >= 0)
1230 			i1--;
1231 
1232 		/*
1233 		 * Consider found split to see if it's better than what we had.
1234 		 */
1235 		range_gist_consider_split(&context, right_lower, i1 + 1,
1236 								  left_upper, i2 + 1);
1237 	}
1238 
1239 	/*
1240 	 * If we failed to find any acceptable splits, use trivial split.
1241 	 */
1242 	if (context.first)
1243 	{
1244 		range_gist_fallback_split(typcache, entryvec, v);
1245 		return;
1246 	}
1247 
1248 	/*
1249 	 * Ok, we have now selected bounds of the groups. Now we have to
1250 	 * distribute entries themselves. At first we distribute entries which can
1251 	 * be placed unambiguously and collect "common entries" to array.
1252 	 */
1253 
1254 	/* Allocate vectors for results */
1255 	v->spl_left = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
1256 	v->spl_right = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
1257 	v->spl_nleft = 0;
1258 	v->spl_nright = 0;
1259 
1260 	/*
1261 	 * Allocate an array for "common entries" - entries which can be placed to
1262 	 * either group without affecting overlap along selected axis.
1263 	 */
1264 	common_entries_count = 0;
1265 	common_entries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));
1266 
1267 	/*
1268 	 * Distribute entries which can be distributed unambiguously, and collect
1269 	 * common entries.
1270 	 */
1271 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
1272 	{
1273 		RangeBound	lower,
1274 					upper;
1275 		bool		empty;
1276 
1277 		/*
1278 		 * Get upper and lower bounds along selected axis.
1279 		 */
1280 		range = DatumGetRangeType(entryvec->vector[i].key);
1281 
1282 		range_deserialize(typcache, range, &lower, &upper, &empty);
1283 
1284 		if (range_cmp_bounds(typcache, &upper, context.left_upper) <= 0)
1285 		{
1286 			/* Fits in the left group */
1287 			if (range_cmp_bounds(typcache, &lower, context.right_lower) >= 0)
1288 			{
1289 				/* Fits also in the right group, so "common entry" */
1290 				common_entries[common_entries_count].index = i;
1291 				if (context.has_subtype_diff)
1292 				{
1293 					/*
1294 					 * delta = (lower - context.right_lower) -
1295 					 * (context.left_upper - upper)
1296 					 */
1297 					common_entries[common_entries_count].delta =
1298 						call_subtype_diff(typcache,
1299 										  lower.val,
1300 										  context.right_lower->val) -
1301 						call_subtype_diff(typcache,
1302 										  context.left_upper->val,
1303 										  upper.val);
1304 				}
1305 				else
1306 				{
1307 					/* Without subtype_diff, take all deltas as zero */
1308 					common_entries[common_entries_count].delta = 0;
1309 				}
1310 				common_entries_count++;
1311 			}
1312 			else
1313 			{
1314 				/* Doesn't fit to the right group, so join to the left group */
1315 				PLACE_LEFT(range, i);
1316 			}
1317 		}
1318 		else
1319 		{
1320 			/*
1321 			 * Each entry should fit on either left or right group. Since this
1322 			 * entry didn't fit in the left group, it better fit in the right
1323 			 * group.
1324 			 */
1325 			Assert(range_cmp_bounds(typcache, &lower,
1326 									context.right_lower) >= 0);
1327 			PLACE_RIGHT(range, i);
1328 		}
1329 	}
1330 
1331 	/*
1332 	 * Distribute "common entries", if any.
1333 	 */
1334 	if (common_entries_count > 0)
1335 	{
1336 		/*
1337 		 * Sort "common entries" by calculated deltas in order to distribute
1338 		 * the most ambiguous entries first.
1339 		 */
1340 		qsort(common_entries, common_entries_count, sizeof(CommonEntry),
1341 			  common_entry_cmp);
1342 
1343 		/*
1344 		 * Distribute "common entries" between groups according to sorting.
1345 		 */
1346 		for (i = 0; i < common_entries_count; i++)
1347 		{
1348 			int			idx = common_entries[i].index;
1349 
1350 			range = DatumGetRangeType(entryvec->vector[idx].key);
1351 
1352 			/*
1353 			 * Check if we have to place this entry in either group to achieve
1354 			 * LIMIT_RATIO.
1355 			 */
1356 			if (i < context.common_left)
1357 				PLACE_LEFT(range, idx);
1358 			else
1359 				PLACE_RIGHT(range, idx);
1360 		}
1361 	}
1362 
1363 	v->spl_ldatum = PointerGetDatum(left_range);
1364 	v->spl_rdatum = PointerGetDatum(right_range);
1365 }
1366 
1367 /*
1368  * Consider replacement of currently selected split with a better one
1369  * during range_gist_double_sorting_split.
1370  */
1371 static void
range_gist_consider_split(ConsiderSplitContext * context,RangeBound * right_lower,int min_left_count,RangeBound * left_upper,int max_left_count)1372 range_gist_consider_split(ConsiderSplitContext *context,
1373 						  RangeBound *right_lower, int min_left_count,
1374 						  RangeBound *left_upper, int max_left_count)
1375 {
1376 	int			left_count,
1377 				right_count;
1378 	float4		ratio,
1379 				overlap;
1380 
1381 	/*
1382 	 * Calculate entries distribution ratio assuming most uniform distribution
1383 	 * of common entries.
1384 	 */
1385 	if (min_left_count >= (context->entries_count + 1) / 2)
1386 		left_count = min_left_count;
1387 	else if (max_left_count <= context->entries_count / 2)
1388 		left_count = max_left_count;
1389 	else
1390 		left_count = context->entries_count / 2;
1391 	right_count = context->entries_count - left_count;
1392 
1393 	/*
1394 	 * Ratio of split: quotient between size of smaller group and total
1395 	 * entries count.  This is necessarily 0.5 or less; if it's less than
1396 	 * LIMIT_RATIO then we will never accept the new split.
1397 	 */
1398 	ratio = ((float4) Min(left_count, right_count)) /
1399 		((float4) context->entries_count);
1400 
1401 	if (ratio > LIMIT_RATIO)
1402 	{
1403 		bool		selectthis = false;
1404 
1405 		/*
1406 		 * The ratio is acceptable, so compare current split with previously
1407 		 * selected one. We search for minimal overlap (allowing negative
1408 		 * values) and minimal ratio secondarily.  If subtype_diff is
1409 		 * available, it's used for overlap measure.  Without subtype_diff we
1410 		 * use number of "common entries" as an overlap measure.
1411 		 */
1412 		if (context->has_subtype_diff)
1413 			overlap = call_subtype_diff(context->typcache,
1414 										left_upper->val,
1415 										right_lower->val);
1416 		else
1417 			overlap = max_left_count - min_left_count;
1418 
1419 		/* If there is no previous selection, select this split */
1420 		if (context->first)
1421 			selectthis = true;
1422 		else
1423 		{
1424 			/*
1425 			 * Choose the new split if it has a smaller overlap, or same
1426 			 * overlap but better ratio.
1427 			 */
1428 			if (overlap < context->overlap ||
1429 				(overlap == context->overlap && ratio > context->ratio))
1430 				selectthis = true;
1431 		}
1432 
1433 		if (selectthis)
1434 		{
1435 			/* save information about selected split */
1436 			context->first = false;
1437 			context->ratio = ratio;
1438 			context->overlap = overlap;
1439 			context->right_lower = right_lower;
1440 			context->left_upper = left_upper;
1441 			context->common_left = max_left_count - left_count;
1442 			context->common_right = left_count - min_left_count;
1443 		}
1444 	}
1445 }
1446 
1447 /*
1448  * Find class number for range.
1449  *
1450  * The class number is a valid combination of the properties of the
1451  * range.  Note: the highest possible number is 8, because CLS_EMPTY
1452  * can't be combined with anything else.
1453  */
1454 static int
get_gist_range_class(RangeType * range)1455 get_gist_range_class(RangeType *range)
1456 {
1457 	int			classNumber;
1458 	char		flags;
1459 
1460 	flags = range_get_flags(range);
1461 	if (flags & RANGE_EMPTY)
1462 	{
1463 		classNumber = CLS_EMPTY;
1464 	}
1465 	else
1466 	{
1467 		classNumber = 0;
1468 		if (flags & RANGE_LB_INF)
1469 			classNumber |= CLS_LOWER_INF;
1470 		if (flags & RANGE_UB_INF)
1471 			classNumber |= CLS_UPPER_INF;
1472 		if (flags & RANGE_CONTAIN_EMPTY)
1473 			classNumber |= CLS_CONTAIN_EMPTY;
1474 	}
1475 	return classNumber;
1476 }
1477 
1478 /*
1479  * Comparison function for range_gist_single_sorting_split.
1480  */
1481 static int
single_bound_cmp(const void * a,const void * b,void * arg)1482 single_bound_cmp(const void *a, const void *b, void *arg)
1483 {
1484 	SingleBoundSortItem *i1 = (SingleBoundSortItem *) a;
1485 	SingleBoundSortItem *i2 = (SingleBoundSortItem *) b;
1486 	TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
1487 
1488 	return range_cmp_bounds(typcache, &i1->bound, &i2->bound);
1489 }
1490 
1491 /*
1492  * Compare NonEmptyRanges by lower bound.
1493  */
1494 static int
interval_cmp_lower(const void * a,const void * b,void * arg)1495 interval_cmp_lower(const void *a, const void *b, void *arg)
1496 {
1497 	NonEmptyRange *i1 = (NonEmptyRange *) a;
1498 	NonEmptyRange *i2 = (NonEmptyRange *) b;
1499 	TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
1500 
1501 	return range_cmp_bounds(typcache, &i1->lower, &i2->lower);
1502 }
1503 
1504 /*
1505  * Compare NonEmptyRanges by upper bound.
1506  */
1507 static int
interval_cmp_upper(const void * a,const void * b,void * arg)1508 interval_cmp_upper(const void *a, const void *b, void *arg)
1509 {
1510 	NonEmptyRange *i1 = (NonEmptyRange *) a;
1511 	NonEmptyRange *i2 = (NonEmptyRange *) b;
1512 	TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
1513 
1514 	return range_cmp_bounds(typcache, &i1->upper, &i2->upper);
1515 }
1516 
1517 /*
1518  * Compare CommonEntrys by their deltas.
1519  */
1520 static int
common_entry_cmp(const void * i1,const void * i2)1521 common_entry_cmp(const void *i1, const void *i2)
1522 {
1523 	double		delta1 = ((CommonEntry *) i1)->delta;
1524 	double		delta2 = ((CommonEntry *) i2)->delta;
1525 
1526 	if (delta1 < delta2)
1527 		return -1;
1528 	else if (delta1 > delta2)
1529 		return 1;
1530 	else
1531 		return 0;
1532 }
1533 
1534 /*
1535  * Convenience function to invoke type-specific subtype_diff function.
1536  * Caller must have already checked that there is one for the range type.
1537  */
1538 static float8
call_subtype_diff(TypeCacheEntry * typcache,Datum val1,Datum val2)1539 call_subtype_diff(TypeCacheEntry *typcache, Datum val1, Datum val2)
1540 {
1541 	float8		value;
1542 
1543 	value = DatumGetFloat8(FunctionCall2Coll(&typcache->rng_subdiff_finfo,
1544 											 typcache->rng_collation,
1545 											 val1, val2));
1546 	/* Cope with buggy subtype_diff function by returning zero */
1547 	if (value >= 0.0)
1548 		return value;
1549 	return 0.0;
1550 }
1551