diff --git a/contrib/bloom/blutils.c b/contrib/bloom/blutils.c index 4830cb3fee..a781c5d98d 100644 --- a/contrib/bloom/blutils.c +++ b/contrib/bloom/blutils.c @@ -122,6 +122,7 @@ blhandler(PG_FUNCTION_ARGS) amroutine->amclusterable = false; amroutine->ampredlocks = false; amroutine->amcanparallel = false; + amroutine->amcanbuildparallel = false; amroutine->amcaninclude = false; amroutine->amusemaintenanceworkmem = false; amroutine->amparallelvacuumoptions = diff --git a/doc/src/sgml/indexam.sgml b/doc/src/sgml/indexam.sgml index f107c43d6a..cc4135e394 100644 --- a/doc/src/sgml/indexam.sgml +++ b/doc/src/sgml/indexam.sgml @@ -123,6 +123,8 @@ typedef struct IndexAmRoutine bool ampredlocks; /* does AM support parallel scan? */ bool amcanparallel; + /* does AM support parallel build? */ + bool amcanbuildparallel; /* does AM support columns included with clause INCLUDE? */ bool amcaninclude; /* does AM use maintenance_work_mem? */ @@ -286,6 +288,11 @@ ambuild (Relation heapRelation, and compute the keys that need to be inserted into the index. The function must return a palloc'd struct containing statistics about the new index. + The amcanbuildparallel flag indicates whether + the access method supports parallel index builds. When set to true, + the system will attempt to allocate parallel workers for the build. + Access methods supporting only non-parallel index builds should leave + this flag set to false. diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index 14be939ad8..23f081389b 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -33,6 +33,7 @@ #include "postmaster/autovacuum.h" #include "storage/bufmgr.h" #include "storage/freespace.h" +#include "tcop/tcopprot.h" /* pgrminclude ignore */ #include "utils/acl.h" #include "utils/builtins.h" #include "utils/datum.h" @@ -40,7 +41,119 @@ #include "utils/index_selfuncs.h" #include "utils/memutils.h" #include "utils/rel.h" +#include "utils/tuplesort.h" +/* Magic numbers for parallel state sharing */ +#define PARALLEL_KEY_BRIN_SHARED UINT64CONST(0xB000000000000001) +#define PARALLEL_KEY_TUPLESORT UINT64CONST(0xB000000000000002) +#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000003) +#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004) +#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005) + +/* + * Status record for spooling/sorting phase. + */ +typedef struct BrinSpool +{ + Tuplesortstate *sortstate; /* state data for tuplesort.c */ + Relation heap; + Relation index; +} BrinSpool; + +/* + * Status for index builds performed in parallel. This is allocated in a + * dynamic shared memory segment. + */ +typedef struct BrinShared +{ + /* + * These fields are not modified during the build. They primarily exist + * for the benefit of worker processes that need to create state + * corresponding to that used by the leader. + */ + Oid heaprelid; + Oid indexrelid; + bool isconcurrent; + BlockNumber pagesPerRange; + int scantuplesortstates; + + /* + * workersdonecv is used to monitor the progress of workers. All parallel + * participants must indicate that they are done before leader can use + * results built by the workers (and before leader can write the data into + * the index). + */ + ConditionVariable workersdonecv; + + /* + * mutex protects all fields before heapdesc. + * + * These fields contain status information of interest to BRIN index + * builds that must work just the same when an index is built in parallel. 
+ */ + slock_t mutex; + + /* + * Mutable state that is maintained by workers, and reported back to + * leader at end of the scans. + * + * nparticipantsdone is number of worker processes finished. + * + * reltuples is the total number of input heap tuples. + * + * indtuples is the total number of tuples that made it into the index. + */ + int nparticipantsdone; + double reltuples; + double indtuples; + + /* + * ParallelTableScanDescData data follows. Can't directly embed here, as + * implementations of the parallel table scan desc interface might need + * stronger alignment. + */ +} BrinShared; + +/* + * Return pointer to a BrinShared's parallel table scan. + * + * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just + * MAXALIGN. + */ +#define ParallelTableScanFromBrinShared(shared) \ + (ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(BrinShared))) + +/* + * Status for leader in parallel index build. + */ +typedef struct BrinLeader +{ + /* parallel context itself */ + ParallelContext *pcxt; + + /* + * nparticipanttuplesorts is the exact number of worker processes + * successfully launched, plus one leader process if it participates as a + * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader + * participating as a worker). + */ + int nparticipanttuplesorts; + + /* + * Leader process convenience pointers to shared state (leader avoids TOC + * lookups). + * + * brinshared is the shared state for entire build. sharedsort is the + * shared, tuplesort-managed state passed to each process tuplesort. + * snapshot is the snapshot used by the scan iff an MVCC snapshot is + * required. + */ + BrinShared *brinshared; + Sharedsort *sharedsort; + Snapshot snapshot; + WalUsage *walusage; + BufferUsage *bufferusage; +} BrinLeader; /* * We use a BrinBuildState during initial construction of a BRIN index. @@ -49,7 +162,8 @@ typedef struct BrinBuildState { Relation bs_irel; - int bs_numtuples; + double bs_numtuples; + double bs_reltuples; Buffer bs_currentInsertBuf; BlockNumber bs_pagesPerRange; BlockNumber bs_currRangeStart; @@ -57,9 +171,19 @@ typedef struct BrinBuildState BrinRevmap *bs_rmAccess; BrinDesc *bs_bdesc; BrinMemTuple *bs_dtuple; + BrinTuple *bs_emptyTuple; Size bs_emptyTupleLen; MemoryContext bs_context; + + /* + * bs_leader is only present when a parallel index build is performed, and + * only in the leader process. (Actually, only the leader process has a + * BrinBuildState.) 
+ */ + BrinLeader *bs_leader; + int bs_worker_id; + BrinSpool *bs_spool; } BrinBuildState; /* @@ -94,6 +218,7 @@ static void terminate_brin_buildstate(BrinBuildState *state); static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange, bool include_partial, double *numSummarized, double *numExisting); static void form_and_insert_tuple(BrinBuildState *state); +static void form_and_spill_tuple(BrinBuildState *state); static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b); static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy); @@ -103,6 +228,20 @@ static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys); static void brin_fill_empty_ranges(BrinBuildState *state, BlockNumber prevRange, BlockNumber maxRange); +/* parallel index builds */ +static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, + bool isconcurrent, int request); +static void _brin_end_parallel(BrinLeader *btleader, BrinBuildState *state); +static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot); +static void _brin_leader_participate_as_worker(BrinBuildState *buildstate, + Relation heap, Relation index); +static void _brin_parallel_scan_and_build(BrinBuildState *buildstate, + BrinSpool *brinspool, + BrinShared *brinshared, + Sharedsort *sharedsort, + Relation heap, Relation index, + int sortmem, bool progress); + /* * BRIN handler function: return IndexAmRoutine with access method parameters * and callbacks. @@ -127,6 +266,7 @@ brinhandler(PG_FUNCTION_ARGS) amroutine->amclusterable = false; amroutine->ampredlocks = false; amroutine->amcanparallel = false; + amroutine->amcanbuildparallel = true; amroutine->amcaninclude = false; amroutine->amusemaintenanceworkmem = false; amroutine->amsummarizing = true; @@ -882,6 +1022,65 @@ brinbuildCallback(Relation index, values, isnull); } +/* + * Per-heap-tuple callback for table_index_build_scan with parallelism. + * + * A version of the callback used by parallel index builds. The main difference + * is that instead of writing the BRIN tuples into the index, we write them + * into a shared tuplesort, and leave the insertion up to the leader (which may + * reorder them a bit etc.). The callback also does not generate empty ranges, + * those will be added by the leader when merging results from workers. + */ +static void +brinbuildCallbackParallel(Relation index, + ItemPointer tid, + Datum *values, + bool *isnull, + bool tupleIsAlive, + void *brstate) +{ + BrinBuildState *state = (BrinBuildState *) brstate; + BlockNumber thisblock; + + thisblock = ItemPointerGetBlockNumber(tid); + + /* + * If we're in a block that belongs to a future range, summarize what + * we've got and start afresh. Note the scan might have skipped many + * pages, if they were devoid of live tuples; we do not create emptry BRIN + * ranges here - the leader is responsible for filling them in. + */ + if (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1) + { + + BRIN_elog((DEBUG2, + "brinbuildCallback: completed a range: %u--%u", + state->bs_currRangeStart, + state->bs_currRangeStart + state->bs_pagesPerRange)); + + /* create the index tuple and write it into the tuplesort */ + form_and_spill_tuple(state); + + /* + * Set state to correspond to the next range (for this block). + * + * This skips ranges that are either empty (and so we don't get any + * tuples to summarize), or processed by other workers. 
We can't + * differentiate those cases here easily, so we leave it up to the + * leader to fill empty ranges where needed. + */ + state->bs_currRangeStart + = state->bs_pagesPerRange * (thisblock / state->bs_pagesPerRange); + + /* re-initialize state for it */ + brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc); + } + + /* Accumulate the current tuple into the running state */ + (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple, + values, isnull); +} + /* * brinbuild() -- build a new BRIN index. */ @@ -944,29 +1143,105 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo) state = initialize_brin_buildstate(index, revmap, pagesPerRange, RelationGetNumberOfBlocks(heap)); + state->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool)); + state->bs_spool->heap = heap; + state->bs_spool->index = index; + + /* + * Attempt to launch parallel worker scan when required + * + * XXX plan_create_index_workers makes the number of workers dependent on + * maintenance_work_mem, requiring 32MB for each worker. That makes sense + * for btree, but not for BRIN, which can do away with much less memory. + * So maybe make that somehow less strict, optionally? + */ + if (indexInfo->ii_ParallelWorkers > 0) + _brin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent, + indexInfo->ii_ParallelWorkers); + /* * Now scan the relation. No syncscan allowed here because we want the * heap blocks in physical order. - */ - reltuples = table_index_build_scan(heap, index, indexInfo, false, true, - brinbuildCallback, (void *) state, NULL); - - /* process the final batch */ - form_and_insert_tuple(state); - - /* - * Backfill the final ranges with empty data. * - * This saves us from doing what amounts to full table scans when the - * index with a predicate like WHERE (nonnull_column IS NULL), or other - * very selective predicates. + * If parallel build requested and at least one worker process was + * successfully launched, set up coordination state */ - brin_fill_empty_ranges(state, - state->bs_currRangeStart, - state->bs_maxRangeStart); + if (state->bs_leader) + { + SortCoordinate coordinate; + + coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData)); + coordinate->isWorker = false; + coordinate->nParticipants = + state->bs_leader->nparticipanttuplesorts; + coordinate->sharedsort = state->bs_leader->sharedsort; + + + /* + * Begin serial/leader tuplesort. + * + * In cases where parallelism is involved, the leader receives the + * same share of maintenance_work_mem as a serial sort (it is + * generally treated in the same way as a serial sort once we return). + * Parallel worker Tuplesortstates will have received only a fraction + * of maintenance_work_mem, though. + * + * We rely on the lifetime of the Leader Tuplesortstate almost not + * overlapping with any worker Tuplesortstate's lifetime. There may + * be some small overlap, but that's okay because we rely on leader + * Tuplesortstate only allocating a small, fixed amount of memory + * here. When its tuplesort_performsort() is called (by our caller), + * and significant amounts of memory are likely to be used, all + * workers must have already freed almost all memory held by their + * Tuplesortstates (they are about to go away completely, too). The + * overall effect is that maintenance_work_mem always represents an + * absolute high watermark on the amount of memory used by a CREATE + * INDEX operation, regardless of the use of parallelism or any other + * factor. 
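 + *
 + * As an illustration (figures assumed here, not taken from this patch): with
 + * maintenance_work_mem = 64MB and two launched workers plus a participating
 + * leader, each participant's partial tuplesort is started with roughly
 + * 65536 / 3 = 21845 kB of sortmem, while the leader tuplesort begun below
 + * gets the full 64MB; because the partial sorts finish and release their
 + * memory before the leader sort needs much of it, the 64MB ceiling holds.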
+ */ + state->bs_spool->sortstate = + tuplesort_begin_index_brin(heap, index, + maintenance_work_mem, coordinate, + TUPLESORT_NONE); + + /* + * In parallel mode, wait for workers to complete, and then read all + * tuples from the shared tuplesort and insert them into the index. + */ + _brin_end_parallel(state->bs_leader, state); + } + else /* no parallel index build */ + { + reltuples = table_index_build_scan(heap, index, indexInfo, false, true, + brinbuildCallback, (void *) state, NULL); + + /* + * process the final batch + * + * XXX Note this does not update state->bs_currRangeStart, i.e. it + * stays set to the last range added to the index. This is OK, because + * that's what brin_fill_empty_ranges expects. + */ + form_and_insert_tuple(state); + + /* + * Backfill the final ranges with empty data. + * + * This saves us from doing what amounts to full table scans when the + * index with a predicate like WHERE (nonnull_column IS NULL), or + * other very selective predicates. + */ + brin_fill_empty_ranges(state, + state->bs_currRangeStart, + state->bs_maxRangeStart); + + /* track the number of relation tuples */ + state->bs_reltuples = reltuples; + } /* release resources */ idxtuples = state->bs_numtuples; + reltuples = state->bs_reltuples; brinRevmapTerminate(state->bs_rmAccess); terminate_brin_buildstate(state); @@ -1387,12 +1662,19 @@ initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap, state->bs_irel = idxRel; state->bs_numtuples = 0; + state->bs_reltuples = 0; state->bs_currentInsertBuf = InvalidBuffer; state->bs_pagesPerRange = pagesPerRange; state->bs_currRangeStart = 0; state->bs_rmAccess = revmap; state->bs_bdesc = brin_build_desc(idxRel); state->bs_dtuple = brin_new_memtuple(state->bs_bdesc); + state->bs_leader = NULL; + state->bs_worker_id = 0; + state->bs_spool = NULL; + state->bs_context = CurrentMemoryContext; + state->bs_emptyTuple = NULL; + state->bs_emptyTupleLen = 0; /* Remember the memory context to use for an empty tuple, if needed. */ state->bs_context = CurrentMemoryContext; @@ -1701,6 +1983,32 @@ form_and_insert_tuple(BrinBuildState *state) pfree(tup); } +/* + * Given a deformed tuple in the build state, convert it into the on-disk + * format and write it to a (shared) tuplesort (the leader will insert it + * into the index later). + */ +static void +form_and_spill_tuple(BrinBuildState *state) +{ + BrinTuple *tup; + Size size; + + /* don't insert empty tuples in parallel build */ + if (state->bs_dtuple->bt_empty_range) + return; + + tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart, + state->bs_dtuple, &size); + + /* write the BRIN tuple to the tuplesort */ + tuplesort_putbrintuple(state->bs_spool->sortstate, tup, size); + + state->bs_numtuples++; + + pfree(tup); +} + /* * Given two deformed tuples, adjust the first one so that it's consistent * with the summary values in both. 
@@ -2021,6 +2329,554 @@ check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys) return true; } +static void +_brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, + bool isconcurrent, int request) +{ + ParallelContext *pcxt; + int scantuplesortstates; + Snapshot snapshot; + Size estbrinshared; + Size estsort; + BrinShared *brinshared; + Sharedsort *sharedsort; + BrinLeader *brinleader = (BrinLeader *) palloc0(sizeof(BrinLeader)); + WalUsage *walusage; + BufferUsage *bufferusage; + bool leaderparticipates = true; + int querylen; + +#ifdef DISABLE_LEADER_PARTICIPATION + leaderparticipates = false; +#endif + + /* + * Enter parallel mode, and create context for parallel build of brin + * index + */ + EnterParallelMode(); + Assert(request > 0); + pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main", + request); + + scantuplesortstates = leaderparticipates ? request + 1 : request; + + /* + * Prepare for scan of the base relation. In a normal index build, we use + * SnapshotAny because we must retrieve all tuples and do our own time + * qual checks (because we have to index RECENTLY_DEAD tuples). In a + * concurrent build, we take a regular MVCC snapshot and index whatever's + * live according to that. + */ + if (!isconcurrent) + snapshot = SnapshotAny; + else + snapshot = RegisterSnapshot(GetTransactionSnapshot()); + + /* + * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace. + */ + estbrinshared = _brin_parallel_estimate_shared(heap, snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared); + estsort = tuplesort_estimate_shared(scantuplesortstates); + shm_toc_estimate_chunk(&pcxt->estimator, estsort); + + shm_toc_estimate_keys(&pcxt->estimator, 2); + + /* + * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE + * and PARALLEL_KEY_BUFFER_USAGE. + * + * If there are no extensions loaded that care, we could skip this. We + * have no way of knowing whether anyone's looking at pgWalUsage or + * pgBufferUsage, so do it unconditionally. 
+ */ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ + if (debug_query_string) + { + querylen = strlen(debug_query_string); + shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + else + querylen = 0; /* keep compiler quiet */ + + /* Everyone's had a chance to ask for space, so now create the DSM */ + InitializeParallelDSM(pcxt); + + /* If no DSM segment was available, back out (do serial build) */ + if (pcxt->seg == NULL) + { + if (IsMVCCSnapshot(snapshot)) + UnregisterSnapshot(snapshot); + DestroyParallelContext(pcxt); + ExitParallelMode(); + return; + } + + /* Store shared build state, for which we reserved space */ + brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared); + /* Initialize immutable state */ + brinshared->heaprelid = RelationGetRelid(heap); + brinshared->indexrelid = RelationGetRelid(index); + brinshared->isconcurrent = isconcurrent; + brinshared->scantuplesortstates = scantuplesortstates; + brinshared->pagesPerRange = buildstate->bs_pagesPerRange; + ConditionVariableInit(&brinshared->workersdonecv); + SpinLockInit(&brinshared->mutex); + + /* Initialize mutable state */ + brinshared->nparticipantsdone = 0; + brinshared->reltuples = 0.0; + brinshared->indtuples = 0.0; + + table_parallelscan_initialize(heap, + ParallelTableScanFromBrinShared(brinshared), + snapshot); + + /* + * Store shared tuplesort-private state, for which we reserved space. + * Then, initialize opaque state using tuplesort routine. + */ + sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort); + tuplesort_initialize_shared(sharedsort, scantuplesortstates, + pcxt->seg); + + /* + * Store shared tuplesort-private state, for which we reserved space. + * Then, initialize opaque state using tuplesort routine. + */ + shm_toc_insert(pcxt->toc, PARALLEL_KEY_BRIN_SHARED, brinshared); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort); + + /* Store query string for workers */ + if (debug_query_string) + { + char *sharedquery; + + sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); + memcpy(sharedquery, debug_query_string, querylen + 1); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); + } + + /* + * Allocate space for each worker's WalUsage and BufferUsage; no need to + * initialize. 
+ */ + walusage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage); + bufferusage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage); + + /* Launch workers, saving status for leader/caller */ + LaunchParallelWorkers(pcxt); + brinleader->pcxt = pcxt; + brinleader->nparticipanttuplesorts = pcxt->nworkers_launched; + if (leaderparticipates) + brinleader->nparticipanttuplesorts++; + brinleader->brinshared = brinshared; + brinleader->sharedsort = sharedsort; + brinleader->snapshot = snapshot; + brinleader->walusage = walusage; + brinleader->bufferusage = bufferusage; + + /* If no workers were successfully launched, back out (do serial build) */ + if (pcxt->nworkers_launched == 0) + { + _brin_end_parallel(brinleader, NULL); + return; + } + + /* Save leader state now that it's clear build will be parallel */ + buildstate->bs_leader = brinleader; + + /* Join heap scan ourselves */ + if (leaderparticipates) + _brin_leader_participate_as_worker(buildstate, heap, index); + + /* + * Caller needs to wait for all launched workers when we return. Make + * sure that the failure-to-start case will not hang forever. + */ + WaitForParallelWorkersToAttach(pcxt); +} + +/* + * Shut down workers, destroy parallel context, and end parallel mode. + */ +static void +_brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state) +{ + int i; + BrinTuple *btup; + BrinMemTuple *memtuple = NULL; + Size tuplen; + BrinShared *brinshared = brinleader->brinshared; + BlockNumber prevblkno = InvalidBlockNumber; + BrinSpool *spool; + MemoryContext rangeCxt, + oldCxt; + + /* Shutdown worker processes */ + WaitForParallelWorkersToFinish(brinleader->pcxt); + + /* + * If we didn't actually launch workers, we still have to make sure to + * exit parallel mode. + */ + if (!state) + goto cleanup; + + /* copy the data into leader state (we have to wait for the workers ) */ + state->bs_reltuples = brinshared->reltuples; + state->bs_numtuples = brinshared->indtuples; + + /* do the actual sort in the leader */ + spool = state->bs_spool; + tuplesort_performsort(spool->sortstate); + + /* + * Initialize BrinMemTuple we'll use to union summaries from workers (in + * case they happened to produce parts of the same paga range). + */ + memtuple = brin_new_memtuple(state->bs_bdesc); + + /* + * Create a memory context we'll reset to combine results for a single + * page range (received from the workers). We don't expect huge number of + * overlaps under regular circumstances, because for large tables the + * chunk size is likely larger than the BRIN page range), but it can + * happen, and the union functions may do all kinds of stuff. So we better + * reset the context once in a while. + */ + rangeCxt = AllocSetContextCreate(CurrentMemoryContext, + "brin union", + ALLOCSET_DEFAULT_SIZES); + oldCxt = MemoryContextSwitchTo(rangeCxt); + + /* + * Read the BRIN tuples from the shared tuplesort, sorted by block number. + * That probably gives us an index that is cheaper to scan, thanks to + * mostly getting data from the same index page as before. + */ + while ((btup = tuplesort_getbrintuple(spool->sortstate, &tuplen, true)) != NULL) + { + /* Ranges should be multiples of pages_per_range for the index. */ + Assert(btup->bt_blkno % brinshared->pagesPerRange == 0); + + /* + * Do we need to union summaries for the same page range? 
+ * + * If this is the first brin tuple we read, then just deform it into + * the memtuple, and continue with the next one from tuplesort. We + * however may need to insert empty summaries into the index. + * + * If it's the same block as the last we saw, we simply union the brin + * tuple into it, and we're done - we don't even need to insert empty + * ranges, because that was done earlier when we saw the first brin + * tuple (for this range). + * + * Finally, if it's not the first brin tuple, and it's not the same + * page range, we need to do the insert and then deform the tuple into + * the memtuple. Then we'll insert empty ranges before the new brin + * tuple, if needed. + */ + if (prevblkno == InvalidBlockNumber) + { + /* First brin tuples, just deform into memtuple. */ + memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple); + + /* continue to insert empty pages before thisblock */ + } + else if (memtuple->bt_blkno == btup->bt_blkno) + { + /* + * Not the first brin tuple, but same page range as the previous + * one, so we can merge it into the memtuple. + */ + union_tuples(state->bs_bdesc, memtuple, btup); + continue; + } + else + { + BrinTuple *tmp; + Size len; + + /* + * We got brin tuple for a different page range, so form a brin + * tuple from the memtuple, insert it, and re-init the memtuple + * from the new brin tuple. + */ + tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno, + memtuple, &len); + + brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess, + &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len); + + /* + * Reset the per-output-range context. This frees all the memory + * possibly allocated by the union functions, and also the BRIN + * tuple we just formed and inserted. + */ + MemoryContextReset(rangeCxt); + + memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple); + + /* continue to insert empty pages before thisblock */ + } + + /* Fill empty ranges for all ranges missing in the tuplesort. */ + brin_fill_empty_ranges(state, prevblkno, btup->bt_blkno); + + prevblkno = btup->bt_blkno; + } + + tuplesort_end(spool->sortstate); + + /* Fill the BRIN tuple for the last page range with data. */ + if (prevblkno != InvalidBlockNumber) + { + BrinTuple *tmp; + Size len; + + tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno, + memtuple, &len); + + brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess, + &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len); + + pfree(tmp); + } + + /* Fill empty ranges at the end, for all ranges missing in the tuplesort. */ + brin_fill_empty_ranges(state, prevblkno, state->bs_maxRangeStart); + + /* + * Switch back to the original memory context, and destroy the one we + * created to isolate the union_tuple calls. + */ + MemoryContextSwitchTo(oldCxt); + MemoryContextDelete(rangeCxt); + + /* + * Next, accumulate WAL usage. (This must wait for the workers to finish, + * or we might get incomplete data.) + */ + for (i = 0; i < brinleader->pcxt->nworkers_launched; i++) + InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]); + +cleanup: + + /* Free last reference to MVCC snapshot, if one was used */ + if (IsMVCCSnapshot(brinleader->snapshot)) + UnregisterSnapshot(brinleader->snapshot); + DestroyParallelContext(brinleader->pcxt); + ExitParallelMode(); +} + +/* + * Returns size of shared memory required to store state for a parallel + * brin index build based on the snapshot its parallel scan will use. 
+ */ +static Size +_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot) +{ + /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */ + return add_size(BUFFERALIGN(sizeof(BrinShared)), + table_parallelscan_estimate(heap, snapshot)); +} + +/* + * Within leader, participate as a parallel worker. + */ +static void +_brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Relation index) +{ + BrinLeader *brinleader = buildstate->bs_leader; + int sortmem; + + /* Allocate memory and initialize private spool */ + buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool)); + buildstate->bs_spool->heap = heap; + buildstate->bs_spool->index = index; + + /* + * Might as well use reliable figure when doling out maintenance_work_mem + * (when requested number of workers were not launched, this will be + * somewhat higher than it is for other workers). + */ + sortmem = maintenance_work_mem / brinleader->nparticipanttuplesorts; + + /* Perform work common to all participants */ + _brin_parallel_scan_and_build(buildstate, buildstate->bs_spool, brinleader->brinshared, + brinleader->sharedsort, heap, index, sortmem, true); +} + +/* + * Perform a worker's portion of a parallel sort. + * + * This generates a tuplesort for the passed-in brinspool. All other spool + * fields should already be set when this is called. + * + * sortmem is the amount of working memory to use within each worker, + * expressed in KBs. + * + * When this returns, workers are done, and need only release resources. + */ +static void +_brin_parallel_scan_and_build(BrinBuildState *state, BrinSpool *brinspool, + BrinShared *brinshared, Sharedsort *sharedsort, + Relation heap, Relation index, int sortmem, + bool progress) +{ + SortCoordinate coordinate; + TableScanDesc scan; + double reltuples; + IndexInfo *indexInfo; + + /* Initialize local tuplesort coordination state */ + coordinate = palloc0(sizeof(SortCoordinateData)); + coordinate->isWorker = true; + coordinate->nParticipants = -1; + coordinate->sharedsort = sharedsort; + + /* Begin "partial" tuplesort */ + brinspool->sortstate = tuplesort_begin_index_brin(brinspool->heap, + brinspool->index, + sortmem, coordinate, + TUPLESORT_NONE); + + /* Join parallel scan */ + indexInfo = BuildIndexInfo(index); + indexInfo->ii_Concurrent = brinshared->isconcurrent; + + scan = table_beginscan_parallel(heap, + ParallelTableScanFromBrinShared(brinshared)); + + reltuples = table_index_build_scan(heap, index, indexInfo, true, true, + brinbuildCallbackParallel, state, scan); + + /* spill the last item to the tuplesort */ + form_and_spill_tuple(state); + + /* sort the BRIN ranges built by this worker */ + tuplesort_performsort(brinspool->sortstate); + + state->bs_reltuples += reltuples; + + /* + * Done. Record ambuild statistics. + */ + SpinLockAcquire(&brinshared->mutex); + brinshared->nparticipantsdone++; + brinshared->reltuples += state->bs_reltuples; + brinshared->indtuples += state->bs_numtuples; + SpinLockRelease(&brinshared->mutex); + + /* Notify leader */ + ConditionVariableSignal(&brinshared->workersdonecv); + + tuplesort_end(brinspool->sortstate); +} + +/* + * Perform work within a launched parallel process.
+ */ +void +_brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) +{ + char *sharedquery; + BrinShared *brinshared; + Sharedsort *sharedsort; + BrinBuildState *buildstate; + Relation heapRel; + Relation indexRel; + LOCKMODE heapLockmode; + LOCKMODE indexLockmode; + WalUsage *walusage; + BufferUsage *bufferusage; + int sortmem; + + /* + * The only possible status flag that can be set to the parallel worker is + * PROC_IN_SAFE_IC. + */ + Assert((MyProc->statusFlags == 0) || + (MyProc->statusFlags == PROC_IN_SAFE_IC)); + + /* Set debug_query_string for individual workers first */ + sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true); + debug_query_string = sharedquery; + + /* Report the query string from leader */ + pgstat_report_activity(STATE_RUNNING, debug_query_string); + + /* Look up brin shared state */ + brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false); + + /* Open relations using lock modes known to be obtained by index.c */ + if (!brinshared->isconcurrent) + { + heapLockmode = ShareLock; + indexLockmode = AccessExclusiveLock; + } + else + { + heapLockmode = ShareUpdateExclusiveLock; + indexLockmode = RowExclusiveLock; + } + + /* Open relations within worker */ + heapRel = table_open(brinshared->heaprelid, heapLockmode); + indexRel = index_open(brinshared->indexrelid, indexLockmode); + + buildstate = initialize_brin_buildstate(indexRel, NULL, + brinshared->pagesPerRange, + InvalidBlockNumber); + + /* Initialize worker's own spool */ + buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool)); + buildstate->bs_spool->heap = heapRel; + buildstate->bs_spool->index = indexRel; + + /* Look up shared state private to tuplesort.c */ + sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false); + tuplesort_attach_shared(sharedsort, seg); + + /* Prepare to track buffer usage during parallel execution */ + InstrStartParallelQuery(); + + /* + * Might as well use reliable figure when doling out maintenance_work_mem + * (when requested number of workers were not launched, this will be + * somewhat higher than it is for other workers). + */ + sortmem = maintenance_work_mem / brinshared->scantuplesortstates; + + _brin_parallel_scan_and_build(buildstate, buildstate->bs_spool, + brinshared, sharedsort, + heapRel, indexRel, sortmem, false); + + /* Report WAL/buffer usage during parallel execution */ + bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); + walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); + InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], + &walusage[ParallelWorkerNumber]); + + index_close(indexRel, indexLockmode); + table_close(heapRel, heapLockmode); +} + /* * brin_build_empty_tuple * Maybe initialize a BRIN tuple representing empty range. 
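The brin.c changes above only take effect because BRIN's handler now sets the new flag; any access method that wants planner-allocated workers for its build does the same, and index_build() (see the src/backend/catalog/index.c hunk further down) now tests rd_indam->amcanbuildparallel instead of hard-coding btree. A minimal sketch of a handler opting in — the handler name and the elided fields are placeholders, not part of this patch:

    Datum
    myamhandler(PG_FUNCTION_ARGS)
    {
        IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

        /* ... the usual capability flags and callbacks ... */
        amroutine->amcanparallel = false;      /* parallel scans unsupported */
        amroutine->amcanbuildparallel = true;  /* but parallel builds are OK */

        PG_RETURN_POINTER(amroutine);
    }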
diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c index a875c5d3d7..9b1a0ac345 100644 --- a/src/backend/access/gin/ginutil.c +++ b/src/backend/access/gin/ginutil.c @@ -54,6 +54,7 @@ ginhandler(PG_FUNCTION_ARGS) amroutine->amclusterable = false; amroutine->ampredlocks = true; amroutine->amcanparallel = false; + amroutine->amcanbuildparallel = false; amroutine->amcaninclude = false; amroutine->amusemaintenanceworkmem = true; amroutine->amsummarizing = false; diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 9a1bf8f66c..e052ba8bda 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -76,6 +76,7 @@ gisthandler(PG_FUNCTION_ARGS) amroutine->amclusterable = true; amroutine->ampredlocks = true; amroutine->amcanparallel = false; + amroutine->amcanbuildparallel = false; amroutine->amcaninclude = true; amroutine->amusemaintenanceworkmem = false; amroutine->amsummarizing = false; diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c index 6443ff21bd..905519692c 100644 --- a/src/backend/access/hash/hash.c +++ b/src/backend/access/hash/hash.c @@ -73,6 +73,7 @@ hashhandler(PG_FUNCTION_ARGS) amroutine->amclusterable = false; amroutine->ampredlocks = true; amroutine->amcanparallel = false; + amroutine->amcanbuildparallel = false; amroutine->amcaninclude = false; amroutine->amusemaintenanceworkmem = false; amroutine->amsummarizing = false; diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index 0930f9b37e..6c8cd93fa0 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -112,6 +112,7 @@ bthandler(PG_FUNCTION_ARGS) amroutine->amclusterable = true; amroutine->ampredlocks = true; amroutine->amcanparallel = true; + amroutine->amcanbuildparallel = true; amroutine->amcaninclude = true; amroutine->amusemaintenanceworkmem = false; amroutine->amsummarizing = false; diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c index 30c00876a5..fd4b615710 100644 --- a/src/backend/access/spgist/spgutils.c +++ b/src/backend/access/spgist/spgutils.c @@ -60,6 +60,7 @@ spghandler(PG_FUNCTION_ARGS) amroutine->amclusterable = false; amroutine->ampredlocks = false; amroutine->amcanparallel = false; + amroutine->amcanbuildparallel = false; amroutine->amcaninclude = true; amroutine->amusemaintenanceworkmem = false; amroutine->amsummarizing = false; diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 194a1207be..d78314062e 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -14,6 +14,7 @@ #include "postgres.h" +#include "access/brin.h" #include "access/nbtree.h" #include "access/parallel.h" #include "access/session.h" @@ -145,6 +146,9 @@ static const struct { "_bt_parallel_build_main", _bt_parallel_build_main }, + { + "_brin_parallel_build_main", _brin_parallel_build_main + }, { "parallel_vacuum_main", parallel_vacuum_main } diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index b8c7945322..7b186c0220 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -2982,7 +2982,7 @@ index_build(Relation heapRelation, * Note that planner considers parallel safety for us. 
*/ if (parallel && IsNormalProcessingMode() && - indexRelation->rd_rel->relam == BTREE_AM_OID) + indexRelation->rd_indam->amcanbuildparallel) indexInfo->ii_ParallelWorkers = plan_create_index_workers(RelationGetRelid(heapRelation), RelationGetRelid(indexRelation)); diff --git a/src/backend/utils/sort/tuplesortvariants.c b/src/backend/utils/sort/tuplesortvariants.c index 2cd508e513..90fc605f1c 100644 --- a/src/backend/utils/sort/tuplesortvariants.c +++ b/src/backend/utils/sort/tuplesortvariants.c @@ -19,6 +19,7 @@ #include "postgres.h" +#include "access/brin_tuple.h" #include "access/hash.h" #include "access/htup_details.h" #include "access/nbtree.h" @@ -43,6 +44,8 @@ static void removeabbrev_cluster(Tuplesortstate *state, SortTuple *stups, int count); static void removeabbrev_index(Tuplesortstate *state, SortTuple *stups, int count); +static void removeabbrev_index_brin(Tuplesortstate *state, SortTuple *stups, + int count); static void removeabbrev_datum(Tuplesortstate *state, SortTuple *stups, int count); static int comparetup_heap(const SortTuple *a, const SortTuple *b, @@ -69,10 +72,16 @@ static int comparetup_index_hash(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static int comparetup_index_hash_tiebreak(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); +static int comparetup_index_brin(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state); static void writetup_index(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup); static void readtup_index(Tuplesortstate *state, SortTuple *stup, LogicalTape *tape, unsigned int len); +static void writetup_index_brin(Tuplesortstate *state, LogicalTape *tape, + SortTuple *stup); +static void readtup_index_brin(Tuplesortstate *state, SortTuple *stup, + LogicalTape *tape, unsigned int len); static int comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static int comparetup_datum_tiebreak(const SortTuple *a, const SortTuple *b, @@ -128,6 +137,16 @@ typedef struct uint32 max_buckets; } TuplesortIndexHashArg; +/* + * Data struture pointed by "TuplesortPublic.arg" for the index_brin subcase. + */ +typedef struct +{ + TuplesortIndexArg index; + + /* XXX do we need something here? */ +} TuplesortIndexBrinArg; + /* * Data struture pointed by "TuplesortPublic.arg" for the Datum case. * Set by tuplesort_begin_datum and used only by the DatumTuple routines. @@ -140,6 +159,21 @@ typedef struct int datumTypeLen; } TuplesortDatumArg; +/* + * Computing BrinTuple size with only the tuple is difficult, so we want to track + * the length referenced by the SortTuple. That's what BrinSortTuple is meant + * to do - it's essentially a BrinTuple prefixed by its length. + */ +typedef struct BrinSortTuple +{ + Size tuplen; + BrinTuple tuple; +} BrinSortTuple; + +/* Size of the BrinSortTuple, given length of the BrinTuple. 
*/ +#define BRINSORTTUPLE_SIZE(len) (offsetof(BrinSortTuple, tuple) + (len)) + + Tuplesortstate * tuplesort_begin_heap(TupleDesc tupDesc, int nkeys, AttrNumber *attNums, @@ -527,6 +561,47 @@ tuplesort_begin_index_gist(Relation heapRel, return state; } +Tuplesortstate * +tuplesort_begin_index_brin(Relation heapRel, + Relation indexRel, + int workMem, + SortCoordinate coordinate, + int sortopt) +{ + Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, + sortopt); + TuplesortPublic *base = TuplesortstateGetPublic(state); + MemoryContext oldcontext; + TuplesortIndexBrinArg *arg; + + oldcontext = MemoryContextSwitchTo(base->maincontext); + arg = (TuplesortIndexBrinArg *) palloc(sizeof(TuplesortIndexBrinArg)); + +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, + "begin index sort: workMem = %d, randomAccess = %c", + workMem, + sortopt & TUPLESORT_RANDOMACCESS ? 't' : 'f'); +#endif + + base->nKeys = 1; /* Only one sort column, the block number */ + + base->removeabbrev = removeabbrev_index_brin; + base->comparetup = comparetup_index_brin; + base->writetup = writetup_index_brin; + base->readtup = readtup_index_brin; + base->haveDatum1 = true; + base->arg = arg; + + arg->index.heapRel = heapRel; + arg->index.indexRel = indexRel; + + MemoryContextSwitchTo(oldcontext); + + return state; +} + Tuplesortstate * tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, bool nullsFirstFlag, int workMem, @@ -707,6 +782,35 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel, !stup.isnull1); } +/* + * Collect one BRIN tuple while collecting input data for sort. + */ +void +tuplesort_putbrintuple(Tuplesortstate *state, BrinTuple *tuple, Size size) +{ + SortTuple stup; + BrinSortTuple *bstup; + TuplesortPublic *base = TuplesortstateGetPublic(state); + MemoryContext oldcontext = MemoryContextSwitchTo(base->tuplecontext); + + /* allocate space for the whole BRIN sort tuple */ + bstup = palloc(BRINSORTTUPLE_SIZE(size)); + + bstup->tuplen = size; + memcpy(&bstup->tuple, tuple, size); + + stup.tuple = bstup; + stup.datum1 = tuple->bt_blkno; + stup.isnull1 = false; + + tuplesort_puttuple_common(state, &stup, + base->sortKeys && + base->sortKeys->abbrev_converter && + !stup.isnull1); + + MemoryContextSwitchTo(oldcontext); +} + /* * Accept one Datum while collecting input data for sort. * @@ -850,6 +954,35 @@ tuplesort_getindextuple(Tuplesortstate *state, bool forward) return (IndexTuple) stup.tuple; } +/* + * Fetch the next BRIN tuple in either forward or back direction. + * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory + * context, and must not be freed by caller. Caller may not rely on tuple + * remaining valid after any further manipulation of tuplesort. + */ +BrinTuple * +tuplesort_getbrintuple(Tuplesortstate *state, Size *len, bool forward) +{ + TuplesortPublic *base = TuplesortstateGetPublic(state); + MemoryContext oldcontext = MemoryContextSwitchTo(base->sortcontext); + SortTuple stup; + BrinSortTuple *btup; + + if (!tuplesort_gettuple_common(state, forward, &stup)) + stup.tuple = NULL; + + MemoryContextSwitchTo(oldcontext); + + if (!stup.tuple) + return NULL; + + btup = (BrinSortTuple *) stup.tuple; + + *len = btup->tuplen; + + return &btup->tuple; +} + /* * Fetch the next Datum in either forward or back direction. * Returns false if no more datums. 
@@ -1564,6 +1697,80 @@ readtup_index(Tuplesortstate *state, SortTuple *stup, &stup->isnull1); } +/* + * Routines specialized for BrinTuple case + */ + +static void +removeabbrev_index_brin(Tuplesortstate *state, SortTuple *stups, int count) +{ + int i; + + for (i = 0; i < count; i++) + { + BrinSortTuple *tuple; + + tuple = stups[i].tuple; + stups[i].datum1 = tuple->tuple.bt_blkno; + } +} + +static int +comparetup_index_brin(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state) +{ + Assert(TuplesortstateGetPublic(state)->haveDatum1); + + if (DatumGetUInt32(a->datum1) > DatumGetUInt32(b->datum1)) + return 1; + + if (DatumGetUInt32(a->datum1) < DatumGetUInt32(b->datum1)) + return -1; + + /* silence compilers */ + return 0; +} + +static void +writetup_index_brin(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup) +{ + TuplesortPublic *base = TuplesortstateGetPublic(state); + BrinSortTuple *tuple = (BrinSortTuple *) stup->tuple; + unsigned int tuplen = tuple->tuplen; + + tuplen = tuplen + sizeof(tuplen); + LogicalTapeWrite(tape, &tuplen, sizeof(tuplen)); + LogicalTapeWrite(tape, &tuple->tuple, tuple->tuplen); + if (base->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length word? */ + LogicalTapeWrite(tape, &tuplen, sizeof(tuplen)); +} + +static void +readtup_index_brin(Tuplesortstate *state, SortTuple *stup, + LogicalTape *tape, unsigned int len) +{ + BrinSortTuple *tuple; + TuplesortPublic *base = TuplesortstateGetPublic(state); + unsigned int tuplen = len - sizeof(unsigned int); + + /* + * Allocate space for the BRIN sort tuple, which is BrinTuple with an + * extra length field. + */ + tuple = (BrinSortTuple *) tuplesort_readtup_alloc(state, + BRINSORTTUPLE_SIZE(tuplen)); + + tuple->tuplen = tuplen; + + LogicalTapeReadExact(tape, &tuple->tuple, tuplen); + if (base->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length word? */ + LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen)); + stup->tuple = (void *) tuple; + + /* set up first-column key value, which is block number */ + stup->datum1 = tuple->tuple.bt_blkno; +} + /* * Routines specialized for DatumTuple case */ diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h index 244459587f..df85ae3aac 100644 --- a/src/include/access/amapi.h +++ b/src/include/access/amapi.h @@ -243,6 +243,8 @@ typedef struct IndexAmRoutine bool ampredlocks; /* does AM support parallel scan? */ bool amcanparallel; + /* does AM support parallel build? */ + bool amcanbuildparallel; /* does AM support columns included with clause INCLUDE? */ bool amcaninclude; /* does AM use maintenance_work_mem? 
*/ diff --git a/src/include/access/brin.h b/src/include/access/brin.h index ed66f1b3d5..3451ecb211 100644 --- a/src/include/access/brin.h +++ b/src/include/access/brin.h @@ -11,6 +11,7 @@ #define BRIN_H #include "nodes/execnodes.h" +#include "storage/shm_toc.h" #include "utils/relcache.h" @@ -52,4 +53,6 @@ typedef struct BrinStatsData extern void brinGetStats(Relation index, BrinStatsData *stats); +extern void _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc); + #endif /* BRIN_H */ diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h index 9ed2de76cd..357eb35311 100644 --- a/src/include/utils/tuplesort.h +++ b/src/include/utils/tuplesort.h @@ -21,6 +21,7 @@ #ifndef TUPLESORT_H #define TUPLESORT_H +#include "access/brin_tuple.h" #include "access/itup.h" #include "executor/tuptable.h" #include "storage/dsm.h" @@ -282,6 +283,9 @@ typedef struct * The "index_hash" API is similar to index_btree, but the tuples are * actually sorted by their hash codes not the raw data. * + * The "index_brin" API is similar to index_btree, but the tuples are + * BrinTuple and are sorted by their block number not the raw data. + * * Parallel sort callers are required to coordinate multiple tuplesort states * in a leader process and one or more worker processes. The leader process * must launch workers, and have each perform an independent "partial" @@ -426,6 +430,10 @@ extern Tuplesortstate *tuplesort_begin_index_gist(Relation heapRel, Relation indexRel, int workMem, SortCoordinate coordinate, int sortopt); +extern Tuplesortstate *tuplesort_begin_index_brin(Relation heapRel, + Relation indexRel, + int workMem, SortCoordinate coordinate, + int sortopt); extern Tuplesortstate *tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, bool nullsFirstFlag, @@ -438,6 +446,7 @@ extern void tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup); extern void tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel, ItemPointer self, const Datum *values, const bool *isnull); +extern void tuplesort_putbrintuple(Tuplesortstate *state, BrinTuple *tup, Size len); extern void tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull); @@ -445,6 +454,8 @@ extern bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, TupleTableSlot *slot, Datum *abbrev); extern HeapTuple tuplesort_getheaptuple(Tuplesortstate *state, bool forward); extern IndexTuple tuplesort_getindextuple(Tuplesortstate *state, bool forward); +extern BrinTuple *tuplesort_getbrintuple(Tuplesortstate *state, Size *len, + bool forward); extern bool tuplesort_getdatum(Tuplesortstate *state, bool forward, bool copy, Datum *val, bool *isNull, Datum *abbrev); diff --git a/src/test/modules/dummy_index_am/dummy_index_am.c b/src/test/modules/dummy_index_am/dummy_index_am.c index cbdae7ab7a..eaa0c483b7 100644 --- a/src/test/modules/dummy_index_am/dummy_index_am.c +++ b/src/test/modules/dummy_index_am/dummy_index_am.c @@ -294,6 +294,7 @@ dihandler(PG_FUNCTION_ARGS) amroutine->amclusterable = false; amroutine->ampredlocks = false; amroutine->amcanparallel = false; + amroutine->amcanbuildparallel = false; amroutine->amcaninclude = false; amroutine->amusemaintenanceworkmem = false; amroutine->amsummarizing = false; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 0a324aa4e7..1053f676c3 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -297,13 +297,17 @@ BpChar BrinBuildState BrinDesc BrinInsertState +BrinLeader 
BrinMemTuple BrinMetaPageData BrinOpaque BrinOpcInfo BrinOptions BrinRevmap +BrinShared +BrinSortTuple BrinSpecialSpace +BrinSpool BrinStatsData BrinTuple BrinValues @@ -2883,6 +2887,7 @@ TupleTableSlotOps TuplesortClusterArg TuplesortDatumArg TuplesortIndexArg +TuplesortIndexBrinArg TuplesortIndexBTreeArg TuplesortIndexHashArg TuplesortInstrumentation
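The tuplesort entry points added in this patch form a small, self-contained API. A minimal serial sketch of how they compose — assuming heapRel, indexRel, and a BrinTuple/length pair produced by brin_form_tuple() are already in hand, and passing no SortCoordinate since this is not the parallel path:

    Tuplesortstate *sort;
    BrinTuple  *tup;          /* e.g. from brin_form_tuple() */
    Size        len;          /* size reported by brin_form_tuple() */

    sort = tuplesort_begin_index_brin(heapRel, indexRel,
                                      maintenance_work_mem,
                                      NULL,        /* no coordination: serial */
                                      TUPLESORT_NONE);

    /* feed as many BRIN tuples as needed; the sort key is bt_blkno */
    tuplesort_putbrintuple(sort, tup, len);

    tuplesort_performsort(sort);

    /* returned tuples belong to the tuplesort context; do not pfree them */
    while ((tup = tuplesort_getbrintuple(sort, &len, true)) != NULL)
    {
        /* insert into the index, unioning tuples with equal bt_blkno */
    }

    tuplesort_end(sort);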