diff --git a/contrib/bloom/blinsert.c b/contrib/bloom/blinsert.c index bfee244aa1..d231e5331f 100644 --- a/contrib/bloom/blinsert.c +++ b/contrib/bloom/blinsert.c @@ -135,7 +135,8 @@ blbuild(Relation heap, Relation index, IndexInfo *indexInfo) /* Do the heap scan */ reltuples = IndexBuildHeapScan(heap, index, indexInfo, true, - bloomBuildCallback, (void *) &buildstate); + bloomBuildCallback, (void *) &buildstate, + NULL); /* * There are could be some items in cached page. Flush this page if diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index f951ddb41e..c45979dee4 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2022,7 +2022,8 @@ include_dir 'conf.d' When changing this value, consider also adjusting - and + , + , and . @@ -2070,6 +2071,44 @@ include_dir 'conf.d' + + max_parallel_maintenance_workers (integer) + + max_parallel_maintenance_workers configuration parameter + + + + + Sets the maximum number of parallel workers that can be + started by a single utility command. Currently, the only + parallel utility command that supports the use of parallel + workers is CREATE INDEX, and only when + building a B-tree index. Parallel workers are taken from the + pool of processes established by , limited by . Note that the requested + number of workers may not actually be available at runtime. + If this occurs, the utility operation will run with fewer + workers than expected. The default value is 2. Setting this + value to 0 disables the use of parallel workers by utility + commands. + + + + Note that parallel utility commands should not consume + substantially more memory than equivalent non-parallel + operations. This strategy differs from that of parallel + query, where resource limits generally apply per worker + process. Parallel utility commands treat the resource limit + maintenance_work_mem as a limit to be applied to + the entire utility command, regardless of the number of + parallel worker processes. However, parallel utility + commands may still consume substantially more CPU resources + and I/O bandwidth. + + + + max_parallel_workers (integer) @@ -2079,8 +2118,9 @@ include_dir 'conf.d' Sets the maximum number of workers that the system can support for - parallel queries. The default value is 8. When increasing or + parallel operations. The default value is 8. When increasing or decreasing this value, consider also adjusting + and . Also, note that a setting for this value which is higher than will have no effect, diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 8a9793644f..e138d1ef07 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1263,7 +1263,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser Waiting in an extension. - IPC + IPC BgWorkerShutdown Waiting for background worker to shut down. @@ -1371,6 +1371,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser ParallelBitmapScan Waiting for parallel bitmap scan to become initialized. + + ParallelCreateIndexScan + Waiting for parallel CREATE INDEX workers to finish heap scan. + ProcArrayGroupUpdate Waiting for group leader to clear transaction id at transaction end. @@ -3900,13 +3904,15 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid, sort-start - (int, bool, int, int, bool) + (int, bool, int, int, bool, int) Probe that fires when a sort operation is started. arg0 indicates heap, index or datum sort. arg1 is true for unique-value enforcement. arg2 is the number of key columns. 
arg3 is the number of kilobytes of work memory allowed. - arg4 is true if random access to the sort result is required. + arg4 is true if random access to the sort result is required. + arg5 indicates serial when 0, parallel worker when + 1, or parallel leader when 2. sort-done diff --git a/doc/src/sgml/ref/create_index.sgml b/doc/src/sgml/ref/create_index.sgml index 5137fe6383..f464557de8 100644 --- a/doc/src/sgml/ref/create_index.sgml +++ b/doc/src/sgml/ref/create_index.sgml @@ -599,6 +599,64 @@ Indexes: which would drive the machine into swapping. + + PostgreSQL can build indexes while + leveraging multiple CPUs in order to process the table rows faster. + This feature is known as parallel index + build. For index methods that support building indexes + in parallel (currently, only B-tree), + maintenance_work_mem specifies the maximum + amount of memory that can be used by each index build operation as + a whole, regardless of how many worker processes were started. + Generally, a cost model automatically determines how many worker + processes should be requested, if any. + + + + Parallel index builds may benefit from increasing + maintenance_work_mem where an equivalent serial + index build will see little or no benefit. Note that + maintenance_work_mem may influence the number of + worker processes requested, since parallel workers must have at + least a 32MB share of the total + maintenance_work_mem budget. There must also be + a remaining 32MB share for the leader process. + Increasing + may allow more workers to be used, which will reduce the time + needed for index creation, so long as the index build is not + already I/O bound. Of course, there should also be sufficient + CPU capacity that would otherwise lie idle. + + + + Setting a value for parallel_workers via directly controls how many parallel + worker processes will be requested by a CREATE + INDEX against the table. This bypasses the cost model + completely, and prevents maintenance_work_mem + from affecting how many parallel workers are requested. Setting + parallel_workers to 0 via ALTER + TABLE will disable parallel index builds on the table in + all cases. + + + + + You might want to reset parallel_workers after + setting it as part of tuning an index build. This avoids + inadvertent changes to query plans, since + parallel_workers affects + all parallel table scans. + + + + + While CREATE INDEX with the + CONCURRENTLY option supports parallel builds + without special restrictions, only the first table scan is actually + performed in parallel. + + Use to remove an index. diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml index a0c9a6d257..d2df40d543 100644 --- a/doc/src/sgml/ref/create_table.sgml +++ b/doc/src/sgml/ref/create_table.sgml @@ -1228,8 +1228,8 @@ WITH ( MODULUS numeric_literal, REM This sets the number of workers that should be used to assist a parallel scan of this table. If not set, the system will determine a value based on the relation size. The actual number of workers chosen by the planner - may be less, for example due to - the setting of . + or by utility statements that use parallel scans may be less, for example + due to the setting of . diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index 5027872267..68b3371665 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -706,7 +706,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo) * heap blocks in physical order. 
*/ reltuples = IndexBuildHeapScan(heap, index, indexInfo, false, - brinbuildCallback, (void *) state); + brinbuildCallback, (void *) state, NULL); /* process the final batch */ form_and_insert_tuple(state); @@ -1205,7 +1205,7 @@ summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel, state->bs_currRangeStart = heapBlk; IndexBuildHeapRangeScan(heapRel, state->bs_irel, indexInfo, false, true, heapBlk, scanNumBlks, - brinbuildCallback, (void *) state); + brinbuildCallback, (void *) state, NULL); /* * Now we update the values obtained by the scan with the placeholder diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index 473cc3d6b3..23f7285547 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -391,7 +391,7 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo) * prefers to receive tuples in TID order. */ reltuples = IndexBuildHeapScan(heap, index, indexInfo, false, - ginBuildCallback, (void *) &buildstate); + ginBuildCallback, (void *) &buildstate, NULL); /* dump remaining entries to the index */ oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx); diff --git a/src/backend/access/gist/gistbuild.c b/src/backend/access/gist/gistbuild.c index d22318a5f1..434f15f014 100644 --- a/src/backend/access/gist/gistbuild.c +++ b/src/backend/access/gist/gistbuild.c @@ -203,7 +203,7 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo) * Do the heap scan. */ reltuples = IndexBuildHeapScan(heap, index, indexInfo, true, - gistBuildCallback, (void *) &buildstate); + gistBuildCallback, (void *) &buildstate, NULL); /* * If buffering was used, flush out all the tuples that are still in the diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c index 718e2be1cd..e337439ada 100644 --- a/src/backend/access/hash/hash.c +++ b/src/backend/access/hash/hash.c @@ -159,7 +159,7 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo) /* do the heap scan */ reltuples = IndexBuildHeapScan(heap, index, indexInfo, true, - hashbuildCallback, (void *) &buildstate); + hashbuildCallback, (void *) &buildstate, NULL); if (buildstate.spool) { diff --git a/src/backend/access/hash/hashsort.c b/src/backend/access/hash/hashsort.c index 7d3790a473..b70964f429 100644 --- a/src/backend/access/hash/hashsort.c +++ b/src/backend/access/hash/hashsort.c @@ -82,6 +82,7 @@ _h_spoolinit(Relation heap, Relation index, uint32 num_buckets) hspool->low_mask, hspool->max_buckets, maintenance_work_mem, + NULL, false); return hspool; diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index be263850cd..8a846e7dba 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -1627,7 +1627,16 @@ heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation, SpinLockInit(&target->phs_mutex); target->phs_startblock = InvalidBlockNumber; pg_atomic_init_u64(&target->phs_nallocated, 0); - SerializeSnapshot(snapshot, target->phs_snapshot_data); + if (IsMVCCSnapshot(snapshot)) + { + SerializeSnapshot(snapshot, target->phs_snapshot_data); + target->phs_snapshot_any = false; + } + else + { + Assert(snapshot == SnapshotAny); + target->phs_snapshot_any = true; + } } /* ---------------- @@ -1655,11 +1664,22 @@ heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan) Snapshot snapshot; Assert(RelationGetRelid(relation) == parallel_scan->phs_relid); - snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data); - 
RegisterSnapshot(snapshot); + + if (!parallel_scan->phs_snapshot_any) + { + /* Snapshot was serialized -- restore it */ + snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data); + RegisterSnapshot(snapshot); + } + else + { + /* SnapshotAny passed by caller (not serialized) */ + snapshot = SnapshotAny; + } return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan, - true, true, true, false, false, true); + true, true, true, false, false, + !parallel_scan->phs_snapshot_any); } /* ---------------- diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index a344c4490e..8158508d8c 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -21,36 +21,19 @@ #include "access/nbtree.h" #include "access/relscan.h" #include "access/xlog.h" -#include "catalog/index.h" #include "commands/vacuum.h" +#include "nodes/execnodes.h" #include "pgstat.h" #include "storage/condition_variable.h" #include "storage/indexfsm.h" #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/smgr.h" -#include "tcop/tcopprot.h" /* pgrminclude ignore */ #include "utils/builtins.h" #include "utils/index_selfuncs.h" #include "utils/memutils.h" -/* Working state for btbuild and its callback */ -typedef struct -{ - bool isUnique; - bool haveDead; - Relation heapRel; - BTSpool *spool; - - /* - * spool2 is needed only when the index is a unique index. Dead tuples are - * put into spool2 instead of spool in order to avoid uniqueness check. - */ - BTSpool *spool2; - double indtuples; -} BTBuildState; - /* Working state needed by btvacuumpage */ typedef struct { @@ -104,12 +87,6 @@ typedef struct BTParallelScanDescData typedef struct BTParallelScanDescData *BTParallelScanDesc; -static void btbuildCallback(Relation index, - HeapTuple htup, - Datum *values, - bool *isnull, - bool tupleIsAlive, - void *state); static void btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, IndexBulkDeleteCallback callback, void *callback_state, BTCycleId cycleid); @@ -166,115 +143,6 @@ bthandler(PG_FUNCTION_ARGS) PG_RETURN_POINTER(amroutine); } -/* - * btbuild() -- build a new btree index. - */ -IndexBuildResult * -btbuild(Relation heap, Relation index, IndexInfo *indexInfo) -{ - IndexBuildResult *result; - double reltuples; - BTBuildState buildstate; - - buildstate.isUnique = indexInfo->ii_Unique; - buildstate.haveDead = false; - buildstate.heapRel = heap; - buildstate.spool = NULL; - buildstate.spool2 = NULL; - buildstate.indtuples = 0; - -#ifdef BTREE_BUILD_STATS - if (log_btree_build_stats) - ResetUsage(); -#endif /* BTREE_BUILD_STATS */ - - /* - * We expect to be called exactly once for any index relation. If that's - * not the case, big trouble's what we have. - */ - if (RelationGetNumberOfBlocks(index) != 0) - elog(ERROR, "index \"%s\" already contains data", - RelationGetRelationName(index)); - - buildstate.spool = _bt_spoolinit(heap, index, indexInfo->ii_Unique, false); - - /* - * If building a unique index, put dead tuples in a second spool to keep - * them out of the uniqueness check. 
- */ - if (indexInfo->ii_Unique) - buildstate.spool2 = _bt_spoolinit(heap, index, false, true); - - /* do the heap scan */ - reltuples = IndexBuildHeapScan(heap, index, indexInfo, true, - btbuildCallback, (void *) &buildstate); - - /* okay, all heap tuples are indexed */ - if (buildstate.spool2 && !buildstate.haveDead) - { - /* spool2 turns out to be unnecessary */ - _bt_spooldestroy(buildstate.spool2); - buildstate.spool2 = NULL; - } - - /* - * Finish the build by (1) completing the sort of the spool file, (2) - * inserting the sorted tuples into btree pages and (3) building the upper - * levels. - */ - _bt_leafbuild(buildstate.spool, buildstate.spool2); - _bt_spooldestroy(buildstate.spool); - if (buildstate.spool2) - _bt_spooldestroy(buildstate.spool2); - -#ifdef BTREE_BUILD_STATS - if (log_btree_build_stats) - { - ShowUsage("BTREE BUILD STATS"); - ResetUsage(); - } -#endif /* BTREE_BUILD_STATS */ - - /* - * Return statistics - */ - result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult)); - - result->heap_tuples = reltuples; - result->index_tuples = buildstate.indtuples; - - return result; -} - -/* - * Per-tuple callback from IndexBuildHeapScan - */ -static void -btbuildCallback(Relation index, - HeapTuple htup, - Datum *values, - bool *isnull, - bool tupleIsAlive, - void *state) -{ - BTBuildState *buildstate = (BTBuildState *) state; - - /* - * insert the index tuple into the appropriate spool file for subsequent - * processing - */ - if (tupleIsAlive || buildstate->spool2 == NULL) - _bt_spool(buildstate->spool, &htup->t_self, values, isnull); - else - { - /* dead tuples are put into spool2 */ - buildstate->haveDead = true; - _bt_spool(buildstate->spool2, &htup->t_self, values, isnull); - } - - buildstate->indtuples += 1; -} - /* * btbuildempty() -- build an empty btree index in the initialization fork */ diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index f6159db1cd..521ae6e5f7 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -67,28 +67,168 @@ #include "postgres.h" #include "access/nbtree.h" +#include "access/parallel.h" +#include "access/relscan.h" +#include "access/xact.h" #include "access/xlog.h" #include "access/xloginsert.h" +#include "catalog/index.h" #include "miscadmin.h" +#include "pgstat.h" #include "storage/smgr.h" -#include "tcop/tcopprot.h" +#include "tcop/tcopprot.h" /* pgrminclude ignore */ #include "utils/rel.h" #include "utils/sortsupport.h" #include "utils/tuplesort.h" +/* Magic numbers for parallel state sharing */ +#define PARALLEL_KEY_BTREE_SHARED UINT64CONST(0xA000000000000001) +#define PARALLEL_KEY_TUPLESORT UINT64CONST(0xA000000000000002) +#define PARALLEL_KEY_TUPLESORT_SPOOL2 UINT64CONST(0xA000000000000003) + +/* + * DISABLE_LEADER_PARTICIPATION disables the leader's participation in + * parallel index builds. This may be useful as a debugging aid. +#undef DISABLE_LEADER_PARTICIPATION + */ + /* * Status record for spooling/sorting phase. (Note we may have two of * these due to the special requirements for uniqueness-checking with * dead tuples.) */ -struct BTSpool +typedef struct BTSpool { Tuplesortstate *sortstate; /* state data for tuplesort.c */ Relation heap; Relation index; bool isunique; -}; +} BTSpool; + +/* + * Status for index builds performed in parallel. This is allocated in a + * dynamic shared memory segment. Note that there is a separate tuplesort TOC + * entry, private to tuplesort.c but allocated by this module on its behalf. 
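+ *
+ * For example (these calls appear verbatim in _bt_parallel_build_main()
+ * below; they are repeated here only as an illustration), a worker
+ * recovers both pieces of shared state with ordinary TOC lookups:
+ *
+ *		btshared = shm_toc_lookup(toc, PARALLEL_KEY_BTREE_SHARED, false);
+ *		sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
+ *		tuplesort_attach_shared(sharedsort, seg);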
+ */ +typedef struct BTShared +{ + /* + * These fields are not modified during the sort. They primarily exist + * for the benefit of worker processes that need to create BTSpool state + * corresponding to that used by the leader. + */ + Oid heaprelid; + Oid indexrelid; + bool isunique; + bool isconcurrent; + int scantuplesortstates; + + /* + * workersdonecv is used to monitor the progress of workers. All parallel + * participants must indicate that they are done before leader can use + * mutable state that workers maintain during scan (and before leader can + * proceed to tuplesort_performsort()). + */ + ConditionVariable workersdonecv; + + /* + * mutex protects all fields before heapdesc. + * + * These fields contain status information of interest to B-Tree index + * builds that must work just the same when an index is built in parallel. + */ + slock_t mutex; + + /* + * Mutable state that is maintained by workers, and reported back to + * leader at end of parallel scan. + * + * nparticipantsdone is number of worker processes finished. + * + * reltuples is the total number of input heap tuples. + * + * havedead indicates if RECENTLY_DEAD tuples were encountered during + * build. + * + * indtuples is the total number of tuples that made it into the index. + * + * brokenhotchain indicates if any worker detected a broken HOT chain + * during build. + */ + int nparticipantsdone; + double reltuples; + bool havedead; + double indtuples; + bool brokenhotchain; + + /* + * This variable-sized field must come last. + * + * See _bt_parallel_estimate_shared(). + */ + ParallelHeapScanDescData heapdesc; +} BTShared; + +/* + * Status for leader in parallel index build. + */ +typedef struct BTLeader +{ + /* parallel context itself */ + ParallelContext *pcxt; + + /* + * nparticipanttuplesorts is the exact number of worker processes + * successfully launched, plus one leader process if it participates as a + * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader + * participating as a worker). + */ + int nparticipanttuplesorts; + + /* + * Leader process convenience pointers to shared state (leader avoids TOC + * lookups). + * + * btshared is the shared state for entire build. sharedsort is the + * shared, tuplesort-managed state passed to each process tuplesort. + * sharedsort2 is the corresponding btspool2 shared state, used only when + * building unique indexes. snapshot is the snapshot used by the scan iff + * an MVCC snapshot is required. + */ + BTShared *btshared; + Sharedsort *sharedsort; + Sharedsort *sharedsort2; + Snapshot snapshot; +} BTLeader; + +/* + * Working state for btbuild and its callback. + * + * When parallel CREATE INDEX is used, there is a BTBuildState for each + * participant. + */ +typedef struct BTBuildState +{ + bool isunique; + bool havedead; + Relation heap; + BTSpool *spool; + + /* + * spool2 is needed only when the index is a unique index. Dead tuples are + * put into spool2 instead of spool in order to avoid uniqueness check. + */ + BTSpool *spool2; + double indtuples; + + /* + * btleader is only present when a parallel index build is performed, and + * only in the leader process. (Actually, only the leader has a + * BTBuildState. Workers have their own spool and spool2, though.) + */ + BTLeader *btleader; +} BTBuildState; /* * Status record for a btree page being built. 
We have one of these @@ -128,6 +268,14 @@ typedef struct BTWriteState } BTWriteState; +static double _bt_spools_heapscan(Relation heap, Relation index, + BTBuildState *buildstate, IndexInfo *indexInfo); +static void _bt_spooldestroy(BTSpool *btspool); +static void _bt_spool(BTSpool *btspool, ItemPointer self, + Datum *values, bool *isnull); +static void _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2); +static void _bt_build_callback(Relation index, HeapTuple htup, Datum *values, + bool *isnull, bool tupleIsAlive, void *state); static Page _bt_blnewpage(uint32 level); static BTPageState *_bt_pagestate(BTWriteState *wstate, uint32 level); static void _bt_slideleft(Page page); @@ -138,45 +286,219 @@ static void _bt_buildadd(BTWriteState *wstate, BTPageState *state, static void _bt_uppershutdown(BTWriteState *wstate, BTPageState *state); static void _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2); +static void _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, + int request); +static void _bt_end_parallel(BTLeader *btleader); +static Size _bt_parallel_estimate_shared(Snapshot snapshot); +static double _bt_parallel_heapscan(BTBuildState *buildstate, + bool *brokenhotchain); +static void _bt_leader_participate_as_worker(BTBuildState *buildstate); +static void _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2, + BTShared *btshared, Sharedsort *sharedsort, + Sharedsort *sharedsort2, int sortmem); /* - * Interface routines + * btbuild() -- build a new btree index. */ +IndexBuildResult * +btbuild(Relation heap, Relation index, IndexInfo *indexInfo) +{ + IndexBuildResult *result; + BTBuildState buildstate; + double reltuples; +#ifdef BTREE_BUILD_STATS + if (log_btree_build_stats) + ResetUsage(); +#endif /* BTREE_BUILD_STATS */ + + buildstate.isunique = indexInfo->ii_Unique; + buildstate.havedead = false; + buildstate.heap = heap; + buildstate.spool = NULL; + buildstate.spool2 = NULL; + buildstate.indtuples = 0; + buildstate.btleader = NULL; + + /* + * We expect to be called exactly once for any index relation. If that's + * not the case, big trouble's what we have. + */ + if (RelationGetNumberOfBlocks(index) != 0) + elog(ERROR, "index \"%s\" already contains data", + RelationGetRelationName(index)); + + reltuples = _bt_spools_heapscan(heap, index, &buildstate, indexInfo); + + /* + * Finish the build by (1) completing the sort of the spool file, (2) + * inserting the sorted tuples into btree pages and (3) building the upper + * levels. Finally, it may also be necessary to end use of parallelism. + */ + _bt_leafbuild(buildstate.spool, buildstate.spool2); + _bt_spooldestroy(buildstate.spool); + if (buildstate.spool2) + _bt_spooldestroy(buildstate.spool2); + if (buildstate.btleader) + _bt_end_parallel(buildstate.btleader); + + result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult)); + + result->heap_tuples = reltuples; + result->index_tuples = buildstate.indtuples; + +#ifdef BTREE_BUILD_STATS + if (log_btree_build_stats) + { + ShowUsage("BTREE BUILD STATS"); + ResetUsage(); + } +#endif /* BTREE_BUILD_STATS */ + + return result; +} /* - * create and initialize a spool structure + * Create and initialize one or two spool structures, and save them in caller's + * buildstate argument. May also fill-in fields within indexInfo used by index + * builds. + * + * Scans the heap, possibly in parallel, filling spools with IndexTuples. This + * routine encapsulates all aspects of managing parallelism. 
Caller need only + * call _bt_end_parallel() in parallel case after it is done with spool/spool2. + * + * Returns the total number of heap tuples scanned. */ -BTSpool * -_bt_spoolinit(Relation heap, Relation index, bool isunique, bool isdead) +static double +_bt_spools_heapscan(Relation heap, Relation index, BTBuildState *buildstate, + IndexInfo *indexInfo) { BTSpool *btspool = (BTSpool *) palloc0(sizeof(BTSpool)); - int btKbytes; - - btspool->heap = heap; - btspool->index = index; - btspool->isunique = isunique; + SortCoordinate coordinate = NULL; + double reltuples = 0; /* * We size the sort area as maintenance_work_mem rather than work_mem to * speed index creation. This should be OK since a single backend can't - * run multiple index creations in parallel. Note that creation of a - * unique index actually requires two BTSpool objects. We expect that the - * second one (for dead tuples) won't get very full, so we give it only - * work_mem. + * run multiple index creations in parallel (see also: notes on + * parallelism and maintenance_work_mem below). */ - btKbytes = isdead ? work_mem : maintenance_work_mem; - btspool->sortstate = tuplesort_begin_index_btree(heap, index, isunique, - btKbytes, false); + btspool->heap = heap; + btspool->index = index; + btspool->isunique = indexInfo->ii_Unique; - return btspool; + /* Save as primary spool */ + buildstate->spool = btspool; + + /* Attempt to launch parallel worker scan when required */ + if (indexInfo->ii_ParallelWorkers > 0) + _bt_begin_parallel(buildstate, indexInfo->ii_Concurrent, + indexInfo->ii_ParallelWorkers); + + /* + * If parallel build requested and at least one worker process was + * successfully launched, set up coordination state + */ + if (buildstate->btleader) + { + coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData)); + coordinate->isWorker = false; + coordinate->nParticipants = + buildstate->btleader->nparticipanttuplesorts; + coordinate->sharedsort = buildstate->btleader->sharedsort; + } + + /* + * Begin serial/leader tuplesort. + * + * In cases where parallelism is involved, the leader receives the same + * share of maintenance_work_mem as a serial sort (it is generally treated + * in the same way as a serial sort once we return). Parallel worker + * Tuplesortstates will have received only a fraction of + * maintenance_work_mem, though. + * + * We rely on the lifetime of the Leader Tuplesortstate almost not + * overlapping with any worker Tuplesortstate's lifetime. There may be + * some small overlap, but that's okay because we rely on leader + * Tuplesortstate only allocating a small, fixed amount of memory here. + * When its tuplesort_performsort() is called (by our caller), and + * significant amounts of memory are likely to be used, all workers must + * have already freed almost all memory held by their Tuplesortstates + * (they are about to go away completely, too). The overall effect is + * that maintenance_work_mem always represents an absolute high watermark + * on the amount of memory used by a CREATE INDEX operation, regardless of + * the use of parallelism or any other factor. + */ + buildstate->spool->sortstate = + tuplesort_begin_index_btree(heap, index, buildstate->isunique, + maintenance_work_mem, coordinate, + false); + + /* + * If building a unique index, put dead tuples in a second spool to keep + * them out of the uniqueness check. We expect that the second spool (for + * dead tuples) won't get very full, so we give it only work_mem. 
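+ *
+ * For example (illustrative numbers only): at the default work_mem of
+ * 4MB, spool2's sort is capped at 4MB here even when
+ * maintenance_work_mem is far larger.  Worker processes cap their own
+ * spool2 sorts at Min(sortmem, work_mem) instead; see
+ * _bt_parallel_scan_and_sort().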
+ */ + if (indexInfo->ii_Unique) + { + BTSpool *btspool2 = (BTSpool *) palloc0(sizeof(BTSpool)); + SortCoordinate coordinate2 = NULL; + + /* Initialize secondary spool */ + btspool2->heap = heap; + btspool2->index = index; + btspool2->isunique = false; + /* Save as secondary spool */ + buildstate->spool2 = btspool2; + + if (buildstate->btleader) + { + /* + * Set up non-private state that is passed to + * tuplesort_begin_index_btree() about the basic high level + * coordination of a parallel sort. + */ + coordinate2 = (SortCoordinate) palloc0(sizeof(SortCoordinateData)); + coordinate2->isWorker = false; + coordinate2->nParticipants = + buildstate->btleader->nparticipanttuplesorts; + coordinate2->sharedsort = buildstate->btleader->sharedsort2; + } + + /* + * We expect that the second one (for dead tuples) won't get very + * full, so we give it only work_mem + */ + buildstate->spool2->sortstate = + tuplesort_begin_index_btree(heap, index, false, work_mem, + coordinate2, false); + } + + /* Fill spool using either serial or parallel heap scan */ + if (!buildstate->btleader) + reltuples = IndexBuildHeapScan(heap, index, indexInfo, true, + _bt_build_callback, (void *) buildstate, + NULL); + else + reltuples = _bt_parallel_heapscan(buildstate, + &indexInfo->ii_BrokenHotChain); + + /* okay, all heap tuples are spooled */ + if (buildstate->spool2 && !buildstate->havedead) + { + /* spool2 turns out to be unnecessary */ + _bt_spooldestroy(buildstate->spool2); + buildstate->spool2 = NULL; + } + + return reltuples; } /* * clean up a spool structure and its substructures. */ -void +static void _bt_spooldestroy(BTSpool *btspool) { tuplesort_end(btspool->sortstate); @@ -186,7 +508,7 @@ _bt_spooldestroy(BTSpool *btspool) /* * spool an index entry into the sort file. */ -void +static void _bt_spool(BTSpool *btspool, ItemPointer self, Datum *values, bool *isnull) { tuplesort_putindextuplevalues(btspool->sortstate, btspool->index, @@ -197,7 +519,7 @@ _bt_spool(BTSpool *btspool, ItemPointer self, Datum *values, bool *isnull) * given a spool loaded by successive calls to _bt_spool, * create an entire btree. */ -void +static void _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2) { BTWriteState wstate; @@ -231,11 +553,34 @@ _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2) _bt_load(&wstate, btspool, btspool2); } - /* - * Internal routines. + * Per-tuple callback from IndexBuildHeapScan */ +static void +_bt_build_callback(Relation index, + HeapTuple htup, + Datum *values, + bool *isnull, + bool tupleIsAlive, + void *state) +{ + BTBuildState *buildstate = (BTBuildState *) state; + /* + * insert the index tuple into the appropriate spool file for subsequent + * processing + */ + if (tupleIsAlive || buildstate->spool2 == NULL) + _bt_spool(buildstate->spool, &htup->t_self, values, isnull); + else + { + /* dead tuples are put into spool2 */ + buildstate->havedead = true; + _bt_spool(buildstate->spool2, &htup->t_self, values, isnull); + } + + buildstate->indtuples += 1; +} /* * allocate workspace for a new, clean btree page, not linked to any siblings. @@ -819,3 +1164,488 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2) smgrimmedsync(wstate->index->rd_smgr, MAIN_FORKNUM); } } + +/* + * Create parallel context, and launch workers for leader. + * + * buildstate argument should be initialized (with the exception of the + * tuplesort state in spools, which may later be created based on shared + * state initially set up here). + * + * isconcurrent indicates if operation is CREATE INDEX CONCURRENTLY. 
+ * + * request is the target number of parallel worker processes to launch. + * + * Sets buildstate's BTLeader, which caller must use to shut down parallel + * mode by passing it to _bt_end_parallel() at the very end of its index + * build. If not even a single worker process can be launched, this is + * never set, and caller should proceed with a serial index build. + */ +static void +_bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) +{ + ParallelContext *pcxt; + int scantuplesortstates; + Snapshot snapshot; + Size estbtshared; + Size estsort; + BTShared *btshared; + Sharedsort *sharedsort; + Sharedsort *sharedsort2; + BTSpool *btspool = buildstate->spool; + BTLeader *btleader = (BTLeader *) palloc0(sizeof(BTLeader)); + bool leaderparticipates = true; + +#ifdef DISABLE_LEADER_PARTICIPATION + leaderparticipates = false; +#endif + + /* + * Enter parallel mode, and create context for parallel build of btree + * index + */ + EnterParallelMode(); + Assert(request > 0); + pcxt = CreateParallelContext("postgres", "_bt_parallel_build_main", + request, true); + scantuplesortstates = leaderparticipates ? request + 1 : request; + + /* + * Prepare for scan of the base relation. In a normal index build, we use + * SnapshotAny because we must retrieve all tuples and do our own time + * qual checks (because we have to index RECENTLY_DEAD tuples). In a + * concurrent build, we take a regular MVCC snapshot and index whatever's + * live according to that. + */ + if (!isconcurrent) + snapshot = SnapshotAny; + else + snapshot = RegisterSnapshot(GetTransactionSnapshot()); + + /* + * Estimate size for at least two keys -- our own + * PARALLEL_KEY_BTREE_SHARED workspace, and PARALLEL_KEY_TUPLESORT + * tuplesort workspace + */ + estbtshared = _bt_parallel_estimate_shared(snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, estbtshared); + estsort = tuplesort_estimate_shared(scantuplesortstates); + shm_toc_estimate_chunk(&pcxt->estimator, estsort); + + /* + * Unique case requires a second spool, and so we may have to account for + * a third shared workspace -- PARALLEL_KEY_TUPLESORT_SPOOL2 + */ + if (!btspool->isunique) + shm_toc_estimate_keys(&pcxt->estimator, 2); + else + { + shm_toc_estimate_chunk(&pcxt->estimator, estsort); + shm_toc_estimate_keys(&pcxt->estimator, 3); + } + + /* Everyone's had a chance to ask for space, so now create the DSM */ + InitializeParallelDSM(pcxt); + + /* Store shared build state, for which we reserved space */ + btshared = (BTShared *) shm_toc_allocate(pcxt->toc, estbtshared); + /* Initialize immutable state */ + btshared->heaprelid = RelationGetRelid(btspool->heap); + btshared->indexrelid = RelationGetRelid(btspool->index); + btshared->isunique = btspool->isunique; + btshared->isconcurrent = isconcurrent; + btshared->scantuplesortstates = scantuplesortstates; + ConditionVariableInit(&btshared->workersdonecv); + SpinLockInit(&btshared->mutex); + /* Initialize mutable state */ + btshared->nparticipantsdone = 0; + btshared->reltuples = 0.0; + btshared->havedead = false; + btshared->indtuples = 0.0; + btshared->brokenhotchain = false; + heap_parallelscan_initialize(&btshared->heapdesc, btspool->heap, snapshot); + + /* + * Store shared tuplesort-private state, for which we reserved space. + * Then, initialize opaque state using tuplesort routine. 
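+ *
+ * (Each TOC key follows the same three-step pattern: a
+ * shm_toc_estimate_chunk() call above, then shm_toc_allocate() plus
+ * shm_toc_insert() here.)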
+ */ + sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort); + tuplesort_initialize_shared(sharedsort, scantuplesortstates, + pcxt->seg); + + shm_toc_insert(pcxt->toc, PARALLEL_KEY_BTREE_SHARED, btshared); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort); + + /* Unique case requires a second spool, and associated shared state */ + if (!btspool->isunique) + sharedsort2 = NULL; + else + { + /* + * Store additional shared tuplesort-private state, for which we + * reserved space. Then, initialize opaque state using tuplesort + * routine. + */ + sharedsort2 = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort); + tuplesort_initialize_shared(sharedsort2, scantuplesortstates, + pcxt->seg); + + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT_SPOOL2, sharedsort2); + } + + /* Launch workers, saving status for leader/caller */ + LaunchParallelWorkers(pcxt); + btleader->pcxt = pcxt; + btleader->nparticipanttuplesorts = pcxt->nworkers_launched; + if (leaderparticipates) + btleader->nparticipanttuplesorts++; + btleader->btshared = btshared; + btleader->sharedsort = sharedsort; + btleader->sharedsort2 = sharedsort2; + btleader->snapshot = snapshot; + + /* If no workers were successfully launched, back out (do serial build) */ + if (pcxt->nworkers_launched == 0) + { + _bt_end_parallel(btleader); + return; + } + + /* Save leader state now that it's clear build will be parallel */ + buildstate->btleader = btleader; + + /* Join heap scan ourselves */ + if (leaderparticipates) + _bt_leader_participate_as_worker(buildstate); + + /* + * Caller needs to wait for all launched workers when we return. Make + * sure that the failure-to-start case will not hang forever. + */ + WaitForParallelWorkersToAttach(pcxt); +} + +/* + * Shut down workers, destroy parallel context, and end parallel mode. + */ +static void +_bt_end_parallel(BTLeader *btleader) +{ + /* Shutdown worker processes */ + WaitForParallelWorkersToFinish(btleader->pcxt); + /* Free last reference to MVCC snapshot, if one was used */ + if (IsMVCCSnapshot(btleader->snapshot)) + UnregisterSnapshot(btleader->snapshot); + DestroyParallelContext(btleader->pcxt); + ExitParallelMode(); +} + +/* + * Returns size of shared memory required to store state for a parallel + * btree index build based on the snapshot its parallel scan will use. + */ +static Size +_bt_parallel_estimate_shared(Snapshot snapshot) +{ + if (!IsMVCCSnapshot(snapshot)) + { + Assert(snapshot == SnapshotAny); + return sizeof(BTShared); + } + + return add_size(offsetof(BTShared, heapdesc) + + offsetof(ParallelHeapScanDescData, phs_snapshot_data), + EstimateSnapshotSpace(snapshot)); +} + +/* + * Within leader, wait for end of heap scan. + * + * When called, parallel heap scan started by _bt_begin_parallel() will + * already be underway within worker processes (when leader participates + * as a worker, we should end up here just as workers are finishing). + * + * Fills in fields needed for ambuild statistics, and lets caller set + * field indicating that some worker encountered a broken HOT chain. + * + * Returns the total number of heap tuples scanned. 
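+ *
+ * The wait loop below follows the usual condition-variable protocol,
+ * rechecking nparticipantsdone under btshared->mutex and sleeping
+ * between rechecks:
+ *
+ *		ConditionVariableSleep(&btshared->workersdonecv,
+ *							   WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN);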
+ */ +static double +_bt_parallel_heapscan(BTBuildState *buildstate, bool *brokenhotchain) +{ + BTShared *btshared = buildstate->btleader->btshared; + int nparticipanttuplesorts; + double reltuples; + + nparticipanttuplesorts = buildstate->btleader->nparticipanttuplesorts; + for (;;) + { + SpinLockAcquire(&btshared->mutex); + if (btshared->nparticipantsdone == nparticipanttuplesorts) + { + buildstate->havedead = btshared->havedead; + buildstate->indtuples = btshared->indtuples; + *brokenhotchain = btshared->brokenhotchain; + reltuples = btshared->reltuples; + SpinLockRelease(&btshared->mutex); + break; + } + SpinLockRelease(&btshared->mutex); + + ConditionVariableSleep(&btshared->workersdonecv, + WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN); + } + + ConditionVariableCancelSleep(); + + return reltuples; +} + +/* + * Within leader, participate as a parallel worker. + */ +static void +_bt_leader_participate_as_worker(BTBuildState *buildstate) +{ + BTLeader *btleader = buildstate->btleader; + BTSpool *leaderworker; + BTSpool *leaderworker2; + int sortmem; + + /* Allocate memory and initialize private spool */ + leaderworker = (BTSpool *) palloc0(sizeof(BTSpool)); + leaderworker->heap = buildstate->spool->heap; + leaderworker->index = buildstate->spool->index; + leaderworker->isunique = buildstate->spool->isunique; + + /* Initialize second spool, if required */ + if (!btleader->btshared->isunique) + leaderworker2 = NULL; + else + { + /* Allocate memory for worker's own private secondary spool */ + leaderworker2 = (BTSpool *) palloc0(sizeof(BTSpool)); + + /* Initialize worker's own secondary spool */ + leaderworker2->heap = leaderworker->heap; + leaderworker2->index = leaderworker->index; + leaderworker2->isunique = false; + } + + /* + * Might as well use reliable figure when doling out maintenance_work_mem + * (when requested number of workers were not launched, this will be + * somewhat higher than it is for other workers). + */ + sortmem = maintenance_work_mem / btleader->nparticipanttuplesorts; + + /* Perform work common to all participants */ + _bt_parallel_scan_and_sort(leaderworker, leaderworker2, btleader->btshared, + btleader->sharedsort, btleader->sharedsort2, + sortmem); + +#ifdef BTREE_BUILD_STATS + if (log_btree_build_stats) + { + ShowUsage("BTREE BUILD (Leader Partial Spool) STATISTICS"); + ResetUsage(); + } +#endif /* BTREE_BUILD_STATS */ +} + +/* + * Perform work within a launched parallel process. 
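+ *
+ * Note that workers reach this function by name: _bt_begin_parallel()
+ * passes "_bt_parallel_build_main" to CreateParallelContext(), which
+ * parallel.c resolves through its built-in entry-point table (extended
+ * by this patch to include this function).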
+ */
+void
+_bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
+{
+	BTSpool    *btspool;
+	BTSpool    *btspool2;
+	BTShared   *btshared;
+	Sharedsort *sharedsort;
+	Sharedsort *sharedsort2;
+	Relation	heapRel;
+	Relation	indexRel;
+	LOCKMODE	heapLockmode;
+	LOCKMODE	indexLockmode;
+	int			sortmem;
+
+#ifdef BTREE_BUILD_STATS
+	if (log_btree_build_stats)
+		ResetUsage();
+#endif							/* BTREE_BUILD_STATS */
+
+	/* Look up shared state */
+	btshared = shm_toc_lookup(toc, PARALLEL_KEY_BTREE_SHARED, false);
+
+	/* Open relations using lock modes known to be obtained by index.c */
+	if (!btshared->isconcurrent)
+	{
+		heapLockmode = ShareLock;
+		indexLockmode = AccessExclusiveLock;
+	}
+	else
+	{
+		heapLockmode = ShareUpdateExclusiveLock;
+		indexLockmode = RowExclusiveLock;
+	}
+
+	/* Open relations within worker */
+	heapRel = heap_open(btshared->heaprelid, heapLockmode);
+	indexRel = index_open(btshared->indexrelid, indexLockmode);
+
+	/* Initialize worker's own spool */
+	btspool = (BTSpool *) palloc0(sizeof(BTSpool));
+	btspool->heap = heapRel;
+	btspool->index = indexRel;
+	btspool->isunique = btshared->isunique;
+
+	/* Look up shared state private to tuplesort.c */
+	sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
+	tuplesort_attach_shared(sharedsort, seg);
+	if (!btshared->isunique)
+	{
+		btspool2 = NULL;
+		sharedsort2 = NULL;
+	}
+	else
+	{
+		/* Allocate memory for worker's own private secondary spool */
+		btspool2 = (BTSpool *) palloc0(sizeof(BTSpool));
+
+		/* Initialize worker's own secondary spool */
+		btspool2->heap = btspool->heap;
+		btspool2->index = btspool->index;
+		btspool2->isunique = false;
+		/* Look up shared state private to tuplesort.c */
+		sharedsort2 = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT_SPOOL2, false);
+		tuplesort_attach_shared(sharedsort2, seg);
+	}
+
+	/* Perform sorting of spool, and possibly a spool2 */
+	sortmem = maintenance_work_mem / btshared->scantuplesortstates;
+	_bt_parallel_scan_and_sort(btspool, btspool2, btshared, sharedsort,
+							   sharedsort2, sortmem);
+
+#ifdef BTREE_BUILD_STATS
+	if (log_btree_build_stats)
+	{
+		ShowUsage("BTREE BUILD (Worker Partial Spool) STATISTICS");
+		ResetUsage();
+	}
+#endif							/* BTREE_BUILD_STATS */
+
+	index_close(indexRel, indexLockmode);
+	heap_close(heapRel, heapLockmode);
+}
+
+/*
+ * Perform a worker's portion of a parallel sort.
+ *
+ * This generates a tuplesort for the passed btspool, and a second tuplesort
+ * state if a second btspool is needed (i.e., for unique index builds).  All
+ * other spool fields should already be set when this is called.
+ *
+ * sortmem is the amount of working memory to use within each worker,
+ * expressed in KBs.
+ *
+ * When this returns, workers are done, and need only release resources.
+ */
+static void
+_bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
+						   BTShared *btshared, Sharedsort *sharedsort,
+						   Sharedsort *sharedsort2, int sortmem)
+{
+	SortCoordinate coordinate;
+	BTBuildState buildstate;
+	HeapScanDesc scan;
+	double		reltuples;
+	IndexInfo  *indexInfo;
+
+	/* Initialize local tuplesort coordination state */
+	coordinate = palloc0(sizeof(SortCoordinateData));
+	coordinate->isWorker = true;
+	coordinate->nParticipants = -1;
+	coordinate->sharedsort = sharedsort;
+
+	/* Begin "partial" tuplesort */
+	btspool->sortstate = tuplesort_begin_index_btree(btspool->heap,
+													 btspool->index,
+													 btspool->isunique,
+													 sortmem, coordinate,
+													 false);
+
+	/*
+	 * Just as with the serial case, there may be a second spool.  If so, a
+	 * second, dedicated spool2 partial tuplesort is required.
+ */ + if (btspool2) + { + SortCoordinate coordinate2; + + /* + * We expect that the second one (for dead tuples) won't get very + * full, so we give it only work_mem (unless sortmem is less for + * worker). Worker processes are generally permitted to allocate + * work_mem independently. + */ + coordinate2 = palloc0(sizeof(SortCoordinateData)); + coordinate2->isWorker = true; + coordinate2->nParticipants = -1; + coordinate2->sharedsort = sharedsort2; + btspool2->sortstate = + tuplesort_begin_index_btree(btspool->heap, btspool->index, false, + Min(sortmem, work_mem), coordinate2, + false); + } + + /* Fill in buildstate for _bt_build_callback() */ + buildstate.isunique = btshared->isunique; + buildstate.havedead = false; + buildstate.heap = btspool->heap; + buildstate.spool = btspool; + buildstate.spool2 = btspool2; + buildstate.indtuples = 0; + buildstate.btleader = NULL; + + /* Join parallel scan */ + indexInfo = BuildIndexInfo(btspool->index); + indexInfo->ii_Concurrent = btshared->isconcurrent; + scan = heap_beginscan_parallel(btspool->heap, &btshared->heapdesc); + reltuples = IndexBuildHeapScan(btspool->heap, btspool->index, indexInfo, + true, _bt_build_callback, + (void *) &buildstate, scan); + + /* + * Execute this worker's part of the sort. + * + * Unlike leader and serial cases, we cannot avoid calling + * tuplesort_performsort() for spool2 if it ends up containing no dead + * tuples (this is disallowed for workers by tuplesort). + */ + tuplesort_performsort(btspool->sortstate); + if (btspool2) + tuplesort_performsort(btspool2->sortstate); + + /* + * Done. Record ambuild statistics, and whether we encountered a broken + * HOT chain. + */ + SpinLockAcquire(&btshared->mutex); + btshared->nparticipantsdone++; + btshared->reltuples += reltuples; + if (buildstate.havedead) + btshared->havedead = true; + btshared->indtuples += buildstate.indtuples; + if (indexInfo->ii_BrokenHotChain) + btshared->brokenhotchain = true; + SpinLockRelease(&btshared->mutex); + + /* Notify leader */ + ConditionVariableSignal(&btshared->workersdonecv); + + /* We can end tuplesorts immediately */ + tuplesort_end(btspool->sortstate); + if (btspool2) + tuplesort_end(btspool2->sortstate); +} diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c index d2aec6df3e..34d9b48f15 100644 --- a/src/backend/access/spgist/spginsert.c +++ b/src/backend/access/spgist/spginsert.c @@ -138,7 +138,8 @@ spgbuild(Relation heap, Relation index, IndexInfo *indexInfo) ALLOCSET_DEFAULT_SIZES); reltuples = IndexBuildHeapScan(heap, index, indexInfo, true, - spgistBuildCallback, (void *) &buildstate); + spgistBuildCallback, (void *) &buildstate, + NULL); MemoryContextDelete(buildstate.tmpCtx); diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 5b45b07e7c..a325933940 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -14,6 +14,7 @@ #include "postgres.h" +#include "access/nbtree.h" #include "access/parallel.h" #include "access/session.h" #include "access/xact.h" @@ -129,6 +130,9 @@ static const struct { { "ParallelQueryMain", ParallelQueryMain + }, + { + "_bt_parallel_build_main", _bt_parallel_build_main } }; @@ -146,7 +150,7 @@ static void ParallelWorkerShutdown(int code, Datum arg); */ ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, - int nworkers) + int nworkers, bool serializable_okay) { MemoryContext oldcontext; ParallelContext *pcxt; @@ -167,9 +171,11 @@ 
CreateParallelContext(const char *library_name, const char *function_name, /* * If we are running under serializable isolation, we can't use parallel * workers, at least not until somebody enhances that mechanism to be - * parallel-aware. + * parallel-aware. Utility statement callers may ask us to ignore this + * restriction because they're always able to safely ignore the fact that + * SIREAD locks do not work with parallelism. */ - if (IsolationIsSerializable()) + if (IsolationIsSerializable() && !serializable_okay) nworkers = 0; /* We might be running in a short-lived memory context. */ diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index 80860128fb..28ff2f0979 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -1137,7 +1137,7 @@ build_indices(void) heap = heap_open(ILHead->il_heap, NoLock); ind = index_open(ILHead->il_ind, NoLock); - index_build(heap, ind, ILHead->il_info, false, false); + index_build(heap, ind, ILHead->il_info, false, false, false); index_close(ind, NoLock); heap_close(heap, NoLock); diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index 774c07b03a..0f34f5381a 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -2841,7 +2841,7 @@ RelationTruncateIndexes(Relation heapRelation) /* Initialize the index and rebuild */ /* Note: we do not need to re-establish pkey setting */ - index_build(heapRelation, currentIndex, indexInfo, false, true); + index_build(heapRelation, currentIndex, indexInfo, false, true, false); /* We're done with this index */ index_close(currentIndex, NoLock); diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 849a469127..f2cb6d7fb8 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -56,6 +56,7 @@ #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "optimizer/clauses.h" +#include "optimizer/planner.h" #include "parser/parser.h" #include "rewrite/rewriteManip.h" #include "storage/bufmgr.h" @@ -902,7 +903,7 @@ index_create(Relation heapRelation, Assert(indexRelationId == RelationGetRelid(indexRelation)); /* - * Obtain exclusive lock on it. Although no other backends can see it + * Obtain exclusive lock on it. Although no other transactions can see it * until we commit, this prevents deadlock-risk complaints from lock * manager in cases such as CLUSTER. */ @@ -1159,7 +1160,8 @@ index_create(Relation heapRelation, } else { - index_build(heapRelation, indexRelation, indexInfo, isprimary, false); + index_build(heapRelation, indexRelation, indexInfo, isprimary, false, + true); } /* @@ -1746,6 +1748,7 @@ BuildIndexInfo(Relation index) /* initialize index-build state to default */ ii->ii_Concurrent = false; ii->ii_BrokenHotChain = false; + ii->ii_ParallelWorkers = 0; /* set up for possible use by index AM */ ii->ii_Am = index->rd_rel->relam; @@ -2164,6 +2167,7 @@ index_update_stats(Relation rel, * * isprimary tells whether to mark the index as a primary-key index. * isreindex indicates we are recreating a previously-existing index. + * parallel indicates if parallelism may be useful. * * Note: when reindexing an existing index, isprimary can be false even if * the index is a PK; it's already properly marked and need not be re-marked. 
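The next hunk has index_build() request a worker count from plan_create_index_workers(), whose body is not part of this section. A rough sketch of the memory-based cap described in the create_index.sgml hunk above (each participant, leader included, needs at least a 32MB share of maintenance_work_mem); the helper name and clamping loop below are illustrative only, not code from the patch:

	static int
	example_cap_workers(int requested, int maintenance_work_mem_kb)
	{
		int		min_share_kb = 32 * 1024;	/* 32MB per participant */
		int		max_participants = maintenance_work_mem_kb / min_share_kb;

		/* one 32MB share must remain for the leader process */
		while (requested > 0 && requested + 1 > max_participants)
			requested--;

		return requested;
	}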
@@ -2177,7 +2181,8 @@ index_build(Relation heapRelation, Relation indexRelation, IndexInfo *indexInfo, bool isprimary, - bool isreindex) + bool isreindex, + bool parallel) { IndexBuildResult *stats; Oid save_userid; @@ -2192,10 +2197,31 @@ index_build(Relation heapRelation, Assert(PointerIsValid(indexRelation->rd_amroutine->ambuild)); Assert(PointerIsValid(indexRelation->rd_amroutine->ambuildempty)); - ereport(DEBUG1, - (errmsg("building index \"%s\" on table \"%s\"", - RelationGetRelationName(indexRelation), - RelationGetRelationName(heapRelation)))); + /* + * Determine worker process details for parallel CREATE INDEX. Currently, + * only btree has support for parallel builds. + * + * Note that planner considers parallel safety for us. + */ + if (parallel && IsNormalProcessingMode() && + indexRelation->rd_rel->relam == BTREE_AM_OID) + indexInfo->ii_ParallelWorkers = + plan_create_index_workers(RelationGetRelid(heapRelation), + RelationGetRelid(indexRelation)); + + if (indexInfo->ii_ParallelWorkers == 0) + ereport(DEBUG1, + (errmsg("building index \"%s\" on table \"%s\" serially", + RelationGetRelationName(indexRelation), + RelationGetRelationName(heapRelation)))); + else + ereport(DEBUG1, + (errmsg_plural("building index \"%s\" on table \"%s\" with request for %d parallel worker", + "building index \"%s\" on table \"%s\" with request for %d parallel workers", + indexInfo->ii_ParallelWorkers, + RelationGetRelationName(indexRelation), + RelationGetRelationName(heapRelation), + indexInfo->ii_ParallelWorkers))); /* * Switch to the table owner's userid, so that any index functions are run @@ -2347,13 +2373,14 @@ IndexBuildHeapScan(Relation heapRelation, IndexInfo *indexInfo, bool allow_sync, IndexBuildCallback callback, - void *callback_state) + void *callback_state, + HeapScanDesc scan) { return IndexBuildHeapRangeScan(heapRelation, indexRelation, indexInfo, allow_sync, false, 0, InvalidBlockNumber, - callback, callback_state); + callback, callback_state, scan); } /* @@ -2375,11 +2402,11 @@ IndexBuildHeapRangeScan(Relation heapRelation, BlockNumber start_blockno, BlockNumber numblocks, IndexBuildCallback callback, - void *callback_state) + void *callback_state, + HeapScanDesc scan) { bool is_system_catalog; bool checking_uniqueness; - HeapScanDesc scan; HeapTuple heapTuple; Datum values[INDEX_MAX_KEYS]; bool isnull[INDEX_MAX_KEYS]; @@ -2389,6 +2416,7 @@ IndexBuildHeapRangeScan(Relation heapRelation, EState *estate; ExprContext *econtext; Snapshot snapshot; + bool need_unregister_snapshot = false; TransactionId OldestXmin; BlockNumber root_blkno = InvalidBlockNumber; OffsetNumber root_offsets[MaxHeapTuplesPerPage]; @@ -2432,27 +2460,59 @@ IndexBuildHeapRangeScan(Relation heapRelation, * concurrent build, or during bootstrap, we take a regular MVCC snapshot * and index whatever's live according to that. */ - if (IsBootstrapProcessingMode() || indexInfo->ii_Concurrent) - { - snapshot = RegisterSnapshot(GetTransactionSnapshot()); - OldestXmin = InvalidTransactionId; /* not used */ + OldestXmin = InvalidTransactionId; - /* "any visible" mode is not compatible with this */ - Assert(!anyvisible); + /* okay to ignore lazy VACUUMs here */ + if (!IsBootstrapProcessingMode() && !indexInfo->ii_Concurrent) + OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM); + + if (!scan) + { + /* + * Serial index build. + * + * Must begin our own heap scan in this case. We may also need to + * register a snapshot whose lifetime is under our direct control. 
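+ *
+ * To summarize the serial-build snapshot cases (parallel builds arrive
+ * here with a ready-made scan, handled in the else branch below):
+ *
+ *		bootstrap or concurrent build -> registered MVCC snapshot,
+ *										 OldestXmin invalid
+ *		ordinary build				  -> SnapshotAny, OldestXmin from
+ *										 GetOldestXmin()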
+ */ + if (!TransactionIdIsValid(OldestXmin)) + { + snapshot = RegisterSnapshot(GetTransactionSnapshot()); + need_unregister_snapshot = true; + } + else + snapshot = SnapshotAny; + + scan = heap_beginscan_strat(heapRelation, /* relation */ + snapshot, /* snapshot */ + 0, /* number of keys */ + NULL, /* scan key */ + true, /* buffer access strategy OK */ + allow_sync); /* syncscan OK? */ } else { - snapshot = SnapshotAny; - /* okay to ignore lazy VACUUMs here */ - OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM); + /* + * Parallel index build. + * + * Parallel case never registers/unregisters own snapshot. Snapshot + * is taken from parallel heap scan, and is SnapshotAny or an MVCC + * snapshot, based on same criteria as serial case. + */ + Assert(!IsBootstrapProcessingMode()); + Assert(allow_sync); + snapshot = scan->rs_snapshot; } - scan = heap_beginscan_strat(heapRelation, /* relation */ - snapshot, /* snapshot */ - 0, /* number of keys */ - NULL, /* scan key */ - true, /* buffer access strategy OK */ - allow_sync); /* syncscan OK? */ + /* + * Must call GetOldestXmin() with SnapshotAny. Should never call + * GetOldestXmin() with MVCC snapshot. (It's especially worth checking + * this for parallel builds, since ambuild routines that support parallel + * builds must work these details out for themselves.) + */ + Assert(snapshot == SnapshotAny || IsMVCCSnapshot(snapshot)); + Assert(snapshot == SnapshotAny ? TransactionIdIsValid(OldestXmin) : + !TransactionIdIsValid(OldestXmin)); + Assert(snapshot == SnapshotAny || !anyvisible); /* set our scan endpoints */ if (!allow_sync) @@ -2783,8 +2843,8 @@ IndexBuildHeapRangeScan(Relation heapRelation, heap_endscan(scan); - /* we can now forget our snapshot, if set */ - if (IsBootstrapProcessingMode() || indexInfo->ii_Concurrent) + /* we can now forget our snapshot, if set and registered by us */ + if (need_unregister_snapshot) UnregisterSnapshot(snapshot); ExecDropSingleTupleTableSlot(slot); @@ -3027,7 +3087,7 @@ validate_index(Oid heapId, Oid indexId, Snapshot snapshot) state.tuplesort = tuplesort_begin_datum(INT8OID, Int8LessOperator, InvalidOid, false, maintenance_work_mem, - false); + NULL, false); state.htups = state.itups = state.tups_inserted = 0; (void) index_bulk_delete(&ivinfo, NULL, @@ -3552,7 +3612,7 @@ reindex_index(Oid indexId, bool skip_constraint_checks, char persistence, /* Initialize the index and rebuild */ /* Note: we do not need to re-establish pkey setting */ - index_build(heapRelation, iRel, indexInfo, false, true); + index_build(heapRelation, iRel, indexInfo, false, true, true); } PG_CATCH(); { @@ -3911,8 +3971,7 @@ SetReindexProcessing(Oid heapOid, Oid indexOid) static void ResetReindexProcessing(void) { - if (IsInParallelMode()) - elog(ERROR, "cannot modify reindex state during a parallel operation"); + /* This may be called in leader error path */ currentlyReindexedHeap = InvalidOid; currentlyReindexedIndex = InvalidOid; } diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c index cf37011b73..dcbad1286b 100644 --- a/src/backend/catalog/toasting.c +++ b/src/backend/catalog/toasting.c @@ -315,6 +315,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid, indexInfo->ii_ReadyForInserts = true; indexInfo->ii_Concurrent = false; indexInfo->ii_BrokenHotChain = false; + indexInfo->ii_ParallelWorkers = 0; indexInfo->ii_Am = BTREE_AM_OID; indexInfo->ii_AmCache = NULL; indexInfo->ii_Context = CurrentMemoryContext; diff --git a/src/backend/commands/cluster.c 
b/src/backend/commands/cluster.c index 1701548d84..5d481dd50d 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -909,7 +909,8 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, bool verbose, /* Set up sorting if wanted */ if (use_sort) tuplesort = tuplesort_begin_cluster(oldTupDesc, OldIndex, - maintenance_work_mem, false); + maintenance_work_mem, + NULL, false); else tuplesort = NULL; diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index a9461a4b06..7c46613215 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -380,6 +380,10 @@ DefineIndex(Oid relationId, * this will typically require the caller to have already locked the * relation. To avoid lock upgrade hazards, that lock should be at least * as strong as the one we take here. + * + * NB: If the lock strength here ever changes, code that is run by + * parallel workers under the control of certain particular ambuild + * functions will need to be updated, too. */ lockmode = stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock; rel = heap_open(relationId, lockmode); @@ -617,6 +621,7 @@ DefineIndex(Oid relationId, indexInfo->ii_ReadyForInserts = !stmt->concurrent; indexInfo->ii_Concurrent = stmt->concurrent; indexInfo->ii_BrokenHotChain = false; + indexInfo->ii_ParallelWorkers = 0; indexInfo->ii_Am = accessMethodId; indexInfo->ii_AmCache = NULL; indexInfo->ii_Context = CurrentMemoryContext; @@ -1000,7 +1005,7 @@ DefineIndex(Oid relationId, indexInfo->ii_BrokenHotChain = false; /* Now build the index */ - index_build(rel, indexRelation, indexInfo, stmt->primary, false); + index_build(rel, indexRelation, indexInfo, stmt->primary, false, true); /* Close both the relations, but keep the locks */ heap_close(rel, NoLock); diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index f8b72ebab9..14b0b89463 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -592,7 +592,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, pstmt_data = ExecSerializePlan(planstate->plan, estate); /* Create a parallel context. 
*/ - pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers); + pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers, false); pei->pcxt = pcxt; /* diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index ec62e7fb38..a86d4b68ea 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -373,7 +373,7 @@ initialize_phase(AggState *aggstate, int newphase) sortnode->collations, sortnode->nullsFirst, work_mem, - false); + NULL, false); } aggstate->current_phase = newphase; @@ -450,7 +450,7 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans, pertrans->sortOperators[0], pertrans->sortCollations[0], pertrans->sortNullsFirst[0], - work_mem, false); + work_mem, NULL, false); } else pertrans->sortstates[aggstate->current_set] = @@ -460,7 +460,7 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans, pertrans->sortOperators, pertrans->sortCollations, pertrans->sortNullsFirst, - work_mem, false); + work_mem, NULL, false); } /* diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c index 9c68de8565..d61c859fce 100644 --- a/src/backend/executor/nodeSort.c +++ b/src/backend/executor/nodeSort.c @@ -93,7 +93,7 @@ ExecSort(PlanState *pstate) plannode->collations, plannode->nullsFirst, work_mem, - node->randomAccess); + NULL, node->randomAccess); if (node->bounded) tuplesort_set_bound(tuplesortstate, node->bound); node->tuplesortstate = (void *) tuplesortstate; diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index fd1a58336b..5bff90e1bc 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -720,7 +720,8 @@ create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel) { int parallel_workers; - parallel_workers = compute_parallel_worker(rel, rel->pages, -1); + parallel_workers = compute_parallel_worker(rel, rel->pages, -1, + max_parallel_workers_per_gather); /* If any limit was set to zero, the user doesn't want a parallel scan. */ if (parallel_workers <= 0) @@ -3299,7 +3300,8 @@ create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel, pages_fetched = compute_bitmap_pages(root, rel, bitmapqual, 1.0, NULL, NULL); - parallel_workers = compute_parallel_worker(rel, pages_fetched, -1); + parallel_workers = compute_parallel_worker(rel, pages_fetched, -1, + max_parallel_workers_per_gather); if (parallel_workers <= 0) return; @@ -3319,9 +3321,13 @@ create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel, * * "index_pages" is the number of pages from the index that we expect to scan, or * -1 if we don't expect to scan any. + * + * "max_workers" is caller's limit on the number of workers. This typically + * comes from a GUC. */ int -compute_parallel_worker(RelOptInfo *rel, double heap_pages, double index_pages) +compute_parallel_worker(RelOptInfo *rel, double heap_pages, double index_pages, + int max_workers) { int parallel_workers = 0; @@ -3392,10 +3398,8 @@ compute_parallel_worker(RelOptInfo *rel, double heap_pages, double index_pages) } } - /* - * In no case use more than max_parallel_workers_per_gather workers. 
- */ - parallel_workers = Min(parallel_workers, max_parallel_workers_per_gather); + /* In no case use more than caller supplied maximum number of workers */ + parallel_workers = Min(parallel_workers, max_workers); return parallel_workers; } diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 8679b14b29..29fea48ee2 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -682,7 +682,9 @@ cost_index(IndexPath *path, PlannerInfo *root, double loop_count, * order. */ path->path.parallel_workers = compute_parallel_worker(baserel, - rand_heap_pages, index_pages); + rand_heap_pages, + index_pages, + max_parallel_workers_per_gather); /* * Fall out if workers can't be assigned for parallel scan, because in diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 2a4e22b6c8..740de4957d 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -5793,6 +5793,142 @@ plan_cluster_use_sort(Oid tableOid, Oid indexOid) return (seqScanAndSortPath.total_cost < indexScanPath->path.total_cost); } +/* + * plan_create_index_workers + * Use the planner to decide how many parallel worker processes + * CREATE INDEX should request for use + * + * tableOid is the table on which the index is to be built. indexOid is the + * OID of an index to be created or reindexed (which must be a btree index). + * + * Return value is the number of parallel worker processes to request. It + * may be unsafe to proceed if this is 0. Note that this does not include the + * leader participating as a worker (value is always a number of parallel + * worker processes). + * + * Note: caller had better already hold some type of lock on the table and + * index. + */ +int +plan_create_index_workers(Oid tableOid, Oid indexOid) +{ + PlannerInfo *root; + Query *query; + PlannerGlobal *glob; + RangeTblEntry *rte; + Relation heap; + Relation index; + RelOptInfo *rel; + int parallel_workers; + BlockNumber heap_blocks; + double reltuples; + double allvisfrac; + + /* Return immediately when parallelism disabled */ + if (max_parallel_maintenance_workers == 0) + return 0; + + /* Set up largely-dummy planner state */ + query = makeNode(Query); + query->commandType = CMD_SELECT; + + glob = makeNode(PlannerGlobal); + + root = makeNode(PlannerInfo); + root->parse = query; + root->glob = glob; + root->query_level = 1; + root->planner_cxt = CurrentMemoryContext; + root->wt_param_id = -1; + + /* + * Build a minimal RTE. + * + * Set the target's table to be an inheritance parent. This is a kludge + * that prevents problems within get_relation_info(), which does not + * expect that any IndexOptInfo is currently undergoing REINDEX. + */ + rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = tableOid; + rte->relkind = RELKIND_RELATION; /* Don't be too picky. */ + rte->lateral = false; + rte->inh = true; + rte->inFromCl = true; + query->rtable = list_make1(rte); + + /* Set up RTE/RelOptInfo arrays */ + setup_simple_rel_arrays(root); + + /* Build RelOptInfo */ + rel = build_simple_rel(root, 1, NULL); + + heap = heap_open(tableOid, NoLock); + index = index_open(indexOid, NoLock); + + /* + * Determine if it's safe to proceed. + * + * Currently, parallel workers can't access the leader's temporary tables. + * Furthermore, any index predicate or index expressions must be parallel + * safe. 
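Stated as a hypothetical predicate (a sketch of the plan_create_index_workers() checks that follow, not code from the patch):

static bool
parallel_index_build_is_safe(PlannerInfo *root, Relation heap,
							 Relation index)
{
	/* Workers cannot access the leader's temporary tables */
	if (heap->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
		return false;

	/* Index expressions and predicate are evaluated in workers */
	if (!is_parallel_safe(root, (Node *) RelationGetIndexExpressions(index)))
		return false;
	if (!is_parallel_safe(root, (Node *) RelationGetIndexPredicate(index)))
		return false;

	return true;
}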
+ */ + if (heap->rd_rel->relpersistence == RELPERSISTENCE_TEMP || + !is_parallel_safe(root, (Node *) RelationGetIndexExpressions(index)) || + !is_parallel_safe(root, (Node *) RelationGetIndexPredicate(index))) + { + parallel_workers = 0; + goto done; + } + + /* + * If parallel_workers storage parameter is set for the table, accept that + * as the number of parallel worker processes to launch (though still cap + * at max_parallel_maintenance_workers). Note that we deliberately do not + * consider any other factor (e.g., memory use by workers) when + * parallel_workers is set. + */ + if (rel->rel_parallel_workers != -1) + { + parallel_workers = Min(rel->rel_parallel_workers, + max_parallel_maintenance_workers); + goto done; + } + + /* + * Estimate heap relation size ourselves, since rel->pages cannot be + * trusted (heap RTE was marked as inheritance parent) + */ + estimate_rel_size(heap, NULL, &heap_blocks, &reltuples, &allvisfrac); + + /* + * Determine number of workers to scan the heap relation using generic + * model + */ + parallel_workers = compute_parallel_worker(rel, heap_blocks, -1, + max_parallel_maintenance_workers); + + /* + * Cap workers based on available maintenance_work_mem as needed. + * + * Note that each tuplesort participant receives an even share of the + * total maintenance_work_mem budget. Aim to leave participants + * (including the leader as a participant) with no less than 32MB of + * memory. This puts a maintenance_work_mem setting of 64MB immediately + * past the threshold of being capable of launching a single parallel + * worker to sort. + */ + while (parallel_workers > 0 && + maintenance_work_mem / (parallel_workers + 1) < 32768L) + parallel_workers--; + +done: + index_close(index, NoLock); + heap_close(heap, NoLock); + + return parallel_workers; +} + /* * get_partitioned_child_rels * Returns a list of the RT indexes of the partitioned child relations diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 605b1832be..96ba216387 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3655,6 +3655,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_PARALLEL_BITMAP_SCAN: event_name = "ParallelBitmapScan"; break; + case WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN: + event_name = "ParallelCreateIndexScan"; + break; case WAIT_EVENT_PROCARRAY_GROUP_UPDATE: event_name = "ProcArrayGroupUpdate"; break; diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c index 4de6121ab9..c058c3fc43 100644 --- a/src/backend/storage/file/buffile.c +++ b/src/backend/storage/file/buffile.c @@ -271,7 +271,7 @@ BufFileCreateShared(SharedFileSet *fileset, const char *name) * Open a file that was previously created in another backend (or this one) * with BufFileCreateShared in the same SharedFileSet using the same name. * The backend that created the file must have called BufFileClose() or - * BufFileExport() to make sure that it is ready to be opened by other + * BufFileExportShared() to make sure that it is ready to be opened by other * backends and render it read-only. */ BufFile * @@ -800,3 +800,62 @@ BufFileTellBlock(BufFile *file) } #endif + +/* + * Return the current file size. Counts any holes left behind by + * BufFileAppend as part of the size.
+ */ +off_t +BufFileSize(BufFile *file) +{ + return ((file->numFiles - 1) * (off_t) MAX_PHYSICAL_FILESIZE) + + FileGetSize(file->files[file->numFiles - 1]); +} + +/* + * Append the contents of source file (managed within shared fileset) to + * end of target file (managed within same shared fileset). + * + * Note that this operation subsumes ownership of underlying resources from + * "source". The caller should never call BufFileClose on source after + * calling here. Resource owners for source and target must match, too. + * + * This operation works by manipulating lists of segment files, so the + * file content is always appended at a MAX_PHYSICAL_FILESIZE-aligned + * boundary, typically creating empty holes before the boundary. These + * areas do not contain any interesting data, and cannot be read by the + * caller. + * + * Returns the block number within target where the contents of source + * begins. The caller should apply this as an offset when working with + * block positions that are in terms of the original BufFile space. + */ +long +BufFileAppend(BufFile *target, BufFile *source) +{ + long startBlock = target->numFiles * BUFFILE_SEG_SIZE; + int newNumFiles = target->numFiles + source->numFiles; + int i; + + Assert(target->fileset != NULL); + Assert(source->readOnly); + Assert(!source->dirty); + Assert(source->fileset != NULL); + + if (target->resowner != source->resowner) + elog(ERROR, "could not append BufFile with non-matching resource owner"); + + target->files = (File *) + repalloc(target->files, sizeof(File) * newNumFiles); + target->offsets = (off_t *) + repalloc(target->offsets, sizeof(off_t) * newNumFiles); + for (i = target->numFiles; i < newNumFiles; i++) + { + target->files[i] = source->files[i - target->numFiles]; + target->offsets[i] = 0L; + } + target->numFiles = newNumFiles; + + return startBlock; +} diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index 71516a9a5a..2a18e94ff4 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -2262,6 +2262,16 @@ FileGetRawMode(File file) return VfdCache[file].fileMode; } +/* + * FileGetSize - returns the size of the file + */ +off_t +FileGetSize(File file) +{ + Assert(FileIsValid(file)); + return VfdCache[file].fileSize; +} + /* * Make room for another allocatedDescs[] array entry if needed and possible. * Returns true if an array element is available. diff --git a/src/backend/utils/adt/orderedsetaggs.c b/src/backend/utils/adt/orderedsetaggs.c index 79dbfd1a05..63d9c67027 100644 --- a/src/backend/utils/adt/orderedsetaggs.c +++ b/src/backend/utils/adt/orderedsetaggs.c @@ -291,6 +291,7 @@ ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples) qstate->sortCollations, qstate->sortNullsFirsts, work_mem, + NULL, qstate->rescan_needed); else osastate->sortstate = tuplesort_begin_datum(qstate->sortColType, @@ -298,6 +299,7 @@ ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples) qstate->sortCollation, qstate->sortNullsFirst, work_mem, + NULL, qstate->rescan_needed); osastate->number_of_rows = 0; diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index 54fa4a389e..446040d816 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -112,6 +112,7 @@ bool enableFsync = true; bool allowSystemTableMods = false; int work_mem = 1024; int maintenance_work_mem = 16384; +int max_parallel_maintenance_workers = 2; /* * Primary determinants of sizes of shared-memory structures.
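The participant-share cap in plan_create_index_workers() is easy to check by hand. A small standalone harness (values are illustrative, not defaults):

#include <stdio.h>

int
main(void)
{
	long	maintenance_work_mem = 131072;	/* KB, i.e. 128MB (example) */
	int		parallel_workers = 5;			/* as chosen by generic model */

	/* Same loop as plan_create_index_workers(): 32MB per participant */
	while (parallel_workers > 0 &&
		   maintenance_work_mem / (parallel_workers + 1) < 32768L)
		parallel_workers--;

	/* 131072/6 and 131072/5 fall below 32768; 131072/4 == 32768 passes */
	printf("capped at %d workers\n", parallel_workers);	/* prints 3 */
	return 0;
}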
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 5884fa905e..87ba67661a 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2734,6 +2734,16 @@ static struct config_int ConfigureNamesInt[] = check_autovacuum_max_workers, NULL, NULL }, + { + {"max_parallel_maintenance_workers", PGC_USERSET, RESOURCES_ASYNCHRONOUS, + gettext_noop("Sets the maximum number of parallel processes per maintenance operation."), + NULL + }, + &max_parallel_maintenance_workers, + 2, 0, 1024, + NULL, NULL, NULL + }, + { {"max_parallel_workers_per_gather", PGC_USERSET, RESOURCES_ASYNCHRONOUS, gettext_noop("Sets the maximum number of parallel processes per executor node."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index abffde6b2b..9a3535559e 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -163,10 +163,11 @@ #effective_io_concurrency = 1 # 1-1000; 0 disables prefetching #max_worker_processes = 8 # (change requires restart) +#max_parallel_maintenance_workers = 2 # taken from max_parallel_workers #max_parallel_workers_per_gather = 2 # taken from max_parallel_workers #parallel_leader_participation = on #max_parallel_workers = 8 # maximum number of max_worker_processes that - # can be used in parallel queries + # can be used in parallel operations #old_snapshot_threshold = -1 # 1min-60d; -1 disables; 0 is immediate # (change requires restart) #backend_flush_after = 0 # measured in pages, 0 disables diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d index 560d8ccda3..ad06e8e2ea 100644 --- a/src/backend/utils/probes.d +++ b/src/backend/utils/probes.d @@ -52,7 +52,7 @@ provider postgresql { probe query__done(const char *); probe statement__status(const char *); - probe sort__start(int, bool, int, int, bool); + probe sort__start(int, bool, int, int, bool, int); probe sort__done(bool, long); probe buffer__read__start(ForkNumber, BlockNumber, Oid, Oid, Oid, int, bool); diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c index 2d07b3d3f5..6b7c10bcfc 100644 --- a/src/backend/utils/sort/logtape.c +++ b/src/backend/utils/sort/logtape.c @@ -64,6 +64,14 @@ * care that all calls for a single LogicalTapeSet are made in the same * palloc context. * + * To support parallel sort operations involving coordinated callers to + * tuplesort.c routines across multiple workers, it is necessary to + * concatenate each worker BufFile/tapeset into one single logical tapeset + * managed by the leader. Workers should have produced one final + * materialized tape (their entire output) when this happens in leader. + * There will always be the same number of runs as input tapes, and the same + * number of input tapes as participants (worker Tuplesortstates). + * * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * @@ -76,6 +84,7 @@ #include "postgres.h" #include "storage/buffile.h" +#include "utils/builtins.h" #include "utils/logtape.h" #include "utils/memutils.h" @@ -129,16 +138,21 @@ typedef struct LogicalTape * a frozen tape. (When reading from an unfrozen tape, we use a larger * read buffer that holds multiple blocks, so the "current" block is * ambiguous.) + * + * When concatenation of worker tape BufFiles is performed, an offset to + * the first block in the unified BufFile space is applied during reads. 
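As an illustration of the concatenated space (not from the patch): after the leader appends worker file W1 to worker file W0, the unified BufFile looks like

	block:    0 ... k-1 | k ... a-1 | a ... m-1
	content:  W0 data   | hole      | W1 data

where a is the first MAX_PHYSICAL_FILESIZE-aligned boundary at or after k. W1's tape is given offsetBlockNumber = a, reads add that offset, and the hole blocks in k ... a-1 are counted in nHoleBlocks so LogicalTapeSetBlocks() can exclude them.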
*/ long firstBlockNumber; long curBlockNumber; long nextBlockNumber; + long offsetBlockNumber; /* * Buffer for current data block(s). */ char *buffer; /* physical buffer (separately palloc'd) */ int buffer_size; /* allocated size of the buffer */ + int max_size; /* highest useful, safe buffer_size */ int pos; /* next read/write position in buffer */ int nbytes; /* total # of valid bytes in buffer */ } LogicalTape; @@ -159,10 +173,13 @@ struct LogicalTapeSet * by ltsGetFreeBlock(), and it is always greater than or equal to * nBlocksWritten. Blocks between nBlocksAllocated and nBlocksWritten are * blocks that have been allocated for a tape, but have not been written - * to the underlying file yet. + * to the underlying file yet. nHoleBlocks tracks the total number of + * blocks that are in unused holes between worker spaces following BufFile + * concatenation. */ long nBlocksAllocated; /* # of blocks allocated */ long nBlocksWritten; /* # of blocks used in underlying file */ + long nHoleBlocks; /* # of "hole" blocks left */ /* * We store the numbers of recycled-and-available blocks in freeBlocks[]. @@ -192,6 +209,8 @@ static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer); static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer); static long ltsGetFreeBlock(LogicalTapeSet *lts); static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum); +static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, + SharedFileSet *fileset); /* @@ -213,6 +232,11 @@ ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer) * previous tape isn't flushed to disk until the end of the sort, so you * get one-block hole, where the last block of the previous tape will * later go. + * + * Note that BufFile concatenation can leave "holes" in BufFile between + * worker-owned block ranges. These are tracked for reporting purposes + * only. We never read from nor write to these hole blocks, and so they + * are not considered here. */ while (blocknum > lts->nBlocksWritten) { @@ -267,15 +291,18 @@ ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt) do { char *thisbuf = lt->buffer + lt->nbytes; + long datablocknum = lt->nextBlockNumber; /* Fetch next block number */ - if (lt->nextBlockNumber == -1L) + if (datablocknum == -1L) break; /* EOF */ + /* Apply worker offset, needed for leader tapesets */ + datablocknum += lt->offsetBlockNumber; /* Read the block */ - ltsReadBlock(lts, lt->nextBlockNumber, (void *) thisbuf); + ltsReadBlock(lts, datablocknum, (void *) thisbuf); if (!lt->frozen) - ltsReleaseBlock(lts, lt->nextBlockNumber); + ltsReleaseBlock(lts, datablocknum); lt->curBlockNumber = lt->nextBlockNumber; lt->nbytes += TapeBlockGetNBytes(thisbuf); @@ -370,13 +397,116 @@ ltsReleaseBlock(LogicalTapeSet *lts, long blocknum) lts->blocksSorted = false; } +/* + * Claim ownership of a set of logical tapes from existing shared BufFiles. + * + * Caller should be leader process. Though tapes are marked as frozen in + * workers, they are not frozen when opened within leader, since unfrozen tapes + * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized + * for random access.) 
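For orientation, the three shapes that LogicalTapeSetCreate() calls now take (a summary of the calling convention spelled out below; variable names are illustrative):

	/* serial sort: private temp file, nothing shared */
	lts = LogicalTapeSetCreate(maxTapes, NULL, NULL, -1);

	/* parallel worker: tapes live in the shared fileset */
	lts = LogicalTapeSetCreate(maxTapes, NULL, &shared->fileset, worker_id);

	/* leader: adopt the workers' frozen tapes, plus one tape of its own */
	lts = LogicalTapeSetCreate(nparticipants + 1, shared->tapes,
							   &shared->fileset, -1);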
+ */ +static void +ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, + SharedFileSet *fileset) +{ + LogicalTape *lt = NULL; + long tapeblocks; + long nphysicalblocks = 0L; + int i; + + /* Should have at least one worker tape, plus leader's tape */ + Assert(lts->nTapes >= 2); + + /* + * Build concatenated view of all BufFiles, remembering the block number + * where each source file begins. No changes are needed for leader/last + * tape. + */ + for (i = 0; i < lts->nTapes - 1; i++) + { + char filename[MAXPGPATH]; + BufFile *file; + + lt = <s->tapes[i]; + + pg_itoa(i, filename); + file = BufFileOpenShared(fileset, filename); + + /* + * Stash first BufFile, and concatenate subsequent BufFiles to that. + * Store block offset into each tape as we go. + */ + lt->firstBlockNumber = shared[i].firstblocknumber; + if (i == 0) + { + lts->pfile = file; + lt->offsetBlockNumber = 0L; + } + else + { + lt->offsetBlockNumber = BufFileAppend(lts->pfile, file); + } + /* Don't allocate more for read buffer than could possibly help */ + lt->max_size = Min(MaxAllocSize, shared[i].buffilesize); + tapeblocks = shared[i].buffilesize / BLCKSZ; + nphysicalblocks += tapeblocks; + } + + /* + * Set # of allocated blocks, as well as # blocks written. Use extent of + * new BufFile space (from 0 to end of last worker's tape space) for this. + * Allocated/written blocks should include space used by holes left + * between concatenated BufFiles. + */ + lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks; + lts->nBlocksWritten = lts->nBlocksAllocated; + + /* + * Compute number of hole blocks so that we can later work backwards, and + * instrument number of physical blocks. We don't simply use physical + * blocks directly for instrumentation because this would break if we ever + * subsequently wrote to worker tape. + * + * Working backwards like this keeps our options open. If shared BufFiles + * ever support being written to post-export, logtape.c can automatically + * take advantage of that. We'd then support writing to the leader tape + * while recycling space from worker tapes, because the leader tape has a + * zero offset (write routines won't need to have extra logic to apply an + * offset). + * + * The only thing that currently prevents writing to the leader tape from + * working is the fact that BufFiles opened using BufFileOpenShared() are + * read-only by definition, but that could be changed if it seemed + * worthwhile. For now, writing to the leader tape will raise a "Bad file + * descriptor" error, so tuplesort must avoid writing to the leader tape + * altogether. + */ + lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks; +} + /* * Create a set of logical tapes in a temporary underlying file. * - * Each tape is initialized in write state. + * Each tape is initialized in write state. Serial callers pass ntapes, + * NULL argument for shared, and -1 for worker. Parallel worker callers + * pass ntapes, a shared file handle, NULL shared argument, and their own + * worker number. Leader callers, which claim shared worker tapes here, + * must supply non-sentinel values for all arguments except worker number, + * which should be -1. + * + * Leader caller is passing back an array of metadata each worker captured + * when LogicalTapeFreeze() was called for their final result tapes. Passed + * tapes array is actually sized ntapes - 1, because it includes only + * worker tapes, whereas leader requires its own leader tape. 
Note that we + * rely on the assumption that reclaimed worker tapes will only be read + * from once by leader, and never written to again (tapes are initialized + * for writing, but that's only to be consistent). Leader may not write to + * its own tape purely due to a restriction in the shared buffile + * infrastructure that may be lifted in the future. */ LogicalTapeSet * -LogicalTapeSetCreate(int ntapes) +LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset, + int worker) { LogicalTapeSet *lts; LogicalTape *lt; @@ -388,9 +518,9 @@ LogicalTapeSetCreate(int ntapes) Assert(ntapes > 0); lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) + ntapes * sizeof(LogicalTape)); - lts->pfile = BufFileCreateTemp(false); lts->nBlocksAllocated = 0L; lts->nBlocksWritten = 0L; + lts->nHoleBlocks = 0L; lts->forgetFreeSpace = false; lts->blocksSorted = true; /* a zero-length array is sorted ... */ lts->freeBlocksLen = 32; /* reasonable initial guess */ @@ -412,11 +542,36 @@ LogicalTapeSetCreate(int ntapes) lt->dirty = false; lt->firstBlockNumber = -1L; lt->curBlockNumber = -1L; + lt->nextBlockNumber = -1L; + lt->offsetBlockNumber = 0L; lt->buffer = NULL; lt->buffer_size = 0; + /* palloc() larger than MaxAllocSize would fail */ + lt->max_size = MaxAllocSize; lt->pos = 0; lt->nbytes = 0; } + + /* + * Create temp BufFile storage as required. + * + * Leader concatenates worker tapes, which requires special adjustment to + * final tapeset data. Things are simpler for the worker case and the + * serial case, though. They are generally very similar -- workers use a + * shared fileset, whereas serial sorts use a conventional serial BufFile. + */ + if (shared) + ltsConcatWorkerTapes(lts, shared, fileset); + else if (fileset) + { + char filename[MAXPGPATH]; + + pg_itoa(worker, filename); + lts->pfile = BufFileCreateShared(fileset, filename); + } + else + lts->pfile = BufFileCreateTemp(false); + return lts; } @@ -470,6 +625,7 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = <s->tapes[tapenum]; Assert(lt->writing); + Assert(lt->offsetBlockNumber == 0L); /* Allocate data buffer and first block on first write */ if (lt->buffer == NULL) @@ -566,12 +722,9 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size) if (buffer_size < BLCKSZ) buffer_size = BLCKSZ; - /* - * palloc() larger than MaxAllocSize would fail (a multi-gigabyte - * buffer is unlikely to be helpful, anyway) - */ - if (buffer_size > MaxAllocSize) - buffer_size = MaxAllocSize; + /* palloc() larger than max_size is unlikely to be helpful */ + if (buffer_size > lt->max_size) + buffer_size = lt->max_size; /* round down to BLCKSZ boundary */ buffer_size -= buffer_size % BLCKSZ; @@ -698,15 +851,22 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum, * tape is rewound (after rewind is too late!). It performs a rewind * and switch to read mode "for free". An immediately following rewind- * for-read call is OK but not necessary. + * + * share output argument is set with details of storage used for tape after + * freezing, which may be passed to LogicalTapeSetCreate within leader + * process later. This metadata is only of interest to worker callers + * freezing their final output for leader (single materialized tape). + * Serial sorts should set share to NULL. 
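Putting LogicalTapeFreeze() and LogicalTapeSetCreate() together, the worker-to-leader hand-off is roughly as follows (a sketch; the shared-memory plumbing and variable names are illustrative):

	/* Worker: freeze the final result tape and capture its metadata */
	TapeShare	output;

	LogicalTapeFreeze(lts, result_tape, &output);
	/* output.firstblocknumber and output.buffilesize describe the frozen
	 * tape; the worker publishes them in the shared tapes[] array */

	/* Leader: once all workers have reported, adopt everything at once */
	leader_lts = LogicalTapeSetCreate(nworkers + 1, shared_tapes,
									  fileset, -1);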
*/ void -LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum) +LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share) { LogicalTape *lt; Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = <s->tapes[tapenum]; Assert(lt->writing); + Assert(lt->offsetBlockNumber == 0L); /* * Completion of a write phase. Flush last partial data block, and rewind @@ -749,6 +909,14 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum) else lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next; lt->nbytes = TapeBlockGetNBytes(lt->buffer); + + /* Handle extra steps when caller is to share its tapeset */ + if (share) + { + BufFileExportShared(lts->pfile); + share->firstblocknumber = lt->firstBlockNumber; + share->buffilesize = BufFileSize(lts->pfile); + } } /* @@ -874,6 +1042,7 @@ LogicalTapeTell(LogicalTapeSet *lts, int tapenum, Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = <s->tapes[tapenum]; + Assert(lt->offsetBlockNumber == 0L); /* With a larger buffer, 'pos' wouldn't be the same as offset within page */ Assert(lt->buffer_size == BLCKSZ); @@ -888,5 +1057,5 @@ LogicalTapeTell(LogicalTapeSet *lts, int tapenum, long LogicalTapeSetBlocks(LogicalTapeSet *lts) { - return lts->nBlocksAllocated; + return lts->nBlocksAllocated - lts->nHoleBlocks; } diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c index eecc66cafa..041bdc2fa7 100644 --- a/src/backend/utils/sort/tuplesort.c +++ b/src/backend/utils/sort/tuplesort.c @@ -74,6 +74,14 @@ * above. Nonetheless, with large workMem we can have many tapes (but not * too many -- see the comments in tuplesort_merge_order). * + * This module supports parallel sorting. Parallel sorts involve coordination + * among one or more worker processes, and a leader process, each with its own + * tuplesort state. The leader process (or, more accurately, the + * Tuplesortstate associated with a leader process) creates a full tapeset + * consisting of worker tapes with one run to merge; a run for every + * worker process. This is then merged. Worker processes are guaranteed to + * produce exactly one output run from their partial input. + * * * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -113,6 +121,10 @@ #define DATUM_SORT 2 #define CLUSTER_SORT 3 +/* Sort parallel code from state for sort__start probes */ +#define PARALLEL_SORT(state) ((state)->shared == NULL ? 0 : \ + (state)->worker >= 0 ? 1 : 2) + /* GUC variables */ #ifdef TRACE_SORT bool trace_sort = false; @@ -374,6 +386,25 @@ struct Tuplesortstate int markpos_offset; /* saved "current", or offset in tape block */ bool markpos_eof; /* saved "eof_reached" */ + /* + * These variables are used during parallel sorting. + * + * worker is our worker identifier. Follows the general convention that + * -1 value relates to a leader tuplesort, and values >= 0 worker + * tuplesorts. (-1 can also be a serial tuplesort.) + * + * shared is mutable shared memory state, which is used to coordinate + * parallel sorts. + * + * nParticipants is the number of worker Tuplesortstates known by the + * leader to have actually been launched, which implies that they must + * finish a run leader can merge. Typically includes a worker state held + * by the leader process itself. Set in the leader Tuplesortstate only. + */ + int worker; + Sharedsort *shared; + int nParticipants; + /* * The sortKeys variable is used by every case other than the hash index * case; it is set by tuplesort_begin_xxx. 
tupDesc is only used by the @@ -435,6 +466,39 @@ struct Tuplesortstate #endif }; +/* + * Private mutable state of tuplesort-parallel-operation. This is allocated + * in shared memory. + */ +struct Sharedsort +{ + /* mutex protects all fields prior to tapes */ + slock_t mutex; + + /* + * currentWorker generates ordinal identifier numbers for parallel sort + * workers. These start from 0, and are always gapless. + * + * Workers increment workersFinished to indicate having finished. If this + * is equal to state.nParticipants within the leader, leader is ready to + * merge worker runs. + */ + int currentWorker; + int workersFinished; + + /* Temporary file space */ + SharedFileSet fileset; + + /* Size of tapes flexible array */ + int nTapes; + + /* + * Tapes array used by workers to report back information needed by the + * leader to concatenate all worker tapes into one for merging + */ + TapeShare tapes[FLEXIBLE_ARRAY_MEMBER]; +}; + /* * Is the given tuple allocated from the slab memory arena? */ @@ -465,6 +529,9 @@ struct Tuplesortstate #define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed) #define USEMEM(state,amt) ((state)->availMem -= (amt)) #define FREEMEM(state,amt) ((state)->availMem += (amt)) +#define SERIAL(state) ((state)->shared == NULL) +#define WORKER(state) ((state)->shared && (state)->worker != -1) +#define LEADER(state) ((state)->shared && (state)->worker == -1) /* * NOTES about on-tape representation of tuples: @@ -521,10 +588,13 @@ struct Tuplesortstate } while(0) -static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess); +static Tuplesortstate *tuplesort_begin_common(int workMem, + SortCoordinate coordinate, + bool randomAccess); static void puttuple_common(Tuplesortstate *state, SortTuple *tuple); static bool consider_abort_common(Tuplesortstate *state); -static void inittapes(Tuplesortstate *state); +static void inittapes(Tuplesortstate *state, bool mergeruns); +static void inittapestate(Tuplesortstate *state, int maxTapes); static void selectnewtape(Tuplesortstate *state); static void init_slab_allocator(Tuplesortstate *state, int numSlots); static void mergeruns(Tuplesortstate *state); @@ -572,6 +642,10 @@ static void writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup); static void readtup_datum(Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); +static int worker_get_identifier(Tuplesortstate *state); +static void worker_freeze_result_tape(Tuplesortstate *state); +static void worker_nomergeruns(Tuplesortstate *state); +static void leader_takeover_tapes(Tuplesortstate *state); static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup); /* @@ -604,13 +678,18 @@ static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup); */ static Tuplesortstate * -tuplesort_begin_common(int workMem, bool randomAccess) +tuplesort_begin_common(int workMem, SortCoordinate coordinate, + bool randomAccess) { Tuplesortstate *state; MemoryContext sortcontext; MemoryContext tuplecontext; MemoryContext oldcontext; + /* See leader_takeover_tapes() remarks on randomAccess support */ + if (coordinate && randomAccess) + elog(ERROR, "random access disallowed under parallel sort"); + /* * Create a working memory context for this sort operation. All data * needed by the sort will live inside this context. 
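A caller-side sketch of the new coordination argument (loosely modeled on what a parallel CREATE INDEX does; the SortCoordinateData fields are as used here, the surrounding variables are illustrative):

	SortCoordinate coordinate;

	coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
	coordinate->isWorker = true;		/* false in the leader */
	coordinate->nParticipants = -1;		/* leader fills in launched count */
	coordinate->sharedsort = sharedsort;	/* Sharedsort in shared memory */

	/* randomAccess must be false whenever coordinate is non-NULL */
	state = tuplesort_begin_index_btree(heap, index, enforceUnique,
										sortmem, coordinate, false);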
@@ -650,7 +729,14 @@ tuplesort_begin_common(int workMem, bool randomAccess) state->bounded = false; state->tuples = true; state->boundUsed = false; - state->allowedMem = workMem * (int64) 1024; + + /* + * workMem is forced to be at least 64KB, the current minimum valid value + * for the work_mem GUC. This is a defense against parallel sort callers + * that divide out memory among many workers in a way that leaves each + * with very little memory. + */ + state->allowedMem = Max(workMem, 64) * (int64) 1024; state->availMem = state->allowedMem; state->sortcontext = sortcontext; state->tuplecontext = tuplecontext; @@ -684,6 +770,33 @@ tuplesort_begin_common(int workMem, bool randomAccess) state->result_tape = -1; /* flag that result tape has not been formed */ + /* + * Initialize parallel-related state based on coordination information + * from caller + */ + if (!coordinate) + { + /* Serial sort */ + state->shared = NULL; + state->worker = -1; + state->nParticipants = -1; + } + else if (coordinate->isWorker) + { + /* Parallel worker produces exactly one final run from all input */ + state->shared = coordinate->sharedsort; + state->worker = worker_get_identifier(state); + state->nParticipants = -1; + } + else + { + /* Parallel leader state only used for final merge */ + state->shared = coordinate->sharedsort; + state->worker = -1; + state->nParticipants = coordinate->nParticipants; + Assert(state->nParticipants >= 1); + } + MemoryContextSwitchTo(oldcontext); return state; @@ -694,9 +807,10 @@ tuplesort_begin_heap(TupleDesc tupDesc, int nkeys, AttrNumber *attNums, Oid *sortOperators, Oid *sortCollations, bool *nullsFirstFlags, - int workMem, bool randomAccess) + int workMem, SortCoordinate coordinate, bool randomAccess) { - Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess); + Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, + randomAccess); MemoryContext oldcontext; int i; @@ -717,7 +831,8 @@ tuplesort_begin_heap(TupleDesc tupDesc, false, /* no unique check */ nkeys, workMem, - randomAccess); + randomAccess, + PARALLEL_SORT(state)); state->comparetup = comparetup_heap; state->copytup = copytup_heap; @@ -764,9 +879,11 @@ tuplesort_begin_heap(TupleDesc tupDesc, Tuplesortstate * tuplesort_begin_cluster(TupleDesc tupDesc, Relation indexRel, - int workMem, bool randomAccess) + int workMem, + SortCoordinate coordinate, bool randomAccess) { - Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess); + Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, + randomAccess); ScanKey indexScanKey; MemoryContext oldcontext; int i; @@ -789,7 +906,8 @@ tuplesort_begin_cluster(TupleDesc tupDesc, false, /* no unique check */ state->nKeys, workMem, - randomAccess); + randomAccess, + PARALLEL_SORT(state)); state->comparetup = comparetup_cluster; state->copytup = copytup_cluster; @@ -857,9 +975,12 @@ Tuplesortstate * tuplesort_begin_index_btree(Relation heapRel, Relation indexRel, bool enforceUnique, - int workMem, bool randomAccess) + int workMem, + SortCoordinate coordinate, + bool randomAccess) { - Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess); + Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, + randomAccess); ScanKey indexScanKey; MemoryContext oldcontext; int i; @@ -880,7 +1001,8 @@ tuplesort_begin_index_btree(Relation heapRel, enforceUnique, state->nKeys, workMem, - randomAccess); + randomAccess, + PARALLEL_SORT(state)); state->comparetup = comparetup_index_btree; state->copytup = copytup_index; @@ 
-934,9 +1056,12 @@ tuplesort_begin_index_hash(Relation heapRel, uint32 high_mask, uint32 low_mask, uint32 max_buckets, - int workMem, bool randomAccess) + int workMem, + SortCoordinate coordinate, + bool randomAccess) { - Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess); + Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, + randomAccess); MemoryContext oldcontext; oldcontext = MemoryContextSwitchTo(state->sortcontext); @@ -973,10 +1098,11 @@ tuplesort_begin_index_hash(Relation heapRel, Tuplesortstate * tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, - bool nullsFirstFlag, - int workMem, bool randomAccess) + bool nullsFirstFlag, int workMem, + SortCoordinate coordinate, bool randomAccess) { - Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess); + Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, + randomAccess); MemoryContext oldcontext; int16 typlen; bool typbyval; @@ -996,7 +1122,8 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, false, /* no unique check */ 1, workMem, - randomAccess); + randomAccess, + PARALLEL_SORT(state)); state->comparetup = comparetup_datum; state->copytup = copytup_datum; @@ -1054,7 +1181,7 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, * delayed calls at the moment.) * * This is a hint only. The tuplesort may still return more tuples than - * requested. + * requested. Parallel leader tuplesorts will always ignore the hint. */ void tuplesort_set_bound(Tuplesortstate *state, int64 bound) @@ -1063,6 +1190,7 @@ tuplesort_set_bound(Tuplesortstate *state, int64 bound) Assert(state->status == TSS_INITIAL); Assert(state->memtupcount == 0); Assert(!state->bounded); + Assert(!WORKER(state)); #ifdef DEBUG_BOUNDED_SORT /* Honor GUC setting that disables the feature (for easy testing) */ @@ -1070,6 +1198,10 @@ tuplesort_set_bound(Tuplesortstate *state, int64 bound) return; #endif + /* Parallel leader ignores hint */ + if (LEADER(state)) + return; + /* We want to be able to compute bound * 2, so limit the setting */ if (bound > (int64) (INT_MAX / 2)) return; @@ -1128,11 +1260,13 @@ tuplesort_end(Tuplesortstate *state) if (trace_sort) { if (state->tapeset) - elog(LOG, "external sort ended, %ld disk blocks used: %s", - spaceUsed, pg_rusage_show(&state->ru_start)); + elog(LOG, "%s of %d ended, %ld disk blocks used: %s", + SERIAL(state) ? "external sort" : "parallel external sort", + state->worker, spaceUsed, pg_rusage_show(&state->ru_start)); else - elog(LOG, "internal sort ended, %ld KB used: %s", - spaceUsed, pg_rusage_show(&state->ru_start)); + elog(LOG, "%s of %d ended, %ld KB used: %s", + SERIAL(state) ? "internal sort" : "unperformed parallel sort", + state->worker, spaceUsed, pg_rusage_show(&state->ru_start)); } TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed); @@ -1503,6 +1637,8 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull) static void puttuple_common(Tuplesortstate *state, SortTuple *tuple) { + Assert(!LEADER(state)); + switch (state->status) { case TSS_INITIAL: @@ -1556,7 +1692,7 @@ puttuple_common(Tuplesortstate *state, SortTuple *tuple) /* * Nope; time to switch to tape-based operation. */ - inittapes(state); + inittapes(state, true); /* * Dump all tuples. 
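Taken together, the assertions added around here and below divide the labor cleanly (an informal summary, using the SERIAL/WORKER/LEADER macros defined above):

	/*
	 * puttuple_common()           - leader never inserts: Assert(!LEADER)
	 * tuplesort_gettuple_common() - workers never fetch:  Assert(!WORKER)
	 * tuplesort_set_bound()       - workers never bound:  Assert(!WORKER),
	 *                               and the leader ignores the hint
	 */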
@@ -1658,8 +1794,8 @@ tuplesort_performsort(Tuplesortstate *state) #ifdef TRACE_SORT if (trace_sort) - elog(LOG, "performsort starting: %s", - pg_rusage_show(&state->ru_start)); + elog(LOG, "performsort of %d starting: %s", + state->worker, pg_rusage_show(&state->ru_start)); #endif switch (state->status) @@ -1668,14 +1804,39 @@ tuplesort_performsort(Tuplesortstate *state) /* * We were able to accumulate all the tuples within the allowed - * amount of memory. Just qsort 'em and we're done. + * amount of memory, or we are the leader, about to take over the + * worker tapes. */ - tuplesort_sort_memtuples(state); + if (SERIAL(state)) + { + /* Just qsort 'em and we're done */ + tuplesort_sort_memtuples(state); + state->status = TSS_SORTEDINMEM; + } + else if (WORKER(state)) + { + /* + * Parallel workers must still dump out tuples to tape. No + * merge is required to produce a single output run, though. + */ + inittapes(state, false); + dumptuples(state, true); + worker_nomergeruns(state); + state->status = TSS_SORTEDONTAPE; + } + else + { + /* + * Leader will take over worker tapes and merge worker runs. + * Note that mergeruns sets the correct state->status. + */ + leader_takeover_tapes(state); + mergeruns(state); + } state->current = 0; state->eof_reached = false; + state->markpos_block = 0L; state->markpos_offset = 0; state->markpos_eof = false; - state->status = TSS_SORTEDINMEM; break; case TSS_BOUNDED: @@ -1698,8 +1859,8 @@ tuplesort_performsort(Tuplesortstate *state) /* * Finish tape-based sort. First, flush all tuples remaining in * memory out to tape; then merge until we have a single remaining - * run (or, if !randomAccess, one run per tape). Note that - * mergeruns sets the correct state->status. + * run (or, if !randomAccess and !WORKER(), one run per tape). + * Note that mergeruns sets the correct state->status. */ dumptuples(state, true); mergeruns(state); @@ -1718,12 +1879,12 @@ tuplesort_performsort(Tuplesortstate *state) if (trace_sort) { if (state->status == TSS_FINALMERGE) - elog(LOG, "performsort done (except %d-way final merge): %s", - state->activeTapes, + elog(LOG, "performsort of %d done (except %d-way final merge): %s", + state->worker, state->activeTapes, pg_rusage_show(&state->ru_start)); else - elog(LOG, "performsort done: %s", - pg_rusage_show(&state->ru_start)); + elog(LOG, "performsort of %d done: %s", + state->worker, pg_rusage_show(&state->ru_start)); } #endif @@ -1744,6 +1905,8 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, unsigned int tuplen; size_t nmoved; + Assert(!WORKER(state)); + switch (state->status) { case TSS_SORTEDINMEM: @@ -2127,6 +2290,7 @@ tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward) */ Assert(forward); Assert(ntuples >= 0); + Assert(!WORKER(state)); switch (state->status) { @@ -2221,57 +2385,40 @@ tuplesort_merge_order(int64 allowedMem) /* * inittapes - initialize for tape sorting. * - * This is called only if we have found we don't have room to sort in memory. + * This is called only if we have found we won't sort in memory.
*/ static void -inittapes(Tuplesortstate *state) +inittapes(Tuplesortstate *state, bool mergeruns) { int maxTapes, j; - int64 tapeSpace; - /* Compute number of tapes to use: merge order plus 1 */ - maxTapes = tuplesort_merge_order(state->allowedMem) + 1; + Assert(!LEADER(state)); - state->maxTapes = maxTapes; - state->tapeRange = maxTapes - 1; + if (mergeruns) + { + /* Compute number of tapes to use: merge order plus 1 */ + maxTapes = tuplesort_merge_order(state->allowedMem) + 1; + } + else + { + /* Workers can sometimes produce single run, output without merge */ + Assert(WORKER(state)); + maxTapes = MINORDER + 1; + } #ifdef TRACE_SORT if (trace_sort) - elog(LOG, "switching to external sort with %d tapes: %s", - maxTapes, pg_rusage_show(&state->ru_start)); + elog(LOG, "%d switching to external sort with %d tapes: %s", + state->worker, maxTapes, pg_rusage_show(&state->ru_start)); #endif - /* - * Decrease availMem to reflect the space needed for tape buffers, when - * writing the initial runs; but don't decrease it to the point that we - * have no room for tuples. (That case is only likely to occur if sorting - * pass-by-value Datums; in all other scenarios the memtuples[] array is - * unlikely to occupy more than half of allowedMem. In the pass-by-value - * case it's not important to account for tuple space, so we don't care if - * LACKMEM becomes inaccurate.) - */ - tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD; - - if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem) - USEMEM(state, tapeSpace); - - /* - * Make sure that the temp file(s) underlying the tape set are created in - * suitable temp tablespaces. - */ - PrepareTempTablespaces(); - - /* - * Create the tape set and allocate the per-tape data arrays. - */ - state->tapeset = LogicalTapeSetCreate(maxTapes); - - state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool)); - state->tp_fib = (int *) palloc0(maxTapes * sizeof(int)); - state->tp_runs = (int *) palloc0(maxTapes * sizeof(int)); - state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int)); - state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int)); + /* Create the tape set and allocate the per-tape data arrays */ + inittapestate(state, maxTapes); + state->tapeset = + LogicalTapeSetCreate(maxTapes, NULL, + state->shared ? &state->shared->fileset : NULL, + state->worker); state->currentRun = 0; @@ -2294,6 +2441,47 @@ inittapes(Tuplesortstate *state) state->status = TSS_BUILDRUNS; } +/* + * inittapestate - initialize generic tape management state + */ +static void +inittapestate(Tuplesortstate *state, int maxTapes) +{ + int64 tapeSpace; + + /* + * Decrease availMem to reflect the space needed for tape buffers; but + * don't decrease it to the point that we have no room for tuples. (That + * case is only likely to occur if sorting pass-by-value Datums; in all + * other scenarios the memtuples[] array is unlikely to occupy more than + * half of allowedMem. In the pass-by-value case it's not important to + * account for tuple space, so we don't care if LACKMEM becomes + * inaccurate.) + */ + tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD; + + if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem) + USEMEM(state, tapeSpace); + + /* + * Make sure that the temp file(s) underlying the tape set are created in + * suitable temp tablespaces. For parallel sorts, this should have been + * called already, but it doesn't matter if it is called a second time. 
+ */ + PrepareTempTablespaces(); + + state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool)); + state->tp_fib = (int *) palloc0(maxTapes * sizeof(int)); + state->tp_runs = (int *) palloc0(maxTapes * sizeof(int)); + state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int)); + state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int)); + + /* Record # of tapes allocated (for duration of sort) */ + state->maxTapes = maxTapes; + /* Record maximum # of tapes usable as inputs when merging */ + state->tapeRange = maxTapes - 1; +} + /* * selectnewtape -- select new tape for new initial run. * @@ -2471,8 +2659,8 @@ mergeruns(Tuplesortstate *state) */ #ifdef TRACE_SORT if (trace_sort) - elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes", - (state->availMem) / 1024, numInputTapes); + elog(LOG, "%d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes", + state->worker, state->availMem / 1024, numInputTapes); #endif state->read_buffer_size = Max(state->availMem / numInputTapes, 0); @@ -2490,7 +2678,7 @@ mergeruns(Tuplesortstate *state) * pass remains. If we don't have to produce a materialized sorted * tape, we can stop at this point and do the final merge on-the-fly. */ - if (!state->randomAccess) + if (!state->randomAccess && !WORKER(state)) { bool allOneRun = true; @@ -2575,7 +2763,10 @@ mergeruns(Tuplesortstate *state) * a waste of cycles anyway... */ state->result_tape = state->tp_tapenum[state->tapeRange]; - LogicalTapeFreeze(state->tapeset, state->result_tape); + if (!WORKER(state)) + LogicalTapeFreeze(state->tapeset, state->result_tape, NULL); + else + worker_freeze_result_tape(state); state->status = TSS_SORTEDONTAPE; /* Release the read buffers of all the other tapes, by rewinding them. */ @@ -2644,8 +2835,8 @@ mergeonerun(Tuplesortstate *state) #ifdef TRACE_SORT if (trace_sort) - elog(LOG, "finished %d-way merge step: %s", state->activeTapes, - pg_rusage_show(&state->ru_start)); + elog(LOG, "%d finished %d-way merge step: %s", state->worker, + state->activeTapes, pg_rusage_show(&state->ru_start)); #endif } @@ -2779,8 +2970,9 @@ dumptuples(Tuplesortstate *state, bool alltuples) #ifdef TRACE_SORT if (trace_sort) - elog(LOG, "starting quicksort of run %d: %s", - state->currentRun, pg_rusage_show(&state->ru_start)); + elog(LOG, "%d starting quicksort of run %d: %s", + state->worker, state->currentRun, + pg_rusage_show(&state->ru_start)); #endif /* @@ -2791,8 +2983,9 @@ dumptuples(Tuplesortstate *state, bool alltuples) #ifdef TRACE_SORT if (trace_sort) - elog(LOG, "finished quicksort of run %d: %s", - state->currentRun, pg_rusage_show(&state->ru_start)); + elog(LOG, "%d finished quicksort of run %d: %s", + state->worker, state->currentRun, + pg_rusage_show(&state->ru_start)); #endif memtupwrite = state->memtupcount; @@ -2818,8 +3011,8 @@ dumptuples(Tuplesortstate *state, bool alltuples) #ifdef TRACE_SORT if (trace_sort) - elog(LOG, "finished writing run %d to tape %d: %s", - state->currentRun, state->destTape, + elog(LOG, "%d finished writing run %d to tape %d: %s", + state->worker, state->currentRun, state->destTape, pg_rusage_show(&state->ru_start)); #endif @@ -3031,6 +3224,7 @@ make_bounded_heap(Tuplesortstate *state) Assert(state->status == TSS_INITIAL); Assert(state->bounded); Assert(tupcount >= state->bound); + Assert(SERIAL(state)); /* Reverse sort direction so largest entry will be at root */ reversedirection(state); @@ -3078,6 +3272,7 @@ sort_bounded_heap(Tuplesortstate *state) Assert(state->status == TSS_BOUNDED); 
Assert(state->bounded); Assert(tupcount == state->bound); + Assert(SERIAL(state)); /* * We can unheapify in place because each delete-top call will remove the @@ -3112,6 +3307,8 @@ static void tuplesort_sort_memtuples(Tuplesortstate *state) { + Assert(!LEADER(state)); + if (state->memtupcount > 1) { /* Can we use the single-key sort function? */ @@ -4151,6 +4348,230 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup, &tuplen, sizeof(tuplen)); } +/* + * Parallel sort routines + */ + +/* + * tuplesort_estimate_shared - estimate required shared memory allocation + * + * nWorkers is an estimate of the number of workers (it's the number that + * will be requested). + */ +Size +tuplesort_estimate_shared(int nWorkers) +{ + Size tapesSize; + + Assert(nWorkers > 0); + + /* Make sure that BufFile shared state is MAXALIGN'd */ + tapesSize = mul_size(sizeof(TapeShare), nWorkers); + tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes))); + + return tapesSize; +} + +/* + * tuplesort_initialize_shared - initialize shared tuplesort state + * + * Must be called from leader process before workers are launched, to + * establish state needed up-front for worker tuplesortstates. nWorkers + * should match the argument passed to tuplesort_estimate_shared(). + */ +void +tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg) +{ + int i; + + Assert(nWorkers > 0); + + SpinLockInit(&shared->mutex); + shared->currentWorker = 0; + shared->workersFinished = 0; + SharedFileSetInit(&shared->fileset, seg); + shared->nTapes = nWorkers; + for (i = 0; i < nWorkers; i++) + { + shared->tapes[i].firstblocknumber = 0L; + shared->tapes[i].buffilesize = 0; + } +} + +/* + * tuplesort_attach_shared - attach to shared tuplesort state + * + * Must be called by all worker processes. + */ +void +tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg) +{ + /* Attach to SharedFileSet */ + SharedFileSetAttach(&shared->fileset, seg); +} + +/* + * worker_get_identifier - Assign and return ordinal identifier for worker + * + * The order in which these are assigned is not well defined, and should not + * matter; worker numbers across parallel sort participants need only be + * distinct and gapless. logtape.c requires this. + * + * Note that the identifiers assigned from here have no relation to + * ParallelWorkerNumber, to avoid making any assumption about the caller's + * requirements. However, we do follow the ParallelWorkerNumber + * convention of representing a non-worker with worker number -1. This + * includes the leader, as well as serial Tuplesort processes. + */ +static int +worker_get_identifier(Tuplesortstate *state) +{ + Sharedsort *shared = state->shared; + int worker; + + Assert(WORKER(state)); + + SpinLockAcquire(&shared->mutex); + worker = shared->currentWorker++; + SpinLockRelease(&shared->mutex); + + return worker; +} + +/* + * worker_freeze_result_tape - freeze worker's result tape for leader + * + * This is called by workers just after the result tape has been determined, + * instead of calling LogicalTapeFreeze() directly. They do so because + * workers require a few additional steps over similar serial + * TSS_SORTEDONTAPE external sort cases, which also happen here. The extra + * steps are around freeing now unneeded resources, and representing to the + * leader that the worker's input run is available for its merge.
+ * + * There should only be one final output run for each worker, which consists + * of all tuples that were originally input into the worker. + */ +static void +worker_freeze_result_tape(Tuplesortstate *state) +{ + Sharedsort *shared = state->shared; + TapeShare output; + + Assert(WORKER(state)); + Assert(state->result_tape != -1); + Assert(state->memtupcount == 0); + + /* + * Free most remaining memory, in case the caller is sensitive to our + * holding on to it. memtuples may not be a tiny merge heap at this point. + */ + pfree(state->memtuples); + /* Be tidy */ + state->memtuples = NULL; + state->memtupsize = 0; + + /* + * Parallel worker requires result tape metadata, which is to be stored in + * shared memory for the leader + */ + LogicalTapeFreeze(state->tapeset, state->result_tape, &output); + + /* Store properties of output tape, and update finished worker count */ + SpinLockAcquire(&shared->mutex); + shared->tapes[state->worker] = output; + shared->workersFinished++; + SpinLockRelease(&shared->mutex); +} + +/* + * worker_nomergeruns - dump memtuples in worker, without merging + * + * This is called as an alternative to mergeruns() with a worker when no + * merging is required. + */ +static void +worker_nomergeruns(Tuplesortstate *state) +{ + Assert(WORKER(state)); + Assert(state->result_tape == -1); + + state->result_tape = state->tp_tapenum[state->destTape]; + worker_freeze_result_tape(state); +} + +/* + * leader_takeover_tapes - create tapeset for leader from worker tapes + * + * So far, the leader Tuplesortstate has performed no actual sorting. By now, + * all sorting has occurred in workers, all of which must have already + * returned from tuplesort_performsort(). + * + * When this returns, the leader process is left in a state that is virtually + * indistinguishable from having generated the runs itself, as a serial + * external sort might have. + */ +static void +leader_takeover_tapes(Tuplesortstate *state) +{ + Sharedsort *shared = state->shared; + int nParticipants = state->nParticipants; + int workersFinished; + int j; + + Assert(LEADER(state)); + Assert(nParticipants >= 1); + + SpinLockAcquire(&shared->mutex); + workersFinished = shared->workersFinished; + SpinLockRelease(&shared->mutex); + + if (nParticipants != workersFinished) + elog(ERROR, "cannot take over tapes before all workers finish"); + + /* + * Create the tapeset from worker tapes, including a leader-owned tape at + * the end. Parallel workers are far more expensive than logical tapes, + * so the number of tapes allocated here should never be excessive. + * + * We still have a leader tape, though it's not possible to write to it + * due to restrictions in the shared fileset infrastructure used by + * logtape.c. It will never be written to in practice because + * randomAccess is disallowed for parallel sorts. + */ + inittapestate(state, nParticipants + 1); + state->tapeset = LogicalTapeSetCreate(nParticipants + 1, shared->tapes, + &shared->fileset, state->worker); + + /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */ + state->currentRun = nParticipants; + + /* + * Initialize variables of Algorithm D to be consistent with runs from + * workers having been generated in the leader. + * + * There will always be exactly 1 run per worker, and exactly one input + * tape per run, because workers always output exactly 1 run, even when + * there were no input tuples for workers to sort.
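A concrete instance of the takeover arithmetic (illustrative numbers):

	/*
	 * Example: nParticipants = 3 gives inittapestate(state, 4), so
	 * tapeRange = 3.  After the loop below:
	 *
	 *   tapes 0..2: tp_runs = 1, tp_dummy = 0  (one real run per worker)
	 *   tape  3:    tp_runs = 0, tp_dummy = 1  (leader tape, never written)
	 *
	 * currentRun = 3, so mergeruns() sees a one-pass case and, because
	 * randomAccess is disallowed for parallel sorts, ends with a single
	 * on-the-fly 3-way final merge (TSS_FINALMERGE).
	 */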
+	 */
+	for (j = 0; j < state->maxTapes; j++)
+	{
+		/* One real run; no dummy runs for worker tapes */
+		state->tp_fib[j] = 1;
+		state->tp_runs[j] = 1;
+		state->tp_dummy[j] = 0;
+		state->tp_tapenum[j] = j;
+	}
+	/* Leader tape gets one dummy run, and no real runs */
+	state->tp_fib[state->tapeRange] = 0;
+	state->tp_runs[state->tapeRange] = 0;
+	state->tp_dummy[state->tapeRange] = 1;
+
+	state->Level = 1;
+	state->destTape = 0;
+
+	state->status = TSS_BUILDRUNS;
+}
+
 /*
  * Convenience routine to free a tuple previously loaded into sort memory
  */
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index d28f413c66..0f6a40168c 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -21,6 +21,7 @@
 #include "catalog/pg_index.h"
 #include "lib/stringinfo.h"
 #include "storage/bufmgr.h"
+#include "storage/shm_toc.h"
 
 /* There's room for a 16-bit vacuum cycle ID in BTPageOpaqueData */
 typedef uint16 BTCycleId;
@@ -430,8 +431,6 @@ typedef BTScanOpaqueData *BTScanOpaque;
 /*
  * external entry points for btree, in nbtree.c
  */
-extern IndexBuildResult *btbuild(Relation heap, Relation index,
-		struct IndexInfo *indexInfo);
 extern void btbuildempty(Relation index);
 extern bool btinsert(Relation rel, Datum *values, bool *isnull,
 		 ItemPointer ht_ctid, Relation heapRel,
@@ -547,13 +546,8 @@ extern bool btvalidate(Oid opclassoid);
 /*
  * prototypes for functions in nbtsort.c
  */
-typedef struct BTSpool BTSpool; /* opaque type known only within nbtsort.c */
-
-extern BTSpool *_bt_spoolinit(Relation heap, Relation index,
-			  bool isunique, bool isdead);
-extern void _bt_spooldestroy(BTSpool *btspool);
-extern void _bt_spool(BTSpool *btspool, ItemPointer self,
-		  Datum *values, bool *isnull);
-extern void _bt_leafbuild(BTSpool *btspool, BTSpool *spool2);
+extern IndexBuildResult *btbuild(Relation heap, Relation index,
+		struct IndexInfo *indexInfo);
+extern void _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc);
 
 #endif							/* NBTREE_H */
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index d0c218b185..025691fd82 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -59,7 +59,9 @@ extern PGDLLIMPORT bool InitializingParallelWorker;
 
 #define		IsParallelWorker()		(ParallelWorkerNumber >= 0)
 
-extern ParallelContext *CreateParallelContext(const char *library_name, const char *function_name, int nworkers);
+extern ParallelContext *CreateParallelContext(const char *library_name,
+					  const char *function_name, int nworkers,
+					  bool serializable_okay);
 extern void InitializeParallelDSM(ParallelContext *pcxt);
 extern void ReinitializeParallelDSM(ParallelContext *pcxt);
 extern void LaunchParallelWorkers(ParallelContext *pcxt);
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 9c603ca637..18c7dedd5d 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -39,6 +39,7 @@ typedef struct ParallelHeapScanDescData
 	BlockNumber phs_startblock; /* starting block number */
 	pg_atomic_uint64 phs_nallocated;	/* number of blocks allocated to
 										 * workers so far. */
+	bool		phs_snapshot_any;	/* SnapshotAny, not phs_snapshot_data?
+										 */
 	char		phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
 } ParallelHeapScanDescData;
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index 235e180299..a5cd8ddb1e 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -104,14 +104,16 @@ extern void index_build(Relation heapRelation,
 			Relation indexRelation,
 			IndexInfo *indexInfo,
 			bool isprimary,
-			bool isreindex);
+			bool isreindex,
+			bool parallel);
 
 extern double IndexBuildHeapScan(Relation heapRelation,
 					 Relation indexRelation,
 					 IndexInfo *indexInfo,
 					 bool allow_sync,
 					 IndexBuildCallback callback,
-					 void *callback_state);
+					 void *callback_state,
+					 HeapScanDesc scan);
 extern double IndexBuildHeapRangeScan(Relation heapRelation,
 						Relation indexRelation,
 						IndexInfo *indexInfo,
@@ -120,7 +122,8 @@ extern double IndexBuildHeapRangeScan(Relation heapRelation,
 						BlockNumber start_blockno,
 						BlockNumber end_blockno,
 						IndexBuildCallback callback,
-						void *callback_state);
+						void *callback_state,
+						HeapScanDesc scan);
 
 extern void validate_index(Oid heapId, Oid indexId, Snapshot snapshot);
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 54ee273747..429c055489 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -241,6 +241,7 @@ extern bool enableFsync;
 extern PGDLLIMPORT bool allowSystemTableMods;
 extern PGDLLIMPORT int work_mem;
 extern PGDLLIMPORT int maintenance_work_mem;
+extern PGDLLIMPORT int max_parallel_maintenance_workers;
 
 extern int	VacuumCostPageHit;
 extern int	VacuumCostPageMiss;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 1bf67455e0..a2a2a9f3d4 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -132,11 +132,12 @@ typedef struct ExprState
  *		ReadyForInserts		is it valid for inserts?
  *		Concurrent			are we doing a concurrent index build?
  *		BrokenHotChain		did we detect any broken HOT chains?
+ *		ParallelWorkers		# of workers requested (excludes leader)
  *		AmCache				private cache area for index AM
  *		Context				memory context holding this IndexInfo
  *
- *		ii_Concurrent and ii_BrokenHotChain are used only during index build;
- *		they're conventionally set to false otherwise.
+ *		ii_Concurrent, ii_BrokenHotChain, and ii_ParallelWorkers are used only
+ *		during index build; they're conventionally zeroed otherwise.
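+ *
+ *		(An illustrative sketch, not part of this header: elsewhere in the
+ *		patch, index_build() fills in ii_ParallelWorkers from the new
+ *		planner entry point, roughly as
+ *
+ *			indexInfo->ii_ParallelWorkers =
+ *				plan_create_index_workers(tableOid, indexOid);
+ *
+ *		with zero meaning that a parallel index build was not chosen.)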
  * ----------------
  */
 typedef struct IndexInfo
@@ -158,6 +159,7 @@ typedef struct IndexInfo
 	bool		ii_ReadyForInserts;
 	bool		ii_Concurrent;
 	bool		ii_BrokenHotChain;
+	int			ii_ParallelWorkers;
 	Oid			ii_Am;
 	void	   *ii_AmCache;
 	MemoryContext ii_Context;
diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h
index 0072b7aa0d..b6be259ff7 100644
--- a/src/include/optimizer/paths.h
+++ b/src/include/optimizer/paths.h
@@ -55,7 +55,7 @@ extern RelOptInfo *standard_join_search(PlannerInfo *root, int levels_needed,
 extern void generate_gather_paths(PlannerInfo *root, RelOptInfo *rel);
 extern int	compute_parallel_worker(RelOptInfo *rel, double heap_pages,
-						double index_pages);
+						double index_pages, int max_workers);
 extern void create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel,
 						Path *bitmapqual);
 extern void generate_partition_wise_join_paths(PlannerInfo *root,
diff --git a/src/include/optimizer/planner.h b/src/include/optimizer/planner.h
index 29173d36c4..0d8b88d78b 100644
--- a/src/include/optimizer/planner.h
+++ b/src/include/optimizer/planner.h
@@ -56,6 +56,7 @@ extern Expr *expression_planner(Expr *expr);
 extern Expr *preprocess_phv_expression(PlannerInfo *root, Expr *expr);
 
 extern bool plan_cluster_use_sort(Oid tableOid, Oid indexOid);
+extern int	plan_create_index_workers(Oid tableOid, Oid indexOid);
 
 extern List *get_partitioned_child_rels(PlannerInfo *root, Index rti,
 						  bool *part_cols_updated);
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 3d3c0b64fc..be2f59239b 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -826,6 +826,7 @@ typedef enum
 	WAIT_EVENT_MQ_SEND,
 	WAIT_EVENT_PARALLEL_FINISH,
 	WAIT_EVENT_PARALLEL_BITMAP_SCAN,
+	WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN,
 	WAIT_EVENT_PROCARRAY_GROUP_UPDATE,
 	WAIT_EVENT_CLOG_GROUP_UPDATE,
 	WAIT_EVENT_REPLICATION_ORIGIN_DROP,
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index a3df056a61..a6cdeb451c 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -43,6 +43,8 @@ extern size_t BufFileWrite(BufFile *file, void *ptr, size_t size);
 extern int	BufFileSeek(BufFile *file, int fileno, off_t offset, int whence);
 extern void BufFileTell(BufFile *file, int *fileno, off_t *offset);
 extern int	BufFileSeekBlock(BufFile *file, long blknum);
+extern off_t BufFileSize(BufFile *file);
+extern long BufFileAppend(BufFile *target, BufFile *source);
 
 extern BufFile *BufFileCreateShared(SharedFileSet *fileset, const char *name);
 extern void BufFileExportShared(BufFile *file);
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index db5ca16679..4244e7b1fd 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -78,6 +78,7 @@ extern char *FilePathName(File file);
 extern int	FileGetRawDesc(File file);
 extern int	FileGetRawFlags(File file);
 extern mode_t FileGetRawMode(File file);
+extern off_t FileGetSize(File file);
 
 /* Operations used for sharing named temporary files */
 extern File PathNameCreateTemporaryFile(const char *name, bool error_on_failure);
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index 88662c10a4..9bf1d80142 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -16,15 +16,49 @@
 #ifndef LOGTAPE_H
 #define LOGTAPE_H
 
+#include "storage/sharedfileset.h"
+
 /* LogicalTapeSet is an opaque type whose details are not known outside
  * logtape.c.
  */
 typedef struct LogicalTapeSet LogicalTapeSet;
 
+/*
+ * The approach tuplesort.c takes to parallel external sorts is that workers,
+ * whose state is almost the same as independent serial sorts, are made to
+ * produce a final materialized tape of sorted output in all cases.  This is
+ * frozen, just like any case requiring a final materialized tape.  However,
+ * there is one difference, which is that freezing will also export an
+ * underlying shared fileset BufFile for sharing.  Freezing produces TapeShare
+ * metadata for the worker when this happens, which is passed along through
+ * shared memory to the leader.
+ *
+ * The leader process can then pass an array of TapeShare metadata (one per
+ * worker participant) to LogicalTapeSetCreate(), alongside a handle to a
+ * shared fileset, which is sufficient to construct a new logical tapeset that
+ * consists of each of the tapes materialized by workers.
+ *
+ * Note that while logtape.c does create an empty leader tape at the end of
+ * the tapeset in the leader case, it can never be written to due to a
+ * restriction in the shared buffile infrastructure.
+ */
+typedef struct TapeShare
+{
+	/*
+	 * firstblocknumber is the first block that should be read from the
+	 * materialized tape.
+	 *
+	 * buffilesize is the size of the associated BufFile following freezing.
+	 */
+	long		firstblocknumber;
+	off_t		buffilesize;
+} TapeShare;
+
 /*
  * prototypes for functions in logtape.c
  */
-extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes);
+extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes, TapeShare *shared,
+					 SharedFileSet *fileset, int worker);
 extern void LogicalTapeSetClose(LogicalTapeSet *lts);
 extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts);
 extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
@@ -34,7 +68,8 @@ extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
 						 size_t buffer_size);
 extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
-extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum);
+extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum,
+				  TapeShare *share);
 extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
 					 size_t size);
 extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index 5d57c503ab..d2e6754f04 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -8,7 +8,8 @@
  * if necessary).  It works efficiently for both small and large amounts
  * of data.  Small amounts are sorted in-memory using qsort().  Large
  * amounts are sorted using temporary files and a standard external sort
- * algorithm.
+ * algorithm.  Parallel sorts use a variant of this external sort
+ * algorithm, and are typically only used for large amounts of data.
  *
  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -23,13 +24,39 @@
 #include "access/itup.h"
 #include "executor/tuptable.h"
 #include "fmgr.h"
+#include "storage/dsm.h"
 #include "utils/relcache.h"
 
-/* Tuplesortstate is an opaque type whose details are not known outside
- * tuplesort.c.
+/*
+ * Tuplesortstate and Sharedsort are opaque types whose details are not
+ * known outside tuplesort.c.
  */
 typedef struct Tuplesortstate Tuplesortstate;
+typedef struct Sharedsort Sharedsort;
+
+/*
+ * Tuplesort parallel coordination state, allocated by each participant in
+ * local memory.  Participant caller initializes everything.  See usage notes
+ * below.
+ */
+typedef struct SortCoordinateData
+{
+	/* Worker process?  If not, must be leader. */
+	bool		isWorker;
+
+	/*
+	 * Number of participants known to have been launched, as passed by the
+	 * leader process (workers set this to -1).  The count includes a
+	 * leader-as-worker Tuplesortstate, if the leader also participates as a
+	 * worker.
+	 */
+	int			nParticipants;
+
+	/* Private opaque state (points to shared memory) */
+	Sharedsort *sharedsort;
+} SortCoordinateData;
+
+typedef struct SortCoordinateData *SortCoordinate;
 
 /*
  * Data structures for reporting sort statistics.  Note that
@@ -66,6 +93,8 @@ typedef struct TuplesortInstrumentation
  * sorting HeapTuples and two more for sorting IndexTuples.  Yet another
  * API supports sorting bare Datums.
  *
+ * Serial sort callers should pass NULL for their coordinate argument.
+ *
  * The "heap" API actually stores/sorts MinimalTuples, which means it doesn't
  * preserve the system columns (tuple identity and transaction visibility
  * info).  The sort keys are specified by column numbers within the tuples
@@ -84,30 +113,107 @@ typedef struct TuplesortInstrumentation
  *
  * The "index_hash" API is similar to index_btree, but the tuples are
  * actually sorted by their hash codes not the raw data.
+ *
+ * Parallel sort callers are required to coordinate multiple tuplesort states
+ * in a leader process and one or more worker processes.  The leader process
+ * must launch workers, and have each perform an independent "partial"
+ * tuplesort, typically fed by the parallel heap interface.  The leader later
+ * produces the final output (internally, it merges runs output by workers).
+ *
+ * Callers must do the following to perform a sort in parallel using multiple
+ * worker processes:
+ *
+ * 1. Request tuplesort-private shared memory for n workers.  Use
+ *    tuplesort_estimate_shared() to get the required size.
+ * 2. Have the leader process initialize the allocated shared memory using
+ *    tuplesort_initialize_shared().  Launch workers.
+ * 3. Initialize a coordinate argument within both the leader process and
+ *    each worker process.  This has a pointer to the shared
+ *    tuplesort-private structure, as well as some caller-initialized fields.
+ *    The leader's coordinate argument reliably indicates the number of
+ *    workers launched (this is unused by workers).
+ * 4. Begin a tuplesort using some appropriate tuplesort_begin* routine
+ *    (passing the coordinate argument) within each worker.  The workMem
+ *    arguments need not be identical.  All other arguments should match
+ *    exactly, though.
+ * 5. tuplesort_attach_shared() should be called by all workers.  Feed tuples
+ *    to each worker, and call tuplesort_performsort() within each when input
+ *    is exhausted.
+ * 6. Call tuplesort_end() in each worker process.  Worker processes can shut
+ *    down once tuplesort_end() returns.
+ * 7. Begin a tuplesort in the leader using the same tuplesort_begin*
+ *    routine, passing a leader-appropriate coordinate argument (this can
+ *    happen as early as during step 3, actually, since we only need to know
+ *    the number of workers successfully launched).  The leader must now wait
+ *    for workers to finish.  The caller must use its own mechanism to ensure
+ *    that the next step isn't reached until all workers have called and
+ *    returned from tuplesort_performsort().
+ *    (Note that it's okay if workers have already also called
+ *    tuplesort_end() by then.)
+ * 8. Call tuplesort_performsort() in the leader.  Consume output using the
+ *    appropriate tuplesort_get* routine.  The leader can skip this step if
+ *    the tuplesort turns out to be unnecessary.
+ * 9. Call tuplesort_end() in the leader.
+ *
+ * This division of labor assumes nothing about how input tuples are
+ * produced, but does require that the caller combine the state of multiple
+ * tuplesorts for any purpose other than producing the final output.  For
+ * example, callers must consider that tuplesort_get_stats() reports on only
+ * one worker's role in a sort (or the leader's role), and not statistics for
+ * the sort as a whole.
+ *
+ * Note that callers may use the leader process to sort runs as if it were an
+ * independent worker process (prior to the process performing a leader sort
+ * to produce the final sorted output).  Doing so only requires a second
+ * "partial" tuplesort within the leader process, initialized like that of a
+ * worker process.  The steps above don't touch on this directly.  The only
+ * difference is that the tuplesort_attach_shared() call is never needed
+ * within the leader process, because the backend as a whole holds the shared
+ * fileset reference.  A worker Tuplesortstate in the leader is expected to
+ * do exactly the same amount of total initial processing work as a worker
+ * process Tuplesortstate, since the leader process has nothing else to do
+ * before workers finish.
+ *
+ * Note that only a very small amount of memory will be allocated prior to
+ * the leader state first consuming input, and that workers will free the
+ * vast majority of their memory upon returning from tuplesort_performsort().
+ * Callers can rely on this to arrange for memory to be used in a way that
+ * respects a workMem-style budget across an entire parallel sort operation.
+ *
+ * Callers are responsible for parallel safety in general.  However, they
+ * can at least rely on there being no parallel safety hazards within
+ * tuplesort, because tuplesort thinks of the sort as several independent
+ * sorts whose results are combined.  Since, in general, the behavior of
+ * sort operators is immutable, the caller need only worry about the parallel
+ * safety of whatever the process is through which input tuples are
+ * generated (typically, the caller uses a parallel heap scan).
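+ *
+ * As a rough, illustrative sketch of the steps above (hypothetical caller
+ * code: pcxt, seg, nworkers_launched, and the btree-specific calls are
+ * stand-ins for whatever the caller actually uses; error handling and the
+ * wait for workers are omitted):
+ *
+ *	In the leader, before launching workers:
+ *
+ *		Sharedsort *sharedsort = (Sharedsort *)
+ *			shm_toc_allocate(pcxt->toc, tuplesort_estimate_shared(nworkers));
+ *
+ *		tuplesort_initialize_shared(sharedsort, nworkers, pcxt->seg);
+ *
+ *	In each worker, once attached to the DSM segment:
+ *
+ *		SortCoordinateData coordinate;
+ *
+ *		coordinate.isWorker = true;
+ *		coordinate.nParticipants = -1;
+ *		coordinate.sharedsort = sharedsort;
+ *		tuplesort_attach_shared(sharedsort, seg);
+ *		state = tuplesort_begin_index_btree(heap, index, enforceUnique,
+ *											workMem, &coordinate, false);
+ *		(feed tuples with tuplesort_putindextuplevalues(), then)
+ *		tuplesort_performsort(state);
+ *		tuplesort_end(state);
+ *
+ *	In the leader, after all workers have returned from
+ *	tuplesort_performsort():
+ *
+ *		coordinate.isWorker = false;
+ *		coordinate.nParticipants = nworkers_launched;
+ *		coordinate.sharedsort = sharedsort;
+ *		state = tuplesort_begin_index_btree(heap, index, enforceUnique,
+ *											workMem, &coordinate, false);
+ *		tuplesort_performsort(state);
+ *		(consume output with tuplesort_getindextuple(), then)
+ *		tuplesort_end(state);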
  */
 
 extern Tuplesortstate *tuplesort_begin_heap(TupleDesc tupDesc,
 					int nkeys, AttrNumber *attNums,
 					Oid *sortOperators, Oid *sortCollations,
 					bool *nullsFirstFlags,
-					int workMem, bool randomAccess);
+					int workMem, SortCoordinate coordinate,
+					bool randomAccess);
 extern Tuplesortstate *tuplesort_begin_cluster(TupleDesc tupDesc,
-						Relation indexRel,
-						int workMem, bool randomAccess);
+						Relation indexRel, int workMem,
+						SortCoordinate coordinate, bool randomAccess);
 extern Tuplesortstate *tuplesort_begin_index_btree(Relation heapRel,
 							Relation indexRel,
 							bool enforceUnique,
-							int workMem, bool randomAccess);
+							int workMem, SortCoordinate coordinate,
+							bool randomAccess);
 extern Tuplesortstate *tuplesort_begin_index_hash(Relation heapRel,
 						   Relation indexRel,
 						   uint32 high_mask,
 						   uint32 low_mask,
 						   uint32 max_buckets,
-						   int workMem, bool randomAccess);
+						   int workMem, SortCoordinate coordinate,
+						   bool randomAccess);
 extern Tuplesortstate *tuplesort_begin_datum(Oid datumType,
 					 Oid sortOperator, Oid sortCollation,
 					 bool nullsFirstFlag,
-					 int workMem, bool randomAccess);
+					 int workMem, SortCoordinate coordinate,
+					 bool randomAccess);
 
 extern void tuplesort_set_bound(Tuplesortstate *state, int64 bound);
 
@@ -141,10 +247,16 @@ extern const char *tuplesort_space_type_name(TuplesortSpaceType t);
 
 extern int	tuplesort_merge_order(int64 allowedMem);
 
+extern Size tuplesort_estimate_shared(int nworkers);
+extern void tuplesort_initialize_shared(Sharedsort *shared, int nWorkers,
+							dsm_segment *seg);
+extern void tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg);
+
 /*
  * These routines may only be called if randomAccess was specified 'true'.
  * Likewise, backwards scan in gettuple/getdatum is only allowed if
- * randomAccess was specified.
+ * randomAccess was specified.  Note that parallel sorts do not support
+ * randomAccess.
  */
 extern void tuplesort_rescan(Tuplesortstate *state);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index a42ff9794a..d4765ce3b0 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -165,6 +165,7 @@ BTArrayKeyInfo
 BTBuildState
 BTCycleId
 BTIndexStat
+BTLeader
 BTMetaPageData
 BTOneVacInfo
 BTPS_State
@@ -178,6 +179,7 @@ BTScanOpaqueData
 BTScanPos
 BTScanPosData
 BTScanPosItem
+BTShared
 BTSortArrayContext
 BTSpool
 BTStack
@@ -2047,6 +2049,7 @@ SharedSortInfo
 SharedTuplestore
 SharedTuplestoreAccessor
 SharedTypmodTableEntry
+Sharedsort
 ShellTypeInfo
 ShippableCacheEntry
 ShippableCacheKey
@@ -2091,6 +2094,8 @@ Sort
 SortBy
 SortByDir
 SortByNulls
+SortCoordinate
+SortCoordinateData
 SortGroupClause
 SortItem
 SortPath
@@ -2234,6 +2239,7 @@ TableSpaceOpts
 TablespaceList
 TablespaceListCell
 TapeBlockTrailer
+TapeShare
 TarMethodData
 TarMethodFile
 TargetEntry
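For illustration only, here is a condensed sketch of how the pieces added
above fit together on the leader side, in the style of this patch's
nbtsort.c.  The function name bt_leader_begin() and the PARALLEL_KEY_TUPLESORT
key value are invented for this sketch, and error handling, the
leader-as-worker path, and worker-launch bookkeeping are all omitted:

	#include "postgres.h"

	#include "access/parallel.h"
	#include "storage/shm_toc.h"
	#include "utils/tuplesort.h"

	/* hypothetical shm_toc key for the Sharedsort state */
	#define PARALLEL_KEY_TUPLESORT	UINT64CONST(0xA000000000000001)

	static Sharedsort *
	bt_leader_begin(int nworkers, ParallelContext **pcxtp)
	{
		ParallelContext *pcxt;
		Sharedsort *sharedsort;

		EnterParallelMode();

		/* serializable_okay: parallel index builds are safe at SERIALIZABLE */
		pcxt = CreateParallelContext("postgres", "_bt_parallel_build_main",
									 nworkers, true);

		/* reserve space for tuplesort-private shared state, then create DSM */
		shm_toc_estimate_chunk(&pcxt->estimator,
							   tuplesort_estimate_shared(nworkers));
		shm_toc_estimate_keys(&pcxt->estimator, 1);
		InitializeParallelDSM(pcxt);

		/* initialize shared tuplesort state before any worker can attach */
		sharedsort = (Sharedsort *)
			shm_toc_allocate(pcxt->toc, tuplesort_estimate_shared(nworkers));
		tuplesort_initialize_shared(sharedsort, nworkers, pcxt->seg);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);

		LaunchParallelWorkers(pcxt);

		*pcxtp = pcxt;
		return sharedsort;
	}

Workers would then obtain the same structure via shm_toc_lookup(toc,
PARALLEL_KEY_TUPLESORT, false) from within _bt_parallel_build_main(), and
follow the worker-side steps described in the tuplesort.h notes above.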