Perform one only projection to compute agg arguments.

Previously we did a ExecProject() for each individual aggregate
argument. That turned out to be a performance bottleneck in queries with
multiple aggregates.

Doing all the argument computations in one ExecProject() is quite a bit
cheaper because ExecProject's fastpath can do the work at once in a
relatively tight loop, and because it can get all the required columns
with a single slot_getsomeattr and save some other redundant setup
costs.

Author: Andres Freund
Reviewed-By: Heikki Linnakangas
Discussion: https://postgr.es/m/20161103110721.h5i5t5saxfk5eeik@alap3.anarazel.de
This commit is contained in:
Andres Freund 2016-11-30 16:08:11 -08:00
parent 6d46f4783e
commit 8ed3f11bb0
2 changed files with 119 additions and 52 deletions

View File

@ -160,6 +160,7 @@
#include "executor/executor.h" #include "executor/executor.h"
#include "executor/nodeAgg.h" #include "executor/nodeAgg.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h" #include "nodes/nodeFuncs.h"
#include "optimizer/clauses.h" #include "optimizer/clauses.h"
#include "optimizer/tlist.h" #include "optimizer/tlist.h"
@ -213,6 +214,9 @@ typedef struct AggStatePerTransData
*/ */
int numInputs; int numInputs;
/* offset of input columns in AggState->evalslot */
int inputoff;
/* /*
* Number of aggregated input columns to pass to the transfn. This * Number of aggregated input columns to pass to the transfn. This
* includes the ORDER BY columns for ordered-set aggs, but not for plain * includes the ORDER BY columns for ordered-set aggs, but not for plain
@ -234,7 +238,6 @@ typedef struct AggStatePerTransData
/* ExprStates of the FILTER and argument expressions. */ /* ExprStates of the FILTER and argument expressions. */
ExprState *aggfilter; /* state of FILTER expression, if any */ ExprState *aggfilter; /* state of FILTER expression, if any */
List *args; /* states of aggregated-argument expressions */
List *aggdirectargs; /* states of direct-argument expressions */ List *aggdirectargs; /* states of direct-argument expressions */
/* /*
@ -291,19 +294,19 @@ typedef struct AggStatePerTransData
transtypeByVal; transtypeByVal;
/* /*
* Stuff for evaluation of inputs. We used to just use ExecEvalExpr, but * Stuff for evaluation of aggregate inputs in cases where the aggregate
* with the addition of ORDER BY we now need at least a slot for passing * requires sorted input. The arguments themselves will be evaluated via
* data to the sort object, which requires a tupledesc, so we might as * AggState->evalslot/evalproj for all aggregates at once, but we only
* well go whole hog and use ExecProject too. * want to sort the relevant columns for individual aggregates.
*/ */
TupleDesc evaldesc; /* descriptor of input tuples */ TupleDesc sortdesc; /* descriptor of input tuples */
ProjectionInfo *evalproj; /* projection machinery */
/* /*
* Slots for holding the evaluated input arguments. These are set up * Slots for holding the evaluated input arguments. These are set up
* during ExecInitAgg() and then used for each input row. * during ExecInitAgg() and then used for each input row requiring
* procesessing besides what's done in AggState->evalproj.
*/ */
TupleTableSlot *evalslot; /* current input tuple */ TupleTableSlot *sortslot; /* current input tuple */
TupleTableSlot *uniqslot; /* used for multi-column DISTINCT */ TupleTableSlot *uniqslot; /* used for multi-column DISTINCT */
/* /*
@ -621,14 +624,14 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
*/ */
if (pertrans->numInputs == 1) if (pertrans->numInputs == 1)
pertrans->sortstates[aggstate->current_set] = pertrans->sortstates[aggstate->current_set] =
tuplesort_begin_datum(pertrans->evaldesc->attrs[0]->atttypid, tuplesort_begin_datum(pertrans->sortdesc->attrs[0]->atttypid,
pertrans->sortOperators[0], pertrans->sortOperators[0],
pertrans->sortCollations[0], pertrans->sortCollations[0],
pertrans->sortNullsFirst[0], pertrans->sortNullsFirst[0],
work_mem, false); work_mem, false);
else else
pertrans->sortstates[aggstate->current_set] = pertrans->sortstates[aggstate->current_set] =
tuplesort_begin_heap(pertrans->evaldesc, tuplesort_begin_heap(pertrans->sortdesc,
pertrans->numSortCols, pertrans->numSortCols,
pertrans->sortColIdx, pertrans->sortColIdx,
pertrans->sortOperators, pertrans->sortOperators,
@ -847,6 +850,11 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
int setno = 0; int setno = 0;
int numGroupingSets = Max(aggstate->phase->numsets, 1); int numGroupingSets = Max(aggstate->phase->numsets, 1);
int numTrans = aggstate->numtrans; int numTrans = aggstate->numtrans;
TupleTableSlot *slot = aggstate->evalslot;
/* compute input for all aggregates */
if (aggstate->evalproj)
aggstate->evalslot = ExecProject(aggstate->evalproj, NULL);
for (transno = 0; transno < numTrans; transno++) for (transno = 0; transno < numTrans; transno++)
{ {
@ -854,7 +862,7 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
ExprState *filter = pertrans->aggfilter; ExprState *filter = pertrans->aggfilter;
int numTransInputs = pertrans->numTransInputs; int numTransInputs = pertrans->numTransInputs;
int i; int i;
TupleTableSlot *slot; int inputoff = pertrans->inputoff;
/* Skip anything FILTERed out */ /* Skip anything FILTERed out */
if (filter) if (filter)
@ -868,13 +876,10 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
continue; continue;
} }
/* Evaluate the current input expressions for this aggregate */
slot = ExecProject(pertrans->evalproj, NULL);
if (pertrans->numSortCols > 0) if (pertrans->numSortCols > 0)
{ {
/* DISTINCT and/or ORDER BY case */ /* DISTINCT and/or ORDER BY case */
Assert(slot->tts_nvalid == pertrans->numInputs); Assert(slot->tts_nvalid >= (pertrans->numInputs + inputoff));
/* /*
* If the transfn is strict, we want to check for nullity before * If the transfn is strict, we want to check for nullity before
@ -887,7 +892,7 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
{ {
for (i = 0; i < numTransInputs; i++) for (i = 0; i < numTransInputs; i++)
{ {
if (slot->tts_isnull[i]) if (slot->tts_isnull[i + inputoff])
break; break;
} }
if (i < numTransInputs) if (i < numTransInputs)
@ -899,10 +904,25 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
/* OK, put the tuple into the tuplesort object */ /* OK, put the tuple into the tuplesort object */
if (pertrans->numInputs == 1) if (pertrans->numInputs == 1)
tuplesort_putdatum(pertrans->sortstates[setno], tuplesort_putdatum(pertrans->sortstates[setno],
slot->tts_values[0], slot->tts_values[inputoff],
slot->tts_isnull[0]); slot->tts_isnull[inputoff]);
else else
tuplesort_puttupleslot(pertrans->sortstates[setno], slot); {
/*
* Copy slot contents, starting from inputoff, into sort
* slot.
*/
ExecClearTuple(pertrans->sortslot);
memcpy(pertrans->sortslot->tts_values,
&slot->tts_values[inputoff],
pertrans->numInputs * sizeof(Datum));
memcpy(pertrans->sortslot->tts_isnull,
&slot->tts_isnull[inputoff],
pertrans->numInputs * sizeof(bool));
pertrans->sortslot->tts_nvalid = pertrans->numInputs;
ExecStoreVirtualTuple(pertrans->sortslot);
tuplesort_puttupleslot(pertrans->sortstates[setno], pertrans->sortslot);
}
} }
} }
else else
@ -915,8 +935,8 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
Assert(slot->tts_nvalid >= numTransInputs); Assert(slot->tts_nvalid >= numTransInputs);
for (i = 0; i < numTransInputs; i++) for (i = 0; i < numTransInputs; i++)
{ {
fcinfo->arg[i + 1] = slot->tts_values[i]; fcinfo->arg[i + 1] = slot->tts_values[i + inputoff];
fcinfo->argnull[i + 1] = slot->tts_isnull[i]; fcinfo->argnull[i + 1] = slot->tts_isnull[i + inputoff];
} }
for (setno = 0; setno < numGroupingSets; setno++) for (setno = 0; setno < numGroupingSets; setno++)
@ -943,20 +963,24 @@ combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
{ {
int transno; int transno;
int numTrans = aggstate->numtrans; int numTrans = aggstate->numtrans;
TupleTableSlot *slot = NULL;
/* combine not supported with grouping sets */ /* combine not supported with grouping sets */
Assert(aggstate->phase->numsets == 0); Assert(aggstate->phase->numsets == 0);
/* compute input for all aggregates */
if (aggstate->evalproj)
slot = ExecProject(aggstate->evalproj, NULL);
for (transno = 0; transno < numTrans; transno++) for (transno = 0; transno < numTrans; transno++)
{ {
AggStatePerTrans pertrans = &aggstate->pertrans[transno]; AggStatePerTrans pertrans = &aggstate->pertrans[transno];
AggStatePerGroup pergroupstate = &pergroup[transno]; AggStatePerGroup pergroupstate = &pergroup[transno];
TupleTableSlot *slot;
FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo; FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
int inputoff = pertrans->inputoff;
/* Evaluate the current input expressions for this aggregate */
slot = ExecProject(pertrans->evalproj, NULL);
Assert(slot->tts_nvalid >= 1); Assert(slot->tts_nvalid >= 1);
Assert(slot->tts_nvalid + inputoff >= 1);
/* /*
* deserialfn_oid will be set if we must deserialize the input state * deserialfn_oid will be set if we must deserialize the input state
@ -965,18 +989,18 @@ combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
if (OidIsValid(pertrans->deserialfn_oid)) if (OidIsValid(pertrans->deserialfn_oid))
{ {
/* Don't call a strict deserialization function with NULL input */ /* Don't call a strict deserialization function with NULL input */
if (pertrans->deserialfn.fn_strict && slot->tts_isnull[0]) if (pertrans->deserialfn.fn_strict && slot->tts_isnull[inputoff])
{ {
fcinfo->arg[1] = slot->tts_values[0]; fcinfo->arg[1] = slot->tts_values[inputoff];
fcinfo->argnull[1] = slot->tts_isnull[0]; fcinfo->argnull[1] = slot->tts_isnull[inputoff];
} }
else else
{ {
FunctionCallInfo dsinfo = &pertrans->deserialfn_fcinfo; FunctionCallInfo dsinfo = &pertrans->deserialfn_fcinfo;
MemoryContext oldContext; MemoryContext oldContext;
dsinfo->arg[0] = slot->tts_values[0]; dsinfo->arg[0] = slot->tts_values[inputoff];
dsinfo->argnull[0] = slot->tts_isnull[0]; dsinfo->argnull[0] = slot->tts_isnull[inputoff];
/* Dummy second argument for type-safety reasons */ /* Dummy second argument for type-safety reasons */
dsinfo->arg[1] = PointerGetDatum(NULL); dsinfo->arg[1] = PointerGetDatum(NULL);
dsinfo->argnull[1] = false; dsinfo->argnull[1] = false;
@ -995,8 +1019,8 @@ combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
} }
else else
{ {
fcinfo->arg[1] = slot->tts_values[0]; fcinfo->arg[1] = slot->tts_values[inputoff];
fcinfo->argnull[1] = slot->tts_isnull[0]; fcinfo->argnull[1] = slot->tts_isnull[inputoff];
} }
advance_combine_function(aggstate, pertrans, pergroupstate); advance_combine_function(aggstate, pertrans, pergroupstate);
@ -1233,7 +1257,7 @@ process_ordered_aggregate_multi(AggState *aggstate,
{ {
MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory; MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo; FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
TupleTableSlot *slot1 = pertrans->evalslot; TupleTableSlot *slot1 = pertrans->sortslot;
TupleTableSlot *slot2 = pertrans->uniqslot; TupleTableSlot *slot2 = pertrans->uniqslot;
int numTransInputs = pertrans->numTransInputs; int numTransInputs = pertrans->numTransInputs;
int numDistinctCols = pertrans->numDistinctCols; int numDistinctCols = pertrans->numDistinctCols;
@ -2343,10 +2367,12 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
transno, transno,
aggno; aggno;
int phase; int phase;
List *combined_inputeval;
ListCell *l; ListCell *l;
Bitmapset *all_grouped_cols = NULL; Bitmapset *all_grouped_cols = NULL;
int numGroupingSets = 1; int numGroupingSets = 1;
int numPhases; int numPhases;
int column_offset;
int i = 0; int i = 0;
int j = 0; int j = 0;
@ -2928,6 +2954,53 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
aggstate->numaggs = aggno + 1; aggstate->numaggs = aggno + 1;
aggstate->numtrans = transno + 1; aggstate->numtrans = transno + 1;
/*
* Build a single projection computing the aggregate arguments for all
* aggregates at once, that's considerably faster than doing it separately
* for each.
*
* First create a targetlist combining the targetlist of all the
* transitions.
*/
combined_inputeval = NIL;
column_offset = 0;
for (transno = 0; transno < aggstate->numtrans; transno++)
{
AggStatePerTrans pertrans = &pertransstates[transno];
ListCell *arg;
pertrans->inputoff = column_offset;
/*
* Adjust resno in a copied target entries, to point into the combined
* slot.
*/
foreach(arg, pertrans->aggref->args)
{
TargetEntry *source_tle = (TargetEntry *) lfirst(arg);
TargetEntry *tle;
Assert(IsA(source_tle, TargetEntry));
tle = flatCopyTargetEntry(source_tle);
tle->resno += column_offset;
combined_inputeval = lappend(combined_inputeval, tle);
}
column_offset += list_length(pertrans->aggref->args);
}
/* and then create a projection for that targetlist */
aggstate->evaldesc = ExecTypeFromTL(combined_inputeval, false);
aggstate->evalslot = ExecInitExtraTupleSlot(estate);
combined_inputeval = (List *) ExecInitExpr((Expr *) combined_inputeval,
(PlanState *) aggstate);
aggstate->evalproj = ExecBuildProjectionInfo(combined_inputeval,
aggstate->tmpcontext,
aggstate->evalslot,
NULL);
ExecSetSlotDescriptor(aggstate->evalslot, aggstate->evaldesc);
return aggstate; return aggstate;
} }
@ -3098,24 +3171,12 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
} }
/*
* Get a tupledesc corresponding to the aggregated inputs (including sort
* expressions) of the agg.
*/
pertrans->evaldesc = ExecTypeFromTL(aggref->args, false);
/* Create slot we're going to do argument evaluation in */
pertrans->evalslot = ExecInitExtraTupleSlot(estate);
ExecSetSlotDescriptor(pertrans->evalslot, pertrans->evaldesc);
/* Initialize the input and FILTER expressions */ /* Initialize the input and FILTER expressions */
naggs = aggstate->numaggs; naggs = aggstate->numaggs;
pertrans->aggfilter = ExecInitExpr(aggref->aggfilter, pertrans->aggfilter = ExecInitExpr(aggref->aggfilter,
(PlanState *) aggstate); (PlanState *) aggstate);
pertrans->aggdirectargs = (List *) ExecInitExpr((Expr *) aggref->aggdirectargs, pertrans->aggdirectargs = (List *) ExecInitExpr((Expr *) aggref->aggdirectargs,
(PlanState *) aggstate); (PlanState *) aggstate);
pertrans->args = (List *) ExecInitExpr((Expr *) aggref->args,
(PlanState *) aggstate);
/* /*
* Complain if the aggregate's arguments contain any aggregates; nested * Complain if the aggregate's arguments contain any aggregates; nested
@ -3127,12 +3188,6 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
(errcode(ERRCODE_GROUPING_ERROR), (errcode(ERRCODE_GROUPING_ERROR),
errmsg("aggregate function calls cannot be nested"))); errmsg("aggregate function calls cannot be nested")));
/* Set up projection info for evaluation */
pertrans->evalproj = ExecBuildProjectionInfo(pertrans->args,
aggstate->tmpcontext,
pertrans->evalslot,
NULL);
/* /*
* If we're doing either DISTINCT or ORDER BY for a plain agg, then we * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
* have a list of SortGroupClause nodes; fish out the data in them and * have a list of SortGroupClause nodes; fish out the data in them and
@ -3165,6 +3220,14 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
if (numSortCols > 0) if (numSortCols > 0)
{ {
/*
* Get a tupledesc and slot corresponding to the aggregated inputs
* (including sort expressions) of the agg.
*/
pertrans->sortdesc = ExecTypeFromTL(aggref->args, false);
pertrans->sortslot = ExecInitExtraTupleSlot(estate);
ExecSetSlotDescriptor(pertrans->sortslot, pertrans->sortdesc);
/* /*
* We don't implement DISTINCT or ORDER BY aggs in the HASHED case * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
* (yet) * (yet)
@ -3183,7 +3246,7 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
/* we will need an extra slot to store prior values */ /* we will need an extra slot to store prior values */
pertrans->uniqslot = ExecInitExtraTupleSlot(estate); pertrans->uniqslot = ExecInitExtraTupleSlot(estate);
ExecSetSlotDescriptor(pertrans->uniqslot, ExecSetSlotDescriptor(pertrans->uniqslot,
pertrans->evaldesc); pertrans->sortdesc);
} }
/* Extract the sort information for use later */ /* Extract the sort information for use later */

View File

@ -1863,6 +1863,10 @@ typedef struct AggState
List *hash_needed; /* list of columns needed in hash table */ List *hash_needed; /* list of columns needed in hash table */
bool table_filled; /* hash table filled yet? */ bool table_filled; /* hash table filled yet? */
TupleHashIterator hashiter; /* for iterating through hash table */ TupleHashIterator hashiter; /* for iterating through hash table */
/* support for evaluation of agg inputs */
TupleTableSlot *evalslot; /* slot for agg inputs */
ProjectionInfo *evalproj; /* projection machinery */
TupleDesc evaldesc; /* descriptor of input tuples */
} AggState; } AggState;
/* ---------------- /* ----------------