Improve speed of aggregates that use array_append as transition function.

In the previous coding, if an aggregate's transition function returned an
expanded array, nodeAgg.c and nodeWindowAgg.c would always copy it and thus
force it into the flat representation.  This led to ping-ponging between
flat and expanded formats, which costs a lot.  For an aggregate using
array_append as transition function, I measured about a 15X slowdown
compared to the pre-9.5 code, when working on simple int[] arrays.
Of course, the old code was already O(N^2) in this usage due to copying
flat arrays all the time, but it wasn't quite this inefficient.

To fix, teach nodeAgg.c and nodeWindowAgg.c to allow expanded transition
values without copying, so long as the transition function takes care to
return the transition value already properly parented under the aggcontext.
That puts a bit of extra responsibility on the transition function, but
doing it this way allows us to not need any extra logic in the fast path
of advance_transition_function (ie, with a pass-by-value transition value,
or with a modified-in-place pass-by-reference value).  We already know
that that's a hot spot so I'm loath to add any cycles at all there.  Also,
while only array_append currently knows how to follow this convention,
this solution allows other transition functions to opt-in without needing
to have a whitelist in the core aggregation code.

(The reason we would need a whitelist is that currently, if you pass a
R/W expanded-object pointer to an arbitrary function, it's allowed to do
anything with it including deleting it; that breaks the core agg code's
assumption that it should free discarded values.  Returning a value under
aggcontext is the transition function's signal that it knows it is an
aggregate transition function and will play nice.  Possibly the API rules
for expanded objects should be refined, but that would not be a
back-patchable change.)

With this fix, an aggregate using array_append is no longer O(N^2), so it's
much faster than pre-9.5 code rather than much slower.  It's still a bit
slower than the bespoke infrastructure for array_agg, but the differential
seems to be only about 10%-20% rather than orders of magnitude.

Discussion: <6315.1477677885@sss.pgh.pa.us>
This commit is contained in:
Tom Lane 2016-10-30 12:27:41 -04:00
parent a775406ec4
commit 9a00f03e47
5 changed files with 142 additions and 33 deletions

View File

@ -626,7 +626,7 @@ if (AggCheckCallContext(fcinfo, NULL))
function, the first input
must be a temporary state value and can therefore safely be modified
in-place rather than allocating a new copy.
See <literal>int8inc()</> for an example.
See <function>int8inc()</> for an example.
(This is the <emphasis>only</>
case where it is safe for a function to modify a pass-by-reference input.
In particular, final functions for normal aggregates must not
@ -634,6 +634,20 @@ if (AggCheckCallContext(fcinfo, NULL))
re-executed on the same final state value.)
</para>
<para>
The second argument of <function>AggCheckCallContext</> can be used to
retrieve the memory context in which aggregate state values are being kept.
This is useful for transition functions that wish to use <quote>expanded</>
objects (see <xref linkend="xtypes-toast">) as their state values.
On first call, the transition function should return an expanded object
whose memory context is a child of the aggregate state context, and then
keep returning the same expanded object on subsequent calls. See
<function>array_append()</> for an example. (<function>array_append()</>
is not the transition function of any built-in aggregate, but it is written
to behave efficiently when used as transition function of a custom
aggregate.)
</para>
<para>
Another support routine available to aggregate functions written in C
is <function>AggGetAggref</>, which returns the <literal>Aggref</>

View File

@ -91,10 +91,13 @@
* transition value or a previous function result, and in either case its
* value need not be preserved. See int8inc() for an example. Notice that
* advance_transition_function() is coded to avoid a data copy step when
* the previous transition value pointer is returned. Also, some
* transition functions want to store working state in addition to the
* nominal transition value; they can use the memory context returned by
* AggCheckCallContext() to do that.
* the previous transition value pointer is returned. It is also possible
* to avoid repeated data copying when the transition value is an expanded
* object: to do that, the transition function must take care to return
* an expanded object that is in a child context of the memory context
* returned by AggCheckCallContext(). Also, some transition functions want
* to store working state in addition to the nominal transition value; they
* can use the memory context returned by AggCheckCallContext() to do that.
*
* Note: AggCheckCallContext() is available as of PostgreSQL 9.0. The
* AggState is available as context in earlier releases (back to 8.1),
@ -791,8 +794,10 @@ advance_transition_function(AggState *aggstate,
/*
* If pass-by-ref datatype, must copy the new value into aggcontext and
* pfree the prior transValue. But if transfn returned a pointer to its
* first input, we don't need to do anything.
* free the prior transValue. But if transfn returned a pointer to its
* first input, we don't need to do anything. Also, if transfn returned a
* pointer to a R/W expanded object that is already a child of the
* aggcontext, assume we can adopt that value without copying it.
*/
if (!pertrans->transtypeByVal &&
DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
@ -800,12 +805,25 @@ advance_transition_function(AggState *aggstate,
if (!fcinfo->isnull)
{
MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
newVal = datumCopy(newVal,
pertrans->transtypeByVal,
pertrans->transtypeLen);
if (DatumIsReadWriteExpandedObject(newVal,
false,
pertrans->transtypeLen) &&
MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
/* do nothing */ ;
else
newVal = datumCopy(newVal,
pertrans->transtypeByVal,
pertrans->transtypeLen);
}
if (!pergroupstate->transValueIsNull)
pfree(DatumGetPointer(pergroupstate->transValue));
{
if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
false,
pertrans->transtypeLen))
DeleteExpandedObject(pergroupstate->transValue);
else
pfree(DatumGetPointer(pergroupstate->transValue));
}
}
pergroupstate->transValue = newVal;
@ -1053,8 +1071,11 @@ advance_combine_function(AggState *aggstate,
/*
* If pass-by-ref datatype, must copy the new value into aggcontext and
* pfree the prior transValue. But if the combine function returned a
* pointer to its first input, we don't need to do anything.
* free the prior transValue. But if the combine function returned a
* pointer to its first input, we don't need to do anything. Also, if the
* combine function returned a pointer to a R/W expanded object that is
* already a child of the aggcontext, assume we can adopt that value
* without copying it.
*/
if (!pertrans->transtypeByVal &&
DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
@ -1062,12 +1083,25 @@ advance_combine_function(AggState *aggstate,
if (!fcinfo->isnull)
{
MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
newVal = datumCopy(newVal,
pertrans->transtypeByVal,
pertrans->transtypeLen);
if (DatumIsReadWriteExpandedObject(newVal,
false,
pertrans->transtypeLen) &&
MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
/* do nothing */ ;
else
newVal = datumCopy(newVal,
pertrans->transtypeByVal,
pertrans->transtypeLen);
}
if (!pergroupstate->transValueIsNull)
pfree(DatumGetPointer(pergroupstate->transValue));
{
if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
false,
pertrans->transtypeLen))
DeleteExpandedObject(pergroupstate->transValue);
else
pfree(DatumGetPointer(pergroupstate->transValue));
}
}
pergroupstate->transValue = newVal;
@ -1333,7 +1367,9 @@ finalize_aggregate(AggState *aggstate,
(void *) aggstate, NULL);
/* Fill in the transition state value */
fcinfo.arg[0] = pergroupstate->transValue;
fcinfo.arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
pergroupstate->transValueIsNull,
pertrans->transtypeLen);
fcinfo.argnull[0] = pergroupstate->transValueIsNull;
anynull |= pergroupstate->transValueIsNull;
@ -1360,6 +1396,7 @@ finalize_aggregate(AggState *aggstate,
}
else
{
/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
*resultVal = pergroupstate->transValue;
*resultIsNull = pergroupstate->transValueIsNull;
}
@ -1410,7 +1447,9 @@ finalize_partialaggregate(AggState *aggstate,
{
FunctionCallInfo fcinfo = &pertrans->serialfn_fcinfo;
fcinfo->arg[0] = pergroupstate->transValue;
fcinfo->arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
pergroupstate->transValueIsNull,
pertrans->transtypeLen);
fcinfo->argnull[0] = pergroupstate->transValueIsNull;
*resultVal = FunctionCallInvoke(fcinfo);
@ -1419,6 +1458,7 @@ finalize_partialaggregate(AggState *aggstate,
}
else
{
/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
*resultVal = pergroupstate->transValue;
*resultIsNull = pergroupstate->transValueIsNull;
}

View File

@ -362,8 +362,10 @@ advance_windowaggregate(WindowAggState *winstate,
/*
* If pass-by-ref datatype, must copy the new value into aggcontext and
* pfree the prior transValue. But if transfn returned a pointer to its
* first input, we don't need to do anything.
* free the prior transValue. But if transfn returned a pointer to its
* first input, we don't need to do anything. Also, if transfn returned a
* pointer to a R/W expanded object that is already a child of the
* aggcontext, assume we can adopt that value without copying it.
*/
if (!peraggstate->transtypeByVal &&
DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
@ -371,12 +373,25 @@ advance_windowaggregate(WindowAggState *winstate,
if (!fcinfo->isnull)
{
MemoryContextSwitchTo(peraggstate->aggcontext);
newVal = datumCopy(newVal,
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
if (DatumIsReadWriteExpandedObject(newVal,
false,
peraggstate->transtypeLen) &&
MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
/* do nothing */ ;
else
newVal = datumCopy(newVal,
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
}
if (!peraggstate->transValueIsNull)
pfree(DatumGetPointer(peraggstate->transValue));
{
if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
false,
peraggstate->transtypeLen))
DeleteExpandedObject(peraggstate->transValue);
else
pfree(DatumGetPointer(peraggstate->transValue));
}
}
MemoryContextSwitchTo(oldContext);
@ -513,8 +528,10 @@ advance_windowaggregate_base(WindowAggState *winstate,
/*
* If pass-by-ref datatype, must copy the new value into aggcontext and
* pfree the prior transValue. But if invtransfn returned a pointer to
* its first input, we don't need to do anything.
* free the prior transValue. But if invtransfn returned a pointer to its
* first input, we don't need to do anything. Also, if invtransfn
* returned a pointer to a R/W expanded object that is already a child of
* the aggcontext, assume we can adopt that value without copying it.
*
* Note: the checks for null values here will never fire, but it seems
* best to have this stanza look just like advance_windowaggregate.
@ -525,12 +542,25 @@ advance_windowaggregate_base(WindowAggState *winstate,
if (!fcinfo->isnull)
{
MemoryContextSwitchTo(peraggstate->aggcontext);
newVal = datumCopy(newVal,
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
if (DatumIsReadWriteExpandedObject(newVal,
false,
peraggstate->transtypeLen) &&
MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
/* do nothing */ ;
else
newVal = datumCopy(newVal,
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
}
if (!peraggstate->transValueIsNull)
pfree(DatumGetPointer(peraggstate->transValue));
{
if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
false,
peraggstate->transtypeLen))
DeleteExpandedObject(peraggstate->transValue);
else
pfree(DatumGetPointer(peraggstate->transValue));
}
}
MemoryContextSwitchTo(oldContext);
@ -568,7 +598,9 @@ finalize_windowaggregate(WindowAggState *winstate,
numFinalArgs,
perfuncstate->winCollation,
(void *) winstate, NULL);
fcinfo.arg[0] = peraggstate->transValue;
fcinfo.arg[0] = MakeExpandedObjectReadOnly(peraggstate->transValue,
peraggstate->transValueIsNull,
peraggstate->transtypeLen);
fcinfo.argnull[0] = peraggstate->transValueIsNull;
anynull = peraggstate->transValueIsNull;
@ -596,6 +628,7 @@ finalize_windowaggregate(WindowAggState *winstate,
}
else
{
/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
*result = peraggstate->transValue;
*isnull = peraggstate->transValueIsNull;
}

View File

@ -647,6 +647,16 @@ get_agg_clause_costs_walker(Node *node, get_agg_clause_costs_context *context)
/* Use average width if aggregate definition gave one */
if (aggtransspace > 0)
avgwidth = aggtransspace;
else if (aggtransfn == F_ARRAY_APPEND)
{
/*
* If the transition function is array_append(), it'll use an
* expanded array as transvalue, which will occupy at least
* ALLOCSET_SMALL_INITSIZE and possibly more. Use that as the
* estimate for lack of a better idea.
*/
avgwidth = ALLOCSET_SMALL_INITSIZE;
}
else
{
/*

View File

@ -32,6 +32,10 @@ static Datum array_position_common(FunctionCallInfo fcinfo);
* Caution: if the input is a read/write pointer, this returns the input
* argument; so callers must be sure that their changes are "safe", that is
* they cannot leave the array in a corrupt state.
*
* If we're being called as an aggregate function, make sure any newly-made
* expanded array is allocated in the aggregate state context, so as to save
* copying operations.
*/
static ExpandedArrayHeader *
fetch_array_arg_replace_nulls(FunctionCallInfo fcinfo, int argno)
@ -39,6 +43,7 @@ fetch_array_arg_replace_nulls(FunctionCallInfo fcinfo, int argno)
ExpandedArrayHeader *eah;
Oid element_type;
ArrayMetaState *my_extra;
MemoryContext resultcxt;
/* If first time through, create datatype cache struct */
my_extra = (ArrayMetaState *) fcinfo->flinfo->fn_extra;
@ -51,10 +56,17 @@ fetch_array_arg_replace_nulls(FunctionCallInfo fcinfo, int argno)
fcinfo->flinfo->fn_extra = my_extra;
}
/* Figure out which context we want the result in */
if (!AggCheckCallContext(fcinfo, &resultcxt))
resultcxt = CurrentMemoryContext;
/* Now collect the array value */
if (!PG_ARGISNULL(argno))
{
MemoryContext oldcxt = MemoryContextSwitchTo(resultcxt);
eah = PG_GETARG_EXPANDED_ARRAYX(argno, my_extra);
MemoryContextSwitchTo(oldcxt);
}
else
{
@ -72,7 +84,7 @@ fetch_array_arg_replace_nulls(FunctionCallInfo fcinfo, int argno)
errmsg("input data type is not an array")));
eah = construct_empty_expanded_array(element_type,
CurrentMemoryContext,
resultcxt,
my_extra);
}