diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 8a6dfd64e8..82ed5b3e1c 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -254,6 +254,11 @@ typedef struct AggStatePerTransData */ Aggref *aggref; + /* + * Is this state value actually being shared by more than one Aggref? + */ + bool aggshared; + /* * Nominal number of arguments for aggregate function. For plain aggs, * this excludes any ORDER BY expressions. For ordered-set aggs, this @@ -3360,9 +3365,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) { /* * Existing compatible trans found, so just point the 'peragg' to - * the same per-trans struct. + * the same per-trans struct, and mark the trans state as shared. */ pertrans = &pertransstates[existing_transno]; + pertrans->aggshared = true; peragg->transno = existing_transno; } else @@ -3512,6 +3518,7 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans, /* Begin filling in the pertrans data */ pertrans->aggref = aggref; + pertrans->aggshared = false; pertrans->aggCollation = aggref->inputcollid; pertrans->transfn_oid = aggtransfn; pertrans->serialfn_oid = aggserialfn; @@ -4161,17 +4168,18 @@ AggGetAggref(FunctionCallInfo fcinfo) { if (fcinfo->context && IsA(fcinfo->context, AggState)) { + AggState *aggstate = (AggState *) fcinfo->context; AggStatePerAgg curperagg; AggStatePerTrans curpertrans; /* check curperagg (valid when in a final function) */ - curperagg = ((AggState *) fcinfo->context)->curperagg; + curperagg = aggstate->curperagg; if (curperagg) return curperagg->aggref; /* check curpertrans (valid when in a transition function) */ - curpertrans = ((AggState *) fcinfo->context)->curpertrans; + curpertrans = aggstate->curpertrans; if (curpertrans) return curpertrans->aggref; @@ -4201,6 +4209,44 @@ AggGetTempMemoryContext(FunctionCallInfo fcinfo) return NULL; } +/* + * AggStateIsShared - find out whether transition state is shared + * + * If the function is being called as an aggregate support function, + * return TRUE if the aggregate's transition state is shared across + * multiple aggregates, FALSE if it is not. + * + * Returns TRUE if not called as an aggregate support function. + * This is intended as a conservative answer, ie "no you'd better not + * scribble on your input". In particular, will return TRUE if the + * aggregate is being used as a window function, which is a scenario + * in which changing the transition state is a bad idea. We might + * want to refine the behavior for the window case in future. + */ +bool +AggStateIsShared(FunctionCallInfo fcinfo) +{ + if (fcinfo->context && IsA(fcinfo->context, AggState)) + { + AggState *aggstate = (AggState *) fcinfo->context; + AggStatePerAgg curperagg; + AggStatePerTrans curpertrans; + + /* check curperagg (valid when in a final function) */ + curperagg = aggstate->curperagg; + + if (curperagg) + return aggstate->pertrans[curperagg->transno].aggshared; + + /* check curpertrans (valid when in a transition function) */ + curpertrans = aggstate->curpertrans; + + if (curpertrans) + return curpertrans->aggshared; + } + return true; +} + /* * AggRegisterCallback - register a cleanup callback for an aggregate * diff --git a/src/backend/utils/adt/orderedsetaggs.c b/src/backend/utils/adt/orderedsetaggs.c index 25905a3287..1e323d9444 100644 --- a/src/backend/utils/adt/orderedsetaggs.c +++ b/src/backend/utils/adt/orderedsetaggs.c @@ -40,14 +40,22 @@ * create just once per query because they will not change across groups. * The per-query struct and subsidiary data live in the executor's per-query * memory context, and go away implicitly at ExecutorEnd(). + * + * These structs are set up during the first call of the transition function. + * Because we allow nodeAgg.c to merge ordered-set aggregates (but not + * hypothetical aggregates) with identical inputs and transition functions, + * this info must not depend on the particular aggregate (ie, particular + * final-function), nor on the direct argument(s) of the aggregate. */ typedef struct OSAPerQueryState { - /* Aggref for this aggregate: */ + /* Representative Aggref for this aggregate: */ Aggref *aggref; /* Memory context containing this struct and other per-query data: */ MemoryContext qcontext; + /* Do we expect multiple final-function calls within one group? */ + bool rescan_needed; /* These fields are used only when accumulating tuples: */ @@ -91,6 +99,8 @@ typedef struct OSAPerGroupState Tuplesortstate *sortstate; /* Number of normal rows inserted into sortstate: */ int64 number_of_rows; + /* Have we already done tuplesort_performsort? */ + bool sort_done; } OSAPerGroupState; static void ordered_set_shutdown(Datum arg); @@ -146,6 +156,9 @@ ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples) qstate->aggref = aggref; qstate->qcontext = qcontext; + /* We need to support rescans if the trans state is shared */ + qstate->rescan_needed = AggStateIsShared(fcinfo); + /* Extract the sort information */ sortlist = aggref->aggorder; numSortCols = list_length(sortlist); @@ -277,15 +290,18 @@ ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples) qstate->sortOperators, qstate->sortCollations, qstate->sortNullsFirsts, - work_mem, false); + work_mem, + qstate->rescan_needed); else osastate->sortstate = tuplesort_begin_datum(qstate->sortColType, qstate->sortOperator, qstate->sortCollation, qstate->sortNullsFirst, - work_mem, false); + work_mem, + qstate->rescan_needed); osastate->number_of_rows = 0; + osastate->sort_done = false; /* Now register a shutdown callback to clean things up at end of group */ AggRegisterCallback(fcinfo, @@ -306,14 +322,12 @@ ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples) * group) by ExecutorEnd. But we must take care to release any potential * non-memory resources. * - * This callback is arguably unnecessary, since we don't support use of - * ordered-set aggs in AGG_HASHED mode and there is currently no non-error - * code path in non-hashed modes wherein nodeAgg.c won't call the finalfn - * after calling the transfn one or more times. So in principle we could rely - * on the finalfn to delete the tuplestore etc. However, it's possible that - * such a code path might exist in future, and in any case it'd be - * notationally tedious and sometimes require extra data copying to ensure - * we always delete the tuplestore in the finalfn. + * In the case where we're not expecting multiple finalfn calls, we could + * arguably rely on the finalfn to clean up; but it's easier and more testable + * if we just do it the same way in either case. Note that many of the + * finalfns could *not* free the tuplesort object, at least not without extra + * data copying, because what they return is a pointer to a datum inside the + * tuplesort object. */ static void ordered_set_shutdown(Datum arg) @@ -436,8 +450,14 @@ percentile_disc_final(PG_FUNCTION_ARGS) if (osastate->number_of_rows == 0) PG_RETURN_NULL(); - /* Finish the sort */ - tuplesort_performsort(osastate->sortstate); + /* Finish the sort, or rescan if we already did */ + if (!osastate->sort_done) + { + tuplesort_performsort(osastate->sortstate); + osastate->sort_done = true; + } + else + tuplesort_rescan(osastate->sortstate); /*---------- * We need the smallest K such that (K/N) >= percentile. @@ -457,13 +477,6 @@ percentile_disc_final(PG_FUNCTION_ARGS) if (!tuplesort_getdatum(osastate->sortstate, true, &val, &isnull, NULL)) elog(ERROR, "missing row in percentile_disc"); - /* - * Note: we *cannot* clean up the tuplesort object here, because the value - * to be returned is allocated inside its sortcontext. We could use - * datumCopy to copy it out of there, but it doesn't seem worth the - * trouble, since the cleanup callback will clear the tuplesort later. - */ - /* We shouldn't have stored any nulls, but do the right thing anyway */ if (isnull) PG_RETURN_NULL(); @@ -543,8 +556,14 @@ percentile_cont_final_common(FunctionCallInfo fcinfo, Assert(expect_type == osastate->qstate->sortColType); - /* Finish the sort */ - tuplesort_performsort(osastate->sortstate); + /* Finish the sort, or rescan if we already did */ + if (!osastate->sort_done) + { + tuplesort_performsort(osastate->sortstate); + osastate->sort_done = true; + } + else + tuplesort_rescan(osastate->sortstate); first_row = floor(percentile * (osastate->number_of_rows - 1)); second_row = ceil(percentile * (osastate->number_of_rows - 1)); @@ -575,13 +594,6 @@ percentile_cont_final_common(FunctionCallInfo fcinfo, val = lerpfunc(first_val, second_val, proportion); } - /* - * Note: we *cannot* clean up the tuplesort object here, because the value - * to be returned may be allocated inside its sortcontext. We could use - * datumCopy to copy it out of there, but it doesn't seem worth the - * trouble, since the cleanup callback will clear the tuplesort later. - */ - PG_RETURN_DATUM(val); } @@ -779,8 +791,14 @@ percentile_disc_multi_final(PG_FUNCTION_ARGS) */ if (i < num_percentiles) { - /* Finish the sort */ - tuplesort_performsort(osastate->sortstate); + /* Finish the sort, or rescan if we already did */ + if (!osastate->sort_done) + { + tuplesort_performsort(osastate->sortstate); + osastate->sort_done = true; + } + else + tuplesort_rescan(osastate->sortstate); for (; i < num_percentiles; i++) { @@ -804,11 +822,6 @@ percentile_disc_multi_final(PG_FUNCTION_ARGS) } } - /* - * We could clean up the tuplesort object after forming the array, but - * probably not worth the trouble. - */ - /* We make the output array the same shape as the input */ PG_RETURN_POINTER(construct_md_array(result_datum, result_isnull, ARR_NDIM(param), @@ -902,8 +915,14 @@ percentile_cont_multi_final_common(FunctionCallInfo fcinfo, */ if (i < num_percentiles) { - /* Finish the sort */ - tuplesort_performsort(osastate->sortstate); + /* Finish the sort, or rescan if we already did */ + if (!osastate->sort_done) + { + tuplesort_performsort(osastate->sortstate); + osastate->sort_done = true; + } + else + tuplesort_rescan(osastate->sortstate); for (; i < num_percentiles; i++) { @@ -962,11 +981,6 @@ percentile_cont_multi_final_common(FunctionCallInfo fcinfo, } } - /* - * We could clean up the tuplesort object after forming the array, but - * probably not worth the trouble. - */ - /* We make the output array the same shape as the input */ PG_RETURN_POINTER(construct_md_array(result_datum, result_isnull, ARR_NDIM(param), @@ -1043,8 +1057,14 @@ mode_final(PG_FUNCTION_ARGS) shouldfree = !(osastate->qstate->typByVal); - /* Finish the sort */ - tuplesort_performsort(osastate->sortstate); + /* Finish the sort, or rescan if we already did */ + if (!osastate->sort_done) + { + tuplesort_performsort(osastate->sortstate); + osastate->sort_done = true; + } + else + tuplesort_rescan(osastate->sortstate); /* Scan tuples and count frequencies */ while (tuplesort_getdatum(osastate->sortstate, true, &val, &isnull, &abbrev_val)) @@ -1097,13 +1117,6 @@ mode_final(PG_FUNCTION_ARGS) if (shouldfree && !last_val_is_mode) pfree(DatumGetPointer(last_val)); - /* - * Note: we *cannot* clean up the tuplesort object here, because the value - * to be returned is allocated inside its sortcontext. We could use - * datumCopy to copy it out of there, but it doesn't seem worth the - * trouble, since the cleanup callback will clear the tuplesort later. - */ - if (mode_freq) PG_RETURN_DATUM(mode_val); else @@ -1174,6 +1187,9 @@ hypothetical_rank_common(FunctionCallInfo fcinfo, int flag, hypothetical_check_argtypes(fcinfo, nargs, osastate->qstate->tupdesc); + /* because we need a hypothetical row, we can't share transition state */ + Assert(!osastate->sort_done); + /* insert the hypothetical row into the sort */ slot = osastate->qstate->tupslot; ExecClearTuple(slot); @@ -1190,6 +1206,7 @@ hypothetical_rank_common(FunctionCallInfo fcinfo, int flag, /* finish the sort */ tuplesort_performsort(osastate->sortstate); + osastate->sort_done = true; /* iterate till we find the hypothetical row */ while (tuplesort_gettupleslot(osastate->sortstate, true, true, slot, NULL)) @@ -1207,10 +1224,6 @@ hypothetical_rank_common(FunctionCallInfo fcinfo, int flag, ExecClearTuple(slot); - /* Might as well clean up the tuplesort object immediately */ - tuplesort_end(osastate->sortstate); - osastate->sortstate = NULL; - return rank; } @@ -1329,6 +1342,9 @@ hypothetical_dense_rank_final(PG_FUNCTION_ARGS) /* Get short-term context we can use for execTuplesMatch */ tmpcontext = AggGetTempMemoryContext(fcinfo); + /* because we need a hypothetical row, we can't share transition state */ + Assert(!osastate->sort_done); + /* insert the hypothetical row into the sort */ slot = osastate->qstate->tupslot; ExecClearTuple(slot); @@ -1345,6 +1361,7 @@ hypothetical_dense_rank_final(PG_FUNCTION_ARGS) /* finish the sort */ tuplesort_performsort(osastate->sortstate); + osastate->sort_done = true; /* * We alternate fetching into tupslot and extraslot so that we have the @@ -1391,10 +1408,6 @@ hypothetical_dense_rank_final(PG_FUNCTION_ARGS) ExecDropSingleTupleTableSlot(extraslot); - /* Might as well clean up the tuplesort object immediately */ - tuplesort_end(osastate->sortstate); - osastate->sortstate = NULL; - rank = rank - duplicate_count; PG_RETURN_INT64(rank); diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 7c1756ae08..9a7f5b25a3 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201710141 +#define CATALOG_VERSION_NO 201710161 #endif diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h index 5769f6430a..13f1bce5af 100644 --- a/src/include/catalog/pg_aggregate.h +++ b/src/include/catalog/pg_aggregate.h @@ -318,13 +318,13 @@ DATA(insert ( 3267 n 0 jsonb_agg_transfn jsonb_agg_finalfn - - - - - - DATA(insert ( 3270 n 0 jsonb_object_agg_transfn jsonb_object_agg_finalfn - - - - - - f f r r 0 2281 0 0 0 _null_ _null_ )); /* ordered-set and hypothetical-set aggregates */ -DATA(insert ( 3972 o 1 ordered_set_transition percentile_disc_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ )); -DATA(insert ( 3974 o 1 ordered_set_transition percentile_cont_float8_final - - - - - - f f w w 0 2281 0 0 0 _null_ _null_ )); -DATA(insert ( 3976 o 1 ordered_set_transition percentile_cont_interval_final - - - - - - f f w w 0 2281 0 0 0 _null_ _null_ )); -DATA(insert ( 3978 o 1 ordered_set_transition percentile_disc_multi_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ )); -DATA(insert ( 3980 o 1 ordered_set_transition percentile_cont_float8_multi_final - - - - - - f f w w 0 2281 0 0 0 _null_ _null_ )); -DATA(insert ( 3982 o 1 ordered_set_transition percentile_cont_interval_multi_final - - - - - - f f w w 0 2281 0 0 0 _null_ _null_ )); -DATA(insert ( 3984 o 0 ordered_set_transition mode_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3972 o 1 ordered_set_transition percentile_disc_final - - - - - - t f s s 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3974 o 1 ordered_set_transition percentile_cont_float8_final - - - - - - f f s s 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3976 o 1 ordered_set_transition percentile_cont_interval_final - - - - - - f f s s 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3978 o 1 ordered_set_transition percentile_disc_multi_final - - - - - - t f s s 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3980 o 1 ordered_set_transition percentile_cont_float8_multi_final - - - - - - f f s s 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3982 o 1 ordered_set_transition percentile_cont_interval_multi_final - - - - - - f f s s 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3984 o 0 ordered_set_transition mode_final - - - - - - t f s s 0 2281 0 0 0 _null_ _null_ )); DATA(insert ( 3986 h 1 ordered_set_transition_multi rank_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ )); DATA(insert ( 3988 h 1 ordered_set_transition_multi percent_rank_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ )); DATA(insert ( 3990 h 1 ordered_set_transition_multi cume_dist_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ )); diff --git a/src/include/fmgr.h b/src/include/fmgr.h index b604a5c162..a68ec91c68 100644 --- a/src/include/fmgr.h +++ b/src/include/fmgr.h @@ -698,6 +698,7 @@ extern int AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext); extern fmAggrefPtr AggGetAggref(FunctionCallInfo fcinfo); extern MemoryContext AggGetTempMemoryContext(FunctionCallInfo fcinfo); +extern bool AggStateIsShared(FunctionCallInfo fcinfo); extern void AggRegisterCallback(FunctionCallInfo fcinfo, fmExprContextCallbackFunction func, Datum arg); diff --git a/src/test/regress/expected/aggregates.out b/src/test/regress/expected/aggregates.out index c4ea86ff05..3408cf3333 100644 --- a/src/test/regress/expected/aggregates.out +++ b/src/test/regress/expected/aggregates.out @@ -1866,7 +1866,7 @@ NOTICE: avg_transfn called with 3 2 | 6 (1 row) --- ideally these would share state, but we have to fix the OSAs first. +-- exercise cases where OSAs share state select percentile_cont(0.5) within group (order by a), percentile_disc(0.5) within group (order by a) @@ -1876,6 +1876,16 @@ from (values(1::float8),(3),(5),(7)) t(a); 4 | 3 (1 row) +select + percentile_cont(0.25) within group (order by a), + percentile_disc(0.5) within group (order by a) +from (values(1::float8),(3),(5),(7)) t(a); + percentile_cont | percentile_disc +-----------------+----------------- + 2.5 | 3 +(1 row) + +-- these can't share state currently select rank(4) within group (order by a), dense_rank(4) within group (order by a) diff --git a/src/test/regress/sql/aggregates.sql b/src/test/regress/sql/aggregates.sql index fefbef89e0..55c8528fd5 100644 --- a/src/test/regress/sql/aggregates.sql +++ b/src/test/regress/sql/aggregates.sql @@ -741,12 +741,18 @@ select my_avg(one) filter (where one > 1),my_sum(one) from (values(1),(3)) t(one -- this should not share the state due to different input columns. select my_avg(one),my_sum(two) from (values(1,2),(3,4)) t(one,two); --- ideally these would share state, but we have to fix the OSAs first. +-- exercise cases where OSAs share state select percentile_cont(0.5) within group (order by a), percentile_disc(0.5) within group (order by a) from (values(1::float8),(3),(5),(7)) t(a); +select + percentile_cont(0.25) within group (order by a), + percentile_disc(0.5) within group (order by a) +from (values(1::float8),(3),(5),(7)) t(a); + +-- these can't share state currently select rank(4) within group (order by a), dense_rank(4) within group (order by a)