From 56788d2156fc32bd5737e7ac716d70e6a269b7bc Mon Sep 17 00:00:00 2001 From: David Rowley Date: Sun, 26 Jul 2020 21:02:45 +1200 Subject: [PATCH] Allocate consecutive blocks during parallel seqscans Previously we would allocate blocks to parallel workers during a parallel sequential scan 1 block at a time. Since other workers were likely to request a block before a worker returns for another block number to work on, this could lead to non-sequential I/O patterns in each worker which could cause the operating system's readahead to perform poorly or not at all. Here we change things so that we allocate consecutive "chunks" of blocks to workers and have them work on those until they're done, at which time we allocate another chunk for the worker. The size of these chunks is based on the size of the relation. Initial patch here was by Thomas Munro which showed some good improvements just having a fixed chunk size of 64 blocks with a simple ramp-down near the end of the scan. The revisions of the patch to make the chunk size based on the relation size and the adjusted ramp-down in powers of two was done by me, along with quite extensive benchmarking to determine the optimal chunk sizes. For the most part, benchmarks have shown significant performance improvements for large parallel sequential scans on Linux, FreeBSD and Windows using SSDs. It's less clear how this affects the performance of cloud providers. Tests done so far are unable to obtain stable enough performance to provide meaningful benchmark results. It is possible that this could cause some performance regressions on more obscure filesystems, so we may need to later provide users with some ability to get something closer to the old behavior. For now, let's leave that until we see that it's really required. Author: Thomas Munro, David Rowley Reviewed-by: Ranier Vilela, Soumyadeep Chakraborty, Robert Haas Reviewed-by: Amit Kapila, Kirk Jamison Discussion: https://postgr.es/m/CA+hUKGJ_EErDv41YycXcbMbCBkztA34+z1ts9VQH+ACRuvpxig@mail.gmail.com --- src/backend/access/heap/heapam.c | 22 ++++-- src/backend/access/table/tableam.c | 118 +++++++++++++++++++++++++++-- src/include/access/relscan.h | 14 +++- src/include/access/tableam.h | 2 + 4 files changed, 144 insertions(+), 12 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index d881f4cd46..2c9bb0c7ee 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -520,12 +520,14 @@ heapgettup(HeapScanDesc scan, { ParallelBlockTableScanDesc pbscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + ParallelBlockTableScanWorker pbscanwork = + (ParallelBlockTableScanWorker) scan->rs_base.rs_private; table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, - pbscan); + pbscanwork, pbscan); page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - pbscan); + pbscanwork, pbscan); /* Other processes might have already finished the scan. */ if (page == InvalidBlockNumber) @@ -720,9 +722,11 @@ heapgettup(HeapScanDesc scan, { ParallelBlockTableScanDesc pbscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + ParallelBlockTableScanWorker pbscanwork = + (ParallelBlockTableScanWorker) scan->rs_base.rs_private; page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - pbscan); + pbscanwork, pbscan); finished = (page == InvalidBlockNumber); } else @@ -834,12 +838,14 @@ heapgettup_pagemode(HeapScanDesc scan, { ParallelBlockTableScanDesc pbscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + ParallelBlockTableScanWorker pbscanwork = + (ParallelBlockTableScanWorker) scan->rs_base.rs_private; table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, - pbscan); + pbscanwork, pbscan); page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - pbscan); + pbscanwork, pbscan); /* Other processes might have already finished the scan. */ if (page == InvalidBlockNumber) @@ -1019,9 +1025,11 @@ heapgettup_pagemode(HeapScanDesc scan, { ParallelBlockTableScanDesc pbscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + ParallelBlockTableScanWorker pbscanwork = + (ParallelBlockTableScanWorker) scan->rs_base.rs_private; page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - pbscan); + pbscanwork, pbscan); finished = (page == InvalidBlockNumber); } else @@ -1155,6 +1163,8 @@ heap_beginscan(Relation relation, Snapshot snapshot, scan->rs_base.rs_nkeys = nkeys; scan->rs_base.rs_flags = flags; scan->rs_base.rs_parallel = parallel_scan; + scan->rs_base.rs_private = + palloc(sizeof(ParallelBlockTableScanWorkerData)); scan->rs_strategy = NULL; /* set in initscan */ /* diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index 4b2bb29559..4e8553de2a 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -25,10 +25,24 @@ #include "access/tableam.h" #include "access/xact.h" #include "optimizer/plancat.h" +#include "port/pg_bitutils.h" #include "storage/bufmgr.h" #include "storage/shmem.h" #include "storage/smgr.h" +/* + * Constants to control the behavior of block allocation to parallel workers + * during a parallel seqscan. Technically these values do not need to be + * powers of 2, but having them as powers of 2 makes the math more optimal + * and makes the ramp-down stepping more even. + */ + +/* The number of I/O chunks we try to break a parallel seqscan down into */ +#define PARALLEL_SEQSCAN_NCHUNKS 2048 +/* Ramp down size of allocations when we've only this number of chunks left */ +#define PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS 64 +/* Cap the size of parallel I/O chunks to this number of blocks */ +#define PARALLEL_SEQSCAN_MAX_CHUNK_SIZE 8192 /* GUC variables */ char *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD; @@ -408,10 +422,37 @@ table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan) * to set the startblock once. */ void -table_block_parallelscan_startblock_init(Relation rel, ParallelBlockTableScanDesc pbscan) +table_block_parallelscan_startblock_init(Relation rel, + ParallelBlockTableScanWorker pbscanwork, + ParallelBlockTableScanDesc pbscan) { BlockNumber sync_startpage = InvalidBlockNumber; + /* Reset the state we use for controlling allocation size. */ + memset(pbscanwork, 0, sizeof(*pbscanwork)); + + StaticAssertStmt(MaxBlockNumber <= 0xFFFFFFFE, + "pg_nextpower2_32 may be too small for non-standard BlockNumber width"); + + /* + * We determine the chunk size based on the size of the relation. First we + * split the relation into PARALLEL_SEQSCAN_NCHUNKS chunks but we then + * take the next highest power of 2 number of the chunk size. This means + * we split the relation into somewhere between PARALLEL_SEQSCAN_NCHUNKS + * and PARALLEL_SEQSCAN_NCHUNKS / 2 chunks. + */ + pbscanwork->phsw_chunk_size = pg_nextpower2_32(Max(pbscan->phs_nblocks / + PARALLEL_SEQSCAN_NCHUNKS, 1)); + + /* + * Ensure we don't go over the maximum chunk size with larger tables. This + * means we may get much more than PARALLEL_SEQSCAN_NCHUNKS for larger + * tables. Too large a chunk size has been shown to be detrimental to + * synchronous scan performance. + */ + pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size, + PARALLEL_SEQSCAN_MAX_CHUNK_SIZE); + retry: /* Grab the spinlock. */ SpinLockAcquire(&pbscan->phs_mutex); @@ -451,13 +492,40 @@ retry: * backend gets an InvalidBlockNumber return. */ BlockNumber -table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbscan) +table_block_parallelscan_nextpage(Relation rel, + ParallelBlockTableScanWorker pbscanwork, + ParallelBlockTableScanDesc pbscan) { BlockNumber page; uint64 nallocated; /* - * phs_nallocated tracks how many pages have been allocated to workers + * The logic below allocates block numbers out to parallel workers in a + * way that each worker will receive a set of consecutive block numbers to + * scan. Earlier versions of this would allocate the next highest block + * number to the next worker to call this function. This would generally + * result in workers never receiving consecutive block numbers. Some + * operating systems would not detect the sequential I/O pattern due to + * each backend being a different process which could result in poor + * performance due to inefficient or no readahead. To work around this + * issue, we now allocate a range of block numbers for each worker and + * when they come back for another block, we give them the next one in + * that range until the range is complete. When the worker completes the + * range of blocks we then allocate another range for it and return the + * first block number from that range. + * + * Here we name these ranges of blocks "chunks". The initial size of + * these chunks is determined in table_block_parallelscan_startblock_init + * based on the size of the relation. Towards the end of the scan, we + * start making reductions in the size of the chunks in order to attempt + * to divide the remaining work over all the workers as evenly as + * possible. + * + * Here pbscanwork is local worker memory. phsw_chunk_remaining tracks + * the number of blocks remaining in the chunk. When that reaches 0 then + * we must allocate a new chunk for the worker. + * + * phs_nallocated tracks how many blocks have been allocated to workers * already. When phs_nallocated >= rs_nblocks, all blocks have been * allocated. * @@ -468,10 +536,50 @@ table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbsca * wide because of that, to avoid wrapping around when rs_nblocks is close * to 2^32. * - * The actual page to return is calculated by adding the counter to the + * The actual block to return is calculated by adding the counter to the * starting block number, modulo nblocks. */ - nallocated = pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, 1); + + /* + * First check if we have any remaining blocks in a previous chunk for + * this worker. We must consume all of the blocks from that before we + * allocate a new chunk to the worker. + */ + if (pbscanwork->phsw_chunk_remaining > 0) + { + /* + * Give them the next block in the range and update the remaining + * number of blocks. + */ + nallocated = ++pbscanwork->phsw_nallocated; + pbscanwork->phsw_chunk_remaining--; + } + else + { + /* + * When we've only got PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS chunks + * remaining in the scan, we half the chunk size. Since we reduce the + * chunk size here, we'll hit this again after doing + * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS at the new size. After a few + * iterations of this, we'll end up doing the last few blocks with the + * chunk size set to 1. + */ + if (pbscanwork->phsw_chunk_size > 1 && + pbscanwork->phsw_nallocated > pbscan->phs_nblocks - + (pbscanwork->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS)) + pbscanwork->phsw_chunk_size >>= 1; + + nallocated = pbscanwork->phsw_nallocated = + pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, + pbscanwork->phsw_chunk_size); + + /* + * Set the remaining number of blocks in this chunk so that subsequent + * calls from this worker continue on with this chunk until it's done. + */ + pbscanwork->phsw_chunk_remaining = pbscanwork->phsw_chunk_size - 1; + } + if (nallocated >= pbscan->phs_nblocks) page = InvalidBlockNumber; /* all blocks have been allocated */ else diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index 6f0258831f..5645976951 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -42,9 +42,9 @@ typedef struct TableScanDescData */ uint32 rs_flags; + void *rs_private; /* per-worker private memory for AM to use */ struct ParallelTableScanDescData *rs_parallel; /* parallel scan * information */ - } TableScanDescData; typedef struct TableScanDescData *TableScanDesc; @@ -81,6 +81,18 @@ typedef struct ParallelBlockTableScanDescData } ParallelBlockTableScanDescData; typedef struct ParallelBlockTableScanDescData *ParallelBlockTableScanDesc; +/* + * Per backend state for parallel table scan, for block-oriented storage. + */ +typedef struct ParallelBlockTableScanWorkerData +{ + uint64 phsw_nallocated; /* Current # of blocks into the scan */ + uint32 phsw_chunk_remaining; /* # blocks left in this chunk */ + uint32 phsw_chunk_size; /* The number of blocks to allocate in + * each I/O chunk for the scan */ +} ParallelBlockTableScanWorkerData; +typedef struct ParallelBlockTableScanWorkerData *ParallelBlockTableScanWorker; + /* * Base class for fetches from a table via an index. This is the base-class * for such scans, which needs to be embedded in the respective struct for diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 0d28f01ca9..7ba72c84e0 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -1793,8 +1793,10 @@ extern Size table_block_parallelscan_initialize(Relation rel, extern void table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan); extern BlockNumber table_block_parallelscan_nextpage(Relation rel, + ParallelBlockTableScanWorker pbscanwork, ParallelBlockTableScanDesc pbscan); extern void table_block_parallelscan_startblock_init(Relation rel, + ParallelBlockTableScanWorker pbscanwork, ParallelBlockTableScanDesc pbscan);