/*------------------------------------------------------------------------- * * gistxlog.c * WAL replay logic for GiST. * * * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION * src/backend/access/gist/gistxlog.c *------------------------------------------------------------------------- */ #include "postgres.h" #include "access/gist_private.h" #include "access/xlogutils.h" #include "miscadmin.h" #include "storage/bufmgr.h" #include "utils/memutils.h" #include "utils/rel.h" typedef struct { gistxlogPage *header; IndexTuple *itup; } NewPage; typedef struct { gistxlogPageSplit *data; NewPage *page; } PageSplitRecord; static MemoryContext opCtx; /* working memory for operations */ /* * Replay the clearing of F_FOLLOW_RIGHT flag. */ static void gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn, BlockNumber leftblkno) { Buffer buffer; buffer = XLogReadBuffer(node, leftblkno, false); if (BufferIsValid(buffer)) { Page page = (Page) BufferGetPage(buffer); /* * Note that we still update the page even if page LSN is equal to the * LSN of this record, because the updated NSN is not included in the * full page image. */ if (!XLByteLT(lsn, PageGetLSN(page))) { GistPageGetOpaque(page)->nsn = lsn; GistClearFollowRight(page); PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); } UnlockReleaseBuffer(buffer); } } /* * redo any page update (except page split) */ static void gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record) { char *begin = XLogRecGetData(record); gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin; Buffer buffer; Page page; char *data; if (BlockNumberIsValid(xldata->leftchild)) gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild); /* nothing more to do if page was backed up (and no info to do it with) */ if (record->xl_info & XLR_BKP_BLOCK_1) return; buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); if (XLByteLE(lsn, PageGetLSN(page))) { UnlockReleaseBuffer(buffer); return; } data = begin + sizeof(gistxlogPageUpdate); /* Delete old tuples */ if (xldata->ntodelete > 0) { int i; OffsetNumber *todelete = (OffsetNumber *) data; data += sizeof(OffsetNumber) * xldata->ntodelete; for (i = 0; i < xldata->ntodelete; i++) PageIndexTupleDelete(page, todelete[i]); if (GistPageIsLeaf(page)) GistMarkTuplesDeleted(page); } /* add tuples */ if (data - begin < record->xl_len) { OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber : OffsetNumberNext(PageGetMaxOffsetNumber(page)); while (data - begin < record->xl_len) { IndexTuple itup = (IndexTuple) data; Size sz = IndexTupleSize(itup); OffsetNumber l; data += sz; l = PageAddItem(page, (Item) itup, sz, off, false, false); if (l == InvalidOffsetNumber) elog(ERROR, "failed to add item to GiST index page, size %d bytes", (int) sz); off++; } } else { /* * special case: leafpage, nothing to insert, nothing to delete, then * vacuum marks page */ if (GistPageIsLeaf(page) && xldata->ntodelete == 0) GistClearTuplesDeleted(page); } if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO) /* * all links on non-leaf root page was deleted by vacuum full, so root * page becomes a leaf */ GistPageSetLeaf(page); GistPageGetOpaque(page)->rightlink = InvalidBlockNumber; PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); } static void gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record) { gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record); Buffer buffer; Page page; /* nothing else to do if page was backed up (and no info to do it with) */ if (record->xl_info & XLR_BKP_BLOCK_1) return; buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); GistPageSetDeleted(page); PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); } static void decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) { char *begin = XLogRecGetData(record), *ptr; int j, i = 0; decoded->data = (gistxlogPageSplit *) begin; decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage); ptr = begin + sizeof(gistxlogPageSplit); for (i = 0; i < decoded->data->npage; i++) { Assert(ptr - begin < record->xl_len); decoded->page[i].header = (gistxlogPage *) ptr; ptr += sizeof(gistxlogPage); decoded->page[i].itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->page[i].header->num); j = 0; while (j < decoded->page[i].header->num) { Assert(ptr - begin < record->xl_len); decoded->page[i].itup[j] = (IndexTuple) ptr; ptr += IndexTupleSize((IndexTuple) ptr); j++; } } } static void gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) { gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record); PageSplitRecord xlrec; Buffer buffer; Page page; int i; bool isrootsplit = false; if (BlockNumberIsValid(xldata->leftchild)) gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild); decodePageSplitRecord(&xlrec, record); /* loop around all pages */ for (i = 0; i < xlrec.data->npage; i++) { NewPage *newpage = xlrec.page + i; int flags; if (newpage->header->blkno == GIST_ROOT_BLKNO) { Assert(i == 0); isrootsplit = true; } buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); /* ok, clear buffer */ if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO) flags = F_LEAF; else flags = 0; GISTInitBuffer(buffer, flags); /* and fill it */ gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber); if (newpage->header->blkno == GIST_ROOT_BLKNO) { GistPageGetOpaque(page)->rightlink = InvalidBlockNumber; GistPageGetOpaque(page)->nsn = xldata->orignsn; GistClearFollowRight(page); } else { if (i < xlrec.data->npage - 1) GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno; else GistPageGetOpaque(page)->rightlink = xldata->origrlink; GistPageGetOpaque(page)->nsn = xldata->orignsn; if (i < xlrec.data->npage - 1 && !isrootsplit) GistMarkFollowRight(page); else GistClearFollowRight(page); } PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); } } static void gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) { RelFileNode *node = (RelFileNode *) XLogRecGetData(record); Buffer buffer; Page page; buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); GISTInitBuffer(buffer, F_LEAF); PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); } void gist_redo(XLogRecPtr lsn, XLogRecord *record) { uint8 info = record->xl_info & ~XLR_INFO_MASK; MemoryContext oldCxt; /* * GIST indexes do not require any conflict processing. NB: If we ever * implement a similar optimization we have in b-tree, and remove killed * tuples outside VACUUM, we'll need to handle that here. */ RestoreBkpBlocks(lsn, record, false); oldCxt = MemoryContextSwitchTo(opCtx); switch (info) { case XLOG_GIST_PAGE_UPDATE: gistRedoPageUpdateRecord(lsn, record); break; case XLOG_GIST_PAGE_DELETE: gistRedoPageDeleteRecord(lsn, record); break; case XLOG_GIST_PAGE_SPLIT: gistRedoPageSplitRecord(lsn, record); break; case XLOG_GIST_CREATE_INDEX: gistRedoCreateIndex(lsn, record); break; default: elog(PANIC, "gist_redo: unknown op code %u", info); } MemoryContextSwitchTo(oldCxt); MemoryContextReset(opCtx); } static void out_target(StringInfo buf, RelFileNode node) { appendStringInfo(buf, "rel %u/%u/%u", node.spcNode, node.dbNode, node.relNode); } static void out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec) { out_target(buf, xlrec->node); appendStringInfo(buf, "; block number %u", xlrec->blkno); } static void out_gistxlogPageDelete(StringInfo buf, gistxlogPageDelete *xlrec) { appendStringInfo(buf, "page_delete: rel %u/%u/%u; blkno %u", xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode, xlrec->blkno); } static void out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec) { appendStringInfo(buf, "page_split: "); out_target(buf, xlrec->node); appendStringInfo(buf, "; block number %u splits to %d pages", xlrec->origblkno, xlrec->npage); } void gist_desc(StringInfo buf, uint8 xl_info, char *rec) { uint8 info = xl_info & ~XLR_INFO_MASK; switch (info) { case XLOG_GIST_PAGE_UPDATE: appendStringInfo(buf, "page_update: "); out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec); break; case XLOG_GIST_PAGE_DELETE: out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec); break; case XLOG_GIST_PAGE_SPLIT: out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec); break; case XLOG_GIST_CREATE_INDEX: appendStringInfo(buf, "create_index: rel %u/%u/%u", ((RelFileNode *) rec)->spcNode, ((RelFileNode *) rec)->dbNode, ((RelFileNode *) rec)->relNode); break; default: appendStringInfo(buf, "unknown gist op code %u", info); break; } } void gist_xlog_startup(void) { opCtx = createTempGistContext(); } void gist_xlog_cleanup(void) { MemoryContextDelete(opCtx); } /* * Write WAL record of a page split. */ XLogRecPtr gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf, SplitedPageLayout *dist, BlockNumber origrlink, GistNSN orignsn, Buffer leftchildbuf) { XLogRecData *rdata; gistxlogPageSplit xlrec; SplitedPageLayout *ptr; int npage = 0, cur; XLogRecPtr recptr; for (ptr = dist; ptr; ptr = ptr->next) npage++; rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2)); xlrec.node = node; xlrec.origblkno = blkno; xlrec.origrlink = origrlink; xlrec.orignsn = orignsn; xlrec.origleaf = page_is_leaf; xlrec.npage = (uint16) npage; xlrec.leftchild = BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber; rdata[0].data = (char *) &xlrec; rdata[0].len = sizeof(gistxlogPageSplit); rdata[0].buffer = InvalidBuffer; cur = 1; /* * Include a full page image of the child buf. (only necessary if a * checkpoint happened since the child page was split) */ if (BufferIsValid(leftchildbuf)) { rdata[cur - 1].next = &(rdata[cur]); rdata[cur].data = NULL; rdata[cur].len = 0; rdata[cur].buffer = leftchildbuf; rdata[cur].buffer_std = true; cur++; } for (ptr = dist; ptr; ptr = ptr->next) { rdata[cur - 1].next = &(rdata[cur]); rdata[cur].buffer = InvalidBuffer; rdata[cur].data = (char *) &(ptr->block); rdata[cur].len = sizeof(gistxlogPage); cur++; rdata[cur - 1].next = &(rdata[cur]); rdata[cur].buffer = InvalidBuffer; rdata[cur].data = (char *) (ptr->list); rdata[cur].len = ptr->lenlist; cur++; } rdata[cur - 1].next = NULL; recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); pfree(rdata); return recptr; } /* * Write XLOG record describing a page update. The update can include any * number of deletions and/or insertions of tuples on a single index page. * * If this update inserts a downlink for a split page, also record that * the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set. * * Note that both the todelete array and the tuples are marked as belonging * to the target buffer; they need not be stored in XLOG if XLogInsert decides * to log the whole buffer contents instead. Also, we take care that there's * at least one rdata item referencing the buffer, even when ntodelete and * ituplen are both zero; this ensures that XLogInsert knows about the buffer. */ XLogRecPtr gistXLogUpdate(RelFileNode node, Buffer buffer, OffsetNumber *todelete, int ntodelete, IndexTuple *itup, int ituplen, Buffer leftchildbuf) { XLogRecData *rdata; gistxlogPageUpdate *xlrec; int cur, i; XLogRecPtr recptr; rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (4 + ituplen)); xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate)); xlrec->node = node; xlrec->blkno = BufferGetBlockNumber(buffer); xlrec->ntodelete = ntodelete; xlrec->leftchild = BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber; rdata[0].buffer = buffer; rdata[0].buffer_std = true; rdata[0].data = NULL; rdata[0].len = 0; rdata[0].next = &(rdata[1]); rdata[1].data = (char *) xlrec; rdata[1].len = sizeof(gistxlogPageUpdate); rdata[1].buffer = InvalidBuffer; rdata[1].next = &(rdata[2]); rdata[2].data = (char *) todelete; rdata[2].len = sizeof(OffsetNumber) * ntodelete; rdata[2].buffer = buffer; rdata[2].buffer_std = true; cur = 3; /* new tuples */ for (i = 0; i < ituplen; i++) { rdata[cur - 1].next = &(rdata[cur]); rdata[cur].data = (char *) (itup[i]); rdata[cur].len = IndexTupleSize(itup[i]); rdata[cur].buffer = buffer; rdata[cur].buffer_std = true; cur++; } /* * Include a full page image of the child buf. (only necessary if a * checkpoint happened since the child page was split) */ if (BufferIsValid(leftchildbuf)) { rdata[cur - 1].next = &(rdata[cur]); rdata[cur].data = NULL; rdata[cur].len = 0; rdata[cur].buffer = leftchildbuf; rdata[cur].buffer_std = true; cur++; } rdata[cur - 1].next = NULL; recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); pfree(rdata); return recptr; }