Repair a number of places that didn't bother to check whether PageAddItem

succeeds or not.  Revise rtree page split algorithm to take care about
making a feasible split --- ie, will the incoming tuple actually fit?
Failure to make a feasible split, combined with failure to notice the
failure, account for Jim Stone's recent bug report.  I suspect that
hash and gist indices may have the same type of bug, but at least now
we'll get error messages rather than silent failures if so.  Also clean
up rtree code to use Datum rather than char* where appropriate.
This commit is contained in:
Tom Lane 2001-03-07 21:20:26 +00:00
parent 296c806dd5
commit b109b03fea
6 changed files with 336 additions and 192 deletions

View File

@ -6,7 +6,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.70 2001/02/22 21:48:48 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.71 2001/03/07 21:20:26 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -395,6 +395,9 @@ gistPageAddItem(GISTSTATE *giststate,
*newtup = gist_tuple_replacekey(r, tmpcentry, itup);
retval = PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup),
offsetNumber, flags);
if (retval == InvalidOffsetNumber)
elog(ERROR, "gist: failed to add index item to %s",
RelationGetRelationName(r));
/* be tidy */
if (tmpcentry.pred && tmpcentry.pred != dentry->pred
&& tmpcentry.pred != (((char *) itup) + sizeof(IndexTupleData)))

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashinsert.c,v 1.21 2001/01/24 19:42:47 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashinsert.c,v 1.22 2001/03/07 21:20:26 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -230,7 +230,10 @@ _hash_pgaddtup(Relation rel,
_hash_checkpage(page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
itup_off = OffsetNumberNext(PageGetMaxOffsetNumber(page));
PageAddItem(page, (Item) hitem, itemsize, itup_off, LP_USED);
if (PageAddItem(page, (Item) hitem, itemsize, itup_off, LP_USED)
== InvalidOffsetNumber)
elog(ERROR, "_hash_pgaddtup: failed to add index item to %s",
RelationGetRelationName(rel));
/* write the buffer, but hold our lock */
_hash_wrtnorelbuf(rel, buf);

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashovfl.c,v 1.28 2001/01/24 19:42:47 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashovfl.c,v 1.29 2001/03/07 21:20:26 tgl Exp $
*
* NOTES
* Overflow pages look like ordinary relation pages.
@ -564,7 +564,10 @@ _hash_squeezebucket(Relation rel,
* page.
*/
woffnum = OffsetNumberNext(PageGetMaxOffsetNumber(wpage));
PageAddItem(wpage, (Item) hitem, itemsz, woffnum, LP_USED);
if (PageAddItem(wpage, (Item) hitem, itemsz, woffnum, LP_USED)
== InvalidOffsetNumber)
elog(ERROR, "_hash_squeezebucket: failed to add index item to %s",
RelationGetRelationName(rel));
/*
* delete the tuple from the "read" page. PageIndexTupleDelete

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.29 2001/01/24 19:42:47 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.30 2001/03/07 21:20:26 tgl Exp $
*
* NOTES
* Postgres hash pages look like ordinary relation pages. The opaque
@ -619,7 +619,10 @@ _hash_splitpage(Relation rel,
}
noffnum = OffsetNumberNext(PageGetMaxOffsetNumber(npage));
PageAddItem(npage, (Item) hitem, itemsz, noffnum, LP_USED);
if (PageAddItem(npage, (Item) hitem, itemsz, noffnum, LP_USED)
== InvalidOffsetNumber)
elog(ERROR, "_hash_splitpage: failed to add index item to %s",
RelationGetRelationName(rel));
_hash_wrtnorelbuf(rel, nbuf);
/*

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/rtree/Attic/rtree.c,v 1.59 2001/01/29 00:39:15 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/rtree/Attic/rtree.c,v 1.60 2001/03/07 21:20:26 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -18,20 +18,41 @@
#include "access/genam.h"
#include "access/heapam.h"
#include "access/rtree.h"
#include "access/xlogutils.h"
#include "catalog/index.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "access/xlogutils.h"
/*
* XXX We assume that all datatypes indexable in rtrees are pass-by-reference.
* To fix this, you'd need to improve the IndexTupleGetDatum() macro, and
* do something with the various datum-pfreeing code. However, it's not that
* unreasonable an assumption in practice.
*/
#define IndexTupleGetDatum(itup) \
PointerGetDatum(((char *) (itup)) + sizeof(IndexTupleData))
/*
* Space-allocation macros. Note we count the item's line pointer in its size.
*/
#define RTPageAvailSpace \
(BLCKSZ - (sizeof(PageHeaderData) - sizeof(ItemIdData)) \
- MAXALIGN(sizeof(RTreePageOpaqueData)))
#define IndexTupleTotalSize(itup) \
(MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData))
#define IndexTupleAttSize(itup) \
(IndexTupleSize(itup) - sizeof(IndexTupleData))
/* results of rtpicksplit() */
typedef struct SPLITVEC
{
OffsetNumber *spl_left;
int spl_nleft;
char *spl_ldatum;
Datum spl_ldatum;
OffsetNumber *spl_right;
int spl_nright;
char *spl_rdatum;
Datum spl_rdatum;
} SPLITVEC;
typedef struct RTSTATE
@ -44,14 +65,14 @@ typedef struct RTSTATE
/* non-export function prototypes */
static InsertIndexResult rtdoinsert(Relation r, IndexTuple itup,
RTSTATE *rtstate);
static void rttighten(Relation r, RTSTACK *stk, char *datum, int att_size,
static void rttighten(Relation r, RTSTACK *stk, Datum datum, int att_size,
RTSTATE *rtstate);
static InsertIndexResult dosplit(Relation r, Buffer buffer, RTSTACK *stack,
static InsertIndexResult rtdosplit(Relation r, Buffer buffer, RTSTACK *stack,
IndexTuple itup, RTSTATE *rtstate);
static void rtintinsert(Relation r, RTSTACK *stk, IndexTuple ltup,
IndexTuple rtup, RTSTATE *rtstate);
static void rtnewroot(Relation r, IndexTuple lt, IndexTuple rt);
static void picksplit(Relation r, Page page, SPLITVEC *v, IndexTuple itup,
static void rtpicksplit(Relation r, Page page, SPLITVEC *v, IndexTuple itup,
RTSTATE *rtstate);
static void RTInitBuffer(Buffer b, uint32 f);
static OffsetNumber choose(Relation r, Page p, IndexTuple it,
@ -295,7 +316,7 @@ rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate)
RTSTACK *stack;
InsertIndexResult res;
RTreePageOpaque opaque;
char *datum;
Datum datum;
blk = P_ROOT;
buffer = InvalidBuffer;
@ -332,7 +353,7 @@ rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate)
if (nospace(page, itup))
{
/* need to do a split */
res = dosplit(r, buffer, stack, itup, rtstate);
res = rtdosplit(r, buffer, stack, itup, rtstate);
freestack(stack);
WriteBuffer(buffer); /* don't forget to release buffer! */
return res;
@ -351,14 +372,16 @@ rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate)
OffsetNumberNext(PageGetMaxOffsetNumber(page)),
LP_USED);
}
if (l == InvalidOffsetNumber)
elog(ERROR, "rtdoinsert: failed to add index item to %s",
RelationGetRelationName(r));
WriteBuffer(buffer);
datum = (((char *) itup) + sizeof(IndexTupleData));
datum = IndexTupleGetDatum(itup);
/* now expand the page boundary in the parent to include the new child */
rttighten(r, stack, datum,
(IndexTupleSize(itup) - sizeof(IndexTupleData)), rtstate);
rttighten(r, stack, datum, IndexTupleAttSize(itup), rtstate);
freestack(stack);
/* build and return an InsertIndexResult for this insertion */
@ -371,12 +394,12 @@ rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate)
static void
rttighten(Relation r,
RTSTACK *stk,
char *datum,
Datum datum,
int att_size,
RTSTATE *rtstate)
{
char *oldud;
char *tdatum;
Datum oldud;
Datum tdatum;
Page p;
float old_size,
newd_size;
@ -388,20 +411,15 @@ rttighten(Relation r,
b = ReadBuffer(r, stk->rts_blk);
p = BufferGetPage(b);
oldud = (char *) PageGetItem(p, PageGetItemId(p, stk->rts_child));
oldud += sizeof(IndexTupleData);
oldud = IndexTupleGetDatum(PageGetItem(p,
PageGetItemId(p, stk->rts_child)));
FunctionCall2(&rtstate->sizeFn,
PointerGetDatum(oldud),
FunctionCall2(&rtstate->sizeFn, oldud,
PointerGetDatum(&old_size));
datum = (char *)
DatumGetPointer(FunctionCall2(&rtstate->unionFn,
PointerGetDatum(oldud),
PointerGetDatum(datum)));
datum = FunctionCall2(&rtstate->unionFn, oldud, datum);
FunctionCall2(&rtstate->sizeFn,
PointerGetDatum(datum),
FunctionCall2(&rtstate->sizeFn, datum,
PointerGetDatum(&newd_size));
if (newd_size != old_size)
@ -415,45 +433,46 @@ rttighten(Relation r,
* This is an internal page, so 'oldud' had better be a union
* (constant-length) key, too. (See comment below.)
*/
Assert(VARSIZE(datum) == VARSIZE(oldud));
memmove(oldud, datum, VARSIZE(datum));
Assert(VARSIZE(DatumGetPointer(datum)) ==
VARSIZE(DatumGetPointer(oldud)));
memmove(DatumGetPointer(oldud), DatumGetPointer(datum),
VARSIZE(DatumGetPointer(datum)));
}
else
memmove(oldud, datum, att_size);
{
memmove(DatumGetPointer(oldud), DatumGetPointer(datum),
att_size);
}
WriteBuffer(b);
/*
* The user may be defining an index on variable-sized data (like
* polygons). If so, we need to get a constant-sized datum for
* insertion on the internal page. We do this by calling the
* union proc, which is guaranteed to return a rectangle.
* union proc, which is required to return a rectangle.
*/
tdatum = FunctionCall2(&rtstate->unionFn, datum, datum);
tdatum = (char *)
DatumGetPointer(FunctionCall2(&rtstate->unionFn,
PointerGetDatum(datum),
PointerGetDatum(datum)));
rttighten(r, stk->rts_parent, tdatum, att_size, rtstate);
pfree(tdatum);
pfree(DatumGetPointer(tdatum));
}
else
ReleaseBuffer(b);
pfree(datum);
pfree(DatumGetPointer(datum));
}
/*
* dosplit -- split a page in the tree.
* rtdosplit -- split a page in the tree.
*
* This is the quadratic-cost split algorithm Guttman describes in
* his paper. The reason we chose it is that you can implement this
* with less information about the data types on which you're operating.
* rtpicksplit does the interesting work of choosing the split.
* This routine just does the bit-pushing.
*/
static InsertIndexResult
dosplit(Relation r,
Buffer buffer,
RTSTACK *stack,
IndexTuple itup,
RTSTATE *rtstate)
rtdosplit(Relation r,
Buffer buffer,
RTSTACK *stack,
IndexTuple itup,
RTSTATE *rtstate)
{
Page p;
Buffer leftbuf,
@ -476,14 +495,15 @@ dosplit(Relation r,
InsertIndexResult res;
char *isnull;
SPLITVEC v;
OffsetNumber *spl_left,
*spl_right;
TupleDesc tupDesc;
isnull = (char *) palloc(r->rd_rel->relnatts);
for (blank = 0; blank < r->rd_rel->relnatts; blank++)
isnull[blank] = ' ';
p = (Page) BufferGetPage(buffer);
opaque = (RTreePageOpaque) PageGetSpecialPointer(p);
rtpicksplit(r, p, &v, itup, rtstate);
/*
* The root of the tree is the first block in the relation. If we're
* about to split the root, we need to do some hocus-pocus to enforce
@ -510,8 +530,8 @@ dosplit(Relation r,
rbknum = BufferGetBlockNumber(rightbuf);
right = (Page) BufferGetPage(rightbuf);
picksplit(r, p, &v, itup, rtstate);
spl_left = v.spl_left;
spl_right = v.spl_right;
leftoff = rightoff = FirstOffsetNumber;
maxoff = PageGetMaxOffsetNumber(p);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
@ -519,19 +539,24 @@ dosplit(Relation r,
itemid = PageGetItemId(p, i);
item = (IndexTuple) PageGetItem(p, itemid);
if (i == *(v.spl_left))
if (i == *spl_left)
{
PageAddItem(left, (Item) item, IndexTupleSize(item),
leftoff, LP_USED);
if (PageAddItem(left, (Item) item, IndexTupleSize(item),
leftoff, LP_USED) == InvalidOffsetNumber)
elog(ERROR, "rtdosplit: failed to copy index item in %s",
RelationGetRelationName(r));
leftoff = OffsetNumberNext(leftoff);
v.spl_left++; /* advance in left split vector */
spl_left++; /* advance in left split vector */
}
else
{
PageAddItem(right, (Item) item, IndexTupleSize(item),
rightoff, LP_USED);
Assert(i == *spl_right);
if (PageAddItem(right, (Item) item, IndexTupleSize(item),
rightoff, LP_USED) == InvalidOffsetNumber)
elog(ERROR, "rtdosplit: failed to copy index item in %s",
RelationGetRelationName(r));
rightoff = OffsetNumberNext(rightoff);
v.spl_right++; /* advance in right split vector */
spl_right++; /* advance in right split vector */
}
}
@ -539,21 +564,34 @@ dosplit(Relation r,
res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
/* now insert the new index tuple */
if (*(v.spl_left) != FirstOffsetNumber)
if (*spl_left == maxoff+1)
{
PageAddItem(left, (Item) itup, IndexTupleSize(itup),
leftoff, LP_USED);
if (PageAddItem(left, (Item) itup, IndexTupleSize(itup),
leftoff, LP_USED) == InvalidOffsetNumber)
elog(ERROR, "rtdosplit: failed to add index item to %s",
RelationGetRelationName(r));
leftoff = OffsetNumberNext(leftoff);
ItemPointerSet(&(res->pointerData), lbknum, leftoff);
spl_left++;
}
else
{
PageAddItem(right, (Item) itup, IndexTupleSize(itup),
rightoff, LP_USED);
Assert(*spl_right == maxoff+1);
if (PageAddItem(right, (Item) itup, IndexTupleSize(itup),
rightoff, LP_USED) == InvalidOffsetNumber)
elog(ERROR, "rtdosplit: failed to add index item to %s",
RelationGetRelationName(r));
rightoff = OffsetNumberNext(rightoff);
ItemPointerSet(&(res->pointerData), rbknum, rightoff);
spl_right++;
}
/* Make sure we consumed all of the split vectors, and release 'em */
Assert(*spl_left == InvalidOffsetNumber);
Assert(*spl_right == InvalidOffsetNumber);
pfree(v.spl_left);
pfree(v.spl_right);
if ((bufblock = BufferGetBlockNumber(buffer)) != P_ROOT)
PageRestoreTempPage(left, p);
WriteBuffer(leftbuf);
@ -579,10 +617,14 @@ dosplit(Relation r,
rtadjscans(r, RTOP_SPLIT, bufblock, FirstOffsetNumber);
tupDesc = r->rd_att;
isnull = (char *) palloc(r->rd_rel->relnatts);
for (blank = 0; blank < r->rd_rel->relnatts; blank++)
isnull[blank] = ' ';
ltup = (IndexTuple) index_formtuple(tupDesc,
(Datum *) &(v.spl_ldatum), isnull);
&(v.spl_ldatum), isnull);
rtup = (IndexTuple) index_formtuple(tupDesc,
(Datum *) &(v.spl_rdatum), isnull);
&(v.spl_rdatum), isnull);
pfree(isnull);
/* set pointers to new child pages in the internal index tuples */
@ -607,9 +649,9 @@ rtintinsert(Relation r,
IndexTuple old;
Buffer b;
Page p;
char *ldatum,
*rdatum,
*newdatum;
Datum ldatum,
rdatum,
newdatum;
InsertIndexResult res;
if (stk == (RTSTACK *) NULL)
@ -623,7 +665,7 @@ rtintinsert(Relation r,
old = (IndexTuple) PageGetItem(p, PageGetItemId(p, stk->rts_child));
/*
* This is a hack. Right now, we force rtree keys to be constant
* This is a hack. Right now, we force rtree internal keys to be constant
* size. To fix this, need delete the old key and add both left and
* right for the two new pages. The insertion of left may force a
* split if the new left key is bigger than the old key.
@ -637,30 +679,30 @@ rtintinsert(Relation r,
if (nospace(p, rtup))
{
newdatum = (((char *) ltup) + sizeof(IndexTupleData));
newdatum = IndexTupleGetDatum(ltup);
rttighten(r, stk->rts_parent, newdatum,
(IndexTupleSize(ltup) - sizeof(IndexTupleData)), rtstate);
res = dosplit(r, b, stk->rts_parent, rtup, rtstate);
IndexTupleAttSize(ltup), rtstate);
res = rtdosplit(r, b, stk->rts_parent, rtup, rtstate);
WriteBuffer(b); /* don't forget to release buffer! -
* 01/31/94 */
pfree(res);
}
else
{
PageAddItem(p, (Item) rtup, IndexTupleSize(rtup),
PageGetMaxOffsetNumber(p), LP_USED);
if (PageAddItem(p, (Item) rtup, IndexTupleSize(rtup),
PageGetMaxOffsetNumber(p),
LP_USED) == InvalidOffsetNumber)
elog(ERROR, "rtintinsert: failed to add index item to %s",
RelationGetRelationName(r));
WriteBuffer(b);
ldatum = (((char *) ltup) + sizeof(IndexTupleData));
rdatum = (((char *) rtup) + sizeof(IndexTupleData));
newdatum = (char *)
DatumGetPointer(FunctionCall2(&rtstate->unionFn,
PointerGetDatum(ldatum),
PointerGetDatum(rdatum)));
ldatum = IndexTupleGetDatum(ltup);
rdatum = IndexTupleGetDatum(rtup);
newdatum = FunctionCall2(&rtstate->unionFn, ldatum, rdatum);
rttighten(r, stk->rts_parent, newdatum,
(IndexTupleSize(rtup) - sizeof(IndexTupleData)), rtstate);
IndexTupleAttSize(rtup), rtstate);
pfree(newdatum);
pfree(DatumGetPointer(newdatum));
}
}
@ -673,33 +715,64 @@ rtnewroot(Relation r, IndexTuple lt, IndexTuple rt)
b = ReadBuffer(r, P_ROOT);
RTInitBuffer(b, 0);
p = BufferGetPage(b);
PageAddItem(p, (Item) lt, IndexTupleSize(lt),
FirstOffsetNumber, LP_USED);
PageAddItem(p, (Item) rt, IndexTupleSize(rt),
OffsetNumberNext(FirstOffsetNumber), LP_USED);
if (PageAddItem(p, (Item) lt, IndexTupleSize(lt),
FirstOffsetNumber,
LP_USED) == InvalidOffsetNumber)
elog(ERROR, "rtnewroot: failed to add index item to %s",
RelationGetRelationName(r));
if (PageAddItem(p, (Item) rt, IndexTupleSize(rt),
OffsetNumberNext(FirstOffsetNumber),
LP_USED) == InvalidOffsetNumber)
elog(ERROR, "rtnewroot: failed to add index item to %s",
RelationGetRelationName(r));
WriteBuffer(b);
}
/*
* Choose how to split an rtree page into two pages.
*
* We return two vectors of index item numbers, one for the items to be
* put on the left page, one for the items to be put on the right page.
* In addition, the item to be added (itup) is listed in the appropriate
* vector. It is represented by item number N+1 (N = # of items on page).
*
* Both vectors appear in sequence order with a terminating sentinel value
* of InvalidOffsetNumber.
*
* The bounding-box datums for the two new pages are also returned in *v.
*
* This is the quadratic-cost split algorithm Guttman describes in
* his paper. The reason we chose it is that you can implement this
* with less information about the data types on which you're operating.
*
* We must also deal with a consideration not found in Guttman's algorithm:
* variable-length data. In particular, the incoming item might be
* large enough that not just any split will work. In the worst case,
* our "split" may have to be the new item on one page and all the existing
* items on the other. Short of that, we have to take care that we do not
* make a split that leaves both pages too full for the new item.
*/
static void
picksplit(Relation r,
Page page,
SPLITVEC *v,
IndexTuple itup,
RTSTATE *rtstate)
rtpicksplit(Relation r,
Page page,
SPLITVEC *v,
IndexTuple itup,
RTSTATE *rtstate)
{
OffsetNumber maxoff;
OffsetNumber maxoff,
newitemoff;
OffsetNumber i,
j;
IndexTuple item_1,
item_2;
char *datum_alpha,
*datum_beta;
char *datum_l,
*datum_r;
char *union_d,
*union_dl,
*union_dr;
char *inter_d;
Datum datum_alpha,
datum_beta;
Datum datum_l,
datum_r;
Datum union_d,
union_dl,
union_dr;
Datum inter_d;
bool firsttime;
float size_alpha,
size_beta,
@ -714,9 +787,26 @@ picksplit(Relation r,
seed_2 = 0;
OffsetNumber *left,
*right;
Size newitemsz,
item_1_sz,
item_2_sz,
left_avail_space,
right_avail_space;
/*
* First, make sure the new item is not so large that we can't possibly
* fit it on a page, even by itself. (It's sufficient to make this test
* here, since any oversize tuple must lead to a page split attempt.)
*/
newitemsz = IndexTupleTotalSize(itup);
if (newitemsz > RTPageAvailSpace)
elog(ERROR, "rtree: index item size %lu exceeds maximum %lu",
(unsigned long) newitemsz, (unsigned long) RTPageAvailSpace);
maxoff = PageGetMaxOffsetNumber(page);
newitemoff = OffsetNumberNext(maxoff); /* phony index for new item */
/* Make arrays big enough for worst case, including sentinel */
nbytes = (maxoff + 2) * sizeof(OffsetNumber);
v->spl_left = (OffsetNumber *) palloc(nbytes);
v->spl_right = (OffsetNumber *) palloc(nbytes);
@ -727,42 +817,46 @@ picksplit(Relation r,
for (i = FirstOffsetNumber; i < maxoff; i = OffsetNumberNext(i))
{
item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
datum_alpha = IndexTupleGetDatum(item_1);
item_1_sz = IndexTupleTotalSize(item_1);
for (j = OffsetNumberNext(i); j <= maxoff; j = OffsetNumberNext(j))
{
item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, j));
datum_beta = ((char *) item_2) + sizeof(IndexTupleData);
datum_beta = IndexTupleGetDatum(item_2);
item_2_sz = IndexTupleTotalSize(item_2);
/*
* Ignore seed pairs that don't leave room for the new item
* on either split page.
*/
if (newitemsz + item_1_sz > RTPageAvailSpace &&
newitemsz + item_2_sz > RTPageAvailSpace)
continue;
/* compute the wasted space by unioning these guys */
union_d = (char *)
DatumGetPointer(FunctionCall2(&rtstate->unionFn,
PointerGetDatum(datum_alpha),
PointerGetDatum(datum_beta)));
FunctionCall2(&rtstate->sizeFn,
PointerGetDatum(union_d),
union_d = FunctionCall2(&rtstate->unionFn,
datum_alpha, datum_beta);
FunctionCall2(&rtstate->sizeFn, union_d,
PointerGetDatum(&size_union));
inter_d = (char *)
DatumGetPointer(FunctionCall2(&rtstate->interFn,
PointerGetDatum(datum_alpha),
PointerGetDatum(datum_beta)));
inter_d = FunctionCall2(&rtstate->interFn,
datum_alpha, datum_beta);
/* The interFn may return a NULL pointer (not an SQL null!)
* to indicate no intersection. sizeFn must cope with this.
*/
FunctionCall2(&rtstate->sizeFn,
PointerGetDatum(inter_d),
FunctionCall2(&rtstate->sizeFn, inter_d,
PointerGetDatum(&size_inter));
size_waste = size_union - size_inter;
if (union_d != (char *) NULL)
pfree(union_d);
if (inter_d != (char *) NULL)
pfree(inter_d);
if (DatumGetPointer(union_d) != NULL)
pfree(DatumGetPointer(union_d));
if (DatumGetPointer(inter_d) != NULL)
pfree(DatumGetPointer(inter_d));
/*
* are these a more promising split that what we've already
* seen?
*/
if (size_waste > waste || firsttime)
{
waste = size_waste;
@ -773,29 +867,36 @@ picksplit(Relation r,
}
}
left = v->spl_left;
v->spl_nleft = 0;
right = v->spl_right;
v->spl_nright = 0;
if (firsttime)
{
/*
* There is no possible split except to put the new item on its
* own page. Since we still have to compute the union rectangles,
* we play dumb and run through the split algorithm anyway,
* setting seed_1 = first item on page and seed_2 = new item.
*/
seed_1 = FirstOffsetNumber;
seed_2 = newitemoff;
}
item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_1));
datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
datum_l = (char *)
DatumGetPointer(FunctionCall2(&rtstate->unionFn,
PointerGetDatum(datum_alpha),
PointerGetDatum(datum_alpha)));
FunctionCall2(&rtstate->sizeFn,
PointerGetDatum(datum_l),
PointerGetDatum(&size_l));
item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_2));
datum_beta = ((char *) item_2) + sizeof(IndexTupleData);
datum_r = (char *)
DatumGetPointer(FunctionCall2(&rtstate->unionFn,
PointerGetDatum(datum_beta),
PointerGetDatum(datum_beta)));
FunctionCall2(&rtstate->sizeFn,
PointerGetDatum(datum_r),
PointerGetDatum(&size_r));
datum_alpha = IndexTupleGetDatum(item_1);
datum_l = FunctionCall2(&rtstate->unionFn, datum_alpha, datum_alpha);
FunctionCall2(&rtstate->sizeFn, datum_l, PointerGetDatum(&size_l));
left_avail_space = RTPageAvailSpace - IndexTupleTotalSize(item_1);
if (seed_2 == newitemoff)
{
item_2 = itup;
/* Needn't leave room for new item in calculations below */
newitemsz = 0;
}
else
item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_2));
datum_beta = IndexTupleGetDatum(item_2);
datum_r = FunctionCall2(&rtstate->unionFn, datum_beta, datum_beta);
FunctionCall2(&rtstate->sizeFn, datum_r, PointerGetDatum(&size_r));
right_avail_space = RTPageAvailSpace - IndexTupleTotalSize(item_2);
/*
* Now split up the regions between the two seeds. An important
@ -808,14 +909,20 @@ picksplit(Relation r,
* is handled at the very end, when we have placed all the existing
* tuples and i == maxoff + 1.
*/
left = v->spl_left;
v->spl_nleft = 0;
right = v->spl_right;
v->spl_nright = 0;
maxoff = OffsetNumberNext(maxoff);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
for (i = FirstOffsetNumber; i <= newitemoff; i = OffsetNumberNext(i))
{
bool left_feasible,
right_feasible,
choose_left;
/*
* If we've already decided where to place this item, just put it
* on the right list. Otherwise, we need to figure out which page
* on the correct list. Otherwise, we need to figure out which page
* needs the least enlargement in order to store the item.
*/
@ -823,58 +930,89 @@ picksplit(Relation r,
{
*left++ = i;
v->spl_nleft++;
/* left avail_space & union already includes this one */
continue;
}
else if (i == seed_2)
if (i == seed_2)
{
*right++ = i;
v->spl_nright++;
/* right avail_space & union already includes this one */
continue;
}
/* okay, which page needs least enlargement? */
if (i == maxoff)
/* Compute new union datums and sizes for both possible additions */
if (i == newitemoff)
{
item_1 = itup;
/* Needn't leave room for new item anymore */
newitemsz = 0;
}
else
item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
item_1_sz = IndexTupleTotalSize(item_1);
datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
union_dl = (char *)
DatumGetPointer(FunctionCall2(&rtstate->unionFn,
PointerGetDatum(datum_l),
PointerGetDatum(datum_alpha)));
union_dr = (char *)
DatumGetPointer(FunctionCall2(&rtstate->unionFn,
PointerGetDatum(datum_r),
PointerGetDatum(datum_alpha)));
FunctionCall2(&rtstate->sizeFn,
PointerGetDatum(union_dl),
datum_alpha = IndexTupleGetDatum(item_1);
union_dl = FunctionCall2(&rtstate->unionFn, datum_l, datum_alpha);
union_dr = FunctionCall2(&rtstate->unionFn, datum_r, datum_alpha);
FunctionCall2(&rtstate->sizeFn, union_dl,
PointerGetDatum(&size_alpha));
FunctionCall2(&rtstate->sizeFn,
PointerGetDatum(union_dr),
FunctionCall2(&rtstate->sizeFn, union_dr,
PointerGetDatum(&size_beta));
/* pick which page to add it to */
if (size_alpha - size_l < size_beta - size_r)
/*
* We prefer the page that shows smaller enlargement of its union area
* (Guttman's algorithm), but we must take care that at least one page
* will still have room for the new item after this one is added.
*
* (We know that all the old items together can fit on one page,
* so we need not worry about any other problem than failing to fit
* the new item.)
*/
left_feasible = (left_avail_space >= item_1_sz &&
((left_avail_space - item_1_sz) >= newitemsz ||
right_avail_space >= newitemsz));
right_feasible = (right_avail_space >= item_1_sz &&
((right_avail_space - item_1_sz) >= newitemsz ||
left_avail_space >= newitemsz));
if (left_feasible && right_feasible)
{
pfree(datum_l);
pfree(union_dr);
/* Both feasible, use Guttman's algorithm */
choose_left = (size_alpha - size_l < size_beta - size_r);
}
else if (left_feasible)
choose_left = true;
else if (right_feasible)
choose_left = false;
else
{
elog(ERROR, "rtpicksplit: failed to find a workable page split");
choose_left = false; /* keep compiler quiet */
}
if (choose_left)
{
pfree(DatumGetPointer(datum_l));
pfree(DatumGetPointer(union_dr));
datum_l = union_dl;
size_l = size_alpha;
left_avail_space -= item_1_sz;
*left++ = i;
v->spl_nleft++;
}
else
{
pfree(datum_r);
pfree(union_dl);
pfree(DatumGetPointer(datum_r));
pfree(DatumGetPointer(union_dl));
datum_r = union_dr;
size_r = size_beta;
right_avail_space -= item_1_sz;
*right++ = i;
v->spl_nright++;
}
}
*left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */
*left = *right = InvalidOffsetNumber; /* add ending sentinels */
v->spl_ldatum = datum_l;
v->spl_rdatum = datum_r;
@ -902,34 +1040,28 @@ choose(Relation r, Page p, IndexTuple it, RTSTATE *rtstate)
{
OffsetNumber maxoff;
OffsetNumber i;
char *ud,
*id;
char *datum;
Datum ud,
id;
Datum datum;
float usize,
dsize;
OffsetNumber which;
float which_grow;
id = ((char *) it) + sizeof(IndexTupleData);
id = IndexTupleGetDatum(it);
maxoff = PageGetMaxOffsetNumber(p);
which_grow = -1.0;
which = -1;
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
datum = (char *) PageGetItem(p, PageGetItemId(p, i));
datum += sizeof(IndexTupleData);
FunctionCall2(&rtstate->sizeFn,
PointerGetDatum(datum),
datum = IndexTupleGetDatum(PageGetItem(p, PageGetItemId(p, i)));
FunctionCall2(&rtstate->sizeFn, datum,
PointerGetDatum(&dsize));
ud = (char *)
DatumGetPointer(FunctionCall2(&rtstate->unionFn,
PointerGetDatum(datum),
PointerGetDatum(id)));
FunctionCall2(&rtstate->sizeFn,
PointerGetDatum(ud),
ud = FunctionCall2(&rtstate->unionFn, datum, id);
FunctionCall2(&rtstate->sizeFn, ud,
PointerGetDatum(&usize));
pfree(ud);
pfree(DatumGetPointer(ud));
if (which_grow < 0 || usize - dsize < which_grow)
{
which = i;
@ -1026,7 +1158,7 @@ _rtdump(Relation r)
IndexTuple itup;
BlockNumber itblkno;
OffsetNumber itoffno;
char *datum;
Datum datum;
char *itkey;
nblocks = RelationGetNumberOfBlocks(r);
@ -1052,10 +1184,9 @@ _rtdump(Relation r)
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
itblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
itoffno = ItemPointerGetOffsetNumber(&(itup->t_tid));
datum = ((char *) itup);
datum += sizeof(IndexTupleData);
datum = IndexTupleGetDatum(itup);
itkey = DatumGetCString(DirectFunctionCall1(box_out,
PointerGetDatum(datum)));
datum));
printf("\t[%d] size %d heap <%d,%d> key:%s\n",
offnum, IndexTupleSize(itup), itblkno, itoffno, itkey);
pfree(itkey);

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.50 2001/02/13 01:57:12 pjw Exp $
* $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.51 2001/03/07 21:20:26 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -785,7 +785,8 @@ void seq_redo(XLogRecPtr lsn, XLogRecord *record)
itemsz = record->xl_len - sizeof(xl_seq_rec);
itemsz = MAXALIGN(itemsz);
if (PageAddItem(page, (Item)item, itemsz,
FirstOffsetNumber, LP_USED) == InvalidOffsetNumber)
FirstOffsetNumber, LP_USED) == InvalidOffsetNumber)
elog(STOP, "seq_redo: failed to add item to page");
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);