Use streaming I/O in ANALYZE.

The ANALYZE command prefetches and reads sample blocks chosen by a
BlockSampler algorithm. Instead of calling [Prefetch|Read]Buffer() for
each block, ANALYZE now uses the streaming API introduced in b5a9b18cd0.

Author: Nazir Bilal Yavuz <byavuz81@gmail.com>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Jakub Wartak <jakub.wartak@enterprisedb.com>
Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/flat/CAN55FZ0UhXqk9v3y-zW_fp4-WCp43V8y0A72xPmLkOM%2B6M%2BmJg%40mail.gmail.com
This commit is contained in:
Thomas Munro 2024-04-08 13:16:20 +12:00
parent f587338dec
commit 041b96802e
3 changed files with 43 additions and 76 deletions

View File

@ -1055,33 +1055,36 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
} }
/* /*
* Prepare to analyze block `blockno` of `scan`. The scan has been started * Prepare to analyze the next block in the read stream. Returns false if
* the stream is exhausted and true otherwise. The scan must have been started
* with SO_TYPE_ANALYZE option. * with SO_TYPE_ANALYZE option.
* *
* This routine holds a buffer pin and lock on the heap page. They are held * This routine holds a buffer pin and lock on the heap page. They are held
* until heapam_scan_analyze_next_tuple() returns false. That is until all the * until heapam_scan_analyze_next_tuple() returns false. That is until all the
* items of the heap page are analyzed. * items of the heap page are analyzed.
*/ */
void bool
heapam_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno, heapam_scan_analyze_next_block(TableScanDesc scan, ReadStream *stream)
BufferAccessStrategy bstrategy)
{ {
HeapScanDesc hscan = (HeapScanDesc) scan; HeapScanDesc hscan = (HeapScanDesc) scan;
/* /*
* We must maintain a pin on the target page's buffer to ensure that * We must maintain a pin on the target page's buffer to ensure that
* concurrent activity - e.g. HOT pruning - doesn't delete tuples out from * concurrent activity - e.g. HOT pruning - doesn't delete tuples out from
* under us. Hence, pin the page until we are done looking at it. We * under us. It comes from the stream already pinned. We also choose to
* also choose to hold sharelock on the buffer throughout --- we could * hold sharelock on the buffer throughout --- we could release and
* release and re-acquire sharelock for each tuple, but since we aren't * re-acquire sharelock for each tuple, but since we aren't doing much
* doing much work per tuple, the extra lock traffic is probably better * work per tuple, the extra lock traffic is probably better avoided.
* avoided.
*/ */
hscan->rs_cblock = blockno; hscan->rs_cbuf = read_stream_next_buffer(stream, NULL);
hscan->rs_cindex = FirstOffsetNumber; if (!BufferIsValid(hscan->rs_cbuf))
hscan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM, return false;
blockno, RBM_NORMAL, bstrategy);
LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_SHARE); LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_SHARE);
hscan->rs_cblock = BufferGetBlockNumber(hscan->rs_cbuf);
hscan->rs_cindex = FirstOffsetNumber;
return true;
} }
/* /*

View File

@ -1102,6 +1102,20 @@ examine_attribute(Relation onerel, int attnum, Node *index_expr)
return stats; return stats;
} }
/*
 * Read stream callback that supplies the blocks to be sampled.
 *
 * callback_private_data points at the BlockSamplerData driving the sample.
 * Each call returns the sampler's next block number, or InvalidBlockNumber
 * once the sampler has no more blocks to offer.  The stream and
 * per_buffer_data arguments are not used here.
 */
static BlockNumber
block_sampling_read_stream_next(ReadStream *stream,
								void *callback_private_data,
								void *per_buffer_data)
{
	BlockSamplerData *sampler = callback_private_data;

	/* Nothing left to sample: report an invalid block number. */
	if (!BlockSampler_HasMore(sampler))
		return InvalidBlockNumber;

	return BlockSampler_Next(sampler);
}
/* /*
* acquire_sample_rows -- acquire a random sample of rows from the heap * acquire_sample_rows -- acquire a random sample of rows from the heap
* *
@ -1154,10 +1168,7 @@ acquire_sample_rows(Relation onerel, int elevel,
TableScanDesc scan; TableScanDesc scan;
BlockNumber nblocks; BlockNumber nblocks;
BlockNumber blksdone = 0; BlockNumber blksdone = 0;
#ifdef USE_PREFETCH ReadStream *stream;
int prefetch_maximum = 0; /* blocks to prefetch if enabled */
BlockSamplerData prefetch_bs;
#endif
Assert(targrows > 0); Assert(targrows > 0);
@ -1170,13 +1181,6 @@ acquire_sample_rows(Relation onerel, int elevel,
randseed = pg_prng_uint32(&pg_global_prng_state); randseed = pg_prng_uint32(&pg_global_prng_state);
nblocks = BlockSampler_Init(&bs, totalblocks, targrows, randseed); nblocks = BlockSampler_Init(&bs, totalblocks, targrows, randseed);
#ifdef USE_PREFETCH
prefetch_maximum = get_tablespace_maintenance_io_concurrency(onerel->rd_rel->reltablespace);
/* Create another BlockSampler, using the same seed, for prefetching */
if (prefetch_maximum)
(void) BlockSampler_Init(&prefetch_bs, totalblocks, targrows, randseed);
#endif
/* Report sampling block numbers */ /* Report sampling block numbers */
pgstat_progress_update_param(PROGRESS_ANALYZE_BLOCKS_TOTAL, pgstat_progress_update_param(PROGRESS_ANALYZE_BLOCKS_TOTAL,
nblocks); nblocks);
@ -1187,60 +1191,19 @@ acquire_sample_rows(Relation onerel, int elevel,
scan = heap_beginscan(onerel, NULL, 0, NULL, NULL, SO_TYPE_ANALYZE); scan = heap_beginscan(onerel, NULL, 0, NULL, NULL, SO_TYPE_ANALYZE);
slot = table_slot_create(onerel, NULL); slot = table_slot_create(onerel, NULL);
#ifdef USE_PREFETCH stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE,
vac_strategy,
/* scan->rs_rd,
* If we are doing prefetching, then go ahead and tell the kernel about MAIN_FORKNUM,
* the first set of pages we are going to want. This also moves our block_sampling_read_stream_next,
* iterator out ahead of the main one being used, where we will keep it so &bs,
* that we're always pre-fetching out prefetch_maximum number of blocks 0);
* ahead.
*/
if (prefetch_maximum)
{
for (int i = 0; i < prefetch_maximum; i++)
{
BlockNumber prefetch_block;
if (!BlockSampler_HasMore(&prefetch_bs))
break;
prefetch_block = BlockSampler_Next(&prefetch_bs);
PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, prefetch_block);
}
}
#endif
/* Outer loop over blocks to sample */ /* Outer loop over blocks to sample */
while (BlockSampler_HasMore(&bs)) while (heapam_scan_analyze_next_block(scan, stream))
{ {
BlockNumber targblock = BlockSampler_Next(&bs);
#ifdef USE_PREFETCH
BlockNumber prefetch_targblock = InvalidBlockNumber;
/*
* Make sure that every time the main BlockSampler is moved forward
* that our prefetch BlockSampler also gets moved forward, so that we
* always stay out ahead.
*/
if (prefetch_maximum && BlockSampler_HasMore(&prefetch_bs))
prefetch_targblock = BlockSampler_Next(&prefetch_bs);
#endif
vacuum_delay_point(); vacuum_delay_point();
heapam_scan_analyze_next_block(scan, targblock, vac_strategy);
#ifdef USE_PREFETCH
/*
* When pre-fetching, after we get a block, tell the kernel about the
* next one we will want, if there's any left.
*/
if (prefetch_maximum && prefetch_targblock != InvalidBlockNumber)
PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, prefetch_targblock);
#endif
while (heapam_scan_analyze_next_tuple(scan, OldestXmin, &liverows, &deadrows, slot)) while (heapam_scan_analyze_next_tuple(scan, OldestXmin, &liverows, &deadrows, slot))
{ {
/* /*
@ -1290,6 +1253,8 @@ acquire_sample_rows(Relation onerel, int elevel,
++blksdone); ++blksdone);
} }
read_stream_end(stream);
ExecDropSingleTupleTableSlot(slot); ExecDropSingleTupleTableSlot(slot);
heap_endscan(scan); heap_endscan(scan);

View File

@ -413,9 +413,8 @@ extern bool HeapTupleIsSurelyDead(HeapTuple htup,
struct GlobalVisState *vistest); struct GlobalVisState *vistest);
/* in heap/heapam_handler.c*/ /* in heap/heapam_handler.c*/
extern void heapam_scan_analyze_next_block(TableScanDesc scan, extern bool heapam_scan_analyze_next_block(TableScanDesc scan,
BlockNumber blockno, ReadStream *stream);
BufferAccessStrategy bstrategy);
extern bool heapam_scan_analyze_next_tuple(TableScanDesc scan, extern bool heapam_scan_analyze_next_tuple(TableScanDesc scan,
TransactionId OldestXmin, TransactionId OldestXmin,
double *liverows, double *deadrows, double *liverows, double *deadrows,