Minor cleanup of the BRIN parallel build code

Commit b437571714 added support for parallel builds for BRIN indexes,
using code similar to BTREE parallel builds, and also a new tuplesort
variant. This commit simplifies the new code in two ways:

* The "spool" grouping the tuplesort and the heap/index is not necessary.
  The heap/index relations are already available as separate arguments, so
  the extra copies in the spool only caused confusion. Remove the spool,
  and use the tuplesort directly.

* The new tuplesort variant does not need the heap/index, as it sorts
  simply by the range block number, without accessing the tuple data.
  So simplify that too.

Initial report and patch by Ranier Vilela, further cleanup by me.

Author: Ranier Vilela
Discussion: https://postgr.es/m/CAEudQAqD7f2i4iyEaAz-5o-bf6zXVX-AkNUBm-YjUXEemaEh6A%40mail.gmail.com
This commit is contained in:
Tomas Vondra 2023-12-30 22:50:54 +01:00
parent 5632d6e18a
commit 6c63bcbf3c
4 changed files with 39 additions and 90 deletions

View File

@ -50,16 +50,6 @@
#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004) #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004)
#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005) #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005)
/*
* Status record for spooling/sorting phase.
*/
typedef struct BrinSpool
{
Tuplesortstate *sortstate; /* state data for tuplesort.c */
Relation heap;
Relation index;
} BrinSpool;
/* /*
* Status for index builds performed in parallel. This is allocated in a * Status for index builds performed in parallel. This is allocated in a
* dynamic shared memory segment. * dynamic shared memory segment.
@ -183,7 +173,13 @@ typedef struct BrinBuildState
*/ */
BrinLeader *bs_leader; BrinLeader *bs_leader;
int bs_worker_id; int bs_worker_id;
BrinSpool *bs_spool;
/*
* The sortstate is used by workers (including the leader). It has to be
* part of the build state, because that's the only thing passed to the
* build callback etc.
*/
Tuplesortstate *bs_sortstate;
} BrinBuildState; } BrinBuildState;
/* /*
@ -231,12 +227,11 @@ static void brin_fill_empty_ranges(BrinBuildState *state,
/* parallel index builds */ /* parallel index builds */
static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
bool isconcurrent, int request); bool isconcurrent, int request);
static void _brin_end_parallel(BrinLeader *btleader, BrinBuildState *state); static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot); static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
static void _brin_leader_participate_as_worker(BrinBuildState *buildstate, static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
Relation heap, Relation index); Relation heap, Relation index);
static void _brin_parallel_scan_and_build(BrinBuildState *buildstate, static void _brin_parallel_scan_and_build(BrinBuildState *buildstate,
BrinSpool *brinspool,
BrinShared *brinshared, BrinShared *brinshared,
Sharedsort *sharedsort, Sharedsort *sharedsort,
Relation heap, Relation index, Relation heap, Relation index,
@ -1143,10 +1138,6 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
state = initialize_brin_buildstate(index, revmap, pagesPerRange, state = initialize_brin_buildstate(index, revmap, pagesPerRange,
RelationGetNumberOfBlocks(heap)); RelationGetNumberOfBlocks(heap));
state->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
state->bs_spool->heap = heap;
state->bs_spool->index = index;
/* /*
* Attempt to launch parallel worker scan when required * Attempt to launch parallel worker scan when required
* *
@ -1160,11 +1151,13 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
indexInfo->ii_ParallelWorkers); indexInfo->ii_ParallelWorkers);
/* /*
* Now scan the relation. No syncscan allowed here because we want the
* heap blocks in physical order.
*
* If parallel build requested and at least one worker process was * If parallel build requested and at least one worker process was
* successfully launched, set up coordination state * successfully launched, set up coordination state, wait for workers to
* complete. Then read all tuples from the shared tuplesort and insert
* them into the index.
*
* In serial mode, simply scan the table and build the index one index
* tuple at a time.
*/ */
if (state->bs_leader) if (state->bs_leader)
{ {
@ -1176,9 +1169,8 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
state->bs_leader->nparticipanttuplesorts; state->bs_leader->nparticipanttuplesorts;
coordinate->sharedsort = state->bs_leader->sharedsort; coordinate->sharedsort = state->bs_leader->sharedsort;
/* /*
* Begin serial/leader tuplesort. * Begin leader tuplesort.
* *
* In cases where parallelism is involved, the leader receives the * In cases where parallelism is involved, the leader receives the
* same share of maintenance_work_mem as a serial sort (it is * same share of maintenance_work_mem as a serial sort (it is
@ -1199,19 +1191,18 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
* INDEX operation, regardless of the use of parallelism or any other * INDEX operation, regardless of the use of parallelism or any other
* factor. * factor.
*/ */
state->bs_spool->sortstate = state->bs_sortstate =
tuplesort_begin_index_brin(heap, index, tuplesort_begin_index_brin(maintenance_work_mem, coordinate,
maintenance_work_mem, coordinate,
TUPLESORT_NONE); TUPLESORT_NONE);
/*
* In parallel mode, wait for workers to complete, and then read all
* tuples from the shared tuplesort and insert them into the index.
*/
_brin_end_parallel(state->bs_leader, state); _brin_end_parallel(state->bs_leader, state);
} }
else /* no parallel index build */ else /* no parallel index build */
{ {
/*
* Now scan the relation. No syncscan allowed here because we want
* the heap blocks in physical order.
*/
reltuples = table_index_build_scan(heap, index, indexInfo, false, true, reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
brinbuildCallback, (void *) state, NULL); brinbuildCallback, (void *) state, NULL);
@ -1671,7 +1662,7 @@ initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
state->bs_dtuple = brin_new_memtuple(state->bs_bdesc); state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
state->bs_leader = NULL; state->bs_leader = NULL;
state->bs_worker_id = 0; state->bs_worker_id = 0;
state->bs_spool = NULL; state->bs_sortstate = NULL;
state->bs_context = CurrentMemoryContext; state->bs_context = CurrentMemoryContext;
state->bs_emptyTuple = NULL; state->bs_emptyTuple = NULL;
state->bs_emptyTupleLen = 0; state->bs_emptyTupleLen = 0;
@ -2002,7 +1993,7 @@ form_and_spill_tuple(BrinBuildState *state)
state->bs_dtuple, &size); state->bs_dtuple, &size);
/* write the BRIN tuple to the tuplesort */ /* write the BRIN tuple to the tuplesort */
tuplesort_putbrintuple(state->bs_spool->sortstate, tup, size); tuplesort_putbrintuple(state->bs_sortstate, tup, size);
state->bs_numtuples++; state->bs_numtuples++;
@ -2522,7 +2513,6 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
Size tuplen; Size tuplen;
BrinShared *brinshared = brinleader->brinshared; BrinShared *brinshared = brinleader->brinshared;
BlockNumber prevblkno = InvalidBlockNumber; BlockNumber prevblkno = InvalidBlockNumber;
BrinSpool *spool;
MemoryContext rangeCxt, MemoryContext rangeCxt,
oldCxt; oldCxt;
@ -2541,8 +2531,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
state->bs_numtuples = brinshared->indtuples; state->bs_numtuples = brinshared->indtuples;
/* do the actual sort in the leader */ /* do the actual sort in the leader */
spool = state->bs_spool; tuplesort_performsort(state->bs_sortstate);
tuplesort_performsort(spool->sortstate);
/* /*
* Initialize BrinMemTuple we'll use to union summaries from workers (in * Initialize BrinMemTuple we'll use to union summaries from workers (in
@ -2568,7 +2557,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
* That probably gives us an index that is cheaper to scan, thanks to * That probably gives us an index that is cheaper to scan, thanks to
* mostly getting data from the same index page as before. * mostly getting data from the same index page as before.
*/ */
while ((btup = tuplesort_getbrintuple(spool->sortstate, &tuplen, true)) != NULL) while ((btup = tuplesort_getbrintuple(state->bs_sortstate, &tuplen, true)) != NULL)
{ {
/* Ranges should be multiples of pages_per_range for the index. */ /* Ranges should be multiples of pages_per_range for the index. */
Assert(btup->bt_blkno % brinshared->pagesPerRange == 0); Assert(btup->bt_blkno % brinshared->pagesPerRange == 0);
@ -2640,7 +2629,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
prevblkno = btup->bt_blkno; prevblkno = btup->bt_blkno;
} }
tuplesort_end(spool->sortstate); tuplesort_end(state->bs_sortstate);
/* Fill the BRIN tuple for the last page range with data. */ /* Fill the BRIN tuple for the last page range with data. */
if (prevblkno != InvalidBlockNumber) if (prevblkno != InvalidBlockNumber)
@ -2704,11 +2693,6 @@ _brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Re
BrinLeader *brinleader = buildstate->bs_leader; BrinLeader *brinleader = buildstate->bs_leader;
int sortmem; int sortmem;
/* Allocate memory and initialize private spool */
buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
buildstate->bs_spool->heap = buildstate->bs_spool->heap;
buildstate->bs_spool->index = buildstate->bs_spool->index;
/* /*
* Might as well use reliable figure when doling out maintenance_work_mem * Might as well use reliable figure when doling out maintenance_work_mem
* (when requested number of workers were not launched, this will be * (when requested number of workers were not launched, this will be
@ -2717,16 +2701,14 @@ _brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Re
sortmem = maintenance_work_mem / brinleader->nparticipanttuplesorts; sortmem = maintenance_work_mem / brinleader->nparticipanttuplesorts;
/* Perform work common to all participants */ /* Perform work common to all participants */
_brin_parallel_scan_and_build(buildstate, buildstate->bs_spool, brinleader->brinshared, _brin_parallel_scan_and_build(buildstate, brinleader->brinshared,
brinleader->sharedsort, heap, index, sortmem, true); brinleader->sharedsort, heap, index, sortmem, true);
} }
/* /*
* Perform a worker's portion of a parallel sort. * Perform a worker's portion of a parallel sort.
* *
* This generates a tuplesort for passed btspool, and a second tuplesort * This generates a tuplesort for the worker portion of the table.
* state if a second btspool is need (i.e. for unique index builds). All
* other spool fields should already be set when this is called.
* *
* sortmem is the amount of working memory to use within each worker, * sortmem is the amount of working memory to use within each worker,
* expressed in KBs. * expressed in KBs.
@ -2734,10 +2716,10 @@ _brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Re
* When this returns, workers are done, and need only release resources. * When this returns, workers are done, and need only release resources.
*/ */
static void static void
_brin_parallel_scan_and_build(BrinBuildState *state, BrinSpool *brinspool, _brin_parallel_scan_and_build(BrinBuildState *state,
BrinShared *brinshared, Sharedsort *sharedsort, BrinShared *brinshared, Sharedsort *sharedsort,
Relation heap, Relation index, int sortmem, Relation heap, Relation index,
bool progress) int sortmem, bool progress)
{ {
SortCoordinate coordinate; SortCoordinate coordinate;
TableScanDesc scan; TableScanDesc scan;
@ -2751,10 +2733,8 @@ _brin_parallel_scan_and_build(BrinBuildState *state, BrinSpool *brinspool,
coordinate->sharedsort = sharedsort; coordinate->sharedsort = sharedsort;
/* Begin "partial" tuplesort */ /* Begin "partial" tuplesort */
brinspool->sortstate = tuplesort_begin_index_brin(brinspool->heap, state->bs_sortstate = tuplesort_begin_index_brin(sortmem, coordinate,
brinspool->index, TUPLESORT_NONE);
sortmem, coordinate,
TUPLESORT_NONE);
/* Join parallel scan */ /* Join parallel scan */
indexInfo = BuildIndexInfo(index); indexInfo = BuildIndexInfo(index);
@ -2770,7 +2750,7 @@ _brin_parallel_scan_and_build(BrinBuildState *state, BrinSpool *brinspool,
form_and_spill_tuple(state); form_and_spill_tuple(state);
/* sort the BRIN ranges built by this worker */ /* sort the BRIN ranges built by this worker */
tuplesort_performsort(brinspool->sortstate); tuplesort_performsort(state->bs_sortstate);
state->bs_reltuples += reltuples; state->bs_reltuples += reltuples;
@ -2786,7 +2766,7 @@ _brin_parallel_scan_and_build(BrinBuildState *state, BrinSpool *brinspool,
/* Notify leader */ /* Notify leader */
ConditionVariableSignal(&brinshared->workersdonecv); ConditionVariableSignal(&brinshared->workersdonecv);
tuplesort_end(brinspool->sortstate); tuplesort_end(state->bs_sortstate);
} }
/* /*
@ -2844,11 +2824,6 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
brinshared->pagesPerRange, brinshared->pagesPerRange,
InvalidBlockNumber); InvalidBlockNumber);
/* Initialize worker's own spool */
buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
buildstate->bs_spool->heap = heapRel;
buildstate->bs_spool->index = indexRel;
/* Look up shared state private to tuplesort.c */ /* Look up shared state private to tuplesort.c */
sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false); sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
tuplesort_attach_shared(sharedsort, seg); tuplesort_attach_shared(sharedsort, seg);
@ -2863,8 +2838,7 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
*/ */
sortmem = maintenance_work_mem / brinshared->scantuplesortstates; sortmem = maintenance_work_mem / brinshared->scantuplesortstates;
_brin_parallel_scan_and_build(buildstate, buildstate->bs_spool, _brin_parallel_scan_and_build(buildstate, brinshared, sharedsort,
brinshared, sharedsort,
heapRel, indexRel, sortmem, false); heapRel, indexRel, sortmem, false);
/* Report WAL/buffer usage during parallel execution */ /* Report WAL/buffer usage during parallel execution */

View File

@ -137,16 +137,6 @@ typedef struct
uint32 max_buckets; uint32 max_buckets;
} TuplesortIndexHashArg; } TuplesortIndexHashArg;
/*
* Data struture pointed by "TuplesortPublic.arg" for the index_brin subcase.
*/
typedef struct
{
TuplesortIndexArg index;
/* XXX do we need something here? */
} TuplesortIndexBrinArg;
/* /*
* Data struture pointed by "TuplesortPublic.arg" for the Datum case. * Data struture pointed by "TuplesortPublic.arg" for the Datum case.
* Set by tuplesort_begin_datum and used only by the DatumTuple routines. * Set by tuplesort_begin_datum and used only by the DatumTuple routines.
@ -562,20 +552,13 @@ tuplesort_begin_index_gist(Relation heapRel,
} }
Tuplesortstate * Tuplesortstate *
tuplesort_begin_index_brin(Relation heapRel, tuplesort_begin_index_brin(int workMem,
Relation indexRel,
int workMem,
SortCoordinate coordinate, SortCoordinate coordinate,
int sortopt) int sortopt)
{ {
Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
sortopt); sortopt);
TuplesortPublic *base = TuplesortstateGetPublic(state); TuplesortPublic *base = TuplesortstateGetPublic(state);
MemoryContext oldcontext;
TuplesortIndexBrinArg *arg;
oldcontext = MemoryContextSwitchTo(base->maincontext);
arg = (TuplesortIndexBrinArg *) palloc(sizeof(TuplesortIndexBrinArg));
#ifdef TRACE_SORT #ifdef TRACE_SORT
if (trace_sort) if (trace_sort)
@ -592,12 +575,7 @@ tuplesort_begin_index_brin(Relation heapRel,
base->writetup = writetup_index_brin; base->writetup = writetup_index_brin;
base->readtup = readtup_index_brin; base->readtup = readtup_index_brin;
base->haveDatum1 = true; base->haveDatum1 = true;
base->arg = arg; base->arg = NULL;
arg->index.heapRel = heapRel;
arg->index.indexRel = indexRel;
MemoryContextSwitchTo(oldcontext);
return state; return state;
} }

View File

@ -430,9 +430,7 @@ extern Tuplesortstate *tuplesort_begin_index_gist(Relation heapRel,
Relation indexRel, Relation indexRel,
int workMem, SortCoordinate coordinate, int workMem, SortCoordinate coordinate,
int sortopt); int sortopt);
extern Tuplesortstate *tuplesort_begin_index_brin(Relation heapRel, extern Tuplesortstate *tuplesort_begin_index_brin(int workMem, SortCoordinate coordinate,
Relation indexRel,
int workMem, SortCoordinate coordinate,
int sortopt); int sortopt);
extern Tuplesortstate *tuplesort_begin_datum(Oid datumType, extern Tuplesortstate *tuplesort_begin_datum(Oid datumType,
Oid sortOperator, Oid sortCollation, Oid sortOperator, Oid sortCollation,

View File

@ -307,7 +307,6 @@ BrinRevmap
BrinShared BrinShared
BrinSortTuple BrinSortTuple
BrinSpecialSpace BrinSpecialSpace
BrinSpool
BrinStatsData BrinStatsData
BrinTuple BrinTuple
BrinValues BrinValues