postgresql/src/backend/access/nbtree/nbtree.c

1137 lines
30 KiB
C
Raw Normal View History

/*-------------------------------------------------------------------------
*
* nbtree.c
* Implementation of Lehman and Yao's btree management algorithm for
* Postgres.
*
* NOTES
* This file contains only the public interface routines.
*
*
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.84 2001/11/05 17:46:24 momjian Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
1996-11-05 11:35:38 +01:00
#include "access/genam.h"
#include "access/heapam.h"
1999-07-16 07:00:38 +02:00
#include "access/nbtree.h"
#include "catalog/index.h"
1999-07-16 07:00:38 +02:00
#include "executor/executor.h"
#include "miscadmin.h"
#include "storage/sinval.h"
#include "access/xlogutils.h"
2001-03-22 05:01:46 +01:00
/* Working state for btbuild and its callback */
typedef struct
{
bool usefast;
bool isUnique;
bool haveDead;
Relation heapRel;
BTSpool *spool;
/*
* spool2 is needed only when the index is an unique index. Dead
* tuples are put into spool2 instead of spool in order to avoid
* uniqueness check.
*/
BTSpool *spool2;
double indtuples;
} BTBuildState;
bool BuildingBtree = false; /* see comment in btbuild() */
bool FastBuild = true; /* use SORT instead of insertion build */
/*
* TEMPORARY FLAG FOR TESTING NEW FIX TREE
* CODE WITHOUT AFFECTING ANYONE ELSE
*/
bool FixBTree = true;
2000-10-21 17:43:36 +02:00
static void _bt_restscan(IndexScanDesc scan);
static void btbuildCallback(Relation index,
HeapTuple htup,
Datum *attdata,
char *nulls,
bool tupleIsAlive,
void *state);
/*
* AtEOXact_nbtree() --- clean up nbtree subsystem at xact abort or commit.
*/
void
AtEOXact_nbtree(void)
{
/*
* Note: these actions should only be necessary during xact abort; but
* they can't hurt during a commit.
*/
/* If we were building a btree, we ain't anymore. */
BuildingBtree = false;
}
1998-07-30 07:05:05 +02:00
/*
* btbuild() -- build a new btree index.
*
* We use a global variable to record the fact that we're creating
* a new index. This is used to avoid high-concurrency locking,
* since the index won't be visible until this transaction commits
* and since building is guaranteed to be single-threaded.
*/
Datum
btbuild(PG_FUNCTION_ARGS)
{
2001-03-22 05:01:46 +01:00
Relation heap = (Relation) PG_GETARG_POINTER(0);
Relation index = (Relation) PG_GETARG_POINTER(1);
IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
double reltuples;
BTBuildState buildstate;
2001-03-22 05:01:46 +01:00
/* set flag to disable locking */
BuildingBtree = true;
What looks like some *major* improvements to btree indexing... Patches from: aoki@CS.Berkeley.EDU (Paul M. Aoki) i gave jolly my btree bulkload code a long, long time ago but never gave him a bunch of my bugfixes. here's a diff against the 6.0 baseline. for some reason, this code has slowed down somewhat relative to the insertion-build code on very small tables. don't know why -- it used to be within about 10%. anyway, here are some (highly unscientific!) timings on a dec 3000/300 for synthetic tables with 10k, 100k and 1000k tuples (basically, 1mb, 10mb and 100mb heaps). 'c' means clustered (pre-sorted) inputs and 'u' means unclustered (randomly ordered) inputs. the 10k table basically fits in the buffer pool, but the 100k and 1000k tables don't. as you can see, insertion build is fine if you've sorted your heaps on your index key or if your heap fits in core, but is absolutely horrible on unordered data (yes, that's 7.5 hours to index 100mb of data...) because of the zillions of random i/os. if it doesn't work for you for whatever reason, you can always turn it back off by flipping the FastBuild flag in nbtree.c. i don't have time to maintain it. good luck! baseline code: time psql -c 'create index c10 on k10 using btree (c int4_ops)' bttest real 8.6 time psql -c 'create index u10 on k10 using btree (b int4_ops)' bttest real 9.1 time psql -c 'create index c100 on k100 using btree (c int4_ops)' bttest real 59.2 time psql -c 'create index u100 on k100 using btree (b int4_ops)' bttest real 652.4 time psql -c 'create index c1000 on k1000 using btree (c int4_ops)' bttest real 636.1 time psql -c 'create index u1000 on k1000 using btree (b int4_ops)' bttest real 26772.9 bulkloading code: time psql -c 'create index c10 on k10 using btree (c int4_ops)' bttest real 11.3 time psql -c 'create index u10 on k10 using btree (b int4_ops)' bttest real 10.4 time psql -c 'create index c100 on k100 using btree (c int4_ops)' bttest real 59.5 time psql -c 'create index u100 on k100 using btree (b int4_ops)' bttest real 63.5 time psql -c 'create index c1000 on k1000 using btree (c int4_ops)' bttest real 636.9 time psql -c 'create index u1000 on k1000 using btree (b int4_ops)' bttest real 701.0
1997-02-12 06:04:52 +01:00
/*
* bootstrap processing does something strange, so don't use
* sort/build for initial catalog indices. at some point i need to
* look harder at this. (there is some kind of incremental processing
* going on there.) -- pma 08/29/95
What looks like some *major* improvements to btree indexing... Patches from: aoki@CS.Berkeley.EDU (Paul M. Aoki) i gave jolly my btree bulkload code a long, long time ago but never gave him a bunch of my bugfixes. here's a diff against the 6.0 baseline. for some reason, this code has slowed down somewhat relative to the insertion-build code on very small tables. don't know why -- it used to be within about 10%. anyway, here are some (highly unscientific!) timings on a dec 3000/300 for synthetic tables with 10k, 100k and 1000k tuples (basically, 1mb, 10mb and 100mb heaps). 'c' means clustered (pre-sorted) inputs and 'u' means unclustered (randomly ordered) inputs. the 10k table basically fits in the buffer pool, but the 100k and 1000k tables don't. as you can see, insertion build is fine if you've sorted your heaps on your index key or if your heap fits in core, but is absolutely horrible on unordered data (yes, that's 7.5 hours to index 100mb of data...) because of the zillions of random i/os. if it doesn't work for you for whatever reason, you can always turn it back off by flipping the FastBuild flag in nbtree.c. i don't have time to maintain it. good luck! baseline code: time psql -c 'create index c10 on k10 using btree (c int4_ops)' bttest real 8.6 time psql -c 'create index u10 on k10 using btree (b int4_ops)' bttest real 9.1 time psql -c 'create index c100 on k100 using btree (c int4_ops)' bttest real 59.2 time psql -c 'create index u100 on k100 using btree (b int4_ops)' bttest real 652.4 time psql -c 'create index c1000 on k1000 using btree (c int4_ops)' bttest real 636.1 time psql -c 'create index u1000 on k1000 using btree (b int4_ops)' bttest real 26772.9 bulkloading code: time psql -c 'create index c10 on k10 using btree (c int4_ops)' bttest real 11.3 time psql -c 'create index u10 on k10 using btree (b int4_ops)' bttest real 10.4 time psql -c 'create index c100 on k100 using btree (c int4_ops)' bttest real 59.5 time psql -c 'create index u100 on k100 using btree (b int4_ops)' bttest real 63.5 time psql -c 'create index c1000 on k1000 using btree (c int4_ops)' bttest real 636.9 time psql -c 'create index u1000 on k1000 using btree (b int4_ops)' bttest real 701.0
1997-02-12 06:04:52 +01:00
*/
buildstate.usefast = (FastBuild && IsNormalProcessingMode());
buildstate.isUnique = indexInfo->ii_Unique;
buildstate.haveDead = false;
buildstate.heapRel = heap;
buildstate.spool = NULL;
buildstate.spool2 = NULL;
buildstate.indtuples = 0;
#ifdef BTREE_BUILD_STATS
if (Show_btree_build_stats)
ResetUsage();
#endif /* BTREE_BUILD_STATS */
/*
* We expect to be called exactly once for any index relation. If
* that's not the case, big trouble's what we have.
*/
if (RelationGetNumberOfBlocks(index) != 0)
elog(ERROR, "%s already contains data",
RelationGetRelationName(index));
/* initialize the btree index metadata page */
_bt_metapinit(index);
if (buildstate.usefast)
{
buildstate.spool = _bt_spoolinit(index, indexInfo->ii_Unique);
/*
* Different from spool, the uniqueness isn't checked for spool2.
2001-03-22 05:01:46 +01:00
*/
if (indexInfo->ii_Unique)
buildstate.spool2 = _bt_spoolinit(index, false);
}
/* do the heap scan */
reltuples = IndexBuildHeapScan(heap, index, indexInfo,
btbuildCallback, (void *) &buildstate);
/* okay, all heap tuples are indexed */
if (buildstate.spool2 && !buildstate.haveDead)
{
/* spool2 turns out to be unnecessary */
_bt_spooldestroy(buildstate.spool2);
buildstate.spool2 = NULL;
}
/*
* if we are doing bottom-up btree build, finish the build by (1)
* completing the sort of the spool file, (2) inserting the sorted
* tuples into btree pages and (3) building the upper levels.
*/
if (buildstate.usefast)
{
_bt_leafbuild(buildstate.spool, buildstate.spool2);
_bt_spooldestroy(buildstate.spool);
if (buildstate.spool2)
_bt_spooldestroy(buildstate.spool2);
}
#ifdef BTREE_BUILD_STATS
if (Show_btree_build_stats)
{
fprintf(stderr, "BTREE BUILD STATS\n");
ShowUsage();
ResetUsage();
}
#endif /* BTREE_BUILD_STATS */
/* all done */
BuildingBtree = false;
/*
* Since we just counted the tuples in the heap, we update its stats
* in pg_class to guarantee that the planner takes advantage of the
* index we just created. But, only update statistics during normal
* index definitions, not for indices on system catalogs created
* during bootstrap processing. We must close the relations before
* updating statistics to guarantee that the relcache entries are
* flushed when we increment the command counter in UpdateStats(). But
* we do not release any locks on the relations; those will be held
* until end of transaction.
*/
if (IsNormalProcessingMode())
{
Oid hrelid = RelationGetRelid(heap);
Oid irelid = RelationGetRelid(index);
heap_close(heap, NoLock);
index_close(index);
UpdateStats(hrelid, reltuples);
UpdateStats(irelid, buildstate.indtuples);
}
PG_RETURN_VOID();
}
/*
* Per-tuple callback from IndexBuildHeapScan
*/
static void
btbuildCallback(Relation index,
HeapTuple htup,
Datum *attdata,
char *nulls,
bool tupleIsAlive,
void *state)
{
BTBuildState *buildstate = (BTBuildState *) state;
IndexTuple itup;
BTItem btitem;
InsertIndexResult res;
/* form an index tuple and point it at the heap tuple */
itup = index_formtuple(RelationGetDescr(index), attdata, nulls);
itup->t_tid = htup->t_self;
btitem = _bt_formitem(itup);
/*
* if we are doing bottom-up btree build, we insert the index into a
* spool file for subsequent processing. otherwise, we insert into
* the btree.
*/
if (buildstate->usefast)
{
if (tupleIsAlive || buildstate->spool2 == NULL)
_bt_spool(btitem, buildstate->spool);
else
{
/* dead tuples are put into spool2 */
buildstate->haveDead = true;
_bt_spool(btitem, buildstate->spool2);
}
}
else
{
res = _bt_doinsert(index, btitem,
buildstate->isUnique, buildstate->heapRel);
if (res)
pfree(res);
}
buildstate->indtuples += 1;
pfree(btitem);
pfree(itup);
}
/*
* btinsert() -- insert an index tuple into a btree.
*
* Descend the tree recursively, find the appropriate location for our
* new tuple, put it there, set its unique OID as appropriate, and
* return an InsertIndexResult to the caller.
*/
Datum
btinsert(PG_FUNCTION_ARGS)
{
2001-03-22 05:01:46 +01:00
Relation rel = (Relation) PG_GETARG_POINTER(0);
Datum *datum = (Datum *) PG_GETARG_POINTER(1);
char *nulls = (char *) PG_GETARG_POINTER(2);
ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
Relation heapRel = (Relation) PG_GETARG_POINTER(4);
InsertIndexResult res;
BTItem btitem;
IndexTuple itup;
/* generate an index tuple */
1998-09-01 05:29:17 +02:00
itup = index_formtuple(RelationGetDescr(rel), datum, nulls);
itup->t_tid = *ht_ctid;
btitem = _bt_formitem(itup);
res = _bt_doinsert(rel, btitem, rel->rd_uniqueindex, heapRel);
pfree(btitem);
pfree(itup);
PG_RETURN_POINTER(res);
}
/*
* btgettuple() -- Get the next tuple in the scan.
*/
Datum
btgettuple(PG_FUNCTION_ARGS)
{
2001-03-22 05:01:46 +01:00
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
RetrieveIndexResult res;
/*
* If we've already initialized this scan, we can just advance it in
* the appropriate direction. If we haven't done so yet, we call a
* routine to get the first item in the scan.
*/
if (ItemPointerIsValid(&(scan->currentItemData)))
1998-07-30 07:05:05 +02:00
{
/*
1999-05-25 18:15:34 +02:00
* Restore scan position using heap TID returned by previous call
2001-03-22 05:01:46 +01:00
* to btgettuple(). _bt_restscan() re-grabs the read lock on the
* buffer, too.
1998-07-30 07:05:05 +02:00
*/
_bt_restscan(scan);
res = _bt_next(scan, dir);
1998-07-30 07:05:05 +02:00
}
else
res = _bt_first(scan, dir);
/*
* Save heap TID to use it in _bt_restscan. Then release the read
* lock on the buffer so that we aren't blocking other backends.
*
* NOTE: we do keep the pin on the buffer! This is essential to ensure
* that someone else doesn't delete the index entry we are stopped on.
*/
1998-07-30 07:05:05 +02:00
if (res)
{
((BTScanOpaque) scan->opaque)->curHeapIptr = res->heap_iptr;
LockBuffer(((BTScanOpaque) scan->opaque)->btso_curbuf,
BUFFER_LOCK_UNLOCK);
}
PG_RETURN_POINTER(res);
}
/*
* btbeginscan() -- start a scan on a btree index
*/
Datum
btbeginscan(PG_FUNCTION_ARGS)
{
Relation rel = (Relation) PG_GETARG_POINTER(0);
bool fromEnd = PG_GETARG_BOOL(1);
uint16 keysz = PG_GETARG_UINT16(2);
ScanKey scankey = (ScanKey) PG_GETARG_POINTER(3);
IndexScanDesc scan;
/* get the scan */
scan = RelationGetIndexScan(rel, fromEnd, keysz, scankey);
PG_RETURN_POINTER(scan);
}
/*
* btrescan() -- rescan an index relation
*/
Datum
btrescan(PG_FUNCTION_ARGS)
{
2001-03-22 05:01:46 +01:00
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
#ifdef NOT_USED /* XXX surely it's wrong to ignore this? */
2001-03-22 05:01:46 +01:00
bool fromEnd = PG_GETARG_BOOL(1);
#endif
2001-03-22 05:01:46 +01:00
ScanKey scankey = (ScanKey) PG_GETARG_POINTER(2);
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
if (so == NULL) /* if called from btbeginscan */
{
so = (BTScanOpaque) palloc(sizeof(BTScanOpaqueData));
so->btso_curbuf = so->btso_mrkbuf = InvalidBuffer;
so->keyData = (ScanKey) NULL;
if (scan->numberOfKeys > 0)
so->keyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
scan->opaque = so;
scan->flags = 0x0;
}
/* we aren't holding any read locks, but gotta drop the pins */
if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
{
ReleaseBuffer(so->btso_curbuf);
so->btso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
{
ReleaseBuffer(so->btso_mrkbuf);
so->btso_mrkbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
/*
* Reset the scan keys. Note that keys ordering stuff moved to
* _bt_first. - vadim 05/05/97
*/
so->numberOfKeys = scan->numberOfKeys;
if (scan->numberOfKeys > 0)
{
memmove(scan->keyData,
scankey,
scan->numberOfKeys * sizeof(ScanKeyData));
memmove(so->keyData,
scankey,
so->numberOfKeys * sizeof(ScanKeyData));
}
PG_RETURN_VOID();
}
void
btmovescan(IndexScanDesc scan, Datum v)
{
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
/* we aren't holding any read locks, but gotta drop the pin */
if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
{
ReleaseBuffer(so->btso_curbuf);
so->btso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
so->keyData[0].sk_argument = v;
}
/*
* btendscan() -- close down a scan
*/
Datum
btendscan(PG_FUNCTION_ARGS)
{
2001-03-22 05:01:46 +01:00
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
/* we aren't holding any read locks, but gotta drop the pins */
if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
{
if (BufferIsValid(so->btso_curbuf))
ReleaseBuffer(so->btso_curbuf);
so->btso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
{
if (BufferIsValid(so->btso_mrkbuf))
ReleaseBuffer(so->btso_mrkbuf);
so->btso_mrkbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
if (so->keyData != (ScanKey) NULL)
pfree(so->keyData);
pfree(so);
PG_RETURN_VOID();
}
/*
* btmarkpos() -- save current scan position
*/
Datum
btmarkpos(PG_FUNCTION_ARGS)
{
2001-03-22 05:01:46 +01:00
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
/* we aren't holding any read locks, but gotta drop the pin */
if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
{
ReleaseBuffer(so->btso_mrkbuf);
so->btso_mrkbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
/* bump pin on current buffer for assignment to mark buffer */
if (ItemPointerIsValid(&(scan->currentItemData)))
{
so->btso_mrkbuf = ReadBuffer(scan->relation,
BufferGetBlockNumber(so->btso_curbuf));
scan->currentMarkData = scan->currentItemData;
1998-07-30 07:05:05 +02:00
so->mrkHeapIptr = so->curHeapIptr;
}
PG_RETURN_VOID();
}
/*
* btrestrpos() -- restore scan to last saved position
*/
Datum
btrestrpos(PG_FUNCTION_ARGS)
{
2001-03-22 05:01:46 +01:00
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
/* we aren't holding any read locks, but gotta drop the pin */
if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
{
ReleaseBuffer(so->btso_curbuf);
so->btso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
/* bump pin on marked buffer */
if (ItemPointerIsValid(&(scan->currentMarkData)))
{
so->btso_curbuf = ReadBuffer(scan->relation,
BufferGetBlockNumber(so->btso_mrkbuf));
scan->currentItemData = scan->currentMarkData;
1998-07-30 07:05:05 +02:00
so->curHeapIptr = so->mrkHeapIptr;
}
PG_RETURN_VOID();
}
/*
* Bulk deletion of all index entries pointing to a set of heap tuples.
* The set of target tuples is specified via a callback routine that tells
* whether any given heap tuple (identified by ItemPointer) is being deleted.
*
* Result: a palloc'd struct containing statistical info for VACUUM displays.
*/
Datum
btbulkdelete(PG_FUNCTION_ARGS)
{
2001-03-22 05:01:46 +01:00
Relation rel = (Relation) PG_GETARG_POINTER(0);
IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1);
void *callback_state = (void *) PG_GETARG_POINTER(2);
IndexBulkDeleteResult *result;
BlockNumber num_pages;
double tuples_removed;
double num_index_tuples;
RetrieveIndexResult res;
IndexScanDesc scan;
BTScanOpaque so;
ItemPointer current;
tuples_removed = 0;
num_index_tuples = 0;
/*
* We use a standard IndexScanDesc scan object, but to speed up the
* loop, we skip most of the wrapper layers of index_getnext and
* instead call _bt_step directly. This implies holding buffer lock
* on a target page throughout the loop over the page's tuples.
* Initially, we have a read lock acquired by _bt_step when we stepped
* onto the page. If we find a tuple we need to delete, we trade in
* the read lock for an exclusive write lock; after that, we hold the
* write lock until we step off the page (fortunately, _bt_relbuf
* doesn't care which kind of lock it's releasing). This should
* minimize the amount of work needed per page.
*/
scan = index_beginscan(rel, false, 0, (ScanKey) NULL);
so = (BTScanOpaque) scan->opaque;
current = &(scan->currentItemData);
/* Use _bt_first to get started, then _bt_step to remaining tuples */
res = _bt_first(scan, ForwardScanDirection);
if (res != NULL)
{
Buffer buf;
BlockNumber lockedBlock = InvalidBlockNumber;
pfree(res);
/* we have the buffer pinned and locked */
buf = so->btso_curbuf;
Assert(BufferIsValid(buf));
do
{
Page page;
BlockNumber blkno;
OffsetNumber offnum;
BTItem btitem;
IndexTuple itup;
ItemPointer htup;
/* current is the next index tuple */
blkno = ItemPointerGetBlockNumber(current);
offnum = ItemPointerGetOffsetNumber(current);
page = BufferGetPage(buf);
btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
itup = &btitem->bti_itup;
htup = &(itup->t_tid);
if (callback(htup, callback_state))
{
/*
* If this is first deletion on this page, trade in read
* lock for a really-exclusive write lock. Then, step
* back one and re-examine the item, because someone else
* might have inserted an item while we weren't holding
* the lock!
*/
if (blkno != lockedBlock)
{
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
LockBufferForCleanup(buf);
lockedBlock = blkno;
}
else
{
/* Delete the item from the page */
_bt_itemdel(rel, buf, current);
/* Mark buffer dirty, but keep the lock and pin */
WriteNoReleaseBuffer(buf);
tuples_removed += 1;
}
/*
* We need to back up the scan one item so that the next
* cycle will re-examine the same offnum on this page.
*
* For now, just hack the current-item index. Will need to
* be smarter when deletion includes removal of empty
* index pages.
*/
current->ip_posid--;
}
else
num_index_tuples += 1;
} while (_bt_step(scan, &buf, ForwardScanDirection));
}
index_endscan(scan);
/* return statistics */
num_pages = RelationGetNumberOfBlocks(rel);
result = (IndexBulkDeleteResult *) palloc(sizeof(IndexBulkDeleteResult));
result->num_pages = num_pages;
result->tuples_removed = tuples_removed;
result->num_index_tuples = num_index_tuples;
PG_RETURN_POINTER(result);
}
1998-07-30 07:05:05 +02:00
/*
* Restore scan position when btgettuple is called to continue a scan.
*/
1998-07-30 07:05:05 +02:00
static void
_bt_restscan(IndexScanDesc scan)
{
Relation rel = scan->relation;
BTScanOpaque so = (BTScanOpaque) scan->opaque;
Buffer buf = so->btso_curbuf;
Page page;
ItemPointer current = &(scan->currentItemData);
OffsetNumber offnum = ItemPointerGetOffsetNumber(current),
maxoff;
BTPageOpaque opaque;
ItemPointerData target = so->curHeapIptr;
BTItem item;
BlockNumber blkno;
1998-07-30 07:05:05 +02:00
/*
2001-03-22 05:01:46 +01:00
* Get back the read lock we were holding on the buffer. (We still
* have a reference-count pin on it, so need not get that.)
*/
LockBuffer(buf, BT_READ);
page = BufferGetPage(buf);
maxoff = PageGetMaxOffsetNumber(page);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
/*
1999-05-25 18:15:34 +02:00
* We use this as flag when first index tuple on page is deleted but
* we do not move left (this would slowdown vacuum) - so we set
* current->ip_posid before first index tuple on the current page
* (_bt_step will move it right)...
*/
if (!ItemPointerIsValid(&target))
{
ItemPointerSetOffsetNumber(current,
2001-03-22 05:01:46 +01:00
OffsetNumberPrev(P_FIRSTDATAKEY(opaque)));
return;
}
/*
2001-03-22 05:01:46 +01:00
* The item we were on may have moved right due to insertions. Find it
* again.
*/
for (;;)
1998-07-30 07:05:05 +02:00
{
/* Check for item on this page */
for (;
1998-07-30 07:05:05 +02:00
offnum <= maxoff;
offnum = OffsetNumberNext(offnum))
{
item = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
if (item->bti_itup.t_tid.ip_blkid.bi_hi ==
target.ip_blkid.bi_hi &&
item->bti_itup.t_tid.ip_blkid.bi_lo ==
target.ip_blkid.bi_lo &&
1998-07-30 07:05:05 +02:00
item->bti_itup.t_tid.ip_posid == target.ip_posid)
{
current->ip_posid = offnum;
return;
}
}
/*
2001-03-22 05:01:46 +01:00
* By here, the item we're looking for moved right at least one
* page
*/
1998-07-30 07:05:05 +02:00
if (P_RIGHTMOST(opaque))
elog(FATAL, "_bt_restscan: my bits moved right off the end of the world!"
"\n\tRecreate index %s.", RelationGetRelationName(rel));
1998-07-30 07:05:05 +02:00
blkno = opaque->btpo_next;
_bt_relbuf(rel, buf);
1998-07-30 07:05:05 +02:00
buf = _bt_getbuf(rel, blkno, BT_READ);
page = BufferGetPage(buf);
maxoff = PageGetMaxOffsetNumber(page);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
offnum = P_FIRSTDATAKEY(opaque);
ItemPointerSet(current, blkno, offnum);
so->btso_curbuf = buf;
1998-07-30 07:05:05 +02:00
}
}
2000-10-13 04:03:02 +02:00
2000-10-21 17:43:36 +02:00
static void
_bt_restore_page(Page page, char *from, int len)
2000-10-13 04:03:02 +02:00
{
BTItemData btdata;
Size itemsz;
char *end = from + len;
2000-10-21 17:43:36 +02:00
2001-03-22 05:01:46 +01:00
for (; from < end;)
2000-10-13 04:03:02 +02:00
{
memcpy(&btdata, from, sizeof(BTItemData));
2000-10-21 17:43:36 +02:00
itemsz = IndexTupleDSize(btdata.bti_itup) +
2001-03-22 05:01:46 +01:00
(sizeof(BTItemData) - sizeof(IndexTupleData));
2000-10-21 17:43:36 +02:00
itemsz = MAXALIGN(itemsz);
if (PageAddItem(page, (Item) from, itemsz,
2001-03-22 05:01:46 +01:00
FirstOffsetNumber, LP_USED) == InvalidOffsetNumber)
elog(STOP, "_bt_restore_page: can't add item to page");
from += itemsz;
2000-10-21 17:43:36 +02:00
}
}
static void
btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
2001-03-22 05:01:46 +01:00
xl_btree_delete *xlrec;
Relation reln;
Buffer buffer;
Page page;
2000-10-21 17:43:36 +02:00
if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
2000-10-21 17:43:36 +02:00
return;
2001-03-22 05:01:46 +01:00
xlrec = (xl_btree_delete *) XLogRecGetData(record);
2000-10-21 17:43:36 +02:00
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
2001-03-22 05:01:46 +01:00
buffer = XLogReadBuffer(false, reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
2000-10-21 17:43:36 +02:00
if (!BufferIsValid(buffer))
elog(STOP, "btree_delete_redo: block unfound");
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(STOP, "btree_delete_redo: uninitialized page");
if (XLByteLE(lsn, PageGetLSN(page)))
{
UnlockAndReleaseBuffer(buffer);
return;
}
2000-10-21 17:43:36 +02:00
PageIndexTupleDelete(page, ItemPointerGetOffsetNumber(&(xlrec->target.tid)));
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
2000-10-21 17:43:36 +02:00
return;
}
static void
btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
2001-03-22 05:01:46 +01:00
xl_btree_insert *xlrec;
Relation reln;
Buffer buffer;
Page page;
BTPageOpaque pageop;
2000-10-21 17:43:36 +02:00
if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
return;
2001-03-22 05:01:46 +01:00
xlrec = (xl_btree_insert *) XLogRecGetData(record);
2000-10-21 17:43:36 +02:00
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
2001-03-22 05:01:46 +01:00
buffer = XLogReadBuffer(false, reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
2000-10-21 17:43:36 +02:00
if (!BufferIsValid(buffer))
elog(STOP, "btree_insert_%sdo: block unfound", (redo) ? "re" : "un");
2000-10-21 17:43:36 +02:00
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(STOP, "btree_insert_%sdo: uninitialized page", (redo) ? "re" : "un");
2000-10-21 17:43:36 +02:00
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (redo)
{
if (XLByteLE(lsn, PageGetLSN(page)))
{
UnlockAndReleaseBuffer(buffer);
return;
}
2001-03-22 05:01:46 +01:00
if (PageAddItem(page, (Item) ((char *) xlrec + SizeOfBtreeInsert),
record->xl_len - SizeOfBtreeInsert,
ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
LP_USED) == InvalidOffsetNumber)
elog(STOP, "btree_insert_redo: failed to add item");
2000-10-21 17:43:36 +02:00
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
2000-10-21 17:43:36 +02:00
}
else
{
if (XLByteLT(PageGetLSN(page), lsn))
elog(STOP, "btree_insert_undo: bad page LSN");
2001-03-22 05:01:46 +01:00
if (!P_ISLEAF(pageop))
2000-10-21 17:43:36 +02:00
{
UnlockAndReleaseBuffer(buffer);
return;
}
elog(STOP, "btree_insert_undo: unimplemented");
2000-10-21 17:43:36 +02:00
}
return;
}
static void
btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
{
2001-03-22 05:01:46 +01:00
xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
Relation reln;
BlockNumber blkno;
Buffer buffer;
Page page;
BTPageOpaque pageop;
char *op = (redo) ? "redo" : "undo";
bool isleaf = (record->xl_info & XLOG_BTREE_LEAF);
2000-10-21 17:43:36 +02:00
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
2000-10-13 04:03:02 +02:00
/* Left (original) sibling */
blkno = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
2001-03-22 05:01:46 +01:00
BlockIdGetBlockNumber(&(xlrec->otherblk));
2000-10-13 04:03:02 +02:00
buffer = XLogReadBuffer(false, reln, blkno);
if (!BufferIsValid(buffer))
elog(STOP, "btree_split_%s: lost left sibling", op);
page = (Page) BufferGetPage(buffer);
if (redo)
_bt_pageinit(page, BufferGetPageSize(buffer));
else if (PageIsNew((PageHeader) page))
elog(STOP, "btree_split_undo: uninitialized left sibling");
2000-10-13 04:03:02 +02:00
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (redo)
{
pageop->btpo_parent = BlockIdGetBlockNumber(&(xlrec->parentblk));
pageop->btpo_prev = BlockIdGetBlockNumber(&(xlrec->leftblk));
if (onleft)
pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->otherblk));
2000-10-13 04:03:02 +02:00
else
pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid));
pageop->btpo_flags = (isleaf) ? BTP_LEAF : 0;
2000-10-13 04:03:02 +02:00
2001-03-22 05:01:46 +01:00
_bt_restore_page(page, (char *) xlrec + SizeOfBtreeSplit, xlrec->leftlen);
2000-10-13 04:03:02 +02:00
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
2000-10-13 04:03:02 +02:00
}
2001-03-22 05:01:46 +01:00
else
/* undo */
2000-10-13 04:03:02 +02:00
{
if (XLByteLT(PageGetLSN(page), lsn))
elog(STOP, "btree_split_undo: bad left sibling LSN");
elog(STOP, "btree_split_undo: unimplemented");
2000-10-13 04:03:02 +02:00
}
/* Right (new) sibling */
2001-03-22 05:01:46 +01:00
blkno = (onleft) ? BlockIdGetBlockNumber(&(xlrec->otherblk)) :
ItemPointerGetBlockNumber(&(xlrec->target.tid));
2000-10-13 04:03:02 +02:00
buffer = XLogReadBuffer((redo) ? true : false, reln, blkno);
if (!BufferIsValid(buffer))
elog(STOP, "btree_split_%s: lost right sibling", op);
page = (Page) BufferGetPage(buffer);
if (redo)
_bt_pageinit(page, BufferGetPageSize(buffer));
else if (PageIsNew((PageHeader) page))
elog(STOP, "btree_split_undo: uninitialized right sibling");
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
2000-10-13 04:03:02 +02:00
if (redo)
{
pageop->btpo_parent = BlockIdGetBlockNumber(&(xlrec->parentblk));
2001-03-22 05:01:46 +01:00
pageop->btpo_prev = (onleft) ?
ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
BlockIdGetBlockNumber(&(xlrec->otherblk));
pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->rightblk));
pageop->btpo_flags = (isleaf) ? BTP_LEAF : 0;
2000-10-13 04:03:02 +02:00
_bt_restore_page(page,
2001-03-22 05:01:46 +01:00
(char *) xlrec + SizeOfBtreeSplit + xlrec->leftlen,
record->xl_len - SizeOfBtreeSplit - xlrec->leftlen);
2000-10-13 04:03:02 +02:00
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
2000-10-13 04:03:02 +02:00
}
2001-03-22 05:01:46 +01:00
else
/* undo */
2000-10-13 04:03:02 +02:00
{
if (XLByteLT(PageGetLSN(page), lsn))
elog(STOP, "btree_split_undo: bad right sibling LSN");
elog(STOP, "btree_split_undo: unimplemented");
2000-10-13 04:03:02 +02:00
}
if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
return;
2000-10-13 04:03:02 +02:00
/* Right (next) page */
2000-10-21 17:43:36 +02:00
blkno = BlockIdGetBlockNumber(&(xlrec->rightblk));
2000-10-29 19:33:41 +01:00
if (blkno == P_NONE)
return;
2000-10-13 04:03:02 +02:00
buffer = XLogReadBuffer(false, reln, blkno);
if (!BufferIsValid(buffer))
elog(STOP, "btree_split_redo: lost next right page");
2000-10-13 04:03:02 +02:00
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(STOP, "btree_split_redo: uninitialized next right page");
2000-10-13 04:03:02 +02:00
if (XLByteLE(lsn, PageGetLSN(page)))
2000-10-13 04:03:02 +02:00
{
UnlockAndReleaseBuffer(buffer);
return;
2000-10-13 04:03:02 +02:00
}
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
2001-03-22 05:01:46 +01:00
pageop->btpo_prev = (onleft) ?
BlockIdGetBlockNumber(&(xlrec->otherblk)) :
ItemPointerGetBlockNumber(&(xlrec->target.tid));
2000-10-13 04:03:02 +02:00
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
2000-10-13 04:03:02 +02:00
}
2000-10-21 17:43:36 +02:00
static void
btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
2000-10-13 04:03:02 +02:00
{
2001-03-22 05:01:46 +01:00
xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
Relation reln;
Buffer buffer;
Page page;
BTPageOpaque pageop;
Buffer metabuf;
Page metapg;
BTMetaPageData md;
2000-10-13 04:03:02 +02:00
if (!redo)
return;
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(true, reln, BlockIdGetBlockNumber(&(xlrec->rootblk)));
if (!BufferIsValid(buffer))
elog(STOP, "btree_newroot_redo: no root page");
metabuf = XLogReadBuffer(false, reln, BTREE_METAPAGE);
if (!BufferIsValid(buffer))
elog(STOP, "btree_newroot_redo: no metapage");
page = (Page) BufferGetPage(buffer);
_bt_pageinit(page, BufferGetPageSize(buffer));
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
2000-10-13 04:03:02 +02:00
pageop->btpo_flags |= BTP_ROOT;
pageop->btpo_prev = pageop->btpo_next = P_NONE;
pageop->btpo_parent = BTREE_METAPAGE;
2000-10-13 04:03:02 +02:00
if (record->xl_info & XLOG_BTREE_LEAF)
pageop->btpo_flags |= BTP_LEAF;
2000-10-13 04:03:02 +02:00
if (record->xl_len > SizeOfBtreeNewroot)
_bt_restore_page(page,
2001-03-22 05:01:46 +01:00
(char *) xlrec + SizeOfBtreeNewroot,
record->xl_len - SizeOfBtreeNewroot);
2000-10-13 04:03:02 +02:00
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
2000-10-13 04:03:02 +02:00
metapg = BufferGetPage(metabuf);
_bt_pageinit(metapg, BufferGetPageSize(metabuf));
md.btm_magic = BTREE_MAGIC;
md.btm_version = BTREE_VERSION;
md.btm_root = BlockIdGetBlockNumber(&(xlrec->rootblk));
md.btm_level = xlrec->level;
memcpy((char *) BTPageGetMeta(metapg), (char *) &md, sizeof(md));
pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
pageop->btpo_flags = BTP_META;
PageSetLSN(metapg, lsn);
PageSetSUI(metapg, ThisStartUpID);
UnlockAndWriteBuffer(metabuf);
2000-10-13 04:03:02 +02:00
}
2000-10-21 17:43:36 +02:00
void
btree_redo(XLogRecPtr lsn, XLogRecord *record)
2000-10-13 04:03:02 +02:00
{
2001-03-22 05:01:46 +01:00
uint8 info = record->xl_info & ~XLR_INFO_MASK;
info &= ~XLOG_BTREE_LEAF;
2000-10-21 17:43:36 +02:00
if (info == XLOG_BTREE_DELETE)
btree_xlog_delete(true, lsn, record);
else if (info == XLOG_BTREE_INSERT)
btree_xlog_insert(true, lsn, record);
else if (info == XLOG_BTREE_SPLIT)
2001-03-22 05:01:46 +01:00
btree_xlog_split(true, false, lsn, record); /* new item on the right */
2000-10-21 17:43:36 +02:00
else if (info == XLOG_BTREE_SPLEFT)
2001-03-22 05:01:46 +01:00
btree_xlog_split(true, true, lsn, record); /* new item on the left */
2000-10-21 17:43:36 +02:00
else if (info == XLOG_BTREE_NEWROOT)
btree_xlog_newroot(true, lsn, record);
else
elog(STOP, "btree_redo: unknown op code %u", info);
2000-10-13 04:03:02 +02:00
}
2000-10-21 17:43:36 +02:00
void
btree_undo(XLogRecPtr lsn, XLogRecord *record)
2000-10-13 04:03:02 +02:00
{
2001-03-22 05:01:46 +01:00
uint8 info = record->xl_info & ~XLR_INFO_MASK;
2000-10-13 04:03:02 +02:00
info &= ~XLOG_BTREE_LEAF;
2000-10-21 17:43:36 +02:00
if (info == XLOG_BTREE_DELETE)
btree_xlog_delete(false, lsn, record);
else if (info == XLOG_BTREE_INSERT)
btree_xlog_insert(false, lsn, record);
else if (info == XLOG_BTREE_SPLIT)
2001-03-22 05:01:46 +01:00
btree_xlog_split(false, false, lsn, record); /* new item on the right */
2000-10-21 17:43:36 +02:00
else if (info == XLOG_BTREE_SPLEFT)
2001-03-22 05:01:46 +01:00
btree_xlog_split(false, true, lsn, record); /* new item on the left */
2000-10-21 17:43:36 +02:00
else if (info == XLOG_BTREE_NEWROOT)
btree_xlog_newroot(false, lsn, record);
else
elog(STOP, "btree_undo: unknown op code %u", info);
2000-10-13 04:03:02 +02:00
}
2000-10-21 17:43:36 +02:00
static void
out_target(char *buf, xl_btreetid *target)
2000-10-13 14:05:22 +02:00
{
2000-10-21 17:43:36 +02:00
sprintf(buf + strlen(buf), "node %u/%u; tid %u/%u",
2001-03-22 05:01:46 +01:00
target->node.tblNode, target->node.relNode,
ItemPointerGetBlockNumber(&(target->tid)),
ItemPointerGetOffsetNumber(&(target->tid)));
2000-10-13 14:05:22 +02:00
}
2001-03-22 05:01:46 +01:00
2000-10-21 17:43:36 +02:00
void
2001-03-22 05:01:46 +01:00
btree_desc(char *buf, uint8 xl_info, char *rec)
2000-10-13 14:05:22 +02:00
{
2001-03-22 05:01:46 +01:00
uint8 info = xl_info & ~XLR_INFO_MASK;
2000-10-13 14:05:22 +02:00
info &= ~XLOG_BTREE_LEAF;
2000-10-21 17:43:36 +02:00
if (info == XLOG_BTREE_INSERT)
2000-10-13 14:05:22 +02:00
{
2001-03-22 05:01:46 +01:00
xl_btree_insert *xlrec = (xl_btree_insert *) rec;
2000-10-21 17:43:36 +02:00
strcat(buf, "insert: ");
out_target(buf, &(xlrec->target));
2000-10-13 14:05:22 +02:00
}
2000-10-21 17:43:36 +02:00
else if (info == XLOG_BTREE_DELETE)
2000-10-13 14:05:22 +02:00
{
2001-03-22 05:01:46 +01:00
xl_btree_delete *xlrec = (xl_btree_delete *) rec;
2000-10-21 17:43:36 +02:00
strcat(buf, "delete: ");
out_target(buf, &(xlrec->target));
2000-10-13 14:05:22 +02:00
}
2000-10-21 17:43:36 +02:00
else if (info == XLOG_BTREE_SPLIT || info == XLOG_BTREE_SPLEFT)
2000-10-13 14:05:22 +02:00
{
2001-03-22 05:01:46 +01:00
xl_btree_split *xlrec = (xl_btree_split *) rec;
sprintf(buf + strlen(buf), "split(%s): ",
(info == XLOG_BTREE_SPLIT) ? "right" : "left");
2000-10-21 17:43:36 +02:00
out_target(buf, &(xlrec->target));
sprintf(buf + strlen(buf), "; oth %u; rgh %u",
2001-03-22 05:01:46 +01:00
BlockIdGetBlockNumber(&xlrec->otherblk),
BlockIdGetBlockNumber(&xlrec->rightblk));
2000-10-13 14:05:22 +02:00
}
2000-10-21 17:43:36 +02:00
else if (info == XLOG_BTREE_NEWROOT)
2000-10-13 14:05:22 +02:00
{
2001-03-22 05:01:46 +01:00
xl_btree_newroot *xlrec = (xl_btree_newroot *) rec;
2000-10-21 17:43:36 +02:00
sprintf(buf + strlen(buf), "root: node %u/%u; blk %u",
2001-03-22 05:01:46 +01:00
xlrec->node.tblNode, xlrec->node.relNode,
BlockIdGetBlockNumber(&xlrec->rootblk));
2000-10-13 14:05:22 +02:00
}
2000-10-21 17:43:36 +02:00
else
strcat(buf, "UNKNOWN");
2000-10-13 14:05:22 +02:00
}