From 25a26a7ab8a70ee45dcbc6b060ce6ba274857a44 Mon Sep 17 00:00:00 2001 From: "Vadim B. Mikheev" Date: Fri, 13 Oct 2000 02:03:02 +0000 Subject: [PATCH] WAL --- src/backend/access/heap/heapam.c | 20 +- src/backend/access/nbtree/nbtinsert.c | 85 +++- src/backend/access/nbtree/nbtpage.c | 25 +- src/backend/access/nbtree/nbtree.c | 582 +++++++++++++++++++++++++- src/include/access/nbtree.h | 31 +- 5 files changed, 703 insertions(+), 40 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index dbcefbf273..3e1de33bfe 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.86 2000/10/04 00:04:41 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.87 2000/10/13 02:02:59 vadim Exp $ * * * INTERFACE ROUTINES @@ -2016,6 +2016,22 @@ void heap_redo(XLogRecPtr lsn, XLogRecord *record) elog(STOP, "heap_redo: unknown op code %u", info); } +void heap_undo(XLogRecPtr lsn, XLogRecord *record) +{ + uint8 info = record->xl_info & ~XLR_INFO_MASK; + + if (info == XLOG_HEAP_INSERT) + heap_xlog_insert(false, lsn, record); + else if (info == XLOG_HEAP_DELETE) + heap_xlog_delete(false, lsn, record); + else if (info == XLOG_HEAP_UPDATE) + heap_xlog_update(false, lsn, record); + else if (info == XLOG_HEAP_MOVE) + heap_xlog_move(false, lsn, record); + else + elog(STOP, "heap_undo: unknown op code %u", info); +} + void heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) { xl_heap_delete *xlrec = (xl_heap_delete*) XLogRecGetData(record); @@ -2199,7 +2215,7 @@ void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) else /* we can't delete tuple right now */ { lp->lp_flags |= LP_DELETE; /* mark for deletion */ - MarkBufferForCleanup(buffer, PageCleanup); + MarkBufferForCleanup(buffer, HeapPageCleanup); } } diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index e454a989ee..c72b8ca3df 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.64 2000/10/05 20:10:20 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.65 2000/10/13 02:03:00 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -61,6 +61,10 @@ static void _bt_pgaddtup(Relation rel, Page page, static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey); +#ifdef XLOG +static Relation _xlheapRel; /* temporary hack */ +#endif + /* * _bt_doinsert() -- Handle insertion of a single btitem in the tree. * @@ -119,6 +123,10 @@ top: } } +#ifdef XLOG + _xlheapRel = heapRel; /* temporary hack */ +#endif + /* do the insertion */ res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem, 0); @@ -517,21 +525,38 @@ _bt_insertonpg(Relation rel, #ifdef XLOG /* XLOG stuff */ { - char xlbuf[sizeof(xl_btree_insert) + 2 * sizeof(CommandId)]; + char xlbuf[sizeof(xl_btree_insert) + + sizeof(CommandId) + sizeof(RelFileNode)]; xl_btree_insert *xlrec = xlbuf; int hsize = SizeOfBtreeInsert; + BTItemData truncitem; + BTItem xlitem = btitem; + Size xlsize = IndexTupleDSize(btitem->bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); xlrec->target.node = rel->rd_node; ItemPointerSet(&(xlrec->target.tid), BufferGetBlockNumber(buf), newitemoff); if (P_ISLEAF(lpageop)) - { + { CommandId cid = GetCurrentCommandId(); - memcpy(xlbuf + SizeOfBtreeInsert, &(char*)cid, sizeof(CommandId)); + memcpy(xlbuf + hsize, &cid, sizeof(CommandId)); hsize += sizeof(CommandId); + memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode)); + hsize += sizeof(RelFileNode); + } + /* + * Read comments in _bt_pgaddtup + */ + else if (newitemoff == P_FIRSTDATAKEY(lpageop)) + { + truncitem = *btitem; + truncitem.bti_itup.t_info = sizeof(BTItemData); + xlitem = &truncitem; + xlsize = sizeof(BTItemData); } XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_INSERT, - xlbuf, hsize, (char*) btitem, itemsz); + xlbuf, hsize, (char*) xlitem, xlsize); PageSetLSN(page, recptr); PageSetSUI(page, ThisStartUpID); @@ -752,7 +777,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, */ { char xlbuf[sizeof(xl_btree_split) + - 2 * sizeof(CommandId) + BLCKSZ]; + sizeof(CommandId) + sizeof(RelFileNode) + BLCKSZ]; xl_btree_split *xlrec = xlbuf; int hsize = SizeOfBtreeSplit; int flag = (newitemonleft) ? @@ -765,11 +790,30 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, CommandId cid = GetCurrentCommandId(); memcpy(xlbuf + hsize, &(char*)cid, sizeof(CommandId)); hsize += sizeof(CommandId); + memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode)); + hsize += sizeof(RelFileNode); } if (newitemonleft) { - memcpy(xlbuf + hsize, (char*) newitem, newitemsz); - hsize += newitemsz; + /* + * Read comments in _bt_pgaddtup. + * Actually, seems that in non-leaf splits newitem shouldn't + * go to first data key position. + */ + if (! P_ISLEAF(lopaque) && itup_off == P_FIRSTDATAKEY(lopaque)) + { + BTItemData truncitem = *newitem; + truncitem.bti_itup.t_info = sizeof(BTItemData); + memcpy(xlbuf + hsize, &truncitem, sizeof(BTItemData)); + hsize += sizeof(BTItemData); + } + else + { + Size itemsz = IndexTupleDSize(newitem->bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + memcpy(xlbuf + hsize, (char*) newitem, itemsz); + hsize += itemsz; + } xlrec->otherblk = BufferGetBlockNumber(rbuf); } else @@ -1012,7 +1056,7 @@ static Buffer _bt_getstackbuf(Relation rel, BTStack stack) { BlockNumber blkno; - Buffer buf; + Buffer buf, newbuf; OffsetNumber start, offnum, maxoff; @@ -1101,11 +1145,18 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) Size itemsz; BTItem new_item; +#ifdef XLOG + Buffer metabuf; +#endif + /* get a new root page */ rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); rootpage = BufferGetPage(rootbuf); rootblknum = BufferGetBlockNumber(rootbuf); +#ifdef XLOG + metabuf = _bt_getbuf(rel, BTREE_METAPAGE,BT_WRITE); +#endif /* NO ELOG(ERROR) from here till newroot op is logged */ @@ -1168,9 +1219,12 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) #ifdef XLOG /* XLOG stuff */ { - xl_btree_newroot xlrec; + xl_btree_newroot xlrec; + Page metapg = BufferGetPage(metabuf); + BTMetaPageData *metad = BTPageGetMeta(metapg); + xlrec.node = rel->rd_node; - xlrec.rootblk = rootblknum; + BlockIdSet(&(xlrec.rootblk), rootblknum); /* * Dirrect access to page is not good but faster - we should @@ -1181,16 +1235,25 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) (char*)rootpage + (PageHeader) rootpage)->pd_upper, ((PageHeader) rootpage)->pd_special - ((PageHeader) rootpage)->upper); + metad->btm_root = rootblknum; + (metad->btm_level)++; + PageSetLSN(rootpage, recptr); PageSetSUI(rootpage, ThisStartUpID); + PageSetLSN(metapg, recptr); + PageSetSUI(metapg, ThisStartUpID); + + _bt_wrtbuf(rel, metabuf); } #endif /* write and let go of the new root buffer */ _bt_wrtbuf(rel, rootbuf); +#ifndef XLOG /* update metadata page with new root block number */ _bt_metaproot(rel, rootblknum, 0); +#endif /* update and release new sibling, and finally the old root */ _bt_wrtbuf(rel, rbuf); diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c index 2da7421901..41acd11659 100644 --- a/src/backend/access/nbtree/nbtpage.c +++ b/src/backend/access/nbtree/nbtpage.c @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.38 2000/10/04 00:04:42 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.39 2000/10/13 02:03:00 vadim Exp $ * * NOTES * Postgres btree pages look like ordinary relation pages. The opaque @@ -27,23 +27,6 @@ #include "access/nbtree.h" #include "miscadmin.h" -#define BTREE_METAPAGE 0 -#define BTREE_MAGIC 0x053162 - -#define BTREE_VERSION 1 - -typedef struct BTMetaPageData -{ - uint32 btm_magic; - uint32 btm_version; - BlockNumber btm_root; - int32 btm_level; -} BTMetaPageData; - -#define BTPageGetMeta(p) \ - ((BTMetaPageData *) &((PageHeader) p)->pd_linp[0]) - - /* * We use high-concurrency locking on btrees. There are two cases in * which we don't do locking. One is when we're building the btree. @@ -188,14 +171,18 @@ _bt_getroot(Relation rel, int access) #ifdef XLOG /* XLOG stuff */ { - xl_btree_insert xlrec; + xl_btree_newroot xlrec; + xlrec.node = rel->rd_node; + BlockIdSet(&(xlrec.rootblk), rootblkno); XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, &xlrec, SizeOfBtreeNewroot, NULL, 0); PageSetLSN(rootpage, recptr); PageSetSUI(rootpage, ThisStartUpID); + PageSetLSN(metapg, recptr); + PageSetSUI(metapg, ThisStartUpID); } #endif diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index 7fec982fa2..1064c2bb10 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -12,7 +12,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.63 2000/08/10 02:33:20 inoue Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.64 2000/10/13 02:03:00 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -730,3 +730,583 @@ _bt_restscan(IndexScanDesc scan) so->btso_curbuf = buf; } } + +#ifdef XLOG +void btree_redo(XLogRecPtr lsn, XLogRecord *record) +{ + uint8 info = record->xl_info & ~XLR_INFO_MASK; + + if (info == XLOG_BTREE_DELETE) + btree_xlog_delete(true, lsn, record); + else if (info == XLOG_BTREE_INSERT) + btree_xlog_insert(true, lsn, record); + else if (info == XLOG_BTREE_SPLIT) + btree_xlog_split(true, false, lsn, record); /* new item on the right */ + else if (info == XLOG_BTREE_SPLEFT) + btree_xlog_split(true, true, lsn, record); /* new item on the left */ + else if (info == XLOG_BTREE_NEWROOT) + btree_xlog_newroot(true, lsn, record); + else + elog(STOP, "btree_redo: unknown op code %u", info); +} + +void btree_undo(XLogRecPtr lsn, XLogRecord *record) +{ + uint8 info = record->xl_info & ~XLR_INFO_MASK; + + if (info == XLOG_BTREE_DELETE) + btree_xlog_delete(false, lsn, record); + else if (info == XLOG_BTREE_INSERT) + btree_xlog_insert(false, lsn, record); + else if (info == XLOG_BTREE_SPLIT) + btree_xlog_split(false, false, lsn, record);/* new item on the right */ + else if (info == XLOG_BTREE_SPLEFT) + btree_xlog_split(false, true, lsn, record); /* new item on the left */ + else if (info == XLOG_BTREE_NEWROOT) + btree_xlog_newroot(false, lsn, record); + else + elog(STOP, "btree_undo: unknown op code %u", info); +} + +static void btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) +{ + xl_btree_delete *xlrec; + Relation *reln; + Buffer buffer; + Page page; + + if (!redo) + return; + + xlrec = (xl_btree_delete*) XLogRecGetData(record); + reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node); + if (!RelationIsValid(reln)) + return; + buffer = XLogReadBuffer(false, reln, + ItemPointerGetBlockNumber(&(xlrec->target.tid))); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_delete_redo: block unfound"); + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(STOP, "btree_delete_redo: uninitialized page"); + + PageIndexTupleDelete(page, ItemPointerGetOffsetNumber(&(xlrec->target.tid))); + + return; +} + +static void btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) +{ + xl_btree_insert *xlrec; + Relation *reln; + Buffer buffer; + Page page; + BTPageOpaque pageop; + + xlrec = (xl_btree_insert*) XLogRecGetData(record); + reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node); + if (!RelationIsValid(reln)) + return; + buffer = XLogReadBuffer((redo) ? true : false, reln, + ItemPointerGetBlockNumber(&(xlrec->target.tid))); + if (!BufferIsValid(buffer)) + return; + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(STOP, "btree_insert_%s: uninitialized page", + (redo) ? "redo" : "undo"); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + + if (redo) + { + if (XLByteLE(lsn, PageGetLSN(page))) + UnlockAndReleaseBuffer(buffer); + else + { + Size hsize = SizeOfBtreeInsert; + RelFileNode hnode; + + if (P_ISLEAF(pageop)) + { + hsize += (sizeof(CommandId) + sizeof(RelFileNode)); + memcpy(&hnode, (char*)xlrec + SizeOfBtreeInsert + + sizeof(CommandId), sizeof(RelFileNode)); + } + + if (! _bt_add_item(page, + ItemPointerGetOffsetNumber(&(xlrec->target.tid)), + (char*)xlrec + hsize, + record->xl_len - hsize, + &hnode)) + elog(STOP, "btree_insert_redo: failed to add item"); + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + } + else + { + BTItemData btdata; + + if (XLByteLT(PageGetLSN(page), lsn)) + elog(STOP, "btree_insert_undo: bad page LSN"); + + if (! P_ISLEAF(pageop)) + { + UnlockAndReleaseBuffer(buffer); + return; + } + + memcpy(&btdata, (char*)xlrec + SizeOfBtreeInsert + + sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData)); + + _bt_del_item(reln, buffer, &btdata, true, lsn, record); + + } + + return; +} + +static void +btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) +{ + xl_btree_split *xlrec; + Relation *reln; + BlockNumber blkno; + BlockNumber parent; + Buffer buffer; + Page page; + BTPageOpaque pageop; + char *op = (redo) ? "redo" : "undo"; + bool isleaf; + + xlrec = (xl_btree_split*) XLogRecGetData(record); + reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node); + if (!RelationIsValid(reln)) + return; + + /* Left (original) sibling */ + blkno = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) : + BlockIdGetBlockNumber(xlrec->otherblk); + buffer = XLogReadBuffer(false, reln, blkno); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_split_%s: lost left sibling", op); + + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(STOP, "btree_split_%s: uninitialized left sibling", op); + + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + isleaf = P_ISLEAF(pageop); + parent = pageop->btpo_parent; + + if (redo) + { + if (XLByteLE(lsn, PageGetLSN(page))) + UnlockAndReleaseBuffer(buffer); + else + { + /* Delete items related to new right sibling */ + _bt_thin_left_page(page, record); + + if (onleft) + { + BTItemData btdata; + Size hsize = SizeOfBtreeSplit; + Size itemsz; + RelFileNode hnode; + + pageop->btpo_next = BlockIdGetBlockNumber(xlrec->otherblk); + if (isleaf) + { + hsize += (sizeof(CommandId) + sizeof(RelFileNode)); + memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit + + sizeof(CommandId), sizeof(RelFileNode)); + } + + memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + + if (! _bt_add_item(page, + ItemPointerGetOffsetNumber(&(xlrec->target.tid)), + (char*)xlrec + hsize, + itemsz, + &hnode)) + elog(STOP, "btree_split_redo: failed to add item"); + } + else + pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid)); + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + } + else /* undo */ + { + if (XLByteLT(PageGetLSN(page), lsn)) + elog(STOP, "btree_split_undo: bad left sibling LSN"); + + if (! isleaf || ! onleft) + UnlockAndReleaseBuffer(buffer); + else + { + BTItemData btdata; + + memcpy(&btdata, (char*)xlrec + SizeOfBtreeSplit + + sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData)); + + _bt_del_item(reln, buffer, &btdata, false, lsn, record); + } + } + + /* Right (new) sibling */ + blkno = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) : + ItemPointerGetBlockNumber(&(xlrec->target.tid)); + buffer = XLogReadBuffer((redo) ? true : false, reln, blkno); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_split_%s: lost right sibling", op); + + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + { + if (!redo) + elog(STOP, "btree_split_undo: uninitialized right sibling"); + PageInit(page, BufferGetPageSize(buffer), 0); + } + + if (redo) + { + if (XLByteLE(lsn, PageGetLSN(page))) + UnlockAndReleaseBuffer(buffer); + else + { + Size hsize = SizeOfBtreeSplit; + BTItemData btdata; + Size itemsz; + + _bt_pageinit(page, BufferGetPageSize(buffer)); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + if (isleaf) + { + pageop->btpo_flags |= BTP_LEAF; + hsize += (sizeof(CommandId) + sizeof(RelFileNode)); + } + if (onleft) /* skip target item */ + { + memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + hsize += itemsz; + } + + for (char* item = (char*)xlrec + hsize; + item < (char*)record + record->xl_len; ) + { + memcpy(&btdata, item, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + itemsz = MAXALIGN(itemsz); + if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber, + LP_USED) == InvalidOffsetNumber) + elog(STOP, "btree_split_redo: can't add item to right sibling"); + item += itemsz; + } + + pageop->btpo_prev = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) : + BlockIdGetBlockNumber(xlrec->otherblk); + pageop->btpo_next = BlockIdGetBlockNumber(xlrec->rightblk); + pageop->btpo_parent = parent; + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + } + else /* undo */ + { + if (XLByteLT(PageGetLSN(page), lsn)) + elog(STOP, "btree_split_undo: bad right sibling LSN"); + + if (! isleaf || onleft) + UnlockAndReleaseBuffer(buffer); + else + { + char tbuf[BLCKSZ]; + int cnt; + char *item; + Size itemsz; + + item = (char*)xlrec + SizeOfBtreeSplit + + sizeof(CommandId) + sizeof(RelFileNode); + for (cnt = 0; item < (char*)record + record->xl_len; ) + { + BTItem btitem = (BTItem) + (tbuf + cnt * (MAXALIGN(sizeof(BTItemData)))); + memcpy(btitem, item, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btitem->bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + itemsz = MAXALIGN(itemsz); + item += itemsz; + cnt++; + } + cnt -= ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + if (cnt < 0) + elog(STOP, "btree_split_undo: target item unfound in right sibling"); + + item = tbuf + cnt * (MAXALIGN(sizeof(BTItemData))); + + _bt_del_item(reln, buffer, (BTItem)item, false, lsn, record); + } + } + + /* Right (next) page */ + blkno = BlockIdGetBlockNumber(xlrec->rightblk); + buffer = XLogReadBuffer(false, reln, blkno); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_split_%s: lost next right page", op); + + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(STOP, "btree_split_%s: uninitialized next right page", op); + + if (redo) + { + if (XLByteLE(lsn, PageGetLSN(page))) + UnlockAndReleaseBuffer(buffer); + else + { + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + pageop->btpo_prev = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) : + ItemPointerGetBlockNumber(&(xlrec->target.tid)); + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + } + else /* undo */ + { + if (XLByteLT(PageGetLSN(page), lsn)) + elog(STOP, "btree_split_undo: bad next right page LSN"); + + UnlockAndReleaseBuffer(buffer); + } + +} + +static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) +{ + xl_btree_newroot *xlrec; + Relation *reln; + Buffer buffer; + Page page; + Buffer metabuf; + Page metapg; + + if (!redo) + return; + + xlrec = (xl_btree_newroot*) XLogRecGetData(record); + reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node); + if (!RelationIsValid(reln)) + return; + buffer = XLogReadBuffer(true, reln, BlockIdGetBlockNumber(&(xlrec->rootblk))); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_newroot_redo: no root page"); + metabuf = XLogReadBuffer(false, reln, BTREE_METAPAGE); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_newroot_redo: no metapage"); + page = (Page) BufferGetPage(buffer); + + if (PageIsNew((PageHeader) page) || XLByteLT(PageGetLSN(page), lsn)) + { + _bt_pageinit(page, BufferGetPageSize(buffer)); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + + pageop->btpo_flags |= BTP_ROOT; + pageop->btpo_prev = pageop->btpo_next = P_NONE; + pageop->btpo_parent = BTREE_METAPAGE; + + if (record->xl_len == SizeOfBtreeNewroot) /* no childs */ + pageop->btpo_flags |= BTP_LEAF; + else + { + BTItemData btdata; + Size itemsz; + + for (char* item = (char*)xlrec + SizeOfBtreeNewroot; + item < (char*)record + record->xl_len; ) + { + memcpy(&btdata, item, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + itemsz = MAXALIGN(itemsz); + if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber, + LP_USED) == InvalidOffsetNumber) + elog(STOP, "btree_newroot_redo: can't add item"); + item += itemsz; + } + } + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + else + UnlockAndReleaseBuffer(buffer); + + metapg = BufferGetPage(metabuf); + if (PageIsNew((PageHeader) metapg)) + { + BTMetaPageData md; + + _bt_pageinit(metapg, BufferGetPageSize(metabuf)); + md.btm_magic = BTREE_MAGIC; + md.btm_version = BTREE_VERSION; + md.btm_root = P_NONE; + md.btm_level = 0; + memcpy((char *) BTPageGetMeta(pg), (char *) &md, sizeof(md)); + } + + if (XLByteLT(PageGetLSN(metapg), lsn)) + { + BTMetaPageData *metad = BTPageGetMeta(metapg); + + metad->btm_root = BlockIdGetBlockNumber(&(xlrec->rootblk)); + (metad->btm_level)++; + PageSetLSN(metapg, lsn); + PageSetSUI(metapg, ThisStartUpID); + UnlockAndWriteBuffer(metabuf); + } + else + UnlockAndReleaseBuffer(metabuf); + + return; +} + +/* + * UNDO insertion on *leaf* page: + * - find inserted tuple; + * - delete it if heap tuple was inserted by the same xaction + */ +static void +_bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert, + XLogRecPtr lsn, XLogRecord *record) +{ + char *xlrec = (char*) XLogRecGetData(record); + Page page = (Page) BufferGetPage(buffer); + BTPageOpaque pageop; + BlockNumber blkno; + OffsetNumber offno; + ItemId lp; + + for ( ; ; ) + { + offno = _bt_find_btitem(page, btitem); + if (offno != InvalidOffsetNumber) + break; + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + if (P_RIGHTMOST(pageop)) + break; + blkno = pageop->btpo_next; + UnlockAndReleaseBuffer(buffer); + buffer = XLogReadBuffer(false, reln, blkno); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_%s_undo: lost right sibling", + (insert) ? "insert" : "split"); + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(STOP, "btree_%s_undo: uninitialized right sibling", + (insert) ? "insert" : "split"); + if (XLByteLT(PageGetLSN(page), lsn)) + break; + } + + if (offno == InvalidOffsetNumber) /* not found */ + { + if (!InRecovery) + elog(STOP, "btree_%s_undo: lost target tuple in rollback", + (insert) ? "insert" : "split"); + UnlockAndReleaseBuffer(buffer); + return; + } + + lp = PageGetItemId(page, offno); + if (ItemIdDeleted(lp)) /* marked for deletion */ + { + if (!InRecovery) + elog(STOP, "btree_%s_undo: deleted target tuple in rollback", + (insert) ? "insert" : "split"); + } + else if (InRecovery) /* check heap tuple */ + { + int result; + CommandId cid; + RelFileNode hnode; + Size hsize = (insert) ? SizeOfBtreeInsert : SizeOfBtreeSplit; + + memcpy(&cid, (char*)xlrec + hsize, sizeof(CommandId)); + memcpy(&hnode, (char*)xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode)); + result = XLogCheckHeapTuple(hnode, &(btitem->bti_itup.t_tid), + record->xl_xid, cid); + if (result <= 0) /* no tuple or not owner */ + { + UnlockAndReleaseBuffer(buffer); + return; + } + } + else if (! BufferIsUpdatable(buffer)) /* normal rollback */ + { + lp->lp_flags |= LP_DELETE; + MarkBufferForCleanup(buffer, IndexPageCleanup); + return; + } + + PageIndexTupleDelete(page, offno); + if (InRecovery) + { + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + pageop->btpo_flags |= BTP_REORDER; + } + UnlockAndWriteBuffer(buffer); + + return; +} + +static bool +_bt_add_item(Page page, OffsetNumber offno, + char* item, Size size, RelFileNode* hnode) +{ + BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); + + if (offno > PageGetMaxOffsetNumber(page) + 1) + { + if (! (pageop->btpo_flags & BTP_REORDER)) + { + elog(NOTICE, "btree_add_item: BTP_REORDER flag was expected"); + pageop->btpo_flags |= BTP_REORDER; + } + offno = PageGetMaxOffsetNumber(page) + 1; + } + + if (PageAddItem(page, (Item) item, size, offno, + LP_USED) == InvalidOffsetNumber) + { + /* ops, not enough space - try to deleted dead tuples */ + bool result; + + if (! P_ISLEAF(pageop)) + return(false); + result = _bt_cleanup_page(page, hnode); + if (!result || PageAddItem(page, (Item) item, size, offno, + LP_USED) == InvalidOffsetNumber) + return(false); + } + + return(true); +} + +#endif diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h index 437b6637b2..4ca61e0c63 100644 --- a/src/include/access/nbtree.h +++ b/src/include/access/nbtree.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: nbtree.h,v 1.43 2000/10/04 00:04:43 vadim Exp $ + * $Id: nbtree.h,v 1.44 2000/10/13 02:03:02 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -42,11 +42,28 @@ typedef struct BTPageOpaqueData #define BTP_FREE (1 << 2) /* not currently used... */ #define BTP_META (1 << 3) /* Set in the meta-page only */ +#ifdef XLOG +#define BTP_REORDER (1 << 4) /* items must be re-ordered */ +#endif } BTPageOpaqueData; typedef BTPageOpaqueData *BTPageOpaque; #define BTREE_METAPAGE 0 /* first page is meta */ +#define BTREE_MAGIC 0x053162 + +#define BTREE_VERSION 1 + +typedef struct BTMetaPageData +{ + uint32 btm_magic; + uint32 btm_version; + BlockNumber btm_root; + int32 btm_level; +} BTMetaPageData; + +#define BTPageGetMeta(p) \ + ((BTMetaPageData *) &((PageHeader) p)->pd_linp[0]) /* * BTScanOpaqueData is used to remember which buffers we're currently @@ -228,13 +245,13 @@ typedef struct xl_btree_delete /* * This is what we need to know about pure (without split) insert - - * 14 + [4] + btitem with key data. Note that we need in CommandID - * (4 bytes) only for leaf page insert. + * 14 + [4+8] + btitem with key data. Note that we need in CommandID + * and HeapNode (4 + 8 bytes) only for leaf page insert. */ typedef struct xl_btree_insert { xl_btreetid target; /* inserted tuple id */ - /* [CommandID and ] BTITEM FOLLOWS AT END OF STRUCT */ + /* [CommandID, HeapNode and ] BTITEM FOLLOWS AT END OF STRUCT */ } xl_btree_insert; #define SizeOfBtreeInsert (offsetof(xl_btreetid, tid) + SizeOfIptrData) @@ -242,8 +259,8 @@ typedef struct xl_btree_insert /* * This is what we need to know about insert with split - - * 22 + [4] + [btitem] + right sibling btitems. Note that we need in - * CommandID (4 bytes) only for leaf page insert. + * 22 + [4+8] + [btitem] + right sibling btitems. Note that we need in + * CommandID and HeapNode (4 + 8 bytes) only for leaf page insert. */ typedef struct xl_btree_split { @@ -255,7 +272,7 @@ typedef struct xl_btree_split * We log all btitems from the right sibling. If new btitem goes on * the left sibling then we log it too and it will be the first * BTItemData at the end of this struct, but after (for the leaf - * pages) CommandId. + * pages) CommandId and HeapNode. */ } xl_btree_split;