More refactoring of heapgettup() and heapgettup_pagemode()

Here we further simplify the code in heapgettup() and
heapgettup_pagemode() to make better use of the helper functions added in
the previous recent refactors in this area.

In passing, remove an unneeded cast added in 8ca6d49f6.

Author: Melanie Plageman
Reviewed-by: Andres Freund, David Rowley
Discussion: https://postgr.es/m/CAAKRu_YSOnhKsDyFcqJsKtBSrd32DP-jjXmv7hL0BPD-z0TGXQ@mail.gmail.com
This commit is contained in:
David Rowley 2023-02-07 17:24:07 +13:00
parent 9ba37b2cb6
commit cfcf56f923
1 changed files with 79 additions and 170 deletions

View File

@ -567,7 +567,7 @@ heapgettup_start_page(HeapScanDesc scan, ScanDirection dir, int *linesleft,
TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page); TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page);
*linesleft = PageGetMaxOffsetNumber((Page) page) - FirstOffsetNumber + 1; *linesleft = PageGetMaxOffsetNumber(page) - FirstOffsetNumber + 1;
if (ScanDirectionIsForward(dir)) if (ScanDirectionIsForward(dir))
*lineoff = FirstOffsetNumber; *lineoff = FirstOffsetNumber;
@ -732,34 +732,17 @@ heapgettup(HeapScanDesc scan,
ScanKey key) ScanKey key)
{ {
HeapTuple tuple = &(scan->rs_ctup); HeapTuple tuple = &(scan->rs_ctup);
bool backward = ScanDirectionIsBackward(dir);
BlockNumber block; BlockNumber block;
Page page; Page page;
OffsetNumber lineoff; OffsetNumber lineoff;
int linesleft; int linesleft;
ItemId lpp;
if (unlikely(!scan->rs_inited)) if (unlikely(!scan->rs_inited))
{ {
block = heapgettup_initial_block(scan, dir); block = heapgettup_initial_block(scan, dir);
/* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
/* Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
* Check if we have reached the end of the scan already. This could
* happen if the table is empty or if the parallel workers have
* already finished the scan before we did anything ourselves
*/
if (block == InvalidBlockNumber)
{
Assert(!BufferIsValid(scan->rs_cbuf));
tuple->t_data = NULL;
return;
}
scan->rs_inited = true; scan->rs_inited = true;
heapgetpage((TableScanDesc) scan, block);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
page = heapgettup_start_page(scan, dir, &linesleft, &lineoff);
} }
else else
{ {
@ -768,15 +751,20 @@ heapgettup(HeapScanDesc scan,
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
page = heapgettup_continue_page(scan, dir, &linesleft, &lineoff); page = heapgettup_continue_page(scan, dir, &linesleft, &lineoff);
goto continue_page;
} }
/* /*
* advance the scan until we find a qualifying tuple or run out of stuff * advance the scan until we find a qualifying tuple or run out of stuff
* to scan * to scan
*/ */
lpp = PageGetItemId(page, lineoff); while (block != InvalidBlockNumber)
for (;;)
{ {
heapgetpage((TableScanDesc) scan, block);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
page = heapgettup_start_page(scan, dir, &linesleft, &lineoff);
continue_page:
/* /*
* Only continue scanning the page while we have lines left. * Only continue scanning the page while we have lines left.
* *
@ -784,53 +772,39 @@ heapgettup(HeapScanDesc scan,
* PageGetMaxOffsetNumber(); both for forward scans when we resume the * PageGetMaxOffsetNumber(); both for forward scans when we resume the
* table scan, and for when we start scanning a new page. * table scan, and for when we start scanning a new page.
*/ */
while (linesleft > 0) for (; linesleft > 0; linesleft--, lineoff += dir)
{ {
if (ItemIdIsNormal(lpp)) bool visible;
{ ItemId lpp = PageGetItemId(page, lineoff);
bool valid;
tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp); if (!ItemIdIsNormal(lpp))
tuple->t_len = ItemIdGetLength(lpp); continue;
ItemPointerSet(&(tuple->t_self), block, lineoff);
/* tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp);
* if current tuple qualifies, return it. tuple->t_len = ItemIdGetLength(lpp);
*/ ItemPointerSet(&(tuple->t_self), block, lineoff);
valid = HeapTupleSatisfiesVisibility(tuple,
scan->rs_base.rs_snapshot,
scan->rs_cbuf);
HeapCheckForSerializableConflictOut(valid, scan->rs_base.rs_rd, visible = HeapTupleSatisfiesVisibility(tuple,
tuple, scan->rs_cbuf, scan->rs_base.rs_snapshot,
scan->rs_base.rs_snapshot); scan->rs_cbuf);
if (valid && key != NULL) HeapCheckForSerializableConflictOut(visible, scan->rs_base.rs_rd,
valid = HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd), tuple, scan->rs_cbuf,
nkeys, key); scan->rs_base.rs_snapshot);
if (valid) /* skip tuples not visible to this snapshot */
{ if (!visible)
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); continue;
scan->rs_coffset = lineoff;
return;
}
}
/* /* skip any tuples that don't match the scan key */
* otherwise move to the next item on the page if (key != NULL &&
*/ !HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd),
--linesleft; nkeys, key))
if (backward) continue;
{
--lpp; /* move back in this page's ItemId array */ LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
--lineoff; scan->rs_coffset = lineoff;
} return;
else
{
++lpp; /* move forward in this page's ItemId array */
++lineoff;
}
} }
/* /*
@ -841,40 +815,16 @@ heapgettup(HeapScanDesc scan,
/* get the BlockNumber to scan next */ /* get the BlockNumber to scan next */
block = heapgettup_advance_block(scan, block, dir); block = heapgettup_advance_block(scan, block, dir);
/*
* return NULL if we've exhausted all the pages
*/
if (block == InvalidBlockNumber)
{
if (BufferIsValid(scan->rs_cbuf))
ReleaseBuffer(scan->rs_cbuf);
scan->rs_cbuf = InvalidBuffer;
scan->rs_cblock = InvalidBlockNumber;
tuple->t_data = NULL;
scan->rs_inited = false;
return;
}
heapgetpage((TableScanDesc) scan, block);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
page = BufferGetPage(scan->rs_cbuf);
TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd,
page);
linesleft = PageGetMaxOffsetNumber(page);
if (backward)
{
lineoff = linesleft;
lpp = PageGetItemId(page, linesleft);
}
else
{
lineoff = FirstOffsetNumber;
lpp = PageGetItemId(page, FirstOffsetNumber);
}
} }
/* end of scan */
if (BufferIsValid(scan->rs_cbuf))
ReleaseBuffer(scan->rs_cbuf);
scan->rs_cbuf = InvalidBuffer;
scan->rs_cblock = InvalidBlockNumber;
tuple->t_data = NULL;
scan->rs_inited = false;
} }
/* ---------------- /* ----------------
@ -897,35 +847,16 @@ heapgettup_pagemode(HeapScanDesc scan,
ScanKey key) ScanKey key)
{ {
HeapTuple tuple = &(scan->rs_ctup); HeapTuple tuple = &(scan->rs_ctup);
bool backward = ScanDirectionIsBackward(dir);
BlockNumber block; BlockNumber block;
Page page; Page page;
int lineindex; int lineindex;
OffsetNumber lineoff;
int linesleft; int linesleft;
ItemId lpp;
if (unlikely(!scan->rs_inited)) if (unlikely(!scan->rs_inited))
{ {
block = heapgettup_initial_block(scan, dir); block = heapgettup_initial_block(scan, dir);
/* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
/* Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
* Check if we have reached the end of the scan already. This could
* happen if the table is empty or if the other workers in a parallel
* scan have already finished the scan.
*/
if (block == InvalidBlockNumber)
{
Assert(!BufferIsValid(scan->rs_cbuf));
tuple->t_data = NULL;
return;
}
heapgetpage((TableScanDesc) scan, block);
page = BufferGetPage(scan->rs_cbuf);
TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page);
linesleft = scan->rs_ntuples;
lineindex = ScanDirectionIsForward(dir) ? 0 : linesleft - 1;
scan->rs_inited = true; scan->rs_inited = true;
} }
else else
@ -940,16 +871,31 @@ heapgettup_pagemode(HeapScanDesc scan,
linesleft = scan->rs_ntuples - lineindex; linesleft = scan->rs_ntuples - lineindex;
else else
linesleft = scan->rs_cindex; linesleft = scan->rs_cindex;
/* lineindex now references the next or previous visible tid */
goto continue_page;
} }
/* /*
* advance the scan until we find a qualifying tuple or run out of stuff * advance the scan until we find a qualifying tuple or run out of stuff
* to scan * to scan
*/ */
for (;;) while (block != InvalidBlockNumber)
{ {
while (linesleft > 0) heapgetpage((TableScanDesc) scan, block);
page = BufferGetPage(scan->rs_cbuf);
TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page);
linesleft = scan->rs_ntuples;
lineindex = ScanDirectionIsForward(dir) ? 0 : linesleft - 1;
/* lineindex now references the next or previous visible tid */
continue_page:
for (; linesleft > 0; linesleft--, lineindex += dir)
{ {
ItemId lpp;
OffsetNumber lineoff;
lineoff = scan->rs_vistuples[lineindex]; lineoff = scan->rs_vistuples[lineindex];
lpp = PageGetItemId(page, lineoff); lpp = PageGetItemId(page, lineoff);
Assert(ItemIdIsNormal(lpp)); Assert(ItemIdIsNormal(lpp));
@ -958,64 +904,27 @@ heapgettup_pagemode(HeapScanDesc scan,
tuple->t_len = ItemIdGetLength(lpp); tuple->t_len = ItemIdGetLength(lpp);
ItemPointerSet(&(tuple->t_self), block, lineoff); ItemPointerSet(&(tuple->t_self), block, lineoff);
/* /* skip any tuples that don't match the scan key */
* if current tuple qualifies, return it. if (key != NULL &&
*/ !HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd),
if (key != NULL) nkeys, key))
{ continue;
bool valid;
valid = HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd), scan->rs_cindex = lineindex;
nkeys, key); return;
if (valid)
{
scan->rs_cindex = lineindex;
return;
}
}
else
{
scan->rs_cindex = lineindex;
return;
}
/*
* otherwise move to the next item on the page
*/
--linesleft;
if (backward)
--lineindex;
else
++lineindex;
} }
/* get the BlockNumber to scan next */ /* get the BlockNumber to scan next */
block = heapgettup_advance_block(scan, block, dir); block = heapgettup_advance_block(scan, block, dir);
/*
* return NULL if we've exhausted all the pages
*/
if (block == InvalidBlockNumber)
{
if (BufferIsValid(scan->rs_cbuf))
ReleaseBuffer(scan->rs_cbuf);
scan->rs_cbuf = InvalidBuffer;
scan->rs_cblock = InvalidBlockNumber;
tuple->t_data = NULL;
scan->rs_inited = false;
return;
}
heapgetpage((TableScanDesc) scan, block);
page = BufferGetPage(scan->rs_cbuf);
TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page);
linesleft = scan->rs_ntuples;
if (backward)
lineindex = linesleft - 1;
else
lineindex = 0;
} }
/* end of scan */
if (BufferIsValid(scan->rs_cbuf))
ReleaseBuffer(scan->rs_cbuf);
scan->rs_cbuf = InvalidBuffer;
scan->rs_cblock = InvalidBlockNumber;
tuple->t_data = NULL;
scan->rs_inited = false;
} }