Refactor per-page logic common to all redo routines to a new function.

Every redo routine uses the same idiom to determine what to do to a page:
check if there's a backup block for it, and if not read, the buffer if the
block exists, and check its LSN. Refactor that into a common function,
XLogReadBufferForRedo, making all the redo routines shorter and more
readable.

This has no user-visible effect, and makes no changes to the WAL format.

Reviewed by Andres Freund, Alvaro Herrera, Michael Paquier.
This commit is contained in:
Heikki Linnakangas 2014-08-13 15:39:08 +03:00
parent 26f8b99b24
commit f8f4227976
8 changed files with 1430 additions and 1739 deletions

View File

@ -20,25 +20,25 @@
static MemoryContext opCtx; /* working memory for operations */
static void
ginRedoClearIncompleteSplit(XLogRecPtr lsn, RelFileNode node, BlockNumber blkno)
ginRedoClearIncompleteSplit(XLogRecPtr lsn, XLogRecord *record,
int block_index,
RelFileNode node, BlockNumber blkno)
{
Buffer buffer;
Page page;
buffer = XLogReadBuffer(node, blkno, false);
if (!BufferIsValid(buffer))
return; /* page was deleted, nothing to do */
page = (Page) BufferGetPage(buffer);
if (lsn > PageGetLSN(page))
if (XLogReadBufferForRedo(lsn, record, block_index, node, blkno, &buffer)
== BLK_NEEDS_REDO)
{
page = (Page) BufferGetPage(buffer);
GinPageGetOpaque(page)->flags &= ~GIN_INCOMPLETE_SPLIT;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer);
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
}
static void
@ -332,7 +332,6 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
{
ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
Buffer buffer;
Page page;
char *payload;
BlockNumber leftChildBlkno = InvalidBlockNumber;
BlockNumber rightChildBlkno = InvalidBlockNumber;
@ -351,26 +350,14 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
rightChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
payload += sizeof(BlockIdData);
if (record->xl_info & XLR_BKP_BLOCK(0))
(void) RestoreBackupBlock(lsn, record, 0, false, false);
else
ginRedoClearIncompleteSplit(lsn, data->node, leftChildBlkno);
ginRedoClearIncompleteSplit(lsn, record, 0, data->node, leftChildBlkno);
}
/* If we have a full-page image, restore it and we're done */
if (record->xl_info & XLR_BKP_BLOCK(isLeaf ? 0 : 1))
if (XLogReadBufferForRedo(lsn, record, isLeaf ? 0 : 1, data->node,
data->blkno, &buffer) == BLK_NEEDS_REDO)
{
(void) RestoreBackupBlock(lsn, record, isLeaf ? 0 : 1, false, false);
return;
}
Page page = BufferGetPage(buffer);
buffer = XLogReadBuffer(data->node, data->blkno, false);
if (!BufferIsValid(buffer))
return; /* page was deleted, nothing to do */
page = (Page) BufferGetPage(buffer);
if (lsn > PageGetLSN(page))
{
/* How to insert the payload is tree-type specific */
if (data->flags & GIN_INSERT_ISDATA)
{
@ -386,8 +373,8 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer);
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
}
static void
@ -476,12 +463,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
* split
*/
if (!isLeaf)
{
if (record->xl_info & XLR_BKP_BLOCK(0))
(void) RestoreBackupBlock(lsn, record, 0, false, false);
else
ginRedoClearIncompleteSplit(lsn, data->node, data->leftChildBlkno);
}
ginRedoClearIncompleteSplit(lsn, record, 0, data->node, data->leftChildBlkno);
flags = 0;
if (isLeaf)
@ -605,31 +587,21 @@ ginRedoVacuumDataLeafPage(XLogRecPtr lsn, XLogRecord *record)
{
ginxlogVacuumDataLeafPage *xlrec = (ginxlogVacuumDataLeafPage *) XLogRecGetData(record);
Buffer buffer;
Page page;
/* If we have a full-page image, restore it and we're done */
if (record->xl_info & XLR_BKP_BLOCK(0))
if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->blkno,
&buffer) == BLK_NEEDS_REDO)
{
(void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
}
Page page = BufferGetPage(buffer);
buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
Assert(GinPageIsLeaf(page));
Assert(GinPageIsData(page));
Assert(GinPageIsLeaf(page));
Assert(GinPageIsData(page));
if (lsn > PageGetLSN(page))
{
ginRedoRecompress(page, &xlrec->data);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer);
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
}
static void
@ -641,62 +613,42 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
Buffer lbuffer;
Page page;
if (record->xl_info & XLR_BKP_BLOCK(0))
dbuffer = RestoreBackupBlock(lsn, record, 0, false, true);
else
if (XLogReadBufferForRedo(lsn, record, 0, data->node, data->blkno, &dbuffer)
== BLK_NEEDS_REDO)
{
dbuffer = XLogReadBuffer(data->node, data->blkno, false);
if (BufferIsValid(dbuffer))
{
page = BufferGetPage(dbuffer);
if (lsn > PageGetLSN(page))
{
Assert(GinPageIsData(page));
GinPageGetOpaque(page)->flags = GIN_DELETED;
PageSetLSN(page, lsn);
MarkBufferDirty(dbuffer);
}
}
page = BufferGetPage(dbuffer);
Assert(GinPageIsData(page));
GinPageGetOpaque(page)->flags = GIN_DELETED;
PageSetLSN(page, lsn);
MarkBufferDirty(dbuffer);
}
if (record->xl_info & XLR_BKP_BLOCK(1))
pbuffer = RestoreBackupBlock(lsn, record, 1, false, true);
else
if (XLogReadBufferForRedo(lsn, record, 1, data->node, data->parentBlkno,
&pbuffer) == BLK_NEEDS_REDO)
{
pbuffer = XLogReadBuffer(data->node, data->parentBlkno, false);
if (BufferIsValid(pbuffer))
{
page = BufferGetPage(pbuffer);
if (lsn > PageGetLSN(page))
{
Assert(GinPageIsData(page));
Assert(!GinPageIsLeaf(page));
GinPageDeletePostingItem(page, data->parentOffset);
PageSetLSN(page, lsn);
MarkBufferDirty(pbuffer);
}
}
page = BufferGetPage(pbuffer);
Assert(GinPageIsData(page));
Assert(!GinPageIsLeaf(page));
GinPageDeletePostingItem(page, data->parentOffset);
PageSetLSN(page, lsn);
MarkBufferDirty(pbuffer);
}
if (record->xl_info & XLR_BKP_BLOCK(2))
(void) RestoreBackupBlock(lsn, record, 2, false, false);
else if (data->leftBlkno != InvalidBlockNumber)
if (XLogReadBufferForRedo(lsn, record, 2, data->node, data->leftBlkno,
&lbuffer) == BLK_NEEDS_REDO)
{
lbuffer = XLogReadBuffer(data->node, data->leftBlkno, false);
if (BufferIsValid(lbuffer))
{
page = BufferGetPage(lbuffer);
if (lsn > PageGetLSN(page))
{
Assert(GinPageIsData(page));
GinPageGetOpaque(page)->rightlink = data->rightLink;
PageSetLSN(page, lsn);
MarkBufferDirty(lbuffer);
}
UnlockReleaseBuffer(lbuffer);
}
page = BufferGetPage(lbuffer);
Assert(GinPageIsData(page));
GinPageGetOpaque(page)->rightlink = data->rightLink;
PageSetLSN(page, lsn);
MarkBufferDirty(lbuffer);
}
if (BufferIsValid(lbuffer))
UnlockReleaseBuffer(lbuffer);
if (BufferIsValid(pbuffer))
UnlockReleaseBuffer(pbuffer);
if (BufferIsValid(dbuffer))
@ -730,74 +682,64 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
/*
* insert into tail page
*/
if (record->xl_info & XLR_BKP_BLOCK(0))
(void) RestoreBackupBlock(lsn, record, 0, false, false);
else
if (XLogReadBufferForRedo(lsn, record, 0, data->node,
data->metadata.tail, &buffer)
== BLK_NEEDS_REDO)
{
buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
if (BufferIsValid(buffer))
Page page = BufferGetPage(buffer);
OffsetNumber off;
int i;
Size tupsize;
IndexTuple tuples;
tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta));
if (PageIsEmpty(page))
off = FirstOffsetNumber;
else
off = OffsetNumberNext(PageGetMaxOffsetNumber(page));
for (i = 0; i < data->ntuples; i++)
{
Page page = BufferGetPage(buffer);
tupsize = IndexTupleSize(tuples);
if (lsn > PageGetLSN(page))
{
OffsetNumber l,
off = (PageIsEmpty(page)) ? FirstOffsetNumber :
OffsetNumberNext(PageGetMaxOffsetNumber(page));
int i,
tupsize;
IndexTuple tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta));
if (PageAddItem(page, (Item) tuples, tupsize, off,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to index page");
for (i = 0; i < data->ntuples; i++)
{
tupsize = IndexTupleSize(tuples);
tuples = (IndexTuple) (((char *) tuples) + tupsize);
l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
if (l == InvalidOffsetNumber)
elog(ERROR, "failed to add item to index page");
tuples = (IndexTuple) (((char *) tuples) + tupsize);
off++;
}
/*
* Increase counter of heap tuples
*/
GinPageGetOpaque(page)->maxoff++;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer);
off++;
}
/*
* Increase counter of heap tuples
*/
GinPageGetOpaque(page)->maxoff++;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
}
else if (data->prevTail != InvalidBlockNumber)
{
/*
* New tail
*/
if (record->xl_info & XLR_BKP_BLOCK(0))
(void) RestoreBackupBlock(lsn, record, 0, false, false);
else
if (XLogReadBufferForRedo(lsn, record, 0, data->node, data->prevTail,
&buffer) == BLK_NEEDS_REDO)
{
buffer = XLogReadBuffer(data->node, data->prevTail, false);
if (BufferIsValid(buffer))
{
Page page = BufferGetPage(buffer);
Page page = BufferGetPage(buffer);
if (lsn > PageGetLSN(page))
{
GinPageGetOpaque(page)->rightlink = data->newRightlink;
GinPageGetOpaque(page)->rightlink = data->newRightlink;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer);
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
}
UnlockReleaseBuffer(metabuffer);

View File

@ -48,31 +48,26 @@ gistRedoClearFollowRight(XLogRecPtr lsn, XLogRecord *record, int block_index,
{
Buffer buffer;
Page page;
if (record->xl_info & XLR_BKP_BLOCK(block_index))
buffer = RestoreBackupBlock(lsn, record, block_index, false, true);
else
{
buffer = XLogReadBuffer(node, childblkno, false);
if (!BufferIsValid(buffer))
return; /* page was deleted, nothing to do */
}
page = (Page) BufferGetPage(buffer);
XLogRedoAction action;
/*
* Note that we still update the page even if page LSN is equal to the LSN
* of this record, because the updated NSN is not included in the full
* page image.
* Note that we still update the page even if it was restored from a full
* page image, because the updated NSN is not included in the image.
*/
if (lsn >= PageGetLSN(page))
action = XLogReadBufferForRedo(lsn, record, block_index, node, childblkno,
&buffer);
if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
{
page = BufferGetPage(buffer);
GistPageSetNSN(page, lsn);
GistClearFollowRight(page);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer);
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
}
/*
@ -87,104 +82,86 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
Page page;
char *data;
/*
* We need to acquire and hold lock on target page while updating the left
* child page. If we have a full-page image of target page, getting the
* lock is a side-effect of restoring that image. Note that even if the
* target page no longer exists, we'll still attempt to replay the change
* on the child page.
*/
if (record->xl_info & XLR_BKP_BLOCK(0))
buffer = RestoreBackupBlock(lsn, record, 0, false, true);
else
buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno,
&buffer) == BLK_NEEDS_REDO)
{
page = (Page) BufferGetPage(buffer);
/* Fix follow-right data on left child page */
data = begin + sizeof(gistxlogPageUpdate);
/* Delete old tuples */
if (xldata->ntodelete > 0)
{
int i;
OffsetNumber *todelete = (OffsetNumber *) data;
data += sizeof(OffsetNumber) * xldata->ntodelete;
for (i = 0; i < xldata->ntodelete; i++)
PageIndexTupleDelete(page, todelete[i]);
if (GistPageIsLeaf(page))
GistMarkTuplesDeleted(page);
}
/* add tuples */
if (data - begin < record->xl_len)
{
OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
OffsetNumberNext(PageGetMaxOffsetNumber(page));
while (data - begin < record->xl_len)
{
IndexTuple itup = (IndexTuple) data;
Size sz = IndexTupleSize(itup);
OffsetNumber l;
data += sz;
l = PageAddItem(page, (Item) itup, sz, off, false, false);
if (l == InvalidOffsetNumber)
elog(ERROR, "failed to add item to GiST index page, size %d bytes",
(int) sz);
off++;
}
}
else
{
/*
* special case: leafpage, nothing to insert, nothing to delete,
* then vacuum marks page
*/
if (GistPageIsLeaf(page) && xldata->ntodelete == 0)
GistClearTuplesDeleted(page);
}
if (!GistPageIsLeaf(page) &&
PageGetMaxOffsetNumber(page) == InvalidOffsetNumber &&
xldata->blkno == GIST_ROOT_BLKNO)
{
/*
* all links on non-leaf root page was deleted by vacuum full, so
* root page becomes a leaf
*/
GistPageSetLeaf(page);
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
/*
* Fix follow-right data on left child page
*
* This must be done while still holding the lock on the target page. Note
* that even if the target page no longer exists, we still attempt to
* replay the change on the child page.
*/
if (BlockNumberIsValid(xldata->leftchild))
gistRedoClearFollowRight(lsn, record, 1,
xldata->node, xldata->leftchild);
/* Done if target page no longer exists */
if (!BufferIsValid(buffer))
return;
/* nothing more to do if page was backed up (and no info to do it with) */
if (record->xl_info & XLR_BKP_BLOCK(0))
{
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
return;
}
page = (Page) BufferGetPage(buffer);
/* nothing more to do if change already applied */
if (lsn <= PageGetLSN(page))
{
UnlockReleaseBuffer(buffer);
return;
}
data = begin + sizeof(gistxlogPageUpdate);
/* Delete old tuples */
if (xldata->ntodelete > 0)
{
int i;
OffsetNumber *todelete = (OffsetNumber *) data;
data += sizeof(OffsetNumber) * xldata->ntodelete;
for (i = 0; i < xldata->ntodelete; i++)
PageIndexTupleDelete(page, todelete[i]);
if (GistPageIsLeaf(page))
GistMarkTuplesDeleted(page);
}
/* add tuples */
if (data - begin < record->xl_len)
{
OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
OffsetNumberNext(PageGetMaxOffsetNumber(page));
while (data - begin < record->xl_len)
{
IndexTuple itup = (IndexTuple) data;
Size sz = IndexTupleSize(itup);
OffsetNumber l;
data += sz;
l = PageAddItem(page, (Item) itup, sz, off, false, false);
if (l == InvalidOffsetNumber)
elog(ERROR, "failed to add item to GiST index page, size %d bytes",
(int) sz);
off++;
}
}
else
{
/*
* special case: leafpage, nothing to insert, nothing to delete, then
* vacuum marks page
*/
if (GistPageIsLeaf(page) && xldata->ntodelete == 0)
GistClearTuplesDeleted(page);
}
if (!GistPageIsLeaf(page) &&
PageGetMaxOffsetNumber(page) == InvalidOffsetNumber &&
xldata->blkno == GIST_ROOT_BLKNO)
{
/*
* all links on non-leaf root page was deleted by vacuum full, so root
* page becomes a leaf
*/
GistPageSetLeaf(page);
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
static void

File diff suppressed because it is too large Load Diff

View File

@ -116,27 +116,25 @@ _bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn,
*/
static void
_bt_clear_incomplete_split(XLogRecPtr lsn, XLogRecord *record,
int block_index,
RelFileNode rnode, BlockNumber cblock)
{
Buffer buf;
buf = XLogReadBuffer(rnode, cblock, false);
if (BufferIsValid(buf))
if (XLogReadBufferForRedo(lsn, record, block_index, rnode, cblock, &buf)
== BLK_NEEDS_REDO)
{
Page page = (Page) BufferGetPage(buf);
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (lsn > PageGetLSN(page))
{
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
Assert((pageop->btpo_flags & BTP_INCOMPLETE_SPLIT) != 0);
pageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
Assert((pageop->btpo_flags & BTP_INCOMPLETE_SPLIT) != 0);
pageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
PageSetLSN(page, lsn);
MarkBufferDirty(buf);
}
UnlockReleaseBuffer(buf);
PageSetLSN(page, lsn);
MarkBufferDirty(buf);
}
if (BufferIsValid(buf))
UnlockReleaseBuffer(buf);
}
static void
@ -184,39 +182,28 @@ btree_xlog_insert(bool isleaf, bool ismeta,
*/
if (!isleaf)
{
if (record->xl_info & XLR_BKP_BLOCK(0))
(void) RestoreBackupBlock(lsn, record, 0, false, false);
else
_bt_clear_incomplete_split(lsn, record, xlrec->target.node, cblkno);
_bt_clear_incomplete_split(lsn, record, 0, xlrec->target.node, cblkno);
main_blk_index = 1;
}
else
main_blk_index = 0;
if (record->xl_info & XLR_BKP_BLOCK(main_blk_index))
(void) RestoreBackupBlock(lsn, record, main_blk_index, false, false);
else
if (XLogReadBufferForRedo(lsn, record, main_blk_index, xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
&buffer) == BLK_NEEDS_REDO)
{
buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (BufferIsValid(buffer))
{
page = (Page) BufferGetPage(buffer);
page = BufferGetPage(buffer);
if (lsn > PageGetLSN(page))
{
if (PageAddItem(page, (Item) datapos, datalen,
ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
false, false) == InvalidOffsetNumber)
elog(PANIC, "btree_insert_redo: failed to add item");
if (PageAddItem(page, (Item) datapos, datalen,
ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
false, false) == InvalidOffsetNumber)
elog(PANIC, "btree_insert_redo: failed to add item");
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer);
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
/*
* Note: in normal operation, we'd update the metapage while still holding
@ -299,12 +286,7 @@ btree_xlog_split(bool onleft, bool isroot,
* before locking the other pages)
*/
if (!isleaf)
{
if (record->xl_info & XLR_BKP_BLOCK(1))
(void) RestoreBackupBlock(lsn, record, 1, false, false);
else
_bt_clear_incomplete_split(lsn, record, xlrec->node, cblkno);
}
_bt_clear_incomplete_split(lsn, record, 1, xlrec->node, cblkno);
/* Reconstruct right (new) sibling page from scratch */
rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true);
@ -340,87 +322,77 @@ btree_xlog_split(bool onleft, bool isroot,
/* don't release the buffer yet; we touch right page's first item below */
/* Now reconstruct left (original) sibling page */
if (record->xl_info & XLR_BKP_BLOCK(0))
lbuf = RestoreBackupBlock(lsn, record, 0, false, true);
else
if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->leftsib,
&lbuf) == BLK_NEEDS_REDO)
{
lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false);
/*
* To retain the same physical order of the tuples that they had, we
* initialize a temporary empty page for the left page and add all the
* items to that in item number order. This mirrors how _bt_split()
* works. It's not strictly required to retain the same physical
* order, as long as the items are in the correct item number order,
* but it helps debugging. See also _bt_restore_page(), which does
* the same for the right page.
*/
Page lpage = (Page) BufferGetPage(lbuf);
BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
OffsetNumber off;
Page newlpage;
OffsetNumber leftoff;
if (BufferIsValid(lbuf))
newlpage = PageGetTempPageCopySpecial(lpage);
/* Set high key */
leftoff = P_HIKEY;
if (PageAddItem(newlpage, left_hikey, left_hikeysz,
P_HIKEY, false, false) == InvalidOffsetNumber)
elog(PANIC, "failed to add high key to left page after split");
leftoff = OffsetNumberNext(leftoff);
for (off = P_FIRSTDATAKEY(lopaque); off < xlrec->firstright; off++)
{
/*
* To retain the same physical order of the tuples that they had,
* we initialize a temporary empty page for the left page and add
* all the items to that in item number order. This mirrors how
* _bt_split() works. It's not strictly required to retain the
* same physical order, as long as the items are in the correct
* item number order, but it helps debugging. See also
* _bt_restore_page(), which does the same for the right page.
*/
Page lpage = (Page) BufferGetPage(lbuf);
BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
ItemId itemid;
Size itemsz;
Item item;
if (lsn > PageGetLSN(lpage))
/* add the new item if it was inserted on left page */
if (onleft && off == newitemoff)
{
OffsetNumber off;
Page newlpage;
OffsetNumber leftoff;
newlpage = PageGetTempPageCopySpecial(lpage);
/* Set high key */
leftoff = P_HIKEY;
if (PageAddItem(newlpage, left_hikey, left_hikeysz,
P_HIKEY, false, false) == InvalidOffsetNumber)
elog(PANIC, "failed to add high key to left page after split");
if (PageAddItem(newlpage, newitem, newitemsz, leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add new item to left page after split");
leftoff = OffsetNumberNext(leftoff);
for (off = P_FIRSTDATAKEY(lopaque); off < xlrec->firstright; off++)
{
ItemId itemid;
Size itemsz;
Item item;
/* add the new item if it was inserted on left page */
if (onleft && off == newitemoff)
{
if (PageAddItem(newlpage, newitem, newitemsz, leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add new item to left page after split");
leftoff = OffsetNumberNext(leftoff);
}
itemid = PageGetItemId(lpage, off);
itemsz = ItemIdGetLength(itemid);
item = PageGetItem(lpage, itemid);
if (PageAddItem(newlpage, item, itemsz, leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add old item to left page after split");
leftoff = OffsetNumberNext(leftoff);
}
/* cope with possibility that newitem goes at the end */
if (onleft && off == newitemoff)
{
if (PageAddItem(newlpage, newitem, newitemsz, leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add new item to left page after split");
leftoff = OffsetNumberNext(leftoff);
}
PageRestoreTempPage(newlpage, lpage);
/* Fix opaque fields */
lopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
if (isleaf)
lopaque->btpo_flags |= BTP_LEAF;
lopaque->btpo_next = xlrec->rightsib;
lopaque->btpo_cycleid = 0;
PageSetLSN(lpage, lsn);
MarkBufferDirty(lbuf);
}
itemid = PageGetItemId(lpage, off);
itemsz = ItemIdGetLength(itemid);
item = PageGetItem(lpage, itemid);
if (PageAddItem(newlpage, item, itemsz, leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add old item to left page after split");
leftoff = OffsetNumberNext(leftoff);
}
/* cope with possibility that newitem goes at the end */
if (onleft && off == newitemoff)
{
if (PageAddItem(newlpage, newitem, newitemsz, leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add new item to left page after split");
leftoff = OffsetNumberNext(leftoff);
}
PageRestoreTempPage(newlpage, lpage);
/* Fix opaque fields */
lopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
if (isleaf)
lopaque->btpo_flags |= BTP_LEAF;
lopaque->btpo_next = xlrec->rightsib;
lopaque->btpo_cycleid = 0;
PageSetLSN(lpage, lsn);
MarkBufferDirty(lbuf);
}
/* We no longer need the buffers */
@ -443,31 +415,21 @@ btree_xlog_split(bool onleft, bool isroot,
* whether this was a leaf or internal page.
*/
int rnext_index = isleaf ? 1 : 2;
Buffer buffer;
if (record->xl_info & XLR_BKP_BLOCK(rnext_index))
(void) RestoreBackupBlock(lsn, record, rnext_index, false, false);
else
if (XLogReadBufferForRedo(lsn, record, rnext_index, xlrec->node,
xlrec->rnext, &buffer) == BLK_NEEDS_REDO)
{
Buffer buffer;
Page page = (Page) BufferGetPage(buffer);
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false);
pageop->btpo_prev = xlrec->rightsib;
if (BufferIsValid(buffer))
{
Page page = (Page) BufferGetPage(buffer);
if (lsn > PageGetLSN(page))
{
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_prev = xlrec->rightsib;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer);
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
}
}
@ -529,54 +491,41 @@ btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record)
}
}
/*
* If we have a full-page image, restore it (using a cleanup lock) and
* we're done.
*/
if (record->xl_info & XLR_BKP_BLOCK(0))
{
(void) RestoreBackupBlock(lsn, record, 0, true, false);
return;
}
/*
* Like in btvacuumpage(), we need to take a cleanup lock on every leaf
* page. See nbtree/README for details.
*/
buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
if (!BufferIsValid(buffer))
return;
LockBufferForCleanup(buffer);
page = (Page) BufferGetPage(buffer);
if (lsn <= PageGetLSN(page))
if (XLogReadBufferForRedoExtended(lsn, record, 0,
xlrec->node, MAIN_FORKNUM, xlrec->block,
RBM_NORMAL, true, &buffer)
== BLK_NEEDS_REDO)
{
page = (Page) BufferGetPage(buffer);
if (record->xl_len > SizeOfBtreeVacuum)
{
OffsetNumber *unused;
OffsetNumber *unend;
unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeVacuum);
unend = (OffsetNumber *) ((char *) xlrec + record->xl_len);
if ((unend - unused) > 0)
PageIndexMultiDelete(page, unused, unend - unused);
}
/*
* Mark the page as not containing any LP_DEAD items --- see comments
* in _bt_delitems_vacuum().
*/
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
return;
}
if (record->xl_len > SizeOfBtreeVacuum)
{
OffsetNumber *unused;
OffsetNumber *unend;
unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeVacuum);
unend = (OffsetNumber *) ((char *) xlrec + record->xl_len);
if ((unend - unused) > 0)
PageIndexMultiDelete(page, unused, unend - unused);
}
/*
* Mark the page as not containing any LP_DEAD items --- see comments in
* _bt_delitems_vacuum().
*/
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
/*
@ -752,47 +701,36 @@ btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
ResolveRecoveryConflictWithSnapshot(latestRemovedXid, xlrec->node);
}
/* If we have a full-page image, restore it and we're done */
if (record->xl_info & XLR_BKP_BLOCK(0))
{
(void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
}
/*
* We don't need to take a cleanup lock to apply these changes. See
* nbtree/README for details.
*/
buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
if (lsn <= PageGetLSN(page))
if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->block,
&buffer) == BLK_NEEDS_REDO)
{
page = (Page) BufferGetPage(buffer);
if (record->xl_len > SizeOfBtreeDelete)
{
OffsetNumber *unused;
unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);
PageIndexMultiDelete(page, unused, xlrec->nitems);
}
/*
* Mark the page as not containing any LP_DEAD items --- see comments
* in _bt_delitems_delete().
*/
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
return;
}
if (record->xl_len > SizeOfBtreeDelete)
{
OffsetNumber *unused;
unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);
PageIndexMultiDelete(page, unused, xlrec->nitems);
}
/*
* Mark the page as not containing any LP_DEAD items --- see comments in
* _bt_delitems_delete().
*/
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
static void
@ -816,42 +754,36 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogRecPtr lsn, XLogRecord *record)
*/
/* parent page */
if (record->xl_info & XLR_BKP_BLOCK(0))
(void) RestoreBackupBlock(lsn, record, 0, false, false);
else
if (XLogReadBufferForRedo(lsn, record, 0, xlrec->target.node, parent,
&buffer) == BLK_NEEDS_REDO)
{
buffer = XLogReadBuffer(xlrec->target.node, parent, false);
if (BufferIsValid(buffer))
{
page = (Page) BufferGetPage(buffer);
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (lsn > PageGetLSN(page))
{
OffsetNumber poffset;
ItemId itemid;
IndexTuple itup;
OffsetNumber nextoffset;
BlockNumber rightsib;
OffsetNumber poffset;
ItemId itemid;
IndexTuple itup;
OffsetNumber nextoffset;
BlockNumber rightsib;
poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
page = (Page) BufferGetPage(buffer);
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
nextoffset = OffsetNumberNext(poffset);
itemid = PageGetItemId(page, nextoffset);
itup = (IndexTuple) PageGetItem(page, itemid);
rightsib = ItemPointerGetBlockNumber(&itup->t_tid);
poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
itemid = PageGetItemId(page, poffset);
itup = (IndexTuple) PageGetItem(page, itemid);
ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
nextoffset = OffsetNumberNext(poffset);
PageIndexTupleDelete(page, nextoffset);
nextoffset = OffsetNumberNext(poffset);
itemid = PageGetItemId(page, nextoffset);
itup = (IndexTuple) PageGetItem(page, itemid);
rightsib = ItemPointerGetBlockNumber(&itup->t_tid);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer);
}
itemid = PageGetItemId(page, poffset);
itup = (IndexTuple) PageGetItem(page, itemid);
ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
nextoffset = OffsetNumberNext(poffset);
PageIndexTupleDelete(page, nextoffset);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
/* Rewrite the leaf page as a halfdead page */
buffer = XLogReadBuffer(xlrec->target.node, xlrec->leafblk, true);
@ -911,56 +843,34 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
*/
/* Fix left-link of right sibling */
if (record->xl_info & XLR_BKP_BLOCK(0))
(void) RestoreBackupBlock(lsn, record, 0, false, false);
else
if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, rightsib, &buffer)
== BLK_NEEDS_REDO)
{
buffer = XLogReadBuffer(xlrec->node, rightsib, false);
if (BufferIsValid(buffer))
{
page = (Page) BufferGetPage(buffer);
if (lsn <= PageGetLSN(page))
{
UnlockReleaseBuffer(buffer);
}
else
{
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_prev = leftsib;
page = (Page) BufferGetPage(buffer);
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_prev = leftsib;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
/* Fix right-link of left sibling, if any */
if (record->xl_info & XLR_BKP_BLOCK(1))
(void) RestoreBackupBlock(lsn, record, 1, false, false);
else
if (leftsib != P_NONE)
{
if (leftsib != P_NONE)
if (XLogReadBufferForRedo(lsn, record, 1, xlrec->node, leftsib, &buffer)
== BLK_NEEDS_REDO)
{
buffer = XLogReadBuffer(xlrec->node, leftsib, false);
if (BufferIsValid(buffer))
{
page = (Page) BufferGetPage(buffer);
if (lsn <= PageGetLSN(page))
{
UnlockReleaseBuffer(buffer);
}
else
{
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_next = rightsib;
page = (Page) BufferGetPage(buffer);
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_next = rightsib;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
}
/* Rewrite target page as empty deleted page */
@ -1071,10 +981,7 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
/* Clear the incomplete-split flag in left child */
if (record->xl_info & XLR_BKP_BLOCK(0))
(void) RestoreBackupBlock(lsn, record, 0, false, false);
else
_bt_clear_incomplete_split(lsn, record, xlrec->node, cblkno);
_bt_clear_incomplete_split(lsn, record, 0, xlrec->node, cblkno);
}
PageSetLSN(page, lsn);

File diff suppressed because it is too large Load Diff

View File

@ -500,33 +500,28 @@ incrementally update the page, the rdata array *must* mention the buffer
ID at least once; otherwise there is no defense against torn-page problems.
The standard replay-routine pattern for this case is
if (record->xl_info & XLR_BKP_BLOCK(N))
if (XLogReadBufferForRedo(lsn, record, N, rnode, blkno, &buffer) == BLK_NEEDS_REDO)
{
/* apply the change from the full-page image */
(void) RestoreBackupBlock(lsn, record, N, false, false);
return;
}
page = (Page) BufferGetPage(buffer);
buffer = XLogReadBuffer(rnode, blkno, false);
if (!BufferIsValid(buffer))
{
/* page has been deleted, so we need do nothing */
return;
}
page = (Page) BufferGetPage(buffer);
... apply the change ...
if (XLByteLE(lsn, PageGetLSN(page)))
{
/* changes are already applied */
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
return;
}
... apply the change ...
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
XLogReadBufferForRedo reads the page from disk, and checks what action needs to
be taken to the page. If the XLR_BKP_BLOCK(N) flag is set, it restores the
full page image and returns BLK_RESTORED. If there is no full page image, but
page cannot be found or if the change has already been replayed (i.e. the
page's LSN >= the record we're replaying), it returns BLK_NOTFOUND or BLK_DONE,
respectively. Usually, the redo routine only needs to pay attention to the
BLK_NEEDS_REDO return code, which means that the routine should apply the
incremental change. In any case, the caller is responsible for unlocking and
releasing the buffer. Note that XLogReadBufferForRedo returns the buffer
locked even if no redo is required, unless the page does not exist.
As noted above, for a multi-page update you need to be able to determine
which XLR_BKP_BLOCK(N) flag applies to each page. If a WAL record reflects
@ -539,31 +534,8 @@ per the above discussion, fully-rewritable buffers shouldn't be mentioned in
When replaying a WAL record that describes changes on multiple pages, you
must be careful to lock the pages properly to prevent concurrent Hot Standby
queries from seeing an inconsistent state. If this requires that two
or more buffer locks be held concurrently, the coding pattern shown above
is too simplistic, since it assumes the routine can exit as soon as it's
known the current page requires no modification. Instead, you might have
something like
if (record->xl_info & XLR_BKP_BLOCK(0))
{
/* apply the change from the full-page image */
buffer0 = RestoreBackupBlock(lsn, record, 0, false, true);
}
else
{
buffer0 = XLogReadBuffer(rnode, blkno, false);
if (BufferIsValid(buffer0))
{
... apply the change if not already done ...
MarkBufferDirty(buffer0);
}
}
... similarly apply the changes for remaining pages ...
/* and now we can release the lock on the first page */
if (BufferIsValid(buffer0))
UnlockReleaseBuffer(buffer0);
or more buffer locks be held concurrently, you must lock the pages in
appropriate order, and not release the locks until all the changes are done.
Note that we must only use PageSetLSN/PageGetLSN() when we know the action
is serialised. Only Startup process may modify data blocks during recovery,

View File

@ -242,6 +242,87 @@ XLogCheckInvalidPages(void)
invalid_page_tab = NULL;
}
/*
* XLogReadBufferForRedo
* Read a page during XLOG replay
*
* Reads a block referenced by a WAL record into shared buffer cache, and
* determines what needs to be done to redo the changes to it. If the WAL
* record includes a full-page image of the page, it is restored.
*
* 'lsn' is the LSN of the record being replayed. It is compared with the
* page's LSN to determine if the record has already been replayed.
* 'rnode' and 'blkno' point to the block being replayed (main fork number
* is implied, use XLogReadBufferForRedoExtended for other forks).
* 'block_index' identifies the backup block in the record for the page.
*
* Returns one of the following:
*
* BLK_NEEDS_REDO - changes from the WAL record need to be applied
* BLK_DONE - block doesn't need replaying
* BLK_RESTORED - block was restored from a full-page image included in
* the record
* BLK_NOTFOUND - block was not found (because it was truncated away by
* an operation later in the WAL stream)
*
* On return, the buffer is locked in exclusive-mode, and returned in *buf.
* Note that the buffer is locked and returned even if it doesn't need
* replaying. (Getting the buffer lock is not really necessary during
* single-process crash recovery, but some subroutines such as MarkBufferDirty
* will complain if we don't have the lock. In hot standby mode it's
* definitely necessary.)
*/
XLogRedoAction
XLogReadBufferForRedo(XLogRecPtr lsn, XLogRecord *record, int block_index,
RelFileNode rnode, BlockNumber blkno,
Buffer *buf)
{
return XLogReadBufferForRedoExtended(lsn, record, block_index,
rnode, MAIN_FORKNUM, blkno,
RBM_NORMAL, false, buf);
}
/*
* XLogReadBufferForRedoExtended
* Like XLogReadBufferForRedo, but with extra options.
*
* If mode is RBM_ZERO or RBM_ZERO_ON_ERROR, if the page doesn't exist, the
* relation is extended with all-zeroes pages up to the referenced block
* number. In RBM_ZERO mode, the return value is always BLK_NEEDS_REDO.
*
* If 'get_cleanup_lock' is true, a "cleanup lock" is acquired on the buffer
* using LockBufferForCleanup(), instead of a regular exclusive lock.
*/
XLogRedoAction
XLogReadBufferForRedoExtended(XLogRecPtr lsn, XLogRecord *record,
int block_index, RelFileNode rnode,
ForkNumber forkno, BlockNumber blkno,
ReadBufferMode mode, bool get_cleanup_lock,
Buffer *buf)
{
if (record->xl_info & XLR_BKP_BLOCK(block_index))
{
*buf = RestoreBackupBlock(lsn, record, block_index,
get_cleanup_lock, true);
return BLK_RESTORED;
}
else
{
*buf = XLogReadBufferExtended(rnode, forkno, blkno, mode);
if (BufferIsValid(*buf))
{
LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
if (lsn <= PageGetLSN(BufferGetPage(*buf)))
return BLK_DONE;
else
return BLK_NEEDS_REDO;
}
else
return BLK_NOTFOUND;
}
}
/*
* XLogReadBuffer
* Read a page during XLOG replay.

View File

@ -1,7 +1,7 @@
/*
* xlogutils.h
*
* PostgreSQL transaction log manager utility routines
* Utilities for replaying WAL records.
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
@ -11,6 +11,7 @@
#ifndef XLOG_UTILS_H
#define XLOG_UTILS_H
#include "access/xlog.h"
#include "storage/bufmgr.h"
@ -22,6 +23,26 @@ extern void XLogDropDatabase(Oid dbid);
extern void XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
BlockNumber nblocks);
/* Result codes for XLogReadBufferForRedo[Extended] */
typedef enum
{
BLK_NEEDS_REDO, /* changes from WAL record need to be applied */
BLK_DONE, /* block is already up-to-date */
BLK_RESTORED, /* block was restored from a full-page image */
BLK_NOTFOUND /* block was not found (and hence does not need to be
* replayed) */
} XLogRedoAction;
extern XLogRedoAction XLogReadBufferForRedo(XLogRecPtr lsn, XLogRecord *record,
int block_index, RelFileNode rnode, BlockNumber blkno,
Buffer *buf);
extern XLogRedoAction XLogReadBufferForRedoExtended(XLogRecPtr lsn,
XLogRecord *record, int block_index,
RelFileNode rnode, ForkNumber forkno,
BlockNumber blkno,
ReadBufferMode mode, bool get_cleanup_lock,
Buffer *buf);
extern Buffer XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init);
extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
BlockNumber blkno, ReadBufferMode mode);