Allow parallel aggregate on string_agg and array_agg

This adds combine, serial and deserial functions for the array_agg() and
string_agg() aggregate functions, allowing these aggregates to take part
in partial aggregation.  This permits parallel aggregation when these
aggregates are present, and also enables additional partition-wise
aggregation plan shapes in which the partially aggregated results from
the partitions are combined and then aggregated further.
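
For example, a grouped query like the one exercised by the regression
test added here can now run with a Partial Aggregate below a Gather and
a Finalize Aggregate above it (illustrative query against the new
pagg_test regression table):

    SELECT y,
           string_agg(x::text, ',') AS t,
           array_agg(x) AS a
    FROM pagg_test
    GROUP BY y;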

Author: David Rowley
Reviewed-by: Andres Freund, Tomas Vondra, Stephen Frost, Tom Lane
Discussion: https://postgr.es/m/CAKJS1f9sx_6GTcvd6TMuZnNtCh0VhBzhX6FZqw17TgVFH-ga_A@mail.gmail.com
David Rowley 2023-01-23 17:35:01 +13:00
parent 5a3a95385b
commit 16fd03e956
13 changed files with 1103 additions and 30 deletions

View File

@@ -19746,7 +19746,7 @@ SELECT NULLIF(value, '(none)') ...
<para>
Collects all the input values, including nulls, into an array.
</para></entry>
<entry>No</entry>
<entry>Yes</entry>
</row>
<row>
@@ -19759,7 +19759,7 @@ SELECT NULLIF(value, '(none)') ...
dimension. (The inputs must all have the same dimensionality, and
cannot be empty or null.)
</para></entry>
<entry>No</entry>
<entry>Yes</entry>
</row>
<row>
@@ -20099,7 +20099,7 @@ SELECT NULLIF(value, '(none)') ...
after the first is preceded by the
corresponding <parameter>delimiter</parameter> (if it's not null).
</para></entry>
<entry>No</entry>
<entry>Yes</entry>
</row>
<row>

View File

@@ -305,10 +305,30 @@ preprocess_aggref(Aggref *aggref, PlannerInfo *root)
* functions; if not, we can't serialize partial-aggregation
* results.
*/
else if (transinfo->aggtranstype == INTERNALOID &&
(!OidIsValid(transinfo->serialfn_oid) ||
!OidIsValid(transinfo->deserialfn_oid)))
root->hasNonSerialAggs = true;
else if (transinfo->aggtranstype == INTERNALOID)
{
if (!OidIsValid(transinfo->serialfn_oid) ||
!OidIsValid(transinfo->deserialfn_oid))
root->hasNonSerialAggs = true;
/*
* array_agg_serialize and array_agg_deserialize make use
* of the aggregate's non-byval input type's send and
* receive functions. There's a chance that the type
* being aggregated has one or both of these functions
* missing. In this case we must not allow the
* aggregate's serial and deserial functions to be used.
* It would be nice not to have to special-case this and
* instead provide some sort of supporting function within
* the aggregate to do this, but for now, that seems like
* overkill for this one case.
*/
if ((transinfo->serialfn_oid == F_ARRAY_AGG_SERIALIZE ||
transinfo->deserialfn_oid == F_ARRAY_AGG_DESERIALIZE) &&
!agg_args_support_sendreceive(aggref))
root->hasNonSerialAggs = true;
}
}
}
agginfo->transno = transno;
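
As an illustration (not part of the patch): partial aggregation remains
disabled for array_agg() over element types that lack binary I/O, since
array_agg_serialize/array_agg_deserialize depend on the type's send and
receive functions.  Such base types can be listed with a catalog query
along these lines:

    SELECT typname
    FROM pg_type
    WHERE typtype = 'b'
      AND (typsend::oid = 0 OR typreceive::oid = 0);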

View File

@@ -14,6 +14,7 @@
*/
#include "postgres.h"
#include "access/htup_details.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_type.h"
@@ -28,7 +29,7 @@
#include "rewrite/rewriteManip.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
typedef struct
{
@@ -1947,6 +1948,40 @@ resolve_aggregate_transtype(Oid aggfuncid,
return aggtranstype;
}
/*
* agg_args_support_sendreceive
* Returns true if all of aggref's non-byval argument types have send and
* receive functions.
*/
bool
agg_args_support_sendreceive(Aggref *aggref)
{
ListCell *lc;
foreach(lc, aggref->args)
{
HeapTuple typeTuple;
Form_pg_type pt;
TargetEntry *tle = (TargetEntry *) lfirst(lc);
Oid type = exprType((Node *) tle->expr);
typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type));
if (!HeapTupleIsValid(typeTuple))
elog(ERROR, "cache lookup failed for type %u", type);
pt = (Form_pg_type) GETSTRUCT(typeTuple);
if (!pt->typbyval &&
(!OidIsValid(pt->typsend) || !OidIsValid(pt->typreceive)))
{
ReleaseSysCache(typeTuple);
return false;
}
ReleaseSysCache(typeTuple);
}
return true;
}
/*
* Create an expression tree for the transition function of an aggregate.
* This is needed so that polymorphic functions can be used within an

View File

@@ -13,12 +13,33 @@
#include "postgres.h"
#include "catalog/pg_type.h"
#include "libpq/pqformat.h"
#include "common/int.h"
#include "port/pg_bitutils.h"
#include "utils/array.h"
#include "utils/datum.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/typcache.h"
/*
* SerialIOData
* Used for caching element-type data in array_agg_serialize
*/
typedef struct SerialIOData
{
FmgrInfo typsend;
} SerialIOData;
/*
* DeserialIOData
* Used for caching element-type data in array_agg_deserialize
*/
typedef struct DeserialIOData
{
FmgrInfo typreceive;
Oid typioparam;
} DeserialIOData;
static Datum array_position_common(FunctionCallInfo fcinfo);
@@ -499,6 +520,316 @@ array_agg_transfn(PG_FUNCTION_ARGS)
PG_RETURN_POINTER(state);
}
Datum
array_agg_combine(PG_FUNCTION_ARGS)
{
ArrayBuildState *state1;
ArrayBuildState *state2;
MemoryContext agg_context;
MemoryContext old_context;
if (!AggCheckCallContext(fcinfo, &agg_context))
elog(ERROR, "aggregate function called in non-aggregate context");
state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildState *) PG_GETARG_POINTER(0);
state2 = PG_ARGISNULL(1) ? NULL : (ArrayBuildState *) PG_GETARG_POINTER(1);
if (state2 == NULL)
{
/*
* NULL state2 is easy, just return state1, which we know is already
* in the agg_context
*/
if (state1 == NULL)
PG_RETURN_NULL();
PG_RETURN_POINTER(state1);
}
if (state1 == NULL)
{
/* We must copy state2's data into the agg_context */
state1 = initArrayResultWithSize(state2->element_type, agg_context,
false, state2->alen);
old_context = MemoryContextSwitchTo(agg_context);
for (int i = 0; i < state2->nelems; i++)
{
if (!state2->dnulls[i])
state1->dvalues[i] = datumCopy(state2->dvalues[i],
state1->typbyval,
state1->typlen);
else
state1->dvalues[i] = (Datum) 0;
}
MemoryContextSwitchTo(old_context);
memcpy(state1->dnulls, state2->dnulls, sizeof(bool) * state2->nelems);
state1->nelems = state2->nelems;
PG_RETURN_POINTER(state1);
}
else if (state2->nelems > 0)
{
/* We only need to combine the two states if state2 has any elements */
int reqsize = state1->nelems + state2->nelems;
MemoryContext oldContext = MemoryContextSwitchTo(state1->mcontext);
Assert(state1->element_type == state2->element_type);
/* Enlarge state1 arrays if needed */
if (state1->alen < reqsize)
{
/* Use a power of 2 size rather than allocating just reqsize */
state1->alen = pg_nextpower2_32(reqsize);
state1->dvalues = (Datum *) repalloc(state1->dvalues,
state1->alen * sizeof(Datum));
state1->dnulls = (bool *) repalloc(state1->dnulls,
state1->alen * sizeof(bool));
}
/* Copy in the state2 elements to the end of the state1 arrays */
for (int i = 0; i < state2->nelems; i++)
{
if (!state2->dnulls[i])
state1->dvalues[i + state1->nelems] =
datumCopy(state2->dvalues[i],
state1->typbyval,
state1->typlen);
else
state1->dvalues[i + state1->nelems] = (Datum) 0;
}
memcpy(&state1->dnulls[state1->nelems], state2->dnulls,
sizeof(bool) * state2->nelems);
state1->nelems = reqsize;
MemoryContextSwitchTo(oldContext);
}
PG_RETURN_POINTER(state1);
}
/*
* array_agg_serialize
* Serialize ArrayBuildState into bytea.
*/
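/*
 * The serialized representation assembled below is, in order:
 * element_type (int32), nelems (int64), typlen (int16), typbyval and
 * typalign (one byte each), the dnulls flags, and then the datums --
 * sent raw for by-value element types, or as length-prefixed typsend
 * output with null elements skipped for by-reference types.
 */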
Datum
array_agg_serialize(PG_FUNCTION_ARGS)
{
ArrayBuildState *state;
StringInfoData buf;
bytea *result;
/* cannot be called directly because of internal-type argument */
Assert(AggCheckCallContext(fcinfo, NULL));
state = (ArrayBuildState *) PG_GETARG_POINTER(0);
pq_begintypsend(&buf);
/*
* element_type. Putting this first is more convenient in deserialization
*/
pq_sendint32(&buf, state->element_type);
/*
* nelems -- send first so we know how large to make the dvalues and
* dnulls array during deserialization.
*/
pq_sendint64(&buf, state->nelems);
/* alen can be decided during deserialization */
/* typlen */
pq_sendint16(&buf, state->typlen);
/* typbyval */
pq_sendbyte(&buf, state->typbyval);
/* typalign */
pq_sendbyte(&buf, state->typalign);
/* dnulls */
pq_sendbytes(&buf, (char *) state->dnulls, sizeof(bool) * state->nelems);
/*
* dvalues. By agreement with array_agg_deserialize, when the element
* type is byval, we just transmit the Datum array as-is, including any
* null elements. For by-ref types, we must invoke the element type's
* send function, and we skip null elements (which is why the nulls flags
* must be sent first).
*/
if (state->typbyval)
pq_sendbytes(&buf, (char *) state->dvalues,
sizeof(Datum) * state->nelems);
else
{
SerialIOData *iodata;
int i;
/* Avoid repeat catalog lookups for typsend function */
iodata = (SerialIOData *) fcinfo->flinfo->fn_extra;
if (iodata == NULL)
{
Oid typsend;
bool typisvarlena;
iodata = (SerialIOData *)
MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
sizeof(SerialIOData));
getTypeBinaryOutputInfo(state->element_type, &typsend,
&typisvarlena);
fmgr_info_cxt(typsend, &iodata->typsend,
fcinfo->flinfo->fn_mcxt);
fcinfo->flinfo->fn_extra = (void *) iodata;
}
for (i = 0; i < state->nelems; i++)
{
bytea *outputbytes;
if (state->dnulls[i])
continue;
outputbytes = SendFunctionCall(&iodata->typsend,
state->dvalues[i]);
pq_sendint32(&buf, VARSIZE(outputbytes) - VARHDRSZ);
pq_sendbytes(&buf, VARDATA(outputbytes),
VARSIZE(outputbytes) - VARHDRSZ);
}
}
result = pq_endtypsend(&buf);
PG_RETURN_BYTEA_P(result);
}
Datum
array_agg_deserialize(PG_FUNCTION_ARGS)
{
bytea *sstate;
ArrayBuildState *result;
StringInfoData buf;
Oid element_type;
int64 nelems;
const char *temp;
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
sstate = PG_GETARG_BYTEA_PP(0);
/*
* Copy the bytea into a StringInfo so that we can "receive" it using the
* standard recv-function infrastructure.
*/
initStringInfo(&buf);
appendBinaryStringInfo(&buf,
VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
/* element_type */
element_type = pq_getmsgint(&buf, 4);
/* nelems */
nelems = pq_getmsgint64(&buf);
/* Create output ArrayBuildState with the needed number of elements */
result = initArrayResultWithSize(element_type, CurrentMemoryContext,
false, nelems);
result->nelems = nelems;
/* typlen */
result->typlen = pq_getmsgint(&buf, 2);
/* typbyval */
result->typbyval = pq_getmsgbyte(&buf);
/* typalign */
result->typalign = pq_getmsgbyte(&buf);
/* dnulls */
temp = pq_getmsgbytes(&buf, sizeof(bool) * nelems);
memcpy(result->dnulls, temp, sizeof(bool) * nelems);
/* dvalues --- see comment in array_agg_serialize */
if (result->typbyval)
{
temp = pq_getmsgbytes(&buf, sizeof(Datum) * nelems);
memcpy(result->dvalues, temp, sizeof(Datum) * nelems);
}
else
{
DeserialIOData *iodata;
/* Avoid repeat catalog lookups for typreceive function */
iodata = (DeserialIOData *) fcinfo->flinfo->fn_extra;
if (iodata == NULL)
{
Oid typreceive;
iodata = (DeserialIOData *)
MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
sizeof(DeserialIOData));
getTypeBinaryInputInfo(element_type, &typreceive,
&iodata->typioparam);
fmgr_info_cxt(typreceive, &iodata->typreceive,
fcinfo->flinfo->fn_mcxt);
fcinfo->flinfo->fn_extra = (void *) iodata;
}
for (int i = 0; i < nelems; i++)
{
int itemlen;
StringInfoData elem_buf;
char csave;
if (result->dnulls[i])
{
result->dvalues[i] = (Datum) 0;
continue;
}
itemlen = pq_getmsgint(&buf, 4);
if (itemlen < 0 || itemlen > (buf.len - buf.cursor))
ereport(ERROR,
(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
errmsg("insufficient data left in message")));
/*
* Rather than copying data around, we just set up a phony
* StringInfo pointing to the correct portion of the input buffer.
* We assume we can scribble on the input buffer so as to maintain
* the convention that StringInfos have a trailing null.
*/
elem_buf.data = &buf.data[buf.cursor];
elem_buf.maxlen = itemlen + 1;
elem_buf.len = itemlen;
elem_buf.cursor = 0;
buf.cursor += itemlen;
csave = buf.data[buf.cursor];
buf.data[buf.cursor] = '\0';
/* Now call the element's receiveproc */
result->dvalues[i] = ReceiveFunctionCall(&iodata->typreceive,
&elem_buf,
iodata->typioparam,
-1);
buf.data[buf.cursor] = csave;
}
}
pq_getmsgend(&buf);
pfree(buf.data);
PG_RETURN_POINTER(result);
}
Datum
array_agg_finalfn(PG_FUNCTION_ARGS)
{
@@ -578,6 +909,299 @@ array_agg_array_transfn(PG_FUNCTION_ARGS)
PG_RETURN_POINTER(state);
}
Datum
array_agg_array_combine(PG_FUNCTION_ARGS)
{
ArrayBuildStateArr *state1;
ArrayBuildStateArr *state2;
MemoryContext agg_context;
MemoryContext old_context;
if (!AggCheckCallContext(fcinfo, &agg_context))
elog(ERROR, "aggregate function called in non-aggregate context");
state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildStateArr *) PG_GETARG_POINTER(0);
state2 = PG_ARGISNULL(1) ? NULL : (ArrayBuildStateArr *) PG_GETARG_POINTER(1);
if (state2 == NULL)
{
/*
* NULL state2 is easy, just return state1, which we know is already
* in the agg_context
*/
if (state1 == NULL)
PG_RETURN_NULL();
PG_RETURN_POINTER(state1);
}
if (state1 == NULL)
{
/* We must copy state2's data into the agg_context */
old_context = MemoryContextSwitchTo(agg_context);
state1 = initArrayResultArr(state2->array_type, InvalidOid,
agg_context, false);
state1->abytes = state2->abytes;
state1->data = (char *) palloc(state1->abytes);
if (state2->nullbitmap)
{
int size = (state2->aitems + 7) / 8;
state1->nullbitmap = (bits8 *) palloc(size);
memcpy(state1->nullbitmap, state2->nullbitmap, size);
}
memcpy(state1->data, state2->data, state2->nbytes);
state1->nbytes = state2->nbytes;
state1->aitems = state2->aitems;
state1->nitems = state2->nitems;
state1->ndims = state2->ndims;
memcpy(state1->dims, state2->dims, sizeof(state2->dims));
memcpy(state1->lbs, state2->lbs, sizeof(state2->lbs));
state1->array_type = state2->array_type;
state1->element_type = state2->element_type;
MemoryContextSwitchTo(old_context);
PG_RETURN_POINTER(state1);
}
/* We only need to combine the two states if state2 has any items */
else if (state2->nitems > 0)
{
MemoryContext oldContext;
int reqsize = state1->nbytes + state2->nbytes;
int i;
/*
* Check the states are compatible with each other. Ensure we use the
* same error messages that are listed in accumArrayResultArr so that
* the same error is shown as would have been if we'd not used the
* combine function for the aggregation.
*/
if (state1->ndims != state2->ndims)
ereport(ERROR,
(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
errmsg("cannot accumulate arrays of different dimensionality")));
/* Check dimensions match ignoring the first dimension. */
for (i = 1; i < state1->ndims; i++)
{
if (state1->dims[i] != state2->dims[i] || state1->lbs[i] != state2->lbs[i])
ereport(ERROR,
(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
errmsg("cannot accumulate arrays of different dimensionality")));
}
oldContext = MemoryContextSwitchTo(state1->mcontext);
/*
* If there's not enough space in state1 then we'll need to reallocate
* more.
*/
if (state1->abytes < reqsize)
{
/* use a power of 2 size rather than allocating just reqsize */
state1->abytes = pg_nextpower2_32(reqsize);
state1->data = (char *) repalloc(state1->data, state1->abytes);
}
if (state2->nullbitmap)
{
int newnitems = state1->nitems + state2->nitems;
if (state1->nullbitmap == NULL)
{
/*
* First input with nulls; we must retrospectively handle any
* previous inputs by marking all their items non-null.
*/
state1->aitems = pg_nextpower2_32(Max(256, newnitems + 1));
state1->nullbitmap = (bits8 *) palloc((state1->aitems + 7) / 8);
array_bitmap_copy(state1->nullbitmap, 0,
NULL, 0,
state1->nitems);
}
else if (newnitems > state1->aitems)
{
int newaitems = state1->aitems + state2->aitems;
state1->aitems = pg_nextpower2_32(newaitems);
state1->nullbitmap = (bits8 *)
repalloc(state1->nullbitmap, (state1->aitems + 7) / 8);
}
array_bitmap_copy(state1->nullbitmap, state1->nitems,
state2->nullbitmap, 0,
state2->nitems);
}
memcpy(state1->data + state1->nbytes, state2->data, state2->nbytes);
state1->nbytes += state2->nbytes;
state1->nitems += state2->nitems;
state1->dims[0] += state2->dims[0];
/* remaining dims already match, per test above */
Assert(state1->array_type == state2->array_type);
Assert(state1->element_type == state2->element_type);
MemoryContextSwitchTo(oldContext);
}
PG_RETURN_POINTER(state1);
}
/*
* array_agg_array_serialize
* Serialize ArrayBuildStateArr into bytea.
*/
Datum
array_agg_array_serialize(PG_FUNCTION_ARGS)
{
ArrayBuildStateArr *state;
StringInfoData buf;
bytea *result;
/* cannot be called directly because of internal-type argument */
Assert(AggCheckCallContext(fcinfo, NULL));
state = (ArrayBuildStateArr *) PG_GETARG_POINTER(0);
pq_begintypsend(&buf);
/*
* element_type. Putting this first is more convenient in deserialization
* so that we can init the new state sooner.
*/
pq_sendint32(&buf, state->element_type);
/* array_type */
pq_sendint32(&buf, state->array_type);
/* nbytes */
pq_sendint32(&buf, state->nbytes);
/* data */
pq_sendbytes(&buf, state->data, state->nbytes);
/* abytes */
pq_sendint32(&buf, state->abytes);
/* aitems */
pq_sendint32(&buf, state->aitems);
/* nullbitmap */
if (state->nullbitmap)
{
Assert(state->aitems > 0);
pq_sendbytes(&buf, (char *) state->nullbitmap, (state->aitems + 7) / 8);
}
/* nitems */
pq_sendint32(&buf, state->nitems);
/* ndims */
pq_sendint32(&buf, state->ndims);
/* dims: XXX should we just send ndims elements? */
pq_sendbytes(&buf, (char *) state->dims, sizeof(state->dims));
/* lbs */
pq_sendbytes(&buf, (char *) state->lbs, sizeof(state->lbs));
result = pq_endtypsend(&buf);
PG_RETURN_BYTEA_P(result);
}
Datum
array_agg_array_deserialize(PG_FUNCTION_ARGS)
{
bytea *sstate;
ArrayBuildStateArr *result;
StringInfoData buf;
Oid element_type;
Oid array_type;
int nbytes;
const char *temp;
/* cannot be called directly because of internal-type argument */
Assert(AggCheckCallContext(fcinfo, NULL));
sstate = PG_GETARG_BYTEA_PP(0);
/*
* Copy the bytea into a StringInfo so that we can "receive" it using the
* standard recv-function infrastructure.
*/
initStringInfo(&buf);
appendBinaryStringInfo(&buf,
VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
/* element_type */
element_type = pq_getmsgint(&buf, 4);
/* array_type */
array_type = pq_getmsgint(&buf, 4);
/* nbytes */
nbytes = pq_getmsgint(&buf, 4);
result = initArrayResultArr(array_type, element_type,
CurrentMemoryContext, false);
result->abytes = 1024;
while (result->abytes < nbytes)
result->abytes *= 2;
result->data = (char *) palloc(result->abytes);
/* data */
temp = pq_getmsgbytes(&buf, nbytes);
memcpy(result->data, temp, nbytes);
result->nbytes = nbytes;
/* abytes */
result->abytes = pq_getmsgint(&buf, 4);
/* aitems: might be 0 */
result->aitems = pq_getmsgint(&buf, 4);
/* nullbitmap */
if (result->aitems > 0)
{
int size = (result->aitems + 7) / 8;
result->nullbitmap = (bits8 *) palloc(size);
temp = pq_getmsgbytes(&buf, size);
memcpy(result->nullbitmap, temp, size);
}
else
result->nullbitmap = NULL;
/* nitems */
result->nitems = pq_getmsgint(&buf, 4);
/* ndims */
result->ndims = pq_getmsgint(&buf, 4);
/* dims */
temp = pq_getmsgbytes(&buf, sizeof(result->dims));
memcpy(result->dims, temp, sizeof(result->dims));
/* lbs */
temp = pq_getmsgbytes(&buf, sizeof(result->lbs));
memcpy(result->lbs, temp, sizeof(result->lbs));
pq_getmsgend(&buf);
pfree(buf.data);
PG_RETURN_POINTER(result);
}
Datum
array_agg_array_finalfn(PG_FUNCTION_ARGS)
{

View File

@@ -5262,6 +5262,24 @@ array_insert_slice(ArrayType *destArray,
*/
ArrayBuildState *
initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext)
{
/*
* When using a subcontext, we can afford to start with a somewhat larger
* initial array size. Without subcontexts, we'd better hope that most of
* the states stay small ...
*/
return initArrayResultWithSize(element_type, rcontext, subcontext,
subcontext ? 64 : 8);
}
/*
* initArrayResultWithSize
* As initArrayResult, but allow the initial size of the allocated arrays
* to be specified.
*/
ArrayBuildState *
initArrayResultWithSize(Oid element_type, MemoryContext rcontext,
bool subcontext, int initsize)
{
ArrayBuildState *astate;
MemoryContext arr_context = rcontext;
@@ -5276,7 +5294,7 @@ initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext)
MemoryContextAlloc(arr_context, sizeof(ArrayBuildState));
astate->mcontext = arr_context;
astate->private_cxt = subcontext;
astate->alen = (subcontext ? 64 : 8); /* arbitrary starting array size */
astate->alen = initsize;
astate->dvalues = (Datum *)
MemoryContextAlloc(arr_context, astate->alen * sizeof(Datum));
astate->dnulls = (bool *)

View File

@@ -506,29 +506,50 @@ bytea_string_agg_transfn(PG_FUNCTION_ARGS)
state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
/* Append the value unless null. */
/* Append the value unless null, preceding it with the delimiter. */
if (!PG_ARGISNULL(1))
{
bytea *value = PG_GETARG_BYTEA_PP(1);
bool isfirst = false;
/* On the first time through, we ignore the delimiter. */
/*
* You might think we can just throw away the first delimiter; however,
* we must keep it, as we may be a parallel worker doing partial
* aggregation, building a state to send to the main process.  We need
* to keep the delimiter of every aggregation so that the combine
* function can properly join up the strings of two separately
* partially aggregated results. The first delimiter is only stripped
* off in the final function. To know how much to strip off the front
* of the string, we store the length of the first delimiter in the
* StringInfo's cursor field, which we don't otherwise need here.
*/
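/*
 * For example (illustrative values): two partial states ",1,2" and
 * ",3,4", each with cursor = 1 (the length of the leading ","), are
 * combined into ",1,2,3,4" with cursor still 1; the final function
 * then strips the first delimiter and returns "1,2,3,4".
 */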
if (state == NULL)
{
state = makeStringAggState(fcinfo);
else if (!PG_ARGISNULL(2))
isfirst = true;
}
if (!PG_ARGISNULL(2))
{
bytea *delim = PG_GETARG_BYTEA_PP(2);
appendBinaryStringInfo(state, VARDATA_ANY(delim), VARSIZE_ANY_EXHDR(delim));
appendBinaryStringInfo(state, VARDATA_ANY(delim),
VARSIZE_ANY_EXHDR(delim));
if (isfirst)
state->cursor = VARSIZE_ANY_EXHDR(delim);
}
appendBinaryStringInfo(state, VARDATA_ANY(value), VARSIZE_ANY_EXHDR(value));
appendBinaryStringInfo(state, VARDATA_ANY(value),
VARSIZE_ANY_EXHDR(value));
}
/*
* The transition type for string_agg() is declared to be "internal",
* which is a pass-by-value type the same size as a pointer.
*/
PG_RETURN_POINTER(state);
if (state)
PG_RETURN_POINTER(state);
PG_RETURN_NULL();
}
Datum
@@ -543,11 +564,13 @@ bytea_string_agg_finalfn(PG_FUNCTION_ARGS)
if (state != NULL)
{
/* As per comment in transfn, strip data before the cursor position */
bytea *result;
int strippedlen = state->len - state->cursor;
result = (bytea *) palloc(state->len + VARHDRSZ);
SET_VARSIZE(result, state->len + VARHDRSZ);
memcpy(VARDATA(result), state->data, state->len);
result = (bytea *) palloc(strippedlen + VARHDRSZ);
SET_VARSIZE(result, strippedlen + VARHDRSZ);
memcpy(VARDATA(result), &state->data[state->cursor], strippedlen);
PG_RETURN_BYTEA_P(result);
}
else
@@ -5372,23 +5395,171 @@ string_agg_transfn(PG_FUNCTION_ARGS)
state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
/* Append the value unless null. */
/* Append the value unless null, preceding it with the delimiter. */
if (!PG_ARGISNULL(1))
{
/* On the first time through, we ignore the delimiter. */
if (state == NULL)
state = makeStringAggState(fcinfo);
else if (!PG_ARGISNULL(2))
appendStringInfoText(state, PG_GETARG_TEXT_PP(2)); /* delimiter */
text *value = PG_GETARG_TEXT_PP(1);
bool isfirst = false;
appendStringInfoText(state, PG_GETARG_TEXT_PP(1)); /* value */
/*
* You might think we can just throw away the first delimiter; however,
* we must keep it, as we may be a parallel worker doing partial
* aggregation, building a state to send to the main process.  We need
* to keep the delimiter of every aggregation so that the combine
* function can properly join up the strings of two separately
* partially aggregated results. The first delimiter is only stripped
* off in the final function. To know how much to strip off the front
* of the string, we store the length of the first delimiter in the
* StringInfo's cursor field, which we don't otherwise need here.
*/
if (state == NULL)
{
state = makeStringAggState(fcinfo);
isfirst = true;
}
if (!PG_ARGISNULL(2))
{
text *delim = PG_GETARG_TEXT_PP(2);
appendStringInfoText(state, delim);
if (isfirst)
state->cursor = VARSIZE_ANY_EXHDR(delim);
}
appendStringInfoText(state, value);
}
/*
* The transition type for string_agg() is declared to be "internal",
* which is a pass-by-value type the same size as a pointer.
*/
PG_RETURN_POINTER(state);
if (state)
PG_RETURN_POINTER(state);
PG_RETURN_NULL();
}
/*
* string_agg_combine
* Aggregate combine function for string_agg(text) and string_agg(bytea)
*/
Datum
string_agg_combine(PG_FUNCTION_ARGS)
{
StringInfo state1;
StringInfo state2;
MemoryContext agg_context;
if (!AggCheckCallContext(fcinfo, &agg_context))
elog(ERROR, "aggregate function called in non-aggregate context");
state1 = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
state2 = PG_ARGISNULL(1) ? NULL : (StringInfo) PG_GETARG_POINTER(1);
if (state2 == NULL)
{
/*
* NULL state2 is easy, just return state1, which we know is already
* in the agg_context
*/
if (state1 == NULL)
PG_RETURN_NULL();
PG_RETURN_POINTER(state1);
}
if (state1 == NULL)
{
/* We must copy state2's data into the agg_context */
MemoryContext old_context;
old_context = MemoryContextSwitchTo(agg_context);
state1 = makeStringAggState(fcinfo);
appendBinaryStringInfo(state1, state2->data, state2->len);
state1->cursor = state2->cursor;
MemoryContextSwitchTo(old_context);
}
else if (state2->len > 0)
{
/* Combine ... state1->cursor does not change in this case */
appendBinaryStringInfo(state1, state2->data, state2->len);
}
PG_RETURN_POINTER(state1);
}
/*
* string_agg_serialize
* Aggregate serialize function for string_agg(text) and string_agg(bytea)
*
* This is strict, so we need not handle NULL input
*/
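/*
 * The serialized state is simply the 4-byte cursor (the length of the
 * first delimiter, later stripped by the final function) followed by
 * the raw accumulated bytes.
 */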
Datum
string_agg_serialize(PG_FUNCTION_ARGS)
{
StringInfo state;
StringInfoData buf;
bytea *result;
/* cannot be called directly because of internal-type argument */
Assert(AggCheckCallContext(fcinfo, NULL));
state = (StringInfo) PG_GETARG_POINTER(0);
pq_begintypsend(&buf);
/* cursor */
pq_sendint(&buf, state->cursor, 4);
/* data */
pq_sendbytes(&buf, state->data, state->len);
result = pq_endtypsend(&buf);
PG_RETURN_BYTEA_P(result);
}
/*
* string_agg_deserialize
* Aggregate deserial function for string_agg(text) and string_agg(bytea)
*
* This is strict, so we need not handle NULL input
*/
Datum
string_agg_deserialize(PG_FUNCTION_ARGS)
{
bytea *sstate;
StringInfo result;
StringInfoData buf;
char *data;
int datalen;
/* cannot be called directly because of internal-type argument */
Assert(AggCheckCallContext(fcinfo, NULL));
sstate = PG_GETARG_BYTEA_PP(0);
/*
* Copy the bytea into a StringInfo so that we can "receive" it using the
* standard recv-function infrastructure.
*/
initStringInfo(&buf);
appendBinaryStringInfo(&buf,
VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
result = makeStringAggState(fcinfo);
/* cursor */
result->cursor = pq_getmsgint(&buf, 4);
/* data */
datalen = VARSIZE_ANY_EXHDR(sstate) - 4;
data = (char *) pq_getmsgbytes(&buf, datalen);
appendBinaryStringInfo(result, data, datalen);
pq_getmsgend(&buf);
pfree(buf.data);
PG_RETURN_POINTER(result);
}
Datum
@@ -5402,7 +5573,11 @@ string_agg_finalfn(PG_FUNCTION_ARGS)
state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
if (state != NULL)
PG_RETURN_TEXT_P(cstring_to_text_with_len(state->data, state->len));
{
/* As per comment in transfn, strip data before the cursor position */
PG_RETURN_TEXT_P(cstring_to_text_with_len(&state->data[state->cursor],
state->len - state->cursor));
}
else
PG_RETURN_NULL();
}

View File

@@ -57,6 +57,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202301201
#define CATALOG_VERSION_NO 202301231
#endif

View File

@@ -537,19 +537,28 @@
# array
{ aggfnoid => 'array_agg(anynonarray)', aggtransfn => 'array_agg_transfn',
aggfinalfn => 'array_agg_finalfn', aggfinalextra => 't',
aggtranstype => 'internal' },
aggcombinefn => 'array_agg_combine', aggserialfn => 'array_agg_serialize',
aggdeserialfn => 'array_agg_deserialize', aggfinalfn => 'array_agg_finalfn',
aggfinalextra => 't', aggtranstype => 'internal' },
{ aggfnoid => 'array_agg(anyarray)', aggtransfn => 'array_agg_array_transfn',
aggcombinefn => 'array_agg_array_combine',
aggserialfn => 'array_agg_array_serialize',
aggdeserialfn => 'array_agg_array_deserialize',
aggfinalfn => 'array_agg_array_finalfn', aggfinalextra => 't',
aggtranstype => 'internal' },
# text
{ aggfnoid => 'string_agg(text,text)', aggtransfn => 'string_agg_transfn',
aggcombinefn => 'string_agg_combine', aggserialfn => 'string_agg_serialize',
aggdeserialfn => 'string_agg_deserialize',
aggfinalfn => 'string_agg_finalfn', aggtranstype => 'internal' },
# bytea
{ aggfnoid => 'string_agg(bytea,bytea)',
aggtransfn => 'bytea_string_agg_transfn',
aggcombinefn => 'string_agg_combine',
aggserialfn => 'string_agg_serialize',
aggdeserialfn => 'string_agg_deserialize',
aggfinalfn => 'bytea_string_agg_finalfn', aggtranstype => 'internal' },
# range
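
For comparison, a user-defined aggregate opts into the same partial
aggregation machinery by declaring the analogous support functions in
CREATE AGGREGATE.  A minimal sketch using existing catalog functions
(the aggregate name is made up; SERIALFUNC/DESERIALFUNC are only needed
when the transition type is internal, as it is for the entries above):

    CREATE AGGREGATE my_sum(int4) (
        SFUNC = int4_sum,      -- transition function
        STYPE = int8,          -- transition state type
        COMBINEFUNC = int8pl,  -- merges two partial states
        PARALLEL = SAFE
    );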

View File

@@ -1672,6 +1672,15 @@
{ oid => '2333', descr => 'aggregate transition function',
proname => 'array_agg_transfn', proisstrict => 'f', prorettype => 'internal',
proargtypes => 'internal anynonarray', prosrc => 'array_agg_transfn' },
{ oid => '9328', descr => 'aggregate combine function',
proname => 'array_agg_combine', proisstrict => 'f', prorettype => 'internal',
proargtypes => 'internal internal', prosrc => 'array_agg_combine' },
{ oid => '9329', descr => 'aggregate serial function',
proname => 'array_agg_serialize', prorettype => 'bytea',
proargtypes => 'internal', prosrc => 'array_agg_serialize' },
{ oid => '9330', descr => 'aggregate deserial function',
proname => 'array_agg_deserialize', prorettype => 'internal',
proargtypes => 'bytea internal', prosrc => 'array_agg_deserialize' },
{ oid => '2334', descr => 'aggregate final function',
proname => 'array_agg_finalfn', proisstrict => 'f', prorettype => 'anyarray',
proargtypes => 'internal anynonarray', prosrc => 'array_agg_finalfn' },
@@ -1683,6 +1692,15 @@
proname => 'array_agg_array_transfn', proisstrict => 'f',
prorettype => 'internal', proargtypes => 'internal anyarray',
prosrc => 'array_agg_array_transfn' },
{ oid => '9331', descr => 'aggregate combine function',
proname => 'array_agg_array_combine', proisstrict => 'f', prorettype => 'internal',
proargtypes => 'internal internal', prosrc => 'array_agg_array_combine' },
{ oid => '9332', descr => 'aggregate serial function',
proname => 'array_agg_array_serialize', prorettype => 'bytea',
proargtypes => 'internal', prosrc => 'array_agg_array_serialize' },
{ oid => '9333', descr => 'aggregate deserial function',
proname => 'array_agg_array_deserialize', prorettype => 'internal',
proargtypes => 'bytea internal', prosrc => 'array_agg_array_deserialize' },
{ oid => '4052', descr => 'aggregate final function',
proname => 'array_agg_array_finalfn', proisstrict => 'f',
prorettype => 'anyarray', proargtypes => 'internal anyarray',
@@ -4955,6 +4973,15 @@
{ oid => '3535', descr => 'aggregate transition function',
proname => 'string_agg_transfn', proisstrict => 'f', prorettype => 'internal',
proargtypes => 'internal text text', prosrc => 'string_agg_transfn' },
{ oid => '9334', descr => 'aggregate combine function',
proname => 'string_agg_combine', proisstrict => 'f', prorettype => 'internal',
proargtypes => 'internal internal', prosrc => 'string_agg_combine' },
{ oid => '9335', descr => 'aggregate serial function',
proname => 'string_agg_serialize', prorettype => 'bytea',
proargtypes => 'internal', prosrc => 'string_agg_serialize' },
{ oid => '9336', descr => 'aggregate deserial function',
proname => 'string_agg_deserialize', prorettype => 'internal',
proargtypes => 'bytea internal', prosrc => 'string_agg_deserialize' },
{ oid => '3536', descr => 'aggregate final function',
proname => 'string_agg_finalfn', proisstrict => 'f', prorettype => 'text',
proargtypes => 'internal', prosrc => 'string_agg_finalfn' },

View File

@@ -35,6 +35,8 @@ extern Oid resolve_aggregate_transtype(Oid aggfuncid,
Oid *inputTypes,
int numArguments);
extern bool agg_args_support_sendreceive(Aggref *aggref);
extern void build_aggregate_transfn_expr(Oid *agg_input_types,
int agg_num_inputs,
int agg_num_direct_inputs,

View File

@@ -409,6 +409,9 @@ extern bool array_contains_nulls(ArrayType *array);
extern ArrayBuildState *initArrayResult(Oid element_type,
MemoryContext rcontext, bool subcontext);
extern ArrayBuildState *initArrayResultWithSize(Oid element_type,
MemoryContext rcontext,
bool subcontext, int initsize);
extern ArrayBuildState *accumArrayResult(ArrayBuildState *astate,
Datum dvalue, bool disnull,
Oid element_type,

View File

@@ -1862,6 +1862,104 @@ select string_agg(v, decode('ee', 'hex')) from bytea_test_table;
(1 row)
drop table bytea_test_table;
-- Test parallel string_agg and array_agg
create table pagg_test (x int, y int);
insert into pagg_test
select (case x % 4 when 1 then null else x end), x % 10
from generate_series(1,5000) x;
set parallel_setup_cost TO 0;
set parallel_tuple_cost TO 0;
set parallel_leader_participation TO 0;
set min_parallel_table_scan_size = 0;
set bytea_output = 'escape';
-- create a view as we otherwise have to repeat this query a few times.
create view v_pagg_test AS
select
y,
min(t) AS tmin,max(t) AS tmax,count(distinct t) AS tndistinct,
min(b) AS bmin,max(b) AS bmax,count(distinct b) AS bndistinct,
min(a) AS amin,max(a) AS amax,count(distinct a) AS andistinct,
min(aa) AS aamin,max(aa) AS aamax,count(distinct aa) AS aandistinct
from (
select
y,
unnest(regexp_split_to_array(a1.t, ','))::int AS t,
unnest(regexp_split_to_array(a1.b::text, ',')) AS b,
unnest(a1.a) AS a,
unnest(a1.aa) AS aa
from (
select
y,
string_agg(x::text, ',') AS t,
string_agg(x::text::bytea, ',') AS b,
array_agg(x) AS a,
array_agg(ARRAY[x]) AS aa
from pagg_test
group by y
) a1
) a2
group by y;
-- Ensure results are correct.
select * from v_pagg_test order by y;
y | tmin | tmax | tndistinct | bmin | bmax | bndistinct | amin | amax | andistinct | aamin | aamax | aandistinct
---+------+------+------------+------+------+------------+------+------+------------+-------+-------+-------------
0 | 10 | 5000 | 500 | 10 | 990 | 500 | 10 | 5000 | 500 | 10 | 5000 | 500
1 | 11 | 4991 | 250 | 1011 | 991 | 250 | 11 | 4991 | 250 | 11 | 4991 | 250
2 | 2 | 4992 | 500 | 1002 | 992 | 500 | 2 | 4992 | 500 | 2 | 4992 | 500
3 | 3 | 4983 | 250 | 1003 | 983 | 250 | 3 | 4983 | 250 | 3 | 4983 | 250
4 | 4 | 4994 | 500 | 1004 | 994 | 500 | 4 | 4994 | 500 | 4 | 4994 | 500
5 | 15 | 4995 | 250 | 1015 | 995 | 250 | 15 | 4995 | 250 | 15 | 4995 | 250
6 | 6 | 4996 | 500 | 1006 | 996 | 500 | 6 | 4996 | 500 | 6 | 4996 | 500
7 | 7 | 4987 | 250 | 1007 | 987 | 250 | 7 | 4987 | 250 | 7 | 4987 | 250
8 | 8 | 4998 | 500 | 1008 | 998 | 500 | 8 | 4998 | 500 | 8 | 4998 | 500
9 | 19 | 4999 | 250 | 1019 | 999 | 250 | 19 | 4999 | 250 | 19 | 4999 | 250
(10 rows)
-- Ensure parallel aggregation is actually being used.
explain (costs off) select * from v_pagg_test order by y;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------
GroupAggregate
Group Key: pagg_test.y
-> Sort
Sort Key: pagg_test.y, (((unnest(regexp_split_to_array((string_agg((pagg_test.x)::text, ','::text)), ','::text))))::integer)
-> Result
-> ProjectSet
-> Finalize HashAggregate
Group Key: pagg_test.y
-> Gather
Workers Planned: 2
-> Partial HashAggregate
Group Key: pagg_test.y
-> Parallel Seq Scan on pagg_test
(13 rows)
set max_parallel_workers_per_gather = 0;
-- Ensure results are the same without parallel aggregation.
select * from v_pagg_test order by y;
y | tmin | tmax | tndistinct | bmin | bmax | bndistinct | amin | amax | andistinct | aamin | aamax | aandistinct
---+------+------+------------+------+------+------------+------+------+------------+-------+-------+-------------
0 | 10 | 5000 | 500 | 10 | 990 | 500 | 10 | 5000 | 500 | 10 | 5000 | 500
1 | 11 | 4991 | 250 | 1011 | 991 | 250 | 11 | 4991 | 250 | 11 | 4991 | 250
2 | 2 | 4992 | 500 | 1002 | 992 | 500 | 2 | 4992 | 500 | 2 | 4992 | 500
3 | 3 | 4983 | 250 | 1003 | 983 | 250 | 3 | 4983 | 250 | 3 | 4983 | 250
4 | 4 | 4994 | 500 | 1004 | 994 | 500 | 4 | 4994 | 500 | 4 | 4994 | 500
5 | 15 | 4995 | 250 | 1015 | 995 | 250 | 15 | 4995 | 250 | 15 | 4995 | 250
6 | 6 | 4996 | 500 | 1006 | 996 | 500 | 6 | 4996 | 500 | 6 | 4996 | 500
7 | 7 | 4987 | 250 | 1007 | 987 | 250 | 7 | 4987 | 250 | 7 | 4987 | 250
8 | 8 | 4998 | 500 | 1008 | 998 | 500 | 8 | 4998 | 500 | 8 | 4998 | 500
9 | 19 | 4999 | 250 | 1019 | 999 | 250 | 19 | 4999 | 250 | 19 | 4999 | 250
(10 rows)
-- Clean up
reset max_parallel_workers_per_gather;
reset bytea_output;
reset min_parallel_table_scan_size;
reset parallel_leader_participation;
reset parallel_tuple_cost;
reset parallel_setup_cost;
drop view v_pagg_test;
drop table pagg_test;
-- FILTER tests
select min(unique1) filter (where unique1 > 100) from tenk1;
min

View File

@@ -717,6 +717,68 @@ select string_agg(v, decode('ee', 'hex')) from bytea_test_table;
drop table bytea_test_table;
-- Test parallel string_agg and array_agg
create table pagg_test (x int, y int);
insert into pagg_test
select (case x % 4 when 1 then null else x end), x % 10
from generate_series(1,5000) x;
set parallel_setup_cost TO 0;
set parallel_tuple_cost TO 0;
set parallel_leader_participation TO 0;
set min_parallel_table_scan_size = 0;
set bytea_output = 'escape';
-- create a view as we otherwise have to repeat this query a few times.
create view v_pagg_test AS
select
y,
min(t) AS tmin,max(t) AS tmax,count(distinct t) AS tndistinct,
min(b) AS bmin,max(b) AS bmax,count(distinct b) AS bndistinct,
min(a) AS amin,max(a) AS amax,count(distinct a) AS andistinct,
min(aa) AS aamin,max(aa) AS aamax,count(distinct aa) AS aandistinct
from (
select
y,
unnest(regexp_split_to_array(a1.t, ','))::int AS t,
unnest(regexp_split_to_array(a1.b::text, ',')) AS b,
unnest(a1.a) AS a,
unnest(a1.aa) AS aa
from (
select
y,
string_agg(x::text, ',') AS t,
string_agg(x::text::bytea, ',') AS b,
array_agg(x) AS a,
array_agg(ARRAY[x]) AS aa
from pagg_test
group by y
) a1
) a2
group by y;
-- Ensure results are correct.
select * from v_pagg_test order by y;
-- Ensure parallel aggregation is actually being used.
explain (costs off) select * from v_pagg_test order by y;
set max_parallel_workers_per_gather = 0;
-- Ensure results are the same without parallel aggregation.
select * from v_pagg_test order by y;
-- Clean up
reset max_parallel_workers_per_gather;
reset bytea_output;
reset min_parallel_table_scan_size;
reset parallel_leader_participation;
reset parallel_tuple_cost;
reset parallel_setup_cost;
drop view v_pagg_test;
drop table pagg_test;
-- FILTER tests
select min(unique1) filter (where unique1 > 100) from tenk1;