From 13923be7c8799c4f8ce0f5a04e4cd06c5b696f25 Mon Sep 17 00:00:00 2001 From: Bruce Momjian Date: Fri, 10 Aug 2001 14:34:28 +0000 Subject: [PATCH] 1. null-safe interface to GiST (as proposed in http://fts.postgresql.org/db/mw/msg.html?mid=1028327) 2. support for 'pass-by-value' arguments - to test this we used special opclass for int4 with values in range [0-2^15] More testing will be done after resolving problem with index_formtuple and implementation of B-tree using GiST 3. small patch to contrib modules (seg,cube,rtree_gist,intarray) - mark functions as 'isstrict' where needed. Oleg Bartunov --- contrib/cube/cube.sql.in | 32 +- contrib/intarray/_int.sql.in | 16 +- contrib/rtree_gist/rtree_gist.sql.in | 2 +- contrib/seg/seg.sql.in | 38 +- src/backend/access/gist/gist.c | 744 +++++++++++++++------------ src/backend/access/gist/gistget.c | 4 +- src/include/access/gist.h | 7 +- 7 files changed, 474 insertions(+), 369 deletions(-) diff --git a/contrib/cube/cube.sql.in b/contrib/cube/cube.sql.in index b67ee62260..ba616b5722 100644 --- a/contrib/cube/cube.sql.in +++ b/contrib/cube/cube.sql.in @@ -28,25 +28,25 @@ COMMENT ON TYPE cube IS -- Left/Right methods CREATE FUNCTION cube_over_left(cube, cube) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION cube_over_left(cube, cube) IS 'is over and left of (NOT IMPLEMENTED)'; CREATE FUNCTION cube_over_right(cube, cube) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION cube_over_right(cube, cube) IS 'is over and right of (NOT IMPLEMENTED)'; CREATE FUNCTION cube_left(cube, cube) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION cube_left(cube, cube) IS 'is left of (NOT IMPLEMENTED)'; CREATE FUNCTION cube_right(cube, cube) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION cube_right(cube, cube) IS 'is right of (NOT IMPLEMENTED)'; @@ -55,43 +55,43 @@ COMMENT ON FUNCTION cube_right(cube, cube) IS -- Comparison methods CREATE FUNCTION cube_lt(cube, cube) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION cube_lt(cube, cube) IS 'lower than'; CREATE FUNCTION cube_gt(cube, cube) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION cube_gt(cube, cube) IS 'greater than'; CREATE FUNCTION cube_contains(cube, cube) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION cube_contains(cube, cube) IS 'contains'; CREATE FUNCTION cube_contained(cube, cube) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION cube_contained(cube, cube) IS 'contained in'; CREATE FUNCTION cube_overlap(cube, cube) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION cube_overlap(cube, cube) IS 'overlaps'; CREATE FUNCTION cube_same(cube, cube) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION cube_same(cube, cube) IS 'same as'; CREATE FUNCTION cube_different(cube, cube) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION cube_different(cube, cube) IS 'different'; @@ -99,13 +99,13 @@ COMMENT ON FUNCTION cube_different(cube, cube) IS -- support routines for indexing CREATE FUNCTION cube_union(cube, cube) RETURNS cube - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); CREATE FUNCTION cube_inter(cube, cube) RETURNS cube - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); CREATE FUNCTION cube_size(cube) RETURNS float4 - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); -- Misc N-dimensional functions @@ -113,7 +113,7 @@ CREATE FUNCTION cube_size(cube) RETURNS float4 -- proximity routines CREATE FUNCTION cube_distance(cube, cube) RETURNS float4 - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); -- @@ -199,7 +199,7 @@ CREATE FUNCTION g_cube_decompress(opaque) RETURNS opaque AS 'MODULE_PATHNAME' LANGUAGE 'c'; CREATE FUNCTION g_cube_penalty(opaque,opaque,opaque) RETURNS opaque - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); CREATE FUNCTION g_cube_picksplit(opaque, opaque) RETURNS opaque AS 'MODULE_PATHNAME' LANGUAGE 'c'; diff --git a/contrib/intarray/_int.sql.in b/contrib/intarray/_int.sql.in index 2214bef21f..a07013337b 100644 --- a/contrib/intarray/_int.sql.in +++ b/contrib/intarray/_int.sql.in @@ -9,7 +9,7 @@ BEGIN TRANSACTION; -- Comparison methods CREATE FUNCTION _int_contains(_int4, _int4) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); INSERT INTO pg_description (objoid, description) SELECT oid, 'contains'::text @@ -17,7 +17,7 @@ INSERT INTO pg_description (objoid, description) WHERE proname = '_int_contains'::name; CREATE FUNCTION _int_contained(_int4, _int4) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); INSERT INTO pg_description (objoid, description) SELECT oid, 'contained in'::text @@ -25,7 +25,7 @@ INSERT INTO pg_description (objoid, description) WHERE proname = '_int_contained'::name; CREATE FUNCTION _int_overlap(_int4, _int4) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); INSERT INTO pg_description (objoid, description) SELECT oid, 'overlaps'::text @@ -33,7 +33,7 @@ INSERT INTO pg_description (objoid, description) WHERE proname = '_int_overlap'::name; CREATE FUNCTION _int_same(_int4, _int4) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); INSERT INTO pg_description (objoid, description) SELECT oid, 'same as'::text @@ -41,7 +41,7 @@ INSERT INTO pg_description (objoid, description) WHERE proname = '_int_same'::name; CREATE FUNCTION _int_different(_int4, _int4) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); INSERT INTO pg_description (objoid, description) SELECT oid, 'different'::text @@ -51,10 +51,10 @@ INSERT INTO pg_description (objoid, description) -- support routines for indexing CREATE FUNCTION _int_union(_int4, _int4) RETURNS _int4 - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); CREATE FUNCTION _int_inter(_int4, _int4) RETURNS _int4 - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); -- -- OPERATORS @@ -223,7 +223,7 @@ CREATE FUNCTION g_intbig_decompress(opaque) RETURNS opaque AS 'MODULE_PATHNAME' LANGUAGE 'c'; CREATE FUNCTION g_intbig_penalty(opaque,opaque,opaque) RETURNS opaque - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); CREATE FUNCTION g_intbig_picksplit(opaque, opaque) RETURNS opaque AS 'MODULE_PATHNAME' LANGUAGE 'c'; diff --git a/contrib/rtree_gist/rtree_gist.sql.in b/contrib/rtree_gist/rtree_gist.sql.in index 829ddd6468..68fe8b5edb 100644 --- a/contrib/rtree_gist/rtree_gist.sql.in +++ b/contrib/rtree_gist/rtree_gist.sql.in @@ -13,7 +13,7 @@ create function gbox_compress(opaque) returns opaque as 'MODULE_PATHNAME' langua create function rtree_decompress(opaque) returns opaque as 'MODULE_PATHNAME' language 'C'; -create function gbox_penalty(opaque,opaque,opaque) returns opaque as 'MODULE_PATHNAME' language 'C'; +create function gbox_penalty(opaque,opaque,opaque) returns opaque as 'MODULE_PATHNAME' language 'C' with (isstrict); create function gbox_picksplit(opaque, opaque) returns opaque as 'MODULE_PATHNAME' language 'C'; diff --git a/contrib/seg/seg.sql.in b/contrib/seg/seg.sql.in index 387989f742..f2c3f834c8 100644 --- a/contrib/seg/seg.sql.in +++ b/contrib/seg/seg.sql.in @@ -28,25 +28,25 @@ COMMENT ON TYPE seg IS -- Left/Right methods CREATE FUNCTION seg_over_left(seg, seg) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION seg_over_left(seg, seg) IS 'is over and left of'; CREATE FUNCTION seg_over_right(seg, seg) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION seg_over_right(seg, seg) IS 'is over and right of'; CREATE FUNCTION seg_left(seg, seg) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION seg_left(seg, seg) IS 'is left of'; CREATE FUNCTION seg_right(seg, seg) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION seg_right(seg, seg) IS 'is right of'; @@ -55,55 +55,55 @@ COMMENT ON FUNCTION seg_right(seg, seg) IS -- Comparison methods CREATE FUNCTION seg_lt(seg, seg) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION seg_lt(seg, seg) IS 'less than'; CREATE FUNCTION seg_le(seg, seg) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION seg_le(seg, seg) IS 'less than or equal'; CREATE FUNCTION seg_gt(seg, seg) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION seg_gt(seg, seg) IS 'greater than'; CREATE FUNCTION seg_ge(seg, seg) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION seg_ge(seg, seg) IS 'greater than or equal'; CREATE FUNCTION seg_contains(seg, seg) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION seg_contains(seg, seg) IS 'contains'; CREATE FUNCTION seg_contained(seg, seg) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION seg_contained(seg, seg) IS 'contained in'; CREATE FUNCTION seg_overlap(seg, seg) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION seg_overlap(seg, seg) IS 'overlaps'; CREATE FUNCTION seg_same(seg, seg) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION seg_same(seg, seg) IS 'same as'; CREATE FUNCTION seg_different(seg, seg) RETURNS bool - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); COMMENT ON FUNCTION seg_different(seg, seg) IS 'different'; @@ -111,21 +111,21 @@ COMMENT ON FUNCTION seg_different(seg, seg) IS -- support routines for indexing CREATE FUNCTION seg_union(seg, seg) RETURNS seg - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); CREATE FUNCTION seg_inter(seg, seg) RETURNS seg - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); CREATE FUNCTION seg_size(seg) RETURNS float4 - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); -- miscellaneous CREATE FUNCTION seg_upper(seg) RETURNS float4 - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); CREATE FUNCTION seg_lower(seg) RETURNS float4 - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); -- @@ -223,7 +223,7 @@ CREATE FUNCTION gseg_decompress(opaque) RETURNS opaque AS 'MODULE_PATHNAME' LANGUAGE 'c'; CREATE FUNCTION gseg_penalty(opaque,opaque,opaque) RETURNS opaque - AS 'MODULE_PATHNAME' LANGUAGE 'c'; + AS 'MODULE_PATHNAME' LANGUAGE 'c' with (isstrict); CREATE FUNCTION gseg_picksplit(opaque, opaque) RETURNS opaque AS 'MODULE_PATHNAME' LANGUAGE 'c'; diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index c99c4a7e6e..4d0cbb6d9c 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.80 2001/07/15 22:48:15 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.81 2001/08/10 14:34:28 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -43,6 +43,24 @@ #define RIGHT_ADDED 0x02 #define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED ) +/* + * This defines only for shorter code, used in gistgetadjusted + * and gistadjsubkey only + */ +#define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb) do { \ + if ( isnullkey ) { \ + gistentryinit((evp), rkey, r, (Page) NULL , \ + (OffsetNumber) 0, rkeyb, FALSE); \ + } else { \ + gistentryinit((evp), okey, r, (Page) NULL, \ + (OffsetNumber) 0, okeyb, FALSE); \ + } \ +} while(0) + +#define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \ + FILLITEM(*ev0p, isnull1, key1, key1b, key2, key2b); \ + FILLITEM(*ev1p, isnull2, key2, key2b, key1, key1b); \ +} while(0); /* Working state for gistbuild and its callback */ typedef struct @@ -91,8 +109,12 @@ static IndexTuple gistgetadjusted(Relation r, GISTSTATE *giststate); static int gistfindgroup( GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC * spl ); +static void gistadjsubkey(Relation r, + IndexTuple *itup, int *len, + GIST_SPLITVEC *v, + GISTSTATE *giststate); static IndexTuple gistFormTuple( GISTSTATE *giststate, - Relation r, Datum attdata[], int datumsize[] ); + Relation r, Datum attdata[], int datumsize[], bool isnull[] ); static IndexTuple *gistSplit(Relation r, Buffer buffer, IndexTuple *itup, @@ -113,11 +135,15 @@ static IndexTuple gist_tuple_replacekey(Relation r, static void gistcentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, Datum k, Relation r, Page pg, - OffsetNumber o, int b, bool l); -static bool gistDeCompressAtt( GISTSTATE *giststate, Relation r, + OffsetNumber o, int b, bool l, bool isNull); +static void gistDeCompressAtt( GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, OffsetNumber o, - GISTENTRY attdata[], bool decompvec[] ); + GISTENTRY attdata[], bool decompvec[], bool isnull[] ); static void gistFreeAtt( Relation r, GISTENTRY attdata[], bool decompvec[] ); +static void gistpenalty( GISTSTATE *giststate, int attno, + GISTENTRY *key1, bool isNull1, + GISTENTRY *key2, bool isNull2, + float *penalty ); #undef GISTDEBUG #ifdef GISTDEBUG @@ -210,38 +236,43 @@ gistbuildCallback(Relation index, GISTENTRY tmpcentry; int i; + /* GiST cannot index tuples with leading NULLs */ + if ( nulls[0] == 'n' ) + return; + /* immediately compress keys to normalize */ for (i = 0; i < buildstate->numindexattrs; i++) { - gistcentryinit(&buildstate->giststate, i, &tmpcentry, attdata[i], - (Relation) NULL, (Page) NULL, (OffsetNumber) 0, - -1 /* size is currently bogus */ , TRUE); - if (attdata[i] != tmpcentry.key && - !(buildstate->giststate.keytypbyval)) - compvec[i] = TRUE; - else + if ( nulls[i]=='n' ) { + attdata[i] = (Datum) 0; compvec[i] = FALSE; - attdata[i] = tmpcentry.key; + } else { + gistcentryinit(&buildstate->giststate, i, &tmpcentry, attdata[i], + (Relation) NULL, (Page) NULL, (OffsetNumber) 0, + -1 /* size is currently bogus */ , TRUE, FALSE); + if (attdata[i] != tmpcentry.key && + !(buildstate->giststate.attbyval[i])) + compvec[i] = TRUE; + else + compvec[i] = FALSE; + attdata[i] = tmpcentry.key; + } } /* form an index tuple and point it at the heap tuple */ itup = index_formtuple(RelationGetDescr(index), attdata, nulls); itup->t_tid = htup->t_self; - /* GIST indexes don't index nulls, see notes in gistinsert */ - if (! IndexTupleHasNulls(itup)) - { - /* - * Since we already have the index relation locked, we call - * gistdoinsert directly. Normal access method calls dispatch - * through gistinsert, which locks the relation for write. This - * is the right thing to do if you're inserting single tups, but - * not when you're initializing the whole index at once. - */ - gistdoinsert(index, itup, NULL, &buildstate->giststate); + /* + * Since we already have the index relation locked, we call + * gistdoinsert directly. Normal access method calls dispatch + * through gistinsert, which locks the relation for write. This + * is the right thing to do if you're inserting single tups, but + * not when you're initializing the whole index at once. + */ + gistdoinsert(index, itup, NULL, &buildstate->giststate); - buildstate->indtuples += 1; - } + buildstate->indtuples += 1; for (i = 0; i < buildstate->numindexattrs; i++) if (compvec[i]) @@ -279,37 +310,36 @@ gistinsert(PG_FUNCTION_ARGS) * here. */ + /* GiST cannot index tuples with leading NULLs */ + if ( nulls[0] == 'n' ) { + res = NULL; + PG_RETURN_POINTER(res); + } + initGISTstate(&giststate, r); /* immediately compress keys to normalize */ for (i = 0; i < r->rd_att->natts; i++) { - gistcentryinit(&giststate, i, &tmpentry, datum[i], - (Relation) NULL, (Page) NULL, (OffsetNumber) 0, - -1 /* size is currently bogus */ , TRUE); - if (datum[i] != tmpentry.key && !(giststate.keytypbyval)) - compvec[i] = TRUE; - else + if ( nulls[i]=='n' ) { + datum[i] = (Datum) 0; compvec[i] = FALSE; - datum[i] = tmpentry.key; + } else { + gistcentryinit(&giststate, i, &tmpentry, datum[i], + (Relation) NULL, (Page) NULL, (OffsetNumber) 0, + -1 /* size is currently bogus */ , TRUE, FALSE ); + if (datum[i] != tmpentry.key && !(giststate.attbyval[i])) + compvec[i] = TRUE; + else + compvec[i] = FALSE; + datum[i] = tmpentry.key; + } } itup = index_formtuple(RelationGetDescr(r), datum, nulls); itup->t_tid = *ht_ctid; - /* - * Currently, GIST indexes do not support indexing NULLs; considerable - * infrastructure work would have to be done to do anything reasonable - * with a NULL. - */ - if (IndexTupleHasNulls(itup)) - { - res = NULL; - } - else - { - res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); - gistdoinsert(r, itup, &res, &giststate); - } + res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); + gistdoinsert(r, itup, &res, &giststate); for (i = 0; i < r->rd_att->natts; i++) if (compvec[i] == TRUE) @@ -352,7 +382,7 @@ gistPageAddItem(GISTSTATE *giststate, (Relation) 0, (Page) 0, (OffsetNumber) InvalidOffsetNumber, ATTSIZE( datum, r, 1, IsNull ), - FALSE); + FALSE, IsNull); gistcentryinit(giststate, 0,&tmpcentry, dentry->key, r, page, offsetNumber, dentry->bytes, FALSE); *newtup = gist_tuple_replacekey(r, tmpcentry, itup); @@ -623,45 +653,64 @@ gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) { bool *needfree; IndexTuple newtup; bool IsNull; + int reallen; - needfree = (bool *) palloc(len * sizeof(bool)); - evec = (bytea *) palloc(len * sizeof(GISTENTRY) + VARHDRSZ); - VARATT_SIZEP(evec) = len * sizeof(GISTENTRY) + VARHDRSZ; + needfree = (bool *) palloc((( len==1 ) ? 2 : len ) * sizeof(bool)); + evec = (bytea *) palloc((( len==1 ) ? 2 : len ) * sizeof(GISTENTRY) + VARHDRSZ); - for (j = 0; j < r->rd_att->natts; j++) { + for (j = 0; j < r->rd_att->natts; j++) { + reallen=0; for (i = 0; i < len; i++) { datum = index_getattr(itvec[i], j+1, r->rd_att, &IsNull); + if ( IsNull ) continue; + gistdentryinit(giststate, j, - &((GISTENTRY *) VARDATA(evec))[i], + &((GISTENTRY *) VARDATA(evec))[reallen], datum, (Relation) NULL, (Page) NULL, (OffsetNumber) NULL, - ATTSIZE( datum, r, j+1, IsNull ), FALSE); - if ( DatumGetPointer(((GISTENTRY *) VARDATA(evec))[i].key) != NULL - && ((GISTENTRY *) VARDATA(evec))[i].key != datum ) - needfree[i] = TRUE; + ATTSIZE( datum, r, j+1, IsNull ), FALSE, IsNull); + if ( (!giststate->attbyval[j]) && + ((GISTENTRY *) VARDATA(evec))[reallen].key != datum ) + needfree[reallen] = TRUE; else - needfree[i] = FALSE; + needfree[reallen] = FALSE; + reallen++; } - datum = FunctionCall2(&giststate->unionFn[j], + if ( reallen == 0 ) { + attr[j] = (Datum) 0; + isnull[j] = 'n'; + whatfree[j] = FALSE; + } else { + if ( reallen == 1 ) { + VARATT_SIZEP(evec) = 2 * sizeof(GISTENTRY) + VARHDRSZ; + gistentryinit(((GISTENTRY *) VARDATA(evec))[1], + ((GISTENTRY *) VARDATA(evec))[0].key, r, (Page) NULL, + (OffsetNumber) 0, ((GISTENTRY *) VARDATA(evec))[0].bytes, FALSE); + + } else { + VARATT_SIZEP(evec) = reallen * sizeof(GISTENTRY) + VARHDRSZ; + } + datum = FunctionCall2(&giststate->unionFn[j], PointerGetDatum(evec), PointerGetDatum(&datumsize)); + + for (i = 0; i < reallen; i++) + if ( needfree[i] ) + pfree(DatumGetPointer(((GISTENTRY *) VARDATA(evec))[i].key)); - for (i = 0; i < len; i++) - if ( needfree[i] ) - pfree(DatumGetPointer(((GISTENTRY *) VARDATA(evec))[i].key)); - - gistcentryinit(giststate, j, ¢ry[j], datum, - (Relation) NULL, (Page) NULL, (OffsetNumber) NULL, - datumsize, FALSE); - isnull[j] = DatumGetPointer(centry[j].key) != NULL ? ' ' : 'n'; - attr[j] = centry[j].key; - if ( DatumGetPointer(centry[j].key) != NULL ) { - whatfree[j] = TRUE; - if ( centry[j].key != datum ) - pfree(DatumGetPointer(datum)); - } else - whatfree[j] = FALSE; + gistcentryinit(giststate, j, ¢ry[j], datum, + (Relation) NULL, (Page) NULL, (OffsetNumber) NULL, + datumsize, FALSE, FALSE); + isnull[j] = ' '; + attr[j] = centry[j].key; + if ( !giststate->attbyval[j] ) { + whatfree[j] = TRUE; + if ( centry[j].key != datum ) + pfree(DatumGetPointer(datum)); + } else + whatfree[j] = FALSE; + } } pfree(evec); @@ -675,6 +724,7 @@ gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) { return newtup; } + /* * Forms union of oldtup and addtup, if union == oldtup then return NULL */ @@ -695,6 +745,8 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis *ev1p; bool olddec[INDEX_MAX_KEYS], adddec[INDEX_MAX_KEYS]; + bool oldisnull[INDEX_MAX_KEYS], + addisnull[INDEX_MAX_KEYS]; IndexTuple newtup = NULL; int j; @@ -705,54 +757,56 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis ev1p = &((GISTENTRY *) VARDATA(evec))[1]; gistDeCompressAtt( giststate, r, oldtup, (Page) NULL, - (OffsetNumber) 0, oldatt, olddec); + (OffsetNumber) 0, oldatt, olddec, oldisnull); gistDeCompressAtt( giststate, r, addtup, (Page) NULL, - (OffsetNumber) 0, addatt, adddec); + (OffsetNumber) 0, addatt, adddec, addisnull); for( j=0; jrd_att->natts; j++ ) { - gistentryinit(*ev0p, oldatt[j].key, r, (Page) NULL, - (OffsetNumber) 0, oldatt[j].bytes, FALSE); - gistentryinit(*ev1p, addatt[j].key, r, (Page) NULL, - (OffsetNumber) 0, addatt[j].bytes, FALSE); + if ( oldisnull[j] && addisnull[j] ) { + isnull[j] = 'n'; + attr[j] = (Datum) 0; + whatfree[j] = FALSE; + } else { + FILLEV( + oldisnull[j], oldatt[j].key, oldatt[j].bytes, + addisnull[j], addatt[j].key, addatt[j].bytes + ); + + datum = FunctionCall2(&giststate->unionFn[j], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); - datum = FunctionCall2(&giststate->unionFn[j], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - - if (!(DatumGetPointer(ev0p->key) != NULL && - DatumGetPointer(ev1p->key) != NULL)) - result = (DatumGetPointer(ev0p->key) == NULL && - DatumGetPointer(ev1p->key) == NULL); - else - { - FunctionCall3(&giststate->equalFn[j], + if ( oldisnull[j] || addisnull[j] ) { + if ( oldisnull[j] ) + neednew = true; + } else { + FunctionCall3(&giststate->equalFn[j], ev0p->key, datum, PointerGetDatum(&result)); - } - if ( !result ) - neednew = true; + + if ( !result ) + neednew = true; + } - if ( olddec[j] && DatumGetPointer(oldatt[j].key) != NULL ) - pfree( DatumGetPointer(oldatt[j].key) ); - if ( adddec[j] && DatumGetPointer(addatt[j].key) != NULL ) - pfree( DatumGetPointer(addatt[j].key) ); + if ( olddec[j] ) pfree( DatumGetPointer(oldatt[j].key) ); + if ( adddec[j] ) pfree( DatumGetPointer(addatt[j].key) ); - gistcentryinit(giststate, j, ¢ry[j], datum, - (Relation) NULL, (Page) NULL, (OffsetNumber) NULL, - datumsize, FALSE); + gistcentryinit(giststate, j, ¢ry[j], datum, + (Relation) NULL, (Page) NULL, (OffsetNumber) NULL, + datumsize, FALSE, FALSE); - isnull[j] = DatumGetPointer(centry[j].key) != NULL ? ' ' : 'n'; - attr[j] = centry[j].key; - if ( DatumGetPointer(centry[j].key) != NULL ) { - whatfree[j] = TRUE; - if ( centry[j].key != datum ) - pfree(DatumGetPointer(datum)); - } else - whatfree[j] = FALSE; - + isnull[j] = ' '; + attr[j] = centry[j].key; + if ( (!giststate->attbyval[j]) ) { + whatfree[j] = TRUE; + if ( centry[j].key != datum ) + pfree(DatumGetPointer(datum)); + } else + whatfree[j] = FALSE; + } } pfree(evec); @@ -779,6 +833,8 @@ gistunionsubkey( Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLIT bytea *evec; Datum datum; int datumsize; + int reallen; + bool *isnull; for(lr=0;lr<=1;lr++) { if ( lr ) { @@ -786,59 +842,64 @@ gistunionsubkey( Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLIT attr = spl->spl_lattr; len = spl->spl_nleft; entries = spl->spl_left; + isnull = spl->spl_lisnull; } else { attrsize = spl->spl_rattrsize; attr = spl->spl_rattr; len = spl->spl_nright; entries = spl->spl_right; + isnull = spl->spl_risnull; } needfree = (bool *) palloc( (( len==1 ) ? 2 : len ) * sizeof(bool)); evec = (bytea *) palloc( (( len==1 ) ? 2 : len ) * sizeof(GISTENTRY) + VARHDRSZ); - VARATT_SIZEP(evec) = (( len==1 ) ? 2 : len ) * sizeof(GISTENTRY) + VARHDRSZ; for (j = 1; j < r->rd_att->natts; j++) { + reallen = 0; for (i = 0; i < len; i++) { if ( spl->spl_idgrp[ entries[i] ] ) - { - datum = (Datum) 0; - IsNull = true; - } else - datum = index_getattr(itvec[ entries[i]-1 ], j+1, - r->rd_att, &IsNull); + continue; + datum = index_getattr(itvec[ entries[i]-1 ], j+1, + r->rd_att, &IsNull); + if ( IsNull ) + continue; gistdentryinit(giststate, j, - &((GISTENTRY *) VARDATA(evec))[i], + &((GISTENTRY *) VARDATA(evec))[reallen], datum, (Relation) NULL, (Page) NULL, (OffsetNumber) NULL, - ATTSIZE( datum, r, j+1, IsNull ), FALSE); - if ( DatumGetPointer(((GISTENTRY *) VARDATA(evec))[i].key) != NULL && - ((GISTENTRY *) VARDATA(evec))[i].key != datum ) - needfree[i] = TRUE; + ATTSIZE( datum, r, j+1, IsNull ), FALSE, IsNull); + if ( (!giststate->attbyval[j]) && + ((GISTENTRY *) VARDATA(evec))[reallen].key != datum ) + needfree[reallen] = TRUE; else - needfree[i] = FALSE; + needfree[reallen] = FALSE; + reallen++; } - if ( len == 1 && - DatumGetPointer(((GISTENTRY *) VARDATA(evec))[0].key) == NULL) + if ( reallen == 0 ) { datum = (Datum) 0; datumsize = 0; + isnull[j] = true; } else { /* * ((GISTENTRY *) VARDATA(evec))[0].bytes may be not defined, * so form union with itself */ - if ( len == 1 ) { + if ( reallen == 1 ) { + VARATT_SIZEP(evec) = 2 * sizeof(GISTENTRY) + VARHDRSZ; memcpy( (void*) &((GISTENTRY *) VARDATA(evec))[1], (void*) &((GISTENTRY *) VARDATA(evec))[0], sizeof( GISTENTRY ) ); - } + } else + VARATT_SIZEP(evec) = reallen * sizeof(GISTENTRY) + VARHDRSZ; datum = FunctionCall2(&giststate->unionFn[j], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); + isnull[j] = false; } - for (i = 0; i < len; i++) + for (i = 0; i < reallen; i++) if ( needfree[i] ) pfree(DatumGetPointer(((GISTENTRY *) VARDATA(evec))[i].key)); @@ -859,22 +920,21 @@ gistfindgroup( GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC * spl ) { int curid = 1; bool result; + /* + * first key is always not null (see gistinsert), + * so we may not check for nulls + */ + for(i=0; ispl_nleft; i++) { if ( spl->spl_idgrp[ spl->spl_left[i] ]) continue; len = 0; /* find all equal value in right part */ for(j=0; j < spl->spl_nright; j++) { if ( spl->spl_idgrp[ spl->spl_right[j] ]) continue; - if (!(DatumGetPointer(valvec[ spl->spl_left[i] ].key) != NULL && - DatumGetPointer(valvec[ spl->spl_right[j]].key) != NULL)) - result = - DatumGetPointer(valvec[ spl->spl_left[i] ].key) == NULL && - DatumGetPointer(valvec[ spl->spl_right[j]].key) == NULL; - else - FunctionCall3(&giststate->equalFn[0], - valvec[ spl->spl_left[i] ].key, - valvec[ spl->spl_right[j] ].key, - PointerGetDatum(&result)); + FunctionCall3(&giststate->equalFn[0], + valvec[ spl->spl_left[i] ].key, + valvec[ spl->spl_right[j] ].key, + PointerGetDatum(&result)); if ( result ) { spl->spl_idgrp[ spl->spl_right[j] ] = curid; len++; @@ -887,16 +947,10 @@ gistfindgroup( GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC * spl ) { /* searching .. */ for(j=i+1; j < spl->spl_nleft; j++) { if ( spl->spl_idgrp[ spl->spl_left[j] ]) continue; - if (!(DatumGetPointer(valvec[ spl->spl_left[i]].key) != NULL && - DatumGetPointer(valvec[ spl->spl_left[j]].key) != NULL)) - result = - DatumGetPointer(valvec[ spl->spl_left[i]].key) == NULL && - DatumGetPointer(valvec[ spl->spl_left[j]].key) == NULL; - else - FunctionCall3(&giststate->equalFn[0], - valvec[ spl->spl_left[i] ].key, - valvec[ spl->spl_left[j] ].key, - PointerGetDatum(&result)); + FunctionCall3(&giststate->equalFn[0], + valvec[ spl->spl_left[i] ].key, + valvec[ spl->spl_left[j] ].key, + PointerGetDatum(&result)); if ( result ) { spl->spl_idgrp[ spl->spl_left[j] ] = curid; len++; @@ -910,6 +964,144 @@ gistfindgroup( GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC * spl ) { return curid; } +/* + * Insert equivalent tuples to left or right page + * with minimize penalty + */ +static void +gistadjsubkey(Relation r, + IndexTuple *itup, /* contains compressed entry */ + int *len, + GIST_SPLITVEC *v, + GISTSTATE *giststate + ) { + int curlen; + OffsetNumber *curwpos; + bool decfree[INDEX_MAX_KEYS]; + GISTENTRY entry,identry[INDEX_MAX_KEYS], *ev0p, *ev1p; + float lpenalty, rpenalty; + bytea *evec; + int datumsize; + bool isnull[INDEX_MAX_KEYS]; + int i,j; + Datum datum; + + /* clear vectors */ + curlen = v->spl_nleft; + curwpos = v->spl_left; + for( i=0; ispl_nleft;i++ ) + if ( v->spl_idgrp[ v->spl_left[i] ] == 0 ) { + *curwpos = v->spl_left[i]; + curwpos++; + } else + curlen--; + v->spl_nleft = curlen; + + curlen = v->spl_nright; + curwpos = v->spl_right; + for( i=0; ispl_nright;i++ ) + if ( v->spl_idgrp[ v->spl_right[i] ] == 0 ) { + *curwpos = v->spl_right[i]; + curwpos++; + } else + curlen--; + v->spl_nright = curlen; + + evec = (bytea *) palloc(2 * sizeof(GISTENTRY) + VARHDRSZ); + VARATT_SIZEP(evec) = 2 * sizeof(GISTENTRY) + VARHDRSZ; + ev0p = &((GISTENTRY *) VARDATA(evec))[0]; + ev1p = &((GISTENTRY *) VARDATA(evec))[1]; + + /* add equivalent tuple */ + for(i = 0; i< *len; i++) { + if ( v->spl_idgrp[ i+1 ]==0 ) /* already inserted */ + continue; + gistDeCompressAtt( giststate, r, itup[i], (Page) NULL, (OffsetNumber) 0, + identry, decfree, isnull); + + v->spl_ngrp[ v->spl_idgrp[ i+1 ] ]--; + if ( v->spl_ngrp[ v->spl_idgrp[ i+1 ] ] == 0 && + (v->spl_grpflag[ v->spl_idgrp[ i+1 ] ] & BOTH_ADDED) != BOTH_ADDED ) { + + /* force last in group */ + rpenalty = 1.0; + lpenalty = ( v->spl_grpflag[ v->spl_idgrp[ i+1 ] ] & LEFT_ADDED ) ? 2.0 : 0.0; + } else { + /*where?*/ + for( j=1; jrd_att->natts; j++ ) { + gistentryinit(entry,v->spl_lattr[j], r, (Page) NULL, + (OffsetNumber) 0, v->spl_lattrsize[j], FALSE); + gistpenalty( giststate, j, &entry, v->spl_lisnull[j], + &identry[j], isnull[j], &lpenalty); + + gistentryinit(entry,v->spl_rattr[j], r, (Page) NULL, + (OffsetNumber) 0, v->spl_rattrsize[j], FALSE); + gistpenalty( giststate, j, &entry, v->spl_risnull[j], + &identry[j], isnull[j], &rpenalty); + + if ( lpenalty != rpenalty ) + break; + } + } + /* add */ + if ( lpenalty < rpenalty ) { + v->spl_grpflag[ v->spl_idgrp[ i+1 ] ] |= LEFT_ADDED; + v->spl_left[ v->spl_nleft ] = i+1; + v->spl_nleft++; + for( j=1; jrd_att->natts; j++ ) { + if ( isnull[j] && v->spl_lisnull[j] ) { + v->spl_lattr[j] = (Datum) 0; + v->spl_lattrsize[j] = 0; + } else { + FILLEV( + v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j], + isnull[j], identry[j].key, identry[j].bytes + ); + + datum = FunctionCall2(&giststate->unionFn[j], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); + + if ( (!giststate->attbyval[j]) && !v->spl_lisnull[j] ) + pfree( DatumGetPointer(v->spl_lattr[j]) ); + v->spl_lattr[j] = datum; + v->spl_lattrsize[j] = datumsize; + v->spl_lisnull[j] = false; + } + } + } else { + v->spl_grpflag[ v->spl_idgrp[ i+1 ] ] |= RIGHT_ADDED; + v->spl_right[ v->spl_nright ] = i+1; + v->spl_nright++; + for( j=1; jrd_att->natts; j++ ) { + if ( isnull[j] && v->spl_risnull[j] ) { + v->spl_rattr[j] = (Datum) 0; + v->spl_rattrsize[j] = 0; + } else { + FILLEV( + v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j], + isnull[j], identry[j].key, identry[j].bytes + ); + + datum = FunctionCall2(&giststate->unionFn[j], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); + + if ( (!giststate->attbyval[j]) && !v->spl_risnull[j] ) + pfree( DatumGetPointer(v->spl_rattr[j]) ); + + v->spl_rattr[j] = datum; + v->spl_rattrsize[j] = datumsize; + v->spl_risnull[j] = false; + } + } + + } + gistFreeAtt( r, identry, decfree ); + } + pfree(evec); +} + /* * gistSplit -- split a page in the tree. */ @@ -980,15 +1172,15 @@ gistSplit(Relation r, datum = index_getattr(itup[i - 1], 1, r->rd_att, &IsNull); gistdentryinit(giststate, 0,&((GISTENTRY *) VARDATA(entryvec))[i], datum, r, p, i, - ATTSIZE( datum, r, 1, IsNull ), FALSE); - if (((GISTENTRY *) VARDATA(entryvec))[i].key == datum || - DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[i].key) == NULL) - decompvec[i] = FALSE; - else + ATTSIZE( datum, r, 1, IsNull ), FALSE, IsNull); + if ( (!giststate->attbyval[0]) && ((GISTENTRY *) VARDATA(entryvec))[i].key != datum ) decompvec[i] = TRUE; + else + decompvec[i] = FALSE; } - /* now let the user-defined picksplit function set up the split vector */ + /* now let the user-defined picksplit function set up the split vector; + in entryvec have no null value!! */ FunctionCall2(&giststate->picksplitFn[0], PointerGetDatum(entryvec), PointerGetDatum(&v)); @@ -1001,6 +1193,8 @@ gistSplit(Relation r, v.spl_lattr[0] = v.spl_ldatum; v.spl_rattr[0] = v.spl_rdatum; + v.spl_lisnull[0] = false; + v.spl_risnull[0] = false; /* if index is multikey, then we must to try get smaller * bounding box for subkey(s) @@ -1017,126 +1211,12 @@ gistSplit(Relation r, /* form union of sub keys for each page (l,p) */ gistunionsubkey( r, giststate, itup, &v ); - /* if possible, we insert equivalrnt tuples + /* if possible, we insert equivalent tuples * with control by penalty for a subkey(s) */ - if ( MaxGrpId > 1 ) { - int curlen; - OffsetNumber *curwpos; - bool decfree[INDEX_MAX_KEYS]; - GISTENTRY entry,identry[INDEX_MAX_KEYS], *ev0p, *ev1p; - float lpenalty, rpenalty; - bytea *evec; - int datumsize; + if ( MaxGrpId > 1 ) + gistadjsubkey( r,itup,len, &v, giststate ); - /* clear vectors */ - curlen = v.spl_nleft; - curwpos = v.spl_left; - for( i=0; ird_att->natts; j++ ) { - gistentryinit(entry,v.spl_lattr[j], r, (Page) NULL, - (OffsetNumber) 0, v.spl_lattrsize[j], FALSE); - FunctionCall3(&giststate->penaltyFn[j], - PointerGetDatum(&entry), - PointerGetDatum(&identry[j]), - PointerGetDatum(&lpenalty)); - - gistentryinit(entry,v.spl_rattr[j], r, (Page) NULL, - (OffsetNumber) 0, v.spl_rattrsize[j], FALSE); - FunctionCall3(&giststate->penaltyFn[j], - PointerGetDatum(&entry), - PointerGetDatum(&identry[j]), - PointerGetDatum(&rpenalty)); - - if ( lpenalty != rpenalty ) - break; - } - } - /* add */ - if ( lpenalty < rpenalty ) { - v.spl_grpflag[ v.spl_idgrp[ i+1 ] ] |= LEFT_ADDED; - v.spl_left[ v.spl_nleft ] = i+1; - v.spl_nleft++; - for( j=1; jrd_att->natts; j++ ) { - gistentryinit(*ev0p, v.spl_lattr[j], r, (Page) NULL, - (OffsetNumber) 0, v.spl_lattrsize[j], FALSE); - gistentryinit(*ev1p, identry[j].key, r, (Page) NULL, - (OffsetNumber) 0, identry[j].bytes, FALSE); - - datum = FunctionCall2(&giststate->unionFn[j], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - - if ( DatumGetPointer(v.spl_lattr[j]) != NULL ) - pfree( DatumGetPointer(v.spl_lattr[j]) ); - - v.spl_lattr[j] = datum; - v.spl_lattrsize[j] = datumsize; - } - } else { - v.spl_grpflag[ v.spl_idgrp[ i+1 ] ] |= RIGHT_ADDED; - v.spl_right[ v.spl_nright ] = i+1; - v.spl_nright++; - for( j=1; jrd_att->natts; j++ ) { - gistentryinit(*ev0p, v.spl_rattr[j], r, (Page) NULL, - (OffsetNumber) 0, v.spl_rattrsize[j], FALSE); - gistentryinit(*ev1p, identry[j].key, r, (Page) NULL, - (OffsetNumber) 0, identry[j].bytes, FALSE); - - datum = FunctionCall2(&giststate->unionFn[j], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - - if ( DatumGetPointer(v.spl_rattr[j]) != NULL ) - pfree( DatumGetPointer(v.spl_rattr[j]) ); - - v.spl_rattr[j] = datum; - v.spl_rattrsize[j] = datumsize; - } - - } - gistFreeAtt( r, identry, decfree ); - } - pfree(evec); - } pfree( v.spl_idgrp ); pfree( v.spl_grpflag ); pfree( v.spl_ngrp ); @@ -1159,6 +1239,7 @@ gistSplit(Relation r, for(i=0; ird_att->natts; j++ ) - if ( DatumGetPointer(v.spl_rattr[j]) != NULL ) + if ( (!giststate->attbyval[j]) && !v.spl_risnull[j] ) pfree( DatumGetPointer(v.spl_rattr[j]) ); } else @@ -1182,7 +1263,7 @@ gistSplit(Relation r, nlen = 1; newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1); - newtup[0] = gistFormTuple( giststate, r, v.spl_rattr, v.spl_rattrsize ); + newtup[0] = gistFormTuple( giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull ); ItemPointerSet(&(newtup[0]->t_tid), rbknum, 1); } @@ -1197,7 +1278,7 @@ gistSplit(Relation r, ReleaseBuffer(leftbuf); for( j=1; jrd_att->natts; j++ ) - if ( DatumGetPointer(v.spl_lattr[j]) != NULL ) + if ( (!giststate->attbyval[j]) && !v.spl_lisnull[j] ) pfree( DatumGetPointer(v.spl_lattr[j]) ); newtup = gistjoinvector(newtup, &nlen, lntup, llen); @@ -1218,7 +1299,7 @@ gistSplit(Relation r, nlen += 1; newtup = (IndexTuple *) repalloc((void *) newtup, sizeof(IndexTuple) * nlen); - newtup[nlen - 1] = gistFormTuple( giststate, r, v.spl_lattr, v.spl_lattrsize ); + newtup[nlen - 1] = gistFormTuple( giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull ); ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, 1); } @@ -1283,7 +1364,7 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ float sum_grow, which_grow[INDEX_MAX_KEYS]; GISTENTRY entry, identry[INDEX_MAX_KEYS]; - bool IsNull, decompvec[INDEX_MAX_KEYS]; + bool IsNull, decompvec[INDEX_MAX_KEYS], isnull[INDEX_MAX_KEYS]; int j; maxoff = PageGetMaxOffsetNumber(p); @@ -1292,7 +1373,7 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ sum_grow=1; gistDeCompressAtt( giststate, r, it, (Page) NULL, (OffsetNumber) 0, - identry, decompvec ); + identry, decompvec, isnull ); for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i)) { @@ -1301,13 +1382,10 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); datum = index_getattr(itup, j+1, r->rd_att, &IsNull); - gistdentryinit(giststate, j, &entry, datum, r, p, i, ATTSIZE( datum, r, j+1, IsNull ), FALSE); - FunctionCall3(&giststate->penaltyFn[j], - PointerGetDatum(&entry), - PointerGetDatum(&identry[j]), - PointerGetDatum(&usize)); + gistdentryinit(giststate, j, &entry, datum, r, p, i, ATTSIZE( datum, r, j+1, IsNull ), FALSE, IsNull); + gistpenalty( giststate, j, &entry, IsNull, &identry[j], isnull[j], &usize); - if (DatumGetPointer(entry.key) != NULL && entry.key != datum) + if ( (!giststate->attbyval[j]) && entry.key != datum) pfree(DatumGetPointer(entry.key)); if ( which_grow[j]<0 || usize < which_grow[j] ) { @@ -1498,6 +1576,9 @@ initGISTstate(GISTSTATE *giststate, Relation index) fmgr_info(penalty_proc, &((giststate->penaltyFn)[i]) ); fmgr_info(picksplit_proc, &((giststate->picksplitFn)[i]) ); fmgr_info(equal_proc, &((giststate->equalFn)[i]) ); + + giststate->attbyval[i] = + index->rd_att->attrs[i]->attbyval; } /* see if key type is different from type of attribute being indexed */ @@ -1584,14 +1665,14 @@ gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t) void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, Datum k, Relation r, Page pg, OffsetNumber o, - int b, bool l) + int b, bool l, bool isNull) { GISTENTRY *dep; gistentryinit(*e, k, r, pg, o, b, l); if (giststate->haskeytype) { - if ( b ) { + if ( b && ! isNull ) { dep = (GISTENTRY *) DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey], PointerGetDatum(e))); @@ -1612,86 +1693,107 @@ gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, static void gistcentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, Datum k, Relation r, - Page pg, OffsetNumber o, int b, bool l) + Page pg, OffsetNumber o, int b, bool l, bool isNull) { GISTENTRY *cep; gistentryinit(*e, k, r, pg, o, b, l); if (giststate->haskeytype) { - cep = (GISTENTRY *) - DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey], + if ( ! isNull ) { + cep = (GISTENTRY *) + DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey], PointerGetDatum(e))); - gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset, - cep->bytes, cep->leafkey); - if (cep != e) - pfree(cep); + gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset, + cep->bytes, cep->leafkey); + if (cep != e) + pfree(cep); + } else { + gistentryinit(*e, (Datum) 0, r, pg, o, 0, l); + } } } static IndexTuple gistFormTuple( GISTSTATE *giststate, Relation r, - Datum attdata[], int datumsize[] ) + Datum attdata[], int datumsize[], bool isnull[] ) { IndexTuple tup; - char isnull[INDEX_MAX_KEYS]; + char isnullchar[INDEX_MAX_KEYS]; bool whatfree[INDEX_MAX_KEYS]; GISTENTRY centry[INDEX_MAX_KEYS]; Datum compatt[INDEX_MAX_KEYS]; int j; for (j = 0; j < r->rd_att->natts; j++) { - gistcentryinit(giststate, j, ¢ry[j], attdata[j], - (Relation) NULL, (Page) NULL, (OffsetNumber) NULL, - datumsize[j], FALSE); - isnull[j] = DatumGetPointer(centry[j].key) != NULL ? ' ' : 'n'; - compatt[j] = centry[j].key; - if ( DatumGetPointer(centry[j].key) != NULL ) { - whatfree[j] = TRUE; - if ( centry[j].key != attdata[j] ) - pfree(DatumGetPointer(attdata[j])); - } else + if ( isnull[j] ) { + isnullchar[j] = 'n'; + compatt[j] = (Datum) 0; whatfree[j] = FALSE; + } else { + gistcentryinit(giststate, j, ¢ry[j], attdata[j], + (Relation) NULL, (Page) NULL, (OffsetNumber) NULL, + datumsize[j], FALSE, FALSE); + isnullchar[j] = ' '; + compatt[j] = centry[j].key; + if ( !giststate->attbyval[j] ) { + whatfree[j] = TRUE; + if ( centry[j].key != attdata[j] ) + pfree(DatumGetPointer(attdata[j])); + } else + whatfree[j] = FALSE; + } } - tup = (IndexTuple) index_formtuple(r->rd_att, compatt, isnull); + tup = (IndexTuple) index_formtuple(r->rd_att, compatt, isnullchar); for (j = 0; j < r->rd_att->natts; j++) if ( whatfree[j] ) pfree(DatumGetPointer(compatt[j])); return tup; } -static bool +static void gistDeCompressAtt( GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, - OffsetNumber o, GISTENTRY attdata[], bool decompvec[] ) { - bool allIsNull=true; - bool IsNull; + OffsetNumber o, GISTENTRY attdata[], bool decompvec[], bool isnull[] ) { int i; Datum datum; for(i=0; i < r->rd_att->natts; i++ ) { - datum = index_getattr(tuple, i+1, r->rd_att, &IsNull); - if ( ! IsNull ) allIsNull = false; + datum = index_getattr(tuple, i+1, r->rd_att, &isnull[i]); gistdentryinit(giststate, i, &attdata[i], datum, r, p, o, - ATTSIZE( datum, r, i+1, IsNull ), FALSE); - if (attdata[i].key == datum || - DatumGetPointer(attdata[i].key) == NULL ) + ATTSIZE( datum, r, i+1, isnull[i] ), FALSE, isnull[i]); + if ( giststate->attbyval[i] ) decompvec[i] = FALSE; - else - decompvec[i] = TRUE; + else { + if (attdata[i].key == datum || isnull[i] ) + decompvec[i] = FALSE; + else + decompvec[i] = TRUE; + } } - - return allIsNull; } static void gistFreeAtt( Relation r, GISTENTRY attdata[], bool decompvec[] ) { int i; for(i=0; i < r->rd_att->natts; i++ ) - if ( decompvec[i] && DatumGetPointer(attdata[i].key) != NULL ) + if ( decompvec[i] ) pfree( DatumGetPointer(attdata[i].key) ); } + +static void +gistpenalty( GISTSTATE *giststate, int attno, + GISTENTRY *key1, bool isNull1, + GISTENTRY *key2, bool isNull2, float *penalty ){ + if ( giststate->penaltyFn[attno].fn_strict && ( isNull1 || isNull2 ) ) + *penalty=0.0; + else + FunctionCall3(&giststate->penaltyFn[attno], + PointerGetDatum(key1), + PointerGetDatum(key2), + PointerGetDatum(&penalty)); +} #ifdef GISTDEBUG static void diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c index 028ff601d9..d334b56928 100644 --- a/src/backend/access/gist/gistget.c +++ b/src/backend/access/gist/gistget.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/gist/gistget.c,v 1.28 2001/05/31 18:16:54 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/gist/gistget.c,v 1.29 2001/08/10 14:34:28 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -254,7 +254,7 @@ gistindex_keytest(IndexTuple tuple, gistdentryinit(giststate, key[0].sk_attno-1, &de, datum, r, p, offset, IndexTupleSize(tuple) - sizeof(IndexTupleData), - FALSE); + FALSE, isNull); if (key[0].sk_flags & SK_COMMUTE) { diff --git a/src/include/access/gist.h b/src/include/access/gist.h index b555a195db..af9b7c3b0d 100644 --- a/src/include/access/gist.h +++ b/src/include/access/gist.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: gist.h,v 1.29 2001/07/15 22:48:18 tgl Exp $ + * $Id: gist.h,v 1.30 2001/08/10 14:34:28 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -72,6 +72,7 @@ typedef struct GISTSTATE FmgrInfo penaltyFn[INDEX_MAX_KEYS]; FmgrInfo picksplitFn[INDEX_MAX_KEYS]; FmgrInfo equalFn[INDEX_MAX_KEYS]; + bool attbyval[INDEX_MAX_KEYS]; bool haskeytype; bool keytypbyval; } GISTSTATE; @@ -124,12 +125,14 @@ typedef struct GIST_SPLITVEC Datum spl_ldatum; /* Union of keys in spl_left */ Datum spl_lattr[INDEX_MAX_KEYS]; /* Union of subkeys in spl_left */ int spl_lattrsize[INDEX_MAX_KEYS]; + bool spl_lisnull[INDEX_MAX_KEYS]; OffsetNumber *spl_right; /* array of entries that go right */ int spl_nright; /* size of the array */ Datum spl_rdatum; /* Union of keys in spl_right */ Datum spl_rattr[INDEX_MAX_KEYS]; /* Union of subkeys in spl_right */ int spl_rattrsize[INDEX_MAX_KEYS]; + bool spl_risnull[INDEX_MAX_KEYS]; int *spl_idgrp; int *spl_ngrp; /* number in each group */ @@ -168,7 +171,7 @@ extern void gistfreestack(GISTSTACK *s); extern void initGISTstate(GISTSTATE *giststate, Relation index); extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, Datum k, Relation r, Page pg, OffsetNumber o, - int b, bool l); + int b, bool l, bool isNull); extern StrategyNumber RelationGetGISTStrategy(Relation, AttrNumber, RegProcedure);