diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index 911c9a02a9..9a15061484 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.11 2006/03/24 04:32:12 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.12 2006/03/29 21:17:36 tgl Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -177,9 +177,7 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) decodeEntryUpdateRecord(&xlrec, record); reln = XLogOpenRelation(xlrec.data->node); - if (!RelationIsValid(reln)) - return; - buffer = XLogReadBuffer(false, reln, xlrec.data->blkno); + buffer = XLogReadBuffer(reln, xlrec.data->blkno, false); if (!BufferIsValid(buffer)) elog(PANIC, "block %u unfound", xlrec.data->blkno); page = (Page) BufferGetPage(buffer); @@ -195,8 +193,6 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) } else { - if (PageIsNew((PageHeader) page)) - elog(PANIC, "uninitialized page %u", xlrec.data->blkno); if (XLByteLE(lsn, PageGetLSN(page))) { LockBuffer(buffer, BUFFER_LOCK_UNLOCK); @@ -302,17 +298,12 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) decodePageSplitRecord(&xlrec, record); reln = XLogOpenRelation(xlrec.data->node); - if (!RelationIsValid(reln)) - return; /* first of all wee need get F_LEAF flag from original page */ - buffer = XLogReadBuffer(false, reln, xlrec.data->origblkno); + buffer = XLogReadBuffer(reln, xlrec.data->origblkno, false); if (!BufferIsValid(buffer)) elog(PANIC, "block %u unfound", xlrec.data->origblkno); page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - elog(PANIC, "uninitialized page %u", xlrec.data->origblkno); - flags = (GistPageIsLeaf(page)) ? F_LEAF : 0; LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(buffer); @@ -323,7 +314,7 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) NewPage *newpage = xlrec.page + i; bool isorigpage = (xlrec.data->origblkno == newpage->header->blkno) ? true : false; - buffer = XLogReadBuffer(!isorigpage, reln, newpage->header->blkno); + buffer = XLogReadBuffer(reln, newpage->header->blkno, !isorigpage); if (!BufferIsValid(buffer)) elog(PANIC, "block %u unfound", newpage->header->blkno); page = (Page) BufferGetPage(buffer); @@ -367,24 +358,15 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) Page page; reln = XLogOpenRelation(*node); - if (!RelationIsValid(reln)) - return; - buffer = XLogReadBuffer(true, reln, GIST_ROOT_BLKNO); - if (!BufferIsValid(buffer)) - elog(PANIC, "root block unfound"); + buffer = XLogReadBuffer(reln, GIST_ROOT_BLKNO, true); + Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); - if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page))) - { - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); - return; - } - GISTInitBuffer(buffer, F_LEAF); PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); WriteBuffer(buffer); } @@ -527,12 +509,10 @@ gist_form_invalid_tuple(BlockNumber blkno) static Buffer gistXLogReadAndLockBuffer(Relation r, BlockNumber blkno) { - Buffer buffer = XLogReadBuffer(false, r, blkno); + Buffer buffer = XLogReadBuffer(r, blkno, false); if (!BufferIsValid(buffer)) elog(PANIC, "block %u unfound", blkno); - if (PageIsNew((PageHeader) (BufferGetPage(buffer)))) - elog(PANIC, "uninitialized page %u", blkno); return buffer; } @@ -590,8 +570,6 @@ gistContinueInsert(gistIncompleteInsert *insert) Relation index; index = XLogOpenRelation(insert->node); - if (!RelationIsValid(index)) - return; /* * needed vector itup never will be more than initial lenblkno+2, because @@ -606,29 +584,22 @@ gistContinueInsert(gistIncompleteInsert *insert) if (insert->origblkno == GIST_ROOT_BLKNO) { /* - * it was split root, so we should only make new root. it can't be + * it was split root, so we should only make new root. it can't be * simple insert into root, look at call pushIncompleteInsert in * gistRedoPageSplitRecord */ - Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO); + Buffer buffer = XLogReadBuffer(index, GIST_ROOT_BLKNO, true); Page page; - if (!BufferIsValid(buffer)) - elog(PANIC, "root block unfound"); - + Assert(BufferIsValid(buffer)); page = BufferGetPage(buffer); - if (XLByteLE(insert->lsn, PageGetLSN(page))) - { - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); - return; - } GISTInitBuffer(buffer, 0); - page = BufferGetPage(buffer); gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber); + PageSetLSN(page, insert->lsn); PageSetTLI(page, ThisTimeLineID); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); WriteBuffer(buffer); } @@ -654,12 +625,10 @@ gistContinueInsert(gistIncompleteInsert *insert) childfound = 0; numbuffer = 1; - buffers[numbuffer - 1] = XLogReadBuffer(false, index, insert->path[i]); + buffers[numbuffer - 1] = XLogReadBuffer(index, insert->path[i], false); if (!BufferIsValid(buffers[numbuffer - 1])) elog(PANIC, "block %u unfound", insert->path[i]); pages[numbuffer - 1] = BufferGetPage(buffers[numbuffer - 1]); - if (PageIsNew((PageHeader) (pages[numbuffer - 1]))) - elog(PANIC, "uninitialized page %u", insert->path[i]); if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer - 1]))) { @@ -693,7 +662,7 @@ gistContinueInsert(gistIncompleteInsert *insert) if (gistnospace(pages[numbuffer - 1], itup, lenitup)) { /* no space left on page, so we should split */ - buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW); + buffers[numbuffer] = XLogReadBuffer(index, P_NEW, true); if (!BufferIsValid(buffers[numbuffer])) elog(PANIC, "could not obtain new block"); GISTInitBuffer(buffers[numbuffer], 0); @@ -717,7 +686,7 @@ gistContinueInsert(gistIncompleteInsert *insert) RelationGetRelationName(index)); /* fill new page */ - buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW); + buffers[numbuffer] = XLogReadBuffer(index, P_NEW, true); if (!BufferIsValid(buffers[numbuffer])) elog(PANIC, "could not obtain new block"); GISTInitBuffer(buffers[numbuffer], 0); diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 6327ce7117..dcaafa96c4 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.209 2006/03/24 04:32:12 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.210 2006/03/29 21:17:36 tgl Exp $ * * * INTERFACE ROUTINES @@ -2888,16 +2888,10 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record) return; reln = XLogOpenRelation(xlrec->node); - if (!RelationIsValid(reln)) - return; - - buffer = XLogReadBuffer(false, reln, xlrec->block); + buffer = XLogReadBuffer(reln, xlrec->block, false); if (!BufferIsValid(buffer)) - elog(PANIC, "heap_clean_redo: no block"); - + return; page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - elog(PANIC, "heap_clean_redo: uninitialized page"); if (XLByteLE(lsn, PageGetLSN(page))) { @@ -2943,16 +2937,9 @@ heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record) * Note: the NEWPAGE log record is used for both heaps and indexes, so do * not do anything that assumes we are touching a heap. */ - - if (record->xl_info & XLR_BKP_BLOCK_1) - return; - reln = XLogOpenRelation(xlrec->node); - if (!RelationIsValid(reln)) - return; - buffer = XLogReadBuffer(true, reln, xlrec->blkno); - if (!BufferIsValid(buffer)) - elog(PANIC, "heap_newpage_redo: no block"); + buffer = XLogReadBuffer(reln, xlrec->blkno, true); + Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); Assert(record->xl_len == SizeOfHeapNewpage + BLCKSZ); @@ -2979,18 +2966,12 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) return; reln = XLogOpenRelation(xlrec->target.node); - - if (!RelationIsValid(reln)) - return; - - buffer = XLogReadBuffer(false, reln, - ItemPointerGetBlockNumber(&(xlrec->target.tid))); + buffer = XLogReadBuffer(reln, + ItemPointerGetBlockNumber(&(xlrec->target.tid)), + false); if (!BufferIsValid(buffer)) - elog(PANIC, "heap_delete_redo: no block"); - + return; page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - elog(PANIC, "heap_delete_redo: uninitialized page"); if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ { @@ -3045,27 +3026,31 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) reln = XLogOpenRelation(xlrec->target.node); - if (!RelationIsValid(reln)) - return; - - buffer = XLogReadBuffer(true, reln, - ItemPointerGetBlockNumber(&(xlrec->target.tid))); - if (!BufferIsValid(buffer)) - return; - - page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page) && - !(record->xl_info & XLOG_HEAP_INIT_PAGE)) - elog(PANIC, "heap_insert_redo: uninitialized page"); - if (record->xl_info & XLOG_HEAP_INIT_PAGE) - PageInit(page, BufferGetPageSize(buffer), 0); - - if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ { - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); - return; + buffer = XLogReadBuffer(reln, + ItemPointerGetBlockNumber(&(xlrec->target.tid)), + true); + Assert(BufferIsValid(buffer)); + page = (Page) BufferGetPage(buffer); + + PageInit(page, BufferGetPageSize(buffer), 0); + } + else + { + buffer = XLogReadBuffer(reln, + ItemPointerGetBlockNumber(&(xlrec->target.tid)), + false); + if (!BufferIsValid(buffer)) + return; + page = (Page) BufferGetPage(buffer); + + if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ + { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + return; + } } offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); @@ -3110,9 +3095,8 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move) xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record); Relation reln = XLogOpenRelation(xlrec->target.node); Buffer buffer; - bool samepage = - (ItemPointerGetBlockNumber(&(xlrec->newtid)) == - ItemPointerGetBlockNumber(&(xlrec->target.tid))); + bool samepage = (ItemPointerGetBlockNumber(&(xlrec->newtid)) == + ItemPointerGetBlockNumber(&(xlrec->target.tid))); Page page; OffsetNumber offnum; ItemId lp = NULL; @@ -3126,22 +3110,21 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move) int hsize; uint32 newlen; - if (!RelationIsValid(reln)) - return; - if (record->xl_info & XLR_BKP_BLOCK_1) + { + if (samepage) + return; /* backup block covered both changes */ goto newt; + } /* Deal with old tuple version */ - buffer = XLogReadBuffer(false, reln, - ItemPointerGetBlockNumber(&(xlrec->target.tid))); + buffer = XLogReadBuffer(reln, + ItemPointerGetBlockNumber(&(xlrec->target.tid)), + false); if (!BufferIsValid(buffer)) - elog(PANIC, "heap_update_redo: no block"); - + goto newt; page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - elog(PANIC, "heap_update_redo: uninitialized old page"); if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ { @@ -3183,6 +3166,10 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move) /* Set forward chain link in t_ctid */ htup->t_ctid = xlrec->newtid; } + /* + * this test is ugly, but necessary to avoid thinking that insert change + * is already applied + */ if (samepage) goto newsame; PageSetLSN(page, lsn); @@ -3194,31 +3181,37 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move) newt:; - if ((record->xl_info & XLR_BKP_BLOCK_2) || - ((record->xl_info & XLR_BKP_BLOCK_1) && samepage)) + if (record->xl_info & XLR_BKP_BLOCK_2) return; - buffer = XLogReadBuffer(true, reln, - ItemPointerGetBlockNumber(&(xlrec->newtid))); - if (!BufferIsValid(buffer)) - return; - - page = (Page) BufferGetPage(buffer); - -newsame:; - if (PageIsNew((PageHeader) page) && - !(record->xl_info & XLOG_HEAP_INIT_PAGE)) - elog(PANIC, "heap_update_redo: uninitialized page"); - if (record->xl_info & XLOG_HEAP_INIT_PAGE) - PageInit(page, BufferGetPageSize(buffer), 0); - - if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ { - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); - return; + buffer = XLogReadBuffer(reln, + ItemPointerGetBlockNumber(&(xlrec->newtid)), + true); + Assert(BufferIsValid(buffer)); + page = (Page) BufferGetPage(buffer); + + PageInit(page, BufferGetPageSize(buffer), 0); } + else + { + buffer = XLogReadBuffer(reln, + ItemPointerGetBlockNumber(&(xlrec->newtid)), + false); + if (!BufferIsValid(buffer)) + return; + page = (Page) BufferGetPage(buffer); + + if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ + { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + return; + } + } + +newsame:; offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid)); if (PageGetMaxOffsetNumber(page) + 1 < offnum) @@ -3288,18 +3281,12 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record) return; reln = XLogOpenRelation(xlrec->target.node); - - if (!RelationIsValid(reln)) - return; - - buffer = XLogReadBuffer(false, reln, - ItemPointerGetBlockNumber(&(xlrec->target.tid))); + buffer = XLogReadBuffer(reln, + ItemPointerGetBlockNumber(&(xlrec->target.tid)), + false); if (!BufferIsValid(buffer)) - elog(PANIC, "heap_lock_redo: no block"); - + return; page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - elog(PANIC, "heap_lock_redo: uninitialized page"); if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ { @@ -3381,7 +3368,10 @@ heap_desc(StringInfo buf, uint8 xl_info, char *rec) { xl_heap_insert *xlrec = (xl_heap_insert *) rec; - appendStringInfo(buf, "insert: "); + if (xl_info & XLOG_HEAP_INIT_PAGE) + appendStringInfo(buf, "insert(init): "); + else + appendStringInfo(buf, "insert: "); out_target(buf, &(xlrec->target)); } else if (info == XLOG_HEAP_DELETE) @@ -3391,12 +3381,25 @@ heap_desc(StringInfo buf, uint8 xl_info, char *rec) appendStringInfo(buf, "delete: "); out_target(buf, &(xlrec->target)); } - else if (info == XLOG_HEAP_UPDATE || info == XLOG_HEAP_MOVE) + else if (info == XLOG_HEAP_UPDATE) { xl_heap_update *xlrec = (xl_heap_update *) rec; - if (info == XLOG_HEAP_UPDATE) + if (xl_info & XLOG_HEAP_INIT_PAGE) + appendStringInfo(buf, "update(init): "); + else appendStringInfo(buf, "update: "); + out_target(buf, &(xlrec->target)); + appendStringInfo(buf, "; new %u/%u", + ItemPointerGetBlockNumber(&(xlrec->newtid)), + ItemPointerGetOffsetNumber(&(xlrec->newtid))); + } + else if (info == XLOG_HEAP_MOVE) + { + xl_heap_update *xlrec = (xl_heap_update *) rec; + + if (xl_info & XLOG_HEAP_INIT_PAGE) + appendStringInfo(buf, "move(init): "); else appendStringInfo(buf, "move: "); out_target(buf, &(xlrec->target)); diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index a13a7366b0..2047f37529 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.28 2006/03/28 21:17:23 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.29 2006/03/29 21:17:37 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -62,9 +62,9 @@ forget_matching_split(Relation reln, RelFileNode node, ListCell *l; /* Get downlink TID from page */ - buffer = XLogReadBuffer(false, reln, insertblk); + buffer = XLogReadBuffer(reln, insertblk, false); if (!BufferIsValid(buffer)) - elog(PANIC, "forget_matching_split: block unfound"); + return; page = (Page) BufferGetPage(buffer); itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum)); rightblk = ItemPointerGetBlockNumber(&(itup->t_tid)); @@ -117,11 +117,10 @@ _bt_restore_meta(Relation reln, XLogRecPtr lsn, BTMetaPageData *md; BTPageOpaque pageop; - metabuf = XLogReadBuffer(true, reln, BTREE_METAPAGE); - if (!BufferIsValid(metabuf)) - elog(PANIC, "_bt_restore_meta: no metapage"); - + metabuf = XLogReadBuffer(reln, BTREE_METAPAGE, true); + Assert(BufferIsValid(metabuf)); metapg = BufferGetPage(metabuf); + _bt_pageinit(metapg, BufferGetPageSize(metabuf)); md = BTPageGetMeta(metapg); @@ -174,35 +173,33 @@ btree_xlog_insert(bool isleaf, bool ismeta, return; /* nothing to do */ reln = XLogOpenRelation(xlrec->target.node); - if (!RelationIsValid(reln)) - return; if (!(record->xl_info & XLR_BKP_BLOCK_1)) { - buffer = XLogReadBuffer(false, reln, - ItemPointerGetBlockNumber(&(xlrec->target.tid))); - if (!BufferIsValid(buffer)) - elog(PANIC, "btree_insert_redo: block unfound"); - page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - elog(PANIC, "btree_insert_redo: uninitialized page"); - - if (XLByteLE(lsn, PageGetLSN(page))) + buffer = XLogReadBuffer(reln, + ItemPointerGetBlockNumber(&(xlrec->target.tid)), + false); + if (BufferIsValid(buffer)) { - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); - } - else - { - if (PageAddItem(page, (Item) datapos, datalen, - ItemPointerGetOffsetNumber(&(xlrec->target.tid)), - LP_USED) == InvalidOffsetNumber) - elog(PANIC, "btree_insert_redo: failed to add item"); + page = (Page) BufferGetPage(buffer); - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - WriteBuffer(buffer); + if (XLByteLE(lsn, PageGetLSN(page))) + { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + } + else + { + if (PageAddItem(page, (Item) datapos, datalen, + ItemPointerGetOffsetNumber(&(xlrec->target.tid)), + LP_USED) == InvalidOffsetNumber) + elog(PANIC, "btree_insert_redo: failed to add item"); + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(buffer); + } } } @@ -235,19 +232,15 @@ btree_xlog_split(bool onleft, bool isroot, BTPageOpaque pageop; reln = XLogOpenRelation(xlrec->target.node); - if (!RelationIsValid(reln)) - return; - targetblk = ItemPointerGetBlockNumber(&(xlrec->target.tid)); leftsib = (onleft) ? targetblk : xlrec->otherblk; rightsib = (onleft) ? xlrec->otherblk : targetblk; /* Left (original) sibling */ - buffer = XLogReadBuffer(true, reln, leftsib); - if (!BufferIsValid(buffer)) - elog(PANIC, "btree_split_redo: lost left sibling"); - + buffer = XLogReadBuffer(reln, leftsib, true); + Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); + _bt_pageinit(page, BufferGetPageSize(buffer)); pageop = (BTPageOpaque) PageGetSpecialPointer(page); @@ -266,11 +259,10 @@ btree_xlog_split(bool onleft, bool isroot, WriteBuffer(buffer); /* Right (new) sibling */ - buffer = XLogReadBuffer(true, reln, rightsib); - if (!BufferIsValid(buffer)) - elog(PANIC, "btree_split_redo: lost right sibling"); - + buffer = XLogReadBuffer(reln, rightsib, true); + Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); + _bt_pageinit(page, BufferGetPageSize(buffer)); pageop = (BTPageOpaque) PageGetSpecialPointer(page); @@ -293,28 +285,26 @@ btree_xlog_split(bool onleft, bool isroot, { if (xlrec->rightblk != P_NONE) { - buffer = XLogReadBuffer(false, reln, xlrec->rightblk); - if (!BufferIsValid(buffer)) - elog(PANIC, "btree_split_redo: lost next right page"); - - page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - elog(PANIC, "btree_split_redo: uninitialized next right page"); - - if (XLByteLE(lsn, PageGetLSN(page))) + buffer = XLogReadBuffer(reln, xlrec->rightblk, false); + if (BufferIsValid(buffer)) { - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); - } - else - { - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - pageop->btpo_prev = rightsib; + page = (Page) BufferGetPage(buffer); - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - WriteBuffer(buffer); + if (XLByteLE(lsn, PageGetLSN(page))) + { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + } + else + { + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + pageop->btpo_prev = rightsib; + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(buffer); + } } } } @@ -346,14 +336,10 @@ btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record) xlrec = (xl_btree_delete *) XLogRecGetData(record); reln = XLogOpenRelation(xlrec->node); - if (!RelationIsValid(reln)) - return; - buffer = XLogReadBuffer(false, reln, xlrec->block); + buffer = XLogReadBuffer(reln, xlrec->block, false); if (!BufferIsValid(buffer)) - elog(PANIC, "btree_delete_redo: block unfound"); + return; page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - elog(PANIC, "btree_delete_redo: uninitialized page"); if (XLByteLE(lsn, PageGetLSN(page))) { @@ -394,9 +380,6 @@ btree_xlog_delete_page(bool ismeta, BTPageOpaque pageop; reln = XLogOpenRelation(xlrec->target.node); - if (!RelationIsValid(reln)) - return; - parent = ItemPointerGetBlockNumber(&(xlrec->target.tid)); target = xlrec->deadblk; leftsib = xlrec->leftblk; @@ -405,86 +388,11 @@ btree_xlog_delete_page(bool ismeta, /* parent page */ if (!(record->xl_info & XLR_BKP_BLOCK_1)) { - buffer = XLogReadBuffer(false, reln, parent); - if (!BufferIsValid(buffer)) - elog(PANIC, "btree_delete_page_redo: parent block unfound"); - page = (Page) BufferGetPage(buffer); - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - if (PageIsNew((PageHeader) page)) - elog(PANIC, "btree_delete_page_redo: uninitialized parent page"); - if (XLByteLE(lsn, PageGetLSN(page))) + buffer = XLogReadBuffer(reln, parent, false); + if (BufferIsValid(buffer)) { - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); - } - else - { - OffsetNumber poffset; - - poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); - if (poffset >= PageGetMaxOffsetNumber(page)) - { - Assert(poffset == P_FIRSTDATAKEY(pageop)); - PageIndexTupleDelete(page, poffset); - pageop->btpo_flags |= BTP_HALF_DEAD; - } - else - { - ItemId itemid; - IndexTuple itup; - OffsetNumber nextoffset; - - itemid = PageGetItemId(page, poffset); - itup = (IndexTuple) PageGetItem(page, itemid); - ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY); - nextoffset = OffsetNumberNext(poffset); - PageIndexTupleDelete(page, nextoffset); - } - - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - WriteBuffer(buffer); - } - } - - /* Fix left-link of right sibling */ - if (!(record->xl_info & XLR_BKP_BLOCK_2)) - { - buffer = XLogReadBuffer(false, reln, rightsib); - if (!BufferIsValid(buffer)) - elog(PANIC, "btree_delete_page_redo: lost right sibling"); - page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - elog(PANIC, "btree_delete_page_redo: uninitialized right sibling"); - if (XLByteLE(lsn, PageGetLSN(page))) - { - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); - } - else - { - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - pageop->btpo_prev = leftsib; - - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - WriteBuffer(buffer); - } - } - - /* Fix right-link of left sibling, if any */ - if (!(record->xl_info & XLR_BKP_BLOCK_3)) - { - if (leftsib != P_NONE) - { - buffer = XLogReadBuffer(false, reln, leftsib); - if (!BufferIsValid(buffer)) - elog(PANIC, "btree_delete_page_redo: lost left sibling"); page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - elog(PANIC, "btree_delete_page_redo: uninitialized left sibling"); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); if (XLByteLE(lsn, PageGetLSN(page))) { LockBuffer(buffer, BUFFER_LOCK_UNLOCK); @@ -492,8 +400,27 @@ btree_xlog_delete_page(bool ismeta, } else { - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - pageop->btpo_next = rightsib; + OffsetNumber poffset; + + poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + if (poffset >= PageGetMaxOffsetNumber(page)) + { + Assert(poffset == P_FIRSTDATAKEY(pageop)); + PageIndexTupleDelete(page, poffset); + pageop->btpo_flags |= BTP_HALF_DEAD; + } + else + { + ItemId itemid; + IndexTuple itup; + OffsetNumber nextoffset; + + itemid = PageGetItemId(page, poffset); + itup = (IndexTuple) PageGetItem(page, itemid); + ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY); + nextoffset = OffsetNumberNext(poffset); + PageIndexTupleDelete(page, nextoffset); + } PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); @@ -503,11 +430,64 @@ btree_xlog_delete_page(bool ismeta, } } + /* Fix left-link of right sibling */ + if (!(record->xl_info & XLR_BKP_BLOCK_2)) + { + buffer = XLogReadBuffer(reln, rightsib, false); + if (BufferIsValid(buffer)) + { + page = (Page) BufferGetPage(buffer); + if (XLByteLE(lsn, PageGetLSN(page))) + { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + } + else + { + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + pageop->btpo_prev = leftsib; + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(buffer); + } + } + } + + /* Fix right-link of left sibling, if any */ + if (!(record->xl_info & XLR_BKP_BLOCK_3)) + { + if (leftsib != P_NONE) + { + buffer = XLogReadBuffer(reln, leftsib, false); + if (BufferIsValid(buffer)) + { + page = (Page) BufferGetPage(buffer); + if (XLByteLE(lsn, PageGetLSN(page))) + { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + } + else + { + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + pageop->btpo_next = rightsib; + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(buffer); + } + } + } + } + /* Rewrite target page as empty deleted page */ - buffer = XLogReadBuffer(true, reln, target); - if (!BufferIsValid(buffer)) - elog(PANIC, "btree_delete_page_redo: lost target page"); + buffer = XLogReadBuffer(reln, target, true); + Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); + _bt_pageinit(page, BufferGetPageSize(buffer)); pageop = (BTPageOpaque) PageGetSpecialPointer(page); @@ -544,13 +524,10 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record) BTPageOpaque pageop; reln = XLogOpenRelation(xlrec->node); - if (!RelationIsValid(reln)) - return; - buffer = XLogReadBuffer(true, reln, xlrec->rootblk); - if (!BufferIsValid(buffer)) - elog(PANIC, "btree_newroot_redo: no root page"); - + buffer = XLogReadBuffer(reln, xlrec->rootblk, true); + Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); + _bt_pageinit(page, BufferGetPageSize(buffer)); pageop = (BTPageOpaque) PageGetSpecialPointer(page); @@ -592,9 +569,6 @@ btree_xlog_newmeta(XLogRecPtr lsn, XLogRecord *record) Relation reln; reln = XLogOpenRelation(xlrec->node); - if (!RelationIsValid(reln)) - return; - _bt_restore_meta(reln, lsn, xlrec->meta.root, xlrec->meta.level, xlrec->meta.fastroot, xlrec->meta.fastlevel); @@ -800,14 +774,14 @@ btree_xlog_cleanup(void) bool is_only; reln = XLogOpenRelation(split->node); - if (!RelationIsValid(reln)) - continue; - lbuf = XLogReadBuffer(false, reln, split->leftblk); + lbuf = XLogReadBuffer(reln, split->leftblk, false); + /* failure should be impossible because we wrote this page earlier */ if (!BufferIsValid(lbuf)) elog(PANIC, "btree_xlog_cleanup: left block unfound"); lpage = (Page) BufferGetPage(lbuf); lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage); - rbuf = XLogReadBuffer(false, reln, split->rightblk); + rbuf = XLogReadBuffer(reln, split->rightblk, false); + /* failure should be impossible because we wrote this page earlier */ if (!BufferIsValid(rbuf)) elog(PANIC, "btree_xlog_cleanup: right block unfound"); rpage = (Page) BufferGetPage(rbuf); diff --git a/src/backend/access/transam/README b/src/backend/access/transam/README index 177ba26cf3..4ebf7a8946 100644 --- a/src/backend/access/transam/README +++ b/src/backend/access/transam/README @@ -1,4 +1,4 @@ -$PostgreSQL: pgsql/src/backend/access/transam/README,v 1.3 2005/05/19 21:35:45 tgl Exp $ +$PostgreSQL: pgsql/src/backend/access/transam/README,v 1.4 2006/03/29 21:17:37 tgl Exp $ The Transaction System ---------------------- @@ -252,3 +252,166 @@ slru.c is the supporting mechanism for both pg_clog and pg_subtrans. It implements the LRU policy for in-memory buffer pages. The high-level routines for pg_clog are implemented in transam.c, while the low-level functions are in clog.c. pg_subtrans is contained completely in subtrans.c. + + +Write-Ahead Log coding +---------------------- + +The WAL subsystem (also called XLOG in the code) exists to guarantee crash +recovery. It can also be used to provide point-in-time recovery, as well as +hot-standby replication via log shipping. Here are some notes about +non-obvious aspects of its design. + +A basic assumption of a write AHEAD log is that log entries must reach stable +storage before the data-page changes they describe. This ensures that +replaying the log to its end will bring us to a consistent state where there +are no partially-performed transactions. To guarantee this, each data page +(either heap or index) is marked with the LSN (log sequence number --- in +practice, a WAL file location) of the latest XLOG record affecting the page. +Before the bufmgr can write out a dirty page, it must ensure that xlog has +been flushed to disk at least up to the page's LSN. This low-level +interaction improves performance by not waiting for XLOG I/O until necessary. +The LSN check exists only in the shared-buffer manager, not in the local +buffer manager used for temp tables; hence operations on temp tables must not +be WAL-logged. + +During WAL replay, we can check the LSN of a page to detect whether the change +recorded by the current log entry is already applied (it has been, if the page +LSN is >= the log entry's WAL location). + +Usually, log entries contain just enough information to redo a single +incremental update on a page (or small group of pages). This will work only +if the filesystem and hardware implement data page writes as atomic actions, +so that a page is never left in a corrupt partly-written state. Since that's +often an untenable assumption in practice, we log additional information to +allow complete reconstruction of modified pages. The first WAL record +affecting a given page after a checkpoint is made to contain a copy of the +entire page, and we implement replay by restoring that page copy instead of +redoing the update. (This is more reliable than the data storage itself would +be because we can check the validity of the WAL record's CRC.) We can detect +the "first change after checkpoint" by noting whether the page's old LSN +precedes the end of WAL as of the last checkpoint (the RedoRecPtr). + +The general schema for executing a WAL-logged action is + +1. Pin and exclusive-lock the shared buffer(s) containing the data page(s) +to be modified. + +2. START_CRIT_SECTION() (Any error during the next two steps must cause a +PANIC because the shared buffers will contain unlogged changes, which we +have to ensure don't get to disk. Obviously, you should check conditions +such as whether there's enough free space on the page before you start the +critical section.) + +3. Apply the required changes to the shared buffer(s). + +4. Build a WAL log record and pass it to XLogInsert(); then update the page's +LSN and TLI using the returned XLOG location. For instance, + + recptr = XLogInsert(rmgr_id, info, rdata); + + PageSetLSN(dp, recptr); + PageSetTLI(dp, ThisTimeLineID); + +5. END_CRIT_SECTION() + +6. Unlock and write the buffer(s): + + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(buffer); + +(Note: WriteBuffer doesn't really "write" the buffer anymore, it just marks it +dirty and unpins it. The write will not happen until a checkpoint occurs or +the shared buffer is needed for another page.) + +XLogInsert's "rdata" argument is an array of pointer/size items identifying +chunks of data to be written in the XLOG record, plus optional shared-buffer +IDs for chunks that are in shared buffers rather than temporary variables. +The "rdata" array must mention (at least once) each of the shared buffers +being modified, unless the action is such that the WAL replay routine can +reconstruct the entire page contents. XLogInsert includes the logic that +tests to see whether a shared buffer has been modified since the last +checkpoint. If not, the entire page contents are logged rather than just the +portion(s) pointed to by "rdata". + +Because XLogInsert drops the rdata components associated with buffers it +chooses to log in full, the WAL replay routines normally need to test to see +which buffers were handled that way --- otherwise they may be misled about +what the XLOG record actually contains. XLOG records that describe multi-page +changes therefore require some care to design: you must be certain that you +know what data is indicated by each "BKP" bit. An example of the trickiness +is that in a HEAP_UPDATE record, BKP(1) normally is associated with the source +page and BKP(2) is associated with the destination page --- but if these are +the same page, only BKP(1) would have been set. + +For this reason as well as the risk of deadlocking on buffer locks, it's best +to design WAL records so that they reflect small atomic actions involving just +one or a few pages. The current XLOG infrastructure cannot handle WAL records +involving references to more than three shared buffers, anyway. + +In the case where the WAL record contains enough information to re-generate +the entire contents of a page, do *not* show that page's buffer ID in the +rdata array, even if some of the rdata items point into the buffer. This is +because you don't want XLogInsert to log the whole page contents. The +standard replay-routine pattern for this case is + + reln = XLogOpenRelation(rnode); + buffer = XLogReadBuffer(reln, blkno, true); + Assert(BufferIsValid(buffer)); + page = (Page) BufferGetPage(buffer); + + ... initialize the page ... + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(buffer); + +In the case where the WAL record provides only enough information to +incrementally update the page, the rdata array *must* mention the buffer +ID at least once; otherwise there is no defense against torn-page problems. +The standard replay-routine pattern for this case is + + if (record->xl_info & XLR_BKP_BLOCK_n) + << do nothing, page was rewritten from logged copy >>; + + reln = XLogOpenRelation(rnode); + buffer = XLogReadBuffer(reln, blkno, false); + if (!BufferIsValid(buffer)) + << do nothing, page has been deleted >>; + page = (Page) BufferGetPage(buffer); + + if (XLByteLE(lsn, PageGetLSN(page))) + { + /* changes are already applied */ + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + return; + } + + ... apply the change ... + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(buffer); + +As noted above, for a multi-page update you need to be able to determine +which XLR_BKP_BLOCK_n flag applies to each page. If a WAL record reflects +a combination of fully-rewritable and incremental updates, then the rewritable +pages don't count for the XLR_BKP_BLOCK_n numbering. (XLR_BKP_BLOCK_n is +associated with the n'th distinct buffer ID seen in the "rdata" array, and +per the above discussion, fully-rewritable buffers shouldn't be mentioned in +"rdata".) + +Due to all these constraints, complex changes (such as a multilevel index +insertion) normally need to be described by a series of atomic-action WAL +records. What do you do if the intermediate states are not self-consistent? +The answer is that the WAL replay logic has to be able to fix things up. +In btree indexes, for example, a page split requires insertion of a new key in +the parent btree level, but for locking reasons this has to be reflected by +two separate WAL records. The replay code has to remember "unfinished" split +operations, and match them up to subsequent insertions in the parent level. +If no matching insert has been found by the time the WAL replay ends, the +replay code has to do the insertion on its own to restore the index to +consistency. diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index a33e0df7c4..0bbe2c0d49 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.218 2006/03/24 04:32:12 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.219 2006/03/29 21:17:37 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -4097,7 +4097,7 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid) /* Make sure files supposed to be dropped are dropped */ for (i = 0; i < xlrec->nrels; i++) { - XLogCloseRelation(xlrec->xnodes[i]); + XLogDropRelation(xlrec->xnodes[i]); smgrdounlink(smgropen(xlrec->xnodes[i]), false, true); } } @@ -4132,7 +4132,7 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) /* Make sure files supposed to be dropped are dropped */ for (i = 0; i < xlrec->nrels; i++) { - XLogCloseRelation(xlrec->xnodes[i]); + XLogDropRelation(xlrec->xnodes[i]); smgrdounlink(smgropen(xlrec->xnodes[i]), false, true); } } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 27149fd375..753b300fee 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.229 2006/03/28 22:01:16 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.230 2006/03/29 21:17:37 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -2509,34 +2509,28 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn) blk += sizeof(BkpBlock); reln = XLogOpenRelation(bkpb.node); + buffer = XLogReadBuffer(reln, bkpb.block, true); + Assert(BufferIsValid(buffer)); + page = (Page) BufferGetPage(buffer); - if (reln) + if (bkpb.hole_length == 0) { - buffer = XLogReadBuffer(true, reln, bkpb.block); - if (BufferIsValid(buffer)) - { - page = (Page) BufferGetPage(buffer); - - if (bkpb.hole_length == 0) - { - memcpy((char *) page, blk, BLCKSZ); - } - else - { - /* must zero-fill the hole */ - MemSet((char *) page, 0, BLCKSZ); - memcpy((char *) page, blk, bkpb.hole_offset); - memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length), - blk + bkpb.hole_offset, - BLCKSZ - (bkpb.hole_offset + bkpb.hole_length)); - } - - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - WriteBuffer(buffer); - } + memcpy((char *) page, blk, BLCKSZ); } + else + { + /* must zero-fill the hole */ + MemSet((char *) page, 0, BLCKSZ); + memcpy((char *) page, blk, bkpb.hole_offset); + memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length), + blk + bkpb.hole_offset, + BLCKSZ - (bkpb.hole_offset + bkpb.hole_length)); + } + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(buffer); blk += BLCKSZ - bkpb.hole_length; } @@ -5451,25 +5445,19 @@ xlog_desc(StringInfo buf, uint8 xl_info, char *rec) static void xlog_outrec(StringInfo buf, XLogRecord *record) { - int bkpb; int i; appendStringInfo(buf, "prev %X/%X; xid %u", - record->xl_prev.xlogid, record->xl_prev.xrecoff, - record->xl_xid); + record->xl_prev.xlogid, record->xl_prev.xrecoff, + record->xl_xid); - for (i = 0, bkpb = 0; i < XLR_MAX_BKP_BLOCKS; i++) + for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { - if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i)))) - continue; - bkpb++; + if (record->xl_info & XLR_SET_BKP_BLOCK(i)) + appendStringInfo(buf, "; bkpb%d", i+1); } - if (bkpb) - appendStringInfo(buf, "; bkpb %d", bkpb); - - appendStringInfo(buf, ": %s", - RmgrTable[record->xl_rmid].rm_name); + appendStringInfo(buf, ": %s", RmgrTable[record->xl_rmid].rm_name); } #endif /* WAL_DEBUG */ diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 2f85bb32ce..fb771fe2fd 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -11,7 +11,7 @@ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/access/transam/xlogutils.c,v 1.41 2006/03/05 15:58:22 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xlogutils.c,v 1.42 2006/03/29 21:17:38 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -19,44 +19,81 @@ #include "access/xlogutils.h" #include "storage/bufmgr.h" +#include "storage/bufpage.h" #include "storage/smgr.h" #include "utils/hsearch.h" /* + * XLogReadBuffer + * Read a page during XLOG replay * - * Storage related support functions + * This is functionally comparable to ReadBuffer followed by + * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE): you get back a pinned + * and locked buffer. (The lock is not really necessary, since we + * expect that this is only done during single-process XLOG replay, + * but in some places it simplifies sharing code with the non-XLOG case.) * + * If "init" is true then the caller intends to rewrite the page fully + * using the info in the XLOG record. In this case we will extend the + * relation if needed to make the page exist, and we will not complain about + * the page being "new" (all zeroes). + * + * If "init" is false then the caller needs the page to be valid already. + * If the page doesn't exist or contains zeroes, we report failure. + * + * If the return value is InvalidBuffer (only possible when init = false), + * the caller should silently skip the update on this page. This currently + * never happens, but we retain it as part of the API spec for possible future + * use. */ - Buffer -XLogReadBuffer(bool extend, Relation reln, BlockNumber blkno) +XLogReadBuffer(Relation reln, BlockNumber blkno, bool init) { BlockNumber lastblock = RelationGetNumberOfBlocks(reln); Buffer buffer; - if (blkno >= lastblock) + Assert(blkno != P_NEW); + + if (blkno < lastblock) { + /* page exists in file */ + buffer = ReadBuffer(reln, blkno); + } + else + { + /* hm, page doesn't exist in file */ + if (!init) + elog(PANIC, "block %u of relation %u/%u/%u does not exist", + blkno, reln->rd_node.spcNode, + reln->rd_node.dbNode, reln->rd_node.relNode); + /* OK to extend the file */ + /* we do this in recovery only - no rel-extension lock needed */ + Assert(InRecovery); buffer = InvalidBuffer; - if (extend) /* we do this in recovery only - no locks */ + while (blkno >= lastblock) { - Assert(InRecovery); - while (lastblock <= blkno) - { - if (buffer != InvalidBuffer) - ReleaseBuffer(buffer); /* must be WriteBuffer()? */ - buffer = ReadBuffer(reln, P_NEW); - lastblock++; - } + if (buffer != InvalidBuffer) + ReleaseBuffer(buffer); /* must be WriteBuffer()? */ + buffer = ReadBuffer(reln, P_NEW); + lastblock++; } - if (buffer != InvalidBuffer) - LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - return buffer; + Assert(BufferGetBlockNumber(buffer) == blkno); + } + + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + + if (!init) + { + /* check that page has been initialized */ + Page page = (Page) BufferGetPage(buffer); + + if (PageIsNew((PageHeader) page)) + elog(PANIC, "block %u of relation %u/%u/%u is uninitialized", + blkno, reln->rd_node.spcNode, + reln->rd_node.dbNode, reln->rd_node.relNode); } - buffer = ReadBuffer(reln, blkno); - if (buffer != InvalidBuffer) - LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); return buffer; } @@ -184,6 +221,9 @@ XLogCloseRelationCache(void) /* * Open a relation during XLOG replay + * + * Note: this once had an API that allowed NULL return on failure, but it + * no longer does; any failure results in elog(). */ Relation XLogOpenRelation(RelFileNode rnode) @@ -224,7 +264,7 @@ XLogOpenRelation(RelFileNode rnode) hash_search(_xlrelcache, (void *) &rnode, HASH_ENTER, &found); if (found) - elog(PANIC, "XLogOpenRelation: file found on insert into cache"); + elog(PANIC, "xlog relation already present on insert into cache"); hentry->rdesc = res; @@ -253,7 +293,7 @@ XLogOpenRelation(RelFileNode rnode) } /* - * Close a relation during XLOG replay + * Drop a relation during XLOG replay * * This is called when the relation is about to be deleted; we need to ensure * that there is no dangling smgr reference in the xlog relation cache. @@ -262,7 +302,7 @@ XLogOpenRelation(RelFileNode rnode) * cache, we just let it age out normally. */ void -XLogCloseRelation(RelFileNode rnode) +XLogDropRelation(RelFileNode rnode) { XLogRelDesc *rdesc; XLogRelCacheEntry *hentry; @@ -277,3 +317,25 @@ XLogCloseRelation(RelFileNode rnode) RelationCloseSmgr(&(rdesc->reldata)); } + +/* + * Drop a whole database during XLOG replay + * + * As above, but for DROP DATABASE instead of dropping a single rel + */ +void +XLogDropDatabase(Oid dbid) +{ + HASH_SEQ_STATUS status; + XLogRelCacheEntry *hentry; + + hash_seq_init(&status, _xlrelcache); + + while ((hentry = (XLogRelCacheEntry *) hash_seq_search(&status)) != NULL) + { + XLogRelDesc *rdesc = hentry->rdesc; + + if (hentry->rnode.dbNode == dbid) + RelationCloseSmgr(&(rdesc->reldata)); + } +} diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index a9b19ab89a..209362782a 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -15,7 +15,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.178 2006/03/24 04:32:13 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.179 2006/03/29 21:17:38 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -671,7 +671,7 @@ dropdb(const char *dbname, bool missing_ok) * is important to ensure that no remaining backend tries to write out a * dirty buffer to the dead database later... */ - DropBuffers(db_id); + DropDatabaseBuffers(db_id); /* * Also, clean out any entries in the shared free space map. @@ -1377,11 +1377,16 @@ dbase_redo(XLogRecPtr lsn, XLogRecord *record) dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id); - /* - * Drop pages for this database that are in the shared buffer cache - */ - DropBuffers(xlrec->db_id); + /* Drop pages for this database that are in the shared buffer cache */ + DropDatabaseBuffers(xlrec->db_id); + /* Also, clean out any entries in the shared free space map */ + FreeSpaceMapForgetDatabase(xlrec->db_id); + + /* Clean out the xlog relcache too */ + XLogDropDatabase(xlrec->db_id); + + /* And remove the physical files */ if (!rmtree(dst_path, true)) ereport(WARNING, (errmsg("could not remove database directory \"%s\"", diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 144cf73c8a..0e448271e1 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/sequence.c,v 1.130 2006/03/24 04:32:13 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/sequence.c,v 1.131 2006/03/29 21:17:38 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -1140,14 +1140,8 @@ seq_redo(XLogRecPtr lsn, XLogRecord *record) elog(PANIC, "seq_redo: unknown op code %u", info); reln = XLogOpenRelation(xlrec->node); - if (!RelationIsValid(reln)) - return; - - buffer = XLogReadBuffer(true, reln, 0); - if (!BufferIsValid(buffer)) - elog(PANIC, "seq_redo: can't read block 0 of rel %u/%u/%u", - xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode); - + buffer = XLogReadBuffer(reln, 0, true); + Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); /* Always reinit the page and reinstall the magic number */ diff --git a/src/backend/commands/tablespace.c b/src/backend/commands/tablespace.c index 84b8f02736..bafea91dfc 100644 --- a/src/backend/commands/tablespace.c +++ b/src/backend/commands/tablespace.c @@ -37,7 +37,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/tablespace.c,v 1.33 2006/03/29 15:15:43 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/tablespace.c,v 1.34 2006/03/29 21:17:38 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -467,6 +467,12 @@ DropTableSpace(DropTableSpaceStmt *stmt) (void) XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_DROP, rdata); } + /* + * Note: because we checked that the tablespace was empty, there should + * be no need to worry about flushing shared buffers or free space map + * entries for relations in the tablespace. + */ + /* * Allow TablespaceCreateDbspace again. */ diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index adc06b4756..461c9cf1fa 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/buffer/bufmgr.c,v 1.204 2006/03/05 15:58:36 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/storage/buffer/bufmgr.c,v 1.205 2006/03/29 21:17:39 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -1450,7 +1450,7 @@ DropRelFileNodeBuffers(RelFileNode rnode, bool istemp, } /* --------------------------------------------------------------------- - * DropBuffers + * DropDatabaseBuffers * * This function removes all the buffers in the buffer cache for a * particular database. Dirty pages are simply dropped, without @@ -1461,7 +1461,7 @@ DropRelFileNodeBuffers(RelFileNode rnode, bool istemp, * -------------------------------------------------------------------- */ void -DropBuffers(Oid dbid) +DropDatabaseBuffers(Oid dbid) { int i; volatile BufferDesc *bufHdr; diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index f2b20ad1d2..3c4c0feeef 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/init/postinit.c,v 1.161 2006/03/05 15:58:46 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/utils/init/postinit.c,v 1.162 2006/03/29 21:17:39 tgl Exp $ * * *------------------------------------------------------------------------- @@ -166,7 +166,7 @@ ReverifyMyDatabase(const char *name) * other backend will eventually try to write them and die in * mdblindwrt. Flush any such pages to forestall trouble. */ - DropBuffers(MyDatabaseId); + DropDatabaseBuffers(MyDatabaseId); /* Now I can commit hara-kiri with a clear conscience... */ ereport(FATAL, (errcode(ERRCODE_UNDEFINED_DATABASE), diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 09e72edaf9..a684656d03 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -6,7 +6,7 @@ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/xlogutils.h,v 1.19 2006/03/05 15:58:54 momjian Exp $ + * $PostgreSQL: pgsql/src/include/access/xlogutils.h,v 1.20 2006/03/29 21:17:39 tgl Exp $ */ #ifndef XLOG_UTILS_H #define XLOG_UTILS_H @@ -19,8 +19,9 @@ extern void XLogInitRelationCache(void); extern void XLogCloseRelationCache(void); extern Relation XLogOpenRelation(RelFileNode rnode); -extern void XLogCloseRelation(RelFileNode rnode); +extern void XLogDropRelation(RelFileNode rnode); +extern void XLogDropDatabase(Oid dbid); -extern Buffer XLogReadBuffer(bool extend, Relation reln, BlockNumber blkno); +extern Buffer XLogReadBuffer(Relation reln, BlockNumber blkno, bool init); #endif diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 8f319b88a8..4bc0737b21 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/storage/bufmgr.h,v 1.98 2006/03/05 15:58:59 momjian Exp $ + * $PostgreSQL: pgsql/src/include/storage/bufmgr.h,v 1.99 2006/03/29 21:17:39 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -135,7 +135,7 @@ extern void RelationTruncate(Relation rel, BlockNumber nblocks); extern void FlushRelationBuffers(Relation rel); extern void DropRelFileNodeBuffers(RelFileNode rnode, bool istemp, BlockNumber firstDelBlock); -extern void DropBuffers(Oid dbid); +extern void DropDatabaseBuffers(Oid dbid); #ifdef NOT_USED extern void PrintPinnedBufs(void);