From b437571714707bc6466abde1a0af5e69aaade09c Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Fri, 8 Dec 2023 18:15:23 +0100 Subject: [PATCH] Allow parallel CREATE INDEX for BRIN indexes Allow using multiple worker processes to build BRIN index, which until now was supported only for BTREE indexes. For large tables this often results in significant speedup when the build is CPU-bound. The work is split in a simple way - each worker builds BRIN summaries on a subset of the table, determined by the regular parallel scan used to read the data, and feeds them into a shared tuplesort which sorts them by blkno (start of the range). The leader then reads this sorted stream of ranges, merges duplicates (which may happen if the parallel scan does not align with BRIN pages_per_range), and adds the resulting ranges into the index. The number of duplicate results produced by workers (requiring merging in the leader process) should be fairly small, thanks to how parallel scans assign chunks to workers. The likelihood of duplicate results may increase for higher pages_per_range values, but then there are fewer page ranges in total. In any case, we expect the merging to be much cheaper than summarization, so this should be a win. Most of the parallelism infrastructure is a simplified copy of the code used by BTREE indexes, omitting the parts irrelevant for BRIN indexes (e.g. uniqueness checks). This also introduces a new index AM flag amcanbuildparallel, determining whether to attempt to start parallel workers for the index build. Original patch by me, with reviews and substantial reworks by Matthias van de Meent, certainly enough to make him a co-author. Author: Tomas Vondra, Matthias van de Meent Reviewed-by: Matthias van de Meent Discussion: https://postgr.es/m/c2ee7d69-ce17-43f2-d1a0-9811edbda6e6%40enterprisedb.com --- contrib/bloom/blutils.c | 1 + doc/src/sgml/indexam.sgml | 7 + src/backend/access/brin/brin.c | 888 +++++++++++++++++- src/backend/access/gin/ginutil.c | 1 + src/backend/access/gist/gist.c | 1 + src/backend/access/hash/hash.c | 1 + src/backend/access/nbtree/nbtree.c | 1 + src/backend/access/spgist/spgutils.c | 1 + src/backend/access/transam/parallel.c | 4 + src/backend/catalog/index.c | 2 +- src/backend/utils/sort/tuplesortvariants.c | 207 ++++ src/include/access/amapi.h | 2 + src/include/access/brin.h | 3 + src/include/utils/tuplesort.h | 11 + .../modules/dummy_index_am/dummy_index_am.c | 1 + src/tools/pgindent/typedefs.list | 5 + 16 files changed, 1119 insertions(+), 17 deletions(-) diff --git a/contrib/bloom/blutils.c b/contrib/bloom/blutils.c index 4830cb3fee..a781c5d98d 100644 --- a/contrib/bloom/blutils.c +++ b/contrib/bloom/blutils.c @@ -122,6 +122,7 @@ blhandler(PG_FUNCTION_ARGS) amroutine->amclusterable = false; amroutine->ampredlocks = false; amroutine->amcanparallel = false; + amroutine->amcanbuildparallel = false; amroutine->amcaninclude = false; amroutine->amusemaintenanceworkmem = false; amroutine->amparallelvacuumoptions = diff --git a/doc/src/sgml/indexam.sgml b/doc/src/sgml/indexam.sgml index f107c43d6a..cc4135e394 100644 --- a/doc/src/sgml/indexam.sgml +++ b/doc/src/sgml/indexam.sgml @@ -123,6 +123,8 @@ typedef struct IndexAmRoutine bool ampredlocks; /* does AM support parallel scan? */ bool amcanparallel; + /* does AM support parallel build? */ + bool amcanbuildparallel; /* does AM support columns included with clause INCLUDE? */ bool amcaninclude; /* does AM use maintenance_work_mem? */ @@ -286,6 +288,11 @@ ambuild (Relation heapRelation, and compute the keys that need to be inserted into the index. The function must return a palloc'd struct containing statistics about the new index. + The amcanbuildparallel flag indicates whether + the access method supports parallel index builds. When set to true, + the system will attempt to allocate parallel workers for the build. + Access methods supporting only non-parallel index builds should leave + this flag set to false. diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index 14be939ad8..23f081389b 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -33,6 +33,7 @@ #include "postmaster/autovacuum.h" #include "storage/bufmgr.h" #include "storage/freespace.h" +#include "tcop/tcopprot.h" /* pgrminclude ignore */ #include "utils/acl.h" #include "utils/builtins.h" #include "utils/datum.h" @@ -40,7 +41,119 @@ #include "utils/index_selfuncs.h" #include "utils/memutils.h" #include "utils/rel.h" +#include "utils/tuplesort.h" +/* Magic numbers for parallel state sharing */ +#define PARALLEL_KEY_BRIN_SHARED UINT64CONST(0xB000000000000001) +#define PARALLEL_KEY_TUPLESORT UINT64CONST(0xB000000000000002) +#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000003) +#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004) +#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005) + +/* + * Status record for spooling/sorting phase. + */ +typedef struct BrinSpool +{ + Tuplesortstate *sortstate; /* state data for tuplesort.c */ + Relation heap; + Relation index; +} BrinSpool; + +/* + * Status for index builds performed in parallel. This is allocated in a + * dynamic shared memory segment. + */ +typedef struct BrinShared +{ + /* + * These fields are not modified during the build. They primarily exist + * for the benefit of worker processes that need to create state + * corresponding to that used by the leader. + */ + Oid heaprelid; + Oid indexrelid; + bool isconcurrent; + BlockNumber pagesPerRange; + int scantuplesortstates; + + /* + * workersdonecv is used to monitor the progress of workers. All parallel + * participants must indicate that they are done before leader can use + * results built by the workers (and before leader can write the data into + * the index). + */ + ConditionVariable workersdonecv; + + /* + * mutex protects all fields before heapdesc. + * + * These fields contain status information of interest to BRIN index + * builds that must work just the same when an index is built in parallel. + */ + slock_t mutex; + + /* + * Mutable state that is maintained by workers, and reported back to + * leader at end of the scans. + * + * nparticipantsdone is number of worker processes finished. + * + * reltuples is the total number of input heap tuples. + * + * indtuples is the total number of tuples that made it into the index. + */ + int nparticipantsdone; + double reltuples; + double indtuples; + + /* + * ParallelTableScanDescData data follows. Can't directly embed here, as + * implementations of the parallel table scan desc interface might need + * stronger alignment. + */ +} BrinShared; + +/* + * Return pointer to a BrinShared's parallel table scan. + * + * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just + * MAXALIGN. + */ +#define ParallelTableScanFromBrinShared(shared) \ + (ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(BrinShared))) + +/* + * Status for leader in parallel index build. + */ +typedef struct BrinLeader +{ + /* parallel context itself */ + ParallelContext *pcxt; + + /* + * nparticipanttuplesorts is the exact number of worker processes + * successfully launched, plus one leader process if it participates as a + * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader + * participating as a worker). + */ + int nparticipanttuplesorts; + + /* + * Leader process convenience pointers to shared state (leader avoids TOC + * lookups). + * + * brinshared is the shared state for entire build. sharedsort is the + * shared, tuplesort-managed state passed to each process tuplesort. + * snapshot is the snapshot used by the scan iff an MVCC snapshot is + * required. + */ + BrinShared *brinshared; + Sharedsort *sharedsort; + Snapshot snapshot; + WalUsage *walusage; + BufferUsage *bufferusage; +} BrinLeader; /* * We use a BrinBuildState during initial construction of a BRIN index. @@ -49,7 +162,8 @@ typedef struct BrinBuildState { Relation bs_irel; - int bs_numtuples; + double bs_numtuples; + double bs_reltuples; Buffer bs_currentInsertBuf; BlockNumber bs_pagesPerRange; BlockNumber bs_currRangeStart; @@ -57,9 +171,19 @@ typedef struct BrinBuildState BrinRevmap *bs_rmAccess; BrinDesc *bs_bdesc; BrinMemTuple *bs_dtuple; + BrinTuple *bs_emptyTuple; Size bs_emptyTupleLen; MemoryContext bs_context; + + /* + * bs_leader is only present when a parallel index build is performed, and + * only in the leader process. (Actually, only the leader process has a + * BrinBuildState.) + */ + BrinLeader *bs_leader; + int bs_worker_id; + BrinSpool *bs_spool; } BrinBuildState; /* @@ -94,6 +218,7 @@ static void terminate_brin_buildstate(BrinBuildState *state); static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange, bool include_partial, double *numSummarized, double *numExisting); static void form_and_insert_tuple(BrinBuildState *state); +static void form_and_spill_tuple(BrinBuildState *state); static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b); static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy); @@ -103,6 +228,20 @@ static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys); static void brin_fill_empty_ranges(BrinBuildState *state, BlockNumber prevRange, BlockNumber maxRange); +/* parallel index builds */ +static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, + bool isconcurrent, int request); +static void _brin_end_parallel(BrinLeader *btleader, BrinBuildState *state); +static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot); +static void _brin_leader_participate_as_worker(BrinBuildState *buildstate, + Relation heap, Relation index); +static void _brin_parallel_scan_and_build(BrinBuildState *buildstate, + BrinSpool *brinspool, + BrinShared *brinshared, + Sharedsort *sharedsort, + Relation heap, Relation index, + int sortmem, bool progress); + /* * BRIN handler function: return IndexAmRoutine with access method parameters * and callbacks. @@ -127,6 +266,7 @@ brinhandler(PG_FUNCTION_ARGS) amroutine->amclusterable = false; amroutine->ampredlocks = false; amroutine->amcanparallel = false; + amroutine->amcanbuildparallel = true; amroutine->amcaninclude = false; amroutine->amusemaintenanceworkmem = false; amroutine->amsummarizing = true; @@ -882,6 +1022,65 @@ brinbuildCallback(Relation index, values, isnull); } +/* + * Per-heap-tuple callback for table_index_build_scan with parallelism. + * + * A version of the callback used by parallel index builds. The main difference + * is that instead of writing the BRIN tuples into the index, we write them + * into a shared tuplesort, and leave the insertion up to the leader (which may + * reorder them a bit etc.). The callback also does not generate empty ranges, + * those will be added by the leader when merging results from workers. + */ +static void +brinbuildCallbackParallel(Relation index, + ItemPointer tid, + Datum *values, + bool *isnull, + bool tupleIsAlive, + void *brstate) +{ + BrinBuildState *state = (BrinBuildState *) brstate; + BlockNumber thisblock; + + thisblock = ItemPointerGetBlockNumber(tid); + + /* + * If we're in a block that belongs to a future range, summarize what + * we've got and start afresh. Note the scan might have skipped many + * pages, if they were devoid of live tuples; we do not create emptry BRIN + * ranges here - the leader is responsible for filling them in. + */ + if (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1) + { + + BRIN_elog((DEBUG2, + "brinbuildCallback: completed a range: %u--%u", + state->bs_currRangeStart, + state->bs_currRangeStart + state->bs_pagesPerRange)); + + /* create the index tuple and write it into the tuplesort */ + form_and_spill_tuple(state); + + /* + * Set state to correspond to the next range (for this block). + * + * This skips ranges that are either empty (and so we don't get any + * tuples to summarize), or processed by other workers. We can't + * differentiate those cases here easily, so we leave it up to the + * leader to fill empty ranges where needed. + */ + state->bs_currRangeStart + = state->bs_pagesPerRange * (thisblock / state->bs_pagesPerRange); + + /* re-initialize state for it */ + brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc); + } + + /* Accumulate the current tuple into the running state */ + (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple, + values, isnull); +} + /* * brinbuild() -- build a new BRIN index. */ @@ -944,29 +1143,105 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo) state = initialize_brin_buildstate(index, revmap, pagesPerRange, RelationGetNumberOfBlocks(heap)); + state->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool)); + state->bs_spool->heap = heap; + state->bs_spool->index = index; + + /* + * Attempt to launch parallel worker scan when required + * + * XXX plan_create_index_workers makes the number of workers dependent on + * maintenance_work_mem, requiring 32MB for each worker. That makes sense + * for btree, but not for BRIN, which can do away with much less memory. + * So maybe make that somehow less strict, optionally? + */ + if (indexInfo->ii_ParallelWorkers > 0) + _brin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent, + indexInfo->ii_ParallelWorkers); + /* * Now scan the relation. No syncscan allowed here because we want the * heap blocks in physical order. - */ - reltuples = table_index_build_scan(heap, index, indexInfo, false, true, - brinbuildCallback, (void *) state, NULL); - - /* process the final batch */ - form_and_insert_tuple(state); - - /* - * Backfill the final ranges with empty data. * - * This saves us from doing what amounts to full table scans when the - * index with a predicate like WHERE (nonnull_column IS NULL), or other - * very selective predicates. + * If parallel build requested and at least one worker process was + * successfully launched, set up coordination state */ - brin_fill_empty_ranges(state, - state->bs_currRangeStart, - state->bs_maxRangeStart); + if (state->bs_leader) + { + SortCoordinate coordinate; + + coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData)); + coordinate->isWorker = false; + coordinate->nParticipants = + state->bs_leader->nparticipanttuplesorts; + coordinate->sharedsort = state->bs_leader->sharedsort; + + + /* + * Begin serial/leader tuplesort. + * + * In cases where parallelism is involved, the leader receives the + * same share of maintenance_work_mem as a serial sort (it is + * generally treated in the same way as a serial sort once we return). + * Parallel worker Tuplesortstates will have received only a fraction + * of maintenance_work_mem, though. + * + * We rely on the lifetime of the Leader Tuplesortstate almost not + * overlapping with any worker Tuplesortstate's lifetime. There may + * be some small overlap, but that's okay because we rely on leader + * Tuplesortstate only allocating a small, fixed amount of memory + * here. When its tuplesort_performsort() is called (by our caller), + * and significant amounts of memory are likely to be used, all + * workers must have already freed almost all memory held by their + * Tuplesortstates (they are about to go away completely, too). The + * overall effect is that maintenance_work_mem always represents an + * absolute high watermark on the amount of memory used by a CREATE + * INDEX operation, regardless of the use of parallelism or any other + * factor. + */ + state->bs_spool->sortstate = + tuplesort_begin_index_brin(heap, index, + maintenance_work_mem, coordinate, + TUPLESORT_NONE); + + /* + * In parallel mode, wait for workers to complete, and then read all + * tuples from the shared tuplesort and insert them into the index. + */ + _brin_end_parallel(state->bs_leader, state); + } + else /* no parallel index build */ + { + reltuples = table_index_build_scan(heap, index, indexInfo, false, true, + brinbuildCallback, (void *) state, NULL); + + /* + * process the final batch + * + * XXX Note this does not update state->bs_currRangeStart, i.e. it + * stays set to the last range added to the index. This is OK, because + * that's what brin_fill_empty_ranges expects. + */ + form_and_insert_tuple(state); + + /* + * Backfill the final ranges with empty data. + * + * This saves us from doing what amounts to full table scans when the + * index with a predicate like WHERE (nonnull_column IS NULL), or + * other very selective predicates. + */ + brin_fill_empty_ranges(state, + state->bs_currRangeStart, + state->bs_maxRangeStart); + + /* track the number of relation tuples */ + state->bs_reltuples = reltuples; + } /* release resources */ idxtuples = state->bs_numtuples; + reltuples = state->bs_reltuples; brinRevmapTerminate(state->bs_rmAccess); terminate_brin_buildstate(state); @@ -1387,12 +1662,19 @@ initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap, state->bs_irel = idxRel; state->bs_numtuples = 0; + state->bs_reltuples = 0; state->bs_currentInsertBuf = InvalidBuffer; state->bs_pagesPerRange = pagesPerRange; state->bs_currRangeStart = 0; state->bs_rmAccess = revmap; state->bs_bdesc = brin_build_desc(idxRel); state->bs_dtuple = brin_new_memtuple(state->bs_bdesc); + state->bs_leader = NULL; + state->bs_worker_id = 0; + state->bs_spool = NULL; + state->bs_context = CurrentMemoryContext; + state->bs_emptyTuple = NULL; + state->bs_emptyTupleLen = 0; /* Remember the memory context to use for an empty tuple, if needed. */ state->bs_context = CurrentMemoryContext; @@ -1701,6 +1983,32 @@ form_and_insert_tuple(BrinBuildState *state) pfree(tup); } +/* + * Given a deformed tuple in the build state, convert it into the on-disk + * format and write it to a (shared) tuplesort (the leader will insert it + * into the index later). + */ +static void +form_and_spill_tuple(BrinBuildState *state) +{ + BrinTuple *tup; + Size size; + + /* don't insert empty tuples in parallel build */ + if (state->bs_dtuple->bt_empty_range) + return; + + tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart, + state->bs_dtuple, &size); + + /* write the BRIN tuple to the tuplesort */ + tuplesort_putbrintuple(state->bs_spool->sortstate, tup, size); + + state->bs_numtuples++; + + pfree(tup); +} + /* * Given two deformed tuples, adjust the first one so that it's consistent * with the summary values in both. @@ -2021,6 +2329,554 @@ check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys) return true; } +static void +_brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, + bool isconcurrent, int request) +{ + ParallelContext *pcxt; + int scantuplesortstates; + Snapshot snapshot; + Size estbrinshared; + Size estsort; + BrinShared *brinshared; + Sharedsort *sharedsort; + BrinLeader *brinleader = (BrinLeader *) palloc0(sizeof(BrinLeader)); + WalUsage *walusage; + BufferUsage *bufferusage; + bool leaderparticipates = true; + int querylen; + +#ifdef DISABLE_LEADER_PARTICIPATION + leaderparticipates = false; +#endif + + /* + * Enter parallel mode, and create context for parallel build of brin + * index + */ + EnterParallelMode(); + Assert(request > 0); + pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main", + request); + + scantuplesortstates = leaderparticipates ? request + 1 : request; + + /* + * Prepare for scan of the base relation. In a normal index build, we use + * SnapshotAny because we must retrieve all tuples and do our own time + * qual checks (because we have to index RECENTLY_DEAD tuples). In a + * concurrent build, we take a regular MVCC snapshot and index whatever's + * live according to that. + */ + if (!isconcurrent) + snapshot = SnapshotAny; + else + snapshot = RegisterSnapshot(GetTransactionSnapshot()); + + /* + * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace. + */ + estbrinshared = _brin_parallel_estimate_shared(heap, snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared); + estsort = tuplesort_estimate_shared(scantuplesortstates); + shm_toc_estimate_chunk(&pcxt->estimator, estsort); + + shm_toc_estimate_keys(&pcxt->estimator, 2); + + /* + * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE + * and PARALLEL_KEY_BUFFER_USAGE. + * + * If there are no extensions loaded that care, we could skip this. We + * have no way of knowing whether anyone's looking at pgWalUsage or + * pgBufferUsage, so do it unconditionally. + */ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ + if (debug_query_string) + { + querylen = strlen(debug_query_string); + shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + else + querylen = 0; /* keep compiler quiet */ + + /* Everyone's had a chance to ask for space, so now create the DSM */ + InitializeParallelDSM(pcxt); + + /* If no DSM segment was available, back out (do serial build) */ + if (pcxt->seg == NULL) + { + if (IsMVCCSnapshot(snapshot)) + UnregisterSnapshot(snapshot); + DestroyParallelContext(pcxt); + ExitParallelMode(); + return; + } + + /* Store shared build state, for which we reserved space */ + brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared); + /* Initialize immutable state */ + brinshared->heaprelid = RelationGetRelid(heap); + brinshared->indexrelid = RelationGetRelid(index); + brinshared->isconcurrent = isconcurrent; + brinshared->scantuplesortstates = scantuplesortstates; + brinshared->pagesPerRange = buildstate->bs_pagesPerRange; + ConditionVariableInit(&brinshared->workersdonecv); + SpinLockInit(&brinshared->mutex); + + /* Initialize mutable state */ + brinshared->nparticipantsdone = 0; + brinshared->reltuples = 0.0; + brinshared->indtuples = 0.0; + + table_parallelscan_initialize(heap, + ParallelTableScanFromBrinShared(brinshared), + snapshot); + + /* + * Store shared tuplesort-private state, for which we reserved space. + * Then, initialize opaque state using tuplesort routine. + */ + sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort); + tuplesort_initialize_shared(sharedsort, scantuplesortstates, + pcxt->seg); + + /* + * Store shared tuplesort-private state, for which we reserved space. + * Then, initialize opaque state using tuplesort routine. + */ + shm_toc_insert(pcxt->toc, PARALLEL_KEY_BRIN_SHARED, brinshared); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort); + + /* Store query string for workers */ + if (debug_query_string) + { + char *sharedquery; + + sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); + memcpy(sharedquery, debug_query_string, querylen + 1); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); + } + + /* + * Allocate space for each worker's WalUsage and BufferUsage; no need to + * initialize. + */ + walusage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage); + bufferusage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage); + + /* Launch workers, saving status for leader/caller */ + LaunchParallelWorkers(pcxt); + brinleader->pcxt = pcxt; + brinleader->nparticipanttuplesorts = pcxt->nworkers_launched; + if (leaderparticipates) + brinleader->nparticipanttuplesorts++; + brinleader->brinshared = brinshared; + brinleader->sharedsort = sharedsort; + brinleader->snapshot = snapshot; + brinleader->walusage = walusage; + brinleader->bufferusage = bufferusage; + + /* If no workers were successfully launched, back out (do serial build) */ + if (pcxt->nworkers_launched == 0) + { + _brin_end_parallel(brinleader, NULL); + return; + } + + /* Save leader state now that it's clear build will be parallel */ + buildstate->bs_leader = brinleader; + + /* Join heap scan ourselves */ + if (leaderparticipates) + _brin_leader_participate_as_worker(buildstate, heap, index); + + /* + * Caller needs to wait for all launched workers when we return. Make + * sure that the failure-to-start case will not hang forever. + */ + WaitForParallelWorkersToAttach(pcxt); +} + +/* + * Shut down workers, destroy parallel context, and end parallel mode. + */ +static void +_brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state) +{ + int i; + BrinTuple *btup; + BrinMemTuple *memtuple = NULL; + Size tuplen; + BrinShared *brinshared = brinleader->brinshared; + BlockNumber prevblkno = InvalidBlockNumber; + BrinSpool *spool; + MemoryContext rangeCxt, + oldCxt; + + /* Shutdown worker processes */ + WaitForParallelWorkersToFinish(brinleader->pcxt); + + /* + * If we didn't actually launch workers, we still have to make sure to + * exit parallel mode. + */ + if (!state) + goto cleanup; + + /* copy the data into leader state (we have to wait for the workers ) */ + state->bs_reltuples = brinshared->reltuples; + state->bs_numtuples = brinshared->indtuples; + + /* do the actual sort in the leader */ + spool = state->bs_spool; + tuplesort_performsort(spool->sortstate); + + /* + * Initialize BrinMemTuple we'll use to union summaries from workers (in + * case they happened to produce parts of the same paga range). + */ + memtuple = brin_new_memtuple(state->bs_bdesc); + + /* + * Create a memory context we'll reset to combine results for a single + * page range (received from the workers). We don't expect huge number of + * overlaps under regular circumstances, because for large tables the + * chunk size is likely larger than the BRIN page range), but it can + * happen, and the union functions may do all kinds of stuff. So we better + * reset the context once in a while. + */ + rangeCxt = AllocSetContextCreate(CurrentMemoryContext, + "brin union", + ALLOCSET_DEFAULT_SIZES); + oldCxt = MemoryContextSwitchTo(rangeCxt); + + /* + * Read the BRIN tuples from the shared tuplesort, sorted by block number. + * That probably gives us an index that is cheaper to scan, thanks to + * mostly getting data from the same index page as before. + */ + while ((btup = tuplesort_getbrintuple(spool->sortstate, &tuplen, true)) != NULL) + { + /* Ranges should be multiples of pages_per_range for the index. */ + Assert(btup->bt_blkno % brinshared->pagesPerRange == 0); + + /* + * Do we need to union summaries for the same page range? + * + * If this is the first brin tuple we read, then just deform it into + * the memtuple, and continue with the next one from tuplesort. We + * however may need to insert empty summaries into the index. + * + * If it's the same block as the last we saw, we simply union the brin + * tuple into it, and we're done - we don't even need to insert empty + * ranges, because that was done earlier when we saw the first brin + * tuple (for this range). + * + * Finally, if it's not the first brin tuple, and it's not the same + * page range, we need to do the insert and then deform the tuple into + * the memtuple. Then we'll insert empty ranges before the new brin + * tuple, if needed. + */ + if (prevblkno == InvalidBlockNumber) + { + /* First brin tuples, just deform into memtuple. */ + memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple); + + /* continue to insert empty pages before thisblock */ + } + else if (memtuple->bt_blkno == btup->bt_blkno) + { + /* + * Not the first brin tuple, but same page range as the previous + * one, so we can merge it into the memtuple. + */ + union_tuples(state->bs_bdesc, memtuple, btup); + continue; + } + else + { + BrinTuple *tmp; + Size len; + + /* + * We got brin tuple for a different page range, so form a brin + * tuple from the memtuple, insert it, and re-init the memtuple + * from the new brin tuple. + */ + tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno, + memtuple, &len); + + brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess, + &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len); + + /* + * Reset the per-output-range context. This frees all the memory + * possibly allocated by the union functions, and also the BRIN + * tuple we just formed and inserted. + */ + MemoryContextReset(rangeCxt); + + memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple); + + /* continue to insert empty pages before thisblock */ + } + + /* Fill empty ranges for all ranges missing in the tuplesort. */ + brin_fill_empty_ranges(state, prevblkno, btup->bt_blkno); + + prevblkno = btup->bt_blkno; + } + + tuplesort_end(spool->sortstate); + + /* Fill the BRIN tuple for the last page range with data. */ + if (prevblkno != InvalidBlockNumber) + { + BrinTuple *tmp; + Size len; + + tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno, + memtuple, &len); + + brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess, + &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len); + + pfree(tmp); + } + + /* Fill empty ranges at the end, for all ranges missing in the tuplesort. */ + brin_fill_empty_ranges(state, prevblkno, state->bs_maxRangeStart); + + /* + * Switch back to the original memory context, and destroy the one we + * created to isolate the union_tuple calls. + */ + MemoryContextSwitchTo(oldCxt); + MemoryContextDelete(rangeCxt); + + /* + * Next, accumulate WAL usage. (This must wait for the workers to finish, + * or we might get incomplete data.) + */ + for (i = 0; i < brinleader->pcxt->nworkers_launched; i++) + InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]); + +cleanup: + + /* Free last reference to MVCC snapshot, if one was used */ + if (IsMVCCSnapshot(brinleader->snapshot)) + UnregisterSnapshot(brinleader->snapshot); + DestroyParallelContext(brinleader->pcxt); + ExitParallelMode(); +} + +/* + * Returns size of shared memory required to store state for a parallel + * brin index build based on the snapshot its parallel scan will use. + */ +static Size +_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot) +{ + /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */ + return add_size(BUFFERALIGN(sizeof(BrinShared)), + table_parallelscan_estimate(heap, snapshot)); +} + +/* + * Within leader, participate as a parallel worker. + */ +static void +_brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Relation index) +{ + BrinLeader *brinleader = buildstate->bs_leader; + int sortmem; + + /* Allocate memory and initialize private spool */ + buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool)); + buildstate->bs_spool->heap = buildstate->bs_spool->heap; + buildstate->bs_spool->index = buildstate->bs_spool->index; + + /* + * Might as well use reliable figure when doling out maintenance_work_mem + * (when requested number of workers were not launched, this will be + * somewhat higher than it is for other workers). + */ + sortmem = maintenance_work_mem / brinleader->nparticipanttuplesorts; + + /* Perform work common to all participants */ + _brin_parallel_scan_and_build(buildstate, buildstate->bs_spool, brinleader->brinshared, + brinleader->sharedsort, heap, index, sortmem, true); +} + +/* + * Perform a worker's portion of a parallel sort. + * + * This generates a tuplesort for passed btspool, and a second tuplesort + * state if a second btspool is need (i.e. for unique index builds). All + * other spool fields should already be set when this is called. + * + * sortmem is the amount of working memory to use within each worker, + * expressed in KBs. + * + * When this returns, workers are done, and need only release resources. + */ +static void +_brin_parallel_scan_and_build(BrinBuildState *state, BrinSpool *brinspool, + BrinShared *brinshared, Sharedsort *sharedsort, + Relation heap, Relation index, int sortmem, + bool progress) +{ + SortCoordinate coordinate; + TableScanDesc scan; + double reltuples; + IndexInfo *indexInfo; + + /* Initialize local tuplesort coordination state */ + coordinate = palloc0(sizeof(SortCoordinateData)); + coordinate->isWorker = true; + coordinate->nParticipants = -1; + coordinate->sharedsort = sharedsort; + + /* Begin "partial" tuplesort */ + brinspool->sortstate = tuplesort_begin_index_brin(brinspool->heap, + brinspool->index, + sortmem, coordinate, + TUPLESORT_NONE); + + /* Join parallel scan */ + indexInfo = BuildIndexInfo(index); + indexInfo->ii_Concurrent = brinshared->isconcurrent; + + scan = table_beginscan_parallel(heap, + ParallelTableScanFromBrinShared(brinshared)); + + reltuples = table_index_build_scan(heap, index, indexInfo, true, true, + brinbuildCallbackParallel, state, scan); + + /* insert the last item */ + form_and_spill_tuple(state); + + /* sort the BRIN ranges built by this worker */ + tuplesort_performsort(brinspool->sortstate); + + state->bs_reltuples += reltuples; + + /* + * Done. Record ambuild statistics. + */ + SpinLockAcquire(&brinshared->mutex); + brinshared->nparticipantsdone++; + brinshared->reltuples += state->bs_reltuples; + brinshared->indtuples += state->bs_numtuples; + SpinLockRelease(&brinshared->mutex); + + /* Notify leader */ + ConditionVariableSignal(&brinshared->workersdonecv); + + tuplesort_end(brinspool->sortstate); +} + +/* + * Perform work within a launched parallel process. + */ +void +_brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) +{ + char *sharedquery; + BrinShared *brinshared; + Sharedsort *sharedsort; + BrinBuildState *buildstate; + Relation heapRel; + Relation indexRel; + LOCKMODE heapLockmode; + LOCKMODE indexLockmode; + WalUsage *walusage; + BufferUsage *bufferusage; + int sortmem; + + /* + * The only possible status flag that can be set to the parallel worker is + * PROC_IN_SAFE_IC. + */ + Assert((MyProc->statusFlags == 0) || + (MyProc->statusFlags == PROC_IN_SAFE_IC)); + + /* Set debug_query_string for individual workers first */ + sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true); + debug_query_string = sharedquery; + + /* Report the query string from leader */ + pgstat_report_activity(STATE_RUNNING, debug_query_string); + + /* Look up brin shared state */ + brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false); + + /* Open relations using lock modes known to be obtained by index.c */ + if (!brinshared->isconcurrent) + { + heapLockmode = ShareLock; + indexLockmode = AccessExclusiveLock; + } + else + { + heapLockmode = ShareUpdateExclusiveLock; + indexLockmode = RowExclusiveLock; + } + + /* Open relations within worker */ + heapRel = table_open(brinshared->heaprelid, heapLockmode); + indexRel = index_open(brinshared->indexrelid, indexLockmode); + + buildstate = initialize_brin_buildstate(indexRel, NULL, + brinshared->pagesPerRange, + InvalidBlockNumber); + + /* Initialize worker's own spool */ + buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool)); + buildstate->bs_spool->heap = heapRel; + buildstate->bs_spool->index = indexRel; + + /* Look up shared state private to tuplesort.c */ + sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false); + tuplesort_attach_shared(sharedsort, seg); + + /* Prepare to track buffer usage during parallel execution */ + InstrStartParallelQuery(); + + /* + * Might as well use reliable figure when doling out maintenance_work_mem + * (when requested number of workers were not launched, this will be + * somewhat higher than it is for other workers). + */ + sortmem = maintenance_work_mem / brinshared->scantuplesortstates; + + _brin_parallel_scan_and_build(buildstate, buildstate->bs_spool, + brinshared, sharedsort, + heapRel, indexRel, sortmem, false); + + /* Report WAL/buffer usage during parallel execution */ + bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); + walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); + InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], + &walusage[ParallelWorkerNumber]); + + index_close(indexRel, indexLockmode); + table_close(heapRel, heapLockmode); +} + /* * brin_build_empty_tuple * Maybe initialize a BRIN tuple representing empty range. diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c index a875c5d3d7..9b1a0ac345 100644 --- a/src/backend/access/gin/ginutil.c +++ b/src/backend/access/gin/ginutil.c @@ -54,6 +54,7 @@ ginhandler(PG_FUNCTION_ARGS) amroutine->amclusterable = false; amroutine->ampredlocks = true; amroutine->amcanparallel = false; + amroutine->amcanbuildparallel = false; amroutine->amcaninclude = false; amroutine->amusemaintenanceworkmem = true; amroutine->amsummarizing = false; diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 9a1bf8f66c..e052ba8bda 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -76,6 +76,7 @@ gisthandler(PG_FUNCTION_ARGS) amroutine->amclusterable = true; amroutine->ampredlocks = true; amroutine->amcanparallel = false; + amroutine->amcanbuildparallel = false; amroutine->amcaninclude = true; amroutine->amusemaintenanceworkmem = false; amroutine->amsummarizing = false; diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c index 6443ff21bd..905519692c 100644 --- a/src/backend/access/hash/hash.c +++ b/src/backend/access/hash/hash.c @@ -73,6 +73,7 @@ hashhandler(PG_FUNCTION_ARGS) amroutine->amclusterable = false; amroutine->ampredlocks = true; amroutine->amcanparallel = false; + amroutine->amcanbuildparallel = false; amroutine->amcaninclude = false; amroutine->amusemaintenanceworkmem = false; amroutine->amsummarizing = false; diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index 0930f9b37e..6c8cd93fa0 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -112,6 +112,7 @@ bthandler(PG_FUNCTION_ARGS) amroutine->amclusterable = true; amroutine->ampredlocks = true; amroutine->amcanparallel = true; + amroutine->amcanbuildparallel = true; amroutine->amcaninclude = true; amroutine->amusemaintenanceworkmem = false; amroutine->amsummarizing = false; diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c index 30c00876a5..fd4b615710 100644 --- a/src/backend/access/spgist/spgutils.c +++ b/src/backend/access/spgist/spgutils.c @@ -60,6 +60,7 @@ spghandler(PG_FUNCTION_ARGS) amroutine->amclusterable = false; amroutine->ampredlocks = false; amroutine->amcanparallel = false; + amroutine->amcanbuildparallel = false; amroutine->amcaninclude = true; amroutine->amusemaintenanceworkmem = false; amroutine->amsummarizing = false; diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 194a1207be..d78314062e 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -14,6 +14,7 @@ #include "postgres.h" +#include "access/brin.h" #include "access/nbtree.h" #include "access/parallel.h" #include "access/session.h" @@ -145,6 +146,9 @@ static const struct { "_bt_parallel_build_main", _bt_parallel_build_main }, + { + "_brin_parallel_build_main", _brin_parallel_build_main + }, { "parallel_vacuum_main", parallel_vacuum_main } diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index b8c7945322..7b186c0220 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -2982,7 +2982,7 @@ index_build(Relation heapRelation, * Note that planner considers parallel safety for us. */ if (parallel && IsNormalProcessingMode() && - indexRelation->rd_rel->relam == BTREE_AM_OID) + indexRelation->rd_indam->amcanbuildparallel) indexInfo->ii_ParallelWorkers = plan_create_index_workers(RelationGetRelid(heapRelation), RelationGetRelid(indexRelation)); diff --git a/src/backend/utils/sort/tuplesortvariants.c b/src/backend/utils/sort/tuplesortvariants.c index 2cd508e513..90fc605f1c 100644 --- a/src/backend/utils/sort/tuplesortvariants.c +++ b/src/backend/utils/sort/tuplesortvariants.c @@ -19,6 +19,7 @@ #include "postgres.h" +#include "access/brin_tuple.h" #include "access/hash.h" #include "access/htup_details.h" #include "access/nbtree.h" @@ -43,6 +44,8 @@ static void removeabbrev_cluster(Tuplesortstate *state, SortTuple *stups, int count); static void removeabbrev_index(Tuplesortstate *state, SortTuple *stups, int count); +static void removeabbrev_index_brin(Tuplesortstate *state, SortTuple *stups, + int count); static void removeabbrev_datum(Tuplesortstate *state, SortTuple *stups, int count); static int comparetup_heap(const SortTuple *a, const SortTuple *b, @@ -69,10 +72,16 @@ static int comparetup_index_hash(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static int comparetup_index_hash_tiebreak(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); +static int comparetup_index_brin(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state); static void writetup_index(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup); static void readtup_index(Tuplesortstate *state, SortTuple *stup, LogicalTape *tape, unsigned int len); +static void writetup_index_brin(Tuplesortstate *state, LogicalTape *tape, + SortTuple *stup); +static void readtup_index_brin(Tuplesortstate *state, SortTuple *stup, + LogicalTape *tape, unsigned int len); static int comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static int comparetup_datum_tiebreak(const SortTuple *a, const SortTuple *b, @@ -128,6 +137,16 @@ typedef struct uint32 max_buckets; } TuplesortIndexHashArg; +/* + * Data struture pointed by "TuplesortPublic.arg" for the index_brin subcase. + */ +typedef struct +{ + TuplesortIndexArg index; + + /* XXX do we need something here? */ +} TuplesortIndexBrinArg; + /* * Data struture pointed by "TuplesortPublic.arg" for the Datum case. * Set by tuplesort_begin_datum and used only by the DatumTuple routines. @@ -140,6 +159,21 @@ typedef struct int datumTypeLen; } TuplesortDatumArg; +/* + * Computing BrinTuple size with only the tuple is difficult, so we want to track + * the length referenced by the SortTuple. That's what BrinSortTuple is meant + * to do - it's essentially a BrinTuple prefixed by its length. + */ +typedef struct BrinSortTuple +{ + Size tuplen; + BrinTuple tuple; +} BrinSortTuple; + +/* Size of the BrinSortTuple, given length of the BrinTuple. */ +#define BRINSORTTUPLE_SIZE(len) (offsetof(BrinSortTuple, tuple) + (len)) + + Tuplesortstate * tuplesort_begin_heap(TupleDesc tupDesc, int nkeys, AttrNumber *attNums, @@ -527,6 +561,47 @@ tuplesort_begin_index_gist(Relation heapRel, return state; } +Tuplesortstate * +tuplesort_begin_index_brin(Relation heapRel, + Relation indexRel, + int workMem, + SortCoordinate coordinate, + int sortopt) +{ + Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, + sortopt); + TuplesortPublic *base = TuplesortstateGetPublic(state); + MemoryContext oldcontext; + TuplesortIndexBrinArg *arg; + + oldcontext = MemoryContextSwitchTo(base->maincontext); + arg = (TuplesortIndexBrinArg *) palloc(sizeof(TuplesortIndexBrinArg)); + +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, + "begin index sort: workMem = %d, randomAccess = %c", + workMem, + sortopt & TUPLESORT_RANDOMACCESS ? 't' : 'f'); +#endif + + base->nKeys = 1; /* Only one sort column, the block number */ + + base->removeabbrev = removeabbrev_index_brin; + base->comparetup = comparetup_index_brin; + base->writetup = writetup_index_brin; + base->readtup = readtup_index_brin; + base->haveDatum1 = true; + base->arg = arg; + + arg->index.heapRel = heapRel; + arg->index.indexRel = indexRel; + + MemoryContextSwitchTo(oldcontext); + + return state; +} + Tuplesortstate * tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, bool nullsFirstFlag, int workMem, @@ -707,6 +782,35 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel, !stup.isnull1); } +/* + * Collect one BRIN tuple while collecting input data for sort. + */ +void +tuplesort_putbrintuple(Tuplesortstate *state, BrinTuple *tuple, Size size) +{ + SortTuple stup; + BrinSortTuple *bstup; + TuplesortPublic *base = TuplesortstateGetPublic(state); + MemoryContext oldcontext = MemoryContextSwitchTo(base->tuplecontext); + + /* allocate space for the whole BRIN sort tuple */ + bstup = palloc(BRINSORTTUPLE_SIZE(size)); + + bstup->tuplen = size; + memcpy(&bstup->tuple, tuple, size); + + stup.tuple = bstup; + stup.datum1 = tuple->bt_blkno; + stup.isnull1 = false; + + tuplesort_puttuple_common(state, &stup, + base->sortKeys && + base->sortKeys->abbrev_converter && + !stup.isnull1); + + MemoryContextSwitchTo(oldcontext); +} + /* * Accept one Datum while collecting input data for sort. * @@ -850,6 +954,35 @@ tuplesort_getindextuple(Tuplesortstate *state, bool forward) return (IndexTuple) stup.tuple; } +/* + * Fetch the next BRIN tuple in either forward or back direction. + * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory + * context, and must not be freed by caller. Caller may not rely on tuple + * remaining valid after any further manipulation of tuplesort. + */ +BrinTuple * +tuplesort_getbrintuple(Tuplesortstate *state, Size *len, bool forward) +{ + TuplesortPublic *base = TuplesortstateGetPublic(state); + MemoryContext oldcontext = MemoryContextSwitchTo(base->sortcontext); + SortTuple stup; + BrinSortTuple *btup; + + if (!tuplesort_gettuple_common(state, forward, &stup)) + stup.tuple = NULL; + + MemoryContextSwitchTo(oldcontext); + + if (!stup.tuple) + return NULL; + + btup = (BrinSortTuple *) stup.tuple; + + *len = btup->tuplen; + + return &btup->tuple; +} + /* * Fetch the next Datum in either forward or back direction. * Returns false if no more datums. @@ -1564,6 +1697,80 @@ readtup_index(Tuplesortstate *state, SortTuple *stup, &stup->isnull1); } +/* + * Routines specialized for BrinTuple case + */ + +static void +removeabbrev_index_brin(Tuplesortstate *state, SortTuple *stups, int count) +{ + int i; + + for (i = 0; i < count; i++) + { + BrinSortTuple *tuple; + + tuple = stups[i].tuple; + stups[i].datum1 = tuple->tuple.bt_blkno; + } +} + +static int +comparetup_index_brin(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state) +{ + Assert(TuplesortstateGetPublic(state)->haveDatum1); + + if (DatumGetUInt32(a->datum1) > DatumGetUInt32(b->datum1)) + return 1; + + if (DatumGetUInt32(a->datum1) < DatumGetUInt32(b->datum1)) + return -1; + + /* silence compilers */ + return 0; +} + +static void +writetup_index_brin(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup) +{ + TuplesortPublic *base = TuplesortstateGetPublic(state); + BrinSortTuple *tuple = (BrinSortTuple *) stup->tuple; + unsigned int tuplen = tuple->tuplen; + + tuplen = tuplen + sizeof(tuplen); + LogicalTapeWrite(tape, &tuplen, sizeof(tuplen)); + LogicalTapeWrite(tape, &tuple->tuple, tuple->tuplen); + if (base->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length word? */ + LogicalTapeWrite(tape, &tuplen, sizeof(tuplen)); +} + +static void +readtup_index_brin(Tuplesortstate *state, SortTuple *stup, + LogicalTape *tape, unsigned int len) +{ + BrinSortTuple *tuple; + TuplesortPublic *base = TuplesortstateGetPublic(state); + unsigned int tuplen = len - sizeof(unsigned int); + + /* + * Allocate space for the BRIN sort tuple, which is BrinTuple with an + * extra length field. + */ + tuple = (BrinSortTuple *) tuplesort_readtup_alloc(state, + BRINSORTTUPLE_SIZE(tuplen)); + + tuple->tuplen = tuplen; + + LogicalTapeReadExact(tape, &tuple->tuple, tuplen); + if (base->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length word? */ + LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen)); + stup->tuple = (void *) tuple; + + /* set up first-column key value, which is block number */ + stup->datum1 = tuple->tuple.bt_blkno; +} + /* * Routines specialized for DatumTuple case */ diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h index 244459587f..df85ae3aac 100644 --- a/src/include/access/amapi.h +++ b/src/include/access/amapi.h @@ -243,6 +243,8 @@ typedef struct IndexAmRoutine bool ampredlocks; /* does AM support parallel scan? */ bool amcanparallel; + /* does AM support parallel build? */ + bool amcanbuildparallel; /* does AM support columns included with clause INCLUDE? */ bool amcaninclude; /* does AM use maintenance_work_mem? */ diff --git a/src/include/access/brin.h b/src/include/access/brin.h index ed66f1b3d5..3451ecb211 100644 --- a/src/include/access/brin.h +++ b/src/include/access/brin.h @@ -11,6 +11,7 @@ #define BRIN_H #include "nodes/execnodes.h" +#include "storage/shm_toc.h" #include "utils/relcache.h" @@ -52,4 +53,6 @@ typedef struct BrinStatsData extern void brinGetStats(Relation index, BrinStatsData *stats); +extern void _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc); + #endif /* BRIN_H */ diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h index 9ed2de76cd..357eb35311 100644 --- a/src/include/utils/tuplesort.h +++ b/src/include/utils/tuplesort.h @@ -21,6 +21,7 @@ #ifndef TUPLESORT_H #define TUPLESORT_H +#include "access/brin_tuple.h" #include "access/itup.h" #include "executor/tuptable.h" #include "storage/dsm.h" @@ -282,6 +283,9 @@ typedef struct * The "index_hash" API is similar to index_btree, but the tuples are * actually sorted by their hash codes not the raw data. * + * The "index_brin" API is similar to index_btree, but the tuples are + * BrinTuple and are sorted by their block number not the raw data. + * * Parallel sort callers are required to coordinate multiple tuplesort states * in a leader process and one or more worker processes. The leader process * must launch workers, and have each perform an independent "partial" @@ -426,6 +430,10 @@ extern Tuplesortstate *tuplesort_begin_index_gist(Relation heapRel, Relation indexRel, int workMem, SortCoordinate coordinate, int sortopt); +extern Tuplesortstate *tuplesort_begin_index_brin(Relation heapRel, + Relation indexRel, + int workMem, SortCoordinate coordinate, + int sortopt); extern Tuplesortstate *tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, bool nullsFirstFlag, @@ -438,6 +446,7 @@ extern void tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup); extern void tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel, ItemPointer self, const Datum *values, const bool *isnull); +extern void tuplesort_putbrintuple(Tuplesortstate *state, BrinTuple *tup, Size len); extern void tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull); @@ -445,6 +454,8 @@ extern bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, TupleTableSlot *slot, Datum *abbrev); extern HeapTuple tuplesort_getheaptuple(Tuplesortstate *state, bool forward); extern IndexTuple tuplesort_getindextuple(Tuplesortstate *state, bool forward); +extern BrinTuple *tuplesort_getbrintuple(Tuplesortstate *state, Size *len, + bool forward); extern bool tuplesort_getdatum(Tuplesortstate *state, bool forward, bool copy, Datum *val, bool *isNull, Datum *abbrev); diff --git a/src/test/modules/dummy_index_am/dummy_index_am.c b/src/test/modules/dummy_index_am/dummy_index_am.c index cbdae7ab7a..eaa0c483b7 100644 --- a/src/test/modules/dummy_index_am/dummy_index_am.c +++ b/src/test/modules/dummy_index_am/dummy_index_am.c @@ -294,6 +294,7 @@ dihandler(PG_FUNCTION_ARGS) amroutine->amclusterable = false; amroutine->ampredlocks = false; amroutine->amcanparallel = false; + amroutine->amcanbuildparallel = false; amroutine->amcaninclude = false; amroutine->amusemaintenanceworkmem = false; amroutine->amsummarizing = false; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 0a324aa4e7..1053f676c3 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -297,13 +297,17 @@ BpChar BrinBuildState BrinDesc BrinInsertState +BrinLeader BrinMemTuple BrinMetaPageData BrinOpaque BrinOpcInfo BrinOptions BrinRevmap +BrinShared +BrinSortTuple BrinSpecialSpace +BrinSpool BrinStatsData BrinTuple BrinValues @@ -2883,6 +2887,7 @@ TupleTableSlotOps TuplesortClusterArg TuplesortDatumArg TuplesortIndexArg +TuplesortIndexBrinArg TuplesortIndexBTreeArg TuplesortIndexHashArg TuplesortInstrumentation