From 89395bfa6f2fafccec10be377fcf759030910654 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Thu, 30 Mar 2006 23:03:10 +0000 Subject: [PATCH] Improve gist XLOG code to follow the coding rules needed to prevent torn-page problems. This introduces some issues of its own, mainly that there are now some critical sections of unreasonably broad scope, but it's a step forward anyway. Further cleanup will require some code refactoring that I'd prefer to get Oleg and Teodor involved in. --- src/backend/access/gist/gist.c | 105 +++++----- src/backend/access/gist/gistvacuum.c | 53 +++-- src/backend/access/gist/gistxlog.c | 299 ++++++++++++--------------- src/include/access/gist_private.h | 34 ++- 4 files changed, 230 insertions(+), 261 deletions(-) diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index de880831bf..d997db37ef 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.129 2006/03/05 15:58:20 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.130 2006/03/30 23:03:09 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -90,6 +90,7 @@ gistbuild(PG_FUNCTION_ARGS) double reltuples; GISTBuildState buildstate; Buffer buffer; + Page page; /* * We expect to be called exactly once for any index relation. If that's @@ -104,33 +105,33 @@ gistbuild(PG_FUNCTION_ARGS) /* initialize the root page */ buffer = gistNewBuffer(index); + Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO); + page = BufferGetPage(buffer); + + START_CRIT_SECTION(); + GISTInitBuffer(buffer, F_LEAF); if (!index->rd_istemp) { XLogRecPtr recptr; XLogRecData rdata; - Page page; - rdata.buffer = InvalidBuffer; rdata.data = (char *) &(index->rd_node); rdata.len = sizeof(RelFileNode); + rdata.buffer = InvalidBuffer; rdata.next = NULL; - page = BufferGetPage(buffer); - - START_CRIT_SECTION(); - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata); PageSetLSN(page, recptr); PageSetTLI(page, ThisTimeLineID); - - END_CRIT_SECTION(); } else - PageSetLSN(BufferGetPage(buffer), XLogRecPtrForTemp); + PageSetLSN(page, XLogRecPtrForTemp); LockBuffer(buffer, GIST_UNLOCK); WriteBuffer(buffer); + END_CRIT_SECTION(); + /* build the index */ buildstate.numindexattrs = indexInfo->ii_NumIndexAttrs; buildstate.indtuples = 0; @@ -305,6 +306,15 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) bool is_splitted = false; bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false; + /* + * XXX this code really ought to work by locking, but not modifying, + * all the buffers it needs; then starting a critical section; then + * modifying the buffers in an already-determined way and writing an + * XLOG record to reflect that. Since it doesn't, we've got to put + * a critical section around the entire process, which is horrible + * from a robustness point of view. + */ + START_CRIT_SECTION(); if (!is_leaf) @@ -312,6 +322,11 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) * This node's key has been modified, either because a child split * occurred or because we needed to adjust our key for an insert in a * child node. Therefore, remove the old version of this node's key. + * + * Note: for WAL replay, in the non-split case we handle this by + * setting up a one-element todelete array; in the split case, it's + * handled implicitly because the tuple vector passed to gistSplit + * won't include this tuple. */ PageIndexTupleDelete(state->stack->page, state->stack->childoffnum); @@ -336,9 +351,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) XLogRecData *rdata; rdata = formSplitRdata(state->r->rd_node, state->stack->blkno, - &(state->key), dist); - - START_CRIT_SECTION(); + is_leaf, &(state->key), dist); recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); ptr = dist; @@ -348,8 +361,6 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID); ptr = ptr->next; } - - END_CRIT_SECTION(); } else { @@ -410,7 +421,6 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) else ourpage = dist; - /* now gets all needed data, and sets nsn's */ page = (Page) BufferGetPage(ourpage->buffer); opaque = GistPageGetOpaque(page); @@ -437,8 +447,11 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) WriteBuffer(ptr->buffer); ptr = ptr->next; } + + WriteNoReleaseBuffer(state->stack->buffer); } - WriteNoReleaseBuffer(state->stack->buffer); + + END_CRIT_SECTION(); } else { @@ -451,7 +464,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) if (!state->r->rd_istemp) { OffsetNumber noffs = 0, - offs[MAXALIGN(sizeof(OffsetNumber)) / sizeof(OffsetNumber)]; + offs[1]; XLogRecPtr recptr; XLogRecData *rdata; @@ -462,17 +475,14 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) noffs = 1; } - rdata = formUpdateRdata(state->r->rd_node, state->stack->blkno, - offs, noffs, false, state->itup, state->ituplen, + rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer, + offs, noffs, false, + state->itup, state->ituplen, &(state->key)); - START_CRIT_SECTION(); - - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata); + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); PageSetLSN(state->stack->page, recptr); PageSetTLI(state->stack->page, ThisTimeLineID); - - END_CRIT_SECTION(); } else PageSetLSN(state->stack->page, XLogRecPtrForTemp); @@ -481,6 +491,8 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) state->needInsertComplete = false; WriteNoReleaseBuffer(state->stack->buffer); + END_CRIT_SECTION(); + if (!is_leaf) /* small optimization: inform scan ablout * deleting... */ gistadjscans(state->r, GISTOP_DEL, state->stack->blkno, @@ -636,30 +648,14 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) } /* - * Should have the same interface as XLogReadBuffer - */ -static Buffer -gistReadAndLockBuffer(Relation r, BlockNumber blkno) -{ - Buffer buffer = ReadBuffer(r, blkno); - - LockBuffer(buffer, GIST_SHARE); - return buffer; -} - -/* - * Traverse the tree to find path from root page. + * Traverse the tree to find path from root page to specified "child" block. * * returns from the begining of closest parent; * - * Function is used in both regular and recovery mode, so must work with - * different read functions (gistReadAndLockBuffer and XLogReadBuffer) - * * To prevent deadlocks, this should lock only one page simultaneously. */ GISTInsertStack * -gistFindPath(Relation r, BlockNumber child, - Buffer (*myReadBuffer) (Relation, BlockNumber)) +gistFindPath(Relation r, BlockNumber child) { Page page; Buffer buffer; @@ -677,7 +673,8 @@ gistFindPath(Relation r, BlockNumber child, while (top && top->blkno != child) { - buffer = myReadBuffer(r, top->blkno); /* locks buffer */ + buffer = ReadBuffer(r, top->blkno); + LockBuffer(buffer, GIST_SHARE); gistcheckpage(r, buffer); page = (Page) BufferGetPage(buffer); @@ -833,7 +830,7 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child) } /* ok, find new path */ - ptr = parent = gistFindPath(r, child->blkno, gistReadAndLockBuffer); + ptr = parent = gistFindPath(r, child->blkno); Assert(ptr != NULL); /* read all buffers as expected by caller */ @@ -1192,27 +1189,31 @@ gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer ke Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO); page = BufferGetPage(buffer); - GISTInitBuffer(buffer, 0); + START_CRIT_SECTION(); + + GISTInitBuffer(buffer, 0); /* XXX not F_LEAF? */ gistfillbuffer(r, page, itup, len, FirstOffsetNumber); + if (!r->rd_istemp) { XLogRecPtr recptr; XLogRecData *rdata; - rdata = formUpdateRdata(r->rd_node, GIST_ROOT_BLKNO, - NULL, 0, false, itup, len, key); - - START_CRIT_SECTION(); + rdata = formUpdateRdata(r->rd_node, buffer, + NULL, 0, false, + itup, len, key); recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata); PageSetLSN(page, recptr); PageSetTLI(page, ThisTimeLineID); - - END_CRIT_SECTION(); } else PageSetLSN(page, XLogRecPtrForTemp); + + WriteNoReleaseBuffer(buffer); + + END_CRIT_SECTION(); } void diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c index 664ba47e40..e7925c2c15 100644 --- a/src/backend/access/gist/gistvacuum.c +++ b/src/backend/access/gist/gistvacuum.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.16 2006/03/05 15:58:20 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.17 2006/03/30 23:03:10 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -80,6 +80,12 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) page = (Page) BufferGetPage(buffer); maxoff = PageGetMaxOffsetNumber(page); + /* + * XXX need to reduce scope of changes to page so we can make this + * critical section less extensive + */ + START_CRIT_SECTION(); + if (GistPageIsLeaf(page)) { if (GistTuplesDeleted(page)) @@ -188,11 +194,9 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) ItemPointerSet(&key, blkno, TUPLE_IS_VALID); rdata = formSplitRdata(gv->index->rd_node, blkno, - &key, dist); + false, &key, dist); xlinfo = rdata->data; - START_CRIT_SECTION(); - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); ptr = dist; while (ptr) @@ -202,7 +206,6 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) ptr = ptr->next; } - END_CRIT_SECTION(); pfree(xlinfo); pfree(rdata); } @@ -235,8 +238,6 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) oldCtx = MemoryContextSwitchTo(gv->opCtx); gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key); MemoryContextSwitchTo(oldCtx); - - WriteNoReleaseBuffer(buffer); } needwrite = false; @@ -302,15 +303,14 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) XLogRecPtr recptr; char *xlinfo; - rdata = formUpdateRdata(gv->index->rd_node, blkno, todelete, ntodelete, - res.emptypage, addon, curlenaddon, NULL); + rdata = formUpdateRdata(gv->index->rd_node, buffer, + todelete, ntodelete, res.emptypage, + addon, curlenaddon, NULL); xlinfo = rdata->data; - START_CRIT_SECTION(); - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata); + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); PageSetLSN(page, recptr); PageSetTLI(page, ThisTimeLineID); - END_CRIT_SECTION(); pfree(xlinfo); pfree(rdata); @@ -322,6 +322,8 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) else ReleaseBuffer(buffer); + END_CRIT_SECTION(); + if (ncompleted && !gv->index->rd_istemp) gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted); @@ -579,6 +581,17 @@ gistbulkdelete(PG_FUNCTION_ARGS) */ pushStackIfSplited(page, stack); + /* + * Remove deletable tuples from page + * + * XXX try to make this critical section shorter. Could do it + * by separating the callback loop from the actual tuple deletion, + * but that would affect the definition of the todelete[] array + * passed into the WAL record (because the indexes would all be + * pre-deletion). + */ + START_CRIT_SECTION(); + maxoff = PageGetMaxOffsetNumber(page); for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) @@ -608,17 +621,17 @@ gistbulkdelete(PG_FUNCTION_ARGS) { XLogRecData *rdata; XLogRecPtr recptr; - gistxlogEntryUpdate *xlinfo; + gistxlogPageUpdate *xlinfo; - rdata = formUpdateRdata(rel->rd_node, stack->blkno, todelete, ntodelete, - false, NULL, 0, NULL); - xlinfo = (gistxlogEntryUpdate *) rdata->data; + rdata = formUpdateRdata(rel->rd_node, buffer, + todelete, ntodelete, false, + NULL, 0, + NULL); + xlinfo = (gistxlogPageUpdate *) rdata->data; - START_CRIT_SECTION(); - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata); + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); PageSetLSN(page, recptr); PageSetTLI(page, ThisTimeLineID); - END_CRIT_SECTION(); pfree(xlinfo); pfree(rdata); @@ -627,6 +640,8 @@ gistbulkdelete(PG_FUNCTION_ARGS) PageSetLSN(page, XLogRecPtrForTemp); WriteNoReleaseBuffer(buffer); } + + END_CRIT_SECTION(); } else { diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index 9a15061484..12a521c75c 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.12 2006/03/29 21:17:36 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.13 2006/03/30 23:03:10 tgl Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -25,11 +25,11 @@ typedef struct { - gistxlogEntryUpdate *data; + gistxlogPageUpdate *data; int len; IndexTuple *itup; OffsetNumber *todelete; -} EntryUpdateRecord; +} PageUpdateRecord; typedef struct { @@ -58,16 +58,15 @@ typedef struct gistIncompleteInsert } gistIncompleteInsert; -MemoryContext opCtx; -MemoryContext insertCtx; +static MemoryContext opCtx; /* working memory for operations */ +static MemoryContext insertCtx; /* holds incomplete_inserts list */ static List *incomplete_inserts; -#define ItemPointerEQ( a, b ) \ - ( \ - ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \ - ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \ - ) +#define ItemPointerEQ(a, b) \ + ( ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \ + ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) ) + static void pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, @@ -101,7 +100,13 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, } Assert(ninsert->lenblk > 0); - incomplete_inserts = lappend(incomplete_inserts, ninsert); + /* + * Stick the new incomplete insert onto the front of the list, not the + * back. This is so that gist_xlog_cleanup will process incompletions + * in last-in-first-out order. + */ + incomplete_inserts = lcons(ninsert, incomplete_inserts); + MemoryContextSwitchTo(oldCxt); } @@ -116,10 +121,9 @@ forgetIncompleteInsert(RelFileNode node, ItemPointerData key) if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key))) { - /* found */ - pfree(insert->blkno); incomplete_inserts = list_delete_ptr(incomplete_inserts, insert); + pfree(insert->blkno); pfree(insert); break; } @@ -127,25 +131,25 @@ forgetIncompleteInsert(RelFileNode node, ItemPointerData key) } static void -decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) +decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record) { char *begin = XLogRecGetData(record), *ptr; int i = 0, addpath = 0; - decoded->data = (gistxlogEntryUpdate *) begin; + decoded->data = (gistxlogPageUpdate *) begin; if (decoded->data->ntodelete) { - decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogEntryUpdate) + addpath); + decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath); addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete); } else decoded->todelete = NULL; decoded->len = 0; - ptr = begin + sizeof(gistxlogEntryUpdate) + addpath; + ptr = begin + sizeof(gistxlogPageUpdate) + addpath; while (ptr - begin < record->xl_len) { decoded->len++; @@ -154,7 +158,7 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len); - ptr = begin + sizeof(gistxlogEntryUpdate) + addpath; + ptr = begin + sizeof(gistxlogPageUpdate) + addpath; while (ptr - begin < record->xl_len) { decoded->itup[i] = (IndexTuple) ptr; @@ -167,38 +171,30 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) * redo any page update (except page split) */ static void -gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) +gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) { - EntryUpdateRecord xlrec; + PageUpdateRecord xlrec; Relation reln; Buffer buffer; Page page; - decodeEntryUpdateRecord(&xlrec, record); + /* nothing to do if whole page was backed up (and no info to do it with) */ + if (record->xl_info & XLR_BKP_BLOCK_1) + return; + + decodePageUpdateRecord(&xlrec, record); reln = XLogOpenRelation(xlrec.data->node); buffer = XLogReadBuffer(reln, xlrec.data->blkno, false); if (!BufferIsValid(buffer)) - elog(PANIC, "block %u unfound", xlrec.data->blkno); + return; page = (Page) BufferGetPage(buffer); - if (isnewroot) + if (XLByteLE(lsn, PageGetLSN(page))) { - if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page))) - { - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); - return; - } - } - else - { - if (XLByteLE(lsn, PageGetLSN(page))) - { - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); - return; - } + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + return; } if (xlrec.data->isemptypage) @@ -237,9 +233,9 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) GistClearTuplesDeleted(page); } + GistPageGetOpaque(page)->rightlink = InvalidBlockNumber; PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); - GistPageGetOpaque(page)->rightlink = InvalidBlockNumber; LockBuffer(buffer, BUFFER_LOCK_UNLOCK); WriteBuffer(buffer); @@ -294,38 +290,21 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; int i; - int flags = 0; + int flags; decodePageSplitRecord(&xlrec, record); reln = XLogOpenRelation(xlrec.data->node); - - /* first of all wee need get F_LEAF flag from original page */ - buffer = XLogReadBuffer(reln, xlrec.data->origblkno, false); - if (!BufferIsValid(buffer)) - elog(PANIC, "block %u unfound", xlrec.data->origblkno); - page = (Page) BufferGetPage(buffer); - flags = (GistPageIsLeaf(page)) ? F_LEAF : 0; - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); + flags = xlrec.data->origleaf ? F_LEAF : 0; /* loop around all pages */ for (i = 0; i < xlrec.data->npage; i++) { NewPage *newpage = xlrec.page + i; - bool isorigpage = (xlrec.data->origblkno == newpage->header->blkno) ? true : false; - buffer = XLogReadBuffer(reln, newpage->header->blkno, !isorigpage); - if (!BufferIsValid(buffer)) - elog(PANIC, "block %u unfound", newpage->header->blkno); + buffer = XLogReadBuffer(reln, newpage->header->blkno, true); + Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); - if (XLByteLE(lsn, PageGetLSN(page))) - { - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); - continue; - } - /* ok, clear buffer */ GISTInitBuffer(buffer, flags); @@ -399,12 +378,11 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) oldCxt = MemoryContextSwitchTo(opCtx); switch (info) { - case XLOG_GIST_ENTRY_UPDATE: - case XLOG_GIST_ENTRY_DELETE: - gistRedoEntryUpdateRecord(lsn, record, false); + case XLOG_GIST_PAGE_UPDATE: + gistRedoPageUpdateRecord(lsn, record, false); break; case XLOG_GIST_NEW_ROOT: - gistRedoEntryUpdateRecord(lsn, record, true); + gistRedoPageUpdateRecord(lsn, record, true); break; case XLOG_GIST_PAGE_SPLIT: gistRedoPageSplitRecord(lsn, record); @@ -433,7 +411,7 @@ out_target(StringInfo buf, RelFileNode node, ItemPointerData key) } static void -out_gistxlogEntryUpdate(StringInfo buf, gistxlogEntryUpdate *xlrec) +out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec) { out_target(buf, xlrec->node, xlrec->key); appendStringInfo(buf, "; block number %u", xlrec->blkno); @@ -455,17 +433,13 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec) switch (info) { - case XLOG_GIST_ENTRY_UPDATE: - appendStringInfo(buf, "entry_update: "); - out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate *) rec); - break; - case XLOG_GIST_ENTRY_DELETE: - appendStringInfo(buf, "entry_delete: "); - out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate *) rec); + case XLOG_GIST_PAGE_UPDATE: + appendStringInfo(buf, "page_update: "); + out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec); break; case XLOG_GIST_NEW_ROOT: appendStringInfo(buf, "new_root: "); - out_target(buf, ((gistxlogEntryUpdate *) rec)->node, ((gistxlogEntryUpdate *) rec)->key); + out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key); break; case XLOG_GIST_PAGE_SPLIT: out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec); @@ -506,60 +480,47 @@ gist_form_invalid_tuple(BlockNumber blkno) return tuple; } -static Buffer -gistXLogReadAndLockBuffer(Relation r, BlockNumber blkno) -{ - Buffer buffer = XLogReadBuffer(r, blkno, false); - - if (!BufferIsValid(buffer)) - elog(PANIC, "block %u unfound", blkno); - - return buffer; -} - static void -gixtxlogFindPath(Relation index, gistIncompleteInsert *insert) +gistxlogFindPath(Relation index, gistIncompleteInsert *insert) { GISTInsertStack *top; insert->pathlen = 0; insert->path = NULL; - if ((top = gistFindPath(index, insert->origblkno, gistXLogReadAndLockBuffer)) != NULL) + if ((top = gistFindPath(index, insert->origblkno)) != NULL) { int i; - GISTInsertStack *ptr = top; + GISTInsertStack *ptr; - while (ptr) - { + for (ptr = top; ptr; ptr = ptr->parent) insert->pathlen++; - ptr = ptr->parent; - } insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen); i = 0; - ptr = top; - while (ptr) - { - insert->path[i] = ptr->blkno; - i++; - ptr = ptr->parent; - } + for (ptr = top; ptr; ptr = ptr->parent) + insert->path[i++] = ptr->blkno; } else elog(LOG, "lost parent for block %u", insert->origblkno); } /* - * Continue insert after crash. In normal situation, there isn't any incomplete - * inserts, but if it might be after crash, WAL may has not a record of completetion. + * Continue insert after crash. In normal situations, there aren't any + * incomplete inserts, but if a crash occurs partway through an insertion + * sequence, we'll need to finish making the index valid at the end of WAL + * replay. + * + * Note that we assume the index is now in a valid state, except for the + * unfinished insertion. In particular it's safe to invoke gistFindPath(); + * there shouldn't be any garbage pages for it to run into. * * Although stored LSN in gistIncompleteInsert is a LSN of child page, * we can compare it with LSN of parent, because parent is always locked * while we change child page (look at gistmakedeal). So if parent's LSN is - * lesser than stored lsn then changes in parent doesn't do yet. + * less than stored lsn then changes in parent aren't done yet. */ static void gistContinueInsert(gistIncompleteInsert *insert) @@ -602,6 +563,12 @@ gistContinueInsert(gistIncompleteInsert *insert) LockBuffer(buffer, BUFFER_LOCK_UNLOCK); WriteBuffer(buffer); + + /* + * XXX fall out to avoid making LOG message at bottom of routine. + * I think the logic for when to emit that message is all wrong... + */ + return; } else { @@ -610,7 +577,7 @@ gistContinueInsert(gistIncompleteInsert *insert) int numbuffer; /* construct path */ - gixtxlogFindPath(index, insert); + gistxlogFindPath(index, insert); Assert(insert->pathlen > 0); @@ -625,9 +592,8 @@ gistContinueInsert(gistIncompleteInsert *insert) childfound = 0; numbuffer = 1; - buffers[numbuffer - 1] = XLogReadBuffer(index, insert->path[i], false); - if (!BufferIsValid(buffers[numbuffer - 1])) - elog(PANIC, "block %u unfound", insert->path[i]); + buffers[numbuffer - 1] = ReadBuffer(index, insert->path[i]); + LockBuffer(buffers[numbuffer - 1], GIST_EXCLUSIVE); pages[numbuffer - 1] = BufferGetPage(buffers[numbuffer - 1]); if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer - 1]))) @@ -661,10 +627,9 @@ gistContinueInsert(gistIncompleteInsert *insert) if (gistnospace(pages[numbuffer - 1], itup, lenitup)) { - /* no space left on page, so we should split */ - buffers[numbuffer] = XLogReadBuffer(index, P_NEW, true); - if (!BufferIsValid(buffers[numbuffer])) - elog(PANIC, "could not obtain new block"); + /* no space left on page, so we must split */ + buffers[numbuffer] = ReadBuffer(index, P_NEW); + LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE); GISTInitBuffer(buffers[numbuffer], 0); pages[numbuffer] = BufferGetPage(buffers[numbuffer]); gistfillbuffer(index, pages[numbuffer], itup, lenitup, FirstOffsetNumber); @@ -678,7 +643,8 @@ gistContinueInsert(gistIncompleteInsert *insert) * we split root, just copy tuples from old root to new * page */ - parentitup = gistextractbuffer(buffers[numbuffer - 1], &pituplen); + parentitup = gistextractbuffer(buffers[numbuffer - 1], + &pituplen); /* sanity check */ if (i + 1 != insert->pathlen) @@ -686,9 +652,8 @@ gistContinueInsert(gistIncompleteInsert *insert) RelationGetRelationName(index)); /* fill new page */ - buffers[numbuffer] = XLogReadBuffer(index, P_NEW, true); - if (!BufferIsValid(buffers[numbuffer])) - elog(PANIC, "could not obtain new block"); + buffers[numbuffer] = ReadBuffer(index, P_NEW); + LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE); GISTInitBuffer(buffers[numbuffer], 0); pages[numbuffer] = BufferGetPage(buffers[numbuffer]); gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber); @@ -748,16 +713,10 @@ void gist_xlog_cleanup(void) { ListCell *l; - List *reverse = NIL; - MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx); - - /* we should call gistContinueInsert in reverse order */ + MemoryContext oldCxt; + oldCxt = MemoryContextSwitchTo(opCtx); foreach(l, incomplete_inserts) - reverse = lappend(reverse, lfirst(l)); - - MemoryContextSwitchTo(opCtx); - foreach(l, reverse) { gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l); @@ -772,10 +731,9 @@ gist_xlog_cleanup(void) XLogRecData * -formSplitRdata(RelFileNode node, BlockNumber blkno, +formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf, ItemPointer key, SplitedPageLayout *dist) { - XLogRecData *rdata; gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit)); SplitedPageLayout *ptr; @@ -793,6 +751,7 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, xlrec->node = node; xlrec->origblkno = blkno; + xlrec->origleaf = page_is_leaf; xlrec->npage = (uint16) npage; if (key) xlrec->key = *key; @@ -825,68 +784,64 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, return rdata; } - +/* + * Construct the rdata array for an XLOG record describing a page update + * (deletion and/or insertion of tuples on a single index page). + * + * Note that both the todelete array and the tuples are marked as belonging + * to the target buffer; they need not be stored in XLOG if XLogInsert decides + * to log the whole buffer contents instead. Also, we take care that there's + * at least one rdata item referencing the buffer, even when ntodelete and + * ituplen are both zero; this ensures that XLogInsert knows about the buffer. + */ XLogRecData * -formUpdateRdata(RelFileNode node, BlockNumber blkno, +formUpdateRdata(RelFileNode node, Buffer buffer, OffsetNumber *todelete, int ntodelete, bool emptypage, IndexTuple *itup, int ituplen, ItemPointer key) { XLogRecData *rdata; - gistxlogEntryUpdate *xlrec = (gistxlogEntryUpdate *) palloc(sizeof(gistxlogEntryUpdate)); + gistxlogPageUpdate *xlrec; + int cur, + i; + + /* ugly wart in API: emptypage causes us to ignore other inputs */ + if (emptypage) + ntodelete = ituplen = 0; + + rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (2 + ituplen)); + xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate)); xlrec->node = node; - xlrec->blkno = blkno; + xlrec->blkno = BufferGetBlockNumber(buffer); + xlrec->ntodelete = ntodelete; + xlrec->isemptypage = emptypage; if (key) xlrec->key = *key; else ItemPointerSetInvalid(&(xlrec->key)); - if (emptypage) + rdata[0].data = (char *) xlrec; + rdata[0].len = sizeof(gistxlogPageUpdate); + rdata[0].buffer = InvalidBuffer; + rdata[0].next = &(rdata[1]); + + rdata[1].data = (char *) todelete; + rdata[1].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete); + rdata[1].buffer = buffer; + rdata[1].buffer_std = true; + rdata[1].next = NULL; + + /* new tuples */ + cur = 2; + for (i = 0; i < ituplen; i++) { - xlrec->isemptypage = true; - xlrec->ntodelete = 0; - - rdata = (XLogRecData *) palloc(sizeof(XLogRecData)); - rdata->buffer = InvalidBuffer; - rdata->data = (char *) xlrec; - rdata->len = sizeof(gistxlogEntryUpdate); - rdata->next = NULL; - } - else - { - int cur = 1, - i; - - xlrec->isemptypage = false; - xlrec->ntodelete = ntodelete; - - rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (2 + ituplen)); - - rdata->buffer = InvalidBuffer; - rdata->data = (char *) xlrec; - rdata->len = sizeof(gistxlogEntryUpdate); - rdata->next = NULL; - - if (ntodelete) - { - rdata[cur - 1].next = &(rdata[cur]); - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char *) todelete; - rdata[cur].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete); - rdata[cur].next = NULL; - cur++; - } - - /* new tuples */ - for (i = 0; i < ituplen; i++) - { - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char *) (itup[i]); - rdata[cur].len = IndexTupleSize(itup[i]); - rdata[cur].next = NULL; - rdata[cur - 1].next = &(rdata[cur]); - cur++; - } + rdata[cur - 1].next = &(rdata[cur]); + rdata[cur].data = (char *) (itup[i]); + rdata[cur].len = IndexTupleSize(itup[i]); + rdata[cur].buffer = buffer; + rdata[cur].buffer_std = true; + rdata[cur].next = NULL; + cur++; } return rdata; diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h index 3b072da637..1bfc90abbc 100644 --- a/src/include/access/gist_private.h +++ b/src/include/access/gist_private.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.11 2006/03/24 04:32:13 tgl Exp $ + * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.12 2006/03/30 23:03:10 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -80,11 +80,13 @@ typedef GISTScanOpaqueData *GISTScanOpaque; /* XLog stuff */ extern const XLogRecPtr XLogRecPtrForTemp; -#define XLOG_GIST_ENTRY_UPDATE 0x00 -#define XLOG_GIST_ENTRY_DELETE 0x10 -#define XLOG_GIST_NEW_ROOT 0x20 +#define XLOG_GIST_PAGE_UPDATE 0x00 +#define XLOG_GIST_NEW_ROOT 0x20 +#define XLOG_GIST_PAGE_SPLIT 0x30 +#define XLOG_GIST_INSERT_COMPLETE 0x40 +#define XLOG_GIST_CREATE_INDEX 0x50 -typedef struct gistxlogEntryUpdate +typedef struct gistxlogPageUpdate { RelFileNode node; BlockNumber blkno; @@ -100,17 +102,16 @@ typedef struct gistxlogEntryUpdate /* * follow: 1. todelete OffsetNumbers 2. tuples to insert */ -} gistxlogEntryUpdate; - -#define XLOG_GIST_PAGE_SPLIT 0x30 +} gistxlogPageUpdate; typedef struct gistxlogPageSplit { RelFileNode node; BlockNumber origblkno; /* splitted page */ + bool origleaf; /* was splitted page a leaf page? */ uint16 npage; - /* see comments on gistxlogEntryUpdate */ + /* see comments on gistxlogPageUpdate */ ItemPointerData key; /* @@ -118,22 +119,19 @@ typedef struct gistxlogPageSplit */ } gistxlogPageSplit; -#define XLOG_GIST_INSERT_COMPLETE 0x40 - typedef struct gistxlogPage { BlockNumber blkno; - int num; + int num; /* number of index tuples following */ } gistxlogPage; -#define XLOG_GIST_CREATE_INDEX 0x50 - typedef struct gistxlogInsertComplete { RelFileNode node; /* follows ItemPointerData key to clean */ } gistxlogInsertComplete; + /* SplitedPageLayout - gistSplit function result */ typedef struct SplitedPageLayout { @@ -239,8 +237,7 @@ extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, It extern IndexTuple *gistSplit(Relation r, Buffer buffer, IndexTuple *itup, int *len, SplitedPageLayout **dist, GISTSTATE *giststate); -extern GISTInsertStack *gistFindPath(Relation r, BlockNumber child, - Buffer (*myReadBuffer) (Relation, BlockNumber)); +extern GISTInsertStack *gistFindPath(Relation r, BlockNumber child); /* gistxlog.c */ extern void gist_redo(XLogRecPtr lsn, XLogRecord *record); @@ -249,11 +246,12 @@ extern void gist_xlog_startup(void); extern void gist_xlog_cleanup(void); extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno); -extern XLogRecData *formUpdateRdata(RelFileNode node, BlockNumber blkno, +extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer, OffsetNumber *todelete, int ntodelete, bool emptypage, IndexTuple *itup, int ituplen, ItemPointer key); -extern XLogRecData *formSplitRdata(RelFileNode node, BlockNumber blkno, +extern XLogRecData *formSplitRdata(RelFileNode node, + BlockNumber blkno, bool page_is_leaf, ItemPointer key, SplitedPageLayout *dist); extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len);