diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 0635301505..1efaaee1a8 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -26,12 +26,15 @@ /* non-export function prototypes */ static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate); +static bool gistinserttuple(GISTInsertState *state, GISTInsertStack *stack, + GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum); static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack, GISTSTATE *giststate, IndexTuple *tuples, int ntup, OffsetNumber oldoffnum, - Buffer leftchild); + Buffer leftchild, Buffer rightchild, + bool unlockbuf, bool unlockleftchild); static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack, - GISTSTATE *giststate, List *splitinfo); + GISTSTATE *giststate, List *splitinfo, bool releasebuf); #define ROTATEDIST(d) do { \ @@ -609,15 +612,15 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate) /* * Update the tuple. * - * We still hold the lock after gistinserttuples(), but it + * We still hold the lock after gistinserttuple(), but it * might have to split the page to make the updated tuple fit. * In that case the updated tuple might migrate to the other * half of the split, so we have to go back to the parent and * descend back to the half that's a better fit for the new * tuple. */ - if (gistinserttuples(&state, stack, giststate, &newtup, 1, - downlinkoffnum, InvalidBuffer)) + if (gistinserttuple(&state, stack, giststate, newtup, + downlinkoffnum)) { /* * If this was a root split, the root page continues to be @@ -703,8 +706,8 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate) /* now state.stack->(page, buffer and blkno) points to leaf page */ - gistinserttuples(&state, stack, giststate, &itup, 1, - InvalidOffsetNumber, InvalidBuffer); + gistinserttuple(&state, stack, giststate, itup, + InvalidOffsetNumber); LockBuffer(stack->buffer, GIST_UNLOCK); /* Release any pins we might still hold before exiting */ @@ -1028,51 +1031,107 @@ gistfixsplit(GISTInsertState *state, GISTSTATE *giststate) } /* Insert the downlinks */ - gistfinishsplit(state, stack, giststate, splitinfo); + gistfinishsplit(state, stack, giststate, splitinfo, false); } /* - * Insert tuples to stack->buffer. If 'oldoffnum' is valid, the new tuples - * replace an old tuple at oldoffnum. The caller must hold an exclusive lock - * on the page. + * Insert or replace a tuple in stack->buffer. If 'oldoffnum' is valid, the + * tuple at 'oldoffnum' is replaced, otherwise the tuple is inserted as new. + * 'stack' represents the path from the root to the page being updated. * - * If leftchild is valid, we're inserting/updating the downlink for the - * page to the right of leftchild. We clear the F_FOLLOW_RIGHT flag and - * update NSN on leftchild, atomically with the insertion of the downlink. + * The caller must hold an exclusive lock on stack->buffer. The lock is still + * held on return, but the page might not contain the inserted tuple if the + * page was split. The function returns true if the page was split, false + * otherwise. + */ +static bool +gistinserttuple(GISTInsertState *state, GISTInsertStack *stack, + GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum) +{ + return gistinserttuples(state, stack, giststate, &tuple, 1, oldoffnum, + InvalidBuffer, InvalidBuffer, false, false); +} + +/* ---------------- + * An extended workhorse version of gistinserttuple(). This version allows + * inserting multiple tuples, or replacing a single tuple with multiple tuples. + * This is used to recursively update the downlinks in the parent when a page + * is split. * - * Returns 'true' if the page had to be split. On return, we will continue - * to hold an exclusive lock on state->stack->buffer, but if we had to split - * the page, it might not contain the tuple we just inserted/updated. + * If leftchild and rightchild are valid, we're inserting/replacing the + * downlink for rightchild, and leftchild is its left sibling. We clear the + * F_FOLLOW_RIGHT flag and update NSN on leftchild, atomically with the + * insertion of the downlink. + * + * To avoid holding locks for longer than necessary, when recursing up the + * tree to update the parents, the locking is a bit peculiar here. On entry, + * the caller must hold an exclusive lock on stack->buffer, as well as + * leftchild and rightchild if given. On return: + * + * - Lock on stack->buffer is released, if 'unlockbuf' is true. The page is + * always kept pinned, however. + * - Lock on 'leftchild' is released, if 'unlockleftchild' is true. The page + * is kept pinned. + * - Lock and pin on 'rightchild' are always released. + * + * Returns 'true' if the page had to be split. Note that if the page had + * be split, the inserted/updated might've been inserted to a right sibling + * of stack->buffer instead of stack->buffer itself. */ static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack, GISTSTATE *giststate, IndexTuple *tuples, int ntup, OffsetNumber oldoffnum, - Buffer leftchild) + Buffer leftchild, Buffer rightchild, + bool unlockbuf, bool unlockleftchild) { List *splitinfo; bool is_split; + /* Insert the tuple(s) to the page, splitting the page if necessary */ is_split = gistplacetopage(state->r, state->freespace, giststate, stack->buffer, tuples, ntup, oldoffnum, leftchild, &splitinfo, true); + + /* + * Before recursing up in case the page was split, release locks on the + * child pages. We don't need to keep them locked when updating the + * parent. + */ + if (BufferIsValid(rightchild)) + UnlockReleaseBuffer(rightchild); + if (BufferIsValid(leftchild) && unlockleftchild) + LockBuffer(leftchild, GIST_UNLOCK); + + /* + * If we had to split, insert/update the downlinks in the parent. If the + * caller requested us to release the lock on stack->buffer, tell + * gistfinishsplit() to do that as soon as it's safe to do so. If we + * didn't have to split, release it ourselves. + */ if (splitinfo) - gistfinishsplit(state, stack, giststate, splitinfo); + gistfinishsplit(state, stack, giststate, splitinfo, unlockbuf); + else if (unlockbuf) + LockBuffer(stack->buffer, GIST_UNLOCK); return is_split; } /* - * Finish an incomplete split by inserting/updating the downlinks in - * parent page. 'splitinfo' contains all the child pages, exclusively-locked, - * involved in the split, from left-to-right. + * Finish an incomplete split by inserting/updating the downlinks in parent + * page. 'splitinfo' contains all the child pages involved in the split, + * from left-to-right. + * + * On entry, the caller must hold a lock on stack->buffer and all the child + * pages in 'splitinfo'. If 'unlockbuf' is true, the lock on stack->buffer is + * released on return. The child pages are always unlocked and unpinned. */ static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack, - GISTSTATE *giststate, List *splitinfo) + GISTSTATE *giststate, List *splitinfo, bool unlockbuf) { ListCell *lc; List *reversed; @@ -1101,6 +1160,10 @@ gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack, LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE); gistFindCorrectParent(state->r, stack); + /* + * insert downlinks for the siblings from right to left, until there are + * only two siblings left. + */ while (list_length(reversed) > 2) { right = (GISTPageSplitInfo *) linitial(reversed); @@ -1109,7 +1172,7 @@ gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack, if (gistinserttuples(state, stack->parent, giststate, &right->downlink, 1, InvalidOffsetNumber, - left->buf)) + left->buf, right->buf, false, false)) { /* * If the parent page was split, need to relocate the original @@ -1117,7 +1180,7 @@ gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack, */ gistFindCorrectParent(state->r, stack); } - UnlockReleaseBuffer(right->buf); + /* gistinserttuples() released the lock on right->buf. */ reversed = list_delete_first(reversed); } @@ -1134,9 +1197,10 @@ gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack, gistinserttuples(state, stack->parent, giststate, tuples, 2, stack->downlinkoffnum, - left->buf); - LockBuffer(stack->parent->buffer, GIST_UNLOCK); - UnlockReleaseBuffer(right->buf); + left->buf, right->buf, + true, /* Unlock parent */ + unlockbuf /* Unlock stack->buffer if caller wants that */ + ); Assert(left->buf == stack->buffer); }