diff --git a/src/backend/access/hash/README b/src/backend/access/hash/README index 118d434879..ce195eae2c 100644 --- a/src/backend/access/hash/README +++ b/src/backend/access/hash/README @@ -1,4 +1,4 @@ -$Header: /cvsroot/pgsql/src/backend/access/hash/README,v 1.2 2003/09/02 03:29:01 tgl Exp $ +$Header: /cvsroot/pgsql/src/backend/access/hash/README,v 1.3 2003/09/04 22:06:27 tgl Exp $ This directory contains an implementation of hash indexing for Postgres. @@ -229,8 +229,8 @@ existing bucket in two, thereby lowering the fill ratio: check split still needed if split not needed anymore, drop locks and exit decide which bucket to split - Attempt to X-lock new bucket number (shouldn't fail, but...) Attempt to X-lock old bucket number (definitely could fail) + Attempt to X-lock new bucket number (shouldn't fail, but...) if above fail, drop locks and exit update meta page to reflect new number of buckets write/release meta page @@ -261,12 +261,6 @@ not be overfull and split attempts will stop. (We could make a successful splitter loop to see if the index is still overfull, but it seems better to distribute the split overhead across successive insertions.) -It may be wise to make the initial exclusive-lock-page-zero operation a -conditional one as well, although the odds of a deadlock failure are quite -low. (AFAICS it could only deadlock against a VACUUM operation that is -trying to X-lock a bucket that the current process has a stopped indexscan -in.) - A problem is that if a split fails partway through (eg due to insufficient disk space) the index is left corrupt. The probability of that could be made quite low if we grab a free page or two before we update the meta diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c index 7e30754c88..190c95e2c8 100644 --- a/src/backend/access/hash/hash.c +++ b/src/backend/access/hash/hash.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/hash/hash.c,v 1.67 2003/09/02 18:13:29 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/hash/hash.c,v 1.68 2003/09/04 22:06:27 tgl Exp $ * * NOTES * This file contains only the public interface routines. @@ -27,9 +27,6 @@ #include "miscadmin.h" -bool BuildingHash = false; - - /* Working state for hashbuild and its callback */ typedef struct { @@ -61,9 +58,6 @@ hashbuild(PG_FUNCTION_ARGS) double reltuples; HashBuildState buildstate; - /* set flag to disable locking */ - BuildingHash = true; - /* * We expect to be called exactly once for any index relation. If * that's not the case, big trouble's what we have. @@ -82,9 +76,6 @@ hashbuild(PG_FUNCTION_ARGS) reltuples = IndexBuildHeapScan(heap, index, indexInfo, hashbuildCallback, (void *) &buildstate); - /* all done */ - BuildingHash = false; - /* * Since we just counted the tuples in the heap, we update its stats * in pg_class to guarantee that the planner takes advantage of the @@ -212,10 +203,18 @@ hashgettuple(PG_FUNCTION_ARGS) IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1); HashScanOpaque so = (HashScanOpaque) scan->opaque; + Relation rel = scan->indexRelation; Page page; OffsetNumber offnum; bool res; + /* + * We hold pin but not lock on current buffer while outside the hash AM. + * Reacquire the read lock here. + */ + if (BufferIsValid(so->hashso_curbuf)) + _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_NOLOCK, HASH_READ); + /* * If we've already initialized this scan, we can just advance it in * the appropriate direction. 
If we haven't done so yet, we call a @@ -267,6 +266,10 @@ hashgettuple(PG_FUNCTION_ARGS) } } + /* Release read lock on current buffer, but keep it pinned */ + if (BufferIsValid(so->hashso_curbuf)) + _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_READ, HASH_NOLOCK); + PG_RETURN_BOOL(res); } @@ -285,6 +288,8 @@ hashbeginscan(PG_FUNCTION_ARGS) scan = RelationGetIndexScan(rel, keysz, scankey); so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData)); + so->hashso_bucket_valid = false; + so->hashso_bucket_blkno = 0; so->hashso_curbuf = so->hashso_mrkbuf = InvalidBuffer; scan->opaque = so; @@ -303,28 +308,38 @@ hashrescan(PG_FUNCTION_ARGS) IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); ScanKey scankey = (ScanKey) PG_GETARG_POINTER(1); HashScanOpaque so = (HashScanOpaque) scan->opaque; - ItemPointer iptr; + Relation rel = scan->indexRelation; - /* we hold a read lock on the current page in the scan */ - if (ItemPointerIsValid(iptr = &(scan->currentItemData))) + /* if we are called from beginscan, so is still NULL */ + if (so) { - _hash_relbuf(scan->indexRelation, so->hashso_curbuf, HASH_READ); + /* release any pins we still hold */ + if (BufferIsValid(so->hashso_curbuf)) + _hash_dropbuf(rel, so->hashso_curbuf); so->hashso_curbuf = InvalidBuffer; - ItemPointerSetInvalid(iptr); - } - if (ItemPointerIsValid(iptr = &(scan->currentMarkData))) - { - _hash_relbuf(scan->indexRelation, so->hashso_mrkbuf, HASH_READ); + + if (BufferIsValid(so->hashso_mrkbuf)) + _hash_dropbuf(rel, so->hashso_mrkbuf); so->hashso_mrkbuf = InvalidBuffer; - ItemPointerSetInvalid(iptr); + + /* release lock on bucket, too */ + if (so->hashso_bucket_blkno) + _hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE); + so->hashso_bucket_blkno = 0; } + /* set positions invalid (this will cause _hash_first call) */ + ItemPointerSetInvalid(&(scan->currentItemData)); + ItemPointerSetInvalid(&(scan->currentMarkData)); + /* Update scan key, if a new one is given */ if (scankey && scan->numberOfKeys > 0) { memmove(scan->keyData, scankey, scan->numberOfKeys * sizeof(ScanKeyData)); + if (so) + so->hashso_bucket_valid = false; } PG_RETURN_VOID(); @@ -337,32 +352,32 @@ Datum hashendscan(PG_FUNCTION_ARGS) { IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); - ItemPointer iptr; - HashScanOpaque so; - - so = (HashScanOpaque) scan->opaque; - - /* release any locks we still hold */ - if (ItemPointerIsValid(iptr = &(scan->currentItemData))) - { - _hash_relbuf(scan->indexRelation, so->hashso_curbuf, HASH_READ); - so->hashso_curbuf = InvalidBuffer; - ItemPointerSetInvalid(iptr); - } - - if (ItemPointerIsValid(iptr = &(scan->currentMarkData))) - { - if (BufferIsValid(so->hashso_mrkbuf)) - _hash_relbuf(scan->indexRelation, so->hashso_mrkbuf, HASH_READ); - so->hashso_mrkbuf = InvalidBuffer; - ItemPointerSetInvalid(iptr); - } + HashScanOpaque so = (HashScanOpaque) scan->opaque; + Relation rel = scan->indexRelation; /* don't need scan registered anymore */ _hash_dropscan(scan); + /* release any pins we still hold */ + if (BufferIsValid(so->hashso_curbuf)) + _hash_dropbuf(rel, so->hashso_curbuf); + so->hashso_curbuf = InvalidBuffer; + + if (BufferIsValid(so->hashso_mrkbuf)) + _hash_dropbuf(rel, so->hashso_mrkbuf); + so->hashso_mrkbuf = InvalidBuffer; + + /* release lock on bucket, too */ + if (so->hashso_bucket_blkno) + _hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE); + so->hashso_bucket_blkno = 0; + /* be tidy */ - pfree(scan->opaque); + ItemPointerSetInvalid(&(scan->currentItemData)); + 
ItemPointerSetInvalid(&(scan->currentMarkData)); + + pfree(so); + scan->opaque = NULL; PG_RETURN_VOID(); } @@ -374,25 +389,21 @@ Datum hashmarkpos(PG_FUNCTION_ARGS) { IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); - ItemPointer iptr; - HashScanOpaque so; + HashScanOpaque so = (HashScanOpaque) scan->opaque; + Relation rel = scan->indexRelation; - so = (HashScanOpaque) scan->opaque; + /* release pin on old marked data, if any */ + if (BufferIsValid(so->hashso_mrkbuf)) + _hash_dropbuf(rel, so->hashso_mrkbuf); + so->hashso_mrkbuf = InvalidBuffer; + ItemPointerSetInvalid(&(scan->currentMarkData)); - /* release lock on old marked data, if any */ - if (ItemPointerIsValid(iptr = &(scan->currentMarkData))) - { - _hash_relbuf(scan->indexRelation, so->hashso_mrkbuf, HASH_READ); - so->hashso_mrkbuf = InvalidBuffer; - ItemPointerSetInvalid(iptr); - } - - /* bump lock on currentItemData and copy to currentMarkData */ + /* bump pin count on currentItemData and copy to currentMarkData */ if (ItemPointerIsValid(&(scan->currentItemData))) { - so->hashso_mrkbuf = _hash_getbuf(scan->indexRelation, + so->hashso_mrkbuf = _hash_getbuf(rel, BufferGetBlockNumber(so->hashso_curbuf), - HASH_READ); + HASH_NOLOCK); scan->currentMarkData = scan->currentItemData; } @@ -406,26 +417,21 @@ Datum hashrestrpos(PG_FUNCTION_ARGS) { IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); - ItemPointer iptr; - HashScanOpaque so; + HashScanOpaque so = (HashScanOpaque) scan->opaque; + Relation rel = scan->indexRelation; - so = (HashScanOpaque) scan->opaque; + /* release pin on current data, if any */ + if (BufferIsValid(so->hashso_curbuf)) + _hash_dropbuf(rel, so->hashso_curbuf); + so->hashso_curbuf = InvalidBuffer; + ItemPointerSetInvalid(&(scan->currentItemData)); - /* release lock on current data, if any */ - if (ItemPointerIsValid(iptr = &(scan->currentItemData))) - { - _hash_relbuf(scan->indexRelation, so->hashso_curbuf, HASH_READ); - so->hashso_curbuf = InvalidBuffer; - ItemPointerSetInvalid(iptr); - } - - /* bump lock on currentMarkData and copy to currentItemData */ + /* bump pin count on currentMarkData and copy to currentItemData */ if (ItemPointerIsValid(&(scan->currentMarkData))) { - so->hashso_curbuf = _hash_getbuf(scan->indexRelation, + so->hashso_curbuf = _hash_getbuf(rel, BufferGetBlockNumber(so->hashso_mrkbuf), - HASH_READ); - + HASH_NOLOCK); scan->currentItemData = scan->currentMarkData; } @@ -474,7 +480,7 @@ hashbulkdelete(PG_FUNCTION_ARGS) orig_maxbucket = metap->hashm_maxbucket; orig_ntuples = metap->hashm_ntuples; memcpy(&local_metapage, metap, sizeof(local_metapage)); - _hash_relbuf(rel, metabuf, HASH_READ); + _hash_relbuf(rel, metabuf); /* Scan the buckets that we know exist */ cur_bucket = 0; @@ -490,7 +496,12 @@ loop_top: /* Get address of bucket's start page */ bucket_blkno = BUCKET_TO_BLKNO(&local_metapage, cur_bucket); - /* XXX lock bucket here */ + /* Exclusive-lock the bucket so we can shrink it */ + _hash_getlock(rel, bucket_blkno, HASH_EXCLUSIVE); + + /* Shouldn't have any active scans locally, either */ + if (_hash_has_active_scan(rel, cur_bucket)) + elog(ERROR, "hash index has active scan during VACUUM"); /* Scan each page in bucket */ blkno = bucket_blkno; @@ -522,13 +533,6 @@ loop_top: htup = &(hitem->hash_itup.t_tid); if (callback(htup, callback_state)) { - ItemPointerData indextup; - - /* adjust any active scans that will be affected */ - /* (this should be unnecessary) */ - ItemPointerSet(&indextup, blkno, offno); - _hash_adjscans(rel, &indextup); - /* delete the item from the 
page */ PageIndexTupleDelete(page, offno); bucket_dirty = page_dirty = true; @@ -547,24 +551,22 @@ loop_top: } /* - * Write or free page if needed, advance to next page. We want - * to preserve the invariant that overflow pages are nonempty. + * Write page if needed, advance to next page. */ blkno = opaque->hasho_nextblkno; - if (PageIsEmpty(page) && (opaque->hasho_flag & LH_OVERFLOW_PAGE)) - _hash_freeovflpage(rel, buf); - else if (page_dirty) + if (page_dirty) _hash_wrtbuf(rel, buf); else - _hash_relbuf(rel, buf, HASH_WRITE); + _hash_relbuf(rel, buf); } /* If we deleted anything, try to compact free space */ if (bucket_dirty) _hash_squeezebucket(rel, cur_bucket, bucket_blkno); - /* XXX unlock bucket here */ + /* Release bucket lock */ + _hash_droplock(rel, bucket_blkno, HASH_EXCLUSIVE); /* Advance to next bucket */ cur_bucket++; @@ -580,7 +582,7 @@ loop_top: /* There's been a split, so process the additional bucket(s) */ cur_maxbucket = metap->hashm_maxbucket; memcpy(&local_metapage, metap, sizeof(local_metapage)); - _hash_relbuf(rel, metabuf, HASH_WRITE); + _hash_relbuf(rel, metabuf); goto loop_top; } diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c index 20cdcabfaa..00b3d60b28 100644 --- a/src/backend/access/hash/hashinsert.c +++ b/src/backend/access/hash/hashinsert.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/hash/hashinsert.c,v 1.29 2003/09/02 18:13:30 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/hash/hashinsert.c,v 1.30 2003/09/04 22:06:27 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -16,136 +16,124 @@ #include "postgres.h" #include "access/hash.h" +#include "storage/lmgr.h" + + +static OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf, + Size itemsize, HashItem hitem); -static InsertIndexResult _hash_insertonpg(Relation rel, Buffer buf, int keysz, ScanKey scankey, HashItem hitem, Buffer metabuf); -static OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, HashItem hitem); /* * _hash_doinsert() -- Handle insertion of a single HashItem in the table. * * This routine is called by the public interface routines, hashbuild - * and hashinsert. By here, hashitem is filled in, and has a unique - * (xid, seqno) pair. The datum to be used as a "key" is in the - * hashitem. + * and hashinsert. By here, hashitem is completely filled in. + * The datum to be used as a "key" is in the hashitem. */ InsertIndexResult _hash_doinsert(Relation rel, HashItem hitem) { Buffer buf; Buffer metabuf; - BlockNumber blkno; HashMetaPage metap; IndexTuple itup; + BlockNumber itup_blkno; + OffsetNumber itup_off; InsertIndexResult res; - ScanKey itup_scankey; - int natts; + BlockNumber blkno; Page page; + HashPageOpaque pageopaque; + Size itemsz; + bool do_expand; + uint32 hashkey; + Bucket bucket; + Datum datum; + bool isnull; + /* + * Compute the hash key for the item. We do this first so as not to + * need to hold any locks while running the hash function. 
+ */ + itup = &(hitem->hash_itup); + if (rel->rd_rel->relnatts != 1) + elog(ERROR, "hash indexes support only one index key"); + datum = index_getattr(itup, 1, RelationGetDescr(rel), &isnull); + Assert(!isnull); + hashkey = _hash_datum2hashkey(rel, datum); + + /* compute item size too */ + itemsz = IndexTupleDSize(hitem->hash_itup) + + (sizeof(HashItemData) - sizeof(IndexTupleData)); + + itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but + * we need to be consistent */ + + /* + * Acquire shared split lock so we can compute the target bucket + * safely (see README). + */ + _hash_getlock(rel, 0, HASH_SHARE); + + /* Read the metapage */ metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ); metap = (HashMetaPage) BufferGetPage(metabuf); _hash_checkpage(rel, (Page) metap, LH_META_PAGE); - /* we need a scan key to do our search, so build one */ - itup = &(hitem->hash_itup); - if ((natts = rel->rd_rel->relnatts) != 1) - elog(ERROR, "Hash indexes support only one index key"); - itup_scankey = _hash_mkscankey(rel, itup); + /* + * Check whether the item can fit on a hash page at all. (Eventually, + * we ought to try to apply TOAST methods if not.) Note that at this + * point, itemsz doesn't include the ItemId. + */ + if (itemsz > HashMaxItemSize((Page) metap)) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("index tuple size %lu exceeds hash maximum, %lu", + (unsigned long) itemsz, + (unsigned long) HashMaxItemSize((Page) metap)))); /* - * find the first page in the bucket chain containing this key and - * place it in buf. _hash_search obtains a read lock for us. + * Compute the target bucket number, and convert to block number. */ - _hash_search(rel, natts, itup_scankey, &buf, metap); + bucket = _hash_hashkey2bucket(hashkey, + metap->hashm_maxbucket, + metap->hashm_highmask, + metap->hashm_lowmask); + + blkno = BUCKET_TO_BLKNO(metap, bucket); + + /* release lock on metapage, but keep pin since we'll need it again */ + _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK); + + /* + * Acquire share lock on target bucket; then we can release split lock. + */ + _hash_getlock(rel, blkno, HASH_SHARE); + + _hash_droplock(rel, 0, HASH_SHARE); + + /* Fetch the primary bucket page for the bucket */ + buf = _hash_getbuf(rel, blkno, HASH_WRITE); page = BufferGetPage(buf); _hash_checkpage(rel, page, LH_BUCKET_PAGE); - - /* - * trade in our read lock for a write lock so that we can do the - * insertion. - */ - blkno = BufferGetBlockNumber(buf); - _hash_relbuf(rel, buf, HASH_READ); - buf = _hash_getbuf(rel, blkno, HASH_WRITE); - - - /* - * XXX btree comment (haven't decided what to do in hash): don't think - * the bucket can be split while we're reading the metapage. - * - * If the page was split between the time that we surrendered our read - * lock and acquired our write lock, then this page may no longer be - * the right place for the key we want to insert. - */ - - /* do the insertion */ - res = _hash_insertonpg(rel, buf, natts, itup_scankey, - hitem, metabuf); - - /* be tidy */ - _hash_freeskey(itup_scankey); - - return res; -} - -/* - * _hash_insertonpg() -- Insert a tuple on a particular page in the table. - * - * This recursive procedure does the following things: - * - * + if necessary, splits the target page. - * + inserts the tuple. - * - * On entry, we must have the right buffer on which to do the - * insertion, and the buffer must be pinned and locked. On return, - * we will have dropped both the pin and the write lock on the buffer. 
- * - */ -static InsertIndexResult -_hash_insertonpg(Relation rel, - Buffer buf, - int keysz, - ScanKey scankey, - HashItem hitem, - Buffer metabuf) -{ - InsertIndexResult res; - Page page; - BlockNumber itup_blkno; - OffsetNumber itup_off; - Size itemsz; - HashPageOpaque pageopaque; - bool do_expand = false; - Buffer ovflbuf; - HashMetaPage metap; - Bucket bucket; - - metap = (HashMetaPage) BufferGetPage(metabuf); - _hash_checkpage(rel, (Page) metap, LH_META_PAGE); - - page = BufferGetPage(buf); - _hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE); pageopaque = (HashPageOpaque) PageGetSpecialPointer(page); - bucket = pageopaque->hasho_bucket; - - itemsz = IndexTupleDSize(hitem->hash_itup) - + (sizeof(HashItemData) - sizeof(IndexTupleData)); - itemsz = MAXALIGN(itemsz); + Assert(pageopaque->hasho_bucket == bucket); + /* Do the insertion */ while (PageGetFreeSpace(page) < itemsz) { /* * no space on this page; check for an overflow page */ - if (BlockNumberIsValid(pageopaque->hasho_nextblkno)) + BlockNumber nextblkno = pageopaque->hasho_nextblkno; + + if (BlockNumberIsValid(nextblkno)) { /* * ovfl page exists; go get it. if it doesn't have room, * we'll find out next pass through the loop test above. */ - ovflbuf = _hash_getbuf(rel, pageopaque->hasho_nextblkno, - HASH_WRITE); - _hash_relbuf(rel, buf, HASH_WRITE); - buf = ovflbuf; + _hash_relbuf(rel, buf); + buf = _hash_getbuf(rel, nextblkno, HASH_WRITE); page = BufferGetPage(buf); } else @@ -154,65 +142,72 @@ _hash_insertonpg(Relation rel, * we're at the end of the bucket chain and we haven't found a * page with enough room. allocate a new overflow page. */ - do_expand = true; - ovflbuf = _hash_addovflpage(rel, metabuf, buf); - _hash_relbuf(rel, buf, HASH_WRITE); - buf = ovflbuf; + + /* release our write lock without modifying buffer */ + _hash_chgbufaccess(rel, buf, HASH_READ, HASH_NOLOCK); + + /* chain to a new overflow page */ + buf = _hash_addovflpage(rel, metabuf, buf); page = BufferGetPage(buf); - if (PageGetFreeSpace(page) < itemsz) - { - /* it doesn't fit on an empty page -- give up */ - elog(ERROR, "hash item too large"); - } + /* should fit now, given test above */ + Assert(PageGetFreeSpace(page) >= itemsz); } _hash_checkpage(rel, page, LH_OVERFLOW_PAGE); pageopaque = (HashPageOpaque) PageGetSpecialPointer(page); Assert(pageopaque->hasho_bucket == bucket); } - itup_off = _hash_pgaddtup(rel, buf, keysz, scankey, itemsz, hitem); + /* found page with enough space, so add the item here */ + itup_off = _hash_pgaddtup(rel, buf, itemsz, hitem); itup_blkno = BufferGetBlockNumber(buf); - /* by here, the new tuple is inserted */ + /* write and release the modified page */ + _hash_wrtbuf(rel, buf); + + /* We can drop the bucket lock now */ + _hash_droplock(rel, blkno, HASH_SHARE); + + /* + * Write-lock the metapage so we can increment the tuple count. + * After incrementing it, check to see if it's time for a split. 
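[Illustrative aside, not part of the patch: the bucket number used by _hash_doinsert above comes from _hash_hashkey2bucket(), which this diff does not show. The standalone sketch below reproduces the mask arithmetic that helper is assumed to apply; the names and sample values are made up for illustration.]

#include <stdint.h>
#include <stdio.h>

/* Map a 32-bit hash key onto an existing bucket: mask with highmask
 * (the current doubling), then fold back with lowmask if that bucket
 * has not been created yet. */
static uint32_t
sketch_hashkey2bucket(uint32_t hashkey, uint32_t maxbucket,
                      uint32_t highmask, uint32_t lowmask)
{
    uint32_t bucket = hashkey & highmask;

    if (bucket > maxbucket)
        bucket = bucket & lowmask;
    return bucket;
}

int
main(void)
{
    /* five buckets exist: maxbucket = 4, lowmask = 3, highmask = 7 */
    printf("%u\n", sketch_hashkey2bucket(0x2Au, 4, 7, 3));  /* prints 2 */
    return 0;
}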
+ */ + _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE); + + metap->hashm_ntuples += 1; + + /* Make sure this stays in sync with _hash_expandtable() */ + do_expand = metap->hashm_ntuples > + (double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1); + + /* Write out the metapage and drop lock, but keep pin */ + _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK); + + /* Attempt to split if a split is needed */ + if (do_expand) + _hash_expandtable(rel, metabuf); + + /* Finally drop our pin on the metapage */ + _hash_dropbuf(rel, metabuf); + + /* Create the return data structure */ res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); ItemPointerSet(&(res->pointerData), itup_blkno, itup_off); - if (res != NULL) - { - /* - * Increment the number of keys in the table. We switch lock - * access type just for a moment to allow greater accessibility to - * the metapage. - */ - _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_WRITE); - metap->hashm_ntuples += 1; - _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_READ); - } - - _hash_wrtbuf(rel, buf); - - if (do_expand || - (metap->hashm_ntuples / (metap->hashm_maxbucket + 1)) - > (double) metap->hashm_ffactor) - _hash_expandtable(rel, metabuf); - _hash_relbuf(rel, metabuf, HASH_READ); return res; } /* * _hash_pgaddtup() -- add a tuple to a particular page in the index. * - * This routine adds the tuple to the page as requested, and keeps the - * write lock and reference associated with the page's buffer. It is - * an error to call pgaddtup() without a write lock and reference. + * This routine adds the tuple to the page as requested; it does + * not write out the page. It is an error to call pgaddtup() without + * a write lock and pin. */ static OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf, - int keysz, - ScanKey itup_scankey, Size itemsize, HashItem hitem) { @@ -228,8 +223,5 @@ _hash_pgaddtup(Relation rel, elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(rel)); - /* write the buffer, but hold our lock */ - _hash_wrtnorelbuf(buf); - return itup_off; } diff --git a/src/backend/access/hash/hashovfl.c b/src/backend/access/hash/hashovfl.c index 388a711832..fe5e5e9595 100644 --- a/src/backend/access/hash/hashovfl.c +++ b/src/backend/access/hash/hashovfl.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/hash/hashovfl.c,v 1.40 2003/09/02 18:13:30 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/hash/hashovfl.c,v 1.41 2003/09/04 22:06:27 tgl Exp $ * * NOTES * Overflow pages look like ordinary relation pages. @@ -77,39 +77,68 @@ blkno_to_bitno(HashMetaPage metap, BlockNumber ovflblkno) /* * _hash_addovflpage * - * Add an overflow page to the page currently pointed to by the buffer - * argument 'buf'. + * Add an overflow page to the bucket whose last page is pointed to by 'buf'. * - * metabuf has a read lock upon entering the function; buf has a - * write lock. The same is true on exit. The returned overflow page - * is write-locked. + * On entry, the caller must hold a pin but no lock on 'buf'. The pin is + * dropped before exiting (we assume the caller is not interested in 'buf' + * anymore). The returned overflow page will be pinned and write-locked; + * it is guaranteed to be empty. + * + * The caller must hold a pin, but no lock, on the metapage buffer. + * That buffer is returned in the same state. + * + * The caller must hold at least share lock on the bucket, to ensure that + * no one else tries to compact the bucket meanwhile. 
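[Aside: the split trigger above, which the comment says must stay in sync with _hash_expandtable(), is plain arithmetic. A minimal standalone illustration with made-up numbers:]

#include <stdbool.h>
#include <stdio.h>

/* Split when the average bucket fill exceeds the target fill factor. */
static bool
sketch_needs_split(double ntuples, unsigned ffactor, unsigned maxbucket)
{
    return ntuples > (double) ffactor * (maxbucket + 1);
}

int
main(void)
{
    /* 500 tuples, ffactor 75, buckets 0..5 -> 500 > 450, so split */
    printf("%d\n", (int) sketch_needs_split(500.0, 75, 5));
    return 0;
}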
This guarantees that + * 'buf' won't stop being part of the bucket while it's unlocked. + * + * NB: since this could be executed concurrently by multiple processes, + * one should not assume that the returned overflow page will be the + * immediate successor of the originally passed 'buf'. Additional overflow + * pages might have been added to the bucket chain in between. */ Buffer _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf) { BlockNumber ovflblkno; Buffer ovflbuf; - HashMetaPage metap; - HashPageOpaque ovflopaque; - HashPageOpaque pageopaque; Page page; Page ovflpage; - - /* this had better be the last page in a bucket chain */ - page = BufferGetPage(buf); - _hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE); - pageopaque = (HashPageOpaque) PageGetSpecialPointer(page); - Assert(!BlockNumberIsValid(pageopaque->hasho_nextblkno)); - - metap = (HashMetaPage) BufferGetPage(metabuf); - _hash_checkpage(rel, (Page) metap, LH_META_PAGE); + HashPageOpaque pageopaque; + HashPageOpaque ovflopaque; /* allocate an empty overflow page */ ovflblkno = _hash_getovflpage(rel, metabuf); + + /* lock the overflow page */ ovflbuf = _hash_getbuf(rel, ovflblkno, HASH_WRITE); ovflpage = BufferGetPage(ovflbuf); - /* initialize the new overflow page */ + /* + * Write-lock the tail page. It is okay to hold two buffer locks here + * since there cannot be anyone else contending for access to ovflbuf. + */ + _hash_chgbufaccess(rel, buf, HASH_NOLOCK, HASH_WRITE); + + /* loop to find current tail page, in case someone else inserted too */ + for (;;) + { + BlockNumber nextblkno; + + page = BufferGetPage(buf); + _hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE); + pageopaque = (HashPageOpaque) PageGetSpecialPointer(page); + nextblkno = pageopaque->hasho_nextblkno; + + if (!BlockNumberIsValid(nextblkno)) + break; + + /* we assume we do not need to write the unmodified page */ + _hash_relbuf(rel, buf); + + buf = _hash_getbuf(rel, nextblkno, HASH_WRITE); + } + + /* now that we have correct backlink, initialize new overflow page */ _hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf)); ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage); ovflopaque->hasho_prevblkno = BufferGetBlockNumber(buf); @@ -117,11 +146,12 @@ _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf) ovflopaque->hasho_bucket = pageopaque->hasho_bucket; ovflopaque->hasho_flag = LH_OVERFLOW_PAGE; ovflopaque->hasho_filler = HASHO_FILL; - _hash_wrtnorelbuf(ovflbuf); + _hash_wrtnorelbuf(rel, ovflbuf); /* logically chain overflow page to previous page */ pageopaque->hasho_nextblkno = ovflblkno; - _hash_wrtnorelbuf(buf); + _hash_wrtbuf(rel, buf); + return ovflbuf; } @@ -130,9 +160,8 @@ _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf) * * Find an available overflow page and return its block number. * - * When we enter this function, we have a read lock on metabuf which - * we change to a write lock immediately. Before exiting, the write lock - * is exchanged for a read lock. + * The caller must hold a pin, but no lock, on the metapage buffer. + * The buffer is returned in the same state. 
*/ static BlockNumber _hash_getovflpage(Relation rel, Buffer metabuf) @@ -140,6 +169,7 @@ _hash_getovflpage(Relation rel, Buffer metabuf) HashMetaPage metap; Buffer mapbuf = 0; BlockNumber blkno; + uint32 orig_firstfree; uint32 splitnum; uint32 *freep = NULL; uint32 max_ovflpg; @@ -150,51 +180,66 @@ _hash_getovflpage(Relation rel, Buffer metabuf) uint32 i, j; - _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_WRITE); - metap = (HashMetaPage) BufferGetPage(metabuf); - splitnum = metap->hashm_ovflpoint; + /* Get exclusive lock on the meta page */ + _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE); - /* end search with the last existing overflow page */ - max_ovflpg = metap->hashm_spares[splitnum] - 1; - last_page = max_ovflpg >> BMPG_SHIFT(metap); - last_bit = max_ovflpg & BMPG_MASK(metap); + metap = (HashMetaPage) BufferGetPage(metabuf); + _hash_checkpage(rel, (Page) metap, LH_META_PAGE); /* start search at hashm_firstfree */ - first_page = metap->hashm_firstfree >> BMPG_SHIFT(metap); - bit = metap->hashm_firstfree & BMPG_MASK(metap); + orig_firstfree = metap->hashm_firstfree; + first_page = orig_firstfree >> BMPG_SHIFT(metap); + bit = orig_firstfree & BMPG_MASK(metap); + i = first_page; j = bit / BITS_PER_MAP; bit &= ~(BITS_PER_MAP - 1); - for (i = first_page; i <= last_page; i++) + /* outer loop iterates once per bitmap page */ + for (;;) { BlockNumber mapblkno; Page mappage; uint32 last_inpage; - mapblkno = metap->hashm_mapp[i]; - mapbuf = _hash_getbuf(rel, mapblkno, HASH_WRITE); - mappage = BufferGetPage(mapbuf); - _hash_checkpage(rel, mappage, LH_BITMAP_PAGE); - freep = HashPageGetBitmap(mappage); + /* want to end search with the last existing overflow page */ + splitnum = metap->hashm_ovflpoint; + max_ovflpg = metap->hashm_spares[splitnum] - 1; + last_page = max_ovflpg >> BMPG_SHIFT(metap); + last_bit = max_ovflpg & BMPG_MASK(metap); - if (i != first_page) - { - bit = 0; - j = 0; - } + if (i > last_page) + break; + + Assert(i < metap->hashm_nmaps); + mapblkno = metap->hashm_mapp[i]; if (i == last_page) last_inpage = last_bit; else last_inpage = BMPGSZ_BIT(metap) - 1; + /* Release exclusive lock on metapage while reading bitmap page */ + _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK); + + mapbuf = _hash_getbuf(rel, mapblkno, HASH_WRITE); + mappage = BufferGetPage(mapbuf); + _hash_checkpage(rel, mappage, LH_BITMAP_PAGE); + freep = HashPageGetBitmap(mappage); + for (; bit <= last_inpage; j++, bit += BITS_PER_MAP) { if (freep[j] != ALL_SET) goto found; } - _hash_relbuf(rel, mapbuf, HASH_WRITE); + /* No free space here, try to advance to next map page */ + _hash_relbuf(rel, mapbuf); + i++; + j = 0; /* scan from start of next map page */ + bit = 0; + + /* Reacquire exclusive lock on the meta page */ + _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE); } /* No Free Page Found - have to allocate a new page */ @@ -225,13 +270,19 @@ _hash_getovflpage(Relation rel, Buffer metabuf) */ } - /* mark new page as first free so we don't search much next time */ - metap->hashm_firstfree = bit; - /* Calculate address of the new overflow page */ blkno = bitno_to_blkno(metap, bit); - _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_READ); + /* + * Adjust hashm_firstfree to avoid redundant searches. But don't + * risk changing it if someone moved it while we were searching + * bitmap pages. 
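[Aside: the search loop above relies on spotting a bitmap word that is not ALL_SET and then locating a clear bit inside it (via _hash_firstfreebit). The bit-level part, stripped of all buffer and lock handling, looks like the standalone sketch below; the names are illustrative only.]

#include <stdint.h>
#include <stdio.h>

#define SKETCH_ALL_SET 0xFFFFFFFFu

/* Find the first clear bit in an array of 32-bit bitmap words, set it,
 * and return its overall bit number; -1 if every bit is already set. */
static int
sketch_claim_first_free(uint32_t *words, int nwords)
{
    for (int w = 0; w < nwords; w++)
    {
        if (words[w] == SKETCH_ALL_SET)
            continue;                           /* no free bit in this word */
        for (int b = 0; b < 32; b++)
        {
            if ((words[w] & ((uint32_t) 1 << b)) == 0)
            {
                words[w] |= (uint32_t) 1 << b;  /* mark "in use" */
                return w * 32 + b;
            }
        }
    }
    return -1;
}

int
main(void)
{
    uint32_t bitmap[2] = {SKETCH_ALL_SET, 0x0000000Fu};

    /* bits 0..35 are set, so bit 36 is claimed and returned */
    printf("%d\n", sketch_claim_first_free(bitmap, 2));
    return 0;
}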
+ */ + if (metap->hashm_firstfree == orig_firstfree) + metap->hashm_firstfree = bit + 1; + + /* Write updated metapage and release lock, but not pin */ + _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK); return blkno; @@ -239,20 +290,36 @@ found: /* convert bit to bit number within page */ bit += _hash_firstfreebit(freep[j]); - /* mark page "in use" */ + /* mark page "in use" in the bitmap */ SETBIT(freep, bit); _hash_wrtbuf(rel, mapbuf); + /* Reacquire exclusive lock on the meta page */ + _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE); + /* convert bit to absolute bit number */ bit += (i << BMPG_SHIFT(metap)); - /* adjust hashm_firstfree to avoid redundant searches */ - if (bit > metap->hashm_firstfree) - metap->hashm_firstfree = bit; - + /* Calculate address of the new overflow page */ blkno = bitno_to_blkno(metap, bit); - _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_READ); + /* + * Adjust hashm_firstfree to avoid redundant searches. But don't + * risk changing it if someone moved it while we were searching + * bitmap pages. + */ + if (metap->hashm_firstfree == orig_firstfree) + { + metap->hashm_firstfree = bit + 1; + + /* Write updated metapage and release lock, but not pin */ + _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK); + } + else + { + /* We didn't change the metapage, so no need to write */ + _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK); + } return blkno; } @@ -275,7 +342,10 @@ _hash_firstfreebit(uint32 map) return i; mask <<= 1; } - return i; + + elog(ERROR, "firstfreebit found no free bit"); + + return 0; /* keep compiler quiet */ } /* @@ -287,7 +357,9 @@ _hash_firstfreebit(uint32 map) * Returns the block number of the page that followed the given page * in the bucket, or InvalidBlockNumber if no following page. * - * NB: caller must not hold lock on metapage. + * NB: caller must not hold lock on metapage, nor on either page that's + * adjacent in the bucket chain. The caller had better hold exclusive lock + * on the bucket, too. */ BlockNumber _hash_freeovflpage(Relation rel, Buffer ovflbuf) @@ -308,10 +380,7 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf) bitmapbit; Bucket bucket; - metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE); - metap = (HashMetaPage) BufferGetPage(metabuf); - _hash_checkpage(rel, (Page) metap, LH_META_PAGE); - + /* Get information from the doomed page */ ovflblkno = BufferGetBlockNumber(ovflbuf); ovflpage = BufferGetPage(ovflbuf); _hash_checkpage(rel, ovflpage, LH_OVERFLOW_PAGE); @@ -319,17 +388,16 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf) nextblkno = ovflopaque->hasho_nextblkno; prevblkno = ovflopaque->hasho_prevblkno; bucket = ovflopaque->hasho_bucket; + + /* Zero the page for debugging's sake; then write and release it */ MemSet(ovflpage, 0, BufferGetPageSize(ovflbuf)); _hash_wrtbuf(rel, ovflbuf); /* - * fix up the bucket chain. this is a doubly-linked list, so we must + * Fix up the bucket chain. this is a doubly-linked list, so we must * fix up the bucket chain members behind and ahead of the overflow - * page being deleted. - * - * XXX this should look like: - lock prev/next - modify/write prev/next - * (how to do write ordering with a doubly-linked list?) - unlock - * prev/next + * page being deleted. No concurrency issues since we hold exclusive + * lock on the entire bucket. 
*/ if (BlockNumberIsValid(prevblkno)) { @@ -354,9 +422,12 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf) _hash_wrtbuf(rel, nextbuf); } - /* - * Clear the bitmap bit to indicate that this overflow page is free. - */ + /* Read the metapage so we can determine which bitmap page to use */ + metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ); + metap = (HashMetaPage) BufferGetPage(metabuf); + _hash_checkpage(rel, (Page) metap, LH_META_PAGE); + + /* Identify which bit to set */ ovflbitno = blkno_to_bitno(metap, ovflblkno); bitmappage = ovflbitno >> BMPG_SHIFT(metap); @@ -366,18 +437,32 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf) elog(ERROR, "invalid overflow bit number %u", ovflbitno); blkno = metap->hashm_mapp[bitmappage]; + /* Release metapage lock while we access the bitmap page */ + _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK); + + /* Clear the bitmap bit to indicate that this overflow page is free */ mapbuf = _hash_getbuf(rel, blkno, HASH_WRITE); mappage = BufferGetPage(mapbuf); _hash_checkpage(rel, mappage, LH_BITMAP_PAGE); freep = HashPageGetBitmap(mappage); + Assert(ISSET(freep, bitmapbit)); CLRBIT(freep, bitmapbit); _hash_wrtbuf(rel, mapbuf); + /* Get write-lock on metapage to update firstfree */ + _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE); + /* if this is now the first free page, update hashm_firstfree */ if (ovflbitno < metap->hashm_firstfree) + { metap->hashm_firstfree = ovflbitno; - - _hash_wrtbuf(rel, metabuf); + _hash_wrtbuf(rel, metabuf); + } + else + { + /* no need to change metapage */ + _hash_relbuf(rel, metabuf); + } return nextblkno; } @@ -401,9 +486,18 @@ _hash_initbitmap(Relation rel, HashMetaPage metap, BlockNumber blkno) HashPageOpaque op; uint32 *freep; - /* initialize the page */ + /* + * It is okay to write-lock the new bitmap page while holding metapage + * write lock, because no one else could be contending for the new page. + * + * There is some loss of concurrency in possibly doing I/O for the new + * page while holding the metapage lock, but this path is taken so + * seldom that it's not worth worrying about. + */ buf = _hash_getbuf(rel, blkno, HASH_WRITE); pg = BufferGetPage(buf); + + /* initialize the page */ _hash_pageinit(pg, BufferGetPageSize(buf)); op = (HashPageOpaque) PageGetSpecialPointer(pg); op->hasho_prevblkno = InvalidBlockNumber; @@ -416,7 +510,7 @@ _hash_initbitmap(Relation rel, HashMetaPage metap, BlockNumber blkno) freep = HashPageGetBitmap(pg); MemSet((char *) freep, 0xFF, BMPGSZ_BYTE(metap)); - /* write out the new bitmap page (releasing write lock) */ + /* write out the new bitmap page (releasing write lock and pin) */ _hash_wrtbuf(rel, buf); /* add the new bitmap page to the metapage's list of bitmaps */ @@ -445,7 +539,14 @@ _hash_initbitmap(Relation rel, HashMetaPage metap, BlockNumber blkno) * the write page works forward; the procedure terminates when the * read page and write page are the same page. * - * Caller must hold exclusive lock on the target bucket. + * At completion of this procedure, it is guaranteed that all pages in + * the bucket are nonempty, unless the bucket is totally empty (in + * which case all overflow pages will be freed). The original implementation + * required that to be true on entry as well, but it's a lot easier for + * callers to leave empty overflow pages and let this guy clean it up. + * + * Caller must hold exclusive lock on the target bucket. This allows + * us to safely lock multiple pages in the bucket. 
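[Aside: the squeeze described above is in essence a two-pointer compaction over the bucket chain: a write cursor walks forward from the primary page while a read cursor walks backward from the tail, and tail pages that empty out are freed. A toy standalone model of that flow, using item counts instead of real pages; all names and numbers here are illustrative.]

#include <stdio.h>

#define NPAGES   4      /* primary page plus three overflow pages */
#define PAGE_CAP 10     /* items that fit on one "page" */

int
main(void)
{
    int page[NPAGES] = {10, 3, 2, 4};   /* items per page, front to back */
    int w = 0;                          /* write cursor: front of chain */
    int r = NPAGES - 1;                 /* read cursor: tail of chain */

    while (w < r)
    {
        if (page[r] == 0)
        {
            r--;                        /* tail page now empty: "free" it */
            continue;
        }
        if (page[w] == PAGE_CAP)
        {
            w++;                        /* write page full: move forward */
            continue;
        }
        page[w]++;                      /* move one item from read page ... */
        page[r]--;                      /* ... onto the write page */
    }

    for (int i = 0; i < NPAGES; i++)
        printf("page %d: %d items%s\n", i, page[i],
               (page[i] == 0) ? "  (would be freed)" : "");
    return 0;
}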
*/ void _hash_squeezebucket(Relation rel, @@ -479,7 +580,7 @@ _hash_squeezebucket(Relation rel, */ if (!BlockNumberIsValid(wopaque->hasho_nextblkno)) { - _hash_relbuf(rel, wbuf, HASH_WRITE); + _hash_relbuf(rel, wbuf); return; } @@ -492,11 +593,10 @@ _hash_squeezebucket(Relation rel, { rblkno = ropaque->hasho_nextblkno; if (ropaque != wopaque) - _hash_relbuf(rel, rbuf, HASH_WRITE); + _hash_relbuf(rel, rbuf); rbuf = _hash_getbuf(rel, rblkno, HASH_WRITE); rpage = BufferGetPage(rbuf); _hash_checkpage(rel, rpage, LH_OVERFLOW_PAGE); - Assert(!PageIsEmpty(rpage)); ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage); Assert(ropaque->hasho_bucket == bucket); } while (BlockNumberIsValid(ropaque->hasho_nextblkno)); @@ -507,81 +607,97 @@ _hash_squeezebucket(Relation rel, roffnum = FirstOffsetNumber; for (;;) { - hitem = (HashItem) PageGetItem(rpage, PageGetItemId(rpage, roffnum)); - itemsz = IndexTupleDSize(hitem->hash_itup) - + (sizeof(HashItemData) - sizeof(IndexTupleData)); - itemsz = MAXALIGN(itemsz); - - /* - * walk up the bucket chain, looking for a page big enough for - * this item. - */ - while (PageGetFreeSpace(wpage) < itemsz) + /* this test is needed in case page is empty on entry */ + if (roffnum <= PageGetMaxOffsetNumber(rpage)) { - wblkno = wopaque->hasho_nextblkno; + hitem = (HashItem) PageGetItem(rpage, + PageGetItemId(rpage, roffnum)); + itemsz = IndexTupleDSize(hitem->hash_itup) + + (sizeof(HashItemData) - sizeof(IndexTupleData)); + itemsz = MAXALIGN(itemsz); - _hash_wrtbuf(rel, wbuf); - - if (!BlockNumberIsValid(wblkno) || (rblkno == wblkno)) + /* + * Walk up the bucket chain, looking for a page big enough for + * this item. Exit if we reach the read page. + */ + while (PageGetFreeSpace(wpage) < itemsz) { - _hash_wrtbuf(rel, rbuf); - /* wbuf is already released */ - return; + Assert(!PageIsEmpty(wpage)); + + wblkno = wopaque->hasho_nextblkno; + Assert(BlockNumberIsValid(wblkno)); + + _hash_wrtbuf(rel, wbuf); + + if (rblkno == wblkno) + { + /* wbuf is already released */ + _hash_wrtbuf(rel, rbuf); + return; + } + + wbuf = _hash_getbuf(rel, wblkno, HASH_WRITE); + wpage = BufferGetPage(wbuf); + _hash_checkpage(rel, wpage, LH_OVERFLOW_PAGE); + wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage); + Assert(wopaque->hasho_bucket == bucket); } - wbuf = _hash_getbuf(rel, wblkno, HASH_WRITE); - wpage = BufferGetPage(wbuf); - _hash_checkpage(rel, wpage, LH_OVERFLOW_PAGE); - Assert(!PageIsEmpty(wpage)); - wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage); - Assert(wopaque->hasho_bucket == bucket); + /* + * we have found room so insert on the "write" page. + */ + woffnum = OffsetNumberNext(PageGetMaxOffsetNumber(wpage)); + if (PageAddItem(wpage, (Item) hitem, itemsz, woffnum, LP_USED) + == InvalidOffsetNumber) + elog(ERROR, "failed to add index item to \"%s\"", + RelationGetRelationName(rel)); + + /* + * delete the tuple from the "read" page. PageIndexTupleDelete + * repacks the ItemId array, so 'roffnum' will be "advanced" to + * the "next" ItemId. + */ + PageIndexTupleDelete(rpage, roffnum); } /* - * if we're here, we have found room so insert on the "write" - * page. + * if the "read" page is now empty because of the deletion (or + * because it was empty when we got to it), free it. + * + * Tricky point here: if our read and write pages are adjacent in the + * bucket chain, our write lock on wbuf will conflict with + * _hash_freeovflpage's attempt to update the sibling links of the + * removed page. 
However, in that case we are done anyway, so we can + * simply drop the write lock before calling _hash_freeovflpage. */ - woffnum = OffsetNumberNext(PageGetMaxOffsetNumber(wpage)); - if (PageAddItem(wpage, (Item) hitem, itemsz, woffnum, LP_USED) - == InvalidOffsetNumber) - elog(ERROR, "failed to add index item to \"%s\"", - RelationGetRelationName(rel)); - - /* - * delete the tuple from the "read" page. PageIndexTupleDelete - * repacks the ItemId array, so 'roffnum' will be "advanced" to - * the "next" ItemId. - */ - PageIndexTupleDelete(rpage, roffnum); - _hash_wrtnorelbuf(rbuf); - - /* - * if the "read" page is now empty because of the deletion, free - * it. - */ - if (PageIsEmpty(rpage) && (ropaque->hasho_flag & LH_OVERFLOW_PAGE)) + if (PageIsEmpty(rpage)) { rblkno = ropaque->hasho_prevblkno; Assert(BlockNumberIsValid(rblkno)); - /* free this overflow page */ - _hash_freeovflpage(rel, rbuf); - + /* are we freeing the page adjacent to wbuf? */ if (rblkno == wblkno) { - /* rbuf is already released */ + /* yes, so release wbuf lock first */ _hash_wrtbuf(rel, wbuf); + /* free this overflow page (releases rbuf) */ + _hash_freeovflpage(rel, rbuf); + /* done */ return; } + /* free this overflow page, then get the previous one */ + _hash_freeovflpage(rel, rbuf); + rbuf = _hash_getbuf(rel, rblkno, HASH_WRITE); rpage = BufferGetPage(rbuf); _hash_checkpage(rel, rpage, LH_OVERFLOW_PAGE); - Assert(!PageIsEmpty(rpage)); ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage); Assert(ropaque->hasho_bucket == bucket); roffnum = FirstOffsetNumber; } } + + /* NOTREACHED */ } diff --git a/src/backend/access/hash/hashpage.c b/src/backend/access/hash/hashpage.c index 1c16df33cd..5b9d19acf1 100644 --- a/src/backend/access/hash/hashpage.c +++ b/src/backend/access/hash/hashpage.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.41 2003/09/02 18:13:31 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.42 2003/09/04 22:06:27 tgl Exp $ * * NOTES * Postgres hash pages look like ordinary relation pages. The opaque @@ -26,54 +26,201 @@ * *------------------------------------------------------------------------- */ - #include "postgres.h" #include "access/genam.h" #include "access/hash.h" -#include "miscadmin.h" #include "storage/lmgr.h" +#include "utils/lsyscache.h" + + +static void _hash_splitbucket(Relation rel, Buffer metabuf, + Bucket obucket, Bucket nbucket, + BlockNumber start_oblkno, + BlockNumber start_nblkno, + uint32 maxbucket, + uint32 highmask, uint32 lowmask); /* - * We use high-concurrency locking on hash indices. There are two cases in - * which we don't do locking. One is when we're building the index. - * Since the creating transaction has not committed, no one can see - * the index, and there's no reason to share locks. The second case - * is when we're just starting up the database system. We use some - * special-purpose initialization code in the relation cache manager - * (see utils/cache/relcache.c) to allow us to do indexed scans on - * the system catalogs before we'd normally be able to. This happens - * before the lock table is fully initialized, so we can't use it. - * Strictly speaking, this violates 2pl, but we don't do 2pl on the - * system catalogs anyway. - * - * Note that our page locks are actual lockmanager locks, not buffer - * locks (as are used by btree, for example). 
This is a good idea because - * the algorithms are not deadlock-free, and we'd better be able to detect - * and recover from deadlocks. - * - * Another important difference from btree is that a hash indexscan - * retains both a lock and a buffer pin on the current index page - * between hashgettuple() calls (btree keeps only a buffer pin). - * Because of this, it's safe to do item deletions with only a regular - * write lock on a hash page --- there cannot be an indexscan stopped on - * the page being deleted, other than an indexscan of our own backend, - * which will be taken care of by _hash_adjscans. + * We use high-concurrency locking on hash indexes (see README for an overview + * of the locking rules). There are two cases in which we don't do locking. + * One is when the index is newly created in the current transaction. Since + * the creating transaction has not committed, no one else can see the index, + * and there's no reason to take locks. The second case is for temp + * relations, which no one else can see either. (We still take buffer-level + * locks, but not lmgr locks.) */ -#define USELOCKING (!BuildingHash && !IsInitProcessingMode()) +#define USELOCKING(rel) (!((rel)->rd_isnew || (rel)->rd_istemp)) -static void _hash_setpagelock(Relation rel, BlockNumber blkno, int access); -static void _hash_unsetpagelock(Relation rel, BlockNumber blkno, int access); -static void _hash_splitbucket(Relation rel, Buffer metabuf, - Bucket obucket, Bucket nbucket); +/* + * _hash_getlock() -- Acquire an lmgr lock. + * + * 'whichlock' should be zero to acquire the split-control lock, or the + * block number of a bucket's primary bucket page to acquire the per-bucket + * lock. (See README for details of the use of these locks.) + * + * 'access' must be HASH_SHARE or HASH_EXCLUSIVE. + */ +void +_hash_getlock(Relation rel, BlockNumber whichlock, int access) +{ + if (USELOCKING(rel)) + LockPage(rel, whichlock, access); +} + +/* + * _hash_try_getlock() -- Acquire an lmgr lock, but only if it's free. + * + * Same as above except we return FALSE without blocking if lock isn't free. + */ +bool +_hash_try_getlock(Relation rel, BlockNumber whichlock, int access) +{ + if (USELOCKING(rel)) + return ConditionalLockPage(rel, whichlock, access); + else + return true; +} + +/* + * _hash_droplock() -- Release an lmgr lock. + */ +void +_hash_droplock(Relation rel, BlockNumber whichlock, int access) +{ + if (USELOCKING(rel)) + UnlockPage(rel, whichlock, access); +} + +/* + * _hash_getbuf() -- Get a buffer by block number for read or write. + * + * 'access' must be HASH_READ, HASH_WRITE, or HASH_NOLOCK. + * + * When this routine returns, the appropriate lock is set on the + * requested buffer and its reference count has been incremented + * (ie, the buffer is "locked and pinned"). + * + * XXX P_NEW is not used because, unlike the tree structures, we + * need the bucket blocks to be at certain block numbers. we must + * depend on the caller to call _hash_pageinit on the block if it + * knows that this is a new block. + */ +Buffer +_hash_getbuf(Relation rel, BlockNumber blkno, int access) +{ + Buffer buf; + + if (blkno == P_NEW) + elog(ERROR, "hash AM does not use P_NEW"); + + buf = ReadBuffer(rel, blkno); + + if (access != HASH_NOLOCK) + LockBuffer(buf, access); + + /* ref count and lock type are correct */ + return buf; +} + +/* + * _hash_relbuf() -- release a locked buffer. + * + * Lock and pin (refcount) are both dropped. 
Note that either read or + * write lock can be dropped this way, but if we modified the buffer, + * this is NOT the right way to release a write lock. + */ +void +_hash_relbuf(Relation rel, Buffer buf) +{ + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buf); +} + +/* + * _hash_dropbuf() -- release an unlocked buffer. + * + * This is used to unpin a buffer on which we hold no lock. It is assumed + * that the buffer is not dirty. + */ +void +_hash_dropbuf(Relation rel, Buffer buf) +{ + ReleaseBuffer(buf); +} + +/* + * _hash_wrtbuf() -- write a hash page to disk. + * + * This routine releases the lock held on the buffer and our refcount + * for it. It is an error to call _hash_wrtbuf() without a write lock + * and a pin on the buffer. + * + * NOTE: actually, the buffer manager just marks the shared buffer page + * dirty here; the real I/O happens later. This is okay since we are not + * relying on write ordering anyway. The WAL mechanism is responsible for + * guaranteeing correctness after a crash. + */ +void +_hash_wrtbuf(Relation rel, Buffer buf) +{ + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + WriteBuffer(buf); +} + +/* + * _hash_wrtnorelbuf() -- write a hash page to disk, but do not release + * our reference or lock. + * + * It is an error to call _hash_wrtnorelbuf() without a write lock + * and a pin on the buffer. + * + * See above NOTE. + */ +void +_hash_wrtnorelbuf(Relation rel, Buffer buf) +{ + WriteNoReleaseBuffer(buf); +} + +/* + * _hash_chgbufaccess() -- Change the lock type on a buffer, without + * dropping our pin on it. + * + * from_access and to_access may be HASH_READ, HASH_WRITE, or HASH_NOLOCK, + * the last indicating that no buffer-level lock is held or wanted. + * + * When from_access == HASH_WRITE, we assume the buffer is dirty and tell + * bufmgr it must be written out. If the caller wants to release a write + * lock on a page that's not been modified, it's okay to pass from_access + * as HASH_READ (a bit ugly, but handy in some places). + */ +void +_hash_chgbufaccess(Relation rel, + Buffer buf, + int from_access, + int to_access) +{ + if (from_access != HASH_NOLOCK) + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + if (from_access == HASH_WRITE) + WriteNoReleaseBuffer(buf); + + if (to_access != HASH_NOLOCK) + LockBuffer(buf, to_access); +} /* * _hash_metapinit() -- Initialize the metadata page of a hash index, * the two buckets that we begin with and the initial * bitmap page. + * + * We are fairly cavalier about locking here, since we know that no one else + * could be accessing this index. In particular the rule about not holding + * multiple buffer locks is ignored. */ void _hash_metapinit(Relation rel) @@ -83,16 +230,31 @@ _hash_metapinit(Relation rel) Buffer metabuf; Buffer buf; Page pg; + int32 data_width; + int32 item_width; + int32 ffactor; uint16 i; - /* can't be sharing this with anyone, now... */ - if (USELOCKING) - LockRelation(rel, AccessExclusiveLock); - + /* safety check */ if (RelationGetNumberOfBlocks(rel) != 0) elog(ERROR, "cannot initialize non-empty hash index \"%s\"", RelationGetRelationName(rel)); + /* + * Determine the target fill factor (tuples per bucket) for this index. + * The idea is to make the fill factor correspond to pages about 3/4ths + * full. We can compute it exactly if the index datatype is fixed-width, + * but for var-width there's some guessing involved. 
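[Aside: the fill-factor computation that follows is straightforward arithmetic. Below is a standalone rendering of the same estimate with hard-coded stand-ins for BLCKSZ, the per-item header, and the line pointer; the real sizes come from PostgreSQL's headers, and the constants here are assumptions for illustration.]

#include <stdio.h>

#define SKETCH_BLCKSZ      8192     /* assumed page size */
#define SKETCH_ITEM_HDR      12     /* assumed MAXALIGN'd HashItemData overhead */
#define SKETCH_LINE_PTR       4     /* assumed sizeof(ItemIdData) */
#define SKETCH_MAXALIGN(x) (((x) + 7) & ~7)     /* 8-byte alignment */

/* Target tuples per bucket for pages about 3/4ths full, given the
 * (average) key datum width. */
static int
sketch_ffactor(int data_width)
{
    int item_width = SKETCH_ITEM_HDR + SKETCH_MAXALIGN(data_width)
        + SKETCH_LINE_PTR;
    int ffactor = (SKETCH_BLCKSZ * 3 / 4) / item_width;

    return (ffactor < 10) ? 10 : ffactor;       /* keep to a sane range */
}

int
main(void)
{
    printf("4-byte keys:   ffactor ~ %d\n", sketch_ffactor(4));     /* 256 */
    printf("100-byte keys: ffactor ~ %d\n", sketch_ffactor(100));   /*  51 */
    return 0;
}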
+ */ + data_width = get_typavgwidth(RelationGetDescr(rel)->attrs[0]->atttypid, + RelationGetDescr(rel)->attrs[0]->atttypmod); + item_width = MAXALIGN(sizeof(HashItemData)) + MAXALIGN(data_width) + + sizeof(ItemIdData); /* include the line pointer */ + ffactor = (BLCKSZ * 3 / 4) / item_width; + /* keep to a sane range */ + if (ffactor < 10) + ffactor = 10; + metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE); pg = BufferGetPage(metabuf); _hash_pageinit(pg, BufferGetPageSize(metabuf)); @@ -110,7 +272,7 @@ _hash_metapinit(Relation rel) metap->hashm_version = HASH_VERSION; metap->hashm_ntuples = 0; metap->hashm_nmaps = 0; - metap->hashm_ffactor = DEFAULT_FFACTOR; + metap->hashm_ffactor = ffactor; metap->hashm_bsize = BufferGetPageSize(metabuf); /* find largest bitmap array size that will fit in page size */ for (i = _hash_log2(metap->hashm_bsize); i > 0; --i) @@ -142,7 +304,7 @@ _hash_metapinit(Relation rel) metap->hashm_firstfree = 0; /* - * initialize the first two buckets + * Initialize the first two buckets */ for (i = 0; i <= 1; i++) { @@ -159,135 +321,17 @@ _hash_metapinit(Relation rel) } /* - * Initialize bitmap page. Can't do this until we + * Initialize first bitmap page. Can't do this until we * create the first two buckets, else smgr will complain. */ _hash_initbitmap(rel, metap, 3); /* all done */ _hash_wrtbuf(rel, metabuf); - - if (USELOCKING) - UnlockRelation(rel, AccessExclusiveLock); } /* - * _hash_getbuf() -- Get a buffer by block number for read or write. - * - * When this routine returns, the appropriate lock is set on the - * requested buffer its reference count is correct. - * - * XXX P_NEW is not used because, unlike the tree structures, we - * need the bucket blocks to be at certain block numbers. we must - * depend on the caller to call _hash_pageinit on the block if it - * knows that this is a new block. - */ -Buffer -_hash_getbuf(Relation rel, BlockNumber blkno, int access) -{ - Buffer buf; - - if (blkno == P_NEW) - elog(ERROR, "hash AM does not use P_NEW"); - switch (access) - { - case HASH_WRITE: - case HASH_READ: - _hash_setpagelock(rel, blkno, access); - break; - default: - elog(ERROR, "unrecognized hash access code: %d", access); - break; - } - buf = ReadBuffer(rel, blkno); - - /* ref count and lock type are correct */ - return buf; -} - -/* - * _hash_relbuf() -- release a locked buffer. - */ -void -_hash_relbuf(Relation rel, Buffer buf, int access) -{ - BlockNumber blkno; - - blkno = BufferGetBlockNumber(buf); - - switch (access) - { - case HASH_WRITE: - case HASH_READ: - _hash_unsetpagelock(rel, blkno, access); - break; - default: - elog(ERROR, "unrecognized hash access code: %d", access); - break; - } - - ReleaseBuffer(buf); -} - -/* - * _hash_wrtbuf() -- write a hash page to disk. - * - * This routine releases the lock held on the buffer and our reference - * to it. It is an error to call _hash_wrtbuf() without a write lock - * or a reference to the buffer. - */ -void -_hash_wrtbuf(Relation rel, Buffer buf) -{ - BlockNumber blkno; - - blkno = BufferGetBlockNumber(buf); - WriteBuffer(buf); - _hash_unsetpagelock(rel, blkno, HASH_WRITE); -} - -/* - * _hash_wrtnorelbuf() -- write a hash page to disk, but do not release - * our reference or lock. - * - * It is an error to call _hash_wrtnorelbuf() without a write lock - * or a reference to the buffer. 
- */ -void -_hash_wrtnorelbuf(Buffer buf) -{ - BlockNumber blkno; - - blkno = BufferGetBlockNumber(buf); - WriteNoReleaseBuffer(buf); -} - -/* - * _hash_chgbufaccess() -- Change from read to write access or vice versa. - * - * When changing from write to read, we assume the buffer is dirty and tell - * bufmgr it must be written out. - */ -void -_hash_chgbufaccess(Relation rel, - Buffer buf, - int from_access, - int to_access) -{ - BlockNumber blkno; - - blkno = BufferGetBlockNumber(buf); - - if (from_access == HASH_WRITE) - _hash_wrtnorelbuf(buf); - - _hash_unsetpagelock(rel, blkno, from_access); - - _hash_setpagelock(rel, blkno, to_access); -} - -/* - * _hash_pageinit() -- Initialize a new page. + * _hash_pageinit() -- Initialize a new hash index page. */ void _hash_pageinit(Page page, Size size) @@ -297,57 +341,14 @@ _hash_pageinit(Page page, Size size) } /* - * _hash_setpagelock() -- Acquire the requested type of lock on a page. - */ -static void -_hash_setpagelock(Relation rel, - BlockNumber blkno, - int access) -{ - if (USELOCKING) - { - switch (access) - { - case HASH_WRITE: - LockPage(rel, blkno, ExclusiveLock); - break; - case HASH_READ: - LockPage(rel, blkno, ShareLock); - break; - default: - elog(ERROR, "unrecognized hash access code: %d", access); - break; - } - } -} - -/* - * _hash_unsetpagelock() -- Release the specified type of lock on a page. - */ -static void -_hash_unsetpagelock(Relation rel, - BlockNumber blkno, - int access) -{ - if (USELOCKING) - { - switch (access) - { - case HASH_WRITE: - UnlockPage(rel, blkno, ExclusiveLock); - break; - case HASH_READ: - UnlockPage(rel, blkno, ShareLock); - break; - default: - elog(ERROR, "unrecognized hash access code: %d", access); - break; - } - } -} - -/* - * Expand the hash table by creating one new bucket. + * Attempt to expand the hash table by creating one new bucket. + * + * This will silently do nothing if it cannot get the needed locks. + * + * The caller should hold no locks on the hash index. + * + * The caller must hold a pin, but no lock, on the metapage buffer. + * The buffer is returned in the same state. */ void _hash_expandtable(Relation rel, Buffer metabuf) @@ -356,15 +357,72 @@ _hash_expandtable(Relation rel, Buffer metabuf) Bucket old_bucket; Bucket new_bucket; uint32 spare_ndx; + BlockNumber start_oblkno; + BlockNumber start_nblkno; + uint32 maxbucket; + uint32 highmask; + uint32 lowmask; + + /* + * Obtain the page-zero lock to assert the right to begin a split + * (see README). + * + * Note: deadlock should be impossible here. Our own backend could only + * be holding bucket sharelocks due to stopped indexscans; those will not + * block other holders of the page-zero lock, who are only interested in + * acquiring bucket sharelocks themselves. Exclusive bucket locks are + * only taken here and in hashbulkdelete, and neither of these operations + * needs any additional locks to complete. (If, due to some flaw in this + * reasoning, we manage to deadlock anyway, it's okay to error out; the + * index will be left in a consistent state.) + */ + _hash_getlock(rel, 0, HASH_EXCLUSIVE); + + /* Write-lock the meta page */ + _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE); metap = (HashMetaPage) BufferGetPage(metabuf); _hash_checkpage(rel, (Page) metap, LH_META_PAGE); - _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_WRITE); + /* + * Check to see if split is still needed; someone else might have already + * done one while we waited for the lock. 
+ * + * Make sure this stays in sync with _hash_doinsert() + */ + if (metap->hashm_ntuples <= + (double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1)) + goto fail; - new_bucket = ++metap->hashm_maxbucket; + /* + * Determine which bucket is to be split, and attempt to lock the old + * bucket. If we can't get the lock, give up. + * + * The lock protects us against other backends, but not against our own + * backend. Must check for active scans separately. + * + * Ideally we would lock the new bucket too before proceeding, but if + * we are about to cross a splitpoint then the BUCKET_TO_BLKNO mapping + * isn't correct yet. For simplicity we update the metapage first and + * then lock. This should be okay because no one else should be trying + * to lock the new bucket yet... + */ + new_bucket = metap->hashm_maxbucket + 1; old_bucket = (new_bucket & metap->hashm_lowmask); + start_oblkno = BUCKET_TO_BLKNO(metap, old_bucket); + + if (_hash_has_active_scan(rel, old_bucket)) + goto fail; + + if (!_hash_try_getlock(rel, start_oblkno, HASH_EXCLUSIVE)) + goto fail; + + /* + * Okay to proceed with split. Update the metapage bucket mapping info. + */ + metap->hashm_maxbucket = new_bucket; + if (new_bucket > metap->hashm_highmask) { /* Starting a new doubling */ @@ -379,7 +437,7 @@ _hash_expandtable(Relation rel, Buffer metabuf) * this new batch of bucket pages. * * XXX should initialize new bucket pages to prevent out-of-order - * page creation. + * page creation? We don't want to do it right here, though. */ spare_ndx = _hash_log2(metap->hashm_maxbucket + 1); if (spare_ndx > metap->hashm_ovflpoint) @@ -389,10 +447,50 @@ _hash_expandtable(Relation rel, Buffer metabuf) metap->hashm_ovflpoint = spare_ndx; } - _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_READ); + /* now we can compute the new bucket's primary block number */ + start_nblkno = BUCKET_TO_BLKNO(metap, new_bucket); + + Assert(!_hash_has_active_scan(rel, new_bucket)); + + if (!_hash_try_getlock(rel, start_nblkno, HASH_EXCLUSIVE)) + elog(PANIC, "could not get lock on supposedly new bucket"); + + /* + * Copy bucket mapping info now; this saves re-accessing the meta page + * inside _hash_splitbucket's inner loop. Note that once we drop the + * split lock, other splits could begin, so these values might be out of + * date before _hash_splitbucket finishes. That's okay, since all it + * needs is to tell which of these two buckets to map hashkeys into.
+ */ + maxbucket = metap->hashm_maxbucket; + highmask = metap->hashm_highmask; + lowmask = metap->hashm_lowmask; + + /* Write out the metapage and drop lock, but keep pin */ + _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK); + + /* Release split lock; okay for other splits to occur now */ + _hash_droplock(rel, 0, HASH_EXCLUSIVE); /* Relocate records to the new bucket */ - _hash_splitbucket(rel, metabuf, old_bucket, new_bucket); + _hash_splitbucket(rel, metabuf, old_bucket, new_bucket, + start_oblkno, start_nblkno, + maxbucket, highmask, lowmask); + + /* Release bucket locks, allowing others to access them */ + _hash_droplock(rel, start_oblkno, HASH_EXCLUSIVE); + _hash_droplock(rel, start_nblkno, HASH_EXCLUSIVE); + + return; + + /* Here if we decide not to split or fail to acquire the old bucket lock */ +fail: + + /* We didn't write the metapage, so just drop lock */ + _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK); + + /* Release split lock */ + _hash_droplock(rel, 0, HASH_EXCLUSIVE); } @@ -403,27 +501,35 @@ _hash_expandtable(Relation rel, Buffer metabuf) * or more overflow (bucket chain) pages. We must relocate tuples that * belong in the new bucket, and compress out any free space in the old * bucket. + * + * The caller must hold exclusive locks on both buckets to ensure that + * no one else is trying to access them (see README). + * + * The caller must hold a pin, but no lock, on the metapage buffer. + * The buffer is returned in the same state. (The metapage is only + * touched if it becomes necessary to add or remove overflow pages.) */ static void _hash_splitbucket(Relation rel, Buffer metabuf, Bucket obucket, - Bucket nbucket) + Bucket nbucket, + BlockNumber start_oblkno, + BlockNumber start_nblkno, + uint32 maxbucket, + uint32 highmask, + uint32 lowmask) { Bucket bucket; Buffer obuf; Buffer nbuf; - Buffer ovflbuf; BlockNumber oblkno; BlockNumber nblkno; - BlockNumber start_oblkno; - BlockNumber start_nblkno; bool null; Datum datum; HashItem hitem; HashPageOpaque oopaque; HashPageOpaque nopaque; - HashMetaPage metap; IndexTuple itup; Size itemsz; OffsetNumber ooffnum; @@ -433,12 +539,11 @@ _hash_splitbucket(Relation rel, Page npage; TupleDesc itupdesc = RelationGetDescr(rel); - metap = (HashMetaPage) BufferGetPage(metabuf); - _hash_checkpage(rel, (Page) metap, LH_META_PAGE); - - /* get the buffers & pages */ - start_oblkno = BUCKET_TO_BLKNO(metap, obucket); - start_nblkno = BUCKET_TO_BLKNO(metap, nbucket); + /* + * It should be okay to simultaneously write-lock pages from each + * bucket, since no one else can be trying to acquire buffer lock + * on pages of either bucket. + */ oblkno = start_oblkno; nblkno = start_nblkno; obuf = _hash_getbuf(rel, oblkno, HASH_WRITE); @@ -446,7 +551,10 @@ _hash_splitbucket(Relation rel, opage = BufferGetPage(obuf); npage = BufferGetPage(nbuf); - /* initialize the new bucket page */ + _hash_checkpage(rel, opage, LH_BUCKET_PAGE); + oopaque = (HashPageOpaque) PageGetSpecialPointer(opage); + + /* initialize the new bucket's primary page */ _hash_pageinit(npage, BufferGetPageSize(nbuf)); nopaque = (HashPageOpaque) PageGetSpecialPointer(npage); nopaque->hasho_prevblkno = InvalidBlockNumber; @@ -454,44 +562,11 @@ _hash_splitbucket(Relation rel, nopaque->hasho_bucket = nbucket; nopaque->hasho_flag = LH_BUCKET_PAGE; nopaque->hasho_filler = HASHO_FILL; - _hash_wrtnorelbuf(nbuf); /* - * make sure the old bucket isn't empty. advance 'opage' and friends - * through the overflow bucket chain until we find a non-empty page.
- * - * XXX we should only need this once, if we are careful to preserve the - * invariant that overflow pages are never empty. - */ - _hash_checkpage(rel, opage, LH_BUCKET_PAGE); - oopaque = (HashPageOpaque) PageGetSpecialPointer(opage); - if (PageIsEmpty(opage)) - { - oblkno = oopaque->hasho_nextblkno; - _hash_relbuf(rel, obuf, HASH_WRITE); - if (!BlockNumberIsValid(oblkno)) - { - /* - * the old bucket is completely empty; of course, the new - * bucket will be as well, but since it's a base bucket page - * we don't care. - */ - _hash_relbuf(rel, nbuf, HASH_WRITE); - return; - } - obuf = _hash_getbuf(rel, oblkno, HASH_WRITE); - opage = BufferGetPage(obuf); - _hash_checkpage(rel, opage, LH_OVERFLOW_PAGE); - if (PageIsEmpty(opage)) - elog(ERROR, "empty hash overflow page %u", oblkno); - oopaque = (HashPageOpaque) PageGetSpecialPointer(opage); - } - - /* - * we are now guaranteed that 'opage' is not empty. partition the - * tuples in the old bucket between the old bucket and the new bucket, - * advancing along their respective overflow bucket chains and adding - * overflow pages as needed. + * Partition the tuples in the old bucket between the old bucket and the + * new bucket, advancing along the old bucket's overflow bucket chain + * and adding overflow pages to the new bucket as needed. */ ooffnum = FirstOffsetNumber; omaxoffnum = PageGetMaxOffsetNumber(opage); @@ -505,48 +580,39 @@ _hash_splitbucket(Relation rel, /* check if we're at the end of the page */ if (ooffnum > omaxoffnum) { - /* at end of page, but check for overflow page */ + /* at end of page, but check for an(other) overflow page */ oblkno = oopaque->hasho_nextblkno; - if (BlockNumberIsValid(oblkno)) - { - /* - * we ran out of tuples on this particular page, but we - * have more overflow pages; re-init values. - */ - _hash_wrtbuf(rel, obuf); - obuf = _hash_getbuf(rel, oblkno, HASH_WRITE); - opage = BufferGetPage(obuf); - _hash_checkpage(rel, opage, LH_OVERFLOW_PAGE); - oopaque = (HashPageOpaque) PageGetSpecialPointer(opage); - /* we're guaranteed that an ovfl page has at least 1 tuple */ - if (PageIsEmpty(opage)) - elog(ERROR, "empty hash overflow page %u", oblkno); - ooffnum = FirstOffsetNumber; - omaxoffnum = PageGetMaxOffsetNumber(opage); - } - else - { - /* - * We're at the end of the bucket chain, so now we're - * really done with everything. Before quitting, call - * _hash_squeezebucket to ensure the tuples remaining in the - * old bucket (including the overflow pages) are packed as - * tightly as possible. The new bucket is already tight. - */ - _hash_wrtbuf(rel, obuf); - _hash_wrtbuf(rel, nbuf); - _hash_squeezebucket(rel, obucket, start_oblkno); - return; - } + if (!BlockNumberIsValid(oblkno)) + break; + /* + * we ran out of tuples on this particular page, but we + * have more overflow pages; advance to next page. + */ + _hash_wrtbuf(rel, obuf); + + obuf = _hash_getbuf(rel, oblkno, HASH_WRITE); + opage = BufferGetPage(obuf); + _hash_checkpage(rel, opage, LH_OVERFLOW_PAGE); + oopaque = (HashPageOpaque) PageGetSpecialPointer(opage); + ooffnum = FirstOffsetNumber; + omaxoffnum = PageGetMaxOffsetNumber(opage); + continue; } - /* hash on the tuple */ + /* + * Re-hash the tuple to determine which bucket it now belongs in. + * + * It is annoying to call the hash function while holding locks, + * but releasing and relocking the page for each tuple is unappealing + * too. 
+ */ hitem = (HashItem) PageGetItem(opage, PageGetItemId(opage, ooffnum)); itup = &(hitem->hash_itup); datum = index_getattr(itup, 1, itupdesc, &null); Assert(!null); - bucket = _hash_call(rel, metap, datum); + bucket = _hash_hashkey2bucket(_hash_datum2hashkey(rel, datum), + maxbucket, highmask, lowmask); if (bucket == nbucket) { @@ -562,11 +628,13 @@ _hash_splitbucket(Relation rel, if (PageGetFreeSpace(npage) < itemsz) { - ovflbuf = _hash_addovflpage(rel, metabuf, nbuf); - _hash_wrtbuf(rel, nbuf); - nbuf = ovflbuf; + /* write out nbuf and drop lock, but keep pin */ + _hash_chgbufaccess(rel, nbuf, HASH_WRITE, HASH_NOLOCK); + /* chain to a new overflow page */ + nbuf = _hash_addovflpage(rel, metabuf, nbuf); npage = BufferGetPage(nbuf); - _hash_checkpage(rel, npage, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE); + _hash_checkpage(rel, npage, LH_OVERFLOW_PAGE); + /* we don't need nopaque within the loop */ } noffnum = OffsetNumberNext(PageGetMaxOffsetNumber(npage)); @@ -574,7 +642,6 @@ _hash_splitbucket(Relation rel, == InvalidOffsetNumber) elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(rel)); - _hash_wrtnorelbuf(nbuf); /* * now delete the tuple from the old bucket. after this @@ -586,40 +653,7 @@ _hash_splitbucket(Relation rel, * instead of calling PageGetMaxOffsetNumber. */ PageIndexTupleDelete(opage, ooffnum); - _hash_wrtnorelbuf(obuf); omaxoffnum = OffsetNumberPrev(omaxoffnum); - - /* - * tidy up. if the old page was an overflow page and it is - * now empty, we must free it (we want to preserve the - * invariant that overflow pages cannot be empty). - */ - if (PageIsEmpty(opage) && - (oopaque->hasho_flag & LH_OVERFLOW_PAGE)) - { - oblkno = _hash_freeovflpage(rel, obuf); - - /* check that we're not through the bucket chain */ - if (!BlockNumberIsValid(oblkno)) - { - _hash_wrtbuf(rel, nbuf); - _hash_squeezebucket(rel, obucket, start_oblkno); - return; - } - - /* - * re-init. again, we're guaranteed that an ovfl page has - * at least one tuple. - */ - obuf = _hash_getbuf(rel, oblkno, HASH_WRITE); - opage = BufferGetPage(obuf); - _hash_checkpage(rel, opage, LH_OVERFLOW_PAGE); - oopaque = (HashPageOpaque) PageGetSpecialPointer(opage); - if (PageIsEmpty(opage)) - elog(ERROR, "empty hash overflow page %u", oblkno); - ooffnum = FirstOffsetNumber; - omaxoffnum = PageGetMaxOffsetNumber(opage); - } } else { @@ -632,5 +666,15 @@ _hash_splitbucket(Relation rel, ooffnum = OffsetNumberNext(ooffnum); } } - /* NOTREACHED */ + + /* + * We're at the end of the old bucket chain, so we're done partitioning + * the tuples. Before quitting, call _hash_squeezebucket to ensure the + * tuples remaining in the old bucket (including the overflow pages) are + * packed as tightly as possible. The new bucket is already tight. + */ + _hash_wrtbuf(rel, obuf); + _hash_wrtbuf(rel, nbuf); + + _hash_squeezebucket(rel, obucket, start_oblkno); } diff --git a/src/backend/access/hash/hashscan.c b/src/backend/access/hash/hashscan.c index a0b124cbee..35ac0622b5 100644 --- a/src/backend/access/hash/hashscan.c +++ b/src/backend/access/hash/hashscan.c @@ -8,22 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/hash/hashscan.c,v 1.30 2003/08/04 02:39:57 momjian Exp $ - * - * NOTES - * Because we can be doing an index scan on a relation while we - * update it, we need to avoid missing data that moves around in - * the index. The routines and global variables in this file - * guarantee that all scans in the local address space stay - * correctly positioned. 
This is all we need to worry about, since - * write locking guarantees that no one else will be on the same - * page at the same time as we are. - * - * The scheme is to manage a list of active scans in the current - * backend. Whenever we add or remove records from an index, we - * check the list of active scans to see if any has been affected. - * A scan is affected only if it is on the same relation, and the - * same page, as the update. + * $Header: /cvsroot/pgsql/src/backend/access/hash/hashscan.c,v 1.31 2003/09/04 22:06:27 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -44,10 +29,6 @@ typedef HashScanListData *HashScanList; static HashScanList HashScans = (HashScanList) NULL; -static void _hash_scandel(IndexScanDesc scan, - BlockNumber blkno, OffsetNumber offno); - - /* * AtEOXact_hash() --- clean up hash subsystem at xact abort or commit. * @@ -67,9 +48,6 @@ AtEOXact_hash(void) * at end of transaction anyway. */ HashScans = NULL; - - /* If we were building a hash, we ain't anymore. */ - BuildingHash = false; } /* @@ -112,70 +90,26 @@ _hash_dropscan(IndexScanDesc scan) pfree(chk); } -void -_hash_adjscans(Relation rel, ItemPointer tid) +/* + * Is there an active scan in this bucket? + */ +bool +_hash_has_active_scan(Relation rel, Bucket bucket) { + Oid relid = RelationGetRelid(rel); HashScanList l; - Oid relid; - relid = RelationGetRelid(rel); - for (l = HashScans; l != (HashScanList) NULL; l = l->hashsl_next) + for (l = HashScans; l != NULL; l = l->hashsl_next) { if (relid == l->hashsl_scan->indexRelation->rd_id) - _hash_scandel(l->hashsl_scan, ItemPointerGetBlockNumber(tid), - ItemPointerGetOffsetNumber(tid)); - } -} - -static void -_hash_scandel(IndexScanDesc scan, BlockNumber blkno, OffsetNumber offno) -{ - ItemPointer current; - ItemPointer mark; - Buffer buf; - Buffer metabuf; - HashScanOpaque so; - - so = (HashScanOpaque) scan->opaque; - current = &(scan->currentItemData); - mark = &(scan->currentMarkData); - - if (ItemPointerIsValid(current) - && ItemPointerGetBlockNumber(current) == blkno - && ItemPointerGetOffsetNumber(current) >= offno) - { - metabuf = _hash_getbuf(scan->indexRelation, HASH_METAPAGE, HASH_READ); - buf = so->hashso_curbuf; - _hash_step(scan, &buf, BackwardScanDirection, metabuf); - } - - if (ItemPointerIsValid(mark) - && ItemPointerGetBlockNumber(mark) == blkno - && ItemPointerGetOffsetNumber(mark) >= offno) - { - /* - * The idea here is to exchange the current and mark positions, - * then step backwards (affecting current), then exchange again. 
- */ - ItemPointerData tmpitem; - Buffer tmpbuf; - - tmpitem = *mark; - *mark = *current; - *current = tmpitem; - tmpbuf = so->hashso_mrkbuf; - so->hashso_mrkbuf = so->hashso_curbuf; - so->hashso_curbuf = tmpbuf; - - metabuf = _hash_getbuf(scan->indexRelation, HASH_METAPAGE, HASH_READ); - buf = so->hashso_curbuf; - _hash_step(scan, &buf, BackwardScanDirection, metabuf); - - tmpitem = *mark; - *mark = *current; - *current = tmpitem; - tmpbuf = so->hashso_mrkbuf; - so->hashso_mrkbuf = so->hashso_curbuf; - so->hashso_curbuf = tmpbuf; + { + HashScanOpaque so = (HashScanOpaque) l->hashsl_scan->opaque; + + if (so->hashso_bucket_valid && + so->hashso_bucket == bucket) + return true; + } } + + return false; } diff --git a/src/backend/access/hash/hashsearch.c b/src/backend/access/hash/hashsearch.c index c5321e4b6b..d8982ffdbc 100644 --- a/src/backend/access/hash/hashsearch.c +++ b/src/backend/access/hash/hashsearch.c @@ -8,55 +8,16 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/hash/hashsearch.c,v 1.33 2003/09/02 18:13:31 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/hash/hashsearch.c,v 1.34 2003/09/04 22:06:27 tgl Exp $ * *------------------------------------------------------------------------- */ - #include "postgres.h" #include "access/hash.h" +#include "storage/lmgr.h" -/* - * _hash_search() -- Find the bucket that contains the scankey - * and fetch its primary bucket page into *bufP. - * - * the buffer has a read lock. - */ -void -_hash_search(Relation rel, - int keysz, - ScanKey scankey, - Buffer *bufP, - HashMetaPage metap) -{ - BlockNumber blkno; - Bucket bucket; - - if (scankey == NULL || - (scankey[0].sk_flags & SK_ISNULL)) - { - /* - * If the scankey is empty, all tuples will satisfy the - * scan so we start the scan at the first bucket (bucket 0). - * - * If the scankey is NULL, no tuples will satisfy the search; - * this should have been checked already, but arbitrarily return - * bucket zero. - */ - bucket = 0; - } - else - { - bucket = _hash_call(rel, metap, scankey[0].sk_argument); - } - - blkno = BUCKET_TO_BLKNO(metap, bucket); - - *bufP = _hash_getbuf(rel, blkno, HASH_READ); -} - /* * _hash_next() -- Get the next item in a scan. * @@ -69,31 +30,23 @@ _hash_search(Relation rel, bool _hash_next(IndexScanDesc scan, ScanDirection dir) { - Relation rel; + Relation rel = scan->indexRelation; + HashScanOpaque so = (HashScanOpaque) scan->opaque; Buffer buf; - Buffer metabuf; Page page; OffsetNumber offnum; ItemPointer current; HashItem hitem; IndexTuple itup; - HashScanOpaque so; - rel = scan->indexRelation; - so = (HashScanOpaque) scan->opaque; - - /* we still have the buffer pinned and locked */ + /* we still have the buffer pinned and read-locked */ buf = so->hashso_curbuf; Assert(BufferIsValid(buf)); - metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ); - /* - * step to next valid tuple. note that _hash_step releases our lock - * on 'metabuf'; if we switch to a new 'buf' while looking for the - * next tuple, we come back with a lock on that buffer. + * step to next valid tuple. */ - if (!_hash_step(scan, &buf, dir, metabuf)) + if (!_hash_step(scan, &buf, dir)) return false; /* if we're here, _hash_step found a valid tuple */ @@ -108,6 +61,9 @@ _hash_next(IndexScanDesc scan, ScanDirection dir) return true; } +/* + * Advance to next page in a bucket, if any. 
+ */ static void _hash_readnext(Relation rel, Buffer *bufp, Page *pagep, HashPageOpaque *opaquep) @@ -115,7 +71,7 @@ _hash_readnext(Relation rel, BlockNumber blkno; blkno = (*opaquep)->hasho_nextblkno; - _hash_relbuf(rel, *bufp, HASH_READ); + _hash_relbuf(rel, *bufp); *bufp = InvalidBuffer; if (BlockNumberIsValid(blkno)) { @@ -123,10 +79,12 @@ _hash_readnext(Relation rel, *pagep = BufferGetPage(*bufp); _hash_checkpage(rel, *pagep, LH_OVERFLOW_PAGE); *opaquep = (HashPageOpaque) PageGetSpecialPointer(*pagep); - Assert(!PageIsEmpty(*pagep)); } } +/* + * Advance to previous page in a bucket, if any. + */ static void _hash_readprev(Relation rel, Buffer *bufp, Page *pagep, HashPageOpaque *opaquep) @@ -134,7 +92,7 @@ _hash_readprev(Relation rel, BlockNumber blkno; blkno = (*opaquep)->hasho_prevblkno; - _hash_relbuf(rel, *bufp, HASH_READ); + _hash_relbuf(rel, *bufp); *bufp = InvalidBuffer; if (BlockNumberIsValid(blkno)) { @@ -142,28 +100,26 @@ _hash_readprev(Relation rel, *pagep = BufferGetPage(*bufp); _hash_checkpage(rel, *pagep, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE); *opaquep = (HashPageOpaque) PageGetSpecialPointer(*pagep); - if (PageIsEmpty(*pagep)) - { - Assert((*opaquep)->hasho_flag & LH_BUCKET_PAGE); - _hash_relbuf(rel, *bufp, HASH_READ); - *bufp = InvalidBuffer; - } } } /* * _hash_first() -- Find the first item in a scan. * - * Find the first item in the tree that + * Find the first item in the index that * satisfies the qualification associated with the scan descriptor. On - * exit, the page containing the current index tuple is read locked + * success, the page containing the current index tuple is read locked * and pinned, and the scan's opaque data entry is updated to * include the buffer. */ bool _hash_first(IndexScanDesc scan, ScanDirection dir) { - Relation rel; + Relation rel = scan->indexRelation; + HashScanOpaque so = (HashScanOpaque) scan->opaque; + uint32 hashkey; + Bucket bucket; + BlockNumber blkno; Buffer buf; Buffer metabuf; Page page; @@ -173,70 +129,89 @@ _hash_first(IndexScanDesc scan, ScanDirection dir) IndexTuple itup; ItemPointer current; OffsetNumber offnum; - HashScanOpaque so; - rel = scan->indexRelation; - so = (HashScanOpaque) scan->opaque; current = &(scan->currentItemData); + ItemPointerSetInvalid(current); + /* + * We do not support hash scans with no index qualification, because + * we would have to read the whole index rather than just one bucket. + * That creates a whole raft of problems, since we haven't got a + * practical way to lock all the buckets against splits or compactions. + */ + if (scan->numberOfKeys < 1) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("hash indexes do not support whole-index scans"))); + + /* + * If the constant in the index qual is NULL, assume it cannot match + * any items in the index. + */ + if (scan->keyData[0].sk_flags & SK_ISNULL) + return false; + + /* + * Okay to compute the hash key. We want to do this before acquiring + * any locks, in case a user-defined hash function happens to be slow. + */ + hashkey = _hash_datum2hashkey(rel, scan->keyData[0].sk_argument); + + /* + * Acquire shared split lock so we can compute the target bucket + * safely (see README). + */ + _hash_getlock(rel, 0, HASH_SHARE); + + /* Read the metapage */ metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ); metap = (HashMetaPage) BufferGetPage(metabuf); _hash_checkpage(rel, (Page) metap, LH_META_PAGE); /* - * XXX -- The attribute number stored in the scan key is the attno in - * the heap relation. 
We need to transmogrify this into the index - * relation attno here. For the moment, we have hardwired attno == 1. + * Compute the target bucket number, and convert to block number. */ + bucket = _hash_hashkey2bucket(hashkey, + metap->hashm_maxbucket, + metap->hashm_highmask, + metap->hashm_lowmask); - /* find the correct bucket page and load it into buf */ - _hash_search(rel, 1, scan->keyData, &buf, metap); + blkno = BUCKET_TO_BLKNO(metap, bucket); + + /* done with the metapage */ + _hash_relbuf(rel, metabuf); + + /* + * Acquire share lock on target bucket; then we can release split lock. + */ + _hash_getlock(rel, blkno, HASH_SHARE); + + _hash_droplock(rel, 0, HASH_SHARE); + + /* Update scan opaque state to show we have lock on the bucket */ + so->hashso_bucket = bucket; + so->hashso_bucket_valid = true; + so->hashso_bucket_blkno = blkno; + + /* Fetch the primary bucket page for the bucket */ + buf = _hash_getbuf(rel, blkno, HASH_READ); page = BufferGetPage(buf); _hash_checkpage(rel, page, LH_BUCKET_PAGE); opaque = (HashPageOpaque) PageGetSpecialPointer(page); + Assert(opaque->hasho_bucket == bucket); - /* - * if we are scanning forward, we need to find the first non-empty - * page (if any) in the bucket chain. since overflow pages are never - * empty, this had better be either the bucket page or the first - * overflow page. - * - * if we are scanning backward, we always go all the way to the end of - * the bucket chain. - */ - if (PageIsEmpty(page)) - { - if (BlockNumberIsValid(opaque->hasho_nextblkno)) - _hash_readnext(rel, &buf, &page, &opaque); - else - { - ItemPointerSetInvalid(current); - so->hashso_curbuf = InvalidBuffer; - - /* - * If there is no scankeys, all tuples will satisfy the scan - - * so we continue in _hash_step to get tuples from all - * buckets. - vadim 04/29/97 - */ - if (scan->numberOfKeys >= 1) - { - _hash_relbuf(rel, buf, HASH_READ); - _hash_relbuf(rel, metabuf, HASH_READ); - return false; - } - } - } + /* If a backwards scan is requested, move to the end of the chain */ if (ScanDirectionIsBackward(dir)) { while (BlockNumberIsValid(opaque->hasho_nextblkno)) _hash_readnext(rel, &buf, &page, &opaque); } - if (!_hash_step(scan, &buf, dir, metabuf)) + /* Now find the first tuple satisfying the qualification */ + if (!_hash_step(scan, &buf, dir)) return false; /* if we're here, _hash_step found a valid tuple */ - current = &(scan->currentItemData); offnum = ItemPointerGetOffsetNumber(current); page = BufferGetPage(buf); _hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE); @@ -254,19 +229,16 @@ _hash_first(IndexScanDesc scan, ScanDirection dir) * false. Else, return true and set the CurrentItemData for the * scan to the right thing. * - * 'bufP' points to the buffer which contains the current page - * that we'll step through. - * - * 'metabuf' is released when this returns. + * 'bufP' points to the current buffer, which is pinned and read-locked. + * On success exit, we have pin and read-lock on whichever page + * contains the right item; on failure, we have released all buffers. 
*/ bool -_hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf) +_hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir) { - Relation rel; + Relation rel = scan->indexRelation; + HashScanOpaque so = (HashScanOpaque) scan->opaque; ItemPointer current; - HashScanOpaque so; - int allbuckets; - HashMetaPage metap; Buffer buf; Page page; HashPageOpaque opaque; @@ -277,18 +249,13 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf) HashItem hitem; IndexTuple itup; - rel = scan->indexRelation; current = &(scan->currentItemData); - so = (HashScanOpaque) scan->opaque; - allbuckets = (scan->numberOfKeys < 1); - - metap = (HashMetaPage) BufferGetPage(metabuf); - _hash_checkpage(rel, (Page) metap, LH_META_PAGE); buf = *bufP; page = BufferGetPage(buf); _hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE); opaque = (HashPageOpaque) PageGetSpecialPointer(page); + bucket = opaque->hasho_bucket; /* * If _hash_step is called from _hash_first, current will not be @@ -309,107 +276,63 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf) */ do { - bucket = opaque->hasho_bucket; - switch (dir) { case ForwardScanDirection: if (offnum != InvalidOffsetNumber) - { offnum = OffsetNumberNext(offnum); /* move forward */ - } else - { offnum = FirstOffsetNumber; /* new page */ - } + while (offnum > maxoff) { - - /*-------- + /* * either this page is empty * (maxoff == InvalidOffsetNumber) * or we ran off the end. - *-------- */ _hash_readnext(rel, &buf, &page, &opaque); - if (BufferIsInvalid(buf)) - { /* end of chain */ - if (allbuckets && bucket < metap->hashm_maxbucket) - { - ++bucket; - blkno = BUCKET_TO_BLKNO(metap, bucket); - buf = _hash_getbuf(rel, blkno, HASH_READ); - page = BufferGetPage(buf); - _hash_checkpage(rel, page, LH_BUCKET_PAGE); - opaque = (HashPageOpaque) PageGetSpecialPointer(page); - Assert(opaque->hasho_bucket == bucket); - while (PageIsEmpty(page) && - BlockNumberIsValid(opaque->hasho_nextblkno)) - _hash_readnext(rel, &buf, &page, &opaque); - maxoff = PageGetMaxOffsetNumber(page); - offnum = FirstOffsetNumber; - } - else - { - maxoff = offnum = InvalidOffsetNumber; - break; /* while */ - } - } - else + if (BufferIsValid(buf)) { - /* _hash_readnext never returns an empty page */ maxoff = PageGetMaxOffsetNumber(page); offnum = FirstOffsetNumber; } + else + { + /* end of bucket */ + maxoff = offnum = InvalidOffsetNumber; + break; /* exit while */ + } } break; + case BackwardScanDirection: if (offnum != InvalidOffsetNumber) - { offnum = OffsetNumberPrev(offnum); /* move back */ - } else - { offnum = maxoff; /* new page */ - } + while (offnum < FirstOffsetNumber) { - - /*--------- + /* * either this page is empty * (offnum == InvalidOffsetNumber) * or we ran off the end. 
- *--------- */ _hash_readprev(rel, &buf, &page, &opaque); - if (BufferIsInvalid(buf)) - { /* end of chain */ - if (allbuckets && bucket > 0) - { - --bucket; - blkno = BUCKET_TO_BLKNO(metap, bucket); - buf = _hash_getbuf(rel, blkno, HASH_READ); - page = BufferGetPage(buf); - _hash_checkpage(rel, page, LH_BUCKET_PAGE); - opaque = (HashPageOpaque) PageGetSpecialPointer(page); - Assert(opaque->hasho_bucket == bucket); - while (BlockNumberIsValid(opaque->hasho_nextblkno)) - _hash_readnext(rel, &buf, &page, &opaque); - maxoff = offnum = PageGetMaxOffsetNumber(page); - } - else - { - maxoff = offnum = InvalidOffsetNumber; - break; /* while */ - } + if (BufferIsValid(buf)) + { + maxoff = offnum = PageGetMaxOffsetNumber(page); } else { - /* _hash_readprev never returns an empty page */ - maxoff = offnum = PageGetMaxOffsetNumber(page); + /* end of bucket */ + maxoff = offnum = InvalidOffsetNumber; + break; /* exit while */ } } break; + default: /* NoMovementScanDirection */ /* this should not be reached */ @@ -419,7 +342,6 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf) /* we ran off the end of the world without finding a match */ if (offnum == InvalidOffsetNumber) { - _hash_relbuf(rel, metabuf, HASH_READ); *bufP = so->hashso_curbuf = InvalidBuffer; ItemPointerSetInvalid(current); return false; @@ -431,7 +353,6 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf) } while (!_hash_checkqual(scan, itup)); /* if we made it to here, we've found a valid tuple */ - _hash_relbuf(rel, metabuf, HASH_READ); blkno = BufferGetBlockNumber(buf); *bufP = so->hashso_curbuf = buf; ItemPointerSet(current, blkno, offnum); diff --git a/src/backend/access/hash/hashutil.c b/src/backend/access/hash/hashutil.c index ce62a3a844..0cfbe5e7a1 100644 --- a/src/backend/access/hash/hashutil.c +++ b/src/backend/access/hash/hashutil.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/hash/hashutil.c,v 1.35 2003/09/02 18:13:31 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/hash/hashutil.c,v 1.36 2003/09/04 22:06:27 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -19,46 +19,6 @@ #include "access/iqual.h" -/* - * _hash_mkscankey -- build a scan key matching the given indextuple - * - * Note: this is prepared for multiple index columns, but very little - * else in access/hash is ... - */ -ScanKey -_hash_mkscankey(Relation rel, IndexTuple itup) -{ - ScanKey skey; - TupleDesc itupdesc = RelationGetDescr(rel); - int natts = rel->rd_rel->relnatts; - AttrNumber i; - Datum arg; - FmgrInfo *procinfo; - bool isnull; - - skey = (ScanKey) palloc(natts * sizeof(ScanKeyData)); - - for (i = 0; i < natts; i++) - { - arg = index_getattr(itup, i + 1, itupdesc, &isnull); - procinfo = index_getprocinfo(rel, i + 1, HASHPROC); - ScanKeyEntryInitializeWithInfo(&skey[i], - isnull ? SK_ISNULL : 0x0, - (AttrNumber) (i + 1), - procinfo, - CurrentMemoryContext, - arg); - } - - return skey; -} - -void -_hash_freeskey(ScanKey skey) -{ - pfree(skey); -} - /* * _hash_checkqual -- does the index tuple satisfy the scan conditions? */ @@ -102,24 +62,31 @@ _hash_formitem(IndexTuple itup) } /* - * _hash_call -- given a Datum, call the index's hash procedure - * - * Returns the bucket number that the hash key maps to. 
+ * _hash_datum2hashkey -- given a Datum, call the index's hash procedure */ -Bucket -_hash_call(Relation rel, HashMetaPage metap, Datum key) +uint32 +_hash_datum2hashkey(Relation rel, Datum key) { FmgrInfo *procinfo; - uint32 n; - Bucket bucket; /* XXX assumes index has only one attribute */ procinfo = index_getprocinfo(rel, 1, HASHPROC); - n = DatumGetUInt32(FunctionCall1(procinfo, key)); - bucket = n & metap->hashm_highmask; - if (bucket > metap->hashm_maxbucket) - bucket = bucket & metap->hashm_lowmask; + return DatumGetUInt32(FunctionCall1(procinfo, key)); +} + +/* + * _hash_hashkey2bucket -- determine which bucket the hashkey maps to. + */ +Bucket +_hash_hashkey2bucket(uint32 hashkey, uint32 maxbucket, + uint32 highmask, uint32 lowmask) +{ + Bucket bucket; + + bucket = hashkey & highmask; + if (bucket > maxbucket) + bucket = bucket & lowmask; return bucket; } diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c index 12845f5593..c4fceb0096 100644 --- a/src/backend/storage/lmgr/lmgr.c +++ b/src/backend/storage/lmgr/lmgr.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.59 2003/08/17 22:41:12 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.60 2003/09/04 22:06:27 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -153,7 +153,7 @@ LockRelation(Relation relation, LOCKMODE lockmode) * As above, but only lock if we can get the lock without blocking. * Returns TRUE iff the lock was acquired. * - * NOTE: we do not currently need conditional versions of the other + * NOTE: we do not currently need conditional versions of all the * LockXXX routines in this file, but they could easily be added if needed. */ bool @@ -264,6 +264,26 @@ LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode) elog(ERROR, "LockAcquire failed"); } +/* + * ConditionalLockPage + * + * As above, but only lock if we can get the lock without blocking. + * Returns TRUE iff the lock was acquired. + */ +bool +ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode) +{ + LOCKTAG tag; + + MemSet(&tag, 0, sizeof(tag)); + tag.relId = relation->rd_lockInfo.lockRelId.relId; + tag.dbId = relation->rd_lockInfo.lockRelId.dbId; + tag.objId.blkno = blkno; + + return LockAcquire(LockTableId, &tag, GetCurrentTransactionId(), + lockmode, true); +} + /* * UnlockPage */ diff --git a/src/include/access/hash.h b/src/include/access/hash.h index 7edbdad098..beffa806ea 100644 --- a/src/include/access/hash.h +++ b/src/include/access/hash.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: hash.h,v 1.52 2003/09/02 18:13:32 tgl Exp $ + * $Id: hash.h,v 1.53 2003/09/04 22:06:27 tgl Exp $ * * NOTES * modeled after Margo Seltzer's hash implementation for unix. @@ -70,13 +70,27 @@ typedef HashPageOpaqueData *HashPageOpaque; #define HASHO_FILL 0x1234 /* - * ScanOpaqueData is used to remember which buffers we're currently - * examining in the scan. We keep these buffers locked and pinned and - * recorded in the opaque entry of the scan in order to avoid doing a - * ReadBuffer() for every tuple in the index. + * HashScanOpaqueData is private state for a hash index scan. */ typedef struct HashScanOpaqueData { + /* + * By definition, a hash scan should be examining only one bucket. + * We record the bucket number here as soon as it is known. 
+ */ + Bucket hashso_bucket; + bool hashso_bucket_valid; + /* + * If we have a share lock on the bucket, we record it here. When + * hashso_bucket_blkno is zero, we have no such lock. + */ + BlockNumber hashso_bucket_blkno; + /* + * We also want to remember which buffers we're currently examining in the + * scan. We keep these buffers pinned (but not locked) across hashgettuple + * calls, in order to avoid doing a ReadBuffer() for every tuple in the + * index. + */ Buffer hashso_curbuf; Buffer hashso_mrkbuf; } HashScanOpaqueData; @@ -148,10 +162,18 @@ typedef struct HashItemData typedef HashItemData *HashItem; +/* + * Maximum size of a hash index item (it's okay to have only one per page) + */ +#define HashMaxItemSize(page) \ + (PageGetPageSize(page) - \ + sizeof(PageHeaderData) - \ + MAXALIGN(sizeof(HashPageOpaqueData)) - \ + sizeof(ItemIdData)) + /* * Constants */ -#define DEFAULT_FFACTOR 300 #define BYTE_TO_BIT 3 /* 2^3 bits/byte */ #define ALL_SET ((uint32) ~0) @@ -180,10 +202,14 @@ typedef HashItemData *HashItem; #define ISSET(A, N) ((A)[(N)/BITS_PER_MAP] & (1<<((N)%BITS_PER_MAP))) /* - * page locking modes + * page-level and high-level locking modes (see README) */ -#define HASH_READ 0 -#define HASH_WRITE 1 +#define HASH_READ BUFFER_LOCK_SHARE +#define HASH_WRITE BUFFER_LOCK_EXCLUSIVE +#define HASH_NOLOCK (-1) + +#define HASH_SHARE ShareLock +#define HASH_EXCLUSIVE ExclusiveLock /* * Strategy number. There's only one valid strategy for hashing: equality. @@ -199,8 +225,6 @@ typedef HashItemData *HashItem; #define HASHPROC 1 -extern bool BuildingHash; - /* public routines */ extern Datum hashbuild(PG_FUNCTION_ARGS); @@ -250,36 +274,37 @@ extern void _hash_squeezebucket(Relation rel, Bucket bucket, BlockNumber bucket_blkno); /* hashpage.c */ -extern void _hash_metapinit(Relation rel); +extern void _hash_getlock(Relation rel, BlockNumber whichlock, int access); +extern bool _hash_try_getlock(Relation rel, BlockNumber whichlock, int access); +extern void _hash_droplock(Relation rel, BlockNumber whichlock, int access); extern Buffer _hash_getbuf(Relation rel, BlockNumber blkno, int access); -extern void _hash_relbuf(Relation rel, Buffer buf, int access); +extern void _hash_relbuf(Relation rel, Buffer buf); +extern void _hash_dropbuf(Relation rel, Buffer buf); extern void _hash_wrtbuf(Relation rel, Buffer buf); -extern void _hash_wrtnorelbuf(Buffer buf); +extern void _hash_wrtnorelbuf(Relation rel, Buffer buf); extern void _hash_chgbufaccess(Relation rel, Buffer buf, int from_access, int to_access); +extern void _hash_metapinit(Relation rel); extern void _hash_pageinit(Page page, Size size); extern void _hash_expandtable(Relation rel, Buffer metabuf); /* hashscan.c */ extern void _hash_regscan(IndexScanDesc scan); extern void _hash_dropscan(IndexScanDesc scan); -extern void _hash_adjscans(Relation rel, ItemPointer tid); +extern bool _hash_has_active_scan(Relation rel, Bucket bucket); extern void AtEOXact_hash(void); /* hashsearch.c */ -extern void _hash_search(Relation rel, int keysz, ScanKey scankey, - Buffer *bufP, HashMetaPage metap); extern bool _hash_next(IndexScanDesc scan, ScanDirection dir); extern bool _hash_first(IndexScanDesc scan, ScanDirection dir); -extern bool _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, - Buffer metabuf); +extern bool _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir); /* hashutil.c */ -extern ScanKey _hash_mkscankey(Relation rel, IndexTuple itup); -extern void _hash_freeskey(ScanKey skey); extern bool 
_hash_checkqual(IndexScanDesc scan, IndexTuple itup); extern HashItem _hash_formitem(IndexTuple itup); -extern Bucket _hash_call(Relation rel, HashMetaPage metap, Datum key); +extern uint32 _hash_datum2hashkey(Relation rel, Datum key); +extern Bucket _hash_hashkey2bucket(uint32 hashkey, uint32 maxbucket, + uint32 highmask, uint32 lowmask); extern uint32 _hash_log2(uint32 num); extern void _hash_checkpage(Relation rel, Page page, int flags); diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h index d7a557d2b5..19bda76d72 100644 --- a/src/include/storage/lmgr.h +++ b/src/include/storage/lmgr.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: lmgr.h,v 1.39 2003/08/04 02:40:14 momjian Exp $ + * $Id: lmgr.h,v 1.40 2003/09/04 22:06:27 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -54,8 +54,9 @@ extern void UnlockRelation(Relation relation, LOCKMODE lockmode); extern void LockRelationForSession(LockRelId *relid, LOCKMODE lockmode); extern void UnlockRelationForSession(LockRelId *relid, LOCKMODE lockmode); -/* Lock a page (mainly used for indices) */ +/* Lock a page (mainly used for indexes) */ extern void LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode); +extern bool ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode); extern void UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode); /* Lock an XID (used to wait for a transaction to finish) */
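
For reference, the bucket-mapping rule that _hash_first() and _hash_splitbucket() now share boils down to the two-line computation in _hash_hashkey2bucket(): mask the hash value with the current doubling's highmask, and fall back to lowmask whenever the result points past hashm_maxbucket, i.e. at a bucket that has not been split off yet. The following is a minimal standalone sketch of that rule, not part of the patch; the mask values and keys are made-up illustrations of a three-bucket index.

#include <stdio.h>
#include <stdint.h>

typedef uint32_t Bucket;

/* same logic as the patch's _hash_hashkey2bucket() */
static Bucket
hashkey2bucket(uint32_t hashkey, uint32_t maxbucket,
               uint32_t highmask, uint32_t lowmask)
{
    Bucket      bucket;

    bucket = hashkey & highmask;
    if (bucket > maxbucket)
        bucket = bucket & lowmask;      /* not split yet; use parent bucket */

    return bucket;
}

int
main(void)
{
    /* illustrative state: buckets 0..2 exist, bucket 3 not yet created */
    uint32_t    maxbucket = 2;
    uint32_t    lowmask = 0x1;          /* masks down to 2 buckets */
    uint32_t    highmask = 0x3;         /* masks down to 4 buckets */
    uint32_t    keys[] = {0, 1, 2, 3, 12345678};
    int         i;

    for (i = 0; i < 5; i++)
        printf("hashkey %u -> bucket %u\n",
               (unsigned) keys[i],
               (unsigned) hashkey2bucket(keys[i], maxbucket,
                                         highmask, lowmask));
    return 0;
}

A key whose masked value is 3 lands in bucket 1, its parent from the previous doubling, which is why _hash_expandtable() only needs to copy maxbucket, highmask and lowmask for _hash_splitbucket() to partition tuples correctly even after the split lock is released.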
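Likewise, the fill factor that replaces DEFAULT_FFACTOR in _hash_metapinit() is plain arithmetic on the per-item footprint, targeting bucket pages about three-quarters full. Below is a rough standalone sketch of that estimate under assumed sizes; BLCKSZ, the MAXALIGN rounding, and the struct sizes are stand-ins chosen for illustration, not the server's actual values.

#include <stdio.h>
#include <stddef.h>

#define BLCKSZ          8192            /* assumed page size */
#define MAXALIGN(LEN)   (((LEN) + 7) & ~((size_t) 7))   /* assumes 8-byte alignment */

int
main(void)
{
    size_t      hashitem_hdr = 8;       /* stand-in for sizeof(HashItemData) */
    size_t      itemid_size = 4;        /* stand-in for sizeof(ItemIdData) */
    size_t      data_width = 4;         /* e.g. a hashed int4 column */
    size_t      item_width;
    unsigned    ffactor;

    /* same shape as the estimate in _hash_metapinit() */
    item_width = MAXALIGN(hashitem_hdr) + MAXALIGN(data_width)
        + itemid_size;                  /* include the line pointer */
    ffactor = (unsigned) ((BLCKSZ * 3 / 4) / item_width);
    if (ffactor < 10)                   /* keep to a sane range */
        ffactor = 10;

    printf("item_width = %u bytes, ffactor = %u tuples per bucket\n",
           (unsigned) item_width, ffactor);
    return 0;
}

With these stand-in numbers the estimate comes out to roughly 300 tuples per bucket, in the same ballpark as the old hardwired constant, but it now scales with the width of the indexed datatype instead of ignoring it.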