/*-------------------------------------------------------------------------
*
* bufpage.c
* POSTGRES standard buffer page code.
*
* Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/storage/page/bufpage.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/htup_details.h"
#include "access/itup.h"
#include "access/xlog.h"
#include "pgstat.h"
#include "storage/checksum.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
/* GUC variable */
bool ignore_checksum_failure = false;
/* ----------------------------------------------------------------
* Page support functions
* ----------------------------------------------------------------
*/
/*
* PageInit
* Initializes the contents of a page.
* Note that we don't calculate an initial checksum here; that's not done
* until it's time to write.
*/
void
PageInit(Page page, Size pageSize, Size specialSize)
{
PageHeader p = (PageHeader) page;
specialSize = MAXALIGN(specialSize);
Assert(pageSize == BLCKSZ);
Assert(pageSize > specialSize + SizeOfPageHeaderData);
/* Make sure all fields of page are zero, as well as unused space */
MemSet(p, 0, pageSize);
p->pd_flags = 0;
p->pd_lower = SizeOfPageHeaderData;
p->pd_upper = pageSize - specialSize;
p->pd_special = pageSize - specialSize;
PageSetPageSizeAndVersion(page, pageSize, PG_PAGE_LAYOUT_VERSION);
/* p->pd_prune_xid = InvalidTransactionId; done by above MemSet */
}
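
/*
 * Usage sketch (illustrative only, not upstream code): an access method
 * initializing a freshly allocated buffer typically does something like
 * the following, where BTPageOpaqueData stands in for whatever special-space
 * struct the AM defines:
 *
 *     Page    page = BufferGetPage(buf);
 *
 *     PageInit(page, BufferGetPageSize(buf), sizeof(BTPageOpaqueData));
 *
 * after which the AM fills in its special space via
 * PageGetSpecialPointer(page).
 */
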
/*
* PageIsVerifiedExtended
* Check that the page header and checksum (if any) appear valid.
*
* This is called when a page has just been read in from disk. The idea is
* to cheaply detect trashed pages before we go nuts following bogus line
* pointers, testing invalid transaction identifiers, etc.
*
* It turns out to be necessary to allow zeroed pages here too. Even though
* this routine is *not* called when deliberately adding a page to a relation,
* there are scenarios in which a zeroed page might be found in a table.
* (Example: a backend extends a relation, then crashes before it can write
* any WAL entry about the new page. The kernel will already have the
* zeroed page in the file, and it will stay that way after restart.) So we
* allow zeroed pages here, and are careful that the page access macros
* treat such a page as empty and without free space. Eventually, VACUUM
* will clean up such a page and make it usable.
*
* If flag PIV_LOG_WARNING is set, a WARNING is logged in the event of
* a checksum failure.
*
* If flag PIV_REPORT_STAT is set, a checksum failure is reported directly
* to pgstat.
*/
bool
PageIsVerifiedExtended(Page page, BlockNumber blkno, int flags)
{
PageHeader p = (PageHeader) page;
size_t *pagebytes;
int i;
bool checksum_failure = false;
bool header_sane = false;
bool all_zeroes = false;
uint16 checksum = 0;
/*
* Don't verify page data unless the page passes basic non-zero test
*/
if (!PageIsNew(page))
{
if (DataChecksumsEnabled())
{
checksum = pg_checksum_page((char *) page, blkno);
if (checksum != p->pd_checksum)
checksum_failure = true;
}
/*
* The following checks don't prove the header is correct, only that
* it looks sane enough to allow into the buffer pool. Later usage of
* the block can still reveal problems, which is why we offer the
* checksum option.
*/
if ((p->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
p->pd_lower <= p->pd_upper &&
p->pd_upper <= p->pd_special &&
p->pd_special <= BLCKSZ &&
p->pd_special == MAXALIGN(p->pd_special))
header_sane = true;
if (header_sane && !checksum_failure)
return true;
}
/* Check all-zeroes case */
all_zeroes = true;
pagebytes = (size_t *) page;
for (i = 0; i < (BLCKSZ / sizeof(size_t)); i++)
{
if (pagebytes[i] != 0)
{
all_zeroes = false;
break;
}
}
if (all_zeroes)
return true;
/*
* Throw a WARNING if the checksum fails, but only after we've checked for
* the all-zeroes case.
*/
if (checksum_failure)
{
if ((flags & PIV_LOG_WARNING) != 0)
ereport(WARNING,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("page verification failed, calculated checksum %u but expected %u",
checksum, p->pd_checksum)));
if ((flags & PIV_REPORT_STAT) != 0)
pgstat_report_checksum_failure();
if (header_sane && ignore_checksum_failure)
return true;
}
return false;
}
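
/*
 * Usage sketch (illustrative; the real call site is the buffer manager):
 * after reading a block from disk, a caller would typically request both
 * the WARNING and the pgstat report, e.g.
 *
 *     if (!PageIsVerifiedExtended((Page) bufBlock, blockNum,
 *                                 PIV_LOG_WARNING | PIV_REPORT_STAT))
 *         ereport(ERROR,
 *                 (errcode(ERRCODE_DATA_CORRUPTED),
 *                  errmsg("invalid page in block %u", blockNum)));
 *
 * Variable names here are stand-ins, not a quotation of bufmgr.c.
 */
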
/*
* PageAddItemExtended
*
* Add an item to a page. Return value is the offset at which it was
* inserted, or InvalidOffsetNumber if the item is not inserted for any
* reason. A WARNING is issued indicating the reason for the refusal.
*
* offsetNumber must be either InvalidOffsetNumber to specify finding a
* free line pointer, or a value between FirstOffsetNumber and one past
* the last existing item, to specify using that particular line pointer.
*
* If offsetNumber is valid and flag PAI_OVERWRITE is set, we just store
* the item at the specified offsetNumber, which must be either a
* currently-unused line pointer, or one past the last existing item.
*
* If offsetNumber is valid and flag PAI_OVERWRITE is not set, insert
* the item at the specified offsetNumber, moving existing items later
* in the array to make room.
*
* If offsetNumber is not valid, then assign a slot by finding the first
* one that is both unused and deallocated.
*
* If flag PAI_IS_HEAP is set, we enforce that there can't be more than
* MaxHeapTuplesPerPage line pointers on the page.
*
* !!! EREPORT(ERROR) IS DISALLOWED HERE !!!
*/
OffsetNumber
PageAddItemExtended(Page page,
Item item,
Size size,
OffsetNumber offsetNumber,
int flags)
{
PageHeader phdr = (PageHeader) page;
Size alignedSize;
int lower;
int upper;
ItemId itemId;
OffsetNumber limit;
bool needshuffle = false;
/*
* Be wary about corrupted page pointers
*/
if (phdr->pd_lower < SizeOfPageHeaderData ||
phdr->pd_lower > phdr->pd_upper ||
phdr->pd_upper > phdr->pd_special ||
phdr->pd_special > BLCKSZ)
ereport(PANIC,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));
/*
* Select offsetNumber to place the new item at
*/
limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));
/* was offsetNumber passed in? */
if (OffsetNumberIsValid(offsetNumber))
{
/* yes, check it */
if ((flags & PAI_OVERWRITE) != 0)
{
if (offsetNumber < limit)
{
itemId = PageGetItemId(phdr, offsetNumber);
if (ItemIdIsUsed(itemId) || ItemIdHasStorage(itemId))
{
elog(WARNING, "will not overwrite a used ItemId");
return InvalidOffsetNumber;
}
}
}
else
{
if (offsetNumber < limit)
needshuffle = true; /* need to move existing linp's */
}
}
else
{
/* offsetNumber was not passed in, so find a free slot */
/* if no free slot, we'll put it at limit (1st open slot) */
if (PageHasFreeLinePointers(phdr))
{
/*
* Scan line pointer array to locate a "recyclable" (unused)
* ItemId.
*
* Always use earlier items first. PageTruncateLinePointerArray
* can only truncate unused items when they appear as a contiguous
* group at the end of the line pointer array.
*/
for (offsetNumber = FirstOffsetNumber;
offsetNumber < limit; /* limit is maxoff+1 */
offsetNumber++)
{
itemId = PageGetItemId(phdr, offsetNumber);
/*
* We check for no storage as well, just to be paranoid;
* unused items should never have storage. Assert() that the
* invariant is respected too.
*/
Assert(ItemIdIsUsed(itemId) || !ItemIdHasStorage(itemId));
if (!ItemIdIsUsed(itemId) && !ItemIdHasStorage(itemId))
break;
}
if (offsetNumber >= limit)
{
/* the hint is wrong, so reset it */
PageClearHasFreeLinePointers(phdr);
}
}
else
{
/* don't bother searching if hint says there's no free slot */
offsetNumber = limit;
}
}
/* Reject placing items beyond the first unused line pointer */
if (offsetNumber > limit)
{
elog(WARNING, "specified item offset is too large");
return InvalidOffsetNumber;
}
/* Reject placing items beyond heap boundary, if heap */
if ((flags & PAI_IS_HEAP) != 0 && offsetNumber > MaxHeapTuplesPerPage)
{
elog(WARNING, "can't put more than MaxHeapTuplesPerPage items in a heap page");
return InvalidOffsetNumber;
}
/*
* Compute new lower and upper pointers for page, see if it'll fit.
*
* Note: do arithmetic as signed ints, to avoid mistakes if, say,
* alignedSize > pd_upper.
*/
if (offsetNumber == limit || needshuffle)
lower = phdr->pd_lower + sizeof(ItemIdData);
else
lower = phdr->pd_lower;
alignedSize = MAXALIGN(size);
upper = (int) phdr->pd_upper - (int) alignedSize;
if (lower > upper)
return InvalidOffsetNumber;
/*
* OK to insert the item. First, shuffle the existing pointers if needed.
*/
itemId = PageGetItemId(phdr, offsetNumber);
if (needshuffle)
memmove(itemId + 1, itemId,
(limit - offsetNumber) * sizeof(ItemIdData));
/* set the line pointer */
ItemIdSetNormal(itemId, upper, size);
/*
* Items normally contain no uninitialized bytes. Core bufpage consumers
* conform, but this is not a necessary coding rule; a new index AM could
* opt to depart from it. However, data type input functions and other
* C-language functions that synthesize datums should initialize all
* bytes; datumIsEqual() relies on this. Testing here, along with the
* similar check in printtup(), helps to catch such mistakes.
*
* Values of the "name" type retrieved via index-only scans may contain
* uninitialized bytes; see comment in btrescan(). Valgrind will report
* this as an error, but it is safe to ignore.
*/
VALGRIND_CHECK_MEM_IS_DEFINED(item, size);
/* copy the item's data onto the page */
memcpy((char *) page + upper, item, size);
/* adjust page header */
phdr->pd_lower = (LocationIndex) lower;
phdr->pd_upper = (LocationIndex) upper;
return offsetNumber;
}
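
/*
 * Usage sketch (hypothetical index AM code, for illustration): most callers
 * go through the PageAddItem() wrapper in bufpage.h, which translates the
 * overwrite/is_heap booleans into the flags above, and must check for
 * failure:
 *
 *     OffsetNumber off;
 *
 *     off = PageAddItem(page, (Item) itup, IndexTupleSize(itup),
 *                       InvalidOffsetNumber, false, false);
 *     if (off == InvalidOffsetNumber)
 *         elog(ERROR, "failed to add item to page");
 *
 * "itup" is assumed to be an IndexTuple prepared by the caller.
 */
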
/*
* PageGetTempPage
* Get a temporary page in local memory for special processing.
* The returned page is not initialized at all; caller must do that.
*/
Page
PageGetTempPage(Page page)
{
Size pageSize;
Page temp;
pageSize = PageGetPageSize(page);
temp = (Page) palloc(pageSize);
return temp;
}
/*
* PageGetTempPageCopy
* Get a temporary page in local memory for special processing.
* The page is initialized by copying the contents of the given page.
*/
Page
PageGetTempPageCopy(Page page)
{
Size pageSize;
Page temp;
pageSize = PageGetPageSize(page);
temp = (Page) palloc(pageSize);
memcpy(temp, page, pageSize);
return temp;
}
/*
* PageGetTempPageCopySpecial
* Get a temporary page in local memory for special processing.
* The page is PageInit'd with the same special-space size as the
* given page, and the special space is copied from the given page.
*/
Page
PageGetTempPageCopySpecial(Page page)
{
Size pageSize;
Page temp;
pageSize = PageGetPageSize(page);
temp = (Page) palloc(pageSize);
PageInit(temp, pageSize, PageGetSpecialSize(page));
memcpy(PageGetSpecialPointer(temp),
PageGetSpecialPointer(page),
PageGetSpecialSize(page));
return temp;
}
/*
* PageRestoreTempPage
* Copy temporary page back to permanent page after special processing
* and release the temporary page.
*/
void
PageRestoreTempPage(Page tempPage, Page oldPage)
{
Size pageSize;
pageSize = PageGetPageSize(tempPage);
memcpy((char *) oldPage, (char *) tempPage, pageSize);
pfree(tempPage);
}
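
/*
 * Usage sketch for the temp-page functions (illustrative, loosely modeled
 * on how an index AM rebuilds a page while deleting or rearranging tuples):
 *
 *     Page    temp = PageGetTempPageCopySpecial(origpage);
 *
 *     ... PageAddItem() the surviving tuples into temp ...
 *
 *     PageRestoreTempPage(temp, origpage);
 *
 * PageRestoreTempPage also pfree's the temporary page; WAL-logging and
 * critical-section handling around the copy-back remain the caller's job.
 */
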
/*
* Tuple defrag support for PageRepairFragmentation and PageIndexMultiDelete
*/
typedef struct itemIdCompactData
{
uint16 offsetindex; /* linp array index */
int16 itemoff; /* page offset of item data */
uint16 alignedlen; /* MAXALIGN(item data len) */
} itemIdCompactData;
typedef itemIdCompactData *itemIdCompact;
/*
* After removing or marking some line pointers unused, move the tuples to
* remove the gaps caused by the removed items and reorder them back into
* reverse line pointer order in the page.
*
* This function can often be fairly hot, so it pays to take some measures to
* make it as optimal as possible.
*
* Callers may pass 'presorted' as true if the 'itemidbase' array is sorted in
* descending order of itemoff. When this is true we can just memmove()
* tuples towards the end of the page. This is quite a common case as it's
* the order that tuples are initially inserted into pages. When we call this
* function to defragment the tuples in the page then any new line pointers
* added to the page will keep that presorted order, so hitting this case is
* still very common for tables that are commonly updated.
*
* When the 'itemidbase' array is not presorted then we're unable to just
* memmove() tuples around freely. Doing so could cause us to overwrite the
* memory belonging to a tuple we've not moved yet. In this case, we copy all
* the tuples that need to be moved into a temporary buffer. We can then
* simply memcpy() out of that temp buffer back into the page at the correct
* location. Tuples are copied back into the page in the same order as the
* 'itemidbase' array, so we end up reordering the tuples back into reverse
* line pointer order. This will increase the chances of hitting the
* presorted case the next time around.
*
* Callers must ensure that nitems is > 0
*/
static void
compactify_tuples(itemIdCompact itemidbase, int nitems, Page page, bool presorted)
{
PageHeader phdr = (PageHeader) page;
Offset upper;
Offset copy_tail;
Offset copy_head;
itemIdCompact itemidptr;
int i;
/* Code within will not work correctly if nitems == 0 */
Assert(nitems > 0);
if (presorted)
{
#ifdef USE_ASSERT_CHECKING
{
/*
* Verify we've not gotten any new callers that are incorrectly
* passing a true presorted value.
*/
Offset lastoff = phdr->pd_special;
for (i = 0; i < nitems; i++)
{
itemidptr = &itemidbase[i];
Assert(lastoff > itemidptr->itemoff);
lastoff = itemidptr->itemoff;
}
}
#endif /* USE_ASSERT_CHECKING */
/*
* 'itemidbase' is already in the optimal order, i.e., lower item
* pointers have a higher offset. This allows us to memmove() the
* tuples up to the end of the page without having to worry about
* overwriting other tuples that have not been moved yet.
*
* There's a good chance that there are tuples already right at the
* end of the page that we can simply skip over because they're
* already in the correct location within the page. We'll do that
* first...
*/
upper = phdr->pd_special;
i = 0;
do
{
itemidptr = &itemidbase[i];
if (upper != itemidptr->itemoff + itemidptr->alignedlen)
break;
upper -= itemidptr->alignedlen;
i++;
} while (i < nitems);
/*
* Now that we've found the first tuple that needs to be moved, we can
* do the tuple compactification. We try and make the least number of
* memmove() calls and only call memmove() when there's a gap. When
* we see a gap we just move all tuples after the gap up until the
* point of the last move operation.
*/
copy_tail = copy_head = itemidptr->itemoff + itemidptr->alignedlen;
for (; i < nitems; i++)
{
ItemId lp;
itemidptr = &itemidbase[i];
lp = PageGetItemId(page, itemidptr->offsetindex + 1);
if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
{
memmove((char *) page + upper,
page + copy_head,
copy_tail - copy_head);
/*
* We've now moved all tuples already seen, but not the
* current tuple, so we set the copy_tail to the end of this
* tuple so it can be moved in another iteration of the loop.
*/
copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
}
/* shift the target offset down by the length of this tuple */
upper -= itemidptr->alignedlen;
/* point the copy_head to the start of this tuple */
copy_head = itemidptr->itemoff;
/* update the line pointer to reference the new offset */
lp->lp_off = upper;
}
/* move the remaining tuples. */
memmove((char *) page + upper,
page + copy_head,
copy_tail - copy_head);
}
else
{
PGAlignedBlock scratch;
char *scratchptr = scratch.data;
/*
* Non-presorted case: The tuples in the itemidbase array may be in
* any order. So, in order to move these to the end of the page we
* must make a temp copy of each tuple that needs to be moved before
* we copy them back into the page at the new offset.
*
* If a large percentage of tuples have been pruned (>75%) then we'll
* copy these into the temp buffer tuple-by-tuple, otherwise, we'll
* just do a single memcpy() for all tuples that need to be moved.
* When so many tuples have been removed there's likely to be a lot of
* gaps and it's unlikely that many non-movable tuples remain at the
* end of the page.
*/
if (nitems < PageGetMaxOffsetNumber(page) / 4)
{
i = 0;
do
{
itemidptr = &itemidbase[i];
memcpy(scratchptr + itemidptr->itemoff, page + itemidptr->itemoff,
itemidptr->alignedlen);
i++;
} while (i < nitems);
/* Set things up for the compactification code below */
i = 0;
itemidptr = &itemidbase[0];
upper = phdr->pd_special;
}
else
{
upper = phdr->pd_special;
/*
* Many tuples are likely to already be in the correct location.
* There's no need to copy these into the temp buffer. Instead
* we'll just skip forward in the itemidbase array to the position
* that we do need to move tuples from so that the code below just
* leaves these ones alone.
*/
i = 0;
do
{
itemidptr = &itemidbase[i];
if (upper != itemidptr->itemoff + itemidptr->alignedlen)
break;
upper -= itemidptr->alignedlen;
i++;
} while (i < nitems);
/* Copy all tuples that need to be moved into the temp buffer */
memcpy(scratchptr + phdr->pd_upper,
page + phdr->pd_upper,
upper - phdr->pd_upper);
}
/*
* Do the tuple compactification. itemidptr is already pointing to
* the first tuple that we're going to move. Here we collapse the
* memcpy calls for adjacent tuples into a single call. This is done
* by delaying the memcpy call until we find a gap that needs to be
* closed.
*/
copy_tail = copy_head = itemidptr->itemoff + itemidptr->alignedlen;
for (; i < nitems; i++)
{
ItemId lp;
itemidptr = &itemidbase[i];
lp = PageGetItemId(page, itemidptr->offsetindex + 1);
/* copy pending tuples when we detect a gap */
if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
{
memcpy((char *) page + upper,
scratchptr + copy_head,
copy_tail - copy_head);
/*
* We've now copied all tuples already seen, but not the
* current tuple, so we set the copy_tail to the end of this
* tuple.
*/
copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
}
/* shift the target offset down by the length of this tuple */
upper -= itemidptr->alignedlen;
/* point the copy_head to the start of this tuple */
copy_head = itemidptr->itemoff;
/* update the line pointer to reference the new offset */
lp->lp_off = upper;
}
/* Copy the remaining chunk */
memcpy((char *) page + upper,
scratchptr + copy_head,
copy_tail - copy_head);
}
phdr->pd_upper = upper;
}
/*
* PageRepairFragmentation
*
* Frees fragmented space on a heap page following pruning.
*
* This routine is usable for heap pages only, but see PageIndexMultiDelete.
*
* This routine removes unused line pointers from the end of the line pointer
* array. This is possible when dead heap-only tuples get removed by pruning,
* especially when there were HOT chains with several tuples each beforehand.
*
* Caller had better have a full cleanup lock on page's buffer. As a side
* effect the page's PD_HAS_FREE_LINES hint bit will be set or unset as
* needed. Caller might also need to account for a reduction in the length of
* the line pointer array following array truncation.
*/
void
PageRepairFragmentation(Page page)
{
Offset pd_lower = ((PageHeader) page)->pd_lower;
Offset pd_upper = ((PageHeader) page)->pd_upper;
Offset pd_special = ((PageHeader) page)->pd_special;
Offset last_offset;
itemIdCompactData itemidbase[MaxHeapTuplesPerPage];
itemIdCompact itemidptr;
ItemId lp;
int nline,
nstorage,
nunused;
OffsetNumber finalusedlp = InvalidOffsetNumber;
int i;
Size totallen;
bool presorted = true; /* For now */
/*
* It's worth the trouble to be more paranoid here than in most places,
* because we are about to reshuffle data in (what is usually) a shared
* disk buffer. If we aren't careful then corrupted pointers, lengths,
* etc could cause us to clobber adjacent disk buffers, spreading the data
* loss further. So, check everything.
*/
if (pd_lower < SizeOfPageHeaderData ||
pd_lower > pd_upper ||
pd_upper > pd_special ||
pd_special > BLCKSZ ||
pd_special != MAXALIGN(pd_special))
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
pd_lower, pd_upper, pd_special)));
/*
* Run through the line pointer array and collect data about live items.
*/
nline = PageGetMaxOffsetNumber(page);
itemidptr = itemidbase;
nunused = totallen = 0;
last_offset = pd_special;
for (i = FirstOffsetNumber; i <= nline; i++)
{
lp = PageGetItemId(page, i);
if (ItemIdIsUsed(lp))
{
if (ItemIdHasStorage(lp))
{
itemidptr->offsetindex = i - 1;
itemidptr->itemoff = ItemIdGetOffset(lp);
if (last_offset > itemidptr->itemoff)
last_offset = itemidptr->itemoff;
else
presorted = false;
if (unlikely(itemidptr->itemoff < (int) pd_upper ||
itemidptr->itemoff >= (int) pd_special))
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted line pointer: %u",
itemidptr->itemoff)));
itemidptr->alignedlen = MAXALIGN(ItemIdGetLength(lp));
totallen += itemidptr->alignedlen;
itemidptr++;
}
finalusedlp = i; /* Could be the final non-LP_UNUSED item */
}
else
{
/* Unused entries should have lp_len = 0, but make sure */
Assert(!ItemIdHasStorage(lp));
ItemIdSetUnused(lp);
nunused++;
}
}
nstorage = itemidptr - itemidbase;
if (nstorage == 0)
{
/* Page is completely empty, so just reset it quickly */
((PageHeader) page)->pd_upper = pd_special;
}
else
{
/* Need to compact the page the hard way */
if (totallen > (Size) (pd_special - pd_lower))
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted item lengths: total %u, available space %u",
(unsigned int) totallen, pd_special - pd_lower)));
compactify_tuples(itemidbase, nstorage, page, presorted);
}
if (finalusedlp != nline)
{
/* The last line pointer is not the last used line pointer */
int nunusedend = nline - finalusedlp;
Assert(nunused >= nunusedend && nunusedend > 0);
/* remove trailing unused line pointers from the count */
nunused -= nunusedend;
/* truncate the line pointer array */
((PageHeader) page)->pd_lower -= (sizeof(ItemIdData) * nunusedend);
}
/* Set hint bit for PageAddItemExtended */
if (nunused > 0)
PageSetHasFreeLinePointers(page);
else
PageClearHasFreeLinePointers(page);
}
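
/*
 * Usage sketch (illustrative): heap pruning is the expected caller.  With a
 * cleanup lock held on the buffer, the sequence is roughly
 *
 *     START_CRIT_SECTION();
 *     ... set item pointers dead/unused/redirected ...
 *     PageRepairFragmentation(page);
 *     MarkBufferDirty(buffer);
 *     ... emit WAL if the relation is logged ...
 *     END_CRIT_SECTION();
 *
 * This is a simplification of heap_page_prune(), not a quotation of it.
 */
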
/*
* PageTruncateLinePointerArray
*
* Removes unused line pointers at the end of the line pointer array.
*
* This routine is usable for heap pages only. It is called by VACUUM during
* its second pass over the heap. We expect at least one LP_UNUSED line
* pointer on the page (if VACUUM didn't have an LP_DEAD item on the page that
* it just set to LP_UNUSED then it should not call here).
*
* We avoid truncating the line pointer array to 0 items, if necessary by
* leaving behind a single remaining LP_UNUSED item. This is a little
* arbitrary, but it seems like a good idea to avoid leaving a PageIsEmpty()
* page behind.
*
* Caller can have either an exclusive lock or a full cleanup lock on page's
* buffer. The page's PD_HAS_FREE_LINES hint bit will be set or unset based
* on whether or not we leave behind any remaining LP_UNUSED items.
*/
void
PageTruncateLinePointerArray(Page page)
{
PageHeader phdr = (PageHeader) page;
bool countdone = false,
sethint = false;
int nunusedend = 0;
/* Scan line pointer array back-to-front */
for (int i = PageGetMaxOffsetNumber(page); i >= FirstOffsetNumber; i--)
{
ItemId lp = PageGetItemId(page, i);
if (!countdone && i > FirstOffsetNumber)
{
/*
* Still determining which line pointers from the end of the array
* will be truncated away. Either count another line pointer as
* safe to truncate, or notice that it's not safe to truncate
* additional line pointers (stop counting line pointers).
*/
if (!ItemIdIsUsed(lp))
nunusedend++;
else
countdone = true;
}
else
{
/*
* Once we've stopped counting we still need to figure out if
* there are any remaining LP_UNUSED line pointers somewhere more
* towards the front of the array.
*/
if (!ItemIdIsUsed(lp))
{
/*
* This is an unused line pointer that we won't be truncating
* away -- so there is at least one. Set hint on page.
*/
sethint = true;
break;
}
}
}
if (nunusedend > 0)
{
phdr->pd_lower -= sizeof(ItemIdData) * nunusedend;
#ifdef CLOBBER_FREED_MEMORY
memset((char *) page + phdr->pd_lower, 0x7F,
sizeof(ItemIdData) * nunusedend);
#endif
}
else
Assert(sethint);
/* Set hint bit for PageAddItemExtended */
if (sethint)
PageSetHasFreeLinePointers(page);
else
PageClearHasFreeLinePointers(page);
}
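
/*
 * Usage sketch (illustrative): VACUUM's second heap pass, after setting a
 * page's dead items to LP_UNUSED, finishes the page with roughly
 *
 *     ... ItemIdSetUnused() each dead item ...
 *     PageTruncateLinePointerArray(page);
 *     MarkBufferDirty(buffer);
 *     ... emit WAL, update the visibility map, etc. ...
 *
 * as a simplification of lazy_vacuum_heap_page(); locking and error handling
 * are omitted here.
 */
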
/*
* PageGetFreeSpace
* Returns the size of the free (allocatable) space on a page,
* reduced by the space needed for a new line pointer.
*
* Note: this should usually only be used on index pages. Use
* PageGetHeapFreeSpace on heap pages.
*/
Size
PageGetFreeSpace(Page page)
{
int space;
/*
* Use signed arithmetic here so that we behave sensibly if pd_lower >
* pd_upper.
*/
space = (int) ((PageHeader) page)->pd_upper -
(int) ((PageHeader) page)->pd_lower;
if (space < (int) sizeof(ItemIdData))
return 0;
space -= sizeof(ItemIdData);
return (Size) space;
}
/*
* PageGetFreeSpaceForMultipleTuples
* Returns the size of the free (allocatable) space on a page,
* reduced by the space needed for multiple new line pointers.
*
* Note: this should usually only be used on index pages. Use
* PageGetHeapFreeSpace on heap pages.
*/
Size
PageGetFreeSpaceForMultipleTuples(Page page, int ntups)
{
int space;
/*
* Use signed arithmetic here so that we behave sensibly if pd_lower >
* pd_upper.
*/
space = (int) ((PageHeader) page)->pd_upper -
(int) ((PageHeader) page)->pd_lower;
if (space < (int) (ntups * sizeof(ItemIdData)))
return 0;
space -= ntups * sizeof(ItemIdData);
return (Size) space;
}
/*
* PageGetExactFreeSpace
* Returns the size of the free (allocatable) space on a page,
* without any consideration for adding/removing line pointers.
*/
Size
PageGetExactFreeSpace(Page page)
{
int space;
/*
* Use signed arithmetic here so that we behave sensibly if pd_lower >
* pd_upper.
*/
space = (int) ((PageHeader) page)->pd_upper -
(int) ((PageHeader) page)->pd_lower;
if (space < 0)
return 0;
return (Size) space;
}
/*
* PageGetHeapFreeSpace
* Returns the size of the free (allocatable) space on a page,
* reduced by the space needed for a new line pointer.
*
* The difference between this and PageGetFreeSpace is that this will return
* zero if there are already MaxHeapTuplesPerPage line pointers in the page
* and none are free. We use this to enforce that no more than
* MaxHeapTuplesPerPage line pointers are created on a heap page. (Although
* no more tuples than that could fit anyway, in the presence of redirected
* or dead line pointers it'd be possible to have too many line pointers.
* To avoid breaking code that assumes MaxHeapTuplesPerPage is a hard limit
* on the number of line pointers, we make this extra check.)
*/
Size
PageGetHeapFreeSpace(Page page)
{
Size space;
space = PageGetFreeSpace(page);
if (space > 0)
{
OffsetNumber offnum,
nline;
/*
* Are there already MaxHeapTuplesPerPage line pointers in the page?
*/
nline = PageGetMaxOffsetNumber(page);
if (nline >= MaxHeapTuplesPerPage)
{
if (PageHasFreeLinePointers((PageHeader) page))
{
/*
* Since this is just a hint, we must confirm that there is
* indeed a free line pointer
*/
for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
{
ItemId lp = PageGetItemId(page, offnum);
if (!ItemIdIsUsed(lp))
break;
}
if (offnum > nline)
{
/*
* The hint is wrong, but we can't clear it here since we
* don't have the ability to mark the page dirty.
*/
space = 0;
}
}
else
{
/*
* Although the hint might be wrong, PageAddItem will believe
* it anyway, so we must believe it too.
*/
space = 0;
}
}
}
return space;
}
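
/*
 * Usage sketch (illustrative): a caller deciding whether a new heap tuple
 * fits on an existing page might do
 *
 *     Size    needed = MAXALIGN(newtup->t_len);
 *
 *     if (PageGetHeapFreeSpace(page) >= needed)
 *         ... place the tuple on this page ...
 *     else
 *         ... ask the FSM for another page ...
 *
 * "newtup" stands for a HeapTuple assembled by the caller; real callers also
 * reserve fillfactor space before making this decision.
 */
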
/*
* PageIndexTupleDelete
*
* This routine does the work of removing a tuple from an index page.
*
* Unlike heap pages, we compact out the line pointer for the removed tuple.
*/
void
PageIndexTupleDelete(Page page, OffsetNumber offnum)
{
PageHeader phdr = (PageHeader) page;
char *addr;
ItemId tup;
Size size;
unsigned offset;
int nbytes;
int offidx;
int nline;
/*
* As with PageRepairFragmentation, paranoia seems justified.
*/
if (phdr->pd_lower < SizeOfPageHeaderData ||
phdr->pd_lower > phdr->pd_upper ||
phdr->pd_upper > phdr->pd_special ||
phdr->pd_special > BLCKSZ ||
phdr->pd_special != MAXALIGN(phdr->pd_special))
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));
nline = PageGetMaxOffsetNumber(page);
if ((int) offnum <= 0 || (int) offnum > nline)
elog(ERROR, "invalid index offnum: %u", offnum);
/* change offset number to offset index */
offidx = offnum - 1;
tup = PageGetItemId(page, offnum);
Assert(ItemIdHasStorage(tup));
size = ItemIdGetLength(tup);
offset = ItemIdGetOffset(tup);
if (offset < phdr->pd_upper || (offset + size) > phdr->pd_special ||
offset != MAXALIGN(offset))
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted line pointer: offset = %u, size = %u",
offset, (unsigned int) size)));
/* Amount of space to actually be deleted */
size = MAXALIGN(size);
/*
* First, we want to get rid of the pd_linp entry for the index tuple. We
* copy all subsequent linp's back one slot in the array. We don't use
* PageGetItemId, because we are manipulating the _array_, not individual
* linp's.
*/
nbytes = phdr->pd_lower -
((char *) &phdr->pd_linp[offidx + 1] - (char *) phdr);
if (nbytes > 0)
memmove((char *) &(phdr->pd_linp[offidx]),
(char *) &(phdr->pd_linp[offidx + 1]),
nbytes);
/*
* Now move everything between the old upper bound (beginning of tuple
* space) and the beginning of the deleted tuple forward, so that space in
* the middle of the page is left free. If we've just deleted the tuple
* at the beginning of tuple space, then there's no need to do the copy.
*/
/* beginning of tuple space */
addr = (char *) page + phdr->pd_upper;
if (offset > phdr->pd_upper)
memmove(addr + size, addr, offset - phdr->pd_upper);
/* adjust free space boundary pointers */
phdr->pd_upper += size;
phdr->pd_lower -= sizeof(ItemIdData);
/*
* Finally, we need to adjust the linp entries that remain.
*
* Anything that used to be before the deleted tuple's data was moved
* forward by the size of the deleted tuple.
*/
if (!PageIsEmpty(page))
{
int i;
nline--; /* there's one less than when we started */
for (i = 1; i <= nline; i++)
{
ItemId ii = PageGetItemId(phdr, i);
Assert(ItemIdHasStorage(ii));
if (ItemIdGetOffset(ii) <= offset)
ii->lp_off += size;
}
}
}
/*
* PageIndexMultiDelete
*
* This routine handles the case of deleting multiple tuples from an
* index page at once. It is considerably faster than a loop around
* PageIndexTupleDelete ... however, the caller *must* supply the array
* of item numbers to be deleted in item number order!
*/
void
PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
{
PageHeader phdr = (PageHeader) page;
Offset pd_lower = phdr->pd_lower;
Offset pd_upper = phdr->pd_upper;
Offset pd_special = phdr->pd_special;
Offset last_offset;
itemIdCompactData itemidbase[MaxIndexTuplesPerPage];
ItemIdData newitemids[MaxIndexTuplesPerPage];
itemIdCompact itemidptr;
ItemId lp;
int nline,
nused;
Size totallen;
Size size;
unsigned offset;
int nextitm;
OffsetNumber offnum;
bool presorted = true; /* For now */
Assert(nitems <= MaxIndexTuplesPerPage);
/*
* If there aren't very many items to delete, then retail
* PageIndexTupleDelete is the best way. Delete the items in reverse
* order so we don't have to think about adjusting item numbers for
* previous deletions.
*
* TODO: tune the magic number here
*/
if (nitems <= 2)
{
while (--nitems >= 0)
PageIndexTupleDelete(page, itemnos[nitems]);
return;
}
/*
* As with PageRepairFragmentation, paranoia seems justified.
*/
if (pd_lower < SizeOfPageHeaderData ||
pd_lower > pd_upper ||
pd_upper > pd_special ||
pd_special > BLCKSZ ||
pd_special != MAXALIGN(pd_special))
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
pd_lower, pd_upper, pd_special)));
/*
* Scan the line pointer array and build a list of just the ones we are
* going to keep. Notice we do not modify the page yet, since we are
* still validity-checking.
*/
nline = PageGetMaxOffsetNumber(page);
itemidptr = itemidbase;
totallen = 0;
nused = 0;
nextitm = 0;
last_offset = pd_special;
for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
{
lp = PageGetItemId(page, offnum);
Assert(ItemIdHasStorage(lp));
size = ItemIdGetLength(lp);
offset = ItemIdGetOffset(lp);
if (offset < pd_upper ||
(offset + size) > pd_special ||
offset != MAXALIGN(offset))
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted line pointer: offset = %u, size = %u",
offset, (unsigned int) size)));
if (nextitm < nitems && offnum == itemnos[nextitm])
{
/* skip item to be deleted */
nextitm++;
}
else
{
itemidptr->offsetindex = nused; /* where it will go */
itemidptr->itemoff = offset;
if (last_offset > itemidptr->itemoff)
last_offset = itemidptr->itemoff;
else
presorted = false;
itemidptr->alignedlen = MAXALIGN(size);
totallen += itemidptr->alignedlen;
newitemids[nused] = *lp;
itemidptr++;
nused++;
}
}
/* this will catch invalid or out-of-order itemnos[] */
if (nextitm != nitems)
elog(ERROR, "incorrect index offsets supplied");
if (totallen > (Size) (pd_special - pd_lower))
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted item lengths: total %u, available space %u",
(unsigned int) totallen, pd_special - pd_lower)));
/*
* Looks good. Overwrite the line pointers with the copy, from which we've
* removed all the unused items.
*/
memcpy(phdr->pd_linp, newitemids, nused * sizeof(ItemIdData));
phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData);
/* and compactify the tuple data */
if (nused > 0)
compactify_tuples(itemidbase, nused, page, presorted);
else
phdr->pd_upper = pd_special;
}
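
/*
 * Usage sketch (illustrative): index vacuuming typically collects the
 * offsets of deletable tuples in ascending item number order and deletes
 * them in one call:
 *
 *     OffsetNumber deletable[MaxIndexTuplesPerPage];
 *     int          ndeletable = 0;
 *
 *     ... fill deletable[] in item number order ...
 *
 *     if (ndeletable > 0)
 *         PageIndexMultiDelete(page, deletable, ndeletable);
 *
 * Passing unsorted or duplicate offsets is a caller bug and is reported with
 * "incorrect index offsets supplied".
 */
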
/*
* PageIndexTupleDeleteNoCompact
*
* Remove the specified tuple from an index page, but set its line pointer
* to "unused" instead of compacting it out, except that it can be removed
* if it's the last line pointer on the page.
*
* This is used for index AMs that require that existing TIDs of live tuples
* remain unchanged, and are willing to allow unused line pointers instead.
*/
void
PageIndexTupleDeleteNoCompact(Page page, OffsetNumber offnum)
{
PageHeader phdr = (PageHeader) page;
char *addr;
ItemId tup;
Size size;
unsigned offset;
int nline;
/*
* As with PageRepairFragmentation, paranoia seems justified.
*/
if (phdr->pd_lower < SizeOfPageHeaderData ||
phdr->pd_lower > phdr->pd_upper ||
phdr->pd_upper > phdr->pd_special ||
phdr->pd_special > BLCKSZ ||
phdr->pd_special != MAXALIGN(phdr->pd_special))
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));
nline = PageGetMaxOffsetNumber(page);
if ((int) offnum <= 0 || (int) offnum > nline)
elog(ERROR, "invalid index offnum: %u", offnum);
tup = PageGetItemId(page, offnum);
Assert(ItemIdHasStorage(tup));
size = ItemIdGetLength(tup);
offset = ItemIdGetOffset(tup);
if (offset < phdr->pd_upper || (offset + size) > phdr->pd_special ||
offset != MAXALIGN(offset))
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted line pointer: offset = %u, size = %u",
offset, (unsigned int) size)));
/* Amount of space to actually be deleted */
size = MAXALIGN(size);
/*
* Either set the line pointer to "unused", or zap it if it's the last
* one. (Note: it's possible that the next-to-last one(s) are already
* unused, but we do not trouble to try to compact them out if so.)
*/
if ((int) offnum < nline)
ItemIdSetUnused(tup);
else
{
phdr->pd_lower -= sizeof(ItemIdData);
nline--; /* there's one less than when we started */
}
/*
* Now move everything between the old upper bound (beginning of tuple
* space) and the beginning of the deleted tuple forward, so that space in
* the middle of the page is left free. If we've just deleted the tuple
* at the beginning of tuple space, then there's no need to do the copy.
*/
/* beginning of tuple space */
addr = (char *) page + phdr->pd_upper;
if (offset > phdr->pd_upper)
memmove(addr + size, addr, offset - phdr->pd_upper);
/* adjust free space boundary pointer */
phdr->pd_upper += size;
/*
* Finally, we need to adjust the linp entries that remain.
*
* Anything that used to be before the deleted tuple's data was moved
* forward by the size of the deleted tuple.
*/
if (!PageIsEmpty(page))
{
int i;
for (i = 1; i <= nline; i++)
{
ItemId ii = PageGetItemId(phdr, i);
if (ItemIdHasStorage(ii) && ItemIdGetOffset(ii) <= offset)
ii->lp_off += size;
}
}
}
/*
* PageIndexTupleOverwrite
*
* Replace a specified tuple on an index page.
*
* The new tuple is placed exactly where the old one had been, shifting
* other tuples' data up or down as needed to keep the page compacted.
* This is better than deleting and reinserting the tuple, because it
* avoids any data shifting when the tuple size doesn't change; and
* even when it does, we avoid moving the line pointers around.
* This could be used by an index AM that doesn't want to unset the
* LP_DEAD bit when it happens to be set. It could conceivably also be
* used by an index AM that cares about the physical order of tuples as
* well as their logical/ItemId order.
*
* If there's insufficient space for the new tuple, return false. Other
* errors represent data-corruption problems, so we just elog.
*/
bool
PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
Item newtup, Size newsize)
{
PageHeader phdr = (PageHeader) page;
ItemId tupid;
int oldsize;
unsigned offset;
Size alignednewsize;
int size_diff;
int itemcount;
/*
* As with PageRepairFragmentation, paranoia seems justified.
*/
if (phdr->pd_lower < SizeOfPageHeaderData ||
phdr->pd_lower > phdr->pd_upper ||
phdr->pd_upper > phdr->pd_special ||
phdr->pd_special > BLCKSZ ||
phdr->pd_special != MAXALIGN(phdr->pd_special))
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));
itemcount = PageGetMaxOffsetNumber(page);
if ((int) offnum <= 0 || (int) offnum > itemcount)
elog(ERROR, "invalid index offnum: %u", offnum);
tupid = PageGetItemId(page, offnum);
Assert(ItemIdHasStorage(tupid));
oldsize = ItemIdGetLength(tupid);
offset = ItemIdGetOffset(tupid);
if (offset < phdr->pd_upper || (offset + oldsize) > phdr->pd_special ||
offset != MAXALIGN(offset))
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted line pointer: offset = %u, size = %u",
offset, (unsigned int) oldsize)));
/*
* Determine actual change in space requirement, check for page overflow.
*/
oldsize = MAXALIGN(oldsize);
alignednewsize = MAXALIGN(newsize);
if (alignednewsize > oldsize + (phdr->pd_upper - phdr->pd_lower))
return false;
/*
* Relocate existing data and update line pointers, unless the new tuple
* is the same size as the old (after alignment), in which case there's
* nothing to do. Notice that what we have to relocate is data before the
* target tuple, not data after, so it's convenient to express size_diff
* as the amount by which the tuple's size is decreasing, making it the
* delta to add to pd_upper and affected line pointers.
*/
size_diff = oldsize - (int) alignednewsize;
if (size_diff != 0)
{
char *addr = (char *) page + phdr->pd_upper;
int i;
/* relocate all tuple data before the target tuple */
memmove(addr + size_diff, addr, offset - phdr->pd_upper);
/* adjust free space boundary pointer */
phdr->pd_upper += size_diff;
/* adjust affected line pointers too */
for (i = FirstOffsetNumber; i <= itemcount; i++)
{
ItemId ii = PageGetItemId(phdr, i);
/* Allow items without storage; currently only BRIN needs that */
if (ItemIdHasStorage(ii) && ItemIdGetOffset(ii) <= offset)
ii->lp_off += size_diff;
}
}
/* Update the item's tuple length without changing its lp_flags field */
tupid->lp_off = offset + size_diff;
tupid->lp_len = newsize;
/* Copy new tuple data onto page */
memcpy(PageGetItem(page, tupid), newtup, newsize);
return true;
}
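
/*
 * Usage sketch (illustrative): an AM replacing a tuple in place, for example
 * when shrinking a posting list, would do something like
 *
 *     if (!PageIndexTupleOverwrite(page, offnum, (Item) newitup,
 *                                  IndexTupleSize(newitup)))
 *         elog(ERROR, "failed to overwrite index tuple");
 *
 * "newitup" is a stand-in for the replacement IndexTuple built by the
 * caller; a false return only means the new tuple no longer fits, so
 * treating it as an error is the caller's choice.
 */
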
/*
* Set checksum for a page in shared buffers.
*
* If checksums are disabled, or if the page is not initialized, just return
* the input. Otherwise, we must make a copy of the page before calculating
* the checksum, to prevent concurrent modifications (e.g. setting hint bits)
* from making the final checksum invalid. It doesn't matter if we include or
* exclude hints during the copy, as long as we write a valid page and
* associated checksum.
*
* Returns a pointer to the block-sized data that needs to be written. Uses
* statically-allocated memory, so the caller must immediately write the
* returned page and not refer to it again.
*/
char *
PageSetChecksumCopy(Page page, BlockNumber blkno)
{
static char *pageCopy = NULL;
/* If we don't need a checksum, just return the passed-in data */
if (PageIsNew(page) || !DataChecksumsEnabled())
return (char *) page;
/*
* We allocate the copy space once and use it over on each subsequent
* call. The point of palloc'ing here, rather than having a static char
* array, is first to ensure adequate alignment for the checksumming code
* and second to avoid wasting space in processes that never call this.
*/
if (pageCopy == NULL)
pageCopy = MemoryContextAlloc(TopMemoryContext, BLCKSZ);
memcpy(pageCopy, (char *) page, BLCKSZ);
((PageHeader) pageCopy)->pd_checksum = pg_checksum_page(pageCopy, blkno);
return pageCopy;
}
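
/*
 * Usage sketch (illustrative): the buffer manager writes the returned copy
 * straight out through smgr, e.g.
 *
 *     char   *bufToWrite = PageSetChecksumCopy(page, blocknum);
 *
 *     smgrwrite(smgr_reln, forknum, blocknum, bufToWrite, false);
 *
 * "smgr_reln" and "forknum" are stand-ins for the caller's SMgrRelation and
 * fork number; the important point is that bufToWrite is consumed
 * immediately and never referenced again.
 */
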
/*
* Set checksum for a page in private memory.
*
* This must only be used when we know that no other process can be modifying
* the page buffer.
*/
void
PageSetChecksumInplace(Page page, BlockNumber blkno)
{
/* If we don't need a checksum, just return */
if (PageIsNew(page) || !DataChecksumsEnabled())
return;
((PageHeader) page)->pd_checksum = pg_checksum_page((char *) page, blkno);
}