postgresql/src/backend/access/nbtree/nbtree.c

1119 lines
30 KiB
C
Raw Normal View History

/*-------------------------------------------------------------------------
*
* nbtree.c
* Implementation of Lehman and Yao's btree management algorithm for
* Postgres.
*
* NOTES
* This file contains only the public interface routines.
*
*
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.75 2001/01/24 19:42:48 momjian Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
1996-11-05 11:35:38 +01:00
#include "access/genam.h"
#include "access/heapam.h"
1999-07-16 07:00:38 +02:00
#include "access/nbtree.h"
#include "catalog/index.h"
1999-07-16 07:00:38 +02:00
#include "executor/executor.h"
#include "miscadmin.h"
#include "storage/sinval.h"
bool BuildingBtree = false; /* see comment in btbuild() */
bool FastBuild = true; /* use sort/build instead of insertion
* build */
2000-10-21 17:43:36 +02:00
#include "access/xlogutils.h"
static void _bt_restscan(IndexScanDesc scan);
1998-07-30 07:05:05 +02:00
/*
* btbuild() -- build a new btree index.
*
* We use a global variable to record the fact that we're creating
* a new index. This is used to avoid high-concurrency locking,
* since the index won't be visible until this transaction commits
* and since building is guaranteed to be single-threaded.
*/
Datum
btbuild(PG_FUNCTION_ARGS)
{
Relation heap = (Relation) PG_GETARG_POINTER(0);
Relation index = (Relation) PG_GETARG_POINTER(1);
IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
Node *oldPred = (Node *) PG_GETARG_POINTER(3);
#ifdef NOT_USED
IndexStrategy istrat = (IndexStrategy) PG_GETARG_POINTER(4);
#endif
HeapScanDesc hscan;
HeapTuple htup;
IndexTuple itup;
TupleDesc htupdesc,
itupdesc;
Datum attdata[INDEX_MAX_KEYS];
char nulls[INDEX_MAX_KEYS];
int nhtups,
nitups;
Node *pred = indexInfo->ii_Predicate;
#ifndef OMIT_PARTIAL_INDEX
TupleTable tupleTable;
TupleTableSlot *slot;
#endif
ExprContext *econtext;
InsertIndexResult res = NULL;
BTSpool *spool = NULL;
BTItem btitem;
bool usefast;
Snapshot snapshot;
TransactionId XmaxRecent;
/*
* spool2 is needed only when the index is an unique index.
* Dead tuples are put into spool2 instead of spool in
* order to avoid uniqueness check.
*/
BTSpool *spool2 = NULL;
bool tupleIsAlive;
int dead_count;
/* note that this is a new btree */
BuildingBtree = true;
What looks like some *major* improvements to btree indexing... Patches from: aoki@CS.Berkeley.EDU (Paul M. Aoki) i gave jolly my btree bulkload code a long, long time ago but never gave him a bunch of my bugfixes. here's a diff against the 6.0 baseline. for some reason, this code has slowed down somewhat relative to the insertion-build code on very small tables. don't know why -- it used to be within about 10%. anyway, here are some (highly unscientific!) timings on a dec 3000/300 for synthetic tables with 10k, 100k and 1000k tuples (basically, 1mb, 10mb and 100mb heaps). 'c' means clustered (pre-sorted) inputs and 'u' means unclustered (randomly ordered) inputs. the 10k table basically fits in the buffer pool, but the 100k and 1000k tables don't. as you can see, insertion build is fine if you've sorted your heaps on your index key or if your heap fits in core, but is absolutely horrible on unordered data (yes, that's 7.5 hours to index 100mb of data...) because of the zillions of random i/os. if it doesn't work for you for whatever reason, you can always turn it back off by flipping the FastBuild flag in nbtree.c. i don't have time to maintain it. good luck! baseline code: time psql -c 'create index c10 on k10 using btree (c int4_ops)' bttest real 8.6 time psql -c 'create index u10 on k10 using btree (b int4_ops)' bttest real 9.1 time psql -c 'create index c100 on k100 using btree (c int4_ops)' bttest real 59.2 time psql -c 'create index u100 on k100 using btree (b int4_ops)' bttest real 652.4 time psql -c 'create index c1000 on k1000 using btree (c int4_ops)' bttest real 636.1 time psql -c 'create index u1000 on k1000 using btree (b int4_ops)' bttest real 26772.9 bulkloading code: time psql -c 'create index c10 on k10 using btree (c int4_ops)' bttest real 11.3 time psql -c 'create index u10 on k10 using btree (b int4_ops)' bttest real 10.4 time psql -c 'create index c100 on k100 using btree (c int4_ops)' bttest real 59.5 time psql -c 'create index u100 on k100 using btree (b int4_ops)' bttest real 63.5 time psql -c 'create index c1000 on k1000 using btree (c int4_ops)' bttest real 636.9 time psql -c 'create index u1000 on k1000 using btree (b int4_ops)' bttest real 701.0
1997-02-12 06:04:52 +01:00
/*
* bootstrap processing does something strange, so don't use
* sort/build for initial catalog indices. at some point i need to
* look harder at this. (there is some kind of incremental processing
* going on there.) -- pma 08/29/95
What looks like some *major* improvements to btree indexing... Patches from: aoki@CS.Berkeley.EDU (Paul M. Aoki) i gave jolly my btree bulkload code a long, long time ago but never gave him a bunch of my bugfixes. here's a diff against the 6.0 baseline. for some reason, this code has slowed down somewhat relative to the insertion-build code on very small tables. don't know why -- it used to be within about 10%. anyway, here are some (highly unscientific!) timings on a dec 3000/300 for synthetic tables with 10k, 100k and 1000k tuples (basically, 1mb, 10mb and 100mb heaps). 'c' means clustered (pre-sorted) inputs and 'u' means unclustered (randomly ordered) inputs. the 10k table basically fits in the buffer pool, but the 100k and 1000k tables don't. as you can see, insertion build is fine if you've sorted your heaps on your index key or if your heap fits in core, but is absolutely horrible on unordered data (yes, that's 7.5 hours to index 100mb of data...) because of the zillions of random i/os. if it doesn't work for you for whatever reason, you can always turn it back off by flipping the FastBuild flag in nbtree.c. i don't have time to maintain it. good luck! baseline code: time psql -c 'create index c10 on k10 using btree (c int4_ops)' bttest real 8.6 time psql -c 'create index u10 on k10 using btree (b int4_ops)' bttest real 9.1 time psql -c 'create index c100 on k100 using btree (c int4_ops)' bttest real 59.2 time psql -c 'create index u100 on k100 using btree (b int4_ops)' bttest real 652.4 time psql -c 'create index c1000 on k1000 using btree (c int4_ops)' bttest real 636.1 time psql -c 'create index u1000 on k1000 using btree (b int4_ops)' bttest real 26772.9 bulkloading code: time psql -c 'create index c10 on k10 using btree (c int4_ops)' bttest real 11.3 time psql -c 'create index u10 on k10 using btree (b int4_ops)' bttest real 10.4 time psql -c 'create index c100 on k100 using btree (c int4_ops)' bttest real 59.5 time psql -c 'create index u100 on k100 using btree (b int4_ops)' bttest real 63.5 time psql -c 'create index c1000 on k1000 using btree (c int4_ops)' bttest real 636.9 time psql -c 'create index u1000 on k1000 using btree (b int4_ops)' bttest real 701.0
1997-02-12 06:04:52 +01:00
*/
usefast = (FastBuild && IsNormalProcessingMode());
#ifdef BTREE_BUILD_STATS
if (Show_btree_build_stats)
ResetUsage();
#endif /* BTREE_BUILD_STATS */
/* initialize the btree index metadata page (if this is a new index) */
if (oldPred == NULL)
_bt_metapinit(index);
/* get tuple descriptors for heap and index relations */
1998-09-01 05:29:17 +02:00
htupdesc = RelationGetDescr(heap);
itupdesc = RelationGetDescr(index);
/*
* If this is a predicate (partial) index, we will need to evaluate
* the predicate using ExecQual, which requires the current tuple to
* be in a slot of a TupleTable. In addition, ExecQual must have an
* ExprContext referring to that slot. Here, we initialize dummy
* TupleTable and ExprContext objects for this purpose. --Nels, Feb 92
*
* We construct the ExprContext anyway since we need a per-tuple
* temporary memory context for function evaluation -- tgl July 00
*/
#ifndef OMIT_PARTIAL_INDEX
if (pred != NULL || oldPred != NULL)
{
tupleTable = ExecCreateTupleTable(1);
slot = ExecAllocTableSlot(tupleTable);
ExecSetSlotDescriptor(slot, htupdesc);
/*
* we never want to use sort/build if we are extending an existing
* partial index -- it works by inserting the newly-qualifying
* tuples into the existing index. (sort/build would overwrite the
* existing index with one consisting of the newly-qualifying
* tuples.)
*/
usefast = false;
}
else
{
tupleTable = NULL;
slot = NULL;
}
econtext = MakeExprContext(slot, TransactionCommandContext);
#else
econtext = MakeExprContext(NULL, TransactionCommandContext);
#endif /* OMIT_PARTIAL_INDEX */
/* build the index */
nhtups = nitups = 0;
if (usefast)
{
spool = _bt_spoolinit(index, indexInfo->ii_Unique);
/*
* Different from spool,the uniqueness isn't checked
* for spool2.
*/
if (indexInfo->ii_Unique)
spool2 = _bt_spoolinit(index, false);
}
/* start a heap scan */
dead_count = 0;
snapshot = (IsBootstrapProcessingMode() ? SnapshotNow : SnapshotAny);
hscan = heap_beginscan(heap, 0, snapshot, 0, (ScanKey) NULL);
XmaxRecent = 0;
if (snapshot == SnapshotAny)
GetXmaxRecent(&XmaxRecent);
while (HeapTupleIsValid(htup = heap_getnext(hscan, 0)))
{
if (snapshot == SnapshotAny)
{
tupleIsAlive = HeapTupleSatisfiesNow(htup->t_data);
if (!tupleIsAlive)
{
if ((htup->t_data->t_infomask & HEAP_XMIN_INVALID) != 0)
continue;
if (htup->t_data->t_infomask & HEAP_XMAX_COMMITTED &&
htup->t_data->t_xmax < XmaxRecent)
continue;
}
}
else
tupleIsAlive = true;
MemoryContextReset(econtext->ecxt_per_tuple_memory);
nhtups++;
#ifndef OMIT_PARTIAL_INDEX
/*
* If oldPred != NULL, this is an EXTEND INDEX command, so skip
* this tuple if it was already in the existing partial index
*/
if (oldPred != NULL)
{
slot->val = htup;
if (ExecQual((List *) oldPred, econtext, false))
{
nitups++;
continue;
}
}
/*
* Skip this tuple if it doesn't satisfy the partial-index
* predicate
*/
if (pred != NULL)
{
slot->val = htup;
if (!ExecQual((List *) pred, econtext, false))
continue;
}
#endif /* OMIT_PARTIAL_INDEX */
nitups++;
/*
* For the current heap tuple, extract all the attributes we use
* in this index, and note which are null.
*/
FormIndexDatum(indexInfo,
htup,
htupdesc,
econtext->ecxt_per_tuple_memory,
attdata,
nulls);
/* form an index tuple and point it at the heap tuple */
itup = index_formtuple(itupdesc, attdata, nulls);
/*
* If the single index key is null, we don't insert it into the
* index. Btrees support scans on <, <=, =, >=, and >. Relational
* algebra says that A op B (where op is one of the operators
* above) returns null if either A or B is null. This means that
* no qualification used in an index scan could ever return true
* on a null attribute. It also means that indices can't be used
* by ISNULL or NOTNULL scans, but that's an artifact of the
* strategy map architecture chosen in 1986, not of the way nulls
* are handled here.
*/
/*
* New comments: NULLs handling. While we can't do NULL
* comparison, we can follow simple rule for ordering items on
* btree pages - NULLs greater NOT_NULLs and NULL = NULL is TRUE.
* Sure, it's just rule for placing/finding items and no more -
* keytest'll return FALSE for a = 5 for items having 'a' isNULL.
* Look at _bt_compare for how it works.
* - vadim 03/23/97
*
* if (itup->t_info & INDEX_NULL_MASK) { pfree(itup); continue; }
*/
1998-11-27 20:52:36 +01:00
itup->t_tid = htup->t_self;
btitem = _bt_formitem(itup);
/*
* if we are doing bottom-up btree build, we insert the index into
* a spool file for subsequent processing. otherwise, we insert
* into the btree.
*/
if (usefast)
{
if (tupleIsAlive || !spool2)
_bt_spool(btitem, spool);
else /* dead tuples are put into spool2 */
{
dead_count++;
_bt_spool(btitem, spool2);
}
}
else
res = _bt_doinsert(index, btitem, indexInfo->ii_Unique, heap);
pfree(btitem);
pfree(itup);
if (res)
pfree(res);
}
/* okay, all heap tuples are indexed */
heap_endscan(hscan);
if (spool2 && !dead_count) /* spool2 was found to be unnecessary */
{
_bt_spooldestroy(spool2);
spool2 = NULL;
}
#ifndef OMIT_PARTIAL_INDEX
if (pred != NULL || oldPred != NULL)
{
ExecDropTupleTable(tupleTable, true);
}
#endif /* OMIT_PARTIAL_INDEX */
FreeExprContext(econtext);
/*
* if we are doing bottom-up btree build, finish the build by (1)
* completing the sort of the spool file, (2) inserting the sorted
* tuples into btree pages and (3) building the upper levels.
*/
if (usefast)
{
_bt_leafbuild(spool, spool2);
_bt_spooldestroy(spool);
if (spool2)
_bt_spooldestroy(spool2);
}
#ifdef BTREE_BUILD_STATS
if (Show_btree_build_stats)
{
fprintf(stderr, "BTREE BUILD STATS\n");
ShowUsage();
ResetUsage();
}
#endif /* BTREE_BUILD_STATS */
/*
* Since we just counted the tuples in the heap, we update its stats
* in pg_class to guarantee that the planner takes advantage of the
* index we just created. But, only update statistics during normal
* index definitions, not for indices on system catalogs created
* during bootstrap processing. We must close the relations before
* updating statistics to guarantee that the relcache entries are
* flushed when we increment the command counter in UpdateStats(). But
* we do not release any locks on the relations; those will be held
* until end of transaction.
*/
if (IsNormalProcessingMode())
{
Oid hrelid = RelationGetRelid(heap);
Oid irelid = RelationGetRelid(index);
heap_close(heap, NoLock);
index_close(index);
UpdateStats(hrelid, nhtups);
UpdateStats(irelid, nitups);
if (oldPred != NULL)
{
if (nitups == nhtups)
pred = NULL;
UpdateIndexPredicate(irelid, oldPred, pred);
}
}
/* all done */
BuildingBtree = false;
PG_RETURN_VOID();
}
/*
* btinsert() -- insert an index tuple into a btree.
*
* Descend the tree recursively, find the appropriate location for our
* new tuple, put it there, set its unique OID as appropriate, and
* return an InsertIndexResult to the caller.
*/
Datum
btinsert(PG_FUNCTION_ARGS)
{
Relation rel = (Relation) PG_GETARG_POINTER(0);
Datum *datum = (Datum *) PG_GETARG_POINTER(1);
char *nulls = (char *) PG_GETARG_POINTER(2);
ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
Relation heapRel = (Relation) PG_GETARG_POINTER(4);
InsertIndexResult res;
BTItem btitem;
IndexTuple itup;
/* generate an index tuple */
1998-09-01 05:29:17 +02:00
itup = index_formtuple(RelationGetDescr(rel), datum, nulls);
itup->t_tid = *ht_ctid;
btitem = _bt_formitem(itup);
res = _bt_doinsert(rel, btitem, rel->rd_uniqueindex, heapRel);
pfree(btitem);
pfree(itup);
PG_RETURN_POINTER(res);
}
/*
* btgettuple() -- Get the next tuple in the scan.
*/
Datum
btgettuple(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
RetrieveIndexResult res;
/*
* If we've already initialized this scan, we can just advance it in
* the appropriate direction. If we haven't done so yet, we call a
* routine to get the first item in the scan.
*/
if (ItemPointerIsValid(&(scan->currentItemData)))
1998-07-30 07:05:05 +02:00
{
/*
1999-05-25 18:15:34 +02:00
* Restore scan position using heap TID returned by previous call
* to btgettuple(). _bt_restscan() re-grabs the read lock on
* the buffer, too.
1998-07-30 07:05:05 +02:00
*/
_bt_restscan(scan);
res = _bt_next(scan, dir);
1998-07-30 07:05:05 +02:00
}
else
res = _bt_first(scan, dir);
/*
* Save heap TID to use it in _bt_restscan. Then release the read
* lock on the buffer so that we aren't blocking other backends.
* NOTE: we do keep the pin on the buffer!
*/
1998-07-30 07:05:05 +02:00
if (res)
{
((BTScanOpaque) scan->opaque)->curHeapIptr = res->heap_iptr;
LockBuffer(((BTScanOpaque) scan->opaque)->btso_curbuf,
BUFFER_LOCK_UNLOCK);
}
PG_RETURN_POINTER(res);
}
/*
* btbeginscan() -- start a scan on a btree index
*/
Datum
btbeginscan(PG_FUNCTION_ARGS)
{
Relation rel = (Relation) PG_GETARG_POINTER(0);
bool fromEnd = PG_GETARG_BOOL(1);
uint16 keysz = PG_GETARG_UINT16(2);
ScanKey scankey = (ScanKey) PG_GETARG_POINTER(3);
IndexScanDesc scan;
/* get the scan */
scan = RelationGetIndexScan(rel, fromEnd, keysz, scankey);
/* register scan in case we change pages it's using */
_bt_regscan(scan);
PG_RETURN_POINTER(scan);
}
/*
* btrescan() -- rescan an index relation
*/
Datum
btrescan(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
#ifdef NOT_USED /* XXX surely it's wrong to ignore this? */
bool fromEnd = PG_GETARG_BOOL(1);
#endif
ScanKey scankey = (ScanKey) PG_GETARG_POINTER(2);
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
if (so == NULL) /* if called from btbeginscan */
{
so = (BTScanOpaque) palloc(sizeof(BTScanOpaqueData));
so->btso_curbuf = so->btso_mrkbuf = InvalidBuffer;
so->keyData = (ScanKey) NULL;
if (scan->numberOfKeys > 0)
so->keyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
scan->opaque = so;
scan->flags = 0x0;
}
/* we aren't holding any read locks, but gotta drop the pins */
if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
{
ReleaseBuffer(so->btso_curbuf);
so->btso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
{
ReleaseBuffer(so->btso_mrkbuf);
so->btso_mrkbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
/*
* Reset the scan keys. Note that keys ordering stuff moved to
* _bt_first. - vadim 05/05/97
*/
so->numberOfKeys = scan->numberOfKeys;
if (scan->numberOfKeys > 0)
{
memmove(scan->keyData,
scankey,
scan->numberOfKeys * sizeof(ScanKeyData));
memmove(so->keyData,
scankey,
so->numberOfKeys * sizeof(ScanKeyData));
}
PG_RETURN_VOID();
}
void
btmovescan(IndexScanDesc scan, Datum v)
{
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
/* we aren't holding any read locks, but gotta drop the pin */
if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
{
ReleaseBuffer(so->btso_curbuf);
so->btso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
so->keyData[0].sk_argument = v;
}
/*
* btendscan() -- close down a scan
*/
Datum
btendscan(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
/* we aren't holding any read locks, but gotta drop the pins */
if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
{
if (BufferIsValid(so->btso_curbuf))
ReleaseBuffer(so->btso_curbuf);
so->btso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
{
if (BufferIsValid(so->btso_mrkbuf))
ReleaseBuffer(so->btso_mrkbuf);
so->btso_mrkbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
if (so->keyData != (ScanKey) NULL)
pfree(so->keyData);
pfree(so);
_bt_dropscan(scan);
PG_RETURN_VOID();
}
/*
* btmarkpos() -- save current scan position
*/
Datum
btmarkpos(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
/* we aren't holding any read locks, but gotta drop the pin */
if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
{
ReleaseBuffer(so->btso_mrkbuf);
so->btso_mrkbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
/* bump pin on current buffer for assignment to mark buffer */
if (ItemPointerIsValid(&(scan->currentItemData)))
{
so->btso_mrkbuf = ReadBuffer(scan->relation,
BufferGetBlockNumber(so->btso_curbuf));
scan->currentMarkData = scan->currentItemData;
1998-07-30 07:05:05 +02:00
so->mrkHeapIptr = so->curHeapIptr;
}
PG_RETURN_VOID();
}
/*
* btrestrpos() -- restore scan to last saved position
*/
Datum
btrestrpos(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
/* we aren't holding any read locks, but gotta drop the pin */
if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
{
ReleaseBuffer(so->btso_curbuf);
so->btso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
/* bump pin on marked buffer */
if (ItemPointerIsValid(&(scan->currentMarkData)))
{
so->btso_curbuf = ReadBuffer(scan->relation,
BufferGetBlockNumber(so->btso_mrkbuf));
scan->currentItemData = scan->currentMarkData;
1998-07-30 07:05:05 +02:00
so->curHeapIptr = so->mrkHeapIptr;
}
PG_RETURN_VOID();
}
/* stubs */
Datum
btdelete(PG_FUNCTION_ARGS)
{
Relation rel = (Relation) PG_GETARG_POINTER(0);
ItemPointer tid = (ItemPointer) PG_GETARG_POINTER(1);
/* adjust any active scans that will be affected by this deletion */
_bt_adjscans(rel, tid);
/* delete the data from the page */
_bt_pagedel(rel, tid);
PG_RETURN_VOID();
}
1998-07-30 07:05:05 +02:00
/*
* Restore scan position when btgettuple is called to continue a scan.
*/
1998-07-30 07:05:05 +02:00
static void
_bt_restscan(IndexScanDesc scan)
{
Relation rel = scan->relation;
BTScanOpaque so = (BTScanOpaque) scan->opaque;
Buffer buf = so->btso_curbuf;
Page page;
ItemPointer current = &(scan->currentItemData);
OffsetNumber offnum = ItemPointerGetOffsetNumber(current),
maxoff;
BTPageOpaque opaque;
ItemPointerData target = so->curHeapIptr;
BTItem item;
BlockNumber blkno;
1998-07-30 07:05:05 +02:00
/*
* Get back the read lock we were holding on the buffer.
* (We still have a reference-count pin on it, though.)
*/
LockBuffer(buf, BT_READ);
page = BufferGetPage(buf);
maxoff = PageGetMaxOffsetNumber(page);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
/*
1999-05-25 18:15:34 +02:00
* We use this as flag when first index tuple on page is deleted but
* we do not move left (this would slowdown vacuum) - so we set
* current->ip_posid before first index tuple on the current page
* (_bt_step will move it right)...
*/
if (!ItemPointerIsValid(&target))
{
ItemPointerSetOffsetNumber(current,
OffsetNumberPrev(P_FIRSTDATAKEY(opaque)));
return;
}
/*
* The item we were on may have moved right due to insertions.
* Find it again.
*/
for (;;)
1998-07-30 07:05:05 +02:00
{
/* Check for item on this page */
for (;
1998-07-30 07:05:05 +02:00
offnum <= maxoff;
offnum = OffsetNumberNext(offnum))
{
item = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
if (item->bti_itup.t_tid.ip_blkid.bi_hi ==
target.ip_blkid.bi_hi &&
item->bti_itup.t_tid.ip_blkid.bi_lo ==
target.ip_blkid.bi_lo &&
1998-07-30 07:05:05 +02:00
item->bti_itup.t_tid.ip_posid == target.ip_posid)
{
current->ip_posid = offnum;
return;
}
}
/*
* By here, the item we're looking for moved right at least one page
*/
1998-07-30 07:05:05 +02:00
if (P_RIGHTMOST(opaque))
elog(FATAL, "_bt_restscan: my bits moved right off the end of the world!"
"\n\tRecreate index %s.", RelationGetRelationName(rel));
1998-07-30 07:05:05 +02:00
blkno = opaque->btpo_next;
_bt_relbuf(rel, buf, BT_READ);
buf = _bt_getbuf(rel, blkno, BT_READ);
page = BufferGetPage(buf);
maxoff = PageGetMaxOffsetNumber(page);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
offnum = P_FIRSTDATAKEY(opaque);
ItemPointerSet(current, blkno, offnum);
so->btso_curbuf = buf;
1998-07-30 07:05:05 +02:00
}
}
2000-10-13 04:03:02 +02:00
2000-10-21 17:43:36 +02:00
static void
_bt_restore_page(Page page, char *from, int len)
2000-10-13 04:03:02 +02:00
{
BTItemData btdata;
Size itemsz;
char *end = from + len;
2000-10-21 17:43:36 +02:00
for ( ; from < end; )
2000-10-13 04:03:02 +02:00
{
memcpy(&btdata, from, sizeof(BTItemData));
2000-10-21 17:43:36 +02:00
itemsz = IndexTupleDSize(btdata.bti_itup) +
(sizeof(BTItemData) - sizeof(IndexTupleData));
itemsz = MAXALIGN(itemsz);
if (PageAddItem(page, (Item) from, itemsz,
FirstOffsetNumber, LP_USED) == InvalidOffsetNumber)
elog(STOP, "_bt_restore_page: can't add item to page");
from += itemsz;
2000-10-21 17:43:36 +02:00
}
}
static void
btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_delete *xlrec;
Relation reln;
Buffer buffer;
Page page;
if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
2000-10-21 17:43:36 +02:00
return;
xlrec = (xl_btree_delete*) XLogRecGetData(record);
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(false, reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
if (!BufferIsValid(buffer))
elog(STOP, "btree_delete_redo: block unfound");
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(STOP, "btree_delete_redo: uninitialized page");
if (XLByteLE(lsn, PageGetLSN(page)))
{
UnlockAndReleaseBuffer(buffer);
return;
}
2000-10-21 17:43:36 +02:00
PageIndexTupleDelete(page, ItemPointerGetOffsetNumber(&(xlrec->target.tid)));
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
2000-10-21 17:43:36 +02:00
return;
}
static void
btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_insert *xlrec;
Relation reln;
Buffer buffer;
Page page;
BTPageOpaque pageop;
if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
return;
2000-10-21 17:43:36 +02:00
xlrec = (xl_btree_insert*) XLogRecGetData(record);
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(false, reln,
2000-10-21 17:43:36 +02:00
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
if (!BufferIsValid(buffer))
elog(STOP, "btree_insert_%sdo: block unfound", (redo) ? "re" : "un");
2000-10-21 17:43:36 +02:00
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(STOP, "btree_insert_%sdo: uninitialized page", (redo) ? "re" : "un");
2000-10-21 17:43:36 +02:00
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (redo)
{
if (XLByteLE(lsn, PageGetLSN(page)))
{
UnlockAndReleaseBuffer(buffer);
return;
}
if (PageAddItem(page, (Item)((char*)xlrec + SizeOfBtreeInsert),
record->xl_len - SizeOfBtreeInsert,
ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
LP_USED) == InvalidOffsetNumber)
2000-10-21 17:43:36 +02:00
elog(STOP, "btree_insert_redo: failed to add item");
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
2000-10-21 17:43:36 +02:00
}
else
{
if (XLByteLT(PageGetLSN(page), lsn))
elog(STOP, "btree_insert_undo: bad page LSN");
if (! P_ISLEAF(pageop))
{
UnlockAndReleaseBuffer(buffer);
return;
}
elog(STOP, "btree_insert_undo: unimplemented");
2000-10-21 17:43:36 +02:00
}
return;
}
static void
btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_split *xlrec = (xl_btree_split*) XLogRecGetData(record);
2000-10-21 17:43:36 +02:00
Relation reln;
BlockNumber blkno;
Buffer buffer;
Page page;
BTPageOpaque pageop;
char *op = (redo) ? "redo" : "undo";
bool isleaf = (record->xl_info & XLOG_BTREE_LEAF);
2000-10-21 17:43:36 +02:00
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
2000-10-13 04:03:02 +02:00
/* Left (original) sibling */
blkno = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
2000-10-21 17:43:36 +02:00
BlockIdGetBlockNumber(&(xlrec->otherblk));
2000-10-13 04:03:02 +02:00
buffer = XLogReadBuffer(false, reln, blkno);
if (!BufferIsValid(buffer))
elog(STOP, "btree_split_%s: lost left sibling", op);
page = (Page) BufferGetPage(buffer);
if (redo)
_bt_pageinit(page, BufferGetPageSize(buffer));
else if (PageIsNew((PageHeader) page))
elog(STOP, "btree_split_undo: uninitialized left sibling");
2000-10-13 04:03:02 +02:00
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (redo)
{
pageop->btpo_parent = BlockIdGetBlockNumber(&(xlrec->parentblk));
pageop->btpo_prev = BlockIdGetBlockNumber(&(xlrec->leftblk));
if (onleft)
pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->otherblk));
2000-10-13 04:03:02 +02:00
else
pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid));
pageop->btpo_flags = (isleaf) ? BTP_LEAF : 0;
2000-10-13 04:03:02 +02:00
_bt_restore_page(page, (char*)xlrec + SizeOfBtreeSplit, xlrec->leftlen);
2000-10-13 04:03:02 +02:00
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
2000-10-13 04:03:02 +02:00
}
else /* undo */
{
if (XLByteLT(PageGetLSN(page), lsn))
elog(STOP, "btree_split_undo: bad left sibling LSN");
elog(STOP, "btree_split_undo: unimplemented");
2000-10-13 04:03:02 +02:00
}
/* Right (new) sibling */
2000-10-21 17:43:36 +02:00
blkno = (onleft) ? BlockIdGetBlockNumber(&(xlrec->otherblk)) :
2000-10-13 04:03:02 +02:00
ItemPointerGetBlockNumber(&(xlrec->target.tid));
buffer = XLogReadBuffer((redo) ? true : false, reln, blkno);
if (!BufferIsValid(buffer))
elog(STOP, "btree_split_%s: lost right sibling", op);
page = (Page) BufferGetPage(buffer);
if (redo)
_bt_pageinit(page, BufferGetPageSize(buffer));
else if (PageIsNew((PageHeader) page))
elog(STOP, "btree_split_undo: uninitialized right sibling");
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
2000-10-13 04:03:02 +02:00
if (redo)
{
pageop->btpo_parent = BlockIdGetBlockNumber(&(xlrec->parentblk));
pageop->btpo_prev = (onleft) ?
ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
BlockIdGetBlockNumber(&(xlrec->otherblk));
pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->rightblk));
pageop->btpo_flags = (isleaf) ? BTP_LEAF : 0;
2000-10-13 04:03:02 +02:00
_bt_restore_page(page,
(char*)xlrec + SizeOfBtreeSplit + xlrec->leftlen,
record->xl_len - SizeOfBtreeSplit - xlrec->leftlen);
2000-10-13 04:03:02 +02:00
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
2000-10-13 04:03:02 +02:00
}
else /* undo */
{
if (XLByteLT(PageGetLSN(page), lsn))
elog(STOP, "btree_split_undo: bad right sibling LSN");
elog(STOP, "btree_split_undo: unimplemented");
2000-10-13 04:03:02 +02:00
}
if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
return;
2000-10-13 04:03:02 +02:00
/* Right (next) page */
2000-10-21 17:43:36 +02:00
blkno = BlockIdGetBlockNumber(&(xlrec->rightblk));
2000-10-29 19:33:41 +01:00
if (blkno == P_NONE)
return;
2000-10-13 04:03:02 +02:00
buffer = XLogReadBuffer(false, reln, blkno);
if (!BufferIsValid(buffer))
elog(STOP, "btree_split_redo: lost next right page");
2000-10-13 04:03:02 +02:00
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(STOP, "btree_split_redo: uninitialized next right page");
2000-10-13 04:03:02 +02:00
if (XLByteLE(lsn, PageGetLSN(page)))
2000-10-13 04:03:02 +02:00
{
UnlockAndReleaseBuffer(buffer);
return;
2000-10-13 04:03:02 +02:00
}
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_prev = (onleft) ?
BlockIdGetBlockNumber(&(xlrec->otherblk)) :
ItemPointerGetBlockNumber(&(xlrec->target.tid));
2000-10-13 04:03:02 +02:00
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
2000-10-13 04:03:02 +02:00
}
2000-10-21 17:43:36 +02:00
static void
btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
2000-10-13 04:03:02 +02:00
{
xl_btree_newroot *xlrec = (xl_btree_newroot*) XLogRecGetData(record);
2000-10-21 17:43:36 +02:00
Relation reln;
2000-10-13 04:03:02 +02:00
Buffer buffer;
Page page;
BTPageOpaque pageop;
2000-10-13 04:03:02 +02:00
Buffer metabuf;
Page metapg;
BTMetaPageData md;
2000-10-13 04:03:02 +02:00
if (!redo)
return;
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(true, reln, BlockIdGetBlockNumber(&(xlrec->rootblk)));
if (!BufferIsValid(buffer))
elog(STOP, "btree_newroot_redo: no root page");
metabuf = XLogReadBuffer(false, reln, BTREE_METAPAGE);
if (!BufferIsValid(buffer))
elog(STOP, "btree_newroot_redo: no metapage");
page = (Page) BufferGetPage(buffer);
_bt_pageinit(page, BufferGetPageSize(buffer));
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
2000-10-13 04:03:02 +02:00
pageop->btpo_flags |= BTP_ROOT;
pageop->btpo_prev = pageop->btpo_next = P_NONE;
pageop->btpo_parent = BTREE_METAPAGE;
2000-10-13 04:03:02 +02:00
if (record->xl_info & XLOG_BTREE_LEAF)
pageop->btpo_flags |= BTP_LEAF;
2000-10-13 04:03:02 +02:00
if (record->xl_len > SizeOfBtreeNewroot)
_bt_restore_page(page,
(char*)xlrec + SizeOfBtreeNewroot,
record->xl_len - SizeOfBtreeNewroot);
2000-10-13 04:03:02 +02:00
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
2000-10-13 04:03:02 +02:00
metapg = BufferGetPage(metabuf);
_bt_pageinit(metapg, BufferGetPageSize(metabuf));
md.btm_magic = BTREE_MAGIC;
md.btm_version = BTREE_VERSION;
md.btm_root = BlockIdGetBlockNumber(&(xlrec->rootblk));
md.btm_level = xlrec->level;
memcpy((char *) BTPageGetMeta(metapg), (char *) &md, sizeof(md));
pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
pageop->btpo_flags = BTP_META;
PageSetLSN(metapg, lsn);
PageSetSUI(metapg, ThisStartUpID);
UnlockAndWriteBuffer(metabuf);
2000-10-13 04:03:02 +02:00
}
2000-10-21 17:43:36 +02:00
void
btree_redo(XLogRecPtr lsn, XLogRecord *record)
2000-10-13 04:03:02 +02:00
{
2000-10-21 17:43:36 +02:00
uint8 info = record->xl_info & ~XLR_INFO_MASK;
info &= ~XLOG_BTREE_LEAF;
2000-10-21 17:43:36 +02:00
if (info == XLOG_BTREE_DELETE)
btree_xlog_delete(true, lsn, record);
else if (info == XLOG_BTREE_INSERT)
btree_xlog_insert(true, lsn, record);
else if (info == XLOG_BTREE_SPLIT)
btree_xlog_split(true, false, lsn, record); /* new item on the right */
else if (info == XLOG_BTREE_SPLEFT)
btree_xlog_split(true, true, lsn, record); /* new item on the left */
else if (info == XLOG_BTREE_NEWROOT)
btree_xlog_newroot(true, lsn, record);
else
elog(STOP, "btree_redo: unknown op code %u", info);
2000-10-13 04:03:02 +02:00
}
2000-10-21 17:43:36 +02:00
void
btree_undo(XLogRecPtr lsn, XLogRecord *record)
2000-10-13 04:03:02 +02:00
{
2000-10-21 17:43:36 +02:00
uint8 info = record->xl_info & ~XLR_INFO_MASK;
2000-10-13 04:03:02 +02:00
info &= ~XLOG_BTREE_LEAF;
2000-10-21 17:43:36 +02:00
if (info == XLOG_BTREE_DELETE)
btree_xlog_delete(false, lsn, record);
else if (info == XLOG_BTREE_INSERT)
btree_xlog_insert(false, lsn, record);
else if (info == XLOG_BTREE_SPLIT)
btree_xlog_split(false, false, lsn, record);/* new item on the right */
else if (info == XLOG_BTREE_SPLEFT)
btree_xlog_split(false, true, lsn, record); /* new item on the left */
else if (info == XLOG_BTREE_NEWROOT)
btree_xlog_newroot(false, lsn, record);
else
elog(STOP, "btree_undo: unknown op code %u", info);
2000-10-13 04:03:02 +02:00
}
2000-10-21 17:43:36 +02:00
static void
out_target(char *buf, xl_btreetid *target)
2000-10-13 14:05:22 +02:00
{
2000-10-21 17:43:36 +02:00
sprintf(buf + strlen(buf), "node %u/%u; tid %u/%u",
target->node.tblNode, target->node.relNode,
ItemPointerGetBlockNumber(&(target->tid)),
ItemPointerGetOffsetNumber(&(target->tid)));
2000-10-13 14:05:22 +02:00
}
2000-10-21 17:43:36 +02:00
void
btree_desc(char *buf, uint8 xl_info, char* rec)
2000-10-13 14:05:22 +02:00
{
2000-10-21 17:43:36 +02:00
uint8 info = xl_info & ~XLR_INFO_MASK;
2000-10-13 14:05:22 +02:00
info &= ~XLOG_BTREE_LEAF;
2000-10-21 17:43:36 +02:00
if (info == XLOG_BTREE_INSERT)
2000-10-13 14:05:22 +02:00
{
2000-10-21 17:43:36 +02:00
xl_btree_insert *xlrec = (xl_btree_insert*) rec;
strcat(buf, "insert: ");
out_target(buf, &(xlrec->target));
2000-10-13 14:05:22 +02:00
}
2000-10-21 17:43:36 +02:00
else if (info == XLOG_BTREE_DELETE)
2000-10-13 14:05:22 +02:00
{
2000-10-21 17:43:36 +02:00
xl_btree_delete *xlrec = (xl_btree_delete*) rec;
strcat(buf, "delete: ");
out_target(buf, &(xlrec->target));
2000-10-13 14:05:22 +02:00
}
2000-10-21 17:43:36 +02:00
else if (info == XLOG_BTREE_SPLIT || info == XLOG_BTREE_SPLEFT)
2000-10-13 14:05:22 +02:00
{
2000-10-21 17:43:36 +02:00
xl_btree_split *xlrec = (xl_btree_split*) rec;
sprintf(buf + strlen(buf), "split(%s): ",
(info == XLOG_BTREE_SPLIT) ? "right" : "left");
out_target(buf, &(xlrec->target));
sprintf(buf + strlen(buf), "; oth %u; rgh %u",
BlockIdGetBlockNumber(&xlrec->otherblk),
BlockIdGetBlockNumber(&xlrec->rightblk));
2000-10-13 14:05:22 +02:00
}
2000-10-21 17:43:36 +02:00
else if (info == XLOG_BTREE_NEWROOT)
2000-10-13 14:05:22 +02:00
{
2000-10-21 17:43:36 +02:00
xl_btree_newroot *xlrec = (xl_btree_newroot*) rec;
sprintf(buf + strlen(buf), "root: node %u/%u; blk %u",
xlrec->node.tblNode, xlrec->node.relNode,
BlockIdGetBlockNumber(&xlrec->rootblk));
2000-10-13 14:05:22 +02:00
}
2000-10-21 17:43:36 +02:00
else
strcat(buf, "UNKNOWN");
2000-10-13 14:05:22 +02:00
}