diff --git a/src/backend/utils/adt/rangetypes_gist.c b/src/backend/utils/adt/rangetypes_gist.c index 4267dc8cb6..87f71e6812 100644 --- a/src/backend/utils/adt/rangetypes_gist.c +++ b/src/backend/utils/adt/rangetypes_gist.c @@ -34,21 +34,118 @@ #define RANGESTRAT_CONTAINS_ELEM 16 #define RANGESTRAT_EQ 18 +/* + * Range class properties used to segregate different classes of ranges in + * GiST. Each unique combination of properties is a class. CLS_EMPTY cannot + * be combined with anything else. + */ +#define CLS_NORMAL 0 /* Ordinary finite range (no bits set) */ +#define CLS_LOWER_INF 1 /* Lower bound is infinity */ +#define CLS_UPPER_INF 2 /* Upper bound is infinity */ +#define CLS_CONTAIN_EMPTY 4 /* Contains underlying empty ranges */ +#define CLS_EMPTY 8 /* Special class for empty ranges */ + +#define CLS_COUNT 9 /* # of classes; includes all combinations of + * properties. CLS_EMPTY doesn't combine with + * anything else, so it's only 2^3 + 1. */ + +/* + * Minimum accepted ratio of split for items of the same class. If the items + * are of different classes, we will separate along those lines regardless of + * the ratio. + */ +#define LIMIT_RATIO 0.3 + +/* Constants for fixed penalty values */ +#define INFINITE_BOUND_PENALTY 2.0 +#define CONTAIN_EMPTY_PENALTY 1.0 +#define DEFAULT_SUBTYPE_DIFF_PENALTY 1.0 + +/* + * Per-item data for range_gist_single_sorting_split. + */ +typedef struct +{ + int index; /* offset of this entry in entryvec->vector[] */ + RangeBound bound; /* the one bound (lower or upper) being sorted on */ +} SingleBoundSortItem; + +/* Which side of the split an entry, or a whole range class, is assigned to */ +typedef enum +{ + SPLIT_LEFT = 0, /* makes initialization to SPLIT_LEFT easier */ + SPLIT_RIGHT +} SplitLR; + +/* + * Context for range_gist_consider_split. + */ +typedef struct +{ + TypeCacheEntry *typcache; /* typcache for range type */ + bool has_subtype_diff; /* does it have subtype_diff?
*/ + int entries_count; /* total number of entries being split */ + + /* Information about currently selected split follows */ + + bool first; /* true if no split was selected yet */ + + RangeBound *left_upper; /* upper bound of left interval */ + RangeBound *right_lower; /* lower bound of right interval */ + + float4 ratio; /* split ratio */ + float4 overlap; /* overlap between left and right predicate */ + int common_left; /* # common entries destined for each side */ + int common_right; +} ConsiderSplitContext; + +/* + * Bounds extracted from a non-empty range, for use in + * range_gist_double_sorting_split. + */ +typedef struct +{ + RangeBound lower; + RangeBound upper; +} NonEmptyRange; + +/* + * Represents information about an entry that can be placed in either group + * without affecting overlap over selected axis ("common entry"). + */ +typedef struct +{ + /* Offset of the entry in entryvec->vector[] */ + int index; + /* Delta between closeness of range to each of the two groups */ + double delta; +} CommonEntry; + +/* Helper macros to place an entry in the left or right group during split */ +/* Note direct access to variables v, typcache, left_range, right_range */ +#define PLACE_LEFT(range, off) \ + do { \ + if (v->spl_nleft > 0) \ + left_range = range_super_union(typcache, left_range, range); \ + else \ + left_range = (range); \ + v->spl_left[v->spl_nleft++] = (off); \ + } while(0) + +#define PLACE_RIGHT(range, off) \ + do { \ + if (v->spl_nright > 0) \ + right_range = range_super_union(typcache, right_range, range); \ + else \ + right_range = (range); \ + v->spl_right[v->spl_nright++] = (off); \ + } while(0) + /* Copy a RangeType datum (hardwires typbyval and typlen for ranges...) */ #define rangeCopy(r) \ ((RangeType *) DatumGetPointer(datumCopy(PointerGetDatum(r), \ false, -1))) -/* - * Auxiliary structure for picksplit method.
- */ -typedef struct -{ - int index; /* original index in entryvec->vector[] */ - RangeType *data; /* range value to sort */ - TypeCacheEntry *typcache; /* range type's info */ -} PickSplitSortItem; - static RangeType *range_super_union(TypeCacheEntry *typcache, RangeType * r1, RangeType * r2); static bool range_gist_consistent_int(FmgrInfo *flinfo, @@ -57,7 +154,30 @@ static bool range_gist_consistent_int(FmgrInfo *flinfo, static bool range_gist_consistent_leaf(FmgrInfo *flinfo, StrategyNumber strategy, RangeType *key, Datum query); -static int sort_item_cmp(const void *a, const void *b); +static void range_gist_fallback_split(TypeCacheEntry *typcache, + GistEntryVector *entryvec, + GIST_SPLITVEC *v); +static void range_gist_class_split(TypeCacheEntry *typcache, + GistEntryVector *entryvec, + GIST_SPLITVEC *v, + SplitLR *classes_groups); +static void range_gist_single_sorting_split(TypeCacheEntry *typcache, + GistEntryVector *entryvec, + GIST_SPLITVEC *v, + bool use_upper_bound); +static void range_gist_double_sorting_split(TypeCacheEntry *typcache, + GistEntryVector *entryvec, + GIST_SPLITVEC *v); +static void range_gist_consider_split(ConsiderSplitContext *context, + RangeBound *right_lower, int min_left_count, + RangeBound *left_upper, int max_left_count); +static int get_gist_range_class(RangeType *range); +static int single_bound_cmp(const void *a, const void *b, void *arg); +static int interval_cmp_lower(const void *a, const void *b, void *arg); +static int interval_cmp_upper(const void *a, const void *b, void *arg); +static int common_entry_cmp(const void *i1, const void *i2); +static float8 call_subtype_diff(TypeCacheEntry *typcache, + Datum val1, Datum val2); /* GiST query consistency check */ @@ -122,7 +242,16 @@ range_gist_decompress(PG_FUNCTION_ARGS) PG_RETURN_POINTER(entry); } -/* page split penalty function */ +/* + * GiST page split penalty function. 
+ * + * The penalty function has the following goals (in order from most to least + * important): + * - Keep normal ranges separate + * - Avoid broadening the class of the original predicate + * - Avoid broadening (as determined by subtype_diff) the original predicate + * - Favor adding ranges to narrower original predicates + */ Datum range_gist_penalty(PG_FUNCTION_ARGS) { @@ -132,118 +261,253 @@ range_gist_penalty(PG_FUNCTION_ARGS) RangeType *orig = DatumGetRangeType(origentry->key); RangeType *new = DatumGetRangeType(newentry->key); TypeCacheEntry *typcache; - RangeType *s_union; - FmgrInfo *subtype_diff; - RangeBound lower1, - lower2; - RangeBound upper1, - upper2; - bool empty1, - empty2; - float8 lower_diff, - upper_diff; + bool has_subtype_diff; + RangeBound orig_lower, + new_lower, + orig_upper, + new_upper; + bool orig_empty, + new_empty; if (RangeTypeGetOid(orig) != RangeTypeGetOid(new)) elog(ERROR, "range types do not match"); typcache = range_get_typcache(fcinfo, RangeTypeGetOid(orig)); - subtype_diff = &typcache->rng_subdiff_finfo; + has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid); + + range_deserialize(typcache, orig, &orig_lower, &orig_upper, &orig_empty); + range_deserialize(typcache, new, &new_lower, &new_upper, &new_empty); /* - * If new is or contains empty, and orig doesn't, apply infinite penalty. - * We really don't want to pollute an empty-free subtree with empties. + * Distinct branches for handling distinct classes of ranges. Note + * that penalty values only need to be commensurate within the same + * class of new range. */ - if (RangeIsOrContainsEmpty(new) && !RangeIsOrContainsEmpty(orig)) + if (new_empty) { - *penalty = get_float4_infinity(); - PG_RETURN_POINTER(penalty); + /* Handle insertion of empty range */ + if (orig_empty) + { + /* + * The best case is to insert it to empty original + * range. Insertion here means no broadening of original range. + * Also, the original range is as narrow as possible.
+ */ + *penalty = 0.0; + } + else if (RangeIsOrContainsEmpty(orig)) + { + /* + * The second case is to insert empty range into range which + * contains at least one underlying empty range. There is still + * no broadening of original range, but original range is not as + * narrow as possible. + */ + *penalty = CONTAIN_EMPTY_PENALTY; + } + else if (orig_lower.infinite && orig_upper.infinite) + { + /* + * Original range requires broadening. (-inf, +inf) is farthest + * from a normal range in this case. + */ + *penalty = 2 * CONTAIN_EMPTY_PENALTY; + } + else if (orig_lower.infinite || orig_upper.infinite) + { + /* + * (-inf, x) or (x, +inf) original ranges are closer to normal + * ranges, so it's worse to mix them with empty ranges. + */ + *penalty = 3 * CONTAIN_EMPTY_PENALTY; + } + else + { + /* + * The least preferred case is broadening of normal range. + */ + *penalty = 4 * CONTAIN_EMPTY_PENALTY; + } } - - /* - * We want to compare the size of "orig" to size of "orig union new". - * The penalty will be the sum of the reduction in the lower bound plus - * the increase in the upper bound. - */ - s_union = range_super_union(typcache, orig, new); - - range_deserialize(typcache, orig, &lower1, &upper1, &empty1); - range_deserialize(typcache, s_union, &lower2, &upper2, &empty2); - - /* handle cases where orig is empty */ - if (empty1 && empty2) + else if (new_lower.infinite && new_upper.infinite) { - *penalty = 0; - PG_RETURN_POINTER(penalty); + /* Handle insertion of (-inf, +inf) range */ + if (orig_lower.infinite && orig_upper.infinite) + { + /* + * Best case is inserting to (-inf, +inf) original range. + */ + *penalty = 0.0; + } + else if (orig_lower.infinite || orig_upper.infinite) + { + /* + * When original range is (-inf, x) or (x, +inf) it requires + * broadening of original range (extension of one bound to + * infinity). + */ + *penalty = INFINITE_BOUND_PENALTY; + } + else + { + /* + * Insertion to normal original range is least preferred.
+ */ + *penalty = 2 * INFINITE_BOUND_PENALTY; + } + + if (RangeIsOrContainsEmpty(orig)) + { + /* + * Original range is narrower when it doesn't contain empty ranges. + * Add additional penalty otherwise. + */ + *penalty += CONTAIN_EMPTY_PENALTY; + } } - else if (empty1) + else if (new_lower.infinite) { - /* infinite penalty for pushing non-empty into all-empty subtree */ - *penalty = get_float4_infinity(); - PG_RETURN_POINTER(penalty); + /* Handle insertion of (-inf, x) range */ + if (!orig_empty && orig_lower.infinite) + { + if (orig_upper.infinite) + { + /* + * (-inf, +inf) range won't be extended by insertion of + * (-inf, x) range. It's a less desirable case than insertion + * to (-inf, y) original range without extension, because in + * that case original range is narrower. But we can't express + * that in single float value. + */ + *penalty = 0.0; + } + else + { + if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0) + { + /* + * Get extension of original range using subtype_diff. + * Use constant if subtype_diff unavailable. + */ + if (has_subtype_diff) + *penalty = call_subtype_diff(typcache, + new_upper.val, + orig_upper.val); + else + *penalty = DEFAULT_SUBTYPE_DIFF_PENALTY; + } + else + { + /* No extension of original range */ + *penalty = 0.0; + } + } + } + else + { + /* + * If original range is empty or its lower bound is not -inf, + * then the extension is infinite.
+ */ + *penalty = get_float4_infinity(); + } } - - /* if orig isn't empty, s_union can't be either */ - Assert(!empty2); - - /* similarly, if orig's lower bound is infinite, s_union's must be too */ - Assert(lower2.infinite || !lower1.infinite); - - if (lower2.infinite && lower1.infinite) - lower_diff = 0; - else if (lower2.infinite) - lower_diff = get_float8_infinity(); - else if (OidIsValid(subtype_diff->fn_oid)) + else if (new_upper.infinite) { - lower_diff = DatumGetFloat8(FunctionCall2Coll(subtype_diff, - typcache->rng_collation, - lower1.val, - lower2.val)); - /* orig's lower bound must be >= s_union's */ - if (lower_diff < 0) - lower_diff = 0; /* subtype_diff is broken */ + /* Handle insertion of (x, +inf) range */ + if (!orig_empty && orig_upper.infinite) + { + if (orig_lower.infinite) + { + /* + * (-inf, +inf) range won't be extended by insertion of + * (x, +inf) range. It's a less desirable case than insertion + * to (y, +inf) original range without extension, because in + * that case original range is narrower. But we can't express + * that in single float value. + */ + *penalty = 0.0; + } + else + { + if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0) + { + /* + * Get extension of original range using subtype_diff. + * Use constant if subtype_diff unavailable. + */ + if (has_subtype_diff) + *penalty = call_subtype_diff(typcache, + orig_lower.val, + new_lower.val); + else + *penalty = DEFAULT_SUBTYPE_DIFF_PENALTY; + } + else + { + /* No extension of original range */ + *penalty = 0.0; + } + } + } + else + { + /* + * If original range is empty or its upper bound is not +inf, + * then the extension is infinite. + */ + *penalty = get_float4_infinity(); + } } else { - /* only know whether there is a difference or not */ - lower_diff = range_cmp_bounds(typcache, &lower1, &lower2) > 0 ?
1 : 0; + /* Handle insertion of normal (non-empty, non-infinite) range */ + if (orig_empty || orig_lower.infinite || orig_upper.infinite) + { + /* + * Avoid mixing normal ranges with infinite and empty ranges. + */ + *penalty = get_float4_infinity(); + } + else + { + /* + * Calculate extension of original range by calling subtype_diff. + * Without subtype_diff, add a constant per bound that gets extended. + */ + float8 diff = 0.0; + + if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0) + { + if (has_subtype_diff) + diff += call_subtype_diff(typcache, + orig_lower.val, + new_lower.val); + else + diff += DEFAULT_SUBTYPE_DIFF_PENALTY; + } + if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0) + { + if (has_subtype_diff) + diff += call_subtype_diff(typcache, + new_upper.val, + orig_upper.val); + else + diff += DEFAULT_SUBTYPE_DIFF_PENALTY; + } + *penalty = diff; + } } - /* similarly, if orig's upper bound is infinite, s_union's must be too */ - Assert(upper2.infinite || !upper1.infinite); - - if (upper2.infinite && upper1.infinite) - upper_diff = 0; - else if (upper2.infinite) - upper_diff = get_float8_infinity(); - else if (OidIsValid(subtype_diff->fn_oid)) - { - upper_diff = DatumGetFloat8(FunctionCall2Coll(subtype_diff, - typcache->rng_collation, - upper2.val, - upper1.val)); - /* orig's upper bound must be <= s_union's */ - if (upper_diff < 0) - upper_diff = 0; /* subtype_diff is broken */ - } - else - { - /* only know whether there is a difference or not */ - upper_diff = range_cmp_bounds(typcache, &upper2, &upper1) > 0 ? 1 : 0; - } - - Assert(lower_diff >= 0 && upper_diff >= 0); - - *penalty = (float) (lower_diff + upper_diff); PG_RETURN_POINTER(penalty); } /* * The GiST PickSplit method for ranges * - * Algorithm based on sorting. Incoming array of ranges is sorted using - * sort_item_cmp function. After that first half of ranges goes to the left - * output, and the second half of ranges goes to the right output.
+ * Primarily, we try to segregate ranges of different classes. If splitting + * ranges of the same class, use the appropriate split method for that class. */ Datum range_gist_picksplit(PG_FUNCTION_ARGS) @@ -253,73 +517,149 @@ range_gist_picksplit(PG_FUNCTION_ARGS) TypeCacheEntry *typcache; OffsetNumber i; RangeType *pred_left; - RangeType *pred_right; - PickSplitSortItem *sortItems; int nbytes; - OffsetNumber split_idx; - OffsetNumber *left; - OffsetNumber *right; OffsetNumber maxoff; + int count_in_classes[CLS_COUNT]; + int j; + int non_empty_classes_count = 0; + int biggest_class = -1; + int biggest_class_count = 0; + int total_count; /* use first item to look up range type's info */ pred_left = DatumGetRangeType(entryvec->vector[FirstOffsetNumber].key); typcache = range_get_typcache(fcinfo, RangeTypeGetOid(pred_left)); - /* allocate result and work arrays */ maxoff = entryvec->n - 1; nbytes = (maxoff + 1) * sizeof(OffsetNumber); v->spl_left = (OffsetNumber *) palloc(nbytes); v->spl_right = (OffsetNumber *) palloc(nbytes); - sortItems = (PickSplitSortItem *) palloc(maxoff * sizeof(PickSplitSortItem)); /* - * Prepare auxiliary array and sort the values. + * Count how many entries fall into each range class. */ + memset(count_in_classes, 0, sizeof(count_in_classes)); for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { - sortItems[i - 1].index = i; - sortItems[i - 1].data = DatumGetRangeType(entryvec->vector[i].key); - sortItems[i - 1].typcache = typcache; - } - qsort(sortItems, maxoff, sizeof(PickSplitSortItem), sort_item_cmp); + RangeType *range = DatumGetRangeType(entryvec->vector[i].key); - split_idx = maxoff / 2; - - left = v->spl_left; - v->spl_nleft = 0; - right = v->spl_right; - v->spl_nright = 0; - - /* - * First half of items goes to the left output.
- */ - pred_left = sortItems[0].data; - *left++ = sortItems[0].index; - v->spl_nleft++; - for (i = 1; i < split_idx; i++) - { - pred_left = range_super_union(typcache, pred_left, sortItems[i].data); - *left++ = sortItems[i].index; - v->spl_nleft++; + count_in_classes[get_gist_range_class(range)]++; } /* - * Second half of items goes to the right output. + * Count classes having at least one entry and find the most populous one. */ - pred_right = sortItems[split_idx].data; - *right++ = sortItems[split_idx].index; - v->spl_nright++; - for (i = split_idx + 1; i < maxoff; i++) + total_count = maxoff; + for (j = 0; j < CLS_COUNT; j++) { - pred_right = range_super_union(typcache, pred_right, sortItems[i].data); - *right++ = sortItems[i].index; - v->spl_nright++; + if (count_in_classes[j] > 0) + { + if (count_in_classes[j] > biggest_class_count) + { + biggest_class_count = count_in_classes[j]; + biggest_class = j; + } + non_empty_classes_count++; + } } - *left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */ + Assert(non_empty_classes_count > 0); - v->spl_ldatum = RangeTypeGetDatum(pred_left); - v->spl_rdatum = RangeTypeGetDatum(pred_right); + if (non_empty_classes_count == 1) + { + /* All entries are of one class, so split inside that class */ + if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_NORMAL) + { + /* double sorting split for normal ranges */ + range_gist_double_sorting_split(typcache, entryvec, v); + } + else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_LOWER_INF) + { + /* upper bound sorting split for (-inf, x) ranges */ + range_gist_single_sorting_split(typcache, entryvec, v, true); + } + else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_UPPER_INF) + { + /* lower bound sorting split for (x, +inf) ranges */ + range_gist_single_sorting_split(typcache, entryvec, v, false); + } + else + { + /* trivial split for all (-inf, +inf) or all empty ranges */ + range_gist_fallback_split(typcache, entryvec, v); + } + } + else + { + /* + * Class based split.
+ * + * To which side of the split should each class go? Initialize them + * all to go to the left side. + */ + SplitLR classes_groups[CLS_COUNT]; + + memset(classes_groups, 0, sizeof(classes_groups)); + + if (count_in_classes[CLS_NORMAL] > 0) + { + /* normal ranges go right; every other class stays on the left */ + classes_groups[CLS_NORMAL] = SPLIT_RIGHT; + } + else + { + /*---------- + * Try to split classes in one of two ways: + * 1) containing infinities - not containing infinities + * 2) containing empty - not containing empty + * + * Select the way which balances the ranges between left and right + * the best. If split in these ways is not possible, there are at + * most 3 classes, so just separate biggest class. + *---------- + */ + int infCount, nonInfCount; + int emptyCount, nonEmptyCount; + + nonInfCount = + count_in_classes[CLS_NORMAL] + + count_in_classes[CLS_CONTAIN_EMPTY] + + count_in_classes[CLS_EMPTY]; + infCount = total_count - nonInfCount; + + nonEmptyCount = + count_in_classes[CLS_NORMAL] + + count_in_classes[CLS_LOWER_INF] + + count_in_classes[CLS_UPPER_INF] + + count_in_classes[CLS_LOWER_INF | CLS_UPPER_INF]; + emptyCount = total_count - nonEmptyCount; + + if (infCount > 0 && nonInfCount > 0 && + (Abs(infCount - nonInfCount) <= + Abs(emptyCount - nonEmptyCount))) + { + classes_groups[CLS_NORMAL] = SPLIT_RIGHT; + classes_groups[CLS_CONTAIN_EMPTY] = SPLIT_RIGHT; + classes_groups[CLS_EMPTY] = SPLIT_RIGHT; + } + else if (emptyCount > 0 && nonEmptyCount > 0) + { + classes_groups[CLS_NORMAL] = SPLIT_RIGHT; + classes_groups[CLS_LOWER_INF] = SPLIT_RIGHT; + classes_groups[CLS_UPPER_INF] = SPLIT_RIGHT; + classes_groups[CLS_LOWER_INF | CLS_UPPER_INF] = SPLIT_RIGHT; + } + else + { + /* + * Either total_count == emptyCount or total_count == infCount.
+ */ + classes_groups[biggest_class] = SPLIT_RIGHT; + } + } + + range_gist_class_split(typcache, entryvec, v, classes_groups); + } PG_RETURN_POINTER(v); } @@ -611,78 +951,649 @@ range_gist_consistent_leaf(FmgrInfo *flinfo, StrategyNumber strategy, } /* - * Compare function for PickSplitSortItem. This is actually the - * interesting part of the picksplit algorithm. - * - * We want to separate out empty ranges, bounded ranges, and unbounded - * ranges. We assume that "contains" and "overlaps" are the most - * important queries, so empty ranges will rarely match and unbounded - * ranges frequently will. Bounded ranges should be in the middle. - * - * Empty ranges we push all the way to the left, then bounded ranges - * (sorted on lower bound, then upper), then ranges with no lower - * bound, then ranges with no upper bound; and finally, ranges with no - * upper or lower bound all the way to the right. + * Trivial split: half of entries will be placed on one page + * and the other half on the other page.
*/ -static int -sort_item_cmp(const void *a, const void *b) +static void +range_gist_fallback_split(TypeCacheEntry *typcache, + GistEntryVector *entryvec, + GIST_SPLITVEC *v) { - PickSplitSortItem *i1 = (PickSplitSortItem *) a; - PickSplitSortItem *i2 = (PickSplitSortItem *) b; - RangeType *r1 = i1->data; - RangeType *r2 = i2->data; - TypeCacheEntry *typcache = i1->typcache; - RangeBound lower1, - lower2; - RangeBound upper1, - upper2; - bool empty1, - empty2; - int cmp; + RangeType *left_range = NULL; + RangeType *right_range = NULL; + OffsetNumber i, maxoff, split_idx; - range_deserialize(typcache, r1, &lower1, &upper1, &empty1); - range_deserialize(typcache, r2, &lower2, &upper2, &empty2); + maxoff = entryvec->n - 1; + /* Entries before split_idx go to the left page; split_idx and later go right: */ + split_idx = (maxoff - FirstOffsetNumber) / 2 + FirstOffsetNumber; - if (empty1 || empty2) + v->spl_nleft = 0; + v->spl_nright = 0; + for (i = FirstOffsetNumber; i <= maxoff; i++) { - if (empty1 && empty2) - return 0; - else if (empty1) - return -1; - else if (empty2) - return 1; + RangeType *range = DatumGetRangeType(entryvec->vector[i].key); + + if (i < split_idx) + PLACE_LEFT(range, i); else - Assert(false); + PLACE_RIGHT(range, i); + } + + v->spl_ldatum = RangeTypeGetDatum(left_range); + v->spl_rdatum = RangeTypeGetDatum(right_range); +} + +/* + * Split based on classes of ranges. + * + * See get_gist_range_class for class definitions. + * classes_groups is an array of length CLS_COUNT indicating the side of the + * split to which each class should go.
+ */ +static void +range_gist_class_split(TypeCacheEntry *typcache, + GistEntryVector *entryvec, + GIST_SPLITVEC *v, + SplitLR *classes_groups) +{ + RangeType *left_range = NULL; + RangeType *right_range = NULL; + OffsetNumber i, maxoff; + + maxoff = entryvec->n - 1; + + v->spl_nleft = 0; + v->spl_nright = 0; + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + { + RangeType *range = DatumGetRangeType(entryvec->vector[i].key); + int class; + + /* Get class of range */ + class = get_gist_range_class(range); + + /* Place range on the side assigned to its class */ + if (classes_groups[class] == SPLIT_LEFT) + PLACE_LEFT(range, i); + else + { + Assert(classes_groups[class] == SPLIT_RIGHT); + PLACE_RIGHT(range, i); + } + } + + v->spl_ldatum = RangeTypeGetDatum(left_range); + v->spl_rdatum = RangeTypeGetDatum(right_range); +} + +/* + * Sorting based split. First half of entries according to the sort will be + * placed to one page, and second half of entries will be placed to other + * page. use_upper_bound parameter indicates whether to use upper or lower + * bound for sorting. + */ +static void +range_gist_single_sorting_split(TypeCacheEntry *typcache, + GistEntryVector *entryvec, + GIST_SPLITVEC *v, + bool use_upper_bound) +{ + SingleBoundSortItem *sortItems; + RangeType *left_range = NULL; + RangeType *right_range = NULL; + OffsetNumber i, maxoff, split_idx; + + maxoff = entryvec->n - 1; + + sortItems = (SingleBoundSortItem *) + palloc(maxoff * sizeof(SingleBoundSortItem)); + + /* + * Prepare auxiliary array and sort the values.
+ */ + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + { + RangeType *range = DatumGetRangeType(entryvec->vector[i].key); + RangeBound bound2; + bool empty; + + sortItems[i - 1].index = i; + /* Store the bound to sort on; the other bound (bound2) is not used */ + if (use_upper_bound) + range_deserialize(typcache, range, &bound2, + &sortItems[i - 1].bound, &empty); + else + range_deserialize(typcache, range, &sortItems[i - 1].bound, + &bound2, &empty); + Assert(!empty); + } + + qsort_arg(sortItems, maxoff, sizeof(SingleBoundSortItem), + single_bound_cmp, typcache); + + split_idx = maxoff / 2; + + v->spl_nleft = 0; + v->spl_nright = 0; + + for (i = 0; i < maxoff; i++) + { + int idx = sortItems[i].index; + RangeType *range = DatumGetRangeType(entryvec->vector[idx].key); + + if (i < split_idx) + PLACE_LEFT(range, idx); + else + PLACE_RIGHT(range, idx); + } + + v->spl_ldatum = RangeTypeGetDatum(left_range); + v->spl_rdatum = RangeTypeGetDatum(right_range); +} + +/* + * Double sorting split algorithm. + * + * The algorithm considers dividing ranges into two groups. The first (left) + * group contains general left bound. The second (right) group contains + * general right bound. The challenge is to find upper bound of left group + * and lower bound of right group so that overlap of groups is minimal and + * ratio of distribution is acceptable. Algorithm finds for each lower bound of + * right group minimal upper bound of left group, and for each upper bound of + * left group maximal lower bound of right group. For each found pair + * range_gist_consider_split considers replacement of currently selected + * split with the new one. + * + * After that, all the entries are divided into three groups: + * 1) Entries which should be placed to the left group + * 2) Entries which should be placed to the right group + * 3) "Common entries" which can be placed to either group without affecting + * amount of overlap.
+ * + * The common ranges are distributed by difference of distance from lower + * bound of common range to lower bound of right group and distance from upper + * bound of common range to upper bound of left group. + * + * For details see: + * "A new double sorting-based node splitting algorithm for R-tree", + * A. Korotkov + * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36 + */ +static void +range_gist_double_sorting_split(TypeCacheEntry *typcache, + GistEntryVector *entryvec, + GIST_SPLITVEC *v) +{ + ConsiderSplitContext context; + OffsetNumber i, maxoff; + RangeType *range, + *left_range = NULL, + *right_range = NULL; + int common_entries_count; + NonEmptyRange *by_lower, + *by_upper; + CommonEntry *common_entries; + int nentries, i1, i2; + RangeBound *right_lower, *left_upper; + + memset(&context, 0, sizeof(ConsiderSplitContext)); + context.typcache = typcache; + context.has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid); + + maxoff = entryvec->n - 1; + nentries = context.entries_count = maxoff - FirstOffsetNumber + 1; + context.first = true; + + /* Allocate arrays for sorted range bounds */ + by_lower = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange)); + by_upper = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange)); + + /* Deserialize every entry's bounds into by_lower (copied to by_upper below) */ + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + { + RangeType *range = DatumGetRangeType(entryvec->vector[i].key); + bool empty; + + range_deserialize(typcache, range, + &by_lower[i - FirstOffsetNumber].lower, + &by_lower[i - FirstOffsetNumber].upper, + &empty); + Assert(!empty); } /* - * If both lower or both upper bounds are infinite, we sort by ascending - * range size. That means that if both upper bounds are infinite, we sort - * by the lower bound _descending_. That creates a slightly odd total - * order, but keeps the pages with very unselective predicates grouped - * more closely together on the right.
+ * Make two arrays of range bounds: one sorted by lower bound and another + * sorted by upper bound. */ - if (lower1.infinite || upper1.infinite || - lower2.infinite || upper2.infinite) + memcpy(by_upper, by_lower, nentries * sizeof(NonEmptyRange)); + qsort_arg(by_lower, nentries, sizeof(NonEmptyRange), + interval_cmp_lower, typcache); + qsort_arg(by_upper, nentries, sizeof(NonEmptyRange), + interval_cmp_upper, typcache); + + /*---------- + * The goal is to form a left and right range, so that every entry + * range is contained by either left or right interval (or both). + * + * For example, with the ranges (0,1), (1,3), (2,3), (2,4): + * + * 0 1 2 3 4 + * +-+ + * +---+ + * +-+ + * +---+ + * + * The left and right ranges are of the form (0,a) and (b,4). + * We first consider splits where b is the lower bound of an entry. + * We iterate through all entries, and for each b, calculate the + * smallest possible a. Then we consider splits where a is the + * upper bound of an entry, and for each a, calculate the greatest + * possible b. + * + * In the above example, the first loop would consider splits: + * b=0: (0,1)-(0,4) + * b=1: (0,1)-(1,4) + * b=2: (0,3)-(2,4) + * + * And the second loop: + * a=1: (0,1)-(1,4) + * a=3: (0,3)-(2,4) + * a=4: (0,4)-(2,4) + *---------- + */ + + /* + * Iterate over lower bound of right group, finding smallest possible + * upper bound of left group. + */ + i1 = 0; + i2 = 0; + right_lower = &by_lower[i1].lower; + left_upper = &by_upper[i2].lower; + while (true) { - if (lower1.infinite && lower2.infinite) - return range_cmp_bounds(typcache, &upper1, &upper2); - else if (lower1.infinite) - return -1; - else if (lower2.infinite) - return 1; - else if (upper1.infinite && upper2.infinite) - return -(range_cmp_bounds(typcache, &lower1, &lower2)); - else if (upper1.infinite) - return 1; - else if (upper2.infinite) - return -1; - else - Assert(false); + /* + * Find next lower bound of right group.
+ */ + while (i1 < nentries && + range_cmp_bounds(typcache, right_lower, + &by_lower[i1].lower) == 0) + { + if (range_cmp_bounds(typcache, &by_lower[i1].upper, + left_upper) > 0) + left_upper = &by_lower[i1].upper; + i1++; + } + if (i1 >= nentries) + break; + right_lower = &by_lower[i1].lower; + + /* + * Count the ranges which must be placed in the left group + * in any case. + */ + while (i2 < nentries && + range_cmp_bounds(typcache, &by_upper[i2].upper, + left_upper) <= 0) + i2++; + + /* + * Consider found split to see if it's better than what we had. + */ + range_gist_consider_split(&context, right_lower, i1, left_upper, i2); } - if ((cmp = range_cmp_bounds(typcache, &lower1, &lower2)) != 0) - return cmp; + /* + * Iterate over upper bound of left group finding greatest possible + * lower bound of right group. + */ + i1 = nentries - 1; + i2 = nentries - 1; + right_lower = &by_lower[i1].upper; + left_upper = &by_upper[i2].upper; + while (true) + { + /* + * Find next upper bound of left group. + */ + while (i2 >= 0 && + range_cmp_bounds(typcache, left_upper, + &by_upper[i2].upper) == 0) + { + if (range_cmp_bounds(typcache, &by_upper[i2].lower, + right_lower) < 0) + right_lower = &by_upper[i2].lower; + i2--; + } + if (i2 < 0) + break; + left_upper = &by_upper[i2].upper; - return range_cmp_bounds(typcache, &upper1, &upper2); + /* + * Count the ranges which must be placed in the right group + * in any case. + */ + while (i1 >= 0 && + range_cmp_bounds(typcache, &by_lower[i1].lower, + right_lower) >= 0) + i1--; + + /* + * Consider found split to see if it's better than what we had. + */ + range_gist_consider_split(&context, right_lower, i1 + 1, + left_upper, i2 + 1); + } + + /* + * If we failed to find any acceptable splits, use trivial split. + */ + if (context.first) + { + range_gist_fallback_split(typcache, entryvec, v); + return; + } + + /* + * Ok, we have now selected bounds of the groups. Now we have to distribute + * entries themselves.
At first we distribute entries which can be placed + * unambiguously and collect "common entries" to array. + */ + + /* Allocate vectors for results. NOTE(review): range_gist_picksplit already allocated v->spl_left/spl_right before calling here -- TODO confirm this second allocation is intended */ + v->spl_left = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber)); + v->spl_right = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber)); + v->spl_nleft = 0; + v->spl_nright = 0; + + /* + * Allocate an array for "common entries" - entries which can be placed to + * either group without affecting overlap along selected axis. + */ + common_entries_count = 0; + common_entries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry)); + + /* + * Distribute entries which can be distributed unambiguously, and collect + * common entries. + */ + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + { + RangeBound lower, + upper; + bool empty; + + /* + * Get upper and lower bounds of this entry. 
+ */ + range = DatumGetRangeType(entryvec->vector[i].key); + + range_deserialize(typcache, range, &lower, &upper, &empty); + + if (range_cmp_bounds(typcache, &upper, context.left_upper) <= 0) + { + /* Fits in the left group */ + if (range_cmp_bounds(typcache, &lower, context.right_lower) >= 0) + { + /* Fits also in the right group, so "common entry" */ + common_entries[common_entries_count].index = i; + if (context.has_subtype_diff) + { + /* + * delta = (lower - context.right_lower) - + * (context.left_upper - upper) + */ + common_entries[common_entries_count].delta = + call_subtype_diff(typcache, + lower.val, + context.right_lower->val) - + call_subtype_diff(typcache, + context.left_upper->val, + upper.val); + } + else + { + /* Without subtype_diff, take all deltas as zero */ + common_entries[common_entries_count].delta = 0; + } + common_entries_count++; + } + else + { + /* Doesn't fit to the right group, so join to the left group */ + PLACE_LEFT(range, i); + } + } + else + { + /* + * Each entry should fit on either left or right group.
Since this + * entry didn't fit in the left group, it better fit in the right + * group. + */ + Assert(range_cmp_bounds(typcache, &lower, + context.right_lower) >= 0); + PLACE_RIGHT(range, i); + } + } + + /* + * Distribute "common entries", if any. + */ + if (common_entries_count > 0) + { + /* + * Sort "common entries" by calculated deltas in order to distribute + * the most ambiguous entries first. + */ + qsort(common_entries, common_entries_count, sizeof(CommonEntry), + common_entry_cmp); + + /* + * Distribute "common entries" between groups according to sorting. + */ + for (i = 0; i < common_entries_count; i++) + { + int idx = common_entries[i].index; + + range = DatumGetRangeType(entryvec->vector[idx].key); + + /* + * The first context.common_left entries (in sorted order) go to the + * left group; the remaining ones go to the right group. + */ + if (i < context.common_left) + PLACE_LEFT(range, idx); + else + PLACE_RIGHT(range, idx); + } + } + + v->spl_ldatum = PointerGetDatum(left_range); + v->spl_rdatum = PointerGetDatum(right_range); +} + +/* + * Consider replacement of currently selected split with a better one + * during range_gist_double_sorting_split. + */ +static void +range_gist_consider_split(ConsiderSplitContext *context, + RangeBound *right_lower, int min_left_count, + RangeBound *left_upper, int max_left_count) +{ + int left_count, + right_count; + float4 ratio, + overlap; + + /* + * Calculate entries distribution ratio assuming most uniform distribution + * of common entries. + */ + if (min_left_count >= (context->entries_count + 1) / 2) + left_count = min_left_count; + else if (max_left_count <= context->entries_count / 2) + left_count = max_left_count; + else + left_count = context->entries_count / 2; + right_count = context->entries_count - left_count; + + /* + * Ratio of split: quotient between size of smaller group and total + * entries count. This is necessarily 0.5 or less; if it's less than + * LIMIT_RATIO then we will never accept the new split.
+ */ + ratio = ((float4) Min(left_count, right_count)) / + ((float4) context->entries_count); + + if (ratio > LIMIT_RATIO) + { + bool selectthis = false; + + /* + * The ratio is acceptable, so compare current split with previously + * selected one. We search for minimal overlap (allowing negative + * values) and minimal ratio secondarily. If subtype_diff is + * available, it's used for overlap measure. Without subtype_diff we + * use number of "common entries" as an overlap measure. + */ + if (context->has_subtype_diff) + overlap = call_subtype_diff(context->typcache, + left_upper->val, + right_lower->val); + else + overlap = max_left_count - min_left_count; + + /* If there is no previous selection, select this split */ + if (context->first) + selectthis = true; + else + { + /* + * Choose the new split if it has a smaller overlap, or same + * overlap but better ratio. + */ + if (overlap < context->overlap || + (overlap == context->overlap && ratio > context->ratio)) + selectthis = true; + } + + if (selectthis) + { + /* save information about selected split */ + context->first = false; + context->ratio = ratio; + context->overlap = overlap; + context->right_lower = right_lower; + context->left_upper = left_upper; + context->common_left = max_left_count - left_count; + context->common_right = left_count - min_left_count; + } + } +} + +/* + * Find class number for range. + * + * The class number is a valid combination of the properties of the + * range. Note: the highest possible number is 8, because CLS_EMPTY + * can't be combined with anything else. 
+ */ +static int +get_gist_range_class(RangeType *range) +{ + int classNumber; + char flags; + + flags = range_get_flags(range); + if (flags & RANGE_EMPTY) + { + classNumber = CLS_EMPTY; + } + else + { + classNumber = 0; + if (flags & RANGE_LB_INF) + classNumber |= CLS_LOWER_INF; + if (flags & RANGE_UB_INF) + classNumber |= CLS_UPPER_INF; + if (flags & RANGE_CONTAIN_EMPTY) + classNumber |= CLS_CONTAIN_EMPTY; + } + return classNumber; +} + +/* + * Comparison function for range_gist_single_sorting_split. + */ +static int +single_bound_cmp(const void *a, const void *b, void *arg) +{ + SingleBoundSortItem *i1 = (SingleBoundSortItem *) a; + SingleBoundSortItem *i2 = (SingleBoundSortItem *) b; + TypeCacheEntry *typcache = (TypeCacheEntry *) arg; + + return range_cmp_bounds(typcache, &i1->bound, &i2->bound); +} + +/* + * Compare NonEmptyRanges by lower bound. + */ +static int +interval_cmp_lower(const void *a, const void *b, void *arg) +{ + NonEmptyRange *i1 = (NonEmptyRange *) a; + NonEmptyRange *i2 = (NonEmptyRange *) b; + TypeCacheEntry *typcache = (TypeCacheEntry *) arg; + + return range_cmp_bounds(typcache, &i1->lower, &i2->lower); +} + +/* + * Compare NonEmptyRanges by upper bound. + */ +static int +interval_cmp_upper(const void *a, const void *b, void *arg) +{ + NonEmptyRange *i1 = (NonEmptyRange *) a; + NonEmptyRange *i2 = (NonEmptyRange *) b; + TypeCacheEntry *typcache = (TypeCacheEntry *) arg; + + return range_cmp_bounds(typcache, &i1->upper, &i2->upper); +} + +/* + * Compare CommonEntrys by their deltas. + */ +static int +common_entry_cmp(const void *i1, const void *i2) +{ + double delta1 = ((CommonEntry *) i1)->delta; + double delta2 = ((CommonEntry *) i2)->delta; + + if (delta1 < delta2) + return -1; + else if (delta1 > delta2) + return 1; + else + return 0; +} + +/* + * Convenience function to invoke type-specific subtype_diff function. + * Caller must have already checked that there is one for the range type. 
+ */ +static float8 +call_subtype_diff(TypeCacheEntry *typcache, Datum val1, Datum val2) +{ + float8 value; + + value = DatumGetFloat8(FunctionCall2Coll(&typcache->rng_subdiff_finfo, + typcache->rng_collation, + val1, val2)); + /* Cope with buggy subtype_diff function by returning zero */ + if (value >= 0.0) + return value; + return 0.0; }