/*-------------------------------------------------------------------------
 *
 * gist.c
 *	  interface routines for the postgres GiST index access method.
 *
 *
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.126 2005/09/22 20:44:36 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/genam.h"
#include "access/gist_private.h"
#include "access/gistscan.h"
#include "access/heapam.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "utils/memutils.h"

/*
 * Placeholder LSN stamped on pages of temporary relations: every place in
 * this file that skips XLogInsert() because rd_istemp is set calls
 * PageSetLSN() with this value instead.
 */
const XLogRecPtr XLogRecPtrForTemp = {1, 1};

/* Working state for gistbuild and its callback */
typedef struct
{
	GISTSTATE	giststate;		/* filled in by initGISTstate() in gistbuild */
	int			numindexattrs;	/* copied from indexInfo->ii_NumIndexAttrs */
	double		indtuples;		/* incremented once per tuple inserted by
								 * gistbuildCallback */
	MemoryContext tmpCtx;		/* per-tuple context; reset after each
								 * callback invocation */
} GISTBuildState;


/* non-export function prototypes */
static void gistbuildCallback(Relation index,
				  HeapTuple htup,
				  Datum *values,
				  bool *isnull,
				  bool tupleIsAlive,
				  void *state);
static void gistdoinsert(Relation r,
			 IndexTuple itup,
			 GISTSTATE *GISTstate);
static void gistfindleaf(GISTInsertState *state,
			 GISTSTATE *giststate);


/*
 * ROTATEDIST(d): palloc a zero-filled SplitedPageLayout, link the current
 * list head (d) behind it, and make it the new head.  Note that (d) is
 * evaluated more than once and is assigned to, so it must be an lvalue
 * without side effects.
 */
#define ROTATEDIST(d) do { \
	SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
	memset(tmp,0,sizeof(SplitedPageLayout)); \
	tmp->next = (d); \
	(d)=tmp; \
} while(0)


/*
 * Create and return a temporary memory context for use by GiST. We
 * _always_ invoke user-provided methods in a temporary memory
 * context, so that memory leaks in those functions cannot cause
 * problems. Also, we use some additional temporary contexts in the
 * GiST code itself, to avoid the need to do some awkward manual
 * memory management.
*/
MemoryContext
createTempGistContext(void)
{
	/* the new context is a child of whatever context is current now */
	return AllocSetContextCreate(CurrentMemoryContext,
								 "GiST temporary context",
								 ALLOCSET_DEFAULT_MINSIZE,
								 ALLOCSET_DEFAULT_INITSIZE,
								 ALLOCSET_DEFAULT_MAXSIZE);
}

/*
 * Routine to build an index.  Basically calls insert over and over.
 *
 * fmgr arguments: 0 = heap relation, 1 = index relation, 2 = IndexInfo.
 *
 * Steps, in order: refuse to run if the index already has blocks; set up
 * the GISTSTATE; create the root page as an empty leaf (WAL-logged with
 * XLOG_GIST_CREATE_INDEX unless the index is temporary); scan the heap,
 * inserting one index tuple per callback; finally update statistics.
 *
 * XXX: it would be nice to implement some sort of bulk-loading
 * algorithm, but it is not clear how to do that.
 */
Datum
gistbuild(PG_FUNCTION_ARGS)
{
	Relation	heap = (Relation) PG_GETARG_POINTER(0);
	Relation	index = (Relation) PG_GETARG_POINTER(1);
	IndexInfo  *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
	double		reltuples;
	GISTBuildState buildstate;
	Buffer		buffer;

	/*
	 * We expect to be called exactly once for any index relation. If that's
	 * not the case, big trouble's what we have.
	 */
	if (RelationGetNumberOfBlocks(index) != 0)
		elog(ERROR, "index \"%s\" already contains data",
			 RelationGetRelationName(index));

	/* no locking is needed */
	initGISTstate(&buildstate.giststate, index);

	/* initialize the root page */
	buffer = gistNewBuffer(index);
	GISTInitBuffer(buffer, F_LEAF);
	if (!index->rd_istemp)
	{
		XLogRecPtr	recptr;
		XLogRecData rdata;
		Page		page;

		/* the WAL record payload is just the index's RelFileNode */
		rdata.buffer = InvalidBuffer;
		rdata.data = (char *) &(index->rd_node);
		rdata.len = sizeof(RelFileNode);
		rdata.next = NULL;

		page = BufferGetPage(buffer);

		START_CRIT_SECTION();

		recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
		PageSetLSN(page, recptr);
		PageSetTLI(page, ThisTimeLineID);

		END_CRIT_SECTION();
	}
	else
		/* temp index: no WAL record, stamp the placeholder LSN instead */
		PageSetLSN(BufferGetPage(buffer), XLogRecPtrForTemp);
	LockBuffer(buffer, GIST_UNLOCK);
	WriteBuffer(buffer);

	/* build the index */
	buildstate.numindexattrs = indexInfo->ii_NumIndexAttrs;
	buildstate.indtuples = 0;

	/*
	 * create a temporary memory context that is reset once for each tuple
	 * inserted into the index
	 */
	buildstate.tmpCtx = createTempGistContext();

	/* do the heap scan */
	reltuples = IndexBuildHeapScan(heap, index, indexInfo,
								   gistbuildCallback, (void *) &buildstate);

	/* okay, all heap tuples are indexed */
	MemoryContextDelete(buildstate.tmpCtx);

	/* since we just counted the # of tuples, may as well update stats */
	IndexCloseAndUpdateStats(heap, reltuples, index, buildstate.indtuples);

	freeGISTstate(&buildstate.giststate);

	PG_RETURN_VOID();
}

/*
 * Per-tuple callback from IndexBuildHeapScan
 *
 * 'state' is the GISTBuildState set up by gistbuild.  All work is done in
 * buildstate->tmpCtx, which is reset before returning.  Note that the
 * caller's values[] array is overwritten in place with the compressed keys
 * (or zero for NULL attributes).  Tuples whose first attribute is NULL are
 * skipped and not counted in indtuples.
 */
static void
gistbuildCallback(Relation index,
				  HeapTuple htup,
				  Datum *values,
				  bool *isnull,
				  bool tupleIsAlive,
				  void *state)
{
	GISTBuildState *buildstate = (GISTBuildState *) state;
	IndexTuple	itup;
	GISTENTRY	tmpcentry;
	int			i;
	MemoryContext oldCtx;

	/* GiST cannot index tuples with leading NULLs */
	if (isnull[0])
		return;

	oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);

	/* immediately compress keys to normalize */
	for (i = 0; i < buildstate->numindexattrs; i++)
	{
		if (isnull[i])
			values[i] = (Datum) 0;
		else
		{
			gistcentryinit(&buildstate->giststate, i, &tmpcentry, values[i],
						   NULL, NULL, (OffsetNumber) 0,
						   -1 /* size is currently bogus */ , TRUE, FALSE);
			values[i] = tmpcentry.key;
		}
	}

	/* form an index tuple and point it at the heap tuple */
	itup = index_form_tuple(buildstate->giststate.tupdesc, values, isnull);
	itup->t_tid = htup->t_self;

	/*
	 * Since we already have the index relation locked, we call gistdoinsert
	 * directly.  Normal access method calls dispatch through gistinsert,
	 * which locks the relation for write.  This is the right thing to do if
	 * you're inserting single tups, but not when you're initializing the
	 * whole index at once.
	 */
	gistdoinsert(index, itup, &buildstate->giststate);

	buildstate->indtuples += 1;
	MemoryContextSwitchTo(oldCtx);
	/* throw away everything allocated for this tuple */
	MemoryContextReset(buildstate->tmpCtx);
}

/*
 *		gistinsert -- wrapper for GiST tuple insertion.
 *
 *	  This is the public interface routine for tuple insertion in GiSTs.
 *	  It doesn't do any work; just locks the relation and passes the buck.
*/
Datum
gistinsert(PG_FUNCTION_ARGS)
{
	Relation	r = (Relation) PG_GETARG_POINTER(0);
	Datum	   *values = (Datum *) PG_GETARG_POINTER(1);
	bool	   *isnull = (bool *) PG_GETARG_POINTER(2);
	ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);

#ifdef NOT_USED
	Relation	heapRel = (Relation) PG_GETARG_POINTER(4);
	bool		checkUnique = PG_GETARG_BOOL(5);
#endif
	GISTSTATE	giststate;
	MemoryContext callerCtx;
	MemoryContext workCtx;
	IndexTuple	newtup;
	int			attno;

	/* a NULL in the leading column cannot be indexed: report "not inserted" */
	if (isnull[0])
		PG_RETURN_BOOL(false);

	/* everything below runs in a private context that is dropped at the end */
	workCtx = createTempGistContext();
	callerCtx = MemoryContextSwitchTo(workCtx);

	initGISTstate(&giststate, r);

	/*
	 * Normalize the keys by compressing them right away.  The caller's
	 * values[] array is overwritten in place.
	 */
	for (attno = 0; attno < r->rd_att->natts; attno++)
	{
		GISTENTRY	centry;

		if (isnull[attno])
		{
			values[attno] = (Datum) 0;
			continue;
		}

		gistcentryinit(&giststate, attno, &centry, values[attno],
					   NULL, NULL, (OffsetNumber) 0,
					   -1 /* size is currently bogus */ , TRUE, FALSE);
		values[attno] = centry.key;
	}

	/* build the index tuple and aim it at the heap tuple */
	newtup = index_form_tuple(giststate.tupdesc, values, isnull);
	newtup->t_tid = *ht_ctid;

	gistdoinsert(r, newtup, &giststate);

	/* tidy up: back to the caller's context, then discard ours */
	freeGISTstate(&giststate);
	MemoryContextSwitchTo(callerCtx);
	MemoryContextDelete(workCtx);

	PG_RETURN_BOOL(true);
}


/*
 * Workhorse routine for doing insertion into a GiST index. Note that
 * this routine assumes it is invoked in a short-lived memory context,
 * so it does not bother releasing palloc'd allocations.
*/
static void
gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate)
{
	GISTInsertState state;

	memset(&state, 0, sizeof(GISTInsertState));

	/* work on a private one-element vector holding a copy of the tuple */
	state.itup = (IndexTuple *) palloc(sizeof(IndexTuple));
	state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup));
	memcpy(state.itup[0], itup, IndexTupleSize(itup));
	state.ituplen = 1;
	state.r = r;
	state.key = itup->t_tid;
	state.needInsertComplete = true;

	/* the descent starts from a stack containing only the root block */
	state.stack = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
	state.stack->blkno = GIST_ROOT_BLKNO;

	gistfindleaf(&state, giststate);
	gistmakedeal(&state, giststate);
}

/*
 * Put state->itup[0 .. ituplen-1] onto the page at the top of state->stack.
 *
 * On a non-leaf page the tuple at stack->childoffnum is deleted first.  If
 * the tuples do not fit, the page is split with gistSplit(), the split is
 * WAL-logged (unless the relation is temporary), and rightlink/nsn of the
 * resulting pages are set; a split of the root additionally calls
 * gistnewroot().  If they fit, the page is simply filled and the update is
 * WAL-logged.
 *
 * On return state->itup / state->ituplen hold what the caller should apply
 * to the parent page.  Returns true if the page was split.
 */
static bool
gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
{
	bool		is_splitted = false;
	bool		is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;

	if (!is_leaf)

		/*
		 * This node's key has been modified, either because a child split
		 * occurred or because we needed to adjust our key for an insert in a
		 * child node. Therefore, remove the old version of this node's key.
		 */
		PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);

	if (gistnospace(state->stack->page, state->itup, state->ituplen))
	{
		/* no space for insertion */
		IndexTuple *itvec,
				   *newitup;
		int			tlen,
					olen;
		SplitedPageLayout *dist = NULL,
				   *ptr;

		is_splitted = true;
		itvec = gistextractbuffer(state->stack->buffer, &tlen);
		olen = tlen;			/* NOTE(review): olen is never read in this
								 * function */
		itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
		newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate);

		if (!state->r->rd_istemp)
		{
			XLogRecPtr	recptr;
			XLogRecData *rdata;

			rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
								   &(state->key), dist);

			START_CRIT_SECTION();

			/* one WAL record covers the split; stamp its LSN on every page */
			recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
			ptr = dist;
			while (ptr)
			{
				PageSetLSN(BufferGetPage(ptr->buffer), recptr);
				PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
				ptr = ptr->next;
			}

			END_CRIT_SECTION();
		}
		else
		{
			ptr = dist;
			while (ptr)
			{
				PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
				ptr = ptr->next;
			}
		}

		state->itup = newitup;
		state->ituplen = tlen;	/* now tlen >= 2 */

		if (state->stack->blkno == GIST_ROOT_BLKNO)
		{
			gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
			state->needInsertComplete = false;
			/* chain the new pages left to right, then unlock and write them */
			ptr = dist;
			while (ptr)
			{
				Page		page = (Page) BufferGetPage(ptr->buffer);

				GistPageGetOpaque(page)->rightlink = (ptr->next) ?
					ptr->next->block.blkno : InvalidBlockNumber;
				GistPageGetOpaque(page)->nsn = PageGetLSN(page);
				LockBuffer(ptr->buffer, GIST_UNLOCK);
				WriteBuffer(ptr->buffer);
				ptr = ptr->next;
			}
		}
		else
		{
			Page		page;
			BlockNumber rightrightlink = InvalidBlockNumber;
			SplitedPageLayout *ourpage = NULL;
			GistNSN		oldnsn;
			GISTPageOpaque opaque;

			/* move origpage to first in chain */
			if (dist->block.blkno != state->stack->blkno)
			{
				ptr = dist;
				while (ptr->next)
				{
					if (ptr->next->block.blkno == state->stack->blkno)
					{
						ourpage = ptr->next;
						ptr->next = ptr->next->next;
						ourpage->next = dist;
						dist = ourpage;
						break;
					}
					ptr = ptr->next;
				}
				Assert(ourpage != NULL);
			}
			else
				ourpage = dist;

			/* now get all needed data, and set the nsn's */
			page = (Page) BufferGetPage(ourpage->buffer);
			opaque = GistPageGetOpaque(page);
			rightrightlink = opaque->rightlink;
			oldnsn = opaque->nsn;
			opaque->nsn = PageGetLSN(page);
			opaque->rightlink = ourpage->next->block.blkno;

			/*
			 * fill and write all new pages. They aren't linked into the tree
			 * yet
			 */
			ptr = ourpage->next;
			while (ptr)
			{
				page = (Page) BufferGetPage(ptr->buffer);
				GistPageGetOpaque(page)->rightlink = (ptr->next) ?
					ptr->next->block.blkno : rightrightlink;
				/* only the last page in the chain gets oldnsn */
				GistPageGetOpaque(page)->nsn = (ptr->next) ?
					opaque->nsn : oldnsn;

				LockBuffer(ptr->buffer, GIST_UNLOCK);
				WriteBuffer(ptr->buffer);
				ptr = ptr->next;
			}
		}
		WriteNoReleaseBuffer(state->stack->buffer);
	}
	else
	{
		/* enough space */
		XLogRecPtr	oldlsn;

		gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber);

		/* remember the pre-update LSN; it is passed to gistadjscans below */
		oldlsn = PageGetLSN(state->stack->page);
		if (!state->r->rd_istemp)
		{
			OffsetNumber noffs = 0,
						offs[MAXALIGN(sizeof(OffsetNumber)) / sizeof(OffsetNumber)];
			XLogRecPtr	recptr;
			XLogRecData *rdata;

			if (!is_leaf)
			{
				/* only on inner page we should delete previous version */
				offs[0] = state->stack->childoffnum;
				noffs = 1;
			}

			rdata = formUpdateRdata(state->r->rd_node, state->stack->blkno,
							 offs, noffs, false, state->itup, state->ituplen,
									&(state->key));

			START_CRIT_SECTION();

			recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
			PageSetLSN(state->stack->page, recptr);
			PageSetTLI(state->stack->page, ThisTimeLineID);

			END_CRIT_SECTION();
		}
		else
			PageSetLSN(state->stack->page, XLogRecPtrForTemp);

		if (state->stack->blkno == GIST_ROOT_BLKNO)
			state->needInsertComplete = false;
		WriteNoReleaseBuffer(state->stack->buffer);

		if (!is_leaf)			/* small optimization: inform scans about the
								 * deletion... */
			gistadjscans(state->r, GISTOP_DEL, state->stack->blkno,
						 state->stack->childoffnum, PageGetLSN(state->stack->page), oldlsn);

		if (state->ituplen > 1)
		{						/* previous is_splitted==true */

			/*
			 * child was split, so we must form union for insertion in
			 * parent
			 */
			IndexTuple	newtup = gistunion(state->r, state->itup, state->ituplen, giststate);

			ItemPointerSetBlockNumber(&(newtup->t_tid), state->stack->blkno);
			state->itup[0] = newtup;
			state->ituplen = 1;
		}
		else if (is_leaf)
		{
			/*
			 * itup[0] store key to adjust parent, we set it to valid to
			 * correct check by GistTupleIsInvalid macro in gistgetadjusted()
			 */
			ItemPointerSetBlockNumber(&(state->itup[0]->t_tid), state->stack->blkno);
			GistTupleSetValid(state->itup[0]);
		}
	}
	return is_splitted;
}

/*
 * returns stack of pages, all pages in stack are pinned, and
 * leaf is X-locked
 *
 * On entry state->stack holds a single item for the root block.  The stack
 * is extended downwards one item per level; on exit state->stack points at
 * the leaf's item.
 */

static void
gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
{
	ItemId		iid;
	IndexTuple	idxtuple;
	GISTPageOpaque opaque;

	/*
	 * walk down, We don't lock page for a long time, but so we should be
	 * ready to recheck path in a bad case... We remember, that page->lsn
	 * should never be invalid.
	 */
	while (true)
	{

		/*
		 * An invalid lsn marks a stack item whose page has not been read
		 * yet (items are palloc0'd); otherwise the buffer is reused as is.
		 */
		if (XLogRecPtrIsInvalid(state->stack->lsn))
			state->stack->buffer = ReadBuffer(state->r, state->stack->blkno);
		LockBuffer(state->stack->buffer, GIST_SHARE);

		state->stack->page = (Page) BufferGetPage(state->stack->buffer);
		opaque = GistPageGetOpaque(state->stack->page);

		state->stack->lsn = PageGetLSN(state->stack->page);
		Assert(state->r->rd_istemp || !XLogRecPtrIsInvalid(state->stack->lsn));

		if (state->stack->blkno != GIST_ROOT_BLKNO &&
			XLByteLT(state->stack->parent->lsn, opaque->nsn))
		{
			/*
			 * caused split non-root page is detected, go up to parent to
			 * choose best child
			 */
			LockBuffer(state->stack->buffer, GIST_UNLOCK);
			ReleaseBuffer(state->stack->buffer);
			state->stack = state->stack->parent;
			continue;
		}

		if (!GistPageIsLeaf(state->stack->page))
		{
			/*
			 * This is an internal page, so continue to walk down the tree. We
			 * find the child node that has the minimum insertion penalty and
			 * recursively invoke ourselves to modify that node. Once the
			 * recursive call returns, we may need to adjust the parent node
			 * for two reasons: the child node split, or the key in this node
			 * needs to be adjusted for the newly inserted key below us.
			 */
			GISTInsertStack *item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));

			state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate);

			iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
			idxtuple = (IndexTuple) PageGetItem(state->stack->page, iid);
			item->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
			/* unlock, but keep the pin on this page */
			LockBuffer(state->stack->buffer, GIST_UNLOCK);

			item->parent = state->stack;
			item->child = NULL;
			if (state->stack)
				state->stack->child = item;
			state->stack = item;
		}
		else
		{
			/* be careful, during unlock/lock page may be changed... */
			LockBuffer(state->stack->buffer, GIST_UNLOCK);
			LockBuffer(state->stack->buffer, GIST_EXCLUSIVE);
			state->stack->page = (Page) BufferGetPage(state->stack->buffer);
			opaque = GistPageGetOpaque(state->stack->page);

			if (state->stack->blkno == GIST_ROOT_BLKNO)
			{
				/*
				 * the only page can become inner instead of leaf is a root
				 * page, so for root we should recheck it
				 */
				if (!GistPageIsLeaf(state->stack->page))
				{
					/*
					 * very rarely situation: during unlock/lock index with
					 * number of pages = 1 was increased
					 */
					LockBuffer(state->stack->buffer, GIST_UNLOCK);
					continue;
				}

				/*
				 * we don't need to check root split, because checking
				 * leaf/inner is enough to recognize split for root
				 */

			}
			else if (XLByteLT(state->stack->parent->lsn, opaque->nsn))
			{
				/*
				 * detecting split during unlock/lock, so we should find
				 * better child on parent
				 */

				/* forget buffer */
				LockBuffer(state->stack->buffer, GIST_UNLOCK);
				ReleaseBuffer(state->stack->buffer);
				state->stack = state->stack->parent;
				continue;
			}

			state->stack->lsn = PageGetLSN(state->stack->page);

			/* ok we found a leaf page and it X-locked */
			break;
		}
	}

	/* now state->stack->(page, buffer and blkno) points to leaf page */
}

/*
 * Read the given block and take a share lock on it.
 *
 * Should have the same interface as XLogReadBuffer
 */
static Buffer
gistReadAndLockBuffer(Relation r, BlockNumber blkno)
{
	Buffer		buffer = ReadBuffer(r, blkno);

	LockBuffer(buffer, GIST_SHARE);
	return buffer;
}

/*
 * Traverse the tree to find path from root page,
 * to prevent deadlocks, it should lock only one page simultaneously.
 * Function is used in recovery and usual mode, so should work with different
 * read functions (gistReadAndLockBuffer and XLogReadBuffer)
 * returns from the beginning of closest parent;
 *
 * The search is breadth-first: pages to visit are queued through the
 * ->next links (top = current item, tail = end of queue), and each queued
 * item remembers the item it was discovered from in ->parent.  Returns the
 * stack item of the page containing a tuple that points to 'child', with
 * ->child links and childoffnum values filled in along its parent chain, or
 * NULL if a leaf page is reached or the queue is exhausted first.
 * myReadBuffer must return the buffer already locked.
 */
GISTInsertStack *
gistFindPath(Relation r, BlockNumber child, Buffer (*myReadBuffer) (Relation, BlockNumber))
{
	Page		page;
	Buffer		buffer;
	OffsetNumber i,
				maxoff;
	ItemId		iid;
	IndexTuple	idxtuple;
	GISTInsertStack *top,
			   *tail,
			   *ptr;
	BlockNumber blkno;

	top = tail = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
	top->blkno = GIST_ROOT_BLKNO;

	while (top && top->blkno != child)
	{
		buffer = myReadBuffer(r, top->blkno);	/* buffer locked */
		page = (Page) BufferGetPage(buffer);

		if (GistPageIsLeaf(page))
		{
			/* we can safely go away, only leaf pages follow */
			LockBuffer(buffer, GIST_UNLOCK);
			ReleaseBuffer(buffer);
			return NULL;
		}

		top->lsn = PageGetLSN(page);

		if (top->parent && XLByteLT(top->parent->lsn, GistPageGetOpaque(page)->nsn) &&
			GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
		{
			/*
			 * page was split while we were thinking of... so queue its
			 * right sibling as well.
			 *
			 * NOTE(review): the sibling is recorded with parent = top (the
			 * split page itself) rather than top->parent -- confirm that
			 * this is the intended parent for the path built below.
			 */
			ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
			ptr->blkno = GistPageGetOpaque(page)->rightlink;
			ptr->childoffnum = InvalidOffsetNumber;
			ptr->parent = top;
			ptr->next = NULL;
			tail->next = ptr;
			tail = ptr;
		}

		maxoff = PageGetMaxOffsetNumber(page);

		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
		{
			iid = PageGetItemId(page, i);
			idxtuple = (IndexTuple) PageGetItem(page, iid);
			blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
			if (blkno == child)
			{
				OffsetNumber poff = InvalidOffsetNumber;

				/* make child links */
				ptr = top;
				while (ptr->parent)
				{
					/* set child link */
					ptr->parent->child = ptr;
					/* move childoffnum.. */
					if (ptr == top)
					{
						/* first iteration */
						poff = ptr->parent->childoffnum;
						ptr->parent->childoffnum = ptr->childoffnum;
					}
					else
					{
						OffsetNumber tmp = ptr->parent->childoffnum;

						ptr->parent->childoffnum = poff;
						poff = tmp;
					}
					ptr = ptr->parent;
				}
				top->childoffnum = i;
				LockBuffer(buffer, GIST_UNLOCK);
				ReleaseBuffer(buffer);
				return top;
			}
			else
			{
				/* Install next inner page to the end of stack */
				ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
				ptr->blkno = blkno;
				ptr->childoffnum = i;	/* set offsetnumber of child to child
										 * !!! */
				ptr->parent = top;
				ptr->next = NULL;
				tail->next = ptr;
				tail = ptr;
			}
		}

		LockBuffer(buffer, GIST_UNLOCK);
		ReleaseBuffer(buffer);
		top = top->next;
	}

	return NULL;
}


/*
 * Returns X-locked parent of stack page
 *
 * More precisely: on return child->parent describes a page that is
 * exclusively locked and whose childoffnum is the offset of the tuple
 * pointing to child->blkno.  If the remembered parent has changed, the
 * search follows rightlinks; if that fails too, the whole parent chain is
 * rebuilt with gistFindPath() and the function calls itself again.
 */

static void
gistFindCorrectParent(Relation r, GISTInsertStack *child)
{
	GISTInsertStack *parent = child->parent;

	LockBuffer(parent->buffer, GIST_EXCLUSIVE);
	parent->page = (Page) BufferGetPage(parent->buffer);


	/* here we don't need to distinguish between split and page update */
	if (parent->childoffnum == InvalidOffsetNumber || !XLByteEQ(parent->lsn, PageGetLSN(parent->page)))
	{
		/* parent is changed, look child in right links until found */
		OffsetNumber i,
					maxoff;
		ItemId		iid;
		IndexTuple	idxtuple;
		GISTInsertStack *ptr;

		while (true)
		{
			maxoff = PageGetMaxOffsetNumber(parent->page);
			for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
			{
				iid = PageGetItemId(parent->page, i);
				idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
				if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
				{
					/* yes!!, found */
					parent->childoffnum = i;
					return;
				}
			}

			/* not on this page: step to the right sibling, if any */
			parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
			LockBuffer(parent->buffer, GIST_UNLOCK);
			ReleaseBuffer(parent->buffer);
			if (parent->blkno == InvalidBlockNumber)

				/*
				 * end of chain and still didn't find parent. It's a
				 * very-very rare situation when root was split
				 */
				break;
			parent->buffer = ReadBuffer(r, parent->blkno);
			LockBuffer(parent->buffer, GIST_EXCLUSIVE);
			parent->page = (Page) BufferGetPage(parent->buffer);
		}

		/*
		 * awful!!, we need search tree to find parent ... , but before we
		 * should release all old parent
		 */

		ptr = child->parent->parent;	/* child->parent already released
										 * above */
		while (ptr)
		{
			ReleaseBuffer(ptr->buffer);
			ptr = ptr->parent;
		}

		/* ok, find new path */
		ptr = parent = gistFindPath(r, child->blkno, gistReadAndLockBuffer);
		Assert(ptr != NULL);

		/* read all buffers as supposed in caller */
		while (ptr)
		{
			ptr->buffer = ReadBuffer(r, ptr->blkno);
			ptr->page = (Page) BufferGetPage(ptr->buffer);
			ptr = ptr->parent;
		}

		/* install new chain of parents to stack */
		child->parent = parent;
		parent->child = child;

		/* make recursive call to normal processing */
		gistFindCorrectParent(r, child);
	}

	return;
}

/*
 * Apply the pending tuple(s) in 'state' to the page at the top of the stack
 * and then propagate the change upwards.
 *
 * Each iteration locks the parent (if any) via gistFindCorrectParent(),
 * calls gistplacetopage() for the current page, releases that page and pops
 * the stack.  The walk stops when the stack is empty, or when the child did
 * not split and gistgetadjusted() returns NULL for the parent's tuple.
 * Remaining parent buffers are then released and, when needInsertComplete is
 * still set and the relation is not temporary, gistxlogInsertCompletion()
 * is called.
 */
void
gistmakedeal(GISTInsertState *state, GISTSTATE *giststate)
{
	int			is_splitted;
	ItemId		iid;
	IndexTuple	oldtup,
				newtup;

	/* walk up */
	while (true)
	{
		/*
		 * After this call: 1. if child page was split, then itup contains
		 * keys for each page 2. if child page wasn't split, then itup
		 * contains additional for adjustment of current key
		 */

		if (state->stack->parent)
		{
			/*
			 * X-lock parent page before proceed child, gistFindCorrectParent
			 * should find and lock it
			 */
			gistFindCorrectParent(state->r, state->stack);
		}
		is_splitted = gistplacetopage(state, giststate);

		/* parent locked above, so release child buffer */
		LockBuffer(state->stack->buffer, GIST_UNLOCK);
		ReleaseBuffer(state->stack->buffer);

		/* pop parent page from stack */
		state->stack = state->stack->parent;

		/* stack is void */
		if (!state->stack)
			break;

		/*
		 * child did not split, so we can check is it needed to update parent
		 * tuple
		 */
		if (!is_splitted)
		{
			/* parent's tuple */
			iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
			oldtup = (IndexTuple) PageGetItem(state->stack->page, iid);
			newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate);

			if (!newtup)
			{					/* not need to update key */
				LockBuffer(state->stack->buffer, GIST_UNLOCK);
				break;
			}

			state->itup[0] = newtup;
		}
	}							/* while */

	/* release all parent buffers */
	while (state->stack)
	{
		ReleaseBuffer(state->stack->buffer);
		state->stack = state->stack->parent;
	}

	/* say to xlog that insert is completed */
	if (state->needInsertComplete && !state->r->rd_istemp)
		gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1);
}

/*
 * Translate, in place, each of the first 'len' entries of 'arr' through the
 * lookup table 'reasloffset': arr[i] becomes reasloffset[arr[i]].
 */
static void
gistToRealOffset(OffsetNumber *arr, int len, OffsetNumber *reasloffset)
{
	int			i;

	for (i = 0; i < len; i++)
		arr[i] = reasloffset[arr[i]];
}

/*
 *	gistSplit -- split a page in the tree.
 *
 * 'itup' / '*len' is the complete set of tuples to distribute.  The function
 * recurses when either half still does not fit on one page.  For every
 * page it finally fills, a SplitedPageLayout describing that page is pushed
 * onto the front of *dist.  Returns a palloc'd array with one tuple per
 * resulting page (each with its block number set) and stores the array
 * length in *len.
 */
IndexTuple *
gistSplit(Relation r,
		  Buffer buffer,
		  IndexTuple *itup,		/* contains compressed entry */
		  int *len,
		  SplitedPageLayout **dist,
		  GISTSTATE *giststate)
{
	Page		p;
	Buffer		leftbuf,
				rightbuf;
	Page		left,
				right;
	IndexTuple *lvectup,
			   *rvectup,
			   *newtup;
	BlockNumber lbknum,
				rbknum;
	GISTPageOpaque opaque;
	GIST_SPLITVEC v;
	GistEntryVector *entryvec;
	int			i,
				fakeoffset,
				nlen;
	OffsetNumber *realoffset;
	IndexTuple *cleaneditup = itup;
	int			lencleaneditup = *len;

	p = (Page) BufferGetPage(buffer);
	opaque = GistPageGetOpaque(p);

	/*
	 * The root of the tree is the first block in the relation.  If we're
	 * about to split the root, we need to do some hocus-pocus to enforce this
	 * guarantee.
	 */
	if (BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO)
	{
		leftbuf = gistNewBuffer(r);
		GISTInitBuffer(leftbuf, opaque->flags & F_LEAF);
		lbknum = BufferGetBlockNumber(leftbuf);
		left = (Page) BufferGetPage(leftbuf);
	}
	else
	{
		/* non-root: the left half is built in a temp copy of the page */
		leftbuf = buffer;
		/* IncrBufferRefCount(buffer); */
		lbknum = BufferGetBlockNumber(buffer);
		left = (Page) PageGetTempPage(p, sizeof(GISTPageOpaqueData));
	}

	rightbuf = gistNewBuffer(r);
	GISTInitBuffer(rightbuf, opaque->flags & F_LEAF);
	rbknum = BufferGetBlockNumber(rightbuf);
	right = (Page) BufferGetPage(rightbuf);

	/* generate the item array */
	realoffset = palloc((*len + 1) * sizeof(OffsetNumber));
	entryvec = palloc(GEVHDRSZ + (*len + 1) * sizeof(GISTENTRY));
	entryvec->n = *len + 1;

	/*
	 * Valid tuples are packed into entryvec->vector[] starting at
	 * FirstOffsetNumber; realoffset[] maps each packed position back to the
	 * tuple's 1-based position in itup[].  Positions of invalid tuples are
	 * recorded at the tail of realoffset[], from index entryvec->n upwards.
	 */
	fakeoffset = FirstOffsetNumber;
	for (i = 1; i <= *len; i++)
	{
		Datum		datum;
		bool		IsNull;

		if (!GistPageIsLeaf(p) && GistTupleIsInvalid(itup[i - 1]))
		{
			entryvec->n--;
			/* remember position of invalid tuple */
			realoffset[entryvec->n] = i;
			continue;
		}

		datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull);
		gistdentryinit(giststate, 0, &(entryvec->vector[fakeoffset]),
					   datum, r, p, i,
					   ATTSIZE(datum, giststate->tupdesc, 1, IsNull),
					   FALSE, IsNull);
		realoffset[fakeoffset] = i;
		fakeoffset++;
	}

	/*
	 * if it was invalid tuple then we need special processing. If it's
	 * possible, we move all invalid tuples on right page. We should remember,
	 * that union with invalid tuples is a invalid tuple.
	 */
	if (entryvec->n != *len + 1)
	{
		lencleaneditup = entryvec->n - 1;
		cleaneditup = (IndexTuple *) palloc(lencleaneditup * sizeof(IndexTuple));
		for (i = 1; i < entryvec->n; i++)
			cleaneditup[i - 1] = itup[realoffset[i] - 1];

		if (gistnospace(left, cleaneditup, lencleaneditup))
		{
			/* no space on left to put all good tuples, so picksplit */
			gistUserPicksplit(r, entryvec, &v, cleaneditup, lencleaneditup, giststate);
			v.spl_leftvalid = true;
			v.spl_rightvalid = false;
			gistToRealOffset(v.spl_left, v.spl_nleft, realoffset);
			gistToRealOffset(v.spl_right, v.spl_nright, realoffset);
		}
		else
		{
			/* we can try to store all valid tuples on one page */
			v.spl_right = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));
			v.spl_left = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));

			if (lencleaneditup == 0)
			{
				/* all tuples are invalid, so move half of them to right */
				v.spl_leftvalid = v.spl_rightvalid = false;
				v.spl_nright = 0;
				v.spl_nleft = 0;
				for (i = 1; i <= *len; i++)
					if (i - 1 < *len / 2)
						v.spl_left[v.spl_nleft++] = i;
					else
						v.spl_right[v.spl_nright++] = i;
			}
			else
			{
				/*
				 * we will not call gistUserPicksplit, just put good tuples on
				 * left and invalid on right
				 */
				v.spl_nleft = lencleaneditup;
				v.spl_nright = 0;
				for (i = 1; i < entryvec->n; i++)
					v.spl_left[i - 1] = i;
				gistToRealOffset(v.spl_left, v.spl_nleft, realoffset);
				v.spl_lattr[0] = v.spl_ldatum = (Datum) 0;
				v.spl_rattr[0] = v.spl_rdatum = (Datum) 0;
				v.spl_lisnull[0] = true;
				v.spl_risnull[0] = true;
				gistunionsubkey(r, giststate, itup, &v, true);
				v.spl_leftvalid = true;
				v.spl_rightvalid = false;
			}
		}
	}
	else
	{
		/* there are no invalid tuples, so usual processing */
		gistUserPicksplit(r, entryvec, &v, itup, *len, giststate);
		v.spl_leftvalid = v.spl_rightvalid = true;
	}


	/* form left and right vector */
	lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len + 1));
	rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len + 1));

	for (i = 0; i < v.spl_nleft; i++)
		lvectup[i] = itup[v.spl_left[i] - 1];

	for (i = 0; i < v.spl_nright; i++)
		rvectup[i] = itup[v.spl_right[i] - 1];

	/* place invalid tuples on right page if it isn't done yet */
	for (fakeoffset = entryvec->n; fakeoffset < *len + 1 && lencleaneditup; fakeoffset++)
	{
		rvectup[v.spl_nright++] = itup[realoffset[fakeoffset] - 1];
	}

	/* write on disk (may need another split) */
	if (gistnospace(right, rvectup, v.spl_nright))
	{
		nlen = v.spl_nright;
		newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate);
		/* ReleaseBuffer(rightbuf); */
	}
	else
	{
		char	   *ptr;

		gistfillbuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber);
		/* XLOG stuff */
		ROTATEDIST(*dist);
		(*dist)->block.blkno = BufferGetBlockNumber(rightbuf);
		(*dist)->block.num = v.spl_nright;
		(*dist)->list = (IndexTupleData *) palloc(BLCKSZ);
		/* copy the tuples back to back into the layout's list buffer */
		ptr = (char *) ((*dist)->list);
		for (i = 0; i < v.spl_nright; i++)
		{
			memcpy(ptr, rvectup[i], IndexTupleSize(rvectup[i]));
			ptr += IndexTupleSize(rvectup[i]);
		}
		(*dist)->lenlist = ptr - ((char *) ((*dist)->list));
		(*dist)->buffer = rightbuf;

		nlen = 1;
		newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1);
		newtup[0] = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull)
			: gist_form_invalid_tuple(rbknum);
		ItemPointerSetBlockNumber(&(newtup[0]->t_tid), rbknum);
	}

	if (gistnospace(left, lvectup, v.spl_nleft))
	{
		int			llen = v.spl_nleft;
		IndexTuple *lntup;

		lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate);
		/* ReleaseBuffer(leftbuf); */

		newtup = gistjoinvector(newtup, &nlen, lntup, llen);
	}
	else
	{
		char	   *ptr;

		gistfillbuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber);
		/* XLOG stuff */
		ROTATEDIST(*dist);
		(*dist)->block.blkno = BufferGetBlockNumber(leftbuf);
		(*dist)->block.num = v.spl_nleft;
		(*dist)->list = (IndexTupleData *) palloc(BLCKSZ);
		/* copy the tuples back to back into the layout's list buffer */
		ptr = (char *) ((*dist)->list);
		for (i = 0; i < v.spl_nleft; i++)
		{
			memcpy(ptr, lvectup[i], IndexTupleSize(lvectup[i]));
			ptr += IndexTupleSize(lvectup[i]);
		}
		(*dist)->lenlist = ptr - ((char *) ((*dist)->list));
		(*dist)->buffer = leftbuf;

		/* non-root: install the temp page built above over the original */
		if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO)
			PageRestoreTempPage(left, p);

		nlen += 1;
		newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen);
		newtup[nlen - 1] = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull)
			: gist_form_invalid_tuple(lbknum);
		ItemPointerSetBlockNumber(&(newtup[nlen - 1]->t_tid), lbknum);
	}

	GistClearTuplesDeleted(p);

	*len = nlen;
	return newtup;
}

/*
 * Rebuild the root page in 'buffer' (which must be block GIST_ROOT_BLKNO):
 * re-initialize it with flags 0 (i.e. without F_LEAF) and fill it with the
 * 'len' tuples in 'itup'.  For non-temporary relations an XLOG_GIST_NEW_ROOT
 * record is written and its LSN stamped on the page; otherwise the page gets
 * XLogRecPtrForTemp.  The buffer is neither written nor unlocked here.
 */
void
gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key)
{
	Page		page;

	Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
	page = BufferGetPage(buffer);
	GISTInitBuffer(buffer, 0);

	gistfillbuffer(r, page, itup, len, FirstOffsetNumber);
	if (!r->rd_istemp)
	{
		XLogRecPtr	recptr;
		XLogRecData *rdata;

		rdata = formUpdateRdata(r->rd_node, GIST_ROOT_BLKNO,
								NULL, 0, false, itup, len, key);

		START_CRIT_SECTION();

		recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
		PageSetLSN(page, recptr);
		PageSetTLI(page, ThisTimeLineID);

		END_CRIT_SECTION();
	}
	else
		PageSetLSN(page, XLogRecPtrForTemp);
}

/*
 * Fill in *giststate for the given index: remember the index's tuple
 * descriptor and, for every index attribute, copy the FmgrInfo of each of
 * the seven GiST support procedures (consistent, union, compress,
 * decompress, penalty, picksplit, equal) using CurrentMemoryContext.
 * Raises an error if the index has more than INDEX_MAX_KEYS attributes.
 */
void
initGISTstate(GISTSTATE *giststate, Relation index)
{
	int			i;

	if (index->rd_att->natts > INDEX_MAX_KEYS)
		elog(ERROR, "numberOfAttributes %d > %d",
			 index->rd_att->natts, INDEX_MAX_KEYS);

	giststate->tupdesc = index->rd_att;

	for (i = 0; i < index->rd_att->natts; i++)
	{
		/* support procedure attribute numbers are 1-based, hence i + 1 */
		fmgr_info_copy(&(giststate->consistentFn[i]),
					   index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
					   CurrentMemoryContext);
		fmgr_info_copy(&(giststate->unionFn[i]),
					   index_getprocinfo(index, i + 1, GIST_UNION_PROC),
					   CurrentMemoryContext);
		fmgr_info_copy(&(giststate->compressFn[i]),
					   index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
					   CurrentMemoryContext);
		fmgr_info_copy(&(giststate->decompressFn[i]),
					   index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
					   CurrentMemoryContext);
		fmgr_info_copy(&(giststate->penaltyFn[i]),
					   index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
					   CurrentMemoryContext);
		fmgr_info_copy(&(giststate->picksplitFn[i]),
					   index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
					   CurrentMemoryContext);
		fmgr_info_copy(&(giststate->equalFn[i]),
					   index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
					   CurrentMemoryContext);
	}
}

/*
 * Counterpart of initGISTstate.  Currently does nothing.
 */
void
freeGISTstate(GISTSTATE *giststate)
{
	/* no work */
}