diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index a743da1cc4..7320e7cdbc 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.149 2007/02/06 14:55:11 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.150 2007/02/08 05:05:53 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -733,6 +733,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, rightoff; OffsetNumber maxoff; OffsetNumber i; + bool isroot; rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); origpage = BufferGetPage(buf); @@ -747,6 +748,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage); ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage); + isroot = P_ISROOT(oopaque); + /* if we're splitting this page, it won't be the root when we're done */ /* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */ lopaque->btpo_flags = oopaque->btpo_flags; @@ -921,61 +924,116 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, MarkBufferDirty(sbuf); } + /* + * By here, the original data page has been split into two new halves, and + * these are correct. The algorithm requires that the left page never + * move during a split, so we copy the new left page back on top of the + * original. Note that this is not a waste of time, since we also require + * (in the page management code) that the center of a page always be + * clean, and the most efficient way to guarantee this is just to compact + * the data by reinserting it into a new left page. (XXX the latter + * comment is probably obsolete.) + * + * We need to do this before writing the WAL record, so that XLogInsert can + * WAL log an image of the page if necessary. + */ + PageRestoreTempPage(leftpage, origpage); + /* XLOG stuff */ if (!rel->rd_istemp) { xl_btree_split xlrec; uint8 xlinfo; XLogRecPtr recptr; - XLogRecData rdata[4]; + XLogRecData rdata[6]; + XLogRecData *lastrdata; - xlrec.target.node = rel->rd_node; - ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off); - if (newitemonleft) - xlrec.otherblk = BufferGetBlockNumber(rbuf); - else - xlrec.otherblk = BufferGetBlockNumber(buf); - xlrec.leftblk = lopaque->btpo_prev; - xlrec.rightblk = ropaque->btpo_next; + xlrec.node = rel->rd_node; + xlrec.leftsib = BufferGetBlockNumber(buf); + xlrec.rightsib = BufferGetBlockNumber(rbuf); + xlrec.firstright = firstright; + xlrec.rnext = ropaque->btpo_next; xlrec.level = lopaque->btpo.level; - /* + rdata[0].data = (char *) &xlrec; + rdata[0].len = SizeOfBtreeSplit; + rdata[0].buffer = InvalidBuffer; + + lastrdata = &rdata[0]; + + /* Log downlink on non-leaf pages. */ + if (lopaque->btpo.level > 0) + { + lastrdata->next = lastrdata + 1; + lastrdata++; + + lastrdata->data = (char *) &newitem->t_tid.ip_blkid; + lastrdata->len = sizeof(BlockIdData); + lastrdata->buffer = InvalidBuffer; + } + + /* Log the new item, if it was inserted on the left page. If it was + * put on the right page, we don't need to explicitly WAL log it + * because it's included with all the other items on the right page. + */ + lastrdata->next = lastrdata + 1; + lastrdata++; + if (newitemonleft) + { + lastrdata->data = (char *) &newitemoff; + lastrdata->len = sizeof(OffsetNumber); + lastrdata->buffer = buf; /* backup block 1 */ + lastrdata->buffer_std = true; + + lastrdata->next = lastrdata + 1; + lastrdata++; + lastrdata->data = (char *)newitem; + lastrdata->len = newitemsz; + lastrdata->buffer = buf; /* backup block 1 */ + lastrdata->buffer_std = true; + } + else + { + lastrdata->data = NULL; + lastrdata->len = 0; + lastrdata->buffer = buf; /* backup block 1 */ + lastrdata->buffer_std = true; + } + + /* Log the contents of the right page in the format understood by + * _bt_restore_page(). We set lastrdata->buffer to InvalidBuffer, + * because we're going to recreate the whole page anyway. + * * Direct access to page is not good but faster - we should implement * some new func in page API. Note we only store the tuples * themselves, knowing that the item pointers are in the same order * and can be reconstructed by scanning the tuples. See comments for * _bt_restore_page(). */ - xlrec.leftlen = ((PageHeader) leftpage)->pd_special - - ((PageHeader) leftpage)->pd_upper; + lastrdata->next = lastrdata + 1; + lastrdata++; - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfBtreeSplit; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); - - rdata[1].data = (char *) leftpage + ((PageHeader) leftpage)->pd_upper; - rdata[1].len = xlrec.leftlen; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = &(rdata[2]); - - rdata[2].data = (char *) rightpage + ((PageHeader) rightpage)->pd_upper; - rdata[2].len = ((PageHeader) rightpage)->pd_special - + lastrdata->data = (char *) rightpage + ((PageHeader) rightpage)->pd_upper; - rdata[2].buffer = InvalidBuffer; - rdata[2].next = NULL; + lastrdata->len = ((PageHeader) rightpage)->pd_special - + ((PageHeader) rightpage)->pd_upper; + lastrdata->buffer = InvalidBuffer; + /* Log the right sibling, because we've changed it's prev-pointer. */ if (!P_RIGHTMOST(ropaque)) { - rdata[2].next = &(rdata[3]); - rdata[3].data = NULL; - rdata[3].len = 0; - rdata[3].buffer = sbuf; - rdata[3].buffer_std = true; - rdata[3].next = NULL; + lastrdata->next = lastrdata + 1; + lastrdata++; + + lastrdata->data = NULL; + lastrdata->len = 0; + lastrdata->buffer = sbuf; /* backup block 2 */ + lastrdata->buffer_std = true; } - if (P_ISROOT(oopaque)) + lastrdata->next = NULL; + + if (isroot) xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT; else xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R; @@ -993,24 +1051,6 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, } } - /* - * By here, the original data page has been split into two new halves, and - * these are correct. The algorithm requires that the left page never - * move during a split, so we copy the new left page back on top of the - * original. Note that this is not a waste of time, since we also require - * (in the page management code) that the center of a page always be - * clean, and the most efficient way to guarantee this is just to compact - * the data by reinserting it into a new left page. (XXX the latter - * comment is probably obsolete.) - * - * It's a bit weird that we don't fill in the left page till after writing - * the XLOG entry, but not really worth changing. Note that we use the - * origpage data (specifically its BTP_ROOT bit) while preparing the XLOG - * entry, so simply reshuffling the code won't do. - */ - - PageRestoreTempPage(leftpage, origpage); - END_CRIT_SECTION(); /* release the old right sibling */ diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index 5dce208308..dd6fd8571a 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.41 2007/02/01 19:10:25 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.42 2007/02/08 05:05:53 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -264,122 +264,165 @@ btree_xlog_split(bool onleft, bool isroot, { xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record); Relation reln; - BlockNumber targetblk; - OffsetNumber targetoff; - BlockNumber leftsib; - BlockNumber rightsib; - BlockNumber downlink = 0; - Buffer buffer; - Page page; - BTPageOpaque pageop; + Buffer lbuf, rbuf; + Page lpage, rpage; + BTPageOpaque ropaque, lopaque; + char *datapos; + int datalen; + bool bkp_left = record->xl_info & XLR_BKP_BLOCK_1; + bool bkp_nextsib = record->xl_info & XLR_BKP_BLOCK_2; + OffsetNumber newitemoff; + Item newitem = NULL; + Size newitemsz = 0; - reln = XLogOpenRelation(xlrec->target.node); - targetblk = ItemPointerGetBlockNumber(&(xlrec->target.tid)); - targetoff = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); - leftsib = (onleft) ? targetblk : xlrec->otherblk; - rightsib = (onleft) ? xlrec->otherblk : targetblk; + reln = XLogOpenRelation(xlrec->node); - /* Left (original) sibling */ - buffer = XLogReadBuffer(reln, leftsib, true); - Assert(BufferIsValid(buffer)); - page = (Page) BufferGetPage(buffer); - - _bt_pageinit(page, BufferGetPageSize(buffer)); - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - - pageop->btpo_prev = xlrec->leftblk; - pageop->btpo_next = rightsib; - pageop->btpo.level = xlrec->level; - pageop->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0; - pageop->btpo_cycleid = 0; - - _bt_restore_page(page, - (char *) xlrec + SizeOfBtreeSplit, - xlrec->leftlen); - - if (onleft && xlrec->level > 0) - { - IndexTuple itup; - - /* extract downlink in the target tuple */ - itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, targetoff)); - downlink = ItemPointerGetBlockNumber(&(itup->t_tid)); - Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY); - } - - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); - - /* Right (new) sibling */ - buffer = XLogReadBuffer(reln, rightsib, true); - Assert(BufferIsValid(buffer)); - page = (Page) BufferGetPage(buffer); - - _bt_pageinit(page, BufferGetPageSize(buffer)); - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - - pageop->btpo_prev = leftsib; - pageop->btpo_next = xlrec->rightblk; - pageop->btpo.level = xlrec->level; - pageop->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0; - pageop->btpo_cycleid = 0; - - _bt_restore_page(page, - (char *) xlrec + SizeOfBtreeSplit + xlrec->leftlen, - record->xl_len - SizeOfBtreeSplit - xlrec->leftlen); - - if (!onleft && xlrec->level > 0) - { - IndexTuple itup; - - /* extract downlink in the target tuple */ - itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, targetoff)); - downlink = ItemPointerGetBlockNumber(&(itup->t_tid)); - Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY); - } - - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); - - /* Fix left-link of right (next) page */ - if (!(record->xl_info & XLR_BKP_BLOCK_1)) - { - if (xlrec->rightblk != P_NONE) - { - buffer = XLogReadBuffer(reln, xlrec->rightblk, false); - if (BufferIsValid(buffer)) - { - page = (Page) BufferGetPage(buffer); - - if (XLByteLE(lsn, PageGetLSN(page))) - { - UnlockReleaseBuffer(buffer); - } - else - { - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - pageop->btpo_prev = rightsib; - - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); - } - } - } - } + datapos = (char *) xlrec + SizeOfBtreeSplit; + datalen = record->xl_len - SizeOfBtreeSplit; /* Forget any split this insertion completes */ if (xlrec->level > 0) - forget_matching_split(xlrec->target.node, downlink, false); + { + BlockNumber downlink = BlockIdGetBlockNumber((BlockId) datapos); + + datapos += sizeof(BlockIdData); + datalen -= sizeof(BlockIdData); + + forget_matching_split(xlrec->node, downlink, false); + } + + + /* Extract newitem and newitemoff */ + if (!bkp_left && onleft) + { + IndexTupleData itupdata; + + /* Extract the offset of the new tuple and it's contents */ + memcpy(&newitemoff, datapos, sizeof(OffsetNumber)); + datapos += sizeof(OffsetNumber); + datalen -= sizeof(OffsetNumber); + + newitem = datapos; + /* Need to copy tuple header due to alignment considerations */ + memcpy(&itupdata, datapos, sizeof(IndexTupleData)); + newitemsz = IndexTupleDSize(itupdata); + newitemsz = MAXALIGN(newitemsz); + datapos += newitemsz; + datalen -= newitemsz; + } + + /* Reconstruct right (new) sibling */ + rbuf = XLogReadBuffer(reln, xlrec->rightsib, true); + Assert(BufferIsValid(rbuf)); + rpage = (Page) BufferGetPage(rbuf); + + _bt_pageinit(rpage, BufferGetPageSize(rbuf)); + ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage); + + ropaque->btpo_prev = xlrec->leftsib; + ropaque->btpo_next = xlrec->rnext; + ropaque->btpo.level = xlrec->level; + ropaque->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0; + ropaque->btpo_cycleid = 0; + + _bt_restore_page(rpage, datapos, datalen); + + PageSetLSN(rpage, lsn); + PageSetTLI(rpage, ThisTimeLineID); + MarkBufferDirty(rbuf); + + /* don't release the buffer yet, because reconstructing the left sibling + * needs to access the data on the right page + */ + + + /* Reconstruct left (original) sibling */ + + if(!bkp_left) + { + lbuf = XLogReadBuffer(reln, xlrec->leftsib, false); + + if (BufferIsValid(lbuf)) + { + lpage = (Page) BufferGetPage(lbuf); + lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage); + + if (!XLByteLE(lsn, PageGetLSN(lpage))) + { + /* Remove the items from the left page that were copied to + * right page, and add the new item if it was inserted to + * left page. + */ + OffsetNumber off; + OffsetNumber maxoff = PageGetMaxOffsetNumber(lpage); + ItemId hiItemId; + Item hiItem; + + for(off = maxoff ; off >= xlrec->firstright; off--) + PageIndexTupleDelete(lpage, off); + + if (onleft) + { + if (PageAddItem(lpage, newitem, newitemsz, newitemoff, + LP_USED) == InvalidOffsetNumber) + elog(PANIC, "can't add new item to left sibling after split"); + } + /* Set high key equal to the first key on the right page */ + hiItemId = PageGetItemId(rpage, P_FIRSTDATAKEY(ropaque)); + hiItem = PageGetItem(rpage, hiItemId); + + if(!P_RIGHTMOST(lopaque)) + { + /* but remove the old high key first */ + PageIndexTupleDelete(lpage, P_HIKEY); + } + + if(PageAddItem(lpage, hiItem, ItemIdGetLength(hiItemId), + P_HIKEY, LP_USED) == InvalidOffsetNumber) + elog(PANIC, "can't add high key after split to left page"); + + /* Fix opaque fields */ + lopaque->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0; + lopaque->btpo_next = xlrec->rightsib; + lopaque->btpo_cycleid = 0; + + PageSetLSN(lpage, lsn); + PageSetTLI(lpage, ThisTimeLineID); + MarkBufferDirty(lbuf); + } + + UnlockReleaseBuffer(lbuf); + } + + } + + /* we no longer need the right buffer. */ + UnlockReleaseBuffer(rbuf); + + /* Fix left-link of the page to the right of the new right sibling */ + if (!bkp_nextsib && xlrec->rnext != P_NONE) + { + Buffer buffer = XLogReadBuffer(reln, xlrec->rnext, false); + if (BufferIsValid(buffer)) + { + Page page = (Page) BufferGetPage(buffer); + + if (!XLByteLE(lsn, PageGetLSN(page))) + { + BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); + pageop->btpo_prev = xlrec->rightsib; + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + MarkBufferDirty(buffer); + } + UnlockReleaseBuffer(buffer); + } + } /* The job ain't done till the parent link is inserted... */ - log_incomplete_split(xlrec->target.node, - leftsib, rightsib, isroot); + log_incomplete_split(xlrec->node, + xlrec->leftsib, xlrec->rightsib, isroot); } static void @@ -727,40 +770,48 @@ btree_desc(StringInfo buf, uint8 xl_info, char *rec) { xl_btree_split *xlrec = (xl_btree_split *) rec; - appendStringInfo(buf, "split_l: "); - out_target(buf, &(xlrec->target)); - appendStringInfo(buf, "; oth %u; rgh %u", - xlrec->otherblk, xlrec->rightblk); + appendStringInfo(buf, "split_l: rel %u/%u/%u ", + xlrec->node.spcNode, xlrec->node.dbNode, + xlrec->node.relNode); + appendStringInfo(buf, "left %u, right %u off %u level %u", + xlrec->leftsib, xlrec->rightsib, + xlrec->firstright, xlrec->level); break; } case XLOG_BTREE_SPLIT_R: { xl_btree_split *xlrec = (xl_btree_split *) rec; - appendStringInfo(buf, "split_r: "); - out_target(buf, &(xlrec->target)); - appendStringInfo(buf, "; oth %u; rgh %u", - xlrec->otherblk, xlrec->rightblk); + appendStringInfo(buf, "split_r: rel %u/%u/%u ", + xlrec->node.spcNode, xlrec->node.dbNode, + xlrec->node.relNode); + appendStringInfo(buf, "left %u, right %u off %u level %u", + xlrec->leftsib, xlrec->rightsib, + xlrec->firstright, xlrec->level); break; } case XLOG_BTREE_SPLIT_L_ROOT: { xl_btree_split *xlrec = (xl_btree_split *) rec; - appendStringInfo(buf, "split_l_root: "); - out_target(buf, &(xlrec->target)); - appendStringInfo(buf, "; oth %u; rgh %u", - xlrec->otherblk, xlrec->rightblk); + appendStringInfo(buf, "split_l_root: rel %u/%u/%u ", + xlrec->node.spcNode, xlrec->node.dbNode, + xlrec->node.relNode); + appendStringInfo(buf, "left %u, right %u off %u level %u", + xlrec->leftsib, xlrec->rightsib, + xlrec->firstright, xlrec->level); break; } case XLOG_BTREE_SPLIT_R_ROOT: { xl_btree_split *xlrec = (xl_btree_split *) rec; - appendStringInfo(buf, "split_r_root: "); - out_target(buf, &(xlrec->target)); - appendStringInfo(buf, "; oth %u; rgh %u", - xlrec->otherblk, xlrec->rightblk); + appendStringInfo(buf, "split_r_root: rel %u/%u/%u ", + xlrec->node.spcNode, xlrec->node.dbNode, + xlrec->node.relNode); + appendStringInfo(buf, "left %u, right %u off %u level %u", + xlrec->leftsib, xlrec->rightsib, + xlrec->firstright, xlrec->level); break; } case XLOG_BTREE_DELETE: diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h index 91d91a7705..ff78cf1f71 100644 --- a/src/include/access/nbtree.h +++ b/src/include/access/nbtree.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/nbtree.h,v 1.110 2007/02/05 04:22:18 tgl Exp $ + * $PostgreSQL: pgsql/src/include/access/nbtree.h,v 1.111 2007/02/08 05:05:53 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -259,7 +259,8 @@ typedef struct xl_btree_insert * * Note: the four XLOG_BTREE_SPLIT xl_info codes all use this data record. * The _L and _R variants indicate whether the inserted tuple went into the - * left or right split page (and thus, whether otherblk is the right or left + * left or right split page (and thus, whether newitemoff and the new item + * are stored or not. * page of the split pair). The _ROOT variants indicate that we are splitting * the root page, and thus that a newroot record rather than an insert or * split record should follow. Note that a split record never carries a @@ -267,17 +268,21 @@ typedef struct xl_btree_insert */ typedef struct xl_btree_split { - xl_btreetid target; /* inserted tuple id */ - BlockNumber otherblk; /* second block participated in split: */ - /* first one is stored in target' tid */ - BlockNumber leftblk; /* prev/left block */ - BlockNumber rightblk; /* next/right block */ - uint32 level; /* tree level of page being split */ - uint16 leftlen; /* len of left page items below */ - /* LEFT AND RIGHT PAGES TUPLES FOLLOW AT THE END */ + RelFileNode node; + BlockNumber leftsib; /* orig page / new left page */ + BlockNumber rightsib; /* new right page */ + OffsetNumber firstright; /* first item stored on right page */ + BlockNumber rnext; /* next/right block pointer */ + uint32 level; /* tree level of page being split */ + + /* BlockIdData downlink follows if level > 0 */ + + /* OffsetNumber newitemoff follows in the _L variants. */ + /* New item follows in the _L variants */ + /* RIGHT PAGES TUPLES FOLLOW AT THE END */ } xl_btree_split; -#define SizeOfBtreeSplit (offsetof(xl_btree_split, leftlen) + sizeof(uint16)) +#define SizeOfBtreeSplit (offsetof(xl_btree_split, level) + sizeof(uint32)) /* * This is what we need to know about delete of individual leaf index tuples.