postgresql/src/backend/access/gist/gistxlog.c

849 lines
24 KiB
C
Raw Normal View History

/*-------------------------------------------------------------------------
*
* gistxlog.c
* WAL replay logic for GiST.
*
*
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.8 2005/09/22 18:49:45 tgl Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/gist_private.h"
#include "access/gistscan.h"
#include "access/heapam.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "utils/memutils.h"
typedef struct {
gistxlogEntryUpdate *data;
int len;
IndexTuple *itup;
OffsetNumber *todelete;
} EntryUpdateRecord;
typedef struct {
gistxlogPage *header;
IndexTuple *itup;
} NewPage;
typedef struct {
gistxlogPageSplit *data;
NewPage *page;
} PageSplitRecord;
/* track for incomplete inserts, idea was taken from nbtxlog.c */
typedef struct gistIncompleteInsert {
RelFileNode node;
BlockNumber origblkno; /* for splits */
ItemPointerData key;
int lenblk;
BlockNumber *blkno;
XLogRecPtr lsn;
BlockNumber *path;
int pathlen;
} gistIncompleteInsert;
MemoryContext opCtx;
MemoryContext insertCtx;
static List *incomplete_inserts;
#define ItemPointerEQ( a, b ) \
( \
ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \
)
static void
pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
BlockNumber *blkno, int lenblk,
PageSplitRecord *xlinfo /* to extract blkno info */ ) {
MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
gistIncompleteInsert *ninsert = (gistIncompleteInsert*)palloc( sizeof(gistIncompleteInsert) );
ninsert->node = node;
ninsert->key = key;
ninsert->lsn = lsn;
if ( lenblk && blkno ) {
ninsert->lenblk = lenblk;
ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk );
memcpy(ninsert->blkno, blkno, sizeof(BlockNumber)*ninsert->lenblk);
ninsert->origblkno = *blkno;
} else {
int i;
Assert( xlinfo );
ninsert->lenblk = xlinfo->data->npage;
ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk );
for(i=0;i<ninsert->lenblk;i++)
ninsert->blkno[i] = xlinfo->page[i].header->blkno;
ninsert->origblkno = xlinfo->data->origblkno;
}
Assert( ninsert->lenblk>0 );
incomplete_inserts = lappend(incomplete_inserts, ninsert);
MemoryContextSwitchTo(oldCxt);
}
static void
forgetIncompleteInsert(RelFileNode node, ItemPointerData key) {
ListCell *l;
foreach(l, incomplete_inserts) {
gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l);
if ( RelFileNodeEquals(node, insert->node) && ItemPointerEQ( &(insert->key), &(key) ) ) {
/* found */
pfree( insert->blkno );
incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
pfree( insert );
break;
}
}
}
static void
decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) {
char *begin = XLogRecGetData(record), *ptr;
int i=0, addpath=0;
decoded->data = (gistxlogEntryUpdate*)begin;
if ( decoded->data->ntodelete ) {
decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogEntryUpdate ) + addpath);
addpath = MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete );
} else
decoded->todelete = NULL;
decoded->len=0;
ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath;
while( ptr - begin < record->xl_len ) {
decoded->len++;
ptr += IndexTupleSize( (IndexTuple)ptr );
}
decoded->itup=(IndexTuple*)palloc( sizeof( IndexTuple ) * decoded->len );
ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath;
while( ptr - begin < record->xl_len ) {
decoded->itup[i] = (IndexTuple)ptr;
ptr += IndexTupleSize( decoded->itup[i] );
i++;
}
}
/*
* redo any page update (except page split)
*/
static void
gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
EntryUpdateRecord xlrec;
Relation reln;
Buffer buffer;
Page page;
decodeEntryUpdateRecord( &xlrec, record );
reln = XLogOpenRelation(xlrec.data->node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(false, reln, xlrec.data->blkno);
if (!BufferIsValid(buffer))
elog(PANIC, "block %u unfound", xlrec.data->blkno);
page = (Page) BufferGetPage(buffer);
if ( isnewroot ) {
if ( !PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page)) ) {
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
}
} else {
if ( PageIsNew((PageHeader) page) )
elog(PANIC, "uninitialized page %u", xlrec.data->blkno);
if (XLByteLE(lsn, PageGetLSN(page))) {
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
}
}
if ( xlrec.data->isemptypage ) {
while( !PageIsEmpty(page) )
PageIndexTupleDelete( page, FirstOffsetNumber );
if ( xlrec.data->blkno == GIST_ROOT_BLKNO )
GistPageSetLeaf( page );
else
GistPageSetDeleted( page );
} else {
if ( isnewroot )
GISTInitBuffer(buffer, 0);
else if ( xlrec.data->ntodelete ) {
int i;
for(i=0; i < xlrec.data->ntodelete ; i++)
PageIndexTupleDelete(page, xlrec.todelete[i]);
if ( GistPageIsLeaf(page) )
GistMarkTuplesDeleted(page);
}
/* add tuples */
if ( xlrec.len > 0 )
gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
/* special case: leafpage, nothing to insert, nothing to delete, then
vacuum marks page */
if ( GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0 )
GistClearTuplesDeleted(page);
}
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
if ( ItemPointerIsValid( &(xlrec.data->key) ) ) {
if ( incomplete_inserts != NIL )
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
if ( !isnewroot && xlrec.data->blkno!=GIST_ROOT_BLKNO )
pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
&(xlrec.data->blkno), 1,
NULL);
}
}
static void
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) {
char *begin = XLogRecGetData(record), *ptr;
int j,i=0;
decoded->data = (gistxlogPageSplit*)begin;
decoded->page = (NewPage*)palloc( sizeof(NewPage) * decoded->data->npage );
ptr=begin+sizeof( gistxlogPageSplit );
for(i=0;i<decoded->data->npage;i++) {
Assert( ptr - begin < record->xl_len );
decoded->page[i].header = (gistxlogPage*)ptr;
ptr += sizeof(gistxlogPage);
decoded->page[i].itup = (IndexTuple*)
palloc( sizeof(IndexTuple) * decoded->page[i].header->num );
j=0;
while(j<decoded->page[i].header->num) {
Assert( ptr - begin < record->xl_len );
decoded->page[i].itup[j] = (IndexTuple)ptr;
ptr += IndexTupleSize((IndexTuple)ptr);
j++;
}
}
}
static void
gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
PageSplitRecord xlrec;
Relation reln;
Buffer buffer;
Page page;
int i;
int flags=0;
decodePageSplitRecord( &xlrec, record );
reln = XLogOpenRelation(xlrec.data->node);
if (!RelationIsValid(reln))
return;
/* first of all wee need get F_LEAF flag from original page */
buffer = XLogReadBuffer( false, reln, xlrec.data->origblkno);
if (!BufferIsValid(buffer))
elog(PANIC, "block %u unfound", xlrec.data->origblkno);
page = (Page) BufferGetPage(buffer);
if ( PageIsNew((PageHeader) page) )
elog(PANIC, "uninitialized page %u", xlrec.data->origblkno);
flags = ( GistPageIsLeaf(page) ) ? F_LEAF : 0;
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
/* loop around all pages */
for(i=0;i<xlrec.data->npage;i++) {
NewPage *newpage = xlrec.page + i;
bool isorigpage = (xlrec.data->origblkno == newpage->header->blkno) ? true : false;
buffer = XLogReadBuffer( !isorigpage, reln, newpage->header->blkno);
if (!BufferIsValid(buffer))
elog(PANIC, "block %u unfound", newpage->header->blkno);
page = (Page) BufferGetPage(buffer);
if (XLByteLE(lsn, PageGetLSN(page))) {
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
continue;
}
/* ok, clear buffer */
GISTInitBuffer(buffer, flags);
/* and fill it */
gistfillbuffer(reln, page, newpage->itup, newpage->header->num, FirstOffsetNumber);
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
}
if ( ItemPointerIsValid( &(xlrec.data->key) ) ) {
if ( incomplete_inserts != NIL )
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
NULL, 0,
&xlrec);
}
}
static void
gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) {
RelFileNode *node = (RelFileNode*)XLogRecGetData(record);
Relation reln;
Buffer buffer;
Page page;
reln = XLogOpenRelation(*node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer( true, reln, GIST_ROOT_BLKNO);
if (!BufferIsValid(buffer))
elog(PANIC, "root block unfound");
page = (Page) BufferGetPage(buffer);
if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page))) {
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
}
GISTInitBuffer(buffer, F_LEAF);
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
}
static void
gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record) {
char *begin = XLogRecGetData(record), *ptr;
gistxlogInsertComplete *xlrec;
xlrec = (gistxlogInsertComplete*)begin;
ptr = begin + sizeof( gistxlogInsertComplete );
while( ptr - begin < record->xl_len ) {
Assert( record->xl_len - (ptr - begin) >= sizeof(ItemPointerData) );
forgetIncompleteInsert( xlrec->node, *((ItemPointerData*)ptr) );
ptr += sizeof(ItemPointerData);
}
}
void
gist_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
MemoryContext oldCxt;
oldCxt = MemoryContextSwitchTo(opCtx);
switch (info) {
case XLOG_GIST_ENTRY_UPDATE:
case XLOG_GIST_ENTRY_DELETE:
gistRedoEntryUpdateRecord(lsn, record,false);
break;
case XLOG_GIST_NEW_ROOT:
gistRedoEntryUpdateRecord(lsn, record,true);
break;
case XLOG_GIST_PAGE_SPLIT:
gistRedoPageSplitRecord(lsn, record);
break;
case XLOG_GIST_CREATE_INDEX:
gistRedoCreateIndex(lsn, record);
break;
case XLOG_GIST_INSERT_COMPLETE:
gistRedoCompleteInsert(lsn, record);
break;
default:
elog(PANIC, "gist_redo: unknown op code %u", info);
}
MemoryContextSwitchTo(oldCxt);
MemoryContextReset(opCtx);
}
static void
out_target(char *buf, RelFileNode node, ItemPointerData key)
{
sprintf(buf + strlen(buf), "rel %u/%u/%u; tid %u/%u",
node.spcNode, node.dbNode, node.relNode,
ItemPointerGetBlockNumber(&key),
ItemPointerGetOffsetNumber(&key));
}
static void
out_gistxlogEntryUpdate(char *buf, gistxlogEntryUpdate *xlrec) {
out_target(buf, xlrec->node, xlrec->key);
sprintf(buf + strlen(buf), "; block number %u",
xlrec->blkno);
}
static void
out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec) {
strcat(buf, "page_split: ");
out_target(buf, xlrec->node, xlrec->key);
sprintf(buf + strlen(buf), "; block number %u splits to %d pages",
xlrec->origblkno, xlrec->npage);
}
void
gist_desc(char *buf, uint8 xl_info, char *rec)
{
uint8 info = xl_info & ~XLR_INFO_MASK;
switch (info) {
case XLOG_GIST_ENTRY_UPDATE:
strcat(buf, "entry_update: ");
out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate*)rec);
break;
case XLOG_GIST_ENTRY_DELETE:
strcat(buf, "entry_delete: ");
out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate*)rec);
break;
case XLOG_GIST_NEW_ROOT:
strcat(buf, "new_root: ");
out_target(buf, ((gistxlogEntryUpdate*)rec)->node, ((gistxlogEntryUpdate*)rec)->key);
break;
case XLOG_GIST_PAGE_SPLIT:
out_gistxlogPageSplit(buf, (gistxlogPageSplit*)rec);
break;
case XLOG_GIST_CREATE_INDEX:
sprintf(buf + strlen(buf), "create_index: rel %u/%u/%u",
((RelFileNode*)rec)->spcNode,
((RelFileNode*)rec)->dbNode,
((RelFileNode*)rec)->relNode);
break;
case XLOG_GIST_INSERT_COMPLETE:
sprintf(buf + strlen(buf), "complete_insert: rel %u/%u/%u",
((gistxlogInsertComplete*)rec)->node.spcNode,
((gistxlogInsertComplete*)rec)->node.dbNode,
((gistxlogInsertComplete*)rec)->node.relNode);
break;
default:
elog(PANIC, "gist_desc: unknown op code %u", info);
}
}
IndexTuple
gist_form_invalid_tuple(BlockNumber blkno) {
/* we don't alloc space for null's bitmap, this is invalid tuple,
be carefull in read and write code */
Size size = IndexInfoFindDataOffset(0);
IndexTuple tuple=(IndexTuple)palloc0( size );
tuple->t_info |= size;
ItemPointerSetBlockNumber(&(tuple->t_tid), blkno);
GistTupleSetInvalid( tuple );
return tuple;
}
static Buffer
gistXLogReadAndLockBuffer( Relation r, BlockNumber blkno ) {
Buffer buffer = XLogReadBuffer( false, r, blkno );
if (!BufferIsValid(buffer))
elog(PANIC, "block %u unfound", blkno);
if ( PageIsNew( (PageHeader)(BufferGetPage(buffer)) ) )
elog(PANIC, "uninitialized page %u", blkno);
return buffer;
}
static void
gixtxlogFindPath( Relation index, gistIncompleteInsert *insert ) {
GISTInsertStack *top;
insert->pathlen = 0;
insert->path = NULL;
if ( (top=gistFindPath(index, insert->origblkno, gistXLogReadAndLockBuffer)) != NULL ) {
int i;
GISTInsertStack *ptr=top;
while(ptr) {
insert->pathlen++;
ptr = ptr->parent;
}
insert->path=(BlockNumber*)palloc( sizeof(BlockNumber) * insert->pathlen );
i=0;
ptr = top;
while(ptr) {
insert->path[i] = ptr->blkno;
i++;
ptr = ptr->parent;
}
} else
elog(LOG, "lost parent for block %u", insert->origblkno);
}
2005-07-01 15:18:17 +02:00
/*
* Continue insert after crash. In normal situation, there isn't any incomplete
* inserts, but if it might be after crash, WAL may has not a record of completetion.
*
* Although stored LSN in gistIncompleteInsert is a LSN of child page,
* we can compare it with LSN of parent, because parent is always locked
* while we change child page (look at gistmakedeal). So if parent's LSN is
* lesser than stored lsn then changes in parent doesn't do yet.
*/
static void
gistContinueInsert(gistIncompleteInsert *insert) {
IndexTuple *itup;
int i, lenitup;
Relation index;
index = XLogOpenRelation(insert->node);
if (!RelationIsValid(index))
return;
/* needed vector itup never will be more than initial lenblkno+2,
because during this processing Indextuple can be only smaller */
lenitup = insert->lenblk;
itup = (IndexTuple*)palloc(sizeof(IndexTuple)*(lenitup+2 /*guarantee root split*/));
for(i=0;i<insert->lenblk;i++)
itup[i] = gist_form_invalid_tuple( insert->blkno[i] );
if ( insert->origblkno==GIST_ROOT_BLKNO ) {
/*it was split root, so we should only make new root.
it can't be simple insert into root, look at call
pushIncompleteInsert in gistRedoPageSplitRecord */
Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO);
Page page;
if (!BufferIsValid(buffer))
elog(PANIC, "root block unfound");
page = BufferGetPage(buffer);
if (XLByteLE(insert->lsn, PageGetLSN(page))) {
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
}
GISTInitBuffer(buffer, 0);
page = BufferGetPage(buffer);
gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber);
PageSetLSN(page, insert->lsn);
PageSetTLI(page, ThisTimeLineID);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
} else {
Buffer *buffers;
Page *pages;
int numbuffer;
/* construct path */
gixtxlogFindPath( index, insert );
Assert( insert->pathlen > 0 );
buffers= (Buffer*) palloc( sizeof(Buffer) * (insert->lenblk+2/*guarantee root split*/) );
pages = (Page*) palloc( sizeof(Page ) * (insert->lenblk+2/*guarantee root split*/) );
for(i=0;i<insert->pathlen;i++) {
int j, k, pituplen=0, childfound=0;
numbuffer=1;
buffers[numbuffer-1] = XLogReadBuffer(false, index, insert->path[i]);
if (!BufferIsValid(buffers[numbuffer-1]))
elog(PANIC, "block %u unfound", insert->path[i]);
pages[numbuffer-1] = BufferGetPage( buffers[numbuffer-1] );
if ( PageIsNew((PageHeader)(pages[numbuffer-1])) )
elog(PANIC, "uninitialized page %u", insert->path[i]);
if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer-1]))) {
LockBuffer(buffers[numbuffer-1], BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffers[numbuffer-1]);
return;
}
pituplen = PageGetMaxOffsetNumber(pages[numbuffer-1]);
/* remove old IndexTuples */
for(j=0;j<pituplen && childfound<lenitup;j++) {
BlockNumber blkno;
ItemId iid = PageGetItemId(pages[numbuffer-1], j+FirstOffsetNumber);
IndexTuple idxtup = (IndexTuple) PageGetItem(pages[numbuffer-1], iid);
blkno = ItemPointerGetBlockNumber( &(idxtup->t_tid) );
for(k=0;k<lenitup;k++)
if ( ItemPointerGetBlockNumber( &(itup[k]->t_tid) ) == blkno ) {
PageIndexTupleDelete(pages[numbuffer-1], j+FirstOffsetNumber);
j--; pituplen--;
childfound++;
break;
}
}
if ( gistnospace(pages[numbuffer-1], itup, lenitup) ) {
/* no space left on page, so we should split */
buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW);
if (!BufferIsValid(buffers[numbuffer]))
elog(PANIC, "could not obtain new block");
GISTInitBuffer(buffers[numbuffer], 0);
pages[numbuffer] = BufferGetPage( buffers[numbuffer] );
gistfillbuffer( index, pages[numbuffer], itup, lenitup, FirstOffsetNumber );
numbuffer++;
if ( BufferGetBlockNumber( buffers[0] ) == GIST_ROOT_BLKNO ) {
IndexTuple *parentitup;
/* we split root, just copy tuples from old root to new page */
parentitup = gistextractbuffer(buffers[numbuffer-1], &pituplen);
/* sanity check */
if ( i+1 != insert->pathlen )
elog(PANIC,"unexpected pathlen in index \"%s\"",
RelationGetRelationName( index ));
/* fill new page */
buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW);
if (!BufferIsValid(buffers[numbuffer]))
elog(PANIC, "could not obtain new block");
GISTInitBuffer(buffers[numbuffer], 0);
pages[numbuffer] = BufferGetPage( buffers[numbuffer] );
gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber);
numbuffer++;
/* fill root page */
GISTInitBuffer(buffers[0], 0);
for(j=1;j<numbuffer;j++) {
IndexTuple tuple = gist_form_invalid_tuple( BufferGetBlockNumber( buffers[j] ) );
if (PageAddItem(pages[0],
(Item)tuple,
IndexTupleSize( tuple ),
(OffsetNumber)j,
LP_USED) == InvalidOffsetNumber)
elog(PANIC, "failed to add item to index page in \"%s\"",
RelationGetRelationName( index ));
}
}
} else
gistfillbuffer( index, pages[numbuffer-1], itup, lenitup, InvalidOffsetNumber);
lenitup=numbuffer;
for(j=0;j<numbuffer;j++) {
itup[j]=gist_form_invalid_tuple( BufferGetBlockNumber( buffers[j] ) );
PageSetLSN(pages[j], insert->lsn);
PageSetTLI(pages[j], ThisTimeLineID);
GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
LockBuffer(buffers[j], BUFFER_LOCK_UNLOCK);
WriteBuffer( buffers[j] );
}
}
}
ereport(LOG,
(errmsg("index %u/%u/%u needs VACUUM or REINDEX to finish crash recovery",
insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
errdetail("Incomplete insertion detected during crash replay.")));
}
void
gist_xlog_startup(void) {
incomplete_inserts=NIL;
insertCtx = AllocSetContextCreate(CurrentMemoryContext,
"GiST recovery temporary context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
opCtx = createTempGistContext();
}
void
gist_xlog_cleanup(void) {
ListCell *l;
List *reverse=NIL;
MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
/* we should call gistContinueInsert in reverse order */
foreach(l, incomplete_inserts)
reverse = lappend(reverse, lfirst(l));
MemoryContextSwitchTo(opCtx);
foreach(l, reverse) {
gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l);
gistContinueInsert(insert);
MemoryContextReset(opCtx);
}
MemoryContextSwitchTo(oldCxt);
MemoryContextDelete(opCtx);
MemoryContextDelete(insertCtx);
}
XLogRecData *
formSplitRdata(RelFileNode node, BlockNumber blkno,
ItemPointer key, SplitedPageLayout *dist ) {
XLogRecData *rdata;
gistxlogPageSplit *xlrec = (gistxlogPageSplit*)palloc(sizeof(gistxlogPageSplit));
SplitedPageLayout *ptr;
int npage = 0, cur=1;
ptr=dist;
while( ptr ) {
npage++;
ptr=ptr->next;
}
rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + 2));
xlrec->node = node;
xlrec->origblkno = blkno;
xlrec->npage = (uint16)npage;
if ( key )
xlrec->key = *key;
else
ItemPointerSetInvalid( &(xlrec->key) );
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) xlrec;
rdata[0].len = sizeof( gistxlogPageSplit );
rdata[0].next = NULL;
ptr=dist;
while(ptr) {
rdata[cur].buffer = InvalidBuffer;
rdata[cur].data = (char*)&(ptr->block);
rdata[cur].len = sizeof(gistxlogPage);
rdata[cur-1].next = &(rdata[cur]);
cur++;
rdata[cur].buffer = InvalidBuffer;
rdata[cur].data = (char*)(ptr->list);
rdata[cur].len = ptr->lenlist;
rdata[cur-1].next = &(rdata[cur]);
rdata[cur].next=NULL;
cur++;
ptr=ptr->next;
}
return rdata;
}
XLogRecData *
formUpdateRdata(RelFileNode node, BlockNumber blkno,
OffsetNumber *todelete, int ntodelete, bool emptypage,
IndexTuple *itup, int ituplen, ItemPointer key ) {
XLogRecData *rdata;
gistxlogEntryUpdate *xlrec = (gistxlogEntryUpdate*)palloc(sizeof(gistxlogEntryUpdate));
xlrec->node = node;
xlrec->blkno = blkno;
if ( key )
xlrec->key = *key;
else
ItemPointerSetInvalid( &(xlrec->key) );
if ( emptypage ) {
xlrec->isemptypage = true;
xlrec->ntodelete = 0;
rdata = (XLogRecData*)palloc( sizeof(XLogRecData) );
rdata->buffer = InvalidBuffer;
rdata->data = (char*)xlrec;
rdata->len = sizeof(gistxlogEntryUpdate);
rdata->next = NULL;
} else {
int cur=1,i;
xlrec->isemptypage = false;
xlrec->ntodelete = ntodelete;
rdata = (XLogRecData*) palloc( sizeof(XLogRecData) * ( 2 + ituplen ) );
rdata->buffer = InvalidBuffer;
rdata->data = (char*)xlrec;
rdata->len = sizeof(gistxlogEntryUpdate);
rdata->next = NULL;
if ( ntodelete ) {
rdata[cur-1].next = &(rdata[cur]);
rdata[cur].buffer = InvalidBuffer;
rdata[cur].data = (char*)todelete;
rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ntodelete);
rdata[cur].next = NULL;
cur++;
}
/* new tuples */
for(i=0;i<ituplen;i++) {
rdata[cur].buffer = InvalidBuffer;
rdata[cur].data = (char*)(itup[i]);
rdata[cur].len = IndexTupleSize(itup[i]);
rdata[cur].next = NULL;
rdata[cur-1].next = &(rdata[cur]);
cur++;
}
}
return rdata;
}
XLogRecPtr
gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len) {
gistxlogInsertComplete xlrec;
XLogRecData rdata[2];
XLogRecPtr recptr;
Assert(len>0);
xlrec.node = node;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof( gistxlogInsertComplete );
rdata[0].next = &(rdata[1]);
rdata[1].buffer = InvalidBuffer;
rdata[1].data = (char *) keys;
rdata[1].len = sizeof( ItemPointerData ) * len;
rdata[1].next = NULL;
START_CRIT_SECTION();
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata);
END_CRIT_SECTION();
return recptr;
}