diff --git a/contrib/intarray/_int_gist.c b/contrib/intarray/_int_gist.c index 911d18023b..fe10cb56b6 100644 --- a/contrib/intarray/_int_gist.c +++ b/contrib/intarray/_int_gist.c @@ -12,6 +12,17 @@ #define GETENTRY(vec,pos) ((ArrayType *) DatumGetPointer((vec)->vector[(pos)].key)) +/* + * Control the maximum sparseness of compressed keys. + * + * The upper safe bound for this limit is half the maximum allocatable array + * size. A lower bound would give more guarantees that pathological data + * wouldn't eat excessive CPU and memory, but at the expense of breaking + * possibly working (after a fashion) indexes. + */ +#define MAXNUMELTS (Min((MaxAllocSize / sizeof(Datum)),((MaxAllocSize - ARR_OVERHEAD_NONULLS(1)) / sizeof(int)))/2) +/* or: #define MAXNUMELTS 1000000 */ + /* ** GiST support methods */ @@ -141,11 +152,13 @@ g_int_compress(PG_FUNCTION_ARGS) GISTENTRY *entry = (GISTENTRY *) PG_GETARG_POINTER(0); GISTENTRY *retval; ArrayType *r; - int len; + int len, + lenr; int *dr; int i, - min, + j, cand; + int64 min; if (entry->leafkey) { @@ -186,23 +199,62 @@ g_int_compress(PG_FUNCTION_ARGS) dr = ARRPTR(r); - for (i = len - 1; i >= 0; i--) - dr[2 * i] = dr[2 * i + 1] = dr[i]; + /* + * "len" at this point is the number of ranges we will construct. + * "lenr" is the number of ranges we must eventually remove by + * merging, we must be careful to remove no more than this number. + */ + lenr = len - MAXNUMRANGE; - len *= 2; + /* + * Initially assume we can merge consecutive ints into a range. but we + * must count every value removed and stop when lenr runs out + */ + for (j = i = len - 1; i > 0 && lenr > 0; i--, j--) + { + int r_end = dr[i]; + int r_start = r_end; + while (i > 0 && lenr > 0 && dr[i-1] == r_start - 1) + --r_start, --i, --lenr; + dr[2*j] = r_start; + dr[2*j+1] = r_end; + } + /* just copy the rest, if any, as trivial ranges */ + for (; i >= 0; i--, j--) + dr[2*j] = dr[2*j + 1] = dr[i]; + + if (++j) + { + /* + * shunt everything down to start at the right place + */ + memmove((void *) &dr[0], (void *) &dr[2*j], 2*(len - j) * sizeof(int32)); + } + /* + * make "len" be number of array elements, not ranges + */ + len = 2*(len - j); cand = 1; while (len > MAXNUMRANGE * 2) { - min = INT_MAX; + min = PG_INT64_MAX; for (i = 2; i < len; i += 2) - if (min > (dr[i] - dr[i - 1])) + if (min > ((int64)dr[i] - (int64)dr[i - 1])) { - min = (dr[i] - dr[i - 1]); + min = ((int64)dr[i] - (int64)dr[i - 1]); cand = i; } memmove((void *) &dr[cand - 1], (void *) &dr[cand + 1], (len - cand - 1) * sizeof(int32)); len -= 2; } + /* + * check sparseness of result + */ + lenr = internal_size(dr, len); + if (lenr < 0 || lenr > MAXNUMELTS) + ereport(ERROR, + (errmsg("data is too sparse, recreate index using gist__intbig_ops opclass instead"))); + r = resize_intArrayType(r, len); retval = palloc(sizeof(GISTENTRY)); gistentryinit(*retval, PointerGetDatum(r), @@ -260,6 +312,9 @@ g_int_decompress(PG_FUNCTION_ARGS) din = ARRPTR(in); lenr = internal_size(din, lenin); + if (lenr < 0 || lenr > MAXNUMELTS) + ereport(ERROR, + (errmsg("compressed array is too big, recreate index using gist__intbig_ops opclass instead"))); r = new_intArrayType(lenr); dr = ARRPTR(r); diff --git a/contrib/intarray/_int_tool.c b/contrib/intarray/_int_tool.c index d86485dfa5..e12d15c21e 100644 --- a/contrib/intarray/_int_tool.c +++ b/contrib/intarray/_int_tool.c @@ -3,6 +3,8 @@ */ #include "postgres.h" +#include + #include "catalog/pg_type.h" #include "_int.h" @@ -225,6 +227,7 @@ new_intArrayType(int num) /* if no elements, return a zero-dimensional array */ if (num <= 0) { + Assert(num == 0); r = construct_empty_array(INT4OID); return r; } @@ -252,6 +255,7 @@ resize_intArrayType(ArrayType *a, int num) /* if no elements, return a zero-dimensional array */ if (num <= 0) { + Assert(num == 0); ARR_NDIM(a) = 0; return a; } @@ -288,16 +292,18 @@ copy_intArrayType(ArrayType *a) int internal_size(int *a, int len) { - int i, - size = 0; + int i; + int64 size = 0; for (i = 0; i < len; i += 2) { if (!i || a[i] != a[i - 1]) /* do not count repeated range */ - size += a[i + 1] - a[i] + 1; + size += (int64)(a[i + 1]) - (int64)(a[i]) + 1; } - return size; + if (size > (int64)INT_MAX || size < (int64)INT_MIN) + return -1; /* overflow */ + return (int) size; } /* unique-ify elements of r in-place ... r must be sorted already */