Secondary refactor of heap scanning functions
Similar to 44086b097
, refactor heap scanning functions to be more
suitable for the read stream API.
Author: Melanie Plageman
Discussion: https://postgr.es/m/CAAKRu_YtXJiYKQvb5JsA2SkwrsizYLugs4sSOZh3EAjKUg=gEQ@mail.gmail.com
This commit is contained in:
parent
2a217c3717
commit
3a4a3537a9
|
@ -83,6 +83,11 @@ static Bitmapset *HeapDetermineColumnsInfo(Relation relation,
|
|||
static bool heap_acquire_tuplock(Relation relation, ItemPointer tid,
|
||||
LockTupleMode mode, LockWaitPolicy wait_policy,
|
||||
bool *have_tuple_lock);
|
||||
static inline BlockNumber heapgettup_advance_block(HeapScanDesc scan,
|
||||
BlockNumber block,
|
||||
ScanDirection dir);
|
||||
static pg_noinline BlockNumber heapgettup_initial_block(HeapScanDesc scan,
|
||||
ScanDirection dir);
|
||||
static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask,
|
||||
uint16 old_infomask2, TransactionId add_to_xmax,
|
||||
LockTupleMode mode, bool is_update,
|
||||
|
@ -455,16 +460,14 @@ heap_prepare_pagescan(TableScanDesc sscan)
|
|||
}
|
||||
|
||||
/*
|
||||
* heapfetchbuf - read and pin the given MAIN_FORKNUM block number.
|
||||
* heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM.
|
||||
*
|
||||
* Read the specified block of the scan relation into a buffer and pin that
|
||||
* buffer before saving it in the scan descriptor.
|
||||
* Read the next block of the scan relation into a buffer and pin that buffer
|
||||
* before saving it in the scan descriptor.
|
||||
*/
|
||||
static inline void
|
||||
heapfetchbuf(HeapScanDesc scan, BlockNumber block)
|
||||
heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
|
||||
{
|
||||
Assert(block < scan->rs_nblocks);
|
||||
|
||||
/* release previous scan buffer, if any */
|
||||
if (BufferIsValid(scan->rs_cbuf))
|
||||
{
|
||||
|
@ -479,10 +482,25 @@ heapfetchbuf(HeapScanDesc scan, BlockNumber block)
|
|||
*/
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/* read page using selected strategy */
|
||||
scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, block,
|
||||
RBM_NORMAL, scan->rs_strategy);
|
||||
scan->rs_cblock = block;
|
||||
if (unlikely(!scan->rs_inited))
|
||||
{
|
||||
scan->rs_cblock = heapgettup_initial_block(scan, dir);
|
||||
|
||||
/* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
|
||||
Assert(scan->rs_cblock != InvalidBlockNumber ||
|
||||
!BufferIsValid(scan->rs_cbuf));
|
||||
|
||||
scan->rs_inited = true;
|
||||
}
|
||||
else
|
||||
scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock,
|
||||
dir);
|
||||
|
||||
/* read block if valid */
|
||||
if (BlockNumberIsValid(scan->rs_cblock))
|
||||
scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
|
||||
scan->rs_cblock, RBM_NORMAL,
|
||||
scan->rs_strategy);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -492,7 +510,7 @@ heapfetchbuf(HeapScanDesc scan, BlockNumber block)
|
|||
* occur with empty tables and in parallel scans when parallel workers get all
|
||||
* of the pages before we can get a chance to get our first page.
|
||||
*/
|
||||
static BlockNumber
|
||||
static pg_noinline BlockNumber
|
||||
heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
|
||||
{
|
||||
Assert(!scan->rs_inited);
|
||||
|
@ -619,7 +637,7 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft,
|
|||
}
|
||||
|
||||
/*
|
||||
* heapgettup_advance_block - helper for heapgettup() and heapgettup_pagemode()
|
||||
* heapgettup_advance_block - helper for heap_fetch_next_buffer()
|
||||
*
|
||||
* Given the current block number, the scan direction, and various information
|
||||
* contained in the scan descriptor, calculate the BlockNumber to scan next
|
||||
|
@ -730,23 +748,13 @@ heapgettup(HeapScanDesc scan,
|
|||
ScanKey key)
|
||||
{
|
||||
HeapTuple tuple = &(scan->rs_ctup);
|
||||
BlockNumber block;
|
||||
Page page;
|
||||
OffsetNumber lineoff;
|
||||
int linesleft;
|
||||
|
||||
if (unlikely(!scan->rs_inited))
|
||||
{
|
||||
block = heapgettup_initial_block(scan, dir);
|
||||
/* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
|
||||
Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
|
||||
scan->rs_inited = true;
|
||||
}
|
||||
else
|
||||
if (likely(scan->rs_inited))
|
||||
{
|
||||
/* continue from previously returned page/tuple */
|
||||
block = scan->rs_cblock;
|
||||
|
||||
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
|
||||
page = heapgettup_continue_page(scan, dir, &linesleft, &lineoff);
|
||||
goto continue_page;
|
||||
|
@ -756,9 +764,16 @@ heapgettup(HeapScanDesc scan,
|
|||
* advance the scan until we find a qualifying tuple or run out of stuff
|
||||
* to scan
|
||||
*/
|
||||
while (block != InvalidBlockNumber)
|
||||
while (true)
|
||||
{
|
||||
heapfetchbuf(scan, block);
|
||||
heap_fetch_next_buffer(scan, dir);
|
||||
|
||||
/* did we run out of blocks to scan? */
|
||||
if (!BufferIsValid(scan->rs_cbuf))
|
||||
break;
|
||||
|
||||
Assert(BufferGetBlockNumber(scan->rs_cbuf) == scan->rs_cblock);
|
||||
|
||||
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
|
||||
page = heapgettup_start_page(scan, dir, &linesleft, &lineoff);
|
||||
continue_page:
|
||||
|
@ -780,7 +795,7 @@ continue_page:
|
|||
|
||||
tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp);
|
||||
tuple->t_len = ItemIdGetLength(lpp);
|
||||
ItemPointerSet(&(tuple->t_self), block, lineoff);
|
||||
ItemPointerSet(&(tuple->t_self), scan->rs_cblock, lineoff);
|
||||
|
||||
visible = HeapTupleSatisfiesVisibility(tuple,
|
||||
scan->rs_base.rs_snapshot,
|
||||
|
@ -810,9 +825,6 @@ continue_page:
|
|||
* it's time to move to the next.
|
||||
*/
|
||||
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
|
||||
|
||||
/* get the BlockNumber to scan next */
|
||||
block = heapgettup_advance_block(scan, block, dir);
|
||||
}
|
||||
|
||||
/* end of scan */
|
||||
|
@ -832,9 +844,9 @@ continue_page:
|
|||
*
|
||||
* The internal logic is much the same as heapgettup's too, but there are some
|
||||
* differences: we do not take the buffer content lock (that only needs to
|
||||
* happen inside heapgetpage), and we iterate through just the tuples listed
|
||||
* in rs_vistuples[] rather than all tuples on the page. Notice that
|
||||
* lineindex is 0-based, where the corresponding loop variable lineoff in
|
||||
* happen inside heap_prepare_pagescan), and we iterate through just the
|
||||
* tuples listed in rs_vistuples[] rather than all tuples on the page. Notice
|
||||
* that lineindex is 0-based, where the corresponding loop variable lineoff in
|
||||
* heapgettup is 1-based.
|
||||
* ----------------
|
||||
*/
|
||||
|
@ -845,22 +857,13 @@ heapgettup_pagemode(HeapScanDesc scan,
|
|||
ScanKey key)
|
||||
{
|
||||
HeapTuple tuple = &(scan->rs_ctup);
|
||||
BlockNumber block;
|
||||
Page page;
|
||||
int lineindex;
|
||||
int linesleft;
|
||||
|
||||
if (unlikely(!scan->rs_inited))
|
||||
{
|
||||
block = heapgettup_initial_block(scan, dir);
|
||||
/* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
|
||||
Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
|
||||
scan->rs_inited = true;
|
||||
}
|
||||
else
|
||||
if (likely(scan->rs_inited))
|
||||
{
|
||||
/* continue from previously returned page/tuple */
|
||||
block = scan->rs_cblock; /* current page */
|
||||
page = BufferGetPage(scan->rs_cbuf);
|
||||
|
||||
lineindex = scan->rs_cindex + dir;
|
||||
|
@ -877,10 +880,15 @@ heapgettup_pagemode(HeapScanDesc scan,
|
|||
* advance the scan until we find a qualifying tuple or run out of stuff
|
||||
* to scan
|
||||
*/
|
||||
while (block != InvalidBlockNumber)
|
||||
while (true)
|
||||
{
|
||||
/* read the page */
|
||||
heapfetchbuf(scan, block);
|
||||
heap_fetch_next_buffer(scan, dir);
|
||||
|
||||
/* did we run out of blocks to scan? */
|
||||
if (!BufferIsValid(scan->rs_cbuf))
|
||||
break;
|
||||
|
||||
Assert(BufferGetBlockNumber(scan->rs_cbuf) == scan->rs_cblock);
|
||||
|
||||
/* prune the page and determine visible tuple offsets */
|
||||
heap_prepare_pagescan((TableScanDesc) scan);
|
||||
|
@ -902,7 +910,7 @@ continue_page:
|
|||
|
||||
tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp);
|
||||
tuple->t_len = ItemIdGetLength(lpp);
|
||||
ItemPointerSet(&(tuple->t_self), block, lineoff);
|
||||
ItemPointerSet(&(tuple->t_self), scan->rs_cblock, lineoff);
|
||||
|
||||
/* skip any tuples that don't match the scan key */
|
||||
if (key != NULL &&
|
||||
|
@ -913,9 +921,6 @@ continue_page:
|
|||
scan->rs_cindex = lineindex;
|
||||
return;
|
||||
}
|
||||
|
||||
/* get the BlockNumber to scan next */
|
||||
block = heapgettup_advance_block(scan, block, dir);
|
||||
}
|
||||
|
||||
/* end of scan */
|
||||
|
|
|
@ -2489,7 +2489,7 @@ heapam_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate,
|
|||
visible = SampleHeapTupleVisible(scan, hscan->rs_cbuf,
|
||||
tuple, tupoffset);
|
||||
|
||||
/* in pagemode, heapgetpage did this for us */
|
||||
/* in pagemode, heap_prepare_pagescan did this for us */
|
||||
if (!pagemode)
|
||||
HeapCheckForSerializableConflictOut(visible, scan->rs_rd, tuple,
|
||||
hscan->rs_cbuf, scan->rs_snapshot);
|
||||
|
|
Loading…
Reference in New Issue