postgresql/src/backend/optimizer/prep/prepagg.c

697 lines
22 KiB
C

/*-------------------------------------------------------------------------
*
* prepagg.c
* Routines to preprocess aggregate function calls
*
* If there are identical aggregate calls in the query, they only need to
* be computed once. Also, some aggregate functions can share the same
* transition state, so that we only need to call the final function for
* them separately. These optimizations are independent of how the
* aggregates are executed.
*
* preprocess_aggrefs() detects those cases, creates AggInfo and
* AggTransInfo structs for each aggregate and transition state that needs
* to be computed, and sets the 'aggno' and 'aggtransno' fields in the Aggrefs
* accordingly. It also resolves polymorphic transition types, and sets
* the 'aggtranstype' fields accordingly.
*
* XXX: The AggInfo and AggTransInfo structs are thrown away after
* planning, so executor startup has to perform some of the same lookups
* of transition functions and initial values that we do here. One day, we
* might want to carry that information to the Agg nodes to save the effort
* at executor startup. The Agg nodes are constructed much later in the
* planning, however, so it's not trivial.
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/optimizer/prep/prepagg.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/htup_details.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_type.h"
#include "nodes/nodeFuncs.h"
#include "nodes/pathnodes.h"
#include "optimizer/cost.h"
#include "optimizer/optimizer.h"
#include "optimizer/plancat.h"
#include "optimizer/prep.h"
#include "parser/parse_agg.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
/* Prototypes for functions private to this file */
/* Tree walker that hands every Aggref it encounters to preprocess_aggref() */
static bool preprocess_aggrefs_walker(Node *node, PlannerInfo *root);
/*
 * Returns the aggno of a previously seen identical aggregate call, or -1;
 * also reports the transnos of shareable aggregates with matching inputs.
 */
static int find_compatible_agg(PlannerInfo *root, Aggref *newagg,
List **same_input_transnos);
/*
 * Returns a transno from 'transnos' whose transition state the new aggregate
 * can share, or -1 if there is none.
 */
static int find_compatible_trans(PlannerInfo *root, Aggref *newagg,
bool shareable,
Oid aggtransfn, Oid aggtranstype,
int transtypeLen, bool transtypeByVal,
Oid aggcombinefn,
Oid aggserialfn, Oid aggdeserialfn,
Datum initValue, bool initValueIsNull,
List *transnos);
/* Converts an aggregate's textual initial value into a Datum of 'transtype' */
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
/* -----------------
* Resolve the transition type of all Aggrefs, and determine which Aggrefs
* can share aggregate or transition state.
*
* Information about the aggregates and transition functions is collected
* in the root->agginfos and root->aggtransinfos lists. The 'aggtranstype',
* 'aggno', and 'aggtransno' fields of each Aggref are filled in.
*
* NOTE: This modifies the Aggrefs in the input expression in-place!
*
* We try to optimize by detecting duplicate aggregate functions so that
* their state and final values are re-used, rather than needlessly being
* re-calculated independently. We also detect aggregates that are not
* the same, but which can share the same transition state.
*
* Scenarios:
*
* 1. Identical aggregate function calls appear in the query:
*
* SELECT SUM(x) FROM ... HAVING SUM(x) > 0
*
* Since these aggregates are identical, we only need to calculate
* the value once. Both aggregates will share the same 'aggno' value.
*
* 2. Two different aggregate functions appear in the query, but the
* aggregates have the same arguments, transition functions and
* initial values (and, presumably, different final functions):
*
* SELECT AVG(x), STDDEV(x) FROM ...
*
* In this case we must create a new AggInfo for the varying aggregate,
* and we need to call the final functions separately, but we need
* only run the transition function once. (This requires that the
* final functions be nondestructive of the transition state, but
* that's required anyway for other reasons.)
*
* For either of these optimizations to be valid, all aggregate properties
* used in the transition phase must be the same, including any modifiers
* such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't
* contain any volatile functions.
* -----------------
*/
void
preprocess_aggrefs(PlannerInfo *root, Node *clause)
{
/*
 * Walk 'clause'; the walker passes every Aggref it finds to
 * preprocess_aggref(), which updates both the Aggref and 'root'. The
 * walker's boolean result is deliberately discarded.
 */
(void) preprocess_aggrefs_walker(clause, root);
}
/*
 * preprocess_aggref
 *	  Process a single Aggref: resolve its transition type, then assign its
 *	  'aggno' and 'aggtransno'.
 *
 * An existing entry of root->agginfos / root->aggtransinfos is reused when
 * find_compatible_agg() / find_compatible_trans() report a match; otherwise
 * a new AggInfo and/or AggTransInfo is appended, and its list position
 * becomes the number stored in the Aggref.
 *
 * For newly added entries this also maintains root->numOrderedAggs,
 * root->hasNonPartialAggs and root->hasNonSerialAggs.
 *
 * The Aggref is modified in place.
 */
static void
preprocess_aggref(Aggref *aggref, PlannerInfo *root)
{
HeapTuple aggTuple;
Form_pg_aggregate aggform;
Oid aggtransfn;
Oid aggfinalfn;
Oid aggcombinefn;
Oid aggserialfn;
Oid aggdeserialfn;
Oid aggtranstype;
int32 aggtranstypmod;
int32 aggtransspace;
bool shareable;
int aggno;
int transno;
List *same_input_transnos;
int16 resulttypeLen;
bool resulttypeByVal;
Datum textInitVal;
Datum initValue;
bool initValueIsNull;
bool transtypeByVal;
int16 transtypeLen;
Oid inputTypes[FUNC_MAX_ARGS];
int numArguments;
/* Callers are expected to pass only aggregates with agglevelsup == 0 */
Assert(aggref->agglevelsup == 0);
/*
 * Fetch info about the aggregate from pg_aggregate. Note it's correct to
 * ignore the moving-aggregate variant, since what we're concerned with
 * here is aggregates not window functions.
 */
aggTuple = SearchSysCache1(AGGFNOID,
ObjectIdGetDatum(aggref->aggfnoid));
if (!HeapTupleIsValid(aggTuple))
elog(ERROR, "cache lookup failed for aggregate %u",
aggref->aggfnoid);
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
/*
 * Copy out everything needed from the catalog row now; aggform points into
 * the cache entry, which is released further down.
 */
aggtransfn = aggform->aggtransfn;
aggfinalfn = aggform->aggfinalfn;
aggcombinefn = aggform->aggcombinefn;
aggserialfn = aggform->aggserialfn;
aggdeserialfn = aggform->aggdeserialfn;
aggtranstype = aggform->aggtranstype;
aggtransspace = aggform->aggtransspace;
/*
 * Resolve the possibly-polymorphic aggregate transition type.
 */
/* extract argument types (ignoring any ORDER BY expressions) */
numArguments = get_aggregate_argtypes(aggref, inputTypes);
/* resolve actual type of transition state, if polymorphic */
aggtranstype = resolve_aggregate_transtype(aggref->aggfnoid,
aggtranstype,
inputTypes,
numArguments);
/* store the resolved type; find_compatible_agg() compares this field */
aggref->aggtranstype = aggtranstype;
/*
 * If transition state is of same type as first aggregated input, assume
 * it's the same typmod (same width) as well. This works for cases like
 * MAX/MIN and is probably somewhat reasonable otherwise.
 */
aggtranstypmod = -1;
if (aggref->args)
{
TargetEntry *tle = (TargetEntry *) linitial(aggref->args);
if (aggtranstype == exprType((Node *) tle->expr))
aggtranstypmod = exprTypmod((Node *) tle->expr);
}
/*
 * If finalfn is marked read-write, we can't share transition states; but
 * it is okay to share states for AGGMODIFY_SHAREABLE aggs.
 *
 * In principle, in a partial aggregate, we could share the transition
 * state even if the final function is marked as read-write, because the
 * partial aggregate doesn't execute the final function. But it's too
 * early to know whether we're going to perform a partial aggregate.
 */
shareable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE);
/*
 * get info about the output value's datatype
 *
 * NOTE(review): resulttypeLen and resulttypeByVal are not read anywhere
 * else in this function, so this lookup looks vestigial -- confirm before
 * removing it.
 */
get_typlenbyval(aggref->aggtype,
&resulttypeLen,
&resulttypeByVal);
/* get initial value */
textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
Anum_pg_aggregate_agginitval,
&initValueIsNull);
if (initValueIsNull)
initValue = (Datum) 0;
else
initValue = GetAggInitVal(textInitVal, aggtranstype);
/* done with the catalog row; aggform must not be used past this point */
ReleaseSysCache(aggTuple);
/*
 * 1. See if this is identical to another aggregate function call that
 * we've seen already.
 */
aggno = find_compatible_agg(root, aggref, &same_input_transnos);
if (aggno != -1)
{
/* Exact duplicate: attach to the existing AggInfo and reuse its transno */
AggInfo *agginfo = list_nth_node(AggInfo, root->agginfos, aggno);
agginfo->aggrefs = lappend(agginfo->aggrefs, aggref);
transno = agginfo->transno;
}
else
{
/* First occurrence: create a new AggInfo for it */
AggInfo *agginfo = makeNode(AggInfo);
agginfo->finalfn_oid = aggfinalfn;
agginfo->aggrefs = list_make1(aggref);
agginfo->shareable = shareable;
/* the new entry's position in the list (taken before appending) is its aggno */
aggno = list_length(root->agginfos);
root->agginfos = lappend(root->agginfos, agginfo);
/*
 * Count it, and check for cases requiring ordered input. Note that
 * ordered-set aggs always have nonempty aggorder. Any ordered-input
 * case also defeats partial aggregation.
 */
if (aggref->aggorder != NIL || aggref->aggdistinct != NIL)
{
root->numOrderedAggs++;
root->hasNonPartialAggs = true;
}
/* length/by-value info of the transition type, used when comparing initial values */
get_typlenbyval(aggtranstype,
&transtypeLen,
&transtypeByVal);
/*
 * 2. See if this aggregate can share transition state with another
 * aggregate that we've initialized already.
 */
transno = find_compatible_trans(root, aggref, shareable,
aggtransfn, aggtranstype,
transtypeLen, transtypeByVal,
aggcombinefn,
aggserialfn, aggdeserialfn,
initValue, initValueIsNull,
same_input_transnos);
if (transno == -1)
{
/* Nothing to share with: create a new AggTransInfo */
AggTransInfo *transinfo = makeNode(AggTransInfo);
transinfo->args = aggref->args;
transinfo->aggfilter = aggref->aggfilter;
transinfo->transfn_oid = aggtransfn;
transinfo->combinefn_oid = aggcombinefn;
transinfo->serialfn_oid = aggserialfn;
transinfo->deserialfn_oid = aggdeserialfn;
transinfo->aggtranstype = aggtranstype;
transinfo->aggtranstypmod = aggtranstypmod;
transinfo->transtypeLen = transtypeLen;
transinfo->transtypeByVal = transtypeByVal;
transinfo->aggtransspace = aggtransspace;
transinfo->initValue = initValue;
transinfo->initValueIsNull = initValueIsNull;
/* as with aggno, the position in the list (before appending) is the transno */
transno = list_length(root->aggtransinfos);
root->aggtransinfos = lappend(root->aggtransinfos, transinfo);
/*
 * Check whether partial aggregation is feasible, unless we
 * already found out that we can't do it.
 */
if (!root->hasNonPartialAggs)
{
/*
 * If there is no combine function, then partial aggregation
 * is not possible.
 */
if (!OidIsValid(transinfo->combinefn_oid))
root->hasNonPartialAggs = true;
/*
 * If we have any aggs with transtype INTERNAL then we must
 * check whether they have serialization/deserialization
 * functions; if not, we can't serialize partial-aggregation
 * results.
 */
else if (transinfo->aggtranstype == INTERNALOID)
{
if (!OidIsValid(transinfo->serialfn_oid) ||
!OidIsValid(transinfo->deserialfn_oid))
root->hasNonSerialAggs = true;
/*
 * array_agg_serialize and array_agg_deserialize make use
 * of the aggregate non-byval input type's send and
 * receive functions. There's a chance that the type
 * being aggregated has one or both of these functions
 * missing. In this case we must not allow the
 * aggregate's serial and deserial functions to be used.
 * It would be nice not to have to special-case this and
 * instead provide some sort of supporting function within
 * the aggregate to do this, but for now, that seems like
 * overkill for this one case.
 */
if ((transinfo->serialfn_oid == F_ARRAY_AGG_SERIALIZE ||
transinfo->deserialfn_oid == F_ARRAY_AGG_DESERIALIZE) &&
!agg_args_support_sendreceive(aggref))
root->hasNonSerialAggs = true;
}
}
}
/* remember which transition state (new or shared) this aggregate uses */
agginfo->transno = transno;
}
/*
 * Fill in the fields in the Aggref (aggtranstype was set above already)
 */
aggref->aggno = aggno;
aggref->aggtransno = transno;
}
/*
 * preprocess_aggrefs_walker
 *	  Expression-tree walker that passes each Aggref it reaches to
 *	  preprocess_aggref().
 *
 * Always returns false for the nodes it handles itself, so the walk is
 * never cut short from here.
 */
static bool
preprocess_aggrefs_walker(Node *node, PlannerInfo *root)
{
	/* An empty subtree contains nothing to process. */
	if (node == NULL)
		return false;

	if (!IsA(node, Aggref))
	{
		/* Not an aggregate: SubLinks are not expected here; keep descending. */
		Assert(!IsA(node, SubLink));
		return expression_tree_walker(node, preprocess_aggrefs_walker,
									  (void *) root);
	}

	preprocess_aggref((Aggref *) node, root);

	/*
	 * The parser is assumed to have verified that the aggregated arguments,
	 * direct arguments and filter clause contain no aggregates (of this
	 * level, at least), so there is no reason to look inside the Aggref.
	 */
	return false;
}
/*
* find_compatible_agg - search for a previously initialized per-Agg struct
*
* Searches the previously looked at aggregates to find one which is compatible
* with this one, with the same input parameters. If no compatible aggregate
* can be found, returns -1.
*
* As a side-effect, this also collects a list of existing, shareable per-Trans
* structs with matching inputs. If no identical Aggref is found, the list is
* passed later to find_compatible_trans, to see if we can at least reuse
* the state value of another aggregate.
*/
static int
find_compatible_agg(PlannerInfo *root, Aggref *newagg,
					List **same_input_transnos)
{
	int			nagginfos;
	int			idx;

	/* The output list starts out empty on every path out of here. */
	*same_input_transnos = NIL;

	/* An Aggref containing volatile function calls must never be reused. */
	if (contain_volatile_functions((Node *) newagg))
		return -1;

	/*
	 * Scan the aggregates seen so far; an entry's position in the list is
	 * its aggno.  An identical call ends the search at once.  Entries that
	 * merely have the same inputs are remembered, since the caller may still
	 * be able to reuse one of their transition states.  (Only the parse
	 * trees are compared here; whether the transition functions actually
	 * agree is checked later.)
	 */
	nagginfos = list_length(root->agginfos);
	for (idx = 0; idx < nagginfos; idx++)
	{
		AggInfo    *candidate = list_nth_node(AggInfo, root->agginfos, idx);
		Aggref	   *seen = linitial_node(Aggref, candidate->aggrefs);
		bool		identical;

		/* Every input-related property must agree: scalar fields first ... */
		if (newagg->inputcollid != seen->inputcollid ||
			newagg->aggtranstype != seen->aggtranstype ||
			newagg->aggstar != seen->aggstar ||
			newagg->aggvariadic != seen->aggvariadic ||
			newagg->aggkind != seen->aggkind)
			continue;

		/* ... then the expression trees. */
		if (!equal(newagg->args, seen->args) ||
			!equal(newagg->aggorder, seen->aggorder) ||
			!equal(newagg->aggdistinct, seen->aggdistinct) ||
			!equal(newagg->aggfilter, seen->aggfilter))
			continue;

		/* Same function, result type, collation and direct args as well? */
		identical = (newagg->aggfnoid == seen->aggfnoid &&
					 newagg->aggtype == seen->aggtype &&
					 newagg->aggcollid == seen->aggcollid &&
					 equal(newagg->aggdirectargs, seen->aggdirectargs));
		if (identical)
		{
			/* Exact match: the candidate list is of no further use. */
			list_free(*same_input_transnos);
			*same_input_transnos = NIL;
			return idx;
		}

		/*
		 * Same inputs but a different aggregate.  When its final function
		 * allows sharing, pass its transno back as a candidate.  With
		 * sharing already in effect the same transno can be listed more than
		 * once; find_compatible_trans is cheap enough that filtering
		 * duplicates isn't worthwhile.
		 */
		if (candidate->shareable)
			*same_input_transnos = lappend_int(*same_input_transnos,
											   candidate->transno);
	}

	return -1;
}
/*
* find_compatible_trans - search for a previously initialized per-Trans
* struct
*
* Searches the list of transnos for a per-Trans struct with the same
* transition function and initial condition. (The inputs have already been
* verified to match.)
*/
static int
find_compatible_trans(PlannerInfo *root, Aggref *newagg, bool shareable,
					  Oid aggtransfn, Oid aggtranstype,
					  int transtypeLen, bool transtypeByVal,
					  Oid aggcombinefn,
					  Oid aggserialfn, Oid aggdeserialfn,
					  Datum initValue, bool initValueIsNull,
					  List *transnos)
{
	ListCell   *cell;

	/* Nothing to look for if this aggregate may not share its state. */
	if (!shareable)
		return -1;

	foreach(cell, transnos)
	{
		int			candno = lfirst_int(cell);
		AggTransInfo *cand = list_nth_node(AggTransInfo,
										   root->aggtransinfos,
										   candno);

		/*
		 * Sharing requires the same transition function and the same
		 * transition state type.
		 */
		if (cand->transfn_oid != aggtransfn ||
			cand->aggtranstype != aggtranstype)
			continue;

		/*
		 * The serialization, deserialization and combine functions have to
		 * agree too.  States that serialize into different formats cannot be
		 * shared (the OIDs are InvalidOid when the functions aren't needed).
		 * The combine function only matters for partial aggregation, but it
		 * is too early in planning to know whether that will be used, so err
		 * on the safe side.
		 */
		if (cand->serialfn_oid != aggserialfn ||
			cand->deserialfn_oid != aggdeserialfn ||
			cand->combinefn_oid != aggcombinefn)
			continue;

		/*
		 * Lastly the initial condition: either both are NULL, or both are
		 * non-NULL and equal according to datumIsEqual().
		 */
		if (initValueIsNull)
		{
			if (cand->initValueIsNull)
				return candno;
		}
		else if (!cand->initValueIsNull &&
				 datumIsEqual(initValue, cand->initValue,
							  transtypeByVal, transtypeLen))
			return candno;
	}

	return -1;
}
/*
 * GetAggInitVal
 *	  Turn an aggregate's textual initial value into a Datum of the given
 *	  transition type, using that type's input function.
 */
static Datum
GetAggInitVal(Datum textInitVal, Oid transtype)
{
	Oid			inputFn;
	Oid			ioParam;
	char	   *initText;
	Datum		result;

	/* Find the input function of the transition type and its I/O parameter. */
	getTypeInputInfo(transtype, &inputFn, &ioParam);

	/* The input function wants a C string; the last argument passed is -1. */
	initText = TextDatumGetCString(textInitVal);
	result = OidInputFunctionCall(inputFn, initText, ioParam, -1);

	/* The temporary C string is no longer needed. */
	pfree(initText);

	return result;
}
/*
* get_agg_clause_costs
* Process the PlannerInfo's 'aggtransinfos' and 'agginfos' lists
* accumulating the cost information about them.
*
* 'aggsplit' tells us the expected partial-aggregation mode, which affects
* the cost estimates.
*
* NOTE that the costs are ADDED to those already in *costs ... so the caller
* is responsible for zeroing the struct initially.
*
* For each AggTransInfo, we add the cost of an aggregate transition using
* either the transfn or combinefn depending on the 'aggsplit' value. We also
* account for the costs of any aggfilters and any serializations and
* deserializations of the transition state and also estimate the total space
* needed for the transition states as if each aggregate's state was stored in
* memory concurrently (as would be done in a HashAgg plan).
*
* For each AggInfo in the 'agginfos' list we add the cost of running the
* final function and the direct args, if any.
*/
void
get_agg_clause_costs(PlannerInfo *root, AggSplit aggsplit, AggClauseCosts *costs)
{
	/* The split mode is fixed for the whole call, so decode it just once. */
	bool		combining = DO_AGGSPLIT_COMBINE(aggsplit);
	bool		deserializing = DO_AGGSPLIT_DESERIALIZE(aggsplit);
	bool		serializing = DO_AGGSPLIT_SERIALIZE(aggsplit);
	bool		skipfinal = DO_AGGSPLIT_SKIPFINAL(aggsplit);
	ListCell   *cell;

	/*
	 * Pass 1: per-transition-state costs and memory estimate.
	 */
	foreach(cell, root->aggtransinfos)
	{
		AggTransInfo *trans = lfirst_node(AggTransInfo, cell);
		Oid			stepfn;

		/*
		 * Each input costs one call of the combine function when combining
		 * previously aggregated states, or of the transition function
		 * otherwise.
		 */
		stepfn = combining ? trans->combinefn_oid : trans->transfn_oid;
		add_function_cost(root, stepfn, NULL, &costs->transCost);

		/* Deserialization is charged to the transition cost ... */
		if (deserializing && OidIsValid(trans->deserialfn_oid))
			add_function_cost(root, trans->deserialfn_oid, NULL,
							  &costs->transCost);

		/* ... and serialization to the final cost. */
		if (serializing && OidIsValid(trans->serialfn_oid))
			add_function_cost(root, trans->serialfn_oid, NULL,
							  &costs->finalCost);

		/*
		 * Evaluating the inputs and the filter happens only in the initial
		 * aggregate node, so upper (combining) levels must not be charged
		 * for them again.
		 */
		if (!combining)
		{
			QualCost	exprcost;

			/* input expressions are a per-input-row cost */
			cost_qual_eval_node(&exprcost, (Node *) trans->args, root);
			costs->transCost.startup += exprcost.startup;
			costs->transCost.per_tuple += exprcost.per_tuple;

			/*
			 * So is the filter, if any.
			 *
			 * XXX Ideally the input expression costs would be scaled down by
			 * the filter's selectivity, but it's unclear that is worth the
			 * trouble.
			 */
			if (trans->aggfilter)
			{
				cost_qual_eval_node(&exprcost, (Node *) trans->aggfilter,
									root);
				costs->transCost.startup += exprcost.startup;
				costs->transCost.per_tuple += exprcost.per_tuple;
			}
		}

		/*
		 * Estimate the space needed for the transition value.
		 */
		if (trans->transtypeByVal)
		{
			/*
			 * A pass-by-value state adds nothing to the hashtable size, with
			 * one exception: INTERNAL, although pass-by-value, is almost
			 * certainly a pointer to some large data structure.  Use the
			 * size estimate from the aggregate definition if it gave one;
			 * otherwise assume ALLOCSET_DEFAULT_INITSIZE, a good guess when
			 * the data lives in a private memory context, as array_agg()
			 * does for instance.
			 */
			if (trans->aggtranstype == INTERNALOID)
				costs->transitionSpace += (trans->aggtransspace > 0) ?
					trans->aggtransspace : ALLOCSET_DEFAULT_INITSIZE;
		}
		else
		{
			/*
			 * A pass-by-reference state costs the estimated size of the
			 * value itself plus palloc overhead.
			 */
			int32		width;

			if (trans->aggtransspace > 0)
			{
				/* the aggregate definition supplied an average width */
				width = trans->aggtransspace;
			}
			else if (trans->transfn_oid == F_ARRAY_APPEND)
			{
				/*
				 * array_append() keeps an expanded array as its transition
				 * value, which takes at least ALLOCSET_SMALL_INITSIZE and
				 * possibly more; use that for lack of a better idea.
				 */
				width = ALLOCSET_SMALL_INITSIZE;
			}
			else
			{
				width = get_typavgwidth(trans->aggtranstype,
										trans->aggtranstypmod);
			}

			width = MAXALIGN(width);
			costs->transitionSpace += width + 2 * sizeof(void *);
		}
	}

	/*
	 * Pass 2: per-aggregate costs, i.e. the final function and any direct
	 * arguments.
	 */
	foreach(cell, root->agginfos)
	{
		AggInfo    *info = lfirst_node(AggInfo, cell);
		Aggref	   *firstref = linitial_node(Aggref, info->aggrefs);

		/* The final function is charged unless this split mode skips it. */
		if (!skipfinal && OidIsValid(info->finalfn_oid))
			add_function_cost(root, info->finalfn_oid, NULL,
							  &costs->finalCost);

		/* Direct arguments are costed like the final function. */
		if (firstref->aggdirectargs)
		{
			QualCost	exprcost;

			cost_qual_eval_node(&exprcost, (Node *) firstref->aggdirectargs,
								root);
			costs->finalCost.startup += exprcost.startup;
			costs->finalCost.per_tuple += exprcost.per_tuple;
		}
	}
}