From 3043810d977b8197f9671c98439104db80b8e914 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Thu, 31 May 2001 18:16:55 +0000 Subject: [PATCH] Updates to make GIST work with multi-key indexes (from Oleg Bartunov and Teodor Sigaev). Declare key values as Datum where appropriate, rather than char* (Tom Lane). --- contrib/cube/cube.c | 65 +-- contrib/intarray/_int.c | 128 ++-- contrib/seg/seg.c | 61 +- src/backend/access/gist/gist.c | 903 +++++++++++++++++++++-------- src/backend/access/gist/gistget.c | 27 +- src/backend/access/gist/gistscan.c | 6 +- src/backend/access/index/indexam.c | 8 +- src/backend/commands/indexcmds.c | 10 +- src/include/access/gist.h | 72 +-- 9 files changed, 842 insertions(+), 438 deletions(-) diff --git a/contrib/cube/cube.c b/contrib/cube/cube.c index 4d6169a480..72f6e5a069 100644 --- a/contrib/cube/cube.c +++ b/contrib/cube/cube.c @@ -45,7 +45,7 @@ NDBOX *g_cube_binary_union(NDBOX * r1, NDBOX * r2, int *sizep); bool *g_cube_same(NDBOX * b1, NDBOX * b2, bool *result); /* -** R-tree suport functions +** R-tree support functions */ bool cube_same(NDBOX * a, NDBOX * b); bool cube_different(NDBOX * a, NDBOX * b); @@ -168,13 +168,15 @@ g_cube_consistent(GISTENTRY *entry, { /* - * * if entry is not leaf, use g_cube_internal_consistent, * else use + * if entry is not leaf, use g_cube_internal_consistent, else use * g_cube_leaf_consistent */ if (GIST_LEAF(entry)) - return (g_cube_leaf_consistent((NDBOX *) (entry->pred), query, strategy)); + return g_cube_leaf_consistent((NDBOX *) DatumGetPointer(entry->key), + query, strategy); else - return (g_cube_internal_consistent((NDBOX *) (entry->pred), query, strategy)); + return g_cube_internal_consistent((NDBOX *) DatumGetPointer(entry->key), + query, strategy); } @@ -194,7 +196,7 @@ g_cube_union(bytea *entryvec, int *sizep) * fprintf(stderr, "union\n"); */ numranges = (VARSIZE(entryvec) - VARHDRSZ) / sizeof(GISTENTRY); - tmp = (NDBOX *) (((GISTENTRY *) (VARDATA(entryvec)))[0]).pred; + tmp = (NDBOX *) DatumGetPointer((((GISTENTRY *) (VARDATA(entryvec)))[0]).key); /* * sizep = sizeof(NDBOX); -- NDBOX has variable size @@ -204,14 +206,8 @@ g_cube_union(bytea *entryvec, int *sizep) for (i = 1; i < numranges; i++) { out = g_cube_binary_union(tmp, (NDBOX *) - (((GISTENTRY *) (VARDATA(entryvec)))[i]).pred, + DatumGetPointer((((GISTENTRY *) (VARDATA(entryvec)))[i]).key), sizep); - - /* - * fprintf(stderr, "\t%s ^ %s -> %s\n", cube_out(tmp), - * cube_out((NDBOX *)(((GISTENTRY - * *)(VARDATA(entryvec)))[i]).pred), cube_out(out)); - */ if (i > 1) pfree(tmp); tmp = out; @@ -243,15 +239,16 @@ g_cube_decompress(GISTENTRY *entry) float * g_cube_penalty(GISTENTRY *origentry, GISTENTRY *newentry, float *result) { - Datum ud; + NDBOX *ud; float tmp1, tmp2; - ud = (Datum) cube_union((NDBOX *) (origentry->pred), (NDBOX *) (newentry->pred)); - rt_cube_size((NDBOX *) ud, &tmp1); - rt_cube_size((NDBOX *) (origentry->pred), &tmp2); + ud = cube_union((NDBOX *) DatumGetPointer(origentry->key), + (NDBOX *) DatumGetPointer(newentry->key)); + rt_cube_size(ud, &tmp1); + rt_cube_size((NDBOX *) DatumGetPointer(origentry->key), &tmp2); *result = tmp1 - tmp2; - pfree((char *) ud); + pfree(ud); /* * fprintf(stderr, "penalty\n"); fprintf(stderr, "\t%g\n", *result); @@ -308,16 +305,16 @@ g_cube_picksplit(bytea *entryvec, for (i = FirstOffsetNumber; i < maxoff; i = OffsetNumberNext(i)) { - datum_alpha = (NDBOX *) (((GISTENTRY *) (VARDATA(entryvec)))[i].pred); + datum_alpha = (NDBOX *) DatumGetPointer(((GISTENTRY *) (VARDATA(entryvec)))[i].key); for (j = OffsetNumberNext(i); j <= maxoff; j = OffsetNumberNext(j)) { - datum_beta = (NDBOX *) (((GISTENTRY *) (VARDATA(entryvec)))[j].pred); + datum_beta = (NDBOX *) DatumGetPointer(((GISTENTRY *) (VARDATA(entryvec)))[j].key); /* compute the wasted space by unioning these guys */ /* size_waste = size_union - size_inter; */ - union_d = (NDBOX *) cube_union(datum_alpha, datum_beta); + union_d = cube_union(datum_alpha, datum_beta); rt_cube_size(union_d, &size_union); - inter_d = (NDBOX *) cube_inter(datum_alpha, datum_beta); + inter_d = cube_inter(datum_alpha, datum_beta); rt_cube_size(inter_d, &size_inter); size_waste = size_union - size_inter; @@ -346,12 +343,12 @@ g_cube_picksplit(bytea *entryvec, right = v->spl_right; v->spl_nright = 0; - datum_alpha = (NDBOX *) (((GISTENTRY *) (VARDATA(entryvec)))[seed_1].pred); - datum_l = (NDBOX *) cube_union(datum_alpha, datum_alpha); - rt_cube_size((NDBOX *) datum_l, &size_l); - datum_beta = (NDBOX *) (((GISTENTRY *) (VARDATA(entryvec)))[seed_2].pred);; - datum_r = (NDBOX *) cube_union(datum_beta, datum_beta); - rt_cube_size((NDBOX *) datum_r, &size_r); + datum_alpha = (NDBOX *) DatumGetPointer(((GISTENTRY *) (VARDATA(entryvec)))[seed_1].key); + datum_l = cube_union(datum_alpha, datum_alpha); + rt_cube_size(datum_l, &size_l); + datum_beta = (NDBOX *) DatumGetPointer(((GISTENTRY *) (VARDATA(entryvec)))[seed_2].key); + datum_r = cube_union(datum_beta, datum_beta); + rt_cube_size(datum_r, &size_r); /* * Now split up the regions between the two seeds. An important @@ -389,11 +386,11 @@ g_cube_picksplit(bytea *entryvec, } /* okay, which page needs least enlargement? */ - datum_alpha = (NDBOX *) (((GISTENTRY *) (VARDATA(entryvec)))[i].pred); - union_dl = (NDBOX *) cube_union(datum_l, datum_alpha); - union_dr = (NDBOX *) cube_union(datum_r, datum_alpha); - rt_cube_size((NDBOX *) union_dl, &size_alpha); - rt_cube_size((NDBOX *) union_dr, &size_beta); + datum_alpha = (NDBOX *) DatumGetPointer(((GISTENTRY *) (VARDATA(entryvec)))[i].key); + union_dl = cube_union(datum_l, datum_alpha); + union_dr = cube_union(datum_r, datum_alpha); + rt_cube_size(union_dl, &size_alpha); + rt_cube_size(union_dr, &size_beta); /* pick which page to add it to */ if (size_alpha - size_l < size_beta - size_r) @@ -417,8 +414,8 @@ g_cube_picksplit(bytea *entryvec, } *left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */ - v->spl_ldatum = (char *) datum_l; - v->spl_rdatum = (char *) datum_r; + v->spl_ldatum = PointerGetDatum(datum_l); + v->spl_rdatum = PointerGetDatum(datum_r); return v; } diff --git a/contrib/intarray/_int.c b/contrib/intarray/_int.c index f15d3ca112..dc49355739 100644 --- a/contrib/intarray/_int.c +++ b/contrib/intarray/_int.c @@ -34,11 +34,7 @@ /* useful macros for accessing int4 arrays */ #define ARRPTR(x) ( (int4 *) ARR_DATA_PTR(x) ) -#ifdef PGSQL71 #define ARRNELEMS(x) ArrayGetNItems( ARR_NDIM(x), ARR_DIMS(x)) -#else -#define ARRNELEMS(x) getNitems( ARR_NDIM(x), ARR_DIMS(x)) -#endif #define ARRISNULL(x) ( (x) ? ( ( ARR_NDIM(x) == NDIM ) ? ( ( ARRNELEMS( x ) ) ? 0 : 1 ) : 1 ) : 1 ) @@ -228,14 +224,17 @@ g_int_consistent(GISTENTRY *entry, switch (strategy) { case RTOverlapStrategyNumber: - retval = (bool) inner_int_overlap((ArrayType *) (entry->pred), query); + retval = inner_int_overlap((ArrayType *) DatumGetPointer(entry->key), + query); break; case RTSameStrategyNumber: case RTContainsStrategyNumber: - retval = (bool) inner_int_contains((ArrayType *) (entry->pred), query); + retval = inner_int_contains((ArrayType *) DatumGetPointer(entry->key), + query); break; case RTContainedByStrategyNumber: - retval = (bool) inner_int_overlap((ArrayType *) (entry->pred), query); + retval = inner_int_overlap((ArrayType *) DatumGetPointer(entry->key), + query); break; default: retval = FALSE; @@ -265,14 +264,10 @@ g_int_compress(GISTENTRY *entry) retval = palloc(sizeof(GISTENTRY)); -#ifdef PGSQL71 - if (entry->pred) - r = (ArrayType *) PG_DETOAST_DATUM_COPY(entry->pred); + if (DatumGetPointer(entry->key) != NULL) + r = (ArrayType *) PG_DETOAST_DATUM_COPY(entry->key); else r = NULL; -#else - r = copy_intArrayType((ArrayType *) entry->pred); -#endif if (ARRISNULL(r)) { @@ -280,10 +275,10 @@ g_int_compress(GISTENTRY *entry) elog(NOTICE, "COMP IN: NULL"); #endif if (r) - if ((char *) r != (char *) entry->pred) + if (r != (ArrayType *) DatumGetPointer(entry->key)) pfree(r); - gistentryinit(*retval, (char *) NULL, entry->rel, entry->page, entry->offset, + gistentryinit(*retval, (Datum) 0, entry->rel, entry->page, entry->offset, 0, FALSE); return (retval); } @@ -322,7 +317,8 @@ g_int_compress(GISTENTRY *entry) r = resize_intArrayType(r, len); } - gistentryinit(*retval, (char *) r, entry->rel, entry->page, entry->offset, VARSIZE(r), FALSE); + gistentryinit(*retval, PointerGetDatum(r), + entry->rel, entry->page, entry->offset, VARSIZE(r), FALSE); return (retval); } @@ -340,25 +336,19 @@ g_int_decompress(GISTENTRY *entry) int i, j; -#ifdef PGSQL71 - if (entry->pred) - in = (ArrayType *) PG_DETOAST_DATUM(entry->pred); + if (DatumGetPointer(entry->key) != NULL) + in = (ArrayType *) PG_DETOAST_DATUM(entry->key); else in = NULL; -#else - in = (ArrayType *) entry->pred; -#endif if (entry->bytes < ARR_OVERHEAD(NDIM) || ARRISNULL(in)) { retval = palloc(sizeof(GISTENTRY)); -#ifdef PGSQL71 if (in) - if ((char *) in != (char *) entry->pred) + if (in != (ArrayType *) DatumGetPointer(entry->key)) pfree(in); -#endif - gistentryinit(*retval, (char *) NULL, entry->rel, entry->page, entry->offset, 0, FALSE); + gistentryinit(*retval, (Datum) 0, entry->rel, entry->page, entry->offset, 0, FALSE); #ifdef GIST_DEBUG elog(NOTICE, "DECOMP IN: NULL"); #endif @@ -372,7 +362,7 @@ g_int_decompress(GISTENTRY *entry) if (lenin < 2 * MAXNUMRANGE) { /* not comressed value */ /* sometimes strange bytesize */ - gistentryinit(*entry, (char *) in, entry->rel, entry->page, entry->offset, VARSIZE(in), FALSE); + gistentryinit(*entry, PointerGetDatum(in), entry->rel, entry->page, entry->offset, VARSIZE(in), FALSE); return (entry); } @@ -390,13 +380,11 @@ g_int_decompress(GISTENTRY *entry) if ((!i) || *(dr - 1) != j) *dr++ = j; -#ifdef PGSQL71 - if ((char *) in != (char *) entry->pred) + if (in != (ArrayType *) DatumGetPointer(entry->key)) pfree(in); -#endif retval = palloc(sizeof(GISTENTRY)); - gistentryinit(*retval, (char *) r, entry->rel, entry->page, entry->offset, VARSIZE(r), FALSE); + gistentryinit(*retval, PointerGetDatum(r), entry->rel, entry->page, entry->offset, VARSIZE(r), FALSE); return (retval); } @@ -835,9 +823,6 @@ new_intArrayType(int num) MemSet(r, 0, nbytes); r->size = nbytes; r->ndim = NDIM; -#ifndef PGSQL71 - SET_LO_FLAG(false, r); -#endif *((int *) ARR_DIMS(r)) = num; *((int *) ARR_LBOUND(r)) = 1; @@ -1056,14 +1041,10 @@ g_intbig_compress(GISTENTRY *entry) ArrayType *r, *in; -#ifdef PGSQL71 - if (entry->pred) - in = (ArrayType *) PG_DETOAST_DATUM(entry->pred); + if (DatumGetPointer(entry->key) != NULL) + in = (ArrayType *) PG_DETOAST_DATUM(entry->key); else in = NULL; -#else - in = (ArrayType *) entry->pred; -#endif if (!entry->leafkey) return entry; @@ -1072,12 +1053,10 @@ g_intbig_compress(GISTENTRY *entry) if (ARRISNULL(in)) { -#ifdef PGSQL71 if (in) - if ((char *) in != (char *) entry->pred) + if (in != (ArrayType *) DatumGetPointer(entry->key)) pfree(in); -#endif - gistentryinit(*retval, (char *) NULL, entry->rel, entry->page, entry->offset, 0, FALSE); + gistentryinit(*retval, (Datum) 0, entry->rel, entry->page, entry->offset, 0, FALSE); return (retval); } @@ -1086,13 +1065,11 @@ g_intbig_compress(GISTENTRY *entry) ARRPTR(in), ARRNELEMS(in)); - gistentryinit(*retval, (char *) r, entry->rel, entry->page, entry->offset, VARSIZE(r), FALSE); + gistentryinit(*retval, PointerGetDatum(r), entry->rel, entry->page, entry->offset, VARSIZE(r), FALSE); -#ifdef PGSQL71 if (in) - if ((char *) in != (char *) entry->pred) + if (in != (ArrayType *) DatumGetPointer(entry->key)) pfree(in); -#endif return (retval); } @@ -1100,20 +1077,18 @@ g_intbig_compress(GISTENTRY *entry) GISTENTRY * g_intbig_decompress(GISTENTRY *entry) { -#ifdef PGSQL71 ArrayType *key; - key = (ArrayType *) PG_DETOAST_DATUM(entry->pred); - if ((char *) key != (char *) entry->pred) + key = (ArrayType *) PG_DETOAST_DATUM(entry->key); + if (key != (ArrayType *) DatumGetPointer(entry->key)) { GISTENTRY *retval; retval = palloc(sizeof(GISTENTRY)); - gistentryinit(*retval, (char *) key, entry->rel, entry->page, entry->offset, VARSIZE(key), FALSE); + gistentryinit(*retval, PointerGetDatum(key), entry->rel, entry->page, entry->offset, VARSIZE(key), FALSE); return retval; } -#endif return entry; } @@ -1159,14 +1134,14 @@ g_intbig_consistent(GISTENTRY *entry, ArrayType *query, StrategyNumber strategy) switch (strategy) { case RTOverlapStrategyNumber: - retval = (bool) _intbig_overlap((ArrayType *) (entry->pred), q); + retval = _intbig_overlap((ArrayType *) DatumGetPointer(entry->key), q); break; case RTSameStrategyNumber: case RTContainsStrategyNumber: - retval = (bool) _intbig_contains((ArrayType *) (entry->pred), q); + retval = _intbig_contains((ArrayType *) DatumGetPointer(entry->key), q); break; case RTContainedByStrategyNumber: - retval = (bool) _intbig_overlap((ArrayType *) (entry->pred), q); + retval = _intbig_overlap((ArrayType *) DatumGetPointer(entry->key), q); break; default: retval = FALSE; @@ -1196,12 +1171,12 @@ _int_common_union(bytea *entryvec, int *sizep, formarray unionf) #endif numranges = (VARSIZE(entryvec) - VARHDRSZ) / sizeof(GISTENTRY); - tmp = (ArrayType *) (((GISTENTRY *) (VARDATA(entryvec)))[0]).pred; + tmp = (ArrayType *) DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[0].key); for (i = 1; i < numranges; i++) { out = (*unionf) (tmp, (ArrayType *) - (((GISTENTRY *) (VARDATA(entryvec)))[i]).pred); + DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[i].key)); if (i > 1 && tmp) pfree(tmp); tmp = out; @@ -1232,18 +1207,19 @@ _int_common_penalty(GISTENTRY *origentry, GISTENTRY *newentry, float *result, formarray unionf, formfloat sizef) { - Datum ud; + ArrayType *ud; float tmp1, tmp2; #ifdef GIST_DEBUG elog(NOTICE, "penalty"); #endif - ud = (Datum) (*unionf) ((ArrayType *) (origentry->pred), (ArrayType *) (newentry->pred)); - (*sizef) ((ArrayType *) ud, &tmp1); - (*sizef) ((ArrayType *) (origentry->pred), &tmp2); + ud = (*unionf) ((ArrayType *) DatumGetPointer(origentry->key), + (ArrayType *) DatumGetPointer(newentry->key)); + (*sizef) (ud, &tmp1); + (*sizef) ((ArrayType *) DatumGetPointer(origentry->key), &tmp2); *result = tmp1 - tmp2; - pfree((char *) ud); + pfree(ud); #ifdef GIST_DEBUG elog(NOTICE, "--penalty\t%g", *result); @@ -1304,10 +1280,10 @@ _int_common_picksplit(bytea *entryvec, for (i = FirstOffsetNumber; i < maxoff; i = OffsetNumberNext(i)) { - datum_alpha = (ArrayType *) (((GISTENTRY *) (VARDATA(entryvec)))[i].pred); + datum_alpha = (ArrayType *) DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[i].key); for (j = OffsetNumberNext(i); j <= maxoff; j = OffsetNumberNext(j)) { - datum_beta = (ArrayType *) (((GISTENTRY *) (VARDATA(entryvec)))[j].pred); + datum_beta = (ArrayType *) DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[j].key); /* compute the wasted space by unioning these guys */ /* size_waste = size_union - size_inter; */ @@ -1342,12 +1318,12 @@ _int_common_picksplit(bytea *entryvec, right = v->spl_right; v->spl_nright = 0; - datum_alpha = (ArrayType *) (((GISTENTRY *) (VARDATA(entryvec)))[seed_1].pred); + datum_alpha = (ArrayType *) DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[seed_1].key); datum_l = copy_intArrayType(datum_alpha); - (*sizef) ((ArrayType *) datum_l, &size_l); - datum_beta = (ArrayType *) (((GISTENTRY *) (VARDATA(entryvec)))[seed_2].pred); + (*sizef) (datum_l, &size_l); + datum_beta = (ArrayType *) DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[seed_2].key); datum_r = copy_intArrayType(datum_beta); - (*sizef) ((ArrayType *) datum_r, &size_r); + (*sizef) (datum_r, &size_r); /* * Now split up the regions between the two seeds. An important @@ -1386,11 +1362,11 @@ _int_common_picksplit(bytea *entryvec, } /* okay, which page needs least enlargement? */ - datum_alpha = (ArrayType *) (((GISTENTRY *) (VARDATA(entryvec)))[i].pred); - union_dl = (ArrayType *) (*unionf) (datum_l, datum_alpha); - union_dr = (ArrayType *) (*unionf) (datum_r, datum_alpha); - (*sizef) ((ArrayType *) union_dl, &size_alpha); - (*sizef) ((ArrayType *) union_dr, &size_beta); + datum_alpha = (ArrayType *) DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[i].key); + union_dl = (*unionf) (datum_l, datum_alpha); + union_dr = (*unionf) (datum_r, datum_alpha); + (*sizef) (union_dl, &size_alpha); + (*sizef) (union_dr, &size_beta); /* pick which page to add it to */ if (size_alpha - size_l < size_beta - size_r + WISH_F(v->spl_nleft, v->spl_nright, coef)) @@ -1428,8 +1404,8 @@ _int_common_picksplit(bytea *entryvec, *(right - 1) = InvalidOffsetNumber; } - v->spl_ldatum = (char *) datum_l; - v->spl_rdatum = (char *) datum_r; + v->spl_ldatum = PointerGetDatum(datum_l); + v->spl_rdatum = PointerGetDatum(datum_r); #ifdef GIST_DEBUG elog(NOTICE, "--------ENDpicksplit %d %d", v->spl_nleft, v->spl_nright); diff --git a/contrib/seg/seg.c b/contrib/seg/seg.c index 3ed7926389..124299ab44 100644 --- a/contrib/seg/seg.c +++ b/contrib/seg/seg.c @@ -225,9 +225,9 @@ gseg_consistent(GISTENTRY *entry, * gseg_leaf_consistent */ if (GIST_LEAF(entry)) - return (gseg_leaf_consistent((SEG *) (entry->pred), query, strategy)); + return (gseg_leaf_consistent((SEG *) DatumGetPointer(entry->key), query, strategy)); else - return (gseg_internal_consistent((SEG *) (entry->pred), query, strategy)); + return (gseg_internal_consistent((SEG *) DatumGetPointer(entry->key), query, strategy)); } /* @@ -247,22 +247,14 @@ gseg_union(bytea *entryvec, int *sizep) #endif numranges = (VARSIZE(entryvec) - VARHDRSZ) / sizeof(GISTENTRY); - tmp = (SEG *) (((GISTENTRY *) (VARDATA(entryvec)))[0]).pred; + tmp = (SEG *) DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[0].key); *sizep = sizeof(SEG); for (i = 1; i < numranges; i++) { out = gseg_binary_union(tmp, (SEG *) - (((GISTENTRY *) (VARDATA(entryvec)))[i]).pred, + DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[i].key), sizep); -#ifdef GIST_DEBUG - - /* - * fprintf(stderr, "\t%s ^ %s -> %s\n", seg_out(tmp), seg_out((SEG - * *)(((GISTENTRY *)(VARDATA(entryvec)))[i]).pred), seg_out(out)); - */ -#endif - if (i > 1) pfree(tmp); tmp = out; @@ -294,15 +286,16 @@ gseg_decompress(GISTENTRY *entry) float * gseg_penalty(GISTENTRY *origentry, GISTENTRY *newentry, float *result) { - Datum ud; + SEG *ud; float tmp1, tmp2; - ud = (Datum) seg_union((SEG *) (origentry->pred), (SEG *) (newentry->pred)); - rt_seg_size((SEG *) ud, &tmp1); - rt_seg_size((SEG *) (origentry->pred), &tmp2); + ud = seg_union((SEG *) DatumGetPointer(origentry->key), + (SEG *) DatumGetPointer(newentry->key)); + rt_seg_size(ud, &tmp1); + rt_seg_size((SEG *) DatumGetPointer(origentry->key), &tmp2); *result = tmp1 - tmp2; - pfree((char *) ud); + pfree(ud); #ifdef GIST_DEBUG fprintf(stderr, "penalty\n"); @@ -362,16 +355,16 @@ gseg_picksplit(bytea *entryvec, for (i = FirstOffsetNumber; i < maxoff; i = OffsetNumberNext(i)) { - datum_alpha = (SEG *) (((GISTENTRY *) (VARDATA(entryvec)))[i].pred); + datum_alpha = (SEG *) DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[i].key); for (j = OffsetNumberNext(i); j <= maxoff; j = OffsetNumberNext(j)) { - datum_beta = (SEG *) (((GISTENTRY *) (VARDATA(entryvec)))[j].pred); + datum_beta = (SEG *) DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[j].key); /* compute the wasted space by unioning these guys */ /* size_waste = size_union - size_inter; */ - union_d = (SEG *) seg_union(datum_alpha, datum_beta); + union_d = seg_union(datum_alpha, datum_beta); rt_seg_size(union_d, &size_union); - inter_d = (SEG *) seg_inter(datum_alpha, datum_beta); + inter_d = seg_inter(datum_alpha, datum_beta); rt_seg_size(inter_d, &size_inter); size_waste = size_union - size_inter; @@ -400,12 +393,12 @@ gseg_picksplit(bytea *entryvec, right = v->spl_right; v->spl_nright = 0; - datum_alpha = (SEG *) (((GISTENTRY *) (VARDATA(entryvec)))[seed_1].pred); - datum_l = (SEG *) seg_union(datum_alpha, datum_alpha); - rt_seg_size((SEG *) datum_l, &size_l); - datum_beta = (SEG *) (((GISTENTRY *) (VARDATA(entryvec)))[seed_2].pred);; - datum_r = (SEG *) seg_union(datum_beta, datum_beta); - rt_seg_size((SEG *) datum_r, &size_r); + datum_alpha = (SEG *) DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[seed_1].key); + datum_l = seg_union(datum_alpha, datum_alpha); + rt_seg_size(datum_l, &size_l); + datum_beta = (SEG *) DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[seed_2].key); + datum_r = seg_union(datum_beta, datum_beta); + rt_seg_size(datum_r, &size_r); /* * Now split up the regions between the two seeds. An important @@ -443,11 +436,11 @@ gseg_picksplit(bytea *entryvec, } /* okay, which page needs least enlargement? */ - datum_alpha = (SEG *) (((GISTENTRY *) (VARDATA(entryvec)))[i].pred); - union_dl = (SEG *) seg_union(datum_l, datum_alpha); - union_dr = (SEG *) seg_union(datum_r, datum_alpha); - rt_seg_size((SEG *) union_dl, &size_alpha); - rt_seg_size((SEG *) union_dr, &size_beta); + datum_alpha = (SEG *) DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[i].key); + union_dl = seg_union(datum_l, datum_alpha); + union_dr = seg_union(datum_r, datum_alpha); + rt_seg_size(union_dl, &size_alpha); + rt_seg_size(union_dr, &size_beta); /* pick which page to add it to */ if (size_alpha - size_l < size_beta - size_r) @@ -471,8 +464,8 @@ gseg_picksplit(bytea *entryvec, } *left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */ - v->spl_ldatum = (char *) datum_l; - v->spl_rdatum = (char *) datum_r; + v->spl_ldatum = PointerGetDatum(datum_l); + v->spl_rdatum = PointerGetDatum(datum_r); return v; } diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 8f23d14d79..8f0cfbbb49 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.77 2001/05/30 19:53:39 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.78 2001/05/31 18:16:54 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -26,10 +26,23 @@ #include "access/xlogutils.h" +#undef GIST_PAGEADDITEM + +#define ATTSIZE( datum, Rel, i, isnull ) \ + ( \ + ( isnull ) ? 0 : \ + att_addlength(0, (Rel)->rd_att->attrs[(i)-1]->attlen, (datum)) \ + ) + /* result's status */ #define INSERTED 0x01 #define SPLITED 0x02 +/* group flags ( in gistSplit ) */ +#define LEFT_ADDED 0x01 +#define RIGHT_ADDED 0x02 +#define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED ) + /* non-export function prototypes */ static void gistdoinsert(Relation r, IndexTuple itup, @@ -55,10 +68,15 @@ static IndexTuple *gistjoinvector( IndexTuple *additvec, int addlen); static IndexTuple gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate); + static IndexTuple gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate); +static int gistfindgroup( GISTSTATE *giststate, + GISTENTRY *valvec, GIST_SPLITVEC * spl ); +static IndexTuple gistFormTuple( GISTSTATE *giststate, + Relation r, Datum attdata[], int datumsize[] ); static IndexTuple *gistSplit(Relation r, Buffer buffer, IndexTuple *itup, @@ -71,13 +89,18 @@ static void GISTInitBuffer(Buffer b, uint32 f); static OffsetNumber gistchoose(Relation r, Page p, IndexTuple it, GISTSTATE *giststate); +#ifdef GIST_PAGEADDITEM static IndexTuple gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t); -static void gistcentryinit(GISTSTATE *giststate, - GISTENTRY *e, char *pr, +#endif +static void gistcentryinit(GISTSTATE *giststate, int nkey, + GISTENTRY *e, Datum k, Relation r, Page pg, OffsetNumber o, int b, bool l); - +static bool gistDeCompressAtt( GISTSTATE *giststate, Relation r, + IndexTuple tuple, Page p, OffsetNumber o, + GISTENTRY attdata[], bool decompvec[] ); +static void gistFreeAtt( Relation r, GISTENTRY attdata[], bool decompvec[] ); #undef GISTDEBUG #ifdef GISTDEBUG @@ -230,15 +253,15 @@ gistbuild(PG_FUNCTION_ARGS) /* immediately compress keys to normalize */ for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++) { - gistcentryinit(&giststate, &tmpcentry, (char *) attdata[i], + gistcentryinit(&giststate, i, &tmpcentry, attdata[i], (Relation) NULL, (Page) NULL, (OffsetNumber) 0, -1 /* size is currently bogus */ , TRUE); - if (attdata[i] != (Datum) tmpcentry.pred && + if (attdata[i] != tmpcentry.key && !(giststate.keytypbyval)) compvec[i] = TRUE; else compvec[i] = FALSE; - attdata[i] = (Datum) tmpcentry.pred; + attdata[i] = tmpcentry.key; } /* form an index tuple and point it at the heap tuple */ @@ -252,7 +275,6 @@ gistbuild(PG_FUNCTION_ARGS) * is the right thing to do if you're inserting single tups, but * not when you're initializing the whole index at once. */ - gistdoinsert(index, itup, NULL, &giststate); for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++) @@ -339,14 +361,14 @@ gistinsert(PG_FUNCTION_ARGS) compvec = (bool *) palloc(sizeof(bool) * r->rd_att->natts); for (i = 0; i < r->rd_att->natts; i++) { - gistcentryinit(&giststate, &tmpentry, (char *) datum[i], + gistcentryinit(&giststate, i,&tmpentry, datum[i], (Relation) NULL, (Page) NULL, (OffsetNumber) 0, -1 /* size is currently bogus */ , TRUE); - if (datum[i] != (Datum) tmpentry.pred && !(giststate.keytypbyval)) + if (datum[i] != tmpentry.key && !(giststate.keytypbyval)) compvec[i] = TRUE; else compvec[i] = FALSE; - datum[i] = (Datum) tmpentry.pred; + datum[i] = tmpentry.key; } itup = index_formtuple(RelationGetDescr(r), datum, nulls); itup->t_tid = *ht_ctid; @@ -361,13 +383,14 @@ gistinsert(PG_FUNCTION_ARGS) gistdoinsert(r, itup, &res, &giststate); for (i = 0; i < r->rd_att->natts; i++) if (compvec[i] == TRUE) - pfree((char *) datum[i]); + pfree(DatumGetPointer(datum[i])); pfree(itup); pfree(compvec); PG_RETURN_POINTER(res); } +#ifdef GIST_PAGEADDITEM /* ** Take a compressed entry, and install it on a page. Since we now know ** where the entry will live, we decompress it and recompress it using @@ -388,16 +411,20 @@ gistPageAddItem(GISTSTATE *giststate, GISTENTRY tmpcentry; IndexTuple itup = (IndexTuple) item; OffsetNumber retval; + Datum datum; + bool IsNull; /* * recompress the item given that we now know the exact page and * offset for insertion */ - gistdentryinit(giststate, dentry, - (((char *) itup) + sizeof(IndexTupleData)), - (Relation) 0, (Page) 0, (OffsetNumber) InvalidOffsetNumber, - IndexTupleSize(itup) - sizeof(IndexTupleData), FALSE); - gistcentryinit(giststate, &tmpcentry, dentry->pred, r, page, + datum = index_getattr(itup, 1, r->rd_att, &IsNull); + gistdentryinit(giststate, 0,dentry, datum, + (Relation) 0, (Page) 0, + (OffsetNumber) InvalidOffsetNumber, + ATTSIZE( datum, r, 1, IsNull ), + FALSE); + gistcentryinit(giststate, 0,&tmpcentry, dentry->key, r, page, offsetNumber, dentry->bytes, FALSE); *newtup = gist_tuple_replacekey(r, tmpcentry, itup); retval = PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup), @@ -406,11 +433,13 @@ gistPageAddItem(GISTSTATE *giststate, elog(ERROR, "gist: failed to add index item to %s", RelationGetRelationName(r)); /* be tidy */ - if (tmpcentry.pred && tmpcentry.pred != dentry->pred - && tmpcentry.pred != (((char *) itup) + sizeof(IndexTupleData))) - pfree(tmpcentry.pred); + if (DatumGetPointer(tmpcentry.key) != NULL && + tmpcentry.key != dentry->key && + tmpcentry.key != datum ) + pfree(DatumGetPointer(tmpcentry.key)); return (retval); } +#endif static void gistdoinsert(Relation r, @@ -485,7 +514,6 @@ gistlayerinsert(Relation r, BlockNumber blkno, if (!(ret & SPLITED)) { IndexTuple newtup = gistgetadjusted(r, oldtup, (*itup)[0], giststate); - if (!newtup) { /* not need to update key */ @@ -509,18 +537,23 @@ gistlayerinsert(Relation r, BlockNumber blkno, if (gistnospace(page, (*itup), *len)) { /* no space for insertion */ - IndexTuple *itvec; - int tlen; + IndexTuple *itvec, *newitup; + int tlen,oldlen; ret |= SPLITED; itvec = gistreadbuffer(r, buffer, &tlen); itvec = gistjoinvector(itvec, &tlen, (*itup), *len); - pfree((*itup)); - (*itup) = gistSplit(r, buffer, itvec, &tlen, giststate, + oldlen = *len; + newitup = gistSplit(r, buffer, itvec, &tlen, giststate, (opaque->flags & F_LEAF) ? res : NULL); /* res only for * inserting in leaf */ ReleaseBuffer(buffer); + do + pfree( (*itup)[ oldlen-1 ] ); + while ( (--oldlen) > 0 ); + pfree((*itup)); pfree(itvec); + *itup = newitup; *len = tlen; /* now tlen >= 2 */ } else @@ -551,6 +584,7 @@ gistlayerinsert(Relation r, BlockNumber blkno, * parent */ IndexTuple newtup = gistunion(r, (*itup), *len, giststate); + ItemPointerSet(&(newtup->t_tid), blkno, 1); for (i = 0; i < *len; i++) pfree((*itup)[i]); @@ -571,19 +605,30 @@ gistwritebuffer(Relation r, Page page, IndexTuple *itup, { OffsetNumber l = InvalidOffsetNumber; int i; +#ifdef GIST_PAGEADDITEM GISTENTRY tmpdentry; IndexTuple newtup; - + bool IsNull; +#endif for (i = 0; i < len; i++) { +#ifdef GIST_PAGEADDITEM l = gistPageAddItem(giststate, r, page, (Item) itup[i], IndexTupleSize(itup[i]), off, LP_USED, &tmpdentry, &newtup); off = OffsetNumberNext(off); - if (tmpdentry.pred != (((char *) itup[i]) + sizeof(IndexTupleData)) && tmpdentry.pred) - pfree(tmpdentry.pred); + if (DatumGetPointer(tmpdentry.key) != NULL && + tmpdentry.key != index_getattr(itup[i], 1, r->rd_att, &IsNull)) + pfree(DatumGetPointer(tmpdentry.key)); if (itup[i] != newtup) pfree(newtup); +#else + l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]), + off, LP_USED); + if (l == InvalidOffsetNumber) + elog(ERROR, "gist: failed to add index item to %s", + RelationGetRelationName(r)); +#endif } return l; } @@ -598,7 +643,7 @@ gistnospace(Page page, IndexTuple *itvec, int len) int i; for (i = 0; i < len; i++) - size += IndexTupleSize(itvec[i]) + 4; /* ??? */ + size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData); return (PageGetFreeSpace(page) < size); } @@ -614,11 +659,11 @@ gistreadbuffer(Relation r, Buffer buffer, int *len /* out */ ) IndexTuple *itvec; Page p = (Page) BufferGetPage(buffer); - *len = 0; maxoff = PageGetMaxOffsetNumber(p); + *len = maxoff; itvec = palloc(sizeof(IndexTuple) * maxoff); for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) - itvec[(*len)++] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); + itvec[i-1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); return itvec; } @@ -639,46 +684,66 @@ gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen) * return union of itup vector */ static IndexTuple -gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) -{ +gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) { + Datum attr[INDEX_MAX_KEYS]; + bool whatfree[INDEX_MAX_KEYS]; + char isnull[INDEX_MAX_KEYS]; bytea *evec; - char *datum; + Datum datum; int datumsize, - i; - GISTENTRY centry; - char isnull; - IndexTuple newtup; + i,j; + GISTENTRY centry[INDEX_MAX_KEYS]; + bool *needfree; + IndexTuple newtup; + bool IsNull; + needfree = (bool *) palloc(len * sizeof(bool)); evec = (bytea *) palloc(len * sizeof(GISTENTRY) + VARHDRSZ); VARATT_SIZEP(evec) = len * sizeof(GISTENTRY) + VARHDRSZ; - for (i = 0; i < len; i++) - gistdentryinit(giststate, &((GISTENTRY *) VARDATA(evec))[i], - (char *) itvec[i] + sizeof(IndexTupleData), - (Relation) NULL, (Page) NULL, (OffsetNumber) NULL, - IndexTupleSize((IndexTuple) itvec[i]) - sizeof(IndexTupleData), FALSE); + for (j = 0; j < r->rd_att->natts; j++) { + for (i = 0; i < len; i++) { + datum = index_getattr(itvec[i], j+1, r->rd_att, &IsNull); + gistdentryinit(giststate, j, + &((GISTENTRY *) VARDATA(evec))[i], + datum, + (Relation) NULL, (Page) NULL, (OffsetNumber) NULL, + ATTSIZE( datum, r, j+1, IsNull ), FALSE); + if ( DatumGetPointer(((GISTENTRY *) VARDATA(evec))[i].key) != NULL + && ((GISTENTRY *) VARDATA(evec))[i].key != datum ) + needfree[i] = TRUE; + else + needfree[i] = FALSE; + } - datum = (char *) - DatumGetPointer(FunctionCall2(&giststate->unionFn, - PointerGetDatum(evec), - PointerGetDatum(&datumsize))); + datum = FunctionCall2(&giststate->unionFn[j], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); - for (i = 0; i < len; i++) - if (((GISTENTRY *) VARDATA(evec))[i].pred && - ((GISTENTRY *) VARDATA(evec))[i].pred != - ((char *) (itvec[i]) + sizeof(IndexTupleData))) - pfree(((GISTENTRY *) VARDATA(evec))[i].pred); + for (i = 0; i < len; i++) + if ( needfree[i] ) + pfree(DatumGetPointer(((GISTENTRY *) VARDATA(evec))[i].key)); + + gistcentryinit(giststate, j, ¢ry[j], datum, + (Relation) NULL, (Page) NULL, (OffsetNumber) NULL, + datumsize, FALSE); + isnull[j] = DatumGetPointer(centry[j].key) != NULL ? ' ' : 'n'; + attr[j] = centry[j].key; + if ( DatumGetPointer(centry[j].key) != NULL ) { + whatfree[j] = TRUE; + if ( centry[j].key != datum ) + pfree(DatumGetPointer(datum)); + } else + whatfree[j] = FALSE; + } pfree(evec); + pfree(needfree); - gistcentryinit(giststate, ¢ry, datum, - (Relation) NULL, (Page) NULL, (OffsetNumber) NULL, - datumsize, FALSE); - - isnull = (centry.pred) ? ' ' : 'n'; - newtup = (IndexTuple) index_formtuple(r->rd_att, (Datum *) ¢ry.pred, &isnull); - if (centry.pred != datum) - pfree(datum); + newtup = (IndexTuple) index_formtuple(r->rd_att, attr, isnull); + for (j = 0; j < r->rd_att->natts; j++) + if ( whatfree[j] ) + pfree(DatumGetPointer(attr[j])); return newtup; } @@ -690,73 +755,234 @@ static IndexTuple gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate) { bytea *evec; - char *datum; + Datum datum; int datumsize; - bool result; - char isnull; - GISTENTRY centry, + bool result, neednew = false; + char isnull[INDEX_MAX_KEYS], + whatfree[INDEX_MAX_KEYS]; + Datum attr[INDEX_MAX_KEYS]; + GISTENTRY centry[INDEX_MAX_KEYS], + oldatt[INDEX_MAX_KEYS], + addatt[INDEX_MAX_KEYS], *ev0p, *ev1p; + bool olddec[INDEX_MAX_KEYS], + adddec[INDEX_MAX_KEYS]; + IndexTuple newtup = NULL; + int j; evec = (bytea *) palloc(2 * sizeof(GISTENTRY) + VARHDRSZ); VARATT_SIZEP(evec) = 2 * sizeof(GISTENTRY) + VARHDRSZ; - - gistdentryinit(giststate, &((GISTENTRY *) VARDATA(evec))[0], - (char *) oldtup + sizeof(IndexTupleData), (Relation) NULL, - (Page) NULL, (OffsetNumber) 0, - IndexTupleSize((IndexTuple) oldtup) - sizeof(IndexTupleData), FALSE); ev0p = &((GISTENTRY *) VARDATA(evec))[0]; - - gistdentryinit(giststate, &((GISTENTRY *) VARDATA(evec))[1], - (char *) addtup + sizeof(IndexTupleData), (Relation) NULL, - (Page) NULL, (OffsetNumber) 0, - IndexTupleSize((IndexTuple) addtup) - sizeof(IndexTupleData), FALSE); ev1p = &((GISTENTRY *) VARDATA(evec))[1]; + + gistDeCompressAtt( giststate, r, oldtup, (Page) NULL, + (OffsetNumber) 0, oldatt, olddec); - datum = (char *) - DatumGetPointer(FunctionCall2(&giststate->unionFn, - PointerGetDatum(evec), - PointerGetDatum(&datumsize))); + gistDeCompressAtt( giststate, r, addtup, (Page) NULL, + (OffsetNumber) 0, addatt, adddec); - if (!(ev0p->pred && ev1p->pred)) - result = (ev0p->pred == NULL && ev1p->pred == NULL); - else - { - FunctionCall3(&giststate->equalFn, - PointerGetDatum(ev0p->pred), - PointerGetDatum(datum), - PointerGetDatum(&result)); - } + + for( j=0; jrd_att->natts; j++ ) { + gistentryinit(*ev0p, oldatt[j].key, r, (Page) NULL, + (OffsetNumber) 0, oldatt[j].bytes, FALSE); + gistentryinit(*ev1p, addatt[j].key, r, (Page) NULL, + (OffsetNumber) 0, addatt[j].bytes, FALSE); - if (result) - { - /* not need to update key */ - pfree(datum); - } - else - { - gistcentryinit(giststate, ¢ry, datum, ev0p->rel, ev0p->page, - ev0p->offset, datumsize, FALSE); + datum = FunctionCall2(&giststate->unionFn[j], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); - isnull = (centry.pred) ? ' ' : 'n'; - newtup = (IndexTuple) index_formtuple(r->rd_att, (Datum *) ¢ry.pred, &isnull); - newtup->t_tid = oldtup->t_tid; - if (centry.pred != datum) - pfree(datum); - } + if (!(DatumGetPointer(ev0p->key) != NULL && + DatumGetPointer(ev1p->key) != NULL)) + result = (DatumGetPointer(ev0p->key) == NULL && + DatumGetPointer(ev1p->key) == NULL); + else + { + FunctionCall3(&giststate->equalFn[j], + ev0p->key, + datum, + PointerGetDatum(&result)); + } + if ( !result ) + neednew = true; - if (ev0p->pred && - ev0p->pred != (char *) oldtup + sizeof(IndexTupleData)) - pfree(ev0p->pred); - if (ev1p->pred && - ev1p->pred != (char *) addtup + sizeof(IndexTupleData)) - pfree(ev1p->pred); + if ( olddec[j] && DatumGetPointer(oldatt[j].key) != NULL ) + pfree( DatumGetPointer(oldatt[j].key) ); + if ( adddec[j] && DatumGetPointer(addatt[j].key) != NULL ) + pfree( DatumGetPointer(addatt[j].key) ); + + gistcentryinit(giststate, j, ¢ry[j], datum, + (Relation) NULL, (Page) NULL, (OffsetNumber) NULL, + datumsize, FALSE); + + isnull[j] = DatumGetPointer(centry[j].key) != NULL ? ' ' : 'n'; + attr[j] = centry[j].key; + if ( DatumGetPointer(centry[j].key) != NULL ) { + whatfree[j] = TRUE; + if ( centry[j].key != datum ) + pfree(DatumGetPointer(datum)); + } else + whatfree[j] = FALSE; + + } pfree(evec); + if (neednew) { + /* need to update key */ + newtup = (IndexTuple) index_formtuple(r->rd_att, attr, isnull); + newtup->t_tid = oldtup->t_tid; + } + + for (j = 0; j < r->rd_att->natts; j++) + if ( whatfree[j] ) + pfree(DatumGetPointer(attr[j])); + return newtup; } +static void +gistunionsubkey( Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC * spl ) { + int i,j,lr; + Datum *attr; + bool *needfree, IsNull; + int len, *attrsize; + OffsetNumber *entries; + bytea *evec; + Datum datum; + int datumsize; + + for(lr=0;lr<=1;lr++) { + if ( lr ) { + attrsize = spl->spl_lattrsize; + attr = spl->spl_lattr; + len = spl->spl_nleft; + entries = spl->spl_left; + } else { + attrsize = spl->spl_rattrsize; + attr = spl->spl_rattr; + len = spl->spl_nright; + entries = spl->spl_right; + } + + needfree = (bool *) palloc( (( len==1 ) ? 2 : len ) * sizeof(bool)); + evec = (bytea *) palloc( (( len==1 ) ? 2 : len ) * sizeof(GISTENTRY) + VARHDRSZ); + VARATT_SIZEP(evec) = (( len==1 ) ? 2 : len ) * sizeof(GISTENTRY) + VARHDRSZ; + for (j = 1; j < r->rd_att->natts; j++) { + for (i = 0; i < len; i++) { + if ( spl->spl_idgrp[ entries[i] ] ) + { + datum = (Datum) 0; + IsNull = true; + } else + datum = index_getattr(itvec[ entries[i]-1 ], j+1, + r->rd_att, &IsNull); + gistdentryinit(giststate, j, + &((GISTENTRY *) VARDATA(evec))[i], + datum, + (Relation) NULL, (Page) NULL, + (OffsetNumber) NULL, + ATTSIZE( datum, r, j+1, IsNull ), FALSE); + if ( DatumGetPointer(((GISTENTRY *) VARDATA(evec))[i].key) != NULL && + ((GISTENTRY *) VARDATA(evec))[i].key != datum ) + needfree[i] = TRUE; + else + needfree[i] = FALSE; + + } + if ( len == 1 && + DatumGetPointer(((GISTENTRY *) VARDATA(evec))[0].key) == NULL) + { + datum = (Datum) 0; + datumsize = 0; + } else { + /* + * ((GISTENTRY *) VARDATA(evec))[0].bytes may be not defined, + * so form union with itself + */ + if ( len == 1 ) { + memcpy( (void*) &((GISTENTRY *) VARDATA(evec))[1], + (void*) &((GISTENTRY *) VARDATA(evec))[0], + sizeof( GISTENTRY ) ); + } + datum = FunctionCall2(&giststate->unionFn[j], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); + } + + for (i = 0; i < len; i++) + if ( needfree[i] ) + pfree(DatumGetPointer(((GISTENTRY *) VARDATA(evec))[i].key)); + + attr[j] = datum; + attrsize[j] = datumsize; + } + pfree(evec); + pfree(needfree); + } +} + +/* + * find group in vector with equial value + */ +static int +gistfindgroup( GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC * spl ) { + int i,j,len; + int curid = 1; + bool result; + + for(i=0; ispl_nleft; i++) { + if ( spl->spl_idgrp[ spl->spl_left[i] ]) continue; + len = 0; + /* find all equal value in right part */ + for(j=0; j < spl->spl_nright; j++) { + if ( spl->spl_idgrp[ spl->spl_right[j] ]) continue; + if (!(DatumGetPointer(valvec[ spl->spl_left[i] ].key) != NULL && + DatumGetPointer(valvec[ spl->spl_right[j]].key) != NULL)) + result = + DatumGetPointer(valvec[ spl->spl_left[i] ].key) == NULL && + DatumGetPointer(valvec[ spl->spl_right[j]].key) == NULL; + else + FunctionCall3(&giststate->equalFn[0], + valvec[ spl->spl_left[i] ].key, + valvec[ spl->spl_right[j] ].key, + PointerGetDatum(&result)); + if ( result ) { + spl->spl_idgrp[ spl->spl_right[j] ] = curid; + len++; + } + } + /* find all other equal value in left part */ + if ( len ) { + /* add current val to list of equial values*/ + spl->spl_idgrp[ spl->spl_left[i] ]=curid; + /* searching .. */ + for(j=i+1; j < spl->spl_nleft; j++) { + if ( spl->spl_idgrp[ spl->spl_left[j] ]) continue; + if (!(DatumGetPointer(valvec[ spl->spl_left[i]].key) != NULL && + DatumGetPointer(valvec[ spl->spl_left[j]].key) != NULL)) + result = + DatumGetPointer(valvec[ spl->spl_left[i]].key) == NULL && + DatumGetPointer(valvec[ spl->spl_left[j]].key) == NULL; + else + FunctionCall3(&giststate->equalFn[0], + valvec[ spl->spl_left[i] ].key, + valvec[ spl->spl_left[j] ].key, + PointerGetDatum(&result)); + if ( result ) { + spl->spl_idgrp[ spl->spl_left[j] ] = curid; + len++; + } + } + spl->spl_ngrp[curid] = len+1; + curid++; + } + } + + return curid; +} + /* * gistSplit -- split a page in the tree. */ @@ -773,23 +999,20 @@ gistSplit(Relation r, rightbuf; Page left, right; - OffsetNumber *spl_left, - *spl_right; IndexTuple *lvectup, *rvectup, *newtup; - int leftoff, - rightoff; BlockNumber lbknum, rbknum; GISTPageOpaque opaque; - char isnull; GIST_SPLITVEC v; bytea *entryvec; bool *decompvec; - GISTENTRY tmpentry; - int i, + int i,j, nlen; + int MaxGrpId = 1; + Datum datum; + bool IsNull; p = (Page) BufferGetPage(buffer); opaque = (GISTPageOpaque) PageGetSpecialPointer(p); @@ -827,49 +1050,187 @@ gistSplit(Relation r, VARATT_SIZEP(entryvec) = (*len + 1) * sizeof(GISTENTRY) + VARHDRSZ; for (i = 1; i <= *len; i++) { - gistdentryinit(giststate, &((GISTENTRY *) VARDATA(entryvec))[i], - (((char *) itup[i - 1]) + sizeof(IndexTupleData)), - r, p, i, - IndexTupleSize(itup[i - 1]) - sizeof(IndexTupleData), FALSE); - if ((char *) (((GISTENTRY *) VARDATA(entryvec))[i].pred) - == (((char *) itup[i - 1]) + sizeof(IndexTupleData))) + datum = index_getattr(itup[i - 1], 1, r->rd_att, &IsNull); + gistdentryinit(giststate, 0,&((GISTENTRY *) VARDATA(entryvec))[i], + datum, r, p, i, + ATTSIZE( datum, r, 1, IsNull ), FALSE); + if (((GISTENTRY *) VARDATA(entryvec))[i].key == datum || + DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[i].key) == NULL) decompvec[i] = FALSE; else decompvec[i] = TRUE; } /* now let the user-defined picksplit function set up the split vector */ - FunctionCall2(&giststate->picksplitFn, + FunctionCall2(&giststate->picksplitFn[0], PointerGetDatum(entryvec), PointerGetDatum(&v)); - /* clean up the entry vector: its preds need to be deleted, too */ + /* compatibility with old code */ + if ( v.spl_left[ v.spl_nleft-1 ] == InvalidOffsetNumber ) + v.spl_left[ v.spl_nleft-1 ] = (OffsetNumber)*len; + if ( v.spl_right[ v.spl_nright-1 ] == InvalidOffsetNumber ) + v.spl_right[ v.spl_nright-1 ] = (OffsetNumber)*len; + + v.spl_lattr[0] = v.spl_ldatum; + v.spl_rattr[0] = v.spl_rdatum; + + /* if index is multikey, then we must to try get smaller + * bounding box for subkey(s) + */ + if ( r->rd_att->natts > 1 ) { + v.spl_idgrp = (int*) palloc( sizeof(int) * (*len + 1) ); + MemSet((void*)v.spl_idgrp, 0, sizeof(int) * (*len + 1) ); + v.spl_grpflag = (char*) palloc( sizeof(char) * (*len + 1) ); + MemSet((void*)v.spl_grpflag, 0, sizeof(char) * (*len + 1) ); + v.spl_ngrp = (int*) palloc( sizeof(int) * (*len + 1) ); + + MaxGrpId = gistfindgroup( giststate, (GISTENTRY *) VARDATA(entryvec), &v ); + + /* form union of sub keys for each page (l,p) */ + gistunionsubkey( r, giststate, itup, &v ); + + /* if possible, we insert equivalrnt tuples + * with control by penalty for a subkey(s) + */ + if ( MaxGrpId > 1 ) { + int curlen; + OffsetNumber *curwpos; + bool decfree[INDEX_MAX_KEYS]; + GISTENTRY entry,identry[INDEX_MAX_KEYS], *ev0p, *ev1p; + float lpenalty, rpenalty; + bytea *evec; + int datumsize; + + /* clear vectors */ + curlen = v.spl_nleft; + curwpos = v.spl_left; + for( i=0; ird_att->natts; j++ ) { + gistentryinit(entry,v.spl_lattr[j], r, (Page) NULL, + (OffsetNumber) 0, v.spl_lattrsize[j], FALSE); + FunctionCall3(&giststate->penaltyFn[j], + PointerGetDatum(&entry), + PointerGetDatum(&identry[j]), + PointerGetDatum(&lpenalty)); + + gistentryinit(entry,v.spl_rattr[j], r, (Page) NULL, + (OffsetNumber) 0, v.spl_rattrsize[j], FALSE); + FunctionCall3(&giststate->penaltyFn[j], + PointerGetDatum(&entry), + PointerGetDatum(&identry[j]), + PointerGetDatum(&rpenalty)); + + if ( lpenalty != rpenalty ) + break; + } + } + /* add */ + if ( lpenalty < rpenalty ) { + v.spl_grpflag[ v.spl_idgrp[ i+1 ] ] |= LEFT_ADDED; + v.spl_left[ v.spl_nleft ] = i+1; + v.spl_nleft++; + for( j=1; jrd_att->natts; j++ ) { + gistentryinit(*ev0p, v.spl_lattr[j], r, (Page) NULL, + (OffsetNumber) 0, v.spl_lattrsize[j], FALSE); + gistentryinit(*ev1p, identry[j].key, r, (Page) NULL, + (OffsetNumber) 0, identry[j].bytes, FALSE); + + datum = FunctionCall2(&giststate->unionFn[j], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); + + if ( DatumGetPointer(v.spl_lattr[j]) != NULL ) + pfree( DatumGetPointer(v.spl_lattr[j]) ); + + v.spl_lattr[j] = datum; + v.spl_lattrsize[j] = datumsize; + } + } else { + v.spl_grpflag[ v.spl_idgrp[ i+1 ] ] |= RIGHT_ADDED; + v.spl_right[ v.spl_nright ] = i+1; + v.spl_nright++; + for( j=1; jrd_att->natts; j++ ) { + gistentryinit(*ev0p, v.spl_rattr[j], r, (Page) NULL, + (OffsetNumber) 0, v.spl_rattrsize[j], FALSE); + gistentryinit(*ev1p, identry[j].key, r, (Page) NULL, + (OffsetNumber) 0, identry[j].bytes, FALSE); + + datum = FunctionCall2(&giststate->unionFn[j], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); + + if ( DatumGetPointer(v.spl_rattr[j]) != NULL ) + pfree( DatumGetPointer(v.spl_rattr[j]) ); + + v.spl_rattr[j] = datum; + v.spl_rattrsize[j] = datumsize; + } + + } + gistFreeAtt( r, identry, decfree ); + } + pfree(evec); + } + pfree( v.spl_idgrp ); + pfree( v.spl_grpflag ); + pfree( v.spl_ngrp ); + } + + /* clean up the entry vector: its keys need to be deleted, too */ for (i = 1; i <= *len; i++) - if (decompvec[i] && ((GISTENTRY *) VARDATA(entryvec))[i].pred) - pfree(((GISTENTRY *) VARDATA(entryvec))[i].pred); + if (decompvec[i]) + pfree(DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[i].key)); pfree(entryvec); pfree(decompvec); - spl_left = v.spl_left; - spl_right = v.spl_right; - /* form left and right vector */ lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * v.spl_nleft); rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * v.spl_nright); - leftoff = rightoff = 0; - for (i = 1; i <= *len; i++) - { - if (i == *(spl_left) || (i == *len && *(spl_left) != FirstOffsetNumber)) - { - lvectup[leftoff++] = itup[i - 1]; - spl_left++; - } - else - { - rvectup[rightoff++] = itup[i - 1]; - spl_right++; - } - } + + for(i=0; ird_att->natts; j++ ) + if ( DatumGetPointer(v.spl_rattr[j]) != NULL ) + pfree( DatumGetPointer(v.spl_rattr[j]) ); } else { @@ -888,20 +1252,14 @@ gistSplit(Relation r, if (res) ItemPointerSet(&((*res)->pointerData), rbknum, l); - gistcentryinit(giststate, &tmpentry, v.spl_rdatum, (Relation) NULL, - (Page) NULL, (OffsetNumber) 0, - -1, FALSE); - if (v.spl_rdatum != tmpentry.pred) - pfree(v.spl_rdatum); - v.spl_rdatum = tmpentry.pred; nlen = 1; newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1); - isnull = (v.spl_rdatum) ? ' ' : 'n'; - newtup[0] = (IndexTuple) index_formtuple(r->rd_att, (Datum *) &(v.spl_rdatum), &isnull); + newtup[0] = gistFormTuple( giststate, r, v.spl_rattr, v.spl_rattrsize ); ItemPointerSet(&(newtup[0]->t_tid), rbknum, 1); } + if (gistnospace(left, lvectup, v.spl_nleft)) { int llen = v.spl_nleft; @@ -911,6 +1269,10 @@ gistSplit(Relation r, (res && lvectup[llen - 1] == itup[*len - 1]) ? res : NULL); ReleaseBuffer(leftbuf); + for( j=1; jrd_att->natts; j++ ) + if ( DatumGetPointer(v.spl_lattr[j]) != NULL ) + pfree( DatumGetPointer(v.spl_lattr[j]) ); + newtup = gistjoinvector(newtup, &nlen, lntup, llen); pfree(lntup); } @@ -926,17 +1288,10 @@ gistSplit(Relation r, if (res) ItemPointerSet(&((*res)->pointerData), lbknum, l); - gistcentryinit(giststate, &tmpentry, v.spl_ldatum, (Relation) NULL, - (Page) NULL, (OffsetNumber) 0, - -1, FALSE); - if (v.spl_ldatum != tmpentry.pred) - pfree(v.spl_ldatum); - v.spl_ldatum = tmpentry.pred; nlen += 1; newtup = (IndexTuple *) repalloc((void *) newtup, sizeof(IndexTuple) * nlen); - isnull = (v.spl_ldatum) ? ' ' : 'n'; - newtup[nlen - 1] = (IndexTuple) index_formtuple(r->rd_att, (Datum *) &(v.spl_ldatum), &isnull); + newtup[nlen - 1] = gistFormTuple( giststate, r, v.spl_lattr, v.spl_lattrsize ); ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, 1); } @@ -995,48 +1350,52 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ { OffsetNumber maxoff; OffsetNumber i; - char *id; - char *datum; + Datum datum; float usize; OffsetNumber which; - float which_grow; + float sum_grow, which_grow[INDEX_MAX_KEYS]; GISTENTRY entry, - identry; - int size, - idsize; + identry[INDEX_MAX_KEYS]; + bool IsNull, decompvec[INDEX_MAX_KEYS]; + int j; - idsize = IndexTupleSize(it) - sizeof(IndexTupleData); - id = ((char *) it) + sizeof(IndexTupleData); maxoff = PageGetMaxOffsetNumber(p); - which_grow = -1.0; + *which_grow = -1.0; which = -1; + sum_grow=1; + gistDeCompressAtt( giststate, r, + it, (Page) NULL, (OffsetNumber) 0, + identry, decompvec ); - gistdentryinit(giststate, &identry, id, (Relation) NULL, (Page) NULL, - (OffsetNumber) 0, idsize, FALSE); - - for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i)) { - datum = (char *) PageGetItem(p, PageGetItemId(p, i)); - size = IndexTupleSize(datum) - sizeof(IndexTupleData); - datum += sizeof(IndexTupleData); - gistdentryinit(giststate, &entry, datum, r, p, i, size, FALSE); - FunctionCall3(&giststate->penaltyFn, - PointerGetDatum(&entry), - PointerGetDatum(&identry), - PointerGetDatum(&usize)); - if (which_grow < 0 || usize < which_grow) - { - which = i; - which_grow = usize; - if (which_grow == 0) - break; - } - if (entry.pred && entry.pred != datum) - pfree(entry.pred); - } - if (identry.pred && identry.pred != id) - pfree(identry.pred); + sum_grow=0; + for( j=0; jrd_att->natts; j++ ) { + datum = index_getattr( (IndexTuple)PageGetItem(p, PageGetItemId(p, i)), j+1, r->rd_att, &IsNull); + gistdentryinit(giststate, j, &entry, datum, r, p, i, ATTSIZE( datum, r, j+1, IsNull ), FALSE); + FunctionCall3(&giststate->penaltyFn[j], + PointerGetDatum(&entry), + PointerGetDatum(&identry[j]), + PointerGetDatum(&usize)); + if (DatumGetPointer(entry.key) != NULL && entry.key != datum) + pfree(DatumGetPointer(entry.key)); + + if ( which_grow[j]<0 || usize < which_grow[j] ) { + which = i; + which_grow[j] = usize; + if ( jrd_att->natts-1 && i==FirstOffsetNumber ) which_grow[j+1]=-1; + sum_grow += which_grow[j]; + } else if ( which_grow[j] == usize ) { + sum_grow += usize; + } else { + sum_grow=1; + break; + } + } + } + + gistFreeAtt( r, identry, decompvec ); return which; } @@ -1104,21 +1463,28 @@ initGISTstate(GISTSTATE *giststate, Relation index) HeapTuple htup; Form_pg_index itupform; Oid indexrelid; + int i; - consistent_proc = index_getprocid(index, 1, GIST_CONSISTENT_PROC); - union_proc = index_getprocid(index, 1, GIST_UNION_PROC); - compress_proc = index_getprocid(index, 1, GIST_COMPRESS_PROC); - decompress_proc = index_getprocid(index, 1, GIST_DECOMPRESS_PROC); - penalty_proc = index_getprocid(index, 1, GIST_PENALTY_PROC); - picksplit_proc = index_getprocid(index, 1, GIST_PICKSPLIT_PROC); - equal_proc = index_getprocid(index, 1, GIST_EQUAL_PROC); - fmgr_info(consistent_proc, &giststate->consistentFn); - fmgr_info(union_proc, &giststate->unionFn); - fmgr_info(compress_proc, &giststate->compressFn); - fmgr_info(decompress_proc, &giststate->decompressFn); - fmgr_info(penalty_proc, &giststate->penaltyFn); - fmgr_info(picksplit_proc, &giststate->picksplitFn); - fmgr_info(equal_proc, &giststate->equalFn); + if (index->rd_att->natts >= INDEX_MAX_KEYS) + elog(ERROR, "initGISTstate: numberOfAttributes %d > %d", + index->rd_att->natts, INDEX_MAX_KEYS); + + for(i=0; ird_att->natts; i++) { + consistent_proc = index_getprocid(index, i+1, GIST_CONSISTENT_PROC ); + union_proc = index_getprocid(index, i+1, GIST_UNION_PROC ); + compress_proc = index_getprocid(index, i+1, GIST_COMPRESS_PROC ); + decompress_proc = index_getprocid(index, i+1, GIST_DECOMPRESS_PROC ); + penalty_proc = index_getprocid(index, i+1, GIST_PENALTY_PROC ); + picksplit_proc = index_getprocid(index, i+1, GIST_PICKSPLIT_PROC ); + equal_proc = index_getprocid(index, i+1, GIST_EQUAL_PROC ); + fmgr_info(consistent_proc, &((giststate->consistentFn)[i]) ); + fmgr_info(union_proc, &((giststate->unionFn)[i]) ); + fmgr_info(compress_proc, &((giststate->compressFn)[i]) ); + fmgr_info(decompress_proc, &((giststate->decompressFn)[i]) ); + fmgr_info(penalty_proc, &((giststate->penaltyFn)[i]) ); + fmgr_info(picksplit_proc, &((giststate->picksplitFn)[i]) ); + fmgr_info(equal_proc, &((giststate->equalFn)[i]) ); + } /* see if key type is different from type of attribute being indexed */ htup = SearchSysCache(INDEXRELID, @@ -1149,21 +1515,31 @@ initGISTstate(GISTSTATE *giststate, Relation index) giststate->keytypbyval = FALSE; } - +#ifdef GIST_PAGEADDITEM /* ** Given an IndexTuple to be inserted on a page, this routine replaces ** the key with another key, which may involve generating a new IndexTuple -** if the sizes don't match +** if the sizes don't match or if the null status changes. +** +** XXX this only works for a single-column index tuple! */ static IndexTuple gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t) { - char *datum = (((char *) t) + sizeof(IndexTupleData)); + bool IsNull; + Datum datum = index_getattr(t, 1, r->rd_att, &IsNull); - /* if new entry fits in index tuple, copy it in */ - if ((Size) entry.bytes < IndexTupleSize(t) - sizeof(IndexTupleData) || (Size) entry.bytes == 0) + /* + * If new entry fits in index tuple, copy it in. To avoid worrying + * about null-value bitmask, pass it off to the general index_formtuple + * routine if either the previous or new value is NULL. + */ + if (!IsNull && DatumGetPointer(entry.key) != NULL && + (Size) entry.bytes <= ATTSIZE(datum, r, 1, IsNull)) { - memcpy(datum, entry.pred, entry.bytes); + memcpy(DatumGetPointer(datum), + DatumGetPointer(entry.key), + entry.bytes); /* clear out old size */ t->t_info &= ~INDEX_SIZE_MASK; /* or in new size */ @@ -1178,65 +1554,131 @@ gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t) IndexTuple newtup; char isnull; - isnull = (entry.pred) ? ' ' : 'n'; + isnull = DatumGetPointer(entry.key) != NULL ? ' ' : 'n'; newtup = (IndexTuple) index_formtuple(tupDesc, - (Datum *) &(entry.pred), + &(entry.key), &isnull); newtup->t_tid = t->t_tid; return newtup; } } - +#endif /* -** initialize a GiST entry with a decompressed version of pred +** initialize a GiST entry with a decompressed version of key */ void -gistdentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, Relation r, - Page pg, OffsetNumber o, int b, bool l) +gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, + Datum k, Relation r, Page pg, OffsetNumber o, + int b, bool l) { GISTENTRY *dep; - gistentryinit(*e, pr, r, pg, o, b, l); + gistentryinit(*e, k, r, pg, o, b, l); if (giststate->haskeytype) { if ( b ) { dep = (GISTENTRY *) - DatumGetPointer(FunctionCall1(&giststate->decompressFn, + DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey], PointerGetDatum(e))); - gistentryinit(*e, dep->pred, dep->rel, dep->page, dep->offset, dep->bytes, + gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset, dep->bytes, dep->leafkey); if (dep != e) pfree(dep); } else { - gistentryinit(*e, (char*)NULL, r, pg, o, 0, l); + gistentryinit(*e, (Datum) 0, r, pg, o, 0, l); } } } /* -** initialize a GiST entry with a compressed version of pred +** initialize a GiST entry with a compressed version of key */ static void -gistcentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, Relation r, +gistcentryinit(GISTSTATE *giststate, int nkey, + GISTENTRY *e, Datum k, Relation r, Page pg, OffsetNumber o, int b, bool l) { GISTENTRY *cep; - gistentryinit(*e, pr, r, pg, o, b, l); + gistentryinit(*e, k, r, pg, o, b, l); if (giststate->haskeytype) { cep = (GISTENTRY *) - DatumGetPointer(FunctionCall1(&giststate->compressFn, + DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey], PointerGetDatum(e))); - gistentryinit(*e, cep->pred, cep->rel, cep->page, cep->offset, cep->bytes, - cep->leafkey); + gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset, + cep->bytes, cep->leafkey); if (cep != e) pfree(cep); } } +static IndexTuple +gistFormTuple( GISTSTATE *giststate, Relation r, + Datum attdata[], int datumsize[] ) +{ + IndexTuple tup; + char isnull[INDEX_MAX_KEYS]; + bool whatfree[INDEX_MAX_KEYS]; + GISTENTRY centry[INDEX_MAX_KEYS]; + Datum compatt[INDEX_MAX_KEYS]; + int j; + + for (j = 0; j < r->rd_att->natts; j++) { + gistcentryinit(giststate, j, ¢ry[j], attdata[j], + (Relation) NULL, (Page) NULL, (OffsetNumber) NULL, + datumsize[j], FALSE); + isnull[j] = DatumGetPointer(centry[j].key) != NULL ? ' ' : 'n'; + compatt[j] = centry[j].key; + if ( DatumGetPointer(centry[j].key) != NULL ) { + whatfree[j] = TRUE; + if ( centry[j].key != attdata[j] ) + pfree(DatumGetPointer(attdata[j])); + } else + whatfree[j] = FALSE; + } + + tup = (IndexTuple) index_formtuple(r->rd_att, compatt, isnull); + for (j = 0; j < r->rd_att->natts; j++) + if ( whatfree[j] ) pfree(DatumGetPointer(compatt[j])); + + return tup; +} + +static bool +gistDeCompressAtt( GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, + OffsetNumber o, GISTENTRY attdata[], bool decompvec[] ) { + bool allIsNull=true; + bool IsNull; + int i; + Datum datum; + + for(i=0; i < r->rd_att->natts; i++ ) { + datum = index_getattr(tuple, i+1, r->rd_att, &IsNull); + if ( ! IsNull ) allIsNull = false; + gistdentryinit(giststate, i, &attdata[i], + datum, r, p, o, + ATTSIZE( datum, r, i+1, IsNull ), FALSE); + if (attdata[i].key == datum || + DatumGetPointer(attdata[i].key) == NULL ) + decompvec[i] = FALSE; + else + decompvec[i] = TRUE; + } + + return allIsNull; +} + +static void +gistFreeAtt( Relation r, GISTENTRY attdata[], bool decompvec[] ) { + int i; + for(i=0; i < r->rd_att->natts; i++ ) + if ( decompvec[i] && DatumGetPointer(attdata[i].key) != NULL ) + pfree( DatumGetPointer(attdata[i].key) ); +} + #ifdef GISTDEBUG static void gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff) @@ -1261,7 +1703,9 @@ gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff) maxoff = PageGetMaxOffsetNumber(page); - elog(NOTICE, "%sPage: %d %s blk: %d maxoff: %d free: %d", pred, coff, (opaque->flags & F_LEAF) ? "LEAF" : "INTE", (int) blk, (int) maxoff, PageGetFreeSpace(page)); + elog(NOTICE, "%sPage: %d %s blk: %d maxoff: %d free: %d", pred, + coff, (opaque->flags & F_LEAF) ? "LEAF" : "INTE", (int) blk, + (int) maxoff, PageGetFreeSpace(page)); for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { @@ -1269,7 +1713,8 @@ gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff) which = (IndexTuple) PageGetItem(page, iid); cblk = ItemPointerGetBlockNumber(&(which->t_tid)); #ifdef PRINTTUPLE - elog(NOTICE, "%s Tuple. blk: %d size: %d", pred, (int) cblk, IndexTupleSize(which)); + elog(NOTICE, "%s Tuple. blk: %d size: %d", pred, (int) cblk, + IndexTupleSize(which)); #endif if (!(opaque->flags & F_LEAF)) diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c index f922da750d..028ff601d9 100644 --- a/src/backend/access/gist/gistget.c +++ b/src/backend/access/gist/gistget.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/gist/gistget.c,v 1.27 2001/05/30 19:53:40 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/gist/gistget.c,v 1.28 2001/05/31 18:16:54 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -238,16 +238,21 @@ gistindex_keytest(IndexTuple tuple, while (scanKeySize > 0) { datum = index_getattr(tuple, - 1, + key[0].sk_attno, tupdesc, &isNull); - if (isNull || IndexTupleSize(tuple) == sizeof(IndexTupleData) ) + if (isNull) { /* XXX eventually should check if SK_ISNULL */ return false; } - gistdentryinit(giststate, &de, (char *) datum, r, p, offset, +/* this code from backend/access/common/indexvalid.c. But why and what??? + if (key[0].sk_flags & SK_ISNULL) + return false; +*/ + gistdentryinit(giststate, key[0].sk_attno-1, &de, + datum, r, p, offset, IndexTupleSize(tuple) - sizeof(IndexTupleData), FALSE); @@ -266,16 +271,16 @@ gistindex_keytest(IndexTuple tuple, ObjectIdGetDatum(key[0].sk_procedure)); } - if ( (char*)de.pred != (char*)datum ) - if ( de.pred ) pfree( de.pred ); + if ( de.key != datum ) + if ( DatumGetPointer(de.key) != NULL ) + pfree( DatumGetPointer(de.key) ); if (DatumGetBool(test) == !!(key[0].sk_flags & SK_NEGATE)) return false; - scanKeySize -= 1; + scanKeySize--; key++; } - return true; } @@ -284,7 +289,7 @@ static OffsetNumber gistfindnext(IndexScanDesc s, Page p, OffsetNumber n, ScanDirection dir) { OffsetNumber maxoff; - char *it; + IndexTuple it; GISTPageOpaque po; GISTScanOpaque so; GISTSTATE *giststate; @@ -307,8 +312,8 @@ gistfindnext(IndexScanDesc s, Page p, OffsetNumber n, ScanDirection dir) while (n >= FirstOffsetNumber && n <= maxoff) { - it = (char *) PageGetItem(p, PageGetItemId(p, n)); - if (gistindex_keytest((IndexTuple) it, + it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); + if (gistindex_keytest(it, RelationGetDescr(s->relation), s->numberOfKeys, s->keyData, giststate, s->relation, p, n)) diff --git a/src/backend/access/gist/gistscan.c b/src/backend/access/gist/gistscan.c index bcabd6caf2..9ffbd03898 100644 --- a/src/backend/access/gist/gistscan.c +++ b/src/backend/access/gist/gistscan.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/gist/gistscan.c,v 1.34 2001/05/30 19:53:40 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/gist/gistscan.c,v 1.35 2001/05/31 18:16:54 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -122,7 +122,7 @@ gistrescan(PG_FUNCTION_ARGS) s->keyData[i].sk_procedure = RelationGetGISTStrategy(s->relation, s->keyData[i].sk_attno, s->keyData[i].sk_procedure); - s->keyData[i].sk_func = p->giststate->consistentFn; + s->keyData[i].sk_func = p->giststate->consistentFn[i]; } } else @@ -153,7 +153,7 @@ gistrescan(PG_FUNCTION_ARGS) s->keyData[i].sk_procedure = RelationGetGISTStrategy(s->relation, s->keyData[i].sk_attno, s->keyData[i].sk_procedure); - s->keyData[i].sk_func = p->giststate->consistentFn; + s->keyData[i].sk_func = p->giststate->consistentFn[i]; } } diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c index b48ef92365..3cea6895f3 100644 --- a/src/backend/access/index/indexam.c +++ b/src/backend/access/index/indexam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/index/indexam.c,v 1.48 2001/03/22 06:16:07 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/index/indexam.c,v 1.49 2001/05/31 18:16:54 tgl Exp $ * * INTERFACE ROUTINES * index_open - open an index relation by relationId @@ -410,13 +410,13 @@ index_getprocid(Relation irel, uint16 procnum) { RegProcedure *loc; - int natts; + int nproc; - natts = irel->rd_rel->relnatts; + nproc = irel->rd_am->amsupport; loc = irel->rd_support; Assert(loc != NULL); - return loc[(natts * (procnum - 1)) + (attnum - 1)]; + return loc[(nproc * (attnum - 1)) + (procnum - 1)]; } diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index 213a3cc3ed..373a68fbf3 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/indexcmds.c,v 1.48 2001/05/30 20:52:32 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/indexcmds.c,v 1.49 2001/05/31 18:16:55 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -120,13 +120,15 @@ DefineIndex(char *heapRelationName, /* * XXX Hardwired hacks to check for limitations on supported index * types. We really ought to be learning this info from entries in the - * pg_am table, instead of having it wired in here! + * pg_am table, instead of having it wired-in here! */ if (unique && accessMethodId != BTREE_AM_OID) elog(ERROR, "DefineIndex: unique indices are only available with the btree access method"); - if (numberOfAttributes > 1 && accessMethodId != BTREE_AM_OID) - elog(ERROR, "DefineIndex: multi-column indices are only available with the btree access method"); + if (numberOfAttributes > 1 && + !( accessMethodId == BTREE_AM_OID || + accessMethodId == GIST_AM_OID)) + elog(ERROR, "DefineIndex: multi-column indices are only available with the btree or GiST access methods"); /* * WITH clause reinstated to handle lossy indices. -- JMH, 7/22/96 diff --git a/src/include/access/gist.h b/src/include/access/gist.h index 694a628c4a..9e8091a8a0 100644 --- a/src/include/access/gist.h +++ b/src/include/access/gist.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: gist.h,v 1.27 2001/05/30 19:53:39 tgl Exp $ + * $Id: gist.h,v 1.28 2001/05/31 18:16:55 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -65,13 +65,13 @@ typedef struct GISTSTACK typedef struct GISTSTATE { - FmgrInfo consistentFn; - FmgrInfo unionFn; - FmgrInfo compressFn; - FmgrInfo decompressFn; - FmgrInfo penaltyFn; - FmgrInfo picksplitFn; - FmgrInfo equalFn; + FmgrInfo consistentFn[INDEX_MAX_KEYS]; + FmgrInfo unionFn[INDEX_MAX_KEYS]; + FmgrInfo compressFn[INDEX_MAX_KEYS]; + FmgrInfo decompressFn[INDEX_MAX_KEYS]; + FmgrInfo penaltyFn[INDEX_MAX_KEYS]; + FmgrInfo picksplitFn[INDEX_MAX_KEYS]; + FmgrInfo equalFn[INDEX_MAX_KEYS]; bool haskeytype; bool keytypbyval; } GISTSTATE; @@ -121,21 +121,30 @@ typedef struct GIST_SPLITVEC { OffsetNumber *spl_left; /* array of entries that go left */ int spl_nleft; /* size of this array */ - char *spl_ldatum; /* Union of keys in spl_left */ + Datum spl_ldatum; /* Union of keys in spl_left */ + Datum spl_lattr[INDEX_MAX_KEYS]; /* Union of subkeys in spl_left */ + int spl_lattrsize[INDEX_MAX_KEYS]; + OffsetNumber *spl_right; /* array of entries that go right */ int spl_nright; /* size of the array */ - char *spl_rdatum; /* Union of keys in spl_right */ + Datum spl_rdatum; /* Union of keys in spl_right */ + Datum spl_rattr[INDEX_MAX_KEYS]; /* Union of subkeys in spl_right */ + int spl_rattrsize[INDEX_MAX_KEYS]; + + int *spl_idgrp; + int *spl_ngrp; /* number in each group */ + char *spl_grpflag; /* flags of each group */ } GIST_SPLITVEC; /* - * An entry on a GiST node. Contains the key (pred), as well as + * An entry on a GiST node. Contains the key, as well as * its own location (rel,page,offset) which can supply the matching - * pointer. The size of the pred is in bytes, and leafkey is a flag to + * pointer. The size of the key is in bytes, and leafkey is a flag to * tell us if the entry is in a leaf node. */ typedef struct GISTENTRY { - char *pred; + Datum key; Relation rel; Page page; OffsetNumber offset; @@ -146,43 +155,20 @@ typedef struct GISTENTRY /* * macro to initialize a GISTENTRY */ -#define gistentryinit(e, pr, r, pg, o, b, l)\ - do {(e).pred = (pr); (e).rel = (r); (e).page = (pg); (e).offset = (o); (e).bytes = (b); (e).leafkey = (l);} while (0) - -/* defined in gist.c */ -#define TRLOWER(tr) (((tr)->bytes)) -#define TRUPPER(tr) (&((tr)->bytes[MAXALIGN(VARSIZE(TRLOWER(tr)))])) - -typedef struct txtrange -{ - int32 vl_len; - /* - * flag: NINF means that lower is negative infinity; PINF means that * - * upper is positive infinity. 0 means that both are numbers. - */ - int32 flag; - char bytes[2]; -} TXTRANGE; - -typedef struct intrange -{ - int lower; - int upper; - /* - * flag: NINF means that lower is negative infinity; PINF means that * - * upper is positive infinity. 0 means that both are numbers. - */ - int flag; -} INTRANGE; +#define gistentryinit(e, k, r, pg, o, b, l) \ + do { (e).key = (k); (e).rel = (r); (e).page = (pg); \ + (e).offset = (o); (e).bytes = (b); (e).leafkey = (l); } while (0) +/* gist.c */ extern Datum gistbuild(PG_FUNCTION_ARGS); extern Datum gistinsert(PG_FUNCTION_ARGS); extern Datum gistdelete(PG_FUNCTION_ARGS); extern void _gistdump(Relation r); extern void gistfreestack(GISTSTACK *s); extern void initGISTstate(GISTSTATE *giststate, Relation index); -extern void gistdentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, - Relation r, Page pg, OffsetNumber o, int b, bool l); +extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, + Datum k, Relation r, Page pg, OffsetNumber o, + int b, bool l); extern StrategyNumber RelationGetGISTStrategy(Relation, AttrNumber, RegProcedure);