From b7b0f3f27241e424b7103397489464d907cef2c4 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Mon, 8 Apr 2024 01:48:27 +1200 Subject: [PATCH] Use streaming I/O in sequential scans. Instead of calling ReadBuffer() for each block, heap sequential scans and TID range scans now use the streaming API introduced in b5a9b18cd0. Author: Melanie Plageman Reviewed-by: Andres Freund Reviewed-by: Thomas Munro Discussion: https://postgr.es/m/flat/CAAKRu_YtXJiYKQvb5JsA2SkwrsizYLugs4sSOZh3EAjKUg%3DgEQ%40mail.gmail.com --- src/backend/access/heap/heapam.c | 241 +++++++++++++++++++++---------- src/include/access/heapam.h | 15 ++ 2 files changed, 180 insertions(+), 76 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index a32acc9047..2663f52d1a 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -223,6 +223,68 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] = * ---------------------------------------------------------------- */ +/* + * Streaming read API callback for parallel sequential scans. Returns the next + * block the caller wants from the read stream or InvalidBlockNumber when done. + */ +static BlockNumber +heap_scan_stream_read_next_parallel(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + HeapScanDesc scan = (HeapScanDesc) callback_private_data; + + Assert(ScanDirectionIsForward(scan->rs_dir)); + Assert(scan->rs_base.rs_parallel); + + if (unlikely(!scan->rs_inited)) + { + /* parallel scan */ + table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, + scan->rs_parallelworkerdata, + (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel); + + /* may return InvalidBlockNumber if there are no more blocks */ + scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, + scan->rs_parallelworkerdata, + (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel); + scan->rs_inited = true; + } + else + { + scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, + scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc) + scan->rs_base.rs_parallel); + } + + return scan->rs_prefetch_block; +} + +/* + * Streaming read API callback for serial sequential and TID range scans. + * Returns the next block the caller wants from the read stream or + * InvalidBlockNumber when done. + */ +static BlockNumber +heap_scan_stream_read_next_serial(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + HeapScanDesc scan = (HeapScanDesc) callback_private_data; + + if (unlikely(!scan->rs_inited)) + { + scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir); + scan->rs_inited = true; + } + else + scan->rs_prefetch_block = heapgettup_advance_block(scan, + scan->rs_prefetch_block, + scan->rs_dir); + + return scan->rs_prefetch_block; +} + /* ---------------- * initscan - scan code common to heap_beginscan and heap_rescan * ---------------- @@ -325,6 +387,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + /* + * Initialize to ForwardScanDirection because it is most common and + * because heap scans go forward before going backward (e.g. CURSORs). 
+ */ + scan->rs_dir = ForwardScanDirection; + scan->rs_prefetch_block = InvalidBlockNumber; + /* page-at-a-time fields are always invalid when not rs_inited */ /* @@ -508,12 +577,14 @@ heap_prepare_pagescan(TableScanDesc sscan) /* * heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM. * - * Read the next block of the scan relation into a buffer and pin that buffer - * before saving it in the scan descriptor. + * Read the next block of the scan relation from the read stream and save it + * in the scan descriptor. It is already pinned. */ static inline void heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir) { + Assert(scan->rs_read_stream); + /* release previous scan buffer, if any */ if (BufferIsValid(scan->rs_cbuf)) { @@ -528,25 +599,23 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir) */ CHECK_FOR_INTERRUPTS(); - if (unlikely(!scan->rs_inited)) + /* + * If the scan direction is changing, reset the prefetch block to the + * current block. Otherwise, we will incorrectly prefetch the blocks + * between the prefetch block and the current block again before + * prefetching blocks in the new, correct scan direction. + */ + if (unlikely(scan->rs_dir != dir)) { - scan->rs_cblock = heapgettup_initial_block(scan, dir); - - /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */ - Assert(scan->rs_cblock != InvalidBlockNumber || - !BufferIsValid(scan->rs_cbuf)); - - scan->rs_inited = true; + scan->rs_prefetch_block = scan->rs_cblock; + read_stream_reset(scan->rs_read_stream); } - else - scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, - dir); - /* read block if valid */ - if (BlockNumberIsValid(scan->rs_cblock)) - scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, - scan->rs_cblock, RBM_NORMAL, - scan->rs_strategy); + scan->rs_dir = dir; + + scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL); + if (BufferIsValid(scan->rs_cbuf)) + scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf); } /* @@ -560,6 +629,7 @@ static pg_noinline BlockNumber heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir) { Assert(!scan->rs_inited); + Assert(scan->rs_base.rs_parallel == NULL); /* When there are no pages to scan, return InvalidBlockNumber */ if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0) @@ -567,27 +637,10 @@ heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir) if (ScanDirectionIsForward(dir)) { - /* serial scan */ - if (scan->rs_base.rs_parallel == NULL) - return scan->rs_startblock; - else - { - /* parallel scan */ - table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, - scan->rs_parallelworkerdata, - (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel); - - /* may return InvalidBlockNumber if there are no more blocks */ - return table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - scan->rs_parallelworkerdata, - (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel); - } + return scan->rs_startblock; } else { - /* backward parallel scan not supported */ - Assert(scan->rs_base.rs_parallel == NULL); - /* * Disable reporting to syncscan logic in a backwards scan; it's not * very likely anyone else is doing the same thing at the same time, @@ -699,50 +752,43 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft, static inline BlockNumber heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir) { - if (ScanDirectionIsForward(dir)) + Assert(scan->rs_base.rs_parallel == NULL); + + if (likely(ScanDirectionIsForward(dir))) { - 
if (scan->rs_base.rs_parallel == NULL) + block++; + + /* wrap back to the start of the heap */ + if (block >= scan->rs_nblocks) + block = 0; + + /* + * Report our new scan position for synchronization purposes. We don't + * do that when moving backwards, however. That would just mess up any + * other forward-moving scanners. + * + * Note: we do this before checking for end of scan so that the final + * state of the position hint is back at the start of the rel. That's + * not strictly necessary, but otherwise when you run the same query + * multiple times the starting position would shift a little bit + * backwards on every invocation, which is confusing. We don't + * guarantee any specific ordering in general, though. + */ + if (scan->rs_base.rs_flags & SO_ALLOW_SYNC) + ss_report_location(scan->rs_base.rs_rd, block); + + /* we're done if we're back at where we started */ + if (block == scan->rs_startblock) + return InvalidBlockNumber; + + /* check if the limit imposed by heap_setscanlimits() is met */ + if (scan->rs_numblocks != InvalidBlockNumber) { - block++; - - /* wrap back to the start of the heap */ - if (block >= scan->rs_nblocks) - block = 0; - - /* - * Report our new scan position for synchronization purposes. We - * don't do that when moving backwards, however. That would just - * mess up any other forward-moving scanners. - * - * Note: we do this before checking for end of scan so that the - * final state of the position hint is back at the start of the - * rel. That's not strictly necessary, but otherwise when you run - * the same query multiple times the starting position would shift - * a little bit backwards on every invocation, which is confusing. - * We don't guarantee any specific ordering in general, though. - */ - if (scan->rs_base.rs_flags & SO_ALLOW_SYNC) - ss_report_location(scan->rs_base.rs_rd, block); - - /* we're done if we're back at where we started */ - if (block == scan->rs_startblock) + if (--scan->rs_numblocks == 0) return InvalidBlockNumber; - - /* check if the limit imposed by heap_setscanlimits() is met */ - if (scan->rs_numblocks != InvalidBlockNumber) - { - if (--scan->rs_numblocks == 0) - return InvalidBlockNumber; - } - - return block; - } - else - { - return table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc) - scan->rs_base.rs_parallel); } + + return block; } else { @@ -879,6 +925,7 @@ continue_page: scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + scan->rs_prefetch_block = InvalidBlockNumber; tuple->t_data = NULL; scan->rs_inited = false; } @@ -974,6 +1021,7 @@ continue_page: ReleaseBuffer(scan->rs_cbuf); scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + scan->rs_prefetch_block = InvalidBlockNumber; tuple->t_data = NULL; scan->rs_inited = false; } @@ -1069,6 +1117,33 @@ heap_beginscan(Relation relation, Snapshot snapshot, initscan(scan, key, false); + scan->rs_read_stream = NULL; + + /* + * Set up a read stream for sequential scans and TID range scans. This + * should be done after initscan() because initscan() allocates the + * BufferAccessStrategy object passed to the streaming read API. 
+ */ + if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN || + scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN) + { + ReadStreamBlockNumberCB cb; + + if (scan->rs_base.rs_parallel) + cb = heap_scan_stream_read_next_parallel; + else + cb = heap_scan_stream_read_next_serial; + + scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL, + scan->rs_strategy, + scan->rs_base.rs_rd, + MAIN_FORKNUM, + cb, + scan, + 0); + } + + return (TableScanDesc) scan; } @@ -1111,6 +1186,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params, Assert(scan->rs_empty_tuples_pending == 0); + /* + * The read stream is reset on rescan. This must be done before + * initscan(), as some state referred to by read_stream_reset() is reset + * in initscan(). + */ + if (scan->rs_read_stream) + read_stream_reset(scan->rs_read_stream); + /* * reinitialize scan descriptor */ @@ -1135,6 +1218,12 @@ heap_endscan(TableScanDesc sscan) Assert(scan->rs_empty_tuples_pending == 0); + /* + * Must free the read stream before freeing the BufferAccessStrategy. + */ + if (scan->rs_read_stream) + read_stream_end(scan->rs_read_stream); + /* * decrement relation reference count and free scan descriptor storage */ diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 750ea30852..48936826bc 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -25,6 +25,7 @@ #include "storage/bufpage.h" #include "storage/dsm.h" #include "storage/lockdefs.h" +#include "storage/read_stream.h" #include "storage/shm_toc.h" #include "utils/relcache.h" #include "utils/snapshot.h" @@ -70,6 +71,20 @@ typedef struct HeapScanDescData HeapTupleData rs_ctup; /* current tuple in scan, if any */ + /* For scans that stream reads */ + ReadStream *rs_read_stream; + + /* + * For sequential scans and TID range scans to stream reads. The read + * stream is allocated at the beginning of the scan and reset on rescan or + * when the scan direction changes. The scan direction is saved each time + * a new page is requested. If the scan direction changes from one page to + * the next, the read stream releases all previously pinned buffers and + * resets the prefetch block. + */ + ScanDirection rs_dir; + BlockNumber rs_prefetch_block; + /* * For parallel scans to store page allocation data. NULL when not * performing a parallel scan.
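
For context, a minimal sketch (not part of the patch) of the call pattern the heap code above adopts from the streaming read API introduced in b5a9b18cd0: a ReadStreamBlockNumberCB that hands back one block number per call and InvalidBlockNumber when it runs out, plus a consumer loop that pulls already-pinned buffers with read_stream_next_buffer(). The names DemoScanState, demo_read_next and demo_scan_relation are invented for illustration; only the read_stream_* calls, READ_STREAM_SEQUENTIAL and the seven-argument read_stream_begin_relation() call shape are taken from the patch itself.

#include "postgres.h"

#include "storage/bufmgr.h"
#include "storage/read_stream.h"

/* illustrative per-scan state handed to the callback */
typedef struct DemoScanState
{
	BlockNumber next_block;		/* next block the callback will return */
	BlockNumber nblocks;		/* total blocks in the relation */
} DemoScanState;

/* ReadStreamBlockNumberCB: return blocks 0 .. nblocks-1, then stop */
static BlockNumber
demo_read_next(ReadStream *stream,
			   void *callback_private_data,
			   void *per_buffer_data)
{
	DemoScanState *state = (DemoScanState *) callback_private_data;

	if (state->next_block >= state->nblocks)
		return InvalidBlockNumber;	/* end of stream */

	return state->next_block++;
}

/*
 * Walk a whole relation through a read stream rather than calling
 * ReadBufferExtended() once per block.
 */
static void
demo_scan_relation(Relation rel, BufferAccessStrategy strategy)
{
	DemoScanState state;
	ReadStream *stream;
	Buffer		buf;

	state.next_block = 0;
	state.nblocks = RelationGetNumberOfBlocks(rel);

	/* same call shape as heap_beginscan() above */
	stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
										strategy,
										rel,
										MAIN_FORKNUM,
										demo_read_next,
										&state,
										0);

	/* each call yields an already-pinned buffer, or InvalidBuffer at the end */
	while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
	{
		/* ... examine the page in buf here ... */
		ReleaseBuffer(buf);
	}

	/* free the stream before the strategy, as heap_endscan() now does */
	read_stream_end(stream);
}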