postgresql/src/backend/access/nbtree/nbtree.c

1213 lines
33 KiB
C
Raw Normal View History

/*-------------------------------------------------------------------------
*
* nbtree.c
* Implementation of Lehman and Yao's btree management algorithm for
* Postgres.
*
* NOTES
* This file contains only the public interface routines.
*
*
2002-06-20 22:29:54 +02:00
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.93 2002/10/20 20:47:31 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
1996-11-05 11:35:38 +01:00
#include "access/genam.h"
#include "access/heapam.h"
1999-07-16 07:00:38 +02:00
#include "access/nbtree.h"
#include "catalog/index.h"
1999-07-16 07:00:38 +02:00
#include "executor/executor.h"
#include "miscadmin.h"
#include "storage/sinval.h"
#include "access/xlogutils.h"
2001-03-22 05:01:46 +01:00
/* Working state for btbuild and its callback */
typedef struct
{
bool usefast;
bool isUnique;
bool haveDead;
Relation heapRel;
BTSpool *spool;
/*
* spool2 is needed only when the index is an unique index. Dead
* tuples are put into spool2 instead of spool in order to avoid
* uniqueness check.
*/
BTSpool *spool2;
double indtuples;
} BTBuildState;
bool BuildingBtree = false; /* see comment in btbuild() */
bool FastBuild = true; /* use SORT instead of insertion build */
/*
* TEMPORARY FLAG FOR TESTING NEW FIX TREE
* CODE WITHOUT AFFECTING ANYONE ELSE
*/
bool FixBTree = true;
2000-10-21 17:43:36 +02:00
static void _bt_restscan(IndexScanDesc scan);
static void btbuildCallback(Relation index,
HeapTuple htup,
Datum *attdata,
char *nulls,
bool tupleIsAlive,
void *state);
/*
* AtEOXact_nbtree() --- clean up nbtree subsystem at xact abort or commit.
*/
void
AtEOXact_nbtree(void)
{
/*
* Note: these actions should only be necessary during xact abort; but
* they can't hurt during a commit.
*/
/* If we were building a btree, we ain't anymore. */
BuildingBtree = false;
}
1998-07-30 07:05:05 +02:00
/*
* btbuild() -- build a new btree index.
*
* We use a global variable to record the fact that we're creating
* a new index. This is used to avoid high-concurrency locking,
* since the index won't be visible until this transaction commits
* and since building is guaranteed to be single-threaded.
*/
Datum
btbuild(PG_FUNCTION_ARGS)
{
2001-03-22 05:01:46 +01:00
Relation heap = (Relation) PG_GETARG_POINTER(0);
Relation index = (Relation) PG_GETARG_POINTER(1);
IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
double reltuples;
BTBuildState buildstate;
2001-03-22 05:01:46 +01:00
/* set flag to disable locking */
BuildingBtree = true;
What looks like some *major* improvements to btree indexing... Patches from: aoki@CS.Berkeley.EDU (Paul M. Aoki) i gave jolly my btree bulkload code a long, long time ago but never gave him a bunch of my bugfixes. here's a diff against the 6.0 baseline. for some reason, this code has slowed down somewhat relative to the insertion-build code on very small tables. don't know why -- it used to be within about 10%. anyway, here are some (highly unscientific!) timings on a dec 3000/300 for synthetic tables with 10k, 100k and 1000k tuples (basically, 1mb, 10mb and 100mb heaps). 'c' means clustered (pre-sorted) inputs and 'u' means unclustered (randomly ordered) inputs. the 10k table basically fits in the buffer pool, but the 100k and 1000k tables don't. as you can see, insertion build is fine if you've sorted your heaps on your index key or if your heap fits in core, but is absolutely horrible on unordered data (yes, that's 7.5 hours to index 100mb of data...) because of the zillions of random i/os. if it doesn't work for you for whatever reason, you can always turn it back off by flipping the FastBuild flag in nbtree.c. i don't have time to maintain it. good luck! baseline code: time psql -c 'create index c10 on k10 using btree (c int4_ops)' bttest real 8.6 time psql -c 'create index u10 on k10 using btree (b int4_ops)' bttest real 9.1 time psql -c 'create index c100 on k100 using btree (c int4_ops)' bttest real 59.2 time psql -c 'create index u100 on k100 using btree (b int4_ops)' bttest real 652.4 time psql -c 'create index c1000 on k1000 using btree (c int4_ops)' bttest real 636.1 time psql -c 'create index u1000 on k1000 using btree (b int4_ops)' bttest real 26772.9 bulkloading code: time psql -c 'create index c10 on k10 using btree (c int4_ops)' bttest real 11.3 time psql -c 'create index u10 on k10 using btree (b int4_ops)' bttest real 10.4 time psql -c 'create index c100 on k100 using btree (c int4_ops)' bttest real 59.5 time psql -c 'create index u100 on k100 using btree (b int4_ops)' bttest real 63.5 time psql -c 'create index c1000 on k1000 using btree (c int4_ops)' bttest real 636.9 time psql -c 'create index u1000 on k1000 using btree (b int4_ops)' bttest real 701.0
1997-02-12 06:04:52 +01:00
/*
* bootstrap processing does something strange, so don't use
* sort/build for initial catalog indices. at some point i need to
* look harder at this. (there is some kind of incremental processing
* going on there.) -- pma 08/29/95
What looks like some *major* improvements to btree indexing... Patches from: aoki@CS.Berkeley.EDU (Paul M. Aoki) i gave jolly my btree bulkload code a long, long time ago but never gave him a bunch of my bugfixes. here's a diff against the 6.0 baseline. for some reason, this code has slowed down somewhat relative to the insertion-build code on very small tables. don't know why -- it used to be within about 10%. anyway, here are some (highly unscientific!) timings on a dec 3000/300 for synthetic tables with 10k, 100k and 1000k tuples (basically, 1mb, 10mb and 100mb heaps). 'c' means clustered (pre-sorted) inputs and 'u' means unclustered (randomly ordered) inputs. the 10k table basically fits in the buffer pool, but the 100k and 1000k tables don't. as you can see, insertion build is fine if you've sorted your heaps on your index key or if your heap fits in core, but is absolutely horrible on unordered data (yes, that's 7.5 hours to index 100mb of data...) because of the zillions of random i/os. if it doesn't work for you for whatever reason, you can always turn it back off by flipping the FastBuild flag in nbtree.c. i don't have time to maintain it. good luck! baseline code: time psql -c 'create index c10 on k10 using btree (c int4_ops)' bttest real 8.6 time psql -c 'create index u10 on k10 using btree (b int4_ops)' bttest real 9.1 time psql -c 'create index c100 on k100 using btree (c int4_ops)' bttest real 59.2 time psql -c 'create index u100 on k100 using btree (b int4_ops)' bttest real 652.4 time psql -c 'create index c1000 on k1000 using btree (c int4_ops)' bttest real 636.1 time psql -c 'create index u1000 on k1000 using btree (b int4_ops)' bttest real 26772.9 bulkloading code: time psql -c 'create index c10 on k10 using btree (c int4_ops)' bttest real 11.3 time psql -c 'create index u10 on k10 using btree (b int4_ops)' bttest real 10.4 time psql -c 'create index c100 on k100 using btree (c int4_ops)' bttest real 59.5 time psql -c 'create index u100 on k100 using btree (b int4_ops)' bttest real 63.5 time psql -c 'create index c1000 on k1000 using btree (c int4_ops)' bttest real 636.9 time psql -c 'create index u1000 on k1000 using btree (b int4_ops)' bttest real 701.0
1997-02-12 06:04:52 +01:00
*/
buildstate.usefast = (FastBuild && IsNormalProcessingMode());
buildstate.isUnique = indexInfo->ii_Unique;
buildstate.haveDead = false;
buildstate.heapRel = heap;
buildstate.spool = NULL;
buildstate.spool2 = NULL;
buildstate.indtuples = 0;
#ifdef BTREE_BUILD_STATS
if (Show_btree_build_stats)
ResetUsage();
#endif /* BTREE_BUILD_STATS */
/*
* We expect to be called exactly once for any index relation. If
* that's not the case, big trouble's what we have.
*/
if (RelationGetNumberOfBlocks(index) != 0)
elog(ERROR, "%s already contains data",
RelationGetRelationName(index));
/* initialize the btree index metadata page */
_bt_metapinit(index);
if (buildstate.usefast)
{
buildstate.spool = _bt_spoolinit(index, indexInfo->ii_Unique);
/*
* Different from spool, the uniqueness isn't checked for spool2.
2001-03-22 05:01:46 +01:00
*/
if (indexInfo->ii_Unique)
buildstate.spool2 = _bt_spoolinit(index, false);
}
/* do the heap scan */
reltuples = IndexBuildHeapScan(heap, index, indexInfo,
btbuildCallback, (void *) &buildstate);
/* okay, all heap tuples are indexed */
if (buildstate.spool2 && !buildstate.haveDead)
{
/* spool2 turns out to be unnecessary */
_bt_spooldestroy(buildstate.spool2);
buildstate.spool2 = NULL;
}
/*
* if we are doing bottom-up btree build, finish the build by (1)
* completing the sort of the spool file, (2) inserting the sorted
* tuples into btree pages and (3) building the upper levels.
*/
if (buildstate.usefast)
{
_bt_leafbuild(buildstate.spool, buildstate.spool2);
_bt_spooldestroy(buildstate.spool);
if (buildstate.spool2)
_bt_spooldestroy(buildstate.spool2);
}
#ifdef BTREE_BUILD_STATS
if (Show_btree_build_stats)
{
ShowUsage("BTREE BUILD STATS");
ResetUsage();
}
#endif /* BTREE_BUILD_STATS */
/* all done */
BuildingBtree = false;
/*
* Since we just counted the tuples in the heap, we update its stats
* in pg_class to guarantee that the planner takes advantage of the
* index we just created. But, only update statistics during normal
* index definitions, not for indices on system catalogs created
* during bootstrap processing. We must close the relations before
* updating statistics to guarantee that the relcache entries are
* flushed when we increment the command counter in UpdateStats(). But
* we do not release any locks on the relations; those will be held
* until end of transaction.
*/
if (IsNormalProcessingMode())
{
Oid hrelid = RelationGetRelid(heap);
Oid irelid = RelationGetRelid(index);
heap_close(heap, NoLock);
index_close(index);
UpdateStats(hrelid, reltuples);
UpdateStats(irelid, buildstate.indtuples);
}
PG_RETURN_VOID();
}
/*
* Per-tuple callback from IndexBuildHeapScan
*/
static void
btbuildCallback(Relation index,
HeapTuple htup,
Datum *attdata,
char *nulls,
bool tupleIsAlive,
void *state)
{
BTBuildState *buildstate = (BTBuildState *) state;
IndexTuple itup;
BTItem btitem;
InsertIndexResult res;
/* form an index tuple and point it at the heap tuple */
itup = index_formtuple(RelationGetDescr(index), attdata, nulls);
itup->t_tid = htup->t_self;
btitem = _bt_formitem(itup);
/*
* if we are doing bottom-up btree build, we insert the index into a
* spool file for subsequent processing. otherwise, we insert into
* the btree.
*/
if (buildstate->usefast)
{
if (tupleIsAlive || buildstate->spool2 == NULL)
_bt_spool(btitem, buildstate->spool);
else
{
/* dead tuples are put into spool2 */
buildstate->haveDead = true;
_bt_spool(btitem, buildstate->spool2);
}
}
else
{
res = _bt_doinsert(index, btitem,
buildstate->isUnique, buildstate->heapRel);
if (res)
pfree(res);
}
buildstate->indtuples += 1;
pfree(btitem);
pfree(itup);
}
/*
* btinsert() -- insert an index tuple into a btree.
*
* Descend the tree recursively, find the appropriate location for our
* new tuple, put it there, set its unique OID as appropriate, and
* return an InsertIndexResult to the caller.
*/
Datum
btinsert(PG_FUNCTION_ARGS)
{
2001-03-22 05:01:46 +01:00
Relation rel = (Relation) PG_GETARG_POINTER(0);
Datum *datum = (Datum *) PG_GETARG_POINTER(1);
char *nulls = (char *) PG_GETARG_POINTER(2);
ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
Relation heapRel = (Relation) PG_GETARG_POINTER(4);
bool checkUnique = PG_GETARG_BOOL(5);
InsertIndexResult res;
BTItem btitem;
IndexTuple itup;
/* generate an index tuple */
1998-09-01 05:29:17 +02:00
itup = index_formtuple(RelationGetDescr(rel), datum, nulls);
itup->t_tid = *ht_ctid;
btitem = _bt_formitem(itup);
res = _bt_doinsert(rel, btitem, checkUnique, heapRel);
pfree(btitem);
pfree(itup);
PG_RETURN_POINTER(res);
}
/*
* btgettuple() -- Get the next tuple in the scan.
*/
Datum
btgettuple(PG_FUNCTION_ARGS)
{
2001-03-22 05:01:46 +01:00
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
BTScanOpaque so = (BTScanOpaque) scan->opaque;
Page page;
OffsetNumber offnum;
bool res;
/*
* If we've already initialized this scan, we can just advance it in
* the appropriate direction. If we haven't done so yet, we call a
* routine to get the first item in the scan.
*/
if (ItemPointerIsValid(&(scan->currentItemData)))
1998-07-30 07:05:05 +02:00
{
/*
1999-05-25 18:15:34 +02:00
* Restore scan position using heap TID returned by previous call
2001-03-22 05:01:46 +01:00
* to btgettuple(). _bt_restscan() re-grabs the read lock on the
* buffer, too.
1998-07-30 07:05:05 +02:00
*/
_bt_restscan(scan);
2002-09-04 22:31:48 +02:00
/*
* Check to see if we should kill the previously-fetched tuple.
*/
if (scan->kill_prior_tuple)
{
/*
2002-09-04 22:31:48 +02:00
* Yes, so mark it by setting the LP_DELETE bit in the item
* flags.
*/
offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
page = BufferGetPage(so->btso_curbuf);
PageGetItemId(page, offnum)->lp_flags |= LP_DELETE;
2002-09-04 22:31:48 +02:00
/*
* Since this can be redone later if needed, it's treated the
2002-09-04 22:31:48 +02:00
* same as a commit-hint-bit status update for heap tuples: we
* mark the buffer dirty but don't make a WAL log entry.
*/
SetBufferCommitInfoNeedsSave(so->btso_curbuf);
}
2002-09-04 22:31:48 +02:00
/*
* Now continue the scan.
*/
res = _bt_next(scan, dir);
1998-07-30 07:05:05 +02:00
}
else
res = _bt_first(scan, dir);
/*
* Skip killed tuples if asked to.
*/
if (scan->ignore_killed_tuples)
{
while (res)
{
offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
page = BufferGetPage(so->btso_curbuf);
if (!ItemIdDeleted(PageGetItemId(page, offnum)))
break;
res = _bt_next(scan, dir);
}
}
/*
* Save heap TID to use it in _bt_restscan. Then release the read
* lock on the buffer so that we aren't blocking other backends.
*
* NOTE: we do keep the pin on the buffer! This is essential to ensure
* that someone else doesn't delete the index entry we are stopped on.
*/
1998-07-30 07:05:05 +02:00
if (res)
{
((BTScanOpaque) scan->opaque)->curHeapIptr = scan->xs_ctup.t_self;
LockBuffer(((BTScanOpaque) scan->opaque)->btso_curbuf,
BUFFER_LOCK_UNLOCK);
}
PG_RETURN_BOOL(res);
}
/*
* btbeginscan() -- start a scan on a btree index
*/
Datum
btbeginscan(PG_FUNCTION_ARGS)
{
Relation rel = (Relation) PG_GETARG_POINTER(0);
int keysz = PG_GETARG_INT32(1);
ScanKey scankey = (ScanKey) PG_GETARG_POINTER(2);
IndexScanDesc scan;
/* get the scan */
scan = RelationGetIndexScan(rel, keysz, scankey);
PG_RETURN_POINTER(scan);
}
/*
* btrescan() -- rescan an index relation
*/
Datum
btrescan(PG_FUNCTION_ARGS)
{
2001-03-22 05:01:46 +01:00
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanKey scankey = (ScanKey) PG_GETARG_POINTER(1);
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
if (so == NULL) /* if called from btbeginscan */
{
so = (BTScanOpaque) palloc(sizeof(BTScanOpaqueData));
so->btso_curbuf = so->btso_mrkbuf = InvalidBuffer;
ItemPointerSetInvalid(&(so->curHeapIptr));
ItemPointerSetInvalid(&(so->mrkHeapIptr));
if (scan->numberOfKeys > 0)
so->keyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
else
so->keyData = (ScanKey) NULL;
scan->opaque = so;
}
/* we aren't holding any read locks, but gotta drop the pins */
if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
{
ReleaseBuffer(so->btso_curbuf);
so->btso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(&(so->curHeapIptr));
ItemPointerSetInvalid(iptr);
}
if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
{
ReleaseBuffer(so->btso_mrkbuf);
so->btso_mrkbuf = InvalidBuffer;
ItemPointerSetInvalid(&(so->mrkHeapIptr));
ItemPointerSetInvalid(iptr);
}
/*
* Reset the scan keys. Note that keys ordering stuff moved to
* _bt_first. - vadim 05/05/97
*/
so->numberOfKeys = scan->numberOfKeys;
if (scan->numberOfKeys > 0)
{
memmove(scan->keyData,
scankey,
scan->numberOfKeys * sizeof(ScanKeyData));
memmove(so->keyData,
scankey,
so->numberOfKeys * sizeof(ScanKeyData));
}
PG_RETURN_VOID();
}
void
btmovescan(IndexScanDesc scan, Datum v)
{
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
/* we aren't holding any read locks, but gotta drop the pin */
if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
{
ReleaseBuffer(so->btso_curbuf);
so->btso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
so->keyData[0].sk_argument = v;
}
/*
* btendscan() -- close down a scan
*/
Datum
btendscan(PG_FUNCTION_ARGS)
{
2001-03-22 05:01:46 +01:00
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
/* we aren't holding any read locks, but gotta drop the pins */
if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
{
if (BufferIsValid(so->btso_curbuf))
ReleaseBuffer(so->btso_curbuf);
so->btso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
{
if (BufferIsValid(so->btso_mrkbuf))
ReleaseBuffer(so->btso_mrkbuf);
so->btso_mrkbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
if (so->keyData != (ScanKey) NULL)
pfree(so->keyData);
pfree(so);
PG_RETURN_VOID();
}
/*
* btmarkpos() -- save current scan position
*/
Datum
btmarkpos(PG_FUNCTION_ARGS)
{
2001-03-22 05:01:46 +01:00
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
/* we aren't holding any read locks, but gotta drop the pin */
if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
{
ReleaseBuffer(so->btso_mrkbuf);
so->btso_mrkbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
/* bump pin on current buffer for assignment to mark buffer */
if (ItemPointerIsValid(&(scan->currentItemData)))
{
so->btso_mrkbuf = ReadBuffer(scan->indexRelation,
BufferGetBlockNumber(so->btso_curbuf));
scan->currentMarkData = scan->currentItemData;
1998-07-30 07:05:05 +02:00
so->mrkHeapIptr = so->curHeapIptr;
}
PG_RETURN_VOID();
}
/*
* btrestrpos() -- restore scan to last saved position
*/
Datum
btrestrpos(PG_FUNCTION_ARGS)
{
2001-03-22 05:01:46 +01:00
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
/* we aren't holding any read locks, but gotta drop the pin */
if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
{
ReleaseBuffer(so->btso_curbuf);
so->btso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
/* bump pin on marked buffer */
if (ItemPointerIsValid(&(scan->currentMarkData)))
{
so->btso_curbuf = ReadBuffer(scan->indexRelation,
BufferGetBlockNumber(so->btso_mrkbuf));
scan->currentItemData = scan->currentMarkData;
1998-07-30 07:05:05 +02:00
so->curHeapIptr = so->mrkHeapIptr;
}
PG_RETURN_VOID();
}
/*
* Bulk deletion of all index entries pointing to a set of heap tuples.
* The set of target tuples is specified via a callback routine that tells
* whether any given heap tuple (identified by ItemPointer) is being deleted.
*
* Result: a palloc'd struct containing statistical info for VACUUM displays.
*/
Datum
btbulkdelete(PG_FUNCTION_ARGS)
{
2001-03-22 05:01:46 +01:00
Relation rel = (Relation) PG_GETARG_POINTER(0);
IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1);
void *callback_state = (void *) PG_GETARG_POINTER(2);
IndexBulkDeleteResult *result;
BlockNumber num_pages;
double tuples_removed;
double num_index_tuples;
IndexScanDesc scan;
BTScanOpaque so;
ItemPointer current;
tuples_removed = 0;
num_index_tuples = 0;
/*
* We use a standard IndexScanDesc scan object, but to speed up the
* loop, we skip most of the wrapper layers of index_getnext and
* instead call _bt_step directly. This implies holding buffer lock
* on a target page throughout the loop over the page's tuples.
*
* Whenever we step onto a new page, we have to trade in the read
* lock acquired by _bt_first or _bt_step for an exclusive write lock
* (fortunately, _bt_relbuf doesn't care which kind of lock it's
* releasing when it comes time for _bt_step to release our lock).
*
* Note that we exclusive-lock every leaf page, or at least every one
* containing data items. It sounds attractive to only exclusive-lock
* those containing items we need to delete, but unfortunately that
* is not safe: we could then pass a stopped indexscan, which could
* in rare cases lead to deleting the item it needs to find when it
* resumes. (See _bt_restscan --- this could only happen if an indexscan
* stops on a deletable item and then a page split moves that item
* into a page further to its right, which the indexscan will have no
* pin on.)
*/
scan = index_beginscan(NULL, rel, SnapshotAny, 0, (ScanKey) NULL);
so = (BTScanOpaque) scan->opaque;
current = &(scan->currentItemData);
/* Use _bt_first to get started, then _bt_step to remaining tuples */
if (_bt_first(scan, ForwardScanDirection))
{
Buffer buf;
BlockNumber lockedBlock = InvalidBlockNumber;
/* we have the buffer pinned and read-locked */
buf = so->btso_curbuf;
Assert(BufferIsValid(buf));
do
{
Page page;
BlockNumber blkno;
OffsetNumber offnum;
BTItem btitem;
BTPageOpaque opaque;
IndexTuple itup;
ItemPointer htup;
CHECK_FOR_INTERRUPTS();
/* current is the next index tuple */
page = BufferGetPage(buf);
blkno = ItemPointerGetBlockNumber(current);
/*
* Make sure we have a super-exclusive write lock on this page.
*
* We assume that only concurrent insertions, not deletions,
* can occur while we're not holding the page lock (the
* caller should hold a suitable relation lock to ensure
* this). Therefore, no items can escape being scanned because
* of this temporary lock release.
*/
if (blkno != lockedBlock)
{
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
LockBufferForCleanup(buf);
lockedBlock = blkno;
/*
* If the page was formerly rightmost but was split while we
* didn't hold the lock, and ip_posid is pointing to item
* 1, then ip_posid now points at the high key not a valid
* data item. In this case we need to step forward.
*/
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (current->ip_posid < P_FIRSTDATAKEY(opaque))
current->ip_posid = P_FIRSTDATAKEY(opaque);
}
offnum = ItemPointerGetOffsetNumber(current);
btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
itup = &btitem->bti_itup;
htup = &(itup->t_tid);
if (callback(htup, callback_state))
{
/* Okay to delete the item from the page */
_bt_itemdel(rel, buf, current);
/* Mark buffer dirty, but keep the lock and pin */
WriteNoReleaseBuffer(buf);
tuples_removed += 1;
/*
* We now need to back up the scan one item, so that the next
* cycle will re-examine the same offnum on this page (which
* now holds the next item).
*
* For now, just hack the current-item index. Will need to
* be smarter when deletion includes removal of empty
* index pages.
*/
current->ip_posid--;
}
else
num_index_tuples += 1;
} while (_bt_step(scan, &buf, ForwardScanDirection));
}
index_endscan(scan);
/* return statistics */
num_pages = RelationGetNumberOfBlocks(rel);
result = (IndexBulkDeleteResult *) palloc(sizeof(IndexBulkDeleteResult));
result->num_pages = num_pages;
result->tuples_removed = tuples_removed;
result->num_index_tuples = num_index_tuples;
PG_RETURN_POINTER(result);
}
1998-07-30 07:05:05 +02:00
/*
* Restore scan position when btgettuple is called to continue a scan.
*
* This is nontrivial because concurrent insertions might have moved the
* index tuple we stopped on. We assume the tuple can only have moved to
* the right from our stop point, because we kept a pin on the buffer,
* and so no deletion can have occurred on that page.
*
* On entry, we have a pin but no read lock on the buffer that contained
* the index tuple we stopped the scan on. On exit, we have pin and read
* lock on the buffer that now contains that index tuple, and the scandesc's
* current position is updated to point at it.
*/
1998-07-30 07:05:05 +02:00
static void
_bt_restscan(IndexScanDesc scan)
{
Relation rel = scan->indexRelation;
BTScanOpaque so = (BTScanOpaque) scan->opaque;
Buffer buf = so->btso_curbuf;
Page page;
ItemPointer current = &(scan->currentItemData);
OffsetNumber offnum = ItemPointerGetOffsetNumber(current),
maxoff;
BTPageOpaque opaque;
Buffer nextbuf;
ItemPointerData target = so->curHeapIptr;
BTItem item;
BlockNumber blkno;
1998-07-30 07:05:05 +02:00
/*
* Reacquire read lock on the buffer. (We should still have
* a reference-count pin on it, so need not get that.)
*/
LockBuffer(buf, BT_READ);
page = BufferGetPage(buf);
maxoff = PageGetMaxOffsetNumber(page);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
/*
1999-05-25 18:15:34 +02:00
* We use this as flag when first index tuple on page is deleted but
* we do not move left (this would slowdown vacuum) - so we set
* current->ip_posid before first index tuple on the current page
* (_bt_step will move it right)... XXX still needed?
*/
if (!ItemPointerIsValid(&target))
{
ItemPointerSetOffsetNumber(current,
2001-03-22 05:01:46 +01:00
OffsetNumberPrev(P_FIRSTDATAKEY(opaque)));
return;
}
/*
2001-03-22 05:01:46 +01:00
* The item we were on may have moved right due to insertions. Find it
* again. We use the heap TID to identify the item uniquely.
*/
for (;;)
1998-07-30 07:05:05 +02:00
{
/* Check for item on this page */
for (;
1998-07-30 07:05:05 +02:00
offnum <= maxoff;
offnum = OffsetNumberNext(offnum))
{
item = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
if (item->bti_itup.t_tid.ip_blkid.bi_hi ==
target.ip_blkid.bi_hi &&
item->bti_itup.t_tid.ip_blkid.bi_lo ==
target.ip_blkid.bi_lo &&
1998-07-30 07:05:05 +02:00
item->bti_itup.t_tid.ip_posid == target.ip_posid)
{
/* Found it */
1998-07-30 07:05:05 +02:00
current->ip_posid = offnum;
return;
}
}
/*
* The item we're looking for moved right at least one page, so
* move right. We are careful here to pin and read-lock the next
* page before releasing the current one. This ensures that a
* concurrent btbulkdelete scan cannot pass our position --- if it
* did, it might be able to reach and delete our target item before
* we can find it again.
*/
1998-07-30 07:05:05 +02:00
if (P_RIGHTMOST(opaque))
elog(FATAL, "_bt_restscan: my bits moved right off the end of the world!"
"\n\tRecreate index %s.", RelationGetRelationName(rel));
1998-07-30 07:05:05 +02:00
blkno = opaque->btpo_next;
nextbuf = _bt_getbuf(rel, blkno, BT_READ);
_bt_relbuf(rel, buf);
so->btso_curbuf = buf = nextbuf;
1998-07-30 07:05:05 +02:00
page = BufferGetPage(buf);
maxoff = PageGetMaxOffsetNumber(page);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
offnum = P_FIRSTDATAKEY(opaque);
ItemPointerSet(current, blkno, offnum);
1998-07-30 07:05:05 +02:00
}
}
2000-10-13 04:03:02 +02:00
2000-10-21 17:43:36 +02:00
static void
_bt_restore_page(Page page, char *from, int len)
2000-10-13 04:03:02 +02:00
{
BTItemData btdata;
Size itemsz;
char *end = from + len;
2000-10-21 17:43:36 +02:00
2001-03-22 05:01:46 +01:00
for (; from < end;)
2000-10-13 04:03:02 +02:00
{
memcpy(&btdata, from, sizeof(BTItemData));
2000-10-21 17:43:36 +02:00
itemsz = IndexTupleDSize(btdata.bti_itup) +
2001-03-22 05:01:46 +01:00
(sizeof(BTItemData) - sizeof(IndexTupleData));
2000-10-21 17:43:36 +02:00
itemsz = MAXALIGN(itemsz);
if (PageAddItem(page, (Item) from, itemsz,
2001-03-22 05:01:46 +01:00
FirstOffsetNumber, LP_USED) == InvalidOffsetNumber)
elog(PANIC, "_bt_restore_page: can't add item to page");
from += itemsz;
2000-10-21 17:43:36 +02:00
}
}
static void
btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
2001-03-22 05:01:46 +01:00
xl_btree_delete *xlrec;
Relation reln;
Buffer buffer;
Page page;
2000-10-21 17:43:36 +02:00
if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
2000-10-21 17:43:36 +02:00
return;
2001-03-22 05:01:46 +01:00
xlrec = (xl_btree_delete *) XLogRecGetData(record);
2000-10-21 17:43:36 +02:00
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
2001-03-22 05:01:46 +01:00
buffer = XLogReadBuffer(false, reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
2000-10-21 17:43:36 +02:00
if (!BufferIsValid(buffer))
elog(PANIC, "btree_delete_redo: block unfound");
2000-10-21 17:43:36 +02:00
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(PANIC, "btree_delete_redo: uninitialized page");
2000-10-21 17:43:36 +02:00
if (XLByteLE(lsn, PageGetLSN(page)))
{
UnlockAndReleaseBuffer(buffer);
return;
}
2000-10-21 17:43:36 +02:00
PageIndexTupleDelete(page, ItemPointerGetOffsetNumber(&(xlrec->target.tid)));
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
2000-10-21 17:43:36 +02:00
return;
}
static void
btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
2001-03-22 05:01:46 +01:00
xl_btree_insert *xlrec;
Relation reln;
Buffer buffer;
Page page;
BTPageOpaque pageop;
2000-10-21 17:43:36 +02:00
if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
return;
2001-03-22 05:01:46 +01:00
xlrec = (xl_btree_insert *) XLogRecGetData(record);
2000-10-21 17:43:36 +02:00
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
2001-03-22 05:01:46 +01:00
buffer = XLogReadBuffer(false, reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
2000-10-21 17:43:36 +02:00
if (!BufferIsValid(buffer))
elog(PANIC, "btree_insert_%sdo: block unfound", (redo) ? "re" : "un");
2000-10-21 17:43:36 +02:00
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(PANIC, "btree_insert_%sdo: uninitialized page", (redo) ? "re" : "un");
2000-10-21 17:43:36 +02:00
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (redo)
{
if (XLByteLE(lsn, PageGetLSN(page)))
{
UnlockAndReleaseBuffer(buffer);
return;
}
2001-03-22 05:01:46 +01:00
if (PageAddItem(page, (Item) ((char *) xlrec + SizeOfBtreeInsert),
record->xl_len - SizeOfBtreeInsert,
ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
LP_USED) == InvalidOffsetNumber)
elog(PANIC, "btree_insert_redo: failed to add item");
2000-10-21 17:43:36 +02:00
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
2000-10-21 17:43:36 +02:00
}
else
{
if (XLByteLT(PageGetLSN(page), lsn))
elog(PANIC, "btree_insert_undo: bad page LSN");
2000-10-21 17:43:36 +02:00
2001-03-22 05:01:46 +01:00
if (!P_ISLEAF(pageop))
2000-10-21 17:43:36 +02:00
{
UnlockAndReleaseBuffer(buffer);
return;
}
elog(PANIC, "btree_insert_undo: unimplemented");
2000-10-21 17:43:36 +02:00
}
return;
}
static void
btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
{
2001-03-22 05:01:46 +01:00
xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
Relation reln;
BlockNumber blkno;
Buffer buffer;
Page page;
BTPageOpaque pageop;
char *op = (redo) ? "redo" : "undo";
bool isleaf = (record->xl_info & XLOG_BTREE_LEAF);
2000-10-21 17:43:36 +02:00
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
2000-10-13 04:03:02 +02:00
/* Left (original) sibling */
blkno = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
2001-03-22 05:01:46 +01:00
BlockIdGetBlockNumber(&(xlrec->otherblk));
2000-10-13 04:03:02 +02:00
buffer = XLogReadBuffer(false, reln, blkno);
if (!BufferIsValid(buffer))
elog(PANIC, "btree_split_%s: lost left sibling", op);
2000-10-13 04:03:02 +02:00
page = (Page) BufferGetPage(buffer);
if (redo)
_bt_pageinit(page, BufferGetPageSize(buffer));
else if (PageIsNew((PageHeader) page))
elog(PANIC, "btree_split_undo: uninitialized left sibling");
2000-10-13 04:03:02 +02:00
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (redo)
{
pageop->btpo_parent = BlockIdGetBlockNumber(&(xlrec->parentblk));
pageop->btpo_prev = BlockIdGetBlockNumber(&(xlrec->leftblk));
if (onleft)
pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->otherblk));
2000-10-13 04:03:02 +02:00
else
pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid));
pageop->btpo_flags = (isleaf) ? BTP_LEAF : 0;
2000-10-13 04:03:02 +02:00
2001-03-22 05:01:46 +01:00
_bt_restore_page(page, (char *) xlrec + SizeOfBtreeSplit, xlrec->leftlen);
2000-10-13 04:03:02 +02:00
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
2000-10-13 04:03:02 +02:00
}
2001-03-22 05:01:46 +01:00
else
/* undo */
2000-10-13 04:03:02 +02:00
{
if (XLByteLT(PageGetLSN(page), lsn))
elog(PANIC, "btree_split_undo: bad left sibling LSN");
elog(PANIC, "btree_split_undo: unimplemented");
2000-10-13 04:03:02 +02:00
}
/* Right (new) sibling */
2001-03-22 05:01:46 +01:00
blkno = (onleft) ? BlockIdGetBlockNumber(&(xlrec->otherblk)) :
ItemPointerGetBlockNumber(&(xlrec->target.tid));
2000-10-13 04:03:02 +02:00
buffer = XLogReadBuffer((redo) ? true : false, reln, blkno);
if (!BufferIsValid(buffer))
elog(PANIC, "btree_split_%s: lost right sibling", op);
2000-10-13 04:03:02 +02:00
page = (Page) BufferGetPage(buffer);
if (redo)
_bt_pageinit(page, BufferGetPageSize(buffer));
else if (PageIsNew((PageHeader) page))
elog(PANIC, "btree_split_undo: uninitialized right sibling");
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
2000-10-13 04:03:02 +02:00
if (redo)
{
pageop->btpo_parent = BlockIdGetBlockNumber(&(xlrec->parentblk));
2001-03-22 05:01:46 +01:00
pageop->btpo_prev = (onleft) ?
ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
BlockIdGetBlockNumber(&(xlrec->otherblk));
pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->rightblk));
pageop->btpo_flags = (isleaf) ? BTP_LEAF : 0;
2000-10-13 04:03:02 +02:00
_bt_restore_page(page,
2001-03-22 05:01:46 +01:00
(char *) xlrec + SizeOfBtreeSplit + xlrec->leftlen,
record->xl_len - SizeOfBtreeSplit - xlrec->leftlen);
2000-10-13 04:03:02 +02:00
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
2000-10-13 04:03:02 +02:00
}
2001-03-22 05:01:46 +01:00
else
/* undo */
2000-10-13 04:03:02 +02:00
{
if (XLByteLT(PageGetLSN(page), lsn))
elog(PANIC, "btree_split_undo: bad right sibling LSN");
elog(PANIC, "btree_split_undo: unimplemented");
2000-10-13 04:03:02 +02:00
}
if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
return;
2000-10-13 04:03:02 +02:00
/* Right (next) page */
2000-10-21 17:43:36 +02:00
blkno = BlockIdGetBlockNumber(&(xlrec->rightblk));
2000-10-29 19:33:41 +01:00
if (blkno == P_NONE)
return;
2000-10-13 04:03:02 +02:00
buffer = XLogReadBuffer(false, reln, blkno);
if (!BufferIsValid(buffer))
elog(PANIC, "btree_split_redo: lost next right page");
2000-10-13 04:03:02 +02:00
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(PANIC, "btree_split_redo: uninitialized next right page");
2000-10-13 04:03:02 +02:00
if (XLByteLE(lsn, PageGetLSN(page)))
2000-10-13 04:03:02 +02:00
{
UnlockAndReleaseBuffer(buffer);
return;
2000-10-13 04:03:02 +02:00
}
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
2001-03-22 05:01:46 +01:00
pageop->btpo_prev = (onleft) ?
BlockIdGetBlockNumber(&(xlrec->otherblk)) :
ItemPointerGetBlockNumber(&(xlrec->target.tid));
2000-10-13 04:03:02 +02:00
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
2000-10-13 04:03:02 +02:00
}
2000-10-21 17:43:36 +02:00
static void
btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
2000-10-13 04:03:02 +02:00
{
2001-03-22 05:01:46 +01:00
xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
Relation reln;
Buffer buffer;
Page page;
BTPageOpaque pageop;
Buffer metabuf;
Page metapg;
BTMetaPageData md;
2000-10-13 04:03:02 +02:00
if (!redo)
return;
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(true, reln, BlockIdGetBlockNumber(&(xlrec->rootblk)));
if (!BufferIsValid(buffer))
elog(PANIC, "btree_newroot_redo: no root page");
2000-10-13 04:03:02 +02:00
metabuf = XLogReadBuffer(false, reln, BTREE_METAPAGE);
if (!BufferIsValid(buffer))
elog(PANIC, "btree_newroot_redo: no metapage");
2000-10-13 04:03:02 +02:00
page = (Page) BufferGetPage(buffer);
_bt_pageinit(page, BufferGetPageSize(buffer));
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
2000-10-13 04:03:02 +02:00
pageop->btpo_flags |= BTP_ROOT;
pageop->btpo_prev = pageop->btpo_next = P_NONE;
pageop->btpo_parent = BTREE_METAPAGE;
2000-10-13 04:03:02 +02:00
if (record->xl_info & XLOG_BTREE_LEAF)
pageop->btpo_flags |= BTP_LEAF;
2000-10-13 04:03:02 +02:00
if (record->xl_len > SizeOfBtreeNewroot)
_bt_restore_page(page,
2001-03-22 05:01:46 +01:00
(char *) xlrec + SizeOfBtreeNewroot,
record->xl_len - SizeOfBtreeNewroot);
2000-10-13 04:03:02 +02:00
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
2000-10-13 04:03:02 +02:00
metapg = BufferGetPage(metabuf);
_bt_pageinit(metapg, BufferGetPageSize(metabuf));
md.btm_magic = BTREE_MAGIC;
md.btm_version = BTREE_VERSION;
md.btm_root = BlockIdGetBlockNumber(&(xlrec->rootblk));
md.btm_level = xlrec->level;
memcpy((char *) BTPageGetMeta(metapg), (char *) &md, sizeof(md));
pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
pageop->btpo_flags = BTP_META;
PageSetLSN(metapg, lsn);
PageSetSUI(metapg, ThisStartUpID);
UnlockAndWriteBuffer(metabuf);
2000-10-13 04:03:02 +02:00
}
2000-10-21 17:43:36 +02:00
void
btree_redo(XLogRecPtr lsn, XLogRecord *record)
2000-10-13 04:03:02 +02:00
{
2001-03-22 05:01:46 +01:00
uint8 info = record->xl_info & ~XLR_INFO_MASK;
info &= ~XLOG_BTREE_LEAF;
2000-10-21 17:43:36 +02:00
if (info == XLOG_BTREE_DELETE)
btree_xlog_delete(true, lsn, record);
else if (info == XLOG_BTREE_INSERT)
btree_xlog_insert(true, lsn, record);
else if (info == XLOG_BTREE_SPLIT)
2001-03-22 05:01:46 +01:00
btree_xlog_split(true, false, lsn, record); /* new item on the right */
2000-10-21 17:43:36 +02:00
else if (info == XLOG_BTREE_SPLEFT)
2001-03-22 05:01:46 +01:00
btree_xlog_split(true, true, lsn, record); /* new item on the left */
2000-10-21 17:43:36 +02:00
else if (info == XLOG_BTREE_NEWROOT)
btree_xlog_newroot(true, lsn, record);
else
elog(PANIC, "btree_redo: unknown op code %u", info);
2000-10-13 04:03:02 +02:00
}
2000-10-21 17:43:36 +02:00
void
btree_undo(XLogRecPtr lsn, XLogRecord *record)
2000-10-13 04:03:02 +02:00
{
2001-03-22 05:01:46 +01:00
uint8 info = record->xl_info & ~XLR_INFO_MASK;
2000-10-13 04:03:02 +02:00
info &= ~XLOG_BTREE_LEAF;
2000-10-21 17:43:36 +02:00
if (info == XLOG_BTREE_DELETE)
btree_xlog_delete(false, lsn, record);
else if (info == XLOG_BTREE_INSERT)
btree_xlog_insert(false, lsn, record);
else if (info == XLOG_BTREE_SPLIT)
2001-03-22 05:01:46 +01:00
btree_xlog_split(false, false, lsn, record); /* new item on the right */
2000-10-21 17:43:36 +02:00
else if (info == XLOG_BTREE_SPLEFT)
2001-03-22 05:01:46 +01:00
btree_xlog_split(false, true, lsn, record); /* new item on the left */
2000-10-21 17:43:36 +02:00
else if (info == XLOG_BTREE_NEWROOT)
btree_xlog_newroot(false, lsn, record);
else
elog(PANIC, "btree_undo: unknown op code %u", info);
2000-10-13 04:03:02 +02:00
}
2000-10-21 17:43:36 +02:00
static void
out_target(char *buf, xl_btreetid *target)
2000-10-13 14:05:22 +02:00
{
2000-10-21 17:43:36 +02:00
sprintf(buf + strlen(buf), "node %u/%u; tid %u/%u",
2001-03-22 05:01:46 +01:00
target->node.tblNode, target->node.relNode,
ItemPointerGetBlockNumber(&(target->tid)),
ItemPointerGetOffsetNumber(&(target->tid)));
2000-10-13 14:05:22 +02:00
}
2001-03-22 05:01:46 +01:00
2000-10-21 17:43:36 +02:00
void
2001-03-22 05:01:46 +01:00
btree_desc(char *buf, uint8 xl_info, char *rec)
2000-10-13 14:05:22 +02:00
{
2001-03-22 05:01:46 +01:00
uint8 info = xl_info & ~XLR_INFO_MASK;
2000-10-13 14:05:22 +02:00
info &= ~XLOG_BTREE_LEAF;
2000-10-21 17:43:36 +02:00
if (info == XLOG_BTREE_INSERT)
2000-10-13 14:05:22 +02:00
{
2001-03-22 05:01:46 +01:00
xl_btree_insert *xlrec = (xl_btree_insert *) rec;
2000-10-21 17:43:36 +02:00
strcat(buf, "insert: ");
out_target(buf, &(xlrec->target));
2000-10-13 14:05:22 +02:00
}
2000-10-21 17:43:36 +02:00
else if (info == XLOG_BTREE_DELETE)
2000-10-13 14:05:22 +02:00
{
2001-03-22 05:01:46 +01:00
xl_btree_delete *xlrec = (xl_btree_delete *) rec;
2000-10-21 17:43:36 +02:00
strcat(buf, "delete: ");
out_target(buf, &(xlrec->target));
2000-10-13 14:05:22 +02:00
}
2000-10-21 17:43:36 +02:00
else if (info == XLOG_BTREE_SPLIT || info == XLOG_BTREE_SPLEFT)
2000-10-13 14:05:22 +02:00
{
2001-03-22 05:01:46 +01:00
xl_btree_split *xlrec = (xl_btree_split *) rec;
sprintf(buf + strlen(buf), "split(%s): ",
(info == XLOG_BTREE_SPLIT) ? "right" : "left");
2000-10-21 17:43:36 +02:00
out_target(buf, &(xlrec->target));
sprintf(buf + strlen(buf), "; oth %u; rgh %u",
2001-03-22 05:01:46 +01:00
BlockIdGetBlockNumber(&xlrec->otherblk),
BlockIdGetBlockNumber(&xlrec->rightblk));
2000-10-13 14:05:22 +02:00
}
2000-10-21 17:43:36 +02:00
else if (info == XLOG_BTREE_NEWROOT)
2000-10-13 14:05:22 +02:00
{
2001-03-22 05:01:46 +01:00
xl_btree_newroot *xlrec = (xl_btree_newroot *) rec;
2000-10-21 17:43:36 +02:00
sprintf(buf + strlen(buf), "root: node %u/%u; blk %u",
2001-03-22 05:01:46 +01:00
xlrec->node.tblNode, xlrec->node.relNode,
BlockIdGetBlockNumber(&xlrec->rootblk));
2000-10-13 14:05:22 +02:00
}
2000-10-21 17:43:36 +02:00
else
strcat(buf, "UNKNOWN");
2000-10-13 14:05:22 +02:00
}