diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 7e4fbafc53..2156385ac8 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -2379,62 +2379,87 @@ show_sort_info(SortState *sortstate, ExplainState *es) static void show_hash_info(HashState *hashstate, ExplainState *es) { - HashInstrumentation *hinstrument = NULL; + HashInstrumentation hinstrument = {0}; /* * In a parallel query, the leader process may or may not have run the * hash join, and even if it did it may not have built a hash table due to * timing (if it started late it might have seen no tuples in the outer * relation and skipped building the hash table). Therefore we have to be - * prepared to get instrumentation data from a worker if there is no hash - * table. + * prepared to get instrumentation data from all participants. */ if (hashstate->hashtable) - { - hinstrument = (HashInstrumentation *) - palloc(sizeof(HashInstrumentation)); - ExecHashGetInstrumentation(hinstrument, hashstate->hashtable); - } - else if (hashstate->shared_info) + ExecHashGetInstrumentation(&hinstrument, hashstate->hashtable); + + /* + * Merge results from workers. In the parallel-oblivious case, the + * results from all participants should be identical, except where + * participants didn't run the join at all so have no data. In the + * parallel-aware case, we need to consider all the results. Each worker + * may have seen a different subset of batches and we want to find the + * highest memory usage for any one batch across all batches. + */ + if (hashstate->shared_info) { SharedHashInfo *shared_info = hashstate->shared_info; - int i; + int i; - /* Find the first worker that built a hash table. */ for (i = 0; i < shared_info->num_workers; ++i) { - if (shared_info->hinstrument[i].nbatch > 0) + HashInstrumentation *worker_hi = &shared_info->hinstrument[i]; + + if (worker_hi->nbatch > 0) { - hinstrument = &shared_info->hinstrument[i]; - break; + /* + * Every participant should agree on the buckets, so to be + * sure we have a value we'll just overwrite each time. + */ + hinstrument.nbuckets = worker_hi->nbuckets; + hinstrument.nbuckets_original = worker_hi->nbuckets_original; + + /* + * Normally every participant should agree on the number of + * batches too, but it's possible for a backend that started + * late and missed the whole join not to have the final nbatch + * number. So we'll take the largest number. + */ + hinstrument.nbatch = Max(hinstrument.nbatch, worker_hi->nbatch); + hinstrument.nbatch_original = worker_hi->nbatch_original; + + /* + * In a parallel-aware hash join, for now we report the + * maximum peak memory reported by any worker. + */ + hinstrument.space_peak = + Max(hinstrument.space_peak, worker_hi->space_peak); } } } - if (hinstrument) + if (hinstrument.nbatch > 0) { - long spacePeakKb = (hinstrument->space_peak + 1023) / 1024; + long spacePeakKb = (hinstrument.space_peak + 1023) / 1024; if (es->format != EXPLAIN_FORMAT_TEXT) { - ExplainPropertyLong("Hash Buckets", hinstrument->nbuckets, es); + ExplainPropertyLong("Hash Buckets", hinstrument.nbuckets, es); ExplainPropertyLong("Original Hash Buckets", - hinstrument->nbuckets_original, es); - ExplainPropertyLong("Hash Batches", hinstrument->nbatch, es); + hinstrument.nbuckets_original, es); + ExplainPropertyLong("Hash Batches", hinstrument.nbatch, es); ExplainPropertyLong("Original Hash Batches", - hinstrument->nbatch_original, es); + hinstrument.nbatch_original, es); ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es); } - else if (hinstrument->nbatch_original != hinstrument->nbatch || - hinstrument->nbuckets_original != hinstrument->nbuckets) + else if (hinstrument.nbatch_original != hinstrument.nbatch || + hinstrument.nbuckets_original != hinstrument.nbuckets) { appendStringInfoSpaces(es->str, es->indent * 2); appendStringInfo(es->str, "Buckets: %d (originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n", - hinstrument->nbuckets, - hinstrument->nbuckets_original, - hinstrument->nbatch, - hinstrument->nbatch_original, + hinstrument.nbuckets, + hinstrument.nbuckets_original, + hinstrument.nbatch, + hinstrument.nbatch_original, spacePeakKb); } else @@ -2442,7 +2467,7 @@ show_hash_info(HashState *hashstate, ExplainState *es) appendStringInfoSpaces(es->str, es->indent * 2); appendStringInfo(es->str, "Buckets: %d Batches: %d Memory Usage: %ldkB\n", - hinstrument->nbuckets, hinstrument->nbatch, + hinstrument.nbuckets, hinstrument.nbatch, spacePeakKb); } } diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 04eb3650aa..4e1a2806b5 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -3090,7 +3090,16 @@ ExecHashTableDetachBatch(HashJoinTable hashtable) batch->buckets = InvalidDsaPointer; } } - ExecParallelHashUpdateSpacePeak(hashtable, curbatch); + + /* + * Track the largest batch we've been attached to. Though each + * backend might see a different subset of batches, explain.c will + * scan the results from all backends to find the largest value. + */ + hashtable->spacePeak = + Max(hashtable->spacePeak, + batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets); + /* Remember that we are not attached to a batch. */ hashtable->curbatch = -1; } @@ -3295,19 +3304,3 @@ ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size) return true; } - -/* - * Update this backend's copy of hashtable->spacePeak to account for a given - * batch. This is called at the end of hashing for batch 0, and then for each - * batch when it is done or discovered to be already done. The result is used - * for EXPLAIN output. - */ -void -ExecParallelHashUpdateSpacePeak(HashJoinTable hashtable, int batchno) -{ - size_t size; - - size = hashtable->batches[batchno].shared->size; - size += sizeof(dsa_pointer_atomic) * hashtable->nbuckets; - hashtable->spacePeak = Max(hashtable->spacePeak, size); -} diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 5d1dc1f401..817bcf0471 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -1186,12 +1186,6 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) * remain). */ BarrierDetach(batch_barrier); - - /* - * We didn't work on this batch, but we need to observe - * its size for EXPLAIN. - */ - ExecParallelHashUpdateSpacePeak(hashtable, batchno); hashtable->batches[batchno].done = true; hashtable->curbatch = -1; break; diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 84c166b395..367dfff018 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -33,7 +33,6 @@ extern void ExecHashTableDetach(HashJoinTable hashtable); extern void ExecHashTableDetachBatch(HashJoinTable hashtable); extern void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno); -void ExecParallelHashUpdateSpacePeak(HashJoinTable hashtable, int batchno); extern void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot,