Further refactor of heapgettup and heapgettup_pagemode

Backward and forward scans share much of the same page acquisition code.
Here we consolidate that code to reduce some duplication.

Additionally, add a new rs_coffset field to HeapScanDescData to track the
offset of the current tuple.  The new field fits nicely into the padding
between a bool and BlockNumber field and saves having to look at the last
returned tuple to figure out which offset we should be looking at for the
current tuple.

Author: Melanie Plageman
Reviewed-by: David Rowley
Discussion: https://postgr.es/m/CAAKRu_bvkhka0CZQun28KTqhuUh5ZqY=_T8QEqZqOL02rpi2bw@mail.gmail.com
This commit is contained in:
David Rowley 2023-02-03 11:48:39 +13:00
parent cdf6518ef0
commit f9bc34fcb6
2 changed files with 64 additions and 139 deletions

View File

@ -576,102 +576,68 @@ heapgettup(HeapScanDesc scan,
BlockNumber block; BlockNumber block;
bool finished; bool finished;
Page page; Page page;
int lines;
OffsetNumber lineoff; OffsetNumber lineoff;
int linesleft; int linesleft;
ItemId lpp; ItemId lpp;
if (unlikely(!scan->rs_inited))
{
block = heapgettup_initial_block(scan, dir);
/* /*
* calculate next starting lineoff, given scan direction * Check if we have reached the end of the scan already. This could
* happen if the table is empty or if the parallel workers have
* already finished the scan before we did anything ourselves
*/ */
if (block == InvalidBlockNumber)
{
Assert(!BufferIsValid(scan->rs_cbuf));
tuple->t_data = NULL;
return;
}
heapgetpage((TableScanDesc) scan, block);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
page = BufferGetPage(scan->rs_cbuf);
TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, page);
linesleft = PageGetMaxOffsetNumber(page) - FirstOffsetNumber + 1;
if (ScanDirectionIsForward(dir))
lineoff = FirstOffsetNumber; /* first offnum */
else
lineoff = (OffsetNumber) linesleft;
scan->rs_inited = true;
}
else
{
/* continue from previously returned page/tuple */
block = scan->rs_cblock;
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
page = BufferGetPage(scan->rs_cbuf);
TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, page);
if (ScanDirectionIsForward(dir)) if (ScanDirectionIsForward(dir))
{ {
if (!scan->rs_inited) lineoff = OffsetNumberNext(scan->rs_coffset);
{ linesleft = PageGetMaxOffsetNumber(page) - lineoff + 1;
block = heapgettup_initial_block(scan, dir);
/*
* Check if we have reached the end of the scan already. This
* could happen if the table is empty or if the parallel workers
* have already finished the scan before we did anything ourselves
*/
if (block == InvalidBlockNumber)
{
Assert(!BufferIsValid(scan->rs_cbuf));
tuple->t_data = NULL;
return;
}
heapgetpage((TableScanDesc) scan, block);
lineoff = FirstOffsetNumber; /* first offnum */
scan->rs_inited = true;
} }
else else
{ {
/* continue from previously returned page/tuple */
block = scan->rs_cblock; /* current page */
lineoff = /* next offnum */
OffsetNumberNext(ItemPointerGetOffsetNumber(&(tuple->t_self)));
}
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
page = BufferGetPage(scan->rs_cbuf);
TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, page);
lines = PageGetMaxOffsetNumber(page);
/* block and lineoff now reference the physically next tid */
linesleft = lines - lineoff + 1;
}
else
{
if (!scan->rs_inited)
{
block = heapgettup_initial_block(scan, dir);
/*
* Check if we have reached the end of the scan already. This
* could happen if the table is empty.
*/
if (block == InvalidBlockNumber)
{
Assert(!BufferIsValid(scan->rs_cbuf));
tuple->t_data = NULL;
return;
}
heapgetpage((TableScanDesc) scan, block);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
page = BufferGetPage(scan->rs_cbuf);
TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, page);
lines = PageGetMaxOffsetNumber(page);
lineoff = lines; /* final offnum */
scan->rs_inited = true;
}
else
{
/* continue from previously returned page/tuple */
block = scan->rs_cblock; /* current page */
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
page = BufferGetPage(scan->rs_cbuf);
TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, page);
lines = PageGetMaxOffsetNumber(page);
/* /*
* The previous returned tuple may have been vacuumed since the * The previous returned tuple may have been vacuumed since the
* previous scan when we use a non-MVCC snapshot, so we must * previous scan when we use a non-MVCC snapshot, so we must
* re-establish the lineoff <= PageGetMaxOffsetNumber(page) * re-establish the lineoff <= PageGetMaxOffsetNumber(page)
* invariant * invariant
*/ */
lineoff = /* previous offnum */ lineoff = Min(PageGetMaxOffsetNumber(page),
Min(lines, OffsetNumberPrev(scan->rs_coffset));
OffsetNumberPrev(ItemPointerGetOffsetNumber(&(tuple->t_self))));
}
/* block and lineoff now reference the physically previous tid */
linesleft = lineoff; linesleft = lineoff;
} }
}
/* /*
* advance the scan until we find a qualifying tuple or run out of stuff * advance the scan until we find a qualifying tuple or run out of stuff
@ -715,6 +681,7 @@ heapgettup(HeapScanDesc scan,
if (valid) if (valid)
{ {
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
scan->rs_coffset = lineoff;
return; return;
} }
} }
@ -807,12 +774,11 @@ heapgettup(HeapScanDesc scan,
page = BufferGetPage(scan->rs_cbuf); page = BufferGetPage(scan->rs_cbuf);
TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, page); TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, page);
lines = PageGetMaxOffsetNumber(page); linesleft = PageGetMaxOffsetNumber(page);
linesleft = lines;
if (backward) if (backward)
{ {
lineoff = lines; lineoff = linesleft;
lpp = PageGetItemId(page, lines); lpp = PageGetItemId(page, linesleft);
} }
else else
{ {
@ -846,87 +812,46 @@ heapgettup_pagemode(HeapScanDesc scan,
BlockNumber block; BlockNumber block;
bool finished; bool finished;
Page page; Page page;
int lines;
int lineindex; int lineindex;
OffsetNumber lineoff; OffsetNumber lineoff;
int linesleft; int linesleft;
ItemId lpp; ItemId lpp;
if (unlikely(!scan->rs_inited))
{
block = heapgettup_initial_block(scan, dir);
/* /*
* calculate next starting lineindex, given scan direction * Check if we have reached the end of the scan already. This could
* happen if the table is empty or if the other workers in a parallel
* scan have already finished the scan.
*/ */
if (block == InvalidBlockNumber)
{
Assert(!BufferIsValid(scan->rs_cbuf));
tuple->t_data = NULL;
return;
}
heapgetpage((TableScanDesc) scan, block);
page = BufferGetPage(scan->rs_cbuf);
TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page);
linesleft = scan->rs_ntuples;
lineindex = ScanDirectionIsForward(dir) ? 0 : linesleft - 1;
scan->rs_inited = true;
}
else
{
/* continue from previously returned page/tuple */
block = scan->rs_cblock; /* current page */
page = BufferGetPage(scan->rs_cbuf);
TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page);
lineindex = scan->rs_cindex + dir;
if (ScanDirectionIsForward(dir)) if (ScanDirectionIsForward(dir))
{ linesleft = scan->rs_ntuples - lineindex;
if (!scan->rs_inited)
{
block = heapgettup_initial_block(scan, dir);
/*
* Check if we have reached the end of the scan already. This
* could happen if the table is empty or if the parallel workers
* have already finished the scan before we did anything ourselves
*/
if (block == InvalidBlockNumber)
{
Assert(!BufferIsValid(scan->rs_cbuf));
tuple->t_data = NULL;
return;
}
heapgetpage((TableScanDesc) scan, block);
lineindex = 0;
scan->rs_inited = true;
}
else else
{ linesleft = scan->rs_cindex;
/* continue from previously returned page/tuple */
block = scan->rs_cblock; /* current page */
lineindex = scan->rs_cindex + 1;
}
page = BufferGetPage(scan->rs_cbuf);
TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page);
lines = scan->rs_ntuples;
/* block and lineindex now reference the next visible tid */
linesleft = lines - lineindex;
}
else
{
if (!scan->rs_inited)
{
block = heapgettup_initial_block(scan, dir);
/*
* Check if we have reached the end of the scan already. This
* could happen if the table is empty.
*/
if (block == InvalidBlockNumber)
{
Assert(!BufferIsValid(scan->rs_cbuf));
tuple->t_data = NULL;
return;
}
heapgetpage((TableScanDesc) scan, block);
page = BufferGetPage(scan->rs_cbuf);
TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page);
lines = scan->rs_ntuples;
lineindex = lines - 1;
scan->rs_inited = true;
}
else
{
/* continue from previously returned page/tuple */
block = scan->rs_cblock; /* current page */
page = BufferGetPage(scan->rs_cbuf);
TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page);
lines = scan->rs_ntuples;
lineindex = scan->rs_cindex - 1;
}
/* block and lineindex now reference the previous visible tid */
linesleft = lineindex + 1;
} }
/* /*
@ -1041,10 +966,9 @@ heapgettup_pagemode(HeapScanDesc scan,
page = BufferGetPage(scan->rs_cbuf); page = BufferGetPage(scan->rs_cbuf);
TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page); TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page);
lines = scan->rs_ntuples; linesleft = scan->rs_ntuples;
linesleft = lines;
if (backward) if (backward)
lineindex = lines - 1; lineindex = linesleft - 1;
else else
lineindex = 0; lineindex = 0;
} }

View File

@ -57,6 +57,7 @@ typedef struct HeapScanDescData
/* scan current state */ /* scan current state */
bool rs_inited; /* false = scan not init'd yet */ bool rs_inited; /* false = scan not init'd yet */
OffsetNumber rs_coffset; /* current offset # in non-page-at-a-time mode */
BlockNumber rs_cblock; /* current block # in scan, if any */ BlockNumber rs_cblock; /* current block # in scan, if any */
Buffer rs_cbuf; /* current buffer in scan, if any */ Buffer rs_cbuf; /* current buffer in scan, if any */
/* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */ /* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */