diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 9092b4b309..67bdcb2b27 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -3051,29 +3051,111 @@ show_hashagg_info(AggState *aggstate, ExplainState *es) Agg *agg = (Agg *) aggstate->ss.ps.plan; int64 memPeakKb = (aggstate->hash_mem_peak + 1023) / 1024; - Assert(IsA(aggstate, AggState)); - if (agg->aggstrategy != AGG_HASHED && agg->aggstrategy != AGG_MIXED) return; - if (es->costs && aggstate->hash_planned_partitions > 0) + if (es->format != EXPLAIN_FORMAT_TEXT) { - ExplainPropertyInteger("Planned Partitions", NULL, - aggstate->hash_planned_partitions, es); + + if (es->costs && aggstate->hash_planned_partitions > 0) + { + ExplainPropertyInteger("Planned Partitions", NULL, + aggstate->hash_planned_partitions, es); + } + + if (!es->analyze) + return; + + /* EXPLAIN ANALYZE */ + ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb, es); + if (aggstate->hash_batches_used > 0) + { + ExplainPropertyInteger("Disk Usage", "kB", + aggstate->hash_disk_used, es); + ExplainPropertyInteger("HashAgg Batches", NULL, + aggstate->hash_batches_used, es); + } + } + else + { + bool gotone = false; + + if (es->costs && aggstate->hash_planned_partitions > 0) + { + ExplainIndentText(es); + appendStringInfo(es->str, "Planned Partitions: %d", + aggstate->hash_planned_partitions); + gotone = true; + } + + if (!es->analyze) + { + if (gotone) + appendStringInfoChar(es->str, '\n'); + return; + } + + if (!gotone) + ExplainIndentText(es); + else + appendStringInfoString(es->str, " "); + + appendStringInfo(es->str, "Peak Memory Usage: " INT64_FORMAT " kB", + memPeakKb); + + if (aggstate->hash_batches_used > 0) + appendStringInfo(es->str, " Disk Usage: " UINT64_FORMAT " kB HashAgg Batches: %d", + aggstate->hash_disk_used, + aggstate->hash_batches_used); + appendStringInfoChar(es->str, '\n'); } - if (!es->analyze) - return; - - /* EXPLAIN ANALYZE */ - ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb, es); - if (aggstate->hash_batches_used > 0) + /* Display stats for each parallel worker */ + if (es->analyze && aggstate->shared_info != NULL) { - ExplainPropertyInteger("Disk Usage", "kB", - aggstate->hash_disk_used, es); - ExplainPropertyInteger("HashAgg Batches", NULL, - aggstate->hash_batches_used, es); + for (int n = 0; n < aggstate->shared_info->num_workers; n++) + { + AggregateInstrumentation *sinstrument; + uint64 hash_disk_used; + int hash_batches_used; + + sinstrument = &aggstate->shared_info->sinstrument[n]; + hash_disk_used = sinstrument->hash_disk_used; + hash_batches_used = sinstrument->hash_batches_used; + memPeakKb = (sinstrument->hash_mem_peak + 1023) / 1024; + + if (es->workers_state) + ExplainOpenWorker(n, es); + + if (es->format == EXPLAIN_FORMAT_TEXT) + { + ExplainIndentText(es); + + appendStringInfo(es->str, "Peak Memory Usage: " INT64_FORMAT " kB", + memPeakKb); + + if (hash_batches_used > 0) + appendStringInfo(es->str, " Disk Usage: " UINT64_FORMAT " kB HashAgg Batches: %d", + hash_disk_used, hash_batches_used); + appendStringInfoChar(es->str, '\n'); + } + else + { + ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb, + es); + if (hash_batches_used > 0) + { + ExplainPropertyInteger("Disk Usage", "kB", hash_disk_used, + es); + ExplainPropertyInteger("HashAgg Batches", NULL, + hash_batches_used, es); + } + } + + if (es->workers_state) + ExplainCloseWorker(n, es); + } } } diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 41cb41481d..382e78fb7f 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -25,6 +25,7 @@ #include "executor/execParallel.h" #include "executor/executor.h" +#include "executor/nodeAgg.h" #include "executor/nodeAppend.h" #include "executor/nodeBitmapHeapscan.h" #include "executor/nodeCustom.h" @@ -288,7 +289,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt); break; - + case T_AggState: + /* even when not parallel-aware, for EXPLAIN ANALYZE */ + ExecAggEstimate((AggState *) planstate, e->pcxt); + break; default: break; } @@ -505,7 +509,10 @@ ExecParallelInitializeDSM(PlanState *planstate, /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt); break; - + case T_AggState: + /* even when not parallel-aware, for EXPLAIN ANALYZE */ + ExecAggInitializeDSM((AggState *) planstate, d->pcxt); + break; default: break; } @@ -1048,6 +1055,9 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate, case T_HashState: ExecHashRetrieveInstrumentation((HashState *) planstate); break; + case T_AggState: + ExecAggRetrieveInstrumentation((AggState *) planstate); + break; default: break; } @@ -1336,7 +1346,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt) ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate, pwcxt); break; - + case T_AggState: + /* even when not parallel-aware, for EXPLAIN ANALYZE */ + ExecAggInitializeWorker((AggState *) planstate, pwcxt); + break; default: break; } diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 331acee281..a20554ae65 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -240,6 +240,7 @@ #include "postgres.h" #include "access/htup_details.h" +#include "access/parallel.h" #include "catalog/objectaccess.h" #include "catalog/pg_aggregate.h" #include "catalog/pg_proc.h" @@ -4483,6 +4484,22 @@ ExecEndAgg(AggState *node) int numGroupingSets = Max(node->maxsets, 1); int setno; + /* + * When ending a parallel worker, copy the statistics gathered by the + * worker back into shared memory so that it can be picked up by the main + * process to report in EXPLAIN ANALYZE. + */ + if (node->shared_info && IsParallelWorker()) + { + AggregateInstrumentation *si; + + Assert(ParallelWorkerNumber <= node->shared_info->num_workers); + si = &node->shared_info->sinstrument[ParallelWorkerNumber]; + si->hash_batches_used = node->hash_batches_used; + si->hash_disk_used = node->hash_disk_used; + si->hash_mem_peak = node->hash_mem_peak; + } + /* Make sure we have closed any open tuplesorts */ if (node->sort_in) @@ -4854,3 +4871,89 @@ aggregate_dummy(PG_FUNCTION_ARGS) fcinfo->flinfo->fn_oid); return (Datum) 0; /* keep compiler quiet */ } + +/* ---------------------------------------------------------------- + * Parallel Query Support + * ---------------------------------------------------------------- + */ + + /* ---------------------------------------------------------------- + * ExecAggEstimate + * + * Estimate space required to propagate aggregate statistics. + * ---------------------------------------------------------------- + */ +void +ExecAggEstimate(AggState *node, ParallelContext *pcxt) +{ + Size size; + + /* don't need this if not instrumenting or no workers */ + if (!node->ss.ps.instrument || pcxt->nworkers == 0) + return; + + size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation)); + size = add_size(size, offsetof(SharedAggInfo, sinstrument)); + shm_toc_estimate_chunk(&pcxt->estimator, size); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* ---------------------------------------------------------------- + * ExecAggInitializeDSM + * + * Initialize DSM space for aggregate statistics. + * ---------------------------------------------------------------- + */ +void +ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt) +{ + Size size; + + /* don't need this if not instrumenting or no workers */ + if (!node->ss.ps.instrument || pcxt->nworkers == 0) + return; + + size = offsetof(SharedAggInfo, sinstrument) + + pcxt->nworkers * sizeof(AggregateInstrumentation); + node->shared_info = shm_toc_allocate(pcxt->toc, size); + /* ensure any unfilled slots will contain zeroes */ + memset(node->shared_info, 0, size); + node->shared_info->num_workers = pcxt->nworkers; + shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, + node->shared_info); +} + +/* ---------------------------------------------------------------- + * ExecAggInitializeWorker + * + * Attach worker to DSM space for aggregate statistics. + * ---------------------------------------------------------------- + */ +void +ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt) +{ + node->shared_info = + shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true); +} + +/* ---------------------------------------------------------------- + * ExecAggRetrieveInstrumentation + * + * Transfer aggregate statistics from DSM to private memory. + * ---------------------------------------------------------------- + */ +void +ExecAggRetrieveInstrumentation(AggState *node) +{ + Size size; + SharedAggInfo *si; + + if (node->shared_info == NULL) + return; + + size = offsetof(SharedAggInfo, sinstrument) + + node->shared_info->num_workers * sizeof(AggregateInstrumentation); + si = palloc(size); + memcpy(si, node->shared_info, size); + node->shared_info = si; +} diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h index 92c2337fd3..bb0805abe0 100644 --- a/src/include/executor/nodeAgg.h +++ b/src/include/executor/nodeAgg.h @@ -14,6 +14,7 @@ #ifndef NODEAGG_H #define NODEAGG_H +#include "access/parallel.h" #include "nodes/execnodes.h" @@ -323,4 +324,10 @@ extern void hash_agg_set_limits(double hashentrysize, uint64 input_groups, int used_bits, Size *mem_limit, uint64 *ngroups_limit, int *num_partitions); +/* parallel instrumentation support */ +extern void ExecAggEstimate(AggState *node, ParallelContext *pcxt); +extern void ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt); +extern void ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt); +extern void ExecAggRetrieveInstrumentation(AggState *node); + #endif /* NODEAGG_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 98e0072b8a..f5dfa32d55 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2101,6 +2101,27 @@ typedef struct GroupState bool grp_done; /* indicates completion of Group scan */ } GroupState; +/* --------------------- + * per-worker aggregate information + * --------------------- + */ +typedef struct AggregateInstrumentation +{ + Size hash_mem_peak; /* peak hash table memory usage */ + uint64 hash_disk_used; /* kB of disk space used */ + int hash_batches_used; /* batches used during entire execution */ +} AggregateInstrumentation; + +/* ---------------- + * Shared memory container for per-worker aggregate information + * ---------------- + */ +typedef struct SharedAggInfo +{ + int num_workers; + AggregateInstrumentation sinstrument[FLEXIBLE_ARRAY_MEMBER]; +} SharedAggInfo; + /* --------------------- * AggState information * @@ -2190,6 +2211,7 @@ typedef struct AggState AggStatePerGroup *all_pergroups; /* array of first ->pergroups, than * ->hash_pergroup */ ProjectionInfo *combinedproj; /* projection machinery */ + SharedAggInfo *shared_info; /* one entry per worker */ } AggState; /* ----------------