/*------------------------------------------------------------------------- * * btinsert.c * Item insertion in Lehman and Yao btrees for Postgres. * * Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.38 1999/04/22 08:19:59 vadim Exp $ * *------------------------------------------------------------------------- */ #include #include #include #include #include #include #include #include #ifndef HAVE_MEMMOVE #include #else #include #endif static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem afteritem); static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright); static OffsetNumber _bt_findsplitloc(Relation rel, Page page, OffsetNumber start, OffsetNumber maxoff, Size llimit); static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf); static OffsetNumber _bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem); static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem); static void _bt_updateitem(Relation rel, Size keysz, Buffer buf, BTItem oldItem, BTItem newItem); static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey); /* * _bt_doinsert() -- Handle insertion of a single btitem in the tree. * * This routine is called by the public interface routines, btbuild * and btinsert. By here, btitem is filled in, and has a unique * (xid, seqno) pair. */ InsertIndexResult _bt_doinsert(Relation rel, BTItem btitem, bool index_is_unique, Relation heapRel) { ScanKey itup_scankey; IndexTuple itup; BTStack stack; Buffer buf; BlockNumber blkno; int natts = rel->rd_rel->relnatts; InsertIndexResult res; Buffer buffer; itup = &(btitem->bti_itup); /* we need a scan key to do our search, so build one */ itup_scankey = _bt_mkscankey(rel, itup); /* find the page containing this key */ stack = _bt_search(rel, natts, itup_scankey, &buf); blkno = BufferGetBlockNumber(buf); /* trade in our read lock for a write lock */ _bt_relbuf(rel, buf, BT_READ); l1: buf = _bt_getbuf(rel, blkno, BT_WRITE); /* * If the page was split between the time that we surrendered our read * lock and acquired our write lock, then this page may no longer be * the right place for the key we want to insert. In this case, we * need to move right in the tree. See Lehman and Yao for an * excruciatingly precise description. */ buf = _bt_moveright(rel, buf, natts, itup_scankey, BT_WRITE); /* if we're not allowing duplicates, make sure the key isn't */ /* already in the node */ if (index_is_unique) { OffsetNumber offset, maxoff; Page page; page = BufferGetPage(buf); maxoff = PageGetMaxOffsetNumber(page); offset = _bt_binsrch(rel, buf, natts, itup_scankey, BT_DESCENT); /* make sure the offset we're given points to an actual */ /* key on the page before trying to compare it */ if (!PageIsEmpty(page) && offset <= maxoff) { TupleDesc itupdesc; BTItem cbti; HeapTupleData htup; BTPageOpaque opaque; Buffer nbuf; BlockNumber blkno; bool chtup = true; itupdesc = RelationGetDescr(rel); nbuf = InvalidBuffer; opaque = (BTPageOpaque) PageGetSpecialPointer(page); /* * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this's * how we handling NULLs - and so we must not use _bt_compare * in real comparison, but only for ordering/finding items on * pages. - vadim 03/24/97 * * while ( !_bt_compare (rel, itupdesc, page, natts, * itup_scankey, offset) ) */ while (_bt_isequal(itupdesc, page, offset, natts, itup_scankey)) { /* they're equal */ /* * Have to check is inserted heap tuple deleted one * (i.e. just moved to another place by vacuum)! */ if (chtup) { htup.t_self = btitem->bti_itup.t_tid; heap_fetch(heapRel, SnapshotDirty, &htup, &buffer); if (htup.t_data == NULL) /* YES! */ break; /* Live tuple was inserted */ ReleaseBuffer(buffer); chtup = false; } cbti = (BTItem) PageGetItem(page, PageGetItemId(page, offset)); htup.t_self = cbti->bti_itup.t_tid; heap_fetch(heapRel, SnapshotDirty, &htup, &buffer); if (htup.t_data != NULL) /* it is a duplicate */ { TransactionId xwait = (TransactionIdIsValid(SnapshotDirty->xmin)) ? SnapshotDirty->xmin : SnapshotDirty->xmax; /* * If this tuple is being updated by other transaction * then we have to wait for its commit/abort. */ ReleaseBuffer(buffer); if (TransactionIdIsValid(xwait)) { if (nbuf != InvalidBuffer) _bt_relbuf(rel, nbuf, BT_READ); _bt_relbuf(rel, buf, BT_WRITE); XactLockTableWait(xwait); goto l1; /* continue from the begin */ } elog(ERROR, "Cannot insert a duplicate key into a unique index"); } /* htup null so no buffer to release */ /* get next offnum */ if (offset < maxoff) offset = OffsetNumberNext(offset); else { /* move right ? */ if (P_RIGHTMOST(opaque)) break; if (!_bt_isequal(itupdesc, page, P_HIKEY, natts, itup_scankey)) break; /* * min key of the right page is the same, ooh - so * many dead duplicates... */ blkno = opaque->btpo_next; if (nbuf != InvalidBuffer) _bt_relbuf(rel, nbuf, BT_READ); for (nbuf = InvalidBuffer;;) { nbuf = _bt_getbuf(rel, blkno, BT_READ); page = BufferGetPage(nbuf); maxoff = PageGetMaxOffsetNumber(page); opaque = (BTPageOpaque) PageGetSpecialPointer(page); offset = P_RIGHTMOST(opaque) ? P_HIKEY : P_FIRSTKEY; if (!PageIsEmpty(page) && offset <= maxoff) { /* Found some key */ break; } else { /* Empty or "pseudo"-empty page - get next */ blkno = opaque->btpo_next; _bt_relbuf(rel, nbuf, BT_READ); nbuf = InvalidBuffer; if (blkno == P_NONE) break; } } if (nbuf == InvalidBuffer) break; } } if (nbuf != InvalidBuffer) _bt_relbuf(rel, nbuf, BT_READ); } } /* do the insertion */ res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem, (BTItem) NULL); /* be tidy */ _bt_freestack(stack); _bt_freeskey(itup_scankey); return res; } /* * _bt_insertonpg() -- Insert a tuple on a particular page in the index. * * This recursive procedure does the following things: * * + if necessary, splits the target page. * + finds the right place to insert the tuple (taking into * account any changes induced by a split). * + inserts the tuple. * + if the page was split, pops the parent stack, and finds the * right place to insert the new child pointer (by walking * right using information stored in the parent stack). * + invoking itself with the appropriate tuple for the right * child page on the parent. * * On entry, we must have the right buffer on which to do the * insertion, and the buffer must be pinned and locked. On return, * we will have dropped both the pin and the write lock on the buffer. * * The locking interactions in this code are critical. You should * grok Lehman and Yao's paper before making any changes. In addition, * you need to understand how we disambiguate duplicate keys in this * implementation, in order to be able to find our location using * L&Y "move right" operations. Since we may insert duplicate user * keys, and since these dups may propogate up the tree, we use the * 'afteritem' parameter to position ourselves correctly for the * insertion on internal pages. */ static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem afteritem) { InsertIndexResult res; Page page; BTPageOpaque lpageop; BlockNumber itup_blkno; OffsetNumber itup_off; OffsetNumber firstright = InvalidOffsetNumber; int itemsz; bool do_split = false; bool keys_equal = false; page = BufferGetPage(buf); lpageop = (BTPageOpaque) PageGetSpecialPointer(page); itemsz = IndexTupleDSize(btitem->bti_itup) + (sizeof(BTItemData) - sizeof(IndexTupleData)); itemsz = DOUBLEALIGN(itemsz); /* be safe, PageAddItem will do * this but we need to be * consistent */ /* * If we have to insert item on the leftmost page which is the first * page in the chain of duplicates then: 1. if scankey == hikey (i.e. * - new duplicate item) then insert it here; 2. if scankey < hikey * then: 2.a if there is duplicate key(s) here - we force splitting; * 2.b else - we may "eat" this page from duplicates chain. */ if (lpageop->btpo_flags & BTP_CHAIN) { OffsetNumber maxoff = PageGetMaxOffsetNumber(page); ItemId hitemid; BTItem hitem; Assert(!P_RIGHTMOST(lpageop)); hitemid = PageGetItemId(page, P_HIKEY); hitem = (BTItem) PageGetItem(page, hitemid); if (maxoff > P_HIKEY && !_bt_itemcmp(rel, keysz, hitem, (BTItem) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY)), BTEqualStrategyNumber)) elog(FATAL, "btree: bad key on the page in the chain of duplicates"); if (!_bt_skeycmp(rel, keysz, scankey, page, hitemid, BTEqualStrategyNumber)) { if (!P_LEFTMOST(lpageop)) elog(FATAL, "btree: attempt to insert bad key on the non-leftmost page in the chain of duplicates"); if (!_bt_skeycmp(rel, keysz, scankey, page, hitemid, BTLessStrategyNumber)) elog(FATAL, "btree: attempt to insert higher key on the leftmost page in the chain of duplicates"); if (maxoff > P_HIKEY) /* have duplicate(s) */ { firstright = P_FIRSTKEY; do_split = true; } else /* "eat" page */ { Buffer pbuf; Page ppage; itup_blkno = BufferGetBlockNumber(buf); itup_off = PageAddItem(page, (Item) btitem, itemsz, P_FIRSTKEY, LP_USED); if (itup_off == InvalidOffsetNumber) elog(FATAL, "btree: failed to add item"); lpageop->btpo_flags &= ~BTP_CHAIN; pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); ppage = BufferGetPage(pbuf); PageIndexTupleDelete(ppage, stack->bts_offset); pfree(stack->bts_btitem); stack->bts_btitem = _bt_formitem(&(btitem->bti_itup)); ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), itup_blkno, P_HIKEY); _bt_wrtbuf(rel, buf); res = _bt_insertonpg(rel, pbuf, stack->bts_parent, keysz, scankey, stack->bts_btitem, NULL); ItemPointerSet(&(res->pointerData), itup_blkno, itup_off); return res; } } else { keys_equal = true; if (PageGetFreeSpace(page) < itemsz) do_split = true; } } else if (PageGetFreeSpace(page) < itemsz) do_split = true; else if (PageGetFreeSpace(page) < 3 * itemsz + 2 * sizeof(ItemIdData)) { OffsetNumber offnum = (P_RIGHTMOST(lpageop)) ? P_HIKEY : P_FIRSTKEY; OffsetNumber maxoff = PageGetMaxOffsetNumber(page); ItemId itid; BTItem previtem, chkitem; Size maxsize; Size currsize; itid = PageGetItemId(page, offnum); previtem = (BTItem) PageGetItem(page, itid); maxsize = currsize = (ItemIdGetLength(itid) + sizeof(ItemIdData)); for (offnum = OffsetNumberNext(offnum); offnum <= maxoff; offnum = OffsetNumberNext(offnum)) { itid = PageGetItemId(page, offnum); chkitem = (BTItem) PageGetItem(page, itid); if (!_bt_itemcmp(rel, keysz, previtem, chkitem, BTEqualStrategyNumber)) { if (currsize > maxsize) maxsize = currsize; currsize = 0; previtem = chkitem; } currsize += (ItemIdGetLength(itid) + sizeof(ItemIdData)); } if (currsize > maxsize) maxsize = currsize; maxsize += sizeof(PageHeaderData) + DOUBLEALIGN(sizeof(BTPageOpaqueData)); if (maxsize >= PageGetPageSize(page) / 2) do_split = true; } if (do_split) { Buffer rbuf; Page rpage; BTItem ritem; BlockNumber rbknum; BTPageOpaque rpageop; Buffer pbuf; Page ppage; BTPageOpaque ppageop; BlockNumber bknum = BufferGetBlockNumber(buf); BTItem lowLeftItem; OffsetNumber maxoff; bool shifted = false; bool left_chained = (lpageop->btpo_flags & BTP_CHAIN) ? true : false; bool is_root = lpageop->btpo_flags & BTP_ROOT; /* * If we have to split leaf page in the chain of duplicates by new * duplicate then we try to look at our right sibling first. */ if ((lpageop->btpo_flags & BTP_CHAIN) && (lpageop->btpo_flags & BTP_LEAF) && keys_equal) { bool use_left = true; rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE); rpage = BufferGetPage(rbuf); rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage); if (!P_RIGHTMOST(rpageop)) /* non-rightmost page */ { /* If we have the same hikey here then * it's yet another page in chain. */ if (_bt_skeycmp(rel, keysz, scankey, rpage, PageGetItemId(rpage, P_HIKEY), BTEqualStrategyNumber)) { if (!(rpageop->btpo_flags & BTP_CHAIN)) elog(FATAL, "btree: lost page in the chain of duplicates"); } else if (_bt_skeycmp(rel, keysz, scankey, rpage, PageGetItemId(rpage, P_HIKEY), BTGreaterStrategyNumber)) elog(FATAL, "btree: hikey is out of order"); else if (rpageop->btpo_flags & BTP_CHAIN) /* * If hikey > scankey then it's last page in chain and * BTP_CHAIN must be OFF */ elog(FATAL, "btree: lost last page in the chain of duplicates"); /* if there is room here then we use this page. */ if (PageGetFreeSpace(rpage) > itemsz) use_left = false; } else /* rightmost page */ { Assert(!(rpageop->btpo_flags & BTP_CHAIN)); /* if there is room here then we use this page. */ if (PageGetFreeSpace(rpage) > itemsz) use_left = false; } if (!use_left) /* insert on the right page */ { _bt_relbuf(rel, buf, BT_WRITE); return (_bt_insertonpg(rel, rbuf, stack, keysz, scankey, btitem, afteritem)); } _bt_relbuf(rel, rbuf, BT_WRITE); } /* * If after splitting un-chained page we'll got chain of pages * with duplicates then we want to know 1. on which of two pages * new btitem will go (current _bt_findsplitloc is quite bad); 2. * what parent (if there's one) thinking about it (remember about * deletions) */ else if (!(lpageop->btpo_flags & BTP_CHAIN)) { OffsetNumber start = (P_RIGHTMOST(lpageop)) ? P_HIKEY : P_FIRSTKEY; Size llimit; maxoff = PageGetMaxOffsetNumber(page); llimit = PageGetPageSize(page) - sizeof(PageHeaderData) - DOUBLEALIGN(sizeof(BTPageOpaqueData)) +sizeof(ItemIdData); llimit /= 2; firstright = _bt_findsplitloc(rel, page, start, maxoff, llimit); if (_bt_itemcmp(rel, keysz, (BTItem) PageGetItem(page, PageGetItemId(page, start)), (BTItem) PageGetItem(page, PageGetItemId(page, firstright)), BTEqualStrategyNumber)) { if (_bt_skeycmp(rel, keysz, scankey, page, PageGetItemId(page, firstright), BTLessStrategyNumber)) /* * force moving current items to the new page: new * item will go on the current page. */ firstright = start; else /* * new btitem >= firstright, start item == firstright * - new chain of duplicates: if this non-leftmost * leaf page and parent item < start item then force * moving all items to the new page - current page * will be "empty" after it. */ { if (!P_LEFTMOST(lpageop) && (lpageop->btpo_flags & BTP_LEAF)) { ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), bknum, P_HIKEY); pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); if (_bt_itemcmp(rel, keysz, stack->bts_btitem, (BTItem) PageGetItem(page, PageGetItemId(page, start)), BTLessStrategyNumber)) { firstright = start; shifted = true; } _bt_relbuf(rel, pbuf, BT_WRITE); } } } /* else - no new chain if start item < * firstright one */ } /* split the buffer into left and right halves */ rbuf = _bt_split(rel, buf, firstright); /* which new page (left half or right half) gets the tuple? */ if (_bt_goesonpg(rel, buf, keysz, scankey, afteritem)) { /* left page */ itup_off = _bt_pgaddtup(rel, buf, keysz, scankey, itemsz, btitem, afteritem); itup_blkno = BufferGetBlockNumber(buf); } else { /* right page */ itup_off = _bt_pgaddtup(rel, rbuf, keysz, scankey, itemsz, btitem, afteritem); itup_blkno = BufferGetBlockNumber(rbuf); } maxoff = PageGetMaxOffsetNumber(page); if (shifted) { if (maxoff > P_FIRSTKEY) elog(FATAL, "btree: shifted page is not empty"); lowLeftItem = (BTItem) NULL; } else { if (maxoff < P_FIRSTKEY) elog(FATAL, "btree: un-shifted page is empty"); lowLeftItem = (BTItem) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY)); if (_bt_itemcmp(rel, keysz, lowLeftItem, (BTItem) PageGetItem(page, PageGetItemId(page, P_HIKEY)), BTEqualStrategyNumber)) lpageop->btpo_flags |= BTP_CHAIN; } /* * By here, * * + our target page has been split; + the original tuple has been * inserted; + we have write locks on both the old (left half) * and new (right half) buffers, after the split; and + we have * the key we want to insert into the parent. * * Do the parent insertion. We need to hold onto the locks for the * child pages until we locate the parent, but we can release them * before doing the actual insertion (see Lehman and Yao for the * reasoning). */ l_spl:; if (stack == (BTStack) NULL) { if (!is_root) /* if this page was not root page */ { elog(DEBUG, "btree: concurrent ROOT page split"); stack = (BTStack) palloc(sizeof(BTStackData)); stack->bts_blkno = lpageop->btpo_parent; stack->bts_offset = InvalidOffsetNumber; stack->bts_btitem = (BTItem) palloc(sizeof(BTItemData)); /* bts_btitem will be initialized below */ stack->bts_parent = NULL; goto l_spl; } /* create a new root node and release the split buffers */ _bt_newroot(rel, buf, rbuf); _bt_relbuf(rel, buf, BT_WRITE); _bt_relbuf(rel, rbuf, BT_WRITE); } else { ScanKey newskey; InsertIndexResult newres; BTItem new_item; OffsetNumber upditem_offset = P_HIKEY; bool do_update = false; bool update_in_place = true; bool parent_chained; /* form a index tuple that points at the new right page */ rbknum = BufferGetBlockNumber(rbuf); rpage = BufferGetPage(rbuf); rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage); /* * By convention, the first entry (1) on every non-rightmost * page is the high key for that page. In order to get the * lowest key on the new right page, we actually look at its * second (2) entry. */ if (!P_RIGHTMOST(rpageop)) { ritem = (BTItem) PageGetItem(rpage, PageGetItemId(rpage, P_FIRSTKEY)); if (_bt_itemcmp(rel, keysz, ritem, (BTItem) PageGetItem(rpage, PageGetItemId(rpage, P_HIKEY)), BTEqualStrategyNumber)) rpageop->btpo_flags |= BTP_CHAIN; } else ritem = (BTItem) PageGetItem(rpage, PageGetItemId(rpage, P_HIKEY)); /* get a unique btitem for this key */ new_item = _bt_formitem(&(ritem->bti_itup)); ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY); /* * Find the parent buffer and get the parent page. * * Oops - if we were moved right then we need to change stack * item! We want to find parent pointing to where we are, * right ? - vadim 05/27/97 */ ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), bknum, P_HIKEY); pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); ppage = BufferGetPage(pbuf); ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage); parent_chained = ((ppageop->btpo_flags & BTP_CHAIN)) ? true : false; if (parent_chained && !left_chained) elog(FATAL, "nbtree: unexpected chained parent of unchained page"); /* * If the key of new_item is < than the key of the item in the * parent page pointing to the left page (stack->bts_btitem), * we have to update the latter key; otherwise the keys on the * parent page wouldn't be monotonically increasing after we * inserted the new pointer to the right page (new_item). This * only happens if our left page is the leftmost page and a * new minimum key had been inserted before, which is not * reflected in the parent page but didn't matter so far. If * there are duplicate keys and this new minimum key spills * over to our new right page, we get an inconsistency if we * don't update the left key in the parent page. * * Also, new duplicates handling code require us to update parent * item if some smaller items left on the left page (which is * possible in splitting leftmost page) and current parent * item == new_item. - vadim 05/27/97 */ if (_bt_itemcmp(rel, keysz, stack->bts_btitem, new_item, BTGreaterStrategyNumber) || (!shifted && _bt_itemcmp(rel, keysz, stack->bts_btitem, new_item, BTEqualStrategyNumber) && _bt_itemcmp(rel, keysz, lowLeftItem, new_item, BTLessStrategyNumber))) { do_update = true; /* * figure out which key is leftmost (if the parent page is * rightmost, too, it must be the root) */ if (P_RIGHTMOST(ppageop)) upditem_offset = P_HIKEY; else upditem_offset = P_FIRSTKEY; if (!P_LEFTMOST(lpageop) || stack->bts_offset != upditem_offset) elog(FATAL, "btree: items are out of order (leftmost %d, stack %u, update %u)", P_LEFTMOST(lpageop), stack->bts_offset, upditem_offset); } if (do_update) { if (shifted) elog(FATAL, "btree: attempt to update parent for shifted page"); /* * Try to update in place. If out parent page is chained * then we must forse insertion. */ if (!parent_chained && DOUBLEALIGN(IndexTupleDSize(lowLeftItem->bti_itup)) == DOUBLEALIGN(IndexTupleDSize(stack->bts_btitem->bti_itup))) { _bt_updateitem(rel, keysz, pbuf, stack->bts_btitem, lowLeftItem); _bt_relbuf(rel, buf, BT_WRITE); _bt_relbuf(rel, rbuf, BT_WRITE); } else { update_in_place = false; PageIndexTupleDelete(ppage, upditem_offset); /* * don't write anything out yet--we still have the * write lock, and now we call another _bt_insertonpg * to insert the correct key. First, make a new item, * using the tuple data from lowLeftItem. Point it to * the left child. Update it on the stack at the same * time. */ pfree(stack->bts_btitem); stack->bts_btitem = _bt_formitem(&(lowLeftItem->bti_itup)); ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), bknum, P_HIKEY); /* * Unlock the children before doing this * * Mmm ... I foresee problems here. - vadim 06/10/97 */ _bt_relbuf(rel, buf, BT_WRITE); _bt_relbuf(rel, rbuf, BT_WRITE); /* * A regular _bt_binsrch should find the right place * to put the new entry, since it should be lower than * any other key on the page. Therefore set afteritem * to NULL. */ newskey = _bt_mkscankey(rel, &(stack->bts_btitem->bti_itup)); newres = _bt_insertonpg(rel, pbuf, stack->bts_parent, keysz, newskey, stack->bts_btitem, NULL); pfree(newres); pfree(newskey); /* * we have now lost our lock on the parent buffer, and * need to get it back. */ pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); } } else { _bt_relbuf(rel, buf, BT_WRITE); _bt_relbuf(rel, rbuf, BT_WRITE); } newskey = _bt_mkscankey(rel, &(new_item->bti_itup)); afteritem = stack->bts_btitem; if (parent_chained && !update_in_place) { ppage = BufferGetPage(pbuf); ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage); if (ppageop->btpo_flags & BTP_CHAIN) elog(FATAL, "btree: unexpected BTP_CHAIN flag in parent after update"); if (P_RIGHTMOST(ppageop)) elog(FATAL, "btree: chained parent is RIGHTMOST after update"); maxoff = PageGetMaxOffsetNumber(ppage); if (maxoff != P_FIRSTKEY) elog(FATAL, "btree: FIRSTKEY was unexpected in parent after update"); if (_bt_skeycmp(rel, keysz, newskey, ppage, PageGetItemId(ppage, P_FIRSTKEY), BTLessEqualStrategyNumber)) elog(FATAL, "btree: parent FIRSTKEY is >= duplicate key after update"); if (!_bt_skeycmp(rel, keysz, newskey, ppage, PageGetItemId(ppage, P_HIKEY), BTEqualStrategyNumber)) elog(FATAL, "btree: parent HIGHKEY is not equal duplicate key after update"); afteritem = (BTItem) NULL; } else if (left_chained && !update_in_place) { ppage = BufferGetPage(pbuf); ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage); if (!P_RIGHTMOST(ppageop) && _bt_skeycmp(rel, keysz, newskey, ppage, PageGetItemId(ppage, P_HIKEY), BTGreaterStrategyNumber)) afteritem = (BTItem) NULL; } if (afteritem == (BTItem) NULL) { rbuf = _bt_getbuf(rel, ppageop->btpo_next, BT_WRITE); _bt_relbuf(rel, pbuf, BT_WRITE); pbuf = rbuf; } newres = _bt_insertonpg(rel, pbuf, stack->bts_parent, keysz, newskey, new_item, afteritem); /* be tidy */ pfree(newres); pfree(newskey); pfree(new_item); } } else { itup_off = _bt_pgaddtup(rel, buf, keysz, scankey, itemsz, btitem, afteritem); itup_blkno = BufferGetBlockNumber(buf); _bt_relbuf(rel, buf, BT_WRITE); } /* by here, the new tuple is inserted */ res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); ItemPointerSet(&(res->pointerData), itup_blkno, itup_off); return res; } /* * _bt_split() -- split a page in the btree. * * On entry, buf is the page to split, and is write-locked and pinned. * Returns the new right sibling of buf, pinned and write-locked. The * pin and lock on buf are maintained. */ static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright) { Buffer rbuf; Page origpage; Page leftpage, rightpage; BTPageOpaque ropaque, lopaque, oopaque; Buffer sbuf; Page spage; BTPageOpaque sopaque; Size itemsz; ItemId itemid; BTItem item; OffsetNumber leftoff, rightoff; OffsetNumber start; OffsetNumber maxoff; OffsetNumber i; rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); origpage = BufferGetPage(buf); leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData)); rightpage = BufferGetPage(rbuf); _bt_pageinit(rightpage, BufferGetPageSize(rbuf)); _bt_pageinit(leftpage, BufferGetPageSize(buf)); /* init btree private data */ oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage); lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage); ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage); /* if we're splitting this page, it won't be the root when we're done */ oopaque->btpo_flags &= ~BTP_ROOT; oopaque->btpo_flags &= ~BTP_CHAIN; lopaque->btpo_flags = ropaque->btpo_flags = oopaque->btpo_flags; lopaque->btpo_prev = oopaque->btpo_prev; ropaque->btpo_prev = BufferGetBlockNumber(buf); lopaque->btpo_next = BufferGetBlockNumber(rbuf); ropaque->btpo_next = oopaque->btpo_next; lopaque->btpo_parent = ropaque->btpo_parent = oopaque->btpo_parent; /* * If the page we're splitting is not the rightmost page at its level * in the tree, then the first (0) entry on the page is the high key * for the page. We need to copy that to the right half. Otherwise * (meaning the rightmost page case), we should treat the line * pointers beginning at zero as user data. * * We leave a blank space at the start of the line table for the left * page. We'll come back later and fill it in with the high key item * we get from the right key. */ leftoff = P_FIRSTKEY; ropaque->btpo_next = oopaque->btpo_next; if (!P_RIGHTMOST(oopaque)) { /* splitting a non-rightmost page, start at the first data item */ start = P_FIRSTKEY; itemid = PageGetItemId(origpage, P_HIKEY); itemsz = ItemIdGetLength(itemid); item = (BTItem) PageGetItem(origpage, itemid); if (PageAddItem(rightpage, (Item) item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber) elog(FATAL, "btree: failed to add hikey to the right sibling"); rightoff = P_FIRSTKEY; } else { /* splitting a rightmost page, "high key" is the first data item */ start = P_HIKEY; /* the new rightmost page will not have a high key */ rightoff = P_HIKEY; } maxoff = PageGetMaxOffsetNumber(origpage); if (firstright == InvalidOffsetNumber) { Size llimit = PageGetFreeSpace(leftpage) / 2; firstright = _bt_findsplitloc(rel, origpage, start, maxoff, llimit); } for (i = start; i <= maxoff; i = OffsetNumberNext(i)) { itemid = PageGetItemId(origpage, i); itemsz = ItemIdGetLength(itemid); item = (BTItem) PageGetItem(origpage, itemid); /* decide which page to put it on */ if (i < firstright) { if (PageAddItem(leftpage, (Item) item, itemsz, leftoff, LP_USED) == InvalidOffsetNumber) elog(FATAL, "btree: failed to add item to the left sibling"); leftoff = OffsetNumberNext(leftoff); } else { if (PageAddItem(rightpage, (Item) item, itemsz, rightoff, LP_USED) == InvalidOffsetNumber) elog(FATAL, "btree: failed to add item to the right sibling"); rightoff = OffsetNumberNext(rightoff); } } /* * Okay, page has been split, high key on right page is correct. Now * set the high key on the left page to be the min key on the right * page. */ if (P_RIGHTMOST(ropaque)) itemid = PageGetItemId(rightpage, P_HIKEY); else itemid = PageGetItemId(rightpage, P_FIRSTKEY); itemsz = ItemIdGetLength(itemid); item = (BTItem) PageGetItem(rightpage, itemid); /* * We left a hole for the high key on the left page; fill it. The * modal crap is to tell the page manager to put the new item on the * page and not screw around with anything else. Whoever designed * this interface has presumably crawled back into the dung heap they * came from. No one here will admit to it. */ PageManagerModeSet(OverwritePageManagerMode); if (PageAddItem(leftpage, (Item) item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber) elog(FATAL, "btree: failed to add hikey to the left sibling"); PageManagerModeSet(ShufflePageManagerMode); /* * By here, the original data page has been split into two new halves, * and these are correct. The algorithm requires that the left page * never move during a split, so we copy the new left page back on top * of the original. Note that this is not a waste of time, since we * also require (in the page management code) that the center of a * page always be clean, and the most efficient way to guarantee this * is just to compact the data by reinserting it into a new left page. */ PageRestoreTempPage(leftpage, origpage); /* write these guys out */ _bt_wrtnorelbuf(rel, rbuf); _bt_wrtnorelbuf(rel, buf); /* * Finally, we need to grab the right sibling (if any) and fix the * prev pointer there. We are guaranteed that this is deadlock-free * since no other writer will be moving holding a lock on that page * and trying to move left, and all readers release locks on a page * before trying to fetch its neighbors. */ if (!P_RIGHTMOST(ropaque)) { sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE); spage = BufferGetPage(sbuf); sopaque = (BTPageOpaque) PageGetSpecialPointer(spage); sopaque->btpo_prev = BufferGetBlockNumber(rbuf); /* write and release the old right sibling */ _bt_wrtbuf(rel, sbuf); } /* split's done */ return rbuf; } /* * _bt_findsplitloc() -- find a safe place to split a page. * * In order to guarantee the proper handling of searches for duplicate * keys, the first duplicate in the chain must either be the first * item on the page after the split, or the entire chain must be on * one of the two pages. That is, * [1 2 2 2 3 4 5] * must become * [1] [2 2 2 3 4 5] * or * [1 2 2 2] [3 4 5] * but not * [1 2 2] [2 3 4 5]. * However, * [2 2 2 2 2 3 4] * may be split as * [2 2 2 2] [2 3 4]. */ static OffsetNumber _bt_findsplitloc(Relation rel, Page page, OffsetNumber start, OffsetNumber maxoff, Size llimit) { OffsetNumber i; OffsetNumber saferight; ItemId nxtitemid, safeitemid; BTItem safeitem, nxtitem; Size nbytes; int natts; if (start >= maxoff) elog(FATAL, "btree: cannot split if start (%d) >= maxoff (%d)", start, maxoff); natts = rel->rd_rel->relnatts; saferight = start; safeitemid = PageGetItemId(page, saferight); nbytes = ItemIdGetLength(safeitemid) + sizeof(ItemIdData); safeitem = (BTItem) PageGetItem(page, safeitemid); i = OffsetNumberNext(start); while (nbytes < llimit) { /* check the next item on the page */ nxtitemid = PageGetItemId(page, i); nbytes += (ItemIdGetLength(nxtitemid) + sizeof(ItemIdData)); nxtitem = (BTItem) PageGetItem(page, nxtitemid); /* * Test against last known safe item: if the tuple we're looking * at isn't equal to the last safe one we saw, then it's our new * safe tuple. */ if (!_bt_itemcmp(rel, natts, safeitem, nxtitem, BTEqualStrategyNumber)) { safeitem = nxtitem; saferight = i; } if (i < maxoff) i = OffsetNumberNext(i); else break; } /* * If the chain of dups starts at the beginning of the page and * extends past the halfway mark, we can split it in the middle. */ if (saferight == start) saferight = i; if (saferight == maxoff && (maxoff - start) > 1) saferight = start + (maxoff - start) / 2; return saferight; } /* * _bt_newroot() -- Create a new root page for the index. * * We've just split the old root page and need to create a new one. * In order to do this, we add a new root page to the file, then lock * the metadata page and update it. This is guaranteed to be deadlock- * free, because all readers release their locks on the metadata page * before trying to lock the root, and all writers lock the root before * trying to lock the metadata page. We have a write lock on the old * root page, so we have not introduced any cycles into the waits-for * graph. * * On entry, lbuf (the old root) and rbuf (its new peer) are write- * locked. We don't drop the locks in this routine; that's done by * the caller. On exit, a new root page exists with entries for the * two new children. The new root page is neither pinned nor locked. */ static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) { Buffer rootbuf; Page lpage, rpage, rootpage; BlockNumber lbkno, rbkno; BlockNumber rootbknum; BTPageOpaque rootopaque; ItemId itemid; BTItem item; Size itemsz; BTItem new_item; /* get a new root page */ rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); rootpage = BufferGetPage(rootbuf); rootbknum = BufferGetBlockNumber(rootbuf); _bt_pageinit(rootpage, BufferGetPageSize(rootbuf)); /* set btree special data */ rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage); rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE; rootopaque->btpo_flags |= BTP_ROOT; /* * Insert the internal tuple pointers. */ lbkno = BufferGetBlockNumber(lbuf); rbkno = BufferGetBlockNumber(rbuf); lpage = BufferGetPage(lbuf); rpage = BufferGetPage(rbuf); ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo_parent = ((BTPageOpaque) PageGetSpecialPointer(rpage))->btpo_parent = rootbknum; /* * step over the high key on the left page while building the left * page pointer. */ itemid = PageGetItemId(lpage, P_FIRSTKEY); itemsz = ItemIdGetLength(itemid); item = (BTItem) PageGetItem(lpage, itemid); new_item = _bt_formitem(&(item->bti_itup)); ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY); /* * insert the left page pointer into the new root page. the root page * is the rightmost page on its level so the "high key" item is the * first data item. */ if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber) elog(FATAL, "btree: failed to add leftkey to new root page"); pfree(new_item); /* * the right page is the rightmost page on the second level, so the * "high key" item is the first data item on that page as well. */ itemid = PageGetItemId(rpage, P_HIKEY); itemsz = ItemIdGetLength(itemid); item = (BTItem) PageGetItem(rpage, itemid); new_item = _bt_formitem(&(item->bti_itup)); ItemPointerSet(&(new_item->bti_itup.t_tid), rbkno, P_HIKEY); /* * insert the right page pointer into the new root page. */ if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber) elog(FATAL, "btree: failed to add rightkey to new root page"); pfree(new_item); /* write and let go of the root buffer */ _bt_wrtbuf(rel, rootbuf); /* update metadata page with new root block number */ _bt_metaproot(rel, rootbknum, 0); WriteNoReleaseBuffer(lbuf); WriteNoReleaseBuffer(rbuf); } /* * _bt_pgaddtup() -- add a tuple to a particular page in the index. * * This routine adds the tuple to the page as requested, and keeps the * write lock and reference associated with the page's buffer. It is * an error to call pgaddtup() without a write lock and reference. If * afteritem is non-null, it's the item that we expect our new item * to follow. Otherwise, we do a binary search for the correct place * and insert the new item there. */ static OffsetNumber _bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem) { OffsetNumber itup_off; OffsetNumber first; Page page; BTPageOpaque opaque; BTItem chkitem; page = BufferGetPage(buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); first = P_RIGHTMOST(opaque) ? P_HIKEY : P_FIRSTKEY; if (afteritem == (BTItem) NULL) itup_off = _bt_binsrch(rel, buf, keysz, itup_scankey, BT_INSERTION); else { itup_off = first; do { chkitem = (BTItem) PageGetItem(page, PageGetItemId(page, itup_off)); itup_off = OffsetNumberNext(itup_off); } while (!BTItemSame(chkitem, afteritem)); } if (PageAddItem(page, (Item) btitem, itemsize, itup_off, LP_USED) == InvalidOffsetNumber) elog(FATAL, "btree: failed to add item to the page"); /* write the buffer, but hold our lock */ _bt_wrtnorelbuf(rel, buf); return itup_off; } /* * _bt_goesonpg() -- Does a new tuple belong on this page? * * This is part of the complexity introduced by allowing duplicate * keys into the index. The tuple belongs on this page if: * * + there is no page to the right of this one; or * + it is less than the high key on the page; or * + the item it is to follow ("afteritem") appears on this * page. */ static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem) { Page page; ItemId hikey; BTPageOpaque opaque; BTItem chkitem; OffsetNumber offnum, maxoff; bool found; page = BufferGetPage(buf); /* no right neighbor? */ opaque = (BTPageOpaque) PageGetSpecialPointer(page); if (P_RIGHTMOST(opaque)) return true; /* * this is a non-rightmost page, so it must have a high key item. * * If the scan key is < the high key (the min key on the next page), then * it for sure belongs here. */ hikey = PageGetItemId(page, P_HIKEY); if (_bt_skeycmp(rel, keysz, scankey, page, hikey, BTLessStrategyNumber)) return true; /* * If the scan key is > the high key, then it for sure doesn't belong * here. */ if (_bt_skeycmp(rel, keysz, scankey, page, hikey, BTGreaterStrategyNumber)) return false; /* * If we have no adjacency information, and the item is equal to the * high key on the page (by here it is), then the item does not belong * on this page. * * Now it's not true in all cases. - vadim 06/10/97 */ if (afteritem == (BTItem) NULL) { if (opaque->btpo_flags & BTP_LEAF) return false; if (opaque->btpo_flags & BTP_CHAIN) return true; if (_bt_skeycmp(rel, keysz, scankey, page, PageGetItemId(page, P_FIRSTKEY), BTEqualStrategyNumber)) return true; return false; } /* damn, have to work for it. i hate that. */ maxoff = PageGetMaxOffsetNumber(page); /* * Search the entire page for the afteroid. We need to do this, * rather than doing a binary search and starting from there, because * if the key we're searching for is the leftmost key in the tree at * this level, then a binary search will do the wrong thing. Splits * are pretty infrequent, so the cost isn't as bad as it could be. */ found = false; for (offnum = P_FIRSTKEY; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) { chkitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum)); if (BTItemSame(chkitem, afteritem)) { found = true; break; } } return found; } /* * _bt_itemcmp() -- compare item1 to item2 using a requested * strategy (<, <=, =, >=, >) * */ bool _bt_itemcmp(Relation rel, Size keysz, BTItem item1, BTItem item2, StrategyNumber strat) { TupleDesc tupDes; IndexTuple indexTuple1, indexTuple2; Datum attrDatum1, attrDatum2; int i; bool isFirstNull, isSecondNull; bool compare; bool useEqual = false; if (strat == BTLessEqualStrategyNumber) { useEqual = true; strat = BTLessStrategyNumber; } else if (strat == BTGreaterEqualStrategyNumber) { useEqual = true; strat = BTGreaterStrategyNumber; } tupDes = RelationGetDescr(rel); indexTuple1 = &(item1->bti_itup); indexTuple2 = &(item2->bti_itup); for (i = 1; i <= keysz; i++) { attrDatum1 = index_getattr(indexTuple1, i, tupDes, &isFirstNull); attrDatum2 = index_getattr(indexTuple2, i, tupDes, &isSecondNull); /* see comments about NULLs handling in btbuild */ if (isFirstNull) /* attr in item1 is NULL */ { if (isSecondNull) /* attr in item2 is NULL too */ compare = (strat == BTEqualStrategyNumber) ? true : false; else compare = (strat == BTGreaterStrategyNumber) ? true : false; } else if (isSecondNull) /* attr in item1 is NOT_NULL and */ { /* and attr in item2 is NULL */ compare = (strat == BTLessStrategyNumber) ? true : false; } else compare = _bt_invokestrat(rel, i, strat, attrDatum1, attrDatum2); if (compare) /* true for one of ">, <, =" */ { if (strat != BTEqualStrategyNumber) return true; } else /* false for one of ">, <, =" */ { if (strat == BTEqualStrategyNumber) return false; /* * if original strat was "<=, >=" OR "<, >" but some * attribute(s) left - need to test for Equality */ if (useEqual || i < keysz) { if (isFirstNull || isSecondNull) compare = (isFirstNull && isSecondNull) ? true : false; else compare = _bt_invokestrat(rel, i, BTEqualStrategyNumber, attrDatum1, attrDatum2); if (compare) /* item1' and item2' attributes are equal */ continue; /* - try to compare next attributes */ } return false; } } return true; } /* * _bt_updateitem() -- updates the key of the item identified by the * oid with the key of newItem (done in place if * possible) * */ static void _bt_updateitem(Relation rel, Size keysz, Buffer buf, BTItem oldItem, BTItem newItem) { Page page; OffsetNumber maxoff; OffsetNumber i; ItemPointerData itemPtrData; BTItem item; IndexTuple oldIndexTuple, newIndexTuple; int first; page = BufferGetPage(buf); maxoff = PageGetMaxOffsetNumber(page); /* locate item on the page */ first = P_RIGHTMOST((BTPageOpaque) PageGetSpecialPointer(page)) ? P_HIKEY : P_FIRSTKEY; i = first; do { item = (BTItem) PageGetItem(page, PageGetItemId(page, i)); i = OffsetNumberNext(i); } while (i <= maxoff && !BTItemSame(item, oldItem)); /* this should never happen (in theory) */ if (!BTItemSame(item, oldItem)) elog(FATAL, "_bt_getstackbuf was lying!!"); /* * It's defined by caller (_bt_insertonpg) */ /* * if(IndexTupleDSize(newItem->bti_itup) > * IndexTupleDSize(item->bti_itup)) { elog(NOTICE, "trying to * overwrite a smaller value with a bigger one in _bt_updateitem"); * elog(ERROR, "this is not good."); } */ oldIndexTuple = &(item->bti_itup); newIndexTuple = &(newItem->bti_itup); /* keep the original item pointer */ ItemPointerCopy(&(oldIndexTuple->t_tid), &itemPtrData); CopyIndexTuple(newIndexTuple, &oldIndexTuple); ItemPointerCopy(&itemPtrData, &(oldIndexTuple->t_tid)); } /* * _bt_isequal - used in _bt_doinsert in check for duplicates. * * Rule is simple: NOT_NULL not equal NULL, NULL not_equal NULL too. */ static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey) { Datum datum; BTItem btitem; IndexTuple itup; ScanKey entry; AttrNumber attno; long result; int i; bool null; btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum)); itup = &(btitem->bti_itup); for (i = 1; i <= keysz; i++) { entry = &scankey[i - 1]; attno = entry->sk_attno; Assert(attno == i); datum = index_getattr(itup, attno, itupdesc, &null); /* NULLs are not equal */ if (entry->sk_flags & SK_ISNULL || null) return false; result = (long) FMGR_PTR2(&entry->sk_func, entry->sk_argument, datum); if (result != 0) return false; } /* by here, the keys are equal */ return true; } #ifdef NOT_USED /* * _bt_shift - insert btitem on the passed page after shifting page * to the right in the tree. * * NOTE: tested for shifting leftmost page only, having btitem < hikey. */ static InsertIndexResult _bt_shift(Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem hikey) { InsertIndexResult res; int itemsz; Page page; BlockNumber bknum; BTPageOpaque pageop; Buffer rbuf; Page rpage; BTPageOpaque rpageop; Buffer pbuf; Page ppage; BTPageOpaque ppageop; Buffer nbuf; Page npage; BTPageOpaque npageop; BlockNumber nbknum; BTItem nitem; OffsetNumber afteroff; btitem = _bt_formitem(&(btitem->bti_itup)); hikey = _bt_formitem(&(hikey->bti_itup)); page = BufferGetPage(buf); /* grab new page */ nbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); nbknum = BufferGetBlockNumber(nbuf); npage = BufferGetPage(nbuf); _bt_pageinit(npage, BufferGetPageSize(nbuf)); npageop = (BTPageOpaque) PageGetSpecialPointer(npage); /* copy content of the passed page */ memmove((char *) npage, (char *) page, BufferGetPageSize(buf)); /* re-init old (passed) page */ _bt_pageinit(page, BufferGetPageSize(buf)); pageop = (BTPageOpaque) PageGetSpecialPointer(page); /* init old page opaque */ pageop->btpo_flags = npageop->btpo_flags; /* restore flags */ pageop->btpo_flags &= ~BTP_CHAIN; if (_bt_itemcmp(rel, keysz, hikey, btitem, BTEqualStrategyNumber)) pageop->btpo_flags |= BTP_CHAIN; pageop->btpo_prev = npageop->btpo_prev; /* restore prev */ pageop->btpo_next = nbknum; /* next points to the new page */ pageop->btpo_parent = npageop->btpo_parent; /* init shifted page opaque */ npageop->btpo_prev = bknum = BufferGetBlockNumber(buf); /* shifted page is ok, populate old page */ /* add passed hikey */ itemsz = IndexTupleDSize(hikey->bti_itup) + (sizeof(BTItemData) - sizeof(IndexTupleData)); itemsz = DOUBLEALIGN(itemsz); if (PageAddItem(page, (Item) hikey, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber) elog(FATAL, "btree: failed to add hikey in _bt_shift"); pfree(hikey); /* add btitem */ itemsz = IndexTupleDSize(btitem->bti_itup) + (sizeof(BTItemData) - sizeof(IndexTupleData)); itemsz = DOUBLEALIGN(itemsz); if (PageAddItem(page, (Item) btitem, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber) elog(FATAL, "btree: failed to add firstkey in _bt_shift"); pfree(btitem); nitem = (BTItem) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY)); btitem = _bt_formitem(&(nitem->bti_itup)); ItemPointerSet(&(btitem->bti_itup.t_tid), bknum, P_HIKEY); /* ok, write them out */ _bt_wrtnorelbuf(rel, nbuf); _bt_wrtnorelbuf(rel, buf); /* fix btpo_prev on right sibling of old page */ if (!P_RIGHTMOST(npageop)) { rbuf = _bt_getbuf(rel, npageop->btpo_next, BT_WRITE); rpage = BufferGetPage(rbuf); rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage); rpageop->btpo_prev = nbknum; _bt_wrtbuf(rel, rbuf); } /* get parent pointing to the old page */ ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), bknum, P_HIKEY); pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); ppage = BufferGetPage(pbuf); ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage); _bt_relbuf(rel, nbuf, BT_WRITE); _bt_relbuf(rel, buf, BT_WRITE); /* re-set parent' pointer - we shifted our page to the right ! */ nitem = (BTItem) PageGetItem(ppage, PageGetItemId(ppage, stack->bts_offset)); ItemPointerSet(&(nitem->bti_itup.t_tid), nbknum, P_HIKEY); ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), nbknum, P_HIKEY); _bt_wrtnorelbuf(rel, pbuf); /* * Now we want insert into the parent pointer to our old page. It has * to be inserted before the pointer to new page. You may get problems * here (in the _bt_goesonpg and/or _bt_pgaddtup), but may be not - I * don't know. It works if old page is leftmost (nitem is NULL) and * btitem < hikey and it's all what we need currently. - vadim * 05/30/97 */ nitem = NULL; afteroff = P_FIRSTKEY; if (!P_RIGHTMOST(ppageop)) afteroff = OffsetNumberNext(afteroff); if (stack->bts_offset >= afteroff) { afteroff = OffsetNumberPrev(stack->bts_offset); nitem = (BTItem) PageGetItem(ppage, PageGetItemId(ppage, afteroff)); nitem = _bt_formitem(&(nitem->bti_itup)); } res = _bt_insertonpg(rel, pbuf, stack->bts_parent, keysz, scankey, btitem, nitem); pfree(btitem); ItemPointerSet(&(res->pointerData), nbknum, P_HIKEY); return res; } #endif