Fix COPY FREEZE with CLOBBER_CACHE_ALWAYS

This adds code omitted from commit 7db0cd2145 by accident, which had
two consequences. Firstly, only rows inserted by heap_multi_insert were
frozen as expected when running COPY FREEZE, while heap_insert left
rows unfrozen. That however includes rows in TOAST tables, so a lot of
data might have been left unfrozen. Secondly, page might have been left
partially empty after relcache invalidation.

This addresses both of those issues.

Discussion: https://postgr.es/m/CABOikdN-ptGv0mZntrK2Q8OtfUuAjqaYMGmkdU1dCKFtUxVLrg@mail.gmail.com
This commit is contained in:
Tomas Vondra 2021-01-24 00:24:50 +01:00
parent 183bbd1b6d
commit 39b66a91bd
2 changed files with 88 additions and 12 deletions

View File

@ -1880,8 +1880,12 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
TransactionId xid = GetCurrentTransactionId();
HeapTuple heaptup;
Buffer buffer;
Page page = NULL;
Buffer vmbuffer = InvalidBuffer;
bool starting_with_empty_page;
bool all_visible_cleared = false;
bool all_frozen_set = false;
uint8 vmstatus = 0;
/*
* Fill in tuple header fields and toast the tuple if necessary.
@ -1894,11 +1898,36 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
/*
* Find buffer to insert this tuple into. If the page is all visible,
* this will also pin the requisite visibility map page.
*
* Also pin visibility map page if COPY FREEZE inserts tuples into an
* empty page. See all_frozen_set below.
*/
buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
InvalidBuffer, options, bistate,
&vmbuffer, NULL);
/*
* If we're inserting frozen entry into an empty page,
* set visibility map bits and PageAllVisible() hint.
*
* If we're inserting frozen entry into already all_frozen page,
* preserve this state.
*/
if (options & HEAP_INSERT_FROZEN)
{
page = BufferGetPage(buffer);
starting_with_empty_page = PageGetMaxOffsetNumber(page) == 0;
if (visibilitymap_pin_ok(BufferGetBlockNumber(buffer), vmbuffer))
vmstatus = visibilitymap_get_status(relation,
BufferGetBlockNumber(buffer), &vmbuffer);
if ((starting_with_empty_page || vmstatus & VISIBILITYMAP_ALL_FROZEN))
all_frozen_set = true;
}
/*
* We're about to do the actual insert -- but check for conflict first, to
* avoid possibly having to roll back work we've just done.
@ -1922,7 +1951,14 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
RelationPutHeapTuple(relation, buffer, heaptup,
(options & HEAP_INSERT_SPECULATIVE) != 0);
if (PageIsAllVisible(BufferGetPage(buffer)))
/*
* If the page is all visible, need to clear that, unless we're only
* going to add further frozen rows to it.
*
* If we're only adding already frozen rows to a page that was empty or
* marked as all visible, mark it as all-visible.
*/
if (PageIsAllVisible(BufferGetPage(buffer)) && !(options & HEAP_INSERT_FROZEN))
{
all_visible_cleared = true;
PageClearAllVisible(BufferGetPage(buffer));
@ -1930,6 +1966,13 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
ItemPointerGetBlockNumber(&(heaptup->t_self)),
vmbuffer, VISIBILITYMAP_VALID_BITS);
}
else if (all_frozen_set)
{
/* We only ever set all_frozen_set after reading the page. */
Assert(page);
PageSetAllVisible(page);
}
/*
* XXX Should we set PageSetPrunable on this page ?
@ -1977,6 +2020,8 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
xlrec.flags = 0;
if (all_visible_cleared)
xlrec.flags |= XLH_INSERT_ALL_VISIBLE_CLEARED;
if (all_frozen_set)
xlrec.flags = XLH_INSERT_ALL_FROZEN_SET;
if (options & HEAP_INSERT_SPECULATIVE)
xlrec.flags |= XLH_INSERT_IS_SPECULATIVE;
Assert(ItemPointerGetBlockNumber(&heaptup->t_self) == BufferGetBlockNumber(buffer));
@ -2025,6 +2070,29 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
END_CRIT_SECTION();
/*
* If we've frozen everything on the page, update the visibilitymap.
* We're already holding pin on the vmbuffer.
*
* No need to update the visibilitymap if it had all_frozen bit set
* before this insertion.
*/
if (all_frozen_set && ((vmstatus & VISIBILITYMAP_ALL_FROZEN) == 0))
{
Assert(PageIsAllVisible(page));
Assert(visibilitymap_pin_ok(BufferGetBlockNumber(buffer), vmbuffer));
/*
* It's fine to use InvalidTransactionId here - this is only used
* when HEAP_INSERT_FROZEN is specified, which intentionally
* violates visibility rules.
*/
visibilitymap_set(relation, BufferGetBlockNumber(buffer), buffer,
InvalidXLogRecPtr, vmbuffer,
InvalidTransactionId,
VISIBILITYMAP_ALL_VISIBLE | VISIBILITYMAP_ALL_FROZEN);
}
UnlockReleaseBuffer(buffer);
if (vmbuffer != InvalidBuffer)
ReleaseBuffer(vmbuffer);
@ -8708,6 +8776,10 @@ heap_xlog_insert(XLogReaderState *record)
ItemPointerSetBlockNumber(&target_tid, blkno);
ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
/* check that the mutually exclusive flags are not both set */
Assert (!((xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) &&
(xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)));
/*
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
@ -8925,6 +8997,10 @@ heap_xlog_multi_insert(XLogReaderState *record)
if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
/* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
PageSetAllVisible(page);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))

View File

@ -396,19 +396,19 @@ RelationGetBufferForTuple(Relation relation, Size len,
* target.
*/
targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);
}
/*
* If the FSM knows nothing of the rel, try the last page before we
* give up and extend. This avoids one-tuple-per-page syndrome during
* bootstrapping or in a recently-started system.
*/
if (targetBlock == InvalidBlockNumber)
{
BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
/*
* If the FSM knows nothing of the rel, try the last page before we
* give up and extend. This avoids one-tuple-per-page syndrome during
* bootstrapping or in a recently-started system.
*/
if (targetBlock == InvalidBlockNumber)
{
BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
if (nblocks > 0)
targetBlock = nblocks - 1;
}
if (nblocks > 0)
targetBlock = nblocks - 1;
}
loop: