diff --git a/src/backend/utils/adt/rangetypes.c b/src/backend/utils/adt/rangetypes.c index c440847916..84a4aca16c 100644 --- a/src/backend/utils/adt/rangetypes.c +++ b/src/backend/utils/adt/rangetypes.c @@ -709,6 +709,64 @@ range_after(PG_FUNCTION_ARGS) PG_RETURN_BOOL(range_after_internal(typcache, r1, r2)); } +/* + * Check if two bounds A and B are "adjacent", where A is an upper bound and B + * is a lower bound. For the bounds to be adjacent, each subtype value must + * satisfy strictly one of the bounds: there are no values which satisfy both + * bounds (i.e. less than A and greater than B); and there are no values which + * satisfy neither bound (i.e. greater than A and less than B). + * + * For discrete ranges, we rely on the canonicalization function to see if A..B + * normalizes to empty. (If there is no canonicalization function, it's + * impossible for such a range to normalize to empty, so we needn't bother to + * try.) + * + * If A == B, the ranges are adjacent only if the bounds have different + * inclusive flags (i.e., exactly one of the ranges includes the common + * boundary point). + * + * And if A > B then the ranges are not adjacent in this order. + */ +bool +bounds_adjacent(TypeCacheEntry *typcache, RangeBound boundA, RangeBound boundB) +{ + int cmp; + + Assert(!boundA.lower && boundB.lower); + + cmp = range_cmp_bound_values(typcache, &boundA, &boundB); + if (cmp < 0) + { + RangeType *r; + + /* + * Bounds do not overlap; see if there are points in between. + */ + + /* in a continuous subtype, there are assumed to be points between */ + if (!OidIsValid(typcache->rng_canonical_finfo.fn_oid)) + return false; + + /* + * The bounds are of a discrete range type; so make a range A..B and + * see if it's empty. + */ + + /* flip the inclusion flags */ + boundA.inclusive = !boundA.inclusive; + boundB.inclusive = !boundB.inclusive; + /* change upper/lower labels to avoid Assert failures */ + boundA.lower = true; + boundB.lower = false; + r = make_range(typcache, &boundA, &boundB, false); + return RangeIsEmpty(r); + } + else if (cmp == 0) + return boundA.inclusive != boundB.inclusive; + else + return false; /* bounds overlap */ +} + /* adjacent to (but not overlapping)? (internal version) */ bool range_adjacent_internal(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2) @@ -719,8 +777,6 @@ range_adjacent_internal(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2) upper2; bool empty1, empty2; - RangeType *r3; - int cmp; /* Different types should be prevented by ANYRANGE matching rules */ if (RangeTypeGetOid(r1) != RangeTypeGetOid(r2)) @@ -734,62 +790,11 @@ range_adjacent_internal(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2) return false; /* - * Given two ranges A..B and C..D, where B < C, the ranges are adjacent if - * and only if the range B..C is empty, where inclusivity of these two - * bounds is inverted compared to the original bounds. For discrete - * ranges, we have to rely on the canonicalization function to normalize - * B..C to empty if it contains no elements of the subtype. (If there is - * no canonicalization function, it's impossible for such a range to - * normalize to empty, so we needn't bother to try.) - * - * If B == C, the ranges are adjacent only if these bounds have different - * inclusive flags (i.e., exactly one of the ranges includes the common - * boundary point). - * - * And if B > C then the ranges cannot be adjacent in this order, but we - * must consider the other order (i.e., check D <= A). + * Given two ranges A..B and C..D, the ranges are adjacent if and only if + * B is adjacent to C, or D is adjacent to A. */ - cmp = range_cmp_bound_values(typcache, &upper1, &lower2); - if (cmp < 0) - { - /* in a continuous subtype, there are assumed to be points between */ - if (!OidIsValid(typcache->rng_canonical_finfo.fn_oid)) - return (false); - /* flip the inclusion flags */ - upper1.inclusive = !upper1.inclusive; - lower2.inclusive = !lower2.inclusive; - /* change upper/lower labels to avoid Assert failures */ - upper1.lower = true; - lower2.lower = false; - r3 = make_range(typcache, &upper1, &lower2, false); - return RangeIsEmpty(r3); - } - if (cmp == 0) - { - return (upper1.inclusive != lower2.inclusive); - } - - cmp = range_cmp_bound_values(typcache, &upper2, &lower1); - if (cmp < 0) - { - /* in a continuous subtype, there are assumed to be points between */ - if (!OidIsValid(typcache->rng_canonical_finfo.fn_oid)) - return (false); - /* flip the inclusion flags */ - upper2.inclusive = !upper2.inclusive; - lower1.inclusive = !lower1.inclusive; - /* change upper/lower labels to avoid Assert failures */ - upper2.lower = true; - lower1.lower = false; - r3 = make_range(typcache, &upper2, &lower1, false); - return RangeIsEmpty(r3); - } - if (cmp == 0) - { - return (upper2.inclusive != lower1.inclusive); - } - - return false; + return (bounds_adjacent(typcache, upper1, lower2) || + bounds_adjacent(typcache, upper2, lower1)); } /* adjacent to (but not overlapping)? */ diff --git a/src/backend/utils/adt/rangetypes_spgist.c b/src/backend/utils/adt/rangetypes_spgist.c index d395ef4718..9a7f20d9f3 100644 --- a/src/backend/utils/adt/rangetypes_spgist.c +++ b/src/backend/utils/adt/rangetypes_spgist.c @@ -305,6 +305,16 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS) int which; int i; + /* + * For adjacent search we need also previous centroid (if any) to improve + * the precision of the consistent check. In this case needPrevious flag is + * set and centroid is passed into reconstructedValues. This is not the + * intended purpose of reconstructedValues (because we already have the + * full value available at the leaf), but it's a convenient place to store + * state while traversing the tree. + */ + bool needPrevious = false; + if (in->allTheSame) { /* Report that all nodes should be visited */ @@ -351,6 +361,7 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS) case RANGESTRAT_OVERLAPS: case RANGESTRAT_OVERRIGHT: case RANGESTRAT_AFTER: + case RANGESTRAT_ADJACENT: /* These strategies return false if any argument is empty */ if (empty) which = 0; @@ -435,6 +446,9 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS) /* Are the restrictions on range bounds inclusive? */ bool inclusive = true; bool strictEmpty = true; + int cmp, + which1, + which2; strategy = in->scankeys[i].sk_strategy; @@ -522,6 +536,118 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS) inclusive = false; break; + case RANGESTRAT_ADJACENT: + if (empty) + break; /* Skip to strictEmpty check. */ + + /* + * which1 is bitmask for possibility to be adjacent with + * lower bound of argument. which2 is bitmask for + * possibility to be adjacent with upper bound of argument. + */ + which1 = which2 = (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4); + + /* + * Previously selected quadrant could exclude possibility + * for lower or upper bounds to be adjacent. Deserialize + * previous centroid range if present for checking this. + */ + if (in->reconstructedValue != (Datum) 0) + { + RangeType *prevCentroid; + RangeBound prevLower, + prevUpper; + bool prevEmpty; + int cmp1, + cmp2; + + prevCentroid = DatumGetRangeType(in->reconstructedValue); + range_deserialize(typcache, prevCentroid, + &prevLower, &prevUpper, &prevEmpty); + + /* + * Check if lower bound of argument is not in a + * quadrant we visited in the previous step. + */ + cmp1 = range_cmp_bounds(typcache, &lower, &prevUpper); + cmp2 = range_cmp_bounds(typcache, ¢roidUpper, + &prevUpper); + if ((cmp2 < 0 && cmp1 > 0) || (cmp2 > 0 && cmp1 < 0)) + which1 = 0; + + /* + * Check if upper bound of argument is not in a + * quadrant we visited in the previous step. + */ + cmp1 = range_cmp_bounds(typcache, &upper, &prevLower); + cmp2 = range_cmp_bounds(typcache, ¢roidLower, + &prevLower); + if ((cmp2 < 0 && cmp1 > 0) || (cmp2 > 0 && cmp1 < 0)) + which2 = 0; + } + + if (which1) + { + /* + * For a range's upper bound to be adjacent to the + * argument's lower bound, it will be found along the + * line adjacent to (and just below) Y=lower. + * Therefore, if the argument's lower bound is less + * than the centroid's upper bound, the line falls in + * quadrants 2 and 3; if greater, the line falls in + * quadrants 1 and 4. + * + * The above is true even when the argument's lower + * bound is greater and adjacent to the centroid's + * upper bound. If the argument's lower bound is + * greater than the centroid's upper bound, then the + * lowest value that an adjacent range could have is + * that of the centroid's upper bound, which still + * falls in quadrants 1 and 4. + * + * In the edge case, where the argument's lower bound + * is equal to the cetroid's upper bound, there may be + * adjacent ranges in any quadrant. + */ + cmp = range_cmp_bounds(typcache, &lower, + ¢roidUpper); + if (cmp < 0) + which1 &= (1 << 2) | (1 << 3); + else if (cmp > 0) + which1 &= (1 << 1) | (1 << 4); + } + + if (which2) + { + /* + * For a range's lower bound to be adjacent to the + * argument's upper bound, it will be found along the + * line adjacent to (and just right of) + * X=upper. Therefore, if the argument's upper bound is + * less than (and not adjacent to) the centroid's upper + * bound, the line falls in quadrants 3 and 4; if + * greater or equal to, the line falls in quadrants 1 + * and 2. + * + * The edge case is when the argument's upper bound is + * less than and adjacent to the centroid's lower + * bound. In that case, adjacent ranges may be in any + * quadrant. + */ + cmp = range_cmp_bounds(typcache, &lower, + ¢roidUpper); + if (cmp < 0 && + !bounds_adjacent(typcache, upper, centroidLower)) + which1 &= (1 << 3) | (1 << 4); + else if (cmp > 0) + which1 &= (1 << 1) | (1 << 2); + } + + which &= which1 | which2; + + needPrevious = true; + break; + case RANGESTRAT_CONTAINS: /* * Non-empty range A contains non-empty range B if lower @@ -652,11 +778,18 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS) /* We must descend into the quadrant(s) identified by 'which' */ out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes); + if (needPrevious) + out->reconstructedValues = (Datum *) palloc(sizeof(Datum) * in->nNodes); out->nNodes = 0; for (i = 1; i <= in->nNodes; i++) { if (which & (1 << i)) + { + /* Save previous prefix if needed */ + if (needPrevious) + out->reconstructedValues[out->nNodes] = in->prefixDatum; out->nodeNumbers[out->nNodes++] = i - 1; + } } PG_RETURN_VOID(); @@ -713,6 +846,10 @@ spg_range_quad_leaf_consistent(PG_FUNCTION_ARGS) res = range_after_internal(typcache, leafRange, DatumGetRangeType(keyDatum)); break; + case RANGESTRAT_ADJACENT: + res = range_adjacent_internal(typcache, leafRange, + DatumGetRangeType(keyDatum)); + break; case RANGESTRAT_CONTAINS: res = range_contains_internal(typcache, leafRange, DatumGetRangeType(keyDatum)); diff --git a/src/include/catalog/pg_amop.h b/src/include/catalog/pg_amop.h index 5d451e36c1..d200348747 100644 --- a/src/include/catalog/pg_amop.h +++ b/src/include/catalog/pg_amop.h @@ -775,6 +775,7 @@ DATA(insert ( 3474 3831 3831 2 s 3895 4000 0 )); DATA(insert ( 3474 3831 3831 3 s 3888 4000 0 )); DATA(insert ( 3474 3831 3831 4 s 3896 4000 0 )); DATA(insert ( 3474 3831 3831 5 s 3894 4000 0 )); +DATA(insert ( 3474 3831 3831 6 s 3897 4000 0 )); DATA(insert ( 3474 3831 3831 7 s 3890 4000 0 )); DATA(insert ( 3474 3831 3831 8 s 3892 4000 0 )); DATA(insert ( 3474 3831 2283 16 s 3889 4000 0 )); diff --git a/src/include/utils/rangetypes.h b/src/include/utils/rangetypes.h index 67b1a7ce61..f6b941d3ac 100644 --- a/src/include/utils/rangetypes.h +++ b/src/include/utils/rangetypes.h @@ -201,6 +201,8 @@ extern int range_cmp_bounds(TypeCacheEntry *typcache, RangeBound *b1, RangeBound *b2); extern int range_cmp_bound_values(TypeCacheEntry *typcache, RangeBound *b1, RangeBound *b2); +extern bool bounds_adjacent(TypeCacheEntry *typcache, RangeBound bound1, + RangeBound bound2); extern RangeType *make_empty_range(TypeCacheEntry *typcache); /* GiST support (in rangetypes_gist.c) */ diff --git a/src/test/regress/expected/opr_sanity.out b/src/test/regress/expected/opr_sanity.out index 6faaf4272f..256b7196d0 100644 --- a/src/test/regress/expected/opr_sanity.out +++ b/src/test/regress/expected/opr_sanity.out @@ -1076,6 +1076,7 @@ ORDER BY 1, 2, 3; 4000 | 4 | ~>=~ 4000 | 5 | >> 4000 | 5 | ~>~ + 4000 | 6 | -|- 4000 | 6 | ~= 4000 | 7 | @> 4000 | 8 | <@ @@ -1087,7 +1088,7 @@ ORDER BY 1, 2, 3; 4000 | 15 | > 4000 | 16 | @> 4000 | 18 | = -(61 rows) +(62 rows) -- Check that all opclass search operators have selectivity estimators. -- This is not absolutely required, but it seems a reasonable thing