diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 956305bab2..4fc7a213b6 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -626,6 +626,7 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate) firststack.blkno = GIST_ROOT_BLKNO; firststack.lsn.xrecoff = 0; firststack.parent = NULL; + firststack.downlinkoffnum = InvalidOffsetNumber; state.stack = stack = &firststack; /* @@ -702,9 +703,10 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate) BlockNumber childblkno; IndexTuple newtup; GISTInsertStack *item; + OffsetNumber downlinkoffnum; - stack->childoffnum = gistchoose(state.r, stack->page, itup, giststate); - iid = PageGetItemId(stack->page, stack->childoffnum); + downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate); + iid = PageGetItemId(stack->page, downlinkoffnum); idxtuple = (IndexTuple) PageGetItem(stack->page, iid); childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid)); @@ -754,7 +756,7 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate) * tuple. */ if (gistinserttuples(&state, stack, giststate, &newtup, 1, - stack->childoffnum, InvalidBuffer)) + downlinkoffnum, InvalidBuffer)) { /* * If this was a root split, the root page continues to be @@ -778,6 +780,7 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate) item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack)); item->blkno = childblkno; item->parent = stack; + item->downlinkoffnum = downlinkoffnum; state.stack = stack = item; } else @@ -854,12 +857,14 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate) /* * Traverse the tree to find path from root page to specified "child" block. * - * returns from the beginning of closest parent; + * returns a new insertion stack, starting from the parent of "child", up + * to the root. *downlinkoffnum is set to the offset of the downlink in the + * direct parent of child. * * To prevent deadlocks, this should lock only one page at a time. */ -GISTInsertStack * -gistFindPath(Relation r, BlockNumber child) +static GISTInsertStack * +gistFindPath(Relation r, BlockNumber child, OffsetNumber *downlinkoffnum) { Page page; Buffer buffer; @@ -867,16 +872,22 @@ gistFindPath(Relation r, BlockNumber child) maxoff; ItemId iid; IndexTuple idxtuple; + List *fifo; GISTInsertStack *top, - *tail, *ptr; BlockNumber blkno; - top = tail = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack)); + top = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack)); top->blkno = GIST_ROOT_BLKNO; + top->downlinkoffnum = InvalidOffsetNumber; - while (top && top->blkno != child) + fifo = list_make1(top); + while (fifo != NIL) { + /* Get next page to visit */ + top = linitial(fifo); + fifo = list_delete_first(fifo); + buffer = ReadBuffer(r, top->blkno); LockBuffer(buffer, GIST_SHARE); gistcheckpage(r, buffer); @@ -917,12 +928,10 @@ gistFindPath(Relation r, BlockNumber child) */ ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack)); ptr->blkno = GistPageGetOpaque(page)->rightlink; - ptr->childoffnum = InvalidOffsetNumber; + ptr->downlinkoffnum = InvalidOffsetNumber; ptr->parent = top->parent; - ptr->next = top->next; - top->next = ptr; - if (tail == top) - tail = ptr; + + fifo = lcons(ptr, fifo); } maxoff = PageGetMaxOffsetNumber(page); @@ -934,48 +943,24 @@ gistFindPath(Relation r, BlockNumber child) blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid)); if (blkno == child) { - OffsetNumber poff = InvalidOffsetNumber; - - /* make childs links */ - ptr = top; - while (ptr->parent) - { - /* move childoffnum.. */ - if (ptr == top) - { - /* first iteration */ - poff = ptr->parent->childoffnum; - ptr->parent->childoffnum = ptr->childoffnum; - } - else - { - OffsetNumber tmp = ptr->parent->childoffnum; - - ptr->parent->childoffnum = poff; - poff = tmp; - } - ptr = ptr->parent; - } - top->childoffnum = i; + /* Found it! */ UnlockReleaseBuffer(buffer); + *downlinkoffnum = i; return top; } else { - /* Install next inner page to the end of stack */ + /* Append this child to the list of pages to visit later */ ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack)); ptr->blkno = blkno; - ptr->childoffnum = i; /* set offsetnumber of child to child - * !!! */ + ptr->downlinkoffnum = i; ptr->parent = top; - ptr->next = NULL; - tail->next = ptr; - tail = ptr; + + fifo = lappend(fifo, ptr); } } UnlockReleaseBuffer(buffer); - top = top->next; } elog(ERROR, "failed to re-find parent of a page in index \"%s\", block %u", @@ -997,7 +982,7 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child) parent->page = (Page) BufferGetPage(parent->buffer); /* here we don't need to distinguish between split and page update */ - if (parent->childoffnum == InvalidOffsetNumber || !XLByteEQ(parent->lsn, PageGetLSN(parent->page))) + if (child->downlinkoffnum == InvalidOffsetNumber || !XLByteEQ(parent->lsn, PageGetLSN(parent->page))) { /* parent is changed, look child in right links until found */ OffsetNumber i, @@ -1016,7 +1001,7 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child) if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno) { /* yes!!, found */ - parent->childoffnum = i; + child->downlinkoffnum = i; return; } } @@ -1024,12 +1009,13 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child) parent->blkno = GistPageGetOpaque(parent->page)->rightlink; UnlockReleaseBuffer(parent->buffer); if (parent->blkno == InvalidBlockNumber) - + { /* - * end of chain and still didn't found parent, It's very-very - * rare situation when root splited + * End of chain and still didn't find parent. It's a very-very + * rare situation when root splited. */ break; + } parent->buffer = ReadBuffer(r, parent->blkno); LockBuffer(parent->buffer, GIST_EXCLUSIVE); gistcheckpage(r, parent->buffer); @@ -1050,7 +1036,7 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child) } /* ok, find new path */ - ptr = parent = gistFindPath(r, child->blkno); + ptr = parent = gistFindPath(r, child->blkno, &child->downlinkoffnum); /* read all buffers as expected by caller */ /* note we don't lock them or gistcheckpage them here! */ @@ -1119,7 +1105,7 @@ gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate, LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE); gistFindCorrectParent(rel, stack); - iid = PageGetItemId(stack->parent->page, stack->parent->childoffnum); + iid = PageGetItemId(stack->parent->page, stack->downlinkoffnum); downlink = (IndexTuple) PageGetItem(stack->parent->page, iid); downlink = CopyIndexTuple(downlink); LockBuffer(stack->parent->buffer, GIST_UNLOCK); @@ -1147,7 +1133,7 @@ gistfixsplit(GISTInsertState *state, GISTSTATE *giststate) RelationGetRelationName(state->r), stack->blkno); Assert(GistFollowRight(stack->page)); - Assert(OffsetNumberIsValid(stack->parent->childoffnum)); + Assert(OffsetNumberIsValid(stack->downlinkoffnum)); buf = stack->buffer; @@ -1284,7 +1270,7 @@ gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack, tuples[1] = right->downlink; gistinserttuples(state, stack->parent, giststate, tuples, 2, - stack->parent->childoffnum, + stack->downlinkoffnum, left->buf); LockBuffer(stack->parent->buffer, GIST_UNLOCK); UnlockReleaseBuffer(right->buf); diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h index 77e3cb5aee..9fb20a6b6c 100644 --- a/src/include/access/gist_private.h +++ b/src/include/access/gist_private.h @@ -218,14 +218,11 @@ typedef struct GISTInsertStack */ GistNSN lsn; - /* child's offset */ - OffsetNumber childoffnum; + /* offset of the downlink in the parent page, that points to this page */ + OffsetNumber downlinkoffnum; /* pointer to parent */ struct GISTInsertStack *parent; - - /* for gistFindPath */ - struct GISTInsertStack *next; } GISTInsertStack; typedef struct GistSplitVector @@ -293,8 +290,6 @@ extern void freeGISTstate(GISTSTATE *giststate); extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *giststate); -extern GISTInsertStack *gistFindPath(Relation r, BlockNumber child); - /* gistxlog.c */ extern void gist_redo(XLogRecPtr lsn, XLogRecord *record); extern void gist_desc(StringInfo buf, uint8 xl_info, char *rec);