postgresql/src/backend/executor/nodeAgg.c

660 lines
17 KiB
C
Raw Normal View History

/*-------------------------------------------------------------------------
*
* nodeAgg.c
* Routines to handle aggregate nodes.
*
* Copyright (c) 1994, Regents of the University of California
*
*
* NOTE
* The implementation of Agg node has been reworked to handle legal
* SQL aggregates. (Do not expect POSTQUEL semantics.) -- ay 2/95
*
* IDENTIFICATION
* /usr/local/devel/pglite/cvs/src/backend/executor/nodeAgg.c,v 1.13 1995/08/01 20:19:07 jolly Exp
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/heapam.h"
#include "catalog/pg_aggregate.h"
#include "executor/executor.h"
#include "executor/nodeAgg.h"
#include "optimizer/clauses.h"
1999-07-16 07:00:38 +02:00
#include "parser/parse_type.h"
#include "utils/syscache.h"
/*
* AggFuncInfo -
* keeps the transition functions information around
*/
typedef struct AggFuncInfo
{
Oid xfn1_oid;
Oid xfn2_oid;
Oid finalfn_oid;
FmgrInfo xfn1;
FmgrInfo xfn2;
FmgrInfo finalfn;
1997-09-08 22:59:27 +02:00
} AggFuncInfo;
1999-05-26 00:43:53 +02:00
static Datum aggGetAttr(TupleTableSlot *tuple, Aggref *aggref, bool *isNull);
/* ---------------------------------------
*
* ExecAgg -
*
* ExecAgg receives tuples from its outer subplan and aggregates over
* the appropriate attribute for each (unique) aggregate in the target
* list. (The number of tuples to aggregate over depends on whether a
* GROUP BY clause is present. It might be the number of tuples in a
* group or all the tuples that satisfy the qualifications.) The value of
* each aggregate is stored in the expression context for ExecProject to
* evaluate the result tuple.
*
* ExecAgg evaluates each aggregate in the following steps: (initcond1,
* initcond2 are the initial values and sfunc1, sfunc2, and finalfunc are
* the transition functions.)
*
* value1[i] = initcond1
* value2[i] = initcond2
* forall tuples do
* value1[i] = sfunc1(value1[i], aggregated_value)
* value2[i] = sfunc2(value2[i])
* value1[i] = finalfunc(value1[i], value2[i])
*
* If initcond1 is NULL then the first non-NULL aggregated_value is
* assigned directly to value1[i]. sfunc1 isn't applied until value1[i]
* is non-NULL.
*
* If the outer subplan is a Group node, ExecAgg returns as many tuples
* as there are groups.
*
* XXX handling of NULL doesn't work
*
* OLD COMMENTS
*
* XXX Aggregates should probably have another option: what to do
* with transfn2 if we hit a null value. "count" (transfn1 = null,
* transfn2 = increment) will want to have transfn2 called; "avg"
* (transfn1 = add, transfn2 = increment) will not. -pma 1/3/93
*
* ------------------------------------------
*/
TupleTableSlot *
1997-09-08 22:59:27 +02:00
ExecAgg(Agg *node)
{
AggState *aggstate;
EState *estate;
Plan *outerPlan;
int aggno,
nagg;
Datum *value1,
*value2;
int *noInitValue;
AggFuncInfo *aggFuncInfo;
long nTuplesAgged = 0;
ExprContext *econtext;
ProjectionInfo *projInfo;
TupleTableSlot *resultSlot;
HeapTuple oneTuple;
List *alist;
char *nulls;
bool isDone;
bool isNull = FALSE,
isNull1 = FALSE,
isNull2 = FALSE;
bool qual_result;
1999-06-12 16:07:33 +02:00
/* ---------------------
* get state info from node
* ---------------------
*/
/*
1999-05-25 18:15:34 +02:00
* We loop retrieving groups until we find one matching
* node->plan.qual
*/
do
{
aggstate = node->aggstate;
if (aggstate->agg_done)
return NULL;
estate = node->plan.state;
econtext = aggstate->csstate.cstate.cs_ExprContext;
nagg = length(node->aggs);
1999-05-25 18:15:34 +02:00
value1 = node->aggstate->csstate.cstate.cs_ExprContext->ecxt_values;
nulls = node->aggstate->csstate.cstate.cs_ExprContext->ecxt_nulls;
value2 = (Datum *) palloc(sizeof(Datum) * nagg);
MemSet(value2, 0, sizeof(Datum) * nagg);
aggFuncInfo = (AggFuncInfo *) palloc(sizeof(AggFuncInfo) * nagg);
MemSet(aggFuncInfo, 0, sizeof(AggFuncInfo) * nagg);
noInitValue = (int *) palloc(sizeof(int) * nagg);
1999-01-27 00:32:04 +01:00
MemSet(noInitValue, 0, sizeof(int) * nagg);
outerPlan = outerPlan(node);
oneTuple = NULL;
projInfo = aggstate->csstate.cstate.cs_ProjInfo;
1999-01-27 00:32:04 +01:00
aggno = -1;
foreach(alist, node->aggs)
{
Aggref *aggref = lfirst(alist);
char *aggname;
HeapTuple aggTuple;
Form_pg_aggregate aggp;
Oid xfn1_oid,
xfn2_oid,
finalfn_oid;
1999-01-27 17:15:01 +01:00
aggref->aggno = ++aggno;
1999-05-25 18:15:34 +02:00
/* ---------------------
* find transfer functions of all the aggregates and initialize
* their initial values
* ---------------------
*/
aggname = aggref->aggname;
aggTuple = SearchSysCacheTuple(AGGNAME,
PointerGetDatum(aggname),
1999-05-25 18:15:34 +02:00
ObjectIdGetDatum(aggref->basetype),
0, 0);
if (!HeapTupleIsValid(aggTuple))
elog(ERROR, "ExecAgg: cache lookup failed for aggregate \"%s\"(%s)",
aggname,
typeidTypeName(aggref->basetype));
aggp = (Form_pg_aggregate) GETSTRUCT(aggTuple);
xfn1_oid = aggp->aggtransfn1;
xfn2_oid = aggp->aggtransfn2;
finalfn_oid = aggp->aggfinalfn;
if (OidIsValid(finalfn_oid))
{
fmgr_info(finalfn_oid, &aggFuncInfo[aggno].finalfn);
aggFuncInfo[aggno].finalfn_oid = finalfn_oid;
}
if (OidIsValid(xfn2_oid))
{
fmgr_info(xfn2_oid, &aggFuncInfo[aggno].xfn2);
aggFuncInfo[aggno].xfn2_oid = xfn2_oid;
value2[aggno] = (Datum) AggNameGetInitVal((char *) aggname,
1999-05-25 18:15:34 +02:00
aggp->aggbasetype,
2,
&isNull2);
/* ------------------------------------------
* If there is a second transition function, its initial
* value must exist -- as it does not depend on data values,
* we have no other way of determining an initial value.
* ------------------------------------------
*/
if (isNull2)
elog(ERROR, "ExecAgg: agginitval2 is null");
}
if (OidIsValid(xfn1_oid))
{
fmgr_info(xfn1_oid, &aggFuncInfo[aggno].xfn1);
aggFuncInfo[aggno].xfn1_oid = xfn1_oid;
value1[aggno] = (Datum) AggNameGetInitVal((char *) aggname,
1999-05-25 18:15:34 +02:00
aggp->aggbasetype,
1,
&isNull1);
/* ------------------------------------------
* If the initial value for the first transition function
* doesn't exist in the pg_aggregate table then we let
* the first value returned from the outer procNode become
* the initial value. (This is useful for aggregates like
* max{} and min{}.)
* ------------------------------------------
*/
if (isNull1)
{
noInitValue[aggno] = 1;
nulls[aggno] = 1;
}
}
}
/* ----------------
* for each tuple from the the outer plan, apply all the aggregates
* ----------------
*/
for (;;)
{
TupleTableSlot *outerslot;
isNull = isNull1 = isNull2 = 0;
outerslot = ExecProcNode(outerPlan, (Plan *) node);
1998-11-27 20:52:36 +01:00
if (TupIsNull(outerslot))
{
1999-05-25 18:15:34 +02:00
/*
* when the outerplan doesn't return a single tuple,
* create a dummy heaptuple anyway because we still need
* to return a valid aggregate value. The value returned
* will be the initial values of the transition functions
*/
if (nTuplesAgged == 0)
{
TupleDesc tupType;
Datum *tupValue;
char *null_array;
1999-01-27 17:15:01 +01:00
AttrNumber attnum;
tupType = aggstate->csstate.css_ScanTupleSlot->ttc_tupleDescriptor;
tupValue = projInfo->pi_tupValue;
/* initially, set all the values to NULL */
1999-01-27 00:32:04 +01:00
null_array = palloc(sizeof(char) * tupType->natts);
1999-01-27 17:15:01 +01:00
for (attnum = 0; attnum < tupType->natts; attnum++)
null_array[attnum] = 'n';
oneTuple = heap_formtuple(tupType, tupValue, null_array);
pfree(null_array);
}
break;
}
1999-01-27 00:32:04 +01:00
aggno = -1;
foreach(alist, node->aggs)
{
Aggref *aggref = lfirst(alist);
1999-01-27 00:32:04 +01:00
AggFuncInfo *aggfns = &aggFuncInfo[++aggno];
Datum newVal;
Datum args[2];
/* Do we really need the special case for Var here? */
if (IsA(aggref->target, Var))
{
newVal = aggGetAttr(outerslot, aggref,
&isNull);
}
else
{
econtext->ecxt_scantuple = outerslot;
newVal = ExecEvalExpr(aggref->target, econtext,
&isNull, &isDone);
}
if (isNull && !aggref->usenulls)
continue; /* ignore this tuple for this agg */
if (aggfns->xfn1.fn_addr != NULL)
{
if (noInitValue[aggno])
{
1999-05-25 18:15:34 +02:00
/*
1999-05-25 18:15:34 +02:00
* value1 has not been initialized. This is the
* first non-NULL input value. We use it as the
* initial value for value1.
*
1999-05-25 18:15:34 +02:00
* But we can't just use it straight, we have to make
* a copy of it since the tuple from which it came
* will be freed on the next iteration of the
* scan. This requires finding out how to copy
* the Datum. We assume the datum is of the agg's
1999-05-25 18:15:34 +02:00
* basetype, or at least binary compatible with
* it.
*/
1999-05-25 18:15:34 +02:00
Type aggBaseType = typeidType(aggref->basetype);
int attlen = typeLen(aggBaseType);
bool byVal = typeByVal(aggBaseType);
if (byVal)
value1[aggno] = newVal;
else
{
1999-05-25 18:15:34 +02:00
if (attlen == -1) /* variable length */
attlen = VARSIZE((struct varlena *) newVal);
value1[aggno] = (Datum) palloc(attlen);
memcpy((char *) (value1[aggno]), (char *) newVal,
attlen);
}
noInitValue[aggno] = 0;
nulls[aggno] = 0;
}
else
{
1999-05-25 18:15:34 +02:00
/*
* apply the transition functions.
*/
args[0] = value1[aggno];
args[1] = newVal;
1999-05-25 18:15:34 +02:00
value1[aggno] = (Datum) fmgr_c(&aggfns->xfn1,
(FmgrValues *) args, &isNull1);
Assert(!isNull1);
}
}
if (aggfns->xfn2.fn_addr != NULL)
{
args[0] = value2[aggno];
1999-05-25 18:15:34 +02:00
value2[aggno] = (Datum) fmgr_c(&aggfns->xfn2,
(FmgrValues *) args, &isNull2);
Assert(!isNull2);
}
}
/*
* keep this for the projection (we only need one of these -
* all the tuples we aggregate over share the same group
* column)
*/
if (!oneTuple)
oneTuple = heap_copytuple(outerslot->val);
nTuplesAgged++;
}
/* --------------
* finalize the aggregate (if necessary), and get the resultant value
* --------------
*/
1999-01-27 00:32:04 +01:00
aggno = -1;
foreach(alist, node->aggs)
{
char *args[2];
1999-01-27 00:32:04 +01:00
AggFuncInfo *aggfns = &aggFuncInfo[++aggno];
if (noInitValue[aggno])
{
/*
* No values found for this agg; return current state.
* This seems to fix behavior for avg() aggregate. -tgl
* 12/96
*/
}
else if (aggfns->finalfn.fn_addr != NULL && nTuplesAgged > 0)
{
if (aggfns->finalfn.fn_nargs > 1)
{
args[0] = (char *) value1[aggno];
args[1] = (char *) value2[aggno];
}
else if (aggfns->xfn1.fn_addr != NULL)
args[0] = (char *) value1[aggno];
else if (aggfns->xfn2.fn_addr != NULL)
args[0] = (char *) value2[aggno];
else
elog(NOTICE, "ExecAgg: no valid transition functions??");
value1[aggno] = (Datum) fmgr_c(&aggfns->finalfn,
1999-05-25 18:15:34 +02:00
(FmgrValues *) args, &(nulls[aggno]));
}
else if (aggfns->xfn1.fn_addr != NULL)
{
/*
* value in the right place, ignore. (If you remove this
* case, fix the else part. -ay 2/95)
*/
}
else if (aggfns->xfn2.fn_addr != NULL)
value1[aggno] = value2[aggno];
else
elog(ERROR, "ExecAgg: no valid transition functions??");
}
/*
* whether the aggregation is done depends on whether we are doing
* aggregation over groups or the entire table
*/
if (nodeTag(outerPlan) == T_Group)
{
/* aggregation over groups */
aggstate->agg_done = ((Group *) outerPlan)->grpstate->grp_done;
}
else
aggstate->agg_done = TRUE;
/* ----------------
* form a projection tuple, store it in the result tuple
* slot and return it.
* ----------------
*/
ExecStoreTuple(oneTuple,
aggstate->csstate.css_ScanTupleSlot,
InvalidBuffer,
false);
econtext->ecxt_scantuple = aggstate->csstate.css_ScanTupleSlot;
resultSlot = ExecProject(projInfo, &isDone);
/*
* As long as the retrieved group does not match the
* qualifications it is ignored and the next group is fetched
*/
1999-05-25 18:15:34 +02:00
if (node->plan.qual != NULL)
qual_result = ExecQual(fix_opids(node->plan.qual), econtext);
else
qual_result = false;
if (oneTuple)
pfree(oneTuple);
}
while (node->plan.qual != NULL && qual_result != true);
return resultSlot;
}
/* -----------------
* ExecInitAgg
*
* Creates the run-time information for the agg node produced by the
* planner and initializes its outer subtree
* -----------------
*/
bool
ExecInitAgg(Agg *node, EState *estate, Plan *parent)
{
AggState *aggstate;
Plan *outerPlan;
ExprContext *econtext;
1999-05-25 18:15:34 +02:00
/*
* assign the node's execution state
*/
node->plan.state = estate;
/*
* create state structure
*/
aggstate = makeNode(AggState);
node->aggstate = aggstate;
aggstate->agg_done = FALSE;
1999-05-25 18:15:34 +02:00
/*
* assign node's base id and create expression context
*/
1999-01-27 17:15:01 +01:00
ExecAssignNodeBaseInfo(estate, &aggstate->csstate.cstate, (Plan *) parent);
ExecAssignExprContext(estate, &aggstate->csstate.cstate);
#define AGG_NSLOTS 2
/*
* tuple table initialization
*/
ExecInitScanTupleSlot(estate, &aggstate->csstate);
ExecInitResultTupleSlot(estate, &aggstate->csstate.cstate);
econtext = aggstate->csstate.cstate.cs_ExprContext;
1999-05-25 18:15:34 +02:00
econtext->ecxt_values = (Datum *) palloc(sizeof(Datum) * length(node->aggs));
MemSet(econtext->ecxt_values, 0, sizeof(Datum) * length(node->aggs));
1999-01-27 00:32:04 +01:00
econtext->ecxt_nulls = (char *) palloc(sizeof(char) * length(node->aggs));
MemSet(econtext->ecxt_nulls, 0, sizeof(char) * length(node->aggs));
/*
* initializes child nodes
*/
outerPlan = outerPlan(node);
ExecInitNode(outerPlan, estate, (Plan *) node);
/*
* Result runs in its own context, but make it use our aggregates fix
* for 'select sum(2+2)'
*/
if (nodeTag(outerPlan) == T_Result)
{
((Result *) outerPlan)->resstate->cstate.cs_ProjInfo->pi_exprContext->ecxt_values =
econtext->ecxt_values;
((Result *) outerPlan)->resstate->cstate.cs_ProjInfo->pi_exprContext->ecxt_nulls =
econtext->ecxt_nulls;
}
/* ----------------
* initialize tuple type.
* ----------------
*/
ExecAssignScanTypeFromOuterPlan((Plan *) node, &aggstate->csstate);
/*
* Initialize tuple type for both result and scan. This node does no
* projection
*/
ExecAssignResultTypeFromTL((Plan *) node, &aggstate->csstate.cstate);
ExecAssignProjectionInfo((Plan *) node, &aggstate->csstate.cstate);
return TRUE;
}
int
1997-09-08 22:59:27 +02:00
ExecCountSlotsAgg(Agg *node)
{
return ExecCountSlotsNode(outerPlan(node)) +
1999-05-25 18:15:34 +02:00
ExecCountSlotsNode(innerPlan(node)) +
AGG_NSLOTS;
}
/* ------------------------
* ExecEndAgg(node)
*
* -----------------------
*/
void
1997-09-08 22:59:27 +02:00
ExecEndAgg(Agg *node)
{
AggState *aggstate;
Plan *outerPlan;
aggstate = node->aggstate;
ExecFreeProjectionInfo(&aggstate->csstate.cstate);
outerPlan = outerPlan(node);
ExecEndNode(outerPlan, (Plan *) node);
/* clean up tuple table */
ExecClearTuple(aggstate->csstate.css_ScanTupleSlot);
}
/*****************************************************************************
* Support Routines
*****************************************************************************/
/*
* aggGetAttr -
* get the attribute (specified in the Var node in agg) to aggregate
* over from the tuple
*/
static Datum
aggGetAttr(TupleTableSlot *slot,
1999-05-26 00:43:53 +02:00
Aggref *aggref,
bool *isNull)
{
Datum result;
AttrNumber attnum;
HeapTuple heapTuple;
TupleDesc tuple_type;
Buffer buffer;
/* ----------------
* extract tuple information from the slot
* ----------------
*/
heapTuple = slot->val;
tuple_type = slot->ttc_tupleDescriptor;
buffer = slot->ttc_buffer;
attnum = ((Var *) aggref->target)->varattno;
/*
* If the attribute number is invalid, then we are supposed to return
* the entire tuple, we give back a whole slot so that callers know
* what the tuple looks like.
*/
if (attnum == InvalidAttrNumber)
{
TupleTableSlot *tempSlot;
TupleDesc td;
HeapTuple tup;
tempSlot = makeNode(TupleTableSlot);
tempSlot->ttc_shouldFree = false;
tempSlot->ttc_descIsNew = true;
1999-01-27 00:32:04 +01:00
tempSlot->ttc_tupleDescriptor = (TupleDesc) NULL;
tempSlot->ttc_buffer = InvalidBuffer;
tempSlot->ttc_whichplan = -1;
1998-11-27 20:52:36 +01:00
tup = heap_copytuple(heapTuple);
td = CreateTupleDescCopy(slot->ttc_tupleDescriptor);
ExecSetSlotDescriptor(tempSlot, td);
ExecStoreTuple(tup, tempSlot, InvalidBuffer, true);
return (Datum) tempSlot;
}
1999-05-25 18:15:34 +02:00
result = heap_getattr(heapTuple, /* tuple containing attribute */
attnum, /* attribute number of desired
* attribute */
tuple_type, /* tuple descriptor of tuple */
isNull); /* return: is attribute null? */
/* ----------------
* return null if att is null
* ----------------
*/
if (*isNull)
return (Datum) NULL;
return result;
}
void
ExecReScanAgg(Agg *node, ExprContext *exprCtxt, Plan *parent)
{
AggState *aggstate = node->aggstate;
ExprContext *econtext = aggstate->csstate.cstate.cs_ExprContext;
aggstate->agg_done = FALSE;
MemSet(econtext->ecxt_values, 0, sizeof(Datum) * length(node->aggs));
1999-01-27 00:32:04 +01:00
MemSet(econtext->ecxt_nulls, 0, sizeof(char) * length(node->aggs));
/*
* if chgParam of subnode is not null then plan will be re-scanned by
* first ExecProcNode.
*/
if (((Plan *) node)->lefttree->chgParam == NULL)
ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node);
}