From 384cad5c7b773da29c26fc08a7c969b35c94cb54 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Tue, 15 Sep 2009 20:31:30 +0000 Subject: [PATCH] Fix two distinct errors in creation of GIN_INSERT_LISTPAGE xlog records. In practice these mistakes were always masked when full_page_writes was on, because XLogInsert would always choose to log the full page, and then ginRedoInsertListPage wouldn't try to do anything. But with full_page_writes off a WAL replay failure was certain. The GIN_INSERT_LISTPAGE record type could probably be eliminated entirely in favor of using XLOG_HEAP_NEWPAGE, but I refrained from doing that now since it would have required a significantly more invasive patch. In passing do a little bit of code cleanup, including making the accounting for free space on GIN list pages more precise. (This wasn't a bug as the errors were always in the conservative direction.) Per report from Simon. Back-patch to 8.4 which contains the identical code. --- src/backend/access/gin/ginfast.c | 78 ++++++++++++++++++-------------- 1 file changed, 44 insertions(+), 34 deletions(-) diff --git a/src/backend/access/gin/ginfast.c b/src/backend/access/gin/ginfast.c index 20887ba56c..9d2351af52 100644 --- a/src/backend/access/gin/ginfast.c +++ b/src/backend/access/gin/ginfast.c @@ -11,7 +11,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gin/ginfast.c,v 1.3 2009/06/11 14:48:53 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/access/gin/ginfast.c,v 1.4 2009/09/15 20:31:30 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -41,13 +41,15 @@ typedef struct DatumArray /* * Build a pending-list page from the given array of tuples, and write it out. + * + * Returns amount of free space left on the page. */ static int32 writeListPage(Relation index, Buffer buffer, IndexTuple *tuples, int32 ntuples, BlockNumber rightlink) { Page page = BufferGetPage(buffer); - int i, + int32 i, freesize, size = 0; OffsetNumber l, @@ -100,8 +102,6 @@ writeListPage(Relation index, Buffer buffer, GinPageGetOpaque(page)->maxoff = 0; } - freesize = PageGetFreeSpace(page); - MarkBufferDirty(buffer); if (!index->rd_istemp) @@ -110,26 +110,30 @@ writeListPage(Relation index, Buffer buffer, ginxlogInsertListPage data; XLogRecPtr recptr; - rdata[0].buffer = buffer; - rdata[0].buffer_std = true; + data.node = index->rd_node; + data.blkno = BufferGetBlockNumber(buffer); + data.rightlink = rightlink; + data.ntuples = ntuples; + + rdata[0].buffer = InvalidBuffer; rdata[0].data = (char *) &data; rdata[0].len = sizeof(ginxlogInsertListPage); rdata[0].next = rdata + 1; - rdata[1].buffer = InvalidBuffer; + rdata[1].buffer = buffer; + rdata[1].buffer_std = true; rdata[1].data = workspace; rdata[1].len = size; rdata[1].next = NULL; - data.blkno = BufferGetBlockNumber(buffer); - data.rightlink = rightlink; - data.ntuples = ntuples; - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE, rdata); PageSetLSN(page, recptr); PageSetTLI(page, ThisTimeLineID); } + /* get free space before releasing buffer */ + freesize = PageGetExactFreeSpace(page); + UnlockReleaseBuffer(buffer); END_CRIT_SECTION(); @@ -165,7 +169,8 @@ makeSublist(Relation index, IndexTuple *tuples, int32 ntuples, { res->nPendingPages++; writeListPage(index, prevBuffer, - tuples + startTuple, i - startTuple, + tuples + startTuple, + i - startTuple, BufferGetBlockNumber(curBuffer)); } else @@ -180,7 +185,7 @@ makeSublist(Relation index, IndexTuple *tuples, int32 ntuples, tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData); - if (size + tupsize >= GinListPageSize) + if (size + tupsize > GinListPageSize) { /* won't fit, force a new page and reprocess */ i--; @@ -197,7 +202,8 @@ makeSublist(Relation index, IndexTuple *tuples, int32 ntuples, */ res->tail = BufferGetBlockNumber(curBuffer); res->tailFreeSize = writeListPage(index, curBuffer, - tuples + startTuple, ntuples - startTuple, + tuples + startTuple, + ntuples - startTuple, InvalidBlockNumber); res->nPendingPages++; /* that was only one heap tuple */ @@ -237,7 +243,7 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate, metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO); metapage = BufferGetPage(metabuffer); - if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GIN_PAGE_FREESIZE) + if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize) { /* * Total size is greater than one page => make sublist @@ -265,13 +271,12 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate, if (separateList) { - GinMetaPageData sublist; - /* * We should make sublist separately and append it to the tail */ - memset(&sublist, 0, sizeof(GinMetaPageData)); + GinMetaPageData sublist; + memset(&sublist, 0, sizeof(GinMetaPageData)); makeSublist(index, collector->tuples, collector->ntuples, &sublist); /* @@ -283,45 +288,44 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate, if (metadata->head == InvalidBlockNumber) { /* - * Sublist becomes main list + * Main list is empty, so just copy sublist into main list */ START_CRIT_SECTION(); + memcpy(metadata, &sublist, sizeof(GinMetaPageData)); - memcpy(&data.metadata, &sublist, sizeof(GinMetaPageData)); } else { /* - * merge lists + * Merge lists */ - data.prevTail = metadata->tail; + data.newRightlink = sublist.head; + buffer = ReadBuffer(index, metadata->tail); LockBuffer(buffer, GIN_EXCLUSIVE); page = BufferGetPage(buffer); + Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber); START_CRIT_SECTION(); GinPageGetOpaque(page)->rightlink = sublist.head; + + MarkBufferDirty(buffer); + metadata->tail = sublist.tail; metadata->tailFreeSize = sublist.tailFreeSize; metadata->nPendingPages += sublist.nPendingPages; metadata->nPendingHeapTuples += sublist.nPendingHeapTuples; - - memcpy(&data.metadata, metadata, sizeof(GinMetaPageData)); - data.newRightlink = sublist.head; - - MarkBufferDirty(buffer); } } else { /* - * Insert into tail page, metapage is already locked + * Insert into tail page. Metapage is already locked */ - OffsetNumber l, off; int i, @@ -331,6 +335,7 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate, buffer = ReadBuffer(index, metadata->tail); LockBuffer(buffer, GIN_EXCLUSIVE); page = BufferGetPage(buffer); + off = (PageIsEmpty(page)) ? FirstOffsetNumber : OffsetNumberNext(PageGetMaxOffsetNumber(page)); @@ -368,20 +373,24 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate, off++; } - metadata->tailFreeSize -= collector->sumsize + collector->ntuples * sizeof(ItemIdData); - memcpy(&data.metadata, metadata, sizeof(GinMetaPageData)); + Assert((ptr - rdata[1].data) <= collector->sumsize); + + metadata->tailFreeSize = PageGetExactFreeSpace(page); + MarkBufferDirty(buffer); } /* - * Make real write + * Write metabuffer, make xlog entry */ - MarkBufferDirty(metabuffer); + if (!index->rd_istemp) { XLogRecPtr recptr; + memcpy(&data.metadata, metadata, sizeof(GinMetaPageData)); + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE, rdata); PageSetLSN(metapage, recptr); PageSetTLI(metapage, ThisTimeLineID); @@ -552,7 +561,6 @@ shiftList(Relation index, Buffer metabuffer, BlockNumber newHead, metadata->nPendingPages = 0; metadata->nPendingHeapTuples = 0; } - memcpy(&data.metadata, metadata, sizeof(GinMetaPageData)); MarkBufferDirty(metabuffer); @@ -567,6 +575,8 @@ shiftList(Relation index, Buffer metabuffer, BlockNumber newHead, { XLogRecPtr recptr; + memcpy(&data.metadata, metadata, sizeof(GinMetaPageData)); + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE, rdata); PageSetLSN(metapage, recptr); PageSetTLI(metapage, ThisTimeLineID);