diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 533faf060d..e4a01699e4 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3647,6 +3647,21 @@ ANY num_sync ( + enable_parallel_hash (boolean) + + enable_parallel_hash configuration parameter + + + + + Enables or disables the query planner's use of hash-join plan + types with parallel hash. Has no effect if hash-join plans are not + also enabled. The default is on. + + + + enable_partition_wise_join (boolean) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index b6f80d9708..8a9793644f 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1263,7 +1263,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser Waiting in an extension. - IPC + IPC BgWorkerShutdown Waiting for background worker to shut down. @@ -1279,6 +1279,66 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser ExecuteGather Waiting for activity from child process when executing Gather node. + + Hash/Batch/Allocating + Waiting for an elected Parallel Hash participant to allocate a hash table. + + + Hash/Batch/Electing + Electing a Parallel Hash participant to allocate a hash table. + + + Hash/Batch/Loading + Waiting for other Parallel Hash participants to finish loading a hash table. + + + Hash/Build/Allocating + Waiting for an elected Parallel Hash participant to allocate the initial hash table. + + + Hash/Build/Electing + Electing a Parallel Hash participant to allocate the initial hash table. + + + Hash/Build/HashingInner + Waiting for other Parallel Hash participants to finish hashing the inner relation. + + + Hash/Build/HashingOuter + Waiting for other Parallel Hash participants to finish partitioning the outer relation. + + + Hash/GrowBatches/Allocating + Waiting for an elected Parallel Hash participant to allocate more batches. + + + Hash/GrowBatches/Deciding + Electing a Parallel Hash participant to decide on future batch growth. + + + Hash/GrowBatches/Electing + Electing a Parallel Hash participant to allocate more batches. + + + Hash/GrowBatches/Finishing + Waiting for an elected Parallel Hash participant to decide on future batch growth. + + + Hash/GrowBatches/Repartitioning + Waiting for other Parallel Hash participants to finishing repartitioning. + + + Hash/GrowBuckets/Allocating + Waiting for an elected Parallel Hash participant to finish allocating more buckets. + + + Hash/GrowBuckets/Electing + Electing a Parallel Hash participant to allocate more buckets. + + + Hash/GrowBuckets/Reinserting + Waiting for other Parallel Hash participants to finish inserting tuples into new buckets. + LogicalSyncData Waiting for logical replication remote server to send data for initial table synchronization. diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 604f4f5b61..b344d4b589 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -31,6 +31,7 @@ #include "executor/nodeCustom.h" #include "executor/nodeForeignscan.h" #include "executor/nodeHash.h" +#include "executor/nodeHashjoin.h" #include "executor/nodeIndexscan.h" #include "executor/nodeIndexonlyscan.h" #include "executor/nodeSeqscan.h" @@ -266,6 +267,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate, e->pcxt); break; + case T_HashJoinState: + if (planstate->plan->parallel_aware) + ExecHashJoinEstimate((HashJoinState *) planstate, + e->pcxt); + break; case T_HashState: /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecHashEstimate((HashState *) planstate, e->pcxt); @@ -474,6 +480,11 @@ ExecParallelInitializeDSM(PlanState *planstate, ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate, d->pcxt); break; + case T_HashJoinState: + if (planstate->plan->parallel_aware) + ExecHashJoinInitializeDSM((HashJoinState *) planstate, + d->pcxt); + break; case T_HashState: /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecHashInitializeDSM((HashState *) planstate, d->pcxt); @@ -898,6 +909,11 @@ ExecParallelReInitializeDSM(PlanState *planstate, ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate, pcxt); break; + case T_HashJoinState: + if (planstate->plan->parallel_aware) + ExecHashJoinReInitializeDSM((HashJoinState *) planstate, + pcxt); + break; case T_HashState: case T_SortState: /* these nodes have DSM state, but no reinitialization is required */ @@ -1196,6 +1212,11 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt) ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, pwcxt); break; + case T_HashJoinState: + if (planstate->plan->parallel_aware) + ExecHashJoinInitializeWorker((HashJoinState *) planstate, + pwcxt); + break; case T_HashState: /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecHashInitializeWorker((HashState *) planstate, pwcxt); diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index fcb8b56999..699dc69179 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -770,6 +770,9 @@ ExecShutdownNode(PlanState *node) case T_HashState: ExecShutdownHash((HashState *) node); break; + case T_HashJoinState: + ExecShutdownHashJoin((HashJoinState *) node); + break; default: break; } diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index afd7384e94..4284e8682a 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -10,6 +10,8 @@ * IDENTIFICATION * src/backend/executor/nodeHash.c * + * See note on parallelism in nodeHashjoin.c. + * *------------------------------------------------------------------------- */ /* @@ -25,6 +27,7 @@ #include #include "access/htup_details.h" +#include "access/parallel.h" #include "catalog/pg_statistic.h" #include "commands/tablespace.h" #include "executor/execdebug.h" @@ -32,6 +35,8 @@ #include "executor/nodeHash.h" #include "executor/nodeHashjoin.h" #include "miscadmin.h" +#include "pgstat.h" +#include "port/atomics.h" #include "utils/dynahash.h" #include "utils/memutils.h" #include "utils/lsyscache.h" @@ -40,6 +45,8 @@ static void ExecHashIncreaseNumBatches(HashJoinTable hashtable); static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable); +static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable); +static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable); static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse); static void ExecHashSkewTableInsert(HashJoinTable hashtable, @@ -49,6 +56,30 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable, static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable); static void *dense_alloc(HashJoinTable hashtable, Size size); +static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, + size_t size, + dsa_pointer *shared); +static void MultiExecPrivateHash(HashState *node); +static void MultiExecParallelHash(HashState *node); +static inline HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable table, + int bucketno); +static inline HashJoinTuple ExecParallelHashNextTuple(HashJoinTable table, + HashJoinTuple tuple); +static inline void ExecParallelHashPushTuple(dsa_pointer_atomic *head, + HashJoinTuple tuple, + dsa_pointer tuple_shared); +static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch); +static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable); +static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable); +static void ExecParallelHashRepartitionRest(HashJoinTable hashtable); +static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable table, + dsa_pointer *shared); +static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, + int batchno, + size_t size); +static void ExecParallelHashMergeCounters(HashJoinTable hashtable); +static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable); + /* ---------------------------------------------------------------- * ExecHash @@ -72,6 +103,39 @@ ExecHash(PlanState *pstate) */ Node * MultiExecHash(HashState *node) +{ + /* must provide our own instrumentation support */ + if (node->ps.instrument) + InstrStartNode(node->ps.instrument); + + if (node->parallel_state != NULL) + MultiExecParallelHash(node); + else + MultiExecPrivateHash(node); + + /* must provide our own instrumentation support */ + if (node->ps.instrument) + InstrStopNode(node->ps.instrument, node->hashtable->partialTuples); + + /* + * We do not return the hash table directly because it's not a subtype of + * Node, and so would violate the MultiExecProcNode API. Instead, our + * parent Hashjoin node is expected to know how to fish it out of our node + * state. Ugly but not really worth cleaning up, since Hashjoin knows + * quite a bit more about Hash besides that. + */ + return NULL; +} + +/* ---------------------------------------------------------------- + * MultiExecPrivateHash + * + * parallel-oblivious version, building a backend-private + * hash table and (if necessary) batch files. + * ---------------------------------------------------------------- + */ +static void +MultiExecPrivateHash(HashState *node) { PlanState *outerNode; List *hashkeys; @@ -80,10 +144,6 @@ MultiExecHash(HashState *node) ExprContext *econtext; uint32 hashvalue; - /* must provide our own instrumentation support */ - if (node->ps.instrument) - InstrStartNode(node->ps.instrument); - /* * get state info from node */ @@ -138,18 +198,147 @@ MultiExecHash(HashState *node) if (hashtable->spaceUsed > hashtable->spacePeak) hashtable->spacePeak = hashtable->spaceUsed; - /* must provide our own instrumentation support */ - if (node->ps.instrument) - InstrStopNode(node->ps.instrument, hashtable->totalTuples); + hashtable->partialTuples = hashtable->totalTuples; +} + +/* ---------------------------------------------------------------- + * MultiExecParallelHash + * + * parallel-aware version, building a shared hash table and + * (if necessary) batch files using the combined effort of + * a set of co-operating backends. + * ---------------------------------------------------------------- + */ +static void +MultiExecParallelHash(HashState *node) +{ + ParallelHashJoinState *pstate; + PlanState *outerNode; + List *hashkeys; + HashJoinTable hashtable; + TupleTableSlot *slot; + ExprContext *econtext; + uint32 hashvalue; + Barrier *build_barrier; + int i; /* - * We do not return the hash table directly because it's not a subtype of - * Node, and so would violate the MultiExecProcNode API. Instead, our - * parent Hashjoin node is expected to know how to fish it out of our node - * state. Ugly but not really worth cleaning up, since Hashjoin knows - * quite a bit more about Hash besides that. + * get state info from node */ - return NULL; + outerNode = outerPlanState(node); + hashtable = node->hashtable; + + /* + * set expression context + */ + hashkeys = node->hashkeys; + econtext = node->ps.ps_ExprContext; + + /* + * Synchronize the parallel hash table build. At this stage we know that + * the shared hash table has been or is being set up by + * ExecHashTableCreate(), but we don't know if our peers have returned + * from there or are here in MultiExecParallelHash(), and if so how far + * through they are. To find out, we check the build_barrier phase then + * and jump to the right step in the build algorithm. + */ + pstate = hashtable->parallel_state; + build_barrier = &pstate->build_barrier; + Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATING); + switch (BarrierPhase(build_barrier)) + { + case PHJ_BUILD_ALLOCATING: + + /* + * Either I just allocated the initial hash table in + * ExecHashTableCreate(), or someone else is doing that. Either + * way, wait for everyone to arrive here so we can proceed. + */ + BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATING); + /* Fall through. */ + + case PHJ_BUILD_HASHING_INNER: + + /* + * It's time to begin hashing, or if we just arrived here then + * hashing is already underway, so join in that effort. While + * hashing we have to be prepared to help increase the number of + * batches or buckets at any time, and if we arrived here when + * that was already underway we'll have to help complete that work + * immediately so that it's safe to access batches and buckets + * below. + */ + if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) != + PHJ_GROW_BATCHES_ELECTING) + ExecParallelHashIncreaseNumBatches(hashtable); + if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) != + PHJ_GROW_BUCKETS_ELECTING) + ExecParallelHashIncreaseNumBuckets(hashtable); + ExecParallelHashEnsureBatchAccessors(hashtable); + ExecParallelHashTableSetCurrentBatch(hashtable, 0); + for (;;) + { + slot = ExecProcNode(outerNode); + if (TupIsNull(slot)) + break; + econtext->ecxt_innertuple = slot; + if (ExecHashGetHashValue(hashtable, econtext, hashkeys, + false, hashtable->keepNulls, + &hashvalue)) + ExecParallelHashTableInsert(hashtable, slot, hashvalue); + hashtable->partialTuples++; + } + BarrierDetach(&pstate->grow_buckets_barrier); + BarrierDetach(&pstate->grow_batches_barrier); + + /* + * Make sure that any tuples we wrote to disk are visible to + * others before anyone tries to load them. + */ + for (i = 0; i < hashtable->nbatch; ++i) + sts_end_write(hashtable->batches[i].inner_tuples); + + /* + * Update shared counters. We need an accurate total tuple count + * to control the empty table optimization. + */ + ExecParallelHashMergeCounters(hashtable); + + /* + * Wait for everyone to finish building and flushing files and + * counters. + */ + if (BarrierArriveAndWait(build_barrier, + WAIT_EVENT_HASH_BUILD_HASHING_INNER)) + { + /* + * Elect one backend to disable any further growth. Batches + * are now fixed. While building them we made sure they'd fit + * in our memory budget when we load them back in later (or we + * tried to do that and gave up because we detected extreme + * skew). + */ + pstate->growth = PHJ_GROWTH_DISABLED; + } + } + + /* + * We're not yet attached to a batch. We all agree on the dimensions and + * number of inner tuples (for the empty table optimization). + */ + hashtable->curbatch = -1; + hashtable->nbuckets = pstate->nbuckets; + hashtable->log2_nbuckets = my_log2(hashtable->nbuckets); + hashtable->totalTuples = pstate->total_tuples; + ExecParallelHashEnsureBatchAccessors(hashtable); + + /* + * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE + * case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't + * there already). + */ + Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER || + BarrierPhase(build_barrier) == PHJ_BUILD_DONE); } /* ---------------------------------------------------------------- @@ -240,12 +429,15 @@ ExecEndHash(HashState *node) * ---------------------------------------------------------------- */ HashJoinTable -ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) +ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls) { + Hash *node; HashJoinTable hashtable; Plan *outerNode; + size_t space_allowed; int nbuckets; int nbatch; + double rows; int num_skew_mcvs; int log2_nbuckets; int nkeys; @@ -258,10 +450,22 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) * "outer" subtree of this node, but the inner relation of the hashjoin). * Compute the appropriate size of the hash table. */ + node = (Hash *) state->ps.plan; outerNode = outerPlan(node); - ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width, + /* + * If this is shared hash table with a partial plan, then we can't use + * outerNode->plan_rows to estimate its size. We need an estimate of the + * total number of rows across all copies of the partial plan. + */ + rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows; + + ExecChooseHashTableSize(rows, outerNode->plan_width, OidIsValid(node->skewTable), + state->parallel_state != NULL, + state->parallel_state != NULL ? + state->parallel_state->nparticipants - 1 : 0, + &space_allowed, &nbuckets, &nbatch, &num_skew_mcvs); /* nbuckets must be a power of 2 */ @@ -280,7 +484,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) hashtable->nbuckets_optimal = nbuckets; hashtable->log2_nbuckets = log2_nbuckets; hashtable->log2_nbuckets_optimal = log2_nbuckets; - hashtable->buckets = NULL; + hashtable->buckets.unshared = NULL; hashtable->keepNulls = keepNulls; hashtable->skewEnabled = false; hashtable->skewBucket = NULL; @@ -293,16 +497,21 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) hashtable->nbatch_outstart = nbatch; hashtable->growEnabled = true; hashtable->totalTuples = 0; + hashtable->partialTuples = 0; hashtable->skewTuples = 0; hashtable->innerBatchFile = NULL; hashtable->outerBatchFile = NULL; hashtable->spaceUsed = 0; hashtable->spacePeak = 0; - hashtable->spaceAllowed = work_mem * 1024L; + hashtable->spaceAllowed = space_allowed; hashtable->spaceUsedSkew = 0; hashtable->spaceAllowedSkew = hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100; hashtable->chunks = NULL; + hashtable->current_chunk = NULL; + hashtable->parallel_state = state->parallel_state; + hashtable->area = state->ps.state->es_query_dsa; + hashtable->batches = NULL; #ifdef HJDEBUG printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n", @@ -351,10 +560,11 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); - if (nbatch > 1) + if (nbatch > 1 && hashtable->parallel_state == NULL) { /* - * allocate and initialize the file arrays in hashCxt + * allocate and initialize the file arrays in hashCxt (not needed for + * parallel case which uses shared tuplestores instead of raw files) */ hashtable->innerBatchFile = (BufFile **) palloc0(nbatch * sizeof(BufFile *)); @@ -365,24 +575,78 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) PrepareTempTablespaces(); } - /* - * Prepare context for the first-scan space allocations; allocate the - * hashbucket array therein, and set each bucket "empty". - */ - MemoryContextSwitchTo(hashtable->batchCxt); - - hashtable->buckets = (HashJoinTuple *) - palloc0(nbuckets * sizeof(HashJoinTuple)); - - /* - * Set up for skew optimization, if possible and there's a need for more - * than one batch. (In a one-batch join, there's no point in it.) - */ - if (nbatch > 1) - ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs); - MemoryContextSwitchTo(oldcxt); + if (hashtable->parallel_state) + { + ParallelHashJoinState *pstate = hashtable->parallel_state; + Barrier *build_barrier; + + /* + * Attach to the build barrier. The corresponding detach operation is + * in ExecHashTableDetach. Note that we won't attach to the + * batch_barrier for batch 0 yet. We'll attach later and start it out + * in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front + * and then loaded while hashing (the standard hybrid hash join + * algorithm), and we'll coordinate that using build_barrier. + */ + build_barrier = &pstate->build_barrier; + BarrierAttach(build_barrier); + + /* + * So far we have no idea whether there are any other participants, + * and if so, what phase they are working on. The only thing we care + * about at this point is whether someone has already created the + * SharedHashJoinBatch objects and the hash table for batch 0. One + * backend will be elected to do that now if necessary. + */ + if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING && + BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECTING)) + { + pstate->nbatch = nbatch; + pstate->space_allowed = space_allowed; + pstate->growth = PHJ_GROWTH_OK; + + /* Set up the shared state for coordinating batches. */ + ExecParallelHashJoinSetUpBatches(hashtable, nbatch); + + /* + * Allocate batch 0's hash table up front so we can load it + * directly while hashing. + */ + pstate->nbuckets = nbuckets; + ExecParallelHashTableAlloc(hashtable, 0); + } + + /* + * The next Parallel Hash synchronization point is in + * MultiExecParallelHash(), which will progress it all the way to + * PHJ_BUILD_DONE. The caller must not return control from this + * executor node between now and then. + */ + } + else + { + /* + * Prepare context for the first-scan space allocations; allocate the + * hashbucket array therein, and set each bucket "empty". + */ + MemoryContextSwitchTo(hashtable->batchCxt); + + hashtable->buckets.unshared = (HashJoinTuple *) + palloc0(nbuckets * sizeof(HashJoinTuple)); + + /* + * Set up for skew optimization, if possible and there's a need for + * more than one batch. (In a one-batch join, there's no point in + * it.) + */ + if (nbatch > 1) + ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs); + + MemoryContextSwitchTo(oldcxt); + } + return hashtable; } @@ -399,6 +663,9 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, + bool try_combined_work_mem, + int parallel_workers, + size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs) @@ -433,6 +700,16 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, */ hash_table_bytes = work_mem * 1024L; + /* + * Parallel Hash tries to use the combined work_mem of all workers to + * avoid the need to batch. If that won't work, it falls back to work_mem + * per worker and tries to process batches in parallel. + */ + if (try_combined_work_mem) + hash_table_bytes += hash_table_bytes * parallel_workers; + + *space_allowed = hash_table_bytes; + /* * If skew optimization is possible, estimate the number of skew buckets * that will fit in the memory allowed, and decrement the assumed space @@ -478,7 +755,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, * Note that both nbuckets and nbatch must be powers of 2 to make * ExecHashGetBucketAndBatch fast. */ - max_pointers = (work_mem * 1024L) / sizeof(HashJoinTuple); + max_pointers = *space_allowed / sizeof(HashJoinTuple); max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple)); /* If max_pointers isn't a power of 2, must round it down to one */ mppow2 = 1L << my_log2(max_pointers); @@ -510,6 +787,21 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, int minbatch; long bucket_size; + /* + * If Parallel Hash with combined work_mem would still need multiple + * batches, we'll have to fall back to regular work_mem budget. + */ + if (try_combined_work_mem) + { + ExecChooseHashTableSize(ntuples, tupwidth, useskew, + false, parallel_workers, + space_allowed, + numbuckets, + numbatches, + num_skew_mcvs); + return; + } + /* * Estimate the number of buckets we'll want to have when work_mem is * entirely full. Each bucket will contain a bucket pointer plus @@ -564,14 +856,17 @@ ExecHashTableDestroy(HashJoinTable hashtable) /* * Make sure all the temp files are closed. We skip batch 0, since it * can't have any temp files (and the arrays might not even exist if - * nbatch is only 1). + * nbatch is only 1). Parallel hash joins don't use these files. */ - for (i = 1; i < hashtable->nbatch; i++) + if (hashtable->innerBatchFile != NULL) { - if (hashtable->innerBatchFile[i]) - BufFileClose(hashtable->innerBatchFile[i]); - if (hashtable->outerBatchFile[i]) - BufFileClose(hashtable->outerBatchFile[i]); + for (i = 1; i < hashtable->nbatch; i++) + { + if (hashtable->innerBatchFile[i]) + BufFileClose(hashtable->innerBatchFile[i]); + if (hashtable->outerBatchFile[i]) + BufFileClose(hashtable->outerBatchFile[i]); + } } /* Release working memory (batchCxt is a child, so it goes away too) */ @@ -657,8 +952,9 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) hashtable->nbuckets = hashtable->nbuckets_optimal; hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal; - hashtable->buckets = repalloc(hashtable->buckets, - sizeof(HashJoinTuple) * hashtable->nbuckets); + hashtable->buckets.unshared = + repalloc(hashtable->buckets.unshared, + sizeof(HashJoinTuple) * hashtable->nbuckets); } /* @@ -666,14 +962,15 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) * buckets now and not have to keep track which tuples in the buckets have * already been processed. We will free the old chunks as we go. */ - memset(hashtable->buckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets); + memset(hashtable->buckets.unshared, 0, + sizeof(HashJoinTuple) * hashtable->nbuckets); oldchunks = hashtable->chunks; hashtable->chunks = NULL; /* so, let's scan through the old chunks, and all tuples in each chunk */ while (oldchunks != NULL) { - HashMemoryChunk nextchunk = oldchunks->next; + HashMemoryChunk nextchunk = oldchunks->next.unshared; /* position within the buffer (up to oldchunks->used) */ size_t idx = 0; @@ -700,8 +997,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) memcpy(copyTuple, hashTuple, hashTupleSize); /* and add it back to the appropriate bucket */ - copyTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = copyTuple; + copyTuple->next.unshared = hashtable->buckets.unshared[bucketno]; + hashtable->buckets.unshared[bucketno] = copyTuple; } else { @@ -750,6 +1047,380 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) } } +/* + * ExecParallelHashIncreaseNumBatches + * Every participant attached to grow_barrier must run this function + * when it observes growth == PHJ_GROWTH_NEED_MORE_BATCHES. + */ +static void +ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + int i; + + Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER); + + /* + * It's unlikely, but we need to be prepared for new participants to show + * up while we're in the middle of this operation so we need to switch on + * barrier phase here. + */ + switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier))) + { + case PHJ_GROW_BATCHES_ELECTING: + + /* + * Elect one participant to prepare to grow the number of batches. + * This involves reallocating or resetting the buckets of batch 0 + * in preparation for all participants to begin repartitioning the + * tuples. + */ + if (BarrierArriveAndWait(&pstate->grow_batches_barrier, + WAIT_EVENT_HASH_GROW_BATCHES_ELECTING)) + { + dsa_pointer_atomic *buckets; + ParallelHashJoinBatch *old_batch0; + int new_nbatch; + int i; + + /* Move the old batch out of the way. */ + old_batch0 = hashtable->batches[0].shared; + pstate->old_batches = pstate->batches; + pstate->old_nbatch = hashtable->nbatch; + pstate->batches = InvalidDsaPointer; + + /* Free this backend's old accessors. */ + ExecParallelHashCloseBatchAccessors(hashtable); + + /* Figure out how many batches to use. */ + if (hashtable->nbatch == 1) + { + /* + * We are going from single-batch to multi-batch. We need + * to switch from one large combined memory budget to the + * regular work_mem budget. + */ + pstate->space_allowed = work_mem * 1024L; + + /* + * The combined work_mem of all participants wasn't + * enough. Therefore one batch per participant would be + * approximately equivalent and would probably also be + * insufficient. So try two batches per particiant, + * rounded up to a power of two. + */ + new_nbatch = 1 << my_log2(pstate->nparticipants * 2); + } + else + { + /* + * We were already multi-batched. Try doubling the number + * of batches. + */ + new_nbatch = hashtable->nbatch * 2; + } + + /* Allocate new larger generation of batches. */ + Assert(hashtable->nbatch == pstate->nbatch); + ExecParallelHashJoinSetUpBatches(hashtable, new_nbatch); + Assert(hashtable->nbatch == pstate->nbatch); + + /* Replace or recycle batch 0's bucket array. */ + if (pstate->old_nbatch == 1) + { + double dtuples; + double dbuckets; + int new_nbuckets; + + /* + * We probably also need a smaller bucket array. How many + * tuples do we expect per batch, assuming we have only + * half of them so far? Normally we don't need to change + * the bucket array's size, because the size of each batch + * stays the same as we add more batches, but in this + * special case we move from a large batch to many smaller + * batches and it would be wasteful to keep the large + * array. + */ + dtuples = (old_batch0->ntuples * 2.0) / new_nbatch; + dbuckets = ceil(dtuples / NTUP_PER_BUCKET); + dbuckets = Min(dbuckets, + MaxAllocSize / sizeof(dsa_pointer_atomic)); + new_nbuckets = (int) dbuckets; + new_nbuckets = Max(new_nbuckets, 1024); + new_nbuckets = 1 << my_log2(new_nbuckets); + dsa_free(hashtable->area, old_batch0->buckets); + hashtable->batches[0].shared->buckets = + dsa_allocate(hashtable->area, + sizeof(dsa_pointer_atomic) * new_nbuckets); + buckets = (dsa_pointer_atomic *) + dsa_get_address(hashtable->area, + hashtable->batches[0].shared->buckets); + for (i = 0; i < new_nbuckets; ++i) + dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); + pstate->nbuckets = new_nbuckets; + } + else + { + /* Recycle the existing bucket array. */ + hashtable->batches[0].shared->buckets = old_batch0->buckets; + buckets = (dsa_pointer_atomic *) + dsa_get_address(hashtable->area, old_batch0->buckets); + for (i = 0; i < hashtable->nbuckets; ++i) + dsa_pointer_atomic_write(&buckets[i], InvalidDsaPointer); + } + + /* Move all chunks to the work queue for parallel processing. */ + pstate->chunk_work_queue = old_batch0->chunks; + + /* Disable further growth temporarily while we're growing. */ + pstate->growth = PHJ_GROWTH_DISABLED; + } + else + { + /* All other participants just flush their tuples to disk. */ + ExecParallelHashCloseBatchAccessors(hashtable); + } + /* Fall through. */ + + case PHJ_GROW_BATCHES_ALLOCATING: + /* Wait for the above to be finished. */ + BarrierArriveAndWait(&pstate->grow_batches_barrier, + WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING); + /* Fall through. */ + + case PHJ_GROW_BATCHES_REPARTITIONING: + /* Make sure that we have the current dimensions and buckets. */ + ExecParallelHashEnsureBatchAccessors(hashtable); + ExecParallelHashTableSetCurrentBatch(hashtable, 0); + /* Then partition, flush counters. */ + ExecParallelHashRepartitionFirst(hashtable); + ExecParallelHashRepartitionRest(hashtable); + ExecParallelHashMergeCounters(hashtable); + /* Wait for the above to be finished. */ + BarrierArriveAndWait(&pstate->grow_batches_barrier, + WAIT_EVENT_HASH_GROW_BATCHES_REPARTITIONING); + /* Fall through. */ + + case PHJ_GROW_BATCHES_DECIDING: + + /* + * Elect one participant to clean up and decide whether further + * repartitioning is needed, or should be disabled because it's + * not helping. + */ + if (BarrierArriveAndWait(&pstate->grow_batches_barrier, + WAIT_EVENT_HASH_GROW_BATCHES_DECIDING)) + { + bool space_exhausted = false; + bool extreme_skew_detected = false; + + /* Make sure that we have the current dimensions and buckets. */ + ExecParallelHashEnsureBatchAccessors(hashtable); + ExecParallelHashTableSetCurrentBatch(hashtable, 0); + + /* Are any of the new generation of batches exhausted? */ + for (i = 0; i < hashtable->nbatch; ++i) + { + ParallelHashJoinBatch *batch = hashtable->batches[i].shared; + + if (batch->space_exhausted || + batch->estimated_size > pstate->space_allowed) + { + int parent; + + space_exhausted = true; + + /* + * Did this batch receive ALL of the tuples from its + * parent batch? That would indicate that further + * repartitioning isn't going to help (the hash values + * are probably all the same). + */ + parent = i % pstate->old_nbatch; + if (batch->ntuples == hashtable->batches[parent].shared->old_ntuples) + extreme_skew_detected = true; + } + } + + /* Don't keep growing if it's not helping or we'd overflow. */ + if (extreme_skew_detected || hashtable->nbatch >= INT_MAX / 2) + pstate->growth = PHJ_GROWTH_DISABLED; + else if (space_exhausted) + pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; + else + pstate->growth = PHJ_GROWTH_OK; + + /* Free the old batches in shared memory. */ + dsa_free(hashtable->area, pstate->old_batches); + pstate->old_batches = InvalidDsaPointer; + } + /* Fall through. */ + + case PHJ_GROW_BATCHES_FINISHING: + /* Wait for the above to complete. */ + BarrierArriveAndWait(&pstate->grow_batches_barrier, + WAIT_EVENT_HASH_GROW_BATCHES_FINISHING); + } +} + +/* + * Repartition the tuples currently loaded into memory for inner batch 0 + * because the number of batches has been increased. Some tuples are retained + * in memory and some are written out to a later batch. + */ +static void +ExecParallelHashRepartitionFirst(HashJoinTable hashtable) +{ + dsa_pointer chunk_shared; + HashMemoryChunk chunk; + + Assert(hashtable->nbatch = hashtable->parallel_state->nbatch); + + while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_shared))) + { + size_t idx = 0; + + /* Repartition all tuples in this chunk. */ + while (idx < chunk->used) + { + HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx); + MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple); + HashJoinTuple copyTuple; + dsa_pointer shared; + int bucketno; + int batchno; + + ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, + &bucketno, &batchno); + + Assert(batchno < hashtable->nbatch); + if (batchno == 0) + { + /* It still belongs in batch 0. Copy to a new chunk. */ + copyTuple = + ExecParallelHashTupleAlloc(hashtable, + HJTUPLE_OVERHEAD + tuple->t_len, + &shared); + copyTuple->hashvalue = hashTuple->hashvalue; + memcpy(HJTUPLE_MINTUPLE(copyTuple), tuple, tuple->t_len); + ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], + copyTuple, shared); + } + else + { + size_t tuple_size = + MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len); + + /* It belongs in a later batch. */ + hashtable->batches[batchno].estimated_size += tuple_size; + sts_puttuple(hashtable->batches[batchno].inner_tuples, + &hashTuple->hashvalue, tuple); + } + + /* Count this tuple. */ + ++hashtable->batches[0].old_ntuples; + ++hashtable->batches[batchno].ntuples; + + idx += MAXALIGN(HJTUPLE_OVERHEAD + + HJTUPLE_MINTUPLE(hashTuple)->t_len); + } + + /* Free this chunk. */ + dsa_free(hashtable->area, chunk_shared); + + CHECK_FOR_INTERRUPTS(); + } +} + +/* + * Help repartition inner batches 1..n. + */ +static void +ExecParallelHashRepartitionRest(HashJoinTable hashtable) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + int old_nbatch = pstate->old_nbatch; + SharedTuplestoreAccessor **old_inner_tuples; + ParallelHashJoinBatch *old_batches; + int i; + + /* Get our hands on the previous generation of batches. */ + old_batches = (ParallelHashJoinBatch *) + dsa_get_address(hashtable->area, pstate->old_batches); + old_inner_tuples = palloc0(sizeof(SharedTuplestoreAccessor *) * old_nbatch); + for (i = 1; i < old_nbatch; ++i) + { + ParallelHashJoinBatch *shared = + NthParallelHashJoinBatch(old_batches, i); + + old_inner_tuples[i] = sts_attach(ParallelHashJoinBatchInner(shared), + ParallelWorkerNumber + 1, + &pstate->fileset); + } + + /* Join in the effort to repartition them. */ + for (i = 1; i < old_nbatch; ++i) + { + MinimalTuple tuple; + uint32 hashvalue; + + /* Scan one partition from the previous generation. */ + sts_begin_parallel_scan(old_inner_tuples[i]); + while ((tuple = sts_parallel_scan_next(old_inner_tuples[i], &hashvalue))) + { + size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len); + int bucketno; + int batchno; + + /* Decide which partition it goes to in the new generation. */ + ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, + &batchno); + + hashtable->batches[batchno].estimated_size += tuple_size; + ++hashtable->batches[batchno].ntuples; + ++hashtable->batches[i].old_ntuples; + + /* Store the tuple its new batch. */ + sts_puttuple(hashtable->batches[batchno].inner_tuples, + &hashvalue, tuple); + + CHECK_FOR_INTERRUPTS(); + } + sts_end_parallel_scan(old_inner_tuples[i]); + } + + pfree(old_inner_tuples); +} + +/* + * Transfer the backend-local per-batch counters to the shared totals. + */ +static void +ExecParallelHashMergeCounters(HashJoinTable hashtable) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + int i; + + LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); + pstate->total_tuples = 0; + for (i = 0; i < hashtable->nbatch; ++i) + { + ParallelHashJoinBatchAccessor *batch = &hashtable->batches[i]; + + batch->shared->size += batch->size; + batch->shared->estimated_size += batch->estimated_size; + batch->shared->ntuples += batch->ntuples; + batch->shared->old_ntuples += batch->old_ntuples; + batch->size = 0; + batch->estimated_size = 0; + batch->ntuples = 0; + batch->old_ntuples = 0; + pstate->total_tuples += batch->shared->ntuples; + } + LWLockRelease(&pstate->lock); +} + /* * ExecHashIncreaseNumBuckets * increase the original number of buckets in order to reduce @@ -782,14 +1453,15 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable) * ExecHashIncreaseNumBatches, but without all the copying into new * chunks) */ - hashtable->buckets = - (HashJoinTuple *) repalloc(hashtable->buckets, + hashtable->buckets.unshared = + (HashJoinTuple *) repalloc(hashtable->buckets.unshared, hashtable->nbuckets * sizeof(HashJoinTuple)); - memset(hashtable->buckets, 0, hashtable->nbuckets * sizeof(HashJoinTuple)); + memset(hashtable->buckets.unshared, 0, + hashtable->nbuckets * sizeof(HashJoinTuple)); /* scan through all tuples in all chunks to rebuild the hash table */ - for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next) + for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next.unshared) { /* process all tuples stored in this chunk */ size_t idx = 0; @@ -804,8 +1476,8 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable) &bucketno, &batchno); /* add the tuple to the proper bucket */ - hashTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = hashTuple; + hashTuple->next.unshared = hashtable->buckets.unshared[bucketno]; + hashtable->buckets.unshared[bucketno] = hashTuple; /* advance index past the tuple */ idx += MAXALIGN(HJTUPLE_OVERHEAD + @@ -817,6 +1489,93 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable) } } +static void +ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + int i; + HashMemoryChunk chunk; + dsa_pointer chunk_s; + + Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER); + + /* + * It's unlikely, but we need to be prepared for new participants to show + * up while we're in the middle of this operation so we need to switch on + * barrier phase here. + */ + switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier))) + { + case PHJ_GROW_BUCKETS_ELECTING: + /* Elect one participant to prepare to increase nbuckets. */ + if (BarrierArriveAndWait(&pstate->grow_buckets_barrier, + WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING)) + { + size_t size; + dsa_pointer_atomic *buckets; + + /* Double the size of the bucket array. */ + pstate->nbuckets *= 2; + size = pstate->nbuckets * sizeof(dsa_pointer_atomic); + hashtable->batches[0].shared->size += size / 2; + dsa_free(hashtable->area, hashtable->batches[0].shared->buckets); + hashtable->batches[0].shared->buckets = + dsa_allocate(hashtable->area, size); + buckets = (dsa_pointer_atomic *) + dsa_get_address(hashtable->area, + hashtable->batches[0].shared->buckets); + for (i = 0; i < pstate->nbuckets; ++i) + dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); + + /* Put the chunk list onto the work queue. */ + pstate->chunk_work_queue = hashtable->batches[0].shared->chunks; + + /* Clear the flag. */ + pstate->growth = PHJ_GROWTH_OK; + } + /* Fall through. */ + + case PHJ_GROW_BUCKETS_ALLOCATING: + /* Wait for the above to complete. */ + BarrierArriveAndWait(&pstate->grow_buckets_barrier, + WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING); + /* Fall through. */ + + case PHJ_GROW_BUCKETS_REINSERTING: + /* Reinsert all tuples into the hash table. */ + ExecParallelHashEnsureBatchAccessors(hashtable); + ExecParallelHashTableSetCurrentBatch(hashtable, 0); + while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_s))) + { + size_t idx = 0; + + while (idx < chunk->used) + { + HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx); + dsa_pointer shared = chunk_s + HASH_CHUNK_HEADER_SIZE + idx; + int bucketno; + int batchno; + + ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, + &bucketno, &batchno); + Assert(batchno == 0); + + /* add the tuple to the proper bucket */ + ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], + hashTuple, shared); + + /* advance index past the tuple */ + idx += MAXALIGN(HJTUPLE_OVERHEAD + + HJTUPLE_MINTUPLE(hashTuple)->t_len); + } + + /* allow this loop to be cancellable */ + CHECK_FOR_INTERRUPTS(); + } + BarrierArriveAndWait(&pstate->grow_buckets_barrier, + WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING); + } +} /* * ExecHashTableInsert @@ -869,8 +1628,8 @@ ExecHashTableInsert(HashJoinTable hashtable, HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); /* Push it onto the front of the bucket's list */ - hashTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = hashTuple; + hashTuple->next.unshared = hashtable->buckets.unshared[bucketno]; + hashtable->buckets.unshared[bucketno] = hashTuple; /* * Increase the (optimal) number of buckets if we just exceeded the @@ -910,6 +1669,94 @@ ExecHashTableInsert(HashJoinTable hashtable, } } +/* + * ExecHashTableParallelInsert + * insert a tuple into a shared hash table or shared batch tuplestore + */ +void +ExecParallelHashTableInsert(HashJoinTable hashtable, + TupleTableSlot *slot, + uint32 hashvalue) +{ + MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot); + dsa_pointer shared; + int bucketno; + int batchno; + +retry: + ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); + + if (batchno == 0) + { + HashJoinTuple hashTuple; + + /* Try to load it into memory. */ + Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) == + PHJ_BUILD_HASHING_INNER); + hashTuple = ExecParallelHashTupleAlloc(hashtable, + HJTUPLE_OVERHEAD + tuple->t_len, + &shared); + if (hashTuple == NULL) + goto retry; + + /* Store the hash value in the HashJoinTuple header. */ + hashTuple->hashvalue = hashvalue; + memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); + + /* Push it onto the front of the bucket's list */ + ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], + hashTuple, shared); + } + else + { + size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len); + + Assert(batchno > 0); + + /* Try to preallocate space in the batch if necessary. */ + if (hashtable->batches[batchno].preallocated < tuple_size) + { + if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size)) + goto retry; + } + + Assert(hashtable->batches[batchno].preallocated >= tuple_size); + hashtable->batches[batchno].preallocated -= tuple_size; + sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue, + tuple); + } + ++hashtable->batches[batchno].ntuples; +} + +/* + * Insert a tuple into the current hash table. Unlike + * ExecParallelHashTableInsert, this version is not prepared to send the tuple + * to other batches or to run out of memory, and should only be called with + * tuples that belong in the current batch once growth has been disabled. + */ +void +ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, + TupleTableSlot *slot, + uint32 hashvalue) +{ + MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot); + HashJoinTuple hashTuple; + dsa_pointer shared; + int batchno; + int bucketno; + + ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); + Assert(batchno == hashtable->curbatch); + hashTuple = ExecParallelHashTupleAlloc(hashtable, + HJTUPLE_OVERHEAD + tuple->t_len, + &shared); + hashTuple->hashvalue = hashvalue; + memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); + HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); + ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], + hashTuple, shared); +} + /* * ExecHashGetHashValue * Compute the hash value for a tuple @@ -1076,11 +1923,11 @@ ExecScanHashBucket(HashJoinState *hjstate, * otherwise scan the standard hashtable bucket. */ if (hashTuple != NULL) - hashTuple = hashTuple->next; + hashTuple = hashTuple->next.unshared; else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO) hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples; else - hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo]; + hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo]; while (hashTuple != NULL) { @@ -1104,7 +1951,67 @@ ExecScanHashBucket(HashJoinState *hjstate, } } - hashTuple = hashTuple->next; + hashTuple = hashTuple->next.unshared; + } + + /* + * no match + */ + return false; +} + +/* + * ExecParallelScanHashBucket + * scan a hash bucket for matches to the current outer tuple + * + * The current outer tuple must be stored in econtext->ecxt_outertuple. + * + * On success, the inner tuple is stored into hjstate->hj_CurTuple and + * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot + * for the latter. + */ +bool +ExecParallelScanHashBucket(HashJoinState *hjstate, + ExprContext *econtext) +{ + ExprState *hjclauses = hjstate->hashclauses; + HashJoinTable hashtable = hjstate->hj_HashTable; + HashJoinTuple hashTuple = hjstate->hj_CurTuple; + uint32 hashvalue = hjstate->hj_CurHashValue; + + /* + * hj_CurTuple is the address of the tuple last returned from the current + * bucket, or NULL if it's time to start scanning a new bucket. + */ + if (hashTuple != NULL) + hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); + else + hashTuple = ExecParallelHashFirstTuple(hashtable, + hjstate->hj_CurBucketNo); + + while (hashTuple != NULL) + { + if (hashTuple->hashvalue == hashvalue) + { + TupleTableSlot *inntuple; + + /* insert hashtable's tuple into exec slot so ExecQual sees it */ + inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), + hjstate->hj_HashTupleSlot, + false); /* do not pfree */ + econtext->ecxt_innertuple = inntuple; + + /* reset temp memory each time to avoid leaks from qual expr */ + ResetExprContext(econtext); + + if (ExecQual(hjclauses, econtext)) + { + hjstate->hj_CurTuple = hashTuple; + return true; + } + } + + hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); } /* @@ -1155,10 +2062,10 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) * bucket. */ if (hashTuple != NULL) - hashTuple = hashTuple->next; + hashTuple = hashTuple->next.unshared; else if (hjstate->hj_CurBucketNo < hashtable->nbuckets) { - hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo]; + hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo]; hjstate->hj_CurBucketNo++; } else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets) @@ -1194,7 +2101,7 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) return true; } - hashTuple = hashTuple->next; + hashTuple = hashTuple->next.unshared; } /* allow this loop to be cancellable */ @@ -1226,7 +2133,7 @@ ExecHashTableReset(HashJoinTable hashtable) oldcxt = MemoryContextSwitchTo(hashtable->batchCxt); /* Reallocate and reinitialize the hash bucket headers. */ - hashtable->buckets = (HashJoinTuple *) + hashtable->buckets.unshared = (HashJoinTuple *) palloc0(nbuckets * sizeof(HashJoinTuple)); hashtable->spaceUsed = 0; @@ -1250,7 +2157,8 @@ ExecHashTableResetMatchFlags(HashJoinTable hashtable) /* Reset all flags in the main table ... */ for (i = 0; i < hashtable->nbuckets; i++) { - for (tuple = hashtable->buckets[i]; tuple != NULL; tuple = tuple->next) + for (tuple = hashtable->buckets.unshared[i]; tuple != NULL; + tuple = tuple->next.unshared) HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple)); } @@ -1260,7 +2168,7 @@ ExecHashTableResetMatchFlags(HashJoinTable hashtable) int j = hashtable->skewBucketNums[i]; HashSkewBucket *skewBucket = hashtable->skewBucket[j]; - for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next) + for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared) HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple)); } } @@ -1505,8 +2413,9 @@ ExecHashSkewTableInsert(HashJoinTable hashtable, HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); /* Push it onto the front of the skew bucket's list */ - hashTuple->next = hashtable->skewBucket[bucketNumber]->tuples; + hashTuple->next.unshared = hashtable->skewBucket[bucketNumber]->tuples; hashtable->skewBucket[bucketNumber]->tuples = hashTuple; + Assert(hashTuple != hashTuple->next.unshared); /* Account for space used, and back off if we've used too much */ hashtable->spaceUsed += hashTupleSize; @@ -1554,7 +2463,7 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) hashTuple = bucket->tuples; while (hashTuple != NULL) { - HashJoinTuple nextHashTuple = hashTuple->next; + HashJoinTuple nextHashTuple = hashTuple->next.unshared; MinimalTuple tuple; Size tupleSize; @@ -1580,8 +2489,8 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) memcpy(copyTuple, hashTuple, tupleSize); pfree(hashTuple); - copyTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = copyTuple; + copyTuple->next.unshared = hashtable->buckets.unshared[bucketno]; + hashtable->buckets.unshared[bucketno] = copyTuple; /* We have reduced skew space, but overall space doesn't change */ hashtable->spaceUsedSkew -= tupleSize; @@ -1760,11 +2669,11 @@ dense_alloc(HashJoinTable hashtable, Size size) if (hashtable->chunks != NULL) { newChunk->next = hashtable->chunks->next; - hashtable->chunks->next = newChunk; + hashtable->chunks->next.unshared = newChunk; } else { - newChunk->next = hashtable->chunks; + newChunk->next.unshared = hashtable->chunks; hashtable->chunks = newChunk; } @@ -1789,7 +2698,7 @@ dense_alloc(HashJoinTable hashtable, Size size) newChunk->used = size; newChunk->ntuples = 1; - newChunk->next = hashtable->chunks; + newChunk->next.unshared = hashtable->chunks; hashtable->chunks = newChunk; return newChunk->data; @@ -1803,3 +2712,601 @@ dense_alloc(HashJoinTable hashtable, Size size) /* return pointer to the start of the tuple memory */ return ptr; } + +/* + * Allocate space for a tuple in shared dense storage. This is equivalent to + * dense_alloc but for Parallel Hash using shared memory. + * + * While loading a tuple into shared memory, we might run out of memory and + * decide to repartition, or determine that the load factor is too high and + * decide to expand the bucket array, or discover that another participant has + * commanded us to help do that. Return NULL if number of buckets or batches + * has changed, indicating that the caller must retry (considering the + * possibility that the tuple no longer belongs in the same batch). + */ +static HashJoinTuple +ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, + dsa_pointer *shared) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + dsa_pointer chunk_shared; + HashMemoryChunk chunk; + Size chunk_size; + HashJoinTuple result; + int curbatch = hashtable->curbatch; + + size = MAXALIGN(size); + + /* + * Fast path: if there is enough space in this backend's current chunk, + * then we can allocate without any locking. + */ + chunk = hashtable->current_chunk; + if (chunk != NULL && + size < HASH_CHUNK_THRESHOLD && + chunk->maxlen - chunk->used >= size) + { + + chunk_shared = hashtable->current_chunk_shared; + Assert(chunk == dsa_get_address(hashtable->area, chunk_shared)); + *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE + chunk->used; + result = (HashJoinTuple) (chunk->data + chunk->used); + chunk->used += size; + + Assert(chunk->used <= chunk->maxlen); + Assert(result == dsa_get_address(hashtable->area, *shared)); + + return result; + } + + /* Slow path: try to allocate a new chunk. */ + LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); + + /* + * Check if we need to help increase the number of buckets or batches. + */ + if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES || + pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS) + { + ParallelHashGrowth growth = pstate->growth; + + hashtable->current_chunk = NULL; + LWLockRelease(&pstate->lock); + + /* Another participant has commanded us to help grow. */ + if (growth == PHJ_GROWTH_NEED_MORE_BATCHES) + ExecParallelHashIncreaseNumBatches(hashtable); + else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS) + ExecParallelHashIncreaseNumBuckets(hashtable); + + /* The caller must retry. */ + return NULL; + } + + /* Oversized tuples get their own chunk. */ + if (size > HASH_CHUNK_THRESHOLD) + chunk_size = size + HASH_CHUNK_HEADER_SIZE; + else + chunk_size = HASH_CHUNK_SIZE; + + /* Check if it's time to grow batches or buckets. */ + if (pstate->growth != PHJ_GROWTH_DISABLED) + { + Assert(curbatch == 0); + Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER); + + /* + * Check if our space limit would be exceeded. To avoid choking on + * very large tuples or very low work_mem setting, we'll always allow + * each backend to allocate at least one chunk. + */ + if (hashtable->batches[0].at_least_one_chunk && + hashtable->batches[0].shared->size + + chunk_size > pstate->space_allowed) + { + pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; + hashtable->batches[0].shared->space_exhausted = true; + LWLockRelease(&pstate->lock); + + return NULL; + } + + /* Check if our load factor limit would be exceeded. */ + if (hashtable->nbatch == 1) + { + hashtable->batches[0].shared->ntuples += hashtable->batches[0].ntuples; + hashtable->batches[0].ntuples = 0; + if (hashtable->batches[0].shared->ntuples + 1 > + hashtable->nbuckets * NTUP_PER_BUCKET && + hashtable->nbuckets < (INT_MAX / 2)) + { + pstate->growth = PHJ_GROWTH_NEED_MORE_BUCKETS; + LWLockRelease(&pstate->lock); + + return NULL; + } + } + } + + /* We are cleared to allocate a new chunk. */ + chunk_shared = dsa_allocate(hashtable->area, chunk_size); + hashtable->batches[curbatch].shared->size += chunk_size; + hashtable->batches[curbatch].at_least_one_chunk = true; + + /* Set up the chunk. */ + chunk = (HashMemoryChunk) dsa_get_address(hashtable->area, chunk_shared); + *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE; + chunk->maxlen = chunk_size - HASH_CHUNK_HEADER_SIZE; + chunk->used = size; + + /* + * Push it onto the list of chunks, so that it can be found if we need to + * increase the number of buckets or batches (batch 0 only) and later for + * freeing the memory (all batches). + */ + chunk->next.shared = hashtable->batches[curbatch].shared->chunks; + hashtable->batches[curbatch].shared->chunks = chunk_shared; + + if (size <= HASH_CHUNK_THRESHOLD) + { + /* + * Make this the current chunk so that we can use the fast path to + * fill the rest of it up in future calls. + */ + hashtable->current_chunk = chunk; + hashtable->current_chunk_shared = chunk_shared; + } + LWLockRelease(&pstate->lock); + + Assert(chunk->data == dsa_get_address(hashtable->area, *shared)); + result = (HashJoinTuple) chunk->data; + + return result; +} + +/* + * One backend needs to set up the shared batch state including tuplestores. + * Other backends will ensure they have correctly configured accessors by + * called ExecParallelHashEnsureBatchAccessors(). + */ +static void +ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + ParallelHashJoinBatch *batches; + MemoryContext oldcxt; + int i; + + Assert(hashtable->batches == NULL); + + /* Allocate space. */ + pstate->batches = + dsa_allocate0(hashtable->area, + EstimateParallelHashJoinBatch(hashtable) * nbatch); + pstate->nbatch = nbatch; + batches = dsa_get_address(hashtable->area, pstate->batches); + + /* Use hash join memory context. */ + oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); + + /* Allocate this backend's accessor array. */ + hashtable->nbatch = nbatch; + hashtable->batches = (ParallelHashJoinBatchAccessor *) + palloc0(sizeof(ParallelHashJoinBatchAccessor) * hashtable->nbatch); + + /* Set up the shared state, tuplestores and backend-local accessors. */ + for (i = 0; i < hashtable->nbatch; ++i) + { + ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i]; + ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i); + char name[MAXPGPATH]; + + /* + * All members of shared were zero-initialized. We just need to set + * up the Barrier. + */ + BarrierInit(&shared->batch_barrier, 0); + if (i == 0) + { + /* Batch 0 doesn't need to be loaded. */ + BarrierAttach(&shared->batch_barrier); + while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBING) + BarrierArriveAndWait(&shared->batch_barrier, 0); + BarrierDetach(&shared->batch_barrier); + } + + /* Initialize accessor state. All members were zero-initialized. */ + accessor->shared = shared; + + /* Initialize the shared tuplestores. */ + snprintf(name, sizeof(name), "i%dof%d", i, hashtable->nbatch); + accessor->inner_tuples = + sts_initialize(ParallelHashJoinBatchInner(shared), + pstate->nparticipants, + ParallelWorkerNumber + 1, + sizeof(uint32), + SHARED_TUPLESTORE_SINGLE_PASS, + &pstate->fileset, + name); + snprintf(name, sizeof(name), "o%dof%d", i, hashtable->nbatch); + accessor->outer_tuples = + sts_initialize(ParallelHashJoinBatchOuter(shared, + pstate->nparticipants), + pstate->nparticipants, + ParallelWorkerNumber + 1, + sizeof(uint32), + SHARED_TUPLESTORE_SINGLE_PASS, + &pstate->fileset, + name); + } + + MemoryContextSwitchTo(oldcxt); +} + +/* + * Free the current set of ParallelHashJoinBatchAccessor objects. + */ +static void +ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable) +{ + int i; + + for (i = 0; i < hashtable->nbatch; ++i) + { + /* Make sure no files are left open. */ + sts_end_write(hashtable->batches[i].inner_tuples); + sts_end_write(hashtable->batches[i].outer_tuples); + sts_end_parallel_scan(hashtable->batches[i].inner_tuples); + sts_end_parallel_scan(hashtable->batches[i].outer_tuples); + } + pfree(hashtable->batches); + hashtable->batches = NULL; +} + +/* + * Make sure this backend has up-to-date accessors for the current set of + * batches. + */ +static void +ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + ParallelHashJoinBatch *batches; + MemoryContext oldcxt; + int i; + + if (hashtable->batches != NULL) + { + if (hashtable->nbatch == pstate->nbatch) + return; + ExecParallelHashCloseBatchAccessors(hashtable); + } + + /* + * It's possible for a backend to start up very late so that the whole + * join is finished and the shm state for tracking batches has already + * been freed by ExecHashTableDetach(). In that case we'll just leave + * hashtable->batches as NULL so that ExecParallelHashJoinNewBatch() gives + * up early. + */ + if (!DsaPointerIsValid(pstate->batches)) + return; + + /* Use hash join memory context. */ + oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); + + /* Allocate this backend's accessor array. */ + hashtable->nbatch = pstate->nbatch; + hashtable->batches = (ParallelHashJoinBatchAccessor *) + palloc0(sizeof(ParallelHashJoinBatchAccessor) * hashtable->nbatch); + + /* Find the base of the pseudo-array of ParallelHashJoinBatch objects. */ + batches = (ParallelHashJoinBatch *) + dsa_get_address(hashtable->area, pstate->batches); + + /* Set up the accessor array and attach to the tuplestores. */ + for (i = 0; i < hashtable->nbatch; ++i) + { + ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i]; + ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i); + + accessor->shared = shared; + accessor->preallocated = 0; + accessor->done = false; + accessor->inner_tuples = + sts_attach(ParallelHashJoinBatchInner(shared), + ParallelWorkerNumber + 1, + &pstate->fileset); + accessor->outer_tuples = + sts_attach(ParallelHashJoinBatchOuter(shared, + pstate->nparticipants), + ParallelWorkerNumber + 1, + &pstate->fileset); + } + + MemoryContextSwitchTo(oldcxt); +} + +/* + * Allocate an empty shared memory hash table for a given batch. + */ +void +ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno) +{ + ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared; + dsa_pointer_atomic *buckets; + int nbuckets = hashtable->parallel_state->nbuckets; + int i; + + batch->buckets = + dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets); + buckets = (dsa_pointer_atomic *) + dsa_get_address(hashtable->area, batch->buckets); + for (i = 0; i < nbuckets; ++i) + dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); +} + +/* + * If we are currently attached to a shared hash join batch, detach. If we + * are last to detach, clean up. + */ +void +ExecHashTableDetachBatch(HashJoinTable hashtable) +{ + if (hashtable->parallel_state != NULL && + hashtable->curbatch >= 0) + { + int curbatch = hashtable->curbatch; + ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; + + /* Make sure any temporary files are closed. */ + sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); + sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); + + /* Detach from the batch we were last working on. */ + if (BarrierArriveAndDetach(&batch->batch_barrier)) + { + /* + * Technically we shouldn't access the barrier because we're no + * longer attached, but since there is no way it's moving after + * this point it seems safe to make the following assertion. + */ + Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE); + + /* Free shared chunks and buckets. */ + while (DsaPointerIsValid(batch->chunks)) + { + HashMemoryChunk chunk = + dsa_get_address(hashtable->area, batch->chunks); + dsa_pointer next = chunk->next.shared; + + dsa_free(hashtable->area, batch->chunks); + batch->chunks = next; + } + if (DsaPointerIsValid(batch->buckets)) + { + dsa_free(hashtable->area, batch->buckets); + batch->buckets = InvalidDsaPointer; + } + } + ExecParallelHashUpdateSpacePeak(hashtable, curbatch); + /* Remember that we are not attached to a batch. */ + hashtable->curbatch = -1; + } +} + +/* + * Detach from all shared resources. If we are last to detach, clean up. + */ +void +ExecHashTableDetach(HashJoinTable hashtable) +{ + if (hashtable->parallel_state) + { + ParallelHashJoinState *pstate = hashtable->parallel_state; + int i; + + /* Make sure any temporary files are closed. */ + if (hashtable->batches) + { + for (i = 0; i < hashtable->nbatch; ++i) + { + sts_end_write(hashtable->batches[i].inner_tuples); + sts_end_write(hashtable->batches[i].outer_tuples); + sts_end_parallel_scan(hashtable->batches[i].inner_tuples); + sts_end_parallel_scan(hashtable->batches[i].outer_tuples); + } + } + + /* If we're last to detach, clean up shared memory. */ + if (BarrierDetach(&pstate->build_barrier)) + { + if (DsaPointerIsValid(pstate->batches)) + { + dsa_free(hashtable->area, pstate->batches); + pstate->batches = InvalidDsaPointer; + } + } + + hashtable->parallel_state = NULL; + } +} + +/* + * Get the first tuple in a given bucket identified by number. + */ +static inline HashJoinTuple +ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno) +{ + HashJoinTuple tuple; + dsa_pointer p; + + Assert(hashtable->parallel_state); + p = dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]); + tuple = (HashJoinTuple) dsa_get_address(hashtable->area, p); + + return tuple; +} + +/* + * Get the next tuple in the same bucket as 'tuple'. + */ +static inline HashJoinTuple +ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple) +{ + HashJoinTuple next; + + Assert(hashtable->parallel_state); + next = (HashJoinTuple) dsa_get_address(hashtable->area, tuple->next.shared); + + return next; +} + +/* + * Insert a tuple at the front of a chain of tuples in DSA memory atomically. + */ +static inline void +ExecParallelHashPushTuple(dsa_pointer_atomic *head, + HashJoinTuple tuple, + dsa_pointer tuple_shared) +{ + for (;;) + { + tuple->next.shared = dsa_pointer_atomic_read(head); + if (dsa_pointer_atomic_compare_exchange(head, + &tuple->next.shared, + tuple_shared)) + break; + } +} + +/* + * Prepare to work on a given batch. + */ +void +ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno) +{ + Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer); + + hashtable->curbatch = batchno; + hashtable->buckets.shared = (dsa_pointer_atomic *) + dsa_get_address(hashtable->area, + hashtable->batches[batchno].shared->buckets); + hashtable->nbuckets = hashtable->parallel_state->nbuckets; + hashtable->log2_nbuckets = my_log2(hashtable->nbuckets); + hashtable->current_chunk = NULL; + hashtable->current_chunk_shared = InvalidDsaPointer; + hashtable->batches[batchno].at_least_one_chunk = false; +} + +/* + * Take the next available chunk from the queue of chunks being worked on in + * parallel. Return NULL if there are none left. Otherwise return a pointer + * to the chunk, and set *shared to the DSA pointer to the chunk. + */ +static HashMemoryChunk +ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + HashMemoryChunk chunk; + + LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); + if (DsaPointerIsValid(pstate->chunk_work_queue)) + { + *shared = pstate->chunk_work_queue; + chunk = (HashMemoryChunk) + dsa_get_address(hashtable->area, *shared); + pstate->chunk_work_queue = chunk->next.shared; + } + else + chunk = NULL; + LWLockRelease(&pstate->lock); + + return chunk; +} + +/* + * Increase the space preallocated in this backend for a given inner batch by + * at least a given amount. This allows us to track whether a given batch + * would fit in memory when loaded back in. Also increase the number of + * batches or buckets if required. + * + * This maintains a running estimation of how much space will be taken when we + * load the batch back into memory by simulating the way chunks will be handed + * out to workers. It's not perfectly accurate because the tuples will be + * packed into memory chunks differently by ExecParallelHashTupleAlloc(), but + * it should be pretty close. It tends to overestimate by a fraction of a + * chunk per worker since all workers gang up to preallocate during hashing, + * but workers tend to reload batches alone if there are enough to go around, + * leaving fewer partially filled chunks. This effect is bounded by + * nparticipants. + * + * Return false if the number of batches or buckets has changed, and the + * caller should reconsider which batch a given tuple now belongs in and call + * again. + */ +static bool +ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + ParallelHashJoinBatchAccessor *batch = &hashtable->batches[batchno]; + size_t want = Max(size, HASH_CHUNK_SIZE - HASH_CHUNK_HEADER_SIZE); + + Assert(batchno > 0); + Assert(batchno < hashtable->nbatch); + + LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); + + /* Has another participant commanded us to help grow? */ + if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES || + pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS) + { + ParallelHashGrowth growth = pstate->growth; + + LWLockRelease(&pstate->lock); + if (growth == PHJ_GROWTH_NEED_MORE_BATCHES) + ExecParallelHashIncreaseNumBatches(hashtable); + else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS) + ExecParallelHashIncreaseNumBuckets(hashtable); + + return false; + } + + if (pstate->growth != PHJ_GROWTH_DISABLED && + batch->at_least_one_chunk && + (batch->shared->estimated_size + size > pstate->space_allowed)) + { + /* + * We have determined that this batch would exceed the space budget if + * loaded into memory. Command all participants to help repartition. + */ + batch->shared->space_exhausted = true; + pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; + LWLockRelease(&pstate->lock); + + return false; + } + + batch->at_least_one_chunk = true; + batch->shared->estimated_size += want + HASH_CHUNK_HEADER_SIZE; + batch->preallocated = want; + LWLockRelease(&pstate->lock); + + return true; +} + +/* + * Update this backend's copy of hashtable->spacePeak to account for a given + * batch. This is called at the end of hashing for batch 0, and then for each + * batch when it is done or discovered to be already done. The result is used + * for EXPLAIN output. + */ +void +ExecParallelHashUpdateSpacePeak(HashJoinTable hashtable, int batchno) +{ + size_t size; + + size = hashtable->batches[batchno].shared->size; + size += sizeof(dsa_pointer_atomic) * hashtable->nbuckets; + hashtable->spacePeak = Max(hashtable->spacePeak, size); +} diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index ab1632cc13..5d1dc1f401 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -10,18 +10,112 @@ * IDENTIFICATION * src/backend/executor/nodeHashjoin.c * + * PARALLELISM + * + * Hash joins can participate in parallel query execution in several ways. A + * parallel-oblivious hash join is one where the node is unaware that it is + * part of a parallel plan. In this case, a copy of the inner plan is used to + * build a copy of the hash table in every backend, and the outer plan could + * either be built from a partial or complete path, so that the results of the + * hash join are correspondingly either partial or complete. A parallel-aware + * hash join is one that behaves differently, coordinating work between + * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel + * Hash Join always appears with a Parallel Hash node. + * + * Parallel-aware hash joins use the same per-backend state machine to track + * progress through the hash join algorithm as parallel-oblivious hash joins. + * In a parallel-aware hash join, there is also a shared state machine that + * co-operating backends use to synchronize their local state machines and + * program counters. The shared state machine is managed with a Barrier IPC + * primitive. When all attached participants arrive at a barrier, the phase + * advances and all waiting participants are released. + * + * When a participant begins working on a parallel hash join, it must first + * figure out how much progress has already been made, because participants + * don't wait for each other to begin. For this reason there are switch + * statements at key points in the code where we have to synchronize our local + * state machine with the phase, and then jump to the correct part of the + * algorithm so that we can get started. + * + * One barrier called build_barrier is used to coordinate the hashing phases. + * The phase is represented by an integer which begins at zero and increments + * one by one, but in the code it is referred to by symbolic names as follows: + * + * PHJ_BUILD_ELECTING -- initial state + * PHJ_BUILD_ALLOCATING -- one sets up the batches and table 0 + * PHJ_BUILD_HASHING_INNER -- all hash the inner rel + * PHJ_BUILD_HASHING_OUTER -- (multi-batch only) all hash the outer + * PHJ_BUILD_DONE -- building done, probing can begin + * + * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may + * be used repeatedly as required to coordinate expansions in the number of + * batches or buckets. Their phases are as follows: + * + * PHJ_GROW_BATCHES_ELECTING -- initial state + * PHJ_GROW_BATCHES_ALLOCATING -- one allocates new batches + * PHJ_GROW_BATCHES_REPARTITIONING -- all repartition + * PHJ_GROW_BATCHES_FINISHING -- one cleans up, detects skew + * + * PHJ_GROW_BUCKETS_ELECTING -- initial state + * PHJ_GROW_BUCKETS_ALLOCATING -- one allocates new buckets + * PHJ_GROW_BUCKETS_REINSERTING -- all insert tuples + * + * If the planner got the number of batches and buckets right, those won't be + * necessary, but on the other hand we might finish up needing to expand the + * buckets or batches multiple times while hashing the inner relation to stay + * within our memory budget and load factor target. For that reason it's a + * separate pair of barriers using circular phases. + * + * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins, + * because we need to divide the outer relation into batches up front in order + * to be able to process batches entirely independently. In contrast, the + * parallel-oblivious algorithm simply throws tuples 'forward' to 'later' + * batches whenever it encounters them while scanning and probing, which it + * can do because it processes batches in serial order. + * + * Once PHJ_BUILD_DONE is reached, backends then split up and process + * different batches, or gang up and work together on probing batches if there + * aren't enough to go around. For each batch there is a separate barrier + * with the following phases: + * + * PHJ_BATCH_ELECTING -- initial state + * PHJ_BATCH_ALLOCATING -- one allocates buckets + * PHJ_BATCH_LOADING -- all load the hash table from disk + * PHJ_BATCH_PROBING -- all probe + * PHJ_BATCH_DONE -- end + * + * Batch 0 is a special case, because it starts out in phase + * PHJ_BATCH_PROBING; populating batch 0's hash table is done during + * PHJ_BUILD_HASHING_INNER so we can skip loading. + * + * Initially we try to plan for a single-batch hash join using the combined + * work_mem of all participants to create a large shared hash table. If that + * turns out either at planning or execution time to be impossible then we + * fall back to regular work_mem sized hash tables. + * + * To avoid deadlocks, we never wait for any barrier unless it is known that + * all other backends attached to it are actively executing the node or have + * already arrived. Practically, that means that we never return a tuple + * while attached to a barrier, unless the barrier has reached its final + * state. In the slightly special case of the per-batch barrier, we return + * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use + * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting. + * *------------------------------------------------------------------------- */ #include "postgres.h" #include "access/htup_details.h" +#include "access/parallel.h" #include "executor/executor.h" #include "executor/hashjoin.h" #include "executor/nodeHash.h" #include "executor/nodeHashjoin.h" #include "miscadmin.h" +#include "pgstat.h" #include "utils/memutils.h" +#include "utils/sharedtuplestore.h" /* @@ -42,24 +136,34 @@ static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue); +static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, + HashJoinState *hjstate, + uint32 *hashvalue); static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate, BufFile *file, uint32 *hashvalue, TupleTableSlot *tupleSlot); static bool ExecHashJoinNewBatch(HashJoinState *hjstate); +static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate); +static void ExecParallelHashJoinPartitionOuter(HashJoinState *node); /* ---------------------------------------------------------------- - * ExecHashJoin + * ExecHashJoinImpl * - * This function implements the Hybrid Hashjoin algorithm. + * This function implements the Hybrid Hashjoin algorithm. It is marked + * with an always-inline attribute so that ExecHashJoin() and + * ExecParallelHashJoin() can inline it. Compilers that respect the + * attribute should create versions specialized for parallel == true and + * parallel == false with unnecessary branches removed. * * Note: the relation we build hash table on is the "inner" * the other one is "outer". * ---------------------------------------------------------------- */ -static TupleTableSlot * /* return: a tuple or NULL */ -ExecHashJoin(PlanState *pstate) +pg_attribute_always_inline +static inline TupleTableSlot * +ExecHashJoinImpl(PlanState *pstate, bool parallel) { HashJoinState *node = castNode(HashJoinState, pstate); PlanState *outerNode; @@ -71,6 +175,7 @@ ExecHashJoin(PlanState *pstate) TupleTableSlot *outerTupleSlot; uint32 hashvalue; int batchno; + ParallelHashJoinState *parallel_state; /* * get information from HashJoin node @@ -81,6 +186,7 @@ ExecHashJoin(PlanState *pstate) outerNode = outerPlanState(node); hashtable = node->hj_HashTable; econtext = node->js.ps.ps_ExprContext; + parallel_state = hashNode->parallel_state; /* * Reset per-tuple memory context to free any expression evaluation @@ -138,6 +244,18 @@ ExecHashJoin(PlanState *pstate) /* no chance to not build the hash table */ node->hj_FirstOuterTupleSlot = NULL; } + else if (parallel) + { + /* + * The empty-outer optimization is not implemented for + * shared hash tables, because no one participant can + * determine that there are no outer tuples, and it's not + * yet clear that it's worth the synchronization overhead + * of reaching consensus to figure that out. So we have + * to build the hash table. + */ + node->hj_FirstOuterTupleSlot = NULL; + } else if (HJ_FILL_OUTER(node) || (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost && !node->hj_OuterNotEmpty)) @@ -155,15 +273,19 @@ ExecHashJoin(PlanState *pstate) node->hj_FirstOuterTupleSlot = NULL; /* - * create the hash table + * Create the hash table. If using Parallel Hash, then + * whoever gets here first will create the hash table and any + * later arrivals will merely attach to it. */ - hashtable = ExecHashTableCreate((Hash *) hashNode->ps.plan, + hashtable = ExecHashTableCreate(hashNode, node->hj_HashOperators, HJ_FILL_INNER(node)); node->hj_HashTable = hashtable; /* - * execute the Hash node, to build the hash table + * Execute the Hash node, to build the hash table. If using + * Parallel Hash, then we'll try to help hashing unless we + * arrived too late. */ hashNode->hashtable = hashtable; (void) MultiExecProcNode((PlanState *) hashNode); @@ -189,7 +311,34 @@ ExecHashJoin(PlanState *pstate) */ node->hj_OuterNotEmpty = false; - node->hj_JoinState = HJ_NEED_NEW_OUTER; + if (parallel) + { + Barrier *build_barrier; + + build_barrier = ¶llel_state->build_barrier; + Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER || + BarrierPhase(build_barrier) == PHJ_BUILD_DONE); + if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER) + { + /* + * If multi-batch, we need to hash the outer relation + * up front. + */ + if (hashtable->nbatch > 1) + ExecParallelHashJoinPartitionOuter(node); + BarrierArriveAndWait(build_barrier, + WAIT_EVENT_HASH_BUILD_HASHING_OUTER); + } + Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE); + + /* Each backend should now select a batch to work on. */ + hashtable->curbatch = -1; + node->hj_JoinState = HJ_NEED_NEW_BATCH; + + continue; + } + else + node->hj_JoinState = HJ_NEED_NEW_OUTER; /* FALL THRU */ @@ -198,9 +347,14 @@ ExecHashJoin(PlanState *pstate) /* * We don't have an outer tuple, try to get the next one */ - outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode, - node, - &hashvalue); + if (parallel) + outerTupleSlot = + ExecParallelHashJoinOuterGetTuple(outerNode, node, + &hashvalue); + else + outerTupleSlot = + ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue); + if (TupIsNull(outerTupleSlot)) { /* end of batch, or maybe whole join */ @@ -240,10 +394,12 @@ ExecHashJoin(PlanState *pstate) * Need to postpone this outer tuple to a later batch. * Save it in the corresponding outer-batch file. */ + Assert(parallel_state == NULL); Assert(batchno > hashtable->curbatch); ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot), hashvalue, &hashtable->outerBatchFile[batchno]); + /* Loop around, staying in HJ_NEED_NEW_OUTER state */ continue; } @@ -258,11 +414,23 @@ ExecHashJoin(PlanState *pstate) /* * Scan the selected hash bucket for matches to current outer */ - if (!ExecScanHashBucket(node, econtext)) + if (parallel) { - /* out of matches; check for possible outer-join fill */ - node->hj_JoinState = HJ_FILL_OUTER_TUPLE; - continue; + if (!ExecParallelScanHashBucket(node, econtext)) + { + /* out of matches; check for possible outer-join fill */ + node->hj_JoinState = HJ_FILL_OUTER_TUPLE; + continue; + } + } + else + { + if (!ExecScanHashBucket(node, econtext)) + { + /* out of matches; check for possible outer-join fill */ + node->hj_JoinState = HJ_FILL_OUTER_TUPLE; + continue; + } } /* @@ -362,8 +530,16 @@ ExecHashJoin(PlanState *pstate) /* * Try to advance to next batch. Done if there are no more. */ - if (!ExecHashJoinNewBatch(node)) - return NULL; /* end of join */ + if (parallel) + { + if (!ExecParallelHashJoinNewBatch(node)) + return NULL; /* end of parallel-aware join */ + } + else + { + if (!ExecHashJoinNewBatch(node)) + return NULL; /* end of parallel-oblivious join */ + } node->hj_JoinState = HJ_NEED_NEW_OUTER; break; @@ -374,6 +550,38 @@ ExecHashJoin(PlanState *pstate) } } +/* ---------------------------------------------------------------- + * ExecHashJoin + * + * Parallel-oblivious version. + * ---------------------------------------------------------------- + */ +static TupleTableSlot * /* return: a tuple or NULL */ +ExecHashJoin(PlanState *pstate) +{ + /* + * On sufficiently smart compilers this should be inlined with the + * parallel-aware branches removed. + */ + return ExecHashJoinImpl(pstate, false); +} + +/* ---------------------------------------------------------------- + * ExecParallelHashJoin + * + * Parallel-aware version. + * ---------------------------------------------------------------- + */ +static TupleTableSlot * /* return: a tuple or NULL */ +ExecParallelHashJoin(PlanState *pstate) +{ + /* + * On sufficiently smart compilers this should be inlined with the + * parallel-oblivious branches removed. + */ + return ExecHashJoinImpl(pstate, true); +} + /* ---------------------------------------------------------------- * ExecInitHashJoin * @@ -400,6 +608,12 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags) hjstate = makeNode(HashJoinState); hjstate->js.ps.plan = (Plan *) node; hjstate->js.ps.state = estate; + + /* + * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker() + * where this function may be replaced with a parallel version, if we + * managed to launch a parallel query. + */ hjstate->js.ps.ExecProcNode = ExecHashJoin; /* @@ -581,9 +795,9 @@ ExecEndHashJoin(HashJoinState *node) /* * ExecHashJoinOuterGetTuple * - * get the next outer tuple for hashjoin: either by - * executing the outer plan node in the first pass, or from - * the temp files for the hashjoin batches. + * get the next outer tuple for a parallel oblivious hashjoin: either by + * executing the outer plan node in the first pass, or from the temp + * files for the hashjoin batches. * * Returns a null slot if no more outer tuples (within the current batch). * @@ -661,6 +875,67 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode, return NULL; } +/* + * ExecHashJoinOuterGetTuple variant for the parallel case. + */ +static TupleTableSlot * +ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, + HashJoinState *hjstate, + uint32 *hashvalue) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + int curbatch = hashtable->curbatch; + TupleTableSlot *slot; + + /* + * In the Parallel Hash case we only run the outer plan directly for + * single-batch hash joins. Otherwise we have to go to batch files, even + * for batch 0. + */ + if (curbatch == 0 && hashtable->nbatch == 1) + { + slot = ExecProcNode(outerNode); + + while (!TupIsNull(slot)) + { + ExprContext *econtext = hjstate->js.ps.ps_ExprContext; + + econtext->ecxt_outertuple = slot; + if (ExecHashGetHashValue(hashtable, econtext, + hjstate->hj_OuterHashKeys, + true, /* outer tuple */ + HJ_FILL_OUTER(hjstate), + hashvalue)) + return slot; + + /* + * That tuple couldn't match because of a NULL, so discard it and + * continue with the next one. + */ + slot = ExecProcNode(outerNode); + } + } + else if (curbatch < hashtable->nbatch) + { + MinimalTuple tuple; + + tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples, + hashvalue); + if (tuple != NULL) + { + slot = ExecStoreMinimalTuple(tuple, + hjstate->hj_OuterTupleSlot, + false); + return slot; + } + else + ExecClearTuple(hjstate->hj_OuterTupleSlot); + } + + /* End of this batch */ + return NULL; +} + /* * ExecHashJoinNewBatch * switch to a new hashjoin batch @@ -803,6 +1078,135 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) return true; } +/* + * Choose a batch to work on, and attach to it. Returns true if successful, + * false if there are no more batches. + */ +static bool +ExecParallelHashJoinNewBatch(HashJoinState *hjstate) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + int start_batchno; + int batchno; + + /* + * If we started up so late that the batch tracking array has been freed + * already by ExecHashTableDetach(), then we are finished. See also + * ExecParallelHashEnsureBatchAccessors(). + */ + if (hashtable->batches == NULL) + return false; + + /* + * If we were already attached to a batch, remember not to bother checking + * it again, and detach from it (possibly freeing the hash table if we are + * last to detach). + */ + if (hashtable->curbatch >= 0) + { + hashtable->batches[hashtable->curbatch].done = true; + ExecHashTableDetachBatch(hashtable); + } + + /* + * Search for a batch that isn't done. We use an atomic counter to start + * our search at a different batch in every participant when there are + * more batches than participants. + */ + batchno = start_batchno = + pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) % + hashtable->nbatch; + do + { + uint32 hashvalue; + MinimalTuple tuple; + TupleTableSlot *slot; + + if (!hashtable->batches[batchno].done) + { + SharedTuplestoreAccessor *inner_tuples; + Barrier *batch_barrier = + &hashtable->batches[batchno].shared->batch_barrier; + + switch (BarrierAttach(batch_barrier)) + { + case PHJ_BATCH_ELECTING: + + /* One backend allocates the hash table. */ + if (BarrierArriveAndWait(batch_barrier, + WAIT_EVENT_HASH_BATCH_ELECTING)) + ExecParallelHashTableAlloc(hashtable, batchno); + /* Fall through. */ + + case PHJ_BATCH_ALLOCATING: + /* Wait for allocation to complete. */ + BarrierArriveAndWait(batch_barrier, + WAIT_EVENT_HASH_BATCH_ALLOCATING); + /* Fall through. */ + + case PHJ_BATCH_LOADING: + /* Start (or join in) loading tuples. */ + ExecParallelHashTableSetCurrentBatch(hashtable, batchno); + inner_tuples = hashtable->batches[batchno].inner_tuples; + sts_begin_parallel_scan(inner_tuples); + while ((tuple = sts_parallel_scan_next(inner_tuples, + &hashvalue))) + { + slot = ExecStoreMinimalTuple(tuple, + hjstate->hj_HashTupleSlot, + false); + ExecParallelHashTableInsertCurrentBatch(hashtable, slot, + hashvalue); + } + sts_end_parallel_scan(inner_tuples); + BarrierArriveAndWait(batch_barrier, + WAIT_EVENT_HASH_BATCH_LOADING); + /* Fall through. */ + + case PHJ_BATCH_PROBING: + + /* + * This batch is ready to probe. Return control to + * caller. We stay attached to batch_barrier so that the + * hash table stays alive until everyone's finished + * probing it, but no participant is allowed to wait at + * this barrier again (or else a deadlock could occur). + * All attached participants must eventually call + * BarrierArriveAndDetach() so that the final phase + * PHJ_BATCH_DONE can be reached. + */ + ExecParallelHashTableSetCurrentBatch(hashtable, batchno); + sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples); + return true; + + case PHJ_BATCH_DONE: + + /* + * Already done. Detach and go around again (if any + * remain). + */ + BarrierDetach(batch_barrier); + + /* + * We didn't work on this batch, but we need to observe + * its size for EXPLAIN. + */ + ExecParallelHashUpdateSpacePeak(hashtable, batchno); + hashtable->batches[batchno].done = true; + hashtable->curbatch = -1; + break; + + default: + elog(ERROR, "unexpected batch phase %d", + BarrierPhase(batch_barrier)); + } + } + batchno = (batchno + 1) % hashtable->nbatch; + } while (batchno != start_batchno); + + return false; +} + /* * ExecHashJoinSaveTuple * save a tuple to a batch file. @@ -964,3 +1368,176 @@ ExecReScanHashJoin(HashJoinState *node) if (node->js.ps.lefttree->chgParam == NULL) ExecReScan(node->js.ps.lefttree); } + +void +ExecShutdownHashJoin(HashJoinState *node) +{ + if (node->hj_HashTable) + { + /* + * Detach from shared state before DSM memory goes away. This makes + * sure that we don't have any pointers into DSM memory by the time + * ExecEndHashJoin runs. + */ + ExecHashTableDetachBatch(node->hj_HashTable); + ExecHashTableDetach(node->hj_HashTable); + } +} + +static void +ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate) +{ + PlanState *outerState = outerPlanState(hjstate); + ExprContext *econtext = hjstate->js.ps.ps_ExprContext; + HashJoinTable hashtable = hjstate->hj_HashTable; + TupleTableSlot *slot; + uint32 hashvalue; + int i; + + Assert(hjstate->hj_FirstOuterTupleSlot == NULL); + + /* Execute outer plan, writing all tuples to shared tuplestores. */ + for (;;) + { + slot = ExecProcNode(outerState); + if (TupIsNull(slot)) + break; + econtext->ecxt_outertuple = slot; + if (ExecHashGetHashValue(hashtable, econtext, + hjstate->hj_OuterHashKeys, + true, /* outer tuple */ + false, /* outer join, currently unsupported */ + &hashvalue)) + { + int batchno; + int bucketno; + + ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, + &batchno); + sts_puttuple(hashtable->batches[batchno].outer_tuples, + &hashvalue, ExecFetchSlotMinimalTuple(slot)); + } + CHECK_FOR_INTERRUPTS(); + } + + /* Make sure all outer partitions are readable by any backend. */ + for (i = 0; i < hashtable->nbatch; ++i) + sts_end_write(hashtable->batches[i].outer_tuples); +} + +void +ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt) +{ + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState)); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +void +ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt) +{ + int plan_node_id = state->js.ps.plan->plan_node_id; + HashState *hashNode; + ParallelHashJoinState *pstate; + + /* + * Disable shared hash table mode if we failed to create a real DSM + * segment, because that means that we don't have a DSA area to work with. + */ + if (pcxt->seg == NULL) + return; + + ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); + + /* + * Set up the state needed to coordinate access to the shared hash + * table(s), using the plan node ID as the toc key. + */ + pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState)); + shm_toc_insert(pcxt->toc, plan_node_id, pstate); + + /* + * Set up the shared hash join state with no batches initially. + * ExecHashTableCreate() will prepare at least one later and set nbatch + * and space_allowed. + */ + pstate->nbatch = 0; + pstate->space_allowed = 0; + pstate->batches = InvalidDsaPointer; + pstate->old_batches = InvalidDsaPointer; + pstate->nbuckets = 0; + pstate->growth = PHJ_GROWTH_OK; + pstate->chunk_work_queue = InvalidDsaPointer; + pg_atomic_init_u32(&pstate->distributor, 0); + pstate->nparticipants = pcxt->nworkers + 1; + pstate->total_tuples = 0; + LWLockInitialize(&pstate->lock, + LWTRANCHE_PARALLEL_HASH_JOIN); + BarrierInit(&pstate->build_barrier, 0); + BarrierInit(&pstate->grow_batches_barrier, 0); + BarrierInit(&pstate->grow_buckets_barrier, 0); + + /* Set up the space we'll use for shared temporary files. */ + SharedFileSetInit(&pstate->fileset, pcxt->seg); + + /* Initialize the shared state in the hash node. */ + hashNode = (HashState *) innerPlanState(state); + hashNode->parallel_state = pstate; +} + +/* ---------------------------------------------------------------- + * ExecHashJoinReInitializeDSM + * + * Reset shared state before beginning a fresh scan. + * ---------------------------------------------------------------- + */ +void +ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt) +{ + int plan_node_id = state->js.ps.plan->plan_node_id; + ParallelHashJoinState *pstate = + shm_toc_lookup(cxt->toc, plan_node_id, false); + + /* + * It would be possible to reuse the shared hash table in single-batch + * cases by resetting and then fast-forwarding build_barrier to + * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but + * currently shared hash tables are already freed by now (by the last + * participant to detach from the batch). We could consider keeping it + * around for single-batch joins. We'd also need to adjust + * finalize_plan() so that it doesn't record a dummy dependency for + * Parallel Hash nodes, preventing the rescan optimization. For now we + * don't try. + */ + + /* Detach, freeing any remaining shared memory. */ + if (state->hj_HashTable != NULL) + { + ExecHashTableDetachBatch(state->hj_HashTable); + ExecHashTableDetach(state->hj_HashTable); + } + + /* Clear any shared batch files. */ + SharedFileSetDeleteAll(&pstate->fileset); + + /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */ + BarrierInit(&pstate->build_barrier, 0); +} + +void +ExecHashJoinInitializeWorker(HashJoinState *state, + ParallelWorkerContext *pwcxt) +{ + HashState *hashNode; + int plan_node_id = state->js.ps.plan->plan_node_id; + ParallelHashJoinState *pstate = + shm_toc_lookup(pwcxt->toc, plan_node_id, false); + + /* Attach to the space for shared temporary files. */ + SharedFileSetAttach(&pstate->fileset, pwcxt->seg); + + /* Attach to the shared state in the hash node. */ + hashNode = (HashState *) innerPlanState(state); + hashNode->parallel_state = pstate; + + ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); +} diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index b1515dd8e1..84d717102d 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -1057,6 +1057,7 @@ _copyHash(const Hash *from) COPY_SCALAR_FIELD(skewTable); COPY_SCALAR_FIELD(skewColumn); COPY_SCALAR_FIELD(skewInherit); + COPY_SCALAR_FIELD(rows_total); return newnode; } diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index b59a5219a7..e468d7cc41 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -927,6 +927,7 @@ _outHash(StringInfo str, const Hash *node) WRITE_OID_FIELD(skewTable); WRITE_INT_FIELD(skewColumn); WRITE_BOOL_FIELD(skewInherit); + WRITE_FLOAT_FIELD(rows_total, "%.0f"); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 0d17ae89b0..1133c70a1c 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -2213,6 +2213,7 @@ _readHash(void) READ_OID_FIELD(skewTable); READ_INT_FIELD(skewColumn); READ_BOOL_FIELD(skewInherit); + READ_FLOAT_FIELD(rows_total); READ_DONE(); } diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 877827dcb5..c3daacd3ea 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -129,6 +129,7 @@ bool enable_hashjoin = true; bool enable_gathermerge = true; bool enable_partition_wise_join = false; bool enable_parallel_append = true; +bool enable_parallel_hash = true; typedef struct { @@ -3130,16 +3131,19 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, JoinType jointype, List *hashclauses, Path *outer_path, Path *inner_path, - JoinPathExtraData *extra) + JoinPathExtraData *extra, + bool parallel_hash) { Cost startup_cost = 0; Cost run_cost = 0; double outer_path_rows = outer_path->rows; double inner_path_rows = inner_path->rows; + double inner_path_rows_total = inner_path_rows; int num_hashclauses = list_length(hashclauses); int numbuckets; int numbatches; int num_skew_mcvs; + size_t space_allowed; /* unused */ /* cost of source data */ startup_cost += outer_path->startup_cost; @@ -3160,6 +3164,15 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, * inner_path_rows; run_cost += cpu_operator_cost * num_hashclauses * outer_path_rows; + /* + * If this is a parallel hash build, then the value we have for + * inner_rows_total currently refers only to the rows returned by each + * participant. For shared hash table size estimation, we need the total + * number, so we need to undo the division. + */ + if (parallel_hash) + inner_path_rows_total *= get_parallel_divisor(inner_path); + /* * Get hash table size that executor would use for inner relation. * @@ -3170,9 +3183,12 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, * XXX at some point it might be interesting to try to account for skew * optimization in the cost estimate, but for now, we don't. */ - ExecChooseHashTableSize(inner_path_rows, + ExecChooseHashTableSize(inner_path_rows_total, inner_path->pathtarget->width, true, /* useskew */ + parallel_hash, /* try_combined_work_mem */ + outer_path->parallel_workers, + &space_allowed, &numbuckets, &numbatches, &num_skew_mcvs); @@ -3204,6 +3220,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, workspace->run_cost = run_cost; workspace->numbuckets = numbuckets; workspace->numbatches = numbatches; + workspace->inner_rows_total = inner_path_rows_total; } /* @@ -3226,6 +3243,7 @@ final_cost_hashjoin(PlannerInfo *root, HashPath *path, Path *inner_path = path->jpath.innerjoinpath; double outer_path_rows = outer_path->rows; double inner_path_rows = inner_path->rows; + double inner_path_rows_total = workspace->inner_rows_total; List *hashclauses = path->path_hashclauses; Cost startup_cost = workspace->startup_cost; Cost run_cost = workspace->run_cost; @@ -3266,6 +3284,9 @@ final_cost_hashjoin(PlannerInfo *root, HashPath *path, /* mark the path with estimated # of batches */ path->num_batches = numbatches; + /* store the total number of tuples (sum of partial row estimates) */ + path->inner_rows_total = inner_path_rows_total; + /* and compute the number of "virtual" buckets in the whole join */ virtualbuckets = (double) numbuckets * (double) numbatches; diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c index 02a630278f..e774130ac8 100644 --- a/src/backend/optimizer/path/joinpath.c +++ b/src/backend/optimizer/path/joinpath.c @@ -747,7 +747,7 @@ try_hashjoin_path(PlannerInfo *root, * never have any output pathkeys, per comments in create_hashjoin_path. */ initial_cost_hashjoin(root, &workspace, jointype, hashclauses, - outer_path, inner_path, extra); + outer_path, inner_path, extra, false); if (add_path_precheck(joinrel, workspace.startup_cost, workspace.total_cost, @@ -761,6 +761,7 @@ try_hashjoin_path(PlannerInfo *root, extra, outer_path, inner_path, + false, /* parallel_hash */ extra->restrictlist, required_outer, hashclauses)); @@ -776,6 +777,10 @@ try_hashjoin_path(PlannerInfo *root, * try_partial_hashjoin_path * Consider a partial hashjoin join path; if it appears useful, push it into * the joinrel's partial_pathlist via add_partial_path(). + * The outer side is partial. If parallel_hash is true, then the inner path + * must be partial and will be run in parallel to create one or more shared + * hash tables; otherwise the inner path must be complete and a copy of it + * is run in every process to create separate identical private hash tables. */ static void try_partial_hashjoin_path(PlannerInfo *root, @@ -784,7 +789,8 @@ try_partial_hashjoin_path(PlannerInfo *root, Path *inner_path, List *hashclauses, JoinType jointype, - JoinPathExtraData *extra) + JoinPathExtraData *extra, + bool parallel_hash) { JoinCostWorkspace workspace; @@ -808,7 +814,7 @@ try_partial_hashjoin_path(PlannerInfo *root, * cost. Bail out right away if it looks terrible. */ initial_cost_hashjoin(root, &workspace, jointype, hashclauses, - outer_path, inner_path, extra); + outer_path, inner_path, extra, true); if (!add_partial_path_precheck(joinrel, workspace.total_cost, NIL)) return; @@ -821,6 +827,7 @@ try_partial_hashjoin_path(PlannerInfo *root, extra, outer_path, inner_path, + parallel_hash, extra->restrictlist, NULL, hashclauses)); @@ -1839,6 +1846,10 @@ hash_inner_and_outer(PlannerInfo *root, * able to properly guarantee uniqueness. Similarly, we can't handle * JOIN_FULL and JOIN_RIGHT, because they can produce false null * extended rows. Also, the resulting path must not be parameterized. + * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel + * Hash, since in that case we're back to a single hash table with a + * single set of match bits for each batch, but that will require + * figuring out a deadlock-free way to wait for the probe to finish. */ if (joinrel->consider_parallel && save_jointype != JOIN_UNIQUE_OUTER && @@ -1848,11 +1859,27 @@ hash_inner_and_outer(PlannerInfo *root, bms_is_empty(joinrel->lateral_relids)) { Path *cheapest_partial_outer; + Path *cheapest_partial_inner = NULL; Path *cheapest_safe_inner = NULL; cheapest_partial_outer = (Path *) linitial(outerrel->partial_pathlist); + /* + * Can we use a partial inner plan too, so that we can build a + * shared hash table in parallel? + */ + if (innerrel->partial_pathlist != NIL && enable_parallel_hash) + { + cheapest_partial_inner = + (Path *) linitial(innerrel->partial_pathlist); + try_partial_hashjoin_path(root, joinrel, + cheapest_partial_outer, + cheapest_partial_inner, + hashclauses, jointype, extra, + true /* parallel_hash */ ); + } + /* * Normally, given that the joinrel is parallel-safe, the cheapest * total inner path will also be parallel-safe, but if not, we'll @@ -1870,7 +1897,8 @@ hash_inner_and_outer(PlannerInfo *root, try_partial_hashjoin_path(root, joinrel, cheapest_partial_outer, cheapest_safe_inner, - hashclauses, jointype, extra); + hashclauses, jointype, extra, + false /* parallel_hash */ ); } } } diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index f6c83d0477..1a0d3a885f 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -4192,6 +4192,17 @@ create_hashjoin_plan(PlannerInfo *root, copy_plan_costsize(&hash_plan->plan, inner_plan); hash_plan->plan.startup_cost = hash_plan->plan.total_cost; + /* + * If parallel-aware, the executor will also need an estimate of the total + * number of rows expected from all participants so that it can size the + * shared hash table. + */ + if (best_path->jpath.path.parallel_aware) + { + hash_plan->plan.parallel_aware = true; + hash_plan->rows_total = best_path->inner_rows_total; + } + join_plan = make_hashjoin(tlist, joinclauses, otherclauses, diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 54126fbb6a..2aee156ad3 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -2278,6 +2278,7 @@ create_mergejoin_path(PlannerInfo *root, * 'extra' contains various information about the join * 'outer_path' is the cheapest outer path * 'inner_path' is the cheapest inner path + * 'parallel_hash' to select Parallel Hash of inner path (shared hash table) * 'restrict_clauses' are the RestrictInfo nodes to apply at the join * 'required_outer' is the set of required outer rels * 'hashclauses' are the RestrictInfo nodes to use as hash clauses @@ -2291,6 +2292,7 @@ create_hashjoin_path(PlannerInfo *root, JoinPathExtraData *extra, Path *outer_path, Path *inner_path, + bool parallel_hash, List *restrict_clauses, Relids required_outer, List *hashclauses) @@ -2308,7 +2310,8 @@ create_hashjoin_path(PlannerInfo *root, extra->sjinfo, required_outer, &restrict_clauses); - pathnode->jpath.path.parallel_aware = false; + pathnode->jpath.path.parallel_aware = + joinrel->consider_parallel && parallel_hash; pathnode->jpath.path.parallel_safe = joinrel->consider_parallel && outer_path->parallel_safe && inner_path->parallel_safe; /* This is a foolish way to estimate parallel_workers, but for now... */ diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 5c256ff8ab..b502c1bb9b 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3586,6 +3586,51 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_EXECUTE_GATHER: event_name = "ExecuteGather"; break; + case WAIT_EVENT_HASH_BATCH_ALLOCATING: + event_name = "Hash/Batch/Allocating"; + break; + case WAIT_EVENT_HASH_BATCH_ELECTING: + event_name = "Hash/Batch/Electing"; + break; + case WAIT_EVENT_HASH_BATCH_LOADING: + event_name = "Hash/Batch/Loading"; + break; + case WAIT_EVENT_HASH_BUILD_ALLOCATING: + event_name = "Hash/Build/Allocating"; + break; + case WAIT_EVENT_HASH_BUILD_ELECTING: + event_name = "Hash/Build/Electing"; + break; + case WAIT_EVENT_HASH_BUILD_HASHING_INNER: + event_name = "Hash/Build/HashingInner"; + break; + case WAIT_EVENT_HASH_BUILD_HASHING_OUTER: + event_name = "Hash/Build/HashingOuter"; + break; + case WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING: + event_name = "Hash/GrowBatches/Allocating"; + break; + case WAIT_EVENT_HASH_GROW_BATCHES_DECIDING: + event_name = "Hash/GrowBatches/Deciding"; + break; + case WAIT_EVENT_HASH_GROW_BATCHES_ELECTING: + event_name = "Hash/GrowBatches/Electing"; + break; + case WAIT_EVENT_HASH_GROW_BATCHES_FINISHING: + event_name = "Hash/GrowBatches/Finishing"; + break; + case WAIT_EVENT_HASH_GROW_BATCHES_REPARTITIONING: + event_name = "Hash/GrowBatches/Repartitioning"; + break; + case WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING: + event_name = "Hash/GrowBuckets/Allocating"; + break; + case WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING: + event_name = "Hash/GrowBuckets/Electing"; + break; + case WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING: + event_name = "Hash/GrowBuckets/Reinserting"; + break; case WAIT_EVENT_LOGICAL_SYNC_DATA: event_name = "LogicalSyncData"; break; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 0f7a96d85c..e32901d506 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -929,7 +929,15 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, - + { + {"enable_parallel_hash", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's user of parallel hash plans."), + NULL + }, + &enable_parallel_hash, + true, + NULL, NULL, NULL + }, { {"geqo", PGC_USERSET, QUERY_TUNING_GEQO, gettext_noop("Enables genetic query optimization."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 842cf3601a..69f40f04b0 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -301,6 +301,7 @@ #enable_sort = on #enable_tidscan = on #enable_partition_wise_join = off +#enable_parallel_hash = on # - Planner Cost Constants - diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index 82acadf85b..d8c82d4e7c 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -15,7 +15,10 @@ #define HASHJOIN_H #include "nodes/execnodes.h" +#include "port/atomics.h" +#include "storage/barrier.h" #include "storage/buffile.h" +#include "storage/lwlock.h" /* ---------------------------------------------------------------- * hash-join hash table structures @@ -63,7 +66,12 @@ typedef struct HashJoinTupleData { - struct HashJoinTupleData *next; /* link to next tuple in same bucket */ + /* link to next tuple in same bucket */ + union + { + struct HashJoinTupleData *unshared; + dsa_pointer shared; + } next; uint32 hashvalue; /* tuple's hash code */ /* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */ } HashJoinTupleData; @@ -112,8 +120,12 @@ typedef struct HashMemoryChunkData size_t maxlen; /* size of the buffer holding the tuples */ size_t used; /* number of buffer bytes already used */ - struct HashMemoryChunkData *next; /* pointer to the next chunk (linked - * list) */ + /* pointer to the next chunk (linked list) */ + union + { + struct HashMemoryChunkData *unshared; + dsa_pointer shared; + } next; char data[FLEXIBLE_ARRAY_MEMBER]; /* buffer allocated at the end */ } HashMemoryChunkData; @@ -121,8 +133,148 @@ typedef struct HashMemoryChunkData typedef struct HashMemoryChunkData *HashMemoryChunk; #define HASH_CHUNK_SIZE (32 * 1024L) +#define HASH_CHUNK_HEADER_SIZE (offsetof(HashMemoryChunkData, data)) #define HASH_CHUNK_THRESHOLD (HASH_CHUNK_SIZE / 4) +/* + * For each batch of a Parallel Hash Join, we have a ParallelHashJoinBatch + * object in shared memory to coordinate access to it. Since they are + * followed by variable-sized objects, they are arranged in contiguous memory + * but not accessed directly as an array. + */ +typedef struct ParallelHashJoinBatch +{ + dsa_pointer buckets; /* array of hash table buckets */ + Barrier batch_barrier; /* synchronization for joining this batch */ + + dsa_pointer chunks; /* chunks of tuples loaded */ + size_t size; /* size of buckets + chunks in memory */ + size_t estimated_size; /* size of buckets + chunks while writing */ + size_t ntuples; /* number of tuples loaded */ + size_t old_ntuples; /* number of tuples before repartitioning */ + bool space_exhausted; + + /* + * Variable-sized SharedTuplestore objects follow this struct in memory. + * See the accessor macros below. + */ +} ParallelHashJoinBatch; + +/* Accessor for inner batch tuplestore following a ParallelHashJoinBatch. */ +#define ParallelHashJoinBatchInner(batch) \ + ((SharedTuplestore *) \ + ((char *) (batch) + MAXALIGN(sizeof(ParallelHashJoinBatch)))) + +/* Accessor for outer batch tuplestore following a ParallelHashJoinBatch. */ +#define ParallelHashJoinBatchOuter(batch, nparticipants) \ + ((SharedTuplestore *) \ + ((char *) ParallelHashJoinBatchInner(batch) + \ + MAXALIGN(sts_estimate(nparticipants)))) + +/* Total size of a ParallelHashJoinBatch and tuplestores. */ +#define EstimateParallelHashJoinBatch(hashtable) \ + (MAXALIGN(sizeof(ParallelHashJoinBatch)) + \ + MAXALIGN(sts_estimate((hashtable)->parallel_state->nparticipants)) * 2) + +/* Accessor for the nth ParallelHashJoinBatch given the base. */ +#define NthParallelHashJoinBatch(base, n) \ + ((ParallelHashJoinBatch *) \ + ((char *) (base) + \ + EstimateParallelHashJoinBatch(hashtable) * (n))) + +/* + * Each backend requires a small amount of per-batch state to interact with + * each ParalellHashJoinBatch. + */ +typedef struct ParallelHashJoinBatchAccessor +{ + ParallelHashJoinBatch *shared; /* pointer to shared state */ + + /* Per-backend partial counters to reduce contention. */ + size_t preallocated; /* pre-allocated space for this backend */ + size_t ntuples; /* number of tuples */ + size_t size; /* size of partition in memory */ + size_t estimated_size; /* size of partition on disk */ + size_t old_ntuples; /* how many tuples before repartioning? */ + bool at_least_one_chunk; /* has this backend allocated a chunk? */ + + bool done; /* flag to remember that a batch is done */ + SharedTuplestoreAccessor *inner_tuples; + SharedTuplestoreAccessor *outer_tuples; +} ParallelHashJoinBatchAccessor; + +/* + * While hashing the inner relation, any participant might determine that it's + * time to increase the number of buckets to reduce the load factor or batches + * to reduce the memory size. This is indicated by setting the growth flag to + * these values. + */ +typedef enum ParallelHashGrowth +{ + /* The current dimensions are sufficient. */ + PHJ_GROWTH_OK, + /* The load factor is too high, so we need to add buckets. */ + PHJ_GROWTH_NEED_MORE_BUCKETS, + /* The memory budget would be exhausted, so we need to repartition. */ + PHJ_GROWTH_NEED_MORE_BATCHES, + /* Repartitioning didn't help last time, so don't try to do that again. */ + PHJ_GROWTH_DISABLED +} ParallelHashGrowth; + +/* + * The shared state used to coordinate a Parallel Hash Join. This is stored + * in the DSM segment. + */ +typedef struct ParallelHashJoinState +{ + dsa_pointer batches; /* array of ParallelHashJoinBatch */ + dsa_pointer old_batches; /* previous generation during repartition */ + int nbatch; /* number of batches now */ + int old_nbatch; /* previous number of batches */ + int nbuckets; /* number of buckets */ + ParallelHashGrowth growth; /* control batch/bucket growth */ + dsa_pointer chunk_work_queue; /* chunk work queue */ + int nparticipants; + size_t space_allowed; + size_t total_tuples; /* total number of inner tuples */ + LWLock lock; /* lock protecting the above */ + + Barrier build_barrier; /* synchronization for the build phases */ + Barrier grow_batches_barrier; + Barrier grow_buckets_barrier; + pg_atomic_uint32 distributor; /* counter for load balancing */ + + SharedFileSet fileset; /* space for shared temporary files */ +} ParallelHashJoinState; + +/* The phases for building batches, used by build_barrier. */ +#define PHJ_BUILD_ELECTING 0 +#define PHJ_BUILD_ALLOCATING 1 +#define PHJ_BUILD_HASHING_INNER 2 +#define PHJ_BUILD_HASHING_OUTER 3 +#define PHJ_BUILD_DONE 4 + +/* The phases for probing each batch, used by for batch_barrier. */ +#define PHJ_BATCH_ELECTING 0 +#define PHJ_BATCH_ALLOCATING 1 +#define PHJ_BATCH_LOADING 2 +#define PHJ_BATCH_PROBING 3 +#define PHJ_BATCH_DONE 4 + +/* The phases of batch growth while hashing, for grow_batches_barrier. */ +#define PHJ_GROW_BATCHES_ELECTING 0 +#define PHJ_GROW_BATCHES_ALLOCATING 1 +#define PHJ_GROW_BATCHES_REPARTITIONING 2 +#define PHJ_GROW_BATCHES_DECIDING 3 +#define PHJ_GROW_BATCHES_FINISHING 4 +#define PHJ_GROW_BATCHES_PHASE(n) ((n) % 5) /* circular phases */ + +/* The phases of bucket growth while hashing, for grow_buckets_barrier. */ +#define PHJ_GROW_BUCKETS_ELECTING 0 +#define PHJ_GROW_BUCKETS_ALLOCATING 1 +#define PHJ_GROW_BUCKETS_REINSERTING 2 +#define PHJ_GROW_BUCKETS_PHASE(n) ((n) % 3) /* circular phases */ + typedef struct HashJoinTableData { int nbuckets; /* # buckets in the in-memory hash table */ @@ -133,8 +285,13 @@ typedef struct HashJoinTableData int log2_nbuckets_optimal; /* log2(nbuckets_optimal) */ /* buckets[i] is head of list of tuples in i'th in-memory bucket */ - struct HashJoinTupleData **buckets; - /* buckets array is per-batch storage, as are all the tuples */ + union + { + /* unshared array is per-batch storage, as are all the tuples */ + struct HashJoinTupleData **unshared; + /* shared array is per-query DSA area, as are all the tuples */ + dsa_pointer_atomic *shared; + } buckets; bool keepNulls; /* true to store unmatchable NULL tuples */ @@ -153,6 +310,7 @@ typedef struct HashJoinTableData bool growEnabled; /* flag to shut off nbatch increases */ double totalTuples; /* # tuples obtained from inner plan */ + double partialTuples; /* # tuples obtained from inner plan by me */ double skewTuples; /* # tuples inserted into skew tuples */ /* @@ -185,6 +343,13 @@ typedef struct HashJoinTableData /* used for dense allocation of tuples (into linked chunks) */ HashMemoryChunk chunks; /* one list for the whole batch */ + + /* Shared and private state for Parallel Hash. */ + HashMemoryChunk current_chunk; /* this backend's current chunk */ + dsa_area *area; /* DSA area to allocate memory from */ + ParallelHashJoinState *parallel_state; + ParallelHashJoinBatchAccessor *batches; + dsa_pointer current_chunk_shared; } HashJoinTableData; #endif /* HASHJOIN_H */ diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 0974f1edc2..84c166b395 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -17,17 +17,33 @@ #include "access/parallel.h" #include "nodes/execnodes.h" +struct SharedHashJoinBatch; + extern HashState *ExecInitHash(Hash *node, EState *estate, int eflags); extern Node *MultiExecHash(HashState *node); extern void ExecEndHash(HashState *node); extern void ExecReScanHash(HashState *node); -extern HashJoinTable ExecHashTableCreate(Hash *node, List *hashOperators, +extern HashJoinTable ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls); +extern void ExecParallelHashTableAlloc(HashJoinTable hashtable, + int batchno); extern void ExecHashTableDestroy(HashJoinTable hashtable); +extern void ExecHashTableDetach(HashJoinTable hashtable); +extern void ExecHashTableDetachBatch(HashJoinTable hashtable); +extern void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, + int batchno); +void ExecParallelHashUpdateSpacePeak(HashJoinTable hashtable, int batchno); + extern void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue); +extern void ExecParallelHashTableInsert(HashJoinTable hashtable, + TupleTableSlot *slot, + uint32 hashvalue); +extern void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, + TupleTableSlot *slot, + uint32 hashvalue); extern bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, @@ -39,12 +55,16 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable, int *bucketno, int *batchno); extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext); +extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext); extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate); extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext); extern void ExecHashTableReset(HashJoinTable hashtable); extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable); extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, + bool try_combined_work_mem, + int parallel_workers, + size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs); @@ -55,6 +75,6 @@ extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwc extern void ExecHashRetrieveInstrumentation(HashState *node); extern void ExecShutdownHash(HashState *node); extern void ExecHashGetInstrumentation(HashInstrumentation *instrument, - HashJoinTable hashtable); + HashJoinTable hashtable); #endif /* NODEHASH_H */ diff --git a/src/include/executor/nodeHashjoin.h b/src/include/executor/nodeHashjoin.h index 7469bfbf60..8469085d7e 100644 --- a/src/include/executor/nodeHashjoin.h +++ b/src/include/executor/nodeHashjoin.h @@ -20,6 +20,12 @@ extern HashJoinState *ExecInitHashJoin(HashJoin *node, EState *estate, int eflags); extern void ExecEndHashJoin(HashJoinState *node); extern void ExecReScanHashJoin(HashJoinState *node); +extern void ExecShutdownHashJoin(HashJoinState *node); +extern void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt); +extern void ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt); +extern void ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt); +extern void ExecHashJoinInitializeWorker(HashJoinState *state, + ParallelWorkerContext *pwcxt); extern void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 1a35c5c9ad..44d8c47d2c 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -25,6 +25,7 @@ #include "utils/hsearch.h" #include "utils/queryenvironment.h" #include "utils/reltrigger.h" +#include "utils/sharedtuplestore.h" #include "utils/sortsupport.h" #include "utils/tuplestore.h" #include "utils/tuplesort.h" @@ -43,6 +44,8 @@ struct ExprState; /* forward references in this file */ struct ExprContext; struct ExprEvalStep; /* avoid including execExpr.h everywhere */ +struct ParallelHashJoinState; + typedef Datum (*ExprStateEvalFunc) (struct ExprState *expression, struct ExprContext *econtext, bool *isNull); @@ -2026,6 +2029,9 @@ typedef struct HashState SharedHashInfo *shared_info; /* one entry per worker */ HashInstrumentation *hinstrument; /* this worker's entry */ + + /* Parallel hash state. */ + struct ParallelHashJoinState *parallel_state; } HashState; /* ---------------- diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 02fb366680..d763da647b 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -880,6 +880,7 @@ typedef struct Hash AttrNumber skewColumn; /* outer join key's column #, or zero */ bool skewInherit; /* is outer join rel an inheritance tree? */ /* all other info is in the parent HashJoin node */ + double rows_total; /* estimate total rows if parallel_aware */ } Hash; /* ---------------- diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 1108b6a0ea..3b9d303ce4 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -1464,6 +1464,7 @@ typedef struct HashPath JoinPath jpath; List *path_hashclauses; /* join clauses used for hashing */ int num_batches; /* number of batches expected */ + double inner_rows_total; /* total inner rows expected */ } HashPath; /* @@ -2315,6 +2316,7 @@ typedef struct JoinCostWorkspace /* private for cost_hashjoin code */ int numbuckets; int numbatches; + double inner_rows_total; } JoinCostWorkspace; #endif /* RELATION_H */ diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 5a1fbf97c3..27afc2eaeb 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -69,6 +69,7 @@ extern bool enable_hashjoin; extern bool enable_gathermerge; extern bool enable_partition_wise_join; extern bool enable_parallel_append; +extern bool enable_parallel_hash; extern int constraint_exclusion; extern double clamp_row_est(double nrows); @@ -153,7 +154,8 @@ extern void initial_cost_hashjoin(PlannerInfo *root, JoinType jointype, List *hashclauses, Path *outer_path, Path *inner_path, - JoinPathExtraData *extra); + JoinPathExtraData *extra, + bool parallel_hash); extern void final_cost_hashjoin(PlannerInfo *root, HashPath *path, JoinCostWorkspace *workspace, JoinPathExtraData *extra); diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 99f65b44f2..3ef12b323b 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -153,6 +153,7 @@ extern HashPath *create_hashjoin_path(PlannerInfo *root, JoinPathExtraData *extra, Path *outer_path, Path *inner_path, + bool parallel_hash, List *restrict_clauses, Relids required_outer, List *hashclauses); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 089b7c3a10..58f3a19bc6 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -803,6 +803,21 @@ typedef enum WAIT_EVENT_BGWORKER_STARTUP, WAIT_EVENT_BTREE_PAGE, WAIT_EVENT_EXECUTE_GATHER, + WAIT_EVENT_HASH_BATCH_ALLOCATING, + WAIT_EVENT_HASH_BATCH_ELECTING, + WAIT_EVENT_HASH_BATCH_LOADING, + WAIT_EVENT_HASH_BUILD_ALLOCATING, + WAIT_EVENT_HASH_BUILD_ELECTING, + WAIT_EVENT_HASH_BUILD_HASHING_INNER, + WAIT_EVENT_HASH_BUILD_HASHING_OUTER, + WAIT_EVENT_HASH_GROW_BATCHES_ELECTING, + WAIT_EVENT_HASH_GROW_BATCHES_FINISHING, + WAIT_EVENT_HASH_GROW_BATCHES_REPARTITIONING, + WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING, + WAIT_EVENT_HASH_GROW_BATCHES_DECIDING, + WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING, + WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING, + WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING, WAIT_EVENT_LOGICAL_SYNC_DATA, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE, WAIT_EVENT_MQ_INTERNAL, diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index a347ee4d7d..97e4a0bbbd 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -211,6 +211,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_BUFFER_MAPPING, LWTRANCHE_LOCK_MANAGER, LWTRANCHE_PREDICATE_LOCK_MANAGER, + LWTRANCHE_PARALLEL_HASH_JOIN, LWTRANCHE_PARALLEL_QUERY_DSA, LWTRANCHE_SESSION_DSA, LWTRANCHE_SESSION_RECORD_TABLE, diff --git a/src/test/regress/expected/join.out b/src/test/regress/expected/join.out index 9d3abf0ed0..a7cfdf1f44 100644 --- a/src/test/regress/expected/join.out +++ b/src/test/regress/expected/join.out @@ -5884,6 +5884,9 @@ insert into extremely_skewed update pg_class set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192 where relname = 'extremely_skewed'; +-- Make a relation with a couple of enormous tuples. +create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t; +alter table wide set (parallel_workers = 2); -- The "optimal" case: the hash table fits in memory; we plan for 1 -- batch, we stick to that number, and peak memory usage stays within -- our work_mem budget @@ -5924,6 +5927,7 @@ rollback to settings; savepoint settings; set local max_parallel_workers_per_gather = 2; set local work_mem = '4MB'; +set local enable_parallel_hash = off; explain (costs off) select count(*) from simple r join simple s using (id); QUERY PLAN @@ -5955,6 +5959,43 @@ $$); f | f (1 row) +rollback to settings; +-- parallel with parallel-aware hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +set local work_mem = '4MB'; +set local enable_parallel_hash = on; +explain (costs off) + select count(*) from simple r join simple s using (id); + QUERY PLAN +------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> Parallel Hash Join + Hash Cond: (r.id = s.id) + -> Parallel Seq Scan on simple r + -> Parallel Hash + -> Parallel Seq Scan on simple s +(9 rows) + +select count(*) from simple r join simple s using (id); + count +------- + 20000 +(1 row) + +select original > 1 as initially_multibatch, final > original as increased_batches + from hash_join_batches( +$$ + select count(*) from simple r join simple s using (id); +$$); + initially_multibatch | increased_batches +----------------------+------------------- + f | f +(1 row) + rollback to settings; -- The "good" case: batches required, but we plan the right number; we -- plan for some number of batches, and we stick to that number, and @@ -5996,6 +6037,7 @@ rollback to settings; savepoint settings; set local max_parallel_workers_per_gather = 2; set local work_mem = '128kB'; +set local enable_parallel_hash = off; explain (costs off) select count(*) from simple r join simple s using (id); QUERY PLAN @@ -6027,6 +6069,43 @@ $$); t | f (1 row) +rollback to settings; +-- parallel with parallel-aware hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +set local work_mem = '128kB'; +set local enable_parallel_hash = on; +explain (costs off) + select count(*) from simple r join simple s using (id); + QUERY PLAN +------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> Parallel Hash Join + Hash Cond: (r.id = s.id) + -> Parallel Seq Scan on simple r + -> Parallel Hash + -> Parallel Seq Scan on simple s +(9 rows) + +select count(*) from simple r join simple s using (id); + count +------- + 20000 +(1 row) + +select original > 1 as initially_multibatch, final > original as increased_batches + from hash_join_batches( +$$ + select count(*) from simple r join simple s using (id); +$$); + initially_multibatch | increased_batches +----------------------+------------------- + t | f +(1 row) + rollback to settings; -- The "bad" case: during execution we need to increase number of -- batches; in this case we plan for 1 batch, and increase at least a @@ -6069,6 +6148,7 @@ rollback to settings; savepoint settings; set local max_parallel_workers_per_gather = 2; set local work_mem = '128kB'; +set local enable_parallel_hash = off; explain (costs off) select count(*) from simple r join bigger_than_it_looks s using (id); QUERY PLAN @@ -6100,6 +6180,43 @@ $$); f | t (1 row) +rollback to settings; +-- parallel with parallel-aware hash join +savepoint settings; +set local max_parallel_workers_per_gather = 1; +set local work_mem = '192kB'; +set local enable_parallel_hash = on; +explain (costs off) + select count(*) from simple r join bigger_than_it_looks s using (id); + QUERY PLAN +--------------------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 1 + -> Partial Aggregate + -> Parallel Hash Join + Hash Cond: (r.id = s.id) + -> Parallel Seq Scan on simple r + -> Parallel Hash + -> Parallel Seq Scan on bigger_than_it_looks s +(9 rows) + +select count(*) from simple r join bigger_than_it_looks s using (id); + count +------- + 20000 +(1 row) + +select original > 1 as initially_multibatch, final > original as increased_batches + from hash_join_batches( +$$ + select count(*) from simple r join bigger_than_it_looks s using (id); +$$); + initially_multibatch | increased_batches +----------------------+------------------- + f | t +(1 row) + rollback to settings; -- The "ugly" case: increasing the number of batches during execution -- doesn't help, so stop trying to fit in work_mem and hope for the @@ -6142,6 +6259,7 @@ rollback to settings; savepoint settings; set local max_parallel_workers_per_gather = 2; set local work_mem = '128kB'; +set local enable_parallel_hash = off; explain (costs off) select count(*) from simple r join extremely_skewed s using (id); QUERY PLAN @@ -6171,6 +6289,42 @@ $$); 1 | 2 (1 row) +rollback to settings; +-- parallel with parallel-aware hash join +savepoint settings; +set local max_parallel_workers_per_gather = 1; +set local work_mem = '128kB'; +set local enable_parallel_hash = on; +explain (costs off) + select count(*) from simple r join extremely_skewed s using (id); + QUERY PLAN +----------------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 1 + -> Partial Aggregate + -> Parallel Hash Join + Hash Cond: (r.id = s.id) + -> Parallel Seq Scan on simple r + -> Parallel Hash + -> Parallel Seq Scan on extremely_skewed s +(9 rows) + +select count(*) from simple r join extremely_skewed s using (id); + count +------- + 20000 +(1 row) + +select * from hash_join_batches( +$$ + select count(*) from simple r join extremely_skewed s using (id); +$$); + original | final +----------+------- + 1 | 4 +(1 row) + rollback to settings; -- A couple of other hash join tests unrelated to work_mem management. -- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate @@ -6192,10 +6346,11 @@ rollback to settings; -- that we can check that instrumentation comes back correctly. create table foo as select generate_series(1, 3) as id, 'xxxxx'::text as t; alter table foo set (parallel_workers = 0); -create table bar as select generate_series(1, 5000) as id, 'xxxxx'::text as t; +create table bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t; alter table bar set (parallel_workers = 2); -- multi-batch with rescan, parallel-oblivious savepoint settings; +set enable_parallel_hash = off; set parallel_leader_participation = off; set min_parallel_table_scan_size = 0; set parallel_setup_cost = 0; @@ -6246,6 +6401,7 @@ $$); rollback to settings; -- single-batch with rescan, parallel-oblivious savepoint settings; +set enable_parallel_hash = off; set parallel_leader_participation = off; set min_parallel_table_scan_size = 0; set parallel_setup_cost = 0; @@ -6293,6 +6449,108 @@ $$); f (1 row) +rollback to settings; +-- multi-batch with rescan, parallel-aware +savepoint settings; +set enable_parallel_hash = on; +set parallel_leader_participation = off; +set min_parallel_table_scan_size = 0; +set parallel_setup_cost = 0; +set parallel_tuple_cost = 0; +set max_parallel_workers_per_gather = 2; +set enable_material = off; +set enable_mergejoin = off; +set work_mem = '64kB'; +explain (costs off) + select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; + QUERY PLAN +-------------------------------------------------------------------------- + Aggregate + -> Nested Loop Left Join + Join Filter: ((foo.id < (b1.id + 1)) AND (foo.id > (b1.id - 1))) + -> Seq Scan on foo + -> Gather + Workers Planned: 2 + -> Parallel Hash Join + Hash Cond: (b1.id = b2.id) + -> Parallel Seq Scan on bar b1 + -> Parallel Hash + -> Parallel Seq Scan on bar b2 +(11 rows) + +select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; + count +------- + 3 +(1 row) + +select final > 1 as multibatch + from hash_join_batches( +$$ + select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; +$$); + multibatch +------------ + t +(1 row) + +rollback to settings; +-- single-batch with rescan, parallel-aware +savepoint settings; +set enable_parallel_hash = on; +set parallel_leader_participation = off; +set min_parallel_table_scan_size = 0; +set parallel_setup_cost = 0; +set parallel_tuple_cost = 0; +set max_parallel_workers_per_gather = 2; +set enable_material = off; +set enable_mergejoin = off; +set work_mem = '4MB'; +explain (costs off) + select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; + QUERY PLAN +-------------------------------------------------------------------------- + Aggregate + -> Nested Loop Left Join + Join Filter: ((foo.id < (b1.id + 1)) AND (foo.id > (b1.id - 1))) + -> Seq Scan on foo + -> Gather + Workers Planned: 2 + -> Parallel Hash Join + Hash Cond: (b1.id = b2.id) + -> Parallel Seq Scan on bar b1 + -> Parallel Hash + -> Parallel Seq Scan on bar b2 +(11 rows) + +select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; + count +------- + 3 +(1 row) + +select final > 1 as multibatch + from hash_join_batches( +$$ + select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; +$$); + multibatch +------------ + f +(1 row) + rollback to settings; -- A full outer join where every record is matched. -- non-parallel @@ -6383,5 +6641,49 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); 40000 (1 row) +rollback to settings; +-- exercise special code paths for huge tuples (note use of non-strict +-- expression and left join required to get the detoasted tuple into +-- the hash table) +-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and +-- sts_puttuple oversized tuple cases because it's multi-batch) +savepoint settings; +set max_parallel_workers_per_gather = 2; +set enable_parallel_hash = on; +set work_mem = '128kB'; +explain (costs off) + select length(max(s.t)) + from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id); + QUERY PLAN +---------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> Parallel Hash Left Join + Hash Cond: (wide.id = wide_1.id) + -> Parallel Seq Scan on wide + -> Parallel Hash + -> Parallel Seq Scan on wide wide_1 +(9 rows) + +select length(max(s.t)) +from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id); + length +-------- + 320000 +(1 row) + +select final > 1 as multibatch + from hash_join_batches( +$$ + select length(max(s.t)) + from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id); +$$); + multibatch +------------ + t +(1 row) + rollback to settings; rollback; diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index 2b738aae7c..c9c8f51e1c 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -82,11 +82,12 @@ select name, setting from pg_settings where name like 'enable%'; enable_mergejoin | on enable_nestloop | on enable_parallel_append | on + enable_parallel_hash | on enable_partition_wise_join | off enable_seqscan | on enable_sort | on enable_tidscan | on -(14 rows) +(15 rows) -- Test that the pg_timezone_names and pg_timezone_abbrevs views are -- more-or-less working. We can't test their contents in any great detail diff --git a/src/test/regress/sql/join.sql b/src/test/regress/sql/join.sql index 0e933e00d5..a6a452f960 100644 --- a/src/test/regress/sql/join.sql +++ b/src/test/regress/sql/join.sql @@ -2028,6 +2028,10 @@ update pg_class set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192 where relname = 'extremely_skewed'; +-- Make a relation with a couple of enormous tuples. +create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t; +alter table wide set (parallel_workers = 2); + -- The "optimal" case: the hash table fits in memory; we plan for 1 -- batch, we stick to that number, and peak memory usage stays within -- our work_mem budget @@ -2050,6 +2054,22 @@ rollback to settings; savepoint settings; set local max_parallel_workers_per_gather = 2; set local work_mem = '4MB'; +set local enable_parallel_hash = off; +explain (costs off) + select count(*) from simple r join simple s using (id); +select count(*) from simple r join simple s using (id); +select original > 1 as initially_multibatch, final > original as increased_batches + from hash_join_batches( +$$ + select count(*) from simple r join simple s using (id); +$$); +rollback to settings; + +-- parallel with parallel-aware hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +set local work_mem = '4MB'; +set local enable_parallel_hash = on; explain (costs off) select count(*) from simple r join simple s using (id); select count(*) from simple r join simple s using (id); @@ -2082,6 +2102,22 @@ rollback to settings; savepoint settings; set local max_parallel_workers_per_gather = 2; set local work_mem = '128kB'; +set local enable_parallel_hash = off; +explain (costs off) + select count(*) from simple r join simple s using (id); +select count(*) from simple r join simple s using (id); +select original > 1 as initially_multibatch, final > original as increased_batches + from hash_join_batches( +$$ + select count(*) from simple r join simple s using (id); +$$); +rollback to settings; + +-- parallel with parallel-aware hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +set local work_mem = '128kB'; +set local enable_parallel_hash = on; explain (costs off) select count(*) from simple r join simple s using (id); select count(*) from simple r join simple s using (id); @@ -2115,6 +2151,22 @@ rollback to settings; savepoint settings; set local max_parallel_workers_per_gather = 2; set local work_mem = '128kB'; +set local enable_parallel_hash = off; +explain (costs off) + select count(*) from simple r join bigger_than_it_looks s using (id); +select count(*) from simple r join bigger_than_it_looks s using (id); +select original > 1 as initially_multibatch, final > original as increased_batches + from hash_join_batches( +$$ + select count(*) from simple r join bigger_than_it_looks s using (id); +$$); +rollback to settings; + +-- parallel with parallel-aware hash join +savepoint settings; +set local max_parallel_workers_per_gather = 1; +set local work_mem = '192kB'; +set local enable_parallel_hash = on; explain (costs off) select count(*) from simple r join bigger_than_it_looks s using (id); select count(*) from simple r join bigger_than_it_looks s using (id); @@ -2148,6 +2200,21 @@ rollback to settings; savepoint settings; set local max_parallel_workers_per_gather = 2; set local work_mem = '128kB'; +set local enable_parallel_hash = off; +explain (costs off) + select count(*) from simple r join extremely_skewed s using (id); +select count(*) from simple r join extremely_skewed s using (id); +select * from hash_join_batches( +$$ + select count(*) from simple r join extremely_skewed s using (id); +$$); +rollback to settings; + +-- parallel with parallel-aware hash join +savepoint settings; +set local max_parallel_workers_per_gather = 1; +set local work_mem = '128kB'; +set local enable_parallel_hash = on; explain (costs off) select count(*) from simple r join extremely_skewed s using (id); select count(*) from simple r join extremely_skewed s using (id); @@ -2175,11 +2242,12 @@ rollback to settings; create table foo as select generate_series(1, 3) as id, 'xxxxx'::text as t; alter table foo set (parallel_workers = 0); -create table bar as select generate_series(1, 5000) as id, 'xxxxx'::text as t; +create table bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t; alter table bar set (parallel_workers = 2); -- multi-batch with rescan, parallel-oblivious savepoint settings; +set enable_parallel_hash = off; set parallel_leader_participation = off; set min_parallel_table_scan_size = 0; set parallel_setup_cost = 0; @@ -2206,6 +2274,61 @@ rollback to settings; -- single-batch with rescan, parallel-oblivious savepoint settings; +set enable_parallel_hash = off; +set parallel_leader_participation = off; +set min_parallel_table_scan_size = 0; +set parallel_setup_cost = 0; +set parallel_tuple_cost = 0; +set max_parallel_workers_per_gather = 2; +set enable_material = off; +set enable_mergejoin = off; +set work_mem = '4MB'; +explain (costs off) + select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; +select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; +select final > 1 as multibatch + from hash_join_batches( +$$ + select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; +$$); +rollback to settings; + +-- multi-batch with rescan, parallel-aware +savepoint settings; +set enable_parallel_hash = on; +set parallel_leader_participation = off; +set min_parallel_table_scan_size = 0; +set parallel_setup_cost = 0; +set parallel_tuple_cost = 0; +set max_parallel_workers_per_gather = 2; +set enable_material = off; +set enable_mergejoin = off; +set work_mem = '64kB'; +explain (costs off) + select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; +select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; +select final > 1 as multibatch + from hash_join_batches( +$$ + select count(*) from foo + left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss + on foo.id < ss.id + 1 and foo.id > ss.id - 1; +$$); +rollback to settings; + +-- single-batch with rescan, parallel-aware +savepoint settings; +set enable_parallel_hash = on; set parallel_leader_participation = off; set min_parallel_table_scan_size = 0; set parallel_setup_cost = 0; @@ -2266,4 +2389,27 @@ explain (costs off) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); rollback to settings; +-- exercise special code paths for huge tuples (note use of non-strict +-- expression and left join required to get the detoasted tuple into +-- the hash table) + +-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and +-- sts_puttuple oversized tuple cases because it's multi-batch) +savepoint settings; +set max_parallel_workers_per_gather = 2; +set enable_parallel_hash = on; +set work_mem = '128kB'; +explain (costs off) + select length(max(s.t)) + from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id); +select length(max(s.t)) +from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id); +select final > 1 as multibatch + from hash_join_batches( +$$ + select length(max(s.t)) + from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id); +$$); +rollback to settings; + rollback; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e308e20184..a92c62adde 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1542,6 +1542,10 @@ ParallelBitmapHeapState ParallelCompletionPtr ParallelContext ParallelExecutorInfo +ParallelHashGrowth +ParallelHashJoinBatch +ParallelHashJoinBatchAccessor +ParallelHashJoinState ParallelHeapScanDesc ParallelIndexScanDesc ParallelSlot