1 /*-------------------------------------------------------------------------
2  *
3  * rangetypes_gist.c
4  *	  GiST support for range types.
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/utils/adt/rangetypes_gist.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/gist.h"
18 #include "access/stratnum.h"
19 #include "utils/datum.h"
20 #include "utils/float.h"
21 #include "utils/fmgrprotos.h"
22 #include "utils/multirangetypes.h"
23 #include "utils/rangetypes.h"
24 
/*
 * Range class properties used to segregate different classes of ranges in
 * GiST.  Each unique combination of properties is a class.  CLS_EMPTY cannot
 * be combined with anything else.
 *
 * A class value is also used directly as an array index (see the
 * CLS_COUNT-sized arrays in range_gist_picksplit), so the valid values are
 * 0..7 (any OR of the three property bits) plus 8 for CLS_EMPTY.
 */
#define CLS_NORMAL			0	/* Ordinary finite range (no bits set) */
#define CLS_LOWER_INF		1	/* Lower bound is infinity */
#define CLS_UPPER_INF		2	/* Upper bound is infinity */
#define CLS_CONTAIN_EMPTY	4	/* Contains underlying empty ranges */
#define CLS_EMPTY			8	/* Special class for empty ranges */

#define CLS_COUNT			9	/* # of classes; includes all combinations of
								 * properties. CLS_EMPTY doesn't combine with
								 * anything else, so it's only 2^3 + 1. */
39 
/*
 * Minimum accepted ratio of split for items of the same class.  If the items
 * are of different classes, we will separate along those lines regardless of
 * the ratio.
 */
#define LIMIT_RATIO  0.3

/*
 * Constants for fixed penalty values, all consumed by range_gist_penalty:
 *
 * INFINITE_BOUND_PENALTY is charged once per bound of the original range
 * that would have to become infinite when inserting a (-inf, +inf) range.
 *
 * CONTAIN_EMPTY_PENALTY is the step between the cases considered when
 * inserting an empty range (multiples 0 to 4), and is also added on top when
 * a (-inf, +inf) range is inserted into a predicate that is or contains
 * empty.
 *
 * DEFAULT_SUBTYPE_DIFF_PENALTY stands in for the size of a bound extension
 * when the range type has no subtype_diff function.
 */
#define INFINITE_BOUND_PENALTY	2.0
#define CONTAIN_EMPTY_PENALTY  1.0
#define DEFAULT_SUBTYPE_DIFF_PENALTY  1.0
51 
/*
 * Per-item data for range_gist_single_sorting_split.
 *
 * NOTE(review): the split routine that fills and sorts these items is not
 * part of the code visible here.  Presumably "index" identifies the entry in
 * the entry vector and "bound" is whichever bound (lower or upper) the split
 * sorts on -- confirm against range_gist_single_sorting_split.
 */
typedef struct
{
	int			index;
	RangeBound	bound;
} SingleBoundSortItem;
60 
/*
 * Place on left or right side of split?
 *
 * SPLIT_LEFT is deliberately zero, so that zero-filling an array of SplitLR
 * (as range_gist_picksplit does with memset) sends every slot to the left
 * side by default.
 */
typedef enum
{
	SPLIT_LEFT = 0,				/* makes initialization to SPLIT_LEFT easier */
	SPLIT_RIGHT
} SplitLR;
67 
/*
 * Context for range_gist_consider_split.
 *
 * The first group of fields describes the split problem as a whole; the
 * remaining fields describe the split that is currently selected (none yet
 * while "first" is true).
 *
 * NOTE(review): the code that fills and reads this struct is outside the
 * part of the file visible here, so the field notes below are the original
 * ones and have not been re-verified against their users.
 */
typedef struct
{
	TypeCacheEntry *typcache;	/* typcache for range type */
	bool		has_subtype_diff;	/* does it have subtype_diff? */
	int			entries_count;	/* total number of entries being split */

	/* Information about currently selected split follows */

	bool		first;			/* true if no split was selected yet */

	RangeBound *left_upper;		/* upper bound of left interval */
	RangeBound *right_lower;	/* lower bound of right interval */

	float4		ratio;			/* split ratio */
	float4		overlap;		/* overlap between left and right predicate */
	int			common_left;	/* # common entries destined for each side */
	int			common_right;
} ConsiderSplitContext;
89 
/*
 * Bounds extracted from a non-empty range, for use in
 * range_gist_double_sorting_split.
 *
 * Holding the two bounds in deserialized form means there is no "empty"
 * flag here; as the name says, only non-empty ranges are represented.
 */
typedef struct
{
	RangeBound	lower;			/* lower bound of the range */
	RangeBound	upper;			/* upper bound of the range */
} NonEmptyRange;
99 
/*
 * Represents information about an entry that can be placed in either group
 * without affecting overlap over selected axis ("common entry").
 *
 * NOTE(review): common_entry_cmp is declared for these items, so presumably
 * they are sorted (by delta) before being distributed -- the code doing so
 * is not visible here; confirm.
 */
typedef struct
{
	/* Index of entry in the initial array */
	int			index;
	/* Delta between closeness of range to each of the two groups */
	double		delta;
} CommonEntry;
111 
/*
 * Helper macros that append an entry to the left or right group of a split.
 *
 * They are not self-contained: the expansion reads and writes the caller's
 * local variables v, typcache, and left_range / right_range.
 *
 * PLACE_LEFT folds "range" into left_range and records "off" as the next
 * element of v->spl_left.  The first entry placed becomes left_range as-is;
 * every later one is merged in with range_super_union.
 */
#define PLACE_LEFT(range, off)					\
	do {										\
		left_range = (v->spl_nleft > 0) ?		\
			range_super_union(typcache, left_range, (range)) : \
			(range);							\
		v->spl_left[v->spl_nleft++] = (off);	\
	} while(0)
122 
/*
 * PLACE_RIGHT folds "range" into right_range and records "off" as the next
 * element of v->spl_right.  The first entry placed becomes right_range
 * as-is; every later one is merged in with range_super_union.  Like its
 * left-hand twin it accesses the caller's v, typcache and right_range
 * directly.
 */
#define PLACE_RIGHT(range, off)					\
	do {										\
		right_range = (v->spl_nright > 0) ?		\
			range_super_union(typcache, right_range, (range)) : \
			(range);							\
		v->spl_right[v->spl_nright++] = (off);	\
	} while(0)
131 
/*
 * Copy a RangeType datum (hardwires typbyval and typlen for ranges...)
 *
 * The two constants handed to datumCopy are those hardwired properties:
 * "false" for typbyval and -1 for typlen.  Evaluates to a RangeType pointer
 * to the copy.
 */
#define rangeCopy(r) \
	((RangeType *) DatumGetPointer(datumCopy(PointerGetDatum(r), \
											 false, -1)))
136 
/* Forward declarations of file-local routines */

/* union that also tracks whether an empty range has been absorbed */
static RangeType *range_super_union(TypeCacheEntry *typcache, RangeType *r1,
									RangeType *r2);

/* consistent checks for internal ("int") pages, one per kind of query */
static bool range_gist_consistent_int_range(TypeCacheEntry *typcache,
											StrategyNumber strategy,
											const RangeType *key,
											const RangeType *query);
static bool range_gist_consistent_int_multirange(TypeCacheEntry *typcache,
												 StrategyNumber strategy,
												 const RangeType *key,
												 const MultirangeType *query);
static bool range_gist_consistent_int_element(TypeCacheEntry *typcache,
											  StrategyNumber strategy,
											  const RangeType *key,
											  Datum query);

/* consistent checks for leaf pages, one per kind of query */
static bool range_gist_consistent_leaf_range(TypeCacheEntry *typcache,
											 StrategyNumber strategy,
											 const RangeType *key,
											 const RangeType *query);
static bool range_gist_consistent_leaf_multirange(TypeCacheEntry *typcache,
												  StrategyNumber strategy,
												  const RangeType *key,
												  const MultirangeType *query);
static bool range_gist_consistent_leaf_element(TypeCacheEntry *typcache,
											   StrategyNumber strategy,
											   const RangeType *key,
											   Datum query);

/* split strategies selected by range_gist_picksplit */
static void range_gist_fallback_split(TypeCacheEntry *typcache,
									  GistEntryVector *entryvec,
									  GIST_SPLITVEC *v);
static void range_gist_class_split(TypeCacheEntry *typcache,
								   GistEntryVector *entryvec,
								   GIST_SPLITVEC *v,
								   SplitLR *classes_groups);
static void range_gist_single_sorting_split(TypeCacheEntry *typcache,
											GistEntryVector *entryvec,
											GIST_SPLITVEC *v,
											bool use_upper_bound);
static void range_gist_double_sorting_split(TypeCacheEntry *typcache,
											GistEntryVector *entryvec,
											GIST_SPLITVEC *v);
static void range_gist_consider_split(ConsiderSplitContext *context,
									  RangeBound *right_lower, int min_left_count,
									  RangeBound *left_upper, int max_left_count);

/* classification, comparators and subtype_diff wrapper */
static int	get_gist_range_class(RangeType *range);
static int	single_bound_cmp(const void *a, const void *b, void *arg);
static int	interval_cmp_lower(const void *a, const void *b, void *arg);
static int	interval_cmp_upper(const void *a, const void *b, void *arg);
static int	common_entry_cmp(const void *i1, const void *i2);
static float8 call_subtype_diff(TypeCacheEntry *typcache,
								Datum val1, Datum val2);
187 
188 
189 /* GiST query consistency check */
190 Datum
range_gist_consistent(PG_FUNCTION_ARGS)191 range_gist_consistent(PG_FUNCTION_ARGS)
192 {
193 	GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
194 	Datum		query = PG_GETARG_DATUM(1);
195 	StrategyNumber strategy = (StrategyNumber) PG_GETARG_UINT16(2);
196 	bool		result;
197 	Oid			subtype = PG_GETARG_OID(3);
198 	bool	   *recheck = (bool *) PG_GETARG_POINTER(4);
199 	RangeType  *key = DatumGetRangeTypeP(entry->key);
200 	TypeCacheEntry *typcache;
201 
202 	/* All operators served by this function are exact */
203 	*recheck = false;
204 
205 	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(key));
206 
207 	/*
208 	 * Perform consistent checking using function corresponding to key type
209 	 * (leaf or internal) and query subtype (range, multirange, or element).
210 	 * Note that invalid subtype means that query type matches key type
211 	 * (range).
212 	 */
213 	if (GIST_LEAF(entry))
214 	{
215 		if (!OidIsValid(subtype) || subtype == ANYRANGEOID)
216 			result = range_gist_consistent_leaf_range(typcache, strategy, key,
217 													  DatumGetRangeTypeP(query));
218 		else if (subtype == ANYMULTIRANGEOID)
219 			result = range_gist_consistent_leaf_multirange(typcache, strategy, key,
220 														   DatumGetMultirangeTypeP(query));
221 		else
222 			result = range_gist_consistent_leaf_element(typcache, strategy,
223 														key, query);
224 	}
225 	else
226 	{
227 		if (!OidIsValid(subtype) || subtype == ANYRANGEOID)
228 			result = range_gist_consistent_int_range(typcache, strategy, key,
229 													 DatumGetRangeTypeP(query));
230 		else if (subtype == ANYMULTIRANGEOID)
231 			result = range_gist_consistent_int_multirange(typcache, strategy, key,
232 														  DatumGetMultirangeTypeP(query));
233 		else
234 			result = range_gist_consistent_int_element(typcache, strategy,
235 													   key, query);
236 	}
237 	PG_RETURN_BOOL(result);
238 }
239 
240 /*
241  * GiST compress method for multiranges: multirange is approximated as union
242  * range with no gaps.
243  */
244 Datum
multirange_gist_compress(PG_FUNCTION_ARGS)245 multirange_gist_compress(PG_FUNCTION_ARGS)
246 {
247 	GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
248 
249 	if (entry->leafkey)
250 	{
251 		MultirangeType *mr = DatumGetMultirangeTypeP(entry->key);
252 		RangeType  *r;
253 		TypeCacheEntry *typcache;
254 		GISTENTRY  *retval = palloc(sizeof(GISTENTRY));
255 
256 		typcache = multirange_get_typcache(fcinfo, MultirangeTypeGetOid(mr));
257 		r = multirange_get_union_range(typcache->rngtype, mr);
258 
259 		gistentryinit(*retval, RangeTypePGetDatum(r),
260 					  entry->rel, entry->page, entry->offset, false);
261 
262 		PG_RETURN_POINTER(retval);
263 	}
264 
265 	PG_RETURN_POINTER(entry);
266 }
267 
268 /* GiST query consistency check for multiranges */
269 Datum
multirange_gist_consistent(PG_FUNCTION_ARGS)270 multirange_gist_consistent(PG_FUNCTION_ARGS)
271 {
272 	GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
273 	Datum		query = PG_GETARG_DATUM(1);
274 	StrategyNumber strategy = (StrategyNumber) PG_GETARG_UINT16(2);
275 	bool		result;
276 	Oid			subtype = PG_GETARG_OID(3);
277 	bool	   *recheck = (bool *) PG_GETARG_POINTER(4);
278 	RangeType  *key = DatumGetRangeTypeP(entry->key);
279 	TypeCacheEntry *typcache;
280 
281 	/*
282 	 * All operators served by this function are inexact because multirange is
283 	 * approximated by union range with no gaps.
284 	 */
285 	*recheck = true;
286 
287 	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(key));
288 
289 	/*
290 	 * Perform consistent checking using function corresponding to key type
291 	 * (leaf or internal) and query subtype (range, multirange, or element).
292 	 * Note that invalid subtype means that query type matches key type
293 	 * (multirange).
294 	 */
295 	if (GIST_LEAF(entry))
296 	{
297 		if (!OidIsValid(subtype) || subtype == ANYMULTIRANGEOID)
298 			result = range_gist_consistent_leaf_multirange(typcache, strategy, key,
299 														   DatumGetMultirangeTypeP(query));
300 		else if (subtype == ANYRANGEOID)
301 			result = range_gist_consistent_leaf_range(typcache, strategy, key,
302 													  DatumGetRangeTypeP(query));
303 		else
304 			result = range_gist_consistent_leaf_element(typcache, strategy,
305 														key, query);
306 	}
307 	else
308 	{
309 		if (!OidIsValid(subtype) || subtype == ANYMULTIRANGEOID)
310 			result = range_gist_consistent_int_multirange(typcache, strategy, key,
311 														  DatumGetMultirangeTypeP(query));
312 		else if (subtype == ANYRANGEOID)
313 			result = range_gist_consistent_int_range(typcache, strategy, key,
314 													 DatumGetRangeTypeP(query));
315 		else
316 			result = range_gist_consistent_int_element(typcache, strategy,
317 													   key, query);
318 	}
319 	PG_RETURN_BOOL(result);
320 }
321 
322 /* form union range */
323 Datum
range_gist_union(PG_FUNCTION_ARGS)324 range_gist_union(PG_FUNCTION_ARGS)
325 {
326 	GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
327 	GISTENTRY  *ent = entryvec->vector;
328 	RangeType  *result_range;
329 	TypeCacheEntry *typcache;
330 	int			i;
331 
332 	result_range = DatumGetRangeTypeP(ent[0].key);
333 
334 	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(result_range));
335 
336 	for (i = 1; i < entryvec->n; i++)
337 	{
338 		result_range = range_super_union(typcache, result_range,
339 										 DatumGetRangeTypeP(ent[i].key));
340 	}
341 
342 	PG_RETURN_RANGE_P(result_range);
343 }
344 
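/*
 * Plain range keys are stored in GiST indexes as ranges, so they need no
 * compress, decompress, or fetch functions.  (Multiranges are the exception
 * on the compress side: see multirange_gist_compress above, which stores
 * their union range instead.)  Note this implies a limit on the size of
 * range values that can be indexed.
 */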
350 
351 /*
352  * GiST page split penalty function.
353  *
354  * The penalty function has the following goals (in order from most to least
355  * important):
356  * - Keep normal ranges separate
357  * - Avoid broadening the class of the original predicate
358  * - Avoid broadening (as determined by subtype_diff) the original predicate
359  * - Favor adding ranges to narrower original predicates
360  */
361 Datum
range_gist_penalty(PG_FUNCTION_ARGS)362 range_gist_penalty(PG_FUNCTION_ARGS)
363 {
364 	GISTENTRY  *origentry = (GISTENTRY *) PG_GETARG_POINTER(0);
365 	GISTENTRY  *newentry = (GISTENTRY *) PG_GETARG_POINTER(1);
366 	float	   *penalty = (float *) PG_GETARG_POINTER(2);
367 	RangeType  *orig = DatumGetRangeTypeP(origentry->key);
368 	RangeType  *new = DatumGetRangeTypeP(newentry->key);
369 	TypeCacheEntry *typcache;
370 	bool		has_subtype_diff;
371 	RangeBound	orig_lower,
372 				new_lower,
373 				orig_upper,
374 				new_upper;
375 	bool		orig_empty,
376 				new_empty;
377 
378 	if (RangeTypeGetOid(orig) != RangeTypeGetOid(new))
379 		elog(ERROR, "range types do not match");
380 
381 	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(orig));
382 
383 	has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid);
384 
385 	range_deserialize(typcache, orig, &orig_lower, &orig_upper, &orig_empty);
386 	range_deserialize(typcache, new, &new_lower, &new_upper, &new_empty);
387 
388 	/*
389 	 * Distinct branches for handling distinct classes of ranges.  Note that
390 	 * penalty values only need to be commensurate within the same class of
391 	 * new range.
392 	 */
393 	if (new_empty)
394 	{
395 		/* Handle insertion of empty range */
396 		if (orig_empty)
397 		{
398 			/*
399 			 * The best case is to insert it to empty original range.
400 			 * Insertion here means no broadening of original range. Also
401 			 * original range is the most narrow.
402 			 */
403 			*penalty = 0.0;
404 		}
405 		else if (RangeIsOrContainsEmpty(orig))
406 		{
407 			/*
408 			 * The second case is to insert empty range into range which
409 			 * contains at least one underlying empty range.  There is still
410 			 * no broadening of original range, but original range is not as
411 			 * narrow as possible.
412 			 */
413 			*penalty = CONTAIN_EMPTY_PENALTY;
414 		}
415 		else if (orig_lower.infinite && orig_upper.infinite)
416 		{
417 			/*
418 			 * Original range requires broadening.  (-inf; +inf) is most far
419 			 * from normal range in this case.
420 			 */
421 			*penalty = 2 * CONTAIN_EMPTY_PENALTY;
422 		}
423 		else if (orig_lower.infinite || orig_upper.infinite)
424 		{
425 			/*
426 			 * (-inf, x) or (x, +inf) original ranges are closer to normal
427 			 * ranges, so it's worse to mix it with empty ranges.
428 			 */
429 			*penalty = 3 * CONTAIN_EMPTY_PENALTY;
430 		}
431 		else
432 		{
433 			/*
434 			 * The least preferred case is broadening of normal range.
435 			 */
436 			*penalty = 4 * CONTAIN_EMPTY_PENALTY;
437 		}
438 	}
439 	else if (new_lower.infinite && new_upper.infinite)
440 	{
441 		/* Handle insertion of (-inf, +inf) range */
442 		if (orig_lower.infinite && orig_upper.infinite)
443 		{
444 			/*
445 			 * Best case is inserting to (-inf, +inf) original range.
446 			 */
447 			*penalty = 0.0;
448 		}
449 		else if (orig_lower.infinite || orig_upper.infinite)
450 		{
451 			/*
452 			 * When original range is (-inf, x) or (x, +inf) it requires
453 			 * broadening of original range (extension of one bound to
454 			 * infinity).
455 			 */
456 			*penalty = INFINITE_BOUND_PENALTY;
457 		}
458 		else
459 		{
460 			/*
461 			 * Insertion to normal original range is least preferred.
462 			 */
463 			*penalty = 2 * INFINITE_BOUND_PENALTY;
464 		}
465 
466 		if (RangeIsOrContainsEmpty(orig))
467 		{
468 			/*
469 			 * Original range is narrower when it doesn't contain empty
470 			 * ranges. Add additional penalty otherwise.
471 			 */
472 			*penalty += CONTAIN_EMPTY_PENALTY;
473 		}
474 	}
475 	else if (new_lower.infinite)
476 	{
477 		/* Handle insertion of (-inf, x) range */
478 		if (!orig_empty && orig_lower.infinite)
479 		{
480 			if (orig_upper.infinite)
481 			{
482 				/*
483 				 * (-inf, +inf) range won't be extended by insertion of (-inf,
484 				 * x) range. It's a less desirable case than insertion to
485 				 * (-inf, y) original range without extension, because in that
486 				 * case original range is narrower. But we can't express that
487 				 * in single float value.
488 				 */
489 				*penalty = 0.0;
490 			}
491 			else
492 			{
493 				if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0)
494 				{
495 					/*
496 					 * Get extension of original range using subtype_diff. Use
497 					 * constant if subtype_diff unavailable.
498 					 */
499 					if (has_subtype_diff)
500 						*penalty = call_subtype_diff(typcache,
501 													 new_upper.val,
502 													 orig_upper.val);
503 					else
504 						*penalty = DEFAULT_SUBTYPE_DIFF_PENALTY;
505 				}
506 				else
507 				{
508 					/* No extension of original range */
509 					*penalty = 0.0;
510 				}
511 			}
512 		}
513 		else
514 		{
515 			/*
516 			 * If lower bound of original range is not -inf, then extension of
517 			 * it is infinity.
518 			 */
519 			*penalty = get_float4_infinity();
520 		}
521 	}
522 	else if (new_upper.infinite)
523 	{
524 		/* Handle insertion of (x, +inf) range */
525 		if (!orig_empty && orig_upper.infinite)
526 		{
527 			if (orig_lower.infinite)
528 			{
529 				/*
530 				 * (-inf, +inf) range won't be extended by insertion of (x,
531 				 * +inf) range. It's a less desirable case than insertion to
532 				 * (y, +inf) original range without extension, because in that
533 				 * case original range is narrower. But we can't express that
534 				 * in single float value.
535 				 */
536 				*penalty = 0.0;
537 			}
538 			else
539 			{
540 				if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0)
541 				{
542 					/*
543 					 * Get extension of original range using subtype_diff. Use
544 					 * constant if subtype_diff unavailable.
545 					 */
546 					if (has_subtype_diff)
547 						*penalty = call_subtype_diff(typcache,
548 													 orig_lower.val,
549 													 new_lower.val);
550 					else
551 						*penalty = DEFAULT_SUBTYPE_DIFF_PENALTY;
552 				}
553 				else
554 				{
555 					/* No extension of original range */
556 					*penalty = 0.0;
557 				}
558 			}
559 		}
560 		else
561 		{
562 			/*
563 			 * If upper bound of original range is not +inf, then extension of
564 			 * it is infinity.
565 			 */
566 			*penalty = get_float4_infinity();
567 		}
568 	}
569 	else
570 	{
571 		/* Handle insertion of normal (non-empty, non-infinite) range */
572 		if (orig_empty || orig_lower.infinite || orig_upper.infinite)
573 		{
574 			/*
575 			 * Avoid mixing normal ranges with infinite and empty ranges.
576 			 */
577 			*penalty = get_float4_infinity();
578 		}
579 		else
580 		{
581 			/*
582 			 * Calculate extension of original range by calling subtype_diff.
583 			 * Use constant if subtype_diff unavailable.
584 			 */
585 			float8		diff = 0.0;
586 
587 			if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0)
588 			{
589 				if (has_subtype_diff)
590 					diff += call_subtype_diff(typcache,
591 											  orig_lower.val,
592 											  new_lower.val);
593 				else
594 					diff += DEFAULT_SUBTYPE_DIFF_PENALTY;
595 			}
596 			if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0)
597 			{
598 				if (has_subtype_diff)
599 					diff += call_subtype_diff(typcache,
600 											  new_upper.val,
601 											  orig_upper.val);
602 				else
603 					diff += DEFAULT_SUBTYPE_DIFF_PENALTY;
604 			}
605 			*penalty = diff;
606 		}
607 	}
608 
609 	PG_RETURN_POINTER(penalty);
610 }
611 
612 /*
613  * The GiST PickSplit method for ranges
614  *
615  * Primarily, we try to segregate ranges of different classes.  If splitting
616  * ranges of the same class, use the appropriate split method for that class.
617  */
618 Datum
range_gist_picksplit(PG_FUNCTION_ARGS)619 range_gist_picksplit(PG_FUNCTION_ARGS)
620 {
621 	GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
622 	GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
623 	TypeCacheEntry *typcache;
624 	OffsetNumber i;
625 	RangeType  *pred_left;
626 	int			nbytes;
627 	OffsetNumber maxoff;
628 	int			count_in_classes[CLS_COUNT];
629 	int			j;
630 	int			non_empty_classes_count = 0;
631 	int			biggest_class = -1;
632 	int			biggest_class_count = 0;
633 	int			total_count;
634 
635 	/* use first item to look up range type's info */
636 	pred_left = DatumGetRangeTypeP(entryvec->vector[FirstOffsetNumber].key);
637 	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(pred_left));
638 
639 	maxoff = entryvec->n - 1;
640 	nbytes = (maxoff + 1) * sizeof(OffsetNumber);
641 	v->spl_left = (OffsetNumber *) palloc(nbytes);
642 	v->spl_right = (OffsetNumber *) palloc(nbytes);
643 
644 	/*
645 	 * Get count distribution of range classes.
646 	 */
647 	memset(count_in_classes, 0, sizeof(count_in_classes));
648 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
649 	{
650 		RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);
651 
652 		count_in_classes[get_gist_range_class(range)]++;
653 	}
654 
655 	/*
656 	 * Count non-empty classes and find biggest class.
657 	 */
658 	total_count = maxoff;
659 	for (j = 0; j < CLS_COUNT; j++)
660 	{
661 		if (count_in_classes[j] > 0)
662 		{
663 			if (count_in_classes[j] > biggest_class_count)
664 			{
665 				biggest_class_count = count_in_classes[j];
666 				biggest_class = j;
667 			}
668 			non_empty_classes_count++;
669 		}
670 	}
671 
672 	Assert(non_empty_classes_count > 0);
673 
674 	if (non_empty_classes_count == 1)
675 	{
676 		/* One non-empty class, so split inside class */
677 		if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_NORMAL)
678 		{
679 			/* double sorting split for normal ranges */
680 			range_gist_double_sorting_split(typcache, entryvec, v);
681 		}
682 		else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_LOWER_INF)
683 		{
684 			/* upper bound sorting split for (-inf, x) ranges */
685 			range_gist_single_sorting_split(typcache, entryvec, v, true);
686 		}
687 		else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_UPPER_INF)
688 		{
689 			/* lower bound sorting split for (x, +inf) ranges */
690 			range_gist_single_sorting_split(typcache, entryvec, v, false);
691 		}
692 		else
693 		{
694 			/* trivial split for all (-inf, +inf) or all empty ranges */
695 			range_gist_fallback_split(typcache, entryvec, v);
696 		}
697 	}
698 	else
699 	{
700 		/*
701 		 * Class based split.
702 		 *
703 		 * To which side of the split should each class go?  Initialize them
704 		 * all to go to the left side.
705 		 */
706 		SplitLR		classes_groups[CLS_COUNT];
707 
708 		memset(classes_groups, 0, sizeof(classes_groups));
709 
710 		if (count_in_classes[CLS_NORMAL] > 0)
711 		{
712 			/* separate normal ranges if any */
713 			classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
714 		}
715 		else
716 		{
717 			/*----------
718 			 * Try to split classes in one of two ways:
719 			 *	1) containing infinities - not containing infinities
720 			 *	2) containing empty - not containing empty
721 			 *
722 			 * Select the way which balances the ranges between left and right
723 			 * the best. If split in these ways is not possible, there are at
724 			 * most 3 classes, so just separate biggest class.
725 			 *----------
726 			 */
727 			int			infCount,
728 						nonInfCount;
729 			int			emptyCount,
730 						nonEmptyCount;
731 
732 			nonInfCount =
733 				count_in_classes[CLS_NORMAL] +
734 				count_in_classes[CLS_CONTAIN_EMPTY] +
735 				count_in_classes[CLS_EMPTY];
736 			infCount = total_count - nonInfCount;
737 
738 			nonEmptyCount =
739 				count_in_classes[CLS_NORMAL] +
740 				count_in_classes[CLS_LOWER_INF] +
741 				count_in_classes[CLS_UPPER_INF] +
742 				count_in_classes[CLS_LOWER_INF | CLS_UPPER_INF];
743 			emptyCount = total_count - nonEmptyCount;
744 
745 			if (infCount > 0 && nonInfCount > 0 &&
746 				(Abs(infCount - nonInfCount) <=
747 				 Abs(emptyCount - nonEmptyCount)))
748 			{
749 				classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
750 				classes_groups[CLS_CONTAIN_EMPTY] = SPLIT_RIGHT;
751 				classes_groups[CLS_EMPTY] = SPLIT_RIGHT;
752 			}
753 			else if (emptyCount > 0 && nonEmptyCount > 0)
754 			{
755 				classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
756 				classes_groups[CLS_LOWER_INF] = SPLIT_RIGHT;
757 				classes_groups[CLS_UPPER_INF] = SPLIT_RIGHT;
758 				classes_groups[CLS_LOWER_INF | CLS_UPPER_INF] = SPLIT_RIGHT;
759 			}
760 			else
761 			{
762 				/*
763 				 * Either total_count == emptyCount or total_count ==
764 				 * infCount.
765 				 */
766 				classes_groups[biggest_class] = SPLIT_RIGHT;
767 			}
768 		}
769 
770 		range_gist_class_split(typcache, entryvec, v, classes_groups);
771 	}
772 
773 	PG_RETURN_POINTER(v);
774 }
775 
776 /* equality comparator for GiST */
777 Datum
range_gist_same(PG_FUNCTION_ARGS)778 range_gist_same(PG_FUNCTION_ARGS)
779 {
780 	RangeType  *r1 = PG_GETARG_RANGE_P(0);
781 	RangeType  *r2 = PG_GETARG_RANGE_P(1);
782 	bool	   *result = (bool *) PG_GETARG_POINTER(2);
783 
784 	/*
785 	 * range_eq will ignore the RANGE_CONTAIN_EMPTY flag, so we have to check
786 	 * that for ourselves.  More generally, if the entries have been properly
787 	 * normalized, then unequal flags bytes must mean unequal ranges ... so
788 	 * let's just test all the flag bits at once.
789 	 */
790 	if (range_get_flags(r1) != range_get_flags(r2))
791 		*result = false;
792 	else
793 	{
794 		TypeCacheEntry *typcache;
795 
796 		typcache = range_get_typcache(fcinfo, RangeTypeGetOid(r1));
797 
798 		*result = range_eq_internal(typcache, r1, r2);
799 	}
800 
801 	PG_RETURN_POINTER(result);
802 }
803 
804 /*
805  *----------------------------------------------------------
806  * STATIC FUNCTIONS
807  *----------------------------------------------------------
808  */
809 
810 /*
811  * Return the smallest range that contains r1 and r2
812  *
813  * This differs from regular range_union in two critical ways:
814  * 1. It won't throw an error for non-adjacent r1 and r2, but just absorb
815  * the intervening values into the result range.
816  * 2. We track whether any empty range has been union'd into the result,
817  * so that contained_by searches can be indexed.  Note that this means
818  * that *all* unions formed within the GiST index must go through here.
819  */
820 static RangeType *
range_super_union(TypeCacheEntry * typcache,RangeType * r1,RangeType * r2)821 range_super_union(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
822 {
823 	RangeType  *result;
824 	RangeBound	lower1,
825 				lower2;
826 	RangeBound	upper1,
827 				upper2;
828 	bool		empty1,
829 				empty2;
830 	char		flags1,
831 				flags2;
832 	RangeBound *result_lower;
833 	RangeBound *result_upper;
834 
835 	range_deserialize(typcache, r1, &lower1, &upper1, &empty1);
836 	range_deserialize(typcache, r2, &lower2, &upper2, &empty2);
837 	flags1 = range_get_flags(r1);
838 	flags2 = range_get_flags(r2);
839 
840 	if (empty1)
841 	{
842 		/* We can return r2 as-is if it already is or contains empty */
843 		if (flags2 & (RANGE_EMPTY | RANGE_CONTAIN_EMPTY))
844 			return r2;
845 		/* Else we'd better copy it (modify-in-place isn't safe) */
846 		r2 = rangeCopy(r2);
847 		range_set_contain_empty(r2);
848 		return r2;
849 	}
850 	if (empty2)
851 	{
852 		/* We can return r1 as-is if it already is or contains empty */
853 		if (flags1 & (RANGE_EMPTY | RANGE_CONTAIN_EMPTY))
854 			return r1;
855 		/* Else we'd better copy it (modify-in-place isn't safe) */
856 		r1 = rangeCopy(r1);
857 		range_set_contain_empty(r1);
858 		return r1;
859 	}
860 
861 	if (range_cmp_bounds(typcache, &lower1, &lower2) <= 0)
862 		result_lower = &lower1;
863 	else
864 		result_lower = &lower2;
865 
866 	if (range_cmp_bounds(typcache, &upper1, &upper2) >= 0)
867 		result_upper = &upper1;
868 	else
869 		result_upper = &upper2;
870 
871 	/* optimization to avoid constructing a new range */
872 	if (result_lower == &lower1 && result_upper == &upper1 &&
873 		((flags1 & RANGE_CONTAIN_EMPTY) || !(flags2 & RANGE_CONTAIN_EMPTY)))
874 		return r1;
875 	if (result_lower == &lower2 && result_upper == &upper2 &&
876 		((flags2 & RANGE_CONTAIN_EMPTY) || !(flags1 & RANGE_CONTAIN_EMPTY)))
877 		return r2;
878 
879 	result = make_range(typcache, result_lower, result_upper, false);
880 
881 	if ((flags1 & RANGE_CONTAIN_EMPTY) || (flags2 & RANGE_CONTAIN_EMPTY))
882 		range_set_contain_empty(result);
883 
884 	return result;
885 }
886 
887 static bool
multirange_union_range_equal(TypeCacheEntry * typcache,const RangeType * r,const MultirangeType * mr)888 multirange_union_range_equal(TypeCacheEntry *typcache,
889 							 const RangeType *r,
890 							 const MultirangeType *mr)
891 {
892 	RangeBound	lower1,
893 				upper1,
894 				lower2,
895 				upper2,
896 				tmp;
897 	bool		empty;
898 
899 	if (RangeIsEmpty(r) || MultirangeIsEmpty(mr))
900 		return (RangeIsEmpty(r) && MultirangeIsEmpty(mr));
901 
902 	range_deserialize(typcache, r, &lower1, &upper1, &empty);
903 	Assert(!empty);
904 	multirange_get_bounds(typcache, mr, 0, &lower2, &tmp);
905 	multirange_get_bounds(typcache, mr, mr->rangeCount - 1, &tmp, &upper2);
906 
907 	return (range_cmp_bounds(typcache, &lower1, &lower2) == 0 &&
908 			range_cmp_bounds(typcache, &upper1, &upper2) == 0);
909 }
910 
911 /*
912  * GiST consistent test on an index internal page with range query
913  */
914 static bool
range_gist_consistent_int_range(TypeCacheEntry * typcache,StrategyNumber strategy,const RangeType * key,const RangeType * query)915 range_gist_consistent_int_range(TypeCacheEntry *typcache,
916 								StrategyNumber strategy,
917 								const RangeType *key,
918 								const RangeType *query)
919 {
920 	switch (strategy)
921 	{
922 		case RANGESTRAT_BEFORE:
923 			if (RangeIsEmpty(key) || RangeIsEmpty(query))
924 				return false;
925 			return (!range_overright_internal(typcache, key, query));
926 		case RANGESTRAT_OVERLEFT:
927 			if (RangeIsEmpty(key) || RangeIsEmpty(query))
928 				return false;
929 			return (!range_after_internal(typcache, key, query));
930 		case RANGESTRAT_OVERLAPS:
931 			return range_overlaps_internal(typcache, key, query);
932 		case RANGESTRAT_OVERRIGHT:
933 			if (RangeIsEmpty(key) || RangeIsEmpty(query))
934 				return false;
935 			return (!range_before_internal(typcache, key, query));
936 		case RANGESTRAT_AFTER:
937 			if (RangeIsEmpty(key) || RangeIsEmpty(query))
938 				return false;
939 			return (!range_overleft_internal(typcache, key, query));
940 		case RANGESTRAT_ADJACENT:
941 			if (RangeIsEmpty(key) || RangeIsEmpty(query))
942 				return false;
943 			if (range_adjacent_internal(typcache, key, query))
944 				return true;
945 			return range_overlaps_internal(typcache, key, query);
946 		case RANGESTRAT_CONTAINS:
947 			return range_contains_internal(typcache, key, query);
948 		case RANGESTRAT_CONTAINED_BY:
949 
950 			/*
951 			 * Empty ranges are contained by anything, so if key is or
952 			 * contains any empty ranges, we must descend into it.  Otherwise,
953 			 * descend only if key overlaps the query.
954 			 */
955 			if (RangeIsOrContainsEmpty(key))
956 				return true;
957 			return range_overlaps_internal(typcache, key, query);
958 		case RANGESTRAT_EQ:
959 
960 			/*
961 			 * If query is empty, descend only if the key is or contains any
962 			 * empty ranges.  Otherwise, descend if key contains query.
963 			 */
964 			if (RangeIsEmpty(query))
965 				return RangeIsOrContainsEmpty(key);
966 			return range_contains_internal(typcache, key, query);
967 		default:
968 			elog(ERROR, "unrecognized range strategy: %d", strategy);
969 			return false;		/* keep compiler quiet */
970 	}
971 }
972 
973 /*
974  * GiST consistent test on an index internal page with multirange query
975  */
976 static bool
range_gist_consistent_int_multirange(TypeCacheEntry * typcache,StrategyNumber strategy,const RangeType * key,const MultirangeType * query)977 range_gist_consistent_int_multirange(TypeCacheEntry *typcache,
978 									 StrategyNumber strategy,
979 									 const RangeType *key,
980 									 const MultirangeType *query)
981 {
982 	switch (strategy)
983 	{
984 		case RANGESTRAT_BEFORE:
985 			if (RangeIsEmpty(key) || MultirangeIsEmpty(query))
986 				return false;
987 			return (!range_overright_multirange_internal(typcache, key, query));
988 		case RANGESTRAT_OVERLEFT:
989 			if (RangeIsEmpty(key) || MultirangeIsEmpty(query))
990 				return false;
991 			return (!range_after_multirange_internal(typcache, key, query));
992 		case RANGESTRAT_OVERLAPS:
993 			return range_overlaps_multirange_internal(typcache, key, query);
994 		case RANGESTRAT_OVERRIGHT:
995 			if (RangeIsEmpty(key) || MultirangeIsEmpty(query))
996 				return false;
997 			return (!range_before_multirange_internal(typcache, key, query));
998 		case RANGESTRAT_AFTER:
999 			if (RangeIsEmpty(key) || MultirangeIsEmpty(query))
1000 				return false;
1001 			return (!range_overleft_multirange_internal(typcache, key, query));
1002 		case RANGESTRAT_ADJACENT:
1003 			if (RangeIsEmpty(key) || MultirangeIsEmpty(query))
1004 				return false;
1005 			if (range_adjacent_multirange_internal(typcache, key, query))
1006 				return true;
1007 			return range_overlaps_multirange_internal(typcache, key, query);
1008 		case RANGESTRAT_CONTAINS:
1009 			return range_contains_multirange_internal(typcache, key, query);
1010 		case RANGESTRAT_CONTAINED_BY:
1011 
1012 			/*
1013 			 * Empty ranges are contained by anything, so if key is or
1014 			 * contains any empty ranges, we must descend into it.  Otherwise,
1015 			 * descend only if key overlaps the query.
1016 			 */
1017 			if (RangeIsOrContainsEmpty(key))
1018 				return true;
1019 			return range_overlaps_multirange_internal(typcache, key, query);
1020 		case RANGESTRAT_EQ:
1021 
1022 			/*
1023 			 * If query is empty, descend only if the key is or contains any
1024 			 * empty ranges.  Otherwise, descend if key contains query.
1025 			 */
1026 			if (MultirangeIsEmpty(query))
1027 				return RangeIsOrContainsEmpty(key);
1028 			return range_contains_multirange_internal(typcache, key, query);
1029 		default:
1030 			elog(ERROR, "unrecognized range strategy: %d", strategy);
1031 			return false;		/* keep compiler quiet */
1032 	}
1033 }
1034 
1035 /*
1036  * GiST consistent test on an index internal page with element query
1037  */
1038 static bool
range_gist_consistent_int_element(TypeCacheEntry * typcache,StrategyNumber strategy,const RangeType * key,Datum query)1039 range_gist_consistent_int_element(TypeCacheEntry *typcache,
1040 								  StrategyNumber strategy,
1041 								  const RangeType *key,
1042 								  Datum query)
1043 {
1044 	switch (strategy)
1045 	{
1046 		case RANGESTRAT_CONTAINS_ELEM:
1047 			return range_contains_elem_internal(typcache, key, query);
1048 		default:
1049 			elog(ERROR, "unrecognized range strategy: %d", strategy);
1050 			return false;		/* keep compiler quiet */
1051 	}
1052 }
1053 
1054 /*
1055  * GiST consistent test on an index leaf page with range query
1056  */
1057 static bool
range_gist_consistent_leaf_range(TypeCacheEntry * typcache,StrategyNumber strategy,const RangeType * key,const RangeType * query)1058 range_gist_consistent_leaf_range(TypeCacheEntry *typcache,
1059 								 StrategyNumber strategy,
1060 								 const RangeType *key,
1061 								 const RangeType *query)
1062 {
1063 	switch (strategy)
1064 	{
1065 		case RANGESTRAT_BEFORE:
1066 			return range_before_internal(typcache, key, query);
1067 		case RANGESTRAT_OVERLEFT:
1068 			return range_overleft_internal(typcache, key, query);
1069 		case RANGESTRAT_OVERLAPS:
1070 			return range_overlaps_internal(typcache, key, query);
1071 		case RANGESTRAT_OVERRIGHT:
1072 			return range_overright_internal(typcache, key, query);
1073 		case RANGESTRAT_AFTER:
1074 			return range_after_internal(typcache, key, query);
1075 		case RANGESTRAT_ADJACENT:
1076 			return range_adjacent_internal(typcache, key, query);
1077 		case RANGESTRAT_CONTAINS:
1078 			return range_contains_internal(typcache, key, query);
1079 		case RANGESTRAT_CONTAINED_BY:
1080 			return range_contained_by_internal(typcache, key, query);
1081 		case RANGESTRAT_EQ:
1082 			return range_eq_internal(typcache, key, query);
1083 		default:
1084 			elog(ERROR, "unrecognized range strategy: %d", strategy);
1085 			return false;		/* keep compiler quiet */
1086 	}
1087 }
1088 
1089 /*
1090  * GiST consistent test on an index leaf page with multirange query
1091  */
1092 static bool
range_gist_consistent_leaf_multirange(TypeCacheEntry * typcache,StrategyNumber strategy,const RangeType * key,const MultirangeType * query)1093 range_gist_consistent_leaf_multirange(TypeCacheEntry *typcache,
1094 									  StrategyNumber strategy,
1095 									  const RangeType *key,
1096 									  const MultirangeType *query)
1097 {
1098 	switch (strategy)
1099 	{
1100 		case RANGESTRAT_BEFORE:
1101 			return range_before_multirange_internal(typcache, key, query);
1102 		case RANGESTRAT_OVERLEFT:
1103 			return range_overleft_multirange_internal(typcache, key, query);
1104 		case RANGESTRAT_OVERLAPS:
1105 			return range_overlaps_multirange_internal(typcache, key, query);
1106 		case RANGESTRAT_OVERRIGHT:
1107 			return range_overright_multirange_internal(typcache, key, query);
1108 		case RANGESTRAT_AFTER:
1109 			return range_after_multirange_internal(typcache, key, query);
1110 		case RANGESTRAT_ADJACENT:
1111 			return range_adjacent_multirange_internal(typcache, key, query);
1112 		case RANGESTRAT_CONTAINS:
1113 			return range_contains_multirange_internal(typcache, key, query);
1114 		case RANGESTRAT_CONTAINED_BY:
1115 			return multirange_contains_range_internal(typcache, query, key);
1116 		case RANGESTRAT_EQ:
1117 			return multirange_union_range_equal(typcache, key, query);
1118 		default:
1119 			elog(ERROR, "unrecognized range strategy: %d", strategy);
1120 			return false;		/* keep compiler quiet */
1121 	}
1122 }
1123 
1124 /*
1125  * GiST consistent test on an index leaf page with element query
1126  */
1127 static bool
range_gist_consistent_leaf_element(TypeCacheEntry * typcache,StrategyNumber strategy,const RangeType * key,Datum query)1128 range_gist_consistent_leaf_element(TypeCacheEntry *typcache,
1129 								   StrategyNumber strategy,
1130 								   const RangeType *key,
1131 								   Datum query)
1132 {
1133 	switch (strategy)
1134 	{
1135 		case RANGESTRAT_CONTAINS_ELEM:
1136 			return range_contains_elem_internal(typcache, key, query);
1137 		default:
1138 			elog(ERROR, "unrecognized range strategy: %d", strategy);
1139 			return false;		/* keep compiler quiet */
1140 	}
1141 }
1142 
1143 /*
1144  * Trivial split: half of entries will be placed on one page
1145  * and the other half on the other page.
1146  */
1147 static void
range_gist_fallback_split(TypeCacheEntry * typcache,GistEntryVector * entryvec,GIST_SPLITVEC * v)1148 range_gist_fallback_split(TypeCacheEntry *typcache,
1149 						  GistEntryVector *entryvec,
1150 						  GIST_SPLITVEC *v)
1151 {
1152 	RangeType  *left_range = NULL;
1153 	RangeType  *right_range = NULL;
1154 	OffsetNumber i,
1155 				maxoff,
1156 				split_idx;
1157 
1158 	maxoff = entryvec->n - 1;
1159 	/* Split entries before this to left page, after to right: */
1160 	split_idx = (maxoff - FirstOffsetNumber) / 2 + FirstOffsetNumber;
1161 
1162 	v->spl_nleft = 0;
1163 	v->spl_nright = 0;
1164 	for (i = FirstOffsetNumber; i <= maxoff; i++)
1165 	{
1166 		RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);
1167 
1168 		if (i < split_idx)
1169 			PLACE_LEFT(range, i);
1170 		else
1171 			PLACE_RIGHT(range, i);
1172 	}
1173 
1174 	v->spl_ldatum = RangeTypePGetDatum(left_range);
1175 	v->spl_rdatum = RangeTypePGetDatum(right_range);
1176 }
1177 
1178 /*
1179  * Split based on classes of ranges.
1180  *
1181  * See get_gist_range_class for class definitions.
1182  * classes_groups is an array of length CLS_COUNT indicating the side of the
1183  * split to which each class should go.
1184  */
1185 static void
range_gist_class_split(TypeCacheEntry * typcache,GistEntryVector * entryvec,GIST_SPLITVEC * v,SplitLR * classes_groups)1186 range_gist_class_split(TypeCacheEntry *typcache,
1187 					   GistEntryVector *entryvec,
1188 					   GIST_SPLITVEC *v,
1189 					   SplitLR *classes_groups)
1190 {
1191 	RangeType  *left_range = NULL;
1192 	RangeType  *right_range = NULL;
1193 	OffsetNumber i,
1194 				maxoff;
1195 
1196 	maxoff = entryvec->n - 1;
1197 
1198 	v->spl_nleft = 0;
1199 	v->spl_nright = 0;
1200 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
1201 	{
1202 		RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);
1203 		int			class;
1204 
1205 		/* Get class of range */
1206 		class = get_gist_range_class(range);
1207 
1208 		/* Place range to appropriate page */
1209 		if (classes_groups[class] == SPLIT_LEFT)
1210 			PLACE_LEFT(range, i);
1211 		else
1212 		{
1213 			Assert(classes_groups[class] == SPLIT_RIGHT);
1214 			PLACE_RIGHT(range, i);
1215 		}
1216 	}
1217 
1218 	v->spl_ldatum = RangeTypePGetDatum(left_range);
1219 	v->spl_rdatum = RangeTypePGetDatum(right_range);
1220 }
1221 
1222 /*
1223  * Sorting based split. First half of entries according to the sort will be
1224  * placed to one page, and second half of entries will be placed to other
1225  * page. use_upper_bound parameter indicates whether to use upper or lower
1226  * bound for sorting.
1227  */
1228 static void
range_gist_single_sorting_split(TypeCacheEntry * typcache,GistEntryVector * entryvec,GIST_SPLITVEC * v,bool use_upper_bound)1229 range_gist_single_sorting_split(TypeCacheEntry *typcache,
1230 								GistEntryVector *entryvec,
1231 								GIST_SPLITVEC *v,
1232 								bool use_upper_bound)
1233 {
1234 	SingleBoundSortItem *sortItems;
1235 	RangeType  *left_range = NULL;
1236 	RangeType  *right_range = NULL;
1237 	OffsetNumber i,
1238 				maxoff,
1239 				split_idx;
1240 
1241 	maxoff = entryvec->n - 1;
1242 
1243 	sortItems = (SingleBoundSortItem *)
1244 		palloc(maxoff * sizeof(SingleBoundSortItem));
1245 
1246 	/*
1247 	 * Prepare auxiliary array and sort the values.
1248 	 */
1249 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
1250 	{
1251 		RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);
1252 		RangeBound	bound2;
1253 		bool		empty;
1254 
1255 		sortItems[i - 1].index = i;
1256 		/* Put appropriate bound into array */
1257 		if (use_upper_bound)
1258 			range_deserialize(typcache, range, &bound2,
1259 							  &sortItems[i - 1].bound, &empty);
1260 		else
1261 			range_deserialize(typcache, range, &sortItems[i - 1].bound,
1262 							  &bound2, &empty);
1263 		Assert(!empty);
1264 	}
1265 
1266 	qsort_arg(sortItems, maxoff, sizeof(SingleBoundSortItem),
1267 			  single_bound_cmp, typcache);
1268 
1269 	split_idx = maxoff / 2;
1270 
1271 	v->spl_nleft = 0;
1272 	v->spl_nright = 0;
1273 
1274 	for (i = 0; i < maxoff; i++)
1275 	{
1276 		int			idx = sortItems[i].index;
1277 		RangeType  *range = DatumGetRangeTypeP(entryvec->vector[idx].key);
1278 
1279 		if (i < split_idx)
1280 			PLACE_LEFT(range, idx);
1281 		else
1282 			PLACE_RIGHT(range, idx);
1283 	}
1284 
1285 	v->spl_ldatum = RangeTypePGetDatum(left_range);
1286 	v->spl_rdatum = RangeTypePGetDatum(right_range);
1287 }
1288 
1289 /*
1290  * Double sorting split algorithm.
1291  *
1292  * The algorithm considers dividing ranges into two groups. The first (left)
1293  * group contains general left bound. The second (right) group contains
1294  * general right bound. The challenge is to find upper bound of left group
1295  * and lower bound of right group so that overlap of groups is minimal and
1296  * ratio of distribution is acceptable. Algorithm finds for each lower bound of
1297  * right group minimal upper bound of left group, and for each upper bound of
1298  * left group maximal lower bound of right group. For each found pair
1299  * range_gist_consider_split considers replacement of currently selected
1300  * split with the new one.
1301  *
1302  * After that, all the entries are divided into three groups:
1303  * 1) Entries which should be placed to the left group
1304  * 2) Entries which should be placed to the right group
1305  * 3) "Common entries" which can be placed to either group without affecting
1306  *	  amount of overlap.
1307  *
1308  * The common ranges are distributed by difference of distance from lower
1309  * bound of common range to lower bound of right group and distance from upper
1310  * bound of common range to upper bound of left group.
1311  *
1312  * For details see:
1313  * "A new double sorting-based node splitting algorithm for R-tree",
1314  * A. Korotkov
1315  * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36
1316  */
1317 static void
range_gist_double_sorting_split(TypeCacheEntry * typcache,GistEntryVector * entryvec,GIST_SPLITVEC * v)1318 range_gist_double_sorting_split(TypeCacheEntry *typcache,
1319 								GistEntryVector *entryvec,
1320 								GIST_SPLITVEC *v)
1321 {
1322 	ConsiderSplitContext context;
1323 	OffsetNumber i,
1324 				maxoff;
1325 	RangeType  *range,
1326 			   *left_range = NULL,
1327 			   *right_range = NULL;
1328 	int			common_entries_count;
1329 	NonEmptyRange *by_lower,
1330 			   *by_upper;
1331 	CommonEntry *common_entries;
1332 	int			nentries,
1333 				i1,
1334 				i2;
1335 	RangeBound *right_lower,
1336 			   *left_upper;
1337 
1338 	memset(&context, 0, sizeof(ConsiderSplitContext));
1339 	context.typcache = typcache;
1340 	context.has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid);
1341 
1342 	maxoff = entryvec->n - 1;
1343 	nentries = context.entries_count = maxoff - FirstOffsetNumber + 1;
1344 	context.first = true;
1345 
1346 	/* Allocate arrays for sorted range bounds */
1347 	by_lower = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange));
1348 	by_upper = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange));
1349 
1350 	/* Fill arrays of bounds */
1351 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
1352 	{
1353 		RangeType  *range = DatumGetRangeTypeP(entryvec->vector[i].key);
1354 		bool		empty;
1355 
1356 		range_deserialize(typcache, range,
1357 						  &by_lower[i - FirstOffsetNumber].lower,
1358 						  &by_lower[i - FirstOffsetNumber].upper,
1359 						  &empty);
1360 		Assert(!empty);
1361 	}
1362 
1363 	/*
1364 	 * Make two arrays of range bounds: one sorted by lower bound and another
1365 	 * sorted by upper bound.
1366 	 */
1367 	memcpy(by_upper, by_lower, nentries * sizeof(NonEmptyRange));
1368 	qsort_arg(by_lower, nentries, sizeof(NonEmptyRange),
1369 			  interval_cmp_lower, typcache);
1370 	qsort_arg(by_upper, nentries, sizeof(NonEmptyRange),
1371 			  interval_cmp_upper, typcache);
1372 
1373 	/*----------
1374 	 * The goal is to form a left and right range, so that every entry
1375 	 * range is contained by either left or right interval (or both).
1376 	 *
1377 	 * For example, with the ranges (0,1), (1,3), (2,3), (2,4):
1378 	 *
1379 	 * 0 1 2 3 4
1380 	 * +-+
1381 	 *	 +---+
1382 	 *	   +-+
1383 	 *	   +---+
1384 	 *
1385 	 * The left and right ranges are of the form (0,a) and (b,4).
1386 	 * We first consider splits where b is the lower bound of an entry.
1387 	 * We iterate through all entries, and for each b, calculate the
1388 	 * smallest possible a. Then we consider splits where a is the
1389 	 * upper bound of an entry, and for each a, calculate the greatest
1390 	 * possible b.
1391 	 *
1392 	 * In the above example, the first loop would consider splits:
1393 	 * b=0: (0,1)-(0,4)
1394 	 * b=1: (0,1)-(1,4)
1395 	 * b=2: (0,3)-(2,4)
1396 	 *
1397 	 * And the second loop:
1398 	 * a=1: (0,1)-(1,4)
1399 	 * a=3: (0,3)-(2,4)
1400 	 * a=4: (0,4)-(2,4)
1401 	 *----------
1402 	 */
1403 
1404 	/*
1405 	 * Iterate over lower bound of right group, finding smallest possible
1406 	 * upper bound of left group.
1407 	 */
1408 	i1 = 0;
1409 	i2 = 0;
1410 	right_lower = &by_lower[i1].lower;
1411 	left_upper = &by_upper[i2].lower;
1412 	while (true)
1413 	{
1414 		/*
1415 		 * Find next lower bound of right group.
1416 		 */
1417 		while (i1 < nentries &&
1418 			   range_cmp_bounds(typcache, right_lower,
1419 								&by_lower[i1].lower) == 0)
1420 		{
1421 			if (range_cmp_bounds(typcache, &by_lower[i1].upper,
1422 								 left_upper) > 0)
1423 				left_upper = &by_lower[i1].upper;
1424 			i1++;
1425 		}
1426 		if (i1 >= nentries)
1427 			break;
1428 		right_lower = &by_lower[i1].lower;
1429 
1430 		/*
1431 		 * Find count of ranges which anyway should be placed to the left
1432 		 * group.
1433 		 */
1434 		while (i2 < nentries &&
1435 			   range_cmp_bounds(typcache, &by_upper[i2].upper,
1436 								left_upper) <= 0)
1437 			i2++;
1438 
1439 		/*
1440 		 * Consider found split to see if it's better than what we had.
1441 		 */
1442 		range_gist_consider_split(&context, right_lower, i1, left_upper, i2);
1443 	}
1444 
1445 	/*
1446 	 * Iterate over upper bound of left group finding greatest possible lower
1447 	 * bound of right group.
1448 	 */
1449 	i1 = nentries - 1;
1450 	i2 = nentries - 1;
1451 	right_lower = &by_lower[i1].upper;
1452 	left_upper = &by_upper[i2].upper;
1453 	while (true)
1454 	{
1455 		/*
1456 		 * Find next upper bound of left group.
1457 		 */
1458 		while (i2 >= 0 &&
1459 			   range_cmp_bounds(typcache, left_upper,
1460 								&by_upper[i2].upper) == 0)
1461 		{
1462 			if (range_cmp_bounds(typcache, &by_upper[i2].lower,
1463 								 right_lower) < 0)
1464 				right_lower = &by_upper[i2].lower;
1465 			i2--;
1466 		}
1467 		if (i2 < 0)
1468 			break;
1469 		left_upper = &by_upper[i2].upper;
1470 
1471 		/*
1472 		 * Find count of intervals which anyway should be placed to the right
1473 		 * group.
1474 		 */
1475 		while (i1 >= 0 &&
1476 			   range_cmp_bounds(typcache, &by_lower[i1].lower,
1477 								right_lower) >= 0)
1478 			i1--;
1479 
1480 		/*
1481 		 * Consider found split to see if it's better than what we had.
1482 		 */
1483 		range_gist_consider_split(&context, right_lower, i1 + 1,
1484 								  left_upper, i2 + 1);
1485 	}
1486 
1487 	/*
1488 	 * If we failed to find any acceptable splits, use trivial split.
1489 	 */
1490 	if (context.first)
1491 	{
1492 		range_gist_fallback_split(typcache, entryvec, v);
1493 		return;
1494 	}
1495 
1496 	/*
1497 	 * Ok, we have now selected bounds of the groups. Now we have to
1498 	 * distribute entries themselves. At first we distribute entries which can
1499 	 * be placed unambiguously and collect "common entries" to array.
1500 	 */
1501 
1502 	/* Allocate vectors for results */
1503 	v->spl_left = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
1504 	v->spl_right = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
1505 	v->spl_nleft = 0;
1506 	v->spl_nright = 0;
1507 
1508 	/*
1509 	 * Allocate an array for "common entries" - entries which can be placed to
1510 	 * either group without affecting overlap along selected axis.
1511 	 */
1512 	common_entries_count = 0;
1513 	common_entries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));
1514 
1515 	/*
1516 	 * Distribute entries which can be distributed unambiguously, and collect
1517 	 * common entries.
1518 	 */
1519 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
1520 	{
1521 		RangeBound	lower,
1522 					upper;
1523 		bool		empty;
1524 
1525 		/*
1526 		 * Get upper and lower bounds along selected axis.
1527 		 */
1528 		range = DatumGetRangeTypeP(entryvec->vector[i].key);
1529 
1530 		range_deserialize(typcache, range, &lower, &upper, &empty);
1531 
1532 		if (range_cmp_bounds(typcache, &upper, context.left_upper) <= 0)
1533 		{
1534 			/* Fits in the left group */
1535 			if (range_cmp_bounds(typcache, &lower, context.right_lower) >= 0)
1536 			{
1537 				/* Fits also in the right group, so "common entry" */
1538 				common_entries[common_entries_count].index = i;
1539 				if (context.has_subtype_diff)
1540 				{
1541 					/*
1542 					 * delta = (lower - context.right_lower) -
1543 					 * (context.left_upper - upper)
1544 					 */
1545 					common_entries[common_entries_count].delta =
1546 						call_subtype_diff(typcache,
1547 										  lower.val,
1548 										  context.right_lower->val) -
1549 						call_subtype_diff(typcache,
1550 										  context.left_upper->val,
1551 										  upper.val);
1552 				}
1553 				else
1554 				{
1555 					/* Without subtype_diff, take all deltas as zero */
1556 					common_entries[common_entries_count].delta = 0;
1557 				}
1558 				common_entries_count++;
1559 			}
1560 			else
1561 			{
1562 				/* Doesn't fit to the right group, so join to the left group */
1563 				PLACE_LEFT(range, i);
1564 			}
1565 		}
1566 		else
1567 		{
1568 			/*
1569 			 * Each entry should fit on either left or right group. Since this
1570 			 * entry didn't fit in the left group, it better fit in the right
1571 			 * group.
1572 			 */
1573 			Assert(range_cmp_bounds(typcache, &lower,
1574 									context.right_lower) >= 0);
1575 			PLACE_RIGHT(range, i);
1576 		}
1577 	}
1578 
1579 	/*
1580 	 * Distribute "common entries", if any.
1581 	 */
1582 	if (common_entries_count > 0)
1583 	{
1584 		/*
1585 		 * Sort "common entries" by calculated deltas in order to distribute
1586 		 * the most ambiguous entries first.
1587 		 */
1588 		qsort(common_entries, common_entries_count, sizeof(CommonEntry),
1589 			  common_entry_cmp);
1590 
1591 		/*
1592 		 * Distribute "common entries" between groups according to sorting.
1593 		 */
1594 		for (i = 0; i < common_entries_count; i++)
1595 		{
1596 			int			idx = common_entries[i].index;
1597 
1598 			range = DatumGetRangeTypeP(entryvec->vector[idx].key);
1599 
1600 			/*
1601 			 * Check if we have to place this entry in either group to achieve
1602 			 * LIMIT_RATIO.
1603 			 */
1604 			if (i < context.common_left)
1605 				PLACE_LEFT(range, idx);
1606 			else
1607 				PLACE_RIGHT(range, idx);
1608 		}
1609 	}
1610 
1611 	v->spl_ldatum = PointerGetDatum(left_range);
1612 	v->spl_rdatum = PointerGetDatum(right_range);
1613 }
1614 
1615 /*
1616  * Consider replacement of currently selected split with a better one
1617  * during range_gist_double_sorting_split.
1618  */
1619 static void
range_gist_consider_split(ConsiderSplitContext * context,RangeBound * right_lower,int min_left_count,RangeBound * left_upper,int max_left_count)1620 range_gist_consider_split(ConsiderSplitContext *context,
1621 						  RangeBound *right_lower, int min_left_count,
1622 						  RangeBound *left_upper, int max_left_count)
1623 {
1624 	int			left_count,
1625 				right_count;
1626 	float4		ratio,
1627 				overlap;
1628 
1629 	/*
1630 	 * Calculate entries distribution ratio assuming most uniform distribution
1631 	 * of common entries.
1632 	 */
1633 	if (min_left_count >= (context->entries_count + 1) / 2)
1634 		left_count = min_left_count;
1635 	else if (max_left_count <= context->entries_count / 2)
1636 		left_count = max_left_count;
1637 	else
1638 		left_count = context->entries_count / 2;
1639 	right_count = context->entries_count - left_count;
1640 
1641 	/*
1642 	 * Ratio of split: quotient between size of smaller group and total
1643 	 * entries count.  This is necessarily 0.5 or less; if it's less than
1644 	 * LIMIT_RATIO then we will never accept the new split.
1645 	 */
1646 	ratio = ((float4) Min(left_count, right_count)) /
1647 		((float4) context->entries_count);
1648 
1649 	if (ratio > LIMIT_RATIO)
1650 	{
1651 		bool		selectthis = false;
1652 
1653 		/*
1654 		 * The ratio is acceptable, so compare current split with previously
1655 		 * selected one. We search for minimal overlap (allowing negative
1656 		 * values) and minimal ratio secondarily.  If subtype_diff is
1657 		 * available, it's used for overlap measure.  Without subtype_diff we
1658 		 * use number of "common entries" as an overlap measure.
1659 		 */
1660 		if (context->has_subtype_diff)
1661 			overlap = call_subtype_diff(context->typcache,
1662 										left_upper->val,
1663 										right_lower->val);
1664 		else
1665 			overlap = max_left_count - min_left_count;
1666 
1667 		/* If there is no previous selection, select this split */
1668 		if (context->first)
1669 			selectthis = true;
1670 		else
1671 		{
1672 			/*
1673 			 * Choose the new split if it has a smaller overlap, or same
1674 			 * overlap but better ratio.
1675 			 */
1676 			if (overlap < context->overlap ||
1677 				(overlap == context->overlap && ratio > context->ratio))
1678 				selectthis = true;
1679 		}
1680 
1681 		if (selectthis)
1682 		{
1683 			/* save information about selected split */
1684 			context->first = false;
1685 			context->ratio = ratio;
1686 			context->overlap = overlap;
1687 			context->right_lower = right_lower;
1688 			context->left_upper = left_upper;
1689 			context->common_left = max_left_count - left_count;
1690 			context->common_right = left_count - min_left_count;
1691 		}
1692 	}
1693 }
1694 
1695 /*
1696  * Find class number for range.
1697  *
1698  * The class number is a valid combination of the properties of the
1699  * range.  Note: the highest possible number is 8, because CLS_EMPTY
1700  * can't be combined with anything else.
1701  */
1702 static int
get_gist_range_class(RangeType * range)1703 get_gist_range_class(RangeType *range)
1704 {
1705 	int			classNumber;
1706 	char		flags;
1707 
1708 	flags = range_get_flags(range);
1709 	if (flags & RANGE_EMPTY)
1710 	{
1711 		classNumber = CLS_EMPTY;
1712 	}
1713 	else
1714 	{
1715 		classNumber = 0;
1716 		if (flags & RANGE_LB_INF)
1717 			classNumber |= CLS_LOWER_INF;
1718 		if (flags & RANGE_UB_INF)
1719 			classNumber |= CLS_UPPER_INF;
1720 		if (flags & RANGE_CONTAIN_EMPTY)
1721 			classNumber |= CLS_CONTAIN_EMPTY;
1722 	}
1723 	return classNumber;
1724 }
1725 
1726 /*
1727  * Comparison function for range_gist_single_sorting_split.
1728  */
1729 static int
single_bound_cmp(const void * a,const void * b,void * arg)1730 single_bound_cmp(const void *a, const void *b, void *arg)
1731 {
1732 	SingleBoundSortItem *i1 = (SingleBoundSortItem *) a;
1733 	SingleBoundSortItem *i2 = (SingleBoundSortItem *) b;
1734 	TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
1735 
1736 	return range_cmp_bounds(typcache, &i1->bound, &i2->bound);
1737 }
1738 
1739 /*
1740  * Compare NonEmptyRanges by lower bound.
1741  */
1742 static int
interval_cmp_lower(const void * a,const void * b,void * arg)1743 interval_cmp_lower(const void *a, const void *b, void *arg)
1744 {
1745 	NonEmptyRange *i1 = (NonEmptyRange *) a;
1746 	NonEmptyRange *i2 = (NonEmptyRange *) b;
1747 	TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
1748 
1749 	return range_cmp_bounds(typcache, &i1->lower, &i2->lower);
1750 }
1751 
1752 /*
1753  * Compare NonEmptyRanges by upper bound.
1754  */
1755 static int
interval_cmp_upper(const void * a,const void * b,void * arg)1756 interval_cmp_upper(const void *a, const void *b, void *arg)
1757 {
1758 	NonEmptyRange *i1 = (NonEmptyRange *) a;
1759 	NonEmptyRange *i2 = (NonEmptyRange *) b;
1760 	TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
1761 
1762 	return range_cmp_bounds(typcache, &i1->upper, &i2->upper);
1763 }
1764 
1765 /*
1766  * Compare CommonEntrys by their deltas.
1767  */
1768 static int
common_entry_cmp(const void * i1,const void * i2)1769 common_entry_cmp(const void *i1, const void *i2)
1770 {
1771 	double		delta1 = ((CommonEntry *) i1)->delta;
1772 	double		delta2 = ((CommonEntry *) i2)->delta;
1773 
1774 	if (delta1 < delta2)
1775 		return -1;
1776 	else if (delta1 > delta2)
1777 		return 1;
1778 	else
1779 		return 0;
1780 }
1781 
1782 /*
1783  * Convenience function to invoke type-specific subtype_diff function.
1784  * Caller must have already checked that there is one for the range type.
1785  */
1786 static float8
call_subtype_diff(TypeCacheEntry * typcache,Datum val1,Datum val2)1787 call_subtype_diff(TypeCacheEntry *typcache, Datum val1, Datum val2)
1788 {
1789 	float8		value;
1790 
1791 	value = DatumGetFloat8(FunctionCall2Coll(&typcache->rng_subdiff_finfo,
1792 											 typcache->rng_collation,
1793 											 val1, val2));
1794 	/* Cope with buggy subtype_diff function by returning zero */
1795 	if (value >= 0.0)
1796 		return value;
1797 	return 0.0;
1798 }
1799