postgresql/src/backend/access/gist/gistxlog.c

997 lines
25 KiB
C
Raw Normal View History

/*-------------------------------------------------------------------------
 *
 * gistxlog.c
 *	  WAL replay logic for GiST.
 *
 *
 * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			 $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.35 2010/01/02 16:57:34 momjian Exp $
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
#include "access/gist_private.h"
#include "access/xlogutils.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "utils/memutils.h"
#include "utils/rel.h"
/*
 * Decoded form of a page-update WAL record, as filled in by
 * decodePageUpdateRecord().
 */
typedef struct
{
	gistxlogPageUpdate *data;	/* fixed header at the start of the record */
	int			len;			/* number of entries in itup[] */
	IndexTuple *itup;			/* palloc'd array of pointers to the tuples
								 * stored inside the record */
	OffsetNumber *todelete;		/* offsets to delete (inside the record), or
								 * NULL when data->ntodelete is zero */
} PageUpdateRecord;
/*
 * One page described by a page-split WAL record, as decoded by
 * decodePageSplitRecord().
 */
typedef struct
{
	gistxlogPage *header;		/* per-page header inside the record */
	IndexTuple *itup;			/* palloc'd array of header->num pointers to
								 * the tuples that follow the header */
} NewPage;
/*
 * Decoded form of a page-split WAL record, as filled in by
 * decodePageSplitRecord().
 */
typedef struct
{
	gistxlogPageSplit *data;	/* fixed header at the start of the record */
	NewPage    *page;			/* palloc'd array of data->npage entries */
} PageSplitRecord;
/*
 * Bookkeeping for inserts that have not been seen to complete during replay;
 * the idea was taken from nbtxlog.c.
 */
typedef struct gistIncompleteInsert
{
	RelFileNode node;			/* index the insert belongs to */
	BlockNumber origblkno;		/* for splits */
	ItemPointerData key;		/* identifies the insert within the index */
	int			lenblk;			/* number of entries in blkno[] */
	BlockNumber *blkno;			/* blocks touched by the logged operation */
	XLogRecPtr	lsn;			/* LSN of the record that created this entry */
	BlockNumber *path;			/* filled in by gistxlogFindPath() */
	int			pathlen;		/* number of entries in path[] */
} gistIncompleteInsert;
/* working memory for operations; reset after each replayed record */
static MemoryContext opCtx;

/* holds the incomplete_inserts list and the entries hanging off it */
static MemoryContext insertCtx;

/* list of gistIncompleteInsert entries, most recently pushed first */
static List *incomplete_inserts;
/*
 * True when item pointers a and b carry the same offset number and the same
 * block number.  Each argument appears twice in the expansion, so callers
 * should pass expressions without side effects.
 */
#define ItemPointerEQ(a, b) \
	( \
		ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) \
		&& \
		ItemPointerGetBlockNumber(a) == ItemPointerGetBlockNumber(b) \
	)
/*
 * Remember an insertion that replay has not yet seen completed.
 *
 * node, key	identify the insertion (see forgetIncompleteInsert)
 * lsn			LSN of the WAL record being replayed
 * blkno/lenblk	explicit list of affected blocks; when either is zero/NULL
 *				the block list is taken from the decoded split record instead
 * xlinfo		decoded split record; must be non-NULL when blkno/lenblk are
 *				not supplied
 *
 * The new entry and its block array are allocated in insertCtx so that they
 * survive the per-record reset of opCtx.
 */
static void
pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
					 BlockNumber *blkno, int lenblk,
					 PageSplitRecord *xlinfo /* to extract blkno info */ )
{
	MemoryContext oldCxt;
	gistIncompleteInsert *ninsert;

	if (!ItemPointerIsValid(&key))
	{
		/*
		 * if key is null then we should not store insertion as incomplete,
		 * because it's a vacuum operation..
		 */
		return;
	}

	oldCxt = MemoryContextSwitchTo(insertCtx);
	ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));

	ninsert->node = node;
	ninsert->key = key;
	ninsert->lsn = lsn;

	/*
	 * palloc does not zero memory; give the path fields a defined value
	 * until gistxlogFindPath() computes the real path.
	 */
	ninsert->path = NULL;
	ninsert->pathlen = 0;

	if (lenblk && blkno)
	{
		/* caller supplied the block list directly */
		ninsert->lenblk = lenblk;
		ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
		memcpy(ninsert->blkno, blkno, sizeof(BlockNumber) * ninsert->lenblk);
		ninsert->origblkno = *blkno;
	}
	else
	{
		int			i;

		/* collect the block numbers of every page in the split record */
		Assert(xlinfo);
		ninsert->lenblk = xlinfo->data->npage;
		ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
		for (i = 0; i < ninsert->lenblk; i++)
			ninsert->blkno[i] = xlinfo->page[i].header->blkno;
		ninsert->origblkno = xlinfo->data->origblkno;
	}
	Assert(ninsert->lenblk > 0);

	/*
	 * Stick the new incomplete insert onto the front of the list, not the
	 * back.  This is so that gist_xlog_cleanup will process incompletions in
	 * last-in-first-out order.
	 */
	incomplete_inserts = lcons(ninsert, incomplete_inserts);

	MemoryContextSwitchTo(oldCxt);
}
/*
 * Drop the first tracked incomplete insert matching (node, key), if any.
 *
 * An invalid key is ignored, mirroring pushIncompleteInsert(), which never
 * records such inserts.  At most one entry is removed per call.
 */
static void
forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
{
	ListCell   *l;

	if (!ItemPointerIsValid(&key))
		return;

	if (incomplete_inserts == NIL)
		return;

	foreach(l, incomplete_inserts)
	{
		gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);

		if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key)))
		{
			/* found: unlink it, free it, and stop iterating the changed list */
			incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
			pfree(insert->blkno);
			pfree(insert);
			break;
		}
	}
}
/*
 * Break a page-update WAL record into its parts.
 *
 * Layout, as read here: a gistxlogPageUpdate header, then (only when
 * ntodelete is non-zero) a MAXALIGN'ed array of offsets to delete, then
 * index tuples packed back to back up to xl_len.  The resulting pointers
 * refer into the record itself; only the itup[] array is palloc'd.
 */
static void
decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record)
{
	char	   *base = XLogRecGetData(record);
	char	   *tuples = base + sizeof(gistxlogPageUpdate);
	char	   *cursor;
	int			ntuples = 0;
	int			n = 0;

	decoded->data = (gistxlogPageUpdate *) base;

	if (decoded->data->ntodelete)
	{
		/* offset array sits directly behind the header; tuples follow it */
		decoded->todelete = (OffsetNumber *) tuples;
		tuples += MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete);
	}
	else
		decoded->todelete = NULL;

	/* first pass: count the tuples so the pointer array can be sized */
	for (cursor = tuples; cursor - base < record->xl_len;)
	{
		ntuples++;
		cursor += IndexTupleSize((IndexTuple) cursor);
	}
	decoded->len = ntuples;

	decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len);

	/* second pass: remember where each tuple starts */
	for (cursor = tuples; cursor - base < record->xl_len;)
	{
		IndexTuple	tup = (IndexTuple) cursor;

		decoded->itup[n++] = tup;
		cursor += IndexTupleSize(tup);
	}
}
/*
 * redo any page update (except page split)
 *
 * isnewroot is true when replaying XLOG_GIST_NEW_ROOT; in that case the
 * target page is re-initialized before the logged tuples are added.
 *
 * The incomplete-insert bookkeeping is updated first and unconditionally.
 * The page itself is only touched when it was not restored from a backup
 * block, can be read, and carries an LSN older than this record.
 */
static void
gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
{
	gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record);
	PageUpdateRecord xlrec;
	Buffer		buffer;
	Page		page;

	/* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */
	forgetIncompleteInsert(xldata->node, xldata->key);

	if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO)
	{
		/* operation with root always finalizes insertion */
		pushIncompleteInsert(xldata->node, lsn, xldata->key,
							 &(xldata->blkno), 1,
							 NULL);
	}

	/* nothing else to do if page was backed up (and no info to do it with) */
	if (record->xl_info & XLR_BKP_BLOCK_1)
		return;

	decodePageUpdateRecord(&xlrec, record);

	buffer = XLogReadBuffer(xlrec.data->node, xlrec.data->blkno, false);
	if (!BufferIsValid(buffer))
		return;
	page = (Page) BufferGetPage(buffer);

	/* page already reflects this record (or a later one): nothing to redo */
	if (XLByteLE(lsn, PageGetLSN(page)))
	{
		UnlockReleaseBuffer(buffer);
		return;
	}

	if (isnewroot)
		GISTInitBuffer(buffer, 0);
	else if (xlrec.data->ntodelete)
	{
		int			i;

		for (i = 0; i < xlrec.data->ntodelete; i++)
			PageIndexTupleDelete(page, xlrec.todelete[i]);
		if (GistPageIsLeaf(page))
			GistMarkTuplesDeleted(page);
	}

	/* add tuples */
	if (xlrec.len > 0)
		gistfillbuffer(page, xlrec.itup, xlrec.len, InvalidOffsetNumber);

	/*
	 * special case: leafpage, nothing to insert, nothing to delete, then
	 * vacuum marks page
	 */
	if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
		GistClearTuplesDeleted(page);

	if (!GistPageIsLeaf(page) &&
		PageGetMaxOffsetNumber(page) == InvalidOffsetNumber &&
		xldata->blkno == GIST_ROOT_BLKNO)
	{
		/*
		 * all links on non-leaf root page was deleted by vacuum full, so root
		 * page becomes a leaf
		 */
		GistPageSetLeaf(page);
	}

	/* done unconditionally, not just in the root-becomes-leaf case above */
	GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
	PageSetLSN(page, lsn);
	PageSetTLI(page, ThisTimeLineID);
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
}
/*
 * Replay a page-delete record: flag the page as deleted and stamp it with
 * the record's LSN and the current timeline.  Nothing is done when the page
 * came from a backup block or the buffer cannot be read.
 */
static void
gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
{
	gistxlogPageDelete *del = (gistxlogPageDelete *) XLogRecGetData(record);
	Buffer		buf;

	/* nothing else to do if page was backed up (and no info to do it with) */
	if ((record->xl_info & XLR_BKP_BLOCK_1) != 0)
		return;

	buf = XLogReadBuffer(del->node, del->blkno, false);
	if (BufferIsValid(buf))
	{
		Page		pg = (Page) BufferGetPage(buf);

		GistPageSetDeleted(pg);

		PageSetLSN(pg, lsn);
		PageSetTLI(pg, ThisTimeLineID);
		MarkBufferDirty(buf);
		UnlockReleaseBuffer(buf);
	}
}
/*
 * Break a page-split WAL record into its parts.
 *
 * Layout, as read here: a gistxlogPageSplit header followed by npage groups,
 * each made of a gistxlogPage header and header->num index tuples packed
 * back to back.  The page[] array and each per-page itup[] array are
 * palloc'd; the headers and tuples themselves stay inside the record.
 */
static void
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
{
	char	   *base = XLogRecGetData(record);
	char	   *cursor = base + sizeof(gistxlogPageSplit);
	int			pageno;

	decoded->data = (gistxlogPageSplit *) base;
	decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage);

	for (pageno = 0; pageno < decoded->data->npage; pageno++)
	{
		NewPage    *np = &decoded->page[pageno];
		int			tupno;

		/* per-page header */
		Assert(cursor - base < record->xl_len);
		np->header = (gistxlogPage *) cursor;
		cursor += sizeof(gistxlogPage);

		/* tuples belonging to this page */
		np->itup = (IndexTuple *)
			palloc(sizeof(IndexTuple) * np->header->num);
		for (tupno = 0; tupno < np->header->num; tupno++)
		{
			Assert(cursor - base < record->xl_len);
			np->itup[tupno] = (IndexTuple) cursor;
			cursor += IndexTupleSize((IndexTuple) cursor);
		}
	}
}
/*
 * Replay a page split: every page listed in the record is re-initialized
 * and refilled with its logged tuples.  Afterwards the insert is re-tracked
 * as incomplete, using the block numbers carried by the split record.
 */
static void
gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
{
	PageSplitRecord split;
	int			pageflags;
	int			pageno;

	decodePageSplitRecord(&split, record);

	/* new pages inherit leaf-ness from the page that was split */
	pageflags = split.data->origleaf ? F_LEAF : 0;

	/* loop around all pages */
	for (pageno = 0; pageno < split.data->npage; pageno++)
	{
		NewPage    *np = &split.page[pageno];
		Buffer		buf;
		Page		pg;

		buf = XLogReadBuffer(split.data->node, np->header->blkno, true);
		Assert(BufferIsValid(buf));
		pg = (Page) BufferGetPage(buf);

		/* ok, clear buffer */
		GISTInitBuffer(buf, pageflags);

		/* and fill it */
		gistfillbuffer(pg, np->itup, np->header->num, FirstOffsetNumber);

		PageSetLSN(pg, lsn);
		PageSetTLI(pg, ThisTimeLineID);
		MarkBufferDirty(buf);
		UnlockReleaseBuffer(buf);
	}

	forgetIncompleteInsert(split.data->node, split.data->key);

	pushIncompleteInsert(split.data->node, lsn, split.data->key,
						 NULL, 0,
						 &split);
}
/*
 * Replay index creation: initialize the root block as an empty leaf page.
 * The record payload is the RelFileNode of the index.
 */
static void
gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
{
	RelFileNode *rnode = (RelFileNode *) XLogRecGetData(record);
	Buffer		rootbuf;
	Page		rootpage;

	rootbuf = XLogReadBuffer(*rnode, GIST_ROOT_BLKNO, true);
	Assert(BufferIsValid(rootbuf));
	rootpage = (Page) BufferGetPage(rootbuf);

	GISTInitBuffer(rootbuf, F_LEAF);

	PageSetLSN(rootpage, lsn);
	PageSetTLI(rootpage, ThisTimeLineID);

	MarkBufferDirty(rootbuf);
	UnlockReleaseBuffer(rootbuf);
}
/*
 * Replay an insert-complete record: the payload is a gistxlogInsertComplete
 * header followed by ItemPointerData keys up to xl_len; each key is removed
 * from the incomplete-insert list.
 */
static void
gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record)
{
	char	   *base = XLogRecGetData(record);
	gistxlogInsertComplete *hdr = (gistxlogInsertComplete *) base;
	char	   *cursor;

	for (cursor = base + sizeof(gistxlogInsertComplete);
		 cursor - base < record->xl_len;
		 cursor += sizeof(ItemPointerData))
	{
		/* a whole key must remain in the record */
		Assert(record->xl_len - (cursor - base) >= sizeof(ItemPointerData));
		forgetIncompleteInsert(hdr->node, *((ItemPointerData *) cursor));
	}
}
/*
 * Redo entry point for GiST WAL records.
 *
 * Calls RestoreBkpBlocks() for the record, then dispatches on the opcode
 * found in xl_info.  The per-opcode routine runs with opCtx as the current
 * memory context; opCtx is reset once the record has been handled.  An
 * unrecognized opcode raises PANIC.
 */
void
gist_redo(XLogRecPtr lsn, XLogRecord *record)
{
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
	MemoryContext oldCxt;

	/*
	 * GIST indexes do not require any conflict processing. NB: If we ever
	 * implement a similar optimization we have in b-tree, and remove killed
	 * tuples outside VACUUM, we'll need to handle that here.
	 */

	RestoreBkpBlocks(lsn, record, false);

	oldCxt = MemoryContextSwitchTo(opCtx);
	switch (info)
	{
		case XLOG_GIST_PAGE_UPDATE:
			gistRedoPageUpdateRecord(lsn, record, false);
			break;
		case XLOG_GIST_PAGE_DELETE:
			gistRedoPageDeleteRecord(lsn, record);
			break;
		case XLOG_GIST_NEW_ROOT:
			/* same record format as a page update, but re-inits the page */
			gistRedoPageUpdateRecord(lsn, record, true);
			break;
		case XLOG_GIST_PAGE_SPLIT:
			gistRedoPageSplitRecord(lsn, record);
			break;
		case XLOG_GIST_CREATE_INDEX:
			gistRedoCreateIndex(lsn, record);
			break;
		case XLOG_GIST_INSERT_COMPLETE:
			gistRedoCompleteInsert(lsn, record);
			break;
		default:
			elog(PANIC, "gist_redo: unknown op code %u", info);
	}

	MemoryContextSwitchTo(oldCxt);
	MemoryContextReset(opCtx);
}
/*
 * Append "rel spc/db/rel" for node to buf, followed by "; tid blk/off" when
 * key is a valid item pointer.
 */
static void
out_target(StringInfo buf, RelFileNode node, ItemPointerData key)
{
	appendStringInfo(buf, "rel %u/%u/%u",
					 node.spcNode, node.dbNode, node.relNode);
	if (ItemPointerIsValid(&key))
		appendStringInfo(buf, "; tid %u/%u",
						 ItemPointerGetBlockNumber(&key),
						 ItemPointerGetOffsetNumber(&key));
}
/*
 * Describe a page-update record: the target (relation and, if valid, tid)
 * followed by the number of the updated block.
 */
static void
out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
{
	BlockNumber updated = xlrec->blkno;

	out_target(buf, xlrec->node, xlrec->key);
	appendStringInfo(buf, "; block number %u", updated);
}
/*
 * Describe a page-delete record: the relation and the deleted block number.
 */
static void
out_gistxlogPageDelete(StringInfo buf, gistxlogPageDelete *xlrec)
{
	appendStringInfo(buf, "page_delete: rel %u/%u/%u; blkno %u",
					 xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
					 xlrec->blkno);
}
/*
 * Describe a page-split record: the target, the block that was split, and
 * the number of pages it was split into.
 */
static void
out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
{
	appendStringInfo(buf, "page_split: ");
	out_target(buf, xlrec->node, xlrec->key);
	appendStringInfo(buf, "; block number %u splits to %d pages",
					 xlrec->origblkno, xlrec->npage);
}
/*
 * Append a human-readable description of a GiST WAL record to buf.
 *
 * xl_info is the record's info byte; the opcode is what remains after
 * masking out XLR_INFO_MASK.  rec points at the record payload and is
 * interpreted according to the opcode.  Unknown opcodes are reported in the
 * output text rather than raising an error.
 */
void
gist_desc(StringInfo buf, uint8 xl_info, char *rec)
{
	uint8		info = xl_info & ~XLR_INFO_MASK;

	switch (info)
	{
		case XLOG_GIST_PAGE_UPDATE:
			appendStringInfo(buf, "page_update: ");
			out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec);
			break;
		case XLOG_GIST_PAGE_DELETE:
			out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
			break;
		case XLOG_GIST_NEW_ROOT:
			/* new-root records share the page-update header */
			appendStringInfo(buf, "new_root: ");
			out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key);
			break;
		case XLOG_GIST_PAGE_SPLIT:
			out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
			break;
		case XLOG_GIST_CREATE_INDEX:
			appendStringInfo(buf, "create_index: rel %u/%u/%u",
							 ((RelFileNode *) rec)->spcNode,
							 ((RelFileNode *) rec)->dbNode,
							 ((RelFileNode *) rec)->relNode);
			break;
		case XLOG_GIST_INSERT_COMPLETE:
			appendStringInfo(buf, "complete_insert: rel %u/%u/%u",
							 ((gistxlogInsertComplete *) rec)->node.spcNode,
							 ((gistxlogInsertComplete *) rec)->node.dbNode,
							 ((gistxlogInsertComplete *) rec)->node.relNode);
			break;
		default:
			appendStringInfo(buf, "unknown gist op code %u", info);
			break;
	}
}
/*
 * Build an "invalid" index tuple whose item pointer refers to block blkno.
 *
 * The tuple consists of the header only: no space is allocated for a nulls
 * bitmap or key data, so code that reads or writes such tuples has to be
 * careful.  The result is palloc'd in the current memory context.
 */
IndexTuple
gist_form_invalid_tuple(BlockNumber blkno)
{
	Size		hdrsize = IndexInfoFindDataOffset(0);
	IndexTuple	result;

	result = (IndexTuple) palloc0(hdrsize);

	/* t_info starts out zeroed, so this stores the size in it */
	result->t_info |= hdrsize;

	ItemPointerSetBlockNumber(&(result->t_tid), blkno);
	GistTupleSetInvalid(result);

	return result;
}
/*
 * Fill insert->path / insert->pathlen with the block numbers returned by
 * gistFindPath() for insert->origblkno, following the parent links from the
 * returned stack entry.  Raises ERROR when gistFindPath() finds nothing.
 */
static void
gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
{
	GISTInsertStack *top;

	insert->pathlen = 0;
	insert->path = NULL;

	top = gistFindPath(index, insert->origblkno);

	if (top == NULL)
		elog(ERROR, "lost parent for block %u", insert->origblkno);
	else
	{
		GISTInsertStack *step;
		int			depth = 0;
		int			pos = 0;

		/* measure the chain first so the array can be sized exactly */
		for (step = top; step != NULL; step = step->parent)
			depth++;
		insert->pathlen = depth;

		insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen);

		/* path[0] is the entry gistFindPath returned; parents follow */
		for (step = top; step != NULL; step = step->parent)
			insert->path[pos++] = step->blkno;
	}
}
2006-10-04 02:30:14 +02:00
static SplitedPageLayout *
gistMakePageLayout(Buffer *buffers, int nbuffers)
{
SplitedPageLayout *res = NULL,
*resptr;
2006-10-04 02:30:14 +02:00
while (nbuffers-- > 0)
{
Page page = BufferGetPage(buffers[nbuffers]);
IndexTuple *vec;
int veclen;
2006-10-04 02:30:14 +02:00
resptr = (SplitedPageLayout *) palloc0(sizeof(SplitedPageLayout));
2006-10-04 02:30:14 +02:00
resptr->block.blkno = BufferGetBlockNumber(buffers[nbuffers]);
resptr->block.num = PageGetMaxOffsetNumber(page);
2006-10-04 02:30:14 +02:00
vec = gistextractpage(page, &veclen);
resptr->list = gistfillitupvec(vec, veclen, &(resptr->lenlist));
resptr->next = res;
res = resptr;
}
return res;
}
2005-07-01 15:18:17 +02:00
/*
* Continue insert after crash. In normal situations, there aren't any
* incomplete inserts, but if a crash occurs partway through an insertion
* sequence, we'll need to finish making the index valid at the end of WAL
* replay.
*
* Note that we assume the index is now in a valid state, except for the
* unfinished insertion. In particular it's safe to invoke gistFindPath();
* there shouldn't be any garbage pages for it to run into.
2006-10-04 02:30:14 +02:00
*
* To complete insert we can't use basic insertion algorithm because
* during insertion we can't call user-defined support functions of opclass.
* So, we insert 'invalid' tuples without real key and do it by separate algorithm.
* 'invalid' tuple should be updated by vacuum full.
*/
static void
gistContinueInsert(gistIncompleteInsert *insert)
{
IndexTuple *itup;
int i,
lenitup;
Relation index;
index = CreateFakeRelcacheEntry(insert->node);
/*
* needed vector itup never will be more than initial lenblkno+2, because
* during this processing Indextuple can be only smaller
*/
lenitup = insert->lenblk;
itup = (IndexTuple *) palloc(sizeof(IndexTuple) * (lenitup + 2 /* guarantee root split */ ));
for (i = 0; i < insert->lenblk; i++)
itup[i] = gist_form_invalid_tuple(insert->blkno[i]);
/*
2006-10-04 02:30:14 +02:00
* any insertion of itup[] should make LOG message about
*/
if (insert->origblkno == GIST_ROOT_BLKNO)
{
/*
* it was split root, so we should only make new root. it can't be
* simple insert into root, we should replace all content of root.
*/
Buffer buffer = XLogReadBuffer(insert->node, GIST_ROOT_BLKNO, true);
gistnewroot(index, buffer, itup, lenitup, NULL);
UnlockReleaseBuffer(buffer);
}
else
{
Buffer *buffers;
Page *pages;
int numbuffer;
2006-10-04 02:30:14 +02:00
OffsetNumber *todelete;
/* construct path */
gistxlogFindPath(index, insert);
Assert(insert->pathlen > 0);
buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ ));
pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ ));
todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ ));
for (i = 0; i < insert->pathlen; i++)
{
int j,
k,
pituplen = 0;
uint8 xlinfo;
2006-10-04 02:30:14 +02:00
XLogRecData *rdata;
XLogRecPtr recptr;
Buffer tempbuffer = InvalidBuffer;
int ntodelete = 0;
numbuffer = 1;
buffers[0] = ReadBuffer(index, insert->path[i]);
LockBuffer(buffers[0], GIST_EXCLUSIVE);
2006-10-04 02:30:14 +02:00
/*
* we check buffer, because we restored page earlier
*/
gistcheckpage(index, buffers[0]);
pages[0] = BufferGetPage(buffers[0]);
2006-10-04 02:30:14 +02:00
Assert(!GistPageIsLeaf(pages[0]));
pituplen = PageGetMaxOffsetNumber(pages[0]);
/* find remove old IndexTuples to remove */
for (j = 0; j < pituplen && ntodelete < lenitup; j++)
{
BlockNumber blkno;
ItemId iid = PageGetItemId(pages[0], j + FirstOffsetNumber);
IndexTuple idxtup = (IndexTuple) PageGetItem(pages[0], iid);
blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid));
for (k = 0; k < lenitup; k++)
if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno)
{
todelete[ntodelete] = j + FirstOffsetNumber - ntodelete;
ntodelete++;
break;
}
}
2006-10-04 02:30:14 +02:00
if (ntodelete == 0)
elog(PANIC, "gistContinueInsert: cannot find pointer to page(s)");
/*
2006-10-04 02:30:14 +02:00
* we check space with subtraction only first tuple to delete,
* hope, that wiil be enough space....
*/
if (gistnospace(pages[0], itup, lenitup, *todelete, 0))
{
/* no space left on page, so we must split */
buffers[numbuffer] = ReadBuffer(index, P_NEW);
LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
GISTInitBuffer(buffers[numbuffer], 0);
pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
gistfillbuffer(pages[numbuffer], itup, lenitup, FirstOffsetNumber);
numbuffer++;
if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
{
2006-10-04 02:30:14 +02:00
Buffer tmp;
/*
* we split root, just copy content from root to new page
*/
/* sanity check */
if (i + 1 != insert->pathlen)
elog(PANIC, "unexpected pathlen in index \"%s\"",
RelationGetRelationName(index));
/* fill new page, root will be changed later */
tempbuffer = ReadBuffer(index, P_NEW);
LockBuffer(tempbuffer, GIST_EXCLUSIVE);
2006-10-04 02:30:14 +02:00
memcpy(BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer));
/* swap buffers[0] (was root) and temp buffer */
tmp = buffers[0];
buffers[0] = tempbuffer;
2006-10-04 02:30:14 +02:00
tempbuffer = tmp; /* now in tempbuffer GIST_ROOT_BLKNO,
* it is still unchanged */
pages[0] = BufferGetPage(buffers[0]);
}
START_CRIT_SECTION();
2006-10-04 02:30:14 +02:00
for (j = 0; j < ntodelete; j++)
PageIndexTupleDelete(pages[0], todelete[j]);
xlinfo = XLOG_GIST_PAGE_SPLIT;
rdata = formSplitRdata(index->rd_node, insert->path[i],
2006-10-04 02:30:14 +02:00
false, &(insert->key),
gistMakePageLayout(buffers, numbuffer));
2006-10-04 02:30:14 +02:00
}
else
{
START_CRIT_SECTION();
2006-10-04 02:30:14 +02:00
for (j = 0; j < ntodelete; j++)
PageIndexTupleDelete(pages[0], todelete[j]);
gistfillbuffer(pages[0], itup, lenitup, InvalidOffsetNumber);
xlinfo = XLOG_GIST_PAGE_UPDATE;
2006-10-04 02:30:14 +02:00
rdata = formUpdateRdata(index->rd_node, buffers[0],
todelete, ntodelete,
itup, lenitup, &(insert->key));
}
2006-10-04 02:30:14 +02:00
/*
* use insert->key as mark for completion of insert (form*Rdata()
* above) for following possible replays
*/
/* write pages, we should mark it dirty befor XLogInsert() */
2006-10-04 02:30:14 +02:00
for (j = 0; j < numbuffer; j++)
{
GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
MarkBufferDirty(buffers[j]);
}
recptr = XLogInsert(RM_GIST_ID, xlinfo, rdata);
for (j = 0; j < numbuffer; j++)
{
PageSetLSN(pages[j], recptr);
PageSetTLI(pages[j], ThisTimeLineID);
}
END_CRIT_SECTION();
lenitup = numbuffer;
2006-10-04 02:30:14 +02:00
for (j = 0; j < numbuffer; j++)
{
itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
UnlockReleaseBuffer(buffers[j]);
}
2006-10-04 02:30:14 +02:00
if (tempbuffer != InvalidBuffer)
{
/*
* it was a root split, so fill it by new values
*/
gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key));
UnlockReleaseBuffer(tempbuffer);
}
}
}
FreeFakeRelcacheEntry(index);
ereport(LOG,
2006-10-04 02:30:14 +02:00
(errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery",
insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
2006-10-04 02:30:14 +02:00
errdetail("Incomplete insertion detected during crash replay.")));
}
/*
 * Prepare GiST replay state: start with an empty incomplete-insert list and
 * create the two memory contexts this file works in.
 */
void
gist_xlog_startup(void)
{
	/* nothing is pending yet */
	incomplete_inserts = NIL;

	/* context that owns the entries of incomplete_inserts */
	insertCtx = AllocSetContextCreate(CurrentMemoryContext,
									  "GiST recovery temporary context",
									  ALLOCSET_DEFAULT_MINSIZE,
									  ALLOCSET_DEFAULT_INITSIZE,
									  ALLOCSET_DEFAULT_MAXSIZE);

	/* scratch context for per-record work */
	opCtx = createTempGistContext();
}
/*
 * Finish GiST replay: complete every insert still on the incomplete list
 * (resetting opCtx after each one), then delete both memory contexts.
 */
void
gist_xlog_cleanup(void)
{
	MemoryContext saved;
	ListCell   *cell;

	saved = MemoryContextSwitchTo(opCtx);

	foreach(cell, incomplete_inserts)
	{
		gistContinueInsert((gistIncompleteInsert *) lfirst(cell));

		/* throw away whatever the completion allocated */
		MemoryContextReset(opCtx);
	}

	MemoryContextSwitchTo(saved);

	MemoryContextDelete(opCtx);
	MemoryContextDelete(insertCtx);
}
bool
gist_safe_restartpoint(void)
{
if (incomplete_inserts)
return false;
return true;
}
/*
 * Construct the rdata chain for an XLOG record describing a page split.
 *
 * The chain starts with a gistxlogPageSplit header and then carries, for
 * every element of dist, the element's gistxlogPage block header followed by
 * its tuple data.  No entry references a buffer.  When key is NULL the
 * header's key is set invalid.  Both the header and the rdata array are
 * palloc'd.
 */
XLogRecData *
formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
			   ItemPointer key, SplitedPageLayout *dist)
{
	gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit));
	XLogRecData *rdata;
	SplitedPageLayout *lp;
	int			npage = 0;
	int			cur;

	/* count the pages so the chain can be sized: header + 2 per page */
	for (lp = dist; lp != NULL; lp = lp->next)
		npage++;

	rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));

	xlrec->node = node;
	xlrec->origblkno = blkno;
	xlrec->origleaf = page_is_leaf;
	xlrec->npage = (uint16) npage;
	if (key != NULL)
		xlrec->key = *key;
	else
		ItemPointerSetInvalid(&(xlrec->key));

	rdata[0].buffer = InvalidBuffer;
	rdata[0].data = (char *) xlrec;
	rdata[0].len = sizeof(gistxlogPageSplit);
	rdata[0].next = NULL;

	cur = 1;
	for (lp = dist; lp != NULL; lp = lp->next)
	{
		XLogRecData *hdr = &rdata[cur];
		XLogRecData *tuples = &rdata[cur + 1];

		/* page header, linked behind whatever came before */
		rdata[cur - 1].next = hdr;
		hdr->buffer = InvalidBuffer;
		hdr->data = (char *) &(lp->block);
		hdr->len = sizeof(gistxlogPage);
		hdr->next = tuples;

		/* the page's tuples; currently the tail of the chain */
		tuples->buffer = InvalidBuffer;
		tuples->data = (char *) (lp->list);
		tuples->len = lp->lenlist;
		tuples->next = NULL;

		cur += 2;
	}

	return rdata;
}
/*
 * Build the rdata chain for an XLOG record that describes a page update,
 * i.e. deletion and/or insertion of tuples on one index page.
 *
 * The todelete array and every tuple are attached to the target buffer, so
 * XLogInsert is free to omit them if it logs the whole buffer instead.  The
 * first chain element is an empty item referencing the buffer; it guarantees
 * that XLogInsert learns about the buffer even when ntodelete and ituplen
 * are both zero.
 *
 * When key is NULL the header's key is set invalid.  Both the header and
 * the rdata array are palloc'd.
 */
XLogRecData *
formUpdateRdata(RelFileNode node, Buffer buffer,
				OffsetNumber *todelete, int ntodelete,
				IndexTuple *itup, int ituplen, ItemPointer key)
{
	XLogRecData *rdata;
	gistxlogPageUpdate *xlrec;
	int			i;

	/* three fixed entries plus one per tuple */
	rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen));
	xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));

	xlrec->node = node;
	xlrec->blkno = BufferGetBlockNumber(buffer);
	xlrec->ntodelete = ntodelete;
	if (key != NULL)
		xlrec->key = *key;
	else
		ItemPointerSetInvalid(&(xlrec->key));

	/* [0] empty placeholder that ties the record to the buffer */
	rdata[0].buffer = buffer;
	rdata[0].buffer_std = true;
	rdata[0].data = NULL;
	rdata[0].len = 0;
	rdata[0].next = &(rdata[1]);

	/* [1] fixed header, not attached to the buffer */
	rdata[1].data = (char *) xlrec;
	rdata[1].len = sizeof(gistxlogPageUpdate);
	rdata[1].buffer = InvalidBuffer;
	rdata[1].next = &(rdata[2]);

	/* [2] offsets to delete, padded to MAXALIGN */
	rdata[2].data = (char *) todelete;
	rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
	rdata[2].buffer = buffer;
	rdata[2].buffer_std = true;
	rdata[2].next = NULL;

	/* [3..] new tuples, each appended to the tail of the chain */
	for (i = 0; i < ituplen; i++)
	{
		XLogRecData *item = &rdata[3 + i];

		rdata[2 + i].next = item;
		item->data = (char *) (itup[i]);
		item->len = IndexTupleSize(itup[i]);
		item->buffer = buffer;
		item->buffer_std = true;
		item->next = NULL;
	}

	return rdata;
}
/*
 * Write an XLOG_GIST_INSERT_COMPLETE record for index node, carrying the
 * len item pointers in keys[].  len must be positive.  Returns the value
 * XLogInsert() returned for the record.
 */
XLogRecPtr
gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len)
{
	gistxlogInsertComplete hdr;
	XLogRecData chain[2];
	XLogRecPtr	result;

	Assert(len > 0);

	hdr.node = node;

	/* fixed header */
	chain[0].buffer = InvalidBuffer;
	chain[0].data = (char *) &hdr;
	chain[0].len = sizeof(gistxlogInsertComplete);
	chain[0].next = &(chain[1]);

	/* the keys of the inserts being declared complete */
	chain[1].buffer = InvalidBuffer;
	chain[1].data = (char *) keys;
	chain[1].len = sizeof(ItemPointerData) * len;
	chain[1].next = NULL;

	START_CRIT_SECTION();

	result = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, chain);

	END_CRIT_SECTION();

	return result;
}