diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c index c4801d5cd8..7248b06326 100644 --- a/src/backend/access/gin/ginbtree.c +++ b/src/backend/access/gin/ginbtree.c @@ -279,6 +279,160 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack, } } +/* + * Returns true if the insertion is done, false if the page was split and + * downlink insertion is pending. + * + * stack->buffer is locked on entry, and is kept locked. + */ +static bool +ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack, + GinStatsData *buildStats) +{ + Page page = BufferGetPage(stack->buffer); + XLogRecData *rdata; + bool fit; + + START_CRIT_SECTION(); + fit = btree->placeToPage(btree, stack->buffer, stack->off, &rdata); + if (fit) + { + MarkBufferDirty(stack->buffer); + + if (RelationNeedsWAL(btree->index)) + { + XLogRecPtr recptr; + + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata); + PageSetLSN(page, recptr); + } + + END_CRIT_SECTION(); + + return true; + } + else + { + /* Didn't fit, have to split */ + Buffer rbuffer; + Page newlpage; + BlockNumber savedRightLink; + GinBtreeStack *parent; + Page lpage, + rpage; + + END_CRIT_SECTION(); + + rbuffer = GinNewBuffer(btree->index); + + savedRightLink = GinPageGetOpaque(page)->rightlink; + + /* + * newlpage is a pointer to memory page, it is not associated with + * a buffer. stack->buffer is not touched yet. + */ + newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata); + + ((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno; + + /* During index build, count the newly-split page */ + if (buildStats) + { + if (btree->isData) + buildStats->nDataPages++; + else + buildStats->nEntryPages++; + } + + parent = stack->parent; + + if (parent == NULL) + { + /* + * split root, so we need to allocate new left page and place + * pointer on root to left and right page + */ + Buffer lbuffer = GinNewBuffer(btree->index); + + ((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE; + ((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber; + + lpage = BufferGetPage(lbuffer); + rpage = BufferGetPage(rbuffer); + + GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber; + GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer); + ((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer); + + START_CRIT_SECTION(); + + GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF); + PageRestoreTempPage(newlpage, lpage); + btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer); + + MarkBufferDirty(rbuffer); + MarkBufferDirty(lbuffer); + MarkBufferDirty(stack->buffer); + + if (RelationNeedsWAL(btree->index)) + { + XLogRecPtr recptr; + + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata); + PageSetLSN(page, recptr); + PageSetLSN(lpage, recptr); + PageSetLSN(rpage, recptr); + } + + UnlockReleaseBuffer(rbuffer); + UnlockReleaseBuffer(lbuffer); + END_CRIT_SECTION(); + + /* During index build, count the newly-added root page */ + if (buildStats) + { + if (btree->isData) + buildStats->nDataPages++; + else + buildStats->nEntryPages++; + } + + return true; + } + else + { + /* split non-root page */ + ((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE; + ((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink; + + lpage = BufferGetPage(stack->buffer); + rpage = BufferGetPage(rbuffer); + + GinPageGetOpaque(rpage)->rightlink = savedRightLink; + GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer); + + START_CRIT_SECTION(); + PageRestoreTempPage(newlpage, lpage); + + MarkBufferDirty(rbuffer); + MarkBufferDirty(stack->buffer); + + if (RelationNeedsWAL(btree->index)) + { + XLogRecPtr recptr; + + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata); + PageSetLSN(lpage, recptr); + PageSetLSN(rpage, recptr); + } + UnlockReleaseBuffer(rbuffer); + END_CRIT_SECTION(); + + return false; + } + } +} + /* * Insert value (stored in GinBtree) to tree described by stack * @@ -292,9 +446,7 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats) { GinBtreeStack *parent; BlockNumber rootBlkno; - Page page, - rpage, - lpage; + Page page; /* extract root BlockNumber from stack */ Assert(stack != NULL); @@ -307,153 +459,21 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats) /* this loop crawls up the stack until the insertion is complete */ for (;;) { - XLogRecData *rdata; - BlockNumber savedRightLink; - bool fit; + bool done; - page = BufferGetPage(stack->buffer); - savedRightLink = GinPageGetOpaque(page)->rightlink; + done = ginPlaceToPage(btree, rootBlkno, stack, buildStats); - START_CRIT_SECTION(); - fit = btree->placeToPage(btree, stack->buffer, stack->off, &rdata); - if (fit) + /* just to be extra sure we don't delete anything by accident... */ + btree->isDelete = FALSE; + + if (done) { - MarkBufferDirty(stack->buffer); - - if (RelationNeedsWAL(btree->index)) - { - XLogRecPtr recptr; - - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata); - PageSetLSN(page, recptr); - } - LockBuffer(stack->buffer, GIN_UNLOCK); - END_CRIT_SECTION(); - freeGinBtreeStack(stack); - - return; - } - else - { - /* Didn't fit, have to split */ - Buffer rbuffer; - Page newlpage; - - END_CRIT_SECTION(); - - rbuffer = GinNewBuffer(btree->index); - - /* - * newlpage is a pointer to memory page, it is not associated with - * a buffer. stack->buffer is not touched yet. - */ - newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata); - - ((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno; - - /* During index build, count the newly-split page */ - if (buildStats) - { - if (btree->isData) - buildStats->nDataPages++; - else - buildStats->nEntryPages++; - } - - parent = stack->parent; - - if (parent == NULL) - { - /* - * split root, so we need to allocate new left page and place - * pointer on root to left and right page - */ - Buffer lbuffer = GinNewBuffer(btree->index); - - ((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE; - ((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber; - - page = BufferGetPage(stack->buffer); - lpage = BufferGetPage(lbuffer); - rpage = BufferGetPage(rbuffer); - - GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber; - GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer); - ((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer); - - START_CRIT_SECTION(); - - GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF); - PageRestoreTempPage(newlpage, lpage); - btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer); - - MarkBufferDirty(rbuffer); - MarkBufferDirty(lbuffer); - MarkBufferDirty(stack->buffer); - - if (RelationNeedsWAL(btree->index)) - { - XLogRecPtr recptr; - - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata); - PageSetLSN(page, recptr); - PageSetLSN(lpage, recptr); - PageSetLSN(rpage, recptr); - } - - UnlockReleaseBuffer(rbuffer); - UnlockReleaseBuffer(lbuffer); - LockBuffer(stack->buffer, GIN_UNLOCK); - END_CRIT_SECTION(); - - freeGinBtreeStack(stack); - - /* During index build, count the newly-added root page */ - if (buildStats) - { - if (btree->isData) - buildStats->nDataPages++; - else - buildStats->nEntryPages++; - } - - return; - } - else - { - /* split non-root page */ - ((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE; - ((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink; - - lpage = BufferGetPage(stack->buffer); - rpage = BufferGetPage(rbuffer); - - GinPageGetOpaque(rpage)->rightlink = savedRightLink; - GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer); - - START_CRIT_SECTION(); - PageRestoreTempPage(newlpage, lpage); - - MarkBufferDirty(rbuffer); - MarkBufferDirty(stack->buffer); - - if (RelationNeedsWAL(btree->index)) - { - XLogRecPtr recptr; - - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata); - PageSetLSN(lpage, recptr); - PageSetLSN(rpage, recptr); - } - UnlockReleaseBuffer(rbuffer); - END_CRIT_SECTION(); - } + break; } btree->prepareDownlink(btree, stack->buffer); - btree->isDelete = FALSE; /* search parent to lock */ LockBuffer(parent->buffer, GIN_EXCLUSIVE);