diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index d881f4cd46..2c9bb0c7ee 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -520,12 +520,14 @@ heapgettup(HeapScanDesc scan, { ParallelBlockTableScanDesc pbscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + ParallelBlockTableScanWorker pbscanwork = + (ParallelBlockTableScanWorker) scan->rs_base.rs_private; table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, - pbscan); + pbscanwork, pbscan); page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - pbscan); + pbscanwork, pbscan); /* Other processes might have already finished the scan. */ if (page == InvalidBlockNumber) @@ -720,9 +722,11 @@ heapgettup(HeapScanDesc scan, { ParallelBlockTableScanDesc pbscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + ParallelBlockTableScanWorker pbscanwork = + (ParallelBlockTableScanWorker) scan->rs_base.rs_private; page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - pbscan); + pbscanwork, pbscan); finished = (page == InvalidBlockNumber); } else @@ -834,12 +838,14 @@ heapgettup_pagemode(HeapScanDesc scan, { ParallelBlockTableScanDesc pbscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + ParallelBlockTableScanWorker pbscanwork = + (ParallelBlockTableScanWorker) scan->rs_base.rs_private; table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, - pbscan); + pbscanwork, pbscan); page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - pbscan); + pbscanwork, pbscan); /* Other processes might have already finished the scan. */ if (page == InvalidBlockNumber) @@ -1019,9 +1025,11 @@ heapgettup_pagemode(HeapScanDesc scan, { ParallelBlockTableScanDesc pbscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + ParallelBlockTableScanWorker pbscanwork = + (ParallelBlockTableScanWorker) scan->rs_base.rs_private; page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - pbscan); + pbscanwork, pbscan); finished = (page == InvalidBlockNumber); } else @@ -1155,6 +1163,8 @@ heap_beginscan(Relation relation, Snapshot snapshot, scan->rs_base.rs_nkeys = nkeys; scan->rs_base.rs_flags = flags; scan->rs_base.rs_parallel = parallel_scan; + scan->rs_base.rs_private = + palloc(sizeof(ParallelBlockTableScanWorkerData)); scan->rs_strategy = NULL; /* set in initscan */ /* diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index 4b2bb29559..4e8553de2a 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -25,10 +25,24 @@ #include "access/tableam.h" #include "access/xact.h" #include "optimizer/plancat.h" +#include "port/pg_bitutils.h" #include "storage/bufmgr.h" #include "storage/shmem.h" #include "storage/smgr.h" +/* + * Constants to control the behavior of block allocation to parallel workers + * during a parallel seqscan. Technically these values do not need to be + * powers of 2, but having them as powers of 2 makes the math more optimal + * and makes the ramp-down stepping more even. + */ + +/* The number of I/O chunks we try to break a parallel seqscan down into */ +#define PARALLEL_SEQSCAN_NCHUNKS 2048 +/* Ramp down size of allocations when we've only this number of chunks left */ +#define PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS 64 +/* Cap the size of parallel I/O chunks to this number of blocks */ +#define PARALLEL_SEQSCAN_MAX_CHUNK_SIZE 8192 /* GUC variables */ char *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD; @@ -408,10 +422,37 @@ table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan) * to set the startblock once. */ void -table_block_parallelscan_startblock_init(Relation rel, ParallelBlockTableScanDesc pbscan) +table_block_parallelscan_startblock_init(Relation rel, + ParallelBlockTableScanWorker pbscanwork, + ParallelBlockTableScanDesc pbscan) { BlockNumber sync_startpage = InvalidBlockNumber; + /* Reset the state we use for controlling allocation size. */ + memset(pbscanwork, 0, sizeof(*pbscanwork)); + + StaticAssertStmt(MaxBlockNumber <= 0xFFFFFFFE, + "pg_nextpower2_32 may be too small for non-standard BlockNumber width"); + + /* + * We determine the chunk size based on the size of the relation. First we + * split the relation into PARALLEL_SEQSCAN_NCHUNKS chunks but we then + * take the next highest power of 2 number of the chunk size. This means + * we split the relation into somewhere between PARALLEL_SEQSCAN_NCHUNKS + * and PARALLEL_SEQSCAN_NCHUNKS / 2 chunks. + */ + pbscanwork->phsw_chunk_size = pg_nextpower2_32(Max(pbscan->phs_nblocks / + PARALLEL_SEQSCAN_NCHUNKS, 1)); + + /* + * Ensure we don't go over the maximum chunk size with larger tables. This + * means we may get much more than PARALLEL_SEQSCAN_NCHUNKS for larger + * tables. Too large a chunk size has been shown to be detrimental to + * synchronous scan performance. + */ + pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size, + PARALLEL_SEQSCAN_MAX_CHUNK_SIZE); + retry: /* Grab the spinlock. */ SpinLockAcquire(&pbscan->phs_mutex); @@ -451,13 +492,40 @@ retry: * backend gets an InvalidBlockNumber return. */ BlockNumber -table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbscan) +table_block_parallelscan_nextpage(Relation rel, + ParallelBlockTableScanWorker pbscanwork, + ParallelBlockTableScanDesc pbscan) { BlockNumber page; uint64 nallocated; /* - * phs_nallocated tracks how many pages have been allocated to workers + * The logic below allocates block numbers out to parallel workers in a + * way that each worker will receive a set of consecutive block numbers to + * scan. Earlier versions of this would allocate the next highest block + * number to the next worker to call this function. This would generally + * result in workers never receiving consecutive block numbers. Some + * operating systems would not detect the sequential I/O pattern due to + * each backend being a different process which could result in poor + * performance due to inefficient or no readahead. To work around this + * issue, we now allocate a range of block numbers for each worker and + * when they come back for another block, we give them the next one in + * that range until the range is complete. When the worker completes the + * range of blocks we then allocate another range for it and return the + * first block number from that range. + * + * Here we name these ranges of blocks "chunks". The initial size of + * these chunks is determined in table_block_parallelscan_startblock_init + * based on the size of the relation. Towards the end of the scan, we + * start making reductions in the size of the chunks in order to attempt + * to divide the remaining work over all the workers as evenly as + * possible. + * + * Here pbscanwork is local worker memory. phsw_chunk_remaining tracks + * the number of blocks remaining in the chunk. When that reaches 0 then + * we must allocate a new chunk for the worker. + * + * phs_nallocated tracks how many blocks have been allocated to workers * already. When phs_nallocated >= rs_nblocks, all blocks have been * allocated. * @@ -468,10 +536,50 @@ table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbsca * wide because of that, to avoid wrapping around when rs_nblocks is close * to 2^32. * - * The actual page to return is calculated by adding the counter to the + * The actual block to return is calculated by adding the counter to the * starting block number, modulo nblocks. */ - nallocated = pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, 1); + + /* + * First check if we have any remaining blocks in a previous chunk for + * this worker. We must consume all of the blocks from that before we + * allocate a new chunk to the worker. + */ + if (pbscanwork->phsw_chunk_remaining > 0) + { + /* + * Give them the next block in the range and update the remaining + * number of blocks. + */ + nallocated = ++pbscanwork->phsw_nallocated; + pbscanwork->phsw_chunk_remaining--; + } + else + { + /* + * When we've only got PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS chunks + * remaining in the scan, we half the chunk size. Since we reduce the + * chunk size here, we'll hit this again after doing + * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS at the new size. After a few + * iterations of this, we'll end up doing the last few blocks with the + * chunk size set to 1. + */ + if (pbscanwork->phsw_chunk_size > 1 && + pbscanwork->phsw_nallocated > pbscan->phs_nblocks - + (pbscanwork->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS)) + pbscanwork->phsw_chunk_size >>= 1; + + nallocated = pbscanwork->phsw_nallocated = + pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, + pbscanwork->phsw_chunk_size); + + /* + * Set the remaining number of blocks in this chunk so that subsequent + * calls from this worker continue on with this chunk until it's done. + */ + pbscanwork->phsw_chunk_remaining = pbscanwork->phsw_chunk_size - 1; + } + if (nallocated >= pbscan->phs_nblocks) page = InvalidBlockNumber; /* all blocks have been allocated */ else diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index 6f0258831f..5645976951 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -42,9 +42,9 @@ typedef struct TableScanDescData */ uint32 rs_flags; + void *rs_private; /* per-worker private memory for AM to use */ struct ParallelTableScanDescData *rs_parallel; /* parallel scan * information */ - } TableScanDescData; typedef struct TableScanDescData *TableScanDesc; @@ -81,6 +81,18 @@ typedef struct ParallelBlockTableScanDescData } ParallelBlockTableScanDescData; typedef struct ParallelBlockTableScanDescData *ParallelBlockTableScanDesc; +/* + * Per backend state for parallel table scan, for block-oriented storage. + */ +typedef struct ParallelBlockTableScanWorkerData +{ + uint64 phsw_nallocated; /* Current # of blocks into the scan */ + uint32 phsw_chunk_remaining; /* # blocks left in this chunk */ + uint32 phsw_chunk_size; /* The number of blocks to allocate in + * each I/O chunk for the scan */ +} ParallelBlockTableScanWorkerData; +typedef struct ParallelBlockTableScanWorkerData *ParallelBlockTableScanWorker; + /* * Base class for fetches from a table via an index. This is the base-class * for such scans, which needs to be embedded in the respective struct for diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 0d28f01ca9..7ba72c84e0 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -1793,8 +1793,10 @@ extern Size table_block_parallelscan_initialize(Relation rel, extern void table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan); extern BlockNumber table_block_parallelscan_nextpage(Relation rel, + ParallelBlockTableScanWorker pbscanwork, ParallelBlockTableScanDesc pbscan); extern void table_block_parallelscan_startblock_init(Relation rel, + ParallelBlockTableScanWorker pbscanwork, ParallelBlockTableScanDesc pbscan);