From 1804284042e659e7d16904e7bbb0ad546394b6a3 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 20 Dec 2017 23:39:21 -0800 Subject: [PATCH] Add parallel-aware hash joins. Introduce parallel-aware hash joins that appear in EXPLAIN plans as Parallel Hash Join with Parallel Hash. While hash joins could already appear in parallel queries, they were previously always parallel-oblivious and had a partial subplan only on the outer side, meaning that the work of the inner subplan was duplicated in every worker. After this commit, the planner will consider using a partial subplan on the inner side too, using the Parallel Hash node to divide the work over the available CPU cores and combine its results in shared memory. If the join needs to be split into multiple batches in order to respect work_mem, then workers process different batches as much as possible and then work together on the remaining batches. The advantages of a parallel-aware hash join over a parallel-oblivious hash join used in a parallel query are that it: * avoids wasting memory on duplicated hash tables * avoids wasting disk space on duplicated batch files * divides the work of building the hash table over the CPUs One disadvantage is that there is some communication between the participating CPUs which might outweigh the benefits of parallelism in the case of small hash tables. This is avoided by the planner's existing reluctance to supply partial plans for small scans, but it may be necessary to estimate synchronization costs in future if that situation changes. Another is that outer batch 0 must be written to disk if multiple batches are required. A potential future advantage of parallel-aware hash joins is that right and full outer joins could be supported, since there is a single set of matched bits for each hashtable, but that is not yet implemented. A new GUC enable_parallel_hash is defined to control the feature, defaulting to on. 
Author: Thomas Munro Reviewed-By: Andres Freund, Robert Haas Tested-By: Rafia Sabih, Prabhat Sahu Discussion: https://postgr.es/m/CAEepm=2W=cOkiZxcg6qiFQP-dHUe09aqTrEMM7yJDrHMhDv_RA@mail.gmail.com https://postgr.es/m/CAEepm=37HKyJ4U6XOLi=JgfSHM3o6B-GaeO-6hkOmneTDkH+Uw@mail.gmail.com --- doc/src/sgml/config.sgml | 15 + doc/src/sgml/monitoring.sgml | 62 +- src/backend/executor/execParallel.c | 21 + src/backend/executor/execProcnode.c | 3 + src/backend/executor/nodeHash.c | 1651 ++++++++++++++++- src/backend/executor/nodeHashjoin.c | 617 +++++- src/backend/nodes/copyfuncs.c | 1 + src/backend/nodes/outfuncs.c | 1 + src/backend/nodes/readfuncs.c | 1 + src/backend/optimizer/path/costsize.c | 25 +- src/backend/optimizer/path/joinpath.c | 36 +- src/backend/optimizer/plan/createplan.c | 11 + src/backend/optimizer/util/pathnode.c | 5 +- src/backend/postmaster/pgstat.c | 45 + src/backend/utils/misc/guc.c | 10 +- src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/executor/hashjoin.h | 175 +- src/include/executor/nodeHash.h | 24 +- src/include/executor/nodeHashjoin.h | 6 + src/include/nodes/execnodes.h | 6 + src/include/nodes/plannodes.h | 1 + src/include/nodes/relation.h | 2 + src/include/optimizer/cost.h | 4 +- src/include/optimizer/pathnode.h | 1 + src/include/pgstat.h | 15 + src/include/storage/lwlock.h | 1 + src/test/regress/expected/join.out | 304 ++- src/test/regress/expected/sysviews.out | 3 +- src/test/regress/sql/join.sql | 148 +- src/tools/pgindent/typedefs.list | 4 + 30 files changed, 3087 insertions(+), 112 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 533faf060d..e4a01699e4 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3647,6 +3647,21 @@ ANY num_sync ( + enable_parallel_hash (boolean) + + enable_parallel_hash configuration parameter + + + + + Enables or disables the query planner's use of hash-join plan + types with parallel hash. Has no effect if hash-join plans are not + also enabled. 
The default is on. + + + + enable_partition_wise_join (boolean) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index b6f80d9708..8a9793644f 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1263,7 +1263,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser Waiting in an extension. - IPC + IPC BgWorkerShutdown Waiting for background worker to shut down. @@ -1279,6 +1279,66 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser ExecuteGather Waiting for activity from child process when executing Gather node. + + Hash/Batch/Allocating + Waiting for an elected Parallel Hash participant to allocate a hash table. + + + Hash/Batch/Electing + Electing a Parallel Hash participant to allocate a hash table. + + + Hash/Batch/Loading + Waiting for other Parallel Hash participants to finish loading a hash table. + + + Hash/Build/Allocating + Waiting for an elected Parallel Hash participant to allocate the initial hash table. + + + Hash/Build/Electing + Electing a Parallel Hash participant to allocate the initial hash table. + + + Hash/Build/HashingInner + Waiting for other Parallel Hash participants to finish hashing the inner relation. + + + Hash/Build/HashingOuter + Waiting for other Parallel Hash participants to finish partitioning the outer relation. + + + Hash/GrowBatches/Allocating + Waiting for an elected Parallel Hash participant to allocate more batches. + + + Hash/GrowBatches/Deciding + Electing a Parallel Hash participant to decide on future batch growth. + + + Hash/GrowBatches/Electing + Electing a Parallel Hash participant to allocate more batches. + + + Hash/GrowBatches/Finishing + Waiting for an elected Parallel Hash participant to decide on future batch growth. + + + Hash/GrowBatches/Repartitioning + Waiting for other Parallel Hash participants to finish repartitioning.
+ + + Hash/GrowBuckets/Allocating + Waiting for an elected Parallel Hash participant to finish allocating more buckets. + + + Hash/GrowBuckets/Electing + Electing a Parallel Hash participant to allocate more buckets. + + + Hash/GrowBuckets/Reinserting + Waiting for other Parallel Hash participants to finish inserting tuples into new buckets. + LogicalSyncData Waiting for logical replication remote server to send data for initial table synchronization. diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 604f4f5b61..b344d4b589 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -31,6 +31,7 @@ #include "executor/nodeCustom.h" #include "executor/nodeForeignscan.h" #include "executor/nodeHash.h" +#include "executor/nodeHashjoin.h" #include "executor/nodeIndexscan.h" #include "executor/nodeIndexonlyscan.h" #include "executor/nodeSeqscan.h" @@ -266,6 +267,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate, e->pcxt); break; + case T_HashJoinState: + if (planstate->plan->parallel_aware) + ExecHashJoinEstimate((HashJoinState *) planstate, + e->pcxt); + break; case T_HashState: /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecHashEstimate((HashState *) planstate, e->pcxt); @@ -474,6 +480,11 @@ ExecParallelInitializeDSM(PlanState *planstate, ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate, d->pcxt); break; + case T_HashJoinState: + if (planstate->plan->parallel_aware) + ExecHashJoinInitializeDSM((HashJoinState *) planstate, + d->pcxt); + break; case T_HashState: /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecHashInitializeDSM((HashState *) planstate, d->pcxt); @@ -898,6 +909,11 @@ ExecParallelReInitializeDSM(PlanState *planstate, ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate, pcxt); break; + case T_HashJoinState: + if (planstate->plan->parallel_aware) + 
ExecHashJoinReInitializeDSM((HashJoinState *) planstate, + pcxt); + break; case T_HashState: case T_SortState: /* these nodes have DSM state, but no reinitialization is required */ @@ -1196,6 +1212,11 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt) ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, pwcxt); break; + case T_HashJoinState: + if (planstate->plan->parallel_aware) + ExecHashJoinInitializeWorker((HashJoinState *) planstate, + pwcxt); + break; case T_HashState: /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecHashInitializeWorker((HashState *) planstate, pwcxt); diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index fcb8b56999..699dc69179 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -770,6 +770,9 @@ ExecShutdownNode(PlanState *node) case T_HashState: ExecShutdownHash((HashState *) node); break; + case T_HashJoinState: + ExecShutdownHashJoin((HashJoinState *) node); + break; default: break; } diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index afd7384e94..4284e8682a 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -10,6 +10,8 @@ * IDENTIFICATION * src/backend/executor/nodeHash.c * + * See note on parallelism in nodeHashjoin.c. 
+ * *------------------------------------------------------------------------- */ /* @@ -25,6 +27,7 @@ #include #include "access/htup_details.h" +#include "access/parallel.h" #include "catalog/pg_statistic.h" #include "commands/tablespace.h" #include "executor/execdebug.h" @@ -32,6 +35,8 @@ #include "executor/nodeHash.h" #include "executor/nodeHashjoin.h" #include "miscadmin.h" +#include "pgstat.h" +#include "port/atomics.h" #include "utils/dynahash.h" #include "utils/memutils.h" #include "utils/lsyscache.h" @@ -40,6 +45,8 @@ static void ExecHashIncreaseNumBatches(HashJoinTable hashtable); static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable); +static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable); +static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable); static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse); static void ExecHashSkewTableInsert(HashJoinTable hashtable, @@ -49,6 +56,30 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable, static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable); static void *dense_alloc(HashJoinTable hashtable, Size size); +static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, + size_t size, + dsa_pointer *shared); +static void MultiExecPrivateHash(HashState *node); +static void MultiExecParallelHash(HashState *node); +static inline HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable table, + int bucketno); +static inline HashJoinTuple ExecParallelHashNextTuple(HashJoinTable table, + HashJoinTuple tuple); +static inline void ExecParallelHashPushTuple(dsa_pointer_atomic *head, + HashJoinTuple tuple, + dsa_pointer tuple_shared); +static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch); +static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable); +static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable); +static void ExecParallelHashRepartitionRest(HashJoinTable 
hashtable); +static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable table, + dsa_pointer *shared); +static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, + int batchno, + size_t size); +static void ExecParallelHashMergeCounters(HashJoinTable hashtable); +static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable); + /* ---------------------------------------------------------------- * ExecHash @@ -72,6 +103,39 @@ ExecHash(PlanState *pstate) */ Node * MultiExecHash(HashState *node) +{ + /* must provide our own instrumentation support */ + if (node->ps.instrument) + InstrStartNode(node->ps.instrument); + + if (node->parallel_state != NULL) + MultiExecParallelHash(node); + else + MultiExecPrivateHash(node); + + /* must provide our own instrumentation support */ + if (node->ps.instrument) + InstrStopNode(node->ps.instrument, node->hashtable->partialTuples); + + /* + * We do not return the hash table directly because it's not a subtype of + * Node, and so would violate the MultiExecProcNode API. Instead, our + * parent Hashjoin node is expected to know how to fish it out of our node + * state. Ugly but not really worth cleaning up, since Hashjoin knows + * quite a bit more about Hash besides that. + */ + return NULL; +} + +/* ---------------------------------------------------------------- + * MultiExecPrivateHash + * + * parallel-oblivious version, building a backend-private + * hash table and (if necessary) batch files. 
+ * ---------------------------------------------------------------- + */ +static void +MultiExecPrivateHash(HashState *node) { PlanState *outerNode; List *hashkeys; @@ -80,10 +144,6 @@ MultiExecHash(HashState *node) ExprContext *econtext; uint32 hashvalue; - /* must provide our own instrumentation support */ - if (node->ps.instrument) - InstrStartNode(node->ps.instrument); - /* * get state info from node */ @@ -138,18 +198,147 @@ MultiExecHash(HashState *node) if (hashtable->spaceUsed > hashtable->spacePeak) hashtable->spacePeak = hashtable->spaceUsed; - /* must provide our own instrumentation support */ - if (node->ps.instrument) - InstrStopNode(node->ps.instrument, hashtable->totalTuples); + hashtable->partialTuples = hashtable->totalTuples; +} + +/* ---------------------------------------------------------------- + * MultiExecParallelHash + * + * parallel-aware version, building a shared hash table and + * (if necessary) batch files using the combined effort of + * a set of co-operating backends. + * ---------------------------------------------------------------- + */ +static void +MultiExecParallelHash(HashState *node) +{ + ParallelHashJoinState *pstate; + PlanState *outerNode; + List *hashkeys; + HashJoinTable hashtable; + TupleTableSlot *slot; + ExprContext *econtext; + uint32 hashvalue; + Barrier *build_barrier; + int i; /* - * We do not return the hash table directly because it's not a subtype of - * Node, and so would violate the MultiExecProcNode API. Instead, our - * parent Hashjoin node is expected to know how to fish it out of our node - * state. Ugly but not really worth cleaning up, since Hashjoin knows - * quite a bit more about Hash besides that. + * get state info from node */ - return NULL; + outerNode = outerPlanState(node); + hashtable = node->hashtable; + + /* + * set expression context + */ + hashkeys = node->hashkeys; + econtext = node->ps.ps_ExprContext; + + /* + * Synchronize the parallel hash table build. 
At this stage we know that + * the shared hash table has been or is being set up by + * ExecHashTableCreate(), but we don't know if our peers have returned + * from there or are here in MultiExecParallelHash(), and if so how far + * through they are. To find out, we check the build_barrier phase and + * then jump to the right step in the build algorithm. + */ + pstate = hashtable->parallel_state; + build_barrier = &pstate->build_barrier; + Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATING); + switch (BarrierPhase(build_barrier)) + { + case PHJ_BUILD_ALLOCATING: + + /* + * Either I just allocated the initial hash table in + * ExecHashTableCreate(), or someone else is doing that. Either + * way, wait for everyone to arrive here so we can proceed. + */ + BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATING); + /* Fall through. */ + + case PHJ_BUILD_HASHING_INNER: + + /* + * It's time to begin hashing, or if we just arrived here then + * hashing is already underway, so join in that effort. While + * hashing we have to be prepared to help increase the number of + * batches or buckets at any time, and if we arrived here when + * that was already underway we'll have to help complete that work + * immediately so that it's safe to access batches and buckets + * below.
+ */ + if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) != + PHJ_GROW_BATCHES_ELECTING) + ExecParallelHashIncreaseNumBatches(hashtable); + if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) != + PHJ_GROW_BUCKETS_ELECTING) + ExecParallelHashIncreaseNumBuckets(hashtable); + ExecParallelHashEnsureBatchAccessors(hashtable); + ExecParallelHashTableSetCurrentBatch(hashtable, 0); + for (;;) + { + slot = ExecProcNode(outerNode); + if (TupIsNull(slot)) + break; + econtext->ecxt_innertuple = slot; + if (ExecHashGetHashValue(hashtable, econtext, hashkeys, + false, hashtable->keepNulls, + &hashvalue)) + ExecParallelHashTableInsert(hashtable, slot, hashvalue); + hashtable->partialTuples++; + } + BarrierDetach(&pstate->grow_buckets_barrier); + BarrierDetach(&pstate->grow_batches_barrier); + + /* + * Make sure that any tuples we wrote to disk are visible to + * others before anyone tries to load them. + */ + for (i = 0; i < hashtable->nbatch; ++i) + sts_end_write(hashtable->batches[i].inner_tuples); + + /* + * Update shared counters. We need an accurate total tuple count + * to control the empty table optimization. + */ + ExecParallelHashMergeCounters(hashtable); + + /* + * Wait for everyone to finish building and flushing files and + * counters. + */ + if (BarrierArriveAndWait(build_barrier, + WAIT_EVENT_HASH_BUILD_HASHING_INNER)) + { + /* + * Elect one backend to disable any further growth. Batches + * are now fixed. While building them we made sure they'd fit + * in our memory budget when we load them back in later (or we + * tried to do that and gave up because we detected extreme + * skew). + */ + pstate->growth = PHJ_GROWTH_DISABLED; + } + } + + /* + * We're not yet attached to a batch. We all agree on the dimensions and + * number of inner tuples (for the empty table optimization). 
+ */ + hashtable->curbatch = -1; + hashtable->nbuckets = pstate->nbuckets; + hashtable->log2_nbuckets = my_log2(hashtable->nbuckets); + hashtable->totalTuples = pstate->total_tuples; + ExecParallelHashEnsureBatchAccessors(hashtable); + + /* + * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE + * case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't + * there already). + */ + Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER || + BarrierPhase(build_barrier) == PHJ_BUILD_DONE); } /* ---------------------------------------------------------------- @@ -240,12 +429,15 @@ ExecEndHash(HashState *node) * ---------------------------------------------------------------- */ HashJoinTable -ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) +ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls) { + Hash *node; HashJoinTable hashtable; Plan *outerNode; + size_t space_allowed; int nbuckets; int nbatch; + double rows; int num_skew_mcvs; int log2_nbuckets; int nkeys; @@ -258,10 +450,22 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) * "outer" subtree of this node, but the inner relation of the hashjoin). * Compute the appropriate size of the hash table. */ + node = (Hash *) state->ps.plan; outerNode = outerPlan(node); - ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width, + /* + * If this is a shared hash table with a partial plan, then we can't use + * outerNode->plan_rows to estimate its size. We need an estimate of the + * total number of rows across all copies of the partial plan. + */ + rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows; + + ExecChooseHashTableSize(rows, outerNode->plan_width, OidIsValid(node->skewTable), + state->parallel_state != NULL, + state->parallel_state != NULL ?
+ state->parallel_state->nparticipants - 1 : 0, + &space_allowed, &nbuckets, &nbatch, &num_skew_mcvs); /* nbuckets must be a power of 2 */ @@ -280,7 +484,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) hashtable->nbuckets_optimal = nbuckets; hashtable->log2_nbuckets = log2_nbuckets; hashtable->log2_nbuckets_optimal = log2_nbuckets; - hashtable->buckets = NULL; + hashtable->buckets.unshared = NULL; hashtable->keepNulls = keepNulls; hashtable->skewEnabled = false; hashtable->skewBucket = NULL; @@ -293,16 +497,21 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) hashtable->nbatch_outstart = nbatch; hashtable->growEnabled = true; hashtable->totalTuples = 0; + hashtable->partialTuples = 0; hashtable->skewTuples = 0; hashtable->innerBatchFile = NULL; hashtable->outerBatchFile = NULL; hashtable->spaceUsed = 0; hashtable->spacePeak = 0; - hashtable->spaceAllowed = work_mem * 1024L; + hashtable->spaceAllowed = space_allowed; hashtable->spaceUsedSkew = 0; hashtable->spaceAllowedSkew = hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100; hashtable->chunks = NULL; + hashtable->current_chunk = NULL; + hashtable->parallel_state = state->parallel_state; + hashtable->area = state->ps.state->es_query_dsa; + hashtable->batches = NULL; #ifdef HJDEBUG printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n", @@ -351,10 +560,11 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); - if (nbatch > 1) + if (nbatch > 1 && hashtable->parallel_state == NULL) { /* - * allocate and initialize the file arrays in hashCxt + * allocate and initialize the file arrays in hashCxt (not needed for + * parallel case which uses shared tuplestores instead of raw files) */ hashtable->innerBatchFile = (BufFile **) palloc0(nbatch * sizeof(BufFile *)); @@ -365,24 +575,78 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) PrepareTempTablespaces(); } - /* - * Prepare 
context for the first-scan space allocations; allocate the - * hashbucket array therein, and set each bucket "empty". - */ - MemoryContextSwitchTo(hashtable->batchCxt); - - hashtable->buckets = (HashJoinTuple *) - palloc0(nbuckets * sizeof(HashJoinTuple)); - - /* - * Set up for skew optimization, if possible and there's a need for more - * than one batch. (In a one-batch join, there's no point in it.) - */ - if (nbatch > 1) - ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs); - MemoryContextSwitchTo(oldcxt); + if (hashtable->parallel_state) + { + ParallelHashJoinState *pstate = hashtable->parallel_state; + Barrier *build_barrier; + + /* + * Attach to the build barrier. The corresponding detach operation is + * in ExecHashTableDetach. Note that we won't attach to the + * batch_barrier for batch 0 yet. We'll attach later and start it out + * in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front + * and then loaded while hashing (the standard hybrid hash join + * algorithm), and we'll coordinate that using build_barrier. + */ + build_barrier = &pstate->build_barrier; + BarrierAttach(build_barrier); + + /* + * So far we have no idea whether there are any other participants, + * and if so, what phase they are working on. The only thing we care + * about at this point is whether someone has already created the + * SharedHashJoinBatch objects and the hash table for batch 0. One + * backend will be elected to do that now if necessary. + */ + if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING && + BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECTING)) + { + pstate->nbatch = nbatch; + pstate->space_allowed = space_allowed; + pstate->growth = PHJ_GROWTH_OK; + + /* Set up the shared state for coordinating batches. */ + ExecParallelHashJoinSetUpBatches(hashtable, nbatch); + + /* + * Allocate batch 0's hash table up front so we can load it + * directly while hashing. 
+ */ + pstate->nbuckets = nbuckets; + ExecParallelHashTableAlloc(hashtable, 0); + } + + /* + * The next Parallel Hash synchronization point is in + * MultiExecParallelHash(), which will progress it all the way to + * PHJ_BUILD_DONE. The caller must not return control from this + * executor node between now and then. + */ + } + else + { + /* + * Prepare context for the first-scan space allocations; allocate the + * hashbucket array therein, and set each bucket "empty". + */ + MemoryContextSwitchTo(hashtable->batchCxt); + + hashtable->buckets.unshared = (HashJoinTuple *) + palloc0(nbuckets * sizeof(HashJoinTuple)); + + /* + * Set up for skew optimization, if possible and there's a need for + * more than one batch. (In a one-batch join, there's no point in + * it.) + */ + if (nbatch > 1) + ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs); + + MemoryContextSwitchTo(oldcxt); + } + return hashtable; } @@ -399,6 +663,9 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, + bool try_combined_work_mem, + int parallel_workers, + size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs) @@ -433,6 +700,16 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, */ hash_table_bytes = work_mem * 1024L; + /* + * Parallel Hash tries to use the combined work_mem of all workers to + * avoid the need to batch. If that won't work, it falls back to work_mem + * per worker and tries to process batches in parallel. 
+ */ + if (try_combined_work_mem) + hash_table_bytes += hash_table_bytes * parallel_workers; + + *space_allowed = hash_table_bytes; + /* * If skew optimization is possible, estimate the number of skew buckets * that will fit in the memory allowed, and decrement the assumed space @@ -478,7 +755,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, * Note that both nbuckets and nbatch must be powers of 2 to make * ExecHashGetBucketAndBatch fast. */ - max_pointers = (work_mem * 1024L) / sizeof(HashJoinTuple); + max_pointers = *space_allowed / sizeof(HashJoinTuple); max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple)); /* If max_pointers isn't a power of 2, must round it down to one */ mppow2 = 1L << my_log2(max_pointers); @@ -510,6 +787,21 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, int minbatch; long bucket_size; + /* + * If Parallel Hash with combined work_mem would still need multiple + * batches, we'll have to fall back to regular work_mem budget. + */ + if (try_combined_work_mem) + { + ExecChooseHashTableSize(ntuples, tupwidth, useskew, + false, parallel_workers, + space_allowed, + numbuckets, + numbatches, + num_skew_mcvs); + return; + } + /* * Estimate the number of buckets we'll want to have when work_mem is * entirely full. Each bucket will contain a bucket pointer plus @@ -564,14 +856,17 @@ ExecHashTableDestroy(HashJoinTable hashtable) /* * Make sure all the temp files are closed. We skip batch 0, since it * can't have any temp files (and the arrays might not even exist if - * nbatch is only 1). + * nbatch is only 1). Parallel hash joins don't use these files. 
*/ - for (i = 1; i < hashtable->nbatch; i++) + if (hashtable->innerBatchFile != NULL) { - if (hashtable->innerBatchFile[i]) - BufFileClose(hashtable->innerBatchFile[i]); - if (hashtable->outerBatchFile[i]) - BufFileClose(hashtable->outerBatchFile[i]); + for (i = 1; i < hashtable->nbatch; i++) + { + if (hashtable->innerBatchFile[i]) + BufFileClose(hashtable->innerBatchFile[i]); + if (hashtable->outerBatchFile[i]) + BufFileClose(hashtable->outerBatchFile[i]); + } } /* Release working memory (batchCxt is a child, so it goes away too) */ @@ -657,8 +952,9 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) hashtable->nbuckets = hashtable->nbuckets_optimal; hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal; - hashtable->buckets = repalloc(hashtable->buckets, - sizeof(HashJoinTuple) * hashtable->nbuckets); + hashtable->buckets.unshared = + repalloc(hashtable->buckets.unshared, + sizeof(HashJoinTuple) * hashtable->nbuckets); } /* @@ -666,14 +962,15 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) * buckets now and not have to keep track which tuples in the buckets have * already been processed. We will free the old chunks as we go. 
*/ - memset(hashtable->buckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets); + memset(hashtable->buckets.unshared, 0, + sizeof(HashJoinTuple) * hashtable->nbuckets); oldchunks = hashtable->chunks; hashtable->chunks = NULL; /* so, let's scan through the old chunks, and all tuples in each chunk */ while (oldchunks != NULL) { - HashMemoryChunk nextchunk = oldchunks->next; + HashMemoryChunk nextchunk = oldchunks->next.unshared; /* position within the buffer (up to oldchunks->used) */ size_t idx = 0; @@ -700,8 +997,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) memcpy(copyTuple, hashTuple, hashTupleSize); /* and add it back to the appropriate bucket */ - copyTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = copyTuple; + copyTuple->next.unshared = hashtable->buckets.unshared[bucketno]; + hashtable->buckets.unshared[bucketno] = copyTuple; } else { @@ -750,6 +1047,380 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) } } +/* + * ExecParallelHashIncreaseNumBatches + * Every participant attached to grow_batches_barrier must run this function + * when it observes growth == PHJ_GROWTH_NEED_MORE_BATCHES. + */ +static void +ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + int i; + + Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER); + + /* + * It's unlikely, but we need to be prepared for new participants to show + * up while we're in the middle of this operation so we need to switch on + * barrier phase here. + */ + switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier))) + { + case PHJ_GROW_BATCHES_ELECTING: + + /* + * Elect one participant to prepare to grow the number of batches. + * This involves reallocating or resetting the buckets of batch 0 + * in preparation for all participants to begin repartitioning the + * tuples.
+ */ + if (BarrierArriveAndWait(&pstate->grow_batches_barrier, + WAIT_EVENT_HASH_GROW_BATCHES_ELECTING)) + { + dsa_pointer_atomic *buckets; + ParallelHashJoinBatch *old_batch0; + int new_nbatch; + int i; + + /* Move the old batch out of the way. */ + old_batch0 = hashtable->batches[0].shared; + pstate->old_batches = pstate->batches; + pstate->old_nbatch = hashtable->nbatch; + pstate->batches = InvalidDsaPointer; + + /* Free this backend's old accessors. */ + ExecParallelHashCloseBatchAccessors(hashtable); + + /* Figure out how many batches to use. */ + if (hashtable->nbatch == 1) + { + /* + * We are going from single-batch to multi-batch. We need + * to switch from one large combined memory budget to the + * regular work_mem budget. + */ + pstate->space_allowed = work_mem * 1024L; + + /* + * The combined work_mem of all participants wasn't + * enough. Therefore one batch per participant would be + * approximately equivalent and would probably also be + * insufficient. So try two batches per participant, + * rounded up to a power of two. + */ + new_nbatch = 1 << my_log2(pstate->nparticipants * 2); + } + else + { + /* + * We were already multi-batched. Try doubling the number + * of batches. + */ + new_nbatch = hashtable->nbatch * 2; + } + + /* Allocate new larger generation of batches. */ + Assert(hashtable->nbatch == pstate->nbatch); + ExecParallelHashJoinSetUpBatches(hashtable, new_nbatch); + Assert(hashtable->nbatch == pstate->nbatch); + + /* Replace or recycle batch 0's bucket array. */ + if (pstate->old_nbatch == 1) + { + double dtuples; + double dbuckets; + int new_nbuckets; + + /* + * We probably also need a smaller bucket array. How many + * tuples do we expect per batch, assuming we have only + * half of them so far?
Normally we don't need to change + * the bucket array's size, because the size of each batch + * stays the same as we add more batches, but in this + * special case we move from a large batch to many smaller + * batches and it would be wasteful to keep the large + * array. + */ + dtuples = (old_batch0->ntuples * 2.0) / new_nbatch; + dbuckets = ceil(dtuples / NTUP_PER_BUCKET); + dbuckets = Min(dbuckets, + MaxAllocSize / sizeof(dsa_pointer_atomic)); + new_nbuckets = (int) dbuckets; + new_nbuckets = Max(new_nbuckets, 1024); + new_nbuckets = 1 << my_log2(new_nbuckets); + dsa_free(hashtable->area, old_batch0->buckets); + hashtable->batches[0].shared->buckets = + dsa_allocate(hashtable->area, + sizeof(dsa_pointer_atomic) * new_nbuckets); + buckets = (dsa_pointer_atomic *) + dsa_get_address(hashtable->area, + hashtable->batches[0].shared->buckets); + for (i = 0; i < new_nbuckets; ++i) + dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); + pstate->nbuckets = new_nbuckets; + } + else + { + /* Recycle the existing bucket array. */ + hashtable->batches[0].shared->buckets = old_batch0->buckets; + buckets = (dsa_pointer_atomic *) + dsa_get_address(hashtable->area, old_batch0->buckets); + for (i = 0; i < hashtable->nbuckets; ++i) + dsa_pointer_atomic_write(&buckets[i], InvalidDsaPointer); + } + + /* Move all chunks to the work queue for parallel processing. */ + pstate->chunk_work_queue = old_batch0->chunks; + + /* Disable further growth temporarily while we're growing. */ + pstate->growth = PHJ_GROWTH_DISABLED; + } + else + { + /* All other participants just flush their tuples to disk. */ + ExecParallelHashCloseBatchAccessors(hashtable); + } + /* Fall through. */ + + case PHJ_GROW_BATCHES_ALLOCATING: + /* Wait for the above to be finished. */ + BarrierArriveAndWait(&pstate->grow_batches_barrier, + WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING); + /* Fall through. */ + + case PHJ_GROW_BATCHES_REPARTITIONING: + /* Make sure that we have the current dimensions and buckets. 
*/ + ExecParallelHashEnsureBatchAccessors(hashtable); + ExecParallelHashTableSetCurrentBatch(hashtable, 0); + /* Then partition, flush counters. */ + ExecParallelHashRepartitionFirst(hashtable); + ExecParallelHashRepartitionRest(hashtable); + ExecParallelHashMergeCounters(hashtable); + /* Wait for the above to be finished. */ + BarrierArriveAndWait(&pstate->grow_batches_barrier, + WAIT_EVENT_HASH_GROW_BATCHES_REPARTITIONING); + /* Fall through. */ + + case PHJ_GROW_BATCHES_DECIDING: + + /* + * Elect one participant to clean up and decide whether further + * repartitioning is needed, or should be disabled because it's + * not helping. + */ + if (BarrierArriveAndWait(&pstate->grow_batches_barrier, + WAIT_EVENT_HASH_GROW_BATCHES_DECIDING)) + { + bool space_exhausted = false; + bool extreme_skew_detected = false; + + /* Make sure that we have the current dimensions and buckets. */ + ExecParallelHashEnsureBatchAccessors(hashtable); + ExecParallelHashTableSetCurrentBatch(hashtable, 0); + + /* Are any of the new generation of batches exhausted? */ + for (i = 0; i < hashtable->nbatch; ++i) + { + ParallelHashJoinBatch *batch = hashtable->batches[i].shared; + + if (batch->space_exhausted || + batch->estimated_size > pstate->space_allowed) + { + int parent; + + space_exhausted = true; + + /* + * Did this batch receive ALL of the tuples from its + * parent batch? That would indicate that further + * repartitioning isn't going to help (the hash values + * are probably all the same). + */ + parent = i % pstate->old_nbatch; + if (batch->ntuples == hashtable->batches[parent].shared->old_ntuples) + extreme_skew_detected = true; + } + } + + /* Don't keep growing if it's not helping or we'd overflow. */ + if (extreme_skew_detected || hashtable->nbatch >= INT_MAX / 2) + pstate->growth = PHJ_GROWTH_DISABLED; + else if (space_exhausted) + pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; + else + pstate->growth = PHJ_GROWTH_OK; + + /* Free the old batches in shared memory. 
*/ + dsa_free(hashtable->area, pstate->old_batches); + pstate->old_batches = InvalidDsaPointer; + } + /* Fall through. */ + + case PHJ_GROW_BATCHES_FINISHING: + /* Wait for the above to complete. */ + BarrierArriveAndWait(&pstate->grow_batches_barrier, + WAIT_EVENT_HASH_GROW_BATCHES_FINISHING); + } +} + +/* + * Repartition the tuples currently loaded into memory for inner batch 0 + * because the number of batches has been increased. Some tuples are retained + * in memory and some are written out to a later batch. + */ +static void +ExecParallelHashRepartitionFirst(HashJoinTable hashtable) +{ + dsa_pointer chunk_shared; + HashMemoryChunk chunk; + + Assert(hashtable->nbatch == hashtable->parallel_state->nbatch); + + while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_shared))) + { + size_t idx = 0; + + /* Repartition all tuples in this chunk. */ + while (idx < chunk->used) + { + HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx); + MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple); + HashJoinTuple copyTuple; + dsa_pointer shared; + int bucketno; + int batchno; + + ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, + &bucketno, &batchno); + + Assert(batchno < hashtable->nbatch); + if (batchno == 0) + { + /* It still belongs in batch 0. Copy to a new chunk. */ + copyTuple = + ExecParallelHashTupleAlloc(hashtable, + HJTUPLE_OVERHEAD + tuple->t_len, + &shared); + copyTuple->hashvalue = hashTuple->hashvalue; + memcpy(HJTUPLE_MINTUPLE(copyTuple), tuple, tuple->t_len); + ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], + copyTuple, shared); + } + else + { + size_t tuple_size = + MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len); + + /* It belongs in a later batch. */ + hashtable->batches[batchno].estimated_size += tuple_size; + sts_puttuple(hashtable->batches[batchno].inner_tuples, + &hashTuple->hashvalue, tuple); + } + + /* Count this tuple. 
*/ + ++hashtable->batches[0].old_ntuples; + ++hashtable->batches[batchno].ntuples; + + idx += MAXALIGN(HJTUPLE_OVERHEAD + + HJTUPLE_MINTUPLE(hashTuple)->t_len); + } + + /* Free this chunk. */ + dsa_free(hashtable->area, chunk_shared); + + CHECK_FOR_INTERRUPTS(); + } +} + +/* + * Help repartition inner batches 1..n. + */ +static void +ExecParallelHashRepartitionRest(HashJoinTable hashtable) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + int old_nbatch = pstate->old_nbatch; + SharedTuplestoreAccessor **old_inner_tuples; + ParallelHashJoinBatch *old_batches; + int i; + + /* Get our hands on the previous generation of batches. */ + old_batches = (ParallelHashJoinBatch *) + dsa_get_address(hashtable->area, pstate->old_batches); + old_inner_tuples = palloc0(sizeof(SharedTuplestoreAccessor *) * old_nbatch); + for (i = 1; i < old_nbatch; ++i) + { + ParallelHashJoinBatch *shared = + NthParallelHashJoinBatch(old_batches, i); + + old_inner_tuples[i] = sts_attach(ParallelHashJoinBatchInner(shared), + ParallelWorkerNumber + 1, + &pstate->fileset); + } + + /* Join in the effort to repartition them. */ + for (i = 1; i < old_nbatch; ++i) + { + MinimalTuple tuple; + uint32 hashvalue; + + /* Scan one partition from the previous generation. */ + sts_begin_parallel_scan(old_inner_tuples[i]); + while ((tuple = sts_parallel_scan_next(old_inner_tuples[i], &hashvalue))) + { + size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len); + int bucketno; + int batchno; + + /* Decide which partition it goes to in the new generation. */ + ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, + &batchno); + + hashtable->batches[batchno].estimated_size += tuple_size; + ++hashtable->batches[batchno].ntuples; + ++hashtable->batches[i].old_ntuples; + + /* Store the tuple in its new batch. 
*/ + sts_puttuple(hashtable->batches[batchno].inner_tuples, + &hashvalue, tuple); + + CHECK_FOR_INTERRUPTS(); + } + sts_end_parallel_scan(old_inner_tuples[i]); + } + + pfree(old_inner_tuples); +} + +/* + * Transfer the backend-local per-batch counters to the shared totals. + */ +static void +ExecParallelHashMergeCounters(HashJoinTable hashtable) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + int i; + + LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); + pstate->total_tuples = 0; + for (i = 0; i < hashtable->nbatch; ++i) + { + ParallelHashJoinBatchAccessor *batch = &hashtable->batches[i]; + + batch->shared->size += batch->size; + batch->shared->estimated_size += batch->estimated_size; + batch->shared->ntuples += batch->ntuples; + batch->shared->old_ntuples += batch->old_ntuples; + batch->size = 0; + batch->estimated_size = 0; + batch->ntuples = 0; + batch->old_ntuples = 0; + pstate->total_tuples += batch->shared->ntuples; + } + LWLockRelease(&pstate->lock); +} + /* * ExecHashIncreaseNumBuckets * increase the original number of buckets in order to reduce @@ -782,14 +1453,15 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable) * ExecHashIncreaseNumBatches, but without all the copying into new * chunks) */ - hashtable->buckets = - (HashJoinTuple *) repalloc(hashtable->buckets, + hashtable->buckets.unshared = + (HashJoinTuple *) repalloc(hashtable->buckets.unshared, hashtable->nbuckets * sizeof(HashJoinTuple)); - memset(hashtable->buckets, 0, hashtable->nbuckets * sizeof(HashJoinTuple)); + memset(hashtable->buckets.unshared, 0, + hashtable->nbuckets * sizeof(HashJoinTuple)); /* scan through all tuples in all chunks to rebuild the hash table */ - for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next) + for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next.unshared) { /* process all tuples stored in this chunk */ size_t idx = 0; @@ -804,8 +1476,8 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable) &bucketno, &batchno); /* add 
the tuple to the proper bucket */ - hashTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = hashTuple; + hashTuple->next.unshared = hashtable->buckets.unshared[bucketno]; + hashtable->buckets.unshared[bucketno] = hashTuple; /* advance index past the tuple */ idx += MAXALIGN(HJTUPLE_OVERHEAD + @@ -817,6 +1489,93 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable) } } +static void +ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + int i; + HashMemoryChunk chunk; + dsa_pointer chunk_s; + + Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER); + + /* + * It's unlikely, but we need to be prepared for new participants to show + * up while we're in the middle of this operation so we need to switch on + * barrier phase here. + */ + switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier))) + { + case PHJ_GROW_BUCKETS_ELECTING: + /* Elect one participant to prepare to increase nbuckets. */ + if (BarrierArriveAndWait(&pstate->grow_buckets_barrier, + WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING)) + { + size_t size; + dsa_pointer_atomic *buckets; + + /* Double the size of the bucket array. */ + pstate->nbuckets *= 2; + size = pstate->nbuckets * sizeof(dsa_pointer_atomic); + hashtable->batches[0].shared->size += size / 2; + dsa_free(hashtable->area, hashtable->batches[0].shared->buckets); + hashtable->batches[0].shared->buckets = + dsa_allocate(hashtable->area, size); + buckets = (dsa_pointer_atomic *) + dsa_get_address(hashtable->area, + hashtable->batches[0].shared->buckets); + for (i = 0; i < pstate->nbuckets; ++i) + dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); + + /* Put the chunk list onto the work queue. */ + pstate->chunk_work_queue = hashtable->batches[0].shared->chunks; + + /* Clear the flag. */ + pstate->growth = PHJ_GROWTH_OK; + } + /* Fall through. 
*/ + + case PHJ_GROW_BUCKETS_ALLOCATING: + /* Wait for the above to complete. */ + BarrierArriveAndWait(&pstate->grow_buckets_barrier, + WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING); + /* Fall through. */ + + case PHJ_GROW_BUCKETS_REINSERTING: + /* Reinsert all tuples into the hash table. */ + ExecParallelHashEnsureBatchAccessors(hashtable); + ExecParallelHashTableSetCurrentBatch(hashtable, 0); + while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_s))) + { + size_t idx = 0; + + while (idx < chunk->used) + { + HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx); + dsa_pointer shared = chunk_s + HASH_CHUNK_HEADER_SIZE + idx; + int bucketno; + int batchno; + + ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, + &bucketno, &batchno); + Assert(batchno == 0); + + /* add the tuple to the proper bucket */ + ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], + hashTuple, shared); + + /* advance index past the tuple */ + idx += MAXALIGN(HJTUPLE_OVERHEAD + + HJTUPLE_MINTUPLE(hashTuple)->t_len); + } + + /* allow this loop to be cancellable */ + CHECK_FOR_INTERRUPTS(); + } + BarrierArriveAndWait(&pstate->grow_buckets_barrier, + WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING); + } +} /* * ExecHashTableInsert @@ -869,8 +1628,8 @@ ExecHashTableInsert(HashJoinTable hashtable, HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); /* Push it onto the front of the bucket's list */ - hashTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = hashTuple; + hashTuple->next.unshared = hashtable->buckets.unshared[bucketno]; + hashtable->buckets.unshared[bucketno] = hashTuple; /* * Increase the (optimal) number of buckets if we just exceeded the @@ -910,6 +1669,94 @@ ExecHashTableInsert(HashJoinTable hashtable, } } +/* + * ExecParallelHashTableInsert + * insert a tuple into a shared hash table or shared batch tuplestore + */ +void +ExecParallelHashTableInsert(HashJoinTable hashtable, + TupleTableSlot *slot, + uint32 hashvalue) +{ + 
MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot); + dsa_pointer shared; + int bucketno; + int batchno; + +retry: + ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); + + if (batchno == 0) + { + HashJoinTuple hashTuple; + + /* Try to load it into memory. */ + Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) == + PHJ_BUILD_HASHING_INNER); + hashTuple = ExecParallelHashTupleAlloc(hashtable, + HJTUPLE_OVERHEAD + tuple->t_len, + &shared); + if (hashTuple == NULL) + goto retry; + + /* Store the hash value in the HashJoinTuple header. */ + hashTuple->hashvalue = hashvalue; + memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); + + /* Push it onto the front of the bucket's list */ + ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], + hashTuple, shared); + } + else + { + size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len); + + Assert(batchno > 0); + + /* Try to preallocate space in the batch if necessary. */ + if (hashtable->batches[batchno].preallocated < tuple_size) + { + if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size)) + goto retry; + } + + Assert(hashtable->batches[batchno].preallocated >= tuple_size); + hashtable->batches[batchno].preallocated -= tuple_size; + sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue, + tuple); + } + ++hashtable->batches[batchno].ntuples; +} + +/* + * Insert a tuple into the current hash table. Unlike + * ExecParallelHashTableInsert, this version is not prepared to send the tuple + * to other batches or to run out of memory, and should only be called with + * tuples that belong in the current batch once growth has been disabled. 
+ */ +void +ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, + TupleTableSlot *slot, + uint32 hashvalue) +{ + MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot); + HashJoinTuple hashTuple; + dsa_pointer shared; + int batchno; + int bucketno; + + ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); + Assert(batchno == hashtable->curbatch); + hashTuple = ExecParallelHashTupleAlloc(hashtable, + HJTUPLE_OVERHEAD + tuple->t_len, + &shared); + hashTuple->hashvalue = hashvalue; + memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); + HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); + ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], + hashTuple, shared); +} + /* * ExecHashGetHashValue * Compute the hash value for a tuple @@ -1076,11 +1923,11 @@ ExecScanHashBucket(HashJoinState *hjstate, * otherwise scan the standard hashtable bucket. */ if (hashTuple != NULL) - hashTuple = hashTuple->next; + hashTuple = hashTuple->next.unshared; else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO) hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples; else - hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo]; + hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo]; while (hashTuple != NULL) { @@ -1104,7 +1951,67 @@ ExecScanHashBucket(HashJoinState *hjstate, } } - hashTuple = hashTuple->next; + hashTuple = hashTuple->next.unshared; + } + + /* + * no match + */ + return false; +} + +/* + * ExecParallelScanHashBucket + * scan a hash bucket for matches to the current outer tuple + * + * The current outer tuple must be stored in econtext->ecxt_outertuple. + * + * On success, the inner tuple is stored into hjstate->hj_CurTuple and + * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot + * for the latter. 
+ */ +bool +ExecParallelScanHashBucket(HashJoinState *hjstate, + ExprContext *econtext) +{ + ExprState *hjclauses = hjstate->hashclauses; + HashJoinTable hashtable = hjstate->hj_HashTable; + HashJoinTuple hashTuple = hjstate->hj_CurTuple; + uint32 hashvalue = hjstate->hj_CurHashValue; + + /* + * hj_CurTuple is the address of the tuple last returned from the current + * bucket, or NULL if it's time to start scanning a new bucket. + */ + if (hashTuple != NULL) + hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); + else + hashTuple = ExecParallelHashFirstTuple(hashtable, + hjstate->hj_CurBucketNo); + + while (hashTuple != NULL) + { + if (hashTuple->hashvalue == hashvalue) + { + TupleTableSlot *inntuple; + + /* insert hashtable's tuple into exec slot so ExecQual sees it */ + inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), + hjstate->hj_HashTupleSlot, + false); /* do not pfree */ + econtext->ecxt_innertuple = inntuple; + + /* reset temp memory each time to avoid leaks from qual expr */ + ResetExprContext(econtext); + + if (ExecQual(hjclauses, econtext)) + { + hjstate->hj_CurTuple = hashTuple; + return true; + } + } + + hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); } /* @@ -1155,10 +2062,10 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) * bucket. 
*/ if (hashTuple != NULL) - hashTuple = hashTuple->next; + hashTuple = hashTuple->next.unshared; else if (hjstate->hj_CurBucketNo < hashtable->nbuckets) { - hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo]; + hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo]; hjstate->hj_CurBucketNo++; } else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets) @@ -1194,7 +2101,7 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) return true; } - hashTuple = hashTuple->next; + hashTuple = hashTuple->next.unshared; } /* allow this loop to be cancellable */ @@ -1226,7 +2133,7 @@ ExecHashTableReset(HashJoinTable hashtable) oldcxt = MemoryContextSwitchTo(hashtable->batchCxt); /* Reallocate and reinitialize the hash bucket headers. */ - hashtable->buckets = (HashJoinTuple *) + hashtable->buckets.unshared = (HashJoinTuple *) palloc0(nbuckets * sizeof(HashJoinTuple)); hashtable->spaceUsed = 0; @@ -1250,7 +2157,8 @@ ExecHashTableResetMatchFlags(HashJoinTable hashtable) /* Reset all flags in the main table ... 
*/ for (i = 0; i < hashtable->nbuckets; i++) { - for (tuple = hashtable->buckets[i]; tuple != NULL; tuple = tuple->next) + for (tuple = hashtable->buckets.unshared[i]; tuple != NULL; + tuple = tuple->next.unshared) HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple)); } @@ -1260,7 +2168,7 @@ ExecHashTableResetMatchFlags(HashJoinTable hashtable) int j = hashtable->skewBucketNums[i]; HashSkewBucket *skewBucket = hashtable->skewBucket[j]; - for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next) + for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared) HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple)); } } @@ -1505,8 +2413,9 @@ ExecHashSkewTableInsert(HashJoinTable hashtable, HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); /* Push it onto the front of the skew bucket's list */ - hashTuple->next = hashtable->skewBucket[bucketNumber]->tuples; + hashTuple->next.unshared = hashtable->skewBucket[bucketNumber]->tuples; hashtable->skewBucket[bucketNumber]->tuples = hashTuple; + Assert(hashTuple != hashTuple->next.unshared); /* Account for space used, and back off if we've used too much */ hashtable->spaceUsed += hashTupleSize; @@ -1554,7 +2463,7 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) hashTuple = bucket->tuples; while (hashTuple != NULL) { - HashJoinTuple nextHashTuple = hashTuple->next; + HashJoinTuple nextHashTuple = hashTuple->next.unshared; MinimalTuple tuple; Size tupleSize; @@ -1580,8 +2489,8 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) memcpy(copyTuple, hashTuple, tupleSize); pfree(hashTuple); - copyTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = copyTuple; + copyTuple->next.unshared = hashtable->buckets.unshared[bucketno]; + hashtable->buckets.unshared[bucketno] = copyTuple; /* We have reduced skew space, but overall space doesn't change */ hashtable->spaceUsedSkew -= tupleSize; @@ -1760,11 +2669,11 @@ dense_alloc(HashJoinTable hashtable, Size size) if 
(hashtable->chunks != NULL) { newChunk->next = hashtable->chunks->next; - hashtable->chunks->next = newChunk; + hashtable->chunks->next.unshared = newChunk; } else { - newChunk->next = hashtable->chunks; + newChunk->next.unshared = hashtable->chunks; hashtable->chunks = newChunk; } @@ -1789,7 +2698,7 @@ dense_alloc(HashJoinTable hashtable, Size size) newChunk->used = size; newChunk->ntuples = 1; - newChunk->next = hashtable->chunks; + newChunk->next.unshared = hashtable->chunks; hashtable->chunks = newChunk; return newChunk->data; @@ -1803,3 +2712,601 @@ dense_alloc(HashJoinTable hashtable, Size size) /* return pointer to the start of the tuple memory */ return ptr; } + +/* + * Allocate space for a tuple in shared dense storage. This is equivalent to + * dense_alloc but for Parallel Hash using shared memory. + * + * While loading a tuple into shared memory, we might run out of memory and + * decide to repartition, or determine that the load factor is too high and + * decide to expand the bucket array, or discover that another participant has + * commanded us to help do that. Return NULL if number of buckets or batches + * has changed, indicating that the caller must retry (considering the + * possibility that the tuple no longer belongs in the same batch). + */ +static HashJoinTuple +ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, + dsa_pointer *shared) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + dsa_pointer chunk_shared; + HashMemoryChunk chunk; + Size chunk_size; + HashJoinTuple result; + int curbatch = hashtable->curbatch; + + size = MAXALIGN(size); + + /* + * Fast path: if there is enough space in this backend's current chunk, + * then we can allocate without any locking. 
+ */ + chunk = hashtable->current_chunk; + if (chunk != NULL && + size < HASH_CHUNK_THRESHOLD && + chunk->maxlen - chunk->used >= size) + { + + chunk_shared = hashtable->current_chunk_shared; + Assert(chunk == dsa_get_address(hashtable->area, chunk_shared)); + *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE + chunk->used; + result = (HashJoinTuple) (chunk->data + chunk->used); + chunk->used += size; + + Assert(chunk->used <= chunk->maxlen); + Assert(result == dsa_get_address(hashtable->area, *shared)); + + return result; + } + + /* Slow path: try to allocate a new chunk. */ + LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); + + /* + * Check if we need to help increase the number of buckets or batches. + */ + if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES || + pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS) + { + ParallelHashGrowth growth = pstate->growth; + + hashtable->current_chunk = NULL; + LWLockRelease(&pstate->lock); + + /* Another participant has commanded us to help grow. */ + if (growth == PHJ_GROWTH_NEED_MORE_BATCHES) + ExecParallelHashIncreaseNumBatches(hashtable); + else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS) + ExecParallelHashIncreaseNumBuckets(hashtable); + + /* The caller must retry. */ + return NULL; + } + + /* Oversized tuples get their own chunk. */ + if (size > HASH_CHUNK_THRESHOLD) + chunk_size = size + HASH_CHUNK_HEADER_SIZE; + else + chunk_size = HASH_CHUNK_SIZE; + + /* Check if it's time to grow batches or buckets. */ + if (pstate->growth != PHJ_GROWTH_DISABLED) + { + Assert(curbatch == 0); + Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER); + + /* + * Check if our space limit would be exceeded. To avoid choking on + * very large tuples or very low work_mem setting, we'll always allow + * each backend to allocate at least one chunk. 
+ */ + if (hashtable->batches[0].at_least_one_chunk && + hashtable->batches[0].shared->size + + chunk_size > pstate->space_allowed) + { + pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; + hashtable->batches[0].shared->space_exhausted = true; + LWLockRelease(&pstate->lock); + + return NULL; + } + + /* Check if our load factor limit would be exceeded. */ + if (hashtable->nbatch == 1) + { + hashtable->batches[0].shared->ntuples += hashtable->batches[0].ntuples; + hashtable->batches[0].ntuples = 0; + if (hashtable->batches[0].shared->ntuples + 1 > + hashtable->nbuckets * NTUP_PER_BUCKET && + hashtable->nbuckets < (INT_MAX / 2)) + { + pstate->growth = PHJ_GROWTH_NEED_MORE_BUCKETS; + LWLockRelease(&pstate->lock); + + return NULL; + } + } + } + + /* We are cleared to allocate a new chunk. */ + chunk_shared = dsa_allocate(hashtable->area, chunk_size); + hashtable->batches[curbatch].shared->size += chunk_size; + hashtable->batches[curbatch].at_least_one_chunk = true; + + /* Set up the chunk. */ + chunk = (HashMemoryChunk) dsa_get_address(hashtable->area, chunk_shared); + *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE; + chunk->maxlen = chunk_size - HASH_CHUNK_HEADER_SIZE; + chunk->used = size; + + /* + * Push it onto the list of chunks, so that it can be found if we need to + * increase the number of buckets or batches (batch 0 only) and later for + * freeing the memory (all batches). + */ + chunk->next.shared = hashtable->batches[curbatch].shared->chunks; + hashtable->batches[curbatch].shared->chunks = chunk_shared; + + if (size <= HASH_CHUNK_THRESHOLD) + { + /* + * Make this the current chunk so that we can use the fast path to + * fill the rest of it up in future calls. 
*/ + hashtable->current_chunk = chunk; + hashtable->current_chunk_shared = chunk_shared; + } + LWLockRelease(&pstate->lock); + + Assert(chunk->data == dsa_get_address(hashtable->area, *shared)); + result = (HashJoinTuple) chunk->data; + + return result; +} + +/* + * One backend needs to set up the shared batch state including tuplestores. + * Other backends will ensure they have correctly configured accessors by + * calling ExecParallelHashEnsureBatchAccessors(). + */ +static void +ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + ParallelHashJoinBatch *batches; + MemoryContext oldcxt; + int i; + + Assert(hashtable->batches == NULL); + + /* Allocate space. */ + pstate->batches = + dsa_allocate0(hashtable->area, + EstimateParallelHashJoinBatch(hashtable) * nbatch); + pstate->nbatch = nbatch; + batches = dsa_get_address(hashtable->area, pstate->batches); + + /* Use hash join memory context. */ + oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); + + /* Allocate this backend's accessor array. */ + hashtable->nbatch = nbatch; + hashtable->batches = (ParallelHashJoinBatchAccessor *) + palloc0(sizeof(ParallelHashJoinBatchAccessor) * hashtable->nbatch); + + /* Set up the shared state, tuplestores and backend-local accessors. */ + for (i = 0; i < hashtable->nbatch; ++i) + { + ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i]; + ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i); + char name[MAXPGPATH]; + + /* + * All members of shared were zero-initialized. We just need to set + * up the Barrier. + */ + BarrierInit(&shared->batch_barrier, 0); + if (i == 0) + { + /* Batch 0 doesn't need to be loaded. */ + BarrierAttach(&shared->batch_barrier); + while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBING) + BarrierArriveAndWait(&shared->batch_barrier, 0); + BarrierDetach(&shared->batch_barrier); + } + + /* Initialize accessor state. 
All members were zero-initialized. */ + accessor->shared = shared; + + /* Initialize the shared tuplestores. */ + snprintf(name, sizeof(name), "i%dof%d", i, hashtable->nbatch); + accessor->inner_tuples = + sts_initialize(ParallelHashJoinBatchInner(shared), + pstate->nparticipants, + ParallelWorkerNumber + 1, + sizeof(uint32), + SHARED_TUPLESTORE_SINGLE_PASS, + &pstate->fileset, + name); + snprintf(name, sizeof(name), "o%dof%d", i, hashtable->nbatch); + accessor->outer_tuples = + sts_initialize(ParallelHashJoinBatchOuter(shared, + pstate->nparticipants), + pstate->nparticipants, + ParallelWorkerNumber + 1, + sizeof(uint32), + SHARED_TUPLESTORE_SINGLE_PASS, + &pstate->fileset, + name); + } + + MemoryContextSwitchTo(oldcxt); +} + +/* + * Free the current set of ParallelHashJoinBatchAccessor objects. + */ +static void +ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable) +{ + int i; + + for (i = 0; i < hashtable->nbatch; ++i) + { + /* Make sure no files are left open. */ + sts_end_write(hashtable->batches[i].inner_tuples); + sts_end_write(hashtable->batches[i].outer_tuples); + sts_end_parallel_scan(hashtable->batches[i].inner_tuples); + sts_end_parallel_scan(hashtable->batches[i].outer_tuples); + } + pfree(hashtable->batches); + hashtable->batches = NULL; +} + +/* + * Make sure this backend has up-to-date accessors for the current set of + * batches. + */ +static void +ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + ParallelHashJoinBatch *batches; + MemoryContext oldcxt; + int i; + + if (hashtable->batches != NULL) + { + if (hashtable->nbatch == pstate->nbatch) + return; + ExecParallelHashCloseBatchAccessors(hashtable); + } + + /* + * It's possible for a backend to start up very late so that the whole + * join is finished and the shm state for tracking batches has already + * been freed by ExecHashTableDetach(). 
In that case we'll just leave + * hashtable->batches as NULL so that ExecParallelHashJoinNewBatch() gives + * up early. + */ + if (!DsaPointerIsValid(pstate->batches)) + return; + + /* Use hash join memory context. */ + oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); + + /* Allocate this backend's accessor array. */ + hashtable->nbatch = pstate->nbatch; + hashtable->batches = (ParallelHashJoinBatchAccessor *) + palloc0(sizeof(ParallelHashJoinBatchAccessor) * hashtable->nbatch); + + /* Find the base of the pseudo-array of ParallelHashJoinBatch objects. */ + batches = (ParallelHashJoinBatch *) + dsa_get_address(hashtable->area, pstate->batches); + + /* Set up the accessor array and attach to the tuplestores. */ + for (i = 0; i < hashtable->nbatch; ++i) + { + ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i]; + ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i); + + accessor->shared = shared; + accessor->preallocated = 0; + accessor->done = false; + accessor->inner_tuples = + sts_attach(ParallelHashJoinBatchInner(shared), + ParallelWorkerNumber + 1, + &pstate->fileset); + accessor->outer_tuples = + sts_attach(ParallelHashJoinBatchOuter(shared, + pstate->nparticipants), + ParallelWorkerNumber + 1, + &pstate->fileset); + } + + MemoryContextSwitchTo(oldcxt); +} + +/* + * Allocate an empty shared memory hash table for a given batch. + */ +void +ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno) +{ + ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared; + dsa_pointer_atomic *buckets; + int nbuckets = hashtable->parallel_state->nbuckets; + int i; + + batch->buckets = + dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets); + buckets = (dsa_pointer_atomic *) + dsa_get_address(hashtable->area, batch->buckets); + for (i = 0; i < nbuckets; ++i) + dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); +} + +/* + * If we are currently attached to a shared hash join batch, detach. 
If we + * are last to detach, clean up. + */ +void +ExecHashTableDetachBatch(HashJoinTable hashtable) +{ + if (hashtable->parallel_state != NULL && + hashtable->curbatch >= 0) + { + int curbatch = hashtable->curbatch; + ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; + + /* Make sure any temporary files are closed. */ + sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); + sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); + + /* Detach from the batch we were last working on. */ + if (BarrierArriveAndDetach(&batch->batch_barrier)) + { + /* + * Technically we shouldn't access the barrier because we're no + * longer attached, but since there is no way it's moving after + * this point it seems safe to make the following assertion. + */ + Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE); + + /* Free shared chunks and buckets. */ + while (DsaPointerIsValid(batch->chunks)) + { + HashMemoryChunk chunk = + dsa_get_address(hashtable->area, batch->chunks); + dsa_pointer next = chunk->next.shared; + + dsa_free(hashtable->area, batch->chunks); + batch->chunks = next; + } + if (DsaPointerIsValid(batch->buckets)) + { + dsa_free(hashtable->area, batch->buckets); + batch->buckets = InvalidDsaPointer; + } + } + ExecParallelHashUpdateSpacePeak(hashtable, curbatch); + /* Remember that we are not attached to a batch. */ + hashtable->curbatch = -1; + } +} + +/* + * Detach from all shared resources. If we are last to detach, clean up. + */ +void +ExecHashTableDetach(HashJoinTable hashtable) +{ + if (hashtable->parallel_state) + { + ParallelHashJoinState *pstate = hashtable->parallel_state; + int i; + + /* Make sure any temporary files are closed. 
*/ + if (hashtable->batches) + { + for (i = 0; i < hashtable->nbatch; ++i) + { + sts_end_write(hashtable->batches[i].inner_tuples); + sts_end_write(hashtable->batches[i].outer_tuples); + sts_end_parallel_scan(hashtable->batches[i].inner_tuples); + sts_end_parallel_scan(hashtable->batches[i].outer_tuples); + } + } + + /* If we're last to detach, clean up shared memory. */ + if (BarrierDetach(&pstate->build_barrier)) + { + if (DsaPointerIsValid(pstate->batches)) + { + dsa_free(hashtable->area, pstate->batches); + pstate->batches = InvalidDsaPointer; + } + } + + hashtable->parallel_state = NULL; + } +} + +/* + * Get the first tuple in a given bucket identified by number. + */ +static inline HashJoinTuple +ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno) +{ + HashJoinTuple tuple; + dsa_pointer p; + + Assert(hashtable->parallel_state); + p = dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]); + tuple = (HashJoinTuple) dsa_get_address(hashtable->area, p); + + return tuple; +} + +/* + * Get the next tuple in the same bucket as 'tuple'. + */ +static inline HashJoinTuple +ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple) +{ + HashJoinTuple next; + + Assert(hashtable->parallel_state); + next = (HashJoinTuple) dsa_get_address(hashtable->area, tuple->next.shared); + + return next; +} + +/* + * Insert a tuple at the front of a chain of tuples in DSA memory atomically. + */ +static inline void +ExecParallelHashPushTuple(dsa_pointer_atomic *head, + HashJoinTuple tuple, + dsa_pointer tuple_shared) +{ + for (;;) + { + tuple->next.shared = dsa_pointer_atomic_read(head); + if (dsa_pointer_atomic_compare_exchange(head, + &tuple->next.shared, + tuple_shared)) + break; + } +} + +/* + * Prepare to work on a given batch. 
+ */ +void +ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno) +{ + Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer); + + hashtable->curbatch = batchno; + hashtable->buckets.shared = (dsa_pointer_atomic *) + dsa_get_address(hashtable->area, + hashtable->batches[batchno].shared->buckets); + hashtable->nbuckets = hashtable->parallel_state->nbuckets; + hashtable->log2_nbuckets = my_log2(hashtable->nbuckets); + hashtable->current_chunk = NULL; + hashtable->current_chunk_shared = InvalidDsaPointer; + hashtable->batches[batchno].at_least_one_chunk = false; +} + +/* + * Take the next available chunk from the queue of chunks being worked on in + * parallel. Return NULL if there are none left. Otherwise return a pointer + * to the chunk, and set *shared to the DSA pointer to the chunk. + */ +static HashMemoryChunk +ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + HashMemoryChunk chunk; + + LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); + if (DsaPointerIsValid(pstate->chunk_work_queue)) + { + *shared = pstate->chunk_work_queue; + chunk = (HashMemoryChunk) + dsa_get_address(hashtable->area, *shared); + pstate->chunk_work_queue = chunk->next.shared; + } + else + chunk = NULL; + LWLockRelease(&pstate->lock); + + return chunk; +} + +/* + * Increase the space preallocated in this backend for a given inner batch by + * at least a given amount. This allows us to track whether a given batch + * would fit in memory when loaded back in. Also increase the number of + * batches or buckets if required. + * + * This maintains a running estimation of how much space will be taken when we + * load the batch back into memory by simulating the way chunks will be handed + * out to workers. It's not perfectly accurate because the tuples will be + * packed into memory chunks differently by ExecParallelHashTupleAlloc(), but + * it should be pretty close. 
It tends to overestimate by a fraction of a + * chunk per worker since all workers gang up to preallocate during hashing, + * but workers tend to reload batches alone if there are enough to go around, + * leaving fewer partially filled chunks. This effect is bounded by + * nparticipants. + * + * Return false if the number of batches or buckets has changed, and the + * caller should reconsider which batch a given tuple now belongs in and call + * again. + */ +static bool +ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + ParallelHashJoinBatchAccessor *batch = &hashtable->batches[batchno]; + size_t want = Max(size, HASH_CHUNK_SIZE - HASH_CHUNK_HEADER_SIZE); + + Assert(batchno > 0); + Assert(batchno < hashtable->nbatch); + + LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); + + /* Has another participant commanded us to help grow? */ + if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES || + pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS) + { + ParallelHashGrowth growth = pstate->growth; + + LWLockRelease(&pstate->lock); + if (growth == PHJ_GROWTH_NEED_MORE_BATCHES) + ExecParallelHashIncreaseNumBatches(hashtable); + else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS) + ExecParallelHashIncreaseNumBuckets(hashtable); + + return false; + } + + if (pstate->growth != PHJ_GROWTH_DISABLED && + batch->at_least_one_chunk && + (batch->shared->estimated_size + size > pstate->space_allowed)) + { + /* + * We have determined that this batch would exceed the space budget if + * loaded into memory. Command all participants to help repartition. 
+ */ + batch->shared->space_exhausted = true; + pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; + LWLockRelease(&pstate->lock); + + return false; + } + + batch->at_least_one_chunk = true; + batch->shared->estimated_size += want + HASH_CHUNK_HEADER_SIZE; + batch->preallocated = want; + LWLockRelease(&pstate->lock); + + return true; +} + +/* + * Update this backend's copy of hashtable->spacePeak to account for a given + * batch. This is called at the end of hashing for batch 0, and then for each + * batch when it is done or discovered to be already done. The result is used + * for EXPLAIN output. + */ +void +ExecParallelHashUpdateSpacePeak(HashJoinTable hashtable, int batchno) +{ + size_t size; + + size = hashtable->batches[batchno].shared->size; + size += sizeof(dsa_pointer_atomic) * hashtable->nbuckets; + hashtable->spacePeak = Max(hashtable->spacePeak, size); +} diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index ab1632cc13..5d1dc1f401 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -10,18 +10,112 @@ * IDENTIFICATION * src/backend/executor/nodeHashjoin.c * + * PARALLELISM + * + * Hash joins can participate in parallel query execution in several ways. A + * parallel-oblivious hash join is one where the node is unaware that it is + * part of a parallel plan. In this case, a copy of the inner plan is used to + * build a copy of the hash table in every backend, and the outer plan could + * either be built from a partial or complete path, so that the results of the + * hash join are correspondingly either partial or complete. A parallel-aware + * hash join is one that behaves differently, coordinating work between + * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel + * Hash Join always appears with a Parallel Hash node. 
+ * + * Parallel-aware hash joins use the same per-backend state machine to track + * progress through the hash join algorithm as parallel-oblivious hash joins. + * In a parallel-aware hash join, there is also a shared state machine that + * co-operating backends use to synchronize their local state machines and + * program counters. The shared state machine is managed with a Barrier IPC + * primitive. When all attached participants arrive at a barrier, the phase + * advances and all waiting participants are released. + * + * When a participant begins working on a parallel hash join, it must first + * figure out how much progress has already been made, because participants + * don't wait for each other to begin. For this reason there are switch + * statements at key points in the code where we have to synchronize our local + * state machine with the phase, and then jump to the correct part of the + * algorithm so that we can get started. + * + * One barrier called build_barrier is used to coordinate the hashing phases. + * The phase is represented by an integer which begins at zero and increments + * one by one, but in the code it is referred to by symbolic names as follows: + * + * PHJ_BUILD_ELECTING -- initial state + * PHJ_BUILD_ALLOCATING -- one sets up the batches and table 0 + * PHJ_BUILD_HASHING_INNER -- all hash the inner rel + * PHJ_BUILD_HASHING_OUTER -- (multi-batch only) all hash the outer + * PHJ_BUILD_DONE -- building done, probing can begin + * + * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may + * be used repeatedly as required to coordinate expansions in the number of + * batches or buckets. 
Their phases are as follows: + * + * PHJ_GROW_BATCHES_ELECTING -- initial state + * PHJ_GROW_BATCHES_ALLOCATING -- one allocates new batches + * PHJ_GROW_BATCHES_REPARTITIONING -- all repartition + * PHJ_GROW_BATCHES_FINISHING -- one cleans up, detects skew + * + * PHJ_GROW_BUCKETS_ELECTING -- initial state + * PHJ_GROW_BUCKETS_ALLOCATING -- one allocates new buckets + * PHJ_GROW_BUCKETS_REINSERTING -- all insert tuples + * + * If the planner got the number of batches and buckets right, those won't be + * necessary, but on the other hand we might finish up needing to expand the + * buckets or batches multiple times while hashing the inner relation to stay + * within our memory budget and load factor target. For that reason it's a + * separate pair of barriers using circular phases. + * + * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins, + * because we need to divide the outer relation into batches up front in order + * to be able to process batches entirely independently. In contrast, the + * parallel-oblivious algorithm simply throws tuples 'forward' to 'later' + * batches whenever it encounters them while scanning and probing, which it + * can do because it processes batches in serial order. + * + * Once PHJ_BUILD_DONE is reached, backends then split up and process + * different batches, or gang up and work together on probing batches if there + * aren't enough to go around. For each batch there is a separate barrier + * with the following phases: + * + * PHJ_BATCH_ELECTING -- initial state + * PHJ_BATCH_ALLOCATING -- one allocates buckets + * PHJ_BATCH_LOADING -- all load the hash table from disk + * PHJ_BATCH_PROBING -- all probe + * PHJ_BATCH_DONE -- end + * + * Batch 0 is a special case, because it starts out in phase + * PHJ_BATCH_PROBING; populating batch 0's hash table is done during + * PHJ_BUILD_HASHING_INNER so we can skip loading. 
+ * + * Initially we try to plan for a single-batch hash join using the combined + * work_mem of all participants to create a large shared hash table. If that + * turns out either at planning or execution time to be impossible then we + * fall back to regular work_mem sized hash tables. + * + * To avoid deadlocks, we never wait for any barrier unless it is known that + * all other backends attached to it are actively executing the node or have + * already arrived. Practically, that means that we never return a tuple + * while attached to a barrier, unless the barrier has reached its final + * state. In the slightly special case of the per-batch barrier, we return + * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use + * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting. + * *------------------------------------------------------------------------- */ #include "postgres.h" #include "access/htup_details.h" +#include "access/parallel.h" #include "executor/executor.h" #include "executor/hashjoin.h" #include "executor/nodeHash.h" #include "executor/nodeHashjoin.h" #include "miscadmin.h" +#include "pgstat.h" #include "utils/memutils.h" +#include "utils/sharedtuplestore.h" /* @@ -42,24 +136,34 @@ static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue); +static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, + HashJoinState *hjstate, + uint32 *hashvalue); static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate, BufFile *file, uint32 *hashvalue, TupleTableSlot *tupleSlot); static bool ExecHashJoinNewBatch(HashJoinState *hjstate); +static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate); +static void ExecParallelHashJoinPartitionOuter(HashJoinState *node); /* ---------------------------------------------------------------- - * ExecHashJoin + * ExecHashJoinImpl * - * This function implements the Hybrid Hashjoin algorithm. 
+ * This function implements the Hybrid Hashjoin algorithm. It is marked + * with an always-inline attribute so that ExecHashJoin() and + * ExecParallelHashJoin() can inline it. Compilers that respect the + * attribute should create versions specialized for parallel == true and + * parallel == false with unnecessary branches removed. * * Note: the relation we build hash table on is the "inner" * the other one is "outer". * ---------------------------------------------------------------- */ -static TupleTableSlot * /* return: a tuple or NULL */ -ExecHashJoin(PlanState *pstate) +pg_attribute_always_inline +static inline TupleTableSlot * +ExecHashJoinImpl(PlanState *pstate, bool parallel) { HashJoinState *node = castNode(HashJoinState, pstate); PlanState *outerNode; @@ -71,6 +175,7 @@ ExecHashJoin(PlanState *pstate) TupleTableSlot *outerTupleSlot; uint32 hashvalue; int batchno; + ParallelHashJoinState *parallel_state; /* * get information from HashJoin node @@ -81,6 +186,7 @@ ExecHashJoin(PlanState *pstate) outerNode = outerPlanState(node); hashtable = node->hj_HashTable; econtext = node->js.ps.ps_ExprContext; + parallel_state = hashNode->parallel_state; /* * Reset per-tuple memory context to free any expression evaluation @@ -138,6 +244,18 @@ ExecHashJoin(PlanState *pstate) /* no chance to not build the hash table */ node->hj_FirstOuterTupleSlot = NULL; } + else if (parallel) + { + /* + * The empty-outer optimization is not implemented for + * shared hash tables, because no one participant can + * determine that there are no outer tuples, and it's not + * yet clear that it's worth the synchronization overhead + * of reaching consensus to figure that out. So we have + * to build the hash table. 
+ */ + node->hj_FirstOuterTupleSlot = NULL; + } else if (HJ_FILL_OUTER(node) || (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost && !node->hj_OuterNotEmpty)) @@ -155,15 +273,19 @@ ExecHashJoin(PlanState *pstate) node->hj_FirstOuterTupleSlot = NULL; /* - * create the hash table + * Create the hash table. If using Parallel Hash, then + * whoever gets here first will create the hash table and any + * later arrivals will merely attach to it. */ - hashtable = ExecHashTableCreate((Hash *) hashNode->ps.plan, + hashtable = ExecHashTableCreate(hashNode, node->hj_HashOperators, HJ_FILL_INNER(node)); node->hj_HashTable = hashtable; /* - * execute the Hash node, to build the hash table + * Execute the Hash node, to build the hash table. If using + * Parallel Hash, then we'll try to help hashing unless we + * arrived too late. */ hashNode->hashtable = hashtable; (void) MultiExecProcNode((PlanState *) hashNode); @@ -189,7 +311,34 @@ ExecHashJoin(PlanState *pstate) */ node->hj_OuterNotEmpty = false; - node->hj_JoinState = HJ_NEED_NEW_OUTER; + if (parallel) + { + Barrier *build_barrier; + + build_barrier = &parallel_state->build_barrier; + Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER || + BarrierPhase(build_barrier) == PHJ_BUILD_DONE); + if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER) + { + /* + * If multi-batch, we need to hash the outer relation + * up front. + */ + if (hashtable->nbatch > 1) + ExecParallelHashJoinPartitionOuter(node); + BarrierArriveAndWait(build_barrier, + WAIT_EVENT_HASH_BUILD_HASHING_OUTER); + } + Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE); + + /* Each backend should now select a batch to work on.
*/ + hashtable->curbatch = -1; + node->hj_JoinState = HJ_NEED_NEW_BATCH; + + continue; + } + else + node->hj_JoinState = HJ_NEED_NEW_OUTER; /* FALL THRU */ @@ -198,9 +347,14 @@ ExecHashJoin(PlanState *pstate) /* * We don't have an outer tuple, try to get the next one */ - outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode, - node, - &hashvalue); + if (parallel) + outerTupleSlot = + ExecParallelHashJoinOuterGetTuple(outerNode, node, + &hashvalue); + else + outerTupleSlot = + ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue); + if (TupIsNull(outerTupleSlot)) { /* end of batch, or maybe whole join */ @@ -240,10 +394,12 @@ ExecHashJoin(PlanState *pstate) * Need to postpone this outer tuple to a later batch. * Save it in the corresponding outer-batch file. */ + Assert(parallel_state == NULL); Assert(batchno > hashtable->curbatch); ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot), hashvalue, &hashtable->outerBatchFile[batchno]); + /* Loop around, staying in HJ_NEED_NEW_OUTER state */ continue; } @@ -258,11 +414,23 @@ ExecHashJoin(PlanState *pstate) /* * Scan the selected hash bucket for matches to current outer */ - if (!ExecScanHashBucket(node, econtext)) + if (parallel) { - /* out of matches; check for possible outer-join fill */ - node->hj_JoinState = HJ_FILL_OUTER_TUPLE; - continue; + if (!ExecParallelScanHashBucket(node, econtext)) + { + /* out of matches; check for possible outer-join fill */ + node->hj_JoinState = HJ_FILL_OUTER_TUPLE; + continue; + } + } + else + { + if (!ExecScanHashBucket(node, econtext)) + { + /* out of matches; check for possible outer-join fill */ + node->hj_JoinState = HJ_FILL_OUTER_TUPLE; + continue; + } } /* @@ -362,8 +530,16 @@ ExecHashJoin(PlanState *pstate) /* * Try to advance to next batch. Done if there are no more. 
*/ - if (!ExecHashJoinNewBatch(node)) - return NULL; /* end of join */ + if (parallel) + { + if (!ExecParallelHashJoinNewBatch(node)) + return NULL; /* end of parallel-aware join */ + } + else + { + if (!ExecHashJoinNewBatch(node)) + return NULL; /* end of parallel-oblivious join */ + } node->hj_JoinState = HJ_NEED_NEW_OUTER; break; @@ -374,6 +550,38 @@ ExecHashJoin(PlanState *pstate) } } +/* ---------------------------------------------------------------- + * ExecHashJoin + * + * Parallel-oblivious version. + * ---------------------------------------------------------------- + */ +static TupleTableSlot * /* return: a tuple or NULL */ +ExecHashJoin(PlanState *pstate) +{ + /* + * On sufficiently smart compilers this should be inlined with the + * parallel-aware branches removed. + */ + return ExecHashJoinImpl(pstate, false); +} + +/* ---------------------------------------------------------------- + * ExecParallelHashJoin + * + * Parallel-aware version. + * ---------------------------------------------------------------- + */ +static TupleTableSlot * /* return: a tuple or NULL */ +ExecParallelHashJoin(PlanState *pstate) +{ + /* + * On sufficiently smart compilers this should be inlined with the + * parallel-oblivious branches removed. + */ + return ExecHashJoinImpl(pstate, true); +} + /* ---------------------------------------------------------------- * ExecInitHashJoin * @@ -400,6 +608,12 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags) hjstate = makeNode(HashJoinState); hjstate->js.ps.plan = (Plan *) node; hjstate->js.ps.state = estate; + + /* + * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker() + * where this function may be replaced with a parallel version, if we + * managed to launch a parallel query. 
+ */ hjstate->js.ps.ExecProcNode = ExecHashJoin; /* @@ -581,9 +795,9 @@ ExecEndHashJoin(HashJoinState *node) /* * ExecHashJoinOuterGetTuple * - * get the next outer tuple for hashjoin: either by - * executing the outer plan node in the first pass, or from - * the temp files for the hashjoin batches. + * get the next outer tuple for a parallel-oblivious hashjoin: either by + * executing the outer plan node in the first pass, or from the temp + * files for the hashjoin batches. * * Returns a null slot if no more outer tuples (within the current batch). * @@ -661,6 +875,67 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode, return NULL; } +/* + * ExecHashJoinOuterGetTuple variant for the parallel case. + */ +static TupleTableSlot * +ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, + HashJoinState *hjstate, + uint32 *hashvalue) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + int curbatch = hashtable->curbatch; + TupleTableSlot *slot; + + /* + * In the Parallel Hash case we only run the outer plan directly for + * single-batch hash joins. Otherwise we have to go to batch files, even + * for batch 0. + */ + if (curbatch == 0 && hashtable->nbatch == 1) + { + slot = ExecProcNode(outerNode); + + while (!TupIsNull(slot)) + { + ExprContext *econtext = hjstate->js.ps.ps_ExprContext; + + econtext->ecxt_outertuple = slot; + if (ExecHashGetHashValue(hashtable, econtext, + hjstate->hj_OuterHashKeys, + true, /* outer tuple */ + HJ_FILL_OUTER(hjstate), + hashvalue)) + return slot; + + /* + * That tuple couldn't match because of a NULL, so discard it and + * continue with the next one.
+ */ + slot = ExecProcNode(outerNode); + } + } + else if (curbatch < hashtable->nbatch) + { + MinimalTuple tuple; + + tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples, + hashvalue); + if (tuple != NULL) + { + slot = ExecStoreMinimalTuple(tuple, + hjstate->hj_OuterTupleSlot, + false); + return slot; + } + else + ExecClearTuple(hjstate->hj_OuterTupleSlot); + } + + /* End of this batch */ + return NULL; +} + /* * ExecHashJoinNewBatch * switch to a new hashjoin batch @@ -803,6 +1078,135 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) return true; } +/* + * Choose a batch to work on, and attach to it. Returns true if successful, + * false if there are no more batches. + */ +static bool +ExecParallelHashJoinNewBatch(HashJoinState *hjstate) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + int start_batchno; + int batchno; + + /* + * If we started up so late that the batch tracking array has been freed + * already by ExecHashTableDetach(), then we are finished. See also + * ExecParallelHashEnsureBatchAccessors(). + */ + if (hashtable->batches == NULL) + return false; + + /* + * If we were already attached to a batch, remember not to bother checking + * it again, and detach from it (possibly freeing the hash table if we are + * last to detach). + */ + if (hashtable->curbatch >= 0) + { + hashtable->batches[hashtable->curbatch].done = true; + ExecHashTableDetachBatch(hashtable); + } + + /* + * Search for a batch that isn't done. We use an atomic counter to start + * our search at a different batch in every participant when there are + * more batches than participants. 
+ */ + batchno = start_batchno = + pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) % + hashtable->nbatch; + do + { + uint32 hashvalue; + MinimalTuple tuple; + TupleTableSlot *slot; + + if (!hashtable->batches[batchno].done) + { + SharedTuplestoreAccessor *inner_tuples; + Barrier *batch_barrier = + &hashtable->batches[batchno].shared->batch_barrier; + + switch (BarrierAttach(batch_barrier)) + { + case PHJ_BATCH_ELECTING: + + /* One backend allocates the hash table. */ + if (BarrierArriveAndWait(batch_barrier, + WAIT_EVENT_HASH_BATCH_ELECTING)) + ExecParallelHashTableAlloc(hashtable, batchno); + /* Fall through. */ + + case PHJ_BATCH_ALLOCATING: + /* Wait for allocation to complete. */ + BarrierArriveAndWait(batch_barrier, + WAIT_EVENT_HASH_BATCH_ALLOCATING); + /* Fall through. */ + + case PHJ_BATCH_LOADING: + /* Start (or join in) loading tuples. */ + ExecParallelHashTableSetCurrentBatch(hashtable, batchno); + inner_tuples = hashtable->batches[batchno].inner_tuples; + sts_begin_parallel_scan(inner_tuples); + while ((tuple = sts_parallel_scan_next(inner_tuples, + &hashvalue))) + { + slot = ExecStoreMinimalTuple(tuple, + hjstate->hj_HashTupleSlot, + false); + ExecParallelHashTableInsertCurrentBatch(hashtable, slot, + hashvalue); + } + sts_end_parallel_scan(inner_tuples); + BarrierArriveAndWait(batch_barrier, + WAIT_EVENT_HASH_BATCH_LOADING); + /* Fall through. */ + + case PHJ_BATCH_PROBING: + + /* + * This batch is ready to probe. Return control to + * caller. We stay attached to batch_barrier so that the + * hash table stays alive until everyone's finished + * probing it, but no participant is allowed to wait at + * this barrier again (or else a deadlock could occur). + * All attached participants must eventually call + * BarrierArriveAndDetach() so that the final phase + * PHJ_BATCH_DONE can be reached. 
+ */ + ExecParallelHashTableSetCurrentBatch(hashtable, batchno); + sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples); + return true; + + case PHJ_BATCH_DONE: + + /* + * Already done. Detach and go around again (if any + * remain). + */ + BarrierDetach(batch_barrier); + + /* + * We didn't work on this batch, but we need to observe + * its size for EXPLAIN. + */ + ExecParallelHashUpdateSpacePeak(hashtable, batchno); + hashtable->batches[batchno].done = true; + hashtable->curbatch = -1; + break; + + default: + elog(ERROR, "unexpected batch phase %d", + BarrierPhase(batch_barrier)); + } + } + batchno = (batchno + 1) % hashtable->nbatch; + } while (batchno != start_batchno); + + return false; +} + /* * ExecHashJoinSaveTuple * save a tuple to a batch file. @@ -964,3 +1368,176 @@ ExecReScanHashJoin(HashJoinState *node) if (node->js.ps.lefttree->chgParam == NULL) ExecReScan(node->js.ps.lefttree); } + +void +ExecShutdownHashJoin(HashJoinState *node) +{ + if (node->hj_HashTable) + { + /* + * Detach from shared state before DSM memory goes away. This makes + * sure that we don't have any pointers into DSM memory by the time + * ExecEndHashJoin runs. + */ + ExecHashTableDetachBatch(node->hj_HashTable); + ExecHashTableDetach(node->hj_HashTable); + } +} + +static void +ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate) +{ + PlanState *outerState = outerPlanState(hjstate); + ExprContext *econtext = hjstate->js.ps.ps_ExprContext; + HashJoinTable hashtable = hjstate->hj_HashTable; + TupleTableSlot *slot; + uint32 hashvalue; + int i; + + Assert(hjstate->hj_FirstOuterTupleSlot == NULL); + + /* Execute outer plan, writing all tuples to shared tuplestores. 
*/ + for (;;) + { + slot = ExecProcNode(outerState); + if (TupIsNull(slot)) + break; + econtext->ecxt_outertuple = slot; + if (ExecHashGetHashValue(hashtable, econtext, + hjstate->hj_OuterHashKeys, + true, /* outer tuple */ + false, /* outer join, currently unsupported */ + &hashvalue)) + { + int batchno; + int bucketno; + + ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, + &batchno); + sts_puttuple(hashtable->batches[batchno].outer_tuples, + &hashvalue, ExecFetchSlotMinimalTuple(slot)); + } + CHECK_FOR_INTERRUPTS(); + } + + /* Make sure all outer partitions are readable by any backend. */ + for (i = 0; i < hashtable->nbatch; ++i) + sts_end_write(hashtable->batches[i].outer_tuples); +} + +void +ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt) +{ + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState)); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +void +ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt) +{ + int plan_node_id = state->js.ps.plan->plan_node_id; + HashState *hashNode; + ParallelHashJoinState *pstate; + + /* + * Disable shared hash table mode if we failed to create a real DSM + * segment, because that means that we don't have a DSA area to work with. + */ + if (pcxt->seg == NULL) + return; + + ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); + + /* + * Set up the state needed to coordinate access to the shared hash + * table(s), using the plan node ID as the toc key. + */ + pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState)); + shm_toc_insert(pcxt->toc, plan_node_id, pstate); + + /* + * Set up the shared hash join state with no batches initially. + * ExecHashTableCreate() will prepare at least one later and set nbatch + * and space_allowed. 
+ */ + pstate->nbatch = 0; + pstate->space_allowed = 0; + pstate->batches = InvalidDsaPointer; + pstate->old_batches = InvalidDsaPointer; + pstate->nbuckets = 0; + pstate->growth = PHJ_GROWTH_OK; + pstate->chunk_work_queue = InvalidDsaPointer; + pg_atomic_init_u32(&pstate->distributor, 0); + pstate->nparticipants = pcxt->nworkers + 1; + pstate->total_tuples = 0; + LWLockInitialize(&pstate->lock, + LWTRANCHE_PARALLEL_HASH_JOIN); + BarrierInit(&pstate->build_barrier, 0); + BarrierInit(&pstate->grow_batches_barrier, 0); + BarrierInit(&pstate->grow_buckets_barrier, 0); + + /* Set up the space we'll use for shared temporary files. */ + SharedFileSetInit(&pstate->fileset, pcxt->seg); + + /* Initialize the shared state in the hash node. */ + hashNode = (HashState *) innerPlanState(state); + hashNode->parallel_state = pstate; +} + +/* ---------------------------------------------------------------- + * ExecHashJoinReInitializeDSM + * + * Reset shared state before beginning a fresh scan. + * ---------------------------------------------------------------- + */ +void +ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt) +{ + int plan_node_id = state->js.ps.plan->plan_node_id; + ParallelHashJoinState *pstate = + shm_toc_lookup(cxt->toc, plan_node_id, false); + + /* + * It would be possible to reuse the shared hash table in single-batch + * cases by resetting and then fast-forwarding build_barrier to + * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but + * currently shared hash tables are already freed by now (by the last + * participant to detach from the batch). We could consider keeping it + * around for single-batch joins. We'd also need to adjust + * finalize_plan() so that it doesn't record a dummy dependency for + * Parallel Hash nodes, preventing the rescan optimization. For now we + * don't try. + */ + + /* Detach, freeing any remaining shared memory. 
*/ + if (state->hj_HashTable != NULL) + { + ExecHashTableDetachBatch(state->hj_HashTable); + ExecHashTableDetach(state->hj_HashTable); + } + + /* Clear any shared batch files. */ + SharedFileSetDeleteAll(&pstate->fileset); + + /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */ + BarrierInit(&pstate->build_barrier, 0); +} + +void +ExecHashJoinInitializeWorker(HashJoinState *state, + ParallelWorkerContext *pwcxt) +{ + HashState *hashNode; + int plan_node_id = state->js.ps.plan->plan_node_id; + ParallelHashJoinState *pstate = + shm_toc_lookup(pwcxt->toc, plan_node_id, false); + + /* Attach to the space for shared temporary files. */ + SharedFileSetAttach(&pstate->fileset, pwcxt->seg); + + /* Attach to the shared state in the hash node. */ + hashNode = (HashState *) innerPlanState(state); + hashNode->parallel_state = pstate; + + ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); +} diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index b1515dd8e1..84d717102d 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -1057,6 +1057,7 @@ _copyHash(const Hash *from) COPY_SCALAR_FIELD(skewTable); COPY_SCALAR_FIELD(skewColumn); COPY_SCALAR_FIELD(skewInherit); + COPY_SCALAR_FIELD(rows_total); return newnode; } diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index b59a5219a7..e468d7cc41 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -927,6 +927,7 @@ _outHash(StringInfo str, const Hash *node) WRITE_OID_FIELD(skewTable); WRITE_INT_FIELD(skewColumn); WRITE_BOOL_FIELD(skewInherit); + WRITE_FLOAT_FIELD(rows_total, "%.0f"); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 0d17ae89b0..1133c70a1c 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -2213,6 +2213,7 @@ _readHash(void) READ_OID_FIELD(skewTable); READ_INT_FIELD(skewColumn); READ_BOOL_FIELD(skewInherit); + 
READ_FLOAT_FIELD(rows_total); READ_DONE(); } diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 877827dcb5..c3daacd3ea 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -129,6 +129,7 @@ bool enable_hashjoin = true; bool enable_gathermerge = true; bool enable_partition_wise_join = false; bool enable_parallel_append = true; +bool enable_parallel_hash = true; typedef struct { @@ -3130,16 +3131,19 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, JoinType jointype, List *hashclauses, Path *outer_path, Path *inner_path, - JoinPathExtraData *extra) + JoinPathExtraData *extra, + bool parallel_hash) { Cost startup_cost = 0; Cost run_cost = 0; double outer_path_rows = outer_path->rows; double inner_path_rows = inner_path->rows; + double inner_path_rows_total = inner_path_rows; int num_hashclauses = list_length(hashclauses); int numbuckets; int numbatches; int num_skew_mcvs; + size_t space_allowed; /* unused */ /* cost of source data */ startup_cost += outer_path->startup_cost; @@ -3160,6 +3164,15 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, * inner_path_rows; run_cost += cpu_operator_cost * num_hashclauses * outer_path_rows; + /* + * If this is a parallel hash build, then the value we have for + * inner_rows_total currently refers only to the rows returned by each + * participant. For shared hash table size estimation, we need the total + * number, so we need to undo the division. + */ + if (parallel_hash) + inner_path_rows_total *= get_parallel_divisor(inner_path); + /* * Get hash table size that executor would use for inner relation. * @@ -3170,9 +3183,12 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, * XXX at some point it might be interesting to try to account for skew * optimization in the cost estimate, but for now, we don't. 
*/ - ExecChooseHashTableSize(inner_path_rows, + ExecChooseHashTableSize(inner_path_rows_total, inner_path->pathtarget->width, true, /* useskew */ + parallel_hash, /* try_combined_work_mem */ + outer_path->parallel_workers, + &space_allowed, &numbuckets, &numbatches, &num_skew_mcvs); @@ -3204,6 +3220,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, workspace->run_cost = run_cost; workspace->numbuckets = numbuckets; workspace->numbatches = numbatches; + workspace->inner_rows_total = inner_path_rows_total; } /* @@ -3226,6 +3243,7 @@ final_cost_hashjoin(PlannerInfo *root, HashPath *path, Path *inner_path = path->jpath.innerjoinpath; double outer_path_rows = outer_path->rows; double inner_path_rows = inner_path->rows; + double inner_path_rows_total = workspace->inner_rows_total; List *hashclauses = path->path_hashclauses; Cost startup_cost = workspace->startup_cost; Cost run_cost = workspace->run_cost; @@ -3266,6 +3284,9 @@ final_cost_hashjoin(PlannerInfo *root, HashPath *path, /* mark the path with estimated # of batches */ path->num_batches = numbatches; + /* store the total number of tuples (sum of partial row estimates) */ + path->inner_rows_total = inner_path_rows_total; + /* and compute the number of "virtual" buckets in the whole join */ virtualbuckets = (double) numbuckets * (double) numbatches; diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c index 02a630278f..e774130ac8 100644 --- a/src/backend/optimizer/path/joinpath.c +++ b/src/backend/optimizer/path/joinpath.c @@ -747,7 +747,7 @@ try_hashjoin_path(PlannerInfo *root, * never have any output pathkeys, per comments in create_hashjoin_path. 
*/ initial_cost_hashjoin(root, &workspace, jointype, hashclauses, - outer_path, inner_path, extra); + outer_path, inner_path, extra, false); if (add_path_precheck(joinrel, workspace.startup_cost, workspace.total_cost, @@ -761,6 +761,7 @@ try_hashjoin_path(PlannerInfo *root, extra, outer_path, inner_path, + false, /* parallel_hash */ extra->restrictlist, required_outer, hashclauses)); @@ -776,6 +777,10 @@ try_hashjoin_path(PlannerInfo *root, * try_partial_hashjoin_path * Consider a partial hashjoin join path; if it appears useful, push it into * the joinrel's partial_pathlist via add_partial_path(). + * The outer side is partial. If parallel_hash is true, then the inner path + * must be partial and will be run in parallel to create one or more shared + * hash tables; otherwise the inner path must be complete and a copy of it + * is run in every process to create separate identical private hash tables. */ static void try_partial_hashjoin_path(PlannerInfo *root, @@ -784,7 +789,8 @@ try_partial_hashjoin_path(PlannerInfo *root, Path *inner_path, List *hashclauses, JoinType jointype, - JoinPathExtraData *extra) + JoinPathExtraData *extra, + bool parallel_hash) { JoinCostWorkspace workspace; @@ -808,7 +814,7 @@ try_partial_hashjoin_path(PlannerInfo *root, * cost. Bail out right away if it looks terrible. */ initial_cost_hashjoin(root, &workspace, jointype, hashclauses, - outer_path, inner_path, extra); + outer_path, inner_path, extra, parallel_hash); if (!add_partial_path_precheck(joinrel, workspace.total_cost, NIL)) return; @@ -821,6 +827,7 @@ try_partial_hashjoin_path(PlannerInfo *root, extra, outer_path, inner_path, + parallel_hash, extra->restrictlist, NULL, hashclauses)); @@ -1839,6 +1846,10 @@ hash_inner_and_outer(PlannerInfo *root, * able to properly guarantee uniqueness. Similarly, we can't handle * JOIN_FULL and JOIN_RIGHT, because they can produce false null * extended rows. Also, the resulting path must not be parameterized.
+ * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel + * Hash, since in that case we're back to a single hash table with a + * single set of match bits for each batch, but that will require + * figuring out a deadlock-free way to wait for the probe to finish. */ if (joinrel->consider_parallel && save_jointype != JOIN_UNIQUE_OUTER && @@ -1848,11 +1859,27 @@ hash_inner_and_outer(PlannerInfo *root, bms_is_empty(joinrel->lateral_relids)) { Path *cheapest_partial_outer; + Path *cheapest_partial_inner = NULL; Path *cheapest_safe_inner = NULL; cheapest_partial_outer = (Path *) linitial(outerrel->partial_pathlist); + /* + * Can we use a partial inner plan too, so that we can build a + * shared hash table in parallel? + */ + if (innerrel->partial_pathlist != NIL && enable_parallel_hash) + { + cheapest_partial_inner = + (Path *) linitial(innerrel->partial_pathlist); + try_partial_hashjoin_path(root, joinrel, + cheapest_partial_outer, + cheapest_partial_inner, + hashclauses, jointype, extra, + true /* parallel_hash */ ); + } + /* * Normally, given that the joinrel is parallel-safe, the cheapest * total inner path will also be parallel-safe, but if not, we'll @@ -1870,7 +1897,8 @@ hash_inner_and_outer(PlannerInfo *root, try_partial_hashjoin_path(root, joinrel, cheapest_partial_outer, cheapest_safe_inner, - hashclauses, jointype, extra); + hashclauses, jointype, extra, + false /* parallel_hash */ ); } } } diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index f6c83d0477..1a0d3a885f 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -4192,6 +4192,17 @@ create_hashjoin_plan(PlannerInfo *root, copy_plan_costsize(&hash_plan->plan, inner_plan); hash_plan->plan.startup_cost = hash_plan->plan.total_cost; + /* + * If parallel-aware, the executor will also need an estimate of the total + * number of rows expected from all participants so that it can size the + * shared hash 
table. + */ + if (best_path->jpath.path.parallel_aware) + { + hash_plan->plan.parallel_aware = true; + hash_plan->rows_total = best_path->inner_rows_total; + } + join_plan = make_hashjoin(tlist, joinclauses, otherclauses, diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 54126fbb6a..2aee156ad3 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -2278,6 +2278,7 @@ create_mergejoin_path(PlannerInfo *root, * 'extra' contains various information about the join * 'outer_path' is the cheapest outer path * 'inner_path' is the cheapest inner path + * 'parallel_hash' to select Parallel Hash of inner path (shared hash table) * 'restrict_clauses' are the RestrictInfo nodes to apply at the join * 'required_outer' is the set of required outer rels * 'hashclauses' are the RestrictInfo nodes to use as hash clauses @@ -2291,6 +2292,7 @@ create_hashjoin_path(PlannerInfo *root, JoinPathExtraData *extra, Path *outer_path, Path *inner_path, + bool parallel_hash, List *restrict_clauses, Relids required_outer, List *hashclauses) @@ -2308,7 +2310,8 @@ create_hashjoin_path(PlannerInfo *root, extra->sjinfo, required_outer, &restrict_clauses); - pathnode->jpath.path.parallel_aware = false; + pathnode->jpath.path.parallel_aware = + joinrel->consider_parallel && parallel_hash; pathnode->jpath.path.parallel_safe = joinrel->consider_parallel && outer_path->parallel_safe && inner_path->parallel_safe; /* This is a foolish way to estimate parallel_workers, but for now... 
*/ diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 5c256ff8ab..b502c1bb9b 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3586,6 +3586,51 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_EXECUTE_GATHER: event_name = "ExecuteGather"; break; + case WAIT_EVENT_HASH_BATCH_ALLOCATING: + event_name = "Hash/Batch/Allocating"; + break; + case WAIT_EVENT_HASH_BATCH_ELECTING: + event_name = "Hash/Batch/Electing"; + break; + case WAIT_EVENT_HASH_BATCH_LOADING: + event_name = "Hash/Batch/Loading"; + break; + case WAIT_EVENT_HASH_BUILD_ALLOCATING: + event_name = "Hash/Build/Allocating"; + break; + case WAIT_EVENT_HASH_BUILD_ELECTING: + event_name = "Hash/Build/Electing"; + break; + case WAIT_EVENT_HASH_BUILD_HASHING_INNER: + event_name = "Hash/Build/HashingInner"; + break; + case WAIT_EVENT_HASH_BUILD_HASHING_OUTER: + event_name = "Hash/Build/HashingOuter"; + break; + case WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING: + event_name = "Hash/GrowBatches/Allocating"; + break; + case WAIT_EVENT_HASH_GROW_BATCHES_DECIDING: + event_name = "Hash/GrowBatches/Deciding"; + break; + case WAIT_EVENT_HASH_GROW_BATCHES_ELECTING: + event_name = "Hash/GrowBatches/Electing"; + break; + case WAIT_EVENT_HASH_GROW_BATCHES_FINISHING: + event_name = "Hash/GrowBatches/Finishing"; + break; + case WAIT_EVENT_HASH_GROW_BATCHES_REPARTITIONING: + event_name = "Hash/GrowBatches/Repartitioning"; + break; + case WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING: + event_name = "Hash/GrowBuckets/Allocating"; + break; + case WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING: + event_name = "Hash/GrowBuckets/Electing"; + break; + case WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING: + event_name = "Hash/GrowBuckets/Reinserting"; + break; case WAIT_EVENT_LOGICAL_SYNC_DATA: event_name = "LogicalSyncData"; break; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 0f7a96d85c..e32901d506 100644 --- a/src/backend/utils/misc/guc.c +++ 
b/src/backend/utils/misc/guc.c @@ -929,7 +929,15 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, - + { + {"enable_parallel_hash", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of parallel hash plans."), + NULL + }, + &enable_parallel_hash, + true, + NULL, NULL, NULL + }, { {"geqo", PGC_USERSET, QUERY_TUNING_GEQO, gettext_noop("Enables genetic query optimization."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 842cf3601a..69f40f04b0 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -301,6 +301,7 @@ #enable_sort = on #enable_tidscan = on #enable_partition_wise_join = off +#enable_parallel_hash = on # - Planner Cost Constants - diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index 82acadf85b..d8c82d4e7c 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -15,7 +15,10 @@ #define HASHJOIN_H #include "nodes/execnodes.h" +#include "port/atomics.h" +#include "storage/barrier.h" #include "storage/buffile.h" +#include "storage/lwlock.h" /* ---------------------------------------------------------------- * hash-join hash table structures @@ -63,7 +66,12 @@ typedef struct HashJoinTupleData { - struct HashJoinTupleData *next; /* link to next tuple in same bucket */ + /* link to next tuple in same bucket */ + union + { + struct HashJoinTupleData *unshared; + dsa_pointer shared; + } next; uint32 hashvalue; /* tuple's hash code */ /* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */ } HashJoinTupleData; @@ -112,8 +120,12 @@ typedef struct HashMemoryChunkData size_t maxlen; /* size of the buffer holding the tuples */ size_t used; /* number of buffer bytes already used */ - struct HashMemoryChunkData *next; /* pointer to the next chunk (linked - * list) */ + /* pointer to the next chunk (linked list) */ +
union + { + struct HashMemoryChunkData *unshared; + dsa_pointer shared; + } next; char data[FLEXIBLE_ARRAY_MEMBER]; /* buffer allocated at the end */ } HashMemoryChunkData; @@ -121,8 +133,148 @@ typedef struct HashMemoryChunkData typedef struct HashMemoryChunkData *HashMemoryChunk; #define HASH_CHUNK_SIZE (32 * 1024L) +#define HASH_CHUNK_HEADER_SIZE (offsetof(HashMemoryChunkData, data)) #define HASH_CHUNK_THRESHOLD (HASH_CHUNK_SIZE / 4) +/* + * For each batch of a Parallel Hash Join, we have a ParallelHashJoinBatch + * object in shared memory to coordinate access to it. Since they are + * followed by variable-sized objects, they are arranged in contiguous memory + * but not accessed directly as an array. + */ +typedef struct ParallelHashJoinBatch +{ + dsa_pointer buckets; /* array of hash table buckets */ + Barrier batch_barrier; /* synchronization for joining this batch */ + + dsa_pointer chunks; /* chunks of tuples loaded */ + size_t size; /* size of buckets + chunks in memory */ + size_t estimated_size; /* size of buckets + chunks while writing */ + size_t ntuples; /* number of tuples loaded */ + size_t old_ntuples; /* number of tuples before repartitioning */ + bool space_exhausted; + + /* + * Variable-sized SharedTuplestore objects follow this struct in memory. + * See the accessor macros below. + */ +} ParallelHashJoinBatch; + +/* Accessor for inner batch tuplestore following a ParallelHashJoinBatch. */ +#define ParallelHashJoinBatchInner(batch) \ + ((SharedTuplestore *) \ + ((char *) (batch) + MAXALIGN(sizeof(ParallelHashJoinBatch)))) + +/* Accessor for outer batch tuplestore following a ParallelHashJoinBatch. */ +#define ParallelHashJoinBatchOuter(batch, nparticipants) \ + ((SharedTuplestore *) \ + ((char *) ParallelHashJoinBatchInner(batch) + \ + MAXALIGN(sts_estimate(nparticipants)))) + +/* Total size of a ParallelHashJoinBatch and tuplestores. 
*/ +#define EstimateParallelHashJoinBatch(hashtable) \ + (MAXALIGN(sizeof(ParallelHashJoinBatch)) + \ + MAXALIGN(sts_estimate((hashtable)->parallel_state->nparticipants)) * 2) + +/* Nth ParallelHashJoinBatch given the base; needs 'hashtable' in scope. */ +#define NthParallelHashJoinBatch(base, n) \ + ((ParallelHashJoinBatch *) \ + ((char *) (base) + \ + EstimateParallelHashJoinBatch(hashtable) * (n))) + +/* + * Each backend requires a small amount of per-batch state to interact with + * each ParallelHashJoinBatch. + */ +typedef struct ParallelHashJoinBatchAccessor +{ + ParallelHashJoinBatch *shared; /* pointer to shared state */ + + /* Per-backend partial counters to reduce contention. */ + size_t preallocated; /* pre-allocated space for this backend */ + size_t ntuples; /* number of tuples */ + size_t size; /* size of partition in memory */ + size_t estimated_size; /* size of partition on disk */ + size_t old_ntuples; /* how many tuples before repartitioning? */ + bool at_least_one_chunk; /* has this backend allocated a chunk? */ + + bool done; /* flag to remember that a batch is done */ + SharedTuplestoreAccessor *inner_tuples; + SharedTuplestoreAccessor *outer_tuples; +} ParallelHashJoinBatchAccessor; + +/* + * While hashing the inner relation, any participant might determine that it's + * time to increase the number of buckets to reduce the load factor or batches + * to reduce the memory size. This is indicated by setting the growth flag to + * these values. + */ +typedef enum ParallelHashGrowth +{ + /* The current dimensions are sufficient. */ + PHJ_GROWTH_OK, + /* The load factor is too high, so we need to add buckets. */ + PHJ_GROWTH_NEED_MORE_BUCKETS, + /* The memory budget would be exhausted, so we need to repartition. */ + PHJ_GROWTH_NEED_MORE_BATCHES, + /* Repartitioning didn't help last time, so don't try to do that again. */ + PHJ_GROWTH_DISABLED +} ParallelHashGrowth; + +/* + * The shared state used to coordinate a Parallel Hash Join.
This is stored + * in the DSM segment. + */ +typedef struct ParallelHashJoinState +{ + dsa_pointer batches; /* array of ParallelHashJoinBatch */ + dsa_pointer old_batches; /* previous generation during repartition */ + int nbatch; /* number of batches now */ + int old_nbatch; /* previous number of batches */ + int nbuckets; /* number of buckets */ + ParallelHashGrowth growth; /* control batch/bucket growth */ + dsa_pointer chunk_work_queue; /* chunk work queue */ + int nparticipants; /* set to pcxt->nworkers + 1 at DSM init */ + size_t space_allowed; + size_t total_tuples; /* total number of inner tuples */ + LWLock lock; /* lock protecting the above */ + + Barrier build_barrier; /* synchronization for the build phases */ + Barrier grow_batches_barrier; /* uses PHJ_GROW_BATCHES_* phases */ + Barrier grow_buckets_barrier; /* uses PHJ_GROW_BUCKETS_* phases */ + pg_atomic_uint32 distributor; /* counter for load balancing */ + + SharedFileSet fileset; /* space for shared temporary files */ +} ParallelHashJoinState; + +/* The phases for building batches, used by build_barrier. */ +#define PHJ_BUILD_ELECTING 0 +#define PHJ_BUILD_ALLOCATING 1 +#define PHJ_BUILD_HASHING_INNER 2 +#define PHJ_BUILD_HASHING_OUTER 3 +#define PHJ_BUILD_DONE 4 + +/* The phases for probing each batch, used by batch_barrier. */ +#define PHJ_BATCH_ELECTING 0 +#define PHJ_BATCH_ALLOCATING 1 +#define PHJ_BATCH_LOADING 2 +#define PHJ_BATCH_PROBING 3 +#define PHJ_BATCH_DONE 4 + +/* The phases of batch growth while hashing, for grow_batches_barrier. */ +#define PHJ_GROW_BATCHES_ELECTING 0 +#define PHJ_GROW_BATCHES_ALLOCATING 1 +#define PHJ_GROW_BATCHES_REPARTITIONING 2 +#define PHJ_GROW_BATCHES_DECIDING 3 +#define PHJ_GROW_BATCHES_FINISHING 4 +#define PHJ_GROW_BATCHES_PHASE(n) ((n) % 5) /* circular phases */ + +/* The phases of bucket growth while hashing, for grow_buckets_barrier.
*/ +#define PHJ_GROW_BUCKETS_ELECTING 0 +#define PHJ_GROW_BUCKETS_ALLOCATING 1 +#define PHJ_GROW_BUCKETS_REINSERTING 2 +#define PHJ_GROW_BUCKETS_PHASE(n) ((n) % 3) /* circular phases */ + typedef struct HashJoinTableData { int nbuckets; /* # buckets in the in-memory hash table */ @@ -133,8 +285,13 @@ typedef struct HashJoinTableData int log2_nbuckets_optimal; /* log2(nbuckets_optimal) */ /* buckets[i] is head of list of tuples in i'th in-memory bucket */ - struct HashJoinTupleData **buckets; - /* buckets array is per-batch storage, as are all the tuples */ + union + { + /* unshared array is per-batch storage, as are all the tuples */ + struct HashJoinTupleData **unshared; + /* shared array is per-query DSA area, as are all the tuples */ + dsa_pointer_atomic *shared; + } buckets; bool keepNulls; /* true to store unmatchable NULL tuples */ @@ -153,6 +310,7 @@ typedef struct HashJoinTableData bool growEnabled; /* flag to shut off nbatch increases */ double totalTuples; /* # tuples obtained from inner plan */ + double partialTuples; /* # tuples obtained from inner plan by me */ double skewTuples; /* # tuples inserted into skew tuples */ /* @@ -185,6 +343,13 @@ typedef struct HashJoinTableData /* used for dense allocation of tuples (into linked chunks) */ HashMemoryChunk chunks; /* one list for the whole batch */ + + /* Shared and private state for Parallel Hash. 
*/ + HashMemoryChunk current_chunk; /* this backend's current chunk */ + dsa_area *area; /* DSA area to allocate memory from */ + ParallelHashJoinState *parallel_state; + ParallelHashJoinBatchAccessor *batches; + dsa_pointer current_chunk_shared; } HashJoinTableData; #endif /* HASHJOIN_H */ diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 0974f1edc2..84c166b395 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -17,17 +17,33 @@ #include "access/parallel.h" #include "nodes/execnodes.h" +struct SharedHashJoinBatch; + extern HashState *ExecInitHash(Hash *node, EState *estate, int eflags); extern Node *MultiExecHash(HashState *node); extern void ExecEndHash(HashState *node); extern void ExecReScanHash(HashState *node); -extern HashJoinTable ExecHashTableCreate(Hash *node, List *hashOperators, +extern HashJoinTable ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls); +extern void ExecParallelHashTableAlloc(HashJoinTable hashtable, + int batchno); extern void ExecHashTableDestroy(HashJoinTable hashtable); +extern void ExecHashTableDetach(HashJoinTable hashtable); +extern void ExecHashTableDetachBatch(HashJoinTable hashtable); +extern void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, + int batchno); +void ExecParallelHashUpdateSpacePeak(HashJoinTable hashtable, int batchno); + extern void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue); +extern void ExecParallelHashTableInsert(HashJoinTable hashtable, + TupleTableSlot *slot, + uint32 hashvalue); +extern void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, + TupleTableSlot *slot, + uint32 hashvalue); extern bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, @@ -39,12 +55,16 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable, int *bucketno, int *batchno); extern bool ExecScanHashBucket(HashJoinState *hjstate, 
ExprContext *econtext); +extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext); extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate); extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext); extern void ExecHashTableReset(HashJoinTable hashtable); extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable); extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, + bool try_combined_work_mem, + int parallel_workers, + size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs); @@ -55,6 +75,6 @@ extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwc extern void ExecHashRetrieveInstrumentation(HashState *node); extern void ExecShutdownHash(HashState *node); extern void ExecHashGetInstrumentation(HashInstrumentation *instrument, - HashJoinTable hashtable); + HashJoinTable hashtable); #endif /* NODEHASH_H */ diff --git a/src/include/executor/nodeHashjoin.h b/src/include/executor/nodeHashjoin.h index 7469bfbf60..8469085d7e 100644 --- a/src/include/executor/nodeHashjoin.h +++ b/src/include/executor/nodeHashjoin.h @@ -20,6 +20,12 @@ extern HashJoinState *ExecInitHashJoin(HashJoin *node, EState *estate, int eflags); extern void ExecEndHashJoin(HashJoinState *node); extern void ExecReScanHashJoin(HashJoinState *node); +extern void ExecShutdownHashJoin(HashJoinState *node); +extern void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt); +extern void ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt); +extern void ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt); +extern void ExecHashJoinInitializeWorker(HashJoinState *state, + ParallelWorkerContext *pwcxt); extern void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 1a35c5c9ad..44d8c47d2c 100644 --- 
a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -25,6 +25,7 @@ #include "utils/hsearch.h" #include "utils/queryenvironment.h" #include "utils/reltrigger.h" +#include "utils/sharedtuplestore.h" #include "utils/sortsupport.h" #include "utils/tuplestore.h" #include "utils/tuplesort.h" @@ -43,6 +44,8 @@ struct ExprState; /* forward references in this file */ struct ExprContext; struct ExprEvalStep; /* avoid including execExpr.h everywhere */ +struct ParallelHashJoinState; + typedef Datum (*ExprStateEvalFunc) (struct ExprState *expression, struct ExprContext *econtext, bool *isNull); @@ -2026,6 +2029,9 @@ typedef struct HashState SharedHashInfo *shared_info; /* one entry per worker */ HashInstrumentation *hinstrument; /* this worker's entry */ + + /* Parallel hash state. */ + struct ParallelHashJoinState *parallel_state; } HashState; /* ---------------- diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 02fb366680..d763da647b 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -880,6 +880,7 @@ typedef struct Hash AttrNumber skewColumn; /* outer join key's column #, or zero */ bool skewInherit; /* is outer join rel an inheritance tree? 
*/ /* all other info is in the parent HashJoin node */ + double rows_total; /* estimate total rows if parallel_aware */ } Hash; /* ---------------- diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 1108b6a0ea..3b9d303ce4 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -1464,6 +1464,7 @@ typedef struct HashPath JoinPath jpath; List *path_hashclauses; /* join clauses used for hashing */ int num_batches; /* number of batches expected */ + double inner_rows_total; /* total inner rows expected */ } HashPath; /* @@ -2315,6 +2316,7 @@ typedef struct JoinCostWorkspace /* private for cost_hashjoin code */ int numbuckets; int numbatches; + double inner_rows_total; } JoinCostWorkspace; #endif /* RELATION_H */ diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 5a1fbf97c3..27afc2eaeb 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -69,6 +69,7 @@ extern bool enable_hashjoin; extern bool enable_gathermerge; extern bool enable_partition_wise_join; extern bool enable_parallel_append; +extern bool enable_parallel_hash; extern int constraint_exclusion; extern double clamp_row_est(double nrows); @@ -153,7 +154,8 @@ extern void initial_cost_hashjoin(PlannerInfo *root, JoinType jointype, List *hashclauses, Path *outer_path, Path *inner_path, - JoinPathExtraData *extra); + JoinPathExtraData *extra, + bool parallel_hash); extern void final_cost_hashjoin(PlannerInfo *root, HashPath *path, JoinCostWorkspace *workspace, JoinPathExtraData *extra); diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 99f65b44f2..3ef12b323b 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -153,6 +153,7 @@ extern HashPath *create_hashjoin_path(PlannerInfo *root, JoinPathExtraData *extra, Path *outer_path, Path *inner_path, + bool parallel_hash, List *restrict_clauses, Relids required_outer, List *hashclauses); diff --git 
a/src/include/pgstat.h b/src/include/pgstat.h index 089b7c3a10..58f3a19bc6 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -803,6 +803,21 @@ typedef enum WAIT_EVENT_BGWORKER_STARTUP, WAIT_EVENT_BTREE_PAGE, WAIT_EVENT_EXECUTE_GATHER, + WAIT_EVENT_HASH_BATCH_ALLOCATING, + WAIT_EVENT_HASH_BATCH_ELECTING, + WAIT_EVENT_HASH_BATCH_LOADING, + WAIT_EVENT_HASH_BUILD_ALLOCATING, + WAIT_EVENT_HASH_BUILD_ELECTING, + WAIT_EVENT_HASH_BUILD_HASHING_INNER, + WAIT_EVENT_HASH_BUILD_HASHING_OUTER, + WAIT_EVENT_HASH_GROW_BATCHES_ELECTING, + WAIT_EVENT_HASH_GROW_BATCHES_FINISHING, + WAIT_EVENT_HASH_GROW_BATCHES_REPARTITIONING, + WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING, + WAIT_EVENT_HASH_GROW_BATCHES_DECIDING, + WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING, + WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING, + WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING, WAIT_EVENT_LOGICAL_SYNC_DATA, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE, WAIT_EVENT_MQ_INTERNAL, diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index a347ee4d7d..97e4a0bbbd 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -211,6 +211,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_BUFFER_MAPPING, LWTRANCHE_LOCK_MANAGER, LWTRANCHE_PREDICATE_LOCK_MANAGER, + LWTRANCHE_PARALLEL_HASH_JOIN, LWTRANCHE_PARALLEL_QUERY_DSA, LWTRANCHE_SESSION_DSA, LWTRANCHE_SESSION_RECORD_TABLE, diff --git a/src/test/regress/expected/join.out b/src/test/regress/expected/join.out index 9d3abf0ed0..a7cfdf1f44 100644 --- a/src/test/regress/expected/join.out +++ b/src/test/regress/expected/join.out @@ -5884,6 +5884,9 @@ insert into extremely_skewed update pg_class set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192 where relname = 'extremely_skewed'; +-- Make a relation with a couple of enormous tuples. 
+create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t; +alter table wide set (parallel_workers = 2); -- The "optimal" case: the hash table fits in memory; we plan for 1 -- batch, we stick to that number, and peak memory usage stays within -- our work_mem budget @@ -5924,6 +5927,7 @@ rollback to settings; savepoint settings; set local max_parallel_workers_per_gather = 2; set local work_mem = '4MB'; +set local enable_parallel_hash = off; explain (costs off) select count(*) from simple r join simple s using (id); QUERY PLAN @@ -5955,6 +5959,43 @@ $$); f | f (1 row) +rollback to settings; +-- parallel with parallel-aware hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +set local work_mem = '4MB'; +set local enable_parallel_hash = on; +explain (costs off) + select count(*) from simple r join simple s using (id); + QUERY PLAN +------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> Parallel Hash Join + Hash Cond: (r.id = s.id) + -> Parallel Seq Scan on simple r + -> Parallel Hash + -> Parallel Seq Scan on simple s +(9 rows) + +select count(*) from simple r join simple s using (id); + count +------- + 20000 +(1 row) + +select original > 1 as initially_multibatch, final > original as increased_batches + from hash_join_batches( +$$ + select count(*) from simple r join simple s using (id); +$$); + initially_multibatch | increased_batches +----------------------+------------------- + f | f +(1 row) + rollback to settings; -- The "good" case: batches required, but we plan the right number; we -- plan for some number of batches, and we stick to that number, and @@ -5996,6 +6037,7 @@ rollback to settings; savepoint settings; set local max_parallel_workers_per_gather = 2; set local work_mem = '128kB'; +set local enable_parallel_hash = off; explain (costs off) select count(*) from simple r join simple s using (id); QUERY PLAN @@ 
-6027,6 +6069,43 @@ $$); t | f (1 row) +rollback to settings; +-- parallel with parallel-aware hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +set local work_mem = '128kB'; +set local enable_parallel_hash = on; +explain (costs off) + select count(*) from simple r join simple s using (id); + QUERY PLAN +------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> Parallel Hash Join + Hash Cond: (r.id = s.id) + -> Parallel Seq Scan on simple r + -> Parallel Hash + -> Parallel Seq Scan on simple s +(9 rows) + +select count(*) from simple r join simple s using (id); + count +------- + 20000 +(1 row) + +select original > 1 as initially_multibatch, final > original as increased_batches + from hash_join_batches( +$$ + select count(*) from simple r join simple s using (id); +$$); + initially_multibatch | increased_batches +----------------------+------------------- + t | f +(1 row) + rollback to settings; -- The "bad" case: during execution we need to increase number of -- batches; in this case we plan for 1 batch, and increase at least a @@ -6069,6 +6148,7 @@ rollback to settings; savepoint settings; set local max_parallel_workers_per_gather = 2; set local work_mem = '128kB'; +set local enable_parallel_hash = off; explain (costs off) select count(*) from simple r join bigger_than_it_looks s using (id); QUERY PLAN @@ -6100,6 +6180,43 @@ $$); f | t (1 row) +rollback to settings; +-- parallel with parallel-aware hash join +savepoint settings; +set local max_parallel_workers_per_gather = 1; +set local work_mem = '192kB'; +set local enable_parallel_hash = on; +explain (costs off) + select count(*) from simple r join bigger_than_it_looks s using (id); + QUERY PLAN +--------------------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 1 + -> Partial Aggregate + -> Parallel Hash Join + Hash Cond: (r.id = s.id) 
+ -> Parallel Seq Scan on simple r + -> Parallel Hash + -> Parallel Seq Scan on bigger_than_it_looks s +(9 rows) + +select count(*) from simple r join bigger_than_it_looks s using (id); + count +------- + 20000 +(1 row) + +select original > 1 as initially_multibatch, final > original as increased_batches + from hash_join_batches( +$$ + select count(*) from simple r join bigger_than_it_looks s using (id); +$$); + initially_multibatch | increased_batches +----------------------+------------------- + f | t +(1 row) + rollback to settings; -- The "ugly" case: increasing the number of batches during execution -- doesn't help, so stop trying to fit in work_mem and hope for the @@ -6142,6 +6259,7 @@ rollback to settings; savepoint settings; set local max_parallel_workers_per_gather = 2; set local work_mem = '128kB'; +set local enable_parallel_hash = off; explain (costs off) select count(*) from simple r join extremely_skewed s using (id); QUERY PLAN @@ -6171,6 +6289,42 @@ $$); 1 | 2 (1 row) +rollback to settings; +-- parallel with parallel-aware hash join +savepoint settings; +set local max_parallel_workers_per_gather = 1; +set local work_mem = '128kB'; +set local enable_parallel_hash = on; +explain (costs off) + select count(*) from simple r join extremely_skewed s using (id); + QUERY PLAN +----------------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 1 + -> Partial Aggregate + -> Parallel Hash Join + Hash Cond: (r.id = s.id) + -> Parallel Seq Scan on simple r + -> Parallel Hash + -> Parallel Seq Scan on extremely_skewed s +(9 rows) + +select count(*) from simple r join extremely_skewed s using (id); + count +------- + 20000 +(1 row) + +select * from hash_join_batches( +$$ + select count(*) from simple r join extremely_skewed s using (id); +$$); + original | final +----------+------- + 1 | 4 +(1 row) + rollback to settings; -- A couple of other hash join tests unrelated to work_mem management. 
-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate @@ -6192,10 +6346,11 @@ rollback to settings; -- that we can check that instrumentation comes back correctly. create table foo as select generate_series(1, 3) as id, 'xxxxx'::text as t; alter table foo set (parallel_workers = 0); -create table bar as select generate_series(1, 5000) as id, 'xxxxx'::text as t; +create table bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t; alter table bar set (parallel_workers = 2); -- multi-batch with rescan, parallel-oblivious savepoint settings; +set enable_parallel_hash = off; set parallel_leader_participation = off; set min_parallel_table_scan_size = 0; set parallel_setup_cost = 0; @@ -6246,6 +6401,7 @@ $$); rollback to settings; -- single-batch with rescan, parallel-oblivious savepoint settings; +set enable_parallel_hash = off; set parallel_leader_participation = off; set min_parallel_table_scan_size = 0; set parallel_setup_cost = 0; @@ -6293,6 +6449,108 @@ $$); f (1 row) +rollback to settings; +-- multi-batch with rescan, parallel-aware +savepoint settings; +set enable_parallel_hash = on; +set parallel_leader_participation = off; +set min_parallel_table_scan_size = 0; +set parallel_setup_cost = 0; +set parallel_tuple_cost = 0; +set max_parallel_workers_per_gather = 2; +set enable_material = off; +set enable_mergejoin = off; +set work_mem = '64kB'; +explain (costs off) + select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; + QUERY PLAN +-------------------------------------------------------------------------- + Aggregate + -> Nested Loop Left Join + Join Filter: ((foo.id < (b1.id + 1)) AND (foo.id > (b1.id - 1))) + -> Seq Scan on foo + -> Gather + Workers Planned: 2 + -> Parallel Hash Join + Hash Cond: (b1.id = b2.id) + -> Parallel Seq Scan on bar b1 + -> Parallel Hash + -> Parallel Seq Scan on bar b2 +(11 rows) + +select count(*) from foo + 
left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; + count +------- + 3 +(1 row) + +select final > 1 as multibatch + from hash_join_batches( +$$ + select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; +$$); + multibatch +------------ + t +(1 row) + +rollback to settings; +-- single-batch with rescan, parallel-aware +savepoint settings; +set enable_parallel_hash = on; +set parallel_leader_participation = off; +set min_parallel_table_scan_size = 0; +set parallel_setup_cost = 0; +set parallel_tuple_cost = 0; +set max_parallel_workers_per_gather = 2; +set enable_material = off; +set enable_mergejoin = off; +set work_mem = '4MB'; +explain (costs off) + select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; + QUERY PLAN +-------------------------------------------------------------------------- + Aggregate + -> Nested Loop Left Join + Join Filter: ((foo.id < (b1.id + 1)) AND (foo.id > (b1.id - 1))) + -> Seq Scan on foo + -> Gather + Workers Planned: 2 + -> Parallel Hash Join + Hash Cond: (b1.id = b2.id) + -> Parallel Seq Scan on bar b1 + -> Parallel Hash + -> Parallel Seq Scan on bar b2 +(11 rows) + +select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; + count +------- + 3 +(1 row) + +select final > 1 as multibatch + from hash_join_batches( +$$ + select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; +$$); + multibatch +------------ + f +(1 row) + rollback to settings; -- A full outer join where every record is matched. 
-- non-parallel @@ -6383,5 +6641,49 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); 40000 (1 row) +rollback to settings; +-- exercise special code paths for huge tuples (note use of non-strict +-- expression and left join required to get the detoasted tuple into +-- the hash table) +-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and +-- sts_puttuple oversized tuple cases because it's multi-batch) +savepoint settings; +set max_parallel_workers_per_gather = 2; +set enable_parallel_hash = on; +set work_mem = '128kB'; +explain (costs off) + select length(max(s.t)) + from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id); + QUERY PLAN +---------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> Parallel Hash Left Join + Hash Cond: (wide.id = wide_1.id) + -> Parallel Seq Scan on wide + -> Parallel Hash + -> Parallel Seq Scan on wide wide_1 +(9 rows) + +select length(max(s.t)) +from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id); + length +-------- + 320000 +(1 row) + +select final > 1 as multibatch + from hash_join_batches( +$$ + select length(max(s.t)) + from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id); +$$); + multibatch +------------ + t +(1 row) + rollback to settings; rollback; diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index 2b738aae7c..c9c8f51e1c 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -82,11 +82,12 @@ select name, setting from pg_settings where name like 'enable%'; enable_mergejoin | on enable_nestloop | on enable_parallel_append | on + enable_parallel_hash | on enable_partition_wise_join | off enable_seqscan | on enable_sort | on enable_tidscan | on -(14 rows) +(15 rows) -- Test that the pg_timezone_names and 
pg_timezone_abbrevs views are -- more-or-less working. We can't test their contents in any great detail diff --git a/src/test/regress/sql/join.sql b/src/test/regress/sql/join.sql index 0e933e00d5..a6a452f960 100644 --- a/src/test/regress/sql/join.sql +++ b/src/test/regress/sql/join.sql @@ -2028,6 +2028,10 @@ update pg_class set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192 where relname = 'extremely_skewed'; +-- Make a relation with a couple of enormous tuples. +create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t; +alter table wide set (parallel_workers = 2); + -- The "optimal" case: the hash table fits in memory; we plan for 1 -- batch, we stick to that number, and peak memory usage stays within -- our work_mem budget @@ -2050,6 +2054,22 @@ rollback to settings; savepoint settings; set local max_parallel_workers_per_gather = 2; set local work_mem = '4MB'; +set local enable_parallel_hash = off; +explain (costs off) + select count(*) from simple r join simple s using (id); +select count(*) from simple r join simple s using (id); +select original > 1 as initially_multibatch, final > original as increased_batches + from hash_join_batches( +$$ + select count(*) from simple r join simple s using (id); +$$); +rollback to settings; + +-- parallel with parallel-aware hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +set local work_mem = '4MB'; +set local enable_parallel_hash = on; explain (costs off) select count(*) from simple r join simple s using (id); select count(*) from simple r join simple s using (id); @@ -2082,6 +2102,22 @@ rollback to settings; savepoint settings; set local max_parallel_workers_per_gather = 2; set local work_mem = '128kB'; +set local enable_parallel_hash = off; +explain (costs off) + select count(*) from simple r join simple s using (id); +select count(*) from simple r join simple s using (id); +select original > 1 as initially_multibatch, final > original as 
increased_batches + from hash_join_batches( +$$ + select count(*) from simple r join simple s using (id); +$$); +rollback to settings; + +-- parallel with parallel-aware hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +set local work_mem = '128kB'; +set local enable_parallel_hash = on; explain (costs off) select count(*) from simple r join simple s using (id); select count(*) from simple r join simple s using (id); @@ -2115,6 +2151,22 @@ rollback to settings; savepoint settings; set local max_parallel_workers_per_gather = 2; set local work_mem = '128kB'; +set local enable_parallel_hash = off; +explain (costs off) + select count(*) from simple r join bigger_than_it_looks s using (id); +select count(*) from simple r join bigger_than_it_looks s using (id); +select original > 1 as initially_multibatch, final > original as increased_batches + from hash_join_batches( +$$ + select count(*) from simple r join bigger_than_it_looks s using (id); +$$); +rollback to settings; + +-- parallel with parallel-aware hash join +savepoint settings; +set local max_parallel_workers_per_gather = 1; +set local work_mem = '192kB'; +set local enable_parallel_hash = on; explain (costs off) select count(*) from simple r join bigger_than_it_looks s using (id); select count(*) from simple r join bigger_than_it_looks s using (id); @@ -2148,6 +2200,21 @@ rollback to settings; savepoint settings; set local max_parallel_workers_per_gather = 2; set local work_mem = '128kB'; +set local enable_parallel_hash = off; +explain (costs off) + select count(*) from simple r join extremely_skewed s using (id); +select count(*) from simple r join extremely_skewed s using (id); +select * from hash_join_batches( +$$ + select count(*) from simple r join extremely_skewed s using (id); +$$); +rollback to settings; + +-- parallel with parallel-aware hash join +savepoint settings; +set local max_parallel_workers_per_gather = 1; +set local work_mem = '128kB'; +set local 
enable_parallel_hash = on; explain (costs off) select count(*) from simple r join extremely_skewed s using (id); select count(*) from simple r join extremely_skewed s using (id); @@ -2175,11 +2242,12 @@ rollback to settings; create table foo as select generate_series(1, 3) as id, 'xxxxx'::text as t; alter table foo set (parallel_workers = 0); -create table bar as select generate_series(1, 5000) as id, 'xxxxx'::text as t; +create table bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t; alter table bar set (parallel_workers = 2); -- multi-batch with rescan, parallel-oblivious savepoint settings; +set enable_parallel_hash = off; set parallel_leader_participation = off; set min_parallel_table_scan_size = 0; set parallel_setup_cost = 0; @@ -2206,6 +2274,61 @@ rollback to settings; -- single-batch with rescan, parallel-oblivious savepoint settings; +set enable_parallel_hash = off; +set parallel_leader_participation = off; +set min_parallel_table_scan_size = 0; +set parallel_setup_cost = 0; +set parallel_tuple_cost = 0; +set max_parallel_workers_per_gather = 2; +set enable_material = off; +set enable_mergejoin = off; +set work_mem = '4MB'; +explain (costs off) + select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; +select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; +select final > 1 as multibatch + from hash_join_batches( +$$ + select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; +$$); +rollback to settings; + +-- multi-batch with rescan, parallel-aware +savepoint settings; +set enable_parallel_hash = on; +set parallel_leader_participation = off; +set min_parallel_table_scan_size = 0; +set parallel_setup_cost = 0; +set parallel_tuple_cost = 0; +set max_parallel_workers_per_gather = 2; 
+set enable_material = off; +set enable_mergejoin = off; +set work_mem = '64kB'; +explain (costs off) + select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; +select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; +select final > 1 as multibatch + from hash_join_batches( +$$ + select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; +$$); +rollback to settings; + +-- single-batch with rescan, parallel-aware +savepoint settings; +set enable_parallel_hash = on; set parallel_leader_participation = off; set min_parallel_table_scan_size = 0; set parallel_setup_cost = 0; @@ -2266,4 +2389,27 @@ explain (costs off) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); rollback to settings; +-- exercise special code paths for huge tuples (note use of non-strict +-- expression and left join required to get the detoasted tuple into +-- the hash table) + +-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and +-- sts_puttuple oversized tuple cases because it's multi-batch) +savepoint settings; +set max_parallel_workers_per_gather = 2; +set enable_parallel_hash = on; +set work_mem = '128kB'; +explain (costs off) + select length(max(s.t)) + from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id); +select length(max(s.t)) +from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id); +select final > 1 as multibatch + from hash_join_batches( +$$ + select length(max(s.t)) + from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id); +$$); +rollback to settings; + rollback; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e308e20184..a92c62adde 100644 --- 
a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1542,6 +1542,10 @@ ParallelBitmapHeapState ParallelCompletionPtr ParallelContext ParallelExecutorInfo +ParallelHashGrowth +ParallelHashJoinBatch +ParallelHashJoinBatchAccessor +ParallelHashJoinState ParallelHeapScanDesc ParallelIndexScanDesc ParallelSlot