diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c
index dc63063ac1..354e7339cf 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -228,3 +228,44 @@ _hash_pgaddtup(Relation rel, Buffer buf, Size itemsize, IndexTuple itup)
 
 	return itup_off;
 }
+
+/*
+ * _hash_pgaddmultitup() -- add a tuple vector to a particular page in the
+ *							index.
+ *
+ * This routine has same requirements for locking and tuple ordering as
+ * _hash_pgaddtup().
+ *
+ * Returns the offset number array at which the tuples were inserted.
+ */
+void
+_hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
+					OffsetNumber *itup_offsets, uint16 nitups)
+{
+	OffsetNumber itup_off;
+	Page		page;
+	uint32		hashkey;
+	int			i;
+
+	_hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
+	page = BufferGetPage(buf);
+
+	for (i = 0; i < nitups; i++)
+	{
+		Size		itemsize;
+
+		itemsize = IndexTupleDSize(*itups[i]);
+		itemsize = MAXALIGN(itemsize);
+
+		/* Find where to insert the tuple (preserving page's hashkey ordering) */
+		hashkey = _hash_get_indextuple_hashkey(itups[i]);
+		itup_off = _hash_binsearch(page, hashkey);
+
+		itup_offsets[i] = itup_off;
+
+		if (PageAddItem(page, (Item) itups[i], itemsize, itup_off, false, false)
+			== InvalidOffsetNumber)
+			elog(ERROR, "failed to add index item to \"%s\"",
+				 RelationGetRelationName(rel));
+	}
+}
diff --git a/src/backend/access/hash/hashovfl.c b/src/backend/access/hash/hashovfl.c
index 3334089329..ff6c4e295c 100644
--- a/src/backend/access/hash/hashovfl.c
+++ b/src/backend/access/hash/hashovfl.c
@@ -391,6 +391,8 @@ _hash_firstfreebit(uint32 map)
  *	Remove this overflow page from its bucket's chain, and mark the page as
  *	free.  On entry, ovflbuf is write-locked; it is released before exiting.
  *
+ *	Add the tuples (itups) to wbuf.
+ *
  *	Since this function is invoked in VACUUM, we provide an access strategy
  *	parameter that controls fetches of the bucket pages.
  *
@@ -403,13 +405,16 @@ _hash_firstfreebit(uint32 map)
  *	has a lock on same.
  */
 BlockNumber
-_hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
+_hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf,
+				   Buffer wbuf, IndexTuple *itups, OffsetNumber *itup_offsets,
+				   Size *tups_size, uint16 nitups,
 				   BufferAccessStrategy bstrategy)
 {
 	HashMetaPage metap;
 	Buffer		metabuf;
 	Buffer		mapbuf;
 	Buffer		prevbuf = InvalidBuffer;
+	Buffer		nextbuf = InvalidBuffer;
 	BlockNumber ovflblkno;
 	BlockNumber prevblkno;
 	BlockNumber blkno;
@@ -434,15 +439,6 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
 	writeblkno = BufferGetBlockNumber(wbuf);
 	bucket = ovflopaque->hasho_bucket;
 
-	/*
-	 * Zero the page for debugging's sake; then write and release it. (Note:
-	 * if we failed to zero the page here, we'd have problems with the Assert
-	 * in _hash_pageinit() when the page is reused.)
-	 */
-	MemSet(ovflpage, 0, BufferGetPageSize(ovflbuf));
-	MarkBufferDirty(ovflbuf);
-	_hash_relbuf(rel, ovflbuf);
-
 	/*
 	 * Fix up the bucket chain.  this is a doubly-linked list, so we must fix
 	 * up the bucket chain members behind and ahead of the overflow page being
@@ -451,9 +447,6 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
 	 */
 	if (BlockNumberIsValid(prevblkno))
 	{
-		Page		prevpage;
-		HashPageOpaque prevopaque;
-
 		if (prevblkno == writeblkno)
 			prevbuf = wbuf;
 		else
@@ -462,32 +455,13 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
 												 HASH_WRITE,
 												 LH_BUCKET_PAGE | LH_OVERFLOW_PAGE,
 												 bstrategy);
-
-		prevpage = BufferGetPage(prevbuf);
-		prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
-
-		Assert(prevopaque->hasho_bucket == bucket);
-		prevopaque->hasho_nextblkno = nextblkno;
-
-		MarkBufferDirty(prevbuf);
-		if (prevblkno != writeblkno)
-			_hash_relbuf(rel, prevbuf);
 	}
 	if (BlockNumberIsValid(nextblkno))
-	{
-		Buffer		nextbuf = _hash_getbuf_with_strategy(rel,
-														 nextblkno,
-														 HASH_WRITE,
-														 LH_OVERFLOW_PAGE,
-														 bstrategy);
-		Page		nextpage = BufferGetPage(nextbuf);
-		HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);
-
-		Assert(nextopaque->hasho_bucket == bucket);
-		nextopaque->hasho_prevblkno = prevblkno;
-		MarkBufferDirty(nextbuf);
-		_hash_relbuf(rel, nextbuf);
-	}
+		nextbuf = _hash_getbuf_with_strategy(rel,
+											 nextblkno,
+											 HASH_WRITE,
+											 LH_OVERFLOW_PAGE,
+											 bstrategy);
 
 	/* Note: bstrategy is intentionally not used for metapage and bitmap */
 
@@ -508,24 +482,71 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
 	/* Release metapage lock while we access the bitmap page */
 	LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
 
-	/* Clear the bitmap bit to indicate that this overflow page is free */
+	/* read the bitmap page to clear the bitmap bit */
 	mapbuf = _hash_getbuf(rel, blkno, HASH_WRITE, LH_BITMAP_PAGE);
 	mappage = BufferGetPage(mapbuf);
 	freep = HashPageGetBitmap(mappage);
 	Assert(ISSET(freep, bitmapbit));
-	CLRBIT(freep, bitmapbit);
-	MarkBufferDirty(mapbuf);
-	_hash_relbuf(rel, mapbuf);
 
 	/* Get write-lock on metapage to update firstfree */
 	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
 
+	/*
+	 * we have to insert tuples on the "write" page, being careful to preserve
+	 * hashkey ordering.  (If we insert many tuples into the same "write" page
+	 * it would be worth qsort'ing them).
+	 */
+	if (nitups > 0)
+	{
+		_hash_pgaddmultitup(rel, wbuf, itups, itup_offsets, nitups);
+		MarkBufferDirty(wbuf);
+	}
+
+	/* Initialize the freed overflow page. */
+	_hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));
+	MarkBufferDirty(ovflbuf);
+
+	if (BufferIsValid(prevbuf))
+	{
+		Page		prevpage = BufferGetPage(prevbuf);
+		HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
+
+		Assert(prevopaque->hasho_bucket == bucket);
+		prevopaque->hasho_nextblkno = nextblkno;
+		MarkBufferDirty(prevbuf);
+	}
+	if (BufferIsValid(nextbuf))
+	{
+		Page		nextpage = BufferGetPage(nextbuf);
+		HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);
+
+		Assert(nextopaque->hasho_bucket == bucket);
+		nextopaque->hasho_prevblkno = prevblkno;
+		MarkBufferDirty(nextbuf);
+	}
+
+	/* Clear the bitmap bit to indicate that this overflow page is free */
+	CLRBIT(freep, bitmapbit);
+	MarkBufferDirty(mapbuf);
+
 	/* if this is now the first free page, update hashm_firstfree */
 	if (ovflbitno < metap->hashm_firstfree)
 	{
 		metap->hashm_firstfree = ovflbitno;
 		MarkBufferDirty(metabuf);
 	}
+
+	/* release previous bucket if it is not same as write bucket */
+	if (BufferIsValid(prevbuf) && prevblkno != writeblkno)
+		_hash_relbuf(rel, prevbuf);
+
+	if (BufferIsValid(ovflbuf))
+		_hash_relbuf(rel, ovflbuf);
+
+	if (BufferIsValid(nextbuf))
+		_hash_relbuf(rel, nextbuf);
+
+	_hash_relbuf(rel, mapbuf);
 	_hash_relbuf(rel, metabuf);
 
 	return nextblkno;
@@ -640,7 +661,6 @@ _hash_squeezebucket(Relation rel,
 	Page		rpage;
 	HashPageOpaque wopaque;
 	HashPageOpaque ropaque;
-	bool		wbuf_dirty;
 
 	/*
 	 * start squeezing into the primary bucket page.
@@ -686,15 +706,21 @@ _hash_squeezebucket(Relation rel,
 	/*
 	 * squeeze the tuples.
 	 */
-	wbuf_dirty = false;
 	for (;;)
 	{
 		OffsetNumber roffnum;
 		OffsetNumber maxroffnum;
 		OffsetNumber deletable[MaxOffsetNumber];
-		int			ndeletable = 0;
+		IndexTuple	itups[MaxIndexTuplesPerPage];
+		Size		tups_size[MaxIndexTuplesPerPage];
+		OffsetNumber itup_offsets[MaxIndexTuplesPerPage];
+		uint16		ndeletable = 0;
+		uint16		nitups = 0;
+		Size		all_tups_size = 0;
+		int			i;
 		bool		retain_pin = false;
 
+readpage:
 		/* Scan each tuple in "read" page */
 		maxroffnum = PageGetMaxOffsetNumber(rpage);
 		for (roffnum = FirstOffsetNumber;
@@ -715,11 +741,13 @@ _hash_squeezebucket(Relation rel,
 
 			/*
 			 * Walk up the bucket chain, looking for a page big enough for
-			 * this item.  Exit if we reach the read page.
+			 * this item and all other accumulated items.  Exit if we reach
+			 * the read page.
 			 */
-			while (PageGetFreeSpace(wpage) < itemsz)
+			while (PageGetFreeSpaceForMultipleTuples(wpage, nitups + 1) < (all_tups_size + itemsz))
 			{
 				Buffer		next_wbuf = InvalidBuffer;
+				bool		tups_moved = false;
 
 				Assert(!PageIsEmpty(wpage));
 
@@ -737,12 +765,30 @@ _hash_squeezebucket(Relation rel,
 													  LH_OVERFLOW_PAGE,
 													  bstrategy);
 
+				if (nitups > 0)
+				{
+					Assert(nitups == ndeletable);
+
+					/*
+					 * we have to insert tuples on the "write" page, being
+					 * careful to preserve hashkey ordering.  (If we insert
+					 * many tuples into the same "write" page it would be
+					 * worth qsort'ing them).
+					 */
+					_hash_pgaddmultitup(rel, wbuf, itups, itup_offsets, nitups);
+					MarkBufferDirty(wbuf);
+
+					/* Delete tuples we already moved off read page */
+					PageIndexMultiDelete(rpage, deletable, ndeletable);
+					MarkBufferDirty(rbuf);
+
+					tups_moved = true;
+				}
+
 				/*
 				 * release the lock on previous page after acquiring the lock
 				 * on next page
 				 */
-				if (wbuf_dirty)
-					MarkBufferDirty(wbuf);
 				if (retain_pin)
 					LockBuffer(wbuf, BUFFER_LOCK_UNLOCK);
 				else
@@ -751,12 +797,6 @@ _hash_squeezebucket(Relation rel,
 				/* nothing more to do if we reached the read page */
 				if (rblkno == wblkno)
 				{
-					if (ndeletable > 0)
-					{
-						/* Delete tuples we already moved off read page */
-						PageIndexMultiDelete(rpage, deletable, ndeletable);
-						MarkBufferDirty(rbuf);
-					}
 					_hash_relbuf(rel, rbuf);
 					return;
 				}
@@ -765,21 +805,34 @@ _hash_squeezebucket(Relation rel,
 				wpage = BufferGetPage(wbuf);
 				wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
 				Assert(wopaque->hasho_bucket == bucket);
-				wbuf_dirty = false;
 				retain_pin = false;
-			}
 
-			/*
-			 * we have found room so insert on the "write" page, being careful
-			 * to preserve hashkey ordering.  (If we insert many tuples into
-			 * the same "write" page it would be worth qsort'ing instead of
-			 * doing repeated _hash_pgaddtup.)
-			 */
-			(void) _hash_pgaddtup(rel, wbuf, itemsz, itup);
-			wbuf_dirty = true;
+				/* be tidy */
+				for (i = 0; i < nitups; i++)
+					pfree(itups[i]);
+				nitups = 0;
+				all_tups_size = 0;
+				ndeletable = 0;
+
+				/*
+				 * after moving the tuples, rpage would have been compacted,
+				 * so we need to rescan it.
+				 */
+				if (tups_moved)
+					goto readpage;
+			}
 
 			/* remember tuple for deletion from "read" page */
 			deletable[ndeletable++] = roffnum;
+
+			/*
+			 * we need a copy of index tuples as they can be freed as part of
+			 * overflow page, however we need them to write a WAL record in
+			 * _hash_freeovflpage.
+			 */
+			itups[nitups] = CopyIndexTuple(itup);
+			tups_size[nitups++] = itemsz;
+			all_tups_size += itemsz;
 		}
 
 		/*
@@ -797,10 +850,12 @@ _hash_squeezebucket(Relation rel,
 		Assert(BlockNumberIsValid(rblkno));
 
 		/* free this overflow page (releases rbuf) */
-		_hash_freeovflpage(rel, rbuf, wbuf, bstrategy);
+		_hash_freeovflpage(rel, bucket_buf, rbuf, wbuf, itups, itup_offsets,
+						   tups_size, nitups, bstrategy);
 
-		if (wbuf_dirty)
-			MarkBufferDirty(wbuf);
+		/* be tidy */
+		for (i = 0; i < nitups; i++)
+			pfree(itups[i]);
 
 		/* are we freeing the page adjacent to wbuf? */
 		if (rblkno == wblkno)
diff --git a/src/backend/access/hash/hashpage.c b/src/backend/access/hash/hashpage.c
index 9485978bfb..00f3ea81a7 100644
--- a/src/backend/access/hash/hashpage.c
+++ b/src/backend/access/hash/hashpage.c
@@ -470,7 +470,6 @@ _hash_metapinit(Relation rel, double num_tuples, ForkNumber forkNum)
 void
 _hash_pageinit(Page page, Size size)
 {
-	Assert(PageIsNew(page));
 	PageInit(page, size, sizeof(HashPageOpaqueData));
 }
 
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index 6fc5fa4d05..fdf045a45b 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -597,6 +597,33 @@ PageGetFreeSpace(Page page)
 	return (Size) space;
 }
 
+/*
+ * PageGetFreeSpaceForMultipleTuples
+ *		Returns the size of the free (allocatable) space on a page,
+ *		reduced by the space needed for multiple new line pointers.
+ *
+ * Note: this should usually only be used on index pages.  Use
+ * PageGetHeapFreeSpace on heap pages.
+ */
+Size
+PageGetFreeSpaceForMultipleTuples(Page page, int ntups)
+{
+	int			space;
+
+	/*
+	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
+	 * pd_upper.
+	 */
+	space = (int) ((PageHeader) page)->pd_upper -
+		(int) ((PageHeader) page)->pd_lower;
+
+	if (space < (int) (ntups * sizeof(ItemIdData)))
+		return 0;
+	space -= ntups * sizeof(ItemIdData);
+
+	return (Size) space;
+}
+
 /*
  * PageGetExactFreeSpace
  *		Returns the size of the free (allocatable) space on a page,
diff --git a/src/include/access/hash.h b/src/include/access/hash.h
index 3bf587b1b7..5767deb029 100644
--- a/src/include/access/hash.h
+++ b/src/include/access/hash.h
@@ -303,11 +303,14 @@ extern Datum hash_uint32(uint32 k);
 extern void _hash_doinsert(Relation rel, IndexTuple itup);
 extern OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf,
 			   Size itemsize, IndexTuple itup);
+extern void _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
+					OffsetNumber *itup_offsets, uint16 nitups);
 
 /* hashovfl.c */
 extern Buffer _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf, bool retain_pin);
-extern BlockNumber _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
-					BufferAccessStrategy bstrategy);
+extern BlockNumber _hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf,
+					Buffer wbuf, IndexTuple *itups, OffsetNumber *itup_offsets,
+					Size *tups_size, uint16 nitups, BufferAccessStrategy bstrategy);
 extern void _hash_initbitmap(Relation rel, HashMetaPage metap,
 				 BlockNumber blkno, ForkNumber forkNum);
 extern void _hash_squeezebucket(Relation rel,
diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index 294f9cb85a..e956dc3386 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -425,6 +425,7 @@ extern Page PageGetTempPageCopySpecial(Page page);
 extern void PageRestoreTempPage(Page tempPage, Page oldPage);
 extern void PageRepairFragmentation(Page page);
 extern Size PageGetFreeSpace(Page page);
+extern Size PageGetFreeSpaceForMultipleTuples(Page page, int ntups);
 extern Size PageGetExactFreeSpace(Page page);
 extern Size PageGetHeapFreeSpace(Page page);
 extern void PageIndexTupleDelete(Page page, OffsetNumber offset);
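
Reviewer note (placed after the last hunk so the patch itself still applies): the free-space arithmetic introduced in PageGetFreeSpaceForMultipleTuples can be sanity-checked in isolation. The sketch below is a minimal standalone model, not code from the tree: ToyPageHeader, ITEM_ID_SIZE, and toy_free_space_for_multiple_tuples are hypothetical stand-ins for the pd_lower/pd_upper fields of PageHeaderData, sizeof(ItemIdData), and the new function. It demonstrates the two properties the squeeze loop relies on when it calls PageGetFreeSpaceForMultipleTuples(wpage, nitups + 1): one line pointer is reserved per prospective tuple, and the signed subtraction clamps a corrupted page (pd_lower > pd_upper) to zero free space instead of letting unsigned arithmetic wrap around.

#include <stdio.h>
#include <stdint.h>

/*
 * Toy stand-in for the two page-header fields the new function reads;
 * illustrative only, not the real PostgreSQL definitions.
 */
typedef struct ToyPageHeader
{
	uint16_t	pd_lower;		/* offset to start of free space */
	uint16_t	pd_upper;		/* offset to end of free space */
} ToyPageHeader;

#define ITEM_ID_SIZE 4			/* stand-in for sizeof(ItemIdData) */

/* Same arithmetic as PageGetFreeSpaceForMultipleTuples, on the toy header. */
static size_t
toy_free_space_for_multiple_tuples(const ToyPageHeader *ph, int ntups)
{
	/* signed, so pd_lower > pd_upper yields a negative gap, not a wraparound */
	int			space = (int) ph->pd_upper - (int) ph->pd_lower;

	if (space < (int) (ntups * ITEM_ID_SIZE))
		return 0;
	return (size_t) (space - ntups * ITEM_ID_SIZE);
}

int
main(void)
{
	ToyPageHeader sane = {.pd_lower = 100, .pd_upper = 8100};
	ToyPageHeader corrupt = {.pd_lower = 8100, .pd_upper = 100};

	/* 8000-byte gap minus 3 reserved line pointers: prints 7988 */
	printf("%zu\n", toy_free_space_for_multiple_tuples(&sane, 3));

	/* corrupted header clamps to 0 rather than wrapping: prints 0 */
	printf("%zu\n", toy_free_space_for_multiple_tuples(&corrupt, 1));
	return 0;
}

Built with any C99 compiler, this prints 7988 and then 0 under the assumed 4-byte line-pointer size, which matches why the squeeze loop can safely accumulate nitups tuples before doing a single _hash_pgaddmultitup call.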