Reimplement hash index locking algorithms, per my recent proposal to
pghackers. This fixes the problem recently reported by Markus Kräutner
(hash bucket split corrupts the state of scans being done concurrently),
and I believe it also fixes all the known problems with deadlocks in
hash index operations.  Hash indexes are still not really ready for prime
time (since they aren't WAL-logged), but this is a step forward.
Tom Lane, 2003-09-04 22:06:27 +00:00
commit 7a3693716d (parent ca43f71ca5)
11 changed files with 1052 additions and 1036 deletions

src/backend/access/hash/README

@ -1,4 +1,4 @@
$Header: /cvsroot/pgsql/src/backend/access/hash/README,v 1.2 2003/09/02 03:29:01 tgl Exp $
$Header: /cvsroot/pgsql/src/backend/access/hash/README,v 1.3 2003/09/04 22:06:27 tgl Exp $
This directory contains an implementation of hash indexing for Postgres.
@ -229,8 +229,8 @@ existing bucket in two, thereby lowering the fill ratio:
check split still needed
if split not needed anymore, drop locks and exit
decide which bucket to split
Attempt to X-lock new bucket number (shouldn't fail, but...)
Attempt to X-lock old bucket number (definitely could fail)
Attempt to X-lock new bucket number (shouldn't fail, but...)
if above fail, drop locks and exit
update meta page to reflect new number of buckets
write/release meta page
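As a C sketch, the two conditional lock attempts above map onto the
_hash_try_getlock/_hash_droplock primitives this commit introduces (the
function name and error handling here are illustrative, not the committed
code):

static bool
try_lock_split_buckets(Relation rel,
                       BlockNumber start_oblkno,    /* old bucket's primary page */
                       BlockNumber start_nblkno)    /* new bucket's primary page */
{
    /* the old bucket is live, so this attempt can legitimately fail */
    if (!_hash_try_getlock(rel, start_oblkno, HASH_EXCLUSIVE))
        return false;
    /* the new bucket "shouldn't fail, but..." -- be prepared anyway */
    if (!_hash_try_getlock(rel, start_nblkno, HASH_EXCLUSIVE))
    {
        _hash_droplock(rel, start_oblkno, HASH_EXCLUSIVE);
        return false;
    }
    return true;
}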
@ -261,12 +261,6 @@ not be overfull and split attempts will stop. (We could make a successful
splitter loop to see if the index is still overfull, but it seems better to
distribute the split overhead across successive insertions.)
It may be wise to make the initial exclusive-lock-page-zero operation a
conditional one as well, although the odds of a deadlock failure are quite
low. (AFAICS it could only deadlock against a VACUUM operation that is
trying to X-lock a bucket that the current process has a stopped indexscan
in.)
A problem is that if a split fails partway through (eg due to insufficient
disk space) the index is left corrupt. The probability of that could be
made quite low if we grab a free page or two before we update the meta

src/backend/access/hash/hash.c

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hash.c,v 1.67 2003/09/02 18:13:29 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/hash/hash.c,v 1.68 2003/09/04 22:06:27 tgl Exp $
*
* NOTES
* This file contains only the public interface routines.
@ -27,9 +27,6 @@
#include "miscadmin.h"
bool BuildingHash = false;
/* Working state for hashbuild and its callback */
typedef struct
{
@ -61,9 +58,6 @@ hashbuild(PG_FUNCTION_ARGS)
double reltuples;
HashBuildState buildstate;
/* set flag to disable locking */
BuildingHash = true;
/*
* We expect to be called exactly once for any index relation. If
* that's not the case, big trouble's what we have.
@ -82,9 +76,6 @@ hashbuild(PG_FUNCTION_ARGS)
reltuples = IndexBuildHeapScan(heap, index, indexInfo,
hashbuildCallback, (void *) &buildstate);
/* all done */
BuildingHash = false;
/*
* Since we just counted the tuples in the heap, we update its stats
* in pg_class to guarantee that the planner takes advantage of the
@ -212,10 +203,18 @@ hashgettuple(PG_FUNCTION_ARGS)
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
HashScanOpaque so = (HashScanOpaque) scan->opaque;
Relation rel = scan->indexRelation;
Page page;
OffsetNumber offnum;
bool res;
/*
* We hold pin but not lock on current buffer while outside the hash AM.
* Reacquire the read lock here.
*/
if (BufferIsValid(so->hashso_curbuf))
_hash_chgbufaccess(rel, so->hashso_curbuf, HASH_NOLOCK, HASH_READ);
/*
* If we've already initialized this scan, we can just advance it in
* the appropriate direction. If we haven't done so yet, we call a
@ -267,6 +266,10 @@ hashgettuple(PG_FUNCTION_ARGS)
}
}
/* Release read lock on current buffer, but keep it pinned */
if (BufferIsValid(so->hashso_curbuf))
_hash_chgbufaccess(rel, so->hashso_curbuf, HASH_READ, HASH_NOLOCK);
PG_RETURN_BOOL(res);
}
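Each hashgettuple call thus brackets its work with a lock/unlock pair around
a pin that persists across calls; condensed into a sketch (hypothetical
helper, mirroring the code above):

static bool
gettuple_lock_bracket_sketch(Relation rel, HashScanOpaque so)
{
    bool        res = false;

    /* re-acquire read lock on the pinned current page, if any */
    if (BufferIsValid(so->hashso_curbuf))
        _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_NOLOCK, HASH_READ);

    /* ... advance the scan; hashso_curbuf may move to another page ... */

    /* drop the lock but keep the pin for the next call */
    if (BufferIsValid(so->hashso_curbuf))
        _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_READ, HASH_NOLOCK);

    return res;
}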
@ -285,6 +288,8 @@ hashbeginscan(PG_FUNCTION_ARGS)
scan = RelationGetIndexScan(rel, keysz, scankey);
so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
so->hashso_bucket_valid = false;
so->hashso_bucket_blkno = 0;
so->hashso_curbuf = so->hashso_mrkbuf = InvalidBuffer;
scan->opaque = so;
@ -303,28 +308,38 @@ hashrescan(PG_FUNCTION_ARGS)
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanKey scankey = (ScanKey) PG_GETARG_POINTER(1);
HashScanOpaque so = (HashScanOpaque) scan->opaque;
ItemPointer iptr;
Relation rel = scan->indexRelation;
/* we hold a read lock on the current page in the scan */
if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
/* if we are called from beginscan, so is still NULL */
if (so)
{
_hash_relbuf(scan->indexRelation, so->hashso_curbuf, HASH_READ);
/* release any pins we still hold */
if (BufferIsValid(so->hashso_curbuf))
_hash_dropbuf(rel, so->hashso_curbuf);
so->hashso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
{
_hash_relbuf(scan->indexRelation, so->hashso_mrkbuf, HASH_READ);
if (BufferIsValid(so->hashso_mrkbuf))
_hash_dropbuf(rel, so->hashso_mrkbuf);
so->hashso_mrkbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
/* release lock on bucket, too */
if (so->hashso_bucket_blkno)
_hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
so->hashso_bucket_blkno = 0;
}
/* set positions invalid (this will cause _hash_first call) */
ItemPointerSetInvalid(&(scan->currentItemData));
ItemPointerSetInvalid(&(scan->currentMarkData));
/* Update scan key, if a new one is given */
if (scankey && scan->numberOfKeys > 0)
{
memmove(scan->keyData,
scankey,
scan->numberOfKeys * sizeof(ScanKeyData));
if (so)
so->hashso_bucket_valid = false;
}
PG_RETURN_VOID();
@ -337,32 +352,32 @@ Datum
hashendscan(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ItemPointer iptr;
HashScanOpaque so;
so = (HashScanOpaque) scan->opaque;
/* release any locks we still hold */
if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
{
_hash_relbuf(scan->indexRelation, so->hashso_curbuf, HASH_READ);
so->hashso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
{
if (BufferIsValid(so->hashso_mrkbuf))
_hash_relbuf(scan->indexRelation, so->hashso_mrkbuf, HASH_READ);
so->hashso_mrkbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
HashScanOpaque so = (HashScanOpaque) scan->opaque;
Relation rel = scan->indexRelation;
/* don't need scan registered anymore */
_hash_dropscan(scan);
/* release any pins we still hold */
if (BufferIsValid(so->hashso_curbuf))
_hash_dropbuf(rel, so->hashso_curbuf);
so->hashso_curbuf = InvalidBuffer;
if (BufferIsValid(so->hashso_mrkbuf))
_hash_dropbuf(rel, so->hashso_mrkbuf);
so->hashso_mrkbuf = InvalidBuffer;
/* release lock on bucket, too */
if (so->hashso_bucket_blkno)
_hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
so->hashso_bucket_blkno = 0;
/* be tidy */
pfree(scan->opaque);
ItemPointerSetInvalid(&(scan->currentItemData));
ItemPointerSetInvalid(&(scan->currentMarkData));
pfree(so);
scan->opaque = NULL;
PG_RETURN_VOID();
}
@ -374,25 +389,21 @@ Datum
hashmarkpos(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ItemPointer iptr;
HashScanOpaque so;
HashScanOpaque so = (HashScanOpaque) scan->opaque;
Relation rel = scan->indexRelation;
so = (HashScanOpaque) scan->opaque;
/* release pin on old marked data, if any */
if (BufferIsValid(so->hashso_mrkbuf))
_hash_dropbuf(rel, so->hashso_mrkbuf);
so->hashso_mrkbuf = InvalidBuffer;
ItemPointerSetInvalid(&(scan->currentMarkData));
/* release lock on old marked data, if any */
if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
{
_hash_relbuf(scan->indexRelation, so->hashso_mrkbuf, HASH_READ);
so->hashso_mrkbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
/* bump lock on currentItemData and copy to currentMarkData */
/* bump pin count on currentItemData and copy to currentMarkData */
if (ItemPointerIsValid(&(scan->currentItemData)))
{
so->hashso_mrkbuf = _hash_getbuf(scan->indexRelation,
so->hashso_mrkbuf = _hash_getbuf(rel,
BufferGetBlockNumber(so->hashso_curbuf),
HASH_READ);
HASH_NOLOCK);
scan->currentMarkData = scan->currentItemData;
}
@ -406,26 +417,21 @@ Datum
hashrestrpos(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ItemPointer iptr;
HashScanOpaque so;
HashScanOpaque so = (HashScanOpaque) scan->opaque;
Relation rel = scan->indexRelation;
so = (HashScanOpaque) scan->opaque;
/* release pin on current data, if any */
if (BufferIsValid(so->hashso_curbuf))
_hash_dropbuf(rel, so->hashso_curbuf);
so->hashso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(&(scan->currentItemData));
/* release lock on current data, if any */
if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
{
_hash_relbuf(scan->indexRelation, so->hashso_curbuf, HASH_READ);
so->hashso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
/* bump lock on currentMarkData and copy to currentItemData */
/* bump pin count on currentMarkData and copy to currentItemData */
if (ItemPointerIsValid(&(scan->currentMarkData)))
{
so->hashso_curbuf = _hash_getbuf(scan->indexRelation,
so->hashso_curbuf = _hash_getbuf(rel,
BufferGetBlockNumber(so->hashso_mrkbuf),
HASH_READ);
HASH_NOLOCK);
scan->currentItemData = scan->currentMarkData;
}
@ -474,7 +480,7 @@ hashbulkdelete(PG_FUNCTION_ARGS)
orig_maxbucket = metap->hashm_maxbucket;
orig_ntuples = metap->hashm_ntuples;
memcpy(&local_metapage, metap, sizeof(local_metapage));
_hash_relbuf(rel, metabuf, HASH_READ);
_hash_relbuf(rel, metabuf);
/* Scan the buckets that we know exist */
cur_bucket = 0;
@ -490,7 +496,12 @@ loop_top:
/* Get address of bucket's start page */
bucket_blkno = BUCKET_TO_BLKNO(&local_metapage, cur_bucket);
/* XXX lock bucket here */
/* Exclusive-lock the bucket so we can shrink it */
_hash_getlock(rel, bucket_blkno, HASH_EXCLUSIVE);
/* Shouldn't have any active scans locally, either */
if (_hash_has_active_scan(rel, cur_bucket))
elog(ERROR, "hash index has active scan during VACUUM");
/* Scan each page in bucket */
blkno = bucket_blkno;
@ -522,13 +533,6 @@ loop_top:
htup = &(hitem->hash_itup.t_tid);
if (callback(htup, callback_state))
{
ItemPointerData indextup;
/* adjust any active scans that will be affected */
/* (this should be unnecessary) */
ItemPointerSet(&indextup, blkno, offno);
_hash_adjscans(rel, &indextup);
/* delete the item from the page */
PageIndexTupleDelete(page, offno);
bucket_dirty = page_dirty = true;
@ -547,24 +551,22 @@ loop_top:
}
/*
* Write or free page if needed, advance to next page. We want
* to preserve the invariant that overflow pages are nonempty.
* Write page if needed, advance to next page.
*/
blkno = opaque->hasho_nextblkno;
if (PageIsEmpty(page) && (opaque->hasho_flag & LH_OVERFLOW_PAGE))
_hash_freeovflpage(rel, buf);
else if (page_dirty)
if (page_dirty)
_hash_wrtbuf(rel, buf);
else
_hash_relbuf(rel, buf, HASH_WRITE);
_hash_relbuf(rel, buf);
}
/* If we deleted anything, try to compact free space */
if (bucket_dirty)
_hash_squeezebucket(rel, cur_bucket, bucket_blkno);
/* XXX unlock bucket here */
/* Release bucket lock */
_hash_droplock(rel, bucket_blkno, HASH_EXCLUSIVE);
/* Advance to next bucket */
cur_bucket++;
@ -580,7 +582,7 @@ loop_top:
/* There's been a split, so process the additional bucket(s) */
cur_maxbucket = metap->hashm_maxbucket;
memcpy(&local_metapage, metap, sizeof(local_metapage));
_hash_relbuf(rel, metabuf, HASH_WRITE);
_hash_relbuf(rel, metabuf);
goto loop_top;
}

src/backend/access/hash/hashinsert.c

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashinsert.c,v 1.29 2003/09/02 18:13:30 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashinsert.c,v 1.30 2003/09/04 22:06:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -16,136 +16,124 @@
#include "postgres.h"
#include "access/hash.h"
#include "storage/lmgr.h"
static OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf,
Size itemsize, HashItem hitem);
static InsertIndexResult _hash_insertonpg(Relation rel, Buffer buf, int keysz, ScanKey scankey, HashItem hitem, Buffer metabuf);
static OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, HashItem hitem);
/*
* _hash_doinsert() -- Handle insertion of a single HashItem in the table.
*
* This routine is called by the public interface routines, hashbuild
* and hashinsert. By here, hashitem is filled in, and has a unique
* (xid, seqno) pair. The datum to be used as a "key" is in the
* hashitem.
* and hashinsert. By here, hashitem is completely filled in.
* The datum to be used as a "key" is in the hashitem.
*/
InsertIndexResult
_hash_doinsert(Relation rel, HashItem hitem)
{
Buffer buf;
Buffer metabuf;
BlockNumber blkno;
HashMetaPage metap;
IndexTuple itup;
BlockNumber itup_blkno;
OffsetNumber itup_off;
InsertIndexResult res;
ScanKey itup_scankey;
int natts;
BlockNumber blkno;
Page page;
HashPageOpaque pageopaque;
Size itemsz;
bool do_expand;
uint32 hashkey;
Bucket bucket;
Datum datum;
bool isnull;
/*
* Compute the hash key for the item. We do this first so as not to
* need to hold any locks while running the hash function.
*/
itup = &(hitem->hash_itup);
if (rel->rd_rel->relnatts != 1)
elog(ERROR, "hash indexes support only one index key");
datum = index_getattr(itup, 1, RelationGetDescr(rel), &isnull);
Assert(!isnull);
hashkey = _hash_datum2hashkey(rel, datum);
/* compute item size too */
itemsz = IndexTupleDSize(hitem->hash_itup)
+ (sizeof(HashItemData) - sizeof(IndexTupleData));
itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but
* we need to be consistent */
/*
* Acquire shared split lock so we can compute the target bucket
* safely (see README).
*/
_hash_getlock(rel, 0, HASH_SHARE);
/* Read the metapage */
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
/* we need a scan key to do our search, so build one */
itup = &(hitem->hash_itup);
if ((natts = rel->rd_rel->relnatts) != 1)
elog(ERROR, "Hash indexes support only one index key");
itup_scankey = _hash_mkscankey(rel, itup);
/*
* Check whether the item can fit on a hash page at all. (Eventually,
* we ought to try to apply TOAST methods if not.) Note that at this
* point, itemsz doesn't include the ItemId.
*/
if (itemsz > HashMaxItemSize((Page) metap))
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("index tuple size %lu exceeds hash maximum, %lu",
(unsigned long) itemsz,
(unsigned long) HashMaxItemSize((Page) metap))));
/*
* find the first page in the bucket chain containing this key and
* place it in buf. _hash_search obtains a read lock for us.
* Compute the target bucket number, and convert to block number.
*/
_hash_search(rel, natts, itup_scankey, &buf, metap);
bucket = _hash_hashkey2bucket(hashkey,
metap->hashm_maxbucket,
metap->hashm_highmask,
metap->hashm_lowmask);
blkno = BUCKET_TO_BLKNO(metap, bucket);
/* release lock on metapage, but keep pin since we'll need it again */
_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
/*
* Acquire share lock on target bucket; then we can release split lock.
*/
_hash_getlock(rel, blkno, HASH_SHARE);
_hash_droplock(rel, 0, HASH_SHARE);
/* Fetch the primary bucket page for the bucket */
buf = _hash_getbuf(rel, blkno, HASH_WRITE);
page = BufferGetPage(buf);
_hash_checkpage(rel, page, LH_BUCKET_PAGE);
/*
* trade in our read lock for a write lock so that we can do the
* insertion.
*/
blkno = BufferGetBlockNumber(buf);
_hash_relbuf(rel, buf, HASH_READ);
buf = _hash_getbuf(rel, blkno, HASH_WRITE);
/*
* XXX btree comment (haven't decided what to do in hash): don't think
* the bucket can be split while we're reading the metapage.
*
* If the page was split between the time that we surrendered our read
* lock and acquired our write lock, then this page may no longer be
* the right place for the key we want to insert.
*/
/* do the insertion */
res = _hash_insertonpg(rel, buf, natts, itup_scankey,
hitem, metabuf);
/* be tidy */
_hash_freeskey(itup_scankey);
return res;
}
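For reference, the bucket mapping used above is computed by
_hash_hashkey2bucket in hashutil.c (not shown in this excerpt); its logic is
a simple masking scheme, sketched here:

static Bucket
hashkey_to_bucket_sketch(uint32 hashkey, uint32 maxbucket,
                         uint32 highmask, uint32 lowmask)
{
    Bucket      bucket = hashkey & highmask;

    /*
     * A bucket number above maxbucket hasn't been split off yet; its
     * tuples still live in the corresponding lower-numbered bucket.
     */
    if (bucket > maxbucket)
        bucket = bucket & lowmask;
    return bucket;
}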
/*
* _hash_insertonpg() -- Insert a tuple on a particular page in the table.
*
* This recursive procedure does the following things:
*
* + if necessary, splits the target page.
* + inserts the tuple.
*
* On entry, we must have the right buffer on which to do the
* insertion, and the buffer must be pinned and locked. On return,
* we will have dropped both the pin and the write lock on the buffer.
*
*/
static InsertIndexResult
_hash_insertonpg(Relation rel,
Buffer buf,
int keysz,
ScanKey scankey,
HashItem hitem,
Buffer metabuf)
{
InsertIndexResult res;
Page page;
BlockNumber itup_blkno;
OffsetNumber itup_off;
Size itemsz;
HashPageOpaque pageopaque;
bool do_expand = false;
Buffer ovflbuf;
HashMetaPage metap;
Bucket bucket;
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
page = BufferGetPage(buf);
_hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
bucket = pageopaque->hasho_bucket;
itemsz = IndexTupleDSize(hitem->hash_itup)
+ (sizeof(HashItemData) - sizeof(IndexTupleData));
itemsz = MAXALIGN(itemsz);
Assert(pageopaque->hasho_bucket == bucket);
/* Do the insertion */
while (PageGetFreeSpace(page) < itemsz)
{
/*
* no space on this page; check for an overflow page
*/
if (BlockNumberIsValid(pageopaque->hasho_nextblkno))
BlockNumber nextblkno = pageopaque->hasho_nextblkno;
if (BlockNumberIsValid(nextblkno))
{
/*
* ovfl page exists; go get it. if it doesn't have room,
* we'll find out next pass through the loop test above.
*/
ovflbuf = _hash_getbuf(rel, pageopaque->hasho_nextblkno,
HASH_WRITE);
_hash_relbuf(rel, buf, HASH_WRITE);
buf = ovflbuf;
_hash_relbuf(rel, buf);
buf = _hash_getbuf(rel, nextblkno, HASH_WRITE);
page = BufferGetPage(buf);
}
else
@ -154,65 +142,72 @@ _hash_insertonpg(Relation rel,
* we're at the end of the bucket chain and we haven't found a
* page with enough room. allocate a new overflow page.
*/
do_expand = true;
ovflbuf = _hash_addovflpage(rel, metabuf, buf);
_hash_relbuf(rel, buf, HASH_WRITE);
buf = ovflbuf;
/* release our write lock without modifying buffer */
_hash_chgbufaccess(rel, buf, HASH_READ, HASH_NOLOCK);
/* chain to a new overflow page */
buf = _hash_addovflpage(rel, metabuf, buf);
page = BufferGetPage(buf);
if (PageGetFreeSpace(page) < itemsz)
{
/* it doesn't fit on an empty page -- give up */
elog(ERROR, "hash item too large");
}
/* should fit now, given test above */
Assert(PageGetFreeSpace(page) >= itemsz);
}
_hash_checkpage(rel, page, LH_OVERFLOW_PAGE);
pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
Assert(pageopaque->hasho_bucket == bucket);
}
itup_off = _hash_pgaddtup(rel, buf, keysz, scankey, itemsz, hitem);
/* found page with enough space, so add the item here */
itup_off = _hash_pgaddtup(rel, buf, itemsz, hitem);
itup_blkno = BufferGetBlockNumber(buf);
/* by here, the new tuple is inserted */
/* write and release the modified page */
_hash_wrtbuf(rel, buf);
/* We can drop the bucket lock now */
_hash_droplock(rel, blkno, HASH_SHARE);
/*
* Write-lock the metapage so we can increment the tuple count.
* After incrementing it, check to see if it's time for a split.
*/
_hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
metap->hashm_ntuples += 1;
/* Make sure this stays in sync with _hash_expandtable() */
do_expand = metap->hashm_ntuples >
(double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1);
/* Write out the metapage and drop lock, but keep pin */
_hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);
/* Attempt to split if a split is needed */
if (do_expand)
_hash_expandtable(rel, metabuf);
/* Finally drop our pin on the metapage */
_hash_dropbuf(rel, metabuf);
/* Create the return data structure */
res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
if (res != NULL)
{
/*
* Increment the number of keys in the table. We switch lock
* access type just for a moment to allow greater accessibility to
* the metapage.
*/
_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_WRITE);
metap->hashm_ntuples += 1;
_hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_READ);
}
_hash_wrtbuf(rel, buf);
if (do_expand ||
(metap->hashm_ntuples / (metap->hashm_maxbucket + 1))
> (double) metap->hashm_ffactor)
_hash_expandtable(rel, metabuf);
_hash_relbuf(rel, metabuf, HASH_READ);
return res;
}
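As a worked illustration of the split trigger above (numbers hypothetical):
with hashm_ffactor = 75 and hashm_maxbucket = 3 (four buckets), do_expand
becomes true once hashm_ntuples exceeds 75 * 4 = 300; after a successful
split there are five buckets, and the next trigger point is 375.
_hash_expandtable rechecks the same inequality under the metapage lock,
which is why the comment insists the two computations stay in sync.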
/*
* _hash_pgaddtup() -- add a tuple to a particular page in the index.
*
* This routine adds the tuple to the page as requested, and keeps the
* write lock and reference associated with the page's buffer. It is
* an error to call pgaddtup() without a write lock and reference.
* This routine adds the tuple to the page as requested; it does
* not write out the page. It is an error to call pgaddtup() without
* a write lock and pin.
*/
static OffsetNumber
_hash_pgaddtup(Relation rel,
Buffer buf,
int keysz,
ScanKey itup_scankey,
Size itemsize,
HashItem hitem)
{
@ -228,8 +223,5 @@ _hash_pgaddtup(Relation rel,
elog(ERROR, "failed to add index item to \"%s\"",
RelationGetRelationName(rel));
/* write the buffer, but hold our lock */
_hash_wrtnorelbuf(buf);
return itup_off;
}

src/backend/access/hash/hashovfl.c

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashovfl.c,v 1.40 2003/09/02 18:13:30 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashovfl.c,v 1.41 2003/09/04 22:06:27 tgl Exp $
*
* NOTES
* Overflow pages look like ordinary relation pages.
@ -77,39 +77,68 @@ blkno_to_bitno(HashMetaPage metap, BlockNumber ovflblkno)
/*
* _hash_addovflpage
*
* Add an overflow page to the page currently pointed to by the buffer
* argument 'buf'.
* Add an overflow page to the bucket whose last page is pointed to by 'buf'.
*
* metabuf has a read lock upon entering the function; buf has a
* write lock. The same is true on exit. The returned overflow page
* is write-locked.
* On entry, the caller must hold a pin but no lock on 'buf'. The pin is
* dropped before exiting (we assume the caller is not interested in 'buf'
* anymore). The returned overflow page will be pinned and write-locked;
* it is guaranteed to be empty.
*
* The caller must hold a pin, but no lock, on the metapage buffer.
* That buffer is returned in the same state.
*
* The caller must hold at least share lock on the bucket, to ensure that
* no one else tries to compact the bucket meanwhile. This guarantees that
* 'buf' won't stop being part of the bucket while it's unlocked.
*
* NB: since this could be executed concurrently by multiple processes,
* one should not assume that the returned overflow page will be the
* immediate successor of the originally passed 'buf'. Additional overflow
* pages might have been added to the bucket chain in between.
*/
Buffer
_hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf)
{
BlockNumber ovflblkno;
Buffer ovflbuf;
HashMetaPage metap;
HashPageOpaque ovflopaque;
HashPageOpaque pageopaque;
Page page;
Page ovflpage;
/* this had better be the last page in a bucket chain */
page = BufferGetPage(buf);
_hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
Assert(!BlockNumberIsValid(pageopaque->hasho_nextblkno));
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
HashPageOpaque pageopaque;
HashPageOpaque ovflopaque;
/* allocate an empty overflow page */
ovflblkno = _hash_getovflpage(rel, metabuf);
/* lock the overflow page */
ovflbuf = _hash_getbuf(rel, ovflblkno, HASH_WRITE);
ovflpage = BufferGetPage(ovflbuf);
/* initialize the new overflow page */
/*
* Write-lock the tail page. It is okay to hold two buffer locks here
* since there cannot be anyone else contending for access to ovflbuf.
*/
_hash_chgbufaccess(rel, buf, HASH_NOLOCK, HASH_WRITE);
/* loop to find current tail page, in case someone else inserted too */
for (;;)
{
BlockNumber nextblkno;
page = BufferGetPage(buf);
_hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
nextblkno = pageopaque->hasho_nextblkno;
if (!BlockNumberIsValid(nextblkno))
break;
/* we assume we do not need to write the unmodified page */
_hash_relbuf(rel, buf);
buf = _hash_getbuf(rel, nextblkno, HASH_WRITE);
}
/* now that we have correct backlink, initialize new overflow page */
_hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));
ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
ovflopaque->hasho_prevblkno = BufferGetBlockNumber(buf);
@ -117,11 +146,12 @@ _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf)
ovflopaque->hasho_bucket = pageopaque->hasho_bucket;
ovflopaque->hasho_flag = LH_OVERFLOW_PAGE;
ovflopaque->hasho_filler = HASHO_FILL;
_hash_wrtnorelbuf(ovflbuf);
_hash_wrtnorelbuf(rel, ovflbuf);
/* logically chain overflow page to previous page */
pageopaque->hasho_nextblkno = ovflblkno;
_hash_wrtnorelbuf(buf);
_hash_wrtbuf(rel, buf);
return ovflbuf;
}
@ -130,9 +160,8 @@ _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf)
*
* Find an available overflow page and return its block number.
*
* When we enter this function, we have a read lock on metabuf which
* we change to a write lock immediately. Before exiting, the write lock
* is exchanged for a read lock.
* The caller must hold a pin, but no lock, on the metapage buffer.
* The buffer is returned in the same state.
*/
static BlockNumber
_hash_getovflpage(Relation rel, Buffer metabuf)
@ -140,6 +169,7 @@ _hash_getovflpage(Relation rel, Buffer metabuf)
HashMetaPage metap;
Buffer mapbuf = 0;
BlockNumber blkno;
uint32 orig_firstfree;
uint32 splitnum;
uint32 *freep = NULL;
uint32 max_ovflpg;
@ -150,51 +180,66 @@ _hash_getovflpage(Relation rel, Buffer metabuf)
uint32 i,
j;
_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_WRITE);
metap = (HashMetaPage) BufferGetPage(metabuf);
splitnum = metap->hashm_ovflpoint;
/* Get exclusive lock on the meta page */
_hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
/* end search with the last existing overflow page */
max_ovflpg = metap->hashm_spares[splitnum] - 1;
last_page = max_ovflpg >> BMPG_SHIFT(metap);
last_bit = max_ovflpg & BMPG_MASK(metap);
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
/* start search at hashm_firstfree */
first_page = metap->hashm_firstfree >> BMPG_SHIFT(metap);
bit = metap->hashm_firstfree & BMPG_MASK(metap);
orig_firstfree = metap->hashm_firstfree;
first_page = orig_firstfree >> BMPG_SHIFT(metap);
bit = orig_firstfree & BMPG_MASK(metap);
i = first_page;
j = bit / BITS_PER_MAP;
bit &= ~(BITS_PER_MAP - 1);
for (i = first_page; i <= last_page; i++)
/* outer loop iterates once per bitmap page */
for (;;)
{
BlockNumber mapblkno;
Page mappage;
uint32 last_inpage;
mapblkno = metap->hashm_mapp[i];
mapbuf = _hash_getbuf(rel, mapblkno, HASH_WRITE);
mappage = BufferGetPage(mapbuf);
_hash_checkpage(rel, mappage, LH_BITMAP_PAGE);
freep = HashPageGetBitmap(mappage);
/* want to end search with the last existing overflow page */
splitnum = metap->hashm_ovflpoint;
max_ovflpg = metap->hashm_spares[splitnum] - 1;
last_page = max_ovflpg >> BMPG_SHIFT(metap);
last_bit = max_ovflpg & BMPG_MASK(metap);
if (i != first_page)
{
bit = 0;
j = 0;
}
if (i > last_page)
break;
Assert(i < metap->hashm_nmaps);
mapblkno = metap->hashm_mapp[i];
if (i == last_page)
last_inpage = last_bit;
else
last_inpage = BMPGSZ_BIT(metap) - 1;
/* Release exclusive lock on metapage while reading bitmap page */
_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
mapbuf = _hash_getbuf(rel, mapblkno, HASH_WRITE);
mappage = BufferGetPage(mapbuf);
_hash_checkpage(rel, mappage, LH_BITMAP_PAGE);
freep = HashPageGetBitmap(mappage);
for (; bit <= last_inpage; j++, bit += BITS_PER_MAP)
{
if (freep[j] != ALL_SET)
goto found;
}
_hash_relbuf(rel, mapbuf, HASH_WRITE);
/* No free space here, try to advance to next map page */
_hash_relbuf(rel, mapbuf);
i++;
j = 0; /* scan from start of next map page */
bit = 0;
/* Reacquire exclusive lock on the meta page */
_hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
}
/* No Free Page Found - have to allocate a new page */
@ -225,13 +270,19 @@ _hash_getovflpage(Relation rel, Buffer metabuf)
*/
}
/* mark new page as first free so we don't search much next time */
metap->hashm_firstfree = bit;
/* Calculate address of the new overflow page */
blkno = bitno_to_blkno(metap, bit);
_hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_READ);
/*
* Adjust hashm_firstfree to avoid redundant searches. But don't
* risk changing it if someone moved it while we were searching
* bitmap pages.
*/
if (metap->hashm_firstfree == orig_firstfree)
metap->hashm_firstfree = bit + 1;
/* Write updated metapage and release lock, but not pin */
_hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);
return blkno;
@ -239,20 +290,36 @@ found:
/* convert bit to bit number within page */
bit += _hash_firstfreebit(freep[j]);
/* mark page "in use" */
/* mark page "in use" in the bitmap */
SETBIT(freep, bit);
_hash_wrtbuf(rel, mapbuf);
/* Reacquire exclusive lock on the meta page */
_hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
/* convert bit to absolute bit number */
bit += (i << BMPG_SHIFT(metap));
/* adjust hashm_firstfree to avoid redundant searches */
if (bit > metap->hashm_firstfree)
metap->hashm_firstfree = bit;
/* Calculate address of the new overflow page */
blkno = bitno_to_blkno(metap, bit);
_hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_READ);
/*
* Adjust hashm_firstfree to avoid redundant searches. But don't
* risk changing it if someone moved it while we were searching
* bitmap pages.
*/
if (metap->hashm_firstfree == orig_firstfree)
{
metap->hashm_firstfree = bit + 1;
/* Write updated metapage and release lock, but not pin */
_hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);
}
else
{
/* We didn't change the metapage, so no need to write */
_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
}
return blkno;
}
@ -275,7 +342,10 @@ _hash_firstfreebit(uint32 map)
return i;
mask <<= 1;
}
return i;
elog(ERROR, "firstfreebit found no free bit");
return 0; /* keep compiler quiet */
}
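For example (hypothetical map value): with map = 0x0000001F, bits 0 through
4 are set, so the mask walk falls through them and returns 5, the first free
bit. A word equal to ALL_SET should never reach this function, since
_hash_getovflpage only descends into bitmap words that still have a free
bit; hence the fall-through is now an elog(ERROR) instead of silently
returning a bogus bit number.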
/*
@ -287,7 +357,9 @@ _hash_firstfreebit(uint32 map)
* Returns the block number of the page that followed the given page
* in the bucket, or InvalidBlockNumber if no following page.
*
* NB: caller must not hold lock on metapage.
* NB: caller must not hold lock on metapage, nor on either page that's
* adjacent in the bucket chain. The caller had better hold exclusive lock
* on the bucket, too.
*/
BlockNumber
_hash_freeovflpage(Relation rel, Buffer ovflbuf)
@ -308,10 +380,7 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf)
bitmapbit;
Bucket bucket;
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
/* Get information from the doomed page */
ovflblkno = BufferGetBlockNumber(ovflbuf);
ovflpage = BufferGetPage(ovflbuf);
_hash_checkpage(rel, ovflpage, LH_OVERFLOW_PAGE);
@ -319,17 +388,16 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf)
nextblkno = ovflopaque->hasho_nextblkno;
prevblkno = ovflopaque->hasho_prevblkno;
bucket = ovflopaque->hasho_bucket;
/* Zero the page for debugging's sake; then write and release it */
MemSet(ovflpage, 0, BufferGetPageSize(ovflbuf));
_hash_wrtbuf(rel, ovflbuf);
/*
* fix up the bucket chain. this is a doubly-linked list, so we must
* Fix up the bucket chain. this is a doubly-linked list, so we must
* fix up the bucket chain members behind and ahead of the overflow
* page being deleted.
*
* XXX this should look like: - lock prev/next - modify/write prev/next
* (how to do write ordering with a doubly-linked list?) - unlock
* prev/next
* page being deleted. No concurrency issues since we hold exclusive
* lock on the entire bucket.
*/
if (BlockNumberIsValid(prevblkno))
{
@ -354,9 +422,12 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf)
_hash_wrtbuf(rel, nextbuf);
}
/*
* Clear the bitmap bit to indicate that this overflow page is free.
*/
/* Read the metapage so we can determine which bitmap page to use */
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
/* Identify which bit to set */
ovflbitno = blkno_to_bitno(metap, ovflblkno);
bitmappage = ovflbitno >> BMPG_SHIFT(metap);
@ -366,18 +437,32 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf)
elog(ERROR, "invalid overflow bit number %u", ovflbitno);
blkno = metap->hashm_mapp[bitmappage];
/* Release metapage lock while we access the bitmap page */
_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
/* Clear the bitmap bit to indicate that this overflow page is free */
mapbuf = _hash_getbuf(rel, blkno, HASH_WRITE);
mappage = BufferGetPage(mapbuf);
_hash_checkpage(rel, mappage, LH_BITMAP_PAGE);
freep = HashPageGetBitmap(mappage);
Assert(ISSET(freep, bitmapbit));
CLRBIT(freep, bitmapbit);
_hash_wrtbuf(rel, mapbuf);
/* Get write-lock on metapage to update firstfree */
_hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
/* if this is now the first free page, update hashm_firstfree */
if (ovflbitno < metap->hashm_firstfree)
{
metap->hashm_firstfree = ovflbitno;
_hash_wrtbuf(rel, metabuf);
_hash_wrtbuf(rel, metabuf);
}
else
{
/* no need to change metapage */
_hash_relbuf(rel, metabuf);
}
return nextblkno;
}
@ -401,9 +486,18 @@ _hash_initbitmap(Relation rel, HashMetaPage metap, BlockNumber blkno)
HashPageOpaque op;
uint32 *freep;
/* initialize the page */
/*
* It is okay to write-lock the new bitmap page while holding metapage
* write lock, because no one else could be contending for the new page.
*
* There is some loss of concurrency in possibly doing I/O for the new
* page while holding the metapage lock, but this path is taken so
* seldom that it's not worth worrying about.
*/
buf = _hash_getbuf(rel, blkno, HASH_WRITE);
pg = BufferGetPage(buf);
/* initialize the page */
_hash_pageinit(pg, BufferGetPageSize(buf));
op = (HashPageOpaque) PageGetSpecialPointer(pg);
op->hasho_prevblkno = InvalidBlockNumber;
@ -416,7 +510,7 @@ _hash_initbitmap(Relation rel, HashMetaPage metap, BlockNumber blkno)
freep = HashPageGetBitmap(pg);
MemSet((char *) freep, 0xFF, BMPGSZ_BYTE(metap));
/* write out the new bitmap page (releasing write lock) */
/* write out the new bitmap page (releasing write lock and pin) */
_hash_wrtbuf(rel, buf);
/* add the new bitmap page to the metapage's list of bitmaps */
@ -445,7 +539,14 @@ _hash_initbitmap(Relation rel, HashMetaPage metap, BlockNumber blkno)
* the write page works forward; the procedure terminates when the
* read page and write page are the same page.
*
* Caller must hold exclusive lock on the target bucket.
* At completion of this procedure, it is guaranteed that all pages in
* the bucket are nonempty, unless the bucket is totally empty (in
* which case all overflow pages will be freed). The original implementation
* required that to be true on entry as well, but it's a lot easier for
* callers to leave empty overflow pages and let this guy clean it up.
*
* Caller must hold exclusive lock on the target bucket. This allows
* us to safely lock multiple pages in the bucket.
*/
void
_hash_squeezebucket(Relation rel,
@ -479,7 +580,7 @@ _hash_squeezebucket(Relation rel,
*/
if (!BlockNumberIsValid(wopaque->hasho_nextblkno))
{
_hash_relbuf(rel, wbuf, HASH_WRITE);
_hash_relbuf(rel, wbuf);
return;
}
@ -492,11 +593,10 @@ _hash_squeezebucket(Relation rel,
{
rblkno = ropaque->hasho_nextblkno;
if (ropaque != wopaque)
_hash_relbuf(rel, rbuf, HASH_WRITE);
_hash_relbuf(rel, rbuf);
rbuf = _hash_getbuf(rel, rblkno, HASH_WRITE);
rpage = BufferGetPage(rbuf);
_hash_checkpage(rel, rpage, LH_OVERFLOW_PAGE);
Assert(!PageIsEmpty(rpage));
ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
Assert(ropaque->hasho_bucket == bucket);
} while (BlockNumberIsValid(ropaque->hasho_nextblkno));
@ -507,81 +607,97 @@ _hash_squeezebucket(Relation rel,
roffnum = FirstOffsetNumber;
for (;;)
{
hitem = (HashItem) PageGetItem(rpage, PageGetItemId(rpage, roffnum));
itemsz = IndexTupleDSize(hitem->hash_itup)
+ (sizeof(HashItemData) - sizeof(IndexTupleData));
itemsz = MAXALIGN(itemsz);
/*
* walk up the bucket chain, looking for a page big enough for
* this item.
*/
while (PageGetFreeSpace(wpage) < itemsz)
/* this test is needed in case page is empty on entry */
if (roffnum <= PageGetMaxOffsetNumber(rpage))
{
wblkno = wopaque->hasho_nextblkno;
hitem = (HashItem) PageGetItem(rpage,
PageGetItemId(rpage, roffnum));
itemsz = IndexTupleDSize(hitem->hash_itup)
+ (sizeof(HashItemData) - sizeof(IndexTupleData));
itemsz = MAXALIGN(itemsz);
_hash_wrtbuf(rel, wbuf);
if (!BlockNumberIsValid(wblkno) || (rblkno == wblkno))
/*
* Walk up the bucket chain, looking for a page big enough for
* this item. Exit if we reach the read page.
*/
while (PageGetFreeSpace(wpage) < itemsz)
{
_hash_wrtbuf(rel, rbuf);
/* wbuf is already released */
return;
Assert(!PageIsEmpty(wpage));
wblkno = wopaque->hasho_nextblkno;
Assert(BlockNumberIsValid(wblkno));
_hash_wrtbuf(rel, wbuf);
if (rblkno == wblkno)
{
/* wbuf is already released */
_hash_wrtbuf(rel, rbuf);
return;
}
wbuf = _hash_getbuf(rel, wblkno, HASH_WRITE);
wpage = BufferGetPage(wbuf);
_hash_checkpage(rel, wpage, LH_OVERFLOW_PAGE);
wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
Assert(wopaque->hasho_bucket == bucket);
}
wbuf = _hash_getbuf(rel, wblkno, HASH_WRITE);
wpage = BufferGetPage(wbuf);
_hash_checkpage(rel, wpage, LH_OVERFLOW_PAGE);
Assert(!PageIsEmpty(wpage));
wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
Assert(wopaque->hasho_bucket == bucket);
/*
* we have found room so insert on the "write" page.
*/
woffnum = OffsetNumberNext(PageGetMaxOffsetNumber(wpage));
if (PageAddItem(wpage, (Item) hitem, itemsz, woffnum, LP_USED)
== InvalidOffsetNumber)
elog(ERROR, "failed to add index item to \"%s\"",
RelationGetRelationName(rel));
/*
* delete the tuple from the "read" page. PageIndexTupleDelete
* repacks the ItemId array, so 'roffnum' will be "advanced" to
* the "next" ItemId.
*/
PageIndexTupleDelete(rpage, roffnum);
}
/*
* if we're here, we have found room so insert on the "write"
* page.
* if the "read" page is now empty because of the deletion (or
* because it was empty when we got to it), free it.
*
* Tricky point here: if our read and write pages are adjacent in the
* bucket chain, our write lock on wbuf will conflict with
* _hash_freeovflpage's attempt to update the sibling links of the
* removed page. However, in that case we are done anyway, so we can
* simply drop the write lock before calling _hash_freeovflpage.
*/
woffnum = OffsetNumberNext(PageGetMaxOffsetNumber(wpage));
if (PageAddItem(wpage, (Item) hitem, itemsz, woffnum, LP_USED)
== InvalidOffsetNumber)
elog(ERROR, "failed to add index item to \"%s\"",
RelationGetRelationName(rel));
/*
* delete the tuple from the "read" page. PageIndexTupleDelete
* repacks the ItemId array, so 'roffnum' will be "advanced" to
* the "next" ItemId.
*/
PageIndexTupleDelete(rpage, roffnum);
_hash_wrtnorelbuf(rbuf);
/*
* if the "read" page is now empty because of the deletion, free
* it.
*/
if (PageIsEmpty(rpage) && (ropaque->hasho_flag & LH_OVERFLOW_PAGE))
if (PageIsEmpty(rpage))
{
rblkno = ropaque->hasho_prevblkno;
Assert(BlockNumberIsValid(rblkno));
/* free this overflow page */
_hash_freeovflpage(rel, rbuf);
/* are we freeing the page adjacent to wbuf? */
if (rblkno == wblkno)
{
/* rbuf is already released */
/* yes, so release wbuf lock first */
_hash_wrtbuf(rel, wbuf);
/* free this overflow page (releases rbuf) */
_hash_freeovflpage(rel, rbuf);
/* done */
return;
}
/* free this overflow page, then get the previous one */
_hash_freeovflpage(rel, rbuf);
rbuf = _hash_getbuf(rel, rblkno, HASH_WRITE);
rpage = BufferGetPage(rbuf);
_hash_checkpage(rel, rpage, LH_OVERFLOW_PAGE);
Assert(!PageIsEmpty(rpage));
ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
Assert(ropaque->hasho_bucket == bucket);
roffnum = FirstOffsetNumber;
}
}
/* NOTREACHED */
}
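Stripped of buffer locking and I/O, the squeeze is a two-pointer walk along
the bucket chain. A self-contained toy model (all names hypothetical; the
real code moves variable-size index tuples and frees drained pages via
_hash_freeovflpage):

#define TOY_PAGE_CAP 4

typedef struct
{
    int         ntuples;
    int         tuples[TOY_PAGE_CAP];
} ToyPage;

static void
toy_squeeze(ToyPage *chain, int npages)
{
    int         w = 0;              /* "write" page: front of the chain */
    int         r = npages - 1;     /* "read" page: back of the chain */

    while (w < r)
    {
        if (chain[w].ntuples == TOY_PAGE_CAP)
        {
            w++;                    /* write page is full: advance it */
            continue;
        }
        if (chain[r].ntuples == 0)
        {
            r--;                    /* read page drained: "free" it, step back */
            continue;
        }
        /* move one tuple from the read page to the write page */
        chain[w].tuples[chain[w].ntuples++] =
            chain[r].tuples[--chain[r].ntuples];
    }
}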

src/backend/access/hash/hashpage.c

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.41 2003/09/02 18:13:31 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.42 2003/09/04 22:06:27 tgl Exp $
*
* NOTES
* Postgres hash pages look like ordinary relation pages. The opaque
@ -26,54 +26,201 @@
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/hash.h"
#include "miscadmin.h"
#include "storage/lmgr.h"
#include "utils/lsyscache.h"
static void _hash_splitbucket(Relation rel, Buffer metabuf,
Bucket obucket, Bucket nbucket,
BlockNumber start_oblkno,
BlockNumber start_nblkno,
uint32 maxbucket,
uint32 highmask, uint32 lowmask);
/*
* We use high-concurrency locking on hash indices. There are two cases in
* which we don't do locking. One is when we're building the index.
* Since the creating transaction has not committed, no one can see
* the index, and there's no reason to share locks. The second case
* is when we're just starting up the database system. We use some
* special-purpose initialization code in the relation cache manager
* (see utils/cache/relcache.c) to allow us to do indexed scans on
* the system catalogs before we'd normally be able to. This happens
* before the lock table is fully initialized, so we can't use it.
* Strictly speaking, this violates 2pl, but we don't do 2pl on the
* system catalogs anyway.
*
* Note that our page locks are actual lockmanager locks, not buffer
* locks (as are used by btree, for example). This is a good idea because
* the algorithms are not deadlock-free, and we'd better be able to detect
* and recover from deadlocks.
*
* Another important difference from btree is that a hash indexscan
* retains both a lock and a buffer pin on the current index page
* between hashgettuple() calls (btree keeps only a buffer pin).
* Because of this, it's safe to do item deletions with only a regular
* write lock on a hash page --- there cannot be an indexscan stopped on
* the page being deleted, other than an indexscan of our own backend,
* which will be taken care of by _hash_adjscans.
* We use high-concurrency locking on hash indexes (see README for an overview
* of the locking rules). There are two cases in which we don't do locking.
* One is when the index is newly created in the current transaction. Since
* the creating transaction has not committed, no one else can see the index,
* and there's no reason to take locks. The second case is for temp
* relations, which no one else can see either. (We still take buffer-level
* locks, but not lmgr locks.)
*/
#define USELOCKING (!BuildingHash && !IsInitProcessingMode())
#define USELOCKING(rel) (!((rel)->rd_isnew || (rel)->rd_istemp))
static void _hash_setpagelock(Relation rel, BlockNumber blkno, int access);
static void _hash_unsetpagelock(Relation rel, BlockNumber blkno, int access);
static void _hash_splitbucket(Relation rel, Buffer metabuf,
Bucket obucket, Bucket nbucket);
/*
* _hash_getlock() -- Acquire an lmgr lock.
*
* 'whichlock' should be zero to acquire the split-control lock, or the
* block number of a bucket's primary bucket page to acquire the per-bucket
* lock. (See README for details of the use of these locks.)
*
* 'access' must be HASH_SHARE or HASH_EXCLUSIVE.
*/
void
_hash_getlock(Relation rel, BlockNumber whichlock, int access)
{
if (USELOCKING(rel))
LockPage(rel, whichlock, access);
}
/*
* _hash_try_getlock() -- Acquire an lmgr lock, but only if it's free.
*
* Same as above except we return FALSE without blocking if lock isn't free.
*/
bool
_hash_try_getlock(Relation rel, BlockNumber whichlock, int access)
{
if (USELOCKING(rel))
return ConditionalLockPage(rel, whichlock, access);
else
return true;
}
/*
* _hash_droplock() -- Release an lmgr lock.
*/
void
_hash_droplock(Relation rel, BlockNumber whichlock, int access)
{
if (USELOCKING(rel))
UnlockPage(rel, whichlock, access);
}
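Together, these three helpers carry the README's insertion protocol. A
condensed sketch of the order of operations in _hash_doinsert (bucket_blkno
stands for the target bucket's primary page, computed from the metapage):

static void
insert_lock_order_sketch(Relation rel, BlockNumber bucket_blkno)
{
    /* 1. share-lock page zero so no split can move the target bucket */
    _hash_getlock(rel, 0, HASH_SHARE);

    /* 2. share-lock the bucket itself, then let splits resume */
    _hash_getlock(rel, bucket_blkno, HASH_SHARE);
    _hash_droplock(rel, 0, HASH_SHARE);

    /* 3. ... insert into the bucket chain ... */

    /* 4. done with the bucket */
    _hash_droplock(rel, bucket_blkno, HASH_SHARE);
}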
/*
* _hash_getbuf() -- Get a buffer by block number for read or write.
*
* 'access' must be HASH_READ, HASH_WRITE, or HASH_NOLOCK.
*
* When this routine returns, the appropriate lock is set on the
* requested buffer and its reference count has been incremented
* (ie, the buffer is "locked and pinned").
*
* XXX P_NEW is not used because, unlike the tree structures, we
* need the bucket blocks to be at certain block numbers. we must
* depend on the caller to call _hash_pageinit on the block if it
* knows that this is a new block.
*/
Buffer
_hash_getbuf(Relation rel, BlockNumber blkno, int access)
{
Buffer buf;
if (blkno == P_NEW)
elog(ERROR, "hash AM does not use P_NEW");
buf = ReadBuffer(rel, blkno);
if (access != HASH_NOLOCK)
LockBuffer(buf, access);
/* ref count and lock type are correct */
return buf;
}
/*
* _hash_relbuf() -- release a locked buffer.
*
* Lock and pin (refcount) are both dropped. Note that either read or
* write lock can be dropped this way, but if we modified the buffer,
* this is NOT the right way to release a write lock.
*/
void
_hash_relbuf(Relation rel, Buffer buf)
{
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buf);
}
/*
* _hash_dropbuf() -- release an unlocked buffer.
*
* This is used to unpin a buffer on which we hold no lock. It is assumed
* that the buffer is not dirty.
*/
void
_hash_dropbuf(Relation rel, Buffer buf)
{
ReleaseBuffer(buf);
}
/*
* _hash_wrtbuf() -- write a hash page to disk.
*
* This routine releases the lock held on the buffer and our refcount
* for it. It is an error to call _hash_wrtbuf() without a write lock
* and a pin on the buffer.
*
* NOTE: actually, the buffer manager just marks the shared buffer page
* dirty here; the real I/O happens later. This is okay since we are not
* relying on write ordering anyway. The WAL mechanism is responsible for
* guaranteeing correctness after a crash.
*/
void
_hash_wrtbuf(Relation rel, Buffer buf)
{
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
WriteBuffer(buf);
}
/*
* _hash_wrtnorelbuf() -- write a hash page to disk, but do not release
* our reference or lock.
*
* It is an error to call _hash_wrtnorelbuf() without a write lock
* and a pin on the buffer.
*
* See above NOTE.
*/
void
_hash_wrtnorelbuf(Relation rel, Buffer buf)
{
WriteNoReleaseBuffer(buf);
}
/*
* _hash_chgbufaccess() -- Change the lock type on a buffer, without
* dropping our pin on it.
*
* from_access and to_access may be HASH_READ, HASH_WRITE, or HASH_NOLOCK,
* the last indicating that no buffer-level lock is held or wanted.
*
* When from_access == HASH_WRITE, we assume the buffer is dirty and tell
* bufmgr it must be written out. If the caller wants to release a write
* lock on a page that's not been modified, it's okay to pass from_access
* as HASH_READ (a bit ugly, but handy in some places).
*/
void
_hash_chgbufaccess(Relation rel,
Buffer buf,
int from_access,
int to_access)
{
if (from_access != HASH_NOLOCK)
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
if (from_access == HASH_WRITE)
WriteNoReleaseBuffer(buf);
if (to_access != HASH_NOLOCK)
LockBuffer(buf, to_access);
}
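The HASH_NOLOCK transitions are what let callers keep a metapage pin across
long operations; the typical pattern (condensed from _hash_doinsert and
_hash_expandtable in this commit) looks like:

static void
metapage_pin_sketch(Relation rel)
{
    /* read-lock and pin the metapage */
    Buffer      metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);

    /* ... consult metapage fields ... */

    /* drop the lock but keep the pin while touching bucket pages */
    _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);

    /* ... work that must not hold the metapage lock ... */

    /* relock for update, then write it out, still keeping the pin */
    _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
    /* ... modify metapage fields ... */
    _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);

    /* finally release the pin */
    _hash_dropbuf(rel, metabuf);
}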
/*
* _hash_metapinit() -- Initialize the metadata page of a hash index,
* the two buckets that we begin with and the initial
* bitmap page.
*
* We are fairly cavalier about locking here, since we know that no one else
* could be accessing this index. In particular the rule about not holding
* multiple buffer locks is ignored.
*/
void
_hash_metapinit(Relation rel)
@ -83,16 +230,31 @@ _hash_metapinit(Relation rel)
Buffer metabuf;
Buffer buf;
Page pg;
int32 data_width;
int32 item_width;
int32 ffactor;
uint16 i;
/* can't be sharing this with anyone, now... */
if (USELOCKING)
LockRelation(rel, AccessExclusiveLock);
/* safety check */
if (RelationGetNumberOfBlocks(rel) != 0)
elog(ERROR, "cannot initialize non-empty hash index \"%s\"",
RelationGetRelationName(rel));
/*
* Determine the target fill factor (tuples per bucket) for this index.
* The idea is to make the fill factor correspond to pages about 3/4ths
* full. We can compute it exactly if the index datatype is fixed-width,
* but for var-width there's some guessing involved.
*/
data_width = get_typavgwidth(RelationGetDescr(rel)->attrs[0]->atttypid,
RelationGetDescr(rel)->attrs[0]->atttypmod);
item_width = MAXALIGN(sizeof(HashItemData)) + MAXALIGN(data_width) +
sizeof(ItemIdData); /* include the line pointer */
ffactor = (BLCKSZ * 3 / 4) / item_width;
/* keep to a sane range */
if (ffactor < 10)
ffactor = 10;
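/*
 * Worked example (illustrative numbers only): with BLCKSZ = 8192 and a
 * fixed item_width of 20 bytes, ffactor = (8192 * 3 / 4) / 20 = 307
 * tuples per bucket.  Very wide keys drive ffactor down until the clamp
 * above holds it at 10.
 */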
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
pg = BufferGetPage(metabuf);
_hash_pageinit(pg, BufferGetPageSize(metabuf));
@ -110,7 +272,7 @@ _hash_metapinit(Relation rel)
metap->hashm_version = HASH_VERSION;
metap->hashm_ntuples = 0;
metap->hashm_nmaps = 0;
metap->hashm_ffactor = DEFAULT_FFACTOR;
metap->hashm_ffactor = ffactor;
metap->hashm_bsize = BufferGetPageSize(metabuf);
/* find largest bitmap array size that will fit in page size */
for (i = _hash_log2(metap->hashm_bsize); i > 0; --i)
@ -142,7 +304,7 @@ _hash_metapinit(Relation rel)
metap->hashm_firstfree = 0;
/*
* initialize the first two buckets
* Initialize the first two buckets
*/
for (i = 0; i <= 1; i++)
{
@ -159,135 +321,17 @@ _hash_metapinit(Relation rel)
}
/*
* Initialize bitmap page. Can't do this until we
* Initialize first bitmap page. Can't do this until we
* create the first two buckets, else smgr will complain.
*/
_hash_initbitmap(rel, metap, 3);
/* all done */
_hash_wrtbuf(rel, metabuf);
if (USELOCKING)
UnlockRelation(rel, AccessExclusiveLock);
}
/*
* _hash_getbuf() -- Get a buffer by block number for read or write.
*
* When this routine returns, the appropriate lock is set on the
* requested buffer and its reference count is correct.
*
* XXX P_NEW is not used because, unlike the tree structures, we
* need the bucket blocks to be at certain block numbers. we must
* depend on the caller to call _hash_pageinit on the block if it
* knows that this is a new block.
*/
Buffer
_hash_getbuf(Relation rel, BlockNumber blkno, int access)
{
Buffer buf;
if (blkno == P_NEW)
elog(ERROR, "hash AM does not use P_NEW");
switch (access)
{
case HASH_WRITE:
case HASH_READ:
_hash_setpagelock(rel, blkno, access);
break;
default:
elog(ERROR, "unrecognized hash access code: %d", access);
break;
}
buf = ReadBuffer(rel, blkno);
/* ref count and lock type are correct */
return buf;
}
/*
* _hash_relbuf() -- release a locked buffer.
*/
void
_hash_relbuf(Relation rel, Buffer buf, int access)
{
BlockNumber blkno;
blkno = BufferGetBlockNumber(buf);
switch (access)
{
case HASH_WRITE:
case HASH_READ:
_hash_unsetpagelock(rel, blkno, access);
break;
default:
elog(ERROR, "unrecognized hash access code: %d", access);
break;
}
ReleaseBuffer(buf);
}
/*
* _hash_wrtbuf() -- write a hash page to disk.
*
* This routine releases the lock held on the buffer and our reference
* to it. It is an error to call _hash_wrtbuf() without a write lock
* or a reference to the buffer.
*/
void
_hash_wrtbuf(Relation rel, Buffer buf)
{
BlockNumber blkno;
blkno = BufferGetBlockNumber(buf);
WriteBuffer(buf);
_hash_unsetpagelock(rel, blkno, HASH_WRITE);
}
/*
* _hash_wrtnorelbuf() -- write a hash page to disk, but do not release
* our reference or lock.
*
* It is an error to call _hash_wrtnorelbuf() without a write lock
* or a reference to the buffer.
*/
void
_hash_wrtnorelbuf(Buffer buf)
{
BlockNumber blkno;
blkno = BufferGetBlockNumber(buf);
WriteNoReleaseBuffer(buf);
}
/*
* _hash_chgbufaccess() -- Change from read to write access or vice versa.
*
* When changing from write to read, we assume the buffer is dirty and tell
* bufmgr it must be written out.
*/
void
_hash_chgbufaccess(Relation rel,
Buffer buf,
int from_access,
int to_access)
{
BlockNumber blkno;
blkno = BufferGetBlockNumber(buf);
if (from_access == HASH_WRITE)
_hash_wrtnorelbuf(buf);
_hash_unsetpagelock(rel, blkno, from_access);
_hash_setpagelock(rel, blkno, to_access);
}
/*
* _hash_pageinit() -- Initialize a new page.
* _hash_pageinit() -- Initialize a new hash index page.
*/
void
_hash_pageinit(Page page, Size size)
@ -297,57 +341,14 @@ _hash_pageinit(Page page, Size size)
}
/*
* _hash_setpagelock() -- Acquire the requested type of lock on a page.
*/
static void
_hash_setpagelock(Relation rel,
BlockNumber blkno,
int access)
{
if (USELOCKING)
{
switch (access)
{
case HASH_WRITE:
LockPage(rel, blkno, ExclusiveLock);
break;
case HASH_READ:
LockPage(rel, blkno, ShareLock);
break;
default:
elog(ERROR, "unrecognized hash access code: %d", access);
break;
}
}
}
/*
* _hash_unsetpagelock() -- Release the specified type of lock on a page.
*/
static void
_hash_unsetpagelock(Relation rel,
BlockNumber blkno,
int access)
{
if (USELOCKING)
{
switch (access)
{
case HASH_WRITE:
UnlockPage(rel, blkno, ExclusiveLock);
break;
case HASH_READ:
UnlockPage(rel, blkno, ShareLock);
break;
default:
elog(ERROR, "unrecognized hash access code: %d", access);
break;
}
}
}
/*
* Expand the hash table by creating one new bucket.
* Attempt to expand the hash table by creating one new bucket.
*
* This will silently do nothing if it cannot get the needed locks.
*
* The caller should hold no locks on the hash index.
*
* The caller must hold a pin, but no lock, on the metapage buffer.
* The buffer is returned in the same state.
*/
void
_hash_expandtable(Relation rel, Buffer metabuf)
@ -356,15 +357,72 @@ _hash_expandtable(Relation rel, Buffer metabuf)
Bucket old_bucket;
Bucket new_bucket;
uint32 spare_ndx;
BlockNumber start_oblkno;
BlockNumber start_nblkno;
uint32 maxbucket;
uint32 highmask;
uint32 lowmask;
/*
* Obtain the page-zero lock to assert the right to begin a split
* (see README).
*
* Note: deadlock should be impossible here. Our own backend could only
* be holding bucket sharelocks due to stopped indexscans; those will not
* block other holders of the page-zero lock, who are only interested in
* acquiring bucket sharelocks themselves. Exclusive bucket locks are
* only taken here and in hashbulkdelete, and neither of these operations
* needs any additional locks to complete. (If, due to some flaw in this
* reasoning, we manage to deadlock anyway, it's okay to error out; the
* index will be left in a consistent state.)
*/
_hash_getlock(rel, 0, HASH_EXCLUSIVE);
/* Write-lock the meta page */
_hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_WRITE);
/*
* Check to see if split is still needed; someone else might have already
* done one while we waited for the lock.
*
* Make sure this stays in sync with _hash_doinsert()
*/
if (metap->hashm_ntuples <=
(double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1))
goto fail;
new_bucket = ++metap->hashm_maxbucket;
/*
* Determine which bucket is to be split, and attempt to lock the old
* bucket. If we can't get the lock, give up.
*
* The lock protects us against other backends, but not against our own
* backend. Must check for active scans separately.
*
* Ideally we would lock the new bucket too before proceeding, but if
* we are about to cross a splitpoint then the BUCKET_TO_BLKNO mapping
* isn't correct yet. For simplicity we update the metapage first and
* then lock. This should be okay because no one else should be trying
* to lock the new bucket yet...
*/
new_bucket = metap->hashm_maxbucket + 1;
old_bucket = (new_bucket & metap->hashm_lowmask);
start_oblkno = BUCKET_TO_BLKNO(metap, old_bucket);
if (_hash_has_active_scan(rel, old_bucket))
goto fail;
if (!_hash_try_getlock(rel, start_oblkno, HASH_EXCLUSIVE))
goto fail;
/*
* Okay to proceed with split. Update the metapage bucket mapping info.
*/
metap->hashm_maxbucket = new_bucket;
if (new_bucket > metap->hashm_highmask)
{
/* Starting a new doubling */
@ -379,7 +437,7 @@ _hash_expandtable(Relation rel, Buffer metabuf)
* this new batch of bucket pages.
*
* XXX should initialize new bucket pages to prevent out-of-order
* page creation.
* page creation? Don't wanna do it right here though.
*/
spare_ndx = _hash_log2(metap->hashm_maxbucket + 1);
if (spare_ndx > metap->hashm_ovflpoint)
@ -389,10 +447,50 @@ _hash_expandtable(Relation rel, Buffer metabuf)
metap->hashm_ovflpoint = spare_ndx;
}
_hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_READ);
/* now we can compute the new bucket's primary block number */
start_nblkno = BUCKET_TO_BLKNO(metap, new_bucket);
Assert(!_hash_has_active_scan(rel, new_bucket));
if (!_hash_try_getlock(rel, start_nblkno, HASH_EXCLUSIVE))
elog(PANIC, "could not get lock on supposedly new bucket");
/*
* Copy bucket mapping info now; this saves re-accessing the meta page
* inside _hash_splitbucket's inner loop. Note that once we drop the
* split lock, other splits could begin, so these values might be out of
* date before _hash_splitbucket finishes. That's okay, since all it
* needs is to tell which of these two buckets to map hashkeys into.
*/
maxbucket = metap->hashm_maxbucket;
highmask = metap->hashm_highmask;
lowmask = metap->hashm_lowmask;
/* Write out the metapage and drop lock, but keep pin */
_hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);
/* Release split lock; okay for other splits to occur now */
_hash_droplock(rel, 0, HASH_EXCLUSIVE);
/* Relocate records to the new bucket */
_hash_splitbucket(rel, metabuf, old_bucket, new_bucket);
_hash_splitbucket(rel, metabuf, old_bucket, new_bucket,
start_oblkno, start_nblkno,
maxbucket, highmask, lowmask);
/* Release bucket locks, allowing others to access them */
_hash_droplock(rel, start_oblkno, HASH_EXCLUSIVE);
_hash_droplock(rel, start_nblkno, HASH_EXCLUSIVE);
return;
/* Here if we decided not to split, or failed to acquire the old bucket lock */
fail:
/* We didn't write the metapage, so just drop lock */
_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
/* Release split lock */
_hash_droplock(rel, 0, HASH_EXCLUSIVE);
}
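The metapage arithmetic in this function is compact enough to demonstrate standalone. Below is a minimal sketch, not part of this commit: the struct, the sample values, and the mask-update formula are simplified stand-ins for HashMetaPageData and for code elided from this hunk.
#include <stdio.h>
/* simplified stand-in for the HashMetaPageData fields used above */
struct meta
{
	double		ntuples;
	unsigned	ffactor;
	unsigned	maxbucket;
	unsigned	highmask;
	unsigned	lowmask;
};
/* same test as _hash_doinsert: split when average fill exceeds ffactor */
static int
split_needed(const struct meta *m)
{
	return m->ntuples > (double) m->ffactor * (m->maxbucket + 1);
}
static void
do_split_bookkeeping(struct meta *m)
{
	unsigned	new_bucket = m->maxbucket + 1;
	unsigned	old_bucket = new_bucket & m->lowmask;	/* bucket to split */
	m->maxbucket = new_bucket;
	if (new_bucket > m->highmask)
	{
		/* starting a new doubling (mask formula is illustrative) */
		m->lowmask = m->highmask;
		m->highmask = new_bucket | m->lowmask;
	}
	printf("split bucket %u, creating bucket %u (masks now %u/%u)\n",
		   old_bucket, new_bucket, m->lowmask, m->highmask);
}
int
main(void)
{
	struct meta m = {2000.0, 300, 5, 7, 3};		/* 6 buckets, avg fill 333 */
	if (split_needed(&m))
		do_split_bookkeeping(&m);	/* splits bucket 2, creating bucket 6 */
	return 0;
}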
@ -403,27 +501,35 @@ _hash_expandtable(Relation rel, Buffer metabuf)
* or more overflow (bucket chain) pages. We must relocate tuples that
* belong in the new bucket, and compress out any free space in the old
* bucket.
*
* The caller must hold exclusive locks on both buckets to ensure that
* no one else is trying to access them (see README).
*
* The caller must hold a pin, but no lock, on the metapage buffer.
* The buffer is returned in the same state. (The metapage is only
* touched if it becomes necessary to add or remove overflow pages.)
*/
static void
_hash_splitbucket(Relation rel,
Buffer metabuf,
Bucket obucket,
Bucket nbucket)
Bucket nbucket,
BlockNumber start_oblkno,
BlockNumber start_nblkno,
uint32 maxbucket,
uint32 highmask,
uint32 lowmask)
{
Bucket bucket;
Buffer obuf;
Buffer nbuf;
Buffer ovflbuf;
BlockNumber oblkno;
BlockNumber nblkno;
BlockNumber start_oblkno;
BlockNumber start_nblkno;
bool null;
Datum datum;
HashItem hitem;
HashPageOpaque oopaque;
HashPageOpaque nopaque;
HashMetaPage metap;
IndexTuple itup;
Size itemsz;
OffsetNumber ooffnum;
@ -433,12 +539,11 @@ _hash_splitbucket(Relation rel,
Page npage;
TupleDesc itupdesc = RelationGetDescr(rel);
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
/* get the buffers & pages */
start_oblkno = BUCKET_TO_BLKNO(metap, obucket);
start_nblkno = BUCKET_TO_BLKNO(metap, nbucket);
/*
* It should be okay to simultaneously write-lock pages from each
* bucket, since no one else can be trying to acquire buffer lock
* on pages of either bucket.
*/
oblkno = start_oblkno;
nblkno = start_nblkno;
obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
@ -446,7 +551,10 @@ _hash_splitbucket(Relation rel,
opage = BufferGetPage(obuf);
npage = BufferGetPage(nbuf);
/* initialize the new bucket page */
_hash_checkpage(rel, opage, LH_BUCKET_PAGE);
oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
/* initialize the new bucket's primary page */
_hash_pageinit(npage, BufferGetPageSize(nbuf));
nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
nopaque->hasho_prevblkno = InvalidBlockNumber;
@ -454,44 +562,11 @@ _hash_splitbucket(Relation rel,
nopaque->hasho_bucket = nbucket;
nopaque->hasho_flag = LH_BUCKET_PAGE;
nopaque->hasho_filler = HASHO_FILL;
_hash_wrtnorelbuf(nbuf);
/*
* make sure the old bucket isn't empty. advance 'opage' and friends
* through the overflow bucket chain until we find a non-empty page.
*
* XXX we should only need this once, if we are careful to preserve the
* invariant that overflow pages are never empty.
*/
_hash_checkpage(rel, opage, LH_BUCKET_PAGE);
oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
if (PageIsEmpty(opage))
{
oblkno = oopaque->hasho_nextblkno;
_hash_relbuf(rel, obuf, HASH_WRITE);
if (!BlockNumberIsValid(oblkno))
{
/*
* the old bucket is completely empty; of course, the new
* bucket will be as well, but since it's a base bucket page
* we don't care.
*/
_hash_relbuf(rel, nbuf, HASH_WRITE);
return;
}
obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
opage = BufferGetPage(obuf);
_hash_checkpage(rel, opage, LH_OVERFLOW_PAGE);
if (PageIsEmpty(opage))
elog(ERROR, "empty hash overflow page %u", oblkno);
oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
}
/*
* we are now guaranteed that 'opage' is not empty. partition the
* tuples in the old bucket between the old bucket and the new bucket,
* advancing along their respective overflow bucket chains and adding
* overflow pages as needed.
* Partition the tuples in the old bucket between the old bucket and the
* new bucket, advancing along the old bucket's overflow bucket chain
* and adding overflow pages to the new bucket as needed.
*/
ooffnum = FirstOffsetNumber;
omaxoffnum = PageGetMaxOffsetNumber(opage);
@ -505,48 +580,39 @@ _hash_splitbucket(Relation rel,
/* check if we're at the end of the page */
if (ooffnum > omaxoffnum)
{
/* at end of page, but check for overflow page */
/* at end of page, but check for an(other) overflow page */
oblkno = oopaque->hasho_nextblkno;
if (BlockNumberIsValid(oblkno))
{
/*
* we ran out of tuples on this particular page, but we
* have more overflow pages; re-init values.
*/
_hash_wrtbuf(rel, obuf);
obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
opage = BufferGetPage(obuf);
_hash_checkpage(rel, opage, LH_OVERFLOW_PAGE);
oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
/* we're guaranteed that an ovfl page has at least 1 tuple */
if (PageIsEmpty(opage))
elog(ERROR, "empty hash overflow page %u", oblkno);
ooffnum = FirstOffsetNumber;
omaxoffnum = PageGetMaxOffsetNumber(opage);
}
else
{
/*
* We're at the end of the bucket chain, so now we're
* really done with everything. Before quitting, call
* _hash_squeezebucket to ensure the tuples remaining in the
* old bucket (including the overflow pages) are packed as
* tightly as possible. The new bucket is already tight.
*/
_hash_wrtbuf(rel, obuf);
_hash_wrtbuf(rel, nbuf);
_hash_squeezebucket(rel, obucket, start_oblkno);
return;
}
if (!BlockNumberIsValid(oblkno))
break;
/*
* we ran out of tuples on this particular page, but we
* have more overflow pages; advance to next page.
*/
_hash_wrtbuf(rel, obuf);
obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
opage = BufferGetPage(obuf);
_hash_checkpage(rel, opage, LH_OVERFLOW_PAGE);
oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
ooffnum = FirstOffsetNumber;
omaxoffnum = PageGetMaxOffsetNumber(opage);
continue;
}
/* hash on the tuple */
/*
* Re-hash the tuple to determine which bucket it now belongs in.
*
* It is annoying to call the hash function while holding locks,
* but releasing and relocking the page for each tuple is unappealing
* too.
*/
hitem = (HashItem) PageGetItem(opage, PageGetItemId(opage, ooffnum));
itup = &(hitem->hash_itup);
datum = index_getattr(itup, 1, itupdesc, &null);
Assert(!null);
bucket = _hash_call(rel, metap, datum);
bucket = _hash_hashkey2bucket(_hash_datum2hashkey(rel, datum),
maxbucket, highmask, lowmask);
if (bucket == nbucket)
{
@ -562,11 +628,13 @@ _hash_splitbucket(Relation rel,
if (PageGetFreeSpace(npage) < itemsz)
{
ovflbuf = _hash_addovflpage(rel, metabuf, nbuf);
_hash_wrtbuf(rel, nbuf);
nbuf = ovflbuf;
/* write out nbuf and drop lock, but keep pin */
_hash_chgbufaccess(rel, nbuf, HASH_WRITE, HASH_NOLOCK);
/* chain to a new overflow page */
nbuf = _hash_addovflpage(rel, metabuf, nbuf);
npage = BufferGetPage(nbuf);
_hash_checkpage(rel, npage, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
_hash_checkpage(rel, npage, LH_OVERFLOW_PAGE);
/* we don't need nopaque within the loop */
}
noffnum = OffsetNumberNext(PageGetMaxOffsetNumber(npage));
@ -574,7 +642,6 @@ _hash_splitbucket(Relation rel,
== InvalidOffsetNumber)
elog(ERROR, "failed to add index item to \"%s\"",
RelationGetRelationName(rel));
_hash_wrtnorelbuf(nbuf);
/*
* now delete the tuple from the old bucket. after this
@ -586,40 +653,7 @@ _hash_splitbucket(Relation rel,
* instead of calling PageGetMaxOffsetNumber.
*/
PageIndexTupleDelete(opage, ooffnum);
_hash_wrtnorelbuf(obuf);
omaxoffnum = OffsetNumberPrev(omaxoffnum);
/*
* tidy up. if the old page was an overflow page and it is
* now empty, we must free it (we want to preserve the
* invariant that overflow pages cannot be empty).
*/
if (PageIsEmpty(opage) &&
(oopaque->hasho_flag & LH_OVERFLOW_PAGE))
{
oblkno = _hash_freeovflpage(rel, obuf);
/* check that we're not through the bucket chain */
if (!BlockNumberIsValid(oblkno))
{
_hash_wrtbuf(rel, nbuf);
_hash_squeezebucket(rel, obucket, start_oblkno);
return;
}
/*
* re-init. again, we're guaranteed that an ovfl page has
* at least one tuple.
*/
obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
opage = BufferGetPage(obuf);
_hash_checkpage(rel, opage, LH_OVERFLOW_PAGE);
oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
if (PageIsEmpty(opage))
elog(ERROR, "empty hash overflow page %u", oblkno);
ooffnum = FirstOffsetNumber;
omaxoffnum = PageGetMaxOffsetNumber(opage);
}
}
else
{
@ -632,5 +666,15 @@ _hash_splitbucket(Relation rel,
ooffnum = OffsetNumberNext(ooffnum);
}
}
/* NOTREACHED */
/*
* We're at the end of the old bucket chain, so we're done partitioning
* the tuples. Before quitting, call _hash_squeezebucket to ensure the
* tuples remaining in the old bucket (including the overflow pages) are
* packed as tightly as possible. The new bucket is already tight.
*/
_hash_wrtbuf(rel, obuf);
_hash_wrtbuf(rel, nbuf);
_hash_squeezebucket(rel, obucket, start_oblkno);
}
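The subtlest point in the loop above is deleting tuples from the page currently being scanned: PageIndexTupleDelete compacts the remaining items downward, so the code stays on the same offset and decrements omaxoffnum rather than advancing. A toy standalone analogue of that rule (illustrative, not PostgreSQL code):
#include <stdio.h>
int
main(void)
{
	int		items[] = {10, 21, 30, 41, 50};	/* odd values "move away" */
	int		maxoff = 5;		/* 1-based, like OffsetNumber */
	int		off = 1;
	int		i;
	while (off <= maxoff)
	{
		if (items[off - 1] % 2 != 0)
		{
			/* "delete": compact the later items down, like PageIndexTupleDelete */
			for (i = off; i < maxoff; i++)
				items[i - 1] = items[i];
			maxoff--;			/* shrink the page; do NOT advance off */
		}
		else
			off++;				/* tuple stays; move to the next one */
	}
	for (i = 0; i < maxoff; i++)
		printf("%d ", items[i]);	/* prints: 10 30 50 */
	printf("\n");
	return 0;
}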


@ -8,22 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashscan.c,v 1.30 2003/08/04 02:39:57 momjian Exp $
*
* NOTES
* Because we can be doing an index scan on a relation while we
* update it, we need to avoid missing data that moves around in
* the index. The routines and global variables in this file
* guarantee that all scans in the local address space stay
* correctly positioned. This is all we need to worry about, since
* write locking guarantees that no one else will be on the same
* page at the same time as we are.
*
* The scheme is to manage a list of active scans in the current
* backend. Whenever we add or remove records from an index, we
* check the list of active scans to see if any has been affected.
* A scan is affected only if it is on the same relation, and the
* same page, as the update.
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashscan.c,v 1.31 2003/09/04 22:06:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -44,10 +29,6 @@ typedef HashScanListData *HashScanList;
static HashScanList HashScans = (HashScanList) NULL;
static void _hash_scandel(IndexScanDesc scan,
BlockNumber blkno, OffsetNumber offno);
/*
* AtEOXact_hash() --- clean up hash subsystem at xact abort or commit.
*
@ -67,9 +48,6 @@ AtEOXact_hash(void)
* at end of transaction anyway.
*/
HashScans = NULL;
/* If we were building a hash, we ain't anymore. */
BuildingHash = false;
}
/*
@ -112,70 +90,26 @@ _hash_dropscan(IndexScanDesc scan)
pfree(chk);
}
void
_hash_adjscans(Relation rel, ItemPointer tid)
/*
* Is there an active scan in this bucket?
*/
bool
_hash_has_active_scan(Relation rel, Bucket bucket)
{
Oid relid = RelationGetRelid(rel);
HashScanList l;
Oid relid;
relid = RelationGetRelid(rel);
for (l = HashScans; l != (HashScanList) NULL; l = l->hashsl_next)
for (l = HashScans; l != NULL; l = l->hashsl_next)
{
if (relid == l->hashsl_scan->indexRelation->rd_id)
_hash_scandel(l->hashsl_scan, ItemPointerGetBlockNumber(tid),
ItemPointerGetOffsetNumber(tid));
}
}
static void
_hash_scandel(IndexScanDesc scan, BlockNumber blkno, OffsetNumber offno)
{
ItemPointer current;
ItemPointer mark;
Buffer buf;
Buffer metabuf;
HashScanOpaque so;
so = (HashScanOpaque) scan->opaque;
current = &(scan->currentItemData);
mark = &(scan->currentMarkData);
if (ItemPointerIsValid(current)
&& ItemPointerGetBlockNumber(current) == blkno
&& ItemPointerGetOffsetNumber(current) >= offno)
{
metabuf = _hash_getbuf(scan->indexRelation, HASH_METAPAGE, HASH_READ);
buf = so->hashso_curbuf;
_hash_step(scan, &buf, BackwardScanDirection, metabuf);
}
if (ItemPointerIsValid(mark)
&& ItemPointerGetBlockNumber(mark) == blkno
&& ItemPointerGetOffsetNumber(mark) >= offno)
{
/*
* The idea here is to exchange the current and mark positions,
* then step backwards (affecting current), then exchange again.
*/
ItemPointerData tmpitem;
Buffer tmpbuf;
tmpitem = *mark;
*mark = *current;
*current = tmpitem;
tmpbuf = so->hashso_mrkbuf;
so->hashso_mrkbuf = so->hashso_curbuf;
so->hashso_curbuf = tmpbuf;
metabuf = _hash_getbuf(scan->indexRelation, HASH_METAPAGE, HASH_READ);
buf = so->hashso_curbuf;
_hash_step(scan, &buf, BackwardScanDirection, metabuf);
tmpitem = *mark;
*mark = *current;
*current = tmpitem;
tmpbuf = so->hashso_mrkbuf;
so->hashso_mrkbuf = so->hashso_curbuf;
so->hashso_curbuf = tmpbuf;
{
HashScanOpaque so = (HashScanOpaque) l->hashsl_scan->opaque;
if (so->hashso_bucket_valid &&
so->hashso_bucket == bucket)
return true;
}
}
return false;
}
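A standalone analogue of this walk over the backend's active-scan list, using a simplified stand-in for HashScanOpaqueData (names and values here are illustrative):
#include <stdbool.h>
#include <stdio.h>
/* simplified stand-in for HashScanListData / HashScanOpaqueData */
struct scan
{
	unsigned	bucket;
	bool		bucket_valid;	/* false until the scan's bucket is known */
	struct scan *next;
};
static bool
has_active_scan(struct scan *list, unsigned bucket)
{
	struct scan *l;
	for (l = list; l != NULL; l = l->next)
	{
		if (l->bucket_valid && l->bucket == bucket)
			return true;
	}
	return false;
}
int
main(void)
{
	struct scan s2 = {4, true, NULL};
	struct scan s1 = {7, false, &s2};	/* bucket not determined yet */
	printf("%d %d\n", has_active_scan(&s1, 7), has_active_scan(&s1, 4));
	return 0;					/* prints: 0 1 */
}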


@ -8,55 +8,16 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashsearch.c,v 1.33 2003/09/02 18:13:31 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashsearch.c,v 1.34 2003/09/04 22:06:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/hash.h"
#include "storage/lmgr.h"
/*
* _hash_search() -- Find the bucket that contains the scankey
* and fetch its primary bucket page into *bufP.
*
* the buffer has a read lock.
*/
void
_hash_search(Relation rel,
int keysz,
ScanKey scankey,
Buffer *bufP,
HashMetaPage metap)
{
BlockNumber blkno;
Bucket bucket;
if (scankey == NULL ||
(scankey[0].sk_flags & SK_ISNULL))
{
/*
* If the scankey is empty, all tuples will satisfy the
* scan so we start the scan at the first bucket (bucket 0).
*
* If the scankey is NULL, no tuples will satisfy the search;
* this should have been checked already, but arbitrarily return
* bucket zero.
*/
bucket = 0;
}
else
{
bucket = _hash_call(rel, metap, scankey[0].sk_argument);
}
blkno = BUCKET_TO_BLKNO(metap, bucket);
*bufP = _hash_getbuf(rel, blkno, HASH_READ);
}
/*
* _hash_next() -- Get the next item in a scan.
*
@ -69,31 +30,23 @@ _hash_search(Relation rel,
bool
_hash_next(IndexScanDesc scan, ScanDirection dir)
{
Relation rel;
Relation rel = scan->indexRelation;
HashScanOpaque so = (HashScanOpaque) scan->opaque;
Buffer buf;
Buffer metabuf;
Page page;
OffsetNumber offnum;
ItemPointer current;
HashItem hitem;
IndexTuple itup;
HashScanOpaque so;
rel = scan->indexRelation;
so = (HashScanOpaque) scan->opaque;
/* we still have the buffer pinned and locked */
/* we still have the buffer pinned and read-locked */
buf = so->hashso_curbuf;
Assert(BufferIsValid(buf));
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
/*
* step to next valid tuple. note that _hash_step releases our lock
* on 'metabuf'; if we switch to a new 'buf' while looking for the
* next tuple, we come back with a lock on that buffer.
* step to next valid tuple.
*/
if (!_hash_step(scan, &buf, dir, metabuf))
if (!_hash_step(scan, &buf, dir))
return false;
/* if we're here, _hash_step found a valid tuple */
@ -108,6 +61,9 @@ _hash_next(IndexScanDesc scan, ScanDirection dir)
return true;
}
/*
* Advance to next page in a bucket, if any.
*/
static void
_hash_readnext(Relation rel,
Buffer *bufp, Page *pagep, HashPageOpaque *opaquep)
@ -115,7 +71,7 @@ _hash_readnext(Relation rel,
BlockNumber blkno;
blkno = (*opaquep)->hasho_nextblkno;
_hash_relbuf(rel, *bufp, HASH_READ);
_hash_relbuf(rel, *bufp);
*bufp = InvalidBuffer;
if (BlockNumberIsValid(blkno))
{
@ -123,10 +79,12 @@ _hash_readnext(Relation rel,
*pagep = BufferGetPage(*bufp);
_hash_checkpage(rel, *pagep, LH_OVERFLOW_PAGE);
*opaquep = (HashPageOpaque) PageGetSpecialPointer(*pagep);
Assert(!PageIsEmpty(*pagep));
}
}
/*
* Advance to previous page in a bucket, if any.
*/
static void
_hash_readprev(Relation rel,
Buffer *bufp, Page *pagep, HashPageOpaque *opaquep)
@ -134,7 +92,7 @@ _hash_readprev(Relation rel,
BlockNumber blkno;
blkno = (*opaquep)->hasho_prevblkno;
_hash_relbuf(rel, *bufp, HASH_READ);
_hash_relbuf(rel, *bufp);
*bufp = InvalidBuffer;
if (BlockNumberIsValid(blkno))
{
@ -142,28 +100,26 @@ _hash_readprev(Relation rel,
*pagep = BufferGetPage(*bufp);
_hash_checkpage(rel, *pagep, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
*opaquep = (HashPageOpaque) PageGetSpecialPointer(*pagep);
if (PageIsEmpty(*pagep))
{
Assert((*opaquep)->hasho_flag & LH_BUCKET_PAGE);
_hash_relbuf(rel, *bufp, HASH_READ);
*bufp = InvalidBuffer;
}
}
}
/*
* _hash_first() -- Find the first item in a scan.
*
* Find the first item in the tree that
* Find the first item in the index that
* satisfies the qualification associated with the scan descriptor. On
* exit, the page containing the current index tuple is read locked
* success, the page containing the current index tuple is read locked
* and pinned, and the scan's opaque data entry is updated to
* include the buffer.
*/
bool
_hash_first(IndexScanDesc scan, ScanDirection dir)
{
Relation rel;
Relation rel = scan->indexRelation;
HashScanOpaque so = (HashScanOpaque) scan->opaque;
uint32 hashkey;
Bucket bucket;
BlockNumber blkno;
Buffer buf;
Buffer metabuf;
Page page;
@ -173,70 +129,89 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
IndexTuple itup;
ItemPointer current;
OffsetNumber offnum;
HashScanOpaque so;
rel = scan->indexRelation;
so = (HashScanOpaque) scan->opaque;
current = &(scan->currentItemData);
ItemPointerSetInvalid(current);
/*
* We do not support hash scans with no index qualification, because
* we would have to read the whole index rather than just one bucket.
* That creates a whole raft of problems, since we haven't got a
* practical way to lock all the buckets against splits or compactions.
*/
if (scan->numberOfKeys < 1)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("hash indexes do not support whole-index scans")));
/*
* If the constant in the index qual is NULL, assume it cannot match
* any items in the index.
*/
if (scan->keyData[0].sk_flags & SK_ISNULL)
return false;
/*
* Okay to compute the hash key. We want to do this before acquiring
* any locks, in case a user-defined hash function happens to be slow.
*/
hashkey = _hash_datum2hashkey(rel, scan->keyData[0].sk_argument);
/*
* Acquire shared split lock so we can compute the target bucket
* safely (see README).
*/
_hash_getlock(rel, 0, HASH_SHARE);
/* Read the metapage */
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
/*
* XXX -- The attribute number stored in the scan key is the attno in
* the heap relation. We need to transmogrify this into the index
* relation attno here. For the moment, we have hardwired attno == 1.
* Compute the target bucket number, and convert to block number.
*/
bucket = _hash_hashkey2bucket(hashkey,
metap->hashm_maxbucket,
metap->hashm_highmask,
metap->hashm_lowmask);
/* find the correct bucket page and load it into buf */
_hash_search(rel, 1, scan->keyData, &buf, metap);
blkno = BUCKET_TO_BLKNO(metap, bucket);
/* done with the metapage */
_hash_relbuf(rel, metabuf);
/*
* Acquire share lock on target bucket; then we can release split lock.
*/
_hash_getlock(rel, blkno, HASH_SHARE);
_hash_droplock(rel, 0, HASH_SHARE);
/* Update scan opaque state to show we have lock on the bucket */
so->hashso_bucket = bucket;
so->hashso_bucket_valid = true;
so->hashso_bucket_blkno = blkno;
/* Fetch the primary bucket page for the bucket */
buf = _hash_getbuf(rel, blkno, HASH_READ);
page = BufferGetPage(buf);
_hash_checkpage(rel, page, LH_BUCKET_PAGE);
opaque = (HashPageOpaque) PageGetSpecialPointer(page);
Assert(opaque->hasho_bucket == bucket);
/*
* if we are scanning forward, we need to find the first non-empty
* page (if any) in the bucket chain. since overflow pages are never
* empty, this had better be either the bucket page or the first
* overflow page.
*
* if we are scanning backward, we always go all the way to the end of
* the bucket chain.
*/
if (PageIsEmpty(page))
{
if (BlockNumberIsValid(opaque->hasho_nextblkno))
_hash_readnext(rel, &buf, &page, &opaque);
else
{
ItemPointerSetInvalid(current);
so->hashso_curbuf = InvalidBuffer;
/*
* If there are no scankeys, all tuples will satisfy the scan -
* so we continue in _hash_step to get tuples from all
* buckets. - vadim 04/29/97
*/
if (scan->numberOfKeys >= 1)
{
_hash_relbuf(rel, buf, HASH_READ);
_hash_relbuf(rel, metabuf, HASH_READ);
return false;
}
}
}
/* If a backwards scan is requested, move to the end of the chain */
if (ScanDirectionIsBackward(dir))
{
while (BlockNumberIsValid(opaque->hasho_nextblkno))
_hash_readnext(rel, &buf, &page, &opaque);
}
if (!_hash_step(scan, &buf, dir, metabuf))
/* Now find the first tuple satisfying the qualification */
if (!_hash_step(scan, &buf, dir))
return false;
/* if we're here, _hash_step found a valid tuple */
current = &(scan->currentItemData);
offnum = ItemPointerGetOffsetNumber(current);
page = BufferGetPage(buf);
_hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
@ -254,19 +229,16 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
* false. Else, return true and set the CurrentItemData for the
* scan to the right thing.
*
* 'bufP' points to the buffer which contains the current page
* that we'll step through.
*
* 'metabuf' is released when this returns.
* 'bufP' points to the current buffer, which is pinned and read-locked.
* On success exit, we have pin and read-lock on whichever page
* contains the right item; on failure, we have released all buffers.
*/
bool
_hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf)
_hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
{
Relation rel;
Relation rel = scan->indexRelation;
HashScanOpaque so = (HashScanOpaque) scan->opaque;
ItemPointer current;
HashScanOpaque so;
int allbuckets;
HashMetaPage metap;
Buffer buf;
Page page;
HashPageOpaque opaque;
@ -277,18 +249,13 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf)
HashItem hitem;
IndexTuple itup;
rel = scan->indexRelation;
current = &(scan->currentItemData);
so = (HashScanOpaque) scan->opaque;
allbuckets = (scan->numberOfKeys < 1);
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
buf = *bufP;
page = BufferGetPage(buf);
_hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
opaque = (HashPageOpaque) PageGetSpecialPointer(page);
bucket = opaque->hasho_bucket;
/*
* If _hash_step is called from _hash_first, current will not be
@ -309,107 +276,63 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf)
*/
do
{
bucket = opaque->hasho_bucket;
switch (dir)
{
case ForwardScanDirection:
if (offnum != InvalidOffsetNumber)
{
offnum = OffsetNumberNext(offnum); /* move forward */
}
else
{
offnum = FirstOffsetNumber; /* new page */
}
while (offnum > maxoff)
{
/*--------
/*
* either this page is empty
* (maxoff == InvalidOffsetNumber)
* or we ran off the end.
*--------
*/
_hash_readnext(rel, &buf, &page, &opaque);
if (BufferIsInvalid(buf))
{ /* end of chain */
if (allbuckets && bucket < metap->hashm_maxbucket)
{
++bucket;
blkno = BUCKET_TO_BLKNO(metap, bucket);
buf = _hash_getbuf(rel, blkno, HASH_READ);
page = BufferGetPage(buf);
_hash_checkpage(rel, page, LH_BUCKET_PAGE);
opaque = (HashPageOpaque) PageGetSpecialPointer(page);
Assert(opaque->hasho_bucket == bucket);
while (PageIsEmpty(page) &&
BlockNumberIsValid(opaque->hasho_nextblkno))
_hash_readnext(rel, &buf, &page, &opaque);
maxoff = PageGetMaxOffsetNumber(page);
offnum = FirstOffsetNumber;
}
else
{
maxoff = offnum = InvalidOffsetNumber;
break; /* while */
}
}
else
if (BufferIsValid(buf))
{
/* _hash_readnext never returns an empty page */
maxoff = PageGetMaxOffsetNumber(page);
offnum = FirstOffsetNumber;
}
else
{
/* end of bucket */
maxoff = offnum = InvalidOffsetNumber;
break; /* exit while */
}
}
break;
case BackwardScanDirection:
if (offnum != InvalidOffsetNumber)
{
offnum = OffsetNumberPrev(offnum); /* move back */
}
else
{
offnum = maxoff; /* new page */
}
while (offnum < FirstOffsetNumber)
{
/*---------
/*
* either this page is empty
* (offnum == InvalidOffsetNumber)
* or we ran off the end.
*---------
*/
_hash_readprev(rel, &buf, &page, &opaque);
if (BufferIsInvalid(buf))
{ /* end of chain */
if (allbuckets && bucket > 0)
{
--bucket;
blkno = BUCKET_TO_BLKNO(metap, bucket);
buf = _hash_getbuf(rel, blkno, HASH_READ);
page = BufferGetPage(buf);
_hash_checkpage(rel, page, LH_BUCKET_PAGE);
opaque = (HashPageOpaque) PageGetSpecialPointer(page);
Assert(opaque->hasho_bucket == bucket);
while (BlockNumberIsValid(opaque->hasho_nextblkno))
_hash_readnext(rel, &buf, &page, &opaque);
maxoff = offnum = PageGetMaxOffsetNumber(page);
}
else
{
maxoff = offnum = InvalidOffsetNumber;
break; /* while */
}
if (BufferIsValid(buf))
{
maxoff = offnum = PageGetMaxOffsetNumber(page);
}
else
{
/* _hash_readprev never returns an empty page */
maxoff = offnum = PageGetMaxOffsetNumber(page);
/* end of bucket */
maxoff = offnum = InvalidOffsetNumber;
break; /* exit while */
}
}
break;
default:
/* NoMovementScanDirection */
/* this should not be reached */
@ -419,7 +342,6 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf)
/* we ran off the end of the world without finding a match */
if (offnum == InvalidOffsetNumber)
{
_hash_relbuf(rel, metabuf, HASH_READ);
*bufP = so->hashso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(current);
return false;
@ -431,7 +353,6 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf)
} while (!_hash_checkqual(scan, itup));
/* if we made it to here, we've found a valid tuple */
_hash_relbuf(rel, metabuf, HASH_READ);
blkno = BufferGetBlockNumber(buf);
*bufP = so->hashso_curbuf = buf;
ItemPointerSet(current, blkno, offnum);


@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashutil.c,v 1.35 2003/09/02 18:13:31 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashutil.c,v 1.36 2003/09/04 22:06:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -19,46 +19,6 @@
#include "access/iqual.h"
/*
* _hash_mkscankey -- build a scan key matching the given indextuple
*
* Note: this is prepared for multiple index columns, but very little
* else in access/hash is ...
*/
ScanKey
_hash_mkscankey(Relation rel, IndexTuple itup)
{
ScanKey skey;
TupleDesc itupdesc = RelationGetDescr(rel);
int natts = rel->rd_rel->relnatts;
AttrNumber i;
Datum arg;
FmgrInfo *procinfo;
bool isnull;
skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
for (i = 0; i < natts; i++)
{
arg = index_getattr(itup, i + 1, itupdesc, &isnull);
procinfo = index_getprocinfo(rel, i + 1, HASHPROC);
ScanKeyEntryInitializeWithInfo(&skey[i],
isnull ? SK_ISNULL : 0x0,
(AttrNumber) (i + 1),
procinfo,
CurrentMemoryContext,
arg);
}
return skey;
}
void
_hash_freeskey(ScanKey skey)
{
pfree(skey);
}
/*
* _hash_checkqual -- does the index tuple satisfy the scan conditions?
*/
@ -102,24 +62,31 @@ _hash_formitem(IndexTuple itup)
}
/*
* _hash_call -- given a Datum, call the index's hash procedure
*
* Returns the bucket number that the hash key maps to.
* _hash_datum2hashkey -- given a Datum, call the index's hash procedure
*/
Bucket
_hash_call(Relation rel, HashMetaPage metap, Datum key)
uint32
_hash_datum2hashkey(Relation rel, Datum key)
{
FmgrInfo *procinfo;
uint32 n;
Bucket bucket;
/* XXX assumes index has only one attribute */
procinfo = index_getprocinfo(rel, 1, HASHPROC);
n = DatumGetUInt32(FunctionCall1(procinfo, key));
bucket = n & metap->hashm_highmask;
if (bucket > metap->hashm_maxbucket)
bucket = bucket & metap->hashm_lowmask;
return DatumGetUInt32(FunctionCall1(procinfo, key));
}
/*
* _hash_hashkey2bucket -- determine which bucket the hashkey maps to.
*/
Bucket
_hash_hashkey2bucket(uint32 hashkey, uint32 maxbucket,
uint32 highmask, uint32 lowmask)
{
Bucket bucket;
bucket = hashkey & highmask;
if (bucket > maxbucket)
bucket = bucket & lowmask;
return bucket;
}
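The two masks are what keep the mapping correct while the bucket count is between powers of two: a key first hashes into the full highmask range, and if that bucket has not been created yet it falls back to its lowmask "parent". A standalone worked example (the mask values are chosen for illustration):
#include <stdio.h>
/* same rule as _hash_hashkey2bucket above */
static unsigned
hashkey2bucket(unsigned hashkey, unsigned maxbucket,
			   unsigned highmask, unsigned lowmask)
{
	unsigned	bucket = hashkey & highmask;
	if (bucket > maxbucket)
		bucket &= lowmask;
	return bucket;
}
int
main(void)
{
	unsigned	key;
	/* six buckets exist (0..5): maxbucket = 5, highmask = 7, lowmask = 3 */
	for (key = 0; key < 8; key++)
		printf("hashkey %u -> bucket %u\n",
			   key, hashkey2bucket(key, 5, 7, 3));
	/*
	 * Keys 6 and 7 map to buckets 2 and 3 (their lowmask parents) until
	 * splits create buckets 6 and 7, after which they map directly.
	 */
	return 0;
}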


@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.59 2003/08/17 22:41:12 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.60 2003/09/04 22:06:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -153,7 +153,7 @@ LockRelation(Relation relation, LOCKMODE lockmode)
* As above, but only lock if we can get the lock without blocking.
* Returns TRUE iff the lock was acquired.
*
* NOTE: we do not currently need conditional versions of the other
* NOTE: we do not currently need conditional versions of all the
* LockXXX routines in this file, but they could easily be added if needed.
*/
bool
@ -264,6 +264,26 @@ LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
elog(ERROR, "LockAcquire failed");
}
/*
* ConditionalLockPage
*
* As above, but only lock if we can get the lock without blocking.
* Returns TRUE iff the lock was acquired.
*/
bool
ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
{
LOCKTAG tag;
MemSet(&tag, 0, sizeof(tag));
tag.relId = relation->rd_lockInfo.lockRelId.relId;
tag.dbId = relation->rd_lockInfo.lockRelId.dbId;
tag.objId.blkno = blkno;
return LockAcquire(LockTableId, &tag, GetCurrentTransactionId(),
lockmode, true);
}
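The hash AM's _hash_try_getlock (defined in hashpage.c, not shown in this excerpt) presumably reduces to a thin wrapper over this routine; a hypothetical sketch, assuming the usual lmgr.h and hash.h context:
/* hypothetical wrapper -- the real definition lives in hashpage.c */
bool
_hash_try_getlock(Relation rel, BlockNumber whichlock, int access)
{
	return ConditionalLockPage(rel, whichlock, access);
}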
/*
* UnlockPage
*/


@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Id: hash.h,v 1.52 2003/09/02 18:13:32 tgl Exp $
* $Id: hash.h,v 1.53 2003/09/04 22:06:27 tgl Exp $
*
* NOTES
* modeled after Margo Seltzer's hash implementation for unix.
@ -70,13 +70,27 @@ typedef HashPageOpaqueData *HashPageOpaque;
#define HASHO_FILL 0x1234
/*
* ScanOpaqueData is used to remember which buffers we're currently
* examining in the scan. We keep these buffers locked and pinned and
* recorded in the opaque entry of the scan in order to avoid doing a
* ReadBuffer() for every tuple in the index.
* HashScanOpaqueData is private state for a hash index scan.
*/
typedef struct HashScanOpaqueData
{
/*
* By definition, a hash scan should be examining only one bucket.
* We record the bucket number here as soon as it is known.
*/
Bucket hashso_bucket;
bool hashso_bucket_valid;
/*
* If we have a share lock on the bucket, we record it here. When
* hashso_bucket_blkno is zero, we have no such lock.
*/
BlockNumber hashso_bucket_blkno;
/*
* We also want to remember which buffers we're currently examining in the
* scan. We keep these buffers pinned (but not locked) across hashgettuple
* calls, in order to avoid doing a ReadBuffer() for every tuple in the
* index.
*/
Buffer hashso_curbuf;
Buffer hashso_mrkbuf;
} HashScanOpaqueData;
@ -148,10 +162,18 @@ typedef struct HashItemData
typedef HashItemData *HashItem;
/*
* Maximum size of a hash index item (it's okay to have only one per page)
*/
#define HashMaxItemSize(page) \
(PageGetPageSize(page) - \
sizeof(PageHeaderData) - \
MAXALIGN(sizeof(HashPageOpaqueData)) - \
sizeof(ItemIdData))
/*
* Constants
*/
#define DEFAULT_FFACTOR 300
#define BYTE_TO_BIT 3 /* 2^3 bits/byte */
#define ALL_SET ((uint32) ~0)
@ -180,10 +202,14 @@ typedef HashItemData *HashItem;
#define ISSET(A, N) ((A)[(N)/BITS_PER_MAP] & (1<<((N)%BITS_PER_MAP)))
/*
* page locking modes
* page-level and high-level locking modes (see README)
*/
#define HASH_READ 0
#define HASH_WRITE 1
#define HASH_READ BUFFER_LOCK_SHARE
#define HASH_WRITE BUFFER_LOCK_EXCLUSIVE
#define HASH_NOLOCK (-1)
#define HASH_SHARE ShareLock
#define HASH_EXCLUSIVE ExclusiveLock
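To make the two levels concrete, here is a sketch of how they typically pair up elsewhere in this commit (illustrative ordering drawn from _hash_first, not a fixed protocol):
/*
 * Sketch of how the two locking levels combine (cf. _hash_first):
 *
 *		_hash_getlock(rel, 0, HASH_SHARE);		-- lmgr lock on page zero,
 *												-- so the bucket count is stable
 *		... read metapage, compute target bucket and blkno ...
 *		_hash_getlock(rel, blkno, HASH_SHARE);	-- lmgr lock on the bucket
 *		_hash_droplock(rel, 0, HASH_SHARE);
 *		buf = _hash_getbuf(rel, blkno, HASH_READ);	-- buffer content lock
 *		_hash_chgbufaccess(rel, buf, HASH_READ, HASH_NOLOCK);	-- keep pin only
 */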
/*
* Strategy number. There's only one valid strategy for hashing: equality.
@ -199,8 +225,6 @@ typedef HashItemData *HashItem;
#define HASHPROC 1
extern bool BuildingHash;
/* public routines */
extern Datum hashbuild(PG_FUNCTION_ARGS);
@ -250,36 +274,37 @@ extern void _hash_squeezebucket(Relation rel,
Bucket bucket, BlockNumber bucket_blkno);
/* hashpage.c */
extern void _hash_metapinit(Relation rel);
extern void _hash_getlock(Relation rel, BlockNumber whichlock, int access);
extern bool _hash_try_getlock(Relation rel, BlockNumber whichlock, int access);
extern void _hash_droplock(Relation rel, BlockNumber whichlock, int access);
extern Buffer _hash_getbuf(Relation rel, BlockNumber blkno, int access);
extern void _hash_relbuf(Relation rel, Buffer buf, int access);
extern void _hash_relbuf(Relation rel, Buffer buf);
extern void _hash_dropbuf(Relation rel, Buffer buf);
extern void _hash_wrtbuf(Relation rel, Buffer buf);
extern void _hash_wrtnorelbuf(Buffer buf);
extern void _hash_wrtnorelbuf(Relation rel, Buffer buf);
extern void _hash_chgbufaccess(Relation rel, Buffer buf, int from_access,
int to_access);
extern void _hash_metapinit(Relation rel);
extern void _hash_pageinit(Page page, Size size);
extern void _hash_expandtable(Relation rel, Buffer metabuf);
/* hashscan.c */
extern void _hash_regscan(IndexScanDesc scan);
extern void _hash_dropscan(IndexScanDesc scan);
extern void _hash_adjscans(Relation rel, ItemPointer tid);
extern bool _hash_has_active_scan(Relation rel, Bucket bucket);
extern void AtEOXact_hash(void);
/* hashsearch.c */
extern void _hash_search(Relation rel, int keysz, ScanKey scankey,
Buffer *bufP, HashMetaPage metap);
extern bool _hash_next(IndexScanDesc scan, ScanDirection dir);
extern bool _hash_first(IndexScanDesc scan, ScanDirection dir);
extern bool _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir,
Buffer metabuf);
extern bool _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir);
/* hashutil.c */
extern ScanKey _hash_mkscankey(Relation rel, IndexTuple itup);
extern void _hash_freeskey(ScanKey skey);
extern bool _hash_checkqual(IndexScanDesc scan, IndexTuple itup);
extern HashItem _hash_formitem(IndexTuple itup);
extern Bucket _hash_call(Relation rel, HashMetaPage metap, Datum key);
extern uint32 _hash_datum2hashkey(Relation rel, Datum key);
extern Bucket _hash_hashkey2bucket(uint32 hashkey, uint32 maxbucket,
uint32 highmask, uint32 lowmask);
extern uint32 _hash_log2(uint32 num);
extern void _hash_checkpage(Relation rel, Page page, int flags);


@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Id: lmgr.h,v 1.39 2003/08/04 02:40:14 momjian Exp $
* $Id: lmgr.h,v 1.40 2003/09/04 22:06:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -54,8 +54,9 @@ extern void UnlockRelation(Relation relation, LOCKMODE lockmode);
extern void LockRelationForSession(LockRelId *relid, LOCKMODE lockmode);
extern void UnlockRelationForSession(LockRelId *relid, LOCKMODE lockmode);
/* Lock a page (mainly used for indices) */
/* Lock a page (mainly used for indexes) */
extern void LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode);
extern bool ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode);
extern void UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode);
/* Lock an XID (used to wait for a transaction to finish) */