diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index c7bfa9c962..9e3f935bd6 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -6,7 +6,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.70 2001/02/22 21:48:48 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.71 2001/03/07 21:20:26 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -395,6 +395,9 @@ gistPageAddItem(GISTSTATE *giststate, *newtup = gist_tuple_replacekey(r, tmpcentry, itup); retval = PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup), offsetNumber, flags); + if (retval == InvalidOffsetNumber) + elog(ERROR, "gist: failed to add index item to %s", + RelationGetRelationName(r)); /* be tidy */ if (tmpcentry.pred && tmpcentry.pred != dentry->pred && tmpcentry.pred != (((char *) itup) + sizeof(IndexTupleData))) diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c index e8e750739a..5439dce214 100644 --- a/src/backend/access/hash/hashinsert.c +++ b/src/backend/access/hash/hashinsert.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/hash/hashinsert.c,v 1.21 2001/01/24 19:42:47 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/hash/hashinsert.c,v 1.22 2001/03/07 21:20:26 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -230,7 +230,10 @@ _hash_pgaddtup(Relation rel, _hash_checkpage(page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE); itup_off = OffsetNumberNext(PageGetMaxOffsetNumber(page)); - PageAddItem(page, (Item) hitem, itemsize, itup_off, LP_USED); + if (PageAddItem(page, (Item) hitem, itemsize, itup_off, LP_USED) + == InvalidOffsetNumber) + elog(ERROR, "_hash_pgaddtup: failed to add index item to %s", + RelationGetRelationName(rel)); /* write the buffer, but hold our lock */ _hash_wrtnorelbuf(rel, buf); diff --git a/src/backend/access/hash/hashovfl.c b/src/backend/access/hash/hashovfl.c index cd129e6cb7..8e2ed1bb8a 100644 --- a/src/backend/access/hash/hashovfl.c +++ b/src/backend/access/hash/hashovfl.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/hash/hashovfl.c,v 1.28 2001/01/24 19:42:47 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/hash/hashovfl.c,v 1.29 2001/03/07 21:20:26 tgl Exp $ * * NOTES * Overflow pages look like ordinary relation pages. @@ -564,7 +564,10 @@ _hash_squeezebucket(Relation rel, * page. */ woffnum = OffsetNumberNext(PageGetMaxOffsetNumber(wpage)); - PageAddItem(wpage, (Item) hitem, itemsz, woffnum, LP_USED); + if (PageAddItem(wpage, (Item) hitem, itemsz, woffnum, LP_USED) + == InvalidOffsetNumber) + elog(ERROR, "_hash_squeezebucket: failed to add index item to %s", + RelationGetRelationName(rel)); /* * delete the tuple from the "read" page. PageIndexTupleDelete diff --git a/src/backend/access/hash/hashpage.c b/src/backend/access/hash/hashpage.c index 4fb2cf82b0..156db9078a 100644 --- a/src/backend/access/hash/hashpage.c +++ b/src/backend/access/hash/hashpage.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.29 2001/01/24 19:42:47 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.30 2001/03/07 21:20:26 tgl Exp $ * * NOTES * Postgres hash pages look like ordinary relation pages. The opaque @@ -619,7 +619,10 @@ _hash_splitpage(Relation rel, } noffnum = OffsetNumberNext(PageGetMaxOffsetNumber(npage)); - PageAddItem(npage, (Item) hitem, itemsz, noffnum, LP_USED); + if (PageAddItem(npage, (Item) hitem, itemsz, noffnum, LP_USED) + == InvalidOffsetNumber) + elog(ERROR, "_hash_splitpage: failed to add index item to %s", + RelationGetRelationName(rel)); _hash_wrtnorelbuf(rel, nbuf); /* diff --git a/src/backend/access/rtree/rtree.c b/src/backend/access/rtree/rtree.c index 9f3948657c..45382d5ef3 100644 --- a/src/backend/access/rtree/rtree.c +++ b/src/backend/access/rtree/rtree.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/rtree/Attic/rtree.c,v 1.59 2001/01/29 00:39:15 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/rtree/Attic/rtree.c,v 1.60 2001/03/07 21:20:26 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -18,20 +18,41 @@ #include "access/genam.h" #include "access/heapam.h" #include "access/rtree.h" +#include "access/xlogutils.h" #include "catalog/index.h" #include "executor/executor.h" #include "miscadmin.h" -#include "access/xlogutils.h" +/* + * XXX We assume that all datatypes indexable in rtrees are pass-by-reference. + * To fix this, you'd need to improve the IndexTupleGetDatum() macro, and + * do something with the various datum-pfreeing code. However, it's not that + * unreasonable an assumption in practice. + */ +#define IndexTupleGetDatum(itup) \ + PointerGetDatum(((char *) (itup)) + sizeof(IndexTupleData)) + +/* + * Space-allocation macros. Note we count the item's line pointer in its size. + */ +#define RTPageAvailSpace \ + (BLCKSZ - (sizeof(PageHeaderData) - sizeof(ItemIdData)) \ + - MAXALIGN(sizeof(RTreePageOpaqueData))) +#define IndexTupleTotalSize(itup) \ + (MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData)) +#define IndexTupleAttSize(itup) \ + (IndexTupleSize(itup) - sizeof(IndexTupleData)) + +/* results of rtpicksplit() */ typedef struct SPLITVEC { OffsetNumber *spl_left; int spl_nleft; - char *spl_ldatum; + Datum spl_ldatum; OffsetNumber *spl_right; int spl_nright; - char *spl_rdatum; + Datum spl_rdatum; } SPLITVEC; typedef struct RTSTATE @@ -44,14 +65,14 @@ typedef struct RTSTATE /* non-export function prototypes */ static InsertIndexResult rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate); -static void rttighten(Relation r, RTSTACK *stk, char *datum, int att_size, +static void rttighten(Relation r, RTSTACK *stk, Datum datum, int att_size, RTSTATE *rtstate); -static InsertIndexResult dosplit(Relation r, Buffer buffer, RTSTACK *stack, +static InsertIndexResult rtdosplit(Relation r, Buffer buffer, RTSTACK *stack, IndexTuple itup, RTSTATE *rtstate); static void rtintinsert(Relation r, RTSTACK *stk, IndexTuple ltup, IndexTuple rtup, RTSTATE *rtstate); static void rtnewroot(Relation r, IndexTuple lt, IndexTuple rt); -static void picksplit(Relation r, Page page, SPLITVEC *v, IndexTuple itup, +static void rtpicksplit(Relation r, Page page, SPLITVEC *v, IndexTuple itup, RTSTATE *rtstate); static void RTInitBuffer(Buffer b, uint32 f); static OffsetNumber choose(Relation r, Page p, IndexTuple it, @@ -295,7 +316,7 @@ rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate) RTSTACK *stack; InsertIndexResult res; RTreePageOpaque opaque; - char *datum; + Datum datum; blk = P_ROOT; buffer = InvalidBuffer; @@ -332,7 +353,7 @@ rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate) if (nospace(page, itup)) { /* need to do a split */ - res = dosplit(r, buffer, stack, itup, rtstate); + res = rtdosplit(r, buffer, stack, itup, rtstate); freestack(stack); WriteBuffer(buffer); /* don't forget to release buffer! */ return res; @@ -351,14 +372,16 @@ rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate) OffsetNumberNext(PageGetMaxOffsetNumber(page)), LP_USED); } + if (l == InvalidOffsetNumber) + elog(ERROR, "rtdoinsert: failed to add index item to %s", + RelationGetRelationName(r)); WriteBuffer(buffer); - datum = (((char *) itup) + sizeof(IndexTupleData)); + datum = IndexTupleGetDatum(itup); /* now expand the page boundary in the parent to include the new child */ - rttighten(r, stack, datum, - (IndexTupleSize(itup) - sizeof(IndexTupleData)), rtstate); + rttighten(r, stack, datum, IndexTupleAttSize(itup), rtstate); freestack(stack); /* build and return an InsertIndexResult for this insertion */ @@ -371,12 +394,12 @@ rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate) static void rttighten(Relation r, RTSTACK *stk, - char *datum, + Datum datum, int att_size, RTSTATE *rtstate) { - char *oldud; - char *tdatum; + Datum oldud; + Datum tdatum; Page p; float old_size, newd_size; @@ -388,20 +411,15 @@ rttighten(Relation r, b = ReadBuffer(r, stk->rts_blk); p = BufferGetPage(b); - oldud = (char *) PageGetItem(p, PageGetItemId(p, stk->rts_child)); - oldud += sizeof(IndexTupleData); + oldud = IndexTupleGetDatum(PageGetItem(p, + PageGetItemId(p, stk->rts_child))); - FunctionCall2(&rtstate->sizeFn, - PointerGetDatum(oldud), + FunctionCall2(&rtstate->sizeFn, oldud, PointerGetDatum(&old_size)); - datum = (char *) - DatumGetPointer(FunctionCall2(&rtstate->unionFn, - PointerGetDatum(oldud), - PointerGetDatum(datum))); + datum = FunctionCall2(&rtstate->unionFn, oldud, datum); - FunctionCall2(&rtstate->sizeFn, - PointerGetDatum(datum), + FunctionCall2(&rtstate->sizeFn, datum, PointerGetDatum(&newd_size)); if (newd_size != old_size) @@ -415,45 +433,46 @@ rttighten(Relation r, * This is an internal page, so 'oldud' had better be a union * (constant-length) key, too. (See comment below.) */ - Assert(VARSIZE(datum) == VARSIZE(oldud)); - memmove(oldud, datum, VARSIZE(datum)); + Assert(VARSIZE(DatumGetPointer(datum)) == + VARSIZE(DatumGetPointer(oldud))); + memmove(DatumGetPointer(oldud), DatumGetPointer(datum), + VARSIZE(DatumGetPointer(datum))); } else - memmove(oldud, datum, att_size); + { + memmove(DatumGetPointer(oldud), DatumGetPointer(datum), + att_size); + } WriteBuffer(b); /* * The user may be defining an index on variable-sized data (like * polygons). If so, we need to get a constant-sized datum for * insertion on the internal page. We do this by calling the - * union proc, which is guaranteed to return a rectangle. + * union proc, which is required to return a rectangle. */ + tdatum = FunctionCall2(&rtstate->unionFn, datum, datum); - tdatum = (char *) - DatumGetPointer(FunctionCall2(&rtstate->unionFn, - PointerGetDatum(datum), - PointerGetDatum(datum))); rttighten(r, stk->rts_parent, tdatum, att_size, rtstate); - pfree(tdatum); + pfree(DatumGetPointer(tdatum)); } else ReleaseBuffer(b); - pfree(datum); + pfree(DatumGetPointer(datum)); } /* - * dosplit -- split a page in the tree. + * rtdosplit -- split a page in the tree. * - * This is the quadratic-cost split algorithm Guttman describes in - * his paper. The reason we chose it is that you can implement this - * with less information about the data types on which you're operating. + * rtpicksplit does the interesting work of choosing the split. + * This routine just does the bit-pushing. */ static InsertIndexResult -dosplit(Relation r, - Buffer buffer, - RTSTACK *stack, - IndexTuple itup, - RTSTATE *rtstate) +rtdosplit(Relation r, + Buffer buffer, + RTSTACK *stack, + IndexTuple itup, + RTSTATE *rtstate) { Page p; Buffer leftbuf, @@ -476,14 +495,15 @@ dosplit(Relation r, InsertIndexResult res; char *isnull; SPLITVEC v; + OffsetNumber *spl_left, + *spl_right; TupleDesc tupDesc; - isnull = (char *) palloc(r->rd_rel->relnatts); - for (blank = 0; blank < r->rd_rel->relnatts; blank++) - isnull[blank] = ' '; p = (Page) BufferGetPage(buffer); opaque = (RTreePageOpaque) PageGetSpecialPointer(p); + rtpicksplit(r, p, &v, itup, rtstate); + /* * The root of the tree is the first block in the relation. If we're * about to split the root, we need to do some hocus-pocus to enforce @@ -510,8 +530,8 @@ dosplit(Relation r, rbknum = BufferGetBlockNumber(rightbuf); right = (Page) BufferGetPage(rightbuf); - picksplit(r, p, &v, itup, rtstate); - + spl_left = v.spl_left; + spl_right = v.spl_right; leftoff = rightoff = FirstOffsetNumber; maxoff = PageGetMaxOffsetNumber(p); for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) @@ -519,19 +539,24 @@ dosplit(Relation r, itemid = PageGetItemId(p, i); item = (IndexTuple) PageGetItem(p, itemid); - if (i == *(v.spl_left)) + if (i == *spl_left) { - PageAddItem(left, (Item) item, IndexTupleSize(item), - leftoff, LP_USED); + if (PageAddItem(left, (Item) item, IndexTupleSize(item), + leftoff, LP_USED) == InvalidOffsetNumber) + elog(ERROR, "rtdosplit: failed to copy index item in %s", + RelationGetRelationName(r)); leftoff = OffsetNumberNext(leftoff); - v.spl_left++; /* advance in left split vector */ + spl_left++; /* advance in left split vector */ } else { - PageAddItem(right, (Item) item, IndexTupleSize(item), - rightoff, LP_USED); + Assert(i == *spl_right); + if (PageAddItem(right, (Item) item, IndexTupleSize(item), + rightoff, LP_USED) == InvalidOffsetNumber) + elog(ERROR, "rtdosplit: failed to copy index item in %s", + RelationGetRelationName(r)); rightoff = OffsetNumberNext(rightoff); - v.spl_right++; /* advance in right split vector */ + spl_right++; /* advance in right split vector */ } } @@ -539,21 +564,34 @@ dosplit(Relation r, res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); /* now insert the new index tuple */ - if (*(v.spl_left) != FirstOffsetNumber) + if (*spl_left == maxoff+1) { - PageAddItem(left, (Item) itup, IndexTupleSize(itup), - leftoff, LP_USED); + if (PageAddItem(left, (Item) itup, IndexTupleSize(itup), + leftoff, LP_USED) == InvalidOffsetNumber) + elog(ERROR, "rtdosplit: failed to add index item to %s", + RelationGetRelationName(r)); leftoff = OffsetNumberNext(leftoff); ItemPointerSet(&(res->pointerData), lbknum, leftoff); + spl_left++; } else { - PageAddItem(right, (Item) itup, IndexTupleSize(itup), - rightoff, LP_USED); + Assert(*spl_right == maxoff+1); + if (PageAddItem(right, (Item) itup, IndexTupleSize(itup), + rightoff, LP_USED) == InvalidOffsetNumber) + elog(ERROR, "rtdosplit: failed to add index item to %s", + RelationGetRelationName(r)); rightoff = OffsetNumberNext(rightoff); ItemPointerSet(&(res->pointerData), rbknum, rightoff); + spl_right++; } + /* Make sure we consumed all of the split vectors, and release 'em */ + Assert(*spl_left == InvalidOffsetNumber); + Assert(*spl_right == InvalidOffsetNumber); + pfree(v.spl_left); + pfree(v.spl_right); + if ((bufblock = BufferGetBlockNumber(buffer)) != P_ROOT) PageRestoreTempPage(left, p); WriteBuffer(leftbuf); @@ -579,10 +617,14 @@ dosplit(Relation r, rtadjscans(r, RTOP_SPLIT, bufblock, FirstOffsetNumber); tupDesc = r->rd_att; + isnull = (char *) palloc(r->rd_rel->relnatts); + for (blank = 0; blank < r->rd_rel->relnatts; blank++) + isnull[blank] = ' '; + ltup = (IndexTuple) index_formtuple(tupDesc, - (Datum *) &(v.spl_ldatum), isnull); + &(v.spl_ldatum), isnull); rtup = (IndexTuple) index_formtuple(tupDesc, - (Datum *) &(v.spl_rdatum), isnull); + &(v.spl_rdatum), isnull); pfree(isnull); /* set pointers to new child pages in the internal index tuples */ @@ -607,9 +649,9 @@ rtintinsert(Relation r, IndexTuple old; Buffer b; Page p; - char *ldatum, - *rdatum, - *newdatum; + Datum ldatum, + rdatum, + newdatum; InsertIndexResult res; if (stk == (RTSTACK *) NULL) @@ -623,7 +665,7 @@ rtintinsert(Relation r, old = (IndexTuple) PageGetItem(p, PageGetItemId(p, stk->rts_child)); /* - * This is a hack. Right now, we force rtree keys to be constant + * This is a hack. Right now, we force rtree internal keys to be constant * size. To fix this, need delete the old key and add both left and * right for the two new pages. The insertion of left may force a * split if the new left key is bigger than the old key. @@ -637,30 +679,30 @@ rtintinsert(Relation r, if (nospace(p, rtup)) { - newdatum = (((char *) ltup) + sizeof(IndexTupleData)); + newdatum = IndexTupleGetDatum(ltup); rttighten(r, stk->rts_parent, newdatum, - (IndexTupleSize(ltup) - sizeof(IndexTupleData)), rtstate); - res = dosplit(r, b, stk->rts_parent, rtup, rtstate); + IndexTupleAttSize(ltup), rtstate); + res = rtdosplit(r, b, stk->rts_parent, rtup, rtstate); WriteBuffer(b); /* don't forget to release buffer! - * 01/31/94 */ pfree(res); } else { - PageAddItem(p, (Item) rtup, IndexTupleSize(rtup), - PageGetMaxOffsetNumber(p), LP_USED); + if (PageAddItem(p, (Item) rtup, IndexTupleSize(rtup), + PageGetMaxOffsetNumber(p), + LP_USED) == InvalidOffsetNumber) + elog(ERROR, "rtintinsert: failed to add index item to %s", + RelationGetRelationName(r)); WriteBuffer(b); - ldatum = (((char *) ltup) + sizeof(IndexTupleData)); - rdatum = (((char *) rtup) + sizeof(IndexTupleData)); - newdatum = (char *) - DatumGetPointer(FunctionCall2(&rtstate->unionFn, - PointerGetDatum(ldatum), - PointerGetDatum(rdatum))); + ldatum = IndexTupleGetDatum(ltup); + rdatum = IndexTupleGetDatum(rtup); + newdatum = FunctionCall2(&rtstate->unionFn, ldatum, rdatum); rttighten(r, stk->rts_parent, newdatum, - (IndexTupleSize(rtup) - sizeof(IndexTupleData)), rtstate); + IndexTupleAttSize(rtup), rtstate); - pfree(newdatum); + pfree(DatumGetPointer(newdatum)); } } @@ -673,33 +715,64 @@ rtnewroot(Relation r, IndexTuple lt, IndexTuple rt) b = ReadBuffer(r, P_ROOT); RTInitBuffer(b, 0); p = BufferGetPage(b); - PageAddItem(p, (Item) lt, IndexTupleSize(lt), - FirstOffsetNumber, LP_USED); - PageAddItem(p, (Item) rt, IndexTupleSize(rt), - OffsetNumberNext(FirstOffsetNumber), LP_USED); + if (PageAddItem(p, (Item) lt, IndexTupleSize(lt), + FirstOffsetNumber, + LP_USED) == InvalidOffsetNumber) + elog(ERROR, "rtnewroot: failed to add index item to %s", + RelationGetRelationName(r)); + if (PageAddItem(p, (Item) rt, IndexTupleSize(rt), + OffsetNumberNext(FirstOffsetNumber), + LP_USED) == InvalidOffsetNumber) + elog(ERROR, "rtnewroot: failed to add index item to %s", + RelationGetRelationName(r)); WriteBuffer(b); } +/* + * Choose how to split an rtree page into two pages. + * + * We return two vectors of index item numbers, one for the items to be + * put on the left page, one for the items to be put on the right page. + * In addition, the item to be added (itup) is listed in the appropriate + * vector. It is represented by item number N+1 (N = # of items on page). + * + * Both vectors appear in sequence order with a terminating sentinel value + * of InvalidOffsetNumber. + * + * The bounding-box datums for the two new pages are also returned in *v. + * + * This is the quadratic-cost split algorithm Guttman describes in + * his paper. The reason we chose it is that you can implement this + * with less information about the data types on which you're operating. + * + * We must also deal with a consideration not found in Guttman's algorithm: + * variable-length data. In particular, the incoming item might be + * large enough that not just any split will work. In the worst case, + * our "split" may have to be the new item on one page and all the existing + * items on the other. Short of that, we have to take care that we do not + * make a split that leaves both pages too full for the new item. + */ static void -picksplit(Relation r, - Page page, - SPLITVEC *v, - IndexTuple itup, - RTSTATE *rtstate) +rtpicksplit(Relation r, + Page page, + SPLITVEC *v, + IndexTuple itup, + RTSTATE *rtstate) { - OffsetNumber maxoff; + OffsetNumber maxoff, + newitemoff; OffsetNumber i, j; IndexTuple item_1, item_2; - char *datum_alpha, - *datum_beta; - char *datum_l, - *datum_r; - char *union_d, - *union_dl, - *union_dr; - char *inter_d; + Datum datum_alpha, + datum_beta; + Datum datum_l, + datum_r; + Datum union_d, + union_dl, + union_dr; + Datum inter_d; bool firsttime; float size_alpha, size_beta, @@ -714,9 +787,26 @@ picksplit(Relation r, seed_2 = 0; OffsetNumber *left, *right; + Size newitemsz, + item_1_sz, + item_2_sz, + left_avail_space, + right_avail_space; + + /* + * First, make sure the new item is not so large that we can't possibly + * fit it on a page, even by itself. (It's sufficient to make this test + * here, since any oversize tuple must lead to a page split attempt.) + */ + newitemsz = IndexTupleTotalSize(itup); + if (newitemsz > RTPageAvailSpace) + elog(ERROR, "rtree: index item size %lu exceeds maximum %lu", + (unsigned long) newitemsz, (unsigned long) RTPageAvailSpace); maxoff = PageGetMaxOffsetNumber(page); + newitemoff = OffsetNumberNext(maxoff); /* phony index for new item */ + /* Make arrays big enough for worst case, including sentinel */ nbytes = (maxoff + 2) * sizeof(OffsetNumber); v->spl_left = (OffsetNumber *) palloc(nbytes); v->spl_right = (OffsetNumber *) palloc(nbytes); @@ -727,42 +817,46 @@ picksplit(Relation r, for (i = FirstOffsetNumber; i < maxoff; i = OffsetNumberNext(i)) { item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i)); - datum_alpha = ((char *) item_1) + sizeof(IndexTupleData); + datum_alpha = IndexTupleGetDatum(item_1); + item_1_sz = IndexTupleTotalSize(item_1); + for (j = OffsetNumberNext(i); j <= maxoff; j = OffsetNumberNext(j)) { item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, j)); - datum_beta = ((char *) item_2) + sizeof(IndexTupleData); + datum_beta = IndexTupleGetDatum(item_2); + item_2_sz = IndexTupleTotalSize(item_2); + + /* + * Ignore seed pairs that don't leave room for the new item + * on either split page. + */ + if (newitemsz + item_1_sz > RTPageAvailSpace && + newitemsz + item_2_sz > RTPageAvailSpace) + continue; /* compute the wasted space by unioning these guys */ - union_d = (char *) - DatumGetPointer(FunctionCall2(&rtstate->unionFn, - PointerGetDatum(datum_alpha), - PointerGetDatum(datum_beta))); - FunctionCall2(&rtstate->sizeFn, - PointerGetDatum(union_d), + union_d = FunctionCall2(&rtstate->unionFn, + datum_alpha, datum_beta); + FunctionCall2(&rtstate->sizeFn, union_d, PointerGetDatum(&size_union)); - inter_d = (char *) - DatumGetPointer(FunctionCall2(&rtstate->interFn, - PointerGetDatum(datum_alpha), - PointerGetDatum(datum_beta))); + inter_d = FunctionCall2(&rtstate->interFn, + datum_alpha, datum_beta); /* The interFn may return a NULL pointer (not an SQL null!) * to indicate no intersection. sizeFn must cope with this. */ - FunctionCall2(&rtstate->sizeFn, - PointerGetDatum(inter_d), + FunctionCall2(&rtstate->sizeFn, inter_d, PointerGetDatum(&size_inter)); size_waste = size_union - size_inter; - if (union_d != (char *) NULL) - pfree(union_d); - if (inter_d != (char *) NULL) - pfree(inter_d); + if (DatumGetPointer(union_d) != NULL) + pfree(DatumGetPointer(union_d)); + if (DatumGetPointer(inter_d) != NULL) + pfree(DatumGetPointer(inter_d)); /* * are these a more promising split that what we've already * seen? */ - if (size_waste > waste || firsttime) { waste = size_waste; @@ -773,29 +867,36 @@ picksplit(Relation r, } } - left = v->spl_left; - v->spl_nleft = 0; - right = v->spl_right; - v->spl_nright = 0; + if (firsttime) + { + /* + * There is no possible split except to put the new item on its + * own page. Since we still have to compute the union rectangles, + * we play dumb and run through the split algorithm anyway, + * setting seed_1 = first item on page and seed_2 = new item. + */ + seed_1 = FirstOffsetNumber; + seed_2 = newitemoff; + } item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_1)); - datum_alpha = ((char *) item_1) + sizeof(IndexTupleData); - datum_l = (char *) - DatumGetPointer(FunctionCall2(&rtstate->unionFn, - PointerGetDatum(datum_alpha), - PointerGetDatum(datum_alpha))); - FunctionCall2(&rtstate->sizeFn, - PointerGetDatum(datum_l), - PointerGetDatum(&size_l)); - item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_2)); - datum_beta = ((char *) item_2) + sizeof(IndexTupleData); - datum_r = (char *) - DatumGetPointer(FunctionCall2(&rtstate->unionFn, - PointerGetDatum(datum_beta), - PointerGetDatum(datum_beta))); - FunctionCall2(&rtstate->sizeFn, - PointerGetDatum(datum_r), - PointerGetDatum(&size_r)); + datum_alpha = IndexTupleGetDatum(item_1); + datum_l = FunctionCall2(&rtstate->unionFn, datum_alpha, datum_alpha); + FunctionCall2(&rtstate->sizeFn, datum_l, PointerGetDatum(&size_l)); + left_avail_space = RTPageAvailSpace - IndexTupleTotalSize(item_1); + + if (seed_2 == newitemoff) + { + item_2 = itup; + /* Needn't leave room for new item in calculations below */ + newitemsz = 0; + } + else + item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_2)); + datum_beta = IndexTupleGetDatum(item_2); + datum_r = FunctionCall2(&rtstate->unionFn, datum_beta, datum_beta); + FunctionCall2(&rtstate->sizeFn, datum_r, PointerGetDatum(&size_r)); + right_avail_space = RTPageAvailSpace - IndexTupleTotalSize(item_2); /* * Now split up the regions between the two seeds. An important @@ -808,14 +909,20 @@ picksplit(Relation r, * is handled at the very end, when we have placed all the existing * tuples and i == maxoff + 1. */ + left = v->spl_left; + v->spl_nleft = 0; + right = v->spl_right; + v->spl_nright = 0; - maxoff = OffsetNumberNext(maxoff); - for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + for (i = FirstOffsetNumber; i <= newitemoff; i = OffsetNumberNext(i)) { + bool left_feasible, + right_feasible, + choose_left; /* * If we've already decided where to place this item, just put it - * on the right list. Otherwise, we need to figure out which page + * on the correct list. Otherwise, we need to figure out which page * needs the least enlargement in order to store the item. */ @@ -823,58 +930,89 @@ picksplit(Relation r, { *left++ = i; v->spl_nleft++; + /* left avail_space & union already includes this one */ continue; } - else if (i == seed_2) + if (i == seed_2) { *right++ = i; v->spl_nright++; + /* right avail_space & union already includes this one */ continue; } - /* okay, which page needs least enlargement? */ - if (i == maxoff) + /* Compute new union datums and sizes for both possible additions */ + if (i == newitemoff) + { item_1 = itup; + /* Needn't leave room for new item anymore */ + newitemsz = 0; + } else item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i)); + item_1_sz = IndexTupleTotalSize(item_1); - datum_alpha = ((char *) item_1) + sizeof(IndexTupleData); - union_dl = (char *) - DatumGetPointer(FunctionCall2(&rtstate->unionFn, - PointerGetDatum(datum_l), - PointerGetDatum(datum_alpha))); - union_dr = (char *) - DatumGetPointer(FunctionCall2(&rtstate->unionFn, - PointerGetDatum(datum_r), - PointerGetDatum(datum_alpha))); - FunctionCall2(&rtstate->sizeFn, - PointerGetDatum(union_dl), + datum_alpha = IndexTupleGetDatum(item_1); + union_dl = FunctionCall2(&rtstate->unionFn, datum_l, datum_alpha); + union_dr = FunctionCall2(&rtstate->unionFn, datum_r, datum_alpha); + FunctionCall2(&rtstate->sizeFn, union_dl, PointerGetDatum(&size_alpha)); - FunctionCall2(&rtstate->sizeFn, - PointerGetDatum(union_dr), + FunctionCall2(&rtstate->sizeFn, union_dr, PointerGetDatum(&size_beta)); - /* pick which page to add it to */ - if (size_alpha - size_l < size_beta - size_r) + /* + * We prefer the page that shows smaller enlargement of its union area + * (Guttman's algorithm), but we must take care that at least one page + * will still have room for the new item after this one is added. + * + * (We know that all the old items together can fit on one page, + * so we need not worry about any other problem than failing to fit + * the new item.) + */ + left_feasible = (left_avail_space >= item_1_sz && + ((left_avail_space - item_1_sz) >= newitemsz || + right_avail_space >= newitemsz)); + right_feasible = (right_avail_space >= item_1_sz && + ((right_avail_space - item_1_sz) >= newitemsz || + left_avail_space >= newitemsz)); + if (left_feasible && right_feasible) { - pfree(datum_l); - pfree(union_dr); + /* Both feasible, use Guttman's algorithm */ + choose_left = (size_alpha - size_l < size_beta - size_r); + } + else if (left_feasible) + choose_left = true; + else if (right_feasible) + choose_left = false; + else + { + elog(ERROR, "rtpicksplit: failed to find a workable page split"); + choose_left = false; /* keep compiler quiet */ + } + + if (choose_left) + { + pfree(DatumGetPointer(datum_l)); + pfree(DatumGetPointer(union_dr)); datum_l = union_dl; size_l = size_alpha; + left_avail_space -= item_1_sz; *left++ = i; v->spl_nleft++; } else { - pfree(datum_r); - pfree(union_dl); + pfree(DatumGetPointer(datum_r)); + pfree(DatumGetPointer(union_dl)); datum_r = union_dr; size_r = size_beta; + right_avail_space -= item_1_sz; *right++ = i; v->spl_nright++; } } - *left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */ + + *left = *right = InvalidOffsetNumber; /* add ending sentinels */ v->spl_ldatum = datum_l; v->spl_rdatum = datum_r; @@ -902,34 +1040,28 @@ choose(Relation r, Page p, IndexTuple it, RTSTATE *rtstate) { OffsetNumber maxoff; OffsetNumber i; - char *ud, - *id; - char *datum; + Datum ud, + id; + Datum datum; float usize, dsize; OffsetNumber which; float which_grow; - id = ((char *) it) + sizeof(IndexTupleData); + id = IndexTupleGetDatum(it); maxoff = PageGetMaxOffsetNumber(p); which_grow = -1.0; which = -1; for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { - datum = (char *) PageGetItem(p, PageGetItemId(p, i)); - datum += sizeof(IndexTupleData); - FunctionCall2(&rtstate->sizeFn, - PointerGetDatum(datum), + datum = IndexTupleGetDatum(PageGetItem(p, PageGetItemId(p, i))); + FunctionCall2(&rtstate->sizeFn, datum, PointerGetDatum(&dsize)); - ud = (char *) - DatumGetPointer(FunctionCall2(&rtstate->unionFn, - PointerGetDatum(datum), - PointerGetDatum(id))); - FunctionCall2(&rtstate->sizeFn, - PointerGetDatum(ud), + ud = FunctionCall2(&rtstate->unionFn, datum, id); + FunctionCall2(&rtstate->sizeFn, ud, PointerGetDatum(&usize)); - pfree(ud); + pfree(DatumGetPointer(ud)); if (which_grow < 0 || usize - dsize < which_grow) { which = i; @@ -1026,7 +1158,7 @@ _rtdump(Relation r) IndexTuple itup; BlockNumber itblkno; OffsetNumber itoffno; - char *datum; + Datum datum; char *itkey; nblocks = RelationGetNumberOfBlocks(r); @@ -1052,10 +1184,9 @@ _rtdump(Relation r) itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum)); itblkno = ItemPointerGetBlockNumber(&(itup->t_tid)); itoffno = ItemPointerGetOffsetNumber(&(itup->t_tid)); - datum = ((char *) itup); - datum += sizeof(IndexTupleData); + datum = IndexTupleGetDatum(itup); itkey = DatumGetCString(DirectFunctionCall1(box_out, - PointerGetDatum(datum))); + datum)); printf("\t[%d] size %d heap <%d,%d> key:%s\n", offnum, IndexTupleSize(itup), itblkno, itoffno, itkey); pfree(itkey); diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 5ad9a87f59..04398423b6 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.50 2001/02/13 01:57:12 pjw Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.51 2001/03/07 21:20:26 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -785,7 +785,8 @@ void seq_redo(XLogRecPtr lsn, XLogRecord *record) itemsz = record->xl_len - sizeof(xl_seq_rec); itemsz = MAXALIGN(itemsz); if (PageAddItem(page, (Item)item, itemsz, - FirstOffsetNumber, LP_USED) == InvalidOffsetNumber) + FirstOffsetNumber, LP_USED) == InvalidOffsetNumber) + elog(STOP, "seq_redo: failed to add item to page"); PageSetLSN(page, lsn); PageSetSUI(page, ThisStartUpID);