Fix costing for parallel aggregation.

The original patch kind of ignored the fact that we were doing something
different from a costing point of view, but nobody noticed.  This patch
fixes that oversight.

David Rowley
This commit is contained in:
Robert Haas 2016-04-12 16:24:55 -04:00
parent 46d73e0d65
commit deb71fa971
3 changed files with 92 additions and 25 deletions

View File

@ -3262,6 +3262,8 @@ create_grouping_paths(PlannerInfo *root,
RelOptInfo *grouped_rel; RelOptInfo *grouped_rel;
PathTarget *partial_grouping_target = NULL; PathTarget *partial_grouping_target = NULL;
AggClauseCosts agg_costs; AggClauseCosts agg_costs;
AggClauseCosts agg_partial_costs; /* parallel only */
AggClauseCosts agg_final_costs; /* parallel only */
Size hashaggtablesize; Size hashaggtablesize;
double dNumGroups; double dNumGroups;
double dNumPartialGroups = 0; double dNumPartialGroups = 0;
@ -3346,8 +3348,10 @@ create_grouping_paths(PlannerInfo *root,
MemSet(&agg_costs, 0, sizeof(AggClauseCosts)); MemSet(&agg_costs, 0, sizeof(AggClauseCosts));
if (parse->hasAggs) if (parse->hasAggs)
{ {
count_agg_clauses(root, (Node *) target->exprs, &agg_costs); count_agg_clauses(root, (Node *) target->exprs, &agg_costs, true,
count_agg_clauses(root, parse->havingQual, &agg_costs); false, false);
count_agg_clauses(root, parse->havingQual, &agg_costs, true, false,
false);
} }
/* /*
@ -3422,6 +3426,25 @@ create_grouping_paths(PlannerInfo *root,
NIL, NIL,
NIL); NIL);
/*
* Collect statistics about aggregates for estimating costs of
* performing aggregation in parallel.
*/
MemSet(&agg_partial_costs, 0, sizeof(AggClauseCosts));
MemSet(&agg_final_costs, 0, sizeof(AggClauseCosts));
if (parse->hasAggs)
{
/* partial phase */
count_agg_clauses(root, (Node *) partial_grouping_target->exprs,
&agg_partial_costs, false, false, true);
/* final phase */
count_agg_clauses(root, (Node *) target->exprs, &agg_final_costs,
true, true, true);
count_agg_clauses(root, parse->havingQual, &agg_final_costs, true,
true, true);
}
if (can_sort) if (can_sort)
{ {
/* Checked in set_grouped_rel_consider_parallel() */ /* Checked in set_grouped_rel_consider_parallel() */
@ -3457,7 +3480,7 @@ create_grouping_paths(PlannerInfo *root,
parse->groupClause ? AGG_SORTED : AGG_PLAIN, parse->groupClause ? AGG_SORTED : AGG_PLAIN,
parse->groupClause, parse->groupClause,
NIL, NIL,
&agg_costs, &agg_partial_costs,
dNumPartialGroups, dNumPartialGroups,
false, false,
false, false,
@ -3482,7 +3505,7 @@ create_grouping_paths(PlannerInfo *root,
hashaggtablesize = hashaggtablesize =
estimate_hashagg_tablesize(cheapest_partial_path, estimate_hashagg_tablesize(cheapest_partial_path,
&agg_costs, &agg_partial_costs,
dNumPartialGroups); dNumPartialGroups);
/* /*
@ -3499,7 +3522,7 @@ create_grouping_paths(PlannerInfo *root,
AGG_HASHED, AGG_HASHED,
parse->groupClause, parse->groupClause,
NIL, NIL,
&agg_costs, &agg_partial_costs,
dNumPartialGroups, dNumPartialGroups,
false, false,
false, false,
@ -3631,7 +3654,7 @@ create_grouping_paths(PlannerInfo *root,
parse->groupClause ? AGG_SORTED : AGG_PLAIN, parse->groupClause ? AGG_SORTED : AGG_PLAIN,
parse->groupClause, parse->groupClause,
(List *) parse->havingQual, (List *) parse->havingQual,
&agg_costs, &agg_final_costs,
dNumGroups, dNumGroups,
true, true,
true, true,
@ -3691,7 +3714,7 @@ create_grouping_paths(PlannerInfo *root,
Path *path = (Path *) linitial(grouped_rel->partial_pathlist); Path *path = (Path *) linitial(grouped_rel->partial_pathlist);
hashaggtablesize = estimate_hashagg_tablesize(path, hashaggtablesize = estimate_hashagg_tablesize(path,
&agg_costs, &agg_final_costs,
dNumGroups); dNumGroups);
if (hashaggtablesize < work_mem * 1024L) if (hashaggtablesize < work_mem * 1024L)
@ -3713,7 +3736,7 @@ create_grouping_paths(PlannerInfo *root,
AGG_HASHED, AGG_HASHED,
parse->groupClause, parse->groupClause,
(List *) parse->havingQual, (List *) parse->havingQual,
&agg_costs, &agg_final_costs,
dNumGroups, dNumGroups,
true, true,
true, true,

View File

@ -61,6 +61,9 @@ typedef struct
{ {
PlannerInfo *root; PlannerInfo *root;
AggClauseCosts *costs; AggClauseCosts *costs;
bool finalizeAggs;
bool combineStates;
bool serialStates;
} count_agg_clauses_context; } count_agg_clauses_context;
typedef struct typedef struct
@ -540,12 +543,16 @@ contain_agg_clause_walker(Node *node, void *context)
* are no subqueries. There mustn't be outer-aggregate references either. * are no subqueries. There mustn't be outer-aggregate references either.
*/ */
void void
count_agg_clauses(PlannerInfo *root, Node *clause, AggClauseCosts *costs) count_agg_clauses(PlannerInfo *root, Node *clause, AggClauseCosts *costs,
bool finalizeAggs, bool combineStates, bool serialStates)
{ {
count_agg_clauses_context context; count_agg_clauses_context context;
context.root = root; context.root = root;
context.costs = costs; context.costs = costs;
context.finalizeAggs = finalizeAggs;
context.combineStates = combineStates;
context.serialStates = serialStates;
(void) count_agg_clauses_walker(clause, &context); (void) count_agg_clauses_walker(clause, &context);
} }
@ -562,6 +569,9 @@ count_agg_clauses_walker(Node *node, count_agg_clauses_context *context)
Form_pg_aggregate aggform; Form_pg_aggregate aggform;
Oid aggtransfn; Oid aggtransfn;
Oid aggfinalfn; Oid aggfinalfn;
Oid aggcombinefn;
Oid aggserialfn;
Oid aggdeserialfn;
Oid aggtranstype; Oid aggtranstype;
int32 aggtransspace; int32 aggtransspace;
QualCost argcosts; QualCost argcosts;
@ -583,6 +593,9 @@ count_agg_clauses_walker(Node *node, count_agg_clauses_context *context)
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
aggtransfn = aggform->aggtransfn; aggtransfn = aggform->aggtransfn;
aggfinalfn = aggform->aggfinalfn; aggfinalfn = aggform->aggfinalfn;
aggcombinefn = aggform->aggcombinefn;
aggserialfn = aggform->aggserialfn;
aggdeserialfn = aggform->aggdeserialfn;
aggtranstype = aggform->aggtranstype; aggtranstype = aggform->aggtranstype;
aggtransspace = aggform->aggtransspace; aggtransspace = aggform->aggtransspace;
ReleaseSysCache(aggTuple); ReleaseSysCache(aggTuple);
@ -592,28 +605,58 @@ count_agg_clauses_walker(Node *node, count_agg_clauses_context *context)
if (aggref->aggorder != NIL || aggref->aggdistinct != NIL) if (aggref->aggorder != NIL || aggref->aggdistinct != NIL)
costs->numOrderedAggs++; costs->numOrderedAggs++;
/* add component function execution costs to appropriate totals */ /*
costs->transCost.per_tuple += get_func_cost(aggtransfn) * cpu_operator_cost; * Add the appropriate component function execution costs to
if (OidIsValid(aggfinalfn)) * appropriate totals.
costs->finalCost += get_func_cost(aggfinalfn) * cpu_operator_cost; */
if (context->combineStates)
{
/* charge for combining previously aggregated states */
costs->transCost.per_tuple += get_func_cost(aggcombinefn) * cpu_operator_cost;
/* also add the input expressions' cost to per-input-row costs */ /* charge for deserialization, when appropriate */
cost_qual_eval_node(&argcosts, (Node *) aggref->args, context->root); if (context->serialStates && OidIsValid(aggdeserialfn))
costs->transCost.startup += argcosts.startup; costs->transCost.per_tuple += get_func_cost(aggdeserialfn) * cpu_operator_cost;
costs->transCost.per_tuple += argcosts.per_tuple; }
else
costs->transCost.per_tuple += get_func_cost(aggtransfn) * cpu_operator_cost;
if (context->finalizeAggs)
{
if (OidIsValid(aggfinalfn))
costs->finalCost += get_func_cost(aggfinalfn) * cpu_operator_cost;
}
else if (context->serialStates)
{
if (OidIsValid(aggserialfn))
costs->finalCost += get_func_cost(aggserialfn) * cpu_operator_cost;
}
/* /*
* Add any filter's cost to per-input-row costs. * Some costs will already have been incurred by the initial aggregate
* * node, so we mustn't include these again.
* XXX Ideally we should reduce input expression costs according to
* filter selectivity, but it's not clear it's worth the trouble.
*/ */
if (aggref->aggfilter) if (!context->combineStates)
{ {
cost_qual_eval_node(&argcosts, (Node *) aggref->aggfilter, /* add the input expressions' cost to per-input-row costs */
context->root); cost_qual_eval_node(&argcosts, (Node *) aggref->args, context->root);
costs->transCost.startup += argcosts.startup; costs->transCost.startup += argcosts.startup;
costs->transCost.per_tuple += argcosts.per_tuple; costs->transCost.per_tuple += argcosts.per_tuple;
/*
* Add any filter's cost to per-input-row costs.
*
* XXX Ideally we should reduce input expression costs according
* to filter selectivity, but it's not clear it's worth the
* trouble.
*/
if (aggref->aggfilter)
{
cost_qual_eval_node(&argcosts, (Node *) aggref->aggfilter,
context->root);
costs->transCost.startup += argcosts.startup;
costs->transCost.per_tuple += argcosts.per_tuple;
}
} }
/* /*

View File

@ -67,7 +67,8 @@ extern List *make_ands_implicit(Expr *clause);
extern PartialAggType aggregates_allow_partial(Node *clause); extern PartialAggType aggregates_allow_partial(Node *clause);
extern bool contain_agg_clause(Node *clause); extern bool contain_agg_clause(Node *clause);
extern void count_agg_clauses(PlannerInfo *root, Node *clause, extern void count_agg_clauses(PlannerInfo *root, Node *clause,
AggClauseCosts *costs); AggClauseCosts *costs, bool finalizeAggs,
bool combineStates, bool serialStates);
extern bool contain_window_function(Node *clause); extern bool contain_window_function(Node *clause);
extern WindowFuncLists *find_window_functions(Node *clause, Index maxWinRef); extern WindowFuncLists *find_window_functions(Node *clause, Index maxWinRef);