diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 9c5aaacc51..b6c72e1d1e 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -32,6 +32,7 @@ #include "optimizer/pathnode.h" #include "optimizer/paths.h" #include "optimizer/planmain.h" +#include "optimizer/prep.h" #include "optimizer/restrictinfo.h" #include "optimizer/tlist.h" #include "parser/parsetree.h" @@ -2944,16 +2945,7 @@ estimate_path_cost_size(PlannerInfo *root, MemSet(&aggcosts, 0, sizeof(AggClauseCosts)); if (root->parse->hasAggs) { - get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist, - AGGSPLIT_SIMPLE, &aggcosts); - - /* - * The cost of aggregates in the HAVING qual will be the same - * for each child as it is for the parent, so there's no need - * to use a translated version of havingQual. - */ - get_agg_clause_costs(root, (Node *) root->parse->havingQual, - AGGSPLIT_SIMPLE, &aggcosts); + get_agg_clause_costs(root, AGGSPLIT_SIMPLE, &aggcosts); } /* Get number of grouping columns and possible number of groups */ diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c index d76836c09b..79b325c7cf 100644 --- a/src/backend/executor/execExpr.c +++ b/src/backend/executor/execExpr.c @@ -99,8 +99,7 @@ static void ExecBuildAggTransCall(ExprState *state, AggState *aggstate, * the same as the per-query context of the associated ExprContext. * * Any Aggref, WindowFunc, or SubPlan nodes found in the tree are added to - * the lists of such nodes held by the parent PlanState (or more accurately, - * the AggrefExprState etc. nodes created for them are added). + * the lists of such nodes held by the parent PlanState. * * Note: there is no ExecEndExpr function; we assume that any resource * cleanup needed will be handled by just releasing the memory context @@ -779,18 +778,15 @@ ExecInitExprRec(Expr *node, ExprState *state, case T_Aggref: { Aggref *aggref = (Aggref *) node; - AggrefExprState *astate = makeNode(AggrefExprState); scratch.opcode = EEOP_AGGREF; - scratch.d.aggref.astate = astate; - astate->aggref = aggref; + scratch.d.aggref.aggno = aggref->aggno; if (state->parent && IsA(state->parent, AggState)) { AggState *aggstate = (AggState *) state->parent; - aggstate->aggs = lappend(aggstate->aggs, astate); - aggstate->numaggs++; + aggstate->aggs = lappend(aggstate->aggs, aggref); } else { diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c index 26c2b49632..c09371ad58 100644 --- a/src/backend/executor/execExprInterp.c +++ b/src/backend/executor/execExprInterp.c @@ -1494,12 +1494,12 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) * Returns a Datum whose value is the precomputed aggregate value * found in the given expression context. */ - AggrefExprState *aggref = op->d.aggref.astate; + int aggno = op->d.aggref.aggno; Assert(econtext->ecxt_aggvalues != NULL); - *op->resvalue = econtext->ecxt_aggvalues[aggref->aggno]; - *op->resnull = econtext->ecxt_aggnulls[aggref->aggno]; + *op->resvalue = econtext->ecxt_aggvalues[aggno]; + *op->resnull = econtext->ecxt_aggnulls[aggno]; EEO_NEXT(); } diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 5bf9e99bbc..dc640feb63 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -465,14 +465,6 @@ static void build_pertrans_for_aggref(AggStatePerTrans pertrans, Oid aggserialfn, Oid aggdeserialfn, Datum initValue, bool initValueIsNull, Oid *inputTypes, int numArguments); -static int find_compatible_peragg(Aggref *newagg, AggState *aggstate, - int lastaggno, List **same_input_transnos); -static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg, - bool shareable, - Oid aggtransfn, Oid aggtranstype, - Oid aggserialfn, Oid aggdeserialfn, - Datum initValue, bool initValueIsNull, - List *transnos); /* @@ -3244,9 +3236,11 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) Plan *outerPlan; ExprContext *econtext; TupleDesc scanDesc; - int numaggs, - transno, - aggno; + int max_aggno; + int max_transno; + int numaggrefs; + int numaggs; + int numtrans; int phase; int phaseidx; ListCell *l; @@ -3422,9 +3416,9 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * semantics, and it's forbidden by the spec. Because it is true, we * don't need to worry about evaluating the aggs in any particular order. * - * Note: execExpr.c finds Aggrefs for us, and adds their AggrefExprState - * nodes to aggstate->aggs. Aggrefs in the qual are found here; Aggrefs - * in the targetlist are found during ExecAssignProjectionInfo, below. + * Note: execExpr.c finds Aggrefs for us, and adds them to aggstate->aggs. + * Aggrefs in the qual are found here; Aggrefs in the targetlist are found + * during ExecAssignProjectionInfo, above. */ aggstate->ss.ps.qual = ExecInitQual(node->plan.qual, (PlanState *) aggstate); @@ -3432,8 +3426,18 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) /* * We should now have found all Aggrefs in the targetlist and quals. */ - numaggs = aggstate->numaggs; - Assert(numaggs == list_length(aggstate->aggs)); + numaggrefs = list_length(aggstate->aggs); + max_aggno = -1; + max_transno = -1; + foreach(l, aggstate->aggs) + { + Aggref *aggref = (Aggref *) lfirst(l); + + max_aggno = Max(max_aggno, aggref->aggno); + max_transno = Max(max_transno, aggref->aggtransno); + } + numaggs = max_aggno + 1; + numtrans = max_transno + 1; /* * For each phase, prepare grouping set data and fmgr lookup data for @@ -3604,7 +3608,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs); peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs); - pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numaggs); + pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numtrans); aggstate->peragg = peraggs; aggstate->pertrans = pertransstates; @@ -3695,92 +3699,41 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) select_current_set(aggstate, 0, false); } - /* ----------------- + /* * Perform lookups of aggregate function info, and initialize the * unchanging fields of the per-agg and per-trans data. - * - * We try to optimize by detecting duplicate aggregate functions so that - * their state and final values are re-used, rather than needlessly being - * re-calculated independently. We also detect aggregates that are not - * the same, but which can share the same transition state. - * - * Scenarios: - * - * 1. Identical aggregate function calls appear in the query: - * - * SELECT SUM(x) FROM ... HAVING SUM(x) > 0 - * - * Since these aggregates are identical, we only need to calculate - * the value once. Both aggregates will share the same 'aggno' value. - * - * 2. Two different aggregate functions appear in the query, but the - * aggregates have the same arguments, transition functions and - * initial values (and, presumably, different final functions): - * - * SELECT AVG(x), STDDEV(x) FROM ... - * - * In this case we must create a new peragg for the varying aggregate, - * and we need to call the final functions separately, but we need - * only run the transition function once. (This requires that the - * final functions be nondestructive of the transition state, but - * that's required anyway for other reasons.) - * - * For either of these optimizations to be valid, all aggregate properties - * used in the transition phase must be the same, including any modifiers - * such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't - * contain any volatile functions. - * ----------------- */ - aggno = -1; - transno = -1; foreach(l, aggstate->aggs) { - AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l); - Aggref *aggref = aggrefstate->aggref; + Aggref *aggref = lfirst(l); AggStatePerAgg peragg; AggStatePerTrans pertrans; - int existing_aggno; - int existing_transno; - List *same_input_transnos; Oid inputTypes[FUNC_MAX_ARGS]; int numArguments; int numDirectArgs; HeapTuple aggTuple; Form_pg_aggregate aggform; AclResult aclresult; - Oid transfn_oid, - finalfn_oid; - bool shareable; + Oid finalfn_oid; Oid serialfn_oid, deserialfn_oid; + Oid aggOwner; Expr *finalfnexpr; Oid aggtranstype; - Datum textInitVal; - Datum initValue; - bool initValueIsNull; /* Planner should have assigned aggregate to correct level */ Assert(aggref->agglevelsup == 0); /* ... and the split mode should match */ Assert(aggref->aggsplit == aggstate->aggsplit); - /* 1. Check for already processed aggs which can be re-used */ - existing_aggno = find_compatible_peragg(aggref, aggstate, aggno, - &same_input_transnos); - if (existing_aggno != -1) - { - /* - * Existing compatible agg found. so just point the Aggref to the - * same per-agg struct. - */ - aggrefstate->aggno = existing_aggno; - continue; - } + peragg = &peraggs[aggref->aggno]; + + /* Check if we initialized the state for this aggregate already. */ + if (peragg->aggref != NULL) + continue; - /* Mark Aggref state node with assigned index in the result array */ - peragg = &peraggs[++aggno]; peragg->aggref = aggref; - aggrefstate->aggno = aggno; + peragg->transno = aggref->aggtransno; /* Fetch the pg_aggregate row */ aggTuple = SearchSysCache1(AGGFNOID, @@ -3802,36 +3755,12 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggtranstype = aggref->aggtranstype; Assert(OidIsValid(aggtranstype)); - /* - * If this aggregation is performing state combines, then instead of - * using the transition function, we'll use the combine function - */ - if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) - { - transfn_oid = aggform->aggcombinefn; - - /* If not set then the planner messed up */ - if (!OidIsValid(transfn_oid)) - elog(ERROR, "combinefn not set for aggregate function"); - } - else - transfn_oid = aggform->aggtransfn; - /* Final function only required if we're finalizing the aggregates */ if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)) peragg->finalfn_oid = finalfn_oid = InvalidOid; else peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn; - /* - * If finalfn is marked read-write, we can't share transition states; - * but it is okay to share states for AGGMODIFY_SHAREABLE aggs. Also, - * if we're not executing the finalfn here, we can share regardless. - */ - shareable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE) || - (finalfn_oid == InvalidOid); - peragg->shareable = shareable; - serialfn_oid = InvalidOid; deserialfn_oid = InvalidOid; @@ -3871,7 +3800,6 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) /* Check that aggregate owner has permission to call component fns */ { HeapTuple procTuple; - Oid aggOwner; procTuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(aggref->aggfnoid)); @@ -3881,12 +3809,6 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner; ReleaseSysCache(procTuple); - aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, - ACL_EXECUTE); - if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, OBJECT_FUNCTION, - get_func_name(transfn_oid)); - InvokeFunctionExecuteHook(transfn_oid); if (OidIsValid(finalfn_oid)) { aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, @@ -3959,51 +3881,60 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) &peragg->resulttypeByVal); /* - * initval is potentially null, so don't try to access it as a struct - * field. Must do it the hard way with SysCacheGetAttr. + * Build working state for invoking the transition function, if we + * haven't done it already. */ - textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, - Anum_pg_aggregate_agginitval, - &initValueIsNull); - if (initValueIsNull) - initValue = (Datum) 0; - else - initValue = GetAggInitVal(textInitVal, aggtranstype); + pertrans = &pertransstates[aggref->aggtransno]; + if (pertrans->aggref == NULL) + { + Datum textInitVal; + Datum initValue; + bool initValueIsNull; + Oid transfn_oid; - /* - * 2. Build working state for invoking the transition function, or - * look up previously initialized working state, if we can share it. - * - * find_compatible_peragg() already collected a list of shareable - * per-Trans's with the same inputs. Check if any of them have the - * same transition function and initial value. - */ - existing_transno = find_compatible_pertrans(aggstate, aggref, - shareable, - transfn_oid, aggtranstype, - serialfn_oid, deserialfn_oid, - initValue, initValueIsNull, - same_input_transnos); - if (existing_transno != -1) - { /* - * Existing compatible trans found, so just point the 'peragg' to - * the same per-trans struct, and mark the trans state as shared. + * If this aggregation is performing state combines, then instead + * of using the transition function, we'll use the combine + * function */ - pertrans = &pertransstates[existing_transno]; - pertrans->aggshared = true; - peragg->transno = existing_transno; - } - else - { - pertrans = &pertransstates[++transno]; + if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) + { + transfn_oid = aggform->aggcombinefn; + + /* If not set then the planner messed up */ + if (!OidIsValid(transfn_oid)) + elog(ERROR, "combinefn not set for aggregate function"); + } + else + transfn_oid = aggform->aggtransfn; + + aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, + ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_FUNCTION, + get_func_name(transfn_oid)); + InvokeFunctionExecuteHook(transfn_oid); + + /* + * initval is potentially null, so don't try to access it as a + * struct field. Must do it the hard way with SysCacheGetAttr. + */ + textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, + Anum_pg_aggregate_agginitval, + &initValueIsNull); + if (initValueIsNull) + initValue = (Datum) 0; + else + initValue = GetAggInitVal(textInitVal, aggtranstype); + build_pertrans_for_aggref(pertrans, aggstate, estate, aggref, transfn_oid, aggtranstype, serialfn_oid, deserialfn_oid, initValue, initValueIsNull, inputTypes, numArguments); - peragg->transno = transno; } + else + pertrans->aggshared = true; ReleaseSysCache(aggTuple); } @@ -4011,8 +3942,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * Update aggstate->numaggs to be the number of unique aggregates found. * Also set numstates to the number of unique transition states found. */ - aggstate->numaggs = aggno + 1; - aggstate->numtrans = transno + 1; + aggstate->numaggs = numaggs; + aggstate->numtrans = numtrans; /* * Last, check whether any more aggregates got added onto the node while @@ -4024,7 +3955,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * need to work hard on a helpful error message; but we defend against it * here anyway, just to be sure.) */ - if (numaggs != list_length(aggstate->aggs)) + if (numaggrefs != list_length(aggstate->aggs)) ereport(ERROR, (errcode(ERRCODE_GROUPING_ERROR), errmsg("aggregate function calls cannot be nested"))); @@ -4420,147 +4351,6 @@ GetAggInitVal(Datum textInitVal, Oid transtype) return initVal; } -/* - * find_compatible_peragg - search for a previously initialized per-Agg struct - * - * Searches the previously looked at aggregates to find one which is compatible - * with this one, with the same input parameters. If no compatible aggregate - * can be found, returns -1. - * - * As a side-effect, this also collects a list of existing, shareable per-Trans - * structs with matching inputs. If no identical Aggref is found, the list is - * passed later to find_compatible_pertrans, to see if we can at least reuse - * the state value of another aggregate. - */ -static int -find_compatible_peragg(Aggref *newagg, AggState *aggstate, - int lastaggno, List **same_input_transnos) -{ - int aggno; - AggStatePerAgg peraggs; - - *same_input_transnos = NIL; - - /* we mustn't reuse the aggref if it contains volatile function calls */ - if (contain_volatile_functions((Node *) newagg)) - return -1; - - peraggs = aggstate->peragg; - - /* - * Search through the list of already seen aggregates. If we find an - * existing identical aggregate call, then we can re-use that one. While - * searching, we'll also collect a list of Aggrefs with the same input - * parameters. If no matching Aggref is found, the caller can potentially - * still re-use the transition state of one of them. (At this stage we - * just compare the parsetrees; whether different aggregates share the - * same transition function will be checked later.) - */ - for (aggno = 0; aggno <= lastaggno; aggno++) - { - AggStatePerAgg peragg; - Aggref *existingRef; - - peragg = &peraggs[aggno]; - existingRef = peragg->aggref; - - /* all of the following must be the same or it's no match */ - if (newagg->inputcollid != existingRef->inputcollid || - newagg->aggtranstype != existingRef->aggtranstype || - newagg->aggstar != existingRef->aggstar || - newagg->aggvariadic != existingRef->aggvariadic || - newagg->aggkind != existingRef->aggkind || - !equal(newagg->args, existingRef->args) || - !equal(newagg->aggorder, existingRef->aggorder) || - !equal(newagg->aggdistinct, existingRef->aggdistinct) || - !equal(newagg->aggfilter, existingRef->aggfilter)) - continue; - - /* if it's the same aggregate function then report exact match */ - if (newagg->aggfnoid == existingRef->aggfnoid && - newagg->aggtype == existingRef->aggtype && - newagg->aggcollid == existingRef->aggcollid && - equal(newagg->aggdirectargs, existingRef->aggdirectargs)) - { - list_free(*same_input_transnos); - *same_input_transnos = NIL; - return aggno; - } - - /* - * Not identical, but it had the same inputs. If the final function - * permits sharing, return its transno to the caller, in case we can - * re-use its per-trans state. (If there's already sharing going on, - * we might report a transno more than once. find_compatible_pertrans - * is cheap enough that it's not worth spending cycles to avoid that.) - */ - if (peragg->shareable) - *same_input_transnos = lappend_int(*same_input_transnos, - peragg->transno); - } - - return -1; -} - -/* - * find_compatible_pertrans - search for a previously initialized per-Trans - * struct - * - * Searches the list of transnos for a per-Trans struct with the same - * transition function and initial condition. (The inputs have already been - * verified to match.) - */ -static int -find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool shareable, - Oid aggtransfn, Oid aggtranstype, - Oid aggserialfn, Oid aggdeserialfn, - Datum initValue, bool initValueIsNull, - List *transnos) -{ - ListCell *lc; - - /* If this aggregate can't share transition states, give up */ - if (!shareable) - return -1; - - foreach(lc, transnos) - { - int transno = lfirst_int(lc); - AggStatePerTrans pertrans = &aggstate->pertrans[transno]; - - /* - * if the transfns or transition state types are not the same then the - * state can't be shared. - */ - if (aggtransfn != pertrans->transfn_oid || - aggtranstype != pertrans->aggtranstype) - continue; - - /* - * The serialization and deserialization functions must match, if - * present, as we're unable to share the trans state for aggregates - * which will serialize or deserialize into different formats. - * Remember that these will be InvalidOid if they're not required for - * this agg node. - */ - if (aggserialfn != pertrans->serialfn_oid || - aggdeserialfn != pertrans->deserialfn_oid) - continue; - - /* - * Check that the initial condition matches, too. - */ - if (initValueIsNull && pertrans->initValueIsNull) - return transno; - - if (!initValueIsNull && !pertrans->initValueIsNull && - datumIsEqual(initValue, pertrans->initValue, - pertrans->transtypeByVal, pertrans->transtypeLen)) - return transno; - } - return -1; -} - void ExecEndAgg(AggState *node) { diff --git a/src/backend/jit/llvm/llvmjit_expr.c b/src/backend/jit/llvm/llvmjit_expr.c index eb1dea658c..f232397cab 100644 --- a/src/backend/jit/llvm/llvmjit_expr.c +++ b/src/backend/jit/llvm/llvmjit_expr.c @@ -1849,20 +1849,11 @@ llvm_compile_expr(ExprState *state) case EEOP_AGGREF: { - AggrefExprState *aggref = op->d.aggref.astate; - LLVMValueRef v_aggnop; LLVMValueRef v_aggno; LLVMValueRef value, isnull; - /* - * At this point aggref->aggno is not yet set (it's set up - * in ExecInitAgg() after initializing the expression). So - * load it from memory each time round. - */ - v_aggnop = l_ptr_const(&aggref->aggno, - l_ptr(LLVMInt32Type())); - v_aggno = LLVMBuildLoad(b, v_aggnop, "v_aggno"); + v_aggno = l_int32_const(op->d.aggref.aggno); /* load agg value / null */ value = l_load_gep1(b, v_aggvalues, v_aggno, "aggvalue"); diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 5a591d0a75..47b9ffd401 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -1492,6 +1492,8 @@ _copyAggref(const Aggref *from) COPY_SCALAR_FIELD(aggkind); COPY_SCALAR_FIELD(agglevelsup); COPY_SCALAR_FIELD(aggsplit); + COPY_SCALAR_FIELD(aggno); + COPY_SCALAR_FIELD(aggtransno); COPY_LOCATION_FIELD(location); return newnode; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index e2895a8985..6cccaea124 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -232,6 +232,8 @@ _equalAggref(const Aggref *a, const Aggref *b) COMPARE_SCALAR_FIELD(aggkind); COMPARE_SCALAR_FIELD(agglevelsup); COMPARE_SCALAR_FIELD(aggsplit); + COMPARE_SCALAR_FIELD(aggno); + COMPARE_SCALAR_FIELD(aggtransno); COMPARE_LOCATION_FIELD(location); return true; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index f26498cea2..bd5694d88e 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -1153,6 +1153,8 @@ _outAggref(StringInfo str, const Aggref *node) WRITE_CHAR_FIELD(aggkind); WRITE_UINT_FIELD(agglevelsup); WRITE_ENUM_FIELD(aggsplit, AggSplit); + WRITE_INT_FIELD(aggno); + WRITE_INT_FIELD(aggtransno); WRITE_LOCATION_FIELD(location); } diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index ab7b535caa..169d5581b9 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -615,6 +615,8 @@ _readAggref(void) READ_CHAR_FIELD(aggkind); READ_UINT_FIELD(agglevelsup); READ_ENUM_FIELD(aggsplit, AggSplit); + READ_INT_FIELD(aggno); + READ_INT_FIELD(aggtransno); READ_LOCATION_FIELD(location); READ_DONE(); diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index f1dfdc1a4a..22d6935824 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -2439,7 +2439,8 @@ cost_agg(Path *path, PlannerInfo *root, * than or equal to one, all groups are expected to fit in memory; * otherwise we expect to spill. */ - hashentrysize = hash_agg_entry_size(aggcosts->numAggs, input_width, + hashentrysize = hash_agg_entry_size(list_length(root->aggtransinfos), + input_width, aggcosts->transitionSpace); hash_agg_set_limits(hashentrysize, numGroups, 0, &mem_limit, &ngroups_limit, &num_partitions); diff --git a/src/backend/optimizer/plan/planagg.c b/src/backend/optimizer/plan/planagg.c index 8634940efc..48c4fee892 100644 --- a/src/backend/optimizer/plan/planagg.c +++ b/src/backend/optimizer/plan/planagg.c @@ -47,7 +47,7 @@ #include "utils/lsyscache.h" #include "utils/syscache.h" -static bool find_minmax_aggs_walker(Node *node, List **context); +static bool can_minmax_aggs(PlannerInfo *root, List **context); static bool build_minmax_path(PlannerInfo *root, MinMaxAggInfo *mminfo, Oid eqop, Oid sortop, bool nulls_first); static void minmax_qp_callback(PlannerInfo *root, void *extra); @@ -66,7 +66,8 @@ static Oid fetch_agg_sort_op(Oid aggfnoid); * query_planner(), because we generate indexscan paths by cloning the * planner's state and invoking query_planner() on a modified version of * the query parsetree. Thus, all preprocessing needed before query_planner() - * must already be done. + * must already be done. This relies on the list of aggregates in + * root->agginfos, so preprocess_aggrefs() must have been called already, too. */ void preprocess_minmax_aggregates(PlannerInfo *root) @@ -140,9 +141,7 @@ preprocess_minmax_aggregates(PlannerInfo *root) * all are MIN/MAX aggregates. Stop as soon as we find one that isn't. */ aggs_list = NIL; - if (find_minmax_aggs_walker((Node *) root->processed_tlist, &aggs_list)) - return; - if (find_minmax_aggs_walker(parse->havingQual, &aggs_list)) + if (!can_minmax_aggs(root, &aggs_list)) return; /* @@ -227,38 +226,33 @@ preprocess_minmax_aggregates(PlannerInfo *root) } /* - * find_minmax_aggs_walker - * Recursively scan the Aggref nodes in an expression tree, and check - * that each one is a MIN/MAX aggregate. If so, build a list of the + * can_minmax_aggs + * Walk through all the aggregates in the query, and check + * if they are all MIN/MAX aggregates. If so, build a list of the * distinct aggregate calls in the tree. * - * Returns true if a non-MIN/MAX aggregate is found, false otherwise. - * (This seemingly-backward definition is used because expression_tree_walker - * aborts the scan on true return, which is what we want.) - * - * Found aggregates are added to the list at *context; it's up to the caller - * to initialize the list to NIL. + * Returns false if a non-MIN/MAX aggregate is found, true otherwise. * * This does not descend into subqueries, and so should be used only after * reduction of sublinks to subplans. There mustn't be outer-aggregate * references either. */ static bool -find_minmax_aggs_walker(Node *node, List **context) +can_minmax_aggs(PlannerInfo *root, List **context) { - if (node == NULL) - return false; - if (IsA(node, Aggref)) + ListCell *lc; + + foreach(lc, root->agginfos) { - Aggref *aggref = (Aggref *) node; + AggInfo *agginfo = (AggInfo *) lfirst(lc); + Aggref *aggref = agginfo->representative_aggref; Oid aggsortop; TargetEntry *curTarget; MinMaxAggInfo *mminfo; - ListCell *l; Assert(aggref->agglevelsup == 0); if (list_length(aggref->args) != 1) - return true; /* it couldn't be MIN/MAX */ + return false; /* it couldn't be MIN/MAX */ /* * ORDER BY is usually irrelevant for MIN/MAX, but it can change the @@ -274,7 +268,7 @@ find_minmax_aggs_walker(Node *node, List **context) * quickly. */ if (aggref->aggorder != NIL) - return true; + return false; /* note: we do not care if DISTINCT is mentioned ... */ /* @@ -283,30 +277,19 @@ find_minmax_aggs_walker(Node *node, List **context) * now, just punt. */ if (aggref->aggfilter != NULL) - return true; + return false; aggsortop = fetch_agg_sort_op(aggref->aggfnoid); if (!OidIsValid(aggsortop)) - return true; /* not a MIN/MAX aggregate */ + return false; /* not a MIN/MAX aggregate */ curTarget = (TargetEntry *) linitial(aggref->args); if (contain_mutable_functions((Node *) curTarget->expr)) - return true; /* not potentially indexable */ + return false; /* not potentially indexable */ if (type_is_rowtype(exprType((Node *) curTarget->expr))) - return true; /* IS NOT NULL would have weird semantics */ - - /* - * Check whether it's already in the list, and add it if not. - */ - foreach(l, *context) - { - mminfo = (MinMaxAggInfo *) lfirst(l); - if (mminfo->aggfnoid == aggref->aggfnoid && - equal(mminfo->target, curTarget->expr)) - return false; - } + return false; /* IS NOT NULL would have weird semantics */ mminfo = makeNode(MinMaxAggInfo); mminfo->aggfnoid = aggref->aggfnoid; @@ -318,16 +301,8 @@ find_minmax_aggs_walker(Node *node, List **context) mminfo->param = NULL; *context = lappend(*context, mminfo); - - /* - * We need not recurse into the argument, since it can't contain any - * aggregates. - */ - return false; } - Assert(!IsA(node, SubLink)); - return expression_tree_walker(node, find_minmax_aggs_walker, - (void *) context); + return true; } /* @@ -368,6 +343,8 @@ build_minmax_path(PlannerInfo *root, MinMaxAggInfo *mminfo, subroot->plan_params = NIL; subroot->outer_params = NULL; subroot->init_plans = NIL; + subroot->agginfos = NIL; + subroot->aggtransinfos = NIL; subroot->parse = parse = copyObject(root->parse); IncrementVarSublevelsUp((Node *) parse, 1, 1); diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 986d7a52e3..247f7d4625 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -152,7 +152,6 @@ static RelOptInfo *create_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, PathTarget *target, bool target_parallel_safe, - const AggClauseCosts *agg_costs, grouping_sets_data *gd); static bool is_degenerate_grouping(PlannerInfo *root); static void create_degenerate_grouping_paths(PlannerInfo *root, @@ -228,8 +227,7 @@ static RelOptInfo *create_partial_grouping_paths(PlannerInfo *root, GroupPathExtraData *extra, bool force_rel_creation); static void gather_grouping_paths(PlannerInfo *root, RelOptInfo *rel); -static bool can_partial_agg(PlannerInfo *root, - const AggClauseCosts *agg_costs); +static bool can_partial_agg(PlannerInfo *root); static void apply_scanjoin_target_to_paths(PlannerInfo *root, RelOptInfo *rel, List *scanjoin_targets, @@ -1944,7 +1942,6 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, bool scanjoin_target_parallel_safe; bool scanjoin_target_same_exprs; bool have_grouping; - AggClauseCosts agg_costs; WindowFuncLists *wflists = NULL; List *activeWindows = NIL; grouping_sets_data *gset_data = NULL; @@ -1975,25 +1972,16 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, root->processed_tlist = preprocess_targetlist(root); /* - * Collect statistics about aggregates for estimating costs, and mark - * all the aggregates with resolved aggtranstypes. We must do this - * before slicing and dicing the tlist into various pathtargets, else - * some copies of the Aggref nodes might escape being marked with the - * correct transtypes. - * - * Note: currently, we do not detect duplicate aggregates here. This - * may result in somewhat-overestimated cost, which is fine for our - * purposes since all Paths will get charged the same. But at some - * point we might wish to do that detection in the planner, rather - * than during executor startup. + * Mark all the aggregates with resolved aggtranstypes, and detect + * aggregates that are duplicates or can share transition state. We + * must do this before slicing and dicing the tlist into various + * pathtargets, else some copies of the Aggref nodes might escape + * being marked. */ - MemSet(&agg_costs, 0, sizeof(AggClauseCosts)); if (parse->hasAggs) { - get_agg_clause_costs(root, (Node *) root->processed_tlist, - AGGSPLIT_SIMPLE, &agg_costs); - get_agg_clause_costs(root, parse->havingQual, AGGSPLIT_SIMPLE, - &agg_costs); + preprocess_aggrefs(root, (Node *) root->processed_tlist); + preprocess_aggrefs(root, (Node *) parse->havingQual); } /* @@ -2198,7 +2186,6 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, current_rel, grouping_target, grouping_target_parallel_safe, - &agg_costs, gset_data); /* Fix things up if grouping_target contains SRFs */ if (parse->hasTargetSRFs) @@ -3790,7 +3777,6 @@ get_number_of_groups(PlannerInfo *root, * * input_rel: contains the source-data Paths * target: the pathtarget for the result Paths to compute - * agg_costs: cost info about all aggregates in query (in AGGSPLIT_SIMPLE mode) * gd: grouping sets data including list of grouping sets and their clauses * * Note: all Paths in input_rel are expected to return the target computed @@ -3801,12 +3787,15 @@ create_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, PathTarget *target, bool target_parallel_safe, - const AggClauseCosts *agg_costs, grouping_sets_data *gd) { Query *parse = root->parse; RelOptInfo *grouped_rel; RelOptInfo *partially_grouped_rel; + AggClauseCosts agg_costs; + + MemSet(&agg_costs, 0, sizeof(AggClauseCosts)); + get_agg_clause_costs(root, AGGSPLIT_SIMPLE, &agg_costs); /* * Create grouping relation to hold fully aggregated grouping and/or @@ -3862,14 +3851,14 @@ create_grouping_paths(PlannerInfo *root, * the other gating conditions, so we want to do it last. */ if ((parse->groupClause != NIL && - agg_costs->numOrderedAggs == 0 && + root->numOrderedAggs == 0 && (gd ? gd->any_hashable : grouping_is_hashable(parse->groupClause)))) flags |= GROUPING_CAN_USE_HASH; /* * Determine whether partial aggregation is possible. */ - if (can_partial_agg(root, agg_costs)) + if (can_partial_agg(root)) flags |= GROUPING_CAN_PARTIAL_AGG; extra.flags = flags; @@ -3890,7 +3879,7 @@ create_grouping_paths(PlannerInfo *root, extra.patype = PARTITIONWISE_AGGREGATE_NONE; create_ordinary_grouping_paths(root, input_rel, grouped_rel, - agg_costs, gd, &extra, + &agg_costs, gd, &extra, &partially_grouped_rel); } @@ -4248,7 +4237,8 @@ consider_groupingsets_paths(PlannerInfo *root, l_start = lnext(gd->rollups, l_start); } - hashsize = estimate_hashagg_tablesize(path, + hashsize = estimate_hashagg_tablesize(root, + path, agg_costs, dNumGroups - exclude_groups); @@ -4382,7 +4372,8 @@ consider_groupingsets_paths(PlannerInfo *root, /* * Account first for space needed for groups we can't sort at all. */ - availspace -= estimate_hashagg_tablesize(path, + availspace -= estimate_hashagg_tablesize(root, + path, agg_costs, gd->dNumHashGroups); @@ -4433,7 +4424,8 @@ consider_groupingsets_paths(PlannerInfo *root, if (rollup->hashable) { - double sz = estimate_hashagg_tablesize(path, + double sz = estimate_hashagg_tablesize(root, + path, agg_costs, rollup->numGroups); @@ -6926,20 +6918,12 @@ create_partial_grouping_paths(PlannerInfo *root, MemSet(agg_final_costs, 0, sizeof(AggClauseCosts)); if (parse->hasAggs) { - List *partial_target_exprs; - /* partial phase */ - partial_target_exprs = partially_grouped_rel->reltarget->exprs; - get_agg_clause_costs(root, (Node *) partial_target_exprs, - AGGSPLIT_INITIAL_SERIAL, + get_agg_clause_costs(root, AGGSPLIT_INITIAL_SERIAL, agg_partial_costs); /* final phase */ - get_agg_clause_costs(root, (Node *) grouped_rel->reltarget->exprs, - AGGSPLIT_FINAL_DESERIAL, - agg_final_costs); - get_agg_clause_costs(root, extra->havingQual, - AGGSPLIT_FINAL_DESERIAL, + get_agg_clause_costs(root, AGGSPLIT_FINAL_DESERIAL, agg_final_costs); } @@ -7324,7 +7308,7 @@ gather_grouping_paths(PlannerInfo *root, RelOptInfo *rel) * Returns true when possible, false otherwise. */ static bool -can_partial_agg(PlannerInfo *root, const AggClauseCosts *agg_costs) +can_partial_agg(PlannerInfo *root) { Query *parse = root->parse; @@ -7341,7 +7325,7 @@ can_partial_agg(PlannerInfo *root, const AggClauseCosts *agg_costs) /* We don't know how to do grouping sets in parallel. */ return false; } - else if (agg_costs->hasNonPartial || agg_costs->hasNonSerial) + else if (root->hasNonPartialAggs || root->hasNonSerialAggs) { /* Insufficient support for partial mode. */ return false; diff --git a/src/backend/optimizer/prep/Makefile b/src/backend/optimizer/prep/Makefile index 5733df4573..6f8c6c8208 100644 --- a/src/backend/optimizer/prep/Makefile +++ b/src/backend/optimizer/prep/Makefile @@ -13,6 +13,7 @@ top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global OBJS = \ + prepagg.o \ prepjointree.o \ prepqual.o \ preptlist.o \ diff --git a/src/backend/optimizer/prep/prepagg.c b/src/backend/optimizer/prep/prepagg.c new file mode 100644 index 0000000000..34ac985a66 --- /dev/null +++ b/src/backend/optimizer/prep/prepagg.c @@ -0,0 +1,678 @@ +/*------------------------------------------------------------------------- + * + * prepagg.c + * Routines to preprocess aggregate function calls + * + * If there are identical aggregate calls in the query, they only need to + * be computed once. Also, some aggregate functions can share the same + * transition state, so that we only need to call the final function for + * them separately. These optimizations are independent of how the + * aggregates are executed. + * + * preprocess_aggrefs() detects those cases, creates AggInfo and + * AggTransInfo structs for each aggregate and transition state that needs + * to be computed, and sets the 'aggno' and 'transno' fields in the Aggrefs + * accordingly. It also resolves polymorphic transition types, and sets + * the 'aggtranstype' fields accordingly. + * + * XXX: The AggInfo and AggTransInfo structs are thrown away after + * planning, so executor startup has to perform some of the same lookups + * of transition functions and initial values that we do here. One day, we + * might want to carry that information to the Agg nodes to save the effort + * at executor startup. The Agg nodes are constructed much later in the + * planning, however, so it's not trivial. + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/optimizer/prep/prepagg.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/htup_details.h" +#include "catalog/pg_aggregate.h" +#include "catalog/pg_type.h" +#include "nodes/nodeFuncs.h" +#include "nodes/pathnodes.h" +#include "optimizer/clauses.h" +#include "optimizer/cost.h" +#include "optimizer/optimizer.h" +#include "optimizer/plancat.h" +#include "optimizer/prep.h" +#include "parser/parse_agg.h" +#include "utils/builtins.h" +#include "utils/datum.h" +#include "utils/fmgroids.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/syscache.h" + +static bool preprocess_aggrefs_walker(Node *node, PlannerInfo *root); +static int find_compatible_agg(PlannerInfo *root, Aggref *newagg, + List **same_input_transnos); +static int find_compatible_trans(PlannerInfo *root, Aggref *newagg, + bool shareable, + Oid aggtransfn, Oid aggtranstype, + int transtypeLen, bool transtypeByVal, + Oid aggcombinefn, + Oid aggserialfn, Oid aggdeserialfn, + Datum initValue, bool initValueIsNull, + List *transnos); +static Datum GetAggInitVal(Datum textInitVal, Oid transtype); + +/* ----------------- + * Resolve the transition type of all Aggrefs, and determine which Aggrefs + * can share aggregate or transition state. + * + * Information about the aggregates and transition functions are collected + * in the root->agginfos and root->aggtransinfos lists. The 'aggtranstype', + * 'aggno', and 'aggtransno' fields in are filled in in each Aggref. + * + * NOTE: This modifies the Aggrefs in the input expression in-place! + * + * We try to optimize by detecting duplicate aggregate functions so that + * their state and final values are re-used, rather than needlessly being + * re-calculated independently. We also detect aggregates that are not + * the same, but which can share the same transition state. + * + * Scenarios: + * + * 1. Identical aggregate function calls appear in the query: + * + * SELECT SUM(x) FROM ... HAVING SUM(x) > 0 + * + * Since these aggregates are identical, we only need to calculate + * the value once. Both aggregates will share the same 'aggno' value. + * + * 2. Two different aggregate functions appear in the query, but the + * aggregates have the same arguments, transition functions and + * initial values (and, presumably, different final functions): + * + * SELECT AVG(x), STDDEV(x) FROM ... + * + * In this case we must create a new AggInfo for the varying aggregate, + * and we need to call the final functions separately, but we need + * only run the transition function once. (This requires that the + * final functions be nondestructive of the transition state, but + * that's required anyway for other reasons.) + * + * For either of these optimizations to be valid, all aggregate properties + * used in the transition phase must be the same, including any modifiers + * such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't + * contain any volatile functions. + * ----------------- + */ +void +preprocess_aggrefs(PlannerInfo *root, Node *clause) +{ + (void) preprocess_aggrefs_walker(clause, root); +} + +static void +preprocess_aggref(Aggref *aggref, PlannerInfo *root) +{ + HeapTuple aggTuple; + Form_pg_aggregate aggform; + Oid aggtransfn; + Oid aggfinalfn; + Oid aggcombinefn; + Oid aggserialfn; + Oid aggdeserialfn; + Oid aggtranstype; + int32 aggtranstypmod; + int32 aggtransspace; + bool shareable; + int aggno; + int transno; + List *same_input_transnos; + int16 resulttypeLen; + bool resulttypeByVal; + Datum textInitVal; + Datum initValue; + bool initValueIsNull; + bool transtypeByVal; + int16 transtypeLen; + Oid inputTypes[FUNC_MAX_ARGS]; + int numArguments; + + Assert(aggref->agglevelsup == 0); + + /* + * Fetch info about the aggregate from pg_aggregate. Note it's correct to + * ignore the moving-aggregate variant, since what we're concerned with + * here is aggregates not window functions. + */ + aggTuple = SearchSysCache1(AGGFNOID, + ObjectIdGetDatum(aggref->aggfnoid)); + if (!HeapTupleIsValid(aggTuple)) + elog(ERROR, "cache lookup failed for aggregate %u", + aggref->aggfnoid); + aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); + aggtransfn = aggform->aggtransfn; + aggfinalfn = aggform->aggfinalfn; + aggcombinefn = aggform->aggcombinefn; + aggserialfn = aggform->aggserialfn; + aggdeserialfn = aggform->aggdeserialfn; + aggtranstype = aggform->aggtranstype; + aggtransspace = aggform->aggtransspace; + + /* + * Resolve the possibly-polymorphic aggregate transition type. + */ + + /* extract argument types (ignoring any ORDER BY expressions) */ + numArguments = get_aggregate_argtypes(aggref, inputTypes); + + /* resolve actual type of transition state, if polymorphic */ + aggtranstype = resolve_aggregate_transtype(aggref->aggfnoid, + aggtranstype, + inputTypes, + numArguments); + aggref->aggtranstype = aggtranstype; + + /* + * If transition state is of same type as first aggregated input, assume + * it's the same typmod (same width) as well. This works for cases like + * MAX/MIN and is probably somewhat reasonable otherwise. + */ + aggtranstypmod = -1; + if (aggref->args) + { + TargetEntry *tle = (TargetEntry *) linitial(aggref->args); + + if (aggtranstype == exprType((Node *) tle->expr)) + aggtranstypmod = exprTypmod((Node *) tle->expr); + } + + /* + * If finalfn is marked read-write, we can't share transition states; but + * it is okay to share states for AGGMODIFY_SHAREABLE aggs. + * + * In principle, in a partial aggregate, we could share the transition + * state even if the final function is marked as read-write, because the + * partial aggregate doesn't execute the final function. But it's too + * early to know whether we're going perform a partial aggregate. + */ + shareable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE); + + /* get info about the output value's datatype */ + get_typlenbyval(aggref->aggtype, + &resulttypeLen, + &resulttypeByVal); + + /* get initial value */ + textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, + Anum_pg_aggregate_agginitval, + &initValueIsNull); + if (initValueIsNull) + initValue = (Datum) 0; + else + initValue = GetAggInitVal(textInitVal, aggtranstype); + + ReleaseSysCache(aggTuple); + + /* + * 1. See if this is identical to another aggregate function call that + * we've seen already. + */ + aggno = find_compatible_agg(root, aggref, &same_input_transnos); + if (aggno != -1) + { + AggInfo *agginfo = list_nth(root->agginfos, aggno); + + transno = agginfo->transno; + } + else + { + AggInfo *agginfo = palloc(sizeof(AggInfo)); + + agginfo->finalfn_oid = aggfinalfn; + agginfo->representative_aggref = aggref; + agginfo->shareable = shareable; + + aggno = list_length(root->agginfos); + root->agginfos = lappend(root->agginfos, agginfo); + + /* + * Count it, and check for cases requiring ordered input. Note that + * ordered-set aggs always have nonempty aggorder. Any ordered-input + * case also defeats partial aggregation. + */ + if (aggref->aggorder != NIL || aggref->aggdistinct != NIL) + { + root->numOrderedAggs++; + root->hasNonPartialAggs = true; + } + + get_typlenbyval(aggtranstype, + &transtypeLen, + &transtypeByVal); + + /* + * 2. See if this aggregate can share transition state with another + * aggregate that we've initialized already. + */ + transno = find_compatible_trans(root, aggref, shareable, + aggtransfn, aggtranstype, + transtypeLen, transtypeByVal, + aggcombinefn, + aggserialfn, aggdeserialfn, + initValue, initValueIsNull, + same_input_transnos); + if (transno == -1) + { + AggTransInfo *transinfo = palloc(sizeof(AggTransInfo)); + + transinfo->args = aggref->args; + transinfo->aggfilter = aggref->aggfilter; + transinfo->transfn_oid = aggtransfn; + transinfo->combinefn_oid = aggcombinefn; + transinfo->serialfn_oid = aggserialfn; + transinfo->deserialfn_oid = aggdeserialfn; + transinfo->aggtranstype = aggtranstype; + transinfo->aggtranstypmod = aggtranstypmod; + transinfo->transtypeLen = transtypeLen; + transinfo->transtypeByVal = transtypeByVal; + transinfo->aggtransspace = aggtransspace; + transinfo->initValue = initValue; + transinfo->initValueIsNull = initValueIsNull; + + transno = list_length(root->aggtransinfos); + root->aggtransinfos = lappend(root->aggtransinfos, transinfo); + + /* + * Check whether partial aggregation is feasible, unless we + * already found out that we can't do it. + */ + if (!root->hasNonPartialAggs) + { + /* + * If there is no combine function, then partial aggregation + * is not possible. + */ + if (!OidIsValid(transinfo->combinefn_oid)) + root->hasNonPartialAggs = true; + + /* + * If we have any aggs with transtype INTERNAL then we must + * check whether they have serialization/deserialization + * functions; if not, we can't serialize partial-aggregation + * results. + */ + else if (transinfo->aggtranstype == INTERNALOID && + (!OidIsValid(transinfo->serialfn_oid) || + !OidIsValid(transinfo->deserialfn_oid))) + root->hasNonSerialAggs = true; + } + } + agginfo->transno = transno; + } + + /* + * Fill in the fields in the Aggref (aggtranstype was set above already) + */ + aggref->aggno = aggno; + aggref->aggtransno = transno; +} + +static bool +preprocess_aggrefs_walker(Node *node, PlannerInfo *root) +{ + if (node == NULL) + return false; + if (IsA(node, Aggref)) + { + Aggref *aggref = (Aggref *) node; + + preprocess_aggref(aggref, root); + + /* + * We assume that the parser checked that there are no aggregates (of + * this level anyway) in the aggregated arguments, direct arguments, + * or filter clause. Hence, we need not recurse into any of them. + */ + return false; + } + Assert(!IsA(node, SubLink)); + return expression_tree_walker(node, preprocess_aggrefs_walker, + (void *) root); +} + + +/* + * find_compatible_agg - search for a previously initialized per-Agg struct + * + * Searches the previously looked at aggregates to find one which is compatible + * with this one, with the same input parameters. If no compatible aggregate + * can be found, returns -1. + * + * As a side-effect, this also collects a list of existing, shareable per-Trans + * structs with matching inputs. If no identical Aggref is found, the list is + * passed later to find_compatible_trans, to see if we can at least reuse + * the state value of another aggregate. + */ +static int +find_compatible_agg(PlannerInfo *root, Aggref *newagg, + List **same_input_transnos) +{ + ListCell *lc; + int aggno; + + *same_input_transnos = NIL; + + /* we mustn't reuse the aggref if it contains volatile function calls */ + if (contain_volatile_functions((Node *) newagg)) + return -1; + + /* + * Search through the list of already seen aggregates. If we find an + * existing identical aggregate call, then we can re-use that one. While + * searching, we'll also collect a list of Aggrefs with the same input + * parameters. If no matching Aggref is found, the caller can potentially + * still re-use the transition state of one of them. (At this stage we + * just compare the parsetrees; whether different aggregates share the + * same transition function will be checked later.) + */ + aggno = -1; + foreach(lc, root->agginfos) + { + AggInfo *agginfo = (AggInfo *) lfirst(lc); + Aggref *existingRef; + + aggno++; + + existingRef = agginfo->representative_aggref; + + /* all of the following must be the same or it's no match */ + if (newagg->inputcollid != existingRef->inputcollid || + newagg->aggtranstype != existingRef->aggtranstype || + newagg->aggstar != existingRef->aggstar || + newagg->aggvariadic != existingRef->aggvariadic || + newagg->aggkind != existingRef->aggkind || + !equal(newagg->args, existingRef->args) || + !equal(newagg->aggorder, existingRef->aggorder) || + !equal(newagg->aggdistinct, existingRef->aggdistinct) || + !equal(newagg->aggfilter, existingRef->aggfilter)) + continue; + + /* if it's the same aggregate function then report exact match */ + if (newagg->aggfnoid == existingRef->aggfnoid && + newagg->aggtype == existingRef->aggtype && + newagg->aggcollid == existingRef->aggcollid && + equal(newagg->aggdirectargs, existingRef->aggdirectargs)) + { + list_free(*same_input_transnos); + *same_input_transnos = NIL; + return aggno; + } + + /* + * Not identical, but it had the same inputs. If the final function + * permits sharing, return its transno to the caller, in case we can + * re-use its per-trans state. (If there's already sharing going on, + * we might report a transno more than once. find_compatible_trans is + * cheap enough that it's not worth spending cycles to avoid that.) + */ + if (agginfo->shareable) + *same_input_transnos = lappend_int(*same_input_transnos, + agginfo->transno); + } + + return -1; +} + +/* + * find_compatible_trans - search for a previously initialized per-Trans + * struct + * + * Searches the list of transnos for a per-Trans struct with the same + * transition function and initial condition. (The inputs have already been + * verified to match.) + */ +static int +find_compatible_trans(PlannerInfo *root, Aggref *newagg, bool shareable, + Oid aggtransfn, Oid aggtranstype, + int transtypeLen, bool transtypeByVal, + Oid aggcombinefn, + Oid aggserialfn, Oid aggdeserialfn, + Datum initValue, bool initValueIsNull, + List *transnos) +{ + ListCell *lc; + + /* If this aggregate can't share transition states, give up */ + if (!shareable) + return -1; + + foreach(lc, transnos) + { + int transno = lfirst_int(lc); + AggTransInfo *pertrans = (AggTransInfo *) list_nth(root->aggtransinfos, transno); + + /* + * if the transfns or transition state types are not the same then the + * state can't be shared. + */ + if (aggtransfn != pertrans->transfn_oid || + aggtranstype != pertrans->aggtranstype) + continue; + + /* + * The serialization and deserialization functions must match, if + * present, as we're unable to share the trans state for aggregates + * which will serialize or deserialize into different formats. + * Remember that these will be InvalidOid if they're not required for + * this agg node. + */ + if (aggserialfn != pertrans->serialfn_oid || + aggdeserialfn != pertrans->deserialfn_oid) + continue; + + /* + * Combine function must also match. We only care about the combine + * function with partial aggregates, but it's too early in the + * planning to know if we will do partial aggregation, so be + * conservative. + */ + if (aggcombinefn != pertrans->combinefn_oid) + continue; + + /* + * Check that the initial condition matches, too. + */ + if (initValueIsNull && pertrans->initValueIsNull) + return transno; + + if (!initValueIsNull && !pertrans->initValueIsNull && + datumIsEqual(initValue, pertrans->initValue, + transtypeByVal, transtypeLen)) + return transno; + } + return -1; +} + +static Datum +GetAggInitVal(Datum textInitVal, Oid transtype) +{ + Oid typinput, + typioparam; + char *strInitVal; + Datum initVal; + + getTypeInputInfo(transtype, &typinput, &typioparam); + strInitVal = TextDatumGetCString(textInitVal); + initVal = OidInputFunctionCall(typinput, strInitVal, + typioparam, -1); + pfree(strInitVal); + return initVal; +} + + +/* + * get_agg_clause_costs + * Recursively find the Aggref nodes in an expression tree, and + * accumulate cost information about them. + * + * 'aggsplit' tells us the expected partial-aggregation mode, which affects + * the cost estimates. + * + * NOTE that the counts/costs are ADDED to those already in *costs ... so + * the caller is responsible for zeroing the struct initially. + * + * We count the nodes, estimate their execution costs, and estimate the total + * space needed for their transition state values if all are evaluated in + * parallel (as would be done in a HashAgg plan). Also, we check whether + * partial aggregation is feasible. See AggClauseCosts for the exact set + * of statistics collected. + * + * In addition, we mark Aggref nodes with the correct aggtranstype, so + * that that doesn't need to be done repeatedly. (That makes this function's + * name a bit of a misnomer.) + * + * This does not descend into subqueries, and so should be used only after + * reduction of sublinks to subplans, or in contexts where it's known there + * are no subqueries. There mustn't be outer-aggregate references either. + */ +void +get_agg_clause_costs(PlannerInfo *root, AggSplit aggsplit, AggClauseCosts *costs) +{ + ListCell *lc; + + foreach(lc, root->aggtransinfos) + { + AggTransInfo *transinfo = (AggTransInfo *) lfirst(lc); + + /* + * Add the appropriate component function execution costs to + * appropriate totals. + */ + if (DO_AGGSPLIT_COMBINE(aggsplit)) + { + /* charge for combining previously aggregated states */ + add_function_cost(root, transinfo->combinefn_oid, NULL, + &costs->transCost); + } + else + add_function_cost(root, transinfo->transfn_oid, NULL, + &costs->transCost); + if (DO_AGGSPLIT_DESERIALIZE(aggsplit) && + OidIsValid(transinfo->deserialfn_oid)) + add_function_cost(root, transinfo->deserialfn_oid, NULL, + &costs->transCost); + if (DO_AGGSPLIT_SERIALIZE(aggsplit) && + OidIsValid(transinfo->serialfn_oid)) + add_function_cost(root, transinfo->serialfn_oid, NULL, + &costs->finalCost); + + /* + * These costs are incurred only by the initial aggregate node, so we + * mustn't include them again at upper levels. + */ + if (!DO_AGGSPLIT_COMBINE(aggsplit)) + { + /* add the input expressions' cost to per-input-row costs */ + QualCost argcosts; + + cost_qual_eval_node(&argcosts, (Node *) transinfo->args, root); + costs->transCost.startup += argcosts.startup; + costs->transCost.per_tuple += argcosts.per_tuple; + + /* + * Add any filter's cost to per-input-row costs. + * + * XXX Ideally we should reduce input expression costs according + * to filter selectivity, but it's not clear it's worth the + * trouble. + */ + if (transinfo->aggfilter) + { + cost_qual_eval_node(&argcosts, (Node *) transinfo->aggfilter, + root); + costs->transCost.startup += argcosts.startup; + costs->transCost.per_tuple += argcosts.per_tuple; + } + } + + /* + * If the transition type is pass-by-value then it doesn't add + * anything to the required size of the hashtable. If it is + * pass-by-reference then we have to add the estimated size of the + * value itself, plus palloc overhead. + */ + if (!transinfo->transtypeByVal) + { + int32 avgwidth; + + /* Use average width if aggregate definition gave one */ + if (transinfo->aggtransspace > 0) + avgwidth = transinfo->aggtransspace; + else if (transinfo->transfn_oid == F_ARRAY_APPEND) + { + /* + * If the transition function is array_append(), it'll use an + * expanded array as transvalue, which will occupy at least + * ALLOCSET_SMALL_INITSIZE and possibly more. Use that as the + * estimate for lack of a better idea. + */ + avgwidth = ALLOCSET_SMALL_INITSIZE; + } + else + { + avgwidth = get_typavgwidth(transinfo->aggtranstype, transinfo->aggtranstypmod); + } + + avgwidth = MAXALIGN(avgwidth); + costs->transitionSpace += avgwidth + 2 * sizeof(void *); + } + else if (transinfo->aggtranstype == INTERNALOID) + { + /* + * INTERNAL transition type is a special case: although INTERNAL + * is pass-by-value, it's almost certainly being used as a pointer + * to some large data structure. The aggregate definition can + * provide an estimate of the size. If it doesn't, then we assume + * ALLOCSET_DEFAULT_INITSIZE, which is a good guess if the data is + * being kept in a private memory context, as is done by + * array_agg() for instance. + */ + if (transinfo->aggtransspace > 0) + costs->transitionSpace += transinfo->aggtransspace; + else + costs->transitionSpace += ALLOCSET_DEFAULT_INITSIZE; + } + } + + foreach(lc, root->agginfos) + { + AggInfo *agginfo = (AggInfo *) lfirst(lc); + Aggref *aggref = agginfo->representative_aggref; + + /* + * Add the appropriate component function execution costs to + * appropriate totals. + */ + if (!DO_AGGSPLIT_SKIPFINAL(aggsplit) && + OidIsValid(agginfo->finalfn_oid)) + add_function_cost(root, agginfo->finalfn_oid, NULL, + &costs->finalCost); + + /* + * If there are direct arguments, treat their evaluation cost like the + * cost of the finalfn. + */ + if (aggref->aggdirectargs) + { + QualCost argcosts; + + cost_qual_eval_node(&argcosts, (Node *) aggref->aggdirectargs, + root); + costs->finalCost.startup += argcosts.startup; + costs->finalCost.per_tuple += argcosts.per_tuple; + } + } +} diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index 85ef873caa..587d494c34 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -53,14 +53,6 @@ #include "utils/syscache.h" #include "utils/typcache.h" - -typedef struct -{ - PlannerInfo *root; - AggSplit aggsplit; - AggClauseCosts *costs; -} get_agg_clause_costs_context; - typedef struct { ParamListInfo boundParams; @@ -98,8 +90,6 @@ typedef struct } max_parallel_hazard_context; static bool contain_agg_clause_walker(Node *node, void *context); -static bool get_agg_clause_costs_walker(Node *node, - get_agg_clause_costs_context *context); static bool find_window_functions_walker(Node *node, WindowFuncLists *lists); static bool contain_subplans_walker(Node *node, void *context); static bool contain_mutable_functions_walker(Node *node, void *context); @@ -200,284 +190,6 @@ contain_agg_clause_walker(Node *node, void *context) return expression_tree_walker(node, contain_agg_clause_walker, context); } -/* - * get_agg_clause_costs - * Recursively find the Aggref nodes in an expression tree, and - * accumulate cost information about them. - * - * 'aggsplit' tells us the expected partial-aggregation mode, which affects - * the cost estimates. - * - * NOTE that the counts/costs are ADDED to those already in *costs ... so - * the caller is responsible for zeroing the struct initially. - * - * We count the nodes, estimate their execution costs, and estimate the total - * space needed for their transition state values if all are evaluated in - * parallel (as would be done in a HashAgg plan). Also, we check whether - * partial aggregation is feasible. See AggClauseCosts for the exact set - * of statistics collected. - * - * In addition, we mark Aggref nodes with the correct aggtranstype, so - * that that doesn't need to be done repeatedly. (That makes this function's - * name a bit of a misnomer.) - * - * This does not descend into subqueries, and so should be used only after - * reduction of sublinks to subplans, or in contexts where it's known there - * are no subqueries. There mustn't be outer-aggregate references either. - */ -void -get_agg_clause_costs(PlannerInfo *root, Node *clause, AggSplit aggsplit, - AggClauseCosts *costs) -{ - get_agg_clause_costs_context context; - - context.root = root; - context.aggsplit = aggsplit; - context.costs = costs; - (void) get_agg_clause_costs_walker(clause, &context); -} - -static bool -get_agg_clause_costs_walker(Node *node, get_agg_clause_costs_context *context) -{ - if (node == NULL) - return false; - if (IsA(node, Aggref)) - { - Aggref *aggref = (Aggref *) node; - AggClauseCosts *costs = context->costs; - HeapTuple aggTuple; - Form_pg_aggregate aggform; - Oid aggtransfn; - Oid aggfinalfn; - Oid aggcombinefn; - Oid aggserialfn; - Oid aggdeserialfn; - Oid aggtranstype; - int32 aggtransspace; - QualCost argcosts; - - Assert(aggref->agglevelsup == 0); - - /* - * Fetch info about aggregate from pg_aggregate. Note it's correct to - * ignore the moving-aggregate variant, since what we're concerned - * with here is aggregates not window functions. - */ - aggTuple = SearchSysCache1(AGGFNOID, - ObjectIdGetDatum(aggref->aggfnoid)); - if (!HeapTupleIsValid(aggTuple)) - elog(ERROR, "cache lookup failed for aggregate %u", - aggref->aggfnoid); - aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); - aggtransfn = aggform->aggtransfn; - aggfinalfn = aggform->aggfinalfn; - aggcombinefn = aggform->aggcombinefn; - aggserialfn = aggform->aggserialfn; - aggdeserialfn = aggform->aggdeserialfn; - aggtranstype = aggform->aggtranstype; - aggtransspace = aggform->aggtransspace; - ReleaseSysCache(aggTuple); - - /* - * Resolve the possibly-polymorphic aggregate transition type, unless - * already done in a previous pass over the expression. - */ - if (OidIsValid(aggref->aggtranstype)) - aggtranstype = aggref->aggtranstype; - else - { - Oid inputTypes[FUNC_MAX_ARGS]; - int numArguments; - - /* extract argument types (ignoring any ORDER BY expressions) */ - numArguments = get_aggregate_argtypes(aggref, inputTypes); - - /* resolve actual type of transition state, if polymorphic */ - aggtranstype = resolve_aggregate_transtype(aggref->aggfnoid, - aggtranstype, - inputTypes, - numArguments); - aggref->aggtranstype = aggtranstype; - } - - /* - * Count it, and check for cases requiring ordered input. Note that - * ordered-set aggs always have nonempty aggorder. Any ordered-input - * case also defeats partial aggregation. - */ - costs->numAggs++; - if (aggref->aggorder != NIL || aggref->aggdistinct != NIL) - { - costs->numOrderedAggs++; - costs->hasNonPartial = true; - } - - /* - * Check whether partial aggregation is feasible, unless we already - * found out that we can't do it. - */ - if (!costs->hasNonPartial) - { - /* - * If there is no combine function, then partial aggregation is - * not possible. - */ - if (!OidIsValid(aggcombinefn)) - costs->hasNonPartial = true; - - /* - * If we have any aggs with transtype INTERNAL then we must check - * whether they have serialization/deserialization functions; if - * not, we can't serialize partial-aggregation results. - */ - else if (aggtranstype == INTERNALOID && - (!OidIsValid(aggserialfn) || !OidIsValid(aggdeserialfn))) - costs->hasNonSerial = true; - } - - /* - * Add the appropriate component function execution costs to - * appropriate totals. - */ - if (DO_AGGSPLIT_COMBINE(context->aggsplit)) - { - /* charge for combining previously aggregated states */ - add_function_cost(context->root, aggcombinefn, NULL, - &costs->transCost); - } - else - add_function_cost(context->root, aggtransfn, NULL, - &costs->transCost); - if (DO_AGGSPLIT_DESERIALIZE(context->aggsplit) && - OidIsValid(aggdeserialfn)) - add_function_cost(context->root, aggdeserialfn, NULL, - &costs->transCost); - if (DO_AGGSPLIT_SERIALIZE(context->aggsplit) && - OidIsValid(aggserialfn)) - add_function_cost(context->root, aggserialfn, NULL, - &costs->finalCost); - if (!DO_AGGSPLIT_SKIPFINAL(context->aggsplit) && - OidIsValid(aggfinalfn)) - add_function_cost(context->root, aggfinalfn, NULL, - &costs->finalCost); - - /* - * These costs are incurred only by the initial aggregate node, so we - * mustn't include them again at upper levels. - */ - if (!DO_AGGSPLIT_COMBINE(context->aggsplit)) - { - /* add the input expressions' cost to per-input-row costs */ - cost_qual_eval_node(&argcosts, (Node *) aggref->args, context->root); - costs->transCost.startup += argcosts.startup; - costs->transCost.per_tuple += argcosts.per_tuple; - - /* - * Add any filter's cost to per-input-row costs. - * - * XXX Ideally we should reduce input expression costs according - * to filter selectivity, but it's not clear it's worth the - * trouble. - */ - if (aggref->aggfilter) - { - cost_qual_eval_node(&argcosts, (Node *) aggref->aggfilter, - context->root); - costs->transCost.startup += argcosts.startup; - costs->transCost.per_tuple += argcosts.per_tuple; - } - } - - /* - * If there are direct arguments, treat their evaluation cost like the - * cost of the finalfn. - */ - if (aggref->aggdirectargs) - { - cost_qual_eval_node(&argcosts, (Node *) aggref->aggdirectargs, - context->root); - costs->finalCost.startup += argcosts.startup; - costs->finalCost.per_tuple += argcosts.per_tuple; - } - - /* - * If the transition type is pass-by-value then it doesn't add - * anything to the required size of the hashtable. If it is - * pass-by-reference then we have to add the estimated size of the - * value itself, plus palloc overhead. - */ - if (!get_typbyval(aggtranstype)) - { - int32 avgwidth; - - /* Use average width if aggregate definition gave one */ - if (aggtransspace > 0) - avgwidth = aggtransspace; - else if (aggtransfn == F_ARRAY_APPEND) - { - /* - * If the transition function is array_append(), it'll use an - * expanded array as transvalue, which will occupy at least - * ALLOCSET_SMALL_INITSIZE and possibly more. Use that as the - * estimate for lack of a better idea. - */ - avgwidth = ALLOCSET_SMALL_INITSIZE; - } - else - { - /* - * If transition state is of same type as first aggregated - * input, assume it's the same typmod (same width) as well. - * This works for cases like MAX/MIN and is probably somewhat - * reasonable otherwise. - */ - int32 aggtranstypmod = -1; - - if (aggref->args) - { - TargetEntry *tle = (TargetEntry *) linitial(aggref->args); - - if (aggtranstype == exprType((Node *) tle->expr)) - aggtranstypmod = exprTypmod((Node *) tle->expr); - } - - avgwidth = get_typavgwidth(aggtranstype, aggtranstypmod); - } - - avgwidth = MAXALIGN(avgwidth); - costs->transitionSpace += avgwidth + 2 * sizeof(void *); - } - else if (aggtranstype == INTERNALOID) - { - /* - * INTERNAL transition type is a special case: although INTERNAL - * is pass-by-value, it's almost certainly being used as a pointer - * to some large data structure. The aggregate definition can - * provide an estimate of the size. If it doesn't, then we assume - * ALLOCSET_DEFAULT_INITSIZE, which is a good guess if the data is - * being kept in a private memory context, as is done by - * array_agg() for instance. - */ - if (aggtransspace > 0) - costs->transitionSpace += aggtransspace; - else - costs->transitionSpace += ALLOCSET_DEFAULT_INITSIZE; - } - - /* - * We assume that the parser checked that there are no aggregates (of - * this level anyway) in the aggregated arguments, direct arguments, - * or filter clause. Hence, we need not recurse into any of them. - */ - return false; - } - Assert(!IsA(node, SubLink)); - return expression_tree_walker(node, get_agg_clause_costs_walker, - (void *) context); -} - - /***************************************************************************** * Window-function clause manipulation *****************************************************************************/ diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c index 8b4e3ca5e1..23ac2a2fe6 100644 --- a/src/backend/parser/parse_func.c +++ b/src/backend/parser/parse_func.c @@ -769,6 +769,8 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs, aggref->aggkind = aggkind; /* agglevelsup will be set by transformAggregateCall */ aggref->aggsplit = AGGSPLIT_SIMPLE; /* planner might change this */ + aggref->aggno = -1; /* planner will set aggno and aggtransno */ + aggref->aggtransno = -1; aggref->location = location; /* diff --git a/src/backend/utils/adt/selfuncs.c b/src/backend/utils/adt/selfuncs.c index bec357fcef..80bd60f876 100644 --- a/src/backend/utils/adt/selfuncs.c +++ b/src/backend/utils/adt/selfuncs.c @@ -3839,12 +3839,14 @@ estimate_hash_bucket_stats(PlannerInfo *root, Node *hashkey, double nbuckets, * won't store them. Is this a problem? */ double -estimate_hashagg_tablesize(Path *path, const AggClauseCosts *agg_costs, - double dNumGroups) +estimate_hashagg_tablesize(PlannerInfo *root, Path *path, + const AggClauseCosts *agg_costs, double dNumGroups) { - Size hashentrysize = hash_agg_entry_size(agg_costs->numAggs, - path->pathtarget->width, - agg_costs->transitionSpace); + Size hashentrysize; + + hashentrysize = hash_agg_entry_size(list_length(root->aggtransinfos), + path->pathtarget->width, + agg_costs->transitionSpace); /* * Note that this disregards the effect of fill-factor and growth policy diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 1276ff8bda..087efddff9 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202011231 +#define CATALOG_VERSION_NO 202011241 #endif diff --git a/src/include/executor/execExpr.h b/src/include/executor/execExpr.h index b792de1bc9..abb489e206 100644 --- a/src/include/executor/execExpr.h +++ b/src/include/executor/execExpr.h @@ -564,8 +564,7 @@ typedef struct ExprEvalStep /* for EEOP_AGGREF */ struct { - /* out-of-line state, modified by nodeAgg.c */ - AggrefExprState *astate; + int aggno; } aggref; /* for EEOP_GROUPING_FUNC */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index f6824bf2e1..61ba4c3666 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -746,17 +746,6 @@ typedef tuplehash_iterator TupleHashIterator; * ---------------------------------------------------------------- */ -/* ---------------- - * AggrefExprState node - * ---------------- - */ -typedef struct AggrefExprState -{ - NodeTag type; - Aggref *aggref; /* expression plan node */ - int aggno; /* ID number for agg within its plan node */ -} AggrefExprState; - /* ---------------- * WindowFuncExprState node * ---------------- diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 7ddd8c011b..3684f87a88 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -206,10 +206,9 @@ typedef enum NodeTag * Most Expr-based plan nodes do not have a corresponding expression state * node, they're fully handled within execExpr* - but sometimes the state * needs to be shared with other parts of the executor, as for example - * with AggrefExprState, which nodeAgg.c has to modify. + * with SubPlanState, which nodeSubplan.c has to modify. */ T_ExprState, - T_AggrefExprState, T_WindowFuncExprState, T_SetExprState, T_SubPlanState, diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index 8f62d61702..abe6f570e3 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -55,10 +55,6 @@ typedef struct QualCost */ typedef struct AggClauseCosts { - int numAggs; /* total number of aggregate functions */ - int numOrderedAggs; /* number w/ DISTINCT/ORDER BY/WITHIN GROUP */ - bool hasNonPartial; /* does any agg not support partial mode? */ - bool hasNonSerial; /* is any partial agg non-serializable? */ QualCost transCost; /* total per-input-row execution costs */ QualCost finalCost; /* total per-aggregated-row costs */ Size transitionSpace; /* space for pass-by-ref transition data */ @@ -348,6 +344,15 @@ struct PlannerInfo bool hasAlternativeSubPlans; /* true if we've made any of those */ bool hasRecursion; /* true if planning a recursive WITH item */ + /* + * Information about aggregates. Filled by preprocess_aggrefs(). + */ + List *agginfos; /* AggInfo structs */ + List *aggtransinfos; /* AggTransInfo structs */ + int numOrderedAggs; /* number w/ DISTINCT/ORDER BY/WITHIN GROUP */ + bool hasNonPartialAggs; /* does any agg not support partial mode? */ + bool hasNonSerialAggs; /* is any partial agg non-serializable? */ + /* These fields are used only when hasRecursion is true: */ int wt_param_id; /* PARAM_EXEC ID for the work table */ struct Path *non_recursive_path; /* a path for non-recursive term */ @@ -2549,4 +2554,71 @@ typedef struct JoinCostWorkspace double inner_rows_total; } JoinCostWorkspace; +/* + * AggInfo holds information about an aggregate that needs to be computed. + * Multiple Aggrefs in a query can refer to the same AggInfo by having the + * same 'aggno' value, so that the aggregate is computed only once. + */ +typedef struct AggInfo +{ + /* + * Link to an Aggref expr this state value is for. + * + * There can be multiple identical Aggref's sharing the same per-agg. This + * points to the first one of them. + */ + Aggref *representative_aggref; + + int transno; + + /* + * "shareable" is false if this agg cannot share state values with other + * aggregates because the final function is read-write. + */ + bool shareable; + + /* Oid of the final function or InvalidOid */ + Oid finalfn_oid; + +} AggInfo; + +/* + * AggTransInfo holds information about transition state that is used by one + * or more aggregates in the query. Multiple aggregates can share the same + * transition state, if they have the same inputs and the same transition + * function. Aggrefs that share the same transition info have the same + * 'aggtransno' value. + */ +typedef struct AggTransInfo +{ + List *args; + Expr *aggfilter; + + /* Oid of the state transition function */ + Oid transfn_oid; + + /* Oid of the serialization function or InvalidOid */ + Oid serialfn_oid; + + /* Oid of the deserialization function or InvalidOid */ + Oid deserialfn_oid; + + /* Oid of the combine function or InvalidOid */ + Oid combinefn_oid; + + /* Oid of state value's datatype */ + Oid aggtranstype; + int32 aggtranstypmod; + int transtypeLen; + bool transtypeByVal; + int32 aggtransspace; + + /* + * initial value from pg_aggregate entry + */ + Datum initValue; + bool initValueIsNull; + +} AggTransInfo; + #endif /* PATHNODES_H */ diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index 5b190bb99b..cdbe781c73 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -305,6 +305,12 @@ typedef struct Param * a crosscheck that the Aggrefs match the plan; but note that when aggsplit * indicates a non-final mode, aggtype reflects the transition data type * not the SQL-level output type of the aggregate. + * + * aggno and aggtransno are -1 in the parse stage, and are set in planning. + * Aggregates with the same 'aggno' represent the same aggregate expression, + * and can share the result. Aggregates with same 'transno' but different + * 'aggno' can share the same transition state, only the final function needs + * to be called separately. */ typedef struct Aggref { @@ -326,6 +332,8 @@ typedef struct Aggref char aggkind; /* aggregate kind (see pg_aggregate.h) */ Index agglevelsup; /* > 0 if agg belongs to outer query */ AggSplit aggsplit; /* expected agg-splitting mode of parent Agg */ + int aggno; /* unique ID within the Agg node */ + int aggtransno; /* unique ID of transition state in the Agg */ int location; /* token location, or -1 if unknown */ } Aggref; diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h index 2584ffc72f..68855d0cee 100644 --- a/src/include/optimizer/clauses.h +++ b/src/include/optimizer/clauses.h @@ -24,8 +24,6 @@ typedef struct } WindowFuncLists; extern bool contain_agg_clause(Node *clause); -extern void get_agg_clause_costs(PlannerInfo *root, Node *clause, - AggSplit aggsplit, AggClauseCosts *costs); extern bool contain_window_function(Node *clause); extern WindowFuncLists *find_window_functions(Node *clause, Index maxWinRef); diff --git a/src/include/optimizer/prep.h b/src/include/optimizer/prep.h index 19c92302b0..0abe6bec00 100644 --- a/src/include/optimizer/prep.h +++ b/src/include/optimizer/prep.h @@ -38,9 +38,17 @@ extern List *preprocess_targetlist(PlannerInfo *root); extern PlanRowMark *get_plan_rowmark(List *rowmarks, Index rtindex); +/* + * prototypes for prepagg.c + */ +extern void get_agg_clause_costs(PlannerInfo *root, AggSplit aggsplit, + AggClauseCosts *agg_costs); +extern void preprocess_aggrefs(PlannerInfo *root, Node *clause); + /* * prototypes for prepunion.c */ extern RelOptInfo *plan_set_operations(PlannerInfo *root); + #endif /* PREP_H */ diff --git a/src/include/utils/selfuncs.h b/src/include/utils/selfuncs.h index 7ac4a06391..3a2cfb7efa 100644 --- a/src/include/utils/selfuncs.h +++ b/src/include/utils/selfuncs.h @@ -200,7 +200,7 @@ extern void estimate_hash_bucket_stats(PlannerInfo *root, Node *hashkey, double nbuckets, Selectivity *mcv_freq, Selectivity *bucketsize_frac); -extern double estimate_hashagg_tablesize(Path *path, +extern double estimate_hashagg_tablesize(PlannerInfo *root, Path *path, const AggClauseCosts *agg_costs, double dNumGroups); diff --git a/src/test/regress/expected/partition_aggregate.out b/src/test/regress/expected/partition_aggregate.out index 45c698daf4..dfa4b036b5 100644 --- a/src/test/regress/expected/partition_aggregate.out +++ b/src/test/regress/expected/partition_aggregate.out @@ -1412,11 +1412,12 @@ SELECT y, sum(x), avg(x), count(*) FROM pagg_tab_para GROUP BY y HAVING avg(x) < (4 rows) -- Test when parent can produce parallel paths but not any (or some) of its children +-- (Use one more aggregate to tilt the cost estimates for the plan we want) ALTER TABLE pagg_tab_para_p1 SET (parallel_workers = 0); ALTER TABLE pagg_tab_para_p3 SET (parallel_workers = 0); ANALYZE pagg_tab_para; EXPLAIN (COSTS OFF) -SELECT x, sum(y), avg(y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3; +SELECT x, sum(y), avg(y), sum(x+y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3; QUERY PLAN ------------------------------------------------------------------------------------------- Sort @@ -1436,21 +1437,21 @@ SELECT x, sum(y), avg(y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < -> Parallel Seq Scan on pagg_tab_para_p2 pagg_tab_para_2 (15 rows) -SELECT x, sum(y), avg(y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3; - x | sum | avg | count -----+------+--------------------+------- - 0 | 5000 | 5.0000000000000000 | 1000 - 1 | 6000 | 6.0000000000000000 | 1000 - 10 | 5000 | 5.0000000000000000 | 1000 - 11 | 6000 | 6.0000000000000000 | 1000 - 20 | 5000 | 5.0000000000000000 | 1000 - 21 | 6000 | 6.0000000000000000 | 1000 +SELECT x, sum(y), avg(y), sum(x+y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3; + x | sum | avg | sum | count +----+------+--------------------+-------+------- + 0 | 5000 | 5.0000000000000000 | 5000 | 1000 + 1 | 6000 | 6.0000000000000000 | 7000 | 1000 + 10 | 5000 | 5.0000000000000000 | 15000 | 1000 + 11 | 6000 | 6.0000000000000000 | 17000 | 1000 + 20 | 5000 | 5.0000000000000000 | 25000 | 1000 + 21 | 6000 | 6.0000000000000000 | 27000 | 1000 (6 rows) ALTER TABLE pagg_tab_para_p2 SET (parallel_workers = 0); ANALYZE pagg_tab_para; EXPLAIN (COSTS OFF) -SELECT x, sum(y), avg(y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3; +SELECT x, sum(y), avg(y), sum(x+y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3; QUERY PLAN ---------------------------------------------------------------------------------- Sort @@ -1470,15 +1471,15 @@ SELECT x, sum(y), avg(y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < -> Seq Scan on pagg_tab_para_p3 pagg_tab_para_3 (15 rows) -SELECT x, sum(y), avg(y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3; - x | sum | avg | count -----+------+--------------------+------- - 0 | 5000 | 5.0000000000000000 | 1000 - 1 | 6000 | 6.0000000000000000 | 1000 - 10 | 5000 | 5.0000000000000000 | 1000 - 11 | 6000 | 6.0000000000000000 | 1000 - 20 | 5000 | 5.0000000000000000 | 1000 - 21 | 6000 | 6.0000000000000000 | 1000 +SELECT x, sum(y), avg(y), sum(x+y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3; + x | sum | avg | sum | count +----+------+--------------------+-------+------- + 0 | 5000 | 5.0000000000000000 | 5000 | 1000 + 1 | 6000 | 6.0000000000000000 | 7000 | 1000 + 10 | 5000 | 5.0000000000000000 | 15000 | 1000 + 11 | 6000 | 6.0000000000000000 | 17000 | 1000 + 20 | 5000 | 5.0000000000000000 | 25000 | 1000 + 21 | 6000 | 6.0000000000000000 | 27000 | 1000 (6 rows) -- Reset parallelism parameters to get partitionwise aggregation plan. diff --git a/src/test/regress/sql/partition_aggregate.sql b/src/test/regress/sql/partition_aggregate.sql index 117f65ecb4..c17294b15b 100644 --- a/src/test/regress/sql/partition_aggregate.sql +++ b/src/test/regress/sql/partition_aggregate.sql @@ -308,20 +308,21 @@ SELECT y, sum(x), avg(x), count(*) FROM pagg_tab_para GROUP BY y HAVING avg(x) < SELECT y, sum(x), avg(x), count(*) FROM pagg_tab_para GROUP BY y HAVING avg(x) < 12 ORDER BY 1, 2, 3; -- Test when parent can produce parallel paths but not any (or some) of its children +-- (Use one more aggregate to tilt the cost estimates for the plan we want) ALTER TABLE pagg_tab_para_p1 SET (parallel_workers = 0); ALTER TABLE pagg_tab_para_p3 SET (parallel_workers = 0); ANALYZE pagg_tab_para; EXPLAIN (COSTS OFF) -SELECT x, sum(y), avg(y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3; -SELECT x, sum(y), avg(y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3; +SELECT x, sum(y), avg(y), sum(x+y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3; +SELECT x, sum(y), avg(y), sum(x+y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3; ALTER TABLE pagg_tab_para_p2 SET (parallel_workers = 0); ANALYZE pagg_tab_para; EXPLAIN (COSTS OFF) -SELECT x, sum(y), avg(y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3; -SELECT x, sum(y), avg(y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3; +SELECT x, sum(y), avg(y), sum(x+y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3; +SELECT x, sum(y), avg(y), sum(x+y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3; -- Reset parallelism parameters to get partitionwise aggregation plan. RESET min_parallel_table_scan_size; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index fde701bfd4..4c40ae37b2 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -50,7 +50,6 @@ AggStatePerPhase AggStatePerTrans AggStrategy Aggref -AggrefExprState AlenState Alias AllocBlock