/*-------------------------------------------------------------------------
 *
 * gistxlog.c
 *	  WAL replay logic for GiST.
 *
 *
 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			 $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.10 2006/03/05 15:58:20 momjian Exp $
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/genam.h"
#include "access/gist_private.h"
#include "access/gistscan.h"
#include "access/heapam.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "utils/memutils.h"


/*
 * Decoded form of an entry-update WAL record.  All pointers except itup
 * point directly into the WAL record data; itup itself is a palloc'd array
 * of pointers into the record.
 */
typedef struct
{
	gistxlogEntryUpdate *data;	/* fixed-size record header */
	int			len;			/* number of entries in itup */
	IndexTuple *itup;			/* tuples to add to the page */
	OffsetNumber *todelete;		/* offsets to delete, or NULL if none */
} EntryUpdateRecord;

/* One resulting page of a page split, as decoded from the WAL record. */
typedef struct
{
	gistxlogPage *header;		/* per-page header (blkno, tuple count) */
	IndexTuple *itup;			/* header->num tuples belonging to the page */
} NewPage;

/* Decoded form of a page-split WAL record. */
typedef struct
{
	gistxlogPageSplit *data;	/* fixed-size record header */
	NewPage    *page;			/* data->npage decoded pages */
} PageSplitRecord;

/* track for incomplete inserts, idea was taken from nbtxlog.c */

typedef struct gistIncompleteInsert
{
	RelFileNode node;			/* index the insert belongs to */
	BlockNumber origblkno;		/* for splits */
	ItemPointerData key;		/* key identifying the insert */
	int			lenblk;			/* number of entries in blkno */
	BlockNumber *blkno;			/* child blocks whose downlinks are missing */
	XLogRecPtr	lsn;			/* LSN of the record that changed the child */
	BlockNumber *path;			/* filled in by gixtxlogFindPath */
	int			pathlen;		/* number of entries in path */
} gistIncompleteInsert;


/* per-record working context, reset after every replayed record */
MemoryContext opCtx;

/* context holding the incomplete_inserts list for the whole recovery */
MemoryContext insertCtx;
static List *incomplete_inserts;


/* true if both item pointers have the same block number and offset */
#define ItemPointerEQ( a, b )	\
	( \
	ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
	ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \
	)

/*
 * Remember an insert whose completion has not been seen yet.
 *
 * The list of affected child blocks is taken either from the explicit
 * blkno/lenblk pair (when both are non-zero) or, otherwise, from the decoded
 * page-split record xlinfo, which must then be non-NULL.  The entry and its
 * block array are allocated in insertCtx so they survive resets of opCtx.
 */
static void
pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
					 BlockNumber *blkno, int lenblk,
					 PageSplitRecord *xlinfo /* to extract blkno info */ )
{
	MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
	gistIncompleteInsert *ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));

	ninsert->node = node;
	ninsert->key = key;
	ninsert->lsn = lsn;

	/*
	 * palloc does not zero memory; the path is only computed later by
	 * gixtxlogFindPath, so start from a well-defined empty state.
	 */
	ninsert->path = NULL;
	ninsert->pathlen = 0;

	if (lenblk && blkno)
	{
		ninsert->lenblk = lenblk;
		ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
		memcpy(ninsert->blkno, blkno, sizeof(BlockNumber) * ninsert->lenblk);
		ninsert->origblkno = *blkno;
	}
	else
	{
		int			i;

		Assert(xlinfo);
		ninsert->lenblk = xlinfo->data->npage;
		ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
		for (i = 0; i < ninsert->lenblk; i++)
			ninsert->blkno[i] = xlinfo->page[i].header->blkno;
		ninsert->origblkno = xlinfo->data->origblkno;
	}
	Assert(ninsert->lenblk > 0);

	incomplete_inserts = lappend(incomplete_inserts, ninsert);
	MemoryContextSwitchTo(oldCxt);
}

/*
 * Drop the first remembered incomplete insert matching (node, key), if any,
 * and free its memory.
 */
static void
forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
{
	ListCell   *l;

	foreach(l, incomplete_inserts)
	{
		gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);

		if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key)))
		{
			/* found */
			pfree(insert->blkno);
			incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
			pfree(insert);

			/* the list was modified, so we must not keep iterating over it */
			break;
		}
	}
}

/*
 * Decode an entry-update WAL record into *decoded.
 *
 * Layout read here: gistxlogEntryUpdate header, then (only if ntodelete is
 * non-zero) a MAXALIGN'ed array of OffsetNumbers, then index tuples up to
 * record->xl_len.  The tuples are walked twice: once to count them, once to
 * fill the palloc'd pointer array.
 */
static void
decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record)
{
	char	   *begin = XLogRecGetData(record),
			   *ptr;
	int			i = 0,
				addpath = 0;

	decoded->data = (gistxlogEntryUpdate *) begin;

	if (decoded->data->ntodelete)
	{
		/* addpath is still 0 here: the array directly follows the header */
		decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogEntryUpdate) + addpath);
		addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete);
	}
	else
		decoded->todelete = NULL;

	/* first pass: count tuples */
	decoded->len = 0;
	ptr = begin + sizeof(gistxlogEntryUpdate) + addpath;
	while (ptr - begin < record->xl_len)
	{
		decoded->len++;
		ptr += IndexTupleSize((IndexTuple) ptr);
	}

	decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len);

	/* second pass: collect pointers to the tuples */
	ptr = begin + sizeof(gistxlogEntryUpdate) + addpath;
	while (ptr - begin < record->xl_len)
	{
		decoded->itup[i] = (IndexTuple) ptr;
		ptr += IndexTupleSize(decoded->itup[i]);
		i++;
	}
}

/*
 * redo any page update (except page split)
 *
 * isnewroot is true for XLOG_GIST_NEW_ROOT records; in that case the target
 * page may still be uninitialized and is re-initialized before being filled.
 * The record is skipped when the page LSN shows it was already applied.
 */
static void
gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
{
	EntryUpdateRecord xlrec;
	Relation	reln;
	Buffer		buffer;
	Page		page;

	decodeEntryUpdateRecord(&xlrec, record);

	reln = XLogOpenRelation(xlrec.data->node);
	if (!RelationIsValid(reln))
		return;
	buffer = XLogReadBuffer(false, reln, xlrec.data->blkno);
	if (!BufferIsValid(buffer))
		elog(PANIC, "block %u unfound", xlrec.data->blkno);
	page = (Page) BufferGetPage(buffer);

	if (isnewroot)
	{
		if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page)))
		{
			/* change already applied */
			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
			ReleaseBuffer(buffer);
			return;
		}
	}
	else
	{
		if (PageIsNew((PageHeader) page))
			elog(PANIC, "uninitialized page %u", xlrec.data->blkno);
		if (XLByteLE(lsn, PageGetLSN(page)))
		{
			/* change already applied */
			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
			ReleaseBuffer(buffer);
			return;
		}
	}

	if (xlrec.data->isemptypage)
	{
		/* remove every tuple; an emptied root becomes a leaf again */
		while (!PageIsEmpty(page))
			PageIndexTupleDelete(page, FirstOffsetNumber);

		if (xlrec.data->blkno == GIST_ROOT_BLKNO)
			GistPageSetLeaf(page);
		else
			GistPageSetDeleted(page);
	}
	else
	{
		if (isnewroot)
			GISTInitBuffer(buffer, 0);
		else if (xlrec.data->ntodelete)
		{
			int			i;

			for (i = 0; i < xlrec.data->ntodelete; i++)
				PageIndexTupleDelete(page, xlrec.todelete[i]);
			if (GistPageIsLeaf(page))
				GistMarkTuplesDeleted(page);
		}

		/* add tuples */
		if (xlrec.len > 0)
			gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber);

		/*
		 * special case: leafpage, nothing to insert, nothing to delete, then
		 * vacuum marks page
		 */
		if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
			GistClearTuplesDeleted(page);
	}

	PageSetLSN(page, lsn);
	PageSetTLI(page, ThisTimeLineID);
	GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
	WriteBuffer(buffer);

	if (ItemPointerIsValid(&(xlrec.data->key)))
	{
		/* this record supersedes any earlier pending step of the same insert */
		if (incomplete_inserts != NIL)
			forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);

		/* a change to a non-root page still needs its parent updated */
		if (!isnewroot && xlrec.data->blkno != GIST_ROOT_BLKNO)
			pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
								 &(xlrec.data->blkno), 1,
								 NULL);
	}
}

/*
 * Decode a page-split WAL record into *decoded.
 *
 * Layout read here: gistxlogPageSplit header, then for each of npage pages a
 * gistxlogPage header followed by header->num index tuples.
 */
static void
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
{
	char	   *begin = XLogRecGetData(record),
			   *ptr;
	int			j,
				i = 0;

	decoded->data = (gistxlogPageSplit *) begin;
	decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage);

	ptr = begin + sizeof(gistxlogPageSplit);
	for (i = 0; i < decoded->data->npage; i++)
	{
		Assert(ptr - begin < record->xl_len);
		decoded->page[i].header = (gistxlogPage *) ptr;
		ptr += sizeof(gistxlogPage);

		decoded->page[i].itup = (IndexTuple *)
			palloc(sizeof(IndexTuple) * decoded->page[i].header->num);
		j = 0;
		while (j < decoded->page[i].header->num)
		{
			Assert(ptr - begin < record->xl_len);
			decoded->page[i].itup[j] = (IndexTuple) ptr;
			ptr += IndexTupleSize((IndexTuple) ptr);
			j++;
		}
	}
}

/*
 * Redo a page split: rebuild every page listed in the record from scratch,
 * skipping pages whose LSN shows the split was already applied to them.
 */
static void
gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
{
	PageSplitRecord xlrec;
	Relation	reln;
	Buffer		buffer;
	Page		page;
	int			i;
	int			flags = 0;

	decodePageSplitRecord(&xlrec, record);
	reln = XLogOpenRelation(xlrec.data->node);
	if (!RelationIsValid(reln))
		return;

	/* first of all we need to get the F_LEAF flag from the original page */
	buffer = XLogReadBuffer(false, reln, xlrec.data->origblkno);
	if (!BufferIsValid(buffer))
		elog(PANIC, "block %u unfound", xlrec.data->origblkno);
	page = (Page) BufferGetPage(buffer);
	if (PageIsNew((PageHeader) page))
		elog(PANIC, "uninitialized page %u", xlrec.data->origblkno);

	flags = (GistPageIsLeaf(page)) ? F_LEAF : 0;
	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
	ReleaseBuffer(buffer);

	/* loop around all pages */
	for (i = 0; i < xlrec.data->npage; i++)
	{
		NewPage    *newpage = xlrec.page + i;
		bool		isorigpage = (xlrec.data->origblkno == newpage->header->blkno) ? true : false;

		/* only the original page is expected to exist already */
		buffer = XLogReadBuffer(!isorigpage, reln, newpage->header->blkno);
		if (!BufferIsValid(buffer))
			elog(PANIC, "block %u unfound", newpage->header->blkno);
		page = (Page) BufferGetPage(buffer);

		if (XLByteLE(lsn, PageGetLSN(page)))
		{
			/* change already applied to this page */
			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
			ReleaseBuffer(buffer);
			continue;
		}

		/* ok, clear buffer */
		GISTInitBuffer(buffer, flags);

		/* and fill it */
		gistfillbuffer(reln, page, newpage->itup, newpage->header->num, FirstOffsetNumber);

		PageSetLSN(page, lsn);
		PageSetTLI(page, ThisTimeLineID);
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		WriteBuffer(buffer);
	}

	if (ItemPointerIsValid(&(xlrec.data->key)))
	{
		if (incomplete_inserts != NIL)
			forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);

		/* the parent still has to learn about the pages produced here */
		pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
							 NULL, 0,
							 &xlrec);
	}
}

/*
 * Redo index creation: (re)initialize the root block as an empty leaf page,
 * unless the page already carries an LSN at or beyond this record.
 */
static void
gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
{
	RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
	Relation	reln;
	Buffer		buffer;
	Page		page;

	reln = XLogOpenRelation(*node);
	if (!RelationIsValid(reln))
		return;
	buffer = XLogReadBuffer(true, reln, GIST_ROOT_BLKNO);
	if (!BufferIsValid(buffer))
		elog(PANIC, "root block unfound");
	page = (Page) BufferGetPage(buffer);

	if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page)))
	{
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(buffer);
		return;
	}

	GISTInitBuffer(buffer, F_LEAF);

	PageSetLSN(page, lsn);
	PageSetTLI(page, ThisTimeLineID);
	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
	WriteBuffer(buffer);
}

/*
 * Redo an insert-complete record: the record carries a gistxlogInsertComplete
 * header followed by an array of keys; forget the pending insert for each.
 */
static void
gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record)
{
	char	   *begin = XLogRecGetData(record),
			   *ptr;
	gistxlogInsertComplete *xlrec;

	xlrec = (gistxlogInsertComplete *) begin;

	ptr = begin + sizeof(gistxlogInsertComplete);
	while (ptr - begin < record->xl_len)
	{
		Assert(record->xl_len - (ptr - begin) >= sizeof(ItemPointerData));
		forgetIncompleteInsert(xlrec->node, *((ItemPointerData *) ptr));
		ptr += sizeof(ItemPointerData);
	}
}

/*
 * Entry point for replaying one GiST WAL record.
 *
 * The handlers run in opCtx, which is reset after each record so that
 * per-record allocations (decoded arrays etc.) do not accumulate.
 */
void
gist_redo(XLogRecPtr lsn, XLogRecord *record)
{
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
	MemoryContext oldCxt;

	oldCxt = MemoryContextSwitchTo(opCtx);
	switch (info)
	{
		case XLOG_GIST_ENTRY_UPDATE:
		case XLOG_GIST_ENTRY_DELETE:
			gistRedoEntryUpdateRecord(lsn, record, false);
			break;
		case XLOG_GIST_NEW_ROOT:
			gistRedoEntryUpdateRecord(lsn, record, true);
			break;
		case XLOG_GIST_PAGE_SPLIT:
			gistRedoPageSplitRecord(lsn, record);
			break;
		case XLOG_GIST_CREATE_INDEX:
			gistRedoCreateIndex(lsn, record);
			break;
		case XLOG_GIST_INSERT_COMPLETE:
			gistRedoCompleteInsert(lsn, record);
			break;
		default:
			elog(PANIC, "gist_redo: unknown op code %u", info);
	}

	MemoryContextSwitchTo(oldCxt);
	MemoryContextReset(opCtx);
}

/*
 * Append "rel spc/db/rel; tid blk/off" to buf.  The caller must supply a
 * NUL-terminated buffer with enough room; no bounds checking is done here.
 */
static void
out_target(char *buf, RelFileNode node, ItemPointerData key)
{
	sprintf(buf + strlen(buf), "rel %u/%u/%u; tid %u/%u",
			node.spcNode, node.dbNode, node.relNode,
			ItemPointerGetBlockNumber(&key), ItemPointerGetOffsetNumber(&key));
}

/* Append the description of an entry-update record header to buf. */
static void
out_gistxlogEntryUpdate(char *buf, gistxlogEntryUpdate *xlrec)
{
	out_target(buf, xlrec->node, xlrec->key);
	sprintf(buf + strlen(buf), "; block number %u",
			xlrec->blkno);
}

/* Append the description of a page-split record header to buf. */
static void
out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec)
{
	strcat(buf, "page_split: ");
	out_target(buf, xlrec->node, xlrec->key);
	sprintf(buf + strlen(buf), "; block number %u splits to %d pages",
			xlrec->origblkno, xlrec->npage);
}

/*
 * Append a human-readable description of a GiST WAL record to buf.
 * rec points at the record data; xl_info selects the record type.
 */
void
gist_desc(char *buf, uint8 xl_info, char *rec)
{
	uint8		info = xl_info & ~XLR_INFO_MASK;

	switch (info)
	{
		case XLOG_GIST_ENTRY_UPDATE:
			strcat(buf, "entry_update: ");
			out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate *) rec);
			break;
		case XLOG_GIST_ENTRY_DELETE:
			strcat(buf, "entry_delete: ");
			out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate *) rec);
			break;
		case XLOG_GIST_NEW_ROOT:
			strcat(buf, "new_root: ");
			out_target(buf, ((gistxlogEntryUpdate *) rec)->node, ((gistxlogEntryUpdate *) rec)->key);
			break;
		case XLOG_GIST_PAGE_SPLIT:
			out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
			break;
		case XLOG_GIST_CREATE_INDEX:
			sprintf(buf + strlen(buf), "create_index: rel %u/%u/%u",
					((RelFileNode *) rec)->spcNode,
					((RelFileNode *) rec)->dbNode,
					((RelFileNode *) rec)->relNode);
			break;
		case XLOG_GIST_INSERT_COMPLETE:
			sprintf(buf + strlen(buf), "complete_insert: rel %u/%u/%u",
					((gistxlogInsertComplete *) rec)->node.spcNode,
					((gistxlogInsertComplete *) rec)->node.dbNode,
					((gistxlogInsertComplete *) rec)->node.relNode);
			break;
		default:
			elog(PANIC, "gist_desc: unknown op code %u", info);
	}
}

/*
 * Build a palloc'd "invalid" index tuple whose t_tid block number is blkno.
 * The tuple consists of the header only and is flagged via
 * GistTupleSetInvalid.
 */
IndexTuple
gist_form_invalid_tuple(BlockNumber blkno)
{
	/*
	 * we don't allocate space for the nulls bitmap; this is an invalid tuple,
	 * so be careful in code that reads and writes it
	 */
	Size		size = IndexInfoFindDataOffset(0);
	IndexTuple	tuple = (IndexTuple) palloc0(size);

	tuple->t_info |= size;

	ItemPointerSetBlockNumber(&(tuple->t_tid), blkno);
	GistTupleSetInvalid(tuple);

	return tuple;
}

/*
 * Buffer-fetch callback handed to gistFindPath: read an existing block
 * through XLogReadBuffer and PANIC if it is missing or uninitialized.
 */
static Buffer
gistXLogReadAndLockBuffer(Relation r, BlockNumber blkno)
{
	Buffer		buffer = XLogReadBuffer(false, r, blkno);

	if (!BufferIsValid(buffer))
		elog(PANIC, "block %u unfound", blkno);
	if (PageIsNew((PageHeader) (BufferGetPage(buffer))))
		elog(PANIC, "uninitialized page %u", blkno);

	return buffer;
}


/*
 * Fill insert->path / insert->pathlen from the stack returned by
 * gistFindPath for insert->origblkno, following the parent links.  If no
 * path is found, a LOG message is emitted and the path stays empty.
 */
static void
gixtxlogFindPath(Relation index, gistIncompleteInsert *insert)
{
	GISTInsertStack *top;

	insert->pathlen = 0;
	insert->path = NULL;

	if ((top = gistFindPath(index, insert->origblkno, gistXLogReadAndLockBuffer)) != NULL)
	{
		int			i;
		GISTInsertStack *ptr = top;

		/* first pass: count stack entries */
		while (ptr)
		{
			insert->pathlen++;
			ptr = ptr->parent;
		}

		insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen);

		/* second pass: copy block numbers in stack order */
		i = 0;
		ptr = top;
		while (ptr)
		{
			insert->path[i] = ptr->blkno;
			i++;
			ptr = ptr->parent;
		}
	}
	else
		elog(LOG, "lost parent for block %u", insert->origblkno);
}

/*
 * Continue an insert after a crash.  Normally there are no incomplete
 * inserts, but after a crash the WAL may lack the record of completion.
 *
 * Although the LSN stored in gistIncompleteInsert is the LSN of the child
 * page, we can compare it with the LSN of the parent, because the parent is
 * always locked while we change the child page (look at gistmakedeal).  So
 * if the parent's LSN is less than the stored lsn, the changes in the parent
 * have not been done yet.
 *
 * The downlinks written here are "invalid" tuples (see
 * gist_form_invalid_tuple); a LOG message at the end asks for VACUUM or
 * REINDEX to finish the job.
 */
static void
gistContinueInsert(gistIncompleteInsert *insert)
{
	IndexTuple *itup;
	int			i,
				lenitup;
	Relation	index;

	index = XLogOpenRelation(insert->node);
	if (!RelationIsValid(index))
		return;

	/*
	 * the needed itup vector will never be larger than the initial
	 * lenblk + 2, because during this processing the number of index tuples
	 * can only get smaller
	 */
	lenitup = insert->lenblk;
	itup = (IndexTuple *) palloc(sizeof(IndexTuple) * (lenitup + 2 /* guarantee root split */ ));

	for (i = 0; i < insert->lenblk; i++)
		itup[i] = gist_form_invalid_tuple(insert->blkno[i]);

	if (insert->origblkno == GIST_ROOT_BLKNO)
	{
		/*
		 * it was split root, so we should only make new root. it can't be
		 * simple insert into root, look at call pushIncompleteInsert in
		 * gistRedoPageSplitRecord
		 */
		Buffer		buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO);
		Page		page;

		if (!BufferIsValid(buffer))
			elog(PANIC, "root block unfound");

		page = BufferGetPage(buffer);
		if (XLByteLE(insert->lsn, PageGetLSN(page)))
		{
			/* root was already rewritten */
			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
			ReleaseBuffer(buffer);
			return;
		}

		GISTInitBuffer(buffer, 0);
		page = BufferGetPage(buffer);
		gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber);
		PageSetLSN(page, insert->lsn);
		PageSetTLI(page, ThisTimeLineID);
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		WriteBuffer(buffer);
	}
	else
	{
		Buffer	   *buffers;
		Page	   *pages;
		int			numbuffer;

		/* construct path */
		gixtxlogFindPath(index, insert);

		Assert(insert->pathlen > 0);

		/* at most three pages are touched per level: page, sibling, root copy */
		buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ ));
		pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ ));

		for (i = 0; i < insert->pathlen; i++)
		{
			int			j,
						k,
						pituplen = 0,
						childfound = 0;

			numbuffer = 1;
			buffers[numbuffer - 1] = XLogReadBuffer(false, index, insert->path[i]);
			if (!BufferIsValid(buffers[numbuffer - 1]))
				elog(PANIC, "block %u unfound", insert->path[i]);

			pages[numbuffer - 1] = BufferGetPage(buffers[numbuffer - 1]);
			if (PageIsNew((PageHeader) (pages[numbuffer - 1])))
				elog(PANIC, "uninitialized page %u", insert->path[i]);

			if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer - 1])))
			{
				/* this level (and thus everything above) is already done */
				LockBuffer(buffers[numbuffer - 1], BUFFER_LOCK_UNLOCK);
				ReleaseBuffer(buffers[numbuffer - 1]);
				return;
			}

			pituplen = PageGetMaxOffsetNumber(pages[numbuffer - 1]);

			/* remove old IndexTuples */
			for (j = 0; j < pituplen && childfound < lenitup; j++)
			{
				BlockNumber blkno;
				ItemId		iid = PageGetItemId(pages[numbuffer - 1], j + FirstOffsetNumber);
				IndexTuple	idxtup = (IndexTuple) PageGetItem(pages[numbuffer - 1], iid);

				blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid));

				for (k = 0; k < lenitup; k++)
					if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno)
					{
						PageIndexTupleDelete(pages[numbuffer - 1], j + FirstOffsetNumber);

						/*
						 * the following tuples moved down by one slot, so
						 * revisit the same offset on the next iteration
						 */
						j--;
						pituplen--;
						childfound++;
						break;
					}
			}

			if (gistnospace(pages[numbuffer - 1], itup, lenitup))
			{
				/* no space left on page, so we should split */
				buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW);
				if (!BufferIsValid(buffers[numbuffer]))
					elog(PANIC, "could not obtain new block");
				GISTInitBuffer(buffers[numbuffer], 0);
				pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
				gistfillbuffer(index, pages[numbuffer], itup, lenitup, FirstOffsetNumber);
				numbuffer++;

				if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
				{
					IndexTuple *parentitup;

					/*
					 * we split root, just copy tuples from old root to new
					 * page
					 *
					 * The tuples must come from buffers[0], the old root.
					 * numbuffer was already incremented above, so
					 * buffers[numbuffer - 1] is the freshly filled sibling;
					 * extracting from it would duplicate itup and lose the
					 * old root's contents once the root is re-initialized
					 * below.
					 */
					parentitup = gistextractbuffer(buffers[0], &pituplen);

					/* sanity check */
					if (i + 1 != insert->pathlen)
						elog(PANIC, "unexpected pathlen in index \"%s\"",
							 RelationGetRelationName(index));

					/* fill new page */
					buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW);
					if (!BufferIsValid(buffers[numbuffer]))
						elog(PANIC, "could not obtain new block");
					GISTInitBuffer(buffers[numbuffer], 0);
					pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
					gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber);
					numbuffer++;

					/* fill root page */
					GISTInitBuffer(buffers[0], 0);
					for (j = 1; j < numbuffer; j++)
					{
						IndexTuple	tuple = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));

						if (PageAddItem(pages[0],
										(Item) tuple,
										IndexTupleSize(tuple),
										(OffsetNumber) j,
										LP_USED) == InvalidOffsetNumber)
							elog(PANIC, "failed to add item to index page in \"%s\"",
								 RelationGetRelationName(index));
					}
				}
			}
			else
				gistfillbuffer(index, pages[numbuffer - 1], itup, lenitup, InvalidOffsetNumber);

			/*
			 * the pages touched at this level become the children whose
			 * downlinks must be installed one level up
			 */
			lenitup = numbuffer;
			for (j = 0; j < numbuffer; j++)
			{
				itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
				PageSetLSN(pages[j], insert->lsn);
				PageSetTLI(pages[j], ThisTimeLineID);
				GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
				LockBuffer(buffers[j], BUFFER_LOCK_UNLOCK);
				WriteBuffer(buffers[j]);
			}
		}
	}

	ereport(LOG,
			(errmsg("index %u/%u/%u needs VACUUM or REINDEX to finish crash recovery",
			insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
		   errdetail("Incomplete insertion detected during crash replay.")));
}

/*
 * Set up recovery state: an empty incomplete-insert list, a long-lived
 * context for it, and the per-record working context.
 */
void
gist_xlog_startup(void)
{
	incomplete_inserts = NIL;
	insertCtx = AllocSetContextCreate(CurrentMemoryContext,
									  "GiST recovery temporary context",
									  ALLOCSET_DEFAULT_MINSIZE,
									  ALLOCSET_DEFAULT_INITSIZE,
									  ALLOCSET_DEFAULT_MAXSIZE);
	opCtx = createTempGistContext();
}

/*
 * Finish recovery: continue every insert that is still pending, then drop
 * both recovery memory contexts.
 */
void
gist_xlog_cleanup(void)
{
	ListCell   *l;
	List	   *reverse = NIL;
	MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);

	/*
	 * Take a copy of the pending list (allocated in insertCtx) to iterate
	 * over.
	 *
	 * NOTE(review): the original comment here said gistContinueInsert should
	 * be called in reverse order, but lappend() keeps the original order, so
	 * the inserts are continued in the order they were recorded despite the
	 * variable's name.  Behaviour is left unchanged; confirm the intended
	 * order before altering it.
	 */
	foreach(l, incomplete_inserts)
		reverse = lappend(reverse, lfirst(l));

	MemoryContextSwitchTo(opCtx);
	foreach(l, reverse)
	{
		gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);

		gistContinueInsert(insert);
		MemoryContextReset(opCtx);
	}
	MemoryContextSwitchTo(oldCxt);

	MemoryContextDelete(opCtx);
	MemoryContextDelete(insertCtx);
}


/*
 * Build the XLogRecData chain for a page-split record.
 *
 * The chain is: one gistxlogPageSplit header, then for every element of
 * dist a gistxlogPage header (ptr->block) followed by its tuple data
 * (ptr->list, ptr->lenlist bytes).  key may be NULL, in which case an
 * invalid key is stored.  Both the header and the chain array are palloc'd.
 */
XLogRecData *
formSplitRdata(RelFileNode node, BlockNumber blkno,
			   ItemPointer key, SplitedPageLayout *dist)
{
	XLogRecData *rdata;
	gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit));
	SplitedPageLayout *ptr;
	int			npage = 0,
				cur = 1;

	/* count the resulting pages */
	ptr = dist;
	while (ptr)
	{
		npage++;
		ptr = ptr->next;
	}

	rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));

	xlrec->node = node;
	xlrec->origblkno = blkno;
	xlrec->npage = (uint16) npage;
	if (key)
		xlrec->key = *key;
	else
		ItemPointerSetInvalid(&(xlrec->key));

	rdata[0].buffer = InvalidBuffer;
	rdata[0].data = (char *) xlrec;
	rdata[0].len = sizeof(gistxlogPageSplit);
	rdata[0].next = NULL;

	ptr = dist;
	while (ptr)
	{
		/* per-page header */
		rdata[cur].buffer = InvalidBuffer;
		rdata[cur].data = (char *) &(ptr->block);
		rdata[cur].len = sizeof(gistxlogPage);
		rdata[cur - 1].next = &(rdata[cur]);
		cur++;

		/* per-page tuple data */
		rdata[cur].buffer = InvalidBuffer;
		rdata[cur].data = (char *) (ptr->list);
		rdata[cur].len = ptr->lenlist;
		rdata[cur - 1].next = &(rdata[cur]);
		rdata[cur].next = NULL;
		cur++;
		ptr = ptr->next;
	}

	return rdata;
}


/*
 * Build the XLogRecData chain for an entry-update record.
 *
 * With emptypage set, only the header is emitted (isemptypage = true,
 * ntodelete = 0).  Otherwise the header is followed by the MAXALIGN'ed
 * todelete array (only when ntodelete is non-zero) and then by each of the
 * ituplen tuples.  key may be NULL, in which case an invalid key is stored.
 */
XLogRecData *
formUpdateRdata(RelFileNode node, BlockNumber blkno,
				OffsetNumber *todelete, int ntodelete, bool emptypage,
				IndexTuple *itup, int ituplen, ItemPointer key)
{
	XLogRecData *rdata;
	gistxlogEntryUpdate *xlrec = (gistxlogEntryUpdate *) palloc(sizeof(gistxlogEntryUpdate));

	xlrec->node = node;
	xlrec->blkno = blkno;
	if (key)
		xlrec->key = *key;
	else
		ItemPointerSetInvalid(&(xlrec->key));

	if (emptypage)
	{
		xlrec->isemptypage = true;
		xlrec->ntodelete = 0;

		rdata = (XLogRecData *) palloc(sizeof(XLogRecData));
		rdata->buffer = InvalidBuffer;
		rdata->data = (char *) xlrec;
		rdata->len = sizeof(gistxlogEntryUpdate);
		rdata->next = NULL;
	}
	else
	{
		int			cur = 1,
					i;

		xlrec->isemptypage = false;
		xlrec->ntodelete = ntodelete;

		/* header + optional todelete array + one element per tuple */
		rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (2 + ituplen));

		rdata->buffer = InvalidBuffer;
		rdata->data = (char *) xlrec;
		rdata->len = sizeof(gistxlogEntryUpdate);
		rdata->next = NULL;

		if (ntodelete)
		{
			rdata[cur - 1].next = &(rdata[cur]);
			rdata[cur].buffer = InvalidBuffer;
			rdata[cur].data = (char *) todelete;
			rdata[cur].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
			rdata[cur].next = NULL;
			cur++;
		}

		/* new tuples */
		for (i = 0; i < ituplen; i++)
		{
			rdata[cur].buffer = InvalidBuffer;
			rdata[cur].data = (char *) (itup[i]);
			rdata[cur].len = IndexTupleSize(itup[i]);
			rdata[cur].next = NULL;
			rdata[cur - 1].next = &(rdata[cur]);
			cur++;
		}
	}

	return rdata;
}

/*
 * Write an XLOG_GIST_INSERT_COMPLETE record for the len keys in keys[] and
 * return the position reported by XLogInsert.  The XLogInsert call is made
 * inside a critical section.
 */
XLogRecPtr
gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len)
{
	gistxlogInsertComplete xlrec;
	XLogRecData rdata[2];
	XLogRecPtr	recptr;

	Assert(len > 0);
	xlrec.node = node;

	rdata[0].buffer = InvalidBuffer;
	rdata[0].data = (char *) &xlrec;
	rdata[0].len = sizeof(gistxlogInsertComplete);
	rdata[0].next = &(rdata[1]);

	rdata[1].buffer = InvalidBuffer;
	rdata[1].data = (char *) keys;
	rdata[1].len = sizeof(ItemPointerData) * len;
	rdata[1].next = NULL;

	START_CRIT_SECTION();

	recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata);

	END_CRIT_SECTION();

	return recptr;
}