diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 4ce461d446..d207b7ecfa 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.134 2006/05/10 23:18:38 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.135 2006/05/17 16:34:59 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -347,7 +347,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) * Form index tuples vector to split: * remove old tuple if t's needed and add new tuples to vector */ - itvec = gistextractbuffer(state->stack->buffer, &tlen); + itvec = gistextractpage(state->stack->page, &tlen); if ( !is_leaf ) { /* on inner page we should remove old tuple */ int pos = state->stack->childoffnum - FirstOffsetNumber; @@ -501,7 +501,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) } rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer, - offs, noffs, false, + offs, noffs, state->itup, state->ituplen, &(state->key)); @@ -1157,7 +1157,7 @@ gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer ke XLogRecData *rdata; rdata = formUpdateRdata(r->rd_node, buffer, - NULL, 0, false, + NULL, 0, itup, len, key); recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata); diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c index d5d6405100..ca5a9d652d 100644 --- a/src/backend/access/gist/gistutil.c +++ b/src/backend/access/gist/gistutil.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.11 2006/05/10 09:19:54 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.12 2006/05/17 16:34:59 teodor Exp $ 
*------------------------------------------------------------------------- */ #include "postgres.h" @@ -112,18 +112,17 @@ gistfitpage(IndexTuple *itvec, int len) { * Read buffer into itup vector */ IndexTuple * -gistextractbuffer(Buffer buffer, int *len /* out */ ) +gistextractpage(Page page, int *len /* out */ ) { OffsetNumber i, maxoff; IndexTuple *itvec; - Page p = (Page) BufferGetPage(buffer); - maxoff = PageGetMaxOffsetNumber(p); + maxoff = PageGetMaxOffsetNumber(page); *len = maxoff; itvec = palloc(sizeof(IndexTuple) * maxoff); for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) - itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); + itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(page, PageGetItemId(page, i)); return itvec; } diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c index e81c0ebf48..9b32304d1a 100644 --- a/src/backend/access/gist/gistvacuum.c +++ b/src/backend/access/gist/gistvacuum.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.20 2006/05/10 09:19:54 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.21 2006/05/17 16:34:59 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -47,23 +47,235 @@ typedef struct bool emptypage; } ArrayTuple; +/* + * Make union of keys on page + */ +static IndexTuple +PageMakeUnionKey(GistVacuum *gv, Buffer buffer) { + Page page = BufferGetPage( buffer ); + IndexTuple *vec, + tmp, res; + int veclen = 0; + MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx); + + vec = gistextractpage(page, &veclen); + /* we call gistunion() in temprorary context because user-defined functions called in gistunion() + may do not free all memory */ + tmp = gistunion(gv->index, vec, veclen, &(gv->giststate)); + MemoryContextSwitchTo(oldCtx); + + res = 
(IndexTuple) palloc(IndexTupleSize(tmp)); + memcpy(res, tmp, IndexTupleSize(tmp)); + + ItemPointerSetBlockNumber(&(res->t_tid), BufferGetBlockNumber(buffer)); + GistTupleSetValid(res); + + MemoryContextReset(gv->opCtx); + + return res; +} + +static void +gistDeleteSubtree( GistVacuum *gv, BlockNumber blkno ) { + Buffer buffer; + Page page; + + buffer = ReadBuffer(gv->index, blkno); + LockBuffer(buffer, GIST_EXCLUSIVE); + page = (Page) BufferGetPage(buffer); + + if ( !GistPageIsLeaf(page) ) { + int i; + + for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i = OffsetNumberNext(i)) { + ItemId iid = PageGetItemId(page, i); + IndexTuple idxtuple = (IndexTuple) PageGetItem(page, iid); + gistDeleteSubtree(gv, ItemPointerGetBlockNumber(&(idxtuple->t_tid))); + } + } + + START_CRIT_SECTION(); + + MarkBufferDirty(buffer); + + page = (Page) BufferGetPage(buffer); + GistPageSetDeleted(page); + gv->result->std.pages_deleted++; + + if (!gv->index->rd_istemp) + { + XLogRecData rdata; + XLogRecPtr recptr; + gistxlogPageDelete xlrec; + + xlrec.node = gv->index->rd_node; + xlrec.blkno = blkno; + + rdata.buffer = InvalidBuffer; + rdata.data = (char *) &xlrec; + rdata.len = sizeof(gistxlogPageDelete); + rdata.next = NULL; + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE, &rdata); + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + } + else + PageSetLSN(page, XLogRecPtrForTemp); + + END_CRIT_SECTION(); + + UnlockReleaseBuffer(buffer); +} + +static Page +GistPageGetCopyPage( Page page ) { + Size pageSize = PageGetPageSize( page ); + Page tmppage; + + tmppage=(Page)palloc( pageSize ); + memcpy( tmppage, page, pageSize ); + + return tmppage; +} + +static ArrayTuple +vacuumSplitPage(GistVacuum *gv, Page tempPage, Buffer buffer, IndexTuple *addon, int curlenaddon) { + ArrayTuple res = {NULL, 0, false}; + IndexTuple *vec; + SplitedPageLayout *dist = NULL, + *ptr; + int i, veclen=0; + BlockNumber blkno = BufferGetBlockNumber(buffer); + MemoryContext oldCtx = 
MemoryContextSwitchTo(gv->opCtx); + + vec = gistextractpage(tempPage, &veclen); + vec = gistjoinvector(vec, &veclen, addon, curlenaddon); + dist = gistSplit(gv->index, tempPage, vec, veclen, &(gv->giststate)); + + MemoryContextSwitchTo(oldCtx); + + if (blkno != GIST_ROOT_BLKNO) { + /* if non-root split then we should not allocate new buffer */ + dist->buffer = buffer; + dist->page = tempPage; + /* during vacuum we never split leaf page */ + GistPageGetOpaque(dist->page)->flags = 0; + } else + pfree(tempPage); + + res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen); + res.ituplen = 0; + + /* make new pages and fills them */ + for (ptr = dist; ptr; ptr = ptr->next) { + char *data; + + if ( ptr->buffer == InvalidBuffer ) { + ptr->buffer = gistNewBuffer( gv->index ); + GISTInitBuffer( ptr->buffer, 0 ); + ptr->page = BufferGetPage(ptr->buffer); + } + ptr->block.blkno = BufferGetBlockNumber( ptr->buffer ); + + data = (char*)(ptr->list); + for(i=0;i<ptr->block.num;i++) { + if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber ) + elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index)); + data += IndexTupleSize((IndexTuple)data); + } + + ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno); + res.itup[ res.ituplen ] = (IndexTuple)palloc(IndexTupleSize(ptr->itup)); + memcpy( res.itup[ res.ituplen ], ptr->itup, IndexTupleSize(ptr->itup) ); + res.ituplen++; + } + + START_CRIT_SECTION(); + + for (ptr = dist; ptr; ptr = ptr->next) { + MarkBufferDirty(ptr->buffer); + GistPageGetOpaque(ptr->page)->rightlink = InvalidBlockNumber; + } + + /* restore splitted non-root page */ + if (blkno != GIST_ROOT_BLKNO) { + PageRestoreTempPage( dist->page, BufferGetPage( dist->buffer ) ); + dist->page = BufferGetPage( dist->buffer ); + } + + if (!gv->index->rd_istemp) + { + XLogRecPtr recptr; + XLogRecData *rdata; + ItemPointerData key; /* set key for incomplete + * 
insert */ + char *xlinfo; + + ItemPointerSet(&key, blkno, TUPLE_IS_VALID); + + rdata = formSplitRdata(gv->index->rd_node, blkno, + false, &key, dist); + xlinfo = rdata->data; + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); + for (ptr = dist; ptr; ptr = ptr->next) + { + PageSetLSN(BufferGetPage(ptr->buffer), recptr); + PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID); + } + + pfree(xlinfo); + pfree(rdata); + } + else + { + for (ptr = dist; ptr; ptr = ptr->next) + PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp); + } + + for (ptr = dist; ptr; ptr = ptr->next) + { + /* we must keep the buffer pin on the head page */ + if (BufferGetBlockNumber(ptr->buffer) != blkno) + UnlockReleaseBuffer( ptr->buffer ); + } + + if (blkno == GIST_ROOT_BLKNO) + { + ItemPointerData key; /* set key for incomplete + * insert */ + + ItemPointerSet(&key, blkno, TUPLE_IS_VALID); + + gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key); + } + + END_CRIT_SECTION(); + + MemoryContextReset(gv->opCtx); + + return res; +} static ArrayTuple gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) { ArrayTuple res = {NULL, 0, false}; Buffer buffer; - Page page; + Page page, tempPage = NULL; OffsetNumber i, maxoff; ItemId iid; int lenaddon = 4, curlenaddon = 0, - ntodelete = 0; + nOffToDelete = 0, + nBlkToDelete = 0; IndexTuple idxtuple, *addon = NULL; bool needwrite = false; - OffsetNumber todelete[MaxOffsetNumber]; + OffsetNumber offToDelete[MaxOffsetNumber]; + BlockNumber blkToDelete[MaxOffsetNumber]; ItemPointerData *completed = NULL; int ncompleted = 0, lencompleted = 16; @@ -76,12 +288,6 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) page = (Page) BufferGetPage(buffer); maxoff = PageGetMaxOffsetNumber(page); - /* - * XXX need to reduce scope of changes to page so we can make this - * critical section less extensive - */ - START_CRIT_SECTION(); - if (GistPageIsLeaf(page)) { if (GistTuplesDeleted(page)) @@ -92,13 +298,16 @@ 
gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) completed = (ItemPointerData *) palloc(sizeof(ItemPointerData) * lencompleted); addon = (IndexTuple *) palloc(sizeof(IndexTuple) * lenaddon); + /* get copy of page to work */ + tempPage = GistPageGetCopyPage(page); + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { ArrayTuple chldtuple; bool needchildunion; - iid = PageGetItemId(page, i); - idxtuple = (IndexTuple) PageGetItem(page, iid); + iid = PageGetItemId(tempPage, i); + idxtuple = (IndexTuple) PageGetItem(tempPage, iid); needchildunion = (GistTupleIsInvalid(idxtuple)) ? true : false; if (needchildunion) @@ -109,14 +318,19 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) needchildunion); if (chldtuple.ituplen || chldtuple.emptypage) { - PageIndexTupleDelete(page, i); - todelete[ntodelete++] = i; + /* update tuple or/and inserts new */ + if ( chldtuple.emptypage ) + blkToDelete[nBlkToDelete++] = ItemPointerGetBlockNumber(&(idxtuple->t_tid)); + offToDelete[nOffToDelete++] = i; + PageIndexTupleDelete(tempPage, i); i--; maxoff--; needwrite = needunion = true; if (chldtuple.ituplen) { + + Assert( chldtuple.emptypage == false ); while (curlenaddon + chldtuple.ituplen >= lenaddon) { lenaddon *= 2; @@ -150,200 +364,102 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) } } } + + Assert( maxoff == PageGetMaxOffsetNumber(tempPage) ); if (curlenaddon) { /* insert updated tuples */ - if (gistnospace(page, addon, curlenaddon, InvalidOffsetNumber)) - { + if (gistnospace(tempPage, addon, curlenaddon, InvalidOffsetNumber)) { /* there is no space on page to insert tuples */ - IndexTuple *vec; - SplitedPageLayout *dist = NULL, - *ptr; - int i, veclen=0; - MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx); - - vec = gistextractbuffer(buffer, &veclen); - vec = gistjoinvector(vec, &veclen, addon, curlenaddon); - dist = gistSplit(gv->index, page, vec, veclen, &(gv->giststate)); - - 
MemoryContextSwitchTo(oldCtx); - - if (blkno != GIST_ROOT_BLKNO) { - /* if non-root split then we should not allocate new buffer */ - dist->buffer = buffer; - dist->page = BufferGetPage(dist->buffer); - GistPageGetOpaque(dist->page)->flags = 0; - } - - res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen); - res.ituplen = 0; - - /* make new pages and fills them */ - for (ptr = dist; ptr; ptr = ptr->next) { - char *data; - - if ( ptr->buffer == InvalidBuffer ) { - ptr->buffer = gistNewBuffer( gv->index ); - GISTInitBuffer( ptr->buffer, 0 ); - ptr->page = BufferGetPage(ptr->buffer); - } - ptr->block.blkno = BufferGetBlockNumber( ptr->buffer ); - - data = (char*)(ptr->list); - for(i=0;i<ptr->block.num;i++) { - if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber ) - elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index)); - data += IndexTupleSize((IndexTuple)data); - } - - ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno); - res.itup[ res.ituplen ] = (IndexTuple)palloc(IndexTupleSize(ptr->itup)); - memcpy( res.itup[ res.ituplen ], ptr->itup, IndexTupleSize(ptr->itup) ); - res.ituplen++; - - MarkBufferDirty(ptr->buffer); - } - - if (!gv->index->rd_istemp) - { - XLogRecPtr recptr; - XLogRecData *rdata; - ItemPointerData key; /* set key for incomplete - * insert */ - char *xlinfo; - - ItemPointerSet(&key, blkno, TUPLE_IS_VALID); - - rdata = formSplitRdata(gv->index->rd_node, blkno, - false, &key, dist); - xlinfo = rdata->data; - - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); - for (ptr = dist; ptr; ptr = ptr->next) - { - PageSetLSN(BufferGetPage(ptr->buffer), recptr); - PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID); - } - - pfree(xlinfo); - pfree(rdata); - } - else - { - for (ptr = dist; ptr; ptr = ptr->next) - { - PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp); - } - } - - for (ptr = dist; ptr; ptr = 
ptr->next) - { - /* we must keep the buffer pin on the head page */ - if (BufferGetBlockNumber(ptr->buffer) != blkno) - UnlockReleaseBuffer( ptr->buffer ); - } - - if (blkno == GIST_ROOT_BLKNO) - { - ItemPointerData key; /* set key for incomplete - * insert */ - - ItemPointerSet(&key, blkno, TUPLE_IS_VALID); - - oldCtx = MemoryContextSwitchTo(gv->opCtx); - gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key); - MemoryContextSwitchTo(oldCtx); - } - - needwrite = false; - - MemoryContextReset(gv->opCtx); - - needunion = false; /* gistSplit already forms unions */ - } - else - { + res = vacuumSplitPage(gv, tempPage, buffer, addon, curlenaddon); + tempPage=NULL; /* vacuumSplitPage() free tempPage */ + needwrite = needunion = false; /* gistSplit already forms unions and writes pages */ + } else /* enough free space */ - gistfillbuffer(gv->index, page, addon, curlenaddon, InvalidOffsetNumber); - } + gistfillbuffer(gv->index, tempPage, addon, curlenaddon, InvalidOffsetNumber); } } - if (needunion) - { - /* forms union for page or check empty */ - if (PageIsEmpty(page)) + /* + * If page is empty, we should remove pointer to it before + * deleting page (except root) + */ + + if ( blkno != GIST_ROOT_BLKNO && ( PageIsEmpty(page) || (tempPage && PageIsEmpty(tempPage)) ) ) { + /* + * New version of page is empty, so leave it unchanged, + * upper call will mark our page as deleted. + * In case of page split we never will be here... 
+ * + * If page was empty it can't become non-empty during processing + */ + res.emptypage = true; + UnlockReleaseBuffer(buffer); + } else { + /* write page and remove its childs if it need */ + + START_CRIT_SECTION(); + + if ( tempPage && needwrite ) { + PageRestoreTempPage(tempPage, page); + tempPage = NULL; + } + + /* Empty index */ + if (PageIsEmpty(page) && blkno == GIST_ROOT_BLKNO ) { - if (blkno == GIST_ROOT_BLKNO) + needwrite = true; + GistPageSetLeaf(page); + } + + + if (needwrite) + { + MarkBufferDirty(buffer); + GistClearTuplesDeleted(page); + + if (!gv->index->rd_istemp) { - needwrite = true; - GistPageSetLeaf(page); + XLogRecData *rdata; + XLogRecPtr recptr; + char *xlinfo; + + rdata = formUpdateRdata(gv->index->rd_node, buffer, + offToDelete, nOffToDelete, + addon, curlenaddon, NULL); + xlinfo = rdata->next->data; + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + + pfree(xlinfo); + pfree(rdata); } else - { - needwrite = true; - res.emptypage = true; - GistPageSetDeleted(page); - gv->result->std.pages_deleted++; - } + PageSetLSN(page, XLogRecPtrForTemp); } - else + + END_CRIT_SECTION(); + + if ( needunion && !PageIsEmpty(page) ) { - IndexTuple *vec, - tmp; - int veclen = 0; - MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx); - - vec = gistextractbuffer(buffer, &veclen); - tmp = gistunion(gv->index, vec, veclen, &(gv->giststate)); - MemoryContextSwitchTo(oldCtx); - res.itup = (IndexTuple *) palloc(sizeof(IndexTuple)); res.ituplen = 1; - res.itup[0] = (IndexTuple) palloc(IndexTupleSize(tmp)); - memcpy(res.itup[0], tmp, IndexTupleSize(tmp)); - - ItemPointerSetBlockNumber(&(res.itup[0]->t_tid), blkno); - GistTupleSetValid(res.itup[0]); - - MemoryContextReset(gv->opCtx); + res.itup[0] = PageMakeUnionKey(gv, buffer); } + + UnlockReleaseBuffer(buffer); + + /* delete empty children, now we havn't any links to pointed subtrees */ + for(i=0;i<nBlkToDelete;i++) + gistDeleteSubtree(gv, blkToDelete[i]); + + if (ncompleted && !gv->index->rd_istemp) + 
gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted); } - if (needwrite) - { - MarkBufferDirty(buffer); - GistClearTuplesDeleted(page); - - if (!gv->index->rd_istemp) - { - XLogRecData *rdata; - XLogRecPtr recptr; - char *xlinfo; - - rdata = formUpdateRdata(gv->index->rd_node, buffer, - todelete, ntodelete, res.emptypage, - addon, curlenaddon, NULL); - xlinfo = rdata->data; - - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); - PageSetLSN(page, recptr); - PageSetTLI(page, ThisTimeLineID); - - pfree(xlinfo); - pfree(rdata); - } - else - PageSetLSN(page, XLogRecPtrForTemp); - } - - END_CRIT_SECTION(); - - UnlockReleaseBuffer(buffer); - - if (ncompleted && !gv->index->rd_istemp) - gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted); for (i = 0; i < curlenaddon; i++) pfree(addon[i]); @@ -351,6 +467,9 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) pfree(addon); if (completed) pfree(completed); + if (tempPage) + pfree(tempPage); + return res; } @@ -627,10 +746,10 @@ gistbulkdelete(PG_FUNCTION_ARGS) gistxlogPageUpdate *xlinfo; rdata = formUpdateRdata(rel->rd_node, buffer, - todelete, ntodelete, false, + todelete, ntodelete, NULL, 0, NULL); - xlinfo = (gistxlogPageUpdate *) rdata->data; + xlinfo = (gistxlogPageUpdate *) rdata->next->data; recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); PageSetLSN(page, recptr); diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index a029d8f1ec..01dab119b2 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.16 2006/05/10 09:19:54 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.17 2006/05/17 16:34:59 teodor Exp $ *------------------------------------------------------------------------- */ 
#include "postgres.h" @@ -209,41 +209,33 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) return; } - if (xlrec.data->isemptypage) + if (isnewroot) + GISTInitBuffer(buffer, 0); + else if (xlrec.data->ntodelete) { - while (!PageIsEmpty(page)) - PageIndexTupleDelete(page, FirstOffsetNumber); + int i; - if (xlrec.data->blkno == GIST_ROOT_BLKNO) - GistPageSetLeaf(page); - else - GistPageSetDeleted(page); + for (i = 0; i < xlrec.data->ntodelete; i++) + PageIndexTupleDelete(page, xlrec.todelete[i]); + if (GistPageIsLeaf(page)) + GistMarkTuplesDeleted(page); } - else - { - if (isnewroot) - GISTInitBuffer(buffer, 0); - else if (xlrec.data->ntodelete) - { - int i; - for (i = 0; i < xlrec.data->ntodelete; i++) - PageIndexTupleDelete(page, xlrec.todelete[i]); - if (GistPageIsLeaf(page)) - GistMarkTuplesDeleted(page); - } + /* add tuples */ + if (xlrec.len > 0) + gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber); - /* add tuples */ - if (xlrec.len > 0) - gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber); + /* + * special case: leafpage, nothing to insert, nothing to delete, then + * vacuum marks page + */ + if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0) + GistClearTuplesDeleted(page); - /* - * special case: leafpage, nothing to insert, nothing to delete, then - * vacuum marks page - */ - if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0) - GistClearTuplesDeleted(page); - } + if ( !GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO ) + /* all links on non-leaf root page was deleted by vacuum full, + so root page becomes a leaf */ + GistPageSetLeaf(page); GistPageGetOpaque(page)->rightlink = InvalidBlockNumber; PageSetLSN(page, lsn); @@ -252,6 +244,29 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) UnlockReleaseBuffer(buffer); } +static void 
+gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record) +{ + gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record); + Relation reln; + Buffer buffer; + Page page; + + reln = XLogOpenRelation(xldata->node); + buffer = XLogReadBuffer(reln, xldata->blkno, false); + if (!BufferIsValid(buffer)) + return; + + GISTInitBuffer( buffer, 0 ); + page = (Page) BufferGetPage(buffer); + GistPageSetDeleted(page); + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + MarkBufferDirty(buffer); + UnlockReleaseBuffer(buffer); +} + static void decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) { @@ -382,6 +397,9 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) case XLOG_GIST_PAGE_UPDATE: gistRedoPageUpdateRecord(lsn, record, false); break; + case XLOG_GIST_PAGE_DELETE: + gistRedoPageDeleteRecord(lsn, record); + break; case XLOG_GIST_NEW_ROOT: gistRedoPageUpdateRecord(lsn, record, true); break; @@ -405,8 +423,10 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) static void out_target(StringInfo buf, RelFileNode node, ItemPointerData key) { - appendStringInfo(buf, "rel %u/%u/%u; tid %u/%u", - node.spcNode, node.dbNode, node.relNode, + appendStringInfo(buf, "rel %u/%u/%u", + node.spcNode, node.dbNode, node.relNode); + if ( ItemPointerIsValid( &key ) ) + appendStringInfo(buf, "; tid %u/%u", ItemPointerGetBlockNumber(&key), ItemPointerGetOffsetNumber(&key)); } @@ -418,6 +438,14 @@ out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec) appendStringInfo(buf, "; block number %u", xlrec->blkno); } +static void +out_gistxlogPageDelete(StringInfo buf, gistxlogPageDelete *xlrec) +{ + appendStringInfo(buf, "page_delete: rel %u/%u/%u; blkno %u", + xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode, + xlrec->blkno); +} + static void out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec) { @@ -438,6 +466,9 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec) appendStringInfo(buf, "page_update: "); 
out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec); break; + case XLOG_GIST_PAGE_DELETE: + out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec); + break; case XLOG_GIST_NEW_ROOT: appendStringInfo(buf, "new_root: "); out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key); @@ -643,7 +674,7 @@ gistContinueInsert(gistIncompleteInsert *insert) * we split root, just copy tuples from old root to new * page */ - parentitup = gistextractbuffer(buffers[numbuffer - 1], + parentitup = gistextractpage(pages[numbuffer - 1], &pituplen); /* sanity check */ @@ -796,7 +827,7 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf, */ XLogRecData * formUpdateRdata(RelFileNode node, Buffer buffer, - OffsetNumber *todelete, int ntodelete, bool emptypage, + OffsetNumber *todelete, int ntodelete, IndexTuple *itup, int ituplen, ItemPointer key) { XLogRecData *rdata; @@ -804,35 +835,37 @@ formUpdateRdata(RelFileNode node, Buffer buffer, int cur, i; - /* ugly wart in API: emptypage causes us to ignore other inputs */ - if (emptypage) - ntodelete = ituplen = 0; - - rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (2 + ituplen)); + rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen)); xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate)); xlrec->node = node; xlrec->blkno = BufferGetBlockNumber(buffer); xlrec->ntodelete = ntodelete; - xlrec->isemptypage = emptypage; + if (key) xlrec->key = *key; else ItemPointerSetInvalid(&(xlrec->key)); - rdata[0].data = (char *) xlrec; - rdata[0].len = sizeof(gistxlogPageUpdate); - rdata[0].buffer = InvalidBuffer; + rdata[0].buffer = buffer; + rdata[0].buffer_std = true; + rdata[0].data = NULL; + rdata[0].len = 0; rdata[0].next = &(rdata[1]); - rdata[1].data = (char *) todelete; - rdata[1].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete); - rdata[1].buffer = buffer; - rdata[1].buffer_std = true; - rdata[1].next = NULL; + rdata[1].data = (char *) xlrec; + rdata[1].len = 
sizeof(gistxlogPageUpdate); + rdata[1].buffer = InvalidBuffer; + rdata[1].next = &(rdata[2]); + + rdata[2].data = (char *) todelete; + rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete); + rdata[2].buffer = buffer; + rdata[2].buffer_std = true; + rdata[2].next = NULL; /* new tuples */ - cur = 2; + cur = 3; for (i = 0; i < ituplen; i++) { rdata[cur - 1].next = &(rdata[cur]); diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h index 7e9469f000..a866277fe9 100644 --- a/src/include/access/gist_private.h +++ b/src/include/access/gist_private.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.13 2006/05/10 09:19:54 teodor Exp $ + * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.14 2006/05/17 16:34:59 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -85,20 +85,21 @@ extern const XLogRecPtr XLogRecPtrForTemp; #define XLOG_GIST_PAGE_SPLIT 0x30 #define XLOG_GIST_INSERT_COMPLETE 0x40 #define XLOG_GIST_CREATE_INDEX 0x50 +#define XLOG_GIST_PAGE_DELETE 0x60 typedef struct gistxlogPageUpdate { RelFileNode node; BlockNumber blkno; - uint16 ntodelete; - bool isemptypage; - /* * It used to identify completeness of insert. Sets to leaf itup */ ItemPointerData key; + /* number of deleted offsets */ + uint16 ntodelete; + /* * follow: 1. todelete OffsetNumbers 2. 
tuples to insert */ @@ -131,6 +132,11 @@ typedef struct gistxlogInsertComplete /* follows ItemPointerData key to clean */ } gistxlogInsertComplete; +typedef struct gistxlogPageDelete +{ + RelFileNode node; + BlockNumber blkno; +} gistxlogPageDelete; /* SplitedPageLayout - gistSplit function result */ typedef struct SplitedPageLayout @@ -249,7 +255,7 @@ extern void gist_xlog_cleanup(void); extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno); extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer, - OffsetNumber *todelete, int ntodelete, bool emptypage, + OffsetNumber *todelete, int ntodelete, IndexTuple *itup, int ituplen, ItemPointer key); extern XLogRecData *formSplitRdata(RelFileNode node, @@ -273,7 +279,7 @@ extern void gistcheckpage(Relation rel, Buffer buf); extern Buffer gistNewBuffer(Relation r); extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup, int len, OffsetNumber off); -extern IndexTuple *gistextractbuffer(Buffer buffer, int *len /* out */ ); +extern IndexTuple *gistextractpage(Page page, int *len /* out */ ); extern IndexTuple *gistjoinvector( IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen);