From 9bdb300dedf086cc54edf740088208e6b24307ef Mon Sep 17 00:00:00 2001
From: David Rowley
Date: Fri, 19 Jun 2020 17:24:27 +1200
Subject: [PATCH] Fix EXPLAIN ANALYZE for parallel HashAgg plans

Since 1f39bce02, HashAgg nodes have had the ability to spill to disk when
memory consumption exceeds work_mem. That commit added new properties to
EXPLAIN ANALYZE to show the maximum memory usage and disk usage; however,
it didn't quite go as far as showing that information for parallel
workers. Since workers may have experienced something very different from
the main process, we should show this information per worker, as is done
in Sort.

Reviewed-by: Justin Pryzby
Reviewed-by: Jeff Davis
Discussion: https://postgr.es/m/CAApHDvpEKbfZa18mM1TD7qV6PG+w97pwCWq5tVD0dX7e11gRJw@mail.gmail.com
Backpatch-through: 13, where the hashagg spilling code was added.
---
 src/backend/commands/explain.c      | 112 ++++++++++++++++++++++++----
 src/backend/executor/execParallel.c |  19 ++++-
 src/backend/executor/nodeAgg.c      | 103 +++++++++++++++++++++++++
 src/include/executor/nodeAgg.h      |   7 ++
 src/include/nodes/execnodes.h       |  22 ++++++
 5 files changed, 245 insertions(+), 18 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 9092b4b309..67bdcb2b27 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -3051,29 +3051,111 @@ show_hashagg_info(AggState *aggstate, ExplainState *es)
     Agg *agg = (Agg *) aggstate->ss.ps.plan;
     int64 memPeakKb = (aggstate->hash_mem_peak + 1023) / 1024;
 
-    Assert(IsA(aggstate, AggState));
-
     if (agg->aggstrategy != AGG_HASHED && agg->aggstrategy != AGG_MIXED)
         return;
 
-    if (es->costs && aggstate->hash_planned_partitions > 0)
+    if (es->format != EXPLAIN_FORMAT_TEXT)
     {
-        ExplainPropertyInteger("Planned Partitions", NULL,
-                               aggstate->hash_planned_partitions, es);
+
+        if (es->costs && aggstate->hash_planned_partitions > 0)
+        {
+            ExplainPropertyInteger("Planned Partitions", NULL,
+                                   aggstate->hash_planned_partitions, es);
+        }
+
+        if (!es->analyze)
+            return;
+
+        /* EXPLAIN ANALYZE */
+        ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb, es);
+        if (aggstate->hash_batches_used > 0)
+        {
+            ExplainPropertyInteger("Disk Usage", "kB",
+                                   aggstate->hash_disk_used, es);
+            ExplainPropertyInteger("HashAgg Batches", NULL,
+                                   aggstate->hash_batches_used, es);
+        }
+    }
+    else
+    {
+        bool gotone = false;
+
+        if (es->costs && aggstate->hash_planned_partitions > 0)
+        {
+            ExplainIndentText(es);
+            appendStringInfo(es->str, "Planned Partitions: %d",
+                             aggstate->hash_planned_partitions);
+            gotone = true;
+        }
+
+        if (!es->analyze)
+        {
+            if (gotone)
+                appendStringInfoChar(es->str, '\n');
+            return;
+        }
+
+        if (!gotone)
+            ExplainIndentText(es);
+        else
+            appendStringInfoString(es->str, "  ");
+
+        appendStringInfo(es->str, "Peak Memory Usage: " INT64_FORMAT " kB",
+                         memPeakKb);
+
+        if (aggstate->hash_batches_used > 0)
+            appendStringInfo(es->str, "  Disk Usage: " UINT64_FORMAT " kB  HashAgg Batches: %d",
+                             aggstate->hash_disk_used,
+                             aggstate->hash_batches_used);
+        appendStringInfoChar(es->str, '\n');
     }
 
-    if (!es->analyze)
-        return;
-
-    /* EXPLAIN ANALYZE */
-    ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb, es);
-    if (aggstate->hash_batches_used > 0)
+    /* Display stats for each parallel worker */
+    if (es->analyze && aggstate->shared_info != NULL)
     {
-        ExplainPropertyInteger("Disk Usage", "kB",
-                               aggstate->hash_disk_used, es);
-        ExplainPropertyInteger("HashAgg Batches", NULL,
-                               aggstate->hash_batches_used, es);
+        for (int n = 0; n < aggstate->shared_info->num_workers; n++)
+        {
+            AggregateInstrumentation *sinstrument;
+            uint64 hash_disk_used;
+            int hash_batches_used;
+
+            sinstrument = &aggstate->shared_info->sinstrument[n];
+            hash_disk_used = sinstrument->hash_disk_used;
+            hash_batches_used = sinstrument->hash_batches_used;
+            memPeakKb = (sinstrument->hash_mem_peak + 1023) / 1024;
+
+            if (es->workers_state)
+                ExplainOpenWorker(n, es);
+
+            if (es->format == EXPLAIN_FORMAT_TEXT)
+            {
+                ExplainIndentText(es);
+
+                appendStringInfo(es->str, "Peak Memory Usage: " INT64_FORMAT " kB",
+                                 memPeakKb);
+
+                if (hash_batches_used > 0)
+                    appendStringInfo(es->str, "  Disk Usage: " UINT64_FORMAT " kB  HashAgg Batches: %d",
+                                     hash_disk_used, hash_batches_used);
+                appendStringInfoChar(es->str, '\n');
+            }
+            else
+            {
+                ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb,
+                                       es);
+                if (hash_batches_used > 0)
+                {
+                    ExplainPropertyInteger("Disk Usage", "kB", hash_disk_used,
+                                           es);
+                    ExplainPropertyInteger("HashAgg Batches", NULL,
+                                           hash_batches_used, es);
+                }
+            }
+
+            if (es->workers_state)
+                ExplainCloseWorker(n, es);
+        }
     }
 }
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 41cb41481d..382e78fb7f 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -25,6 +25,7 @@
 
 #include "executor/execParallel.h"
 #include "executor/executor.h"
+#include "executor/nodeAgg.h"
 #include "executor/nodeAppend.h"
 #include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeCustom.h"
@@ -288,7 +289,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
             /* even when not parallel-aware, for EXPLAIN ANALYZE */
             ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
             break;
-
+        case T_AggState:
+            /* even when not parallel-aware, for EXPLAIN ANALYZE */
+            ExecAggEstimate((AggState *) planstate, e->pcxt);
+            break;
         default:
             break;
     }
@@ -505,7 +509,10 @@ ExecParallelInitializeDSM(PlanState *planstate,
             /* even when not parallel-aware, for EXPLAIN ANALYZE */
             ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
             break;
-
+        case T_AggState:
+            /* even when not parallel-aware, for EXPLAIN ANALYZE */
+            ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
+            break;
         default:
             break;
     }
@@ -1048,6 +1055,9 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
         case T_HashState:
             ExecHashRetrieveInstrumentation((HashState *) planstate);
             break;
+        case T_AggState:
+            ExecAggRetrieveInstrumentation((AggState *) planstate);
+            break;
         default:
             break;
     }
@@ -1336,7 +1346,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
             ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
                                                 pwcxt);
             break;
-
+        case T_AggState:
+            /* even when not parallel-aware, for EXPLAIN ANALYZE */
+            ExecAggInitializeWorker((AggState *) planstate, pwcxt);
+            break;
         default:
             break;
     }
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 331acee281..a20554ae65 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -240,6 +240,7 @@
 #include "postgres.h"
 
 #include "access/htup_details.h"
+#include "access/parallel.h"
 #include "catalog/objectaccess.h"
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_proc.h"
@@ -4483,6 +4484,22 @@ ExecEndAgg(AggState *node)
     int numGroupingSets = Max(node->maxsets, 1);
     int setno;
 
+    /*
+     * When ending a parallel worker, copy the statistics gathered by the
+     * worker back into shared memory so that it can be picked up by the main
+     * process to report in EXPLAIN ANALYZE.
+     */
+    if (node->shared_info && IsParallelWorker())
+    {
+        AggregateInstrumentation *si;
+
+        Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
+        si = &node->shared_info->sinstrument[ParallelWorkerNumber];
+        si->hash_batches_used = node->hash_batches_used;
+        si->hash_disk_used = node->hash_disk_used;
+        si->hash_mem_peak = node->hash_mem_peak;
+    }
+
     /* Make sure we have closed any open tuplesorts */
@@ -4854,3 +4871,89 @@ aggregate_dummy(PG_FUNCTION_ARGS)
              fcinfo->flinfo->fn_oid);
     return (Datum) 0; /* keep compiler quiet */
 }
+
+/* ----------------------------------------------------------------
+ *                      Parallel Query Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *      ExecAggEstimate
+ *
+ *      Estimate space required to propagate aggregate statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggEstimate(AggState *node, ParallelContext *pcxt)
+{
+    Size size;
+
+    /* don't need this if not instrumenting or no workers */
+    if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+        return;
+
+    size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
+    size = add_size(size, offsetof(SharedAggInfo, sinstrument));
+    shm_toc_estimate_chunk(&pcxt->estimator, size);
+    shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *      ExecAggInitializeDSM
+ *
+ *      Initialize DSM space for aggregate statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
+{
+    Size size;
+
+    /* don't need this if not instrumenting or no workers */
+    if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+        return;
+
+    size = offsetof(SharedAggInfo, sinstrument) +
+        pcxt->nworkers * sizeof(AggregateInstrumentation);
+    node->shared_info = shm_toc_allocate(pcxt->toc, size);
+    /* ensure any unfilled slots will contain zeroes */
+    memset(node->shared_info, 0, size);
+    node->shared_info->num_workers = pcxt->nworkers;
+    shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
+                   node->shared_info);
+}
+
+/* ----------------------------------------------------------------
+ *      ExecAggInitializeWorker
+ *
+ *      Attach worker to DSM space for aggregate statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
+{
+    node->shared_info =
+        shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
+}
+
+/* ----------------------------------------------------------------
+ *      ExecAggRetrieveInstrumentation
+ *
+ *      Transfer aggregate statistics from DSM to private memory.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggRetrieveInstrumentation(AggState *node)
+{
+    Size size;
+    SharedAggInfo *si;
+
+    if (node->shared_info == NULL)
+        return;
+
+    size = offsetof(SharedAggInfo, sinstrument) +
+        node->shared_info->num_workers * sizeof(AggregateInstrumentation);
+    si = palloc(size);
+    memcpy(si, node->shared_info, size);
+    node->shared_info = si;
+}
diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h
index 92c2337fd3..bb0805abe0 100644
--- a/src/include/executor/nodeAgg.h
+++ b/src/include/executor/nodeAgg.h
@@ -14,6 +14,7 @@
 #ifndef NODEAGG_H
 #define NODEAGG_H
 
+#include "access/parallel.h"
 #include "nodes/execnodes.h"
 
 
@@ -323,4 +324,10 @@ extern void hash_agg_set_limits(double hashentrysize, uint64 input_groups,
                                 int used_bits, Size *mem_limit,
                                 uint64 *ngroups_limit, int *num_partitions);
 
+/* parallel instrumentation support */
+extern void ExecAggEstimate(AggState *node, ParallelContext *pcxt);
+extern void ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt);
+extern void ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt);
+extern void ExecAggRetrieveInstrumentation(AggState *node);
+
 #endif /* NODEAGG_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 98e0072b8a..f5dfa32d55 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2101,6 +2101,27 @@ typedef struct GroupState
     bool grp_done; /* indicates completion of Group scan */
 } GroupState;
 
+/* ---------------------
+ *  per-worker aggregate information
+ * ---------------------
+ */
+typedef struct AggregateInstrumentation
+{
+    Size hash_mem_peak;     /* peak hash table memory usage */
+    uint64 hash_disk_used;  /* kB of disk space used */
+    int hash_batches_used;  /* batches used during entire execution */
+} AggregateInstrumentation;
+
+/* ----------------
+ *  Shared memory container for per-worker aggregate information
+ * ----------------
+ */
+typedef struct SharedAggInfo
+{
+    int num_workers;
+    AggregateInstrumentation sinstrument[FLEXIBLE_ARRAY_MEMBER];
+} SharedAggInfo;
+
 /* ---------------------
  *  AggState information
 
@@ -2190,6 +2211,7 @@
     AggStatePerGroup *all_pergroups;    /* array of first ->pergroups, than
                                          * ->hash_pergroup */
     ProjectionInfo *combinedproj;   /* projection machinery */
+    SharedAggInfo *shared_info; /* one entry per worker */
 } AggState;
 
 /* ----------------
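
Example of the per-worker output this patch adds, derived from the format
strings in show_hashagg_info() above. The plan shape and all numbers are
hypothetical, shown only to illustrate the shape of the new fields. In text
format, a spilling parallel HashAgg now reports the leader's figures as
before, followed by one line per worker emitted between ExplainOpenWorker()
and ExplainCloseWorker():

  ->  Partial HashAggregate (actual rows=20000 loops=3)
        Group Key: a
        Peak Memory Usage: 577 kB  Disk Usage: 4600 kB  HashAgg Batches: 16
        Worker 0:  Peak Memory Usage: 577 kB  Disk Usage: 4520 kB  HashAgg Batches: 16
        Worker 1:  Peak Memory Usage: 577 kB  Disk Usage: 4384 kB  HashAgg Batches: 16
        ->  Parallel Seq Scan on tab (actual rows=66667 loops=3)

In the structured formats, the same data appears as "Peak Memory Usage",
"Disk Usage" and "HashAgg Batches" properties within each entry of the
node's per-worker output, with the latter two emitted only when the node
spilled (hash_batches_used > 0), matching the leader-level reporting.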